RabbitMQ之手动应答消息

1.为什么需要手动应答

当消费者完成一个任务需要一段时间,如果其中一个消费者处理一个长的任务并且只处理了部分突然他挂掉了,会发生什么情况。RabbitMQ一旦向消费者传递了一条消息,便立即将该条消息标记为删除。在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息。以及后续发送给该消费者的消息,因为他无法接收到。

1.应答分类

1.1手动应答

相应的,使用手动应答时,需要把autoAck属性设置为false,然后进行手动应答。

消息手动应答 有如下几个方法

A.Channel.basicAck(用于肯定确认)

RabbitMQ已知道该消息并且成功的处理消息, 可以将其丢弃了

B.Channel.basicNack(用于否定确认)

C.Channel.basicReject(用于否定确认)

与Channel.basicNack相比少一个参数

不处理该消息了直接拒绝,可以将其丢弃了

手动应答时还有一个参数:Multiple 是否批量处理,一般选择false ,不批量处理。

1.2自动应答

自动应答只需要设置这个属性为true,rabbitmq就会开启自动应答。但是使用自动应答时需要在高吞吐量和数据传输安全性方面做权衡,因为这种模式如果消息在接收到之前,消费者那边出现连接或者channel关闭,那么消息就丢失了,也有可能因为没有对传递的消息数量进行限制,导致消息积压,有可能把内存耗尽。

使用手动应答的好处?

如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或TCP连接丢失),导致消息未发送ACK确认,RabbitMQ将了解到消息未完全处理,并将对其重新排队。

手动应答生产者(Producer)代码实现:

@Slf4j

public class ExerciseT {

    private static final String QUEUE_NAME = "task_queue";

    public static void main(String[] args) throws IOException, TimeoutException {

        ConnectionFactory factory = new ConnectionFactory();

        factory.setHost("192.168.121.36");

        factory.setUsername("admin");

        factory.setPassword("123");

        Connection connection = factory.newConnection();

        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME,true,false,false,null);

        Scanner scanner = new Scanner(System.in);

        System.out.println("请输入信息");

        while (scanner.hasNext()){

            String message = scanner.nextLine();

            channel.basicPublish("",QUEUE_NAME,null,message.getBytes());

            log.info("生产者发出消息: "+message);

        }

    }

}

消费者1代码:

@Slf4j

public class ExerciseW2 {

    private static final String QUEUE_NAME = "task_queue";

    public static void main(String[] args) throws IOException, TimeoutException {

        ConnectionFactory factory = new ConnectionFactory();

        factory.setHost("192.168.121.36");

        factory.setUsername("admin");

        factory.setPassword("123");

        Connection connection = factory.newConnection();

        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, true, false, false, null);

        DeliverCallback deliverCallback = (consumerTag, message) -> {

            String msg = new String(message.getBody(), "UTF-8");

            log.info("W2接收到消息:" + msg);

            try {

                Thread.sleep(2000);

            } catch (InterruptedException e) {

                e.printStackTrace();

            }

            //手动应答消息

            //1st param message tag   2.multiple: true 批量应答  false:不批量应答,实际开发中

            //使用false

            channel.basicAck(message.getEnvelope().getDeliveryTag(), false);

        };

        CancelCallback cancelCallback = consumerTag -> {

            log.info("W2取消消息发送" + consumerTag);

        };

        channel.basicConsume(QUEUE_NAME, false, deliverCallback, cancelCallback);

    }

}


消费者2代码

@Slf4j

public class ExerciseW3 {

    private static final String QUEUE_NAME = "task_queue";

    public static void main(String[] args) throws IOException, TimeoutException {

        ConnectionFactory factory = new ConnectionFactory();

        factory.setHost("192.168.121.36");

        factory.setUsername("admin");

        factory.setPassword("123");

        Connection connection = factory.newConnection();

        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, true, false, false, null);

        DeliverCallback deliverCallback = (consumerTag, message) -> {

            String msg = new String(message.getBody(), "UTF-8");

            log.info("W3接收到消息:" + msg);

            try {

                Thread.sleep(5000);

            } catch (InterruptedException e) {

                e.printStackTrace();

            }

            //手动应答消息

            //1st param message tag   2.multiple: true 批量应答  false:不批量应答,实际开发中

            //使用false

            channel.basicAck(message.getEnvelope().getDeliveryTag(), false);

        };

        CancelCallback cancelCallback = consumerTag -> {

            log.info("W3取消消息发送" + consumerTag);

        };

        channel.basicConsume(QUEUE_NAME, false, deliverCallback, cancelCallback);

    }

}

然后把生产者代码和两个消费者都run,

刚开始生产者发送两条消息,然后两个消费者也是

轮询接收消息

然后生产者连续发送多条消息,可以看到消费者2消息了两条消息,然后关闭消费者2,剩余的消息全部被消费者1全部消费了。就证明了上面论述使用手动应答可以自动重新入队,所以不会出现消息丢失的情况。

原文链接:https://blog.csdn.net/qq_42212926/article/details/123744307