阅模型-匹配模式,相比于前两种订阅模型,是更细致的分组,允许 在RoutingKey 中使用匹配符

*:匹配一个单词#:匹配0个或多个单词

RabbitMQ 订阅模型-匹配(topics)模式主要有以下六个角色构成:

生产者(producer/ publisher):一个发送消息的用户应用程序。交换机(Exchange) :在 RabbitMQ 的消息传递模型中,对于 Exchange 的核心思想就是:生产者生产的消息从不会直接发送到队列,生产者只能将消息发送到交换机。交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将它们推入队列。Exchanges 的类型:直接(direct)、主题(topic)、标题(headers)、扇出(fanout)Routing Key 路由关键字,exchange 根据这个关键字进行消息投递,Exchange 接收到的消息会带有 RoutingKey 这个字段,Exchange 就是根据这个 RoutingKey 和当前 Exchange 所有绑定的 BindingKey 做匹配,如果满足要求,就往 BindingKey 所绑定的 Queue 发送消息,这样我们就解决了我们向 RabbitMQ 发送一次消息,可以分发到不同的 Queue 的过程消费者1(consumer):消费和接收有类似的意思,消费者是一个主要用来等待接收消息的用户应用程序,领取任务并完成消费者2(consumer):领取任务并完成…队列:RabbitMQ 内部类似于邮箱的一个概念。虽然消息流经 RabbitMQ 和你的应用程序,但是它们只能存储在队列中。队列只受主机的内存和磁盘限制,实质上是一个大的消息缓冲区。许多生产者可以发送消息到一个队列,许多消费者可以尝试从一个队列接收数据。

文章目录

一、RabbitMQ 订阅模型-匹配(topics)模式1、RabbitMQ 匹配(topics)模式2、匹配(topics)模式组成

二、RabbitMQ 订阅模型-匹配(topics)模式实现1、添加 Maven 依赖2、封装工具类 ConnectionUtil3、生产者实现4、消费者-1 实现5、消费者-2 实现

三、订阅模型 三种模式区别1、RabbitMQ 消息订阅(Fanout)模式2、RabbitMQ 路由(direct)模式3、RabbitMQ 主题(topic)模式

一、RabbitMQ 订阅模型-匹配(topics)模式

1、RabbitMQ 匹配(topics)模式

阅模型-匹配模式,相比于前两种订阅模型,是更细致的分组,允许 在RoutingKey 中使用匹配符

*:匹配一个单词#:匹配0个或多个单词

2、匹配(topics)模式组成

RabbitMQ 订阅模型-匹配(topics)模式主要有以下六个角色构成:

生产者(producer/ publisher):一个发送消息的用户应用程序。交换机(Exchange) :在 RabbitMQ 的消息传递模型中,对于 Exchange 的核心思想就是:生产者生产的消息从不会直接发送到队列,生产者只能将消息发送到交换机。交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将它们推入队列。Exchanges 的类型:直接(direct)、主题(topic)、标题(headers)、扇出(fanout)Routing Key 路由关键字,exchange 根据这个关键字进行消息投递,Exchange 接收到的消息会带有 RoutingKey 这个字段,Exchange 就是根据这个 RoutingKey 和当前 Exchange 所有绑定的 BindingKey 做匹配,如果满足要求,就往 BindingKey 所绑定的 Queue 发送消息,这样我们就解决了我们向 RabbitMQ 发送一次消息,可以分发到不同的 Queue 的过程消费者1(consumer):消费和接收有类似的意思,消费者是一个主要用来等待接收消息的用户应用程序,领取任务并完成消费者2(consumer):领取任务并完成…队列:RabbitMQ 内部类似于邮箱的一个概念。虽然消息流经 RabbitMQ 和你的应用程序,但是它们只能存储在队列中。队列只受主机的内存和磁盘限制,实质上是一个大的消息缓冲区。许多生产者可以发送消息到一个队列,许多消费者可以尝试从一个队列接收数据。

二、RabbitMQ 订阅模型-匹配(topics)模式实现

1、添加 Maven 依赖

# 在 pom.xml 文件中添加以下依赖

com.rabbitmq

amqp-client

5.16.0

junit

junit

4.13.2

test

...

2、封装工具类 ConnectionUtil

package com.lizhengi.topics;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

/**

* @author liziheng

* @version 1.0.0

* @description 连接工具类封装

* @date 2022-12-26 11:39 上午

**/

