實時數倉在搜尋的實踐應用(Kafka)

語言: CN / TW / HK

一、概述

Apache Kafka 發展至今,已經是一個很成熟的訊息佇列元件了,也是大資料生態圈中不可或缺的一員。Apache Kafka 社群非常的活躍,通過社群成員不斷的貢獻程式碼和迭代專案,使得 Apache Kafka 功能越發豐富、效能越發穩定,成為企業大資料技術架構解決方案中重要的一環。

Apache Kafka 作為一個熱門訊息佇列中介軟體,具備高效可靠的訊息處理能力,且擁有非常廣泛的應用領域。那麼,今天就來聊一聊基於 Kafka 的實時數倉在搜尋的實踐應用。

二、為什麼需要 Kafka

在設計大資料技術架構之前,通常會做一些技術調研。我們會去思考一下為什麼需要 Kafka?怎麼判斷選擇的 Kafka 技術能否滿足當前的技術要求?

2.1 早期的資料架構

早期的資料型別比較簡單,業務架構也比較簡單,就是將需要的資料儲存下來。比如將遊戲類的資料儲存到資料庫(MySQL、Oracle)。但是,隨著業務的增量,儲存的資料型別也隨之增加了,然後我們需要使用的大資料叢集,利用資料倉庫來將這些資料進行分類儲存,如下圖所示:

但是,資料倉庫儲存資料是有時延的,通常時延為T+1。而現在的資料服務物件對時延要求均有很高的要求,例如物聯網、微服務、移動端APP等等,皆需要實時處理這些資料。

2.2 Kafka 的出現

Kafka 的出現,給日益增長的複雜業務,提供了新的儲存方案。將各種複雜的業務資料統一儲存到 Kafka 裡面,然後在通過 Kafka 做資料分流。如下圖所示:

這裡,可以將影片、遊戲、音樂等不同型別的資料統一儲存到 Kafka 裡面,然後在通過流處理對 Kafka 裡面的資料做分流操作。例如,將資料儲存到資料倉庫、將計算的結果儲存到KV做實時分析等。

通常訊息系統常見的有兩種,它們分別是:

  • 訊息佇列:佇列消費者充當了工作組的角色,每條訊息記錄只能傳遞給一個工作程序,從而有效的劃分工作流程;

  • 生產&消費:消費者通常是互相獨立的,每個消費者都可以獲得每條訊息的副本。

這兩種方式都是有效和實用的,通過訊息佇列將工作內容分開,用於容錯和擴充套件;生產和消費能夠允許多租戶,來使得系統解耦。而 Apache Kafka 的優點之一在於它將訊息佇列、生產和消費結合到了一個強大的訊息系統當中。

同時,Kafka 擁有正確的訊息處理特性,主要體現在以下幾個方面:

  • 可擴充套件性:當 Kafka 的效能(如儲存、吞吐等)達到瓶頸時,可以通過水平擴充套件來提升效能;

  • 真實儲存:Kafka 的資料是實時落地在磁碟上的,不會因為叢集重啟或故障而丟失資料;

  • 實時處理:能夠整合主流的計算引擎(如Flink、Spark等),對資料進行實時處理;

  • 順序寫入:磁碟順序 I/O 讀寫,跳過磁頭“定址”時間,提高讀寫速度;

  • 記憶體對映:作業系統分頁儲存利用記憶體提升 I/O 效能,實現檔案到記憶體的對映,通過同步或者非同步來控制 Flush;

  • 零拷貝:將磁碟檔案的資料複製到“頁面快取”一次,然後將資料從“頁面快取”直接傳送到網路;

  • 高效儲存:Topic 和 Partition 拆為多個檔案片段(Segment),定期清理無效檔案。採用稀疏儲存,間隔若干位元組建立一條索引,防止索引檔案過大。

2.3 簡單的應用場景

這裡,我們可以通過一個簡單直觀的應用場景,來了解 Kafka 的用途。

場景:假如使用者A正在玩一款遊戲,某一天使用者A喜歡上了遊戲裡面的一款道具,打算購買,於是在當天 14:00 時充值了 10 元,在逛遊戲商店時又喜歡上了另一款道具,於是在 14:30 時又充值了 30 元,接著在 15:00 時開始下單購買,花費了 20 元,剩餘金額為 20 元。那麼,整個事件流,對應到庫表裡面的資料明細應該是如下圖所示:

三、Kafka解決了什麼問題

