应用场景:

异步处理。把消息放入消息中间件中,等到需要的时候再去处理。

流量削峰 例如秒杀活动,在短时间内访问量急剧增加,使用消息队列,当消息队列满了就拒绝响应,跳转到错误页面,这样就可以使得系统不会因为超负载而崩溃

安装rabbitMQ

#拉取镜像

docker pull rabbitmq:3.8-management

#创建容器启动

docker run -d --restart=always -p 5672:5672 -p 15672:15672 --name rabbitmq rabbitmq:3.8-management

管理后台:http://IP:15672

搭建rabbit_util 模块 引入依赖

org.springframework.cloud

spring-cloud-starter-bus-amqp

添加service---就是对RabbitTemplate的一个封装,可以不封装直接使用RabbitTemplate import org.springframework.amqp.AmqpException;

import org.springframework.amqp.core.Message;

import org.springframework.amqp.core.MessagePostProcessor;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Service;

@Service

public class RabbitService {

// 引入操作rabbitmq 的模板

@Autowired

private RabbitTemplate rabbitTemplate;

/**

* 发送消息

* @param exchange 交换机

* @param routingKey 路由键

* @param message 消息

* @return

*/

public boolean sendMessage(String exchange,String routingKey, Object message){

// 调用发送数据的方法

rabbitTemplate.convertAndSend(exchange,routingKey,message);

return true;

}

/**

* 发送延迟消息的方法

* @param exchange 交换机

* @param routingKey 路由键

* @param message 消息内容

* @param delayTime 延迟时间

* @return

*/

public boolean sendDelayMessage(String exchange,String routingKey, Object message, int delayTime){

// 在发送消息的时候设置延迟时间

rabbitTemplate.convertAndSend(exchange, routingKey, message, new MessagePostProcessor() {

@Override

public Message postProcessMessage(Message message) throws AmqpException {

// 设置一个延迟时间

message.getMessageProperties().setDelay(delayTime*1000);

return message;

}

});

return true;

}

}

 配置mq消息转换器

import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;

import org.springframework.amqp.support.converter.MessageConverter;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

@Configuration

public class MQConfig {

@Bean

public MessageConverter messageConverter(){

return new Jackson2JsonMessageConverter();

}

}

说明:默认是字符串转换器

添加消息的确认配置

import org.springframework.amqp.core.Message;

import org.springframework.amqp.rabbit.connection.CorrelationData;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

@Component

public class MQProducerAckConfig implements RabbitTemplate.ReturnCallback,RabbitTemplate.ConfirmCallback {

// 我们发送消息使用的是 private RabbitTemplate rabbitTemplate; 对象

// 如果不做设置的话 当前的rabbitTemplate 与当前的配置类没有任何关系!

@Autowired

private RabbitTemplate rabbitTemplate;

// 设置 表示修饰一个非静态的void方法,在服务器加载Servlet的时候运行。并且只执行一次!

@PostConstruct

public void init(){

rabbitTemplate.setReturnCallback(this);

rabbitTemplate.setConfirmCallback(this);

}

/**

* 表示消息是否正确发送到了交换机上

* @param correlationData 消息的载体

* @param ack 判断是否发送到交换机上

* @param cause 原因

*/

@Override

public void confirm(CorrelationData correlationData, boolean ack, String cause) {

if(ack){

System.out.println("消息发送成功!");

}else {

System.out.println("消息发送失败!"+cause);

}

}

/**

* 消息如果没有正确发送到队列中,则会走这个方法!如果消息被正常处理,则这个方法不会走!

* @param message

* @param replyCode

* @param replyText

* @param exchange

* @param routingKey

*/

@Override

public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {

System.out.println("消息主体: " + new String(message.getBody()));

System.out.println("应答码: " + replyCode);

System.out.println("描述:" + replyText);

System.out.println("消息使用的交换器 exchange : " + exchange);

System.out.println("消息使用的路由键 routing : " + routingKey);

}

}

推荐文章

评论可见,请评论后查看内容,谢谢!!!评论后请刷新页面。