RabbitMQ--基础--7.5--工作模式--主题模式(Topic)

RabbitMQ–基础–7.5–工作模式–主题模式(Topic)


代码位置

https://gitee.com/DanShenGuiZu/learnDemo/tree/master/rabbitMq-learn/rabbitMq-03

1、介绍

  1. topic类型的交换机 实际上是 direct类型的交换机的一种
    1. 都将消息路由到 BindingKey 和 RoutingKey 相匹配的队列中
    2. topic 在匹配规则上进行了扩展。使用了通配符
  2. 工作流程
    1. 消息生产者生产消息,把消息交给交换机 exchange
    2. 交换机 exchange 根据 key 的规则模糊匹配到对应的队列,由队列的监听消费者接收消息消费

1.1、匹配规则

1.1.1、RoutingKey

  1. 为一个点号 “.” 分割的字符串,被点号 “.” 分割开的每一段独立的字符串成为一个单词。
  2. 举例:com.rabbitmq.client、java.util.concurrent

1.1.2、BindingKey

  1. RountingKey一样,也是点号 “.” 分割的字符串
  2. BindingKey存在两种特殊的字符串,用于做模糊匹配
    1. “*” :用于匹配一个单词
    2. “#” :用于匹配多个单词(可以是零个)。

1.1.3、BindingKey模糊匹配案例

  1. com.#:可以匹配:com.zzc.java、com.zzc等
  2. com.*:可以匹配:com.zzc、com.java 等

1.2、Topic 模型

  1. 路由键为 com.rabbitmq.client 的消息,会同时路由到 Q1、Q2、Q3
  2. 路由键为 com.hidden.client 的消息,会路由到 Q2、Q3
  3. 路由键为 com.hidden.demo 的消息,会路由到 Q3
  4. 路由键为 java.util.concurrent 的消息,会被丢弃或者返回给生产者,因为,它没有匹配任何路由键

在这里插入图片描述

1.3、和路由模式区别

  1. 绑定键 BindingKey 中带有模糊匹配
  2. 路由键 RoutingKey 由多个单词构成

2、MQ实现

  1. Topic 在 direct 模式上面进一步筛选

2.1、创建队列

topic_queue1
topic_queue2
topic_queue3

在这里插入图片描述

2.2、新建一个交换机

  1. 新建一个交换机
  2. 类型为 topic
topic_exchange

在这里插入图片描述

在这里插入图片描述

2.3、绑定交换机与队列的关系

添加绑定键。
1. # 代表0个或多个
2. * 代表一个

2.3.1、测试使用的数据

topic_queue1
topic_queue2
topic_queue3
*.rabbitmq.*
*.*.client
com.#
com.rabbitmq.client

在这里插入图片描述

2.3.2、最终效果

在这里插入图片描述

2.4、在交换机 topic_exchange 中发送消息

  1. 需要指定一个路由键

在这里插入图片描述

2.5、查看消息

路由键为 com.rabbitmq.client 的消息,会同时路由到 topic_queue1、topic_queue2、topic_queue3

在这里插入图片描述

3、代码实现

3.1、代码结构

在这里插入图片描述

3.2、生产者

package com.example.rabbitmq03.business.test6;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Producer {
    
    private static final String TOPIC_EXCHANGE_NAME = "code_topic_exchange";
    
    public static void main(String[] args) throws Exception {
        // 1. 获取连接
        Connection connection = RabbitMqUtil.getConnection("生产者");
        
        // 2. 通过连接获取通道 Channel
        Channel channel = connection.createChannel();
        // 3. 通过通道声明交换机,以及交换机类型为 direct
        /**
         * @param1:交换机名称
         * @param2:交换机类型
         */
        channel.exchangeDeclare(TOPIC_EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        
        // 4. 消息内容
        String message = "你好 Topic!!";
        String routingKey = "com.rabbitmq.client";
        // 5. 发送消息到交换机,并指定路由键 RoutingKey 为 com.rabbitmq.client
        channel.basicPublish(TOPIC_EXCHANGE_NAME, routingKey, null, message.getBytes());
        System.out.println("消息发送完成~~~发送的消息为:" + message);
        // 6. 关闭信道、连接
        RabbitMqUtil.close(connection, channel);
    }
}

3.3、消费者

3.3.1、Consumer

package com.example.rabbitmq03.business.test6;

import java.io.IOException;

import com.rabbitmq.client.*;

public class Consumer {
    
    private static final String TOPIC_QUEUE_NAME = "topic_queue1";
    private static final String TOPIC_EXCHANGE_NAME = "code_topic_exchange";
    
    public static void main(String[] args) throws Exception {
        // 获取连接
        Connection connection = RabbitMqUtil.getConnection("消费者");
        // 获取通道
        Channel channel = connection.createChannel();
        String bindingKey = "*.rabbitmq.*";
        // 绑定队列到交换机,并指定一个绑定键 BindingKey
        channel.queueBind(TOPIC_QUEUE_NAME, TOPIC_EXCHANGE_NAME, bindingKey);
        
        // 定义消费者
        com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                    byte[] body) throws IOException {
                // body 消息体
                String msg = new String(body, "utf-8");
                System.out.println("收到消息:" + msg);
            }
        };
        
        // 监听队列
        channel.basicConsume(TOPIC_QUEUE_NAME, true, consumer);
        
        System.out.println("开始接收消息~~~");
        System.in.read();
        // 关闭信道、连接
        RabbitMqUtil.close(connection, channel);
    }
}

3.3.1、Consumer2

package com.example.rabbitmq03.business.test6;

import java.io.IOException;

import com.rabbitmq.client.*;

public class Consumer2 {
    
    private static final String TOPIC_QUEUE_NAME = "topic_queue2";
    private static final String TOPIC_EXCHANGE_NAME = "code_topic_exchange";
    
    public static void main(String[] args) throws Exception {
        // 获取连接
        Connection connection = RabbitMqUtil.getConnection("消费者");
        // 获取通道
        Channel channel = connection.createChannel();
        String bindingKey = "*.*.client";
        // 绑定队列到交换机,并指定一个绑定键 BindingKey
        channel.queueBind(TOPIC_QUEUE_NAME, TOPIC_EXCHANGE_NAME, bindingKey);
        
        // 定义消费者
        com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                    byte[] body) throws IOException {
                // body 消息体
                String msg = new String(body, "utf-8");
                System.out.println("收到消息:" + msg);
            }
        };
        
        // 监听队列
        channel.basicConsume(TOPIC_QUEUE_NAME, true, consumer);
        
        System.out.println("开始接收消息~~~");
        System.in.read();
        // 关闭信道、连接
        RabbitMqUtil.close(connection, channel);
    }
}

3.4、测试

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述