Elastic Stack最佳實踐系列:Beats->ES,一個更輕型的架構選擇
作者:李捷,Elastic首席雲解決方案架構師
ELK生態下,構建日誌分析系統的選擇
說起開源的日誌分析系統,ELK幾乎無人不曉,這個生態並非是Elastic特意而為,畢竟Elasticsearch的初心是分散式的搜尋引擎,被廣泛用作日誌系統純粹一個“美麗的意外”,這是社群使用者推動而成。而現在各大雲廠商推廣自己的日誌服務時,也往往將各種指標對標於ELK,可見其影響之廣。
但其實,流行的架構中並非只有ELKB,當我們使用ELKB搭建一套日誌系統時,除了Elasticsearch, Logstash, Kibana, beats之外,其實被廣泛使用的還有另一個工具 —— Kafka。在這當中,Kafka的作用是明顯的,作為一箇中間件,一個緩衝,它起到了提高吞吐,隔離峰值影響,快取日誌資料,快速落盤,同時通過producer/consumer模式,讓Logstash能夠橫向拓展的作用,還能夠用作資料的多路分發。因此,大多數時候,我們看到的實際架構,按資料流轉順序排列,應該是 BKLEK架構 。
但我們在使用Kafka的時候,也並非是沒有成本的,額外的一套分散式系統,更長的資料鏈路,都會是我們在做最後架構選型時的一些癢點,特別是隨著我們產生的資料越來越多,BKLEK架構會變得越來越大,越來越重,成本、效能、運維的簡易性都會成為我們評估日誌系統的重要指標。因此,我們的問題是:在Elastic Stack都已經進化到了8.1的當下,我們是否還需要延續一直以來的慣性思維,認為在我們仍然在任何情況下都是需要 BKLEK 的架構呢?
在我們開始正式探討之前,我們可以從現在普遍看到的新的架構圖可以一猜端倪:
在這個架構中,所有的Integration的輸出都是Elasticsearch,所有的資料處理都由ingest pipeline完成,資料的完整性和可靠性,由端點和elasticsearch之間的應答確認來保證。因此,我們本文探討的是:
-
我們資料採集端直接到ES的架構,是否可取
-
我們什麼時候可以使用這種架構
資料採集端直接到ES的架構分析
雖然Elastic原廠的Fleet與Elastic Agent已經處於GA(普遍可用)的階段,但因為其本身的一些限制,比如:
-
Integration Repository需要連線外網
-
需要單獨的Fleet Server (在最新版本已經與APM server合併為Integration Server)
-
Elastic Agent還沒有能完全覆蓋Beats所支援的資料來源
因此,現階段,我們討論資料採集端直接到ES的架構時,會主要集中在Beats->Elasticsearch這一架構。這一架構,相對於BKLEK架構來說,少了中間的Kafka,甚至我們可以忽略Logstash,因此,架構會相對精簡,帶來的好處包括:
-
相對更低的成本
-
更高的傳輸和資料處理效率
-
更一致的安全特性
-
更容易進行監控
當然,在帶來以上好處的同時,我們也會失去Kafka所帶來的各種好處。不過不用擔心,Kafka的特性只是構建一個穩健的日誌分析系統的充分條件,而非必要條件,在不少場景下,我們不一定是非Kafka不可。接下來,我們將討論幾個我們相對會比較關心的問題,以讓大家瞭解,我們是否可以選擇這種架構,什麼時候選擇這種架構,以及相應的最佳實踐。
Beats -> Elasticsearch鏈路的健康保障
對於不使用Kafka的場景,我們可能始終會有點覺得不踏實。因為對Beats -> Elasticsearch這個簡單架構不夠了解,以至於我們信心不足。接下來,我們討論一下,在這種簡單架構下,我們是如何面對各種可能出現的問題的。
大多數架構師會擔心的問題是流量波動的問題,如果突然出現日誌流量的洪峰,是不是會影響到後端的日誌系統。這個問題的答案是, 有影響,但影響有限 。
我們先明確一下日誌系統的主要作用:即日誌的集中管理,在統一的日誌平臺上提供所有日誌的準實時的關聯查詢與分析能力。這裡的核心是準實時和查詢能力。
在流量洪峰的情況下,受影響的是“準實時”的能力,因為受限於日誌系統的處理能力,如果日誌產生的速度,大於日誌系統處理的速度,則我們無法讀取到最新的資料。這個問題,即便是我們有了Kafka也是無法解決的。
而查詢的能力,幾乎不受影響。原因如下:
-
ES的讀寫是由不同的執行緒處理的,有各自獨立的執行緒池。過載的寫流量,會導致後到的寫請求,因為寫執行緒池已滿,而被拒絕。但不會導致查詢情況的拒接服務
-
ES從7.0開始,使用真實記憶體斷路器,能夠避免由於過多的請求,導致節點記憶體OOM。但在日誌場景,特定時間內日誌請求的數量是有明確上限的,該上限為Beats的數量,相比於高併發的讀場景,Beats幾乎不可能造成請求數量的過載
-
Beats與Elasticsearch之間有背壓檢測機制,當Beats檢測到ES有拒絕寫服務的情況出現時,會主動限流降低,避免對ES產生持續的壓力與影響。具體參見,附錄1
或許這種“輕描淡寫”的描述,讓大家覺得有點難以相信,甚至與平時的經驗有點不符。但實際上真正的問題來自於不合理的架構:
-
大多數企業會選擇維護一個“怪物”級別的ES叢集,即,將業務搜尋和日誌查詢兩個完全不同的場景混用與同一個ES叢集。因此,當某些日誌流量洪峰來臨,打滿了“write”執行緒池之後,會發現業務資料也無法寫入了。但仍然的,kafka也解決不了這個問題,因為,即便有kafka在前面擋住真正的日誌資料洪流,你也很難判斷kafka後面的logtash叢集,會不會導致ES叢集拒絕寫服務, 因為消費日誌永遠比索引日誌要快 ,logstash會持續消費kafka中的資料,直到感受到背壓,才會減緩輸出的頻率, 這點和Filebeat是一樣的 。
-
大多數在早期就已經在使用ES的企業,由於缺乏合理的架構設計,或者缺乏足夠的升級叢集的信心,叢集一直停留在5,6等老版本的,因此,整個架構缺乏足夠的健壯性和可恢復性,一些關鍵能力,比如背壓檢測,記憶體斷路器,Heap memory offload等功能是沒有的。導致叢集較為脆弱,在流量洪峰時,出現OOM等問題,也是有的。但仍然,kafka也解決不了這個問題。
因此,我們在做架構選型之前,確定要不要使用Beats -> Elasticsearch這一架構之前, 不妨先審視一下,我們當前是不是在業務和日誌混用ES叢集,我們的ES叢集版本是不是過低,運維缺乏升級的能力 ?這兩個反而是更致命的問題
回到流量洪峰問題的解決。即便加上kafka,也只是治標不治本,看起來架構更加健壯了一點,但實際上也並不能幫助我們提高後端的消費能力,我們整體面臨的情況和使用Beats -> Elasticsearch這一架構是沒有多大的區別的(Kafka最大的好處,就是把這些資料落盤到自己的磁碟上了,但這真的是我們在這個場景下需要解決的問題嗎?)。
流量洪峰的治本方案,應該是著眼於快速的故障排查,快速的找到出現故障的機器並解決故障。(出現業務暴增也不是不可能的情況, 但正常的業務日誌撐爆日誌系統的可能性非常小 ,因為首先會撐爆的是產生日誌的業務系統,因此,一旦業務暴增,我們需要考慮業務系統和日誌系統的同步擴容)
我們可以從Elastic Stack上快速的找到這樣問題的答案,而且方法很多,這裡舉例一二。
-
監控日誌叢集的bulk拒絕率
-
監控日誌錯誤率
-
使用機器學習判斷日誌異常
在發現異常之後,我們可以進一步定位,通過儀表板,檢視具體是哪個服務出現了異常:
另一個方面,在使用Kafka與Logstah的情況下,其實也是會有資料丟失的風險的。資料處理的鏈路越長,架構越複雜,帶來的脆弱性就越多。在這個架構中,Kafka通過冗餘提供了高可用。但Logstash確成為了風險的一環,雖然其和Kafka、ES之間都有應答確認的機制。但一旦Logstash在消費Kafka之後,正確投遞ES之前發生了崩潰,則資料將會丟失。並且我們很可能並不知道資料丟失了。
Beats->Elasticsearch的鏈路效率
在使用Kafka與Logstah的情況下,資料需要由Beats首先落盤到Kafka的分散式日誌檔案中,再由Logstash從Kafka中消費,之後,資料又要根據Logstash與ES之間的背壓,將資料落盤到Logstash的Disk Queue上。相比於直接由Beats到ES的架構,這裡有了兩次資料落盤,讀盤的操作;
另一方方面,在基線測試中,我們可以看到Elasticsearch的Ingest node相對Logstash的ETL處理的能力會來得更加高效。其原因在於,Logstash是一個Java-Ruby解析器上的應用,在JVM上執行Ruby解析器的執行效率,不如純java應用的ingest node。並且處理完的資料,相對於已經包含了叢集內部路由資訊的ingest node,在Logstsh作為ETL的架構下,還需要中間一個額外的ES節點負責bulk request的路由分發,需要更多的網路跳轉才可以最終寫到對應的data node。
更一致的安全特性
而在鏈路安全方面,ingest node處於ES叢集內,天然的使用統一的安全策略。而Beats->ES本身就是通過HTTPS,再配合證書或者密碼驗證的方式,可以保證鏈路的安全。
而如果在鏈路中再增加Kafka與Logstash,則整條鏈路的安全配置則會更加複雜,稍有配置不慎,還會存在更多的資料洩露的風險。
更容易進行監控
在監控方面,簡單的架構會讓我們更容易發現系統的異常。並且,我們有現成的手段和現成的告警規則
什麼是最佳的架構?
“最佳架構”這個詞一定是一個偽命題,因為不存在能解決所有問題的銀彈。但指導準則是有的, 即按需選擇合適的架構 。
我們的指導原則是:
-
大多數的日誌資料鏈路,儘量選擇簡單且健壯的架構。Beats->Elasticsearch的架構足夠簡單和健壯,這個應該是大多數情況下,以Elastic Stack為基礎的日誌系統應該選擇的資料攝入方式
-
對於缺乏可靠性的資料鏈路,以及需要高吞吐的能力、快速將資料從邊緣節點移出的,需選擇Kafka的幫助
-
整個架構中的資料鏈路可以是異構的,根據資料的屬性,選擇合適的鏈路
大家可以看到,我們在這裡建議的是大多數情況下,我們應該選擇Beats->Elasticsearch這一簡單的架構。其原因就像我們之前提到的:
-
Beats->Elasticsearch的鏈路足夠健壯
-
Beats->Elasticsearch的鏈路效率更高
-
Beats->Elasticsearch有更好的安全一致性
而當我們確實需要Kafka和Logstash的資料鏈路,可以參考我們的一些配置建議[1][2]。
接下來,我們再總結一下我們該如何判斷是否需要Kafka
我們在什麼情況下,需要Kafka
作為一個Queue,Kafka在日誌場景的作用,並不像在其他的業務應用場景中那麼重要。它最主要的作用就是能夠將資料快速落盤,並且以冗餘的方式儲存在分散式的日誌檔案中。同時,通過producer/consumer機制,讓後端的ETL工具可以併發的消費,並且提供再消費的容錯能力。
這裡的重點是資料“快速”“落盤”。那我們在什麼場景下非常需要這兩個能力呢?我們先說“落盤”
資料需要落盤
有一些資料,是轉瞬即逝型的資料,比如,我們通過各種探針,exporter採集資料後,在採集端沒有落盤,如果不及時儲存起來,就會丟失這部分資料。類似於物聯網資料,指標資料,apm資料都屬於這種型別的資料。
資料產生於那一個瞬間,並且在產生資料的地方,我們並沒有什麼機制去儲存這些資料,因此,在資料真正進入後端的儲存分析之前,我們往往需要一個Queue能夠幫我們把這些資料落盤。這時,Kafka幾乎就是我們在技術選型時的不二選擇。
資料需要高速轉移
再說“快速”,Kafka的高吞吐也是非常重要的一個能力,也是其得以讓人追捧的主要原因之一。可以設想這樣的一種場景,在雲原生環境下,我們非常動態的去建立各種計算資源,以應對業務流量的變化,雖然每個計算資源產生的資料落盤了,但由於它可能會被銷燬,因此,我們需要在它被銷燬之前,把其產生的日誌資料搬移到Pods之外,對於這種轉瞬即逝的資源所產生的資料,也需要Kafka的能力。
應對資料高峰
日誌資料或基於事件的資料很少具有一致的、可預測的大小或流速。考慮一個在週五晚上升級應用程式的場景,您部署的應用程式有一個嚴重的錯誤,即資訊被過度記錄,淹沒了您的日誌基礎設施。在其他多租戶用例(例如遊戲和電子商務行業)中,這種峰值或資料爆發也相當普遍。在這種情況下使用像 Kafka 這樣的訊息佇列也能來緩衝資料,減緩影響。
資料的多路分發
還有一種情況,是我們可能需要Kafka將資料分發到別的地方,我們可以定義多個消費端,分別去消費Kafka裡的資料,將資料分發到不同的資料處理系統。注意,不要圖省事,只建立Logstash作為消費端,然後嘗試用Logstash的多個Output去分發。因為Logstash並不能保證過個Output之間的資料同步與一致,我相信不僅是Logstash,其他的消費端也無法做此保證,因此需要多個消費端分別消費Kafka裡的資料。當然,這裡不是隻有kafka一個選擇,也可以資料入湖,再從湖中消費。
因此,總結一下,如果你的資料因為沒有快速落盤,而存在丟失的風險;如果你的資料吞吐很大,需要及時轉移,以及需要應對可能出現的資料洪峰,則你需要Kafka。
我們在什麼情況下,可以不需要Kafka
先總結一下我們在什麼時候必須要用Kafka:
我們稍微回想一下,在通常的日誌場景下我們是如何採集日誌的:
filebeat.autodiscover: providers: - type: kubernetes node: ${NODE_NAME} hints.enabled: true templates: - condition: contains: kubernetes.container.name: "opbeans-" config: - type: container paths: - "/var/log/containers/*-${data.kubernetes.container.id}.log" include_lines: ['^{'] tail_files: true
複製
我們需要指定特定的日誌目錄,也就意味著,我們是從磁碟上採集日誌資料的。而在雲環境上,如果我們採用了雲盤作為資料盤,這些日誌資料在產生端,即已經實現了冗餘的落盤儲存。因此,相對於物聯網資料,指標資料,apm資料這種ephemeral資料,已經落盤的日誌資料大多數時候,並不需要額外用一個Queue再去做一次資料的落盤與冗餘儲存。
但在之前的分析中,我們也說過,在某些ephemeral環境下,如果儲存資料的本地檔案快取,在我們將資料搬移出去之前就被銷燬的話,就會有資料丟失的風險。但幸運的是,大多數情況下,我們是能夠輕鬆判斷出這種風險的,大多數非動態擴縮容的應用,比如那些部署在虛擬機器上的應用,這種風險的可能性極小,並且可以控制。相對於快速將資料搬出到kafka, 增加本地檔案儲存的大小,反而是一個更簡單,更低成本的選擇 。
並且,當我們發現數據沒有正確的寫入日誌分析系統時,重新讓filebeat採集一次日誌,會比讓Logstash去調整offset,在Kafka上重新consume來得簡單得多,直觀得多,你不需要了解被採集的檔案和kafka topic、partition之間的對映關係,你不需要知道檔案與offset的對映關係,以及如何操作re-consume。而且通過Beats與Elasticsearch之間的端到端的應答確認機制和背壓探測機制,我們更容易保證資料的正確寫入。
因此,總結一下,如果你的資料是日誌,這些日誌被寫入到磁碟,並且不會在短期內被刪除。我們可以選擇不用Kafka
總結
為了防止大家漏掉關鍵的點,在最後再強調三遍:
-
日誌系統切勿與業務搜尋系統混用一個ES叢集
-
日誌系統切勿與業務搜尋系統混用一個ES叢集
-
日誌系統切勿與業務搜尋系統混用一個ES叢集
這是我看過的 最悲劇的 日誌分析系統的架構,沒有之一。
回到我們本文內容,一個常見的問題是,我們常常忽略架構設計的重要性,懶於去思考其中變數的影響,希望通過一刀切的簡單方案去做事情( 一上來就是BKLEK架構,不去思考這樣做是否值得,我們是否還有更多的選擇 )。但即便是我們全部使用了Beats, Kafka, Logstash, Elasticsearch, Kibana五個元件,也不是所有的資料鏈路都走一樣的路徑,我們可以針對接入系統的特性,優先順序,重要程度,要求的指標,有所取捨。
因此,雖然選擇Elastic Stack是一個一定不會錯的選擇,但如果你覺得這套系統複雜了,成本高了,那希望本文可能給你幫助,或許在只有日誌資料的這個階段,你的系統還是可以簡化的,有更多選擇的。
附錄
1, filebeat與Elasticsearch之間的背壓檢測協議
Filebeat 在將資料傳送到 Logstash 或 Elasticsearch 時使用背壓敏感協議來處理大量資料。如果 Logstash 忙於處理資料,它會讓 Filebeat 知道減慢其讀取速度。一旦擁塞得到解決,Filebeat 將恢復到原來的速度並繼續傳送資料。
關於細節,可以檢視以下配置:
# The number of seconds to wait before trying to reconnect to Elasticsearch # after a network error. After waiting backoff.init seconds, the Beat # tries to reconnect. If the attempt fails, the backoff timer is increased # exponentially up to backoff.max. After a successful connection, the backoff # timer is reset. The default is 1s. backoff.init: 1s # The maximum number of seconds to wait before attempting to connect to # Elasticsearch after a network error. The default is 60s. backoff.max: 60s
複製
與Libbeat的實現(https://github.com/elastic/beats/下):
/libbeat/outputs/elasticsearch/elasticsearch.go (elasticsearch output會使用backoff初始化client)
/libbeat/outputs/elasticsearch/client.go#L204 (client會判斷介面呼叫是否成功,如果失敗,會返回err)
/libbeat/outputs/backoff.go#L38(判斷介面的返回,如果有err,等待)
func (b *backoffClient) Publish(ctx context.Context, batch publisher.Batch) error { err := b.client.Publish(ctx, batch) if err != nil { b.client.Close() } backoff.WaitOnError(b.backoff, err) return err}
點選原文,獲取更多內容
免費體驗活動專區
Elasticsearch 新使用者可享 2核4G,0元 體驗 30 天! 順暢體驗雲上叢集
掃碼關注 「騰訊雲大資料」 ,瞭解騰訊雲 Elasticsearch 更多資訊 ~
騰訊雲大資料
長按二維碼
關注我們
- 打造次世代分析型資料庫(三):列存表最佳實踐
- Elastic Stack最佳實踐系列:Beats->ES,一個更輕型的架構選擇
- Flink資源排程模型
- 最佳實踐:MySQL CDC 同步資料到 ES
- 騰訊雲ES:一站式配置,TKE容器日誌採集與分析就是這麼簡單!
- 速度提升10倍,騰訊基於Iceberg的資料治理與優化實踐
- Flink 實踐教程:入門(12):元資料的使用
- Flink Metrics&REST API 介紹和原理解析
- Flink 最佳實踐:TDSQL Connector 的使用(上)
- Flink Watermark 機制及總結
- Flink 實踐教程-進階(10):自定義聚合函式(UDAF)
- Flink 實踐教程-進階(9):自定義表值函式(UDTF)
- 資料分析小結:使用流計算 Oceanus(Flink) SQL 作業進行資料型別轉換
- Flink 實踐教程-進階(8):自定義標量函式(UDF)
- 實時數倉:基於 Flink CDC 實現 Oracle 資料實時更新到 Kudu
- 基於流計算 Oceanus(Flink) CDC 做好資料整合場景
- Flink 實現 MySQL CDC 動態同步表結構
- 流計算 Oceanus | Flink JVM 記憶體超限的分析方法總結
- Flink 實踐教程-進階(6):CEP 複雜事件處理
- 騰訊雲 AI 視覺產品基於流計算 Oceanus(Flink)的計費資料去重嘗試