Flink 實踐 | B站流式傳輸架構的前世今生

語言: CN / TW / HK

本期作者

王翔宇

嗶哩嗶哩資深開發工程師

2017年加入B站,現服務於基礎架構實時團隊。先後負責B站日誌系統、實時流式傳輸工作。

魏澤豐

嗶哩嗶哩高階開發工程師

2021年加入B站,現服務於基礎架構實時團隊,負責實時流式傳輸以及Flink CDC相關工作。

高瑞超

嗶哩嗶哩資深開發工程師

2021年加入B站,現服務於基礎架構實時團隊,負責實時流式傳輸以及Flink connector相關工作。

01 背景

Lancer是B站的實時流式傳輸平臺,承載全站服務端、客戶端的資料上報/採集、傳輸、整合工作,秒級延遲,作為數倉入口是B站資料平臺的生命線。目前每日峰值 5000w/s rps, 3PB/天, 4K+條流的資料同步能力。

服務如此大的資料規模,對產品的可靠性、可擴充套件性和可維護性提出了很高的要求。流式傳輸的實現是一個很有挑戰的事情,聚焦快、準、穩的需求,  Lancer整體演進經歷了大管道模型、BU粒度管道模型、單流單作業模型三個階段的演進,下面我們娓娓道來。

02 關鍵詞說明

logid:每個業務方上報的資料流以logid進行標識,logid是資料在傳輸+整合過程中的元資訊標識。

資料來源:資料進入到lancer的入口,例如:log-agent,bfe-agent,flink cdc

lancer-gateway(資料閘道器):接收資料上報的閘道器。

資料緩衝層:也叫做內部kafka,用於解耦資料上報和資料分發。

lancer-collector(資料分發層):也叫做資料同步,可以根據實際場景完成不同端到端的資料同步。

03 技術演進

整個B站流式資料傳輸架構的演進大致經歷了三個階段。

3.1 架構V1.0-基於flume的

大管道資料傳輸架構(2019之前)

B站流式傳輸架構建立之初,資料流量和資料流條數相對較少,因此採用了全站的資料流混合在一個管道中進行處理,基於flume二次定製化的資料傳輸架構,架構如下:

  • 整個架構從資料生成到落地分為:資料來源、資料閘道器、資料緩衝、資料分發層。

  • 資料上報端基本採用sdk的方式直接傳送http和grpc請求上報。

  • 資料閘道器lancer-gateway是基於flume二次迭代的資料閘道器,用於承載資料的上報,支援兩種協議:http用於承載公網資料上報(web/app),grpc用於承載IDC內服務端資料上報。

  • 資料緩衝層使用kafka實現,用於解耦資料上報和資料分發。

  • 資料分發層lancer-collector同樣是基於flume二次迭代的資料分發層,用於將資料從緩衝層同步到ODS。

v1.0架構在使用中暴露出一些的痛點:

1. 資料來源端對於資料上報的可控性和容錯性較差,例如:

  • 資料閘道器故障情況下,資料來源端缺少快取能力,不能直接反壓,存在資料丟失隱患。

  • 重SDK:SDK中需要新增各種適配邏輯以應對上報異常情況

2. 整體架構是一個大管道模型,資源的劃分和隔離不明確,整體維護成本高,自身故障隔離性差。

3. 基於flume二次迭代的一些缺陷:

  • 邏輯複雜,效能差,我們需要的功能相對單一

  • hdfs分發場景,不支援exactly once語義,每次重啟,會導致資料大量重複

3.2 架構V2.0-BU粒度的

管道化架構(2020-2021)

針對v1.0的缺陷,我們引入了架構v2.0,架構如下:

此架構的關鍵細節如下:

1. 強化了資料上報源端的邊緣可控能力

  • 伺服器上部署log-agent承載服務端資料上報。

  • cdn上部署bfe-agent用於承載公網(web端、app端)資料上報。

  • log-agent/bfe-agent中整合資料緩衝、預聚合、流控、重試、降級等能力,資料上報sdk只需專注資料的生成和上報邏輯。

  • agent端基於logid的BU屬性,將資料路由到不同的管道。

2. 資料管道以BU為粒度搭建,管道間資源隔離,每個管道包含整套獨立的完整資料傳輸鏈路,並且資料管道支援基於airflow快速搭建。故障隔離做到BU級別。

