RabbitMQ--基础--7.5--工作模式--主题模式(Topic)
RabbitMQ–基础–7.5–工作模式–主题模式(Topic)
代码位置
https://gitee.com/DanShenGuiZu/learnDemo/tree/master/rabbitMq-learn/rabbitMq-03
1、介绍
- topic类型的交换机 实际上是 direct类型的交换机的一种
- 都将消息路由到 BindingKey 和 RoutingKey 相匹配的队列中
- topic 在匹配规则上进行了扩展。使用了通配符
- 工作流程
- 消息生产者生产消息,把消息交给交换机 exchange
- 交换机 exchange 根据 key 的规则模糊匹配到对应的队列,由队列的监听消费者接收消息消费
1.1、匹配规则
1.1.1、RoutingKey
- 为一个点号 “.” 分割的字符串,被点号 “.” 分割开的每一段独立的字符串成为一个单词。
- 举例:com.rabbitmq.client、java.util.concurrent
1.1.2、BindingKey
- RountingKey一样,也是点号 “.” 分割的字符串
- BindingKey存在两种特殊的字符串,用于做模糊匹配
- “*” :用于匹配一个单词
- “#” :用于匹配多个单词(可以是零个)。
1.1.3、BindingKey模糊匹配案例
- com.#:可以匹配:com.zzc.java、com.zzc等
- com.*:可以匹配:com.zzc、com.java 等
1.2、Topic 模型
- 路由键为 com.rabbitmq.client 的消息,会同时路由到 Q1、Q2、Q3
- 路由键为 com.hidden.client 的消息,会路由到 Q2、Q3
- 路由键为 com.hidden.demo 的消息,会路由到 Q3
- 路由键为 java.util.concurrent 的消息,会被丢弃或者返回给生产者,因为,它没有匹配任何路由键

1.3、和路由模式区别
- 绑定键 BindingKey 中带有模糊匹配
- 路由键 RoutingKey 由多个单词构成
2、MQ实现
- Topic 在 direct 模式上面进一步筛选
2.1、创建队列
topic_queue1
topic_queue2
topic_queue3

2.2、新建一个交换机
- 新建一个交换机
- 类型为 topic
topic_exchange


2.3、绑定交换机与队列的关系
添加绑定键。
1. # 代表0个或多个
2. * 代表一个
2.3.1、测试使用的数据
topic_queue1
topic_queue2
topic_queue3
*.rabbitmq.*
*.*.client
com.#
com.rabbitmq.client

2.3.2、最终效果

2.4、在交换机 topic_exchange 中发送消息
- 需要指定一个路由键

2.5、查看消息
路由键为 com.rabbitmq.client 的消息,会同时路由到 topic_queue1、topic_queue2、topic_queue3

3、代码实现
3.1、代码结构

3.2、生产者
package com.example.rabbitmq03.business.test6;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class Producer {
private static final String TOPIC_EXCHANGE_NAME = "code_topic_exchange";
public static void main(String[] args) throws Exception {
// 1. 获取连接
Connection connection = RabbitMqUtil.getConnection("生产者");
// 2. 通过连接获取通道 Channel
Channel channel = connection.createChannel();
// 3. 通过通道声明交换机,以及交换机类型为 direct
/**
* @param1:交换机名称
* @param2:交换机类型
*/
channel.exchangeDeclare(TOPIC_EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
// 4. 消息内容
String message = "你好 Topic!!";
String routingKey = "com.rabbitmq.client";
// 5. 发送消息到交换机,并指定路由键 RoutingKey 为 com.rabbitmq.client
channel.basicPublish(TOPIC_EXCHANGE_NAME, routingKey, null, message.getBytes());
System.out.println("消息发送完成~~~发送的消息为:" + message);
// 6. 关闭信道、连接
RabbitMqUtil.close(connection, channel);
}
}
3.3、消费者
3.3.1、Consumer
package com.example.rabbitmq03.business.test6;
import java.io.IOException;
import com.rabbitmq.client.*;
public class Consumer {
private static final String TOPIC_QUEUE_NAME = "topic_queue1";
private static final String TOPIC_EXCHANGE_NAME = "code_topic_exchange";
public static void main(String[] args) throws Exception {
// 获取连接
Connection connection = RabbitMqUtil.getConnection("消费者");
// 获取通道
Channel channel = connection.createChannel();
String bindingKey = "*.rabbitmq.*";
// 绑定队列到交换机,并指定一个绑定键 BindingKey
channel.queueBind(TOPIC_QUEUE_NAME, TOPIC_EXCHANGE_NAME, bindingKey);
// 定义消费者
com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
// body 消息体
String msg = new String(body, "utf-8");
System.out.println("收到消息:" + msg);
}
};
// 监听队列
channel.basicConsume(TOPIC_QUEUE_NAME, true, consumer);
System.out.println("开始接收消息~~~");
System.in.read();
// 关闭信道、连接
RabbitMqUtil.close(connection, channel);
}
}
3.3.1、Consumer2
package com.example.rabbitmq03.business.test6;
import java.io.IOException;
import com.rabbitmq.client.*;
public class Consumer2 {
private static final String TOPIC_QUEUE_NAME = "topic_queue2";
private static final String TOPIC_EXCHANGE_NAME = "code_topic_exchange";
public static void main(String[] args) throws Exception {
// 获取连接
Connection connection = RabbitMqUtil.getConnection("消费者");
// 获取通道
Channel channel = connection.createChannel();
String bindingKey = "*.*.client";
// 绑定队列到交换机,并指定一个绑定键 BindingKey
channel.queueBind(TOPIC_QUEUE_NAME, TOPIC_EXCHANGE_NAME, bindingKey);
// 定义消费者
com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
// body 消息体
String msg = new String(body, "utf-8");
System.out.println("收到消息:" + msg);
}
};
// 监听队列
channel.basicConsume(TOPIC_QUEUE_NAME, true, consumer);
System.out.println("开始接收消息~~~");
System.in.read();
// 关闭信道、连接
RabbitMqUtil.close(connection, channel);
}
}
3.4、测试


