大資料技術之Datax

語言: CN / TW / HK

大家好,我是腳丫先生 (o^^o)

最近比較忙。

1、實時流模組的開發。

2、給小夥伴們準備最全面試題,已有 6 萬字。

不過在怎麼忙,都要保證持續的輸出。

源源不斷的知識輸出,就會有源源不斷的快樂輸入。

今天想和小夥伴們分享,自己在專案裡用的比較頻繁的採集工具Datax。

在日常大資料生產環境中,經常會有關係型資料庫和關係型資料庫,以及關係型和非關係型資料庫資料之間的互相轉換的需求,在需求選擇的初期解決問題的方法----離線資料同步工具/平臺。

好了,我們開始今天的正文。

一、Datax 概述

1.1 Datax 介紹

DataX 是阿里巴巴開源的一個異構資料來源離線同步工具,致力於實現包括關係型資料庫(MySQL、Oracle 等)、HDFS、Hive、ODPS、HBase、FTP 等各種異構資料來源之間穩定高效的資料同步功能。

1.2 DataX 的設計

為了解決異構資料來源同步問題,DataX 將複雜的網狀的同步鏈路變成了星型資料鏈路,DataX 作為中間傳輸載體負責連線各種資料來源。當需要接入一個新的資料來源的時候,只需要將此資料來源對接到 DataX,便能跟已有的資料來源做到無縫資料同步。

1.3 框架設計

DataX 本身作為離線資料同步框架,採用 Framework + plugin 架構構建。將資料來源讀取和寫入抽象成為 Reader/Writer 外掛,納入到整個同步框架中。 DataX 在設計之初就將同步理念抽象成框架+外掛的形式.框架負責內部的序列化傳輸,緩衝,併發,轉換等而核心技術問題,資料的採集(Reader)和落地(Writer)完全交給外掛執行。

  • Read 資料採集模組,負責採集資料來源的資料,將資料傳送至 FrameWork。
  • Writer 資料寫入模組,負責不斷的向 FrameWork 取資料,並將資料寫入目的端。
  • FrameWork 用於連線 reader 和 write,作為兩者的資料傳輸通道,處理緩衝,流控,併發,轉換等核心技術問題。

1.4 DataX 外掛體系

DataX 目前已經有了比較全面的外掛體系,主流的 RDBMS 資料庫、NOSQL、大資料計算系統都已經接入,目前支援資料如下圖,詳情請點選:DataX 資料來源參考指南

1.5 執行原理

  • Job 完成單個數據同步的作業稱之為 job。DataX 接受到一個 Job 之後,將啟動一個程序來完成整個作業同步過程。負責資料清理,子任務劃分,TaskGroup 監控管理。
  • Task 由 Job 切分而來,是 DataX 作業的最小單元,每個 Task 負責一部分資料的同步工作。
  • Schedule 將 Task 組成 TaskGroup,預設單個任務組的併發數量為 5。
  • TaskGroup 負責啟動 Task。

詳細解說:DataX 完成單個數據同步的作業,我們稱之為 Job,DataX 接受到一個 Job 之後,將啟動一個程序來完成整個作業同步過程。

DataX Job 模組是單個作業的中樞管理節點,承擔了資料清理、子任務切分(將單一作業計算轉化為多個子 Task)、TaskGroup 管理等功能。DataXJob 啟動後,會根據不同的源端切分策略,將 Job 切分成多個小的 Task(子任務),以便於併發執行。

Task 便是 DataX 作業的最小單元,每一個 Task 都會負責一部分資料的同步工作。切分多個 Task 之後,DataX Job 會呼叫 Scheduler 模組,根據配置的併發資料量,將拆分成的 Task 重新組合,組裝成 TaskGroup(任務組)。

每一個 TaskGroup 負責以一定的併發執行完畢分配好的所有 Task,預設單個任務組的併發數量為 5。每一個 Task 都由 TaskGroup 負責啟動,Task 啟動後,會固定啟動 Reader—>Channel—>Writer 的執行緒來完成任務.

DataX 排程流程 舉例來說,使用者提交了一個 DataX 作業,並且配置了 20 個併發,目的是將一個 100 張分表的 mysql 資料同步到 odps 裡面。 DataX 的排程決策思路是:

  • 1 DataXJob 根據分庫分表切分成了 100 個 Task。
  • 2 根據 20 個併發,預設單個任務組的併發數量為 5,DataX 計算共需要分配 4 個 TaskGroup。
  • 3 這裡 4 個 TaskGroup 平分切分好的 100 個 Task,每一個 TaskGroup 負責以 5 個併發共計執行 25 個 Task。

二、快速入門

2.1 官方地址

下載地址:http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz

原始碼地址:https://github.com/alibaba/DataX

2.2 前置要求

  • Linux
  • JDK(1.8 以上,推薦 1.8)
  • Python(推薦 Python2.6.X)

2.3 安裝

(1) 將下載好的 datax.tar.gz 上傳到伺服器的/home/soft 資料夾下

``` [[email protected] soft]$ ls datax.tar.gz

```

(2) 解壓 datax.tar.gz 到/opt/module

[[email protected] soft]$ tar -zxvf datax.tar.gz -C /opt/module/

