1、延迟队列

所谓“延迟消息”是指当消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费

2、使用场景

1、订单在十分钟之内未支付则自动取消 2、预定会议后,需要在预定时间点前十分钟通知各个与会人员参加会议。 3、淘宝七天自动确认收货,自动评价功能等

3、TTL(消息存活时间)

TTL 是 RabbitMQ 中一个消息或者队列的属性 表示一条消息或是该队列中的所有消息的最大存活时间,单位是毫秒;目前有两种方法可以设置消息的 TTL。 第一种方法是通过队列属性设置,队列中所有消息都有相同的过期时间。 第二种方法是对消息本身进行单独设置,每条消息的 TTL 可以不同。 如果两种方法一起使用,则消息的过期时间以两者之间较小的那个数值为准。消息在队列中的生存时间一旦超过设置的 TTL 值时,就会变成“死信”。

当设置了队列的 TTL 属性,那么一旦消息过期,就会被队列丢弃(如果配置了死信队列则会被丢到死信队列中) 当设置了消息的 TTL 属性,那么消息即使过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的,如果当前队列有严重的消息积压情况,则已过期的消息也许还能存活较长时间;另外,还需要注意一点是,如果不设置 TTL,表示消息永远不会过期

4、队列 TTL

可以使用x-message-ttl参数设置当前队列中所有消息的过期时间,即当前队列中所有的消息过期时间都一样; 建两个队列 QA 和 QB,两个队列 TTL 分别设置为 10s 和 30s,然后再创建一个交换机 X 和 死信交换机 Y,它们的类型都是 direct,创建一个死信队列 QD,它们的绑定关系如上图所示。

4.1、yml配置文件

# rabbitmq配置

rabbitmq:

host: 127.0.0.1

port: 5672

username: guest

password: guest

listener:

simple:

concurrency: 1

max-concurrency: 1

acknowledge-mode: manual

prefetch: 1

4.2、配置类

import org.springframework.amqp.core.*;

import org.springframework.beans.factory.annotation.Qualifier;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import java.util.HashMap;

import java.util.Map;

@Configuration

public class TtlQueueConfig {

/**

* 普通交换机名称

*/

public static final String X_EXCHANGE = "X";

/**

* 死信交换机名称

*/

public static final String Y_DEAD_LETTER_EXCHANGE = "Y";

/**

* 普通队列名称

*/

public static final String QUEUE_A = "QA";

public static final String QUEUE_B = "QB";

/**

* 死信队列名称

*/

public static final String DEAD_LETTER_QUEUE = "QD";

/**

* 声明 XExchange

*/

@Bean

public DirectExchange xExchange(){

return new DirectExchange(X_EXCHANGE);

}

/**

* 声明 yExchange

*/

@Bean

public DirectExchange yExchange(){

return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);

}

/**

* 声明队列 QA

* 过期时间 10s

*/

@Bean

public Queue queueA(){

Map arguments = new HashMap<>(3);

// 设置死信交换机

arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);

// 设置死信路由键

arguments.put("x-dead-letter-routing-key", "YD");

// 设置过期时间

arguments.put("x-message-ttl", 10000);

return new Queue(QUEUE_A, true, false, false, arguments);

}

/**

* 声明队列 QB

* 过期时间 40s

*/

@Bean

public Queue queueB(){

Map arguments = new HashMap<>(3);

// 设置死信交换机

arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);

// 设置死信路由键

arguments.put("x-dead-letter-routing-key", "YD");

// 设置过期时间

arguments.put("x-message-ttl", 40000);

return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();

}

/**

* 死信队列QD

*/

@Bean

public Queue queueD(){

return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();

}

/**

* 队列 A 与 X交换机绑定

*/

@Bean

public Binding queueABindingX(@Qualifier("queueA") Queue queueA, @Qualifier("xExchange") DirectExchange xExchange){

return BindingBuilder.bind(queueA).to(xExchange).with("XA");

}

/**

* 队列 B 与 X交换机绑定

*/

@Bean

public Binding queueBBindingX(){

return new Binding(QUEUE_B, Binding.DestinationType.QUEUE, X_EXCHANGE, "XB", null);

}

/**

* 队列 D 与 y交换机(死信交换机)绑定

*/

@Bean

public Binding queueDBindingY(@Qualifier("queueD") Queue queueD,@Qualifier("yExchange") DirectExchange yExchange){

return BindingBuilder.bind(queueD).to(yExchange).with("YD");

}

}

4.3、生产者

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.web.bind.annotation.GetMapping;

import org.springframework.web.bind.annotation.PathVariable;

import org.springframework.web.bind.annotation.RequestMapping;

import org.springframework.web.bind.annotation.RestController;

import java.util.Date;

@RestController

@RequestMapping("/ttl")

public class SendMsgController {

@Autowired

private RabbitTemplate rabbitTemplate;

@RequestMapping("/sendMsg/{message}")

public String sendMsg(@PathVariable String message){

System.out.println("当前时间:"+ new Date() +"发送一条消息"+ message +"给两个队列");

rabbitTemplate.convertAndSend("X", "XA", "消息来自TTL为10s队列QA:"+message);

rabbitTemplate.convertAndSend("X", "XB", "消息来自TTL为40s队列QB:"+message);

return "发送成功";

}

}

4.4、消费者

import com.rabbitmq.client.Channel;

import org.springframework.amqp.core.Message;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.stereotype.Component;

import java.io.IOException;

import java.util.Date;

@Component

public class DeadLetterConsumer {

@RabbitListener(queues = "QD")

public void receiveD(String msgData, Message message, Channel channel) throws IOException {

//手动确认

channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

System.out.println("当前时间"+ new Date() +",收到死信队列的消息:"+msgData);

}

}

4.5、结果

QA消息在 10s 后变成了死信消息,然后被消费者消费掉了, QB消息在 30s 后变成了死信消息,然后被消费掉,这样一个延时队列就完成了。

5、消息 TTL

单独给某条消息设置ttl(生产环境中居多) 可以使用Expiration参数来设置单个消息的过期时间。当时间一到就会被移出队列。 一次设置中我们同时设置了两种消息过期方式和的话,时间短的会优先触发,比如,我们给某个队列设置过期时间为10秒,所有发到这个队列的消息10秒内未被消费都会过期,我们同时又给某个消息设置过期时间为5秒,则在消息发送到队列中5秒后就会过期。 新增了一个队列 QC,该队列不设置 TTL 时间,绑定关系如上图所示

5.1、配置类新增代码

public static final String QUEUE_C = "QC";

/**

* 死信队列 QC

*/

@Bean

public Queue queueC(){

Map arguments = new HashMap<>(2);

// 声明当前队列绑定的死信交换机

arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);

// 声明当前队列的死信路由key

arguments.put("x-dead-letter-routing-key", "YD");

return new Queue(QUEUE_C, false, false, false, arguments);

}

/**

* 队列 c 与 X交换机绑定

*/

@Bean

public Binding queueCBindingX(){

return new Binding(QUEUE_C, Binding.DestinationType.QUEUE, X_EXCHANGE, "XC", null);

}

5.2、生产者新增方法

@RequestMapping("/sendMsg1/{message}/{ttlTime}")

public String sendMsg1(@PathVariable String message, @PathVariable String ttlTime){

MessagePostProcessor messagePostProcessor = message1 -> {

message1.getMessageProperties().setExpiration(ttlTime);

return message1;

};

System.out.println("当前时间:"+ new Date() +"发送一条消息("+ ttlTime +"毫秒过期)给QC队列:"+message);

rabbitTemplate.convertAndSend("X", "XC", "消息来自队列QC:"+message, messagePostProcessor);

return "发送成功";

}

5.3、测试结果

两条消息的过期时间不一致,过期时间短的那条消息,在过期时间到了以后并没有立即被消费,而是和过期时间长的那条消息一起被消费了。 所以,如果使用在消息属性上设置 TTL 的方式,消息可能并不会按时“死亡”,因为 RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列,如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先被执行。

6、Rabbitmq 插件实现延迟队列

rabbitMq版本 3.10.9 Erlang 25.0 延时消息交换机插件 3.10.2

6.1、rabbitmq安装参考文章 https://blog.csdn.net/weixin_45486926/article/details/127170831?spm=1001.2014.3001.5502

6.2、延时消息交换机插件下载地址 https://www.rabbitmq.com/community-plugins.html

6.3、开启延时消息交换机插件

将插件放到RabbitMQ安装目录的plugins文件中 在RabbitMQ 安装目的sbin用cmd使用如下命令安装插件

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

开启插件后,启动RabbitMQ,访问登录后访问http://localhost:15672,用guest/guest登录后 在交换机exchanges的tab下,底部新增将看到下图,则表示插件已启动,就可以使用了。

6.4、延时插件实现延时队列

创建一个队列 delayed.queue,一个自定义交换机 delayed.exchange,绑定关系如上图所示

6.4.1、配置类

import org.springframework.amqp.core.*;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import java.util.HashMap;

import java.util.Map;

@Configuration

public class DelayedConfig {

/**

* 延时交换机名称

*/

public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";

/**

* 普通队列名称

*/

public static final String DELAYED_QUEUE_NAME = "delayed.queue";

/**

* 延时路由key

*/

public static final String DELAYED_ROUTING_KEY = "delayed.routingKey";

/**

* 死信队列QD

*/

@Bean

public Queue delayedQueue(){

return new Queue(DELAYED_QUEUE_NAME, false, false, false, null);

}

/**

* 定义延迟交换机

* 需要死信交换机和死信队列,支持消息延迟投递,消息投递之后没有到达投递时间,是不会投递给队列

* 而是存储在一个分布式表,当投递时间到达,才会投递到目标队列

* @return

*/

@Bean

public CustomExchange delayedExchange(){

Map args = new HashMap<>(1);

args.put("x-delayed-type", "direct");//交换机的类型

return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args);

}

/**

* 队列与交换机绑定

*/

@Bean

public Binding queueABindingX(Queue delayedQueue, CustomExchange delayedExchange){

return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();

}

}

6.4.2、生产者

import lombok.extern.slf4j.Slf4j;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.web.bind.annotation.PathVariable;

import org.springframework.web.bind.annotation.RequestMapping;

import org.springframework.web.bind.annotation.RestController;

import java.util.Date;

@Slf4j

@RestController

@RequestMapping("/delayed")

public class SendMessageController {

@Autowired

private RabbitTemplate rabbitTemplate;

public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";

public static final String DELAYED_ROUTING_KEY = "delayed.routingKey";

@RequestMapping("/sendMsg/{message}/{delayTime}")

public String sendMsg(@PathVariable String message, @PathVariable Integer delayTime){

rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, message, messagePostProcessor ->{

messagePostProcessor.getMessageProperties().setDelay(delayTime);

return messagePostProcessor;

});

log.info("当前时间:{},发送一条延迟{}毫秒的信息给队列delay.queue:{}", new Date(), delayTime, message);

return "发送成功";

}

}

6.4.3、消费者

import com.rabbitmq.client.Channel;

import lombok.extern.slf4j.Slf4j;

import org.springframework.amqp.core.Message;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.stereotype.Component;

import java.io.IOException;

import java.util.Date;

@Slf4j

@Component

public class DeadLetterConsumer {

public static final String DELAYED_QUEUE_NAME = "delayed.queue";

@RabbitListener(queues = DELAYED_QUEUE_NAME)

public void receiveDelayedQueue(Message message, Channel channel) throws IOException {

channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);//手动消息确认

String msg = new String(message.getBody());

log.info("当前时间:{},收到延时队列的消息:{}", new Date(), msg);

}

}

6.4.4、测试结果

延时短的消息被先消费掉了,符合预期结果

好文阅读

评论可见,请评论后查看内容,谢谢!!!评论后请刷新页面。