文章目录

在Idea上实践RabbitMQ(一)添加新用户测试实战速览Work模型下的发布者直接对队列发布消息报错可能存在的问题Work模型下消费者消费消息交换机与队列绑定补充:总结:

在Idea上实践RabbitMQ(一)

本节内容:RabbitMQ在IDEA的使用,以及所踩的坑。举例项目使用的是rabbitTemplate模版。

核心知识点回顾

RabbitMQ中Work模型 多个消费者绑定一个队列,共同消费队列中的信息。不存在交换机。 发布者->交换机->队列->消费者rabbitTemplate模板:SpringBoot对RabbitMQ的模板封装

首先确保RabbitMQ正常运行

本次操作基于RabbitMQ3.8.34版本

添加新用户测试

为了测试,我在进行实操前首先添加一个新用户test

进入RabbitMQ的管理界面,点击用户界面 添加用户、密码都为test 添加成功用户列表处就会出现test 退出当前用户更换test用户 为test用户添加虚拟主机

实战速览

本次测试采用RabbitMQ项目作为父项目,consumer、publisher作为子项目。通过父项目来管理子项目的依赖 借助SpringBoot的rabbitTemplate模版,使用RabbitMQ的项目都要导入RabbitMQ依赖,并配置application.yml。 在项目创建之初勾选: 或 在pom文件中添加对应依赖:

org.springframework.boot

spring-boot-starter-amqp

org.springframework.amqp

spring-rabbit-test

test

application.yml配置

spring:

rabbitmq:

host: 你的IP地址

port: 5672

virtual-host: 虚拟主机名

username: 用户名

password: 密码

server:

port:

端口号

注意:端口号要不一样

Work模型下的发布者直接对队列发布消息

在测试类中模拟发布消息,我为对应的虚拟主机创建了一个queueName队列。实际命名一般采取 字符串.queue 的形式。

package com.example.publisher;

import org.junit.jupiter.api.Test;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest

class PublisherApplicationTests {

@Autowired

private RabbitTemplate rabbitTemplate;

@Test

void contextLoads() {

String queueName="queueName";

String msg = "hello,";

for (int i=0;i<20;i++){

//在无交换机的情况下,发布者直接将信息发送给队列queueName。我这里重复对队列发送了20次消息。

rabbitTemplate.convertAndSend(queueName,msg+i);

}

}

}

报错可能存在的问题

host不对:host为部署到哪台机器的ip地址,个人电脑就本机地址,云服务器就用云服务器的ip地址port端口问题:端口为通信端口5672,不是控制界面的端口15672虚拟主机:确定好虚拟主机是否存在。注意/问题,我的配置文件里不加/访问成功。加上/访问失败。

Work模型下消费者消费消息

核心注解:@RabbitListener(queues = “队列名”),监听指定队列名的队列

package com.q.consumer.listeners;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.stereotype.Component;

@Component

public class MQListener {

//消费者监听队列的消息,参数设为消费者发送的类型

//这里消费者有两个,都对queueName进行监听

@RabbitListener(queues = "queueName")

public void listenerQueueA(String msg) throws InterruptedException {

System.out.println("msg1 = " + msg);

Thread.sleep(200);

}

@RabbitListener(queues = "queueName")

public void listenerQueueB(String msg) throws InterruptedException {

System.out.println("msg2 = " + msg);

Thread.sleep(100);

}

}

补充:

默认情况下。如果监听同一个队列的消费者存在多个,队列会轮询投递消费者消息,且同一条消息只会被消费一次。但是队列不会考虑消费者是否消费完成。消费者会出现消费堆积问题。以下配置可以让消费者确保每次只会消费一条消息,只有消费者消费完成才会继续接受下一条消息。这样不同消费者的消费情况就会不同。即一旦消费者完成了一条消息的处理,它将立即从队列中获取并处理下一条消息而不是轮询。

spring:

rabbitmq:

listener:

simple:

prefetch:1

交换机与队列绑定

    为什么会有交换机?假设一个消息想要被同时多次处理,如果只有队列,消息就只能传递给一个队列,然后一个队列再被消费者消费。这就等于一个消息(因为一条消息只能被消费一次)被一个消费者消费了,无法满足需求。     如果存在一个交换机,消息给交换机,交换机再去把同一条消息分配给不同队列,这就能满足同时多次处理消息的需求。     交换机分类:

Fanout:广播(通知交换机绑定的所有队列) Direct:定向(通知与交换机绑定的key相同的所有队列) Topic:话题(通知与交换机绑定的通配符吻合的所有队列) 在topic中 #代表任意字符串,*代表一个字符串(记忆方法:代表任意字符即一个完整的字符串) 如a.# 代表以a开头的所有字符串 a. 以a开头后接一个字符串 在SpringBoot中支持两种绑定形式: 1) 在管理界面手动添加并绑定好对应的exchange和queue 2)在配置/注解里绑定exchange和queue 流程:发布者发布消息到交换机->消费者消费消息,其中交换机绑定队列只需注解或者手动绑定,但是想要创建队列和交换机还是要在配置类手动创建。 补充:交换机也分临时和持久,选择时这些都需要注意,默认都是持久化。 发布者发布消息 package com.example.publisher;