(3) 執行指令碼檢測

[[email protected] bin]$ cd /opt/module/datax/bin/ [[email protected]]$ python datax.py /opt/module/datax/job/job.json

(4) json 配置檔案註釋

java /*一個json就是一個job,一個job主要包含: content,setting 兩個屬性*/ { "job": { /*content是job的核心,主要放reader和writer外掛*/ "content": { /*raader外掛*/ "reader": {}, /*writer外掛*/ "writer": {} }, /*setting主要用來設定job的基本設定*/ "setting": { /*speed流量控制*/ "speed": { "channel": 1, /*同步時候的併發數*/ "byte": 1024 /*同步時候的位元組大小,影響速度,可選*/ }, /*髒資料控制,配置的意思是當髒資料大於10條,或者髒資料比例達到0.05%,任務就會報錯*/ "errorLimit": { "record": 10,/*髒資料最大記錄數閾值*/ "percentage": 0.05 /*髒資料佔比閾值*/ } } } }

json 的 reader 和 writer 內容根據外掛不同而變化,具體查詢官網

三、 DataX 類圖

整個流程大致如下 在這裡插入圖片描述 啟動步驟解析:

1、解析配置,包括 job.json、core.json、plugin.json 三個配置

2、設定 jobId 到 configuration 當中

3、啟動 Engine,通過 Engine.start()進入啟動程式

4、設定 RUNTIME_MODEconfiguration 當中

5、通過 JobContainer 的 start()方法啟動 6、依次執行 job 的 preHandler()、init()、prepare()、split()、schedule()、- post()、postHandle()等方法。

7、init()方法涉及到根據 configuration 來初始化 reader 和 writer 外掛,這裡涉及到 jar 包熱載入以及呼叫外掛 init()操作方法,同時設定 reader 和 writer 的 configuration 資訊

8、prepare()方法涉及到初始化 reader 和 writer 外掛的初始化,通過呼叫外掛的 prepare()方法實現,每個外掛都有自己的 jarLoader,通過整合 URLClassloader 實現而來

9、split()方法通過 adjustChannelNumber()方法調整 channel 個數,同時執行 reader 和 writer 最細粒度的切分,需要注意的是,writer 的切分結果要參照 reader 的切分結果,達到切分後數目相等,才能滿足 1:1 的通道模型

10、channel 的計數主要是根據 byte 和 record 的限速來實現的,在 split()的函式中第一步就是計算 channel 的大小

11、split()方法 reader 外掛會根據 channel 的值進行拆分,但是有些 reader 外掛可能不會參考 channel 的值,writer 外掛會完全根據 reader 的外掛 1:1 進行返回

12、split()方法內部的 mergeReaderAndWriterTaskConfigs()負責合併 reader、writer、以及 transformer 三者關係,生成 task 的配置,並且重寫 job.content 的配置

13、schedule()方法根據 split()拆分生成的 task 配置分配生成 taskGroup 物件,根據 task 的數量和單個 taskGroup 支援的 task 數量進行配置,兩者相除就可以得出 taskGroup 的數量

14、schdule()內部通過 AbstractScheduler 的 schedule()執行,繼續執行 startAllTaskGroup()方法建立所有的 TaskGroupContainer 組織相關的 task,TaskGroupContainerRunner 負責執行 TaskGroupContainer 執行分配的 task。scheduler的具體實現類為ProcessInnerScheduler。

15、taskGroupContainerExecutorService 啟動固定的執行緒池用以執行TaskGroupContainerRunner 物件,TaskGroupContainerRunner 的 run()方法呼叫 taskGroupContainer.start()方法,針對每個 channel 建立一個 TaskExecutor,通過 taskExecutor.doStart()啟動任務。

四、 Datax-web

datax是基於指令碼化的採集工具,因此小夥伴會問:有視覺化介面進行採集任務執行嗎?

這必須有。

4.1 Datax-web的介紹

DataX Web是在DataX之上開發的分散式資料同步工具,提供簡單易用的 操作介面,降低使用者使用DataX的學習成本,縮短任務配置時間。

使用者可通過頁面選擇資料來源即可建立資料同步任務。

支援RDBMS、Hive、HBase、ClickHouse、MongoDB等資料來源,RDBMS資料來源可批量建立資料同步任務,支援實時檢視資料同步進度及日誌並提供終止同步功能,整合並二次開發xxl-job可根據時間、自增主鍵增量同步資料。

4.2 Datax-web的搭建

小夥伴們可以根據官網的教程進行搭建,如有疑問的可以直接聯絡我。

datax-web搭建

我們在做大資料採集模組過程,是基於datax-web的原始碼製作成一個docker通用微服務容器,其他模組可以以遠端呼叫的方式使用它,大大節省了開發成本。

好了,今天就聊到這裡,之後會繼續深入分享。

祝各位終有所成,收穫滿滿!

我是腳丫先生,我們下期見~

更多精彩內容請關注 微信公眾號 👇「腳丫先生」🔥:\ \ \ 一枚熱衷於分享大資料基礎原理,技術實戰,架構設計與原型實現之外,還喜歡輸出一些個人私活案例。\ \ \ 更多精彩福利乾貨,期待您的關注 ~