MapReduce核心原理

語言: CN / TW / HK

MapTask 執行機制詳解

MapTask 流程

詳細步驟:

  1. 讀取資料的元件 InputFormat 會通過 getSplits 方法對輸入目錄中檔案進行邏輯切片規劃得到 splits,有多少 split 就對應啟動多少個 MapTask。split 與 block 的對應關係預設是一對一。
  2. 將輸入檔案切分為 splits 之後,由 RecordReader(預設是 LineRecordReader)物件進行讀取,以\n 作為分隔符,讀取一行資料,返回<key,value>。key 表示每行首字元偏移值,value 表示這一行文字內容
  3. 讀取 split 返回<key,value>,進入使用者自己實現的 Mapper 類中,執行使用者重寫的 map 函式
  4. map 函式執行完後,將 map 的每條結果通過 context.write 進行 collect 資料收集。在 collect 中,會先對其進行分割槽處理,預設使用 HashPartitioner

MapReduce 提供了 Partitioner 介面,它的作用是可以根據 key 或者 value 及 reduce 的數量來決定這對輸出資料交由哪個 reduce task 處理。預設的演算法是對 key 進行 hash 然後再以 reduce task 數量取模。預設的取模方式只是為了平均 reduce 的處理能力,如果使用者自己對 Partitioner 有需求,可以訂製並設定到 job 上。

  1. 接下來,會將資料寫入記憶體,記憶體中這片區域叫做環形緩衝區,緩衝區的作用是批量收集 map 結果,減少磁碟 IO 的影響。我們的 key/value 對以及 Partition 的結果都會被寫入緩衝區。當然寫入之前,key 值和 value 值都會被序列化成位元組陣列。

環形緩衝區其實是一個數組,陣列中存放著 key、value 的元資料資訊,包括 partition、key 的起始位置、value 的起始位置、value 的長度。環形結構是一個抽象的概念。

緩衝區有大小限制,預設是 100MB。當 map task 的輸出結果超過閾值(總大小的 0.8,由 spill.percent 控制),就會往磁碟寫資料。這個從記憶體往磁碟寫資料的過程被稱為 Spill,中文可譯為溢寫。溢寫操作會單獨開一個執行緒,並鎖定這 80M 記憶體,執行寫入,而 map task 的輸出結果還能往剩下的 20MB 記憶體中寫。

  1. 當溢寫執行緒啟動後,需要對這 80M 空間內的 key 進行排序。

如果 job 設定了 Combiner,那麼 Combiner 將對相同 key 的鍵值對進行合併,減少溢寫到磁碟的數量。Combiner 會優化 MapReduce 的中間結果,所以在整個模型中會多次使用。

那哪些場景才能使用 Combiner 呢?Combiner 的輸出是 Reducer 的輸入,且 Combiner 絕不能改變最終的計算結果。Combiner 只能用於那種 Reduce 的輸入和輸入型別完全一致,且不影響最終結果的場景。如:累加、求最大值等等。

  1. 合併溢寫檔案:每次溢寫會在磁碟上生成一個臨時檔案,如果 map 的輸出結果真的很大,有多次溢寫發生,就有產生多個臨時檔案。當整個資料處理結束後,會對磁碟的臨時檔案進行合併,因為最終的檔案只有一個。而且會為這個合併後的一個檔案提供一個索引檔案,記錄每個 reduce 對應資料的偏移量。

MapTask 的並行度

MapTask 的併發度由切片決定。

舉例:如果現在有 2 個檔案,檔案 a 大小 300M,檔案 b 大小 100M。

一個 block 塊是 128M,那麼檔案 A 分為了 3 塊,檔案 B 分為了一塊。

注意:如果檔案 B 是 129M,它並不會分為 2 塊,它是允許超一點的。

Reduce Task 執行機制詳解

Reduce 大致分為 copy、sort、reduce 三個階段。

詳細步驟:

  1. Copy 階段,Reduce 程序啟動一些資料 copy 執行緒(Fetcher),通過 HTTP 方式請求 maptask 獲取屬於自己的檔案
  2. Merge 階段,這裡的 merge 如 map 端的 merge 動作,只是陣列中存放的是不同的 map 端 copy 來的數值。copy 過來的資料會先放入記憶體緩衝區中,這裡的緩衝區大小比 map 端的更靈活。

    merge 有三種形式:記憶體到記憶體、記憶體到磁碟、磁碟到磁碟。預設情況下第一種形式不啟用。當記憶體中的資料量到達一定閾值,就啟動記憶體到磁碟的 merge。與 map 端類似,這也是溢寫的過程,這個過程中如果你設定有 Combiner,也是會啟用的,然後再磁碟中生成了眾多的溢寫檔案。第二種 merge 方式一直在執行,直到沒有 map 端的資料時才結束,然後啟動第三種磁碟到磁碟的 merge 方式生成最終的檔案。

  3. 合併排序。把分散的資料合併成一個大的資料後,還會再對合並後的資料排序。
  4. 對排序後的鍵值對呼叫 reduce 方法,鍵相等的鍵值對呼叫一次 reduce 方法,每次呼叫產生零個或者多個鍵值對,最後把這些輸出的鍵值對寫入到 HDFS 檔案中。