import org.junit.jupiter.api.Test;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.boot.test.context.SpringBootTest;

//模拟发布者对交换机发布消息

@SpringBootTest

public class exchangerTest {

@Autowired

private RabbitTemplate rabbitTemplate;

@Test

void fanoutTest() {

String exchangeName="amq.fanout";

String msg = "hello,";

//routingKey,就是队列中的key,在广播模式中没有设置key,因此第二个参数设置null或""。

//如果是定向redict就要考虑key的值,定向也可以达成广播的形式,key同样设置为null或""。

//如果是topic,则传入完整的字符串,该字符串会自动与已经绑定的队列进行规则匹配

rabbitTemplate.convertAndSend(exchangeName,null,msg);

}

}

消费者消费消息 a方案: package com.q.consumer.listeners;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.stereotype.Component;

//模拟消费者监听消息,不用考虑交换机与队列的绑定,直接监听队列就行了,生产者发送消息给队列时已经指定好了

@Component

public class MQListener2 {

@RabbitListener(queues = "fanout.queue1")

public void listenerQueue1(String msg) throws InterruptedException {

System.out.println("msg1 = " + msg);

}

@RabbitListener(queues = "fanout.queue2")

public void listenerQueue2(String msg) throws InterruptedException {

System.out.println("msg2 = " + msg);

}

}

b方案(一)在配置中绑定,配置类一般放在配置包下。绑定操作一般放在消费者项目中而不放在发布者项目中。当是定向或话题绑定key时,一次只能绑定一个key,如果一个队列绑定多个key,那么就要分开写多个binding绑定异常麻烦。 package com.example.consumer.config;

import lombok.extern.slf4j.Slf4j;

import org.springframework.amqp.core.Binding;

import org.springframework.amqp.core.BindingBuilder;

import org.springframework.amqp.core.FanoutExchange;

import org.springframework.amqp.core.Queue;

import org.springframework.amqp.rabbit.connection.ConnectionFactory;

import org.springframework.boot.ApplicationRunner;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

@Slf4j

@Configuration

public class MQConfiguration {

@Bean

public FanoutExchange TestExchange(){

//ExchangeBuilder.fanoutExchange("Test.fanout").build();//也可以使用Builder来创建

return new FanoutExchange("Test.fanout");

}

@Bean

public Queue TestQueue1(){

//QueueBuilder.durable("Test.queue").build();

return new Queue("Test.queue");

}

@Bean

public Binding bindingTestQueue1(FanoutExchange TestExchange, Queue TestQueue1){

return BindingBuilder.bind(TestQueue1).to(TestExchange);

}

//当发现创建交换机和队列失败且不是自身端口、主机等设置问题时,请使用下面的方法

// @Bean

// ApplicationRunner runner(ConnectionFactory cf) {

// log.info("执行成功");

// return args -> cf.createConnection().close();

// }

}

b方案(二)将绑定操作放在监听消费者的注解中,在监听中不仅会绑定交换机和队列,还会创建对应的交换机和队列。所以使用注解的形式绑定最方便。 package listeners;

import org.springframework.amqp.rabbit.annotation.Exchange;

import org.springframework.amqp.rabbit.annotation.Queue;

import org.springframework.amqp.rabbit.annotation.QueueBinding;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.stereotype.Component;

@Component

public class MQListener3 {

//记忆方法:嵌套注解,绑定。

//key代表exchange根据指定规则路由到队列中

//ExchangeType为rabbitmq依赖提供的类,里面提供可选择的类型,key里面即对应规则

@RabbitListener(bindings = @QueueBinding(

value = @Queue(name = "fanout.queue2"),

exchange = @Exchange(name = "amq.fanout",type = ExchangeTypes.DIRECT),

key = {"a","b"}

))

public void listenerQueue1(String msg) throws InterruptedException {

System.out.println("msg1 = " + msg);

}

}

补充:

springBoot的rabbitmq模版中提供了两类消息传递 临时消息:将消息存储在内存上,一旦rabbitmq重启消息会丢失。临时消息适合不太重要的一次性消息。 持久化消息:将消息存储在磁盘上 在RabbitMQ模版中默认都是持久的 以direct交换机为例子消息不持久化设置: 消息设置是在发布者中设置的:

MessagePostProcessor mpp = message -> {

// 设置消息不持久化

message.getMessageProperties()

//setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT)和setExpiration()方法只能选择一个

.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);//设置为过期消息

//.setExpiration("5000"); // 设置为过期消息且过期时间为5秒

return message;

};

//四个参数对应的是,交换机名字,key,消息内容,

rabbitTemplate.convertAndSend("exchangeName"," routingKey", "message", mpp);

或者

MessageBuilder builder = (MessageBuilder) MessageBuilder.withBody("message".getBytes()) //message就是要发送的消息

.setContentEncoding("utf-8")

.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);

//.setExpiration("5000");

Message message = builder.build();

rabbitTemplate.convertAndSend("exchangeName"," routingKey", message);

总结:

在Springboot项目中,交换机和队列的创建放在配置类中并装配到容器中,而绑定操作最好放在消费者监听队列的注解中。 下一节

相关文章

评论可见,请评论后查看内容,谢谢!!!评论后请刷新页面。