文章目录

前言1、RabbitMQ延迟队列1.1、方式1:RabbitMQ通过死信机制来实现延迟队列的功能1.2、方式二:安装延迟队列插件1.2.1、安装延迟队列插件

2、消息确认机制2.1、生产确认2.2、消费确认

前言

实际业务中,例如秒杀系统,秒杀商品成功会有截止时间,这时需要用到RabbitMQ延迟服务。

1、RabbitMQ延迟队列

1.1、方式1:RabbitMQ通过死信机制来实现延迟队列的功能

TTL ,即 Time-To-Live,存活时间,消息和队列都可以设置存活时间Dead Letter,即死信,若给消息设置了存活时间,当超过存活时间后消息还没有被消费,则该消息变成了死信Dead Letter Exchanges(DLX),即死信交换机Dead Letter Routing Key (DLK),死信路由键

/***********************延迟队列*************************/

//创建立即消费队列

@Bean

public Queue immediateQueue(){

return new Queue("immediateQueue");

}

//创建立即消费交换机

@Bean

public DirectExchange immediateExchange(){

return new DirectExchange("immediateExchange");

}

@Bean

public Binding bindingImmediate(@Qualifier("immediateQueue") Queue queue,@Qualifier("immediateExchange") DirectExchange directExchange){

return BindingBuilder.bind(queue).to(directExchange).with("immediateRoutingKey");

}

//创建延迟队列

@Bean

public Queue delayQueue(){

Map params = new HashMap<>();

//死信队列转发的死信转发到立即处理信息的交换机

params.put("x-dead-letter-exchange","immediateExchange");

//死信转化携带的routing-key

params.put("x-dead-letter-routing-key","immediateRoutingKey");

//设置消息过期时间,单位:毫秒

params.put("x-message-ttl",60 * 1000);

return new Queue("delayQueue",true,false,false,params);

}

@Bean

public DirectExchange delayExchange(){

return new DirectExchange("delayExchange");

}

@Bean

public Binding bindingDelay(@Qualifier("delayQueue") Queue queue,@Qualifier("delayExchange") DirectExchange directExchange){

return BindingBuilder.bind(queue).to(directExchange).with("delayRoutingKey");

}

@Test

public void sendDelay(){

this.rabbitTemplate.convertAndSend("delayExchange","delayRoutingKey","Hello world topic");

}

1.2、方式二:安装延迟队列插件

1.2.1、安装延迟队列插件

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.11.1/rabbitmq_delayed_message_exchange-3.11.1.ez下载解压,到plugins目录,执行以下的命令:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

/**************延迟队列一个单一queue*******************/

@Bean

public Queue delayNewQueue(){

return new Queue("delayNewQueue");

}

@Bean

public CustomExchange delayNewExchange(){

Map args = new HashMap<>();

// 设置类型,可以为fanout、direct、topic

args.put("x-delayed-type", "direct");

return new CustomExchange("delayNewExchange","x-delayed-message", true,false,args);

}

@Bean

public Binding bindingNewDelay(@Qualifier("delayNewQueue") Queue queue,@Qualifier("delayNewExchange") CustomExchange customExchange){

return BindingBuilder.bind(queue).to(customExchange).with("delayNewRoutingKey").noargs();

}

@Test

public void sendDelay() {

//生产端写完了

UserInfo userInfo = new UserInfo();

userInfo.setPassword("13432432");

userInfo.setUserAccount("tiger");

this.rabbitTemplate.convertAndSend("delayNewExchange", "delayNewRoutingKey", userInfo

, a -> {

//单位毫秒

a.getMessageProperties().setDelay(30000);

return a;

});

}

2、消息确认机制

消息确认分为两部分: 生产确认 和 消费确认。生产确认: 生产者生产消息后,将消息发送到交换机,触发确认回调;交换机将消息转发到绑定队列,若失败则触发返回回调。消费确认: 默认情况下消息被消费者从队列中获取后即发送确认,不管消费者处理消息时是否失败,不需要额外代码,但是不能保证消息被正确消费。我们增加手动确认,则需要代码中明确进行消息确认。

2.1、生产确认

@Bean

public RabbitTemplate getTemplate(ConnectionFactory connectionFactory){

RabbitTemplate template = new RabbitTemplate(connectionFactory);

//消息发送到交换器Exchange后触发回调

template.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {

@Override

public void confirm(CorrelationData correlationData, boolean ack, String cause) {

// 可以进行消息入库操作

log.info("消息唯一标识 correlationData = {}", correlationData);

log.info("确认结果 ack = {}", ack);

log.info("失败原因 cause = {}", cause);

}

});

// 配置这个,下面的ReturnCallback 才会起作用

template.setMandatory(true);

// 如果消息从交换器发送到对应队列失败时触发(比如 根据发送消息时指定的routingKey找不到队列时会触发)

template.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {

@Override

public void returnedMessage(ReturnedMessage returnedMessage) {

// 可以进行消息入库操作

log.info("消息主体 message = {}", returnedMessage.getMessage());

log.info("回复码 replyCode = {}", returnedMessage.getReplyCode());

log.info("回复描述 replyText = {}", returnedMessage.getReplyText());

log.info("交换机名字 exchange = {}", returnedMessage.getExchange());

log.info("路由键 routingKey = {}", returnedMessage.getRoutingKey());

}

});

return template;

}

spring:

cloud:

nacos:

discovery:

server-addr: localhost:8848

application:

name: user-service #微服务名称

datasource:

username: root

password: root

url: jdbc:mysql://127.0.0.1:3306/drp

driver-class-name: com.mysql.cj.jdbc.Driver

rabbitmq:

host: 127.0.0.1

port: 5672

username: tiger

password: tiger

virtual-host: tiger_vh

# 确认消息已发送到交换机(Exchange)

publisher-confirm-type: correlated

# 确认消息已发送到队列

publisher-returns: true

listener:

simple:

acknowledge-mode: manual # 开启消息消费手动确认

retry:

enabled: true

2.2、消费确认

@RabbitHandler

public void process(UserInfo data, Message message, Channel channel){

log.info("收到directQueue队列信息:" + data);

long deliveryTag = message.getMessageProperties().getDeliveryTag();

try {

//成功消费确认

channel.basicAck(deliveryTag,true);

log.info("消费成功确认完毕。。。。。");

} catch (IOException e) {

log.error("确认消息时抛出异常 ", e);

// 重新确认,成功确认消息

try {

Thread.sleep(50);

channel.basicAck(deliveryTag, true);

} catch (IOException | InterruptedException e1) {

log.error("确认消息时抛出异常 ", e);

// 可以考虑入库

}

}catch (Exception e){

log.error("业务处理失败", e);

try {

// 失败确认

channel.basicNack(deliveryTag, false, false);

} catch (IOException e1) {

log.error("消息失败确认失败", e1);

}

}

}

相关阅读

评论可见,请评论后查看内容,谢谢!!!评论后请刷新页面。