Spring RabbitMQ中消费者可使用@RabbitListener标注的方法进行处理,这里介绍下@RabbitListener注解详细的处理过程。

解析类

RabbitListenerAnnotationBeanPostProcessor类用于解析RabbitListener注解,该类实现了BeanPostProcessor,Ordered,BeanFactoryAware,BeanClassLoaderAware,EnvironmentAware说明该类具有以下特性:

是一个Bean后处理,@RabbitListener标注的类(或方法所在类)必须可被Bean实例化,也就是要么在@Bean方法中手动创建实例,或者用@@Component标注。

可设定Bean初始化优先级

注入BeanFactory,Environment,可以获取Bean工厂中的Bean实例和环境变量

单例实例化后处理

解析入口方法:postProcessAfterInitialization

查找RabbitListener标注的类或方法

Bean工厂类中的Bean都是通过动态代理创建的代理类,要获取目标类,先使用AopUtils

    Class<?> targetClass = AopUtils.getTargetClass(bean);

获取目标类中RabbitListener注解

有2种方式配置RabbitListener,一种使用RabbitListener标注类且使用RabbitHandler标注方法,另一种使用RabbitListener标注方法,两种模式同时存在也可以。使用RabbitHandler标注的方法和RabbitListener标注的方法都会被当做消费者的消息监听方法

    //查找RabbitListener标注的类

    Collection<RabbitListener> classLevelListeners = findListenerAnnotations(targetClass);

    ReflectionUtils.doWithMethods(targetClass, new ReflectionUtils.MethodCallback() {

            @Override

            public void doWith(Method method) throws IllegalArgumentException, IllegalAccessException {

                //查找RabbitListener标注的方法

                Collection<RabbitListener> listenerAnnotations = findListenerAnnotations(method);

                if (listenerAnnotations.size() > 0) {

                    methods.add(new ListenerMethod(method,listenerAnnotations.toArray(new RabbitListener[listenerAnnotations.size()])));

                }

                //存在RabbitListener标注的类

                if (hasClassLevelListeners) {

                    //查找RabbitHandler标注的方法

                    RabbitHandler rabbitHandler = AnnotationUtils.findAnnotation(method, RabbitHandler.class);

                    if (rabbitHandler != null) {

                        multiMethods.add(method);

                    }

                }

            }

        }, ReflectionUtils.USER_DECLARED_METHODS);//查找范围是除了Object类之外的方法

处理@RabbitListener标注的方法

@RabbitListener中可使用SpEL表达式

protected void processListener(MethodRabbitListenerEndpoint endpoint, RabbitListener rabbitListener, Object bean,Object adminTarget, String beanName) {

    endpoint.setBean(bean);

    endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);

    //Id

    endpoint.setId(getEndpointId(rabbitListener));

    //解析队列

    endpoint.setQueueNames(resolveQueues(rabbitListener));

    //独占队列

    endpoint.setExclusive(rabbitListener.exclusive());

    //分组

    if (StringUtils.hasText(group)) {

        Object resolvedGroup = resolveExpression(group);

        if (resolvedGroup instanceof String) {

            endpoint.setGroup((String) resolvedGroup);

        }

    }

    //优先级

    String priority = resolve(rabbitListener.priority());

    if (StringUtils.hasText(priority)) {

        endpoint.setPriority(Integer.valueOf(priority));

    }

    //查找rabbitAdmin

    String rabbitAdmin = resolve(rabbitListener.admin());

    if (StringUtils.hasText(rabbitAdmin)) {

        endpoint.setAdmin(this.beanFactory.getBean(rabbitAdmin, RabbitAdmin.class));

    }

    //查找containerFactory

    String containerFactoryBeanName = resolve(rabbitListener.containerFactory());

    if (StringUtils.hasText(containerFactoryBeanName)) {

        factory = this.beanFactory.getBean(containerFactoryBeanName, RabbitListenerContainerFactory.class);

    }

    //将endpoint关联到containerFactory中

    this.registrar.registerEndpoint(endpoint, factory);

}

private String[] resolveQueues(RabbitListener rabbitListener) {

    String[] queues = rabbitListener.queues();

    QueueBinding[] bindings = rabbitListener.bindings();

    //queues和bindings都配置会抛出异常

    if (queues.length > 0 && bindings.length > 0) {

        throw new BeanInitializationException("@RabbitListener can have 'queues' or 'bindings' but not both");

    }

    List<String> result = new ArrayList<String>();

    if (queues.length > 0) {

        for (int i = 0; i < queues.length; i++) {

            //队列名称可为SpEL表达式,如#{myQueue},注意不能带.,如my.queue(可能会报错)

            Object resolvedValue = resolveExpression(queues[i]);

            //解析队列名称,resolvedValue可为String[],Queue,String,最终都会解析为String,存放到result中

            resolveAsString(resolvedValue, result);

        }

    }else {

        return registerBeansForDeclaration(rabbitListener);

    }

    return result.toArray(new String[result.size()]);

}

private String[] registerBeansForDeclaration(RabbitListener rabbitListener) {

    List<String> queues = new ArrayList<String>();

    if (this.beanFactory instanceof ConfigurableBeanFactory) {

        for (QueueBinding binding : rabbitListener.bindings()) {

            //声明队列

            String queueName = declareQueue(binding);

            queues.add(queueName);

            //声明Exchange和Binding

            declareExchangeAndBinding(binding, queueName);

        }

    }

    return queues.toArray(new String[queues.size()]);

}

//具体声明队列和Exchange及Binding的代码可以去看源码

处理@RabbitListener标注的类和@RabbitHandler标注的方法

解析原理同上,只是解析信息封装在MultiMethodRabbitListenerEndpoint中,上面的是MethodRabbitListenerEndpoint

这里要说明下注解中均可以使用SpEL表达式,SpEL表达式可以解析BeanFactory和Environment中(spring.rabbitmq.emptyStringArguments(多个,分隔))的数据,如我们自己创建的Queue、Exchaneg、Binding等Bean这里都可以通过表达式引用(定义好Bean名称),还比如我们使用@Queue、@Exchange、@binding配置的属性,也可以通过表达式引用,这些配置值可以写在配置文件中。

beanFactory:DefaultListableBeanFactory实例,Bean工厂

resolver:StandardBeanExpressionResolver,通过SpEL表达式解析Bean实例

expressionContext:BeanExpressionContext,封装了beanFactory,Bean实例上下文,提供SpEL表达式解析

environment:StandardServletEnvironment

原文链接:https://www.jianshu.com/p/c2abde83a241


大家都在看: