Presto on Apache Kafka 在 Uber 的大規模應用

語言: CN / TW / HK

本文最初發佈於 Uber 官方部落格,InfoQ 經授權翻譯如下

Uber 的目的就是要讓全世界變得更好,而大資料是一個非常重要的部分。Presto 和 Apache Kafka 在 Uber 的大資料棧中扮演了重要角色。Presto 是查詢聯盟的事實標準,它已經在互動查詢、近實時資料分析以及大規模資料分析中得到應用。Kafka 是一個支援很多用例的資料流中樞,比如 pub/sub、流處理等。在這篇文章中,我們將探討如何將這兩個重要的服務結合起來,即在 Uber 的 Kafka 上,通過 Presto 實現輕量級的互動式 SQL 查詢。

圖 1:Uber 的大資料棧

Uber 的 Presto 專案

Uber 通過開源的 Presto ,可以對任何資料來源進行查詢,不管是動態資料還是靜態資料。Presto 的多功能性讓我們可以做出智慧的、資料驅動的業務決策。我們運營著大約 15 個 Presto 叢集,跨越 5000 多個節點。我們擁有 7000 個每週活躍的使用者,他們每天都會進行 50 萬次的查詢,從 HDFS 中讀取 50PB 左右的資料。現在,Presto 可以通過可擴充套件的資料來源聯結器,查詢多種資料來源,比如 Apache HiveApache Pinot 、AresDb、MySQL、Elasticsearch 和 Apache Kafka。你還可以在我們之前的一些博文中找到更多有關 Presto 的資訊:

Uber 的 Apache Kafka 專案

Uber 是 Apache Kafka 部署規模最大的公司之一,每天處理數萬億條訊息和多達 PB 的資料。從圖 2 可以看出,Apache Kafka 是我們技術棧的基礎,支援大量不同的工作流,其中包括一個 pub-sub 訊息匯流排,用於從 Rider 和 Driver 應用中傳送事件資料,諸如 Apache Flink® 的流分析,把資料庫變更記錄傳送到下游使用者,並且把各種各樣的資料攝入到 Uber 的 Apache Hadoop® 資料湖中。我們已經完成了許多有趣的工作,以保證其效能、可靠性和使用者友好,比如:

圖 2:Uber 的 Kafka 專案

問題陳述

多年來,Uber 的資料團隊對於 Kafka 流的分析需求不斷增加,這是由於實時資料的及時、臨時的資料分析為資料科學家、工程師和運營團隊提供了寶貴的資訊。

下面是 Kafka 團隊的一個典型請求示例:運營團隊正在調查為何一些訊息沒有被一個關鍵的服務所處理,從而會對終端使用者造成直接影響。運營團隊隨後收集了一些 UUID,這些 UUID 報告了問題,並要求檢查它們是否存在於服務的輸入 / 輸出 Kafka 流中。如圖 3 所示,該請求可以被表述為查詢:“Kafka 主題 T 中是否缺少 UUID 為 X 的順序?”

圖 3:假定用例:檢查 Kafka 主題中是否缺少 UUID X 的順序

考慮的替代方案

這樣的問題一般都是由大資料進行實時分析來解決。在這個領域的各種技術中,我們專注於兩類開源解決方案,即:流處理和實時 OLAP 資料儲存。

流處理引擎,例如 Apache FlinkApache Storm™ksql 可以持續地處理流,並且輸出經過處理的流或者增量的維護可更新的檢視。由於使用者想要在以前的事件中執行點查詢或者執行分析查詢,所以這種流處理並不適用於以上問題。

另一方面,實時 OLAP 資料儲存,如 Apache PinotApache DruidClickhouse ,則更適合。這些 OLAP 儲存配備了高階的索引技術,所以可以為 Kafka 資料流建立索引,從而實現低延遲的查詢。實際上,Uber 早在數年之前就已經開始使用 Apache Pinot,而現在,Pinot 已經成為 Uber 資料平臺中的一個重要技術,它可以為多個關鍵任務進行實時分析應用。你可以看看我們以前發表的 博文 ,討論 Uber 如何使用 Pinot。

