一、背景介绍

在使用mandatory参数和回退消息之后,消息的发布者可以确定自己的消息是否已经被RabbitMQ服务接收到了,能够有机会在发布者的消息无法被投递的时候去发现是哪些消息不可投递、原因是什么,然后及时去做处理(比如手动重新投递)。

但是存在一些情况,当消息发布者一时间不知道怎么去处理这些无法被路由的消息时,最多打个日志,然后触发报警,再来进行手动处理。通过日志去处理无法路由的消息比较麻烦,特别是当生产者所在的服务有多台机器的时候,手动复制日志非常容易出错,而且设置mandatory参数也会让发布者的复杂性增加,还需要在生产者当中去添加处理被退回的消息相关代码。

如果既不想丢失消息,又不希望增生产者的复杂性,该如何去处理呢?

我们知道,如果一旦消息从队列当中被取出,但是消费者因为某些原因无法处理消息,消息就会变成“死信”,而我们可以通过RabbitMQ的死信队列来专门把这些“死信”先暂时存储起来,等待被其他消费者进行处理,但是这个过程是发生在“消息队列----消费者”这个过程当中的,如图所示:

而我们现在期望能够在“发布者---交换机”这个过程当中,能够去保存没有被路由成功的消息,采用死信队列很显然是不太可能的,因为消息压根就没有到达队列当中。因此,需要引入了一个新的东西专门来处理这些无法被路由的消息,这个东西就是“备份交换机”。

1、备份交换机引入

在RabbitMQ当中,存在一种备份交换机的机制,它相当于是交换机的备胎,专门用来应对普通交换机不能路由成功的消息。当我们为一个交换机声明一个对应的备份交换机的时候,就是给它创建了一个备胎。一旦交换机收到了一条无法路由的消息是,就会把这条消息转发给备份交换机,由备份交换机去进行转发处理。

通常情况下,备份交换机都是fanout类型的,这样可以方便将所有的消息都投递到与其绑定的队列当中,然后我们在这个队列下边去进行信息的处理,甚至还可以去创建一个报警队列,用独立的消费者来专门监测和报警,省掉每次都通过日志去查看消息情况!

2、备份交换机架构

二、代码实现

1、依赖导入pom.xml

org.springframework.boot

spring-boot-starter-web

org.springframework.boot

spring-boot-starter-amqp

org.projectlombok

lombok

org.springframework.boot

spring-boot-devtools

2、定义配置文件application.yml

3、定义配置类

说明:配置类当中需要对所有的交换机、队列及绑定关系作管理;

@Configuration

public class PublisherConfirmConfig {

//声明一些常量

private static final String CONFIRM_EXCHANGE = "confirm.exchange";

private static final String CONFIRM_QUEUE = "confirm.queue";

private static final String BACKUP_EXCHANGE = "backup.exchange";

private static final String BACKUP_QUEUE = "backup.queue";

private static final String WARNING_QUEUE = "warning.queue";

/*

管理确认交换机,需要关联上备份交换机

*/

@Bean(CONFIRM_EXCHANGE)

public DirectExchange confirmExchange(){

/* return new DirectExchange(CONFIRM_EXCHANGE);*/

//关联上备份交换机

return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE).durable(true)

.withArgument("alternate-exchange",BACKUP_EXCHANGE).build();

}

//管理备份交换机

@Bean(BACKUP_EXCHANGE)

public FanoutExchange backupExchange(){

return new FanoutExchange(BACKUP_EXCHANGE);

}

//管理确认队列

@Bean(CONFIRM_QUEUE)

public Queue confirmQueue(){

return QueueBuilder.durable(CONFIRM_QUEUE).build();

}

//管理备份队列

@Bean(BACKUP_QUEUE)

public Queue backupQueue(){

return QueueBuilder.durable(BACKUP_QUEUE).build();

}

//管理告警队列

@Bean(WARNING_QUEUE)

public Queue warningQueue(){

return QueueBuilder.durable(WARNING_QUEUE).build();

}

//绑定确认交换机和确认队列

@Bean

public Binding confirmQueueBindingConfirmExchange(

@Qualifier(CONFIRM_QUEUE) Queue queue,

@Qualifier(CONFIRM_EXCHANGE) Exchange exchange

){

return BindingBuilder.bind(queue).to(exchange).with("key1").noargs();

}

//绑定备份交换机和备份队列

@Bean

public Binding backupQueueBindingBackupExchange(

@Qualifier(BACKUP_QUEUE)Queue queue,

@Qualifier(BACKUP_EXCHANGE) Exchange exchange

){

return BindingBuilder.bind(queue).to(exchange).with("").noargs();

}

//绑定备份交换机和告警队列

@Bean

public Binding warningQueueBindingBackupExchange(

@Qualifier(WARNING_QUEUE)Queue queue,

@Qualifier(BACKUP_EXCHANGE) Exchange exchange

){

return BindingBuilder.bind(queue).to(exchange).with("").noargs();

}

}

4、配置RabbitMQ的确认和回退逻辑

@Component

@Slf4j

