一、rabbitmq如何保证消息不丢失?
RabbitMq丢失消息的场景大致分为以上四种情况。
1.生产者在发消息给交换机的过程中消息丢失
2.交换机成功收到消息,再路由给队列的过程中丢失
3.队列未持久化,服务器宕机导致丢失
4.消费者未成功消费消息,导致消息丢失
1. 对于1、2过程中出现消息丢失的问题可以使用生产者的发布确认模式。(配置如下)
/**
* rabbitTemplate初始化配置
* @param connectionFactory spring提供的RabbitMq连接池
* @return
*/
@Bean
public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
//设置生产者消息发送确认类型
connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
//设置消息发送后需要回调
connectionFactory.setPublisherReturns(true);
//创建RabbitTemplate
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
//设置Rabbitmq消息的序列化方式
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
//如果设置了 rabbitTemplate.setMandatory(true) 参数,则会将消息退回给 producer。并执行回调函数 returnedMessage。
rabbitTemplate.setMandatory(true);
//confirmCallBack是确保消息投递到交换机的回调
//消息进入exchange是触发
rabbitTemplate.setConfirmCallback((correlationData, ack, cuase) ->{
if (ack){
log.info("消息成功到达交换机");
}else {
log.info("消息投递到交换机失败,可考虑重新投递");
}
});
//returnCallBack是确保消息通过交换机转发给消息队列的回调
//消息未送达队列触发回调
rabbitTemplate.setReturnCallback((message,replyCode,replyText, exchange, routingKey) -> {
log.error("消息发送失败,未送达队列,message:{},replyCode:{},replyText:{},exchange:{},exchange:{}",message, replyCode, replyText, exchange, routingKey);
});
return rabbitTemplate;
}
配置好了之后,生产者直接调用convertAndSend即可发送消息。
ps:如果消息在由交换机到队列的过程中失败,还可以为交换机指定备份交换机
//声明队列
@Bean
public Queue directQueue(){
return new Queue("direct-queue",true,false,false,null);
}
//声明交换机,并指定备份交换机
@Bean
public DirectExchange directExchange() {
Map
map.put("alternate-exchange","alternate-exchange");
return new DirectExchange("direct-exchange",true,false,map);
}
//声明备份队列
@Bean
public Queue alternateQueue(){
return new Queue("alternate-queue",true,false,false,null);
}
/**
* 备份交换机使用扇形模式,可以绑定多个队列,每个队列分别执行消费,预警,入库等操作
* @return
*/
@Bean
public FanoutExchange alternateExchange() {
return new FanoutExchange("alternate-exchange",true,false);
}
//绑定工作交换机与队列
@Bean
public Binding directBinding(){
return BindingBuilder.bind(directQueue()).to(directExchange()).with("hello");
}
//绑定备份交换机与队列
@Bean
public Binding alternateBinding(){
return BindingBuilder.bind(alternateQueue()).to(alternateExchange());
}
2. 对于场景3 出现消息丢失的可能存在以下两点
1. 如果消息队列未设置持久化,则可通过设置队列持久化来保存内存中的消息到磁盘,以免服务宕机导致队列中的消息丢失。
2.如果队列设置了持久化,队列中的部分还未全部持久化到磁盘,此事服务宕机,就可能回丢失少量的消息。这种情况可以搭建镜像集群来解决。
3. 对于场景4 消费者方可以使用手动ack+消息重新入队的方式解决。
@RabbitListener(queues = "direct-queue")
public void consumeMsg(String msg, Message message, Channel channel) throws IOException {
try{
//处理业务逻辑
log.info("收到消息,msg:{}",msg);
long deliveryTag = message.getMessageProperties().getDeliveryTag();
//签收消息
channel.basicAck(deliveryTag,false);
}catch (Exception e) {
//判断是否已经重新投递
if (message.getMessageProperties().getRedelivered()){
//已经重新入过队了,则直接丢弃(如果配置了死信队列,则转到私信队列里面)
channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
}else {
//没有重新入队,则重新入队
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
}
}
}
ps: 在此种场景下还可以配置死信交换机,对丢失的消息做进一步的处理
//声明队列
@Bean
public Queue directQueue(){
Map
//指定进入的死信交换机
map.put("x-dead-letter-exchange","dead_direct_exchange");
//direct模式需要配置,fanout模式是不需要配置的
map.put("x-dead-letter-routing-key","dead");
return new Queue("direct-queue",true,false,false,map);
}
//声明死信队列
@Bean
public Queue deadQueue(){
return new Queue("dead-queue",true,false,false,null);
}
//声明死信交换机
@Bean
public DirectExchange deadExchange() {
return new DirectExchange("dead_direct_exchange",true,false);
}
@Bean
public Binding deadBinding(){
return BindingBuilder.bind(deadQueue()).to(deadExchange()).with("dead");
}
二、如何保证消息的幂等性
所谓消息的幂等性,就是保证消息不被重复消费。例如一条扣款消息,被消费了两次,扣了两次款,显然是有问题的。所以保证消息的幂等性是很有必要的。
这里我们采用全局唯一id的方式来保证幂等性:
1. 生产者在发送消息之前先生成一个全局唯一Id作为消息的ID和消息体一起发送到队列,并存放在redis里面
2. 消费者获取到消息后,先判断该消息Id是否存在于redis中,如果存在说明该消息还未被消费,则走消费逻辑,消费完后将redis中的消息id删除;如果不存在则说明消息已经被消费了,则直接签收不做任何处理。(代码实现如下所示)
//生产者
@Resource
private RabbitTemplate rabbitTemplate;
@Autowired
private StringRedisTemplate redisTemplate;
public void produceMsg(String msg) {
//生成全局唯一id
String msgId = UUID.randomUUID().toString();
//存到redis里面
redisTemplate.opsForValue().set("msg:"+msgId,"");
HashMap
map.put("messageId", msgId);
map.put("msg",msg);
rabbitTemplate.convertAndSend("direct-queue",map);
}
//消费者
@Autowired
private StringRedisTemplate redisTemplate;
@RabbitListener(queues = "direct-queue")
public void consumeMsg(String msg, Message message, Channel channel) throws IOException {
try{
log.info("收到消息,msg:{}",msg);
HashMap map = JSON.parseObject(msg, HashMap.class);
Boolean msgId = redisTemplate.hasKey("msg:" + map.get("messageId"));
if (Boolean.FALSE.equals(msgId)){
//不存在说明已经消费过,则不做任何处理直接签收
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
return;
}
//存在则继续执行消费逻辑
log.info("处理业务逻辑");
//删除redis消息Id
redisTemplate.delete("msg:" + map.get("messageId"));
//签收消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}catch (Exception e) {
//判断是否已经重新投递
if (message.getMessageProperties().getRedelivered()){
//已经重新入过队了,则直接丢弃(如果配置了死信队列,则转到私信队列里面)
channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
}else {
//没有重新入队,则重新入队
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
}
}
}
保障消息幂等的方式有很多,我们这里采用的时删除则代表已经消费过的方式。也可以采用修改状态的方式;或者也根据具体的业务场景,选用其它合理的处理方式。
三、如何保证消息的顺序性
1. 产生原因
例如:对订单状态的修改封装为消息,存到消息队列里面。由消费者消费,修改数据库中订单的状态。在生产环境中大多都是集群部署,即一个队列对应多个消费者。由于消费者消费消息的能力,或速度不同,导致原本对于id为1001的订单的状态修改顺序为【A,B,C】,变为了【B,C,A】,从而导致订单状态出现错误的现象。
2. 解决方案
在上述场景中,问题主要出现在多个消费者对同一个队列进行消费上。即rabbitmq的work模式是无法保证消息的顺序消费的。要想保证顺序消费,只有使用Hello-world模式,即一个队列只有一个消费者,且需要注意的是消费者一次只能有一个线程对消息进行消费。
但这样又带来一个问题,消费者消费消息的性能会大大下降。所以我们可以通过建立多个队列,每个队列也只绑定一个消费者。根据订单id做一致性Hash运算来决定被转发到哪一个队列里面。这样既保证了对同一个订单状态信息的顺序消费,又保证了消息的消费速率问题。
推荐文章
发表评论