MapReduce入門實戰
MapReduce 思想
MapReduce 是 Google 提出的一個軟件架構,用於大規模數據集的並行運算。概率“Map(映射)”和“Reduce(歸約)”以及它們的思想都是從函數式編程語言借鑑的,還有從矢量編程語言借來的特性。
當前的軟件實現是指定一個“Map”函數,用來把一組鍵值對映射成一組新的鍵值對,指定併發的“Reduce”函數,用來保證所有映射的鍵值對中的每一個都共享相同的鍵組。
Hadoop MapReduce 的任務過程分為兩個階段:
- Map 階段:把大任務分解為若干個小任務來並行處理。這些任務可以並行計算,彼此之間沒有依賴關係。
- Reduce 階段:對 map 階段的結果進行全局彙總。
Hadoop 序列化
為什麼要序列化?
序列化是我們通過網絡通信傳輸數據時或者把對象持久化到文件,需要把對象序列化成二進制的結構。
觀察源碼時發現自定義 Mapper 類與自定義 Reducer 類都有泛型類約束,比如自定義 Mapper 有四個泛型參數,但是都不是 Java 基本類型。
為什麼 Hadoop 要選擇建立自己的序列化格式而不使用 java 自帶 serializable?
- 序列化在分佈式程序中非常重要,在 Hadoop 中,集羣中多個節點的進程間的通信是通過 RPC(遠程過程調用:RemoteProcedureCall)實現;RPC 將消息序列化成二進制流發送到遠程節點,遠程節點再將接收到的二進制數據反序列化為原始的消息,因此 RPC 往往追求如下特點:
- 數據更緊湊,能充分利用網絡帶寬資源
- 快速:序列化和反序列化的性能開銷更低
- Hadoop 使用的是自己的序列化格式 Writable,它比 java 的序列化 serialization 更緊湊速度更快。一個對象使用 Serializable 序列化後,會攜帶很多額外信息比如校驗信息,Header,繼承體系等
Java 基本類型與 Hadoop 常用序列化類型
Java 基本類型 | Hadoop Writable 類型 |
---|---|
boolean | BooleanWritable |
byte | ByteWritable |
int | IntWritable |
float | FloatWritable |
long | LongWritable |
double | DoubleWritable |
String | Text |
map | MapWritable |
array | ArrayWritable |
基本的序列化類型往往不能滿足需求,比如我們常常需要傳遞一些自定義的 bean 對象。在 Hadoop 中為了實現自定義對象序列化需要實現 Writable 接口。
- 實現 Writable 接口
- 有無參構造函數
- 重寫序列化 write 方法和反序列化 readFields 方法。(注意序列化和反序列化的字段順序必須完全一致)
- 如果自定義 Bean 對象需要放在 Mapper 輸出 KV 中的 K 裏面,那麼該對象還需要實現 Comparable 接口,因為 MapReduce 框架中的 Shuffle 過程要求 key 必須能排序
案例實戰
需求:下面有一個水果攤老闆的一個售賣記錄,這三列分別是:水果名稱、水果重量、還有總價。我們需要統計每個水果的總重量和重價。
蘋果 3 12 李子 4 8 蘋果 2 8 桃子 4 20 香蕉 2 4 火龍果 1 4
- 配置 Hadoop 環境變量
- 導入 maven 依賴
<dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>${hadoop-version}</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>${hadoop-version}</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version>${hadoop-version}</version> </dependency>
- 編寫保存售賣記錄的實體類
@Setter @Getter public class FruitsRecord implements Writable { private int weight; private double totalPrice; @Override public void write(DataOutput out) throws IOException { out.writeInt(weight); out.writeDouble(totalPrice); } @Override public void readFields(DataInput in) throws IOException { this.weight = in.readInt(); this.totalPrice = in.readDouble(); } @Override public String toString() { return "FruitsRecord{" + "weight=" + weight + ", totalPrice=" + totalPrice + '}'; } }
- 編寫 Mapper 類
import com.mmc.hadoop.bean.FruitsRecord; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class FruitsMapper extends Mapper<LongWritable,Text,Text, FruitsRecord> { @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, FruitsRecord>.Context context) throws IOException, InterruptedException { //獲取一行的數據 String line = value.toString(); String[] fields = line.split(" "); Text outKey= new Text(fields[0]); FruitsRecord fruitsRecord=new FruitsRecord(); fruitsRecord.setWeight(Integer.parseInt(fields[1])); fruitsRecord.setTotalPrice(Double.parseDouble(fields[2])); context.write(outKey,fruitsRecord); } }
- 編寫 Reduce 類
import com.mmc.hadoop.bean.FruitsRecord; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class FruitsReducer extends Reducer<Text, FruitsRecord,Text,FruitsRecord> { @Override protected void reduce(Text key, Iterable<FruitsRecord> values, Reducer<Text, FruitsRecord, Text, FruitsRecord>.Context context) throws IOException, InterruptedException { int totalWeight = 0; double totalPrice =0; for (FruitsRecord fruitsRecord : values){ totalWeight += fruitsRecord.getWeight(); totalPrice+= fruitsRecord.getTotalPrice(); } FruitsRecord fruitsRecord = new FruitsRecord(); fruitsRecord.setWeight(totalWeight); fruitsRecord.setTotalPrice(totalPrice); context.write(key, fruitsRecord); } }
- 編寫 Driver 類
import com.mmc.hadoop.bean.FruitsRecord; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; public class FruitsDriver { public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException { // System.setProperty("java.library.path","d://"); Configuration conf = new Configuration(); Job job=Job.getInstance(conf,"FruitsDriver"); //指定本程序的jar包所在的路徑 job.setJarByClass(FruitsDriver.class); //指定本業務job要使用的mapper/Reducer業務類 job.setMapperClass(FruitsMapper.class); job.setReducerClass(FruitsReducer.class); //指定mapper輸出數據的kv類型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(FruitsRecord.class); //指定reduce輸出數據的kv類型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(FruitsRecord.class); //指定job的輸入文件目錄和輸出目錄 FileInputFormat.setInputPaths(job,new Path(args[0])); FileOutputFormat.setOutputPath(job,new Path(args[1])); boolean result = job.waitForCompletion(true); System.exit( result ? 0: 1); } }
總結:
Mapper 裏面,Mapper 類的四個泛型分別為入參的 KV 和出參的 KV。Reduce 裏面的也有4個泛型,分別為入參的KV和出參的KV。Reduce入參的 KV 與 Mapper 裏面出參的 KV 類型是對應的。只不過 Reduce 的入參的 Value 類型是集合類型的。
時序圖如下:

運行任務
本地模式
直接在 IDEA 中運行驅動類即可。因為程序裏輸入文件路徑和輸出文件路徑是取的 main 函數裏的 args。所以運行的時候需要指定參數。
遇到的問題
問題 1:
問題 2:創建目錄錯誤
org.apache.hadoop.io.nativeio.NativeIO$Windows.createDirectoryWithMode0(Ljava/lang/String
解決方案:
兩個問題都是 windows 的 hadoop/bin 目錄下缺少文件導致的。文件下載路徑: http://github.com/cdarlint/winutils
- 找到對應版本的 hadoop.dll 和 winutils.exe 下載下來放到 hadoop/bin 目錄下。
- C: windows\System32 放入 hadoop.dll 文件
- 重啟電腦
輸出目錄:

打開結果文件 part-r-00000:
李子 FruitsRecord{weight=4, totalPrice=8.0} 桃子 FruitsRecord{weight=4, totalPrice=20.0} 火龍果 FruitsRecord{weight=1, totalPrice=4.0} 蘋果 FruitsRecord{weight=5, totalPrice=20.0} 香蕉 FruitsRecord{weight=2, totalPrice=4.0}
Yarn 集羣模式
- 把程序打包成 jar 包,上傳到 linux
- 將測試的 txt 上傳到 HDFS 上面
- 啟動 Hadoop 集羣
- 使用 Hadoop 命令提交任務運行
hadoop jar wc.jar com.mmc.hadoop.FruitsDriver /user/input /user/output
- 記一次批量更新整型類型的列 → 探究 UPDATE 的使用細節
- 編碼中的Adapter,不僅是一種設計模式,更是一種架構理念與解決方案
- 線程池底層原理詳解與源碼分析
- 30分鐘掌握 Webpack
- 線性迴歸大結局(嶺(Ridge)、 Lasso迴歸原理、公式推導),你想要的這裏都有
- Django 之路由層
- 【前端必會】webpack loader 到底是什麼
- day42-反射01
- 中心化決議管理——雲端分析
- HashMap底層原理及jdk1.8源碼解讀
- 詳解JS中 call 方法的實現
- 打印 Logger 日誌時,需不需要再封裝一下工具類?
- 初識設計模式 - 代理模式
- 設計模式---享元模式
- 密碼學奇妙之旅、01 CFB密文反饋模式、AES標準、Golang代碼
- [ML從入門到入門] 支持向量機:從SVM的推導過程到SMO的收斂性討論
- 從應用訪問Pod元數據-DownwardApi的應用
- Springboot之 Mybatis 多數據源實現
- Java 泛型程序設計
- CAS核心思想、底層實現