早期為響應專案快速上線,在伺服器或者雲伺服器上部署一個 WebServer,為個人電腦或者移動使用者提供訪問體驗,然後後臺在對接一個數據庫,為 Web 應用提供資料持久化以及資料查詢,流程如下圖所示:

但是,隨著使用者的迅速增長,使用者所有的訪問都直接通過 SQL 資料庫使得它不堪重負,資料庫的壓力也越來越大,不得不加上快取服務以降低 SQL 資料庫的荷載。

同時,為了理解使用者行為,又開始收集日誌並儲存到 Hadoop 這樣的大資料叢集上做離線處理,並且把日誌放在全文檢索系統(比如 ElasticSearch)中以便快速定位問題。由於需要給投資方看業務狀況,也需要把資料彙總到資料倉庫(比如 Hive)中以便提供互動式報表。此時的系統架構已經具有一定的複雜性了,將來可能還會加入實時模組以及外部資料互動。

本質上,這是一個數據整合問題。沒有任何一個系統能夠解決所有的事情,所以業務資料根據不同用途,存放在不同的系統,比如歸檔、分析、搜尋、快取等。資料冗餘本身沒有任何問題,但是不同系統之間太過複雜的資料同步卻是一種挑戰。如下圖所示:

而 Kafka 可以讓合適的資料以合適的形式出現在合適的地方。Kafka 的做法是提供訊息佇列,讓生產者向佇列的末尾新增資料,讓多個消費者從佇列裡面依次讀取資料然後自行處理。如果說之前連線的複雜度是 O(N^2),那麼現在複雜度降低到了 O(N),擴充套件起來也方便多了,流程如下圖所示:

四、Kafka的實踐應用

4.1 為什麼需要建設實時數倉

4.1.1 目的

通常情況下,在大資料場景中,儲存海量資料建設資料倉庫一般都是離線數倉(時延T+1),通過定時任務每天拉取增量資料,然後建立各個業務不同維度的資料,對外提供 T+1 的資料服務。計算和資料的實時性均比較差,業務人員無法根據自己的即時性需求獲取幾分鐘之前的實時資料。資料本身的價值隨著時間的流逝會逐步減弱,因此資料產生後必須儘快的到達使用者的手中,實時數倉的建設需求由此而來。

4.1.2 目標

為了適應業務高速迭代的特點,分析使用者行為,挖掘使用者價值,提高使用者留存,在實時資料可用性、可擴充套件性、易用性、以及準確性等方面提供更好的支援,因此需要建設實時數倉。主要目標包含如下所示:

  • 統一收斂資料出口:統一資料口徑,減少資料重複性建設;

  • 降低資料維護成本:提升資料準確性、及時性,優化資料使用體驗和成本;

  • 減少資料使用成本:提高資料複用率,避免實時資料重複消費。

4.2 如何構建實時數倉為搜尋提供資料

當前實時數倉比較主流的架構一般來說包含三個大的模組,它們分別是訊息佇列、計算引擎、以及儲存。結合上述對 Kafka 的綜合分析,結合搜尋的業務場景,引入 Kafka 作為訊息佇列,複用大資料平臺(BDSP)的能力作為計算引擎和儲存,具體架構如下圖所示:

4.3 流處理引擎選擇

目前業界比較通用的流處理引擎主要有兩種,它們分別是Flink和Spark,那麼如何選擇流處理引擎呢?我們可以對比以下特徵來決定選擇哪一種流處理引擎?

Flink作為一款開源的大資料流式計算引擎,它同時支援流批一體,引入Flink作為實時數倉建設的流引擎的主要原因如下:

  • 高吞吐、低延時;

  • 靈活的流視窗;

  • 輕量級容錯機制;

  • 流批一體

4.4 建設實時數倉遇到的問題

在建設初期,用於實時處理的 Kafka 叢集規模較小,單個 Topic 的資料容量非常大,不同的實時任務都會消費同一個大資料量的 Topic,這樣會導致 Kafka 叢集的 I/O 壓力非常的大。

因此,在使用的過程中會發現 Kafka 的壓力非常大,經常出現延時、I/O能效能告警。因此,我們採取了將大資料量的單 Topic 進行實時分發來解決這種問題,基於 Flink 設計瞭如下圖所示的資料分發流程。

上述流程,隨著業務型別和資料量的增加,又會面臨新的問題:

  • 資料量增加,隨著消費任務的增加,Kafka 叢集 I/O 負載大時會影響消費;

  • 不用業務之間 Topic 的消費沒有落地儲存(比如HDFS、HBase儲存等),會產生重複消費的情況;

  • 資料耦合度過高,遷移資料和任務難度大。