3. 資料閘道器升級到自研lancer-gateway2.0,邏輯精簡,支援流控反壓,並且適配kafka failover, 基於k8s進行部署。

4. hdfs分發基於flink jar進行實現:支援exactly once語義保證。

V2.0架構相對於v1.0, 重點提升了資料上報邊緣的可控力、BU粒度管道間的資源劃分和隔離性。但是隨著B站流式資料傳輸規模的快速增加,對資料傳輸的時效性、成本、質量也提出了越來越高的要求,V2.0也逐漸暴露出了一些缺陷:

1. logid級別隔離性差:

  • 單個管道內部某個logid流量陡增,幾倍甚至幾十倍,依然會造成整個管道的資料分發延遲,

  • 單個管道內分發層元件故障重啟,例如:hdfs分發對應的flink jar作業掛掉重啟,從checkpoint恢復,此管道內所有的logid的hdfs分發都會存在歸檔延遲隱患。

2. 閘道器是非同步傳送模型,極端情況下(元件崩潰),存在資料丟失風險。

3. ods層區域性熱點/故障影響放大

  • 由於分發層一個作業同時分發多個logid,這種大作業模型更易受到ods層區域性熱點的影響,例如:hdfs某個datanode熱點,會導致某個分發作業整體寫阻塞,進而影響到此分發作業的其他logid, kafka分發同理。

  • hdfs單個檔案塊的所有副本失效,會導致對應分發任務整體掛掉重啟。

4. hdfs小檔案問題放大

  • hdfs分發對應的flink jar作業為了保證吞吐,整體設定的併發度相對較大。因此對於管道內的所有logid,同一時刻都會開啟併發度大小的檔案數,對於流量低的logid,就會造成小檔案數量變大的問題。

針對上述痛點,最直接的解決思路就是整體架構做進一步的隔離,以單logid為維度實現資料傳輸+分發。面臨的挑戰主要有以下幾個方面:

  • 如何保證全鏈路以logid為單位進行隔離,如何在資源使用可控的情況下合理控流並且保證資料流之間的隔離性

  • 需要與外部系統進行大量的互動,如何適配外部系統的各種問題:區域性熱點、故障

  • 整合作業的數量指數級增加,如何保障高效能、穩定性的同時並且高效的進行管理、運維、質量監控。

3.3 架構V3.0-基於Flink SQL的

單流單作業資料整合方案

在V3.0架構中,我們對整體傳輸鏈路進行了單作業單資料流隔離改造,並且基於Flink SQL支撐資料分發場景。架構如下:

相比v2.0, 資源池容量管理上依然以BU為粒度,但是每個logid的傳輸和分發相互獨立,互不影響。具體邏輯如下 :

  • agent:整體上報SDK和agent接收+傳送邏輯按照logid進行隔離改造,logid間採集傳送相互隔離。

  • lancer-gateway3.0:logid的請求處理之間相互隔離,當kafka傳送受阻,直接反壓給agent端,下面詳細介紹。

  • 資料緩衝層:每個logid對應一個獨立的內部kafka topic,實現資料的緩衝。

  • 資料分發層:分發層對每個logid的啟動獨立的flink sql作業進行資料的分發,單個logid處理受阻,只會導致當個logid的資料堆積。

相較於之前的實現,v3.0架構具有以下的優勢:

1. 可靠性:

  • 功能質量上整理鏈路可以保證資料不丟失,閘道器層以同步方式傳送資料,可以保證資料被持久化到內部kafka;flink支援狀態恢復和exactly once的語義,同樣保證資料不丟。

2. 可維護性上:

  • 隔離性上logid之間相互隔離,一個logid出現問題,其他logid不受影響。

  • 資源分配以logid為最小單位,可以精確控制單個logid的資源使用。

3. 可擴充套件性:

  • 可以以單個logid為單位靈活管控:靈活的擴縮資源

04 V3.0架構具體實現

我們重點介紹下,當前V3.0結構各個分層的實現。

4.1 資料上報邊緣層

4.1.1 log-agent

基於go自研,外掛化架構,部署於物理機,可靠、高效的支援服務端資料上報。

時間架構分為收集、處理、傳送三層,具有以下主要特性:

  • 支援檔案採集和unix sock兩種資料上報方式

  • 與閘道器GRPC通訊:ACK+退避重試+流控

  • 整體上報SDK和agent接收+傳送邏輯按照logid進行隔離改造,單logid處理相互隔離:每個logid啟動獨立的pipeline進行採集、解析、傳送。

  • 閘道器基於服務發現,自適應閘道器的調整

  • 傳送受阻情況下,基於磁碟進行本地堆積

  • logid粒度的埋點監控,實時監控資料的處理狀態

  • CGroup資源限制:CPU + 記憶體

  • 資料聚合傳送,提升傳輸效率

  • 支援物理機和容器日誌此採集,配置隨應用釋出,自適應配置的增、刪、改。

4.1.2 bfe-agent

基於go自研,部署於cdn,用於承載公網資料上報。

邊緣cdn節點,cdn伺服器上部署nginx和bfe-agent,bfe-agent整體實現架構與log-agent類似,對於web和app端資料上報請求QPS高、突發性強的特點,主要強化了以下能力:

  • 應對流量陡增:基於邊緣節點的本地緩衝起到削峰作用

  • 策略(降級、流控)前置,增強可控力

  • logid級別分流隔離, 支援等級劃分

  • 聚合壓縮回傳以提升資料傳輸效率、降低成本,回源QPS降低90%以上。

4.2 資料上報閘道器層

v3.0方案中,資料資料閘道器的架構如下:

資料閘道器功能特性如下:

  • kafka的通用代理層:支援grpc /http協議

  • 基於kafka send callback實現了同步傳送模型,保證資料不丟:資料寫入kafka後,再對請求返回ack

  • 請求不拆分:基於agent的聚合機制,只支援單次請求單條記錄,因此一條記錄對應一條快取層kakfa的訊息

  • lancer-gateway3.0根據請求的topic資訊,傳送請求到對應的kafka叢集

  • lancer-gateway3.0適配kafka叢集的區域性熱點:支援partition動態剔除

  • logid與topic一一對應,處理流程中相互隔離:一個topic傳送受阻,不影響其他的topic

整個資料閘道器中的實現難點是:單gateway承載多logid處理的過程中如何保證隔離性和公平性,我們參考了Golang 中GMP的機制,整體資料流程如下:

1. 收到的請求,會把請求放到logid對應的請求佇列,如果佇列滿,直接拒絕請求

2. 每個kafka叢集,會初始化一個N大小的kafka producer pool,其中每個producer會遍歷所有的佇列,進行資料的傳送。

3. 對於每個logid的請求佇列,會從兩個維護限制資源的佔用,以保證公平性和隔離性

  • 限制當個logid佇列繫結的producer數量

  • 基於時間片限定當個producer服務於單個佇列的時間長度

4.3 資料上報分發層

隨著flink在實時計算領域的成熟,其高效能、低延遲、exactly once語義保證、批流一體、豐富的資料來源支援、活躍的社群等優勢,促使我們選擇了以flink sql作為資料分發層的解決方案。當前我們主要支援了kafka→hive, kafka→kafka, cdc→kafka->hudi/hive三種場景:

1. kafka→hive

  • 以流式方式,實時匯入資料到hive。

  • file rolling on check,保證exactly once。

  • 按照event time寫入分割槽和歸檔,歸檔延遲小於15min

  • 支援text+lzo(行存)和 orc+zstd(列存)兩種儲存格式。

  • 支援下游作業增量同步。

2. kafka→kafka

  • 以流式方式,支援資料的實時同步

  • 支援kafka header metadata資訊的透傳

3. cdc→kafka->hudi/hive

  • 以實時流的方式同步全量和增量資料,整個cdc的使用場景分為兩個環節

  • cdc → kafka

  • 基於cdc 2.1,同步mysql的全量和增量binlog同步

  • 單sql作業支援分庫分表、多庫多表的同步。

  • 支援根據db和table自定義策略分流到不同的資料緩衝層kafka topic

  • kafka→hudi/hive

  • 消費單topic同步到單張hudi/hive表,支援event_time落分割槽。

  • 保證資料最終一致性

05 Flink connector功能迭代

在Flink SQL資料分發場景的支援中,針對我們遇到的實際需求,對社群原生connector進行了對應的優化,我們針對性的介紹下。

5.1 hive sink connector優化

斷流空分割槽提交

