開源交流丨批流一體資料整合工具ChunJun同步Hive事務表原理詳解及實戰分享

語言: CN / TW / HK

原文連結:批流一體資料整合工具ChunJun同步Hive事務表原理詳解及實戰分享

課件獲取:關注公眾號__ “數棧研習社”,後臺私信 “ChengYing”__ 獲得直播課件

影片回放:點選這裡

ChengYing 開源專案地址:github gitee 喜歡我們的專案給我們點個__ STAR!STAR!!STAR!!!(重要的事情說三遍)__

技術交流釘釘 qun:30537511

本期我們帶大家回顧一下無倦同學的直播分享《Chunjun同步Hive事務表詳解》

一、Hive事務表的結構及原理

Hive是基於Hadoop的一個數據倉庫工具,用來進行資料提取、轉化、載入,這是一種可以儲存、查詢和分析儲存在Hadoop中的大規模資料的機制。Hive資料倉庫工具能將結構化的資料檔案對映為一張資料庫表,並提供SQL查詢功能,能將SQL語句轉變成MapReduce任務來執行。

在分享Hive事務表的具體內容前,我們先來了解下HIve 事務表在 HDFS 儲存上的一些限制。

Hive雖然支援了具有ACID語義的事務,但是沒有像在MySQL中使用那樣方便,有很多侷限性,具體限制如下:

  • 尚不支援BEGIN,COMMIT和ROLLBACK,所有語言操作都是自動提交的

  • 僅支援ORC檔案格式(STORED AS ORC)

  • 預設情況下事務配置為關閉,需要配置引數開啟使用

  • 表必須是分桶表(Bucketed)才可以使用事務功能

  • 表必須內部表,外部表無法建立事務表

  • 表引數transactional必須為true

  • 外部表不能成為ACID表,不允許從非ACID會話讀取/寫入ACID表

以下矩陣包括可以使用Hive建立的表的型別、是否支援ACID屬性、所需的儲存格式以及關鍵的SQL操作。

file

瞭解完Hive事務表的限制,現在我們具體瞭解下Hive事務表的內容。

1、事務表文件名字詳解

  • 基礎目錄:

$partition/base_$wid/$bucket

  • 增量目錄:

$partition/delta_$wid_$wid_$stid/$bucket

  • 引數目錄:

$partition/delete_delta_$wid_$wid_$stid/$bucket

file

2、事務表文件內容詳解

$ orc-tools data bucket_00000

{"operation":0,"originalTransaction":1,"bucket":536870912,"rowId":0,"currentTransaction":1,"row":{"id":1,"name":"Jerry","age":18}}

{"operation":0,"originalTransaction":1,"bucket":536870912,"rowId":1,"currentTransaction":1,"row":{"id":2,"name":"Tom","age":19}}

{"operation":0,"originalTransaction":1,"bucket":536870912,"rowId":2,"currentTransaction":1,"row":{"id":3,"name":"Kate","age":20}}

  • operation 0 表示插入、1 表示更新,2 表示刪除。由於使用了 split-update,UPDATE 是不會出現的。

  • originalTransaction是該條記錄的原始寫事務 ID:

a、對於 INSERT 操作,該值和 currentTransaction 是一致的;

b、對於 DELETE,則是該條記錄第一次插入時的寫事務 ID。

  • bucket 是一個 32 位整型,由 BucketCodec 編碼,各個二進位制位的含義為:

a、1-3 位:編碼版本,當前是 001;

b、4 位:保留;

c、5-16 位:分桶 ID,由 0 開始。分桶 ID 是由 CLUSTERED BY 子句所指定的欄位、以及分桶的數量決定的。該值和 bucket_N 中的 N 一致;

d、17-20 位:保留;

e、21-32 位:語句 ID;

舉例來說,整型 536936448 的二進位制格式為 00100000000000010000000000000000,即它是按版本 1 的格式編碼的,分桶 ID 為 1。

  • rowId 是一個自增的唯一 ID,在寫事務和分桶的組合中唯一;

  • currentTransaction 當前的寫事務 ID;

  • row 具體資料。對於 DELETE 語句,則為 null。

3、更新 Hive 事務表資料

UPDATE employee SET age = 21 WHERE id = 2;

這條語句會先查詢出所有符合條件的記錄,獲取它們的 row_id 資訊,然後分別建立 delete 和 delta 目錄:

/user/hive/warehouse/employee/delta_0000001_0000001_0000/bucket_00000

/user/hive/warehouse/employee/delete_delta_0000002_0000002_0000/bucket_00000 (update)

/user/hive/warehouse/employee/delta_0000002_0000002_0000/bucket_00000 (update)

delete_delta_0000002_0000002_0000/bucket_00000

