開源交流丨批流一體資料整合框架ChunJun資料傳輸模組詳解分享

語言: CN / TW / HK

課件獲取:關注公眾號“ChunJun”,後臺私信 “課件” 獲得直播課件

影片回放:點選這裡

ChunJun開源專案地址:githubgitee 喜歡我們的專案給我們點個__ STAR!STAR!!STAR!!!(重要的事情說三遍)__

技術交流釘釘 qun:30537511

本期我們帶大家回顧一下六六同學的直播分享《ChunJun資料傳輸模組介紹》。

一、ChunJun資料型別轉換

1、型別轉換解決的問題

大家一聽到「ChunJun資料型別轉換」這個概念,可能會聯想到上下游之間進行資料互動時會涉及到的隱式轉換。如果上游和下游資料型別一致,則不需要對資料進行任何干預,直接進行下發即可。

但是大多數情況下會涉及到兩個問題,一是上游的資料來源型別和下游的資料來源型別不一致。比如MySql的varchar型別要寫到HdfsOrc檔案裡的string型別的話,在上游的表示是varchar,在下游的表示是string,但實際上中間段java的型別都是string。

另外一種情況則是,上下游之間不止資料來源型別不一樣,資料型別也不一樣,除了要做型別的對映之外,還需要對資料本身進行改動。比如,MySql的date型別要寫到下游timestamp型別,我們需要進行的操作是把date中的毫秒級的時間戳拿出來,轉換成timestamp的型別,再往下游去寫。

這樣就引出了一個問題,如何建立所有資料來源型別之間的對映/轉換關係?下面將為大家解答這個問題。

file

2、型別對映概覽

• client端:在Factory類中通過RawConverter類建立對映關係

• source端:將資料封裝成AbstractBaseColumn

• sink端:通過AbstractBaseColumn中的轉換方法將資料轉換成對應型別

file

ChunJun目前支援的資料型別對映關係圖如下:

file

3、型別對映詳解

以Timestamp為例,如果要寫入到Long型別的話,根據上文展示的ChunJun資料型別對映關係圖,最終對映到TimestampColumn中,具體流程如下圖:

file

上面這個例子描述的是一個單獨的欄位,正常情況下,會處理多個欄位,這時的型別對映詳解情況如下圖:

file as方法就是資料型別轉換的方法。使用這個機制之後,在下游可以只關心需要的資料型別,增加開發效率。

二、ChunJun資料傳輸過程

瞭解完ChunJun資料型別轉換後,我們來為大家分享ChunJun的資料傳輸過程。

1、上下游資料傳輸方式

在ChunJun中進行同步作業,有兩種情況,一是運算元鏈開啟的情況,上游的Source和下游的Sink會被合併成一個task,有同一個執行緒去做排程;二是把運算元鏈進行關閉,Source和Sink各自形成一個task,也有各自的執行緒去進行排程。

在運算元鏈開啟的情況下,上下游資料傳輸方式可分為兩種,物件重用和拷貝。

● 物件重用

· 上下游資料傳輸使用方法呼叫的形式,將上游產生的資料的物件引用直接交給下游

· 上下游運算元需要形成運算元鏈,作業開啟物件重用

· env.getConfig().enableObjectReuse();

● 拷貝

· 上游傳輸給下游的資料,需要經過一次深拷貝

· 上下游運算元需要形成運算元鏈

運算元鏈的好處是可以減少序列化的操作,那麼為什麼我們還要引入序列化呢?因為ChunJun的特殊性。ChunJun同步作業的話,只有上下游兩個運算元,且都對接了正式的資料來源,讀寫的時候會導致執行緒堵塞。因此上限由網路io決定,如果斷開運算元鏈,cpu會在一端執行緒阻塞的時候切換到另外一端。在序列化的效能較高時,執行緒上下文切換帶來的效能下降完全可以被彌補。

經過測試,序列化的效能比物件重用和拷貝高30%左右。

● 序列化

· 上下游資料傳輸依賴於網路傳輸。上游資料進行序列化成byte陣列後進行網路傳輸,下游收到資料後需要進行反序列化

· 上下游之間不形成運算元鏈

file

知道要做序列化後,會產生一些思考,帶著這些疑問,接著往下看。

• 序列化和反序列化在什麼時候發生?

• Flink支援哪些序列化?

• 序列化是怎麼做的?

• 怎麼找到適合的序列化方式?

• 如何實現自定義的序列化?

2、序列化傳輸過程

下圖是ChunJun在進行序列化操作時的資料傳輸鏈路圖:

file

3、DataOutView

file

4、TypeInformation介紹

file

5、kryo序列化&BaseSerializer

同樣是序列化一個int物件,對kryo來說,首先需要知道它的型別,然後從高位到低位依次去寫入。

DataOutputView則是直接呼叫一個writeInt的方法,寫一句關鍵程式碼即可:

UNSAFE.putInt(

this.buffer,

BASE_OFFSET + this.position, v);

file

三、ChunJun序列化實現

1、ColumnRowData序列化過程

ColumnRowData序列化過程採取標誌位+實際資料的方式,具體流程如下圖:

file

相對於kryo的序列化來說:

· 實現了更密集的儲存

· 相容null值

· 減少了不必要的資料傳輸

2、BinaryRowData結構

file 因為資料區一格只佔8個位元組,且每個index只能佔到一位,所以肯定存在一些沒法儲存在8位元組範圍之內的資料,可變長度部分就是用來存放資料區無法存放的資料。

3、BinaryRowData-setNull操作

看到上文的null值判斷區,有些同學可能會好奇這是什麼,又是怎麼進行操作的。下圖將對一個下標為11的資料去做setnull操作,進行簡單介紹:

file

4、BinaryRowData資料儲存方式

file

袋鼠雲開源框架釘釘技術交流群(30537511),歡迎對大資料開源專案有興趣的同學加入交流最新技術資訊,開源專案庫地址:https://github.com/DTStack

「其他文章」