RocketMq-批量消息和消息过滤

语言: CN / TW / HK

持续创作,加速成长!这是我参与「掘金日新计划 · 6 月更文挑战」的第27天,点击查看活动详情

批量消息

在高并发场景中,批量发送消息能显著提高传递消息发送时的性能(减少网络连接及IO的开销)。使用批量消息时的限制是这些批量消息应该有相同的topic,相同的waitStoreMsgOK(集群时会细讲),且不能是延时消息。在发送批量消息时先构建一个消息对象集合,然后调用send(Collection msg)系列的方法即可。由于批量消息的4MB限制,所以一般情况下在集合中添加消息需要先计算当前集合中消息对象的大小是否超过限制,如果超过限制也可以使用分割消息的方式进行多次批量发送。

使用案例

一般批量发送(不考虑消息分割)

因为批量消息是一个Collection,所以送入消息可以是List,也可以使Set,这里为方便起见,使用List进行批量组装发送。

图片6.png

批量切分发送

如果消息的总长度可能大于4MB时,这时候最好把消息进行分割,案例中以1M大小进行消息分割。

我们需要发送10万元素的数组,这个量很大,怎么快速发送完。使用批量发送,同时每一批控制在1M左右确保不超过消息大小限制。

图片7.png

图片8.png

消息过滤

在实际的开发应用中,对于一类消息尽可能使用一个Topic进行存储,但在消费时需要选择您想要的消息,这时可以使用RocketMQ的消息过滤功能,具体实现是利用消息的Tag和Key。

Key一般用于消息在业务层面的唯一标识。对发送的消息设置好 Key,以后可以根据这个 Key 来查找消息。比如消息异常,消息丢失,进行查找会很方便。RocketMQ 会创建专门的索引文件,用来存储 Key与消息的映射,由于底层实现是 Hash 索引,应尽量使 Key唯一,避免潜在的哈希冲突。

Tag可以理解为是二级分类。以淘宝交易平台为例,订单消息和支付消息属于不同业务类型的消息,分别创建OrderTopic 和PayTopic,其中订单消息根据不同的商品品类以不同的 Tag 再进行细分,如手机类、家电类、男装类、女装类、化妆品类,最后它们都被各个不同的系统所接收。通过合理的使用 Topic 和 Tag,可以让业务结构清晰,更可以提高效率。

Key和Tag的主要差别是使用场景不同,Key主要用于通过命令行命令查询消息,而Tag用于在消息端的代码中,用来进行服务端消息过滤。

使用Key一般使用mqadmin管理工具,具体位置在RocketMQ/bin目录下。具体文档

Tag过滤

使用Tag过滤的方式是在消息生产时传入感兴趣的Tag标签,然后在消费端就可以根据Tag来选择您想要的消息。具体的操作是在创建Message的时候添加,一个Message只能有一个Tag。

使用案例

生产者发送60条消息,分别打上三种tag标签。

图片9.png

消费者消费时只选择TagA和TagB的消息。

图片10.png

注意事项

Tag过滤的形式非常简单,||代表或、*代表所有,所以使用Tag过滤这对于复杂的场景可能不起作用。在这种情况下,可以使用SQL表达式筛选消息。

Sql过滤

SQL特性可以通过发送消息时的属性来进行消息的过滤计算。具体的操作是使用SQL92标准的sql语句,前提是只有使用push模式的消费者才能用(消费的模式就是push)

SQL基本语法

数值比较: 比如:>,>=,<,<=,BETWEEN,=;

字符比较: 比如:=,<>,IN;IS NULL 或者 IS NOT NULL;

逻辑符号: AND,OR,NOT;

常量支持类型为: 数值,比如:123,3.1415;字符,比如:'abc',必须用单引号包裹起来;NULL,特殊的常量,布尔值,TRUE 或 FALSE

注意事项

Sql过滤需要Broker开启这项功能(如果消费时使用SQL过滤抛出异常错误,说明Sql92功能没有开启),需要修改Broker.conf配置文件。加入enablePropertyFilter=true 然后重启Broker服务。

使用案例

消息生产者,发送消息时加入消息属性,你能通过putUserProperty来设置消息的属性,以下案例中生产者发送10条消息,除了设置Tag之外,另外设置属性a的值。

图片11.png

用MessageSelector.bySql来使用sql筛选消息

图片12.png

消费结果:按照Tag和SQL过滤消费3条消息。

图片13.png