包含了刪除的記錄:

{"operation":2,"originalTransaction":1,"bucket":536870912,"rowId":1,"currentTransaction":2,"row":null}

delta_0000002_0000002_0000/bucket_00000

包含更新後的資料:

{"operation":0,"originalTransaction":2,"bucket":536870912,"rowId":0,"currentTransaction":2,"row":{"id":2,"name":"Tom","salary":21}}

4、Row_ID 資訊怎麼查?

file

5、事務表壓縮(Compact)

隨著寫操作的積累,表中的 delta 和 delete 檔案會越來越多,事務表的讀取過程中需要合併所有檔案,數量一多勢必會影響效率,此外小檔案對 HDFS 這樣的檔案系統也不夠友好,因此Hive 引入了壓縮(Compaction)的概念,分為 Minor 和 Major 兩類。

● Minor

Minor Compaction 會將所有的 delta 檔案壓縮為一個檔案,delete 也壓縮為一個。壓縮後的結果檔名中會包含寫事務 ID 範圍,同時省略掉語句 ID。

壓縮過程是在 Hive Metastore 中執行的,會根據一定閾值自動觸發。我們也可以使用如下語句人工觸發:

ALTER TABLE dtstack COMPACT 'MINOR'。

● Major

Major Compaction 會將所有的 delta 檔案,delete 檔案壓縮到一個 base 檔案。壓縮後的結果檔名中會包含所有寫事務 ID 的最大事務 ID。

壓縮過程是在 Hive Metastore 中執行的,會根據一定閾值自動觸發。我們也可以使用如下語句人工觸發:

ALTER TABLE dtstack COMPACT 'MAJOR'。

6、檔案內容詳解

ALTER TABLE employee COMPACT 'minor';

語句執行前:

/user/hive/warehouse/employee/delta_0000001_0000001_0000

/user/hive/warehouse/employee/delta_0000002_0000002_0000 (insert 建立, mary的資料)

/user/hive/warehouse/employee/delete_delta_0000002_0000002_0001 (update)

/user/hive/warehouse/employee/delta_0000002_0000002_0001 (update)

語句執行後:

/user/hive/warehouse/employee/delete_delta_0000001_0000002

/user/hive/warehouse/employee/delta_0000001_0000002

7、讀 Hive 事務表

我們可以看到 ACID 事務表中會包含三類檔案,分別是 base、delta、以及 delete。檔案中的每一行資料都會以 row_id 作為標識並排序。從 ACID 事務表中讀取資料就是對這些檔案進行合併,從而得到最新事務的結果。這一過程是在 OrcInputFormat 和 OrcRawRecordMerger 類中實現的,本質上是一個合併排序的演算法。

以下列檔案為例,產生這些檔案的操作為:

  1. 插入三條記錄

  2. 進行一次 Major Compaction

  3. 然後更新兩條記錄。

1-0-0-1 是對 originalTransaction - bucketId - rowId - currentTra

file

8、合併演算法

對所有資料行按照 (originalTransaction, bucketId, rowId) 正序排列,(currentTransaction) 倒序排列,即:

originalTransaction-bucketId-rowId-currentTransaction

(base_1)1-0-0-1

(delete_2)1-0-1-2# 被跳過(DELETE)

(base_1)1-0-1-1 # 被跳過(當前記錄的 row_id(1) 和上條資料一樣)

(delete_2)1-0-2-2 # 被跳過(DELETE)

(base_1)1-0-2-1 # 被跳過(當前記錄的 row_id(2) 和上條資料一樣)

(delta_2)2-0-0-2

(delta_2)2-0-1-2

獲取第一條記錄;

  1. 如果當前記錄的 row_id 和上條資料一樣,則跳過;

  2. 如果當前記錄的操作型別為 DELETE,也跳過;

通過以上兩條規則,對於 1-0-1-2 和 1-0-1-1,這條記錄會被跳過;

如果沒有跳過,記錄將被輸出給下游;

重複以上過程。

合併過程是流式的,即 Hive 會將所有檔案開啟,預讀第一條記錄,並將 row_id 資訊存入到 ReaderKey 型別中。

file

三、ChunJun讀寫Hive事務表實戰

瞭解完Hive事務表的基本原理後,我們來為大家分享如何在ChunJun中讀寫Hive事務表。

1、事務表資料準備

-- 建立事務表

create table dtstack(

id int,

name string,

age int

)

stored as orc

TBLPROPERTIES('transactional'='true');

-- 插入 10 條測試資料

insert into dtstack (id, name, age)

values (1, "aa", 11), (2, "bb", 12), (3, "cc", 13), (4, "dd", 14), (5, "ee", 15),

   (6, "ff", 16), (7, "gg", 17), (8, "hh", 18), (9, "ii", 19), (10, "jj", 20);