ReduceTask 並行度

ReduceTask 的並行度同樣影響整個 job 的併發度和效率,ReduceTask 的數量是可以手動設定的:

job.setNumReduceTasks(4);

注意事項:

  1. ReduceTask 設定為 0,表示沒有 Reduce 階段,輸出檔案數和 MapTask 數量保持一致。
  2. ReduceTask 不設定就是預設為 1,輸出檔案數量為 1 個
  3. 如果資料分佈不均勻,可能再 Reduce 階段產生資料傾斜。

Shuffle 機制

map 階段處理的資料如何傳遞給 reduce 階段,是 MapReduce 框架中最關鍵的一個流程,這個流程叫做 shuffle。

簡單的說 shuffle 就是 MapTask 的 map 方法之後,ReduceTask 的 reduce 方法之前的資料處理過程。

核心流程就是:資料分割槽、排序、分組、combine、合併等。

1. MapReduce 分割槽和 ReduceTask 的數量

再 MapReduce 中,通過我們指定分割槽,將同一個分割槽的資料發給同一個 reduce 中處理(預設是相同 key 去往同一個分割槽)。

如何保證相同 key 的資料去往同一個 reduce 呢?只需要保證相同 key 的資料發到同一個分割槽即可。

自定義分割槽

  • 自定義類繼承 Partitioner
  • Driver 類設定自定義分割槽類
job.setPartitionerClass(CustomPartitioner.class);

實戰演練:將下面不同廠商的手機的售賣數量進行彙總分別輸出到不同的檔案區。

iphone_12 34
iphone_13 90
iphone_10 201
xiaomi_8 44
xiaomi_9 900

程式碼示例:

public class PhoneMapper extends Mapper<LongWritable, Text,Text, IntWritable>  {
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
        String valueStr = value.toString();
        String[] split = valueStr.split(" ");
        Text outKey = new Text(split[0].split("_")[0]);
        IntWritable outValue = new IntWritable();
        outValue.set(Integer.valueOf(split[1]));
        context.write(outKey,outValue);
    }
}
public class PhonePartition extends Partitioner<Text, IntWritable> {

    @Override
    public int getPartition(Text text, IntWritable intWritable, int i) {
        String s = text.toString();
        String phoneName = s.split("_")[0];
        if(phoneName.equals("xiaomi")){
            return 1;
        }else if(phoneName.equals("iphone")){
            return 0;
        }
        return 0;
    }
}
public class PhoneReduce extends Reducer<Text, IntWritable, Text,IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
        int sum=0;
        for (IntWritable value : values){
            sum+=value.get();
        }
        context.write(key,new IntWritable(sum));
    }
}
public class PhoneDriver {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        Job job=Job.getInstance(conf,"PhoneDriver");

        //指定本程式的jar包所在的路徑
        job.setJarByClass(PhoneDriver.class);

        //指定本業務job要使用的mapper/Reducer業務類
        job.setMapperClass(PhoneMapper.class);
        job.setReducerClass(PhoneReduce.class);

        //指定mapper輸出資料的kv型別
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        //指定reduce輸出資料的kv型別
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        //指定job的輸入檔案目錄和輸出目錄
        FileInputFormat.setInputPaths(job,new Path(args[0]));
        FileOutputFormat.setOutputPath(job,new Path(args[1]));

        job.setNumReduceTasks(2);
        job.setPartitionerClass(PhonePartition.class);

        boolean result = job.waitForCompletion(true);
        System.exit( result ? 0: 1);
    }
}

執行結果:

需要注意的是:自定義的分割槽數量和 reduceTask 數量要保持一致

如果分割槽數量不止 1 個,但是 reduceTask 數量 1 個,此時只會輸出一個檔案

如果 reduceTask 數量大於分割槽數量,但是輸出多個空檔案

如果 reduceTask 數量小於分割槽數量,有可能會報錯

2. MapReduce 中的 Combiner

Combiner 元件的父類就是 Reducer,它和 Reducer 的區別在於執行的位置。

Combiner 在每一個 maptask 所在的節點執行,它的意義就是對每一個 maptask 的輸出進行區域性彙總,以減少網路傳輸量。

Combiner 能夠應用的前提是不影響最終的業務邏輯,此外 Combiner 的輸出 kv 應該和 reducer 輸入 kv 的型別對應。

自定義 Combiner 實現

  • 自定義類繼承 Reducer,重寫 reduce 方法
  • 在 Driver 類中設定使用 Combiner

實戰演練: 我們改造上面的統計手機售賣數量的程式,提前進行資料的統計,減少網路傳輸。

public class PhoneCombiner extends Reducer<Text, IntWritable,Text,IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {

        int sum =0;
        for (IntWritable intWritable:values){
            sum += intWritable.get();
        }
        context.write(key,new IntWritable(sum));
    }
}

在 Driver 裡進行設定 Combiner 類

job.setCombinerClass(PhoneCombiner.class);

debug 跟蹤發現原本 reduce 需要執行 6 次的,現在只執行了兩次。說明在前面的階段合併成功了。