MapReduce入門實戰

語言: CN / TW / HK

MapReduce 思想

MapReduce 是 Google 提出的一個軟體架構,用於大規模資料集的並行運算。概率“Map(對映)”和“Reduce(歸約)”以及它們的思想都是從函數語言程式設計語言借鑑的,還有從向量程式語言借來的特性。

當前的軟體實現是指定一個“Map”函式,用來把一組鍵值對對映成一組新的鍵值對,指定併發的“Reduce”函式,用來保證所有對映的鍵值對中的每一個都共享相同的鍵組。

Hadoop MapReduce 的任務過程分為兩個階段:

  • Map 階段:把大任務分解為若干個小任務來並行處理。這些任務可以平行計算,彼此之間沒有依賴關係。
  • Reduce 階段:對 map 階段的結果進行全域性彙總。

Hadoop 序列化

為什麼要序列化?

序列化是我們通過網路通訊傳輸資料時或者把物件持久化到檔案,需要把物件序列化成二進位制的結構。

觀察原始碼時發現自定義 Mapper 類與自定義 Reducer 類都有泛型類約束,比如自定義 Mapper 有四個泛型引數,但是都不是 Java 基本型別。

為什麼 Hadoop 要選擇建立自己的序列化格式而不使用 java 自帶 serializable?

  • 序列化在分散式程式中非常重要,在 Hadoop 中,叢集中多個節點的程序間的通訊是通過 RPC(遠端過程呼叫:RemoteProcedureCall)實現;RPC 將訊息序列化成二進位制流傳送到遠端節點,遠端節點再將接收到的二進位制資料反序列化為原始的訊息,因此 RPC 往往追求如下特點:
    • 資料更緊湊,能充分利用網路頻寬資源
    • 快速:序列化和反序列化的效能開銷更低
  • Hadoop 使用的是自己的序列化格式 Writable,它比 java 的序列化 serialization 更緊湊速度更快。一個物件使用 Serializable 序列化後,會攜帶很多額外資訊比如校驗資訊,Header,繼承體系等

Java 基本型別與 Hadoop 常用序列化型別

Java 基本型別 Hadoop Writable 型別
boolean BooleanWritable
byte ByteWritable
int IntWritable
float FloatWritable
long LongWritable
double DoubleWritable
String Text
map MapWritable
array ArrayWritable

基本的序列化型別往往不能滿足需求,比如我們常常需要傳遞一些自定義的 bean 物件。在 Hadoop 中為了實現自定義物件序列化需要實現 Writable 介面。

  1. 實現 Writable 介面
  2. 有無參建構函式
  3. 重寫序列化 write 方法和反序列化 readFields 方法。(注意序列化和反序列化的欄位順序必須完全一致)
  4. 如果自定義 Bean 物件需要放在 Mapper 輸出 KV 中的 K 裡面,那麼該物件還需要實現 Comparable 介面,因為 MapReduce 框架中的 Shuffle 過程要求 key 必須能排序

案例實戰

需求:下面有一個水果攤老闆的一個售賣記錄,這三列分別是:水果名稱、水果重量、還有總價。我們需要統計每個水果的總重量和重價。

蘋果 3 12
李子 4 8
蘋果 2 8
桃子 4 20
香蕉 2 4
火龍果 1 4
  1. 配置 Hadoop 環境變數
  2. 匯入 maven 依賴
<dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>${hadoop-version}</version>
  </dependency>
  <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>${hadoop-version}</version>
  </dependency>
  <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-hdfs</artifactId>
      <version>${hadoop-version}</version>
  </dependency>
  1. 編寫儲存售賣記錄的實體類
@Setter
@Getter
public class FruitsRecord implements Writable {

    private int weight;

    private double totalPrice;

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(weight);
        out.writeDouble(totalPrice);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.weight = in.readInt();
        this.totalPrice = in.readDouble();
    }

    @Override
    public String toString() {
        return "FruitsRecord{" +
                "weight=" + weight +
                ", totalPrice=" + totalPrice +
                '}';
    }
}
  1. 編寫 Mapper 類