2、配置 ChunJun json 指令碼

file

file

file

3、提交任務(讀寫事務表)

#啟動 Session

/root/wujuan/flink-1.12.7/bin/yarn-session.sh -t $ChunJun_HOME -d

#提交 Yarn Session 任務

#讀取事務表

/root/wujuan/ChunJun/bin/ChunJun-yarn-session.sh -job /root/wujuan/ChunJun/ChunJun-examples/json/hive3/hive3_transaction_stream.json -confProp {"yarn.application.id":"application_1650792512832_0134"}

#寫入事務表

/root/wujuan/ChunJun/bin/ChunJun-yarn-session.sh -job /root/wujuan/ChunJun/ChunJun-examples/json/hive3/stream_hive3_transaction.json -confProp {"yarn.application.id":"application_1650792512832_0134"}

根據上一行結果替換 yarn.application.id

三、ChunJun 讀寫Hive事務表原始碼分析

壓縮器是在 Metastore 境內執行的一組後臺程式,用於支援 ACID 系統。它由 Initiator、 Worker、 Cleaner、 AcidHouseKeeperService 和其他一些組成。

1、Compactor

● Delta File Compaction

在不斷的對錶修改中,會建立越來越多的delta檔案,需要這些檔案需要被壓縮以保證效能。有兩種型別的壓縮,即(minor)小壓縮和(major)大壓縮:

minor 需要一組現有的delta檔案,並將它們重寫為每個桶的一個delta檔案

major 需要一個或多個delta檔案和桶的基礎檔案,並將它們改寫成每個桶的新基礎檔案。major 需要更久,但是效果更好

所有的壓縮工作都是在後臺進行的,並不妨礙對資料的併發讀寫。在壓縮之後系統會等待,直到所有舊檔案的讀都結束,然後刪除舊檔案。

●Initiator

這個模組負責發現哪些表或分割槽要進行壓縮。這應該在元儲存中使用hive.compactor.initiator.on來啟用。 每個 Compact 任務處理一個分割槽(如果表是未分割槽的,則處理整個表)。如果某個分割槽的連續壓實失敗次數超過 hive.compactor.initiator.failed.compacts.threshold,這個分割槽的自動壓縮排程將停止。

● Worker

每個Worker處理一個壓縮任務。 一個壓縮是一個MapReduce作業,其名稱為以下形式。<hostname>-compactor-<db>.<table>.<partition>。 每個Worker將作業提交給叢集(如果定義了hive.compactor.job.queue),並等待作業完成。hive.compactor.worker.threads決定了每個Metastore中Worker的數量。 Hive倉庫中的Worker總數決定了併發壓縮的最大數量。

● Cleaner

這個程序是在壓縮後,在確定不再需要delta檔案後,將其刪除。

● AcidHouseKeeperService

這個程序尋找那些在hive.txn.timeout時間內沒有心跳的事務並中止它們。系統假定發起交易的客戶端停止心跳後崩潰了,它鎖定的資源應該被釋放。

● SHOW COMPACTIONS

該命令顯示當前執行的壓實和最近的壓實歷史(可配置保留期)的資訊。這個歷史顯示從HIVE-12353開始可用。

● Compact 重點配置

file

2、如何 debug Hive

  1. debug hive client

hive --debug

  1. debug hive metastore

hive --service metastore --debug:port=8881,mainSuspend=y,childSuspend=n --hiveconf hive.root.logger=DEBUG,console

file

  1. debug hive mr 任務

file

3、讀寫過濾和CompactorMR排序的關鍵程式碼

file

file

4、Minor&Major 合併原始碼(CompactorMR Map 類)

file

四、ChunJun 檔案系統未來規劃

最後為大家介紹ChunJun 檔案系統未來規劃:

● 基於 FLIP-27 優化檔案系統

批流統一實現,簡單的執行緒模型,分片和讀資料分離。

● Hive 的分片優化

分片更精細化,粒度更細,充分發揮併發能力

● 完善 Exactly Once 語義

加強異常情況健壯性。

● HDFS 檔案系統的斷點續傳

根據分割槽,檔案個數,檔案行數等確定端點位置,狀態儲存在 checkpoint 裡面。

● 實時採集檔案

實時監控目錄下的多個追加檔案。

● 檔案系統格式的通用性

JSON、CSV、Text、XM、EXCELL 統一抽取公共包。

袋鼠雲開源框架釘釘技術交流群(30537511),歡迎對大資料開源專案有興趣的同學加入交流最新技術資訊,開源專案庫地址:https://github.com/DTStack

「其他文章」