Flink 實踐 | Apache Flink 在米哈遊的落地實踐

語言: CN / TW / HK

摘要:本文是來自米哈遊大資料部對於Flink在米哈遊應用及實踐的分享。

本篇內容主要分為四個部分:

1.背景介紹

2.實時平臺建設

3.實時數倉和資料湖探索

4.未來發展與展望

背景介紹

米哈遊成立於2011年,致力於為使用者提供美好的、超出預期的產品與內容。公司陸續推出了多款高品質人氣產品,包括《崩壞學園2》、《崩壞3》、《未定事件簿》、《原神》,動態桌面軟體《人工桌面》以及社群產品《米遊社》,並圍繞原創IP打造了動畫、漫畫、音樂、小說及周邊等多元產品。總部位於中國上海,並在新加坡、美國、加拿大、日本、韓國等國家和地區進行全球化佈局。

Flink在米哈遊大資料發展過程中,一直扮演著重要角色。自實時計算平臺建立以來,Flink作為實時計算引擎,經歷了多個發展階段,實時計算平臺也在不斷的迭代完善。在米哈遊內部,實時計算平臺被稱作Mlink,主要以Flink為主,相容Spark Streaming任務。從起初的Flink Jar包任務為主,發展到以Flink Sql為主,不斷的降低的使用門檻和提高了任務的開發效率;從起初的基礎的Flink任務開發,發展到跨區域、跨雲廠商的任務多版本管理,滿足了業務發展的需求。在發展的過程中,米哈遊不斷的關注著社群的發展,並同社群和阿里雲同學保持密切的聯絡。

Mlink主要是基於Yarn資源管理的計算平臺,支援了數倉、演算法、推薦、風控、大屏等業務。任務數1000+,Sql任務佔比80%左右。使用的Yarn Vcores超5000核,記憶體10T左右。其中單個任務峰值吞吐在500萬QPS。每天吞吐的資料規模超千億。

實時平臺建設

問題剖析

在Flink探索發展的過程中,都會遇到Flink使用的一些痛點,大家遇到的,同樣,我們在探索和實踐過程中也有所感觸。總結起來,大概以下五個方面:一是Jar任務的開發成本高,對不熟悉Flink程式碼的同學使用成本過高,同時,Jar任務維護成本高,一些程式碼邏輯的改動會涉及到從新打包、上傳,上線等動作。二是任務管理功能缺失,其中多租戶、歷史版本回溯、開發版本和線上版本管理、UDF管理、血緣管理是實時平臺管理的重要內容。三是Flink引擎本身管理,主要涉及到多Flink版本管理,任務引數配置、常用Connector的二次開發、多資源環境管理等問題。四是任務的告警監控管理,任務問題診斷。五是同離線數倉互通,包括Hive Catalog管理,實時和離線排程依賴管理等。

上面的五個問題,可能是普遍的問題,所以各家公司都會基於內部自建或者開源專案二次開發,來滿足自身任務開發管理需求。對於米哈遊,除了上述五個問題,還存在跨區域,跨雲廠商中遇到的問題需要解決,主要是跨區域之後,任務上線和提交效率,跨雲廠商,資源環境不一致等。

解決方案

實時平臺建設主要圍繞如上問題。目前實時平臺架構如下:

圖1:多雲多環境實時平臺架構

前端控制雲環境的切換。Backend Service主要負責使用者許可權管理、任務的多版本管理、血緣管理,任務運維,任務上下線,任務監控和告警等工作。Executor Service主要負責任務解析、任務提交執行、任務下線和同各類資源管理器互動等工作。其中,Backend Service到Executor Service通過Thrift協議通訊,Executor Service的實現可以多語言擴充套件。架構設計主要解決跨地區跨雲廠商問題,實現任務管理和任務執行之間解耦。

圖2:Mlink平臺開發頁面

圖3:Mlink平臺運維頁面

圖4:Mlink平臺同步任務頁面

Mlink實時計算平臺主要設計了概覽、開發、資源管理、運維、資料探查、同步任務、使用者管理和執行器管理等模組。其中開發頁面主要是使用者編寫任務和引數配置,包含歷史版本管理等內容。資源管理主要是Jar包任務和UDF管理。運維主要是任務啟停、任務執行監控、任務告警配置等。資料探查部分主要是預覽部分資料功能,比如Kafka Topic支援按分割槽、按時間或者Offset預覽資料。同步任務主要是為了方便管理同步任務,比如CDC到Iceberg一鍵同步和執行管理。執行器負責Executor的運維工作,包括Executor上下線,健康狀態監控等。

遇到的挑戰

平臺建設和迭代過程中,我們也遇到了不少的挑戰,也產生了一些比較好的實踐。主要分享四個方面。

Executor Service開發和維護方面

Executor主要涉及到Jar和Sql任務解析提交部分。一開始的方案為了解決跨地區傳輸效率問題,特別是大的Jar包傳輸,由後端進行任務解析,最後傳輸job graph到Executor,Executor再通過資源管理器Api提交,這個因為後端解析環境不一致問題,部分任務解析過程中會存在action動作,特別是涉及到hive表和Iceberg表部分。最後採用後端不執行,改由Executor解析的方案。Executor在解析過程中,遇到了Executor在執行很長一段時間後,會出現元空間OOM的情況。這個主要是因為Executor不斷的載入任務需要Class類,會導致使用的元空間記憶體不斷增加。這個主要是通過任務解析完成之後,解除安裝類載入器和堆GC設定來解決。

監控方面

