问题描述

同事说有个项目不能自动创建新队列,每次新增队列都要先在rabbitmq后台手动创建,并且只有生产环境是这样,测试环境没有这个问题。

我一听还有这种事情,这怎么能忍,本着我不入地狱谁入地狱的精神开始排查这个问题,顺便在深入了解下springboot怎么实现rabbitmq队列的自动创建。

查找原因

正常来说,消费者使用@RabbitListener注解声明了queue、exchange之后会自动创建不存在的队列,并声明队列和exchange的绑定关系。在这个项目中也是这么用的,但是为什么没有创建队列呢。

首先通过观察启动日志发现如下报错信息片段

Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method(reply-code=404, reply-text=NOT_FOUND - no queue 'test_queue' in vhost 'test_vhost', class-id=50, method-id=10)

通过报错信息能很明确知道是因为队列在vhost中不存在,channel异常关闭。通过官网也能说明这一点

rabbitmq官网对channel错误信息描述但这还是因为没有创建队列导致的结果,还要继续找没有自动创建的原因。继续查看日志发现还有一条报错信息

Channel shutdown: channel error; protocol method: #method(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'auto_delete' for exchange 'dome_exchange' in vhost 'test_vhost': received 'false' but current is 'true', class-id=40, method-id=10)

因为这条报错信息和当前队列并没有直接关系,所以被同事忽略了。但我看到channel.close关键字心想难道就是因为这个报错导致链接关闭没有创建队列?带着这个疑问继续排查问题。

查看源码

首先确认报错原因,通过rabbitmq后台查看deme_exchange,发现auto_delete参数确实为true

rabbitmq后台查看参数

消费者代码

消费者代码中并没有声明auto_delete参数,通过查看@Exchange注解发现auto_delete默认为false

/**

* @return true if the exchange is to be declared as auto-delete.

*/

String autoDelete() default "false";

这就导致应用启动时代码中声明的exchange和远程已经存在的参数不一致,导致报406错误。通过上面官网截图也可看到对406报错的说明能够印证这一点。至于不一致的原因,后来问同事他应该是在后台手动创建的exchange,并没有注意这些参数细节。所以一定要规范操作啊,不然各种挖坑。

找到406的报错原因后,继续排查为什么队列没有自动创建。通过查看@RabbitListener注解源码,发现对queues参数的说明如下(不同springboot版本说明可能不一样)

/**

* The queues for this listener.

* The entries can be 'queue name', 'property-placeholder keys' or 'expressions'.

* Expression must be resolved to the queue name or {@code Queue} object.

* The queue(s) must exist, or be otherwise defined elsewhere as a bean(s) with

* a {@link org.springframework.amqp.rabbit.core.RabbitAdmin} in the application

* context.

* Mutually exclusive with {@link #bindings()} and {@link #queuesToDeclare()}.

* @return the queue names or expressions (SpEL) to listen to from target

* @see org.springframework.amqp.rabbit.listener.MessageListenerContainer

*/

String[] queues() default {};

其中说到要么队列已经存在,或者在其他地方定义了RabbitAdmin这个bean。

继续查看RabbitAdmin源码发现几个关键信息,该类实现InitializingBean接口,该接口在Bean初始化完成后会执行指定逻辑,在InitializingBean接口下的afterPropertiesSet()方法中实现,RabbitAdmin重写的afterPropertiesSet()方法中发现会调用initialize()方法,该方法源码如下

/**

* Declares all the exchanges, queues and bindings in the enclosing application context, if any. It should be safe

* (but unnecessary) to call this method more than once.

*/

@Override // NOSONAR complexity

