概念:消费者消费完一条消息可能需要等待一段时间,但如果这段时间内消费者在未完成消费信息的情况下时就挂掉了,这时候会怎么样?RabbitMQ一旦向消费者传递一条消息,该消息就会被标记为删除,这种情况下消费者挂掉了正在处理的消息就会丢失,为了保证消息在发送的过程中不会丢失,RabbitMQ引入了应答机制,即在消费者接收并处理了该条消息后告诉RabbitMQ它已经把该条消息处理了,RabbitMQ可以把这条消息删除了。

1、自动应答

消息发送后立即被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权衡,这种模式下万一消费者的连接或信道关闭,消息就丢失了,不过这种模式对传递的消息数量没有限制,但如果消息太多太大,消费者来不及消费,也可能出现消息的堆积导致内存耗尽,最终消费者程序被操作系统杀死的情况,所以这种模式只能在消费者可以高效的、高速率的处理消息的前提下使用。

2、手动应答

以下方法用于手动应答

(1) channel.basicAck()(用于肯定确认,即向RabbitMQ表示该消息已经发送并处理成功了,可以将其丢弃)

(2)channel.basicNack()(用于否定确认,即不处理该信息直接丢弃)

(3)channel.basicReject()(用于否定确认,即不处理该信息直接丢弃,比basicNack方法少一个Multiple参数)

3、Multiple参数解释

channel.basicNack(deliveryTag,true)(第二个参数就是Multiple参数)

multiple的true和false的区别:

(1)true表示批量应答channel上未应答的消息,比如channel上有传送tag为5,6,7,8的消息,当前tag是8,那么此时5-8还未应答的消息就会被确认收到消息应答,但如果处理6或7消息失败了,5也会被应答,导致5消息丢失,所以一般情况下multiple为false。

(2)false表示只会应答tag=8的消息,5,6,7这三个消息依然不会被确认收到消息应答

4、消息重新入队

如果消费者由于某些原因失去连接,导致消费者未成功发送ACK确认应答,RabbitMQ将会对未完全处理完的消息重新入队,如果其他消费者可以处理,则该消息将被分配到另一个消费者,从而保证消息未丢失。

5、在utils包下新建一个名为SleepUtils的类,该类的方法能让线程睡眠指定的时间,用于模拟业务的处理时间,代码如下

package com.ken.utils;

/**

* 睡眠工具类,用于模拟执行业务时间的长短

*/

public class SleepUtils {

public static void sleep(int second) {

try {

Thread.sleep(1000 * second);

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}

效果图:

6、使用代码实现消息手动应答,为此先新建一个名为ack的包,用于装消息手动应答的代码

效果图:

7、新建一个名为Task02的类,用作充当生产者,代码如下

注:RabbitMqUtils工具类的实现在我的另一篇文章里,有需要的同学可以查看参考

https://blog.csdn.net/m0_64284147/article/details/129465871

package com.ken.ack;

import com.ken.utils.RabbitMqUtils;

import com.rabbitmq.client.Channel;

import java.util.Scanner;

public class Task02 {

//队列名称(用于指定往哪个队列接收消息)

public static final String QUEUE_NAME = "ack_queue";

public static void main(String[] args) throws Exception{

Channel channel = RabbitMqUtils.getChannel();

/**

* 创建队列

* 第一个参数:队列名称

* 第二个参数:服务器重启后队列是否还存在,即队列是否持久化,true为是,false为否,默认false,即消息存储在内存中而不是硬盘中

* 第三个参数:该队列是否只供一个消费者进行消费,是否进行消息共享,true为只允许一个消费者进行消费,false为允许多个消费者对队列进行消费,默认false

* 第四个参数:是否自动删除,最后一个消费者断开连接后该队列是否自动删除,true自动删除,false不自动删除

* 第五个参数:其他参数

*/

channel.queueDeclare(QUEUE_NAME,false,false,false,null);

//从控制台读取要发送的信息

Scanner scanner = new Scanner(System.in);

while (scanner.hasNext()) {

String message = scanner.next();

/**

* 用信道对消息进行发布

* 第一个参数:发送到哪个交换机

* 第二个参数:路由的Key值是哪个,本次是队列名

* 第三个参数:其他参数信息

* 第四个参数:发送消息的消息体

*/

channel.basicPublish("",QUEUE_NAME,null,message.getBytes());

System.out.println("消息发送成功:" + message);

}

}

}

效果图:

8、新建一个名为Worker03的类,用作充当消费者一号,代码如下

package com.ken.ack;

import com.ken.utils.RabbitMqUtils;

import com.ken.utils.SleepUtils;

import com.rabbitmq.client.CancelCallback;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.DeliverCallback;

/**

* 手动应答的第一个消费者

*/

public class Worker03 {

//队列名称(用于指定往哪个队列接收消息)

public static final String QUEUE_NAME = "ack_queue";

//进行接收操作

public static void main(String[] args) throws Exception{

//通过工具类获取信道

Channel channel = RabbitMqUtils.getChannel();

/**

* 声明消费者接收消息后的回调方法(由于回调方法DeliverCallback是函数式接口,所以需要给DeliverCallback赋值一个函数,为了方便我们这里使用Lambda表达式进行赋值)

* 为什么要这样写呢,是因为basicConsume方法里的参数deliverCallback的类型DeliverCallback用 @FunctionalInterface注解规定DeliverCallback是一个函数式接口,所以要往deliverCallback参数传的值要是一个函数

*

* 以下是DeliverCallback接口的源代码

* @FunctionalInterface

* public interface DeliverCallback {

* void handle (String consumerTag, Delivery message) throws IOException;

* }

*/

DeliverCallback deliverCallback = (consumerTag, message) -> {

//沉睡1S,用于模拟业务处理需要1S的时间

SleepUtils.sleep(1);

System.out.println("接收的消息:" + new String(message.getBody()));

/**

* 手动应答

* 第一个参数:表示消息的标记Tag(每个消息都有标记Tag)

* 第二个参数:是否批量应答,true表示批量,false表示不批量

*/

channel.basicAck(message.getEnvelope().getDeliveryTag(),false);

};

/**

* 声明消费者取消接收消息后的回调方法(由于回调方法CancelCallback是函数式接口,所以需要给CancelCallback赋值一个函数,为了方便我们这里使用Lambda表达式进行赋值)

* 为什么要这样写呢,是因为basicConsume方法里的参数cancelCallback的类型CancelCallback用 @FunctionalInterface注解规定CancelCallback是一个函数式接口,所以要往cancelCallback参数传的值要是一个函数

*

* @FunctionalInterface

* public interface CancelCallback {

* void handle (String consumerTag) throws IOException;

* }

*

*/

CancelCallback cancelCallback = consumerTag -> {

System.out.println("取消消费消息:" + consumerTag);

};

/**

* 用信道对消息进行接收(采用手动应答)

* 第一个参数:消费的是哪一个队列的消息

* 第二个参数:消费成功后是否要自动应答,true代表自动应当,false代表手动应答

* 第三个参数:消费者接收消息后的回调方法

* 第四个参数:消费者取消接收消息后的回调方法(正常接收不调用)

*/

System.out.println("Work03等待接收消息...");

channel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback);

}

}

效果图:

9、新建一个名为Worker04的类,用作充当消费者二号,代码如下

package com.ken.ack;

import com.ken.utils.RabbitMqUtils;

import com.ken.utils.SleepUtils;

import com.rabbitmq.client.CancelCallback;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.DeliverCallback;

/**

* 手动应答的第二个消费者

*/

public class Worker04 {

//队列名称(用于指定往哪个队列接收消息)

public static final String QUEUE_NAME = "ack_queue";

//进行接收操作

public static void main(String[] args) throws Exception{

//通过工具类获取信道

Channel channel = RabbitMqUtils.getChannel();

/**

* 声明消费者接收消息后的回调方法(由于回调方法DeliverCallback是函数式接口,所以需要给DeliverCallback赋值一个函数,为了方便我们这里使用Lambda表达式进行赋值)

* 为什么要这样写呢,是因为basicConsume方法里的参数deliverCallback的类型DeliverCallback用 @FunctionalInterface注解规定DeliverCallback是一个函数式接口,所以要往deliverCallback参数传的值要是一个函数

*

* 以下是DeliverCallback接口的源代码

* @FunctionalInterface

* public interface DeliverCallback {

* void handle (String consumerTag, Delivery message) throws IOException;

* }

*/

DeliverCallback deliverCallback = (consumerTag, message) -> {

//沉睡30S,用于模拟业务处理需要30S的时间

SleepUtils.sleep(30);

System.out.println("接收的消息:" + new String(message.getBody()));

/**

* 手动应答

* 第一个参数:表示消息的标记Tag(每个消息都有标记Tag)

* 第二个参数:是否批量应答,true表示批量,false表示不批量

*/

channel.basicAck(message.getEnvelope().getDeliveryTag(),false);

};

/**

* 声明消费者取消接收消息后的回调方法(由于回调方法CancelCallback是函数式接口,所以需要给CancelCallback赋值一个函数,为了方便我们这里使用Lambda表达式进行赋值)

* 为什么要这样写呢,是因为basicConsume方法里的参数cancelCallback的类型CancelCallback用 @FunctionalInterface注解规定CancelCallback是一个函数式接口,所以要往cancelCallback参数传的值要是一个函数

*

* @FunctionalInterface

* public interface CancelCallback {

* void handle (String consumerTag) throws IOException;

* }

*

*/

CancelCallback cancelCallback = consumerTag -> {

System.out.println("取消消费消息:" + consumerTag);

};

/**

* 用信道对消息进行接收(采用手动应答)

* 第一个参数:消费的是哪一个队列的消息

* 第二个参数:消费成功后是否要自动应答,true代表自动应当,false代表手动应答

* 第三个参数:消费者接收消息后的回调方法

* 第四个参数:消费者取消接收消息后的回调方法(正常接收不调用)

*/

System.out.println("Work04等待接收消息...");

channel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback);

}

}

效果图:

10、分别先后启动Task02、Worker03、Worker04

例:

11、正常的在Task02输入消息,观察消息的被消费情况

(1)在Task02分别输入第一条和第二条消息

(2)等待1秒后第一条消息被Work03消费

(3)等待30秒后第二条消息被Work04消费

12、再次在Task02输入消息,然后手动暂停Worker04用以模拟Worker04消费者宕机的情况,观察消息的被消费情况

(1)在Task02分别输入第三条和第四条消息

(2)手动停掉Worker04,模拟Worker04宕机的情况

(3)Worker04宕机后没有成功消费掉第四条消息,然后没有对消息进行应答,导致第四条消息重新入队,然后被Worker03消费掉

参考阅读

评论可见,请评论后查看内容,谢谢!!!评论后请刷新页面。