監控採用的是Influxdb加Grafana的方案。在隨著任務量的不斷增加,Influxdb儲存的Series超過百萬,影響監控檢視的穩定性,查詢響應緩慢。一是擴充套件Influxdb,執行端通過一致性hash的方案,分配任務Metric上報到不同Influxdb。本身通過對Flink任務上報Metric進行一定程度的精簡。其次在監控上,比如Kafka消費監控,目前是支援消費條數的延遲監控,自定義了Kafka消費延遲時間的監控,主要是採集了Kafka最慢並行度消費的時間,能夠反映Kafka消費的最大延遲時間,能夠反映某個時間點的資料一定被消費了。

圖5:Grafana監控示例

Connector二次開發方面

在CDC1.0版本基礎上迭代,支援Mysql採集的時候動態擴充套件欄位和基於時間啟動消費位點、採集的庫表、位點等Schema資訊。在CDC2.0版本基礎上,增加了全量讀取庫表流控和不需要MySQL開啟Binlog的全量初始化功能。其中多CDC例項同步可能會對上游Mysql造成壓力,採用了Kafka作為資料中轉,根據庫表主鍵欄位作為Topic的Key,保證Binlog的順序,在下游不會出現資料亂序。

Iceberg作為資料湖方案,改造的點主要是Iceberg V2表的支援上面,也就是Upsert表。建立Iceberg管理中心,會根據合併策略定期優化和清理,Flink寫入主要保證在CDC到Iceberg V2表順序性,在如何減少Delete File上,在Iceberg寫入上增加了BloomFilter的支援,能夠顯著減少Delete File大小。Iceberg管理中心,支援了V2表合併和Flink提交衝突問題。

Clickhouse方面,重構了Clickhouse寫入程式碼,優化了Clickhouse的寫入效能,支援了本地表和分散式表寫入。

資料入湖和離線排程方面

實時平臺集成了Iceberg,並支援Iceberg Hadoop、Hive、Oss、S3多種Catalog。CDC到Iceberg入湖鏈路已經在部門生產業務上線使用。在資料入湖或者入倉中,如果下游表有被離線數倉用到的地方,都會有依賴排程問題,離線任務何時啟動?目前我們主要通過計算任務的延遲時間和Checkpoint時間來確保資料已經入倉入湖。以CDC或者Kafka到Iceberg為例。首先採集CDC端採集延遲時間,Kafka採集最慢並行度延遲時間,同時採集任務Checkpoint時間。現在的Checkpoint完成,Iceberg版本不一定會更新,基於此,對Iceberg寫入進行了改造。這樣一個同步任務,如果CDC採集端沒有延遲,Checkpoint也已經完成,可以保證某個小時的資料一定已經入倉。實時平臺提供任務延遲查詢介面。離線排程以此介面為排程依賴節點。這樣就保證了離線任務啟動時候,入倉資料的完整性。

實時數倉和資料湖探索

實時資料採集,目前主要是三條鏈路。一是日誌型別,主要是通過Filebeat採集寫入Kafka,Es作為Filebeat的監控。二是Api介面上報服務,後端接入Kafka。三是CDC採集全量加增量Mysql資料,寫入Kafka或者直接寫入Iceberg。之前是採用Canal作為增量採集方案,現在已經全部改為了CDC。

實時數倉架構設計和業內基本一致,包括ods、dwd、dws層,之後輸出到各應用系統,比如Clickhouse、Doris、Mysql、Redis等。目前主要以Kafka作為中間承載,也在探索Iceberg作為中間層的使用。Iceberg雖然具有流讀功能,但是流讀時候資料的順序性問題,一直沒有較好的方案解決,我們也是在探索過程中。探索的主要方向有兩個,一是將Kafka和Iceberg作為混合Source方案,Flink任務讀取混合Source之後,基於Iceberg快照記錄的Kafka位點,確定讀取範圍和切換點。二是社群Flip-188提出的引入Dynamic Table儲存實現。Flink內建表由兩部分組成,LogStore和FileStore。LogStore將滿足訊息系統的需要,而FileStore是列式格式檔案系統。在每個時間點,LogStore和FileStore都會為最新寫入的資料儲存完全相同的資料(LogStore有TTL),但物理佈局不同。

在實時數倉探索方面,主要是CDC到Iceberg入湖任務,已經在生產上使用。其中主要解決了四個問題:一是CDC採集問題,特別是多庫多表採集,會集中採集到Kafka,減少多個CDC任務對同一資料庫影響。二是Iceberg支援V2表寫入,包括寫入的索引過濾減少Delete檔案,Iceberg管理中心合併和提交衝突。三是支援分庫分表的資料校驗和資料延遲檢查。四是一鍵式任務生成。對於使用者而言,只需要填寫資料庫相關資訊,目標Iceberg表庫名和表名,並支援使用Kafka中轉,避免多個CDC例項採集同一個資料庫例項。通過上述四個問題的解決,能夠達到資料庫資料分鐘級資料入湖,入湖的資料校驗和資料延遲依賴達成,方便下游離線任務排程啟動。

圖6:資料入湖鏈路

未來發展與展望

主要有四點:

一是Flink動態表儲存能夠儘快實現落地,實現真正的實時數倉和流表一體。

二是Flink任務動態擴縮容、基於任務診斷的主動資源調整、細粒度資源調整。

三是Flink對批任務的讀寫優化,目前批任務Flink的使用面不如Spark,如果未來能夠在此補足,可以做到流批操作一個引擎,開發成本會顯著降低。

四是Flink加資料湖更好的落地推廣。