技術實踐 | 如何基於 Flink 實現通用的聚合指標計算框架

語言: CN / TW / HK

1 引言

網易雲信作為一個 PaaS 服務,需要對線上業務進行實時監控,實時感知服務的“心跳”、“脈搏”、“血壓”等健康狀況。通過採集服務拿到 SDK、伺服器等端的心跳埋點日誌,是一個非常龐大且雜亂無序的資料集,而如何才能有效利用這些資料?服務監控平臺要做的事情就是對海量資料進行實時分析,聚合出表徵服務的“心跳”、“脈搏”、“血壓”的核心指標,並將其直觀的展示給相關同學。這其中核心的能力便是 :實時分析和實時聚合

在之前的《網易雲信服務監控平臺實踐》一文中,我們圍繞資料採集、資料處理、監控告警、資料應用 4 個環節,介紹了網易雲信服務監控平臺的整體框架。本文是對網易雲信在聚合指標計算邏輯上的進一步詳述。

基於明細資料集進行實時聚合,生產一個聚合指標,業界常用的實現方式是 Spark Streaming、Flink SQL / Stream API。不論是何種方式,我們都需要通過寫程式碼來指定資料來源、資料清洗邏輯、聚合維度、聚合視窗大小、聚合運算元等。如此繁雜的邏輯和程式碼,無論是開發、測試,還是後續任務的維護,都需要投入大量的人力/物力成本。而我們程式設計師要做的便是化繁為簡、實現大巧不工。

本文將闡述網易雲信是如何基於 Flink 的 Stream API,實現一套通用的聚合指標計算框架。

2 整體架構

整體架構

如上圖所示,是我們基於 Flink 自研的聚合指標完整加工鏈路,其中涉及到的模組包括:

  • source:定期載入聚合規則,並根據聚合規則按需建立 Kafka 的 Consumer,並持續消費資料。
  • process:包括分組邏輯、視窗邏輯、聚合邏輯、環比計算邏輯等。從圖中可以看到,我們在聚合階段分成了兩個,這樣做的目的是什麼?其中的好處是什麼呢?做過分散式和併發計算的,都會遇到一個共同的敵人:**資料傾斜。**在我們 PaaS 服務中頭部客戶會更加明顯,所以傾斜非常嚴重,分成兩個階段進行聚合的奧妙下文中會詳細說明。
  • sink:是資料輸出層,目前預設輸出到 Kafka 和 InfluxDB,前者用於驅動後續計算(如告警通知等),後者用於資料展示以及查詢服務等。
  • reporter:全鏈路統計各個環節的執行狀況,如輸入/輸出 QPS、計算耗時、消費堆積、遲到資料量等。

下文將詳細介紹這幾個模組的設計和實現思路。

3 source

規則配置

為了便於聚合指標的生產和維護,我們將指標計算過程中涉及到的關鍵引數進行了抽象提煉,提供了視覺化配置頁面,如下圖所示。下文會結合具體場景介紹各個引數的用途。

視覺化配置頁面

規則載入

在聚合任務執行過程中,我們會定期載入配置。如果檢測到有新增的 Topic,我們會建立 kafka-consumer 執行緒,接收上游實時資料流。同理,對於已經失效的配置,我們會關閉消費執行緒,並清理相關的 reporter。

資料消費

對於資料來源相同的聚合指標,我們共用一個 kafka-consumer,拉取到記錄並解析後,對每個聚合指標分別呼叫 collect() 進行資料分發。如果指標的資料篩選規則(配置項)非空,在資料分發前需要進行資料過濾,不滿足條件的資料直接丟棄。

4 process

整體計算流程

基於 Flink 的 Stream API 實現聚合計算的核心程式碼如下所示:

SingleOutputStreamOperator<MetricContext> aggResult = src
        .assignTimestampsAndWatermarks(new MetricWatermark())
        .keyBy(new MetricKeyBy())
        .window(new MetricTimeWindow())
        .aggregate(new MetricAggFuction());
  • MetricWatermark():根據指定的時間欄位(配置項⑧)獲取輸入資料的 timestamp,並驅動計算流的 watermark 往前推進。
  • MetricKeyBy():指定聚合維度,類似於 MySQL 中 groupby,根據分組欄位(配置項⑥),從資料中獲取聚合維度的取值,拼接成分組 key。
  • MetricTimeWindow():配置項⑧中指定了聚合計算的視窗大小。如果配置了定時輸出,我們就建立滑動視窗,否則就建立滾動視窗。
  • MetricAggFuction():實現配置項②指定的各種運算元的計算,下文將詳細介紹各個運算元的實現原理。

二次聚合

對於大資料量的聚合計算,資料傾斜是不得不考慮的問題,資料傾斜意味著規則中配置的分組欄位(配置項⑥)指定的聚合 key 存在熱點。我們的計算框架在設計之初就考慮瞭如何解決資料傾斜問題,就是將聚合過程拆分成2階段:

  • 第1階段:將資料隨機打散,進行預聚合。
  • 第2階段:將第1階段的預聚合結果作為輸入,進行最終的聚合。

具體實現:判斷併發度引數 parallelism(配置項⑦) 是否大於1,如果 parallelism 大於1,生成一個 [0, parallelism) 之間的隨機數作為 randomKey,在第1階段聚合 keyBy() 中,將依據分組欄位(配置項⑥)獲取的 key 與 randomKey 拼接,生成最終的聚合 key,從而實現了資料隨機打散。

聚合運算元

作為一個平臺型的產品,我們提供瞭如下常見的聚合運算元。由於採用了二次聚合邏輯,各個運算元在第1階段和第2階段採用了相應的計算策略。

