​Elastic Stack最佳实践系列:Beats->ES,一个更轻型的架构选择

语言: CN / TW / HK

作者:李捷,Elastic首席云解决方案架构师

ELK生态下,构建日志分析系统的选择

说起开源的日志分析系统,ELK几乎无人不晓,这个生态并非是Elastic特意而为,毕竟Elasticsearch的初心是分布式的搜索引擎,被广泛用作日志系统纯粹一个“美丽的意外”,这是社区使用者推动而成。而现在各大云厂商推广自己的日志服务时,也往往将各种指标对标于ELK,可见其影响之广。

但其实,流行的架构中并非只有ELKB,当我们使用ELKB搭建一套日志系统时,除了Elasticsearch, Logstash, Kibana, beats之外,其实被广泛使用的还有另一个工具 —— Kafka。在这当中,Kafka的作用是明显的,作为一个中间件,一个缓冲,它起到了提高吞吐,隔离峰值影响,缓存日志数据,快速落盘,同时通过producer/consumer模式,让Logstash能够横向拓展的作用,还能够用作数据的多路分发。因此,大多数时候,我们看到的实际架构,按数据流转顺序排列,应该是 BKLEK架构

但我们在使用Kafka的时候,也并非是没有成本的,额外的一套分布式系统,更长的数据链路,都会是我们在做最后架构选型时的一些痒点,特别是随着我们产生的数据越来越多,BKLEK架构会变得越来越大,越来越重,成本、性能、运维的简易性都会成为我们评估日志系统的重要指标。因此,我们的问题是:在Elastic Stack都已经进化到了8.1的当下,我们是否还需要延续一直以来的惯性思维,认为在我们仍然在任何情况下都是需要 BKLEK 的架构呢?

在我们开始正式探讨之前,我们可以从现在普遍看到的新的架构图可以一猜端倪:

在这个架构中,所有的Integration的输出都是Elasticsearch,所有的数据处理都由ingest pipeline完成,数据的完整性和可靠性,由端点和elasticsearch之间的应答确认来保证。因此,我们本文探讨的是:

  1. 我们数据采集端直接到ES的架构,是否可取

  2. 我们什么时候可以使用这种架构

数据采集端直接到ES的架构分析

虽然Elastic原厂的Fleet与Elastic Agent已经处于GA(普遍可用)的阶段,但因为其本身的一些限制,比如:

  • Integration Repository需要连接外网

  • 需要单独的Fleet Server (在最新版本已经与APM server合并为Integration Server)

  • Elastic Agent还没有能完全覆盖Beats所支持的数据源

因此,现阶段,我们讨论数据采集端直接到ES的架构时,会主要集中在Beats->Elasticsearch这一架构。这一架构,相对于BKLEK架构来说,少了中间的Kafka,甚至我们可以忽略Logstash,因此,架构会相对精简,带来的好处包括:

  • 相对更低的成本

  • 更高的传输和数据处理效率

  • 更一致的安全特性

  • 更容易进行监控

当然,在带来以上好处的同时,我们也会失去Kafka所带来的各种好处。不过不用担心,Kafka的特性只是构建一个稳健的日志分析系统的充分条件,而非必要条件,在不少场景下,我们不一定是非Kafka不可。接下来,我们将讨论几个我们相对会比较关心的问题,以让大家了解,我们是否可以选择这种架构,什么时候选择这种架构,以及相应的最佳实践。

Beats -> Elasticsearch链路的健康保障

对于不使用Kafka的场景,我们可能始终会有点觉得不踏实。因为对Beats -> Elasticsearch这个简单架构不够了解,以至于我们信心不足。接下来,我们讨论一下,在这种简单架构下,我们是如何面对各种可能出现的问题的。

大多数架构师会担心的问题是流量波动的问题,如果突然出现日志流量的洪峰,是不是会影响到后端的日志系统。这个问题的答案是, 有影响,但影响有限

我们先明确一下日志系统的主要作用:即日志的集中管理,在统一的日志平台上提供所有日志的准实时的关联查询与分析能力。这里的核心是准实时和查询能力。

在流量洪峰的情况下,受影响的是“准实时”的能力,因为受限于日志系统的处理能力,如果日志产生的速度,大于日志系统处理的速度,则我们无法读取到最新的数据。这个问题,即便是我们有了Kafka也是无法解决的。

