消息接收的确认机制主要有三种模式:

自动确认AcknowledgeMode.NONE

RabbitMQ成功将消息发出(即将消息成功写入TCP Socket)中立即认为本次投递已经被正确处理,不管消费者端是否成功处理本次投递。

所以这种情况如果消费端消费逻辑抛出异常,也就是消费端没有处理成功这条消息,那么就相当于丢失了消息。

一般这种情况我们都是使用try catch捕捉异常后,打印日志用于追踪数据,这样找出对应数据再做后续处理。

根据情况确认AcknowledgeMode.AUTO

这也是SpringBoot集成RabbitMQ默认的消息确认情况,如果消费消息时有异常抛出,则会拒绝消息,反之如果没有捕获到异常则确认本次消费成功。

手动确认AcknowledgeMode.MANUAL

这个比较关键,也是我们配置接收消息确认机制时,多数选择的模式。

消费者收到消息后,手动调用basicAck/basicNack/basicReject后,RabbitMQ收到这些消息后,才认为本次投递成功。

1、创建手动确认消息的队列

@Configuration
public class DirectRabbitConfig {
    //Direct交换机 起名:directExchange
    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange("directExchange", true, false);
    }
    //需要手动确认消息的队列
    @Bean
    public Queue manualAckQueue() {
        return new Queue("manualAckQueue", true, false, false);
    }
    //手动确认消息的队列和直连交换机绑定
    @Bean
    public Binding bindingDirectForManualAck() {
        return BindingBuilder.bind(manualAckQueue()).to(directExchange()).with("manualAck");
    }
}

2、手动确认消息的监听实现

2.1、通过配置实现

spring:
  rabbitmq:
    host: 148.70.153.63
    port: 5672
    username: libai
    password: password
    listener:
      simple:
        # 手动确认
        acknowledge-mode: manual
        # 拒绝接收的消息是否重回队列
        default-requeue-rejected: true

2.2、配置类实现(更加灵活)

@Configuration
@Slf4j
public class MessageManualAckListenerConfig {
    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        // RabbitMQ默认是自动确认,这里改为手动确认消息
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        // 设置需要手动确认消息的队列,可以同时设置多个,前提是队列需要提前创建好
        container.setQueueNames("manualAckQueue");
        // 设置监听消息的方法,匿名内部类方式
        container.setMessageListener((ChannelAwareMessageListener) (message, channel) -> {
            // 开始消费消息
            log.info("body:\n{}", JSONUtil.toJsonPrettyStr(new String(message.getBody())));
            log.info("prop:\n{}", JSONUtil.toJsonPrettyStr(message.getMessageProperties()));
            // 手动确认
            long deliveryTag = message.getMessageProperties().getDeliveryTag();
            channel.basicAck(deliveryTag, false); // 肯定确认
        });
        return container;
    }
}

body:接收的消息内容。

messageProperties:消息的相关属性。

3、发送消息测试手动确认

3.1、调用接口

@RestController
public class MessageManualAckController {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @PostMapping("/manualAck")
    public String manualAck() {
        Map<String, Object> map = new HashMap<>();
        map.put("messageId", String.valueOf(UUID.randomUUID()));
        map.put("messageData", "manualAck");
        map.put("createTime", LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
        rabbitTemplate.convertAndSend("directExchange", "manualAck", JSONUtil.toJsonStr(map));
        return "ok";
    }
}

3.2、查看控制台打印输出。

2020-09-17 22:35:28,635 [INFO] [simpleMessageListenerContainer-1] [net.zhaoxiaobin.rabbitmq.manual.MessageManualAckListenerConfig:32] [] body:
{
    "createTime": "2020-09-17 22:35:28",
    "messageId": "25398d1a-474e-48cd-a3df-460b780e9d97",
    "messageData": "manualAck"
}
2020-09-17 22:35:28,636 [INFO] [simpleMessageListenerContainer-1] [net.zhaoxiaobin.rabbitmq.manual.MessageManualAckListenerConfig:33] [] prop:
{
    "headers": {
        "spring_listener_return_correlation": "a6622c5e-22f0-4f39-bea5-4360ef8de66b"
    },
    "finalRetryForMessageWithNoId": false,
    "contentLengthSet": false,
    "deliveryTag": 3,
    "receivedExchange": "directExchange",
    "priority": 0,
    "receivedRoutingKey": "manualAck",
    "redelivered": false,
    "consumerTag": "amq.ctag-cqCpyMhe9Ak2vuv_RifFlQ",
    "receivedDeliveryMode": "PERSISTENT",
    "publishSequenceNumber": 0,
    "contentEncoding": "UTF-8",
    "contentLength": 0,
    "contentType": "text/plain",
    "consumerQueue": "manualAckQueue",
    "deliveryTagSet": true
}

3.3、消费消息时的状态变化

通过打断点方式查看当消息未被确认时在RabbitMQ server中的状态。

unacked

4、确认/拒绝消息

4.1、basicAck

确认接收消息。

第2个参数如果设为true,则表示批量确认当前通道中所有deliveryTag小于当前消息的所有消息。

4.2、basicNack

拒绝应答消息。

第2个参数如果设为true,则表示批量拒绝当前通道中所有deliveryTag小于当前消息的所有消息。

第3个参数如果设为true,则表示当前消息再次回到队列中等待被再次消费。

4.3、basicReject

拒绝接收消息。与basicNack作用类似,只不过一次只能拒绝单条消息。

对于拒绝接收消息并且重回队列使用时需要谨慎,避免使用不当会导致一些每次都被你重入列的消息一直消费-入列-消费-入列这样循环,会导致大量消息的积压。

链接:https://www.jianshu.com/p/9bcb0242b441