運算元 第1階段聚合 第2階段聚合
min/max/sum/count 直接對輸入資料進行預聚合計算,輸出預聚合結果 對第1階段預聚合結果進行二次聚合計算,輸出最終結果
first/last 對輸入資料的 timestamp 進行比較,記錄最小/最大的 timestamp 以及對應的 value 值,輸出 <timestamp,value> 資料對 對 <timestamp,value> 資料對進行二次計算,輸出最終的 first/last
avg 計算該分組的和值和記錄數,輸出 <sum,cnt> 資料對 對 <sum,cnt> 資料對分別求和,然後輸出:總 sum / 總 cntcount
median/tp90/tp95 統計輸入資料的分佈,輸出 NumericHistogram 對輸入的 NumericHistogram 做 merge 操作,最終輸出中位數/tp90/tp95
count-distinct 輸出記錄桶資訊和點陣圖的 RoaringArray 對 RoaringArray 進行 merge 操作,最終輸出精確的去重計數結果
count-distinct(近似) 輸出基數計數物件 HyperLoglog 對 HyperLoglog 進行 merge 操作,最終輸出近似的去重計數結果

對於計算結果受全部資料影響的運算元,如 count-distinct(去重計數),常規思路是利用 set 的去重特性,將所有統計資料放在一個 Set 中,最終在聚合函式的 getResult 中輸出 Set 的 size。如果統計資料量非常大,這個 Set 物件就會非常大,對這個 Set 的 I/O 操作所消耗的時間將不能接受。

對於類 MapReduce 的大資料計算框架,效能的瓶頸往往出現在 shuffle 階段大物件的 I/O 上,因為資料需要序列化 / 傳輸 / 反序列化,Flink 也不例外。類似的運算元還有 median 和 tp95。

為此,需要對這些運算元做專門的優化,優化的思路就是儘量減少計算過程中使用的資料物件的大小,其中:

  • median/tp90/tp95:參考了 hive percentile_approx 的近似演算法,該演算法通過 NumericHistogram(一種非等距直方圖)記錄資料分佈,然後通過插值的方式得到相應的 tp 值(median 是 tp50)。
  • count-distinct:採用 RoaringBitmap 演算法,通過壓縮點陣圖的方式標記輸入樣本,最終得到精確的去重計數結果。
  • count-distinct(近似) :採用 HyperLoglog 演算法,通過基數計數的方式,得到近似的去重計數結果。該演算法適用於大資料集的去重計數。

後處理

後處理模組,是對第2階段聚合計算輸出資料進行再加工,主要有2個功能:

  • 複合指標計算:對原始統計指標進行組合計算,得到新的組合指標。例如,要統計登入成功率,我們可以先分別統計出分母(登入次數)和分子(登入成功的次數),然後將分子除以分母,從而得到一個新的組合指標。配置項③就是用來配置組合指標的計算規則。
  • 相對指標計算:告警規則中經常要判斷某個指標的相對變化情況(同比/環比)。我們利用 Flink 的state,能夠方便的計算出同比/環比指標,配置項④就是用來配置相對指標規則。

異常資料的處理

這裡所說的異常資料,分為兩類:遲到的資料和提前到的資料。

  • 遲到資料
    • 對於嚴重遲到的資料(大於聚合視窗的 allowedLateness),通過 sideOutputLateData 進行收集,並通過 reporter 統計上報,從而能夠在監控頁面進行視覺化監控。
    • 對於輕微遲到的資料(小於聚合視窗的 allowedLateness),會觸發視窗的重計算。如果每來一條遲到資料就觸發一次第 1 階段視窗的重計算,重計算結果傳導到第 2 階段聚合計算,就會導致部分資料的重複統計。為了解決重複統計的問題,我們在第 1 階段聚合 Trigger 中進行了特殊處理:視窗觸發採用 FIRE_AND_PURGE(計算並清理),及時清理已經參與過計算的資料。
  • 提前到的資料:這部分資料往往是資料上報端的時鐘不準導致。在計算這些資料的 timestamp 時要人為干預,避免影響整個計算流的 watermark。

5 sink

聚合計算得到的指標,預設輸出到 Kafka 和時序資料庫 InfluxDB。

  • kafka-sink:將指標標識(配置項①)作為 Kafka 的topic,將聚合結果傳送出去,下游接收到該資料流後可以進一步處理加工,如告警事件的生產等。
  • InfluxDB-sink:將指標標識(配置項①)作為時序資料庫的表名,將聚合結果持久化下來,用於 API 的資料查詢、以及視覺化報表展示等。

6 reporter

為了實時監控各個資料來源和聚合指標的執行情況,我們通過 InfluxDB+Grafana 組合,實現了聚合計算全鏈路監控:如各環節的輸入/輸出 QPS、計算耗時、消費堆積、遲到資料量等。

reporter

7 結語

目前,通過該通用聚合框架,承載了網易雲信 100+ 個不同維度的指標計算,帶來的收益也是比較可觀的:

  • 提效:採用了頁面配置化方式實現聚合指標的生產,開發週期從天級縮短到分鐘級。沒有資料開發經驗的同學也能夠自己動手完成指標的配置。
  • 維護簡單,資源利用率高:100+ 個指標只需維護 1 個 flink-job,資源消耗也從 300+ 個 CU 減少到 40CU。
  • 執行過程透明:藉助於全鏈路監控,哪個計算環節有瓶頸,哪個資料來源有問題,一目瞭然。

作者介紹

聖少友,網易雲信資料平臺資深開發工程師,從事資料平臺相關工作,負責服務監控平臺、資料應用平臺、質量服務平臺的設計開發工作。

更多技術乾貨,歡迎關注【網易智企技術+】微信公眾號