Apache Doris 1.1 特性揭祕:Flink 實時寫入如何兼顧高吞吐和低延時

語言: CN / TW / HK

導讀:隨著資料實時化需求的日益增多,資料的時效性對企業的精細化運營越來越重要,使得實時數倉在這一過程中起到了不可替代的作用。本文將基於使用者遇到的問題與挑戰,揭祕 Apache Doris 1.1 特性,對 Flink 實時寫入 Apache Doris 的優化實現與未來規劃進行詳細的介紹。

背景

隨著資料實時化需求的日益增多,資料的時效性對企業的精細化運營越來越重要,在海量資料中,如何能實時有效的挖掘出有價值的資訊,快速的獲取資料反饋,協助公司更快的做出決策,更好的進行產品迭代,實時數倉在這一過程中起到了不可替代的作用

在這種形勢下,Apache Doris 作為一款實時 MPP 分析型資料庫脫穎而出,同時具備高效能、簡單易用等特性,具有豐富的資料接入方式,結合 Flink 流式計算,可以讓使用者快速將 Kafka 中的非結構化資料以及 MySQL 等上游業務庫中的變更資料,快速同步到 Doris 實時數倉中,同時 Doris 提供亞秒級分析查詢的能力,可以有效地滿足實時 OLAP、實時資料看板以及實時資料服務等場景的需求。

挑戰

通常實時數倉要保證端到端高併發以及低延遲,往往面臨諸多挑戰,比如:

  • 如何保證端到端的秒級別資料同步
  • 如何快速保證資料可見性
  • 在高併發大壓力下,如何解決大量小檔案寫入的問題?
  • 如何確保端到端的 Exactly Once 語義?

結合這些挑戰,同時對使用者使用 Flink+Doris 構建實時數倉的業務場景進行深入調研,在掌握了使用者使用的痛點之後,我們在 Doris 1.1 版本中進行了針對性的優化,大幅提升實時數倉構建的使用者體驗,同時提升系統的穩定性,系統資源消耗也得到了大幅的優化。

優化

流式寫入

Flink Doris Connector 最初的做法是在接收到資料後,快取到記憶體 Batch 中,通過攢批的方式進行寫入,同時使用 batch.size、batch.interval 等引數來控制 Stream Load 寫入的時機。這種方式通常在引數合理的情況下可以穩定執行,一旦引數不合理導致頻繁的 Stream Load,便會引發 Compaction 不及時,從而導致 version 過多的錯誤(-235);其次,當資料過多時,為了減少 Stream Load 的寫入時機,batch.size 過大的設定還可能會引發 Flink 任務的 OOM。為了解決這個問題,我們引入了流式寫入

  1. Flink 任務啟動後,會非同步發起一個 Stream Load 的 Http 請求。

  2. 接收到實時資料後,通過 Http 的分塊傳輸編碼(Chunked transfer encoding)機制持續向 Doris 傳輸資料。

  3. 在 Checkpoint 時結束 Http 請求,完成本次 Stream Load 寫入,同時非同步發起下一次 Stream Load 的請求。

  4. 繼續接收實時資料,後續流程同上。

由於採用 Chunked 機制傳輸資料,就避免了攢批對記憶體的壓力,同時將寫入的時機和 Checkpoint 繫結起來,使得 Stream Load 的時機可控,並且為下面的 Exactly-Once 語義提供了基礎。

Exactly-Once