背景:B站離線作業的拉起依賴上游分割槽提交,HDFS分割槽提交的判斷依賴於作業整體watermark的推進,但是某些logid在斷流的情況下,如何進行分割槽的提交呢

解決辦法:

如圖所示:當所有的StreamFileWriter連續兩次checkpoint內未處理任何資料的情況下,StreamingFileCommiter會判定發生了斷流,按照當前時間提交分割槽。

支援下游增量資料同步

背景:傳統方式ods到dwd的資料同步只有當ods層分割槽ready之後才會開始,時效性較差,如何加速資料的同步?

解決辦法:

  • 不依賴ods層分割槽ready,當ods目錄中檔案生成後,即可開始資料的處理,以增量的方式讀取資料檔案。

  • 通過HDFS的list操作來獲取需要讀取的檔案,對NameNode壓力較大,為此我們提供了檔案list列表索引(包括檔名和資料條數),下游只需要讀取索引,即可獲取增量檔案列表。

  • 實現中索引檔案狀態被持久化到state中,snapshot中生成.inflight狀態臨時檔案,notifyCheckpointComplete中將檔案rename成commit正式檔案, 提供exactly once語義保證。

  • 下游作業讀取檔案索引,支援ods到dwd的增量資料同步。

orc+zstd

背景:相較於行式儲存,列式儲存在壓縮比上有著顯著的優勢。

解決辦法:支援orc+zstd, 經過測試,相較於text+lzo,空間節省在40%以上。

hdfs非同步close

背景:snapshot階段flush資料,close檔案經常因為個別檔案慢拖累整體吞吐。

解決辦法:

  • 將close超時的檔案扔到非同步佇列中。也就是 close 的動作不會去堵塞整個主鏈路的處理,提升hdfs區域性熱點情況下的吞吐。非同步close 檔案列表儲存到pendingPartsForCurrentCheckpoint,並且持久化到 state 當中。故障恢復時,也能繼續對檔案進行關閉。

  • 非同步close的引入,會引入分割槽提前建立的隱患,為此引入了對於bucket狀態的判斷。對於某分割槽,只有當隸屬於此分割槽的所有bucket中的pendingPartsForCurrentCheckpoint為空(所有檔案都進行了關閉),才在commit運算元中進行分割槽的提交。

小檔案合併

背景:rolling on checkpoint的滾動策略,會導致檔案數量的膨脹,對namenode產生較大的壓力。

解決辦法:

  • 引入了小檔案合併功能,在checkpoint完成後,由 Streaming writer 的 notifyCheckpointComplete 方法觸發合併操作,向下遊傳送EndCheckpoint訊號。

  • coordinator 收到每個writer的EndCheckpoint後,開始進行檔案的分組,封裝成一個個compactunit廣播下游,全部unit傳送完之後,再廣播EndCompaction。

  • compact operator找到屬於自己的任務後開始處理,當收到EndCompaction後,往下游傳送分割槽提交資訊。

5.2 kafka connector優化

支援protobuf format

背景:使用者有處理protobuf格式資料的需求

解決辦法:

  • 使用protoc 生成java類,打包jar,上傳到實時計算平臺。

  • 實現對應的DeserializationSchema和SerializationSchema,動態載入pb類並通過反射呼叫方法,完成pb bytes與RowData的互轉。

kafka sink支援自定義分流

背景:使用者希望在一個sql作業中根據需要,靈活定製將訊息傳送到指定kafka 叢集和topic。

解決辦法:

  • 支援使用者自定義udf,靈活選擇sql中的欄位作為udf的入參,在udf內部,使用者根據業務場景定製邏輯,返回topic或者broker list。最終sink內部發送到對應的kafka叢集和topic。

  • kakfa sink內部動態載入udf,通過反射機制實時獲取對應的broker和topic,同時支援結果的快取。

  • 例子:

CREATE TABLE sink_test (

broker_name_arg varchar,

topic_name_arg varchar,

message string,

t1 string

) WITH(

'bootstrapServers' = 'BrokerUdf(broker_name_arg)', // 根據broker_name_arg作為udf引數計算brokers

'bootstrapServers.udf.class' = 'com.bilibili.lancer.udf.BrokerUdf', // 獲取brokers Udf

'topic' = 'TopicUdf(broker_name_arg, topic_name_arg)', // 根據broker_name_arg和topic_name_arg作為udf引數計算topic

'topic.udf.class' = 'com.bilibili.lancer.udf.TopicUdf', // 計算topoc Udf

'udf.cache.min' = '1', // 快取時間

'exclude.udf.field' = 'false', // udf的相關欄位是否輸出

'connector' = 'kafka-diversion'

);

