MapReduce入門實戰
MapReduce 思想
MapReduce 是 Google 提出的一個軟體架構,用於大規模資料集的並行運算。概率“Map(對映)”和“Reduce(歸約)”以及它們的思想都是從函數語言程式設計語言借鑑的,還有從向量程式語言借來的特性。
當前的軟體實現是指定一個“Map”函式,用來把一組鍵值對對映成一組新的鍵值對,指定併發的“Reduce”函式,用來保證所有對映的鍵值對中的每一個都共享相同的鍵組。
Hadoop MapReduce 的任務過程分為兩個階段:
- Map 階段:把大任務分解為若干個小任務來並行處理。這些任務可以平行計算,彼此之間沒有依賴關係。
- Reduce 階段:對 map 階段的結果進行全域性彙總。
Hadoop 序列化
為什麼要序列化?
序列化是我們通過網路通訊傳輸資料時或者把物件持久化到檔案,需要把物件序列化成二進位制的結構。
觀察原始碼時發現自定義 Mapper 類與自定義 Reducer 類都有泛型類約束,比如自定義 Mapper 有四個泛型引數,但是都不是 Java 基本型別。
為什麼 Hadoop 要選擇建立自己的序列化格式而不使用 java 自帶 serializable?
- 序列化在分散式程式中非常重要,在 Hadoop 中,叢集中多個節點的程序間的通訊是通過 RPC(遠端過程呼叫:RemoteProcedureCall)實現;RPC 將訊息序列化成二進位制流傳送到遠端節點,遠端節點再將接收到的二進位制資料反序列化為原始的訊息,因此 RPC 往往追求如下特點:
- 資料更緊湊,能充分利用網路頻寬資源
- 快速:序列化和反序列化的效能開銷更低
- Hadoop 使用的是自己的序列化格式 Writable,它比 java 的序列化 serialization 更緊湊速度更快。一個物件使用 Serializable 序列化後,會攜帶很多額外資訊比如校驗資訊,Header,繼承體系等
Java 基本型別與 Hadoop 常用序列化型別
Java 基本型別 | Hadoop Writable 型別 |
---|---|
boolean | BooleanWritable |
byte | ByteWritable |
int | IntWritable |
float | FloatWritable |
long | LongWritable |
double | DoubleWritable |
String | Text |
map | MapWritable |
array | ArrayWritable |
基本的序列化型別往往不能滿足需求,比如我們常常需要傳遞一些自定義的 bean 物件。在 Hadoop 中為了實現自定義物件序列化需要實現 Writable 介面。
- 實現 Writable 介面
- 有無參建構函式
- 重寫序列化 write 方法和反序列化 readFields 方法。(注意序列化和反序列化的欄位順序必須完全一致)
- 如果自定義 Bean 物件需要放在 Mapper 輸出 KV 中的 K 裡面,那麼該物件還需要實現 Comparable 介面,因為 MapReduce 框架中的 Shuffle 過程要求 key 必須能排序
案例實戰
需求:下面有一個水果攤老闆的一個售賣記錄,這三列分別是:水果名稱、水果重量、還有總價。我們需要統計每個水果的總重量和重價。
蘋果 3 12 李子 4 8 蘋果 2 8 桃子 4 20 香蕉 2 4 火龍果 1 4
- 配置 Hadoop 環境變數
- 匯入 maven 依賴
<dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>${hadoop-version}</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>${hadoop-version}</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version>${hadoop-version}</version> </dependency>
- 編寫儲存售賣記錄的實體類
@Setter @Getter public class FruitsRecord implements Writable { private int weight; private double totalPrice; @Override public void write(DataOutput out) throws IOException { out.writeInt(weight); out.writeDouble(totalPrice); } @Override public void readFields(DataInput in) throws IOException { this.weight = in.readInt(); this.totalPrice = in.readDouble(); } @Override public String toString() { return "FruitsRecord{" + "weight=" + weight + ", totalPrice=" + totalPrice + '}'; } }
- 編寫 Mapper 類
import com.mmc.hadoop.bean.FruitsRecord; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class FruitsMapper extends Mapper<LongWritable,Text,Text, FruitsRecord> { @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, FruitsRecord>.Context context) throws IOException, InterruptedException { //獲取一行的資料 String line = value.toString(); String[] fields = line.split(" "); Text outKey= new Text(fields[0]); FruitsRecord fruitsRecord=new FruitsRecord(); fruitsRecord.setWeight(Integer.parseInt(fields[1])); fruitsRecord.setTotalPrice(Double.parseDouble(fields[2])); context.write(outKey,fruitsRecord); } }
- 編寫 Reduce 類
import com.mmc.hadoop.bean.FruitsRecord; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class FruitsReducer extends Reducer<Text, FruitsRecord,Text,FruitsRecord> { @Override protected void reduce(Text key, Iterable<FruitsRecord> values, Reducer<Text, FruitsRecord, Text, FruitsRecord>.Context context) throws IOException, InterruptedException { int totalWeight = 0; double totalPrice =0; for (FruitsRecord fruitsRecord : values){ totalWeight += fruitsRecord.getWeight(); totalPrice+= fruitsRecord.getTotalPrice(); } FruitsRecord fruitsRecord = new FruitsRecord(); fruitsRecord.setWeight(totalWeight); fruitsRecord.setTotalPrice(totalPrice); context.write(key, fruitsRecord); } }
- 編寫 Driver 類
import com.mmc.hadoop.bean.FruitsRecord; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; public class FruitsDriver { public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException { // System.setProperty("java.library.path","d://"); Configuration conf = new Configuration(); Job job=Job.getInstance(conf,"FruitsDriver"); //指定本程式的jar包所在的路徑 job.setJarByClass(FruitsDriver.class); //指定本業務job要使用的mapper/Reducer業務類 job.setMapperClass(FruitsMapper.class); job.setReducerClass(FruitsReducer.class); //指定mapper輸出資料的kv型別 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(FruitsRecord.class); //指定reduce輸出資料的kv型別 job.setOutputKeyClass(Text.class); job.setOutputValueClass(FruitsRecord.class); //指定job的輸入檔案目錄和輸出目錄 FileInputFormat.setInputPaths(job,new Path(args[0])); FileOutputFormat.setOutputPath(job,new Path(args[1])); boolean result = job.waitForCompletion(true); System.exit( result ? 0: 1); } }
總結:
Mapper 裡面,Mapper 類的四個泛型分別為入參的 KV 和出參的 KV。Reduce 裡面的也有4個泛型,分別為入參的KV和出參的KV。Reduce入參的 KV 與 Mapper 裡面出參的 KV 型別是對應的。只不過 Reduce 的入參的 Value 型別是集合型別的。
時序圖如下:

執行任務
本地模式
直接在 IDEA 中執行驅動類即可。因為程式裡輸入檔案路徑和輸出檔案路徑是取的 main 函式裡的 args。所以執行的時候需要指定引數。
遇到的問題
問題 1:
問題 2:建立目錄錯誤
org.apache.hadoop.io.nativeio.NativeIO$Windows.createDirectoryWithMode0(Ljava/lang/String
解決方案:
兩個問題都是 windows 的 hadoop/bin 目錄下缺少檔案導致的。檔案下載路徑: http://github.com/cdarlint/winutils
- 找到對應版本的 hadoop.dll 和 winutils.exe 下載下來放到 hadoop/bin 目錄下。
- C: windows\System32 放入 hadoop.dll 檔案
- 重啟電腦
輸出目錄:

開啟結果檔案 part-r-00000:
李子 FruitsRecord{weight=4, totalPrice=8.0} 桃子 FruitsRecord{weight=4, totalPrice=20.0} 火龍果 FruitsRecord{weight=1, totalPrice=4.0} 蘋果 FruitsRecord{weight=5, totalPrice=20.0} 香蕉 FruitsRecord{weight=2, totalPrice=4.0}
Yarn 叢集模式
- 把程式打包成 jar 包,上傳到 linux
- 將測試的 txt 上傳到 HDFS 上面
- 啟動 Hadoop 叢集
- 使用 Hadoop 命令提交任務執行
hadoop jar wc.jar com.mmc.hadoop.FruitsDriver /user/input /user/output
- 記一次批量更新整型型別的列 → 探究 UPDATE 的使用細節
- 編碼中的Adapter,不僅是一種設計模式,更是一種架構理念與解決方案
- 執行緒池底層原理詳解與原始碼分析
- 30分鐘掌握 Webpack
- 線性迴歸大結局(嶺(Ridge)、 Lasso迴歸原理、公式推導),你想要的這裡都有
- Django 之路由層
- 【前端必會】webpack loader 到底是什麼
- day42-反射01
- 中心化決議管理——雲端分析
- HashMap底層原理及jdk1.8原始碼解讀
- 詳解JS中 call 方法的實現
- 列印 Logger 日誌時,需不需要再封裝一下工具類?
- 初識設計模式 - 代理模式
- 設計模式---享元模式
- 密碼學奇妙之旅、01 CFB密文反饋模式、AES標準、Golang程式碼
- [ML從入門到入門] 支援向量機:從SVM的推導過程到SMO的收斂性討論
- 從應用訪問Pod元資料-DownwardApi的應用
- Springboot之 Mybatis 多資料來源實現
- Java 泛型程式設計
- CAS核心思想、底層實現