Exactly-Once 語義是指即使在機器或應用出現故障的情況下,也不會重複處理資料或者丟失資料。Flink 很早就支援 End-to-End 的 Exactly-Once 場景,主要是通過兩階段提交協議來實現 Sink 運算元的 Exactly-Once 語義。在 Flink 兩階段提交的基礎上,同時藉助 Doris 1.0 的 Stream Load 兩階段提交,Flink Doris Connector 實現了 Exactly Once 語義,具體原理如下:

  1. Flink 任務在啟動的時候,會發起一個 Stream Load 的 PreCommit 請求,此時會先開啟一個事務,同時會通過 Http 的 Chunked 機制將資料持續傳送到 Doris。

  1. 在 Checkpoint 時,結束資料寫入,同時完成 Http 請求,並且將事務狀態設定為預提交(PreCommitted),此時資料已經寫入 BE,對使用者不可見。

  1. Checkpoint 完成後,發起 Commit 請求,並且將事務狀態設定為提交(Committed),完成後資料對使用者可見。

  1. Flink 應用意外掛掉後,從 Checkpoint 重啟時,若上次事務為預提交(PreCommitted)狀態,則會發起回滾請求,並且將事務狀態設定為 Aborted。

基於此,可以藉助 Flink Doris Connector 實現資料實時入庫時資料不丟不重。

秒級別資料同步

高併發寫入場景下的端到端秒級別資料同步以及資料的實時可見能力,需要 Doris 具備如下幾方面的能力:

事務處理能力

Flink 實時寫入以 Stream Load 2PC 的方式與 Doris 進行互動,需要 Doris 具備對應的事務處理能力,保障事務基本的 ACID 特性,在高併發場景下支撐 Flink 秒級別的資料同步。

資料版本的快速聚合能力

Doris 裡面一次匯入會產生一個數據版本,在高併發寫入場景下必然帶來的一個影響是資料版本過多,且單次匯入的資料量不會太大。持續的高併發小檔案寫入場景對 Doris 並不友好,極其考驗 Doris 資料合併的實時性以及效能,進而會影響到查詢的效能。Doris 在 1.1 中大幅增強了資料 Compaction 能力,對於新增資料能夠快速完成聚合,避免分片資料中的版本過多導致的 -235 錯誤以及帶來的查詢效率問題。

首先,在 Doris 1.1 版本中,引入了 QuickCompaction,增加了主動觸發式的 Compaction 檢查,在資料版本增加的時候主動觸發 Compaction。同時通過提升分片元資訊掃描的能力,快速的發現數據版本多的分片,觸發 Compaction。通過主動式觸發加被動式掃描的方式,徹底解決資料合併的實時性問題。

同時,針對高頻的小檔案 Cumulative Compaction,實現了 Compaction 任務的排程隔離,防止重量級的 Base Compaction 對新增資料的合併造成影響。

最後,針對小檔案合併,優化了小檔案合併的策略,採用梯度合併的方式,每次參與合併的檔案都屬於同一個資料量級,防止大小差別很大的版本進行合併,逐漸有層次的合併,減少單個檔案參與合併的次數,能夠大幅的節省系統的 CPU 消耗。

Doris 1.1 對高併發匯入、秒級別資料同步、資料實時可見等場景都做了針對性優化,大大增加了 Flink + Doris 系統的易用性以及穩定性,節省了叢集整體資源。

效果

通用 Flink 高併發場景

在調研的通用場景中,使用 Flink 同步上游 Kafka 中的非結構化資料,經過 ETL 後使用 Flink Doris Connector 將資料實時寫入 Doris 中。這裡客戶場景極其嚴苛,上游維持以每秒 10w 的超高頻率寫入,需要資料能夠在 5s 內完成上下游同步,實現秒級別的資料可見。這裡 Flink 配置為 20 併發,Checkpoint 間隔 5s,Doris 1.1 的表現相當優異。具體體現在如下幾個方面:

Compaction 實時性

資料能快速合併,Tablet 資料版本個數維持在 50 以下, Compaction Score 穩定。相比於之前高併發匯入頻出的 -235 問題,Compaction 合併效率有 10+ 倍提升

CPU 資源消耗

Doris 1.1 針對小檔案的 Compaction 進行了策略優化,在上述高併發匯入場景,CPU 資源消耗下降 25%。

QPS 查詢延遲穩定

通過降低 CPU 使用率,減少資料版本的個數,提升了資料整體有序性,從而減少了 SQL 查詢的延遲。