import com.mmc.hadoop.bean.FruitsRecord;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FruitsMapper extends Mapper<LongWritable,Text,Text, FruitsRecord> {


    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, FruitsRecord>.Context context) throws IOException, InterruptedException {
        //獲取一行的資料
        String line = value.toString();

        String[] fields = line.split(" ");

        Text outKey= new Text(fields[0]);
        FruitsRecord fruitsRecord=new FruitsRecord();
        fruitsRecord.setWeight(Integer.parseInt(fields[1]));
        fruitsRecord.setTotalPrice(Double.parseDouble(fields[2]));

        context.write(outKey,fruitsRecord);
    }
}
  1. 編寫 Reduce 類
import com.mmc.hadoop.bean.FruitsRecord;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FruitsReducer extends Reducer<Text, FruitsRecord,Text,FruitsRecord>  {


    @Override
    protected void reduce(Text key, Iterable<FruitsRecord> values, Reducer<Text, FruitsRecord, Text, FruitsRecord>.Context context) throws IOException, InterruptedException {
        int totalWeight = 0;
        double totalPrice =0;

        for (FruitsRecord fruitsRecord : values){
            totalWeight += fruitsRecord.getWeight();
            totalPrice+= fruitsRecord.getTotalPrice();
        }

        FruitsRecord fruitsRecord = new FruitsRecord();
        fruitsRecord.setWeight(totalWeight);
        fruitsRecord.setTotalPrice(totalPrice);

        context.write(key, fruitsRecord);

    }
}
  1. 編寫 Driver 類
import com.mmc.hadoop.bean.FruitsRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class FruitsDriver {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
//        System.setProperty("java.library.path","d://");
        Configuration conf = new Configuration();
        Job job=Job.getInstance(conf,"FruitsDriver");

        //指定本程式的jar包所在的路徑
        job.setJarByClass(FruitsDriver.class);

        //指定本業務job要使用的mapper/Reducer業務類
        job.setMapperClass(FruitsMapper.class);
        job.setReducerClass(FruitsReducer.class);

        //指定mapper輸出資料的kv型別
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FruitsRecord.class);

        //指定reduce輸出資料的kv型別
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FruitsRecord.class);

        //指定job的輸入檔案目錄和輸出目錄
        FileInputFormat.setInputPaths(job,new Path(args[0]));
        FileOutputFormat.setOutputPath(job,new Path(args[1]));

        boolean result = job.waitForCompletion(true);
        System.exit( result ? 0: 1);

    }
}

總結:

Mapper 裡面,Mapper 類的四個泛型分別為入參的 KV 和出參的 KV。Reduce 裡面的也有4個泛型,分別為入參的KV和出參的KV。Reduce入參的 KV 與 Mapper 裡面出參的 KV 型別是對應的。只不過 Reduce 的入參的 Value 型別是集合型別的。

時序圖如下:

執行任務

本地模式

直接在 IDEA 中執行驅動類即可。因為程式裡輸入檔案路徑和輸出檔案路徑是取的 main 函式裡的 args。所以執行的時候需要指定引數。

遇到的問題

問題 1:

問題 2:建立目錄錯誤

org.apache.hadoop.io.nativeio.NativeIO$Windows.createDirectoryWithMode0(Ljava/lang/String

解決方案:

兩個問題都是 windows 的 hadoop/bin 目錄下缺少檔案導致的。檔案下載路徑: http://github.com/cdarlint/winutils

  1. 找到對應版本的 hadoop.dll 和 winutils.exe 下載下來放到 hadoop/bin 目錄下。
  2. C: windows\System32 放入 hadoop.dll 檔案
  3. 重啟電腦

輸出目錄:

開啟結果檔案 part-r-00000:

李子	FruitsRecord{weight=4, totalPrice=8.0}
桃子	FruitsRecord{weight=4, totalPrice=20.0}
火龍果	FruitsRecord{weight=1, totalPrice=4.0}
蘋果	FruitsRecord{weight=5, totalPrice=20.0}
香蕉	FruitsRecord{weight=2, totalPrice=4.0}

Yarn 叢集模式

  1. 把程式打包成 jar 包,上傳到 linux
  2. 將測試的 txt 上傳到 HDFS 上面
  3. 啟動 Hadoop 叢集
  4. 使用 Hadoop 命令提交任務執行
hadoop jar wc.jar com.mmc.hadoop.FruitsDriver
/user/input /user/output