ELK+kafka docker快速搭建+.NetCore中使用

語言: CN / TW / HK

目錄

  • 一、搭建ES

  • 二、搭建Kibana

  • 三、搭建kafka

  • 四、搭建Logstash

  • 五、.Net Core寫日誌到kafka

  • 六、ELK日誌檢視

  • 七、其他方式寫日誌

ELK開源實時日誌分析平臺。ELK是Elasticsearch,Logstash,Kibana 的縮寫。

Elasticsearch:是個開源分散式搜尋引擎,簡稱ES

Logstash:是一個完全開源的工具,可以對日誌進行收集,過濾,儲存到ES

Kibana: 也是一個開源和免費的工具,這裡主要用作ES的視覺化介面工具,用於檢視日誌。

環境:centos7.9

回到頂部

一、搭建ES

先要調高jvm執行緒數限制,修改sysctl.conf

vim /etc/sysctl.conf

修改max_map_count調大,如果沒有這個設定,則新增一行

vm.max_map_count=262144

改完儲存後, 執行下面命令讓sysctl.conf檔案生效

sysctl –p

拉取es映象

#拉取映象,指定版本號
docker pull elasticsearch:7.13.2

新建elasticsearch.yml配置檔案並上傳到主機目錄用於配置檔案掛載,方便後面修改,這裡上傳到/home/es目錄。

http.host: 0.0.0.0
#跨域
http.cors.enabled: true
http.cors.allow
-origin: "*"

啟動es

docker run -d -p 9200:9200 -p 9300:9300 --name es -e ES_JAVA_OPTS="-Xms128m -Xmx256m" -v /home/es/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml elasticsearch:7.13.2
#9200是對外埠,9300是es內部通訊埠
#
-e ES_JAVA_OPTS="-Xms128m -Xmx256m" 限制記憶體,最小128,最大256
#
-v 把配置檔案掛載到es的docker裡的配置檔案

在瀏覽器開啟,ip:9200看到es資訊,就成功了,啟動可能需要一點時間,要等一會。

二、搭建Kibana

啟動kibana,這裡沒有先拉取映象,docker沒有映象會到公共庫嘗試拉取,下面的一樣。

#如果沒有映象會自動到公共庫拉取再啟動,-e 環境配置指向es的地址
docker run -p 5601:5601 -d --name kibana -e ELASTICSEARCH_URL=http://172.16.2.84:9200 -e ELASTICSEARCH_HOSTS=http://172.16.2.84:9200 kibana:7.13.2

在瀏覽器輸入ip:5601,能看到kibana的資訊,說明成功了

三、搭建kafka

1)kafka前置需要先安裝zookeeper

#-v /etc/localtime:/etc/localtime把本機的時間掛載進docker,讓docker同步主機的時間
docker run -d --name zookeeper -p 2181:2181 -v /etc/localtime:/etc/localtime zookeeper

2)安裝kafka

docker run  -d --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=172.16.2.84:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.16.2.84:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -t wurstmeister/kafka
#KAFKA_BROKER_ID:kafka節點Id,叢集時要指定
#
KAFKA_ZOOKEEPER_CONNECT:配置zookeeper管理kafka的路徑,內網ip
#
KAFKA_ADVERTISED_LISTENERS:如果是外網的,則外網ip,把kafka的地址埠註冊給zookeeper,將告訴 zookeeper 自己的地址為 XXXX,當消費者向 zookeeper 詢問 kafka 的地址時,將會返回該地址
#
KAFKA_LISTENERS:配置kafka的監聽埠

kafka介面檢視可以用 kafka tool 地址:https://www.kafkatool.com/download.html

四、搭建Logstash

新建檔案logstash.yml,logstash配置資訊

http.host: "0.0.0.0"
xpack.monitoring.elasticsearch.hosts: [
"http://172.16.2.84:9200" ]
#host多節點用逗號隔開

新建檔案logstashkafka.conf,讀取kafka資訊和寫到es的資訊

input {
kafka {
topics
=> "logkafka" #訂閱kafka的topics
bootstrap_servers => "172.16.2.84:9092" # 從kafka的leader主機上提取快取
codec => "json" # 在提取kafka主機的日誌時,需要寫成json格式
}
}
output {
elasticsearch {
hosts
=> ["172.16.2.84:9200"]
index
=> "logkafka%{+yyyy.MM.dd}" #採集到es的索引名稱
#user => "elastic"
#password => "changeme"
}
}

把這兩個檔案放到主機目錄上,這裡放到/home/logstash

啟動logstash容器

