Apache RocketMQ + Hudi 快速構建 Lakehouse

語言: CN / TW / HK

本文目錄

  • 背景知識

  • 大資料時代的構架演進

  • RocketMQ Connector&Stream

  • Apache Hudi

  • 構建Lakehouse實操

本文標題包含三個關鍵詞:Lakehouse、RocketMQ、Hudi。我們先從整體Lakehouse架構入手,隨後逐步分析架構產生的原因、架構元件特點以及構建Lakehouse架構的實操部分。

01

背 景 知 識

1 、Lakehouse架構

Lakehouse最初由Databrick提出,並對Lakehouse架構特徵有如下要求:

(1)事務支援

企業內部許多資料管道通常會併發讀寫資料。對ACID事務的支援確保了多方併發讀寫資料時的一致性問題;

(2)Schema enforcement and governance

Lakehouse應該有一種方式可以支援模式執行和演進、支援DW schema的正規化(如星星或雪花模型),能夠對資料完整性進行推理,並且具有健壯的治理和審計機制;

(3)開放性

使用的儲存格式是開放式和標準化的(如parquet),並且為各類工具和引擎,包括機器學習和Python/R庫,提供API,以便它們可以直接有效地訪問資料;

(4)BI支援

Lakehouse可以直接在源資料上使用BI工具。這樣可以提高資料新鮮度、減少延遲,並且降低了在資料池和資料倉庫中操作兩個資料副本的成本;

(5)儲存與計算分離

在實踐中,這意味著儲存和計算使用單獨的叢集,因此這些系統能夠擴充套件到支援更大的使用者併發和資料量。一些現代數倉也具有此屬性;

(6)支援從非結構化資料到結構化資料的多種資料型別

Lakehouse可用於儲存、優化、分析和訪問許多資料應用所需的包括image、video、audio、text以及半結構化資料;

(7)支援各種工作負載

包括資料科學、機器學習以及SQL和分析。可能需要多種工具來支援這些工作負載,但它們底層都依賴同一資料儲存庫;

(8)端到端流

實時報表是許多企業中的標準應用。對流的支援消除了需要構建單獨系統來專門用於服務實時資料應用的需求。

從上述對Lakehouse架構的特點描述我們可以看出,針對單一功能,我們可以利用某些開源產品組合構建出一套解決方案。但對於全部功能的支援,目前好像沒有一個通用的解決方案。接下來,我們先了解大資料時代主流的資料處理架構是怎樣的。

02

大資料時代的架構演進

1、大資料時代的開源產品

大資料時代的開源產品種類繁多,訊息領域的RocketMQ、Kafka; 計算領域的flink、spark、storm; 儲存領域的HDFS、Hbase、Redis、ElasticSearch、Hudi、DeltaLake等等。

為什麼會產生這麼多開源產品呢?首先在大資料時代資料量越來越大,而且每個業務的需求也各不相同,因此就產生出各種型別的產品供架構師選擇,用於支援各類場景。然而眾多的品類產品也給架構師們帶來一些困擾,比如選型困難、試錯成本高、學習成本高、架構複雜等等。

2、當前主流的多層架構

大資料領域的處理處理場景包含資料分析、BI、科學計算、機器學習、指標監控等場景,針對不同場景,業務方會根據業務特點選擇不同的計算引擎和儲存引擎;例如交易指標可以採用binlog + CDC+ RocketMQ + Flink + Hbase + ELK組合,用於BI和Metric視覺化。

(1)多層架構的優點:支援廣泛的業務場景;

(2)多層架構的缺點:

  • 處理鏈路長,延遲高;

  • 資料副本多,成本翻倍;

  • 學習成本高;

造成多層架構缺點主要原因是儲存鏈路和計算鏈路太長。

  • 我們真的需要如此多的解決方案來支援廣泛的業務場景嗎? Lakehouse架構是否可以統一解決方案?

  • 多層架構的儲存層是否可以合併? Hudi產品是否能夠支援多種儲存需求?

  • 多層架構的計算層是否可以合併? RocketMQ stream是否能夠融合訊息層和計算層?

當前主流的多層架構

3、Lakehouse架構產生

Lakehouse架構是多層架構的升級版本,將儲存層複雜度繼續降低到一層。再進一步壓縮計算層,將訊息層和計算層融合,RocketMQ stream充當計算的角色。我們得到如下圖所示的新架構。新架構中,訊息出入口通過RocketMQ connector實現,訊息計算層由RocketMQ stream實現,在RocketMQ內部完成訊息計算中間態的流轉;計算結果通過RocketMQ-Hudi-connector收口落庫Hudi,Hudi支援多種索引,並提供統一的API輸出給不同產品。

Lakehouse架構

下面我們分析下該架構的特點。