但是,實時 OLAP 需要一個非同尋常的載入過程,以建立一個從 Kafka 流中攝入的表,並對該表進行優化以達到最好的效能。另外,OLAP 儲存還需要儲存和計算資源來提供服務,因此這種解決方案被推薦給那些反覆查詢表並要求較低延遲的用例(如面向使用者的應用),但不包括臨時性的故障排除或探索。

所以,這個問題促使 Kafka 和 Presto 團隊共同尋找一種基於下列因素的輕量級解決方案:

  1. 它重用了現有的 Presto 部署,這是一項成熟的技術,在 Uber 已有多年實戰檢驗。

  2. 它不需要任何載入:Kafka 主題可以被發現,並且在建立後可以立即被查詢。

  3. Presto 以其強大的跨資料來源的查詢聯合能力而聞名,因此可以讓 Kafka 和其他資料來源(如 Hive/MySQL/Redis)進行關聯,從而獲得跨資料平臺的洞察力。

然而,這種 Presto 方法也存在其侷限性。例如,由於 Kafka 聯結器沒有建立索引,所以它的效能比實時 OLAP 儲存要差。另外,對於 Uber 的可擴充套件性需求,在聯結器上還有其他挑戰需要解決,我們將在下一節詳細說明。

Uber 面臨的挑戰

Presto 已經有一個 Kafka 聯結器 ,支援通過 Presto 查詢 Kafka。然而,這個解決方案並不完全適合我們在 Uber 的大規模 Kafka 架構。其中存在一些挑戰:

  • Kafka Topic 和 Cluster Discovery:在 Uber,我們將 Kafka 作為一種服務來提供,使用者可以隨時通過自助服務門戶向 Kafka 搭載新的主題。因此,我們必須要有一個動態的 Kafka 主題發現。但是,當前 Presto Kafka 聯結器中的 Kafka 主題和叢集發現是靜態的,因此需要我們在每次搭載新主題時都要重啟聯結器。

  • 資料模式發現:與 Kafka 主題和叢集發現類似,我們將模式註冊作為一項服務提供,並支援使用者自助載入。因此,我們需要 Presto-Kafka 聯結器能夠按需檢索最新的模式。

  • 查詢限制:對於我們來說,限制每一個查詢能夠從 Kafka 中消耗的資料數量非常重要。Uber 擁有很多大型的 Kafka 主題,其位元組率可以達到 500M/s。我們知道,與其他替代方案相比,Presto-Kafka 查詢速度相對緩慢,而要從 Kafka 中提取大量資料的查詢,則要花費相當長的時間。這對於使用者的體驗和 Kafka 叢集的健康都是不利的。

  • 配額控制:作為一個分散式的查詢引擎,Presto 可以以非常高的吞吐量同時消耗 Kafka 的訊息,這可能會導致 Kafka 叢集的潛在叢集退化。限制 Presto 的最大消費吞吐量對於 Kafka 叢集的穩定性至關重要。

架構

Uber 的資料生態系統為使用者提供了一種方法,可以編寫一個 SQL 查詢,並將其提交給 Presto 叢集執行。每個 Presto 叢集都有一個協調器節點,負責解析 SQL 語句,規劃查詢,併為人工節點執行的任務進行排程。Presto 內部的 Kafka 聯結器允許將 Kafka 主題作為表格使用,主題中的每條訊息在 Presto 中被表示為一行。在收到查詢時,協調器會確定查詢是否有適當的過濾器。一旦驗證完成,Kafka 聯結器從 Kafka 叢集管理服務中獲取叢集和主題資訊,從模式服務中獲取模式。然後, Presto 工作器與 Kafka 叢集並行對話,獲取所需的 Kafka 訊息。我們還為 Presto 使用者在 Kafka 叢集上設定了一個代理配額,這可以防止叢集的降級。

圖 4:高階架構

詳細改進

下面幾節將深入探討我們為克服現有 Presto Kafka 聯結器的侷限性所做的改進,使其能夠用於大規模用例。

