记一次RabbitMQ设置手动ack报错:Channel closed; cannot ack/nack

报错内容

java.lang.IllegalStateException: Channel closed; cannot ack/nack
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:1181)
at com.sun.proxy.$Proxy216.basicAck(Unknown Source)

前置条件

使用了nacos作为服务注册,rabbitmq的配置文件设置了手动ack,yml文件也是相同配置,配置文件如下:

listener:
      simple:
        acknowledge-mode: manual #采取手动应答
        retry:
          enabled: true # 是否支持重试

mq工厂配置:

@Bean
    public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        return factory;
    }

消费者代码如下:

/**
     * 处理消息
     * @param plan
     * @param channel
     * @param message
     * @throws IOException
     */
    @RabbitHandler
    public void handle(@Payload HdPlanResponse plan, Channel channel, Message message) throws IOException {
        LOGGER.info("receive message plan:{}",plan);
        try{
            hdMessageService.saveMessageByType(plan);
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }catch (Exception e){
            LOGGER.error("消费失败原因:",e);
            LOGGER.info("消费失败重试 plan:{}",plan);
            if(message.getMessageProperties().getRedelivered()){
                LOGGER.info("重复失败丢弃消息 plan:{}",plan);
                channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
            }else{
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,true);
            }
        }
    }

出现错误情况

当同时发送两条mq消息同时消费的时候,第一条mq消息是没有问题的,但是从第二条开始就会报错:

java.lang.IllegalStateException: Channel closed; cannot ack/nack
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:1181)
at com.sun.proxy.$Proxy216.basicAck(Unknown Source)
at com.lishicloud.datacenter.red.manage.message.PlanMessageReceiver.handle(PlanMessageReceiver.java:43)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120)
at org.springframework.amqp.rabbit.listener.adapter.DelegatingInvocableHandler.invoke(DelegatingInvocableHandler.java:130)
at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:66)
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:220)
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandlerAndProcessResult(MessagingMessageListenerAdapter.java:148)
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:133)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1579)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1498)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1486)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1477)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1421)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:963)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:913)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:81)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1284)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1190)
at java.lang.Thread.run(Thread.java:748)

出现问题原因

虽然我们配置文件里设置了手动ack,但是我们在工厂里设置了自己的json解析器,如果设置了自己的json解析器就会导致配置文件里的手动ack配置失效,具体原因应该是rabbitmq的配置加载顺序问题

解决办法

需要在工厂类里再次重新设置一下手动ack,代码如下:

    @Bean
    public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);             //开启手动 ack
        return factory;
    }

原文链接:https://blog.csdn.net/qq_42894258/article/details/118357205


大家都在看: