企業級日誌系統架構——ELK(Elasticsearch、Filebeat、Kafka、Logstash、Kibana)

語言: CN / TW / HK

一、概述

ELK是三個開源軟體的縮寫,分別表示:Elasticsearch , Logstash, Kibana , 它們都是開源軟體。新增了一個FileBeat,它是一個輕量級的日誌收集處理工具(Agent),Filebeat佔用資源少,適合於在各個伺服器上搜集日誌後傳輸給Logstash,官方也推薦此工具。

大致流程圖如下:

1)Elasticsearch 儲存

Elasticsearch是個開源分散式搜尋引擎,提供蒐集、分析、儲存資料三大功能。它的特點有:分散式,零配置,自動發現,索引自動分片,索引副本機制,restful風格介面,多資料來源,自動搜尋負載等。

2)Filebeat 日誌資料採集

filebeat是Beats中的一員,Beats在是一個輕量級日誌採集器,其實Beats家族有6個成員,早期的ELK架構中使用Logstash收集、解析日誌,但是Logstash對記憶體、cpu、io等資源消耗比較高。相比Logstash,Beats所佔系統的CPU和記憶體幾乎可以忽略不計。

Filebeat是用於轉發和集中日誌資料的輕量級傳送工具。Filebeat監視您指定的日誌檔案或位置,收集日誌事件。

目前Beats包含六種工具: - Packetbeat:網路資料(收集網路流量資料) - Metricbeat:指標(收集系統、程序和檔案系統級別的CPU和記憶體使用情況等資料) - Filebeat:日誌檔案(收集檔案資料) - Winlogbeat:windows事件日誌(收集Windows事件日誌資料) - Auditbeat:審計資料(收集審計日誌) - Heartbeat:執行時間監控(收集系統執行時的資料)

工作的流程圖如下:

優點 - Filebeat 只是一個二進位制檔案沒有任何依賴。它佔用資源極少。

缺點 - Filebeat 的應用範圍十分有限,因此在某些場景下咱們會碰到問題。在 5.x 版本中,它還具有過濾的能力。

3)Kafka

kafka能幫助我們削峰。ELK可以使用redis作為訊息佇列,但redis作為訊息佇列不是強項而且redis叢集不如專業的訊息釋出系統kafka。kafka安裝可以參考我之前的文章:Kafka原理介紹+安裝+基本操作(kafka on k8s)

4)Logstash 過濾

Logstash 主要是用來日誌的蒐集、分析、過濾日誌的工具,支援大量的資料獲取方式。一般工作方式為c/s架構,client端安裝在需要收集日誌的主機上,server端負責將收到的各節點日誌進行過濾、修改等操作在一併發往elasticsearch上去。

優點 - 可伸縮性

節拍應該在一組Logstash節點之間進行負載平衡。 建議至少使用兩個Logstash節點以實現高可用性。 每個Logstash節點只部署一個Beats輸入是很常見的,但每個Logstash節點也可以部署多個Beats輸入,以便為不同的資料來源公開獨立的端點。 - 彈性 Logstash持久佇列提供跨節點故障的保護。對於Logstash中的磁碟級彈性,確保磁碟冗餘非常重要。對於內部部署,建議您配置RAID。在雲或容器化環境中執行時,建議您使用具有反映資料SLA的複製策略的永久磁碟。 - 可過濾 對事件欄位執行常規轉換。您可以重新命名,刪除,替換和修改事件中的欄位。

缺點 - Logstash耗資源較大,執行佔用CPU和記憶體高。另外沒有訊息佇列快取,存在資料丟失隱患。

5)Kibana 展示

Kibana 也是一個開源和免費的工具,Kibana可以為 Logstash 和 ElasticSearch 提供的日誌分析友好的 Web 介面,可以幫助彙總、分析和搜尋重要資料日誌。

filebeat和logstash的關係

因為logstash是jvm跑的,資源消耗比較大,所以後來作者又用golang寫了一個功能較少但是資源消耗也小的輕量級的logstash-forwarder。不過作者只是一個人,加入http://elastic.co公司以後,因為es公司本身還收購了另一個開源專案packetbeat,而這個專案專門就是用golang的,有整個團隊,所以es公司乾脆把logstash-forwarder的開發工作也合併到同一個golang團隊來搞,於是新的專案就叫filebeat了。

二、helm3安裝ELK

詳細流程圖如下:

1)準備條件

1、新增helm倉庫

bash $ helm repo add elastic http://helm.elastic.co

2)helm3安裝elasticsearch

1、自定義values

主要是設定storage Class 持久化和資源限制,本人電腦資源有限,所以這裡就把資源調小了很多,小夥伴們可以根據自己配置自定義哈。

```bash $ # 叢集名稱 clusterName: "elasticsearch"

ElasticSearch 6.8+ 預設安裝了 x-pack 外掛,部分功能免費,這裡選禁用

esConfig: elasticsearch.yml: | network.host: 0.0.0.0 cluster.name: "elasticsearch" xpack.security.enabled: false resources: requests: memory: 1Gi volumeClaimTemplate: storageClassName: "bigdata-nfs-storage" accessModes: [ "ReadWriteOnce" ] resources: requests: storage: 3Gi service: type: NodePort port: 9000 nodePort: 31311 ```

禁用Kibana安全提示(Elasticsearch built-in security features are not enabled) xpack.security.enabled: false

2、開始安裝Elasitcsearch

安裝過程比較慢,因為官方映象下載比較慢

bash $ helm install es elastic/elasticsearch -f my-values.yaml --namespace bigdata

bash W1207 23:10:57.980283 21465 warnings.go:70] policy/v1beta1 PodDisruptionBudget is deprecated in v1.21+, unavailable in v1.25+; use policy/v1 PodDisruptionBudget W1207 23:10:58.015416 21465 warnings.go:70] policy/v1beta1 PodDisruptionBudget is deprecated in v1.21+, unavailable in v1.25+; use policy/v1 PodDisruptionBudget NAME: es LAST DEPLOYED: Tue Dec 7 23:10:57 2021 NAMESPACE: bigdata STATUS: deployed REVISION: 1 NOTES: 1. Watch all cluster members come up. $ kubectl get pods --namespace=bigdata -l app=elasticsearch-master -w2. Test cluster health using Helm test. $ helm --namespace=bigdata test es 檢視,需要所有pod都正常執行才正常,下載映象有點慢,需要稍等一段時間再檢視

bash $ kubectl get pod -n bigdata -l app=elasticsearch-master $ kubectl get pvc -n bigdata $ watch kubectl get pod -n bigdata -l app=elasticsearch-master

3、驗證

bash $ helm --namespace=bigdata test es $ kubectl get pod,svc -n bigdata -l app=elasticsearch-master -o wide $ curl 192.168.0.113:31311/_cat/health $ curl 192.168.0.113:31311/_cat/nodes

4、清理

bash $ helm uninstall es -n bigdata $ kubectl delete pvc elasticsearch-master-elasticsearch-master-0 -n bigdata $ kubectl delete pvc elasticsearch-master-elasticsearch-master-1 -n bigdata $ kubectl delete pvc elasticsearch-master-elasticsearch-master-2 -n bigdata

3)helm3安裝Kibana

1、自定義values

域名(elasticsearch-master-headless.bigdata.svc.cluster.local)的由來不清楚的,可以參考我之前的文章:Kubernetes(k8s)DNS(CoreDNS)介紹 ```bash $ cat < my-values.yaml

此處修改了kibana的配置檔案,預設位置/usr/share/kibana/kibana.yaml

kibanaConfig: kibana.yml: | server.port: 5601 server.host: "0.0.0.0" elasticsearch.hosts: [ "elasticsearch-master-headless.bigdata.svc.cluster.local:9200" ] resources: requests: cpu: "1000m" memory: "256Mi" limits: cpu: "1000m" memory: "1Gi" service: #type: ClusterIP type: NodePort loadBalancerIP: "" port: 5601 nodePort: "30026" EOF ```

2、開始安裝Kibana

bash $ helm install kibana elastic/kibana -f my-values.yaml --namespace bigdata

3、驗證

bash $ kubectl get pod,svc -n bigdata -l app=kibana 瀏覽器訪問:http://192.168.0.113:30026/

4、清理

bash $ helm uninstall kibana -n bigdata

4)helm3安裝Filebeat

filebeat預設收集宿主機上docker的日誌路徑:/var/lib/docker/containers。如果我們修改了docker的安裝路徑要怎麼收集呢,很簡單修改chart裡的DaemonSet檔案裡邊的hostPath引數:

bash - name: varlibdockercontainers hostPath: path: /var/lib/docker/containers #改為docker安裝路徑 當然也可以自定義values修改,這裡推薦自定義values方式修改採集日誌路徑

1、自定義values

