目录

一、消息不丢失1.消息确认2.消息确认业务封装2.1 发送确认消息测试2.2 消息发送失败,设置重发机制

一、消息不丢失

消息的不丢失,在MQ角度考虑,一般有三种途径: 1,生产者不丢数据 2,MQ服务器不丢数据 3,消费者不丢数据 保证消息不丢失有两种实现方式: 1,开启事务模式 2,消息确认模式 说明:开启事务会大幅降低消息发送及接收效率,使用的相对较少,因此我们生产环境一般都采取消息确认模式,以下我们只是讲解消息确认模式

1.消息确认

消息持久化 如果希望RabbitMQ重启之后消息不丢失,那么需要对以下3种实体均配置持久化 Exchange 声明exchange时设置持久化(durable = true)并且不自动删除(autoDelete = false) Queue 声明queue时设置持久化(durable = true)并且不自动删除(autoDelete = false) message 发送消息时通过设置deliveryMode=2持久化消息

处理消息队列丢数据的情况,一般是开启持久化磁盘的配置。这个持久化配置可以和confirm机制配合使用,你可以在消息持久化磁盘后,再给生产者发送一个Ack信号。这样,如果消息持久化磁盘之前,rabbitMQ阵亡了,那么生产者收不到Ack信号,生产者会自动重发。那么如何持久化呢,其实也很容易,就下面两步:

1、将queue的持久化标识durable设置为true,则代表是一个持久的队列

2、发送消息的时候将deliveryMode=2

这样设置以后,rabbitMQ就算挂了,重启后也能恢复数据

发送确认 有时,业务处理成功,消息也发了,但是我们并不知道消息是否成功到达了rabbitmq,如果由于网络等原因导致业务成功而消息发送失败,那么发送方将出现不一致的问题,此时可以使用rabbitmq的发送确认功能,即要求rabbitmq显式告知我们消息是否已成功发送。

手动消费确认 有时,消息被正确投递到消费方,但是消费方处理失败,那么便会出现消费方的不一致问题。比如:订单已创建的消息发送到用户积分子系统中用于增加用户积分,但是积分消费方处理却都失败了,用户就会问:我购买了东西为什么积分并没有增加呢? 要解决这个问题,需要引入消费方确认,即只有消息被成功处理之后才告知rabbitmq以ack,否则告知rabbitmq以nack

2.消息确认业务封装

service-mq修改配置 开启rabbitmq消息确认配置,在common的配置文件中都已经配置好了!

spring:

rabbitmq:

host: 192.168.121.140

port: 5672

username: admin

password: admin

publisher-confirms-type: correlated #交换机的确认

publisher-returns: true #队列的确认

listener:

simple:

acknowledge-mode: manual #默认情况下消息消费者是自动确认消息的,如果要手动确认消息则需要修改确认模式为manual

prefetch: 1 # 消费者每次从队列获取的消息数量。此属性当不设置时为:轮询分发,设置为1为:公平分发

搭建rabbit-util模块 由于消息队列是公共模块,我们把mq的相关业务封装到该模块,其他service微服务模块都可能使用,因此我们把他封装到一个单独的模块,需要使用mq的模块直接引用该模块即可 搭建方式如: pom.xml

org.springframework.boot

spring-boot-starter-actuator

org.springframework.cloud

spring-cloud-starter-bus-amqp

4.2.4 封装发送端消息确认

/**

* @Description 消息发送确认

*

* ConfirmCallback 只确认消息是否正确到达 Exchange 中

* ReturnCallback 消息没有正确到达队列时触发回调,如果正确到达队列不执行

*

* 1. 如果消息没有到exchange,则confirm回调,ack=false

* 2. 如果消息到达exchange,则confirm回调,ack=true

* 3. exchange到queue成功,则不回调return

* 4. exchange到queue失败,则回调return

*

*/

@Component

@Slf4j

public class MQProducerAckConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {

@Autowired

private RabbitTemplate rabbitTemplate;

// 修饰一个非静态的void()方法,在服务器加载Servlet的时候运行,并且只会被服务器执行一次在构造函数之后执行,init()方法之前执行。

@PostConstruct

public void init() {

rabbitTemplate.setConfirmCallback(this); //指定 ConfirmCallback

rabbitTemplate.setReturnCallback(this); //指定 ReturnCallback

}

@Override

public void confirm(CorrelationData correlationData, boolean ack, String cause) {

if (ack) {

log.info("消息发送成功:" + JSON.toJSONString(correlationData));

} else {

log.info("消息发送失败:" + cause + " 数据:" + JSON.toJSONString(correlationData));

}

}

@Override

public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {

// 反序列化对象输出

System.out.println("消息主体: " + new String(message.getBody()));

System.out.println("应答码: " + replyCode);

System.out.println("描述:" + replyText);

System.out.println("消息使用的交换器 exchange : " + exchange);

System.out.println("消息使用的路由键 routing : " + routingKey);

}

}

封装消息发送

@Service

public class RabbitService {

@Autowired

private RabbitTemplate rabbitTemplate;

/**

* 发送消息

* @param exchange 交换机

* @param routingKey 路由键

* @param message 消息

*/

public boolean sendMessage(String exchange, String routingKey, Object message) {

rabbitTemplate.convertAndSend(exchange, routingKey, message);

return true;

}

}

2.1 发送确认消息测试

消息发送端

@RestController

@RequestMapping("/mq")