(1)Lakehouse架構的優點:

  • 鏈路更短,更適合實時場景,資料新鮮感高;

  • 成本可控,降低了儲存成本;

  • 學習成本低,對程式設計師友好;

  • 運維複雜度大幅降低;

(2)Lakehouse架構的缺點

對訊息產品和資料湖產品的穩定性、易用性等要求高,同時訊息產品需要支援計算場景,資料湖產品需要提供強大的索引功能。

(3)選擇

在Lakehouse架構中我們選擇訊息產品RocketMQ和資料湖產品Hudi。

同時,可以利用RocketMQ stream在RocketMQ叢集上將計算層放在其中整合,這樣就將計算層降低到一層,能夠滿足絕大部分中小型大資料處理場景。

接下來我們逐步分析RocketMQ和Hudi兩款產品的特點。

03

RocketMQ Connector & Stream

RocketMQ  發展歷程圖

RocketMQ從2017年開始進入Apache孵化,2018年RocketMQ 4.0釋出完成雲原生化,2021年RocketMQ 5.0釋出全面融合訊息、事件、流。

1、業務訊息領域首選

RocketMQ作為一款“讓人睡得著覺的訊息產品”成為業務訊息領域的首選,這主要源於產品的以下特點:

(1)金融級高可靠

經歷了阿里巴巴雙十一的洪峰檢驗;

(2)極簡架構

如下圖所示, RocketMQ的架構主要包含兩部分包括:源資料叢集NameServer Cluster和計算儲存叢集Broker Cluster。

RocketMQ 構架圖

NameServer節點無狀態,可以非常簡單的進行橫向擴容。Broker節點採用主備方式保證資料高可靠性,支援一主多備的場景,配置靈活。

搭建方式:只需要簡單的程式碼就可以搭建RocketMQ叢集:

Jar:

 nohup sh bin/mqnamesrv &
nohup sh bin/mqbroker -n localhost:9876 &

On K8S:

kubectl apply -f example/rocketmq_cluster.yaml

(3) 極低運維成本

RocketMQ的運維成本很低,提供了很好的CLI工具MQAdmin,MQAdmin提供了豐富的命令支援,覆蓋叢集健康狀態檢查、叢集進出流量管控等多個方面。 例如,mqadmin clusterList一條命令可以獲取到當前叢集全部節點狀態(生產消費流量、延遲、排隊長度、磁碟水位等); mqadmin updateBrokerConfig命令可以實時設定broker節點或topic的可讀可寫狀態,從而可以動態摘除臨時不可用節點,達到生產消費的流量遷移效果。

(4)豐富的訊息型別

RocketMQ支援的訊息型別包括:普通訊息、事務訊息、延遲訊息、定時訊息、順序訊息等。能夠輕鬆支援大資料場景和業務場景。

(5)高吞吐、低延遲

壓測場景主備同步複製模式,每臺Broker節點都可以將磁碟利用率打滿,同時可以將p99延遲控制在毫秒級別。

2、RocketMQ 5.0概況

RocketMQ 5.0是生於雲、長於雲的雲原生訊息、事件、流超融合平臺,它具有以下特點:

(1)輕量級SDK

  • 全面支援雲原生通訊標準 gRPC 協議;

  • 無狀態 Pop 消費模式,多語言友好,易整合;

(2)極簡架構

  • 無外部依賴,降低運維負擔;

  • 節點間鬆散耦合,任意服務節點可隨時遷移;

(3)可分可合的儲存計算分離

  • Broker 升級為真正的無狀態服務節點,無 binding;

  • Broker 和 Store節點分離部署、獨立擴縮;

  • 多協議標準支援,無廠商鎖定;

  • 可分可合,適應多種業務場景,降低運維負擔;

如下圖所示,計算叢集(Broker)主要包括抽象模型和相對應的協議適配,以及消費能力和治理能力。儲存叢集(Store)主要分為訊息儲存CommitLog(多型別訊息儲存、多模態儲存)和索引儲存Index(多元索引)兩部分,如果可以充分發揮雲上儲存的能力,將CommitLog和Index配置在雲端的檔案系統就可以天然的實現儲存和計算分離。

(4)多模儲存支援

  • 滿足不同基礎場景下的高可用訴求;

  • 充分利用雲上基礎設施,降低成本;

(5)雲原生基礎設施:

  • 可觀測效能力雲原生化,OpenTelemetry 標準化;

  • Kubernetes 一鍵式部署擴容交付。

RocketMQ 5.02021年度大事件及未來規劃

3 RocketMQConnector

a、傳統資料流

(1)傳統資料流的弊端

  • 生產者消費者程式碼需要自己實現,成本高;

  • 資料同步的任務沒有統一管理;

  • 重複開發,程式碼質量參差不齊;

(2)解決方案:RocketMQ Connector

  • 合作共建,複用資料同步任務程式碼;

  • 統一的管理排程,提高資源利用率;