秒級別資料同步場景(極限大壓力)

單 BE 單 Tablet,客戶端 30 併發極限 Stream Load 壓測,資料在實時性<1s,Compaction Score 優化前後對比

使用建議

資料實時可見場景

對延遲要求特別嚴格的場景,比如秒級別資料同步,通常意味著單次匯入檔案較小,此時建議調小 cumulative_size_based_promotion_min_size_mbytes,單位是 MB,預設 64,可以設定成 8,能夠很大程度提升 Compaction 的實時性。

高併發場景

對於高併發的寫入場景,可以通過增加 Checkpoint 的間隔來減少 Stream Load 的頻率,比如 Checkpoint 可以設定為 5-10s,不僅可以增加 Flink 任務的吞吐,也可以減少小檔案的產生,避免給 Compaction 造成更多壓力。

此外,對資料實時性要求不高的場景,比如分鐘級別的資料同步,可以增加 Checkpoint 的間隔,比如 5-10 分鐘,此時 Flink Doris Connector 依然能夠通過兩階段提交 +checkpoint 機制來保證資料的完整性。

未來規劃

實時 Schema Change

目前通過 Flink CDC 實時接入資料時,當上遊業務表進行 Schema Change 操作時,必須先手動修改 Doris 中的 Schema 和 Flink 任務中的 Schema,最後再重啟任務,新的 Schema 的資料才可以同步過來。這樣使用方式需要人為的介入,會給使用者帶來極大的運維負擔。後續會針對 CDC 場景做到支援 Schema 實時變更,上游的 Schema Change 實時同步到下游,全面提升 Schema Change 的效率。

Doris 多表寫入

目前 Doris Sink 運算元僅支援同步單張表,所以對於整庫同步的操作,需要手動在 Flink 層面進行分流,寫到多個 Doris Sink 中,這無疑增加了開發者的難度,在後續版本中我們也將支援單個 Doris Sink 同步多張表,這樣就大大的簡化了使用者的操作。

自適應的 Compaction 引數調優

目前 Compaction 策略引數較多,在大部分通用場景能發揮較好的效果,但是在一些特殊場景下並不能高效的發揮作用。我們將在後續版本中持續優化,針對不同的場景,進行自適應的 Compaction 調優,在各類場景下提高資料合併效率,提升實時性。

單副本 Compaction

目前的 Compaction 策略是各 BE 單獨進行,在後續版本中我們將實現單副本 Compaction,通過克隆快照的方式實現 Compaction 任務,減少叢集 2/3 的 Compaction 任務,降低系統的負載,把更多的系統資源留給使用者側。

社群動態

很高興地告訴大家,今天 Apache Doris 1.1.1 版本已經正式Release。在 1.1.1 版本:

  • 增加了 ODBC 的向量化 Sink

  • 增加了 MemTracker,幫助 BE 側記憶體得到更好的觀測和控制

  • 修復了部分 1.1.0 版本中的 Bug

  • 1.1.1 版本同樣可支援直接從 0.15.x 跨過 1.0.x 版本進行升級

推薦大家下載和使用(下方連結需複製到瀏覽器開啟)。

發版通告:https://doris.apache.org/zh-CN/docs/releasenotes/release-1.1.1/

下載地址:https://github.com/apache/doris/releases/tag/1.1.1-rc03

加入社群

如果你對 Apache Doris 感興趣,請點選 閱讀原文 瞭解並加入 Doris!我們也發起了徵文活動 邀你講講與Doris “相遇 相知 相識”的故事,不僅有精美禮品相送,還可獲得 SelectDB 全渠道傳播曝光加持!最後,歡迎更多的開源技術愛好者加入 Apache Doris 社群,攜手成長,共建社群生態。

相關連結:

SelectDB 官方網站:

https://selectdb.com

Apache Doris 官方網站:

http://doris.apache.org

Apache Doris Github:

https://github.com/apache/doris

Apache Doris 開發者郵件組:

[email protected]

「其他文章」