4.5 實時數倉方案進階

目前,主流的實時數倉架構通常有2種,它們分別是Lambda、Kappa。

4.5.1 Lambda

隨著實時性需求的提出,為了快速計算一些實時指標(比如,實時點選、曝光等),會在離線數倉大資料架構的基礎上增加一個實時計算的鏈路,並對訊息佇列實現資料來源的流失處理,通過消費訊息佇列中的資料 ,用流計算引擎來實現指標的增量計算,並推送到下游的資料服務中去,由下游資料服務層完成離線和實時結果的彙總。具體流程如下:

4.5.2 Kappa

Kappa架構只關心流式計算,資料以流的方式寫入到 Kafka ,然後通過 Flink 這類實時計算引擎將計算結果存放到資料服務層以供查詢。可以看作是在Lambda架構的基礎上簡化了離線數倉的部分。具體流程如下:

在實際建設實時數倉的過程中,我們結合這2種架構的思想來使用。實時數倉引入了類似於離線數倉的分層理念,主要是為了提供模型的複用率,同時也要考慮易用性、一致性、以及計算的成本。

4.5.3 實時數倉分層

在進階建設實時數倉時,分層架構的設計並不會像離線數倉那邊複雜,這是為了避免資料計算鏈路過長造成不必要的延時情況。具體流程圖如下所示:

  • ODS層:以Kafka 作為訊息佇列,將所有需要實時計算處理的資料放到對應的 Topic 進行處理;

  • DW層:通過Flink實時消費Topic中的資料,然後通過資料清理、多維度關聯(JOIN)等,將一些相同維度的業務系統、維表中的特徵屬性進行關聯,提供資料易用性和複用效能力,最終得到實時明細資料;

  • DIM層:用來儲存關聯的查詢的維度資訊,儲存介質可以按需選擇,比如HBase、Redis、MySQL等;

  • DA層:針對實時資料場景需求,進行高度聚合彙總,服務於KV、BI等場景。OLAP分析可以使用ClickHouse,KV可以選擇HBase(若資料量較小,可以採用Redis)。

通過上面的流程,建設實時數倉分層時,確保了對實時計算要求比較高的任務不會影響到BI報表、或者KV查詢。但是,會有新的問題需要解決:

Kafka 實時資料如何點查?

消費任務異常時如何分析?

4.5.4 Kafka監控

針對這些問題,我們調研和引入了Kafka 監控系統——Kafka Eagle(目前改名為EFAK)。複用該監控系統中比較重要的維度監控功能。

Kafka Eagle處理能夠滿足上訴兩個維度的監控需求之外,還提供了一些日常比較實用的功能,比如Topic記錄檢視、Topic容量檢視、消費和生產任務的速率、消費積壓等。我們採用了 Kafka-Eagle 來作為對實時數倉的任務監控。Kafka-Eagle 系統設計架構如下圖所示:

Kafka-Eagle 是一款完全開源的對 Kafka 叢集及應用做全面監控的系統,其核心由以下幾個部分組成:

  • 資料採集:核心資料來源 JMX 和 API 獲取;

  • 資料儲存:支援 MySQL 和 Sqlite 儲存;

  • 資料展示:消費者應用、圖表趨勢監控(包括叢集狀態、消費生產速率、消費積壓等)、開發的分散式 KSQL 查詢引擎,通過 KSQL 訊息查詢;

  • 資料告警:支援常用的 IM 告警(微信,釘釘,WebHook等),同時郵件、簡訊、電話告警也一併支援。

部分預覽截圖如下:

1)Topic最近7天寫入量分佈

預設展示所有Topic的每天寫入總量分佈,可選擇時間維度、Topic聚合維度,來檢視寫入量的分佈情況,預覽截圖如下所示:

2)KSQL查詢Topic訊息記錄

可以通過編寫SQL語句,來查詢(支援過濾條件)Topic中的訊息記錄,預覽截圖如下所示:

3)消費Topic積壓詳情

可以監控所有被消費的Topic的消費速率、消費積壓等詳情,預覽截圖如下所示:

五、參考資料

1.https://kafka.apache.org/documentation/

2.http://www.kafka-eagle.org/

3.https://github.com/smartloli/kafka-eagle

下方公眾號回覆: 實時數倉 ,直達資料