而查询的能力,几乎不受影响。原因如下:

  • ES的读写是由不同的线程处理的,有各自独立的线程池。过载的写流量,会导致后到的写请求,因为写线程池已满,而被拒绝。但不会导致查询情况的拒接服务

  • ES从7.0开始,使用真实内存断路器,能够避免由于过多的请求,导致节点内存OOM。但在日志场景,特定时间内日志请求的数量是有明确上限的,该上限为Beats的数量,相比于高并发的读场景,Beats几乎不可能造成请求数量的过载

  • Beats与Elasticsearch之间有背压检测机制,当Beats检测到ES有拒绝写服务的情况出现时,会主动限流降低,避免对ES产生持续的压力与影响。具体参见,附录1

或许这种“轻描淡写”的描述,让大家觉得有点难以相信,甚至与平时的经验有点不符。但实际上真正的问题来自于不合理的架构:

  • 大多数企业会选择维护一个“怪物”级别的ES集群,即,将业务搜索和日志查询两个完全不同的场景混用与同一个ES集群。因此,当某些日志流量洪峰来临,打满了“write”线程池之后,会发现业务数据也无法写入了。但仍然的,kafka也解决不了这个问题,因为,即便有kafka在前面挡住真正的日志数据洪流,你也很难判断kafka后面的logtash集群,会不会导致ES集群拒绝写服务, 因为消费日志永远比索引日志要快 ,logstash会持续消费kafka中的数据,直到感受到背压,才会减缓输出的频率, 这点和Filebeat是一样的

  • 大多数在早期就已经在使用ES的企业,由于缺乏合理的架构设计,或者缺乏足够的升级集群的信心,集群一直停留在5,6等老版本的,因此,整个架构缺乏足够的健壮性和可恢复性,一些关键能力,比如背压检测,内存断路器,Heap memory offload等功能是没有的。导致集群较为脆弱,在流量洪峰时,出现OOM等问题,也是有的。但仍然,kafka也解决不了这个问题。

因此,我们在做架构选型之前,确定要不要使用Beats -> Elasticsearch这一架构之前, 不妨先审视一下,我们当前是不是在业务和日志混用ES集群,我们的ES集群版本是不是过低,运维缺乏升级的能力 ?这两个反而是更致命的问题

回到流量洪峰问题的解决。即便加上kafka,也只是治标不治本,看起来架构更加健壮了一点,但实际上也并不能帮助我们提高后端的消费能力,我们整体面临的情况和使用Beats -> Elasticsearch这一架构是没有多大的区别的(Kafka最大的好处,就是把这些数据落盘到自己的磁盘上了,但这真的是我们在这个场景下需要解决的问题吗?)。

流量洪峰的治本方案,应该是着眼于快速的故障排查,快速的找到出现故障的机器并解决故障。(出现业务暴增也不是不可能的情况, 但正常的业务日志撑爆日志系统的可能性非常小 ,因为首先会撑爆的是产生日志的业务系统,因此,一旦业务暴增,我们需要考虑业务系统和日志系统的同步扩容)

我们可以从Elastic Stack上快速的找到这样问题的答案,而且方法很多,这里举例一二。

  • 监控日志集群的bulk拒绝率

  • 监控日志错误率

  • 使用机器学习判断日志异常

在发现异常之后,我们可以进一步定位,通过仪表板,查看具体是哪个服务出现了异常:

另一个方面,在使用Kafka与Logstah的情况下,其实也是会有数据丢失的风险的。数据处理的链路越长,架构越复杂,带来的脆弱性就越多。在这个架构中,Kafka通过冗余提供了高可用。但Logstash确成为了风险的一环,虽然其和Kafka、ES之间都有应答确认的机制。但一旦Logstash在消费Kafka之后,正确投递ES之前发生了崩溃,则数据将会丢失。并且我们很可能并不知道数据丢失了。

Beats->Elasticsearch的链路效率

在使用Kafka与Logstah的情况下,数据需要由Beats首先落盘到Kafka的分布式日志文件中,再由Logstash从Kafka中消费,之后,数据又要根据Logstash与ES之间的背压,将数据落盘到Logstash的Disk Queue上。相比于直接由Beats到ES的架构,这里有了两次数据落盘,读盘的操作;

另一方方面,在基线测试中,我们可以看到Elasticsearch的Ingest node相对Logstash的ETL处理的能力会来得更加高效。其原因在于,Logstash是一个Java-Ruby解析器上的应用,在JVM上运行Ruby解析器的运行效率,不如纯java应用的ingest node。并且处理完的数据,相对于已经包含了集群内部路由信息的ingest node,在Logstsh作为ETL的架构下,还需要中间一个额外的ES节点负责bulk request的路由分发,需要更多的网络跳转才可以最终写到对应的data node。

更一致的安全特性

而在链路安全方面,ingest node处于ES集群内,天然的使用统一的安全策略。而Beats->ES本身就是通过HTTPS,再配合证书或者密码验证的方式,可以保证链路的安全。

