前言

在消息中间件的实际使用中如何保证消息的可靠性是我们必须要考虑的,因此就需要了解一下Rabbitmq的confirm模式、return模式,以及消费端的手动确认ACK等等了。

1、confirm模式:

此模式是作用在生产端的,开启了这个模式就可以知道消息有木有发送到exchange上。不管有没有发送到都会触发回调方法。

2、return模式:

此模式同样是作用在生产端的,这个模式就是为了知道消息有木有发送到对应的队列上。如果发送到了对应的队列不会触发回调方法,如果没有发送到对应的队列才会触发回调方法。

3、消费端的ACK确认:

这个是作用在消费端的,为了保证消息正确被消费者消费,分为手动确认和自动确认。

自动确认:消息一旦被消费了队列就会立马丢弃,不管你的业务逻辑最后是否正确执行。

手动确认:消息被消费了队列不会立马丢弃,需要手动应答,如果业务逻辑正确执行完成,那么通知队列你可以丢弃了,否则可以让消息再次返回队列处理或者直接丢弃也行。

还是直接上代码吧

代码实现

环境:2个SpringBoot工程(我这里就不搭建了)

1、生产端代码

生产端我直接用Junit来测试发送消息了,结构如下

pom.xml:

<dependency>

            <groupId>org.springframework.boot</groupId>

            <artifactId>spring-boot-starter-amqp</artifactId>

        </dependency>

        <dependency>

            <groupId>org.springframework.boot</groupId>

            <artifactId>spring-boot-starter-test</artifactId>

            <scope>test</scope>

            <exclusions>

                <exclusion>

                    <groupId>org.junit.vintage</groupId>

                    <artifactId>junit-vintage-engine</artifactId>

                </exclusion>

            </exclusions>

        </dependency>

        <dependency>

            <groupId>org.springframework.amqp</groupId>

            <artifactId>spring-rabbit-test</artifactId>

            <scope>test</scope>

        </dependency>

        <dependency>

            <groupId>junit</groupId>

            <artifactId>junit</artifactId>

            <version>4.12</version>

            <scope>test</scope>

        </dependency>

yml配置文件:

spring:

  application:

    name: rabbitmq-springboot

  rabbitmq:

    host: localhost

    port: 5672

    username: guest

    password: guest

    virtual-host: /test   #虚拟主机

    publisher-returns: true  #开启return模式

    publisher-confirm-type: correlated  #开启confirm模式  旧的jar是 publisher-confirms: true

RouteProduce:

package com.lhm.route;

import org.springframework.amqp.core.Message;

import org.springframework.amqp.rabbit.connection.CorrelationData;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Component;

/**

 * @Author: lhm

 * @Date: 2020/11/20 15:23

 * 4

 */

@Component

public class RouteProduce implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {

    @Autowired

    private RabbitTemplate rabbitTemplate;

    public void send_route(String exchange,String routeKey, Message message, CorrelationData correlationData){

        //不设置ReturnCallback和ConfirmCallback是没有办法触发回调方法的

        rabbitTemplate.setReturnCallback(this);

        rabbitTemplate.setConfirmCallback(this);

        rabbitTemplate.convertAndSend(exchange,routeKey,message,correlationData);

    }

    @Override

    public void confirm(CorrelationData correlationData, boolean ack, String acuse) {

        System.out.println(correlationData.getId());

        if (ack){

            System.out.println("消息发送到exchange成功");

        }else{

            System.out.println("消息发送到exchange失败,原因:"+acuse);

        }

    }

    /**

     * @param message  消息体

     * @param replyCode 响应code

     * @param replyText 响应内容

     * @param exchange 交换机

     * @param routingKey 队列

     */

    @Override

    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {

        System.out.println(new String(message.getBody())+"---replyCode:"+replyCode+"---replyText"+replyText+"---exchange"+exchange+"---"+routingKey);

        System.out.println("消息返回处理中...");

        //可以重发消息

    }

}

Test方法:

package com.lhm;

import com.lhm.route.RouteProduce;

import org.junit.jupiter.api.Test;

import org.junit.runner.RunWith;

import org.springframework.amqp.core.Message;

import org.springframework.amqp.core.MessageDeliveryMode;

import org.springframework.amqp.core.MessageProperties;

import org.springframework.amqp.rabbit.connection.CorrelationData;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.boot.test.context.SpringBootTest;

import org.springframework.test.context.junit4.SpringRunner;

@SpringBootTest(classes = RabbitmqSpringbootApplication.class)

@RunWith(SpringRunner.class)

class RabbitmqSpringbootApplicationTests{

    @Autowired

    private RouteProduce routeProduce;

    //route 路由模式

    @Test

    public void testRoute(){

        //设置消息的唯一标识

        //可以在消费端取到 String msgId = (String) message.getMessageProperties().getHeaders().get("spring_returned_message_correlation");

        CorrelationData correlationId = new CorrelationData(java.util.UUID.randomUUID().toString());

        Message message = new Message("大家好,我是一个消息".getBytes(),new MessageProperties());

        //MessageProperties可以封装消息的很多属性,例如设置消息的过期时间,消息的是否持久化等等属性

        //PERSISTENT 持久化消息

        message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);

        routeProduce.send_route("directs","info",message,correlationId);

    }

}

消费端代码

创建一个SpringBoot工程,结构如下

pom.xml:

引入这一个就行了

        <dependency>

            <groupId>org.springframework.boot</groupId>

            <artifactId>spring-boot-starter-amqp</artifactId>

        </dependency>

yml配置文件:

server:

  port: 8080