b、RocketMQ Connector資料同步流程

相比傳統資料流,RocketMQ connector資料流的不同在於將 source 和 sink 進行統一管理,同時它開放原始碼,社群也很活躍。

4、RocketMQ Connector架構

如上圖所示,RocketMQ Connector架構主要包含Runtime和Worker兩部分,另外還有生態Source&Sink。

(1)標準:OpenMessaging

(2)生態:支援ActiveMQ、Cassandra、ES、JDBC、JMS、MongoDB、Kafka、RabbitMQ、Mysql、Flume、Hbase、Redis等大資料領域的大部分產品;

(3)元件:Manager統一管理排程,如果有多個任務可以將所有任務統一進行負載均衡,均勻的分配到不同Worker上,同時Worker可以進行橫向擴容。

5、RocketMQ Stream

RocketMQ Stream是一款將計算層壓縮到一層的產品。它支援一些常見的運算元如window、join、維表,相容Flink SQL、UDF/UDAF/UDTF。

04

Apache Hudi

Hudi 是一個流式資料湖平臺,支援對海量資料快速更新。內建表格式,支援事務的儲存層、一系列表服務、資料服務(開箱即用的攝取工具)以及完善的運維監控工具。Hudi 可以將儲存解除安裝到阿里雲上的 OSS、AWS 的S3這些儲存上。

Hudi的特性包括:

  • 事務性寫入,MVCC/OCC併發控制;

  • 對記錄級別的更新、刪除的原生支援;

  • 面向查詢優化:小檔案自動管理,針對增量拉取優化的設計,自動壓縮、聚類以優化檔案佈局;

Apache Hudi是一套完整的資料湖平臺。它的特點有:

  • 各模組緊密整合,自我管理;

  • 使用 Spark、Flink、Java 寫入;

  • 使用 Spark、Flink、Hive、Presto、Trino、Impala、

    AWS Athena/Redshift等進行查詢;

  • 進行資料操作的開箱即用工具/服務。

Apache Hudi主要針對以下三類場景進行優化:

1、流式處理棧

(1) 增量處理;

(2) 快速、高效;

(3) 面向行;

(4) 未優化掃描;

2、批處理棧

(1) 批量處理;

(2) 低效;

(3) 掃描、列存格式;

3、增量處理棧

(1) 增量處理;

(2) 快速、高效;

(3) 掃描、列存格式。

05

構建 Lakehouse 實操

該部分只介紹主流程和實操配置項,本機搭建的實操細節可以參考附錄部分。

1、準備工作

RocketMQ version:4.9.0

rocketmq-connect-hudi version:0.0.1-SNAPSHOT

Hudi version:0.8.0

2、構建RocketMQ-Hudi-connector

(1) 下載:

git clone https://github.com/apache/rocketmq-externals.git

(2) 配置:

/data/lakehouse/rocketmq-externals/rocketmq-connect/rocketmq-connect-runtime/target/distribution/conf/connect.conf 中connector-plugin 路徑

(3) 編譯:

cd rocketmq-externals/rocketmq-connect-hudi
mvn clean install -DskipTest -U

rocketmq-connect-hudi-0.0.1-SNAPSHOT-jar-with-dependencies.jar就是我們需要使用的rocketmq-hudi-connector

3、執行

(1) 啟動或使用現有的RocketMQ叢集,並初始化元資料Topic:

connector-cluster-topic (叢集資訊) connector-config-topic (配置資訊)

connector-offset-topic (sink消費進度) connector-position-topic (source資料處理進度 並且為了保證訊息有序,每個topic可以只建一個queue)

(2) 啟動RocketMQ connector執行時

cd /data/lakehouse/rocketmq-externals/rocketmq-connect/rocketmq-connect-runtime
sh ./run_worker.sh ## Worker可以啟動多個

(3) 配置並啟動RocketMQ-hudi-connector任務

請求RocketMQ connector runtime建立任務

curl http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-hudi-sink-connector-name} ?config='{"connector-class":"org.apache.rocketmq.connect.hudi.connector.HudiSinkConnector","topicNames":"topicc","tablePath":"file:///tmp/hudi_connector_test","tableName":"hudi_connector_test_table","insertShuffleParallelism":"2","upsertShuffleParallelism":"2","deleteParallelism":"2","source-record-converter":"org.apache.rocketmq.connect.runtime.converter.RocketMQConverter","source-rocketmq":"127.0.0.1:9876","src-cluster":"DefaultCluster","refresh-interval":"10000","schemaPath":"/data/lakehouse/config/user.avsc"\}’
啟動成功會列印如下日誌:
2021-09-06 16:23:14 INFO pool-2-thread-1 - Open HoodieJavaWriteClient successfully

(4) 此時向source topic生產的資料會自動寫入到1Hudi對應的table中,可以通過Hudi的api進行查詢。

4、配置解析

(1) RocketMQ connector需要配置RocketMQ叢集資訊和connector外掛位置,包含:connect工作節點id標識workerid、connect服務命令接收埠httpPort、rocketmq叢集namesrvAddr、connect本地配置儲存目錄storePathRootDir、connector外掛目錄pluginPaths 。

RocketMQ connector配置表

(2) Hudi任務需要配置Hudi表路徑tablePath和表名稱tableName,以及Hudi使用的Schema檔案。

Hudi任務配置表

點選閱讀原文即可檢視Lakehouse構建實操影片

附錄: 在本地Mac系統構建Lakehouse demo

涉及到的元件: ro cketmq、rocketmq-connector-runtime、rocketmq-connect-hudi、hudi、hdfs、avro、spark-shell

0、啟動hdfs
下載hadoop包

https://www.apache.org/dyn/closer.cgi/hadoop/common/hadoop-2.10.1/hadoop-2.10.1.tar.gz

cd /Users/osgoo/Documents/hadoop-2.10.1
vi core-site.xml
<configuration>
<property>
<name>fs.defaultFS</name>
<!-- 可以通過命令hostname 檢視主機名字 這裡的主機名字是hadoop1-->
<value>hdfs://localhost:9000</value>
</property>
<!--覆蓋掉core-default.xml中的預設配置-->
</configuration>


vi hdfs-site.xml
<configuration>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
</configuration>


./bin/hdfs namenode -format
./sbin/start-dfs.sh
jps 看下namenode,datanode
lsof -i:9000
./bin/hdfs dfs -mkdir -p /Users/osgoo/Downloads


1、啟動rocketmq叢集,建立rocketmq-connector內建topic
QickStart:https://rocketmq.apache.org/docs/quick-start/
sh mqadmin updatetopic -t connector-cluster-topic -n localhost:9876 -c DefaultCluster
sh mqadmin updatetopic -t connector-config-topic -n localhost:9876 -c DefaultCluster
sh mqadmin updatetopic -t connector-offset-topic -n localhost:9876 -c DefaultCluster
sh mqadmin updatetopic -t connector-position-topic -n localhost:9876 -c DefaultCluster


2、建立資料入湖的源端topic,testhudi1
sh mqadmin updatetopic -t testhudi1 -n localhost:9876 -c DefaultCluster


3、編譯rocketmq-connect-hudi-0.0.1-SNAPSHOT-jar-with-dependencies.jar
cd rocketmq-connect-hudi
mvn clean install -DskipTest -U


4、啟動rocketmq-connector runtime
配置connect.conf
--------------
workerId=DEFAULT_WORKER_1
storePathRootDir=/Users/osgoo/Downloads/storeRoot


## Http port for user to access REST API
httpPort=8082


# Rocketmq namesrvAddr
namesrvAddr=localhost:9876


# Source or sink connector jar file dir,The default value is rocketmq-connect-sample
pluginPaths=/Users/osgoo/Downloads/connector-plugins
---------------
拷貝 rocketmq-hudi-connector.jar 到 pluginPaths=/Users/osgoo/Downloads/connector-plugins


sh run_worker.sh


5、配置入湖config
curl http://localhost:8082/connectors/rocketmq-connect-hudi?config='\{"connector-class":"org.apache.rocketmq.connect.hudi.connector.HudiSinkConnector","topicNames":"testhudi1","tablePath":"hdfs://localhost:9000/Users/osgoo/Documents/base-path7","tableName":"t7","insertShuffleParallelism":"2","upsertShuffleParallelism":"2","deleteParallelism":"2","source-record-converter":"org.apache.rocketmq.connect.runtime.converter.RocketMQConverter","source-rocketmq":"127.0.0.1:9876","source-cluster":"DefaultCluster","refresh-interval":"10000","schemaPath":"/Users/osgoo/Downloads/user.avsc"\}'


6、傳送訊息到testhudi1




7、## 利用spark讀取


cd /Users/osgoo/Downloads/spark-3.1.2-bin-hadoop3.2/bin


./spark-shell \
--packages org.apache.hudi:hudi-spark3-bundle_2.12:0.9.0,org.apache.spark:spark-avro_2.12:3.0.1 \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'


import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._


val tableName = "t7"
val basePath = "hdfs://localhost:9000/Users/osgoo/Documents/base-path7"


val tripsSnapshotDF = spark.
read.
format("hudi").
load(basePath + "/*")
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")


spark.sql("select * from hudi_trips_snapshot").show()

歡迎加入釘釘群與 Rocketmq 愛好者討論交流:

釘釘掃碼加群

END

Apache RocketMQ

RocketMQ - “訊息、事件、流”

一體化融合處理平臺