docker run --rm -it --privileged=true -p 9600:9600 -d --name logstash -v /home/logstash/logstashkafka.conf:/usr/share/logstash/pipeline/logstash.conf -v /home/logstash/logstash.yml:/usr/share/logstash/config/logstash.yml logstash:7.13.2
#-v 把logstashkafka.conf和logstash.yml掛載到logstash容器

好了,到這裡所有需要的容器都建好了執行docker ps -a看下所有容器

五、.Net Core寫日誌到kafka

.Net Core只要把日誌寫到kafka,然後logstash訂閱kafka把日誌寫到es即可,這裡是.Net Core5.0演示。

NuGet包安裝Confluent.Kafka

class Program
{
static async Task Main(string[] args)
{
Console.WriteLine(
"Hello World!");
//kafka節點
string brokerList = "172.16.2.84:9092";
string topicName = "logkafka";
while (true)
{
Console.WriteLine($
"{Environment.NewLine}請輸入傳送的內容,傳送topics名:{topicName}");
string content = Console.ReadLine();
await ConfluentKafka.PublishAsync(brokerList, topicName, content);
}
}
}
public class ConfluentKafka
{
public static async Task PublishAsync(string brokerList, string topicName, string content)
{
//生產者配置
var config = new ProducerConfig
{
BootstrapServers
= brokerList, //kafka節點
Acks = Acks.None //ack機制,0不等伺服器確認,1主節點確認返回ack,-1全部節點同步完返回ack
};
using (var producer = new ProducerBuilder<string, string>(config)
.Build())
{
try
{
//key要給值,根據key做負載均衡,不然如果多節點,key不給值會全部寫有一個分割槽
var deliveryReport = await producer.
ProduceAsync(
topicName,
new Message<string, string> { Key = (new Random().Next(1, 10)).ToString(), Value = content });
Console.WriteLine($
"向kafka傳送了資料: {deliveryReport.TopicPartitionOffset}");
}
catch (ProduceException<string, string> e)
{
Console.WriteLine($
"向kafka傳送資料失敗:{e.Message}");
}
}
}
}

執行程式,向kafka寫資料

檢視kafka佇列,已經有資料

六、ELK日誌檢視

開啟Kibana介面,ip:5601,開啟左邊的面板

點選索引匹配

輸入匹配符,匹配到es日誌索引,下一步

索引匹配建立完畢,返回到面板

七、其他方式寫日誌

上面介紹的是kafka方式寫日誌,也是ELK最佳方式,能承載大量的日誌傳輸,這裡列一下其他常用的方式,主要是修改logstash採集配置就可以了。

1.讀目錄檔案方式,根據給的目錄,檔案型別去讀取寫到es,logstash配置

# Sample Logstash configuration for creating a simple
#
Beats -> Logstash -> Elasticsearch pipeline.

input {
file {
path
=> "D:/Log/Application/*log.log" #讀取目錄下log.log結尾的檔案
start_position => beginning
}
file {
path
=> "D:/Log/Application2/*log.log" #讀取目錄下log.log結尾的檔案
start_position => beginning
}
}
output {
elasticsearch {
hosts
=> ["172.16.2.84:9200"]
index
=> "filelog"
#user => "elastic"
#password => "changeme"
}
}

2.Tcp方式,通過tcp方式寫日誌,在log4net和nlog中直接可配tcp請求logstash,配置

input {
tcp{
port
=> 8001
type
=> "TcpLog"
}
}
output {
elasticsearch {
hosts
=> ["172.16.2.84:9200"]
index
=> "tcplog"
#user => "elastic"
#password => "changeme"
}
}

3.reids,把日誌推送到一個list型別的Key,logstash會訂閱這個key讀取訊息寫到es

input {
redis {
codec
=> plain
host
=> "127.0.0.1"
port
=> 6379
data_type
=> list
key
=> "listlog"
db
=> 0
}
}
output {
elasticsearch {
hosts
=> ["172.16.2.84:9200"]
index
=> "redislog"
#user => "elastic"
#password => "changeme"
}
}

4.RabbitMQ,把日誌寫到一個佇列,讓logstash訂閱佇列獲取日誌寫到es

input {
rabbitmq {
host
=> "127.0.0.1" #RabbitMQ-IP地址
vhost => "test" #虛擬主機
port => 5672 #埠號
user => "admin" #使用者名稱
password => "123456" #密碼
queue => "LogQueue" #佇列
durable => false #持久化跟佇列配置一致
codec => "plain" #格式
}
}

output {
elasticsearch {
hosts
=> ["172.16.2.84:9200"]
index
=> "rabbitmqlog"
}
}

出處:

https://www.cnblogs.com/wei325/archive/2021/08/16/15145842.html

版權申明:本文來源於網友收集或網友提供,僅供學習交流之用,如果有侵權,請轉告版主或者留言,本公眾號立即刪除。