spring:

  application:

    name: consumer

  rabbitmq:

    host: localhost

    port: 5672

    username: guest

    password: guest

    virtual-host: /test   #虚拟主机

    listener:

      simple:

        # 签收模式为手动签收

        #none意味着没有任何的应答会被发送。

        #manual意味着监听者必须通过调用Channel.basicAck()来告知所有的消息

        #auto意味着容器会自动应答,除非MessageListener抛出异常,这是默认配置方式。

        acknowledge-mode: manual

        # 每次从RabbitMQ获取的消息数量

        prefetch: 1

        #default-requeue-rejected 设置成false,让无法消费的消息进入死信队列,如果没有加入死信队列这玩意不要

        default-requeue-rejected: false

        # 每个队列启动的消费者数量

        concurrency: 1

        # 每个队列最大的消费者数量

        max-concurrency: 1

#        retry:

#          enabled: true  #开启重试

#          max-attempts: 4  #重试次数,默认3

Consumer消费端:

流程:

1、消息根据路由key到达receive,开始进行业务逻辑处理

2、如果消息不为空的话进入输出打印,打印完直接手动ack,告诉队列可以丢弃了

3、如果消息为空的话抛出一个异常然后进行捕获,判断消息是否重新处理过了(第一次为false,如果重新投入队列还是没有手动ack,就为true)

4、如果message.getMessageProperties().getRedelivered()为false,调用basicNack将消息再次返回队列。

5、如果message.getMessageProperties().getRedelivered()为true,调用basicReject直接投入死信队列。

消息变成死信的三种情况:1、消息被拒绝(basic.rgject/basic.nack)并且requeue设置为false时    2、消息ttl过期     3、超过队列最大长度

package com.lhm.rabbitmq.consumer;

import com.rabbitmq.client.Channel;

import org.springframework.amqp.core.ExchangeTypes;

import org.springframework.amqp.core.Message;

import org.springframework.amqp.rabbit.annotation.*;

import org.springframework.stereotype.Component;

import java.io.IOException;

/**

 * @Author: lhm

 * @Date: 2020/11/20 21:39

 * 4

 */

@Component

public class Consumer {

    @RabbitListener(bindings = {

            @QueueBinding(

                    value = @Queue(value = "direct.info"//指定一下队列名,默认持久队列,不指定则为临时队列

                            ,arguments = {

                            @Argument(name = "x-dead-letter-exchange",value = "dlx.exchange"), //指定一下死信交换机

                            @Argument(name = "x-dead-letter-routing-key",value = "deadKey"),  //指定死信交换机的路由key

                            @Argument(name = "x-message-ttl",value = "3000",type = "java.lang.Long") //指定队列的过期时间,type需要指定为Long,否则会抛异常

                            //,@Argument(name = "x-max-length",value = "3") //指定队列最大长度,超过会被投入死信,至于type是否需要指定为Long,本人没试过

                    }

                    ),

                    exchange = @Exchange(value = "directs"),//Exchang的默认类型就是direct,所以type可以不写

                    key = {"info"}

            )

    })

    private void receive(Message message, Channel channel) throws Exception{

        String msg = new String(message.getBody());

        long deliveryTag = message.getMessageProperties().getDeliveryTag();

        String msgId = (String) message.getMessageProperties().getHeaders().get("spring_returned_message_correlation");

        try {

            if (msg!=null&&!"".equals(msg)){

                System.out.println("message; "+msg);

                System.out.println("业务处理成功");

                //ture:确认本条消息以及之前没有确认的消息,false:仅确认本条消息

                //basicAck 后队列会丢弃掉消息

                channel.basicAck(deliveryTag, false);

                //放开channel.basicNack(deliveryTag,false,true); 将channel.basicAck(deliveryTag, false);注释掉那么消息3秒内没有ack就会投入死信

                //channel.basicNack(deliveryTag,false,true);

            }else {

                //模拟业务处理失败抛出异常

                System.out.println("业务处理失败");

                throw new IOException();

            }

        }catch (Exception e){

            e.printStackTrace();

            //message.getMessageProperties().getRedelivered() 第一次得到的是false ,所以不会投入死信

            if (message.getMessageProperties().getRedelivered()){

                System.out.println("消息已重复处理失败,拒绝再次接收... ,将直接投入死信队列");

                // 拒绝消息  requeue:true该条消息重新返回MQ false直接丢弃

                channel.basicReject(deliveryTag, false);

            }else {

                System.out.println("消息即将再次返回队列处理...  ,如果再一次无法正常basicAck,那么就会执行basicReject");

                /*

                 * deliveryTag:该消息的index

                 * multiple: ture确认本条消息以及之前没有确认的消息(批量),false仅确认本条消息

                 * requeue: true该条消息重新返回MQ queue,MQ broker将会重新发送该条消息

                 */

                channel.basicNack(deliveryTag,false,true);

            }

        }

    }

    @RabbitListener(bindings = {

            @QueueBinding(

                    value = @Queue(value = "dlx.queue"),

                    exchange = @Exchange(value = "dlx.exchange"),//Exchang的默认类型就是direct,所以type可以不写

                    key = {"deadKey"}

            )

    })

    public void receive2(Message message,Channel channel) throws Exception{

        System.out.println("我是一条死信:"+new String(message.getBody()));

        //打印完直接丢弃消息

        channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);

    }

}

至于结果啥的我就不贴了,贴出来实在太乱了。写出来也是为了做个笔记,有需要的可以参考参考。

原文链接:https://blog.csdn.net/aa821198112/article/details/109902695