5.3 cdc connector優化

sql場景下多庫多表場景支援

背景:原生的flink cdc source在單個sql任務中,只能同步相同DDL定義的表,如果需要同步異構DDL,不得不啟動多個獨立的job進行同步。這樣會存在資源的額外開銷。

解決辦法:

  • sql定義去DDL:

    原生flink cdc source會對所有監聽到的資料在反序列化時根據sql ddl定義做column轉換和解析,以RowData的形式傳給下游。我們在cdc-source中新增了一種的format方式:changelog bytes序列化方式。該format在將資料反序列化時在不再進行column轉換和解析,而是將所有column直接轉換為changelog-json二進位制傳輸,外層將該二進位制資料直接封裝成RowData再傳給下游。對下游透明,下游在消費kafka資料的時候可以直接通過changelog-json反序列化進行資料解析。並且由於該改動減少了一次column的轉換和解析工作,通過實際測試下來發現除自動感知schema變更外還能提升1倍的吞。在kafka sink connector中,根據db和table進行分流,可以支援傳送到不同的topic。

  • 擴充套件metadata,新增sequence:

    將增量資料同步到kafka中,由於kafka存在多分割槽,因此必然會導致訊息亂序問題。因此需要提供一個單任務內嚴格單調遞增的sequence,用於下游消費者進行排序,保證資料的最終一致性。最終我們提取binlog中的gtid作為binlog訊息的sequence id,通過metadata的方式暴露處理來,寫入kafka record的header中,對於全量資料,sequence設定為0。

斷流場景分割槽提交支援

背景:由於整個cdc方案存在上游和下游兩個獨立的job,並且都是基於event time推進watermark做分割槽的提交,下游watermark的推進受阻可能受到資料正常斷流或者上游作業異常兩種原因的影響,如果正確判斷呢?

解決辦法:

  • 在cdc source connector內定義一種新型別的record HeartbeatRecord,此record時間為當前時間。當發現某張表資料停止傳送時,定期mock心跳資料進行傳送。正常斷流情況下,下游作業可以根據心跳資訊正常推進watermark,並且可以過濾丟棄此資訊。

  • 最終cdc connector sql樣例:

CREATE TABLE mysql_binlog (

host_name STRING METADATA FROM 'host_name' VIRTUAL,

db_name STRING METADATA FROM 'database_name' VIRTUAL,

table_name STRING METADATA FROM 'table_name' VIRTUAL,

operation_ts TIMESTAMP(3) METADATA FROM 'op_ts' VIRTUAL,

sequence BIGINT METADATA FROM 'sequence' VIRTUAL, // sequence嚴格單調遞增

heartbeat BOOLEAN METADATA FROM 'heartbeat'VIRTUAL, // 對於心跳資訊標識為true

mtime TIMESTAMP(3) METADATA FROM 'mtime'VIRTUAL, // 提取mtime,用於下游推進watermark

id BIGINT NOT NULL,

filed_list BYTES NOT NULL, // 去DDL,在source內部資料全部按照changelog-json格式進行序列化、

PRIMARY KEY(id) NOT ENFORCED

) WITH (

'connector' = 'mysql-cdc',

'hostname' = 'xxxx',

'port' = '3552',

'username' = 'datacenter_cdc',

'password' = 'xxx',

'database-name' = 'xxx',

'debezium.format' = 'bytes',

'table-name' = 'xxx',

'server-time-zone' = 'Asia/Shanghai',

'heartbeat.enable'='true',

'scan.incremental.snapshot.chunk.size' = '80960'

);

06 架構穩定性優化

為了保障流式傳輸穩定和高效執行,我們在以下幾個方面做了一些優化,分別介紹下:

6.1 管道熱點優化

作業在正常執行的過程中,經常遇到區域性熱點問題,例如kafka/hdfs io熱點導致區域性並行度消費速度下降或者寫入受阻、yarn佇列機器load不均勻導致作業部分並行度消費能力不如,雖然原因多種多樣,但是本質看,這些問題的一個共性就是由於區域性熱點導致區域性資料延遲。針對這個問題,我們分別從區域性流量排程和全域性流量排程兩個維度進行優化。

