RabbitMQ 的五种消息模型

语言: CN / TW / HK

RabbitMQ 提供了 6 种消息模型,但常用的是前面 5 种,第 6 种实际上为 RPC,所以一般来说了解前面 5 种即可,而对于后面三种,是根据 Exchange 类型划分的。

注:对下面模式的讲解主要基于 Java 原生 API 操作 ,因此在项目中需要添加如下依赖。

<code data-type="codeline"><dependency></code><code data-type="codeline">    <groupId>com.rabbitmq</groupId></code><code data-type="codeline">    <artifactId>amqp-client</artifactId></code><code data-type="codeline">    <version>5.9.0</version></code><code data-type="codeline"></dependency></code>

复制代码

为了后续的操作先定义一个连接 rabbitmq 的连接工具类

import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;
public class RabbitMQUtils {
private static ConnectionFactory connectionFactory;
static { connectionFactory = new ConnectionFactory(); //我们把重量级资源通过单例模式加载 connectionFactory.setHost("192.168.153.128"); connectionFactory.setPort(5672); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); //上面创建的VHost connectionFactory.setVirtualHost("/order"); }
//定义提供连接对象的方法 public static Connection getConnection() { try { return connectionFactory.newConnection(); } catch (Exception e) { e.printStackTrace(); } return null; }
//定义关闭通道和关闭连接工具方法 public static void closeConnectionAndChanel(Channel channel, Connection conn) { try { if (channel != null) { channel.close(); } if (conn != null) { conn.close(); } } catch (Exception e) { e.printStackTrace(); } }}

复制代码

基本消息模型

RabbitMQ 是一个消息代理:它接受和转发消息。可以将其视为邮局:当你将要投递的邮件放入邮箱时,你可以确定信件承运人最终会将邮件递送给你的收件人。在这个比喻中,RabbitMQ 是一个邮箱、一个邮局和一个信件载体。

  • P:生产者,发送消息到消息队列

  • C:消费者:消息的接受者,会一直等待消息到来。

  • queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。

1、发送消息

在原生 JavaAPI 中,通过 queueDeclare 方法去申明队列:

Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments) throws IOException;

复制代码

参数说明:

  • queue,队列名称。

  • durable,是否持久化,如果持久化,mq 重启后队列还在。

  • exclusive,是否独占连接,队列只允许在该连接中访问,如果 connection 连接关闭队列则自动删除,如果将此参数设置 true 可用于临时队列的创建。

  • autoDelete,自动删除,队列不再使用时是否自动删除此队列,如果将此参数和 exclusive 参数设置为 true 就可以实现临时队列(队列不用了就自动删除)。

  • arguments参数,可以设置一个队列的扩展参数,比如:可设置存活时间等。

主要通过 basicPublish 方法

void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;

复制代码

参数说明:

  • exchange,交换机,如果不指定将使用 mq 的默认交换机(设置为"")。

  • routingKey,路由 key,交换机根据路由 key 来将消息转发到指定的队列,如果使用默认交换机,routingKey 设置为队列的名称。

  • props,消息的属性。

  • body,消息内容。

代码实现

Producer

import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;
public class Producer {
//定义队列名称 private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception { // 1、获取到连接 Connection connection = RabbitMQUtils.getConnection(); // 2、从连接中创建通道,使用通道才能完成消息相关的操作 Channel channel = connection.createChannel(); // 3、声明(创建)队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 4、消息内容 String message = "Hello World!"; // 向指定的队列中发送消息 channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); //关闭通道和连接 channel.close(); connection.close(); }}

复制代码

去控制台查看:

2、接收消息

接收消息 consumer#handleDelivery 方法:

void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException;

复制代码

Consumer

import com.rabbitmq.client.AMQP;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.Envelope;
public class Consumer { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws Exception { // 获取到连接 Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //实现消费方法 DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) { System.out.println(new String(body)); } }; //自动ack channel.basicConsume(QUEUE_NAME, true, consumer); }}

复制代码

work 消息模型

多个消费者监听同一队列。消费者接收到消息后, 通过线程池异步消费。但是一个消息只能被一个消费者获取。work queue 常用于避免消息堆积问题。

  • P:生产者,发布任务。

  • C1:消费者 1,领取任务并且完成任务,假设完成速度较慢(模拟耗时)

  • C2:消费者 2,领取任务并且完成任务,假设完成速度较快

Producer

import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;
public class RabbitMQUtils {
//定义提供连接对象的方法 public static Connection getConnection() { try { ConnectionFactory connectionFactory = new ConnectionFactory(); //我们把重量级资源通过单例模式加载 connectionFactory.setHost("192.168.153.128"); connectionFactory.setPort(5672); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); connectionFactory.setVirtualHost("order"); return connectionFactory.newConnection(); } catch (Exception e) { e.printStackTrace(); } return null; }
//定义关闭通道和关闭连接工具方法 public static void closeConnectionAndChanel(Channel channel, Connection conn) { try { if (channel != null) { channel.close(); } if (conn != null) { conn.close(); } } catch (Exception e) { e.printStackTrace(); } }}

复制代码

Consumer1

import cn.javatv.javaAPI.RabbitMQUtils;import com.rabbitmq.client.AMQP;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.Envelope;
import java.util.concurrent.TimeUnit;
public class Consumer1 { private final static String QUEUE_NAME = "work";
public static void main(String[] args) throws Exception { // 获取到连接 Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //实现消费方法 DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) { try { //模拟任务耗时 TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Consumer1_" +new String(body)); } }; //自动ack channel.basicConsume(QUEUE_NAME, true, consumer); }}

复制代码

Consumer2

import cn.javatv.javaAPI.RabbitMQUtils;import com.rabbitmq.client.AMQP;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.Envelope;
public class Consumer2 { private final static String QUEUE_NAME = "work";
public static void main(String[] args) throws Exception { // 获取到连接 Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //实现消费方法 DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) { System.out.println("Consumer2_" + new String(body)); } }; //自动ack channel.basicConsume(QUEUE_NAME, true, consumer); }}

复制代码

先启动消费者,在启动生成者,输出如下:

我们发现消费者是按照轮询消费的,但这种消费存在一个问题,假如 Consumer1 处理能力极快,Consumer2 (代码中休眠了 2s)处理能力极慢,这是 Consumer2 会严重拖累整体消费进度,而 Consuemr1 又早早的完成任务而无所事事。

能者多劳

从上面的结果可以看出,任务是平均分配的,也就是说,不管你上个任务是否完成,我继续把后面的任务分发给你,而实际上为了效率,谁消费得越快,谁就得到越多。因此可以通过 BasicQos 方法的参数设为 1,前提是在手动 ack 的情况下才生效,即 autoAck = false

import com.rabbitmq.client.AMQP;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.Envelope;
import java.io.IOException;import java.util.concurrent.TimeUnit;
public class Consumer2 { private final static String QUEUE_NAME = "work";
public static void main(String[] args) throws Exception { // 获取到连接 Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //设置消费者同时只能处理一条消息 channel.basicQos(1); //实现消费方法 DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { try { //模拟任务耗时 TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Consumer2_" + new String(body)); //确认消息 channel.basicAck(envelope.getDeliveryTag(),false); } }; //手动ack channel.basicConsume(QUEUE_NAME, false, consumer); }}

复制代码

输出结果:

可以看到 Consumer1 消费了 19 个,Consumer2 才消费 1 个。

Publish/Subscribe-Fanout

一次向多个消费者发送消息,该模式的交换机类型为 Fanout,也称为广播。

它具有以下性质:

  • 可以有多个消费者。

  • 每个消费者有自己的 queue。

  • 每个队列都要绑定到 Exchange。

  • 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。

  • 交换机把消息发送给绑定过的所有队列。

  • 队列的消费者都能拿到消息,实现一条消息被多个消费者消费。

Producer

import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;
import java.io.IOException;import java.util.concurrent.TimeoutException;
public class FanoutProducer {
public final static String EXCHANGE_NAME = "fanout";
public static void main(String[] args) throws IOException, TimeoutException { //建立连接 Connection connection = RabbitMQUtils.getConnection(); // 创建一个信道 Channel channel = connection.createChannel(); // 指定转发类型为FANOUT channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); //发送3条消息,且路由键不同 for (int i = 1; i <= 3; i++) { //路由键,循环3次,路由键为routekey1,routekey2,routekey3 String routekey = "routekey" + i; // 发送的消息 String message = "fanout_" + i; /* * 参数1:exchange name 交换机 * 参数2:routing key 路由键 */ channel.basicPublish(EXCHANGE_NAME, routekey, null, message.getBytes()); System.out.println(" [x] Sent '" + routekey + "':'" + message + "'"); } // 关闭 channel.close(); connection.close(); }}

复制代码

Consumer1

import com.rabbitmq.client.AMQP;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.Consumer;import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.Envelope;
import java.io.IOException;
public class Consumer1 {
public final static String EXCHANGE_NAME = "fanout";
public static void main(String[] argv) throws IOException { Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); // 声明一个随机队列 String queueName = channel.queueDeclare().getQueue(); /* * 队列绑定到交换器上时,是允许绑定多个路由键的,也就是多重绑定 */ String[] routekeys = {"routekey1", "routekey2", "routekey3"}; for (String routekey : routekeys) { //绑定 channel.queueBind(queueName, FanoutProducer.EXCHANGE_NAME, routekey); } System.out.println("[" + queueName + "]等待消息:"); // 创建队列消费者 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("接收" + envelope.getRoutingKey() + ":" + message); } }; channel.basicConsume(queueName, true, consumer); }}

复制代码

我们看看 fanout 的定义:

消息广播到绑定的队列,不管队列绑定了什么路由键,消息经过交换机,每个队列都有一份。

换句话说,只要队列和交换机绑定,不在乎路由键是什么都能接收消息。

如绑定一个不存在的路由键:

import com.rabbitmq.client.AMQP;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.Consumer;import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.Envelope;
import java.io.IOException;import java.util.concurrent.TimeoutException;
/** * 类说明:fanout消费者--绑定一个不存在的路由键 */public class Consumer2 {
public final static String EXCHANGE_NAME = "fanout";
public static void main(String[] argv) throws IOException, TimeoutException { Connection connection = RabbitMQUtils.getConnection(); final Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); // 声明一个随机队列 String queueName = channel.queueDeclare().getQueue(); //设置一个不存在的路由键 String routekey = "xxx"; channel.queueBind(queueName, FanoutProducer.EXCHANGE_NAME, routekey); System.out.println("队列[" + queueName + "]等待消息:");
// 创建队列消费者 final Consumer consumerB = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); //记录日志到文件: System.out.println("接收消息 [" + envelope.getRoutingKey() + "] " + message); } }; channel.basicConsume(queueName, true, consumerB); }}

复制代码

输出:

<code data-type="codeline">队列[amq.gen-G2LL566wrSH3mGBUF6XKCQ]等待消息:</code><code data-type="codeline">接收消息 [routekey1] fanout_1</code><code data-type="codeline">接收消息 [routekey2] fanout_2</code><code data-type="codeline">接收消息 [routekey3] fanout_3</code>

复制代码

不管我们如何调整生产者和消费者的路由键,都对消息的接收没有影响。

Routing-Direct

在 Direct 模型下,队列与交换机的绑定,不能是任意绑定了,而是要指定一个 RoutingKey(路由 key),消息的发送方在向 Exchange 发送消息时,也必须指定消息的 routing key。

  • P:生产者,向 Exchange 发送消息,发送消息时,会指定一个 routing key。

  • X:Exchange,接收生产者的消息,然后把消息递交给 与 routing key 完全匹配的队列。

  • C1:消费者,其所在队列指定了需要 routing key 为 error 的消息。

  • C2:消费者,其所在队列指定了需要 routing key 为 info、error、warning 的消息。

Producer

发送 3 种不同类型的日志。

import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;
import java.io.IOException;import java.util.concurrent.TimeoutException;
public class DirectProducer {
public final static String EXCHANGE_NAME = "direct";
public static void main(String[] args) throws IOException, TimeoutException { //创建连接、连接到RabbitMQ Connection connection = RabbitMQUtils.getConnection(); //创建信道 Channel channel = connection.createChannel(); //在信道中设置交换器 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); //申明队列(放在消费者中去做) String[] routeKeys = {"info", "warning", "error"}; for (int i = 1; i <= 6; i++) { String routeKey = routeKeys[i % 3]; String msg = routeKey + "日志"; //发布消息 channel.basicPublish(EXCHANGE_NAME, routeKey, null, msg.getBytes()); System.out.println("Sent:" + msg); } channel.close(); connection.close(); }}

复制代码

生产消息:

<code data-type="codeline">Sent:warning日志</code><code data-type="codeline">Sent:error日志</code><code data-type="codeline">Sent:info日志</code><code data-type="codeline">Sent:warning日志</code><code data-type="codeline">Sent:error日志</code><code data-type="codeline">Sent:info日志</code>

复制代码

Consumer1

指定需要 routing key 为 error 的消息。

import com.rabbitmq.client.AMQP;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.Consumer;import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.Envelope;
import java.io.IOException;import java.util.concurrent.TimeoutException;
public class Consumer1 {
public final static String EXCHANGE_NAME = "direct";
public static void main(String[] args) throws IOException, InterruptedException, TimeoutException { //创建连接、连接到RabbitMQ Connection connection = RabbitMQUtils.getConnection(); //创建一个信道 final Channel channel = connection.createChannel(); //信道设置交换器类型(direct) channel.exchangeDeclare(DirectProducer.EXCHANGE_NAME, BuiltinExchangeType.DIRECT); //声明一个随机队列 String queueName = channel.queueDeclare().getQueue(); //绑定 channel.queueBind(queueName, DirectProducer.EXCHANGE_NAME, "error"); System.out.println("队列[" + queueName + "]等待消息:"); // 创建队列消费者 final Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("接收消息 [" + envelope.getRoutingKey() + "] " + message); } }; channel.basicConsume(queueName, true, consumer); }}

复制代码

接收消息:

<code data-type="codeline">队列[amq.gen-NhIiesUDi547ZGr4JBEsnA]等待消息:</code><code data-type="codeline">接收消息 [error] error日志</code><code data-type="codeline">接收消息 [error] error日志</code>

复制代码

Consumer1

指定需要 routing key 为 info、error、warning 的消息。

import com.rabbitmq.client.AMQP;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.Consumer;import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.Envelope;
import java.io.IOException;import java.util.concurrent.TimeoutException;
public class Consumer2 {
public final static String EXCHANGE_NAME = "direct";
public static void main(String[] args) throws IOException { //创建连接、连接到RabbitMQ Connection connection = RabbitMQUtils.getConnection(); //创建一个信道 final Channel channel = connection.createChannel(); //信道设置交换器类型(direct) channel.exchangeDeclare(DirectProducer.EXCHANGE_NAME, BuiltinExchangeType.DIRECT); //声明一个随机队列 String queueName = channel.queueDeclare().getQueue(); //绑定 String[] routeKeys = {"info", "warning", "error"}; for (String routekey : routeKeys) { channel.queueBind(queueName, DirectProducer.EXCHANGE_NAME, routekey); } System.out.println("队列[" + queueName + "]等待消息:"); // 创建队列消费者 final Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("接收消息 [" + envelope.getRoutingKey() + "] " + message); } }; channel.basicConsume(queueName, true, consumer); }}

复制代码

接收消息:

<code data-type="codeline">队列[amq.gen-thfvXuQSfXHEVFRwHKZAFA]等待消息:</code><code data-type="codeline">接收消息 [warning] warning日志</code><code data-type="codeline">接收消息 [error] error日志</code><code data-type="codeline">接收消息 [info] info日志</code><code data-type="codeline">接收消息 [warning] warning日志</code><code data-type="codeline">接收消息 [error] error日志</code><code data-type="codeline">接收消息 [info] info日志</code>

复制代码

Topics-topic

Topic 类型的 Exchange 与 Direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。只不过 Topic 类型 Exchange 可以让队列在绑定 Routing key 的时候使用通配符!

  • #:匹配一个或多个词

  • *:匹配一个词

<code data-type="codeline">user.#  # 可以匹配到 user.add  user.add.batch</code><code data-type="codeline">user.*  # 只能匹配到 user.add ,不能匹配到 user.add.batch</code>

复制代码

假如你准备去买宠物,宠物的种类有 rabbit,cat,dog,宠物的颜色有 white,blue,grey,宠物的性格为 A,B,C。若按照路由键规则:种类 . 颜色 . 性格,则会产生 3*3*3=27 条消息,如 rabbit.white.A

Producer

import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;
import java.io.IOException;import java.util.concurrent.TimeoutException;
public class TopicProducer {
public final static String EXCHANGE_NAME = "topic";
public static void main(String[] args) throws IOException, TimeoutException { //创建连接、连接到RabbitMQ Connection connection = RabbitMQUtils.getConnection(); // 创建一个信道 Channel channel = connection.createChannel(); // 指定转发 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); //宠物种类 String[] pets = {"rabbit", "cat", "dog"}; for (int i = 0; i < 3; i++) { //宠物颜色 String[] colors = {"white", "blue", "grey"}; for (int j = 0; j < 3; j++) { //宠物性格 String[] character = {"A", "B", "C"}; for (int k = 0; k < 3; k++) { // 发送的消息 String routeKey = pets[i % 3] + "." + colors[j % 3] + "." + character[k % 3]; String message = "宠物信息:" + routeKey; channel.basicPublish(EXCHANGE_NAME, routeKey, null, message.getBytes()); System.out.println(" [x] Sent " + message); } } } // 关闭连接 channel.close(); connection.close(); }}

复制代码

Consumer

1、如果你是开宠物店,需要所有的宠物

routingKey = #
import com.rabbitmq.client.AMQP;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.Consumer;import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.Envelope;
import java.io.IOException;
public class Consumer {
public static void main(String[] argv) throws IOException { //创建连接、连接到RabbitMQ Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel();
channel.exchangeDeclare(TopicProducer.EXCHANGE_NAME, BuiltinExchangeType.TOPIC); //声明一个随机队列 String queueName = channel.queueDeclare().getQueue(); //routingKey设置为 # channel.queueBind(queueName, TopicProducer.EXCHANGE_NAME, "#"); System.out.println("队列[" + queueName + "]等待消息:"); // 创建队列消费者 final Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("接收消息 [" + envelope.getRoutingKey() + "] " + message); } }; channel.basicConsume(queueName, true, consumer); }}

复制代码

接收消息:

<code data-type="codeline">队列[amq.gen-eaK9M1vqEtY6WjivxrzqfA]等待消息:</code><code data-type="codeline">接收消息 [rabbit.white.A] 宠物信息:rabbit.white.A</code><code data-type="codeline">接收消息 [rabbit.white.B] 宠物信息:rabbit.white.B</code><code data-type="codeline">接收消息 [rabbit.white.C] 宠物信息:rabbit.white.C</code><code data-type="codeline">接收消息 [rabbit.blue.A] 宠物信息:rabbit.blue.A</code><code data-type="codeline">接收消息 [rabbit.blue.B] 宠物信息:rabbit.blue.B</code><code data-type="codeline">......</code><code data-type="codeline">//接收所有消息,省略</code>

复制代码

2、如果你仅仅是想买猫,但是想先了解猫的颜色和性格

消费者代码同上,修改 channel.queueBind(queueName,TopicProducer.EXCHANGE_NAME,"cat.#") 即可

routingKey = cat.#

接收消息

<code data-type="codeline">队列[amq.gen-Fy0aH4610sLNLrkoJKl_uA]等待消息:</code><code data-type="codeline">接收消息 [cat.white.A] 宠物信息:cat.white.A</code><code data-type="codeline">接收消息 [cat.white.B] 宠物信息:cat.white.B</code><code data-type="codeline">接收消息 [cat.white.C] 宠物信息:cat.white.C</code><code data-type="codeline">接收消息 [cat.blue.A] 宠物信息:cat.blue.A</code><code data-type="codeline">接收消息 [cat.blue.B] 宠物信息:cat.blue.B</code><code data-type="codeline">接收消息 [cat.blue.C] 宠物信息:cat.blue.C</code><code data-type="codeline">接收消息 [cat.grey.A] 宠物信息:cat.grey.A</code><code data-type="codeline">接收消息 [cat.grey.B] 宠物信息:cat.grey.B</code><code data-type="codeline">接收消息 [cat.grey.C] 宠物信息:cat.grey.C</code>

复制代码

3、如果你想买 A 种性格的猫

routingKey = cat.*.A 或 routingKey = cat.#.A

接收消息:

<code data-type="codeline">队列[amq.gen-xSuwMezB1VcEhcR0SfeKGA]等待消息:</code><code data-type="codeline">接收消息 [cat.white.A] 宠物信息:cat.white.A</code><code data-type="codeline">接收消息 [cat.blue.A] 宠物信息:cat.blue.A</code><code data-type="codeline">接收消息 [cat.grey.A] 宠物信息:cat.grey.A</code>

复制代码

4、如果你想买白颜色的宠物

routingKey = #.white.#

接收消息:

<code data-type="codeline">队列[amq.gen-1HSVv0nTfApQ_PT98lF-qQ]等待消息:</code><code data-type="codeline">接收消息 [rabbit.white.A] 宠物信息:rabbit.white.A</code><code data-type="codeline">接收消息 [rabbit.white.B] 宠物信息:rabbit.white.B</code><code data-type="codeline">接收消息 [rabbit.white.C] 宠物信息:rabbit.white.C</code><code data-type="codeline">接收消息 [cat.white.A] 宠物信息:cat.white.A</code><code data-type="codeline">接收消息 [cat.white.B] 宠物信息:cat.white.B</code><code data-type="codeline">接收消息 [cat.white.C] 宠物信息:cat.white.C</code><code data-type="codeline">接收消息 [dog.white.A] 宠物信息:dog.white.A</code><code data-type="codeline">接收消息 [dog.white.B] 宠物信息:dog.white.B</code><code data-type="codeline">接收消息 [dog.white.C] 宠物信息:dog.white.C</code>

复制代码

5、如果你想买 B 种性格的宠物

routingKey = #.B

接收消息:

<code data-type="codeline">队列[amq.gen-K-XtEdYjBHwcx6nAuUwhBg]等待消息:</code><code data-type="codeline">接收消息 [rabbit.white.B] 宠物信息:rabbit.white.B</code><code data-type="codeline">接收消息 [rabbit.blue.B] 宠物信息:rabbit.blue.B</code><code data-type="codeline">接收消息 [rabbit.grey.B] 宠物信息:rabbit.grey.B</code><code data-type="codeline">接收消息 [cat.white.B] 宠物信息:cat.white.B</code><code data-type="codeline">接收消息 [cat.blue.B] 宠物信息:cat.blue.B</code><code data-type="codeline">接收消息 [cat.grey.B] 宠物信息:cat.grey.B</code><code data-type="codeline">接收消息 [dog.white.B] 宠物信息:dog.white.B</code><code data-type="codeline">接收消息 [dog.blue.B] 宠物信息:dog.blue.B</code><code data-type="codeline">接收消息 [dog.grey.B] 宠物信息:dog.grey.B</code>

复制代码

6、如果你想买白色,C 种性格的猫

routingKey = cat.white.C

接收消息:

<code data-type="codeline">队列[amq.gen-LojPv9XhqR_y5SE0wqeduA]等待消息:</code><code data-type="codeline">接收消息 [cat.white.C] 宠物信息:cat.white.C</code>

复制代码