Hadoop生態圈(二十六)- MapReduce工作流

語言: CN / TW / HK

目錄

1. MapReduce工作流

  使用 Hadoop 裡面的 MapReduce 來處理海量資料是非常簡單方便的,但有時候我們的應用程式,往往需要多個 MR 作業,來計算結果,比如說一個最簡單的使用 MR 提取海量搜尋日誌的 TopN 的問題,注意,這裡面,其實涉及了兩個 MR 作業,第一個是詞頻統計,第兩個是排序求 TopN,這顯然是需要兩個 MapReduce 作業來完成的。其他的還有,比如一些資料探勘類的作業,常常需要迭代組合好幾個作業才能完成,這類作業類似於 DAG 類的任務,各個作業之間是具有先後,或相互依賴的關係,比如說,這一個作業的輸入,依賴上一個作業的輸出等等。

  在 Hadoop 裡實際上提供了, JobControl類,來組合一個具有依賴關係的作業,在新版的API裡,又新增了ControlledJob類,細化了任務的分配,通過這兩個類,我們就可以輕鬆的完成類似DAG作業的模式 ,這樣我們就可以通過一個提交來完成原來需要提交 2 次的任務,大大簡化了任務的繁瑣度。具有依賴式的作業提交後,hadoop 會根據依賴的關係,先後執行的 job 任務,每個任務的執行都是獨立的。

1.1 需求

  針對 MapReduce reduce join 方式處理訂單和商品資料之間的關聯,需要進行兩步程式處理,首先把兩個資料集進行join操作,然後針對join的結果進行排序,保證同一筆訂單的商品資料聚集在一起。(具體可見上一篇 《Hadoop生態圈(二十五)- MapReduce Join操作》

  兩個程式帶有依賴關係,可以使用工作流進行任務的設定,依賴的繫結,一起提交執行。

1.2 程式碼實現

1.2.1 reduce join、result sort程式

  詳細可見上一篇 《Hadoop生態圈(二十五)- MapReduce Join操作》 的 Join 案例,這裡不再重複說。

1.2.2 作業流程控制類

  該驅動類主要負責建立 reduce join 與 result sort 兩個 ControlledJob,最終通過 JobControl 實現。

public class MrJobFlow {
 
    public static void main(String[] args) throws Exception {
 
        Configuration conf = new Configuration();

        //第一個作業的配置
        Job job1 = Job.getInstance(conf, ReduceJoinDriver.class.getSimpleName());
        job1.setJarByClass(ReduceJoinDriver.class);
        job1.setMapperClass(ReduceJoinMapper.class);
        job1.setReducerClass(ReduceJoinReducer.class);
        job1.setMapOutputKeyClass(Text.class);
        job1.setMapOutputValueClass(Text.class);
        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job1, new Path("D:\\datasets\\mr_join\\input"));
        FileOutputFormat.setOutputPath(job1, new Path("D:\\datasets\\mr_join\\rjout"));
        //將普通作業包裝成受控作業
        ControlledJob ctrljob1 = new ControlledJob(conf);
        ctrljob1.setJob(job1);

        //第二個作業的配置
        Job job2 = Job.getInstance(conf, ReduceJoinSortApp.class.getSimpleName());
        job2.setJarByClass(ReduceJoinSortApp.class);
        job2.setMapperClass(ReduceJoinSortApp.ReduceJoinMapper.class);
		job2.setReducerClass(ReduceJoinSortApp.ReduceJoinReducer.class);
        job2.setMapOutputKeyClass(Text.class);
        job2.setMapOutputValueClass(Text.class);
        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(NullWritable.class);
        FileInputFormat.addInputPath(job2, new Path("D:\\datasets\\mr_join\\rjout"));
        FileOutputFormat.setOutputPath(job2, new Path("D:\\datasets\\mr_join\\rjresult"));
        //將普通作業包裝成受控作業
        ControlledJob ctrljob2 = new ControlledJob(conf);
        ctrljob2.setJob(job2);

        //設定依賴job的依賴關係
        ctrljob2.addDependingJob(ctrljob1);

        // 主控制容器,控制上面的總的兩個子作業
        JobControl jobCtrl = new JobControl("myctrl");

        // 新增到總的JobControl裡,進行控制
        jobCtrl.addJob(ctrljob1);
        jobCtrl.addJob(ctrljob2);

        // 線上程啟動,記住一定要有這個
        Thread t = new Thread(jobCtrl);
        t.start();

        while(true) {
 
            if (jobCtrl.allFinished()) {
 // 如果作業成功完成,就列印成功作業的資訊
                System.out.println(jobCtrl.getSuccessfulJobList());
                jobCtrl.stop();
                break;
            }
        }
    }
}

1.3 執行結果