區域性流量排程

區域性流量排程的優化思路是在單個producer和task內部,分割槽之間進行流量的重分配。目前在兩個點就行了優化:

  • bsql Task manager內部subtask上下游通訊優化:

    整合作業並沒有keyby的需求,基於Flink Credit-based Flow Control反壓機制,可以通過Backlog Size判斷下游任務的處理負載,那麼我們就可以將Round-robin傳送的方式修改為根據Channel的Backlog Size資訊選擇負載更低的下游Channel傳送的方式。注意:此種策略只有source和sink端之間是rebalance/rescale時,才有效果。會造成一定的序列化開銷,但是測試下來可以接受。

  • kafka producer partition自動剔除機制:

    kafka producer在傳送資料callback異常(絕大多數是timeout)超出一定的閾值,會將對應tp從available partition list中進行剔除,後續record將不再發送到剔除的tp。同時,被剔除tp後續將進行恢復性測試,如果資料可以正常傳送,將重新放入到available partition list中。目前此機制在flink kafka sink connector和標準kafka client都進行了實現。

全域性流量排程

全域性流量排程的優化思路是整個傳輸鏈路層級之間的流量調配,目前我們將生產者(lancer-gateway)與消費者(flink sql kafka source)進行聯動,當消費者出現tp消費lag的情況,通過註冊黑名單(lag partition)到zookeeper,上游生產者感知黑名單,停止向高lag partition中繼續傳送資料。

Flink kafka source中基於flink AggregateFunction機制,kafka source subtask上報lag到job manager,job manager基於全域性lag判斷註冊黑名單到zookeeper

黑名單判斷邏輯:當單tp lag > min(全域性lag平均值,全域性lag中位數)* 倍數 && 單tp lag 大於 lag絕對值, 其中 "單tp lag 大於 lag絕對值" 是為了規避此機制過於敏感,"單tp lag > min(全域性lag平均值,全域性lag中位數)* 倍數" 用於篩選出頭部的lag tp。為了防止黑名單比例過大,黑名單剔除的tp數量上限不得大於全部tp數量的一定比例。

區域性流量排程和全域性流量排程在管道熱點優化效果上存在一定的互補性,但是也各有優勢。

6.2 全鏈路埋點質量監控

資料質量是重要一環,通常資料質量包含完整性、時效性、準確性、一致性、唯一性等方面,對於資料傳輸場景,當面我們重點關注完整性和時效性兩個方面

整體質量方案大致包含監控資料採集和規則配置兩個大的方向,整體架構如下:

監控資料採集

我們自研了trace系統:以logid為單位,我們在資料處理流程中的每一層都進行了監控埋點

  • 每層埋點包含三個方面:接收、傳送、內部錯誤。所有埋點資料以資料建立時間(ctime)進行視窗對齊,並且通過更新utime以統計層間和層內的處理耗時。

  • 通過監控埋點可以實時統計出:端到端、層級間、層級內部的資料處理耗時、完整性、錯誤數。

  • 當前方案缺陷:flink sql掛掉從ck恢復,監控資料不能保證冪等,後續需要進一步改進。

監控報警規則

我們針對資料流進行了分級,每個等級指定了不同的保障級別(SLA),SLA破線,報警通知oncall同學處理。

延遲歸檔報警:hdfs分割槽提交延遲,觸發報警。

實時完整性監控:基於trace資料,實時監控端到端的完整性,接收條數/落地條數

離線資料完整性:hdfs分割槽ready後,觸發dqc規則執行,對比接收條數(trace資料)/落地條數(hive查詢條數)

傳輸延遲監控:基於trace資料,計算端到端資料傳輸延遲的分位數。

DQC阻塞:離線資料完整性異常後,阻塞下游作業的排程。

6.3 kafka同步斷流重複優化

相對比2.0方案中flume方案,基於flink sql的kafka到kafka的實現方案明顯的一個變化就是作業的重啟、故障恢復會導致整體的斷流和一定比例的資料重複(從checkpoint恢復),因此如何降低使用者對此類問題的感知,至關重要。