而如果在链路中再增加Kafka与Logstash,则整条链路的安全配置则会更加复杂,稍有配置不慎,还会存在更多的数据泄露的风险。

更容易进行监控

在监控方面,简单的架构会让我们更容易发现系统的异常。并且,我们有现成的手段和现成的告警规则

什么是最佳的架构?

“最佳架构”这个词一定是一个伪命题,因为不存在能解决所有问题的银弹。但指导准则是有的, 即按需选择合适的架构

企业通用架构

我们的指导原则是:

  • 大多数的日志数据链路,尽量选择简单且健壮的架构。Beats->Elasticsearch的架构足够简单和健壮,这个应该是大多数情况下,以Elastic Stack为基础的日志系统应该选择的数据摄入方式

  • 对于缺乏可靠性的数据链路,以及需要高吞吐的能力、快速将数据从边缘节点移出的,需选择Kafka的帮助

  • 整个架构中的数据链路可以是异构的,根据数据的属性,选择合适的链路

大家可以看到,我们在这里建议的是大多数情况下,我们应该选择Beats->Elasticsearch这一简单的架构。其原因就像我们之前提到的:

  • Beats->Elasticsearch的链路足够健壮

  • Beats->Elasticsearch的链路效率更高

  • Beats->Elasticsearch有更好的安全一致性

而当我们确实需要Kafka和Logstash的数据链路,可以参考我们的一些配置建议[1][2]。

接下来,我们再总结一下我们该如何判断是否需要Kafka

我们在什么情况下,需要Kafka

作为一个Queue,Kafka在日志场景的作用,并不像在其他的业务应用场景中那么重要。它最主要的作用就是能够将数据快速落盘,并且以冗余的方式存储在分布式的日志文件中。同时,通过producer/consumer机制,让后端的ETL工具可以并发的消费,并且提供再消费的容错能力。

这里的重点是数据“快速”“落盘”。那我们在什么场景下非常需要这两个能力呢?我们先说“落盘”

数据需要落盘

有一些数据,是转瞬即逝型的数据,比如,我们通过各种探针,exporter采集数据后,在采集端没有落盘,如果不及时存储起来,就会丢失这部分数据。类似于物联网数据,指标数据,apm数据都属于这种类型的数据。

数据产生于那一个瞬间,并且在产生数据的地方,我们并没有什么机制去存储这些数据,因此,在数据真正进入后端的存储分析之前,我们往往需要一个Queue能够帮我们把这些数据落盘。这时,Kafka几乎就是我们在技术选型时的不二选择。

数据需要高速转移

再说“快速”,Kafka的高吞吐也是非常重要的一个能力,也是其得以让人追捧的主要原因之一。可以设想这样的一种场景,在云原生环境下,我们非常动态的去创建各种计算资源,以应对业务流量的变化,虽然每个计算资源产生的数据落盘了,但由于它可能会被销毁,因此,我们需要在它被销毁之前,把其产生的日志数据搬移到Pods之外,对于这种转瞬即逝的资源所产生的数据,也需要Kafka的能力。

应对数据高峰

日志数据或基于事件的数据很少具有一致的、可预测的大小或流速。考虑一个在周五晚上升级应用程序的场景,您部署的应用程序有一个严重的错误,即信息被过度记录,淹没了您的日志基础设施。在其他多租户用例(例如游戏和电子商务行业)中,这种峰值或数据爆发也相当普遍。在这种情况下使用像 Kafka 这样的消息队列也能来缓冲数据,减缓影响。

数据的多路分发

还有一种情况,是我们可能需要Kafka将数据分发到别的地方,我们可以定义多个消费端,分别去消费Kafka里的数据,将数据分发到不同的数据处理系统。注意,不要图省事,只建立Logstash作为消费端,然后尝试用Logstash的多个Output去分发。因为Logstash并不能保证过个Output之间的数据同步与一致,我相信不仅是Logstash,其他的消费端也无法做此保证,因此需要多个消费端分别消费Kafka里的数据。当然,这里不是只有kafka一个选择,也可以数据入湖,再从湖中消费。

因此,总结一下,如果你的数据因为没有快速落盘,而存在丢失的风险;如果你的数据吞吐很大,需要及时转移,以及需要应对可能出现的数据洪峰,则你需要Kafka。

我们在什么情况下,可以不需要Kafka

先总结一下我们在什么时候必须要用Kafka:

我们稍微回想一下,在通常的日志场景下我们是如何采集日志的:

filebeat.autodiscover:
      providers:
        - type: kubernetes          node: ${NODE_NAME}
          hints.enabled: true
          templates:
            - condition:
                contains:
                  kubernetes.container.name: "opbeans-"
              config:
                - type: container                  paths:
                    - "/var/log/containers/*-${data.kubernetes.container.id}.log"
                  include_lines: ['^{']
                  tail_files: true

