Notes on a RabbitMQ manual-ack error: Channel closed; cannot ack/nack
Error message
java.lang.IllegalStateException: Channel closed; cannot ack/nack
    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:1181)
    at com.sun.proxy.$Proxy216.basicAck(Unknown Source)
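Preconditions
Nacos is used for service registration. The RabbitMQ configuration there enables manual ack, and the local yml file contains the same setting:
listener:
  simple:
    acknowledge-mode: manual # use manual acknowledgement
    retry:
      enabled: true # whether retry is enabled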
MQ listener container factory configuration:
@Bean
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setMessageConverter(new Jackson2JsonMessageConverter());
    return factory;
}
The consumer code:
/**
 * Handles a message.
 * @param plan
 * @param channel
 * @param message
 * @throws IOException
 */
@RabbitHandler
public void handle(@Payload HdPlanResponse plan, Channel channel, Message message) throws IOException {
    LOGGER.info("receive message plan:{}", plan);
    try {
        hdMessageService.saveMessageByType(plan);
        // Success: acknowledge this single delivery
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    } catch (Exception e) {
        LOGGER.error("consume failed, cause:", e);
        LOGGER.info("consume failed, retrying plan:{}", plan);
        if (message.getMessageProperties().getRedelivered()) {
            // Already redelivered once: reject without requeueing, so the message is dropped
            LOGGER.info("failed again, discarding message plan:{}", plan);
            channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
        } else {
            // First failure: nack and requeue so the message is delivered again
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
        }
    }
}
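How the error occurs
When two MQ messages are sent and consumed at the same time, the first message is processed without problems, but from the second message on the following error is thrown: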
java.lang.IllegalStateException: Channel closed; cannot ack/nack
    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:1181)
    at com.sun.proxy.$Proxy216.basicAck(Unknown Source)
    at com.lishicloud.datacenter.red.manage.message.PlanMessageReceiver.handle(PlanMessageReceiver.java:43)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120)
    at org.springframework.amqp.rabbit.listener.adapter.DelegatingInvocableHandler.invoke(DelegatingInvocableHandler.java:130)
    at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:66)
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:220)
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandlerAndProcessResult(MessagingMessageListenerAdapter.java:148)
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:133)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1579)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1498)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1486)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1477)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1421)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:963)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:913)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:81)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1284)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1190)
    at java.lang.Thread.run(Thread.java:748)
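Cause
Although manual ack is set in the configuration file, we defined our own listener container factory in order to plug in a custom JSON converter. Once that custom factory bean exists, the manual-ack setting from the configuration file no longer takes effect. This looks like a configuration load-order problem at first, but the more likely explanation is that Spring Boot only applies the spring.rabbitmq.listener.simple.* properties to the factory it auto-configures, and that auto-configured factory backs off when a bean named rabbitListenerContainerFactory is already defined. A hand-built SimpleRabbitListenerContainerFactory therefore keeps the default acknowledge mode, AUTO, in which the container acknowledges each message itself. The listener's own basicAck then acknowledges the same delivery tag a second time, the broker closes the channel, and the next message handled on that channel fails with "Channel closed; cannot ack/nack".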
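To make the failure mode concrete, here is a small toy model of the double acknowledgement. It is only a sketch of the likely mechanism: DoubleAckDemo, ToyChannel, listener and containerDeliver are hypothetical names invented for illustration and are not part of Spring AMQP or the RabbitMQ client. The toy channel closes itself when it is asked to acknowledge a delivery tag it no longer tracks, which is how a broker reacts to a second ack for the same tag.

```java
import java.util.HashSet;
import java.util.Set;

// Toy model only: none of these types belong to Spring AMQP or the RabbitMQ client.
class DoubleAckDemo {

    /** Hypothetical stand-in for a channel that tracks unacknowledged delivery tags. */
    static class ToyChannel {
        private final Set<Long> unacked = new HashSet<>();
        private long nextTag = 1;
        private boolean open = true;

        /** Delivers a message and returns its delivery tag. */
        long deliver() {
            long tag = nextTag++;
            unacked.add(tag);
            return tag;
        }

        /** Acks a tag; an unknown (already acked) tag closes the channel. */
        void basicAck(long tag) {
            if (!open) {
                throw new IllegalStateException("Channel closed; cannot ack/nack");
            }
            if (!unacked.remove(tag)) {
                open = false; // models the broker closing the channel after a double ack
            }
        }

        boolean isOpen() {
            return open;
        }
    }

    /** Mirrors the article's listener: it always acknowledges manually. */
    static void listener(ToyChannel channel, long tag) {
        channel.basicAck(tag);
    }

    /** Models the container: in auto mode it acks again after the listener returns. */
    static void containerDeliver(ToyChannel channel, boolean autoAck) {
        long tag = channel.deliver();
        listener(channel, tag);
        if (autoAck) {
            channel.basicAck(tag);
        }
    }

    public static void main(String[] args) {
        ToyChannel channel = new ToyChannel();
        // First message: the listener's ack succeeds, the container's second ack closes the channel.
        containerDeliver(channel, true);
        try {
            // Second message: the listener's ack now hits a closed channel.
            containerDeliver(channel, true);
        } catch (IllegalStateException e) {
            System.out.println("second message failed: " + e.getMessage());
        }
    }
}
```

Calling containerDeliver with autoAck set to false, which corresponds to AcknowledgeMode.MANUAL, leaves the channel open for both messages. In a real application the container would also restart the consumer on a new channel, which this model leaves out.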
Solution
Set manual ack again explicitly on the custom factory:
@Bean
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setMessageConverter(new Jackson2JsonMessageConverter());
    factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); // enable manual ack
    return factory;
}
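An alternative worth considering, sketched here on the assumption that the project relies on Spring Boot's RabbitMQ auto-configuration, is to let Spring Boot's SimpleRabbitListenerContainerFactoryConfigurer apply the spring.rabbitmq.listener.simple.* properties to the custom factory. The acknowledge mode, and the other listener settings from the yml file, then do not need to be repeated in code. The class name RabbitListenerConfig is hypothetical, and this variant was not part of the original fix.

```java
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical configuration class; assumes Spring Boot's RabbitMQ auto-configuration is active.
@Configuration
public class RabbitListenerConfig {

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
            SimpleRabbitListenerContainerFactoryConfigurer configurer,
            ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        // Sets the connection factory and applies the listener properties from the configuration file
        configurer.configure(factory, connectionFactory);
        // Set the JSON converter afterwards so it is not overridden by the configurer
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        return factory;
    }
}
```

Calling setAcknowledgeMode explicitly, as in the fix above, remains the simpler choice when manual ack is the only setting that matters.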
Original link: https://blog.csdn.net/qq_42894258/article/details/118357205