首先梳理下可能造成問題出現的原因:1)作業升級重啟 2)task manager故障 3)job manager 故障 4)checkpoint連續失敗,同時根據flink job整體提交流程,影響作業恢復速度的關鍵環節是資源的申請。根據上述分析和針對性測試,針對kafka同步場景的斷流重複採用瞭如下優化方式:

  • checkpoint interval設定成10s:降低從checkpoint恢復導致的資料重複比例

  • 基於session模式提交作業:作業重啟無需重複申請資源

  • jobmanager.execution.failover-strategy=region,單個tm掛掉後,只恢復對應的region,不用恢復整個作業。整合作業DAG相對簡單,可以儘量規避rebalance的出現,降低恢復的比例。

  • 使用小資源粒度task manager(2core cpu,8GB memory,2 slot):同等資源規模下,tm數量變多,單tm掛掉影響程度明顯變低。

  • 針對高優作業冗餘task manager:冗餘一個tm,當單個tm掛掉情況下,流量幾乎沒受影響

  • 基於zookeeper實現job manager ha:在開啟jm ha後,jm掛掉任務未斷流

  • 針對checkpoint連續失敗的場景,我們引入了regional checkpoint,以region(而不是整個topology)作為checkpoint管理的單位,防止個別task的ck失敗造成整個作業的失敗,可以有效防止在個別task的ck連續失敗的情況下需要回溯的資料量,減小叢集波動(網路,HDFS IO等)對checkpoint的影響

經過上述優化,經過測試一個(50core,400GB memory,50 slot)規模的作業,優化效果如下:

6.4 kafka流量動態failover能力

為了保證資料及時上報,Lancer對於資料緩衝層的kafka的傳送成功率依賴性很高,經常遇到的case是高峰期或者流量抖動導致的kafka寫入瓶頸。參考Netflix Hystrix 熔斷原理,我們在閘道器層實現了一種動態 kafka failover機制:閘道器可以根據實時的資料傳送情況計算熔斷率,根據熔斷率將流量在normal kafka和failover kafka之間動態調節。

  • 基於滑動時間視窗計算熔斷比例:滑動視窗的大小為10,每個視窗中統計1s內成功和失敗的次數。

  • 熔斷器狀態:關閉/開啟/半開,熔斷率=fail_total/sum_total , 為避免極端情況流量全切到 failover,熔斷率需要有一個上限配置。熔斷後的降級策略:normal kafka 熔斷後嘗試切 failover,failover kafka 如果也熔斷的話就切回 normal

  • 判斷邏輯:

6.5 全鏈路流控、反壓、降級

從端上上報到資料落地的整個流程中,為了保證穩定性和可控性,除了前述手段,我們還引入了整體流控、反壓、降級等技術手段,下面綜合介紹下。

從後向前,分為幾個環節:

1. 資料分發層:

  • 如果出現消費延遲,資料反壓到資料緩衝層kafka

  • 單作業內部通過backlog反壓做subtask之間的流量均衡

2. 資料閘道器層:

  • 如果寫入kafka延遲,直接返回流控碼(429)給資料上報端

  • 資料閘道器層和資料分發層之間通過 kafka tp級別流控排程適配區域性tp處理延遲。

3. 資料上報層:

  • 適配資料閘道器的流控返回:做退避重試

  • 基於本地磁碟進行資料的堆積

  • 配置動態推送生效主動取樣/降級堆積

6.6 開發階段質量驗證

為了在開發階段保證整體服務的正確性和穩定性,開發階段我們設計了一套完整的測試框架。

  • 新版本上線之前,我們會同時雙跑新舊兩條作業鏈路,將資料分別落入兩張hive表,並且進行全分割槽的條數和內容md5校驗,校驗結果以小時級別/天級別報表的形式發出。此測試框架保證了版本迭代的過程中,端到端的正確性。

  • 同時為了保證異常極端情況下資料的準確性,我們也引入了混沌測試,主動注入一些異常。異常包括:job manager掛掉,taskmanager掛掉、作業隨機重啟、區域性熱點、髒資料等等。

07 未來展望

  • 鏈路架構升級,接入公司級的資料閘道器(Databus),架構統一併且可以涵蓋更多的資料上報場景。

  • 雲原生,擁抱K8S,面向使用者quota管理,並且實現自動資源AutoScale。

  • 擁抱批流一體,強化增量化整合,覆蓋離線批整合場景,打造統一基於Flink的統一化整合框架。