預設是將資料儲存到ES,這裡做修改資料儲存到Kafka ```bash $ cat < my-values.yaml daemonset: filebeatConfig: filebeat.yml: | filebeat.inputs: - type: container paths: - /var/log/containers/*.log

  output.elasticsearch:
    enabled: false
    host: '${NODE_NAME}'
    hosts: '${ELASTICSEARCH_HOSTS:elasticsearch-master:9200}'
  output.kafka:
   enabled: true
   hosts: ["kafka-headless.bigdata.svc.cluster.local:9092"]
   topic: test

EOF ```

2、開始安裝Filefeat

bash $ helm install filebeat elastic/filebeat -f my-values.yaml --namespace bigdata $ kubectl get pods --namespace=bigdata -l app=filebeat-filebeat -w

3、驗證

```bash

先登入kafka客戶端

$ kubectl exec --tty -i kafka-client --namespace bigdata -- bash

再消費資料

$ kafka-console-consumer.sh --bootstrap-server kafka.bigdata.svc.cluster.local:9092 --topic test ```

看到已經可以消費資料了,說明資料已經儲存到kafka了。

檢視kafka資料積壓情況

bash $ kubectl exec --tty -i kafka-client --namespace bigdata -- bash $ kafka-consumer-groups.sh --bootstrap-server kafka-0.kafka-headless.bigdata.svc.cluster.local:9092 --describe --group mygroup 發現大量資料都是處於積壓的狀態

接下來就是部署logstash去消費kafka資料,最後儲存到ES。

4、清理

bash $ helm uninstall filebeat -n bigdata

5)helm3安裝Logstash

1、自定義values

【注意】記得把ES和kafka的地址換成自己環境的。 ```bash $ cat < my-values.yaml logstashConfig: logstash.yml: | xpack.monitoring.enabled: false

logstashPipeline: logstash.yml: | input { kafka { bootstrap_servers => "kafka-headless.bigdata.svc.cluster.local:9092" topics => ["test"] group_id => "mygroup" #如果使用元資料就不能使用下面的byte位元組序列化,否則會報錯 #key_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer" #value_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer" consumer_threads => 1 #預設為false,只有為true的時候才會獲取到元資料 decorate_events => true auto_offset_reset => "earliest" } } filter { mutate { #從kafka的key中獲取資料並按照逗號切割 split => ["[@metadata][kafka][key]", ","] add_field => { #將切割後的第一位資料放入自定義的“index”欄位中 "index" => "%{[@metadata][kafka][key][0]}" } } } output { elasticsearch { pool_max => 1000 pool_max_per_route => 200 hosts => ["elasticsearch-master-headless.bigdata.svc.cluster.local:9200"] index => "test-%{+YYYY.MM.dd}" } }

資源限制

resources: requests: cpu: "100m" memory: "256Mi" limits: cpu: "1000m" memory: "1Gi"

volumeClaimTemplate: accessModes: ["ReadWriteOnce"] resources: requests: storage: 3Gi EOF ```


output plugin 輸出外掛,將事件傳送到特定目標:

stdout { codec => rubydebug } // 開啟debug模式,可在控制檯輸出 - stdout :標準輸出。將事件輸出到螢幕上

bash output{ stdout{ codec => "rubydebug" } } - file :將事件寫入檔案