public class ConnectionUtil {

/**

* 创建 MQ 连接工厂对象

*/

private static final ConnectionFactory CONNECTION_FACTORY;

// 类加载时,重量级资源加载

static {

CONNECTION_FACTORY = new ConnectionFactory();

//设置连接MQ主机

CONNECTION_FACTORY.setHost("127.0.0.1");

//设置连接MQ端口号

CONNECTION_FACTORY.setPort(5672);

//设置连接MQ虚拟主机

CONNECTION_FACTORY.setVirtualHost("/test");

//设置连接MQ虚拟主机的用户名密码

CONNECTION_FACTORY.setUsername("admin");

CONNECTION_FACTORY.setPassword("123456");

}

/**

* 建立与RabbitMQ连接 工具方法

*

* @return Connection

*/

public static Connection getConnection() {

try {

//返回连接对象

return CONNECTION_FACTORY.newConnection();

} catch (Exception e) {

e.printStackTrace();

}

return null;

}

/**

* 关闭通道和连接 工具方法

*

* @param channel Channel

* @param connection Connection

*/

public static void closeConnectionAndChanel(Channel channel, Connection connection) {

try {

if (channel != null) {

channel.close();

}

if (connection != null) {

connection.close();

}

} catch (Exception e) {

e.printStackTrace();

}

}

}

3、生产者实现

package com.lizhengi.topics;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import java.io.IOException;

/**

* @author liziheng

* @version 1.0.0

* @description 订阅模型-匹配(topics)模式 生产者

* @date 2022-12-26 4:15 下午

**/

public class Producer {

public static void main(String[] args) throws IOException {

// 获取连接对象

Connection connection = ConnectionUtil.getConnection();

assert connection != null;

Channel channel = connection.createChannel();

/* 为通道声明交换机

* 参数1:交换机名称;参数2:交换机类型 topic

*/

channel.exchangeDeclare("logs_topic", "topic");

//发送消息

String routingKey = "user.save";

channel.basicPublish("logs_topic", routingKey, null, ("The message's routingKey is " + routingKey + " !").getBytes());

//释放资源

ConnectionUtil.closeConnectionAndChanel(channel, connection);

}

}

4、消费者-1 实现

package com.lizhengi.topics;

import com.rabbitmq.client.*;

import java.io.IOException;

/**

* @author liziheng

* @version 1.0.0

* @description 订阅模型-匹配(topics)模式 消费者

* @date 2022-12-26 4:17 下午

**/

public class Customer1 {

public static void main(String[] args) throws IOException {

// 获取连接对象

Connection connection = ConnectionUtil.getConnection();

assert connection != null;

Channel channel = connection.createChannel();

// 绑定交换机

channel.exchangeDeclare("logs_topic", "topic");

// 为单个 Customer 创建临时队列

String queueName = channel.queueDeclare().getQueue();

// 绑定交换机和队列和routingKey

// 参数1:队列名字;参数2:交换机名称;参数3:routingKey 路由key

channel.queueBind(queueName, "logs_topic", "user.*");

// 消费消息

channel.basicConsume(queueName, true, new DefaultConsumer(channel) {

@Override

public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {

System.out.println("消费者1:" + new String(body));

}

});

}

}

5、消费者-2 实现

package com.lizhengi.topics;

import com.rabbitmq.client.*;

import java.io.IOException;

/**

* @author liziheng

* @version 1.0.0

* @description 订阅模型-匹配(topics)模式 消费者

* @date 2022-12-26 4:17 下午

**/

public class Customer2 {

public static void main(String[] args) throws IOException {

// 获取连接对象

Connection connection = ConnectionUtil.getConnection();

assert connection != null;

Channel channel = connection.createChannel();

// 绑定交换机

channel.exchangeDeclare("logs_topic", "topic");

// 为单个 Customer 创建临时队列

String queueName = channel.queueDeclare().getQueue();

// 绑定交换机和队列和routingKey

// 参数1:队列名字;参数2:交换机名称;参数3:routingKey 路由key

channel.queueBind(queueName, "logs_topic", "user.*.*");

// 消费消息

channel.basicConsume(queueName, true, new DefaultConsumer(channel) {

@Override

public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {

System.out.println("消费者1:" + new String(body));

}

});

}

}

三、订阅模型 三种模式区别

1、RabbitMQ 消息订阅(Fanout)模式

RabbitMQ 消息订阅(Fanout)模式把交换机(Exchange)收到的消息发送给所有绑定了该交换机的队列,忽略路由(RoutingKey)。

这种模式下,消息会被所有消费者消费。也就是说,只要是"绑定"到某个交换机的队列,都会收到生产者发送到该交换机的消息。

2、RabbitMQ 路由(direct)模式

RabbitMQ 路由(direct)模式生产者发送信息时,需要指定一个路由(RoutingKey),交换机(Exchange)会根据路由将消息发送到绑定了此路由的队列中。

3、RabbitMQ 主题(topic)模式

在实际的运用中,广播模式(fanout)和路由模式(direct)虽然功能能支持一定场景,但是任然有一定的局限性,比如不能根据多重条件来进行路由选择。

发送给主题模式交换机的信息不能是任意设置的选择键,必须是用小数点隔开的一系列的标识符,标识符可以随意填充,但是一般是与某些特征相关联。

选择键不能超过 255 个字符。合法的键比如 “vip.user.order”,“common.user.pay”。 绑定键必须以相同的格式,特殊情况:“*” (星号)可以代替任意一个标识符;“#”(井号)可以代替0个或多个标识符。

相关阅读

评论可见,请评论后查看内容,谢谢!!!评论后请刷新页面。