public class MqController {

@Autowired

private RabbitService rabbitService;

/**

* 消息发送

*/

//http://localhost:8282/mq/sendConfirm

@GetMapping("sendConfirm")

public Result sendConfirm() {

rabbitService.sendMessage("exchange.confirm", "routing.confirm", "来人了,开始接客吧!");

return Result.ok();

}

}

消息接收端

@Component

public class ConfirmReceiver {

@SneakyThrows

@RabbitListener(bindings=@QueueBinding(

value = @Queue(value = "queue.confirm",autoDelete = "false"),

exchange = @Exchange(value = "exchange.confirm",autoDelete = "true"),

key = {"routing.confirm"}))

public void process(Message message, Channel channel){

System.out.println("RabbitListener:"+new String(message.getBody()));

// false 确认一个消息,true 批量确认

channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

}

}

测试:http://localhost:8282/mq/sendConfirm

2.2 消息发送失败,设置重发机制

实现思路:借助redis来实现重发机制 模块中添加依赖

org.springframework.boot

spring-boot-starter-data-redis

org.apache.commons

commons-pool2

com.alibaba

fastjson

自定义一个实体类来接收消息

@Data

public class GmallCorrelationData extends CorrelationData {

// 消息主体

private Object message;

// 交换机

private String exchange;

// 路由键

private String routingKey;

// 重试次数

private int retryCount = 0;

// 消息类型 是否是延迟消息

private boolean isDelay = false;

// 延迟时间

private int delayTime = 10;

}

修改发送方法

// 封装一个发送消息的方法

public Boolean sendMsg(String exchange,String routingKey, Object msg){

// 将发送的消息 赋值到 自定义的实体类

GmallCorrelationData gmallCorrelationData = new GmallCorrelationData();

// 声明一个correlationId的变量

String correlationId = UUID.randomUUID().toString().replaceAll("-","");

gmallCorrelationData.setId(correlationId);

gmallCorrelationData.setExchange(exchange);

gmallCorrelationData.setRoutingKey(routingKey);

gmallCorrelationData.setMessage(msg);

// 发送消息的时候,将这个gmallCorrelationData 对象放入缓存。

redisTemplate.opsForValue().set(correlationId, JSON.toJSONString(gmallCorrelationData),10, TimeUnit.MINUTES);

// 调用发送消息方法

//this.rabbitTemplate.convertAndSend(exchange,routingKey,msg);

this.rabbitTemplate.convertAndSend(exchange,routingKey,msg,gmallCorrelationData);

// 默认返回true

return true;

}

发送失败调用重发方法 MQProducerAckConfig 类中修改

@Override

public void confirm(CorrelationData correlationData, boolean ack, String cause) {

// ack = true 说明消息正确发送到了交换机

if (ack){

System.out.println("哥们你来了.");

log.info("消息发送到了交换机");

}else {

// 消息没有到交换机

log.info("消息没发送到交换机");

// 调用重试发送方法

this.retrySendMsg(correlationData);

}

}

@Override

public void returnedMessage(Message message, int code, String codeText, String exchange, String routingKey) {

System.out.println("消息主体: " + new String(message.getBody()));

System.out.println("应答码: " + code);

System.out.println("描述:" + codeText);

System.out.println("消息使用的交换器 exchange : " + exchange);

System.out.println("消息使用的路由键 routing : " + routingKey);

// 获取这个CorrelationData对象的Id spring_returned_message_correlation

String correlationDataId = (String) message.getMessageProperties().getHeaders().get("spring_returned_message_correlation");

// 因为在发送消息的时候,已经将数据存储到缓存,通过 correlationDataId 来获取缓存的数据

String strJson = (String) this.redisTemplate.opsForValue().get(correlationDataId);

// 消息没有到队列的时候,则会调用重试发送方法

GmallCorrelationData gmallCorrelationData = JSON.parseObject(strJson,GmallCorrelationData.class);

// 调用方法 gmallCorrelationData 这对象中,至少的有,交换机,路由键,消息等内容.

this.retrySendMsg(gmallCorrelationData);

}

/**

* 重试发送方法

* @param correlationData 父类对象 它下面还有个子类对象 GmallCorrelationData

*/

private void retrySendMsg(CorrelationData correlationData) {

// 数据类型转换 统一转换为子类处理

GmallCorrelationData gmallCorrelationData = (GmallCorrelationData) correlationData;

// 获取到重试次数 初始值 0

int retryCount = gmallCorrelationData.getRetryCount();

// 判断

if (retryCount>=3){

// 不需要重试了

log.error("重试次数已到,发送消息失败:"+JSON.toJSONString(gmallCorrelationData));

} else {

// 变量更新

retryCount+=1;

// 重新赋值重试次数 第一次重试 0->1 1->2 2->3

gmallCorrelationData.setRetryCount(retryCount);

System.out.println("重试次数:\t"+retryCount);

// 更新缓存中的数据

this.redisTemplate.opsForValue().set(gmallCorrelationData.getId(),JSON.toJSONString(gmallCorrelationData),10, TimeUnit.MINUTES);

// 调用发送消息方法 表示发送普通消息 发送消息的时候,不能调用 new RabbitService().sendMsg() 这个方法

this.rabbitTemplate.convertAndSend(gmallCorrelationData.getExchange(),gmallCorrelationData.getRoutingKey(),gmallCorrelationData.getMessage(),gmallCorrelationData);

}

}

测试:只需修改(错误信息)

查看原文