bash output{ file { path => "/data/logstash/%{host}/{application} codec => line { format => "%{message}"} } } } - kafka :將事件傳送到kafka

bash output{ kafka{ bootstrap_servers => "localhost:9092" topic_id => "test_topic" #必需的設定。生成訊息的主題 } } - elasticseach :在es中儲存日誌

bash output{ elasticsearch { #user => elastic #password => changeme hosts => "localhost:9200" index => "nginx-access-log-%{+YYYY.MM.dd}" } }


2、開始安裝Logstash

bash $ helm install logstash elastic/logstash -f my-values.yaml --namespace bigdata

bash $ kubectl get pods --namespace=bigdata -l app=logstash-logstash

3、驗證

1、登入kibana檢視索引是否建立

2、檢視logs

bash $ kubectl logs -f logstash-logstash-0 -n bigdata >logs $ tail -100 logs

3、檢視kafka消費情況 bash $ kubectl exec --tty -i kafka-client --namespace bigdata -- bash $ kafka-consumer-groups.sh --bootstrap-server kafka-0.kafka-headless.bigdata.svc.cluster.local:9092 --describe --group mygroup

4、通過kibana檢視索引資料(Kibana版本:7.15.0) 建立索引模式

Management-》Stack Management-》Kibana-》Index patterns

通過上面建立的索引模式查詢資料(Discover)

4、清理

bash $ helm uninstall logstash -n bigdata

三、ELK相關的備份元件和備份方式

Elasticsearch備份兩種方式: - 將資料匯出成文字檔案,比如通過 elasticdumpesm 等工具將儲存在 Elasticsearch 中的資料匯出到檔案中。適用資料量小的場景。 - 備份 elasticsearch data 目錄中檔案的形式來做快照,藉助 Elasticsearch 中 snapshot 介面實現的功能。適用大資料量的場景

1)Elasticsearch的snapshot快照備份

  • 優點:通過snapshot拍攝快照,然後定義快照備份策略,能夠實現快照自動化儲存,可以定義各種策略來滿足自己不同的備份
  • 缺點:還原不夠靈活,拍攝快照進行備份很快,但是還原的時候沒辦法隨意進行還原,類似虛擬機器快照

1、配置備份目錄

在 elasticsearch.yml 的配置檔案中註明可以用作備份路徑 path.repo ,如下所示:

yaml path.repo: ["/mount/backups", "/mount/longterm_backups"] 配置好後,就可以使用 snapshot api 來建立一個 repository 了,如下我們建立一個名為 my_backup 的 repository。 ```bash PUT /_snapshot/my_backup { "type": "fs", "settings": { "location": "/mount/backups/my_backup" } }

``` 2、開始通過API介面備份 有了 repostiroy 後,我們就可以做備份了,也叫快照,也就是記錄當下資料的狀態。如下所示我們建立一個名為 snapshot_1 的快照。

bash PUT /_snapshot/my_backup/snapshot_1?wait_for_completion=true

【溫馨提示】wait_for_completion 為 true 是指該 api 在備份執行完畢後再返回結果,否則預設是非同步執行的,我們這裡為了立刻看到效果,所以設定了該引數,線上執行時不用設定該引數,讓其在後臺非同步執行即可。

3、增量備份

bash PUT /_snapshot/my_backup/snapshot_2?wait_for_completion=true

當執行完畢後,你會發現 /mount/backups/my_backup 體積變大了。這說明新資料備份進來了。要說明的一點是,當你在同一個 repository 中做多次 snapshot 時,elasticsearch 會檢查要備份的資料 segment 檔案是否有變化,如果沒有變化則不處理,否則只會把發生變化的 segment file 備份下來。這其實就實現了增量備份。

4、資料恢復 通過呼叫如下 api 即可快速實現恢復功能: bash POST /_snapshot/my_backup/snapshot_1/_restore?wait_for_completion=true { "indices": "index_1", "rename_replacement": "restored_index_1" }

2)elasticdump備份遷移es資料

索引資料匯出為檔案(備份)

```bash

匯出索引Mapping資料

elasticdump \ --input=http://es例項IP:9200/index_name/index_type \ --output=/data/my_index_mapping.json \ # 存放目錄 --type=mapping

匯出索引資料

elasticdump \ --input=http://es例項IP:9200/index_name/index_type \ --output=/data/my_index.json \ --type=data ``` 索引資料檔案匯入至索引(恢復)

```bash

Mapping 資料匯入至索引

elasticdump \ --output=http://es例項IP:9200/index_name \ --input=/home/indexdata/roll_vote_mapping.json \ # 匯入資料目錄 --type=mapping

ES文件資料匯入至索引

elasticdump \ --output=http:///es例項IP:9200/index_name \ --input=/home/indexdata/roll_vote.json \ --type=data ```

可直接將備份資料匯入另一個es叢集

bash elasticdump --input=http://127.0.0.1:9200/test_event --output=http://127.0.0.2:9200/test_event --type=data

type型別

type是ES資料匯出匯入型別,Elasticdump工具支援以下資料型別: | type型別 | 說明 | |--|--| | mapping | ES的索引對映結構資料 | |data |ES的資料 | |settings |ES的索引庫預設配置 | |analyzer |ES的分詞器 | |template |ES的模板結構資料 | |alias |ES的索引別名 |

3)esm備份遷移es資料

備份es資料

bash esm -s http://10.33.8.103:9201 -x "petition_data" -b 5 --count=5000 --sliced_scroll_size=10 --refresh -o=./es_backup.bin

-w 表示執行緒數 -b 表示一次bulk請求資料大小,單位MB預設 5M -c 一次scroll請求數量 匯入恢復es資料

bash esm -d http://172.16.20.20:9201 -y "petition_data6" -c 5000 -b 5 --refresh -i=./dump.bin


四、彩蛋

還有個日誌系統架構跟ELK架構很相似(Elasticsearch、Flume、Kafka、Flink、Kibana),只是把Filebeat換成了Flume,Logstash換成了Flink。後面也會寫篇文章分享出來,請耐心等待……