RocketMQ Streams在雲安全及 IoT 場景下的大規模最佳實踐
本文作者:袁小棟,Apache RocketMQ Committer,RocketMQ Streams Cofonder,阿里雲安全智慧計算引擎負責人
RocketMQ Streams簡介
RocketMQ Streams包含以下四個部分的定義:
(1)Lib包:輕量,啟動即可執行。只需要從git下載原始碼,編譯成jar包即可使用。
(2)SQL引擎:相容了Flink的SQL語法,也相容了UDF、UETF和UDAF,可以用Flink的語法或將Flink任務直接遷移並進行使用。
(3) 輕量的邊緣計算引擎:RocketMQ Streams和RocketMQ做了深度的整合。因為RocketMQ支援MQTT,所以RocketMQ Streams支援雲端計算的場景。此外,RocketMQ還支援訊息的儲存和轉存,因此基本能夠滿足邊緣計算的大部分場景。
(4)SDK:其元件可以獨立使用,也可以嵌入到業務裡使用。
現有的大資料框架比如Flink、Spark、Storm等都已經十分成熟,而我們在此基礎上依然研發RocketMQ Streams這樣一個開放框架的原因,主要基於以下考慮:
Flink是一個底座比較重的大資料元件,叢集開銷和框架開銷佔比較大,運維成本也比較高,因此適合做中臺,由專門的運維人員部署,形成一個大的中臺業務。
但是實際業務中必然存在中臺無法滿足的場景,比如某產品依賴大資料的能力,需要將產品輸出給使用者,在使用者的IDC裡部署。如果將大資料計算能力也攜帶一起部署,則會產生三個問題:第一,部署麻煩,因為Flink的部署成本比較高;第二,運維成本較高;第三,資源問題,Flink的任務需要提前預設資源,不同的使用者日誌量不一樣,預設資源會很大,Flink無法滿足需求。
RocketMQ Streams的定位是適合隨產品輸出的場景,不適合中臺。比如安全風控、邊緣計算、訊息佇列、流計算等,都適合RocketMQ Streams。因此,RocketMQ Streams和Flink的能力可以互為補充。
RocketMQ Streams具有以下 特點:
(1)輕量:RocketMQ Streams是輕量的,1core 1g即可部署;依賴較輕,除了訊息佇列沒有其他依賴;釋出簡單,可通過SQL熱更新的方式釋出。
(2)高擴充套件:RocketMQ Streams可以擴充套件Source、Sink、UDF等。
(3)高效能:對過濾做了很多優化,因此在高過濾場景下,效能可提升3-5倍;RocketMQ Streams也實現了一些任務的輕量化,在SQL同源任務歸併的場景下,資源可節省50%;同時,它基於流計算,可以實現毫秒延遲。
(4)多部署模式:jar包即服務;可以基於C/S模式通過提交SQL熱釋出,也可以通過SDK整合到業務裡。
(5)超大維表支援:支援超大的維表,RocketMQ Streams自研的快取記憶體佔比僅為Java Map的16.7%;同機器上的多個任務可以共享,節省資源;維表支援千萬級別,不需要指定索引,可根據join條件自動推斷索引,做到類似map的O(1)匹配。
(6)豐富功能:支援精確計算一次以及靈活的視窗,比如滾動視窗、滑動視窗、分發視窗,也支援雙流Join、維表Join、轉化、過濾、大資料開發場景等功能。
上圖為RocketMQ Streams支援的一些常規大資料運算元,與其他大資料框架基本相似,可以進行擴充套件。
RocketMQ Streams架構及實現原理
無論是Spark還是Flink,一個成功的大資料元件往往都需要由一個很大的團隊經歷幾年的時間才能打磨完成。實現RocketMQ Streams主要會面臨以下挑戰:
-
大資料計算功能多且架構複雜,是否能夠實現?
-
與Flink等大資料框架的核心差異是什麼,是否會做成Flink的裁剪版?
實現一個輕量級、高效能的大資料計算框架,必須要有和Flink不一樣的思路。
從業務架構分析,一個常規RocketMQ業務的架構基本相同,包括輸入、無狀態的計算、輸出結果等。這種常規的RocketMQ業務架構有兩個優點:首先,比較輕量,負載均衡、容錯都由RocketMQ完成,不需要另外做;其次,部署簡單,如果RocketMQ阻塞,直接擴容業務、增加消費能力即可。
但是這種常規架構很難實現統計、join以及視窗計算等複雜計算。要實現此類複雜計算,必須實現shuffle,而要實現shuffle則必須實現不同運算元之間的通訊。運算元之間的通訊需要有全域性的排程和全域性的任務管理,而全域性的排程和全域性的任務管理又需要資源的管理和對任務資源的分配。上述的需求會導致架構變得複雜,使短時間內的實現存在穩定性和複雜性等方面的困難。
逆向思考可以看到複雜性的根源是shuffle,解決思路是藉助訊息佇列的中轉實現shuffle。以shuffle作為分割,將複雜的拓撲變為簡單的拓撲。只需重點突破整個架構的搭建、視窗計算的補充、效能的提升這三個難關,即可實現一個既輕量又有高效能的大資料計算功能框架。
大資料架構包括Spark、Flink等,常規設計思路是計算和叢集的管理一體化。叢集的管理要解決高可用問題、task分配和排程問題、job和task容錯問題,因此大資料架構的實現存在巨大挑戰:
(1)叢集的管理需求使架構更重,因為高可用意味著必須引入元件。而且在資源消耗方面,一個叢集模式至少需要三個階段,而叢集的開銷可能需要 10%的記憶體。一旦管理結構叢集化,任務的分配、資源的設定都需要預設。
(2)類似視窗計算的狀態儲存要求比較高。大資料元件的部署對記憶體、大磁碟有要求,而這種要求無疑會增加架構的複雜性。
(3)通過訊息佇列中轉來實現shuffle的方案可能會加大RocketMQ的壓力,增加部署的複雜性。
以上三點是基於大資料架構來思考實現一個輕量化架構的挑戰。換一種思路,聚焦於核心業務,用業務架構的思路去思考。
常規的大資料業務都會有一個訊息佇列,無論訊息佇列是不是RocketMQ。而大多數訊息佇列都會實現分片的負載均衡和容錯的管理,計算和管理的分離可以借用MQ的叢集能力,儲存可以採用RocketMQ的壓縮儲存來實現。
MQ的最小排程單元是分片,它可以對分片進行負載均衡、容錯、排程等操作。只要將任務和分片進行對映,借用MQ的分片管理,即可實現task管理,無需額外實現管理能力。複用RocketMQ的壓縮儲存,也不需要額外實現儲存。此外,用MQ做shuffle會加大MQ的壓力。MQ的訊息量增加,使得CPU的使用率也會增加,整體資源使用率也會增加,因此要採用策略來降低資源消耗。
視窗計算的實時性不高,比如10分鐘的視窗只需要每10分鐘取出結果。因此可以採用微批的方式,比如1000條計算一次,將1000條基於shuffle key進行分組,分組以後多條資料合併成一條。RocketMQ基於QPS的壓力,資料量變大,QPS下降,CPU的壓力反而不大,然後進行壓縮,將資料量降低,最終可以減少shuffle開銷。
最終結果如下:
(1)採用shared-nothing架構,沒有任何叢集和框架的開銷。
(2)輕依賴:沒有增加任何額外的依賴,雖然有MQ依賴,但MQ是業務必需的,可以直接複用業務的MQ。
(3)計算機不需要任何依賴,部署輕量,1core 1g即可部署。
(4)輕消耗:shuffle的中轉實現了微批、壓縮和多條歸併的策略,7000的QPS只需要0.12的CPU和300兆的記憶體,資源消耗非常低。
(5)輕擴容:擴容非常簡單,因為採用shared-nothing架構,類似於web伺服器,訊息堆積時增加例項即可擴容。
視窗的精確計算一次(Exactly-once)是一個難點。流是無邊界的,要進行統計計算,則必須劃分視窗。如上圖,假設D的位置是一個count,計算10分鐘一共接收多少條資料,有兩種實現方式:
(1)每一條資料過來都進行快取,每十分鐘將所有資料進行統計取得結果。這種方式對儲存的壓力比較大,也不高效。
(2)比較優雅的方式:每來一條資料都只存中間結果,比如第一條資料,中間結果是1,第2條是2,第3條是3。但這也存在一個問題,如果某個節點出問題或某個任務出問題,中間結果會變為不可控的狀態。假如3宕機, 2和1可能會繼續完成計算,也可能出現問題不進行計算。因此在B被拉起的時候回放哪條訊息是不可知的,這種情況無法成為精確計算一次。
Flink是業界精確計算最優雅的方案。它的思路很簡單,在某個時間點將整個叢集的狀態進行一次映象,每隔一段時間映象一次。出現問題時,將所有運算元的狀態恢復一遍再計算。
整體的計算流程如下:job manager定期發checkpoint 給它的資料來源。發生兩個checkpoint時,checkpoint會隨著資料在運算元裡走。每個運算元接收到 checkpoint時,需要備份自己的狀態,比如視窗運算元接收到一個checkpoint,還無法進行狀態備份,需要等到另一個checkpoint也到了之後才能做狀態的備份,該設計稱為對齊等待。等待的過程取決於兩個checkpoint之間流速的差異。等兩個checkpoint都到了以後,再同步地進行狀態儲存,將本地的狀態儲存到遠端的狀態。
以上過程開銷較大,開啟checkpoint使任務效能會降低約30%。任務越複雜,系統的開銷越大。此外,恢復時長也需要考慮,當運算元和任務出問題重啟時,必須從遠端讀取完整的狀態,所有運算元恢復以後才能開始計算。恢復過程可能需要幾秒到幾分鐘,時間較長。
Flink雖然是一個優雅的方案,但依然存在很多重操作,這是因為Flink方案從整個拓撲考慮,因此思考點較為複雜。
而簡化的思路為,將複雜的拓撲通過shuffle拆分成很多簡單的子Job。每個子Job的邏輯也很簡單,包括三點:第一,從 source 接收資料;第二,進行運算元的計算;第三,將資料寫到Sink。有些子Job運算元是有狀態,而有些是無狀態,無狀態的算例只需要保證至少消費一次的邏輯即可。
以上思路可能帶來的後果是輸出的Sink裡存在重複的資料。如果該Sink是最終的結果,則由Sink自己決定能不能去重;如果是shuffle的佇列,則會在後面有狀態的運算元裡完成精確計算一次的邏輯。
而有狀態的消費資料裡存在重複資料,只需進行去重。去重的邏輯如下:在狀態儲存時,除了儲存中間的計算結果,還需將元資料進行儲存。元資料指現有的中間結果計算用到的分片以及分片的最大offset對應的資料。資料來的時候,如果該分片的offset比已經計算的小,則將它丟棄,從而通過去重完成了精確計算一次的邏輯。遠端儲存只需儲存一份,無需定時地儲存一份完整資料。
另外,Checkpoint 也不會阻塞流程,因為一個Checkpoint的傳送只是負責獲取運算元當前已經儲存到遠端的元資料,而運算元的儲存過程完全可以非同步和微批地進行儲存。Checkpoint到達運算元後,只需要告訴其結果,不會產生任何阻塞。
訊息源儲存offset是基於所有狀態運算元裡面的分片元素,取每個相同分片裡的最小值儲存,則崩潰後恢復時一定能保證至少消費一次。此處可以有重複的資料,可通過去重保證精確計算一次。
SQL 優化器是阿里云云盾的需求。因為需要將公共雲的規則遷到專有云,而專有云的資源有限,只能用原先4%的記憶體和不到30%的CPU去執行原先1.2倍的規則。而專有云的另一特點是擴容成本較高,可能按月擴容,很難擴機器。
綜上,公共雲遷移到專有用,存在兩個巨大挑戰:
第一,要用有限的資源承載更多的規則;
第二,安全場景需要不停地增加規則來保證安全性,規則要增加,但需要保證資源不隨規則增加。
因此,進行優化的時候也需要考慮安全的特點。而大部分大資料計算都具有此特點,所以這是一個通用的方案。
基於安全的特點來分析,安全的特點有三:
(1)正則或過濾類的表示式比較多,可以使用更快的引擎來承載。
(2)一些表示式的欄位重複率較高,比如命令列,無論有多少個引數,運維工作的命令都很相似。
(3)資料來源比較少,但是每個資料來源的規則比較多。
基於以上三個特點,我們思考整體的解法如下:
(1)任務歸併,減少任務開銷。每起一個任務都會有一個執行緒池,佔有一定的記憶體開銷。而這些任務來自同一個資料來源,因此可以將同資料來源的公共部分抽取出來,比如對消費資料來源實現部分欄位的標準化,而對應的規則可以封裝成大任務。按照以上邏輯,10個任務放在一起,只需要5core 5g,所以資源消耗更少,執行緒更少,記憶體使用也更少。該解法稱為同源歸併邏輯。
而資源變少帶來的問題是會將這一組任務的容錯放在一起,一個任務有問題也會影響到其他任務。因此,使用者可以按需選擇任務型別,分為資源敏感型和錯誤敏感型。
另外,同源歸併不會導致規則放在一起變複雜而使開發測試變得更困難。因為對於開發測試,每一個規則依然是獨立的,稱為動態同源歸併。同源歸併是下發一個策略後自動歸併,將策略撤銷則會恢復為獨立執行,這是可以動態調配的策略。
(2)表示式指紋。一個規則裡有很多過濾條件,解決思路是將所有任務裡的過濾條件都在編譯期間統一收集。
一個過濾的表示式基本是三元組,包括變數、操作、值。比如正則,變數是command line,操作是正則,值是正則的串。按變數進行分組,比如一個command line有10個表示式,另一個command line有20個表示式,放在一起是30個表示式,將它分為一個表示式分組。
表示式分組的目的是快取。一條訊息到達時,處理流程為:先檢查快取,所有表示式分組逐個檢查。如果快取裡存在,則直接應用結果;否則,將這一組表示式全部計算完成然後生成結果。生成的結果是一個bit set,比如100個表示式會有一個100 個bit位,代表該表示式是否觸發。
將command line 和bit set放在快取裡,再來一條相同command line的時候,所有表示式都不需要計算,可直接獲得結果。然後檢視上下文是否存在該結果,如果存在則直接使用,否則再進行計算。按照該流程,如果command line的重複率較高,比如有80%的重複率,只有20%的command line會被真正地計算,其他的只需O(1)時間來獲取結果。
在欄位重複率較高的場景中,此策略需要的計算資源大幅下降,因為它能將複雜的正則計算轉化成O(1)的比對計算,資源不會隨著規則增加而增加。
此外,使用正則的場景中欄位重複率比較高,這是一個通用的特性。但即使重複率比較低,因為整體資源開銷只增加了一個O(1)的比對,不會增加額外的開銷。
(3)Hyperscan正則加速。將所有任務的表示式放在一起,對錶達式進行預編譯,尤其是正則類。比如用Hyperscan可以對1000個表示式進行預編譯,每個表示式單個執行與預編譯在一起執行,兩者之間約有10倍差距。如果欄位重複率不高,可以用Hyperscan加速正則的執行。
流式資料量非常大,存在快取是否能撐住以及用什麼快取來承載的問題。
首先,快取是否能撐住可以通過設定邊界解決,比如設定300兆的快取,超過則丟掉一些資料。
其次,可以採用壓縮快取來承載,用很少的資源能夠承載大的資料量。Java的map之所以佔用資源較大,是因為存在很多同步、對齊、指標開銷。所以快取只能基於原生的bit陣列實現,可以降低資源開銷。一個key可能比較長,比如一個command line可能需要幾十個或幾百個字元。但使用md5儲存可以壓縮到十幾個位元組,而且md5的衝突率、碰撞率非常低。
因此,使用md5儲存key,無論key多大,都能夠壓縮到16個位元組。value則用原始的位元組儲存,沒有任何頭部開銷。
測試顯示,50Byte的key,20Byte的value,1000萬的資料用壓縮儲存可以達到原始資料的一半,達到Java map的17%,壓縮效果顯著。
最終效果:1.2倍的安全規則,採用32core 40g支撐12000QPS,沒有增加任何物理機,也不需要擴容,可滿足需求。
RocketMQ Streams在阿里雲安全的應用
RocketMQ Streams的第一個應用是專有云的安全。
專有云和公有云不一樣,專有云的資源有限,將整個雲部署在使用者的IDC機房,在使用者機房裡擴資源需要向用戶申請和採購,不像公有云可以隨時擴充資源。因此專有用的彈性不如公共雲。
大資料計算是一個產品,不是使用者購買以後才會輸出。使用者買了安全,但不一定買大資料計算,這種情況造成使用者買了安全卻沒有大資料計算。如果因為安全而幫使用者買大資料計算,大資料計算的成本可能比安全更高,導致了大資料落地很難,入侵檢測的能力較差,風險較高。因此,我們最終的策略是採用RocketMQ Streams方案為使用者實現部署。
RocketMQ Streams方案需要32core 40g 的記憶體,即可承載12000QPS,基本能滿足使用者的需求;對比記憶體資源,只使用了原先公共雲記憶體資源的4%,CPU的30%不到;從能力覆蓋層面看,覆蓋了全部安全規則,也相容了Flink的語法規則,只需開發一次;從安全效果層面看,因為覆蓋了所有的安全規則,實現了安全效果100%保障;從產品覆蓋層面看,有多個產品在應用RocketMQ Streams。
RocketMQ Streams的第二個應用場景是雲安全中心的混合雲。Gartner預測將來80%的企業都會採用混合雲和多雲的部署模式,但是混合雲和多雲需要統一的安全運營管理。
多雲或邊緣計算存在的問題主要在於,比如在阿里購買了一些ECS,在騰訊、華為購買了一些ECS,在國外也購買了一些ECS,如果要將ECS的資料匯聚在一起,日誌量較大,上傳成本也高。國外的ECS資料回到國內會受到一些限制,除此之外,如果頻寬不夠,上傳日誌也可能影響正常業務。
我們提供的解決策略很簡單,將RocketMQ Streams和RocketMQ整合,支援訊息佇列儲存,也支援ETL和流計算,部署到邊緣端。比如阿里作為一個統一管控區,在騰訊購買兩臺ECS,將RocketMQ Streams和邊緣計算引擎部署上,可稱為邊緣計算。在邊緣計算告警,丟掉原始日誌,只回傳告警。原始日誌在本地還可轉存給使用者,如果使用者需要,也能支援熱更新。只需一個zip包,一鍵SH即可安裝。
RocketMQ Streams的第三個應用場景是IOT。IOT是典型的邊緣計算場景,挑戰在於需要用4core 8g來混部其業務和RocketMQ Streams任務,幾百個任務的壓力非常大。而且它需要採用MQTT這樣標準的IOT協議輸入,也需要自定義規則引擎的能力進行統計計算、特徵計算、Join計算和維表計算。
RocketMQ Streams的MQTT直接複用了RocketMQ。維表方面,可以實現維表共享,支援千萬級維表,可在同一臺機器共享多個例項。SQL則可以支援歸併和熱更新。
最終在IOT場景中完成了RocketMQ Streams的能力建設和使用者落地。
未來規劃
RocketMQ Streams的未來規劃包括四個方向:
(1)目前引擎只完成了核心能力建設,配套能力尚有所欠缺,未來會進行比如資源排程、監控、控制檯、穩定性等方面的完善,使開源使用者能夠更好地落地。
(2)繼續打磨邊緣計算的最佳實踐。
(3)CEP、流批一體和機器學習能力的推進。
(4)豐富訊息接入的能力,比如增加檔案、syslog、http的接入;繼續增強ETL的能力,打造訊息閉環;支援 ES 作為資料來源,基於搜尋的結果。
相關開源地址:
RocketMQ-Streams:
http://github.com/apache/rocketmq-streams
RocketMQ-Streams-SQL:
http://github.com/alibaba/rsqldb
加入 Apache RocketMQ 社群
十年鑄劍,Apache RocketMQ 的成長離不開全球接近 500 位開發者的積極參與貢獻,相信在下個版本你就是 Apache RocketMQ 的貢獻者,在社群不僅可以結識社群大牛,提升技術水平,也可以提升個人影響力,促進自身成長。
社群 5.0 版本正在進行著如火如荼的開發,另外還有接近 30 個 SIG(興趣小組)等你加入,歡迎立志打造世界級分散式系統的同學加入社群,新增社群開發者微信:rocketmq666 即可進群,參與貢獻,打造下一代訊息、事件、流融合處理平臺。
微信掃碼新增小火箭進群
另外還可以加入釘釘群與 RocketMQ 愛好者一起廣泛討論:
釘釘掃碼加群
關注「Apache RocketMQ」公眾號,獲取更多技術乾貨
- 從 JDK 9 到 19,我們幫您提煉了和雲原生場景有關的能力列表(上)
- 統一觀測丨如何使用Prometheus 實現效能壓測指標可觀測
- CNStack 2.0:雲原生的技術中臺
- 全景剖析阿里雲容器網路資料鏈路(五):Terway ENI-Trunking
- 全景剖析阿里雲容器網路資料鏈路(四):Terway IPVLAN EBPF
- 全景剖析阿里雲容器網路資料鏈路(三):Terway ENIIP
- 談談我工作中的23個設計模式
- 雲邊協同下的統一應用管理:基於 OpenYurt 和 KubeVela 的解決方案
- OpenKruise v1.3:新增自定義 Pod Probe 探針能力與大規模叢集效能顯著提升
- Koordinator v0.7: 為任務排程領域注入新活力
- 傳統大型國企雲原生轉型,如何解決彈性、運維和團隊協同等問題
- Dubbo 3 易用性升級之 Dubbo 官網大改版
- 阿里雲容器服務 ACK 產品技術動態(202208)
- RocketMQ Streams在雲安全及 IoT 場景下的大規模最佳實踐
- RocketMQ 5.0:無狀態代理模式的探索與實踐
- Apache RocketMQ 5.0 在Stream場景的儲存增強
- 快手 RocketMQ 高效能實踐
- RocketMQ DLedger架構在小米的大規模實踐
- 定時任務報警通知解決方案詳解
- Dubbo Mesh 總體技術架構方案