一、rabbitmq如何保证消息不丢失?

RabbitMq丢失消息的场景大致分为以上四种情况。

        1.生产者在发消息给交换机的过程中消息丢失

        2.交换机成功收到消息,再路由给队列的过程中丢失

        3.队列未持久化,服务器宕机导致丢失

        4.消费者未成功消费消息,导致消息丢失

1. 对于1、2过程中出现消息丢失的问题可以使用生产者的发布确认模式。(配置如下)

/**

* rabbitTemplate初始化配置

* @param connectionFactory spring提供的RabbitMq连接池

* @return

*/

@Bean

public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {

//设置生产者消息发送确认类型

connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);

//设置消息发送后需要回调

connectionFactory.setPublisherReturns(true);

//创建RabbitTemplate

RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);

//设置Rabbitmq消息的序列化方式

rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());

//如果设置了 rabbitTemplate.setMandatory(true) 参数,则会将消息退回给 producer。并执行回调函数 returnedMessage。

rabbitTemplate.setMandatory(true);

//confirmCallBack是确保消息投递到交换机的回调

//消息进入exchange是触发

rabbitTemplate.setConfirmCallback((correlationData, ack, cuase) ->{

if (ack){

log.info("消息成功到达交换机");

}else {

log.info("消息投递到交换机失败,可考虑重新投递");

}

});

//returnCallBack是确保消息通过交换机转发给消息队列的回调

//消息未送达队列触发回调

rabbitTemplate.setReturnCallback((message,replyCode,replyText, exchange, routingKey) -> {

log.error("消息发送失败,未送达队列,message:{},replyCode:{},replyText:{},exchange:{},exchange:{}",message, replyCode, replyText, exchange, routingKey);

});

return rabbitTemplate;

}

 配置好了之后,生产者直接调用convertAndSend即可发送消息。

ps:如果消息在由交换机到队列的过程中失败,还可以为交换机指定备份交换机

//声明队列

@Bean

public Queue directQueue(){

return new Queue("direct-queue",true,false,false,null);

}

//声明交换机,并指定备份交换机

@Bean

public DirectExchange directExchange() {

Map map = new HashMap<>();

map.put("alternate-exchange","alternate-exchange");

return new DirectExchange("direct-exchange",true,false,map);

}

//声明备份队列

@Bean

public Queue alternateQueue(){

return new Queue("alternate-queue",true,false,false,null);

}

/**

* 备份交换机使用扇形模式,可以绑定多个队列,每个队列分别执行消费,预警,入库等操作

* @return

*/

@Bean

public FanoutExchange alternateExchange() {

return new FanoutExchange("alternate-exchange",true,false);

}

//绑定工作交换机与队列

@Bean

public Binding directBinding(){

return BindingBuilder.bind(directQueue()).to(directExchange()).with("hello");

}

//绑定备份交换机与队列

@Bean

public Binding alternateBinding(){

return BindingBuilder.bind(alternateQueue()).to(alternateExchange());

}

 2. 对于场景3 出现消息丢失的可能存在以下两点

        1. 如果消息队列未设置持久化,则可通过设置队列持久化来保存内存中的消息到磁盘,以免服务宕机导致队列中的消息丢失。

        2.如果队列设置了持久化,队列中的部分还未全部持久化到磁盘,此事服务宕机,就可能回丢失少量的消息。这种情况可以搭建镜像集群来解决。

3. 对于场景4 消费者方可以使用手动ack+消息重新入队的方式解决。

@RabbitListener(queues = "direct-queue")

public void consumeMsg(String msg, Message message, Channel channel) throws IOException {

try{

//处理业务逻辑

log.info("收到消息,msg:{}",msg);

long deliveryTag = message.getMessageProperties().getDeliveryTag();

//签收消息

channel.basicAck(deliveryTag,false);

}catch (Exception e) {

//判断是否已经重新投递

if (message.getMessageProperties().getRedelivered()){

//已经重新入过队了,则直接丢弃(如果配置了死信队列,则转到私信队列里面)

channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);

}else {

//没有重新入队,则重新入队

channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);

}

}

}

ps: 在此种场景下还可以配置死信交换机,对丢失的消息做进一步的处理

//声明队列

@Bean

