文章目录
在Idea上实践RabbitMQ(一)添加新用户测试实战速览Work模型下的发布者直接对队列发布消息报错可能存在的问题Work模型下消费者消费消息交换机与队列绑定补充:总结:
在Idea上实践RabbitMQ(一)
本节内容:RabbitMQ在IDEA的使用,以及所踩的坑。举例项目使用的是rabbitTemplate模版。
核心知识点回顾
RabbitMQ中Work模型 多个消费者绑定一个队列,共同消费队列中的信息。不存在交换机。 发布者->交换机->队列->消费者rabbitTemplate模板:SpringBoot对RabbitMQ的模板封装
首先确保RabbitMQ正常运行
本次操作基于RabbitMQ3.8.34版本
添加新用户测试
为了测试,我在进行实操前首先添加一个新用户test
进入RabbitMQ的管理界面,点击用户界面 添加用户、密码都为test 添加成功用户列表处就会出现test 退出当前用户更换test用户 为test用户添加虚拟主机
实战速览
本次测试采用RabbitMQ项目作为父项目,consumer、publisher作为子项目。通过父项目来管理子项目的依赖 借助SpringBoot的rabbitTemplate模版,使用RabbitMQ的项目都要导入RabbitMQ依赖,并配置application.yml。 在项目创建之初勾选: 或 在pom文件中添加对应依赖:
application.yml配置
spring:
rabbitmq:
host: 你的IP地址
port: 5672
virtual-host: 虚拟主机名
username: 用户名
password: 密码
server:
port:
端口号
注意:端口号要不一样
Work模型下的发布者直接对队列发布消息
在测试类中模拟发布消息,我为对应的虚拟主机创建了一个queueName队列。实际命名一般采取 字符串.queue 的形式。
package com.example.publisher;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class PublisherApplicationTests {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
void contextLoads() {
String queueName="queueName";
String msg = "hello,";
for (int i=0;i<20;i++){
//在无交换机的情况下,发布者直接将信息发送给队列queueName。我这里重复对队列发送了20次消息。
rabbitTemplate.convertAndSend(queueName,msg+i);
}
}
}
报错可能存在的问题
host不对:host为部署到哪台机器的ip地址,个人电脑就本机地址,云服务器就用云服务器的ip地址port端口问题:端口为通信端口5672,不是控制界面的端口15672虚拟主机:确定好虚拟主机是否存在。注意/问题,我的配置文件里不加/访问成功。加上/访问失败。
Work模型下消费者消费消息
核心注解:@RabbitListener(queues = “队列名”),监听指定队列名的队列
package com.q.consumer.listeners;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class MQListener {
//消费者监听队列的消息,参数设为消费者发送的类型
//这里消费者有两个,都对queueName进行监听
@RabbitListener(queues = "queueName")
public void listenerQueueA(String msg) throws InterruptedException {
System.out.println("msg1 = " + msg);
Thread.sleep(200);
}
@RabbitListener(queues = "queueName")
public void listenerQueueB(String msg) throws InterruptedException {
System.out.println("msg2 = " + msg);
Thread.sleep(100);
}
}
补充:
默认情况下。如果监听同一个队列的消费者存在多个,队列会轮询投递消费者消息,且同一条消息只会被消费一次。但是队列不会考虑消费者是否消费完成。消费者会出现消费堆积问题。以下配置可以让消费者确保每次只会消费一条消息,只有消费者消费完成才会继续接受下一条消息。这样不同消费者的消费情况就会不同。即一旦消费者完成了一条消息的处理,它将立即从队列中获取并处理下一条消息而不是轮询。
spring:
rabbitmq:
listener:
simple:
prefetch:1
交换机与队列绑定
为什么会有交换机?假设一个消息想要被同时多次处理,如果只有队列,消息就只能传递给一个队列,然后一个队列再被消费者消费。这就等于一个消息(因为一条消息只能被消费一次)被一个消费者消费了,无法满足需求。 如果存在一个交换机,消息给交换机,交换机再去把同一条消息分配给不同队列,这就能满足同时多次处理消息的需求。 交换机分类:
Fanout:广播(通知交换机绑定的所有队列) Direct:定向(通知与交换机绑定的key相同的所有队列) Topic:话题(通知与交换机绑定的通配符吻合的所有队列) 在topic中 #代表任意字符串,*代表一个字符串(记忆方法:代表任意字符即一个完整的字符串) 如a.# 代表以a开头的所有字符串 a. 以a开头后接一个字符串 在SpringBoot中支持两种绑定形式: 1) 在管理界面手动添加并绑定好对应的exchange和queue 2)在配置/注解里绑定exchange和queue 流程:发布者发布消息到交换机->消费者消费消息,其中交换机绑定队列只需注解或者手动绑定,但是想要创建队列和交换机还是要在配置类手动创建。 补充:交换机也分临时和持久,选择时这些都需要注意,默认都是持久化。 发布者发布消息 package com.example.publisher;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
//模拟发布者对交换机发布消息
@SpringBootTest
public class exchangerTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
void fanoutTest() {
String exchangeName="amq.fanout";
String msg = "hello,";
//routingKey,就是队列中的key,在广播模式中没有设置key,因此第二个参数设置null或""。
//如果是定向redict就要考虑key的值,定向也可以达成广播的形式,key同样设置为null或""。
//如果是topic,则传入完整的字符串,该字符串会自动与已经绑定的队列进行规则匹配
rabbitTemplate.convertAndSend(exchangeName,null,msg);
}
}
消费者消费消息 a方案: package com.q.consumer.listeners;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
//模拟消费者监听消息,不用考虑交换机与队列的绑定,直接监听队列就行了,生产者发送消息给队列时已经指定好了
@Component
public class MQListener2 {
@RabbitListener(queues = "fanout.queue1")
public void listenerQueue1(String msg) throws InterruptedException {
System.out.println("msg1 = " + msg);
}
@RabbitListener(queues = "fanout.queue2")
public void listenerQueue2(String msg) throws InterruptedException {
System.out.println("msg2 = " + msg);
}
}
b方案(一)在配置中绑定,配置类一般放在配置包下。绑定操作一般放在消费者项目中而不放在发布者项目中。当是定向或话题绑定key时,一次只能绑定一个key,如果一个队列绑定多个key,那么就要分开写多个binding绑定异常麻烦。 package com.example.consumer.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.boot.ApplicationRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Slf4j
@Configuration
public class MQConfiguration {
@Bean
public FanoutExchange TestExchange(){
//ExchangeBuilder.fanoutExchange("Test.fanout").build();//也可以使用Builder来创建
return new FanoutExchange("Test.fanout");
}
@Bean
public Queue TestQueue1(){
//QueueBuilder.durable("Test.queue").build();
return new Queue("Test.queue");
}
@Bean
public Binding bindingTestQueue1(FanoutExchange TestExchange, Queue TestQueue1){
return BindingBuilder.bind(TestQueue1).to(TestExchange);
}
//当发现创建交换机和队列失败且不是自身端口、主机等设置问题时,请使用下面的方法
// @Bean
// ApplicationRunner runner(ConnectionFactory cf) {
// log.info("执行成功");
// return args -> cf.createConnection().close();
// }
}
b方案(二)将绑定操作放在监听消费者的注解中,在监听中不仅会绑定交换机和队列,还会创建对应的交换机和队列。所以使用注解的形式绑定最方便。 package listeners;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class MQListener3 {
//记忆方法:嵌套注解,绑定。
//key代表exchange根据指定规则路由到队列中
//ExchangeType为rabbitmq依赖提供的类,里面提供可选择的类型,key里面即对应规则
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "fanout.queue2"),
exchange = @Exchange(name = "amq.fanout",type = ExchangeTypes.DIRECT),
key = {"a","b"}
))
public void listenerQueue1(String msg) throws InterruptedException {
System.out.println("msg1 = " + msg);
}
}
补充:
springBoot的rabbitmq模版中提供了两类消息传递 临时消息:将消息存储在内存上,一旦rabbitmq重启消息会丢失。临时消息适合不太重要的一次性消息。 持久化消息:将消息存储在磁盘上 在RabbitMQ模版中默认都是持久的 以direct交换机为例子消息不持久化设置: 消息设置是在发布者中设置的:
MessagePostProcessor mpp = message -> {
// 设置消息不持久化
message.getMessageProperties()
//setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT)和setExpiration()方法只能选择一个
.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);//设置为过期消息
//.setExpiration("5000"); // 设置为过期消息且过期时间为5秒
return message;
};
//四个参数对应的是,交换机名字,key,消息内容,
rabbitTemplate.convertAndSend("exchangeName"," routingKey", "message", mpp);
或者
MessageBuilder builder = (MessageBuilder) MessageBuilder.withBody("message".getBytes()) //message就是要发送的消息
.setContentEncoding("utf-8")
.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
//.setExpiration("5000");
Message message = builder.build();
rabbitTemplate.convertAndSend("exchangeName"," routingKey", message);
总结:
在Springboot项目中,交换机和队列的创建放在配置类中并装配到容器中,而绑定操作最好放在消费者监听队列的注解中。 下一节
相关文章
发表评论