public void initialize() {

if (this.applicationContext == null) {

this.logger.debug("no ApplicationContext has been set, cannot auto-declare Exchanges, Queues, and Bindings");

return;

}

this.logger.debug("Initializing declarations");

Collection contextExchanges = new LinkedList(

this.applicationContext.getBeansOfType(Exchange.class).values());

Collection contextQueues = new LinkedList(

this.applicationContext.getBeansOfType(Queue.class).values());

Collection contextBindings = new LinkedList(

this.applicationContext.getBeansOfType(Binding.class).values());

Collection customizers =

this.applicationContext.getBeansOfType(DeclarableCustomizer.class).values();

processDeclarables(contextExchanges, contextQueues, contextBindings);

final Collection exchanges = filterDeclarables(contextExchanges, customizers);

final Collection queues = filterDeclarables(contextQueues, customizers);

final Collection bindings = filterDeclarables(contextBindings, customizers);

for (Exchange exchange : exchanges) {

if ((!exchange.isDurable() || exchange.isAutoDelete()) && this.logger.isInfoEnabled()) {

this.logger.info("Auto-declaring a non-durable or auto-delete Exchange ("

+ exchange.getName()

+ ") durable:" + exchange.isDurable() + ", auto-delete:" + exchange.isAutoDelete() + ". "

+ "It will be deleted by the broker if it shuts down, and can be redeclared by closing and "

+ "reopening the connection.");

}

}

for (Queue queue : queues) {

if ((!queue.isDurable() || queue.isAutoDelete() || queue.isExclusive()) && this.logger.isInfoEnabled()) {

this.logger.info("Auto-declaring a non-durable, auto-delete, or exclusive Queue ("

+ queue.getName()

+ ") durable:" + queue.isDurable() + ", auto-delete:" + queue.isAutoDelete() + ", exclusive:"

+ queue.isExclusive() + ". "

+ "It will be redeclared if the broker stops and is restarted while the connection factory is "

+ "alive, but all messages will be lost.");

}

}

if (exchanges.size() == 0 && queues.size() == 0 && bindings.size() == 0) {

this.logger.debug("Nothing to declare");

return;

}

this.rabbitTemplate.execute(channel -> {

declareExchanges(channel, exchanges.toArray(new Exchange[exchanges.size()]));

declareQueues(channel, queues.toArray(new Queue[queues.size()]));

declareBindings(channel, bindings.toArray(new Binding[bindings.size()]));

return null;

});

this.logger.debug("Declarations finished");

}

可以看到initialize()方法中会从spring容器中获取所有的Exchange、Queue和Binding等bean,并在declareExchanges()、declareQueues()、declareBindings()等方法中创建不存在的exchange、queues和绑定关系。这也能解释为什么要在消费者中创建相应的exchange、queue和Bingding的bean,并且自动创建队列。

当declareExchanges()方法有异常返回406时,并不会继续执行下面的declareQueues()和declareBindings()方法,所以新队列并没有自动创建。至此就找到了队列没有创建的原因以及自动创建exchange和queues相关的源码。而测试环境没有报错的原因是因为不存在exchange参数不一致的情况。

RabbitAdmin是如何自动创建的

进一步思考发现RabbitAdmin这个bean并没有在代码中创建,那么它是什么时候注入到容器中的呢。

继续追查源码发现RabbitAdmin在RabbitAutoConfiguration类中被创建并注入到容器中

@Bean

@ConditionalOnSingleCandidate(ConnectionFactory.class)

@ConditionalOnProperty(prefix = "spring.rabbitmq", name = "dynamic", matchIfMissing = true)

@ConditionalOnMissingBean

public AmqpAdmin amqpAdmin(ConnectionFactory connectionFactory) {

return new RabbitAdmin(connectionFactory);

}

RabbitAutoConfiguration是rabbitmq的配置类,通过注解可以发现当存在RabbitTemplate类时会启用

@Configuration(proxyBeanMethods = false)

@ConditionalOnClass({ RabbitTemplate.class, Channel.class })

@EnableConfigurationProperties(RabbitProperties.class)

@Import(RabbitAnnotationDrivenConfiguration.class)

public class RabbitAutoConfiguration {

RabbitTemplate是对Rabbitmq封装的操作类,在项目中有被显式声明,所以RabbitAutoConfiguration也会被创建,至此rabbitmq自动创建队列的大致流程就被串起来了。

精彩内容

评论可见,请评论后查看内容,谢谢!!!评论后请刷新页面。