public Queue directQueue(){

Map map = new HashMap<>();

//指定进入的死信交换机

map.put("x-dead-letter-exchange","dead_direct_exchange");

//direct模式需要配置,fanout模式是不需要配置的

map.put("x-dead-letter-routing-key","dead");

return new Queue("direct-queue",true,false,false,map);

}

//声明死信队列

@Bean

public Queue deadQueue(){

return new Queue("dead-queue",true,false,false,null);

}

//声明死信交换机

@Bean

public DirectExchange deadExchange() {

return new DirectExchange("dead_direct_exchange",true,false);

}

@Bean

public Binding deadBinding(){

return BindingBuilder.bind(deadQueue()).to(deadExchange()).with("dead");

}

二、如何保证消息的幂等性

所谓消息的幂等性,就是保证消息不被重复消费。例如一条扣款消息,被消费了两次,扣了两次款,显然是有问题的。所以保证消息的幂等性是很有必要的。

这里我们采用全局唯一id的方式来保证幂等性:

        1. 生产者在发送消息之前先生成一个全局唯一Id作为消息的ID和消息体一起发送到队列,并存放在redis里面

        2. 消费者获取到消息后,先判断该消息Id是否存在于redis中,如果存在说明该消息还未被消费,则走消费逻辑,消费完后将redis中的消息id删除;如果不存在则说明消息已经被消费了,则直接签收不做任何处理。(代码实现如下所示)

//生产者

@Resource

private RabbitTemplate rabbitTemplate;

@Autowired

private StringRedisTemplate redisTemplate;

public void produceMsg(String msg) {

//生成全局唯一id

String msgId = UUID.randomUUID().toString();

//存到redis里面

redisTemplate.opsForValue().set("msg:"+msgId,"");

HashMap map = new HashMap<>();

map.put("messageId", msgId);

map.put("msg",msg);

rabbitTemplate.convertAndSend("direct-queue",map);

}

//消费者

@Autowired

private StringRedisTemplate redisTemplate;

@RabbitListener(queues = "direct-queue")

public void consumeMsg(String msg, Message message, Channel channel) throws IOException {

try{

log.info("收到消息,msg:{}",msg);

HashMap map = JSON.parseObject(msg, HashMap.class);

Boolean msgId = redisTemplate.hasKey("msg:" + map.get("messageId"));

if (Boolean.FALSE.equals(msgId)){

//不存在说明已经消费过,则不做任何处理直接签收

channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

return;

}

//存在则继续执行消费逻辑

log.info("处理业务逻辑");

//删除redis消息Id

redisTemplate.delete("msg:" + map.get("messageId"));

//签收消息

channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

}catch (Exception e) {

//判断是否已经重新投递

if (message.getMessageProperties().getRedelivered()){

//已经重新入过队了,则直接丢弃(如果配置了死信队列,则转到私信队列里面)

channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);

}else {

//没有重新入队,则重新入队

channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);

}

}

}

保障消息幂等的方式有很多,我们这里采用的时删除则代表已经消费过的方式。也可以采用修改状态的方式;或者也根据具体的业务场景,选用其它合理的处理方式。

三、如何保证消息的顺序性

1. 产生原因

例如:对订单状态的修改封装为消息,存到消息队列里面。由消费者消费,修改数据库中订单的状态。在生产环境中大多都是集群部署,即一个队列对应多个消费者。由于消费者消费消息的能力,或速度不同,导致原本对于id为1001的订单的状态修改顺序为【A,B,C】,变为了【B,C,A】,从而导致订单状态出现错误的现象。

2. 解决方案

        在上述场景中,问题主要出现在多个消费者对同一个队列进行消费上。即rabbitmq的work模式是无法保证消息的顺序消费的。要想保证顺序消费,只有使用Hello-world模式,即一个队列只有一个消费者,且需要注意的是消费者一次只能有一个线程对消息进行消费。

        但这样又带来一个问题,消费者消费消息的性能会大大下降。所以我们可以通过建立多个队列,每个队列也只绑定一个消费者。根据订单id做一致性Hash运算来决定被转发到哪一个队列里面。这样既保证了对同一个订单状态信息的顺序消费,又保证了消息的消费速率问题。

推荐文章

评论可见,请评论后查看内容,谢谢!!!评论后请刷新页面。