Hadoop生態圈(二十六)- MapReduce工作流
目錄
1. MapReduce工作流
使用 Hadoop 裡面的 MapReduce 來處理海量資料是非常簡單方便的,但有時候我們的應用程式,往往需要多個 MR 作業,來計算結果,比如說一個最簡單的使用 MR 提取海量搜尋日誌的 TopN 的問題,注意,這裡面,其實涉及了兩個 MR 作業,第一個是詞頻統計,第兩個是排序求 TopN,這顯然是需要兩個 MapReduce 作業來完成的。其他的還有,比如一些資料探勘類的作業,常常需要迭代組合好幾個作業才能完成,這類作業類似於 DAG 類的任務,各個作業之間是具有先後,或相互依賴的關係,比如說,這一個作業的輸入,依賴上一個作業的輸出等等。
在 Hadoop 裡實際上提供了, JobControl類,來組合一個具有依賴關係的作業,在新版的API裡,又新增了ControlledJob類,細化了任務的分配,通過這兩個類,我們就可以輕鬆的完成類似DAG作業的模式
,這樣我們就可以通過一個提交來完成原來需要提交 2 次的任務,大大簡化了任務的繁瑣度。具有依賴式的作業提交後,hadoop 會根據依賴的關係,先後執行的 job 任務,每個任務的執行都是獨立的。
1.1 需求
針對 MapReduce reduce join 方式處理訂單和商品資料之間的關聯,需要進行兩步程式處理,首先把兩個資料集進行join操作,然後針對join的結果進行排序,保證同一筆訂單的商品資料聚集在一起。(具體可見上一篇 《Hadoop生態圈(二十五)- MapReduce Join操作》 )
兩個程式帶有依賴關係,可以使用工作流進行任務的設定,依賴的繫結,一起提交執行。
1.2 程式碼實現
1.2.1 reduce join、result sort程式
詳細可見上一篇 《Hadoop生態圈(二十五)- MapReduce Join操作》 的 Join 案例,這裡不再重複說。
1.2.2 作業流程控制類
該驅動類主要負責建立 reduce join 與 result sort 兩個 ControlledJob,最終通過 JobControl 實現。
public class MrJobFlow { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); //第一個作業的配置 Job job1 = Job.getInstance(conf, ReduceJoinDriver.class.getSimpleName()); job1.setJarByClass(ReduceJoinDriver.class); job1.setMapperClass(ReduceJoinMapper.class); job1.setReducerClass(ReduceJoinReducer.class); job1.setMapOutputKeyClass(Text.class); job1.setMapOutputValueClass(Text.class); job1.setOutputKeyClass(Text.class); job1.setOutputValueClass(Text.class); FileInputFormat.addInputPath(job1, new Path("D:\\datasets\\mr_join\\input")); FileOutputFormat.setOutputPath(job1, new Path("D:\\datasets\\mr_join\\rjout")); //將普通作業包裝成受控作業 ControlledJob ctrljob1 = new ControlledJob(conf); ctrljob1.setJob(job1); //第二個作業的配置 Job job2 = Job.getInstance(conf, ReduceJoinSortApp.class.getSimpleName()); job2.setJarByClass(ReduceJoinSortApp.class); job2.setMapperClass(ReduceJoinSortApp.ReduceJoinMapper.class); job2.setReducerClass(ReduceJoinSortApp.ReduceJoinReducer.class); job2.setMapOutputKeyClass(Text.class); job2.setMapOutputValueClass(Text.class); job2.setOutputKeyClass(Text.class); job2.setOutputValueClass(NullWritable.class); FileInputFormat.addInputPath(job2, new Path("D:\\datasets\\mr_join\\rjout")); FileOutputFormat.setOutputPath(job2, new Path("D:\\datasets\\mr_join\\rjresult")); //將普通作業包裝成受控作業 ControlledJob ctrljob2 = new ControlledJob(conf); ctrljob2.setJob(job2); //設定依賴job的依賴關係 ctrljob2.addDependingJob(ctrljob1); // 主控制容器,控制上面的總的兩個子作業 JobControl jobCtrl = new JobControl("myctrl"); // 新增到總的JobControl裡,進行控制 jobCtrl.addJob(ctrljob1); jobCtrl.addJob(ctrljob2); // 線上程啟動,記住一定要有這個 Thread t = new Thread(jobCtrl); t.start(); while(true) { if (jobCtrl.allFinished()) { // 如果作業成功完成,就列印成功作業的資訊 System.out.println(jobCtrl.getSuccessfulJobList()); jobCtrl.stop(); break; } } } }
1.3 執行結果
- RabbitMQ 客戶端原始碼系列 - Connection
- 2022年最新Python學習路線圖(內附影片資料)【六張圖帶你掌握Python技巧】
- 像寫SQL一樣去處理記憶體中的資料,SparkSQL入門教程
- 今年跳槽可以再等等
- ElasticSearch7.x基於Java API 快照備份和恢復
- 【如何成為SQL高手】第六關:聚合函式查詢
- Zookeeper實現分散式鎖
- Elasticsearch RestHighLevelClient --(八)
- cadence17.4 - Checking Hostname and HostID in license file Match Failed
- 【個人筆記】訊息佇列 - RabbitMQ
- HBase效能調優
- RocketMQ Producer
- ElasticSearch系列:elasticsearch kibana
- 狂神說筆記——SpringBoot操作資料庫22-5
- Hadoop生態圈(二十六)- MapReduce工作流
- 聊聊Dubbo的註冊中心、配置中心以及元資料中心
- Flink利用KafkaSource讀取Kafka資料做為資料來源
- 介面指南,快遞鳥物流開放平臺API對接,極兔速遞
- 訊息佇列內容解析
- KafkaApis處理FetchRequest請求原始碼解析