Mapreduce實例(三):數據去重

語言: CN / TW / HK

大家好,我是風雲,歡迎大家關注我的博客 或者 微信公眾號【笑看風雲路】,在未來的日子裏我們一起來學習大數據相關的技術,一起努力奮鬥,遇見更好的自己!

實現思路

數據去重的最終目標是讓原始數據中出現次數超過一次的數據在輸出文件中只出現一次。在MapReduce流程中,map的輸出經過shuffle過程聚集成後交給reduce。我們自然而然會想到將同一個數據的所有記錄都交給一台reduce機器,無論這個數據出現多少次,只要在最終結果中輸出一次就可以了。具體就是reduce的輸入應該以數據作為key,而對value-list則沒有要求(可以設置為空)。當reduce接收到一個時就直接將輸入的key複製到輸出的key中,並將value設置成空值,然後輸出

MaprReduce去重流程如下圖所示:

img

編寫代碼

Mapper代碼

java public static class Map extends Mapper<Object , Text , Text , NullWritable> //map將輸入中的value複製到輸出數據的key上,並直接輸出 { private static Text newKey=new Text(); //從輸入中得到的每行的數據的類型 public void map(Object key,Text value,Context context) throws IOException, InterruptedException //實現map函數 { //獲取並輸出每一次的處理過程 String line=value.toString(); System.out.println(line); String arr[]=line.split("\t"); newKey.set(arr[1]); context.write(newKey, NullWritable.get()); System.out.println(newKey); } }

Mapper階段採用Hadoop的默認的作業輸入方式,把輸入的value用split()方法截取,截取出的商品id字段設置為key,設置value為空,然後直接輸出

Reducer代碼

java public static class Reduce extends Reducer<Text, NullWritable, Text, NullWritable>{ public void reduce(Text key,Iterable<NullWritable> values,Context context) throws IOException, InterruptedException //實現reduce函數 { context.write(key,NullWritable.get()); //獲取並輸出每一次的處理過程 } } map輸出的鍵值對經過shuffle過程,聚成後,會交給reduce函數。reduce函數,不管每個key 有多少個value,它直接將輸入的值賦值給輸出的key,將輸出的value設置為空,然後輸出就可以了。

完整代碼

java package mapreduce; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; public class Filter{ public static class Map extends Mapper<Object , Text , Text , NullWritable>{ private static Text newKey=new Text(); public void map(Object key,Text value,Context context) throws IOException, InterruptedException{ String line=value.toString(); System.out.println(line); String arr[]=line.split("\t"); newKey.set(arr[1]); context.write(newKey, NullWritable.get()); System.out.println(newKey); } } public static class Reduce extends Reducer<Text, NullWritable, Text, NullWritable>{ public void reduce(Text key,Iterable<NullWritable> values,Context context) throws IOException, InterruptedException{ context.write(key,NullWritable.get()); } } public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException{ Configuration conf=new Configuration(); System.out.println("start"); Job job =new Job(conf,"filter"); job.setJarByClass(Filter.class); job.setMapperClass(Map.class); job.setReducerClass(Reduce.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); Path in=new Path("hdfs://localhost:9000/mymapreduce2/in/buyer_favorite1"); Path out=new Path("hdfs://localhost:9000/mymapreduce2/out"); FileInputFormat.addInputPath(job,in); FileOutputFormat.setOutputPath(job,out); System.exit(job.waitForCompletion(true) ? 0 : 1); } }

-------------- end ----------------

微信公眾號:掃描下方二維碼或 搜索 笑看風雲路 關注

笑看風雲路