Kafka 叢集 / 主題和資料模式發現

我們做了一些改變以實現按需的叢集 / 主題和模式發現。首先,Kafka 主題元資料和資料模式是在執行時通過 KafkaMetadata 獲取的,我們提取了 TableDescriptionSupplier 介面來提供這些元資料,然後我們擴充套件了該介面並實現了一個新的策略,在執行時從內部 Kafka 叢集管理服務和模式註冊中心讀取 Kafka 主題元資料。同樣地,我們重構了 KafkaClusterMetadataSupplier ,並實現了一個新的策略,在執行時讀取叢集元資料。由於叢集元資料是按需獲取的,我們也能夠在一個 Kafka 聯結器中支援多個 Kafka 叢集。為所有這些元資料增加一個快取層,以減少對 Kafka 叢集管理模式服務的請求數量。

圖 5:Kafka 叢集 / 主題和資料模式發現

查詢過濾器

為了提高 Kafka 和 Presto 叢集的可靠性,我們希望避免大型查詢讀取過多的資料。為了實現這一點,我們增加了列過濾器的執行,檢查 Kafka 的 Presto 查詢的過濾器約束中是否存在 _timestamp_partition_offset 。沒有這些過濾器的查詢將被拒絕。

Kafka 叢集的配額控制

Kafka 是 Uber 的一個重要的基礎設施,有很多實時用例,Kafka 叢集的退化可能會產生巨大的影響,所以我們要不惜一切代價避免它。作為一個分散式的查詢引擎,Presto 可能會啟動數百個消費者執行緒,從 Kafka 併發地獲取訊息。這種消費模式可能會耗盡網路資源,並導致潛在的 Kafka 叢集退化,這是我們想要防止的。

我們可以做的一件事是,在 Presto 叢集層面上限制消費率,但從 Presto 方面來說,這不是很容易實現。作為一種選擇,我們決定利用 Kafka 的代理配額來實現我們的目標。我們做了一個改變,允許我們從聯結器配置中指定一個 Kafka 消費者客戶端 ID。有了這個改變,我們就能為 Presto 中的所有工作者使用一個靜態的 Kafka 客戶端 ID,而且他們將受制於同一個配額池。

當然,這種方法是有代價的。多個 presto 查詢同時進行,將需要更長的時間來完成。這是我們不得不作出的犧牲。在現實中,由於我們擁有查詢過濾器,所以大部分的查詢都可以在一定的時間裡完成。

結論

在推出該特性後,我們看到在做臨時探索時,生產力有了很大的提高。在這之前,工程師們需要花費數十分鐘甚至更長的時間來查詢我們上面提到的例子的資料,但現在我們可以寫一個簡單的 SQL 查詢 SELECT * FROM kafka.cluster.order WHERE uuid= ‘0e43a4-5213-11ec’,結果可以在幾秒鐘內返回。

圖 6:假設的用例。檢查 Kafka 主題中是否缺少 UUID X 的順序

截至寫這篇博文時,越來越多的使用者開始採用 Presto on Kafka 進行臨時探索。每天有 6000 個查詢,我們也從 Presto 使用者那裡得到了很好的反饋,他們說 Presto on Kafka 讓他們的資料分析變得更加容易。

在未來,我們計劃將我們所做的改進貢獻給開源社群。你也可以檢視我們的 PrestoCon 演講,瞭解更多關於我們所做工作的細節。

作者介紹:

Yang Yang,Uber 流資料團隊軟體工程師,致力於在 Uber 構建高度可擴充套件、可靠的 Kafka 生態系統,包括 uReplicator、Kafka Consumer Proxy 等內部工具。

Yupeng Fu,Uber 資料團隊首席軟體工程師。領導幾個流團隊構建可擴充套件、可靠和效能良好的流解決方案。他還是 Apache Pinot 的提交者。

Hitarth Trivedi,Uber 資料分析團隊的高階軟體工程師。他主要負責 Presto 的工作。

原文連結:

https://eng.uber.com/presto-on-apache-kafka-at-uber-scale/