复制

我们需要指定特定的日志目录,也就意味着,我们是从磁盘上采集日志数据的。而在云环境上,如果我们采用了云盘作为数据盘,这些日志数据在产生端,即已经实现了冗余的落盘存储。因此,相对于物联网数据,指标数据,apm数据这种ephemeral数据,已经落盘的日志数据大多数时候,并不需要额外用一个Queue再去做一次数据的落盘与冗余存储。

但在之前的分析中,我们也说过,在某些ephemeral环境下,如果存储数据的本地文件缓存,在我们将数据搬移出去之前就被销毁的话,就会有数据丢失的风险。但幸运的是,大多数情况下,我们是能够轻松判断出这种风险的,大多数非动态扩缩容的应用,比如那些部署在虚拟机上的应用,这种风险的可能性极小,并且可以控制。相对于快速将数据搬出到kafka, 增加本地文件存储的大小,反而是一个更简单,更低成本的选择

并且,当我们发现数据没有正确的写入日志分析系统时,重新让filebeat采集一次日志,会比让Logstash去调整offset,在Kafka上重新consume来得简单得多,直观得多,你不需要了解被采集的文件和kafka topic、partition之间的映射关系,你不需要知道文件与offset的映射关系,以及如何操作re-consume。而且通过Beats与Elasticsearch之间的端到端的应答确认机制和背压探测机制,我们更容易保证数据的正确写入。

因此,总结一下,如果你的数据是日志,这些日志被写入到磁盘,并且不会在短期内被删除。我们可以选择不用Kafka

总结

为了防止大家漏掉关键的点,在最后再强调三遍:

  • 日志系统切勿与业务搜索系统混用一个ES集群

  • 日志系统切勿与业务搜索系统混用一个ES集群

  • 日志系统切勿与业务搜索系统混用一个ES集群

这是我看过的 最悲剧的 日志分析系统的架构,没有之一。

回到我们本文内容,一个常见的问题是,我们常常忽略架构设计的重要性,懒于去思考其中变量的影响,希望通过一刀切的简单方案去做事情( 一上来就是BKLEK架构,不去思考这样做是否值得,我们是否还有更多的选择 )。但即便是我们全部使用了Beats, Kafka, Logstash, Elasticsearch, Kibana五个组件,也不是所有的数据链路都走一样的路径,我们可以针对接入系统的特性,优先级,重要程度,要求的指标,有所取舍。

因此,虽然选择Elastic Stack是一个一定不会错的选择,但如果你觉得这套系统复杂了,成本高了,那希望本文可能给你帮助,或许在只有日志数据的这个阶段,你的系统还是可以简化的,有更多选择的。

附录

1, filebeat与Elasticsearch之间的背压检测协议

Filebeat 在将数据发送到 Logstash 或 Elasticsearch 时使用背压敏感协议来处理大量数据。如果 Logstash 忙于处理数据,它会让 Filebeat 知道减慢其读取速度。一旦拥塞得到解决,Filebeat 将恢复到原来的速度并继续发送数据。

关于细节,可以查看以下配置:

# The number of seconds to wait before trying to reconnect to Elasticsearch
  # after a network error. After waiting backoff.init seconds, the Beat
  # tries to reconnect. If the attempt fails, the backoff timer is increased
  # exponentially up to backoff.max. After a successful connection, the backoff
  # timer is reset. The default is 1s.
  backoff.init: 1s  # The maximum number of seconds to wait before attempting to connect to
  # Elasticsearch after a network error. The default is 60s.
  backoff.max: 60s

复制

与Libbeat的实现(https://github.com/elastic/beats/下):

/libbeat/outputs/elasticsearch/elasticsearch.go (elasticsearch output会使用backoff初始化client)

/libbeat/outputs/elasticsearch/client.go#L204 (client会判断接口调用是否成功,如果失败,会返回err)

/libbeat/outputs/backoff.go#L38(判断接口的返回,如果有err,等待)

func (b *backoffClient) Publish(ctx context.Context, batch publisher.Batch) error {
	err := b.client.Publish(ctx, batch)
	if err != nil {
		b.client.Close()
	}
	backoff.WaitOnError(b.backoff, err)
	return err}

点击原文,获取更多内容

免费体验活动专区

Elasticsearch 新用户可享  2核4G,0元  体验  30  天! 顺畅体验云上集群

扫码关注 「腾讯云大数据」 ,了解腾讯云 Elasticsearch 更多信息 ~

腾讯云大数据

长按二维码
关注我们