public class MyConfirmCallback implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback{

@Autowired

private RabbitTemplate rabbitTemplate;

//向rabbitTemplate注入回调确认、回调回退的相关内容

@PostConstruct

private void init(){

//设置mandatory参数为true,让消息无法路由的时候触发回退

rabbitTemplate.setMandatory(true);

rabbitTemplate.setConfirmCallback(this);

rabbitTemplate.setReturnCallback(this);

}

/**

* 用于当broker收到发布者消息的时候,触发的回调确认方法,告知发布者消息已经到交换机了

* @param correlationData :消息相关的设置,包括消息的id和内容

* @param ack :消息是否收到确认

* @param s :消息没有收到的原因描述

*/

@Override

public void confirm(CorrelationData correlationData, boolean ack, String s) {

if(ack){

log.info("确认交换机已经收到id为:{} 的消息了!",correlationData.getId());

}else {

log.info("确认交换机还没有收到id为:{}的消息,原因是:{}",correlationData.getId(),s);

}

}

/**

* 用于当消息到达交换机之后,无法路由到队列时,MQ要执行的回退方法,告知发布者是什么消息路由失败

* @param message :消息

* @param replyCode :回退对应的编码

* @param replyText :回退原因描述

* @param exchange :交换机

* @param routingKey :交换机和队列绑定的路由规则

*/

@Override

public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {

log.error("消息:{},被交换机:{},给退回了!退回的原因是:{}",

message.getBody().toString(),exchange,replyText);

}

}

5、定义消息发布者

说明:消息发布者只需要将消息投递到确认交换机即可,为了方便演示,发布一条正常消费的消息,一条不可路由的消息;

@RestController

@RequestMapping("/publish")

@Slf4j

public class Publisher {

private static final String CONFIRM_EXCHANGE = "confirm.exchange";

@Autowired

private RabbitTemplate rabbitTemplate;

//发布消息

@GetMapping("send/{message}")

public String sendMessage(@PathVariable("message")String message){

String date = new Date().toString();

log.info("生产者在:{},发布了消息:{}",date,message);

//发送一条路由key正确、id为1的消息

rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE,"key1",message,new CorrelationData("1"));

return "生产者在:"+date+",发布了一条消息:"+message;

}

//发布消息:不可路由的消息

@GetMapping("send/error/{message}")

public String sendMessage2(@PathVariable("message")String message){

String date = new Date().toString();

log.info("生产者在:{},发布了消息:{}",date,message);

//发送一条路由key不正确,id为2的消息

rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE,"key2",message,new CorrelationData("2"));

return "生产者在:"+date+",发布了一条消息:"+message;

}

}

6、分别定义消费者:confirm、backup、warning 

可以正常消费消息:

@Component

@Slf4j

public class ConfirmConsumerListener {

private static final String CONFIRM_QUEUE = "confirm.queue";

//监听确认队列当中的消息

@RabbitListener(queues = CONFIRM_QUEUE)

public void confirmMessage(Message message){

String date = new Date().toString();

log.info("消费者C1在:{},收到了确认队列当中的消息:{}",date,new String(message.getBody()));

}

}

处理不可路由消息的:

@Component

@Slf4j

public class BackupConsumerListener {

private static final String BACKUP_QUEUE = "backup.queue";

@RabbitListener(queues = BACKUP_QUEUE)

public void backupMessage(Message message){

String date = new Date().toString();

log.info("备用消费者在:{},收到了消息:{}",date,new String(message.getBody()));

}

}

做监测告警的:

@Component

@Slf4j

public class WarningConsumerListener {

private static final String WARNING_QUEUE = "warning.queue";

@RabbitListener(queues = WARNING_QUEUE)

public void warningMessage(Message message){

String date = new Date().toString();

log.info("告警在:{},发现了消息:{}",date,new String(message.getBody()));

}

}

7、结果分析

(1)发送一条路由key正确的消息:

可以看到:

消息已经正常路由到确认队列当中,然后被消费者C1消费了;

通过发布确认机制开启之后,交换机也在收到消息后给消息发布者回复了一条确认消息!

(2)发送一条路由key不正确的消息:

可以看到:

当消息无法被路由到正确的queue时,使用“备份交换机”机制之后,confirm交换机不再把消息直接回退、通知发布者,而是将消息转发给备份交换机,备份消费者、告警消费者从与备份交换机绑定的队列来消费消息;

交换机确认收到消息之后,提醒发布者的时间总是晚于消息处理时间;

三、小结

通过消息的发布确认机制,消息发布者可以知道交换机是否有收到发送的消息,但是当消息无法理由时,交换机会直接将消息丢弃,而发布者只知道交换机收到了消息、却不知道消息已经被丢弃了;通过回退消息,设置mandatory参数为true,可以在消息无法路由时,让交换机在将消息退回,告知发布者是什么消息被路由失败了;通过备份交换机机制,可以将那些无法路由的消息,直接交给类型为fanout的备份交换机进行存储,然后通过与备份交换机绑定的队列进行消息消费或者使用告警队列去做监测和告警;

推荐文章

评论可见,请评论后查看内容,谢谢!!!评论后请刷新页面。