Mapreduce例項(三):資料去重

語言: CN / TW / HK

大家好,我是風雲,歡迎大家關注我的部落格 或者 微信公眾號【笑看風雲路】,在未來的日子裡我們一起來學習大資料相關的技術,一起努力奮鬥,遇見更好的自己!

實現思路

資料去重的最終目標是讓原始資料中出現次數超過一次的資料在輸出檔案中只出現一次。在MapReduce流程中,map的輸出經過shuffle過程聚整合後交給reduce。我們自然而然會想到將同一個資料的所有記錄都交給一臺reduce機器,無論這個資料出現多少次,只要在最終結果中輸出一次就可以了。具體就是reduce的輸入應該以資料作為key,而對value-list則沒有要求(可以設定為空)。當reduce接收到一個時就直接將輸入的key複製到輸出的key中,並將value設定成空值,然後輸出

MaprReduce去重流程如下圖所示:

img

編寫程式碼

Mapper程式碼

java public static class Map extends Mapper<Object , Text , Text , NullWritable> //map將輸入中的value複製到輸出資料的key上,並直接輸出 { private static Text newKey=new Text(); //從輸入中得到的每行的資料的型別 public void map(Object key,Text value,Context context) throws IOException, InterruptedException //實現map函式 { //獲取並輸出每一次的處理過程 String line=value.toString(); System.out.println(line); String arr[]=line.split("\t"); newKey.set(arr[1]); context.write(newKey, NullWritable.get()); System.out.println(newKey); } }

Mapper階段採用Hadoop的預設的作業輸入方式,把輸入的value用split()方法擷取,截取出的商品id欄位設定為key,設定value為空,然後直接輸出

Reducer程式碼

java public static class Reduce extends Reducer<Text, NullWritable, Text, NullWritable>{ public void reduce(Text key,Iterable<NullWritable> values,Context context) throws IOException, InterruptedException //實現reduce函式 { context.write(key,NullWritable.get()); //獲取並輸出每一次的處理過程 } } map輸出的鍵值對經過shuffle過程,聚成後,會交給reduce函式。reduce函式,不管每個key 有多少個value,它直接將輸入的值賦值給輸出的key,將輸出的value設定為空,然後輸出就可以了。

完整程式碼

java package mapreduce; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; public class Filter{ public static class Map extends Mapper<Object , Text , Text , NullWritable>{ private static Text newKey=new Text(); public void map(Object key,Text value,Context context) throws IOException, InterruptedException{ String line=value.toString(); System.out.println(line); String arr[]=line.split("\t"); newKey.set(arr[1]); context.write(newKey, NullWritable.get()); System.out.println(newKey); } } public static class Reduce extends Reducer<Text, NullWritable, Text, NullWritable>{ public void reduce(Text key,Iterable<NullWritable> values,Context context) throws IOException, InterruptedException{ context.write(key,NullWritable.get()); } } public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException{ Configuration conf=new Configuration(); System.out.println("start"); Job job =new Job(conf,"filter"); job.setJarByClass(Filter.class); job.setMapperClass(Map.class); job.setReducerClass(Reduce.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); Path in=new Path("hdfs://localhost:9000/mymapreduce2/in/buyer_favorite1"); Path out=new Path("hdfs://localhost:9000/mymapreduce2/out"); FileInputFormat.addInputPath(job,in); FileOutputFormat.setOutputPath(job,out); System.exit(job.waitForCompletion(true) ? 0 : 1); } }

-------------- end ----------------

微信公眾號:掃描下方二維碼或 搜尋 笑看風雲路 關注

笑看風雲路