RabbitMQ 的五种消息模型
RabbitMQ 提供了 6 种消息模型,但常用的是前面 5 种,第 6 种实际上为 RPC,所以一般来说了解前面 5 种即可,而对于后面三种,是根据 Exchange 类型划分的。
注:对下面模式的讲解主要基于 Java 原生 API 操作 ,因此在项目中需要添加如下依赖。
<code data-type="codeline"><dependency></code><code data-type="codeline"> <groupId>com.rabbitmq</groupId></code><code data-type="codeline"> <artifactId>amqp-client</artifactId></code><code data-type="codeline"> <version>5.9.0</version></code><code data-type="codeline"></dependency></code>
复制代码
为了后续的操作先定义一个连接 rabbitmq 的连接工具类
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class RabbitMQUtils {
private static ConnectionFactory connectionFactory;
static {
connectionFactory = new ConnectionFactory();
//我们把重量级资源通过单例模式加载
connectionFactory.setHost("192.168.153.128");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
//上面创建的VHost
connectionFactory.setVirtualHost("/order");
}
//定义提供连接对象的方法
public static Connection getConnection() {
try {
return connectionFactory.newConnection();
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
//定义关闭通道和关闭连接工具方法
public static void closeConnectionAndChanel(Channel channel, Connection conn) {
try {
if (channel != null) {
channel.close();
}
if (conn != null) {
conn.close();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
复制代码
基本消息模型
RabbitMQ 是一个消息代理:它接受和转发消息。可以将其视为邮局:当你将要投递的邮件放入邮箱时,你可以确定信件承运人最终会将邮件递送给你的收件人。在这个比喻中,RabbitMQ 是一个邮箱、一个邮局和一个信件载体。
-
P:生产者,发送消息到消息队列
-
C:消费者:消息的接受者,会一直等待消息到来。
-
queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。
1、发送消息
在原生 JavaAPI 中,通过 queueDeclare
方法去申明队列:
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments) throws IOException;
复制代码
参数说明:
-
queue,队列名称。
-
durable,是否持久化,如果持久化,mq 重启后队列还在。
-
exclusive,是否独占连接,队列只允许在该连接中访问,如果 connection 连接关闭队列则自动删除,如果将此参数设置 true 可用于临时队列的创建。
-
autoDelete,自动删除,队列不再使用时是否自动删除此队列,如果将此参数和 exclusive 参数设置为 true 就可以实现临时队列(队列不用了就自动删除)。
-
arguments参数,可以设置一个队列的扩展参数,比如:可设置存活时间等。
主要通过 basicPublish
方法
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
复制代码
参数说明:
-
exchange,交换机,如果不指定将使用 mq 的默认交换机(设置为"")。
-
routingKey,路由 key,交换机根据路由 key 来将消息转发到指定的队列,如果使用默认交换机,routingKey 设置为队列的名称。
-
props,消息的属性。
-
body,消息内容。
代码实现
Producer
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class Producer {
//定义队列名称
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
// 1、获取到连接
Connection connection = RabbitMQUtils.getConnection();
// 2、从连接中创建通道,使用通道才能完成消息相关的操作
Channel channel = connection.createChannel();
// 3、声明(创建)队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 4、消息内容
String message = "Hello World!";
// 向指定的队列中发送消息
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
//关闭通道和连接
channel.close();
connection.close();
}
}
复制代码
去控制台查看:
2、接收消息
接收消息 consumer#handleDelivery
方法:
void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException;
复制代码
Consumer
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
public class Consumer {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
// 获取到连接
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//实现消费方法
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
System.out.println(new String(body));
}
};
//自动ack
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
复制代码
work 消息模型
多个消费者监听同一队列。消费者接收到消息后, 通过线程池异步消费。但是一个消息只能被一个消费者获取。work queue 常用于避免消息堆积问题。
-
P:生产者,发布任务。
-
C1:消费者 1,领取任务并且完成任务,假设完成速度较慢(模拟耗时)
-
C2:消费者 2,领取任务并且完成任务,假设完成速度较快
Producer
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class RabbitMQUtils {
//定义提供连接对象的方法
public static Connection getConnection() {
try {
ConnectionFactory connectionFactory = new ConnectionFactory();
//我们把重量级资源通过单例模式加载
connectionFactory.setHost("192.168.153.128");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("order");
return connectionFactory.newConnection();
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
//定义关闭通道和关闭连接工具方法
public static void closeConnectionAndChanel(Channel channel, Connection conn) {
try {
if (channel != null) {
channel.close();
}
if (conn != null) {
conn.close();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
复制代码
Consumer1
import cn.javatv.javaAPI.RabbitMQUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.util.concurrent.TimeUnit;
public class Consumer1 {
private final static String QUEUE_NAME = "work";
public static void main(String[] args) throws Exception {
// 获取到连接
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//实现消费方法
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
try {
//模拟任务耗时
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Consumer1_" +new String(body));
}
};
//自动ack
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
复制代码
Consumer2
import cn.javatv.javaAPI.RabbitMQUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
public class Consumer2 {
private final static String QUEUE_NAME = "work";
public static void main(String[] args) throws Exception {
// 获取到连接
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//实现消费方法
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
System.out.println("Consumer2_" + new String(body));
}
};
//自动ack
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
复制代码
先启动消费者,在启动生成者,输出如下:
我们发现消费者是按照轮询消费的,但这种消费存在一个问题,假如 Consumer1 处理能力极快,Consumer2 (代码中休眠了 2s)处理能力极慢,这是 Consumer2 会严重拖累整体消费进度,而 Consuemr1 又早早的完成任务而无所事事。
能者多劳
从上面的结果可以看出,任务是平均分配的,也就是说,不管你上个任务是否完成,我继续把后面的任务分发给你,而实际上为了效率,谁消费得越快,谁就得到越多。因此可以通过 BasicQos 方法的参数设为 1,前提是在手动 ack 的情况下才生效,即 autoAck = false 。
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
public class Consumer2 {
private final static String QUEUE_NAME = "work";
public static void main(String[] args) throws Exception {
// 获取到连接
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//设置消费者同时只能处理一条消息
channel.basicQos(1);
//实现消费方法
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
//模拟任务耗时
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Consumer2_" + new String(body));
//确认消息
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
//手动ack
channel.basicConsume(QUEUE_NAME, false, consumer);
}
}
复制代码
输出结果:
可以看到 Consumer1 消费了 19 个,Consumer2 才消费 1 个。
Publish/Subscribe-Fanout
一次向多个消费者发送消息,该模式的交换机类型为 Fanout,也称为广播。
它具有以下性质:
-
可以有多个消费者。
-
每个消费者有自己的 queue。
-
每个队列都要绑定到 Exchange。
-
生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。
-
交换机把消息发送给绑定过的所有队列。
-
队列的消费者都能拿到消息,实现一条消息被多个消费者消费。
Producer
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class FanoutProducer {
public final static String EXCHANGE_NAME = "fanout";
public static void main(String[] args) throws IOException, TimeoutException {
//建立连接
Connection connection = RabbitMQUtils.getConnection();
// 创建一个信道
Channel channel = connection.createChannel();
// 指定转发类型为FANOUT
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
//发送3条消息,且路由键不同
for (int i = 1; i <= 3; i++) {
//路由键,循环3次,路由键为routekey1,routekey2,routekey3
String routekey = "routekey" + i;
// 发送的消息
String message = "fanout_" + i;
/*
* 参数1:exchange name 交换机
* 参数2:routing key 路由键
*/
channel.basicPublish(EXCHANGE_NAME, routekey, null, message.getBytes());
System.out.println(" [x] Sent '" + routekey + "':'" + message + "'");
}
// 关闭
channel.close();
connection.close();
}
}
复制代码
Consumer1
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
public class Consumer1 {
public final static String EXCHANGE_NAME = "fanout";
public static void main(String[] argv) throws IOException {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
// 声明一个随机队列
String queueName = channel.queueDeclare().getQueue();
/*
* 队列绑定到交换器上时,是允许绑定多个路由键的,也就是多重绑定
*/
String[] routekeys = {"routekey1", "routekey2", "routekey3"};
for (String routekey : routekeys) {
//绑定
channel.queueBind(queueName, FanoutProducer.EXCHANGE_NAME, routekey);
}
System.out.println("[" + queueName + "]等待消息:");
// 创建队列消费者
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("接收" + envelope.getRoutingKey() + ":" + message);
}
};
channel.basicConsume(queueName, true, consumer);
}
}
复制代码
我们看看 fanout 的定义:
消息广播到绑定的队列,不管队列绑定了什么路由键,消息经过交换机,每个队列都有一份。
换句话说,只要队列和交换机绑定,不在乎路由键是什么都能接收消息。
如绑定一个不存在的路由键:
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 类说明:fanout消费者--绑定一个不存在的路由键
*/
public class Consumer2 {
public final static String EXCHANGE_NAME = "fanout";
public static void main(String[] argv) throws IOException, TimeoutException {
Connection connection = RabbitMQUtils.getConnection();
final Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
// 声明一个随机队列
String queueName = channel.queueDeclare().getQueue();
//设置一个不存在的路由键
String routekey = "xxx";
channel.queueBind(queueName, FanoutProducer.EXCHANGE_NAME, routekey);
System.out.println("队列[" + queueName + "]等待消息:");
// 创建队列消费者
final Consumer consumerB = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
//记录日志到文件:
System.out.println("接收消息 [" + envelope.getRoutingKey() + "] " + message);
}
};
channel.basicConsume(queueName, true, consumerB);
}
}
复制代码
输出:
<code data-type="codeline">队列[amq.gen-G2LL566wrSH3mGBUF6XKCQ]等待消息:</code><code data-type="codeline">接收消息 [routekey1] fanout_1</code><code data-type="codeline">接收消息 [routekey2] fanout_2</code><code data-type="codeline">接收消息 [routekey3] fanout_3</code>
复制代码
不管我们如何调整生产者和消费者的路由键,都对消息的接收没有影响。
Routing-Direct
在 Direct 模型下,队列与交换机的绑定,不能是任意绑定了,而是要指定一个 RoutingKey(路由 key),消息的发送方在向 Exchange 发送消息时,也必须指定消息的 routing key。
-
P:生产者,向 Exchange 发送消息,发送消息时,会指定一个 routing key。
-
X:Exchange,接收生产者的消息,然后把消息递交给 与 routing key 完全匹配的队列。
-
C1:消费者,其所在队列指定了需要 routing key 为 error 的消息。
-
C2:消费者,其所在队列指定了需要 routing key 为 info、error、warning 的消息。
Producer
发送 3 种不同类型的日志。
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class DirectProducer {
public final static String EXCHANGE_NAME = "direct";
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接、连接到RabbitMQ
Connection connection = RabbitMQUtils.getConnection();
//创建信道
Channel channel = connection.createChannel();
//在信道中设置交换器
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//申明队列(放在消费者中去做)
String[] routeKeys = {"info", "warning", "error"};
for (int i = 1; i <= 6; i++) {
String routeKey = routeKeys[i % 3];
String msg = routeKey + "日志";
//发布消息
channel.basicPublish(EXCHANGE_NAME, routeKey, null, msg.getBytes());
System.out.println("Sent:" + msg);
}
channel.close();
connection.close();
}
}
复制代码
生产消息:
<code data-type="codeline">Sent:warning日志</code><code data-type="codeline">Sent:error日志</code><code data-type="codeline">Sent:info日志</code><code data-type="codeline">Sent:warning日志</code><code data-type="codeline">Sent:error日志</code><code data-type="codeline">Sent:info日志</code>
复制代码
Consumer1
指定需要 routing key 为 error 的消息。
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer1 {
public final static String EXCHANGE_NAME = "direct";
public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {
//创建连接、连接到RabbitMQ
Connection connection = RabbitMQUtils.getConnection();
//创建一个信道
final Channel channel = connection.createChannel();
//信道设置交换器类型(direct)
channel.exchangeDeclare(DirectProducer.EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//声明一个随机队列
String queueName = channel.queueDeclare().getQueue();
//绑定
channel.queueBind(queueName, DirectProducer.EXCHANGE_NAME, "error");
System.out.println("队列[" + queueName + "]等待消息:");
// 创建队列消费者
final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("接收消息 [" + envelope.getRoutingKey() + "] " + message);
}
};
channel.basicConsume(queueName, true, consumer);
}
}
复制代码
接收消息:
<code data-type="codeline">队列[amq.gen-NhIiesUDi547ZGr4JBEsnA]等待消息:</code><code data-type="codeline">接收消息 [error] error日志</code><code data-type="codeline">接收消息 [error] error日志</code>
复制代码
Consumer1
指定需要 routing key 为 info、error、warning 的消息。
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer2 {
public final static String EXCHANGE_NAME = "direct";
public static void main(String[] args) throws IOException {
//创建连接、连接到RabbitMQ
Connection connection = RabbitMQUtils.getConnection();
//创建一个信道
final Channel channel = connection.createChannel();
//信道设置交换器类型(direct)
channel.exchangeDeclare(DirectProducer.EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//声明一个随机队列
String queueName = channel.queueDeclare().getQueue();
//绑定
String[] routeKeys = {"info", "warning", "error"};
for (String routekey : routeKeys) {
channel.queueBind(queueName, DirectProducer.EXCHANGE_NAME, routekey);
}
System.out.println("队列[" + queueName + "]等待消息:");
// 创建队列消费者
final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("接收消息 [" + envelope.getRoutingKey() + "] " + message);
}
};
channel.basicConsume(queueName, true, consumer);
}
}
复制代码
接收消息:
<code data-type="codeline">队列[amq.gen-thfvXuQSfXHEVFRwHKZAFA]等待消息:</code><code data-type="codeline">接收消息 [warning] warning日志</code><code data-type="codeline">接收消息 [error] error日志</code><code data-type="codeline">接收消息 [info] info日志</code><code data-type="codeline">接收消息 [warning] warning日志</code><code data-type="codeline">接收消息 [error] error日志</code><code data-type="codeline">接收消息 [info] info日志</code>
复制代码
Topics-topic
Topic 类型的 Exchange 与 Direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。只不过 Topic 类型 Exchange 可以让队列在绑定 Routing key 的时候使用通配符!
-
#:匹配一个或多个词
-
*:匹配一个词
<code data-type="codeline">user.# # 可以匹配到 user.add user.add.batch</code><code data-type="codeline">user.* # 只能匹配到 user.add ,不能匹配到 user.add.batch</code>
复制代码
假如你准备去买宠物,宠物的种类有 rabbit,cat,dog,宠物的颜色有 white,blue,grey,宠物的性格为 A,B,C。若按照路由键规则:种类 . 颜色 . 性格,则会产生 3*3*3=27
条消息,如 rabbit.white.A
。
Producer
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class TopicProducer {
public final static String EXCHANGE_NAME = "topic";
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接、连接到RabbitMQ
Connection connection = RabbitMQUtils.getConnection();
// 创建一个信道
Channel channel = connection.createChannel();
// 指定转发
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
//宠物种类
String[] pets = {"rabbit", "cat", "dog"};
for (int i = 0; i < 3; i++) {
//宠物颜色
String[] colors = {"white", "blue", "grey"};
for (int j = 0; j < 3; j++) {
//宠物性格
String[] character = {"A", "B", "C"};
for (int k = 0; k < 3; k++) {
// 发送的消息
String routeKey = pets[i % 3] + "." + colors[j % 3] + "." + character[k % 3];
String message = "宠物信息:" + routeKey;
channel.basicPublish(EXCHANGE_NAME, routeKey, null, message.getBytes());
System.out.println(" [x] Sent " + message);
}
}
}
// 关闭连接
channel.close();
connection.close();
}
}
复制代码
Consumer
1、如果你是开宠物店,需要所有的宠物
routingKey = #
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
public class Consumer {
public static void main(String[] argv) throws IOException {
//创建连接、连接到RabbitMQ
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(TopicProducer.EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
//声明一个随机队列
String queueName = channel.queueDeclare().getQueue();
//routingKey设置为 #
channel.queueBind(queueName, TopicProducer.EXCHANGE_NAME, "#");
System.out.println("队列[" + queueName + "]等待消息:");
// 创建队列消费者
final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("接收消息 [" + envelope.getRoutingKey() + "] " + message);
}
};
channel.basicConsume(queueName, true, consumer);
}
}
复制代码
接收消息:
<code data-type="codeline">队列[amq.gen-eaK9M1vqEtY6WjivxrzqfA]等待消息:</code><code data-type="codeline">接收消息 [rabbit.white.A] 宠物信息:rabbit.white.A</code><code data-type="codeline">接收消息 [rabbit.white.B] 宠物信息:rabbit.white.B</code><code data-type="codeline">接收消息 [rabbit.white.C] 宠物信息:rabbit.white.C</code><code data-type="codeline">接收消息 [rabbit.blue.A] 宠物信息:rabbit.blue.A</code><code data-type="codeline">接收消息 [rabbit.blue.B] 宠物信息:rabbit.blue.B</code><code data-type="codeline">......</code><code data-type="codeline">//接收所有消息,省略</code>
复制代码
2、如果你仅仅是想买猫,但是想先了解猫的颜色和性格
消费者代码同上,修改 channel.queueBind(queueName,TopicProducer.EXCHANGE_NAME,"cat.#")
即可
routingKey = cat.#
接收消息
<code data-type="codeline">队列[amq.gen-Fy0aH4610sLNLrkoJKl_uA]等待消息:</code><code data-type="codeline">接收消息 [cat.white.A] 宠物信息:cat.white.A</code><code data-type="codeline">接收消息 [cat.white.B] 宠物信息:cat.white.B</code><code data-type="codeline">接收消息 [cat.white.C] 宠物信息:cat.white.C</code><code data-type="codeline">接收消息 [cat.blue.A] 宠物信息:cat.blue.A</code><code data-type="codeline">接收消息 [cat.blue.B] 宠物信息:cat.blue.B</code><code data-type="codeline">接收消息 [cat.blue.C] 宠物信息:cat.blue.C</code><code data-type="codeline">接收消息 [cat.grey.A] 宠物信息:cat.grey.A</code><code data-type="codeline">接收消息 [cat.grey.B] 宠物信息:cat.grey.B</code><code data-type="codeline">接收消息 [cat.grey.C] 宠物信息:cat.grey.C</code>
复制代码
3、如果你想买 A 种性格的猫
routingKey = cat.*.A 或 routingKey = cat.#.A
接收消息:
<code data-type="codeline">队列[amq.gen-xSuwMezB1VcEhcR0SfeKGA]等待消息:</code><code data-type="codeline">接收消息 [cat.white.A] 宠物信息:cat.white.A</code><code data-type="codeline">接收消息 [cat.blue.A] 宠物信息:cat.blue.A</code><code data-type="codeline">接收消息 [cat.grey.A] 宠物信息:cat.grey.A</code>
复制代码
4、如果你想买白颜色的宠物
routingKey = #.white.#
接收消息:
<code data-type="codeline">队列[amq.gen-1HSVv0nTfApQ_PT98lF-qQ]等待消息:</code><code data-type="codeline">接收消息 [rabbit.white.A] 宠物信息:rabbit.white.A</code><code data-type="codeline">接收消息 [rabbit.white.B] 宠物信息:rabbit.white.B</code><code data-type="codeline">接收消息 [rabbit.white.C] 宠物信息:rabbit.white.C</code><code data-type="codeline">接收消息 [cat.white.A] 宠物信息:cat.white.A</code><code data-type="codeline">接收消息 [cat.white.B] 宠物信息:cat.white.B</code><code data-type="codeline">接收消息 [cat.white.C] 宠物信息:cat.white.C</code><code data-type="codeline">接收消息 [dog.white.A] 宠物信息:dog.white.A</code><code data-type="codeline">接收消息 [dog.white.B] 宠物信息:dog.white.B</code><code data-type="codeline">接收消息 [dog.white.C] 宠物信息:dog.white.C</code>
复制代码
5、如果你想买 B 种性格的宠物
routingKey = #.B
接收消息:
<code data-type="codeline">队列[amq.gen-K-XtEdYjBHwcx6nAuUwhBg]等待消息:</code><code data-type="codeline">接收消息 [rabbit.white.B] 宠物信息:rabbit.white.B</code><code data-type="codeline">接收消息 [rabbit.blue.B] 宠物信息:rabbit.blue.B</code><code data-type="codeline">接收消息 [rabbit.grey.B] 宠物信息:rabbit.grey.B</code><code data-type="codeline">接收消息 [cat.white.B] 宠物信息:cat.white.B</code><code data-type="codeline">接收消息 [cat.blue.B] 宠物信息:cat.blue.B</code><code data-type="codeline">接收消息 [cat.grey.B] 宠物信息:cat.grey.B</code><code data-type="codeline">接收消息 [dog.white.B] 宠物信息:dog.white.B</code><code data-type="codeline">接收消息 [dog.blue.B] 宠物信息:dog.blue.B</code><code data-type="codeline">接收消息 [dog.grey.B] 宠物信息:dog.grey.B</code>
复制代码
6、如果你想买白色,C 种性格的猫
routingKey = cat.white.C
接收消息:
<code data-type="codeline">队列[amq.gen-LojPv9XhqR_y5SE0wqeduA]等待消息:</code><code data-type="codeline">接收消息 [cat.white.C] 宠物信息:cat.white.C</code>
复制代码
- 从“数据孤岛”到统一数据体系,明源云 DataOps 实践探索之路 | 卓越技术团队访谈录
- 如何管理你下载的一大堆 Python 包【❤️win 环境及 linux 环境下创建虚拟环境详解❤️】
- 在 Hadoop 环境里面统计西游记文章的词组(hdfs 实验)
- 一节课让你彻底搞懂 python 里面试最常问问题之一深浅复制
- 学 C 还是学 Java?做软件研发还需掌握哪些知识和技能?
- 云原生到底是什么?它会是未来发展的趋势吗?
- Java 流程控制
- 〖Docker 指南③〗Docker 镜像的深度解析
- 我不想 MySQL 分片
- 谷歌卷自己,继 Imagen 之后继续放大招:靠 200 亿参数由文本生成的图像惊呆网友!
- 雨林开源行:畅聊开源,走近 Gitee!
- 英伟达是如何做 GPU 编程的(一)
- 超级详细的 Maven 教程(基础 高级)
- 6 年技术迭代,一文详解阿里全球化出海 & 合规的挑战及探索
- 尤雨溪向 React 推荐自己研发的 Vite,网友:用第三方工具没有任何意义
- Mac 中 Git 如何忽略.DS_Store 文件
- 从算法到工程 - 推荐系统全面总结
- Android 自定义 View 之随机数验证码
- Meta 如何实现大规模无身份信息认证?
- 基于云内核的未来云计算架构