MapReduce Example (2): Computing an Average


Hello everyone, I'm 風雲. Welcome to follow my blog or my WeChat official account 【笑看風雲路】. In the days ahead we'll learn big-data technologies together, keep pushing forward, and become better versions of ourselves!

Implementation Approach

Computing an average is a fairly common MapReduce algorithm, and it is simple to implement. One approach: the map side reads the data and emits key/value pairs; before the pairs reach the reduce side, the shuffle phase gathers all values that share the same map-output key into a value list, which is then fed to reduce. The reduce side sums the values in each list, counts the records, and divides the sum by the count to get the average. A small example follows.

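For concreteness, here is a hypothetical `goods_click` input (the categories and counts are invented purely for illustration): each line holds a tab-separated product category and click count, and the job emits one integer average per category.

```
input (tab-separated):        output:
shoes   15                    books   10
shoes   20                    shoes   17
books   10
```

Note that shoes averages to 17 rather than 17.5, because the job below uses integer division; more on that later.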

Writing the Code

Mapper Code

```java
public static class Map extends Mapper<Object, Text, Text, IntWritable> {
    private static Text newKey = new Text();

    // Implement the map function
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        // Convert each line of the plain-text input into a String
        String line = value.toString();
        System.out.println(line);
        String[] arr = line.split("\t");
        newKey.set(arr[0]);
        int click = Integer.parseInt(arr[1]);
        context.write(newKey, new IntWritable(click));
    }
}
```

Using Hadoop's default input format, the map side splits each input line with split("\t"), converts the product click-count field to an IntWritable and sets it as the value, sets the product-category field as the key, and then writes the key/value pair directly.
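One caveat: this mapper assumes every line is well formed; a missing field or a non-numeric count will throw and fail the task. As a minimal defensive sketch (the `SafeMap` name and the guard logic are my own additions, not part of the original example):

```java
public static class SafeMap extends Mapper<Object, Text, Text, IntWritable> {
    private final Text newKey = new Text();

    @Override
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] arr = value.toString().split("\t");
        if (arr.length < 2) {
            return; // skip lines that do not have both fields
        }
        try {
            newKey.set(arr[0]);
            context.write(newKey, new IntWritable(Integer.parseInt(arr[1].trim())));
        } catch (NumberFormatException e) {
            // skip lines whose click field is not numeric
        }
    }
}
```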

Reducer Code

```java
public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
    // Implement the reduce function
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int num = 0;
        int count = 0;
        for (IntWritable val : values) {
            num += val.get(); // sum each element into num
            count++;          // count the number of elements
        }
        int avg = num / count; // compute the average

        context.write(key, new IntWritable(avg));
    }
}
```

The map output is consolidated into key/value-list pairs by the shuffle and handed to reduce. On the reduce side, the input key is copied straight to the output key; a for loop over values sums each element into num and counts the elements in count; num divided by count gives the average avg, which is set as the value and written out.
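Since num, count, and avg are all ints, the division truncates: 35 clicks over 2 records yields 17, not 17.5. If fractional averages matter, here is a minimal sketch of an alternative reducer using DoubleWritable (the `AvgReduce` name is mine; the driver would also need `job.setOutputValueClass(DoubleWritable.class)` and `job.setMapOutputValueClass(IntWritable.class)`, since the map and reduce value types would then differ):

```java
// Assumes: import org.apache.hadoop.io.DoubleWritable;
public static class AvgReduce extends Reducer<Text, IntWritable, Text, DoubleWritable> {
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        long sum = 0;
        long count = 0;
        for (IntWritable val : values) {
            sum += val.get();
            count++;
        }
        // Floating-point division keeps the fractional part
        context.write(key, new DoubleWritable((double) sum / count));
    }
}
```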

Complete Code

```java
package mapreduce;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class MyAverage {

    public static class Map extends Mapper<Object, Text, Text, IntWritable> {
        private static Text newKey = new Text();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            System.out.println(line);
            String[] arr = line.split("\t");
            newKey.set(arr[0]);
            int click = Integer.parseInt(arr[1]);
            context.write(newKey, new IntWritable(click));
        }
    }

    public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int num = 0;
            int count = 0;
            for (IntWritable val : values) {
                num += val.get();
                count++;
            }
            int avg = num / count;
            context.write(key, new IntWritable(avg));
        }
    }

    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        System.out.println("start");
        Job job = Job.getInstance(conf, "MyAverage");
        job.setJarByClass(MyAverage.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        Path in = new Path("hdfs://localhost:9000/mymapreduce4/in/goods_click");
        Path out = new Path("hdfs://localhost:9000/mymapreduce4/out");
        FileInputFormat.addInputPath(job, in);
        FileOutputFormat.setOutputPath(job, out);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```
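A final caution if you try to speed this job up with a combiner: reusing Reduce as the combiner would be wrong, because an average of per-split averages is not the overall average. A combiner-safe design carries (sum, count) partial aggregates through the shuffle and divides only in the final reduce. The sketch below is my own illustration of that idea, not part of the original example: the mapper would emit `context.write(newKey, new Text(click + ",1"))`, and the driver would register the combiner with `job.setCombinerClass(SumCountCombiner.class)` and set `job.setMapOutputValueClass(Text.class)`. It reuses only the imports already present in the code above.

```java
// Hypothetical combiner-safe variant: carry (sum, count) through the shuffle
// as a Text of the form "sum,count", so partial aggregates merge correctly.
public static class SumCountCombiner extends Reducer<Text, Text, Text, Text> {
    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        long sum = 0, count = 0;
        for (Text val : values) {
            String[] parts = val.toString().split(",");
            sum += Long.parseLong(parts[0]);
            count += Long.parseLong(parts[1]);
        }
        context.write(key, new Text(sum + "," + count)); // still mergeable
    }
}

public static class AvgFromSumCountReduce extends Reducer<Text, Text, Text, IntWritable> {
    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        long sum = 0, count = 0;
        for (Text val : values) {
            String[] parts = val.toString().split(",");
            sum += Long.parseLong(parts[0]);
            count += Long.parseLong(parts[1]);
        }
        context.write(key, new IntWritable((int) (sum / count))); // divide only here
    }
}
```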

-------------- end ----------------

WeChat official account: search for 笑看風雲路 to follow.
