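1. Background
RabbitMQ serves as the message queue in this service framework, providing asynchronous processing, peak shaving, and decoupling. The code below is a usage example for RabbitMQ, intended for reference and learning only.
RabbitMQ's five messaging modes:
Simple mode (Simple Mode): the simplest mode, with one producer, one consumer, and one queue. The producer sends messages to the queue, and the consumer takes them from the queue and processes them.
Work queue mode (Work Queue Mode): messages are distributed among several competing consumers. It has one producer, several consumers, and one queue. While one consumer is busy with a time-consuming task, an idle consumer takes the next message from the queue. Each message is handled by exactly one consumer, so tasks are processed in parallel.
Publish/subscribe mode (Publish/Subscribe Mode): the same message is delivered to several consumers at once, like a broadcast. It has one producer, several consumers, several queues, and one exchange. Each consumer reads from its own queue, which gives one-to-many messaging.
Routing mode (Routing Mode): messages are delivered selectively according to a routing key. It has one producer, several consumers, several queues, and one exchange. Each queue is bound to the exchange with the routing keys its consumer cares about, and the exchange delivers a message only to the queues whose binding key equals the message's routing key.
Topic mode (Topic Mode): messages are delivered selectively according to a routing key matching rule. It has one producer, several consumers, several queues, and one exchange. Each queue is bound with a wildcard pattern, where * matches exactly one word and # matches zero or more words, and the exchange delivers a message to every queue whose pattern matches the routing key.
In practice, the last three modes are the ones used most often.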
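To make the wildcard rule of topic mode concrete, here is a minimal sketch of how a binding pattern could be checked against a routing key. It is an illustration only, not RabbitMQ's implementation; the class name TopicPatternDemo and its methods are hypothetical. The pattern and keys are the ones used by the topic consumer and producer later in this article.

```java
/** Illustrative matcher for topic binding patterns (not RabbitMQ's own code). */
final class TopicPatternDemo {

    /** Returns true when the dot-separated routing key satisfies the binding pattern. */
    static boolean matches(String pattern, String routingKey) {
        String[] p = pattern.isEmpty() ? new String[0] : pattern.split("\\.");
        String[] k = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.");
        return match(p, 0, k, 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            // Pattern is used up: the key must be used up as well.
            return j == k.length;
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words, so try every possible split point.
            for (int next = j; next <= k.length; next++) {
                if (match(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            // Pattern still expects a word but the key has none left.
            return false;
        }
        // '*' matches exactly one word; anything else must match literally.
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return match(p, i + 1, k, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        String pattern = "#.order.#";
        String[] keys = {"com.order.msg", "com.email.msg", "order"};
        for (String key : keys) {
            System.out.println(pattern + " vs " + key + " -> " + matches(pattern, key));
        }
    }
}
```

With the pattern #.order.#, the key com.order.msg matches while com.email.msg does not, which is why only the first of the two topic messages sent by the producer reaches order.topic.queue.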
2. Project setup
Based on Spring Cloud Hoxton.SR8, Spring Boot 2.3.2.RELEASE, and Spring Cloud Alibaba 2.2.4.RELEASE.
2.1 Create the parent project
Set up a Maven project (any name will do) and copy the pom.xml content into it.
pom.xml
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
2.2 Producer
Set up a Maven module project (any name will do) and copy the pom.xml content into it.
pom.xml
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
application.yml
# Server port
server:
  port: 8081
# RabbitMQ connection settings. The Spring Boot starter already ships defaults
# for a local broker, so this section is optional when running locally.
spring:
  rabbitmq:
    username: guest
    password: guest
    virtual-host: /
    host: 127.0.0.1
    port: 5672
Main class
package com.mcode.mq;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * @description:
 * @author: xie
 * @create: 2021-09-05 12:11
 **/
@SpringBootApplication
public class MqProducerApplication {

    public static void main(String[] args) {
        SpringApplication.run(MqProducerApplication.class, args);
    }
}
MQ configuration (done in code here)
(1) Routing mode
package com.mcode.mq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @description:
 * @author: xiebq
 * @create: 2021-09-05 12:51
 **/
@Configuration
public class DirectRabbitmqConfig {

    // 1: Declare the exchange (name, durable, autoDelete)
    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange("direct-order-exchange", true, false);
    }

    // 2: Declare the queues (name, durable)
    @Bean
    public Queue orderDirectQueue() {
        return new Queue("order.direct.queue", true);
    }

    @Bean
    public Queue smsDirectQueue() {
        return new Queue("sms.direct.queue", true);
    }

    @Bean
    public Queue emailDirectQueue() {
        return new Queue("email.direct.queue", true);
    }

    // 3: Bind each queue to the exchange with its routing key
    @Bean
    public Binding orderDirectBinding() {
        return BindingBuilder.bind(orderDirectQueue()).to(directExchange()).with("order");
    }

    @Bean
    public Binding smsDirectBinding() {
        return BindingBuilder.bind(smsDirectQueue()).to(directExchange()).with("sms");
    }

    @Bean
    public Binding emailDirectBinding() {
        return BindingBuilder.bind(emailDirectQueue()).to(directExchange()).with("email");
    }
}
(2) Broadcast (fanout) mode
package com.mcode.mq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @description:
 * @author: xie
 * @create: 2021-09-05 12:51
 **/
@Configuration
public class FanoutRabbitmqConfig {

    // 1: Declare the exchange (name, durable, autoDelete)
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanout-order-exchange", true, false);
    }

    // 2: Declare the queues (name, durable)
    @Bean
    public Queue orderQueue() {
        return new Queue("order.fanout.queue", true);
    }

    @Bean
    public Queue smsQueue() {
        return new Queue("sms.fanout.queue", true);
    }

    @Bean
    public Queue emailQueue() {
        return new Queue("email.fanout.queue", true);
    }

    // 3: Bind the queues to the exchange (a fanout exchange needs no routing key)
    @Bean
    public Binding orderBinding() {
        return BindingBuilder.bind(orderQueue()).to(fanoutExchange());
    }

    @Bean
    public Binding smsBinding() {
        return BindingBuilder.bind(smsQueue()).to(fanoutExchange());
    }

    @Bean
    public Binding emailBinding() {
        return BindingBuilder.bind(emailQueue()).to(fanoutExchange());
    }
}
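
The difference between the direct and fanout configurations above can be sketched with a small in-memory model. This is only an illustration that never talks to a broker; the class ExchangeRoutingDemo is hypothetical, and for brevity it allows one binding key per queue, whereas a real queue may have several bindings.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** In-memory model of direct vs. fanout routing; it does not connect to RabbitMQ. */
final class ExchangeRoutingDemo {

    enum Type { DIRECT, FANOUT }

    private final Type type;
    // queue name -> binding key (the key is ignored by a fanout exchange)
    private final Map<String, String> bindings = new LinkedHashMap<>();

    ExchangeRoutingDemo(Type type) {
        this.type = type;
    }

    ExchangeRoutingDemo bind(String queue, String bindingKey) {
        bindings.put(queue, bindingKey);
        return this;
    }

    /** Returns the queues that would receive a message published with routingKey. */
    List<String> route(String routingKey) {
        List<String> targets = new ArrayList<>();
        for (Map.Entry<String, String> binding : bindings.entrySet()) {
            // Fanout copies to every bound queue; direct requires an exact key match.
            if (type == Type.FANOUT || binding.getValue().equals(routingKey)) {
                targets.add(binding.getKey());
            }
        }
        return targets;
    }

    public static void main(String[] args) {
        ExchangeRoutingDemo direct = new ExchangeRoutingDemo(Type.DIRECT)
                .bind("order.direct.queue", "order")
                .bind("sms.direct.queue", "sms")
                .bind("email.direct.queue", "email");
        ExchangeRoutingDemo fanout = new ExchangeRoutingDemo(Type.FANOUT)
                .bind("order.fanout.queue", "")
                .bind("sms.fanout.queue", "")
                .bind("email.fanout.queue", "");

        System.out.println("direct, key 'sms': " + direct.route("sms"));
        System.out.println("fanout, key '':    " + fanout.route(""));
    }
}
```

In this model a message published with the key sms reaches only sms.direct.queue on the direct exchange, while any message published to the fanout exchange reaches all three fanout queues.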
(3) Expiring (TTL) queues
package com.mcode.mq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;

/**
 * @description:
 * @author: xie
 * @create: 2021-09-05 12:51
 * Expiration time (TTL)
 **/
@Configuration
public class TTLRabbitmqConfig {

    // 1: Declare the exchange (name, durable, autoDelete)
    @Bean
    public DirectExchange directTtlExchange() {
        return new DirectExchange("ttl-direct-exchange", true, false);
    }

    // 2: Declare the queues
    @Bean
    public Queue directTtlQueue() {
        // Set the message expiration time for the queue, in milliseconds
        Map<String, Object> args = new HashMap<>();
        args.put("x-message-ttl", 5000);
        // Configure the dead-letter exchange and routing key for expired messages
        args.put("x-dead-letter-exchange", "dead-direct-exchange");
        args.put("x-dead-letter-routing-key", "dead");
        // name, durable, exclusive, autoDelete, arguments
        return new Queue("ttl.direct.queue", true, false, false, args);
    }

    @Bean
    public Queue directTtlMsgQueue() {
        // Set the message expiration time for the queue, in milliseconds
        Map<String, Object> args = new HashMap<>();
        args.put("x-message-ttl", 5000);
        // Configure the dead-letter exchange and routing key for expired messages
        args.put("x-dead-letter-exchange", "dead-direct-exchange");
        args.put("x-dead-letter-routing-key", "dead");
        return new Queue("ttl.msg.direct.queue", true, false, false, args);
    }

    // 3: Bind each queue to the exchange with its routing key
    @Bean
    public Binding directTtlBinding() {
        return BindingBuilder.bind(directTtlQueue()).to(directTtlExchange()).with("ttl");
    }

    @Bean
    public Binding directTtlMsgBinding() {
        return BindingBuilder.bind(directTtlMsgQueue()).to(directTtlExchange()).with("ttlmsg");
    }
}
package com.mcode.mq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @description:
 * @author: xie
 * @create: 2021-09-05 12:51
 * Dead-letter queue configuration
 **/
@Configuration
public class DeadRabbitmqConfig {

    // 1: Declare the dead-letter exchange (name, durable, autoDelete)
    @Bean
    public DirectExchange deadExchange() {
        return new DirectExchange("dead-direct-exchange", true, false);
    }

    // 2: Declare the dead-letter queue
    @Bean
    public Queue deadQueue() {
        return new Queue("dead.direct.queue", true);
    }

    // 3: Bind the queue to the exchange with the dead-letter routing key
    @Bean
    public Binding deadBinding() {
        return BindingBuilder.bind(deadQueue()).to(deadExchange()).with("dead");
    }
}
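
The flow configured by the TTL and dead-letter classes above can be pictured with a simplified model: a message sits in ttl.direct.queue, expires after its TTL, and is then moved to dead.direct.queue. The sketch below is hypothetical (the class TtlDeadLetterDemo is not part of any library) and uses a manual clock instead of real time. It assumes two documented RabbitMQ rules: when a queue TTL and a per-message TTL are both set, the lower one applies, and expired messages are dropped only once they reach the head of the queue.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Simplified model of TTL expiry plus dead-lettering, driven by a manual clock. */
final class TtlDeadLetterDemo {

    private static final class Msg {
        final String body;
        final long expiresAt; // absolute time in ms on the manual clock

        Msg(String body, long expiresAt) {
            this.body = body;
            this.expiresAt = expiresAt;
        }
    }

    private final long queueTtlMs;                             // models x-message-ttl
    private final Deque<Msg> ttlQueue = new ArrayDeque<>();    // models ttl.direct.queue
    private final List<String> deadQueue = new ArrayList<>();  // models dead.direct.queue

    TtlDeadLetterDemo(long queueTtlMs) {
        this.queueTtlMs = queueTtlMs;
    }

    /** The lower of the queue TTL and the optional per-message TTL applies. */
    static long effectiveTtl(long queueTtlMs, Long messageTtlMs) {
        return messageTtlMs == null ? queueTtlMs : Math.min(queueTtlMs, messageTtlMs);
    }

    /** Enqueues a message at time 'now'; messageTtlMs may be null when no expiration is set. */
    void publish(String body, long now, Long messageTtlMs) {
        ttlQueue.addLast(new Msg(body, now + effectiveTtl(queueTtlMs, messageTtlMs)));
    }

    /** Moves messages that have expired at the head of the queue to the dead-letter queue. */
    void tick(long now) {
        while (!ttlQueue.isEmpty() && ttlQueue.peekFirst().expiresAt <= now) {
            deadQueue.add(ttlQueue.pollFirst().body);
        }
    }

    List<String> deadLetters() {
        return deadQueue;
    }

    int pending() {
        return ttlQueue.size();
    }

    public static void main(String[] args) {
        TtlDeadLetterDemo demo = new TtlDeadLetterDemo(5000);
        demo.publish("order-1", 0, null);   // queue TTL only
        demo.publish("order-2", 0, 2000L);  // shorter per-message TTL, but queued behind order-1
        demo.tick(3000);
        System.out.println("t=3000 dead letters: " + demo.deadLetters());
        demo.tick(5000);
        System.out.println("t=5000 dead letters: " + demo.deadLetters());
    }
}
```

A consumer listening on dead.direct.queue would then see the expired orders, which is a common way to implement delayed handling such as cancelling unpaid orders.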
Sending messages
package com.mcode.mq.service;
import cn.hutool.core.lang.Snowflake;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * @author: xie
 * @create: 2021-09-05 12:37
 **/
@Service
public class OrderService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * @Description: Create an order
     * @Param:
     * @return:
     * @Author: xiebq
     * @Date: 2021/9/5
     */
    public void makeOrder(String userId, String productId, int num) {
        // 1: Check by product ID whether stock is sufficient
        // 2: Save the order
        Snowflake snowflake = new Snowflake(2L, 1L);
        String orderId = snowflake.nextIdStr();
        System.out.println("Order created: " + orderId);
        // 3: Distribute the message through MQ
        /*
         * convertAndSend parameters:
         * 1: exchange name
         * 2: routing key (or queue name)
         * 3: message body
         */
        //************************** fanout ***********************************//
        String exchangeName = "fanout-order-exchange";
        String routingKey = "";
        rabbitTemplate.convertAndSend(exchangeName, routingKey, orderId);
        //************************** direct ***********************************//
        String directExchangeName = "direct-order-exchange";
        String directRoutingKey_order = "order";
        // Must equal the binding key of sms.direct.queue, otherwise the message goes to the order queue again
        String directRoutingKey_sms = "sms";
        rabbitTemplate.convertAndSend(directExchangeName, directRoutingKey_order, orderId);
        rabbitTemplate.convertAndSend(directExchangeName, directRoutingKey_sms, orderId);
        //************************** topic ***********************************//
        String topicExchangeName = "topic-order-exchange";
        String topicRoutingKey_order = "com.order.msg";
        String topicRoutingKey_email = "com.email.msg";
        String topicRoutingKey_sms = "com.sms.msg";
        rabbitTemplate.convertAndSend(topicExchangeName, topicRoutingKey_order, orderId);
        rabbitTemplate.convertAndSend(topicExchangeName, topicRoutingKey_email, orderId);
    }

    public void makeOrderTtl(String userId, String productId, int num) {
        Snowflake snowflake = new Snowflake(2L, 1L);
        String orderId = snowflake.nextIdStr();
        System.out.println("Order created: " + orderId);
        // Distribute the message through MQ; the TTL comes from the queue's x-message-ttl
        //************************** direct ***********************************//
        String directTtlExchangeName = "ttl-direct-exchange";
        String directTtlRoutingKey_order = "ttl";
        rabbitTemplate.convertAndSend(directTtlExchangeName, directTtlRoutingKey_order, orderId);
    }

    public void makeOrderTtlMessage(String userId, String productId, int num) {
        Snowflake snowflake = new Snowflake(2L, 1L);
        String orderId = snowflake.nextIdStr();
        System.out.println("Order created: " + orderId);
        // Distribute the message through MQ, setting the TTL on the message itself
        //************************** direct ***********************************//
        String directTtlExchangeName = "ttl-direct-exchange";
        String directTtlRoutingKey_order = "ttlmsg";
        MessagePostProcessor postProcessor = new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                // Per-message expiration in milliseconds, passed as a string
                message.getMessageProperties().setExpiration("5000");
                message.getMessageProperties().setContentEncoding("UTF-8");
                return message;
            }
        };
        rabbitTemplate.convertAndSend(directTtlExchangeName, directTtlRoutingKey_order, orderId, postProcessor);
    }
}
The above is the core producer code.
2.3 Consumer
pom.xml
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
application.yml
# Server port
server:
  port: 9081
# RabbitMQ connection settings. The Spring Boot starter already ships defaults
# for a local broker, so this section is optional when running locally.
spring:
  rabbitmq:
    username: guest
    password: guest
    virtual-host: /
    host: 127.0.0.1
    port: 5672
Main class
package com.mcode.mq;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * @author: xie
 * @date: 2021-09-06
 **/
@SpringBootApplication
public class MqConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(MqConsumerApplication.class, args);
    }
}
Configuration classes (done in code)
package com.mcode.mq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @description:
 * @author: xie
 * @create: 2021-09-05 12:51
 * Queues can be declared on either the producer or the consumer side,
 * but declaring them on the consumer side is generally the better choice.
 **/
@Configuration
public class DirectRabbitmqConfig {

    // 1: Declare the exchange (name, durable, autoDelete)
    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange("direct-order-exchange", true, false);
    }

    // 2: Declare the queues (name, durable)
    @Bean
    public Queue orderDirectQueue() {
        return new Queue("order.direct.queue", true);
    }

    @Bean
    public Queue smsDirectQueue() {
        return new Queue("sms.direct.queue", true);
    }

    @Bean
    public Queue emailDirectQueue() {
        return new Queue("email.direct.queue", true);
    }

    // 3: Bind each queue to the exchange with its routing key
    @Bean
    public Binding orderDirectBinding() {
        return BindingBuilder.bind(orderDirectQueue()).to(directExchange()).with("order");
    }

    @Bean
    public Binding smsDirectBinding() {
        return BindingBuilder.bind(smsDirectQueue()).to(directExchange()).with("sms");
    }

    @Bean
    public Binding emailDirectBinding() {
        return BindingBuilder.bind(emailDirectQueue()).to(directExchange()).with("email");
    }
}
package com.mcode.mq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @description:
 * @author: xie
 * @create: 2021-09-05 12:51
 **/
@Configuration
public class FanoutRabbitmqConfig {

    // 1: Declare the exchange (name, durable, autoDelete)
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanout-order-exchange", true, false);
    }

    // 2: Declare the queues (name, durable)
    @Bean
    public Queue orderQueue() {
        return new Queue("order.fanout.queue", true);
    }

    @Bean
    public Queue smsQueue() {
        return new Queue("sms.fanout.queue", true);
    }

    @Bean
    public Queue emailQueue() {
        return new Queue("email.fanout.queue", true);
    }

    // 3: Bind the queues to the exchange (a fanout exchange needs no routing key)
    @Bean
    public Binding orderBinding() {
        return BindingBuilder.bind(orderQueue()).to(fanoutExchange());
    }

    @Bean
    public Binding smsBinding() {
        return BindingBuilder.bind(smsQueue()).to(fanoutExchange());
    }

    @Bean
    public Binding emailBinding() {
        return BindingBuilder.bind(emailQueue()).to(fanoutExchange());
    }
}
package com.mcode.mq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
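
/**
 * @description:
 * @author: xie
 * @create: 2021-09-05 12:51
 * Expiration time (TTL)
 **/
@Configuration
public class TTLRabbitmqConfig {

    // 1: Declare the exchange (name, durable, autoDelete)
    @Bean
    public DirectExchange directTtlExchange() {
        return new DirectExchange("ttl-direct-exchange", true, false);
    }

    // 2: Declare the queues. The arguments must be identical to the producer-side
    // declaration of the same queue; RabbitMQ rejects a redeclaration with
    // different arguments (PRECONDITION_FAILED).
    @Bean
    public Queue directTtlQueue() {
        // Set the message expiration time for the queue, in milliseconds
        Map<String, Object> args = new HashMap<>();
        args.put("x-message-ttl", 5000);
        args.put("x-dead-letter-exchange", "dead-direct-exchange");
        args.put("x-dead-letter-routing-key", "dead");
        return new Queue("ttl.direct.queue", true, false, false, args);
    }

    @Bean
    public Queue directTtlMsgQueue() {
        // Set the message expiration time for the queue, in milliseconds
        Map<String, Object> args = new HashMap<>();
        args.put("x-message-ttl", 5000);
        args.put("x-dead-letter-exchange", "dead-direct-exchange");
        args.put("x-dead-letter-routing-key", "dead");
        return new Queue("ttl.msg.direct.queue", true, false, false, args);
    }

    // 3: Bind each queue to the exchange with its routing key
    @Bean
    public Binding directTtlBinding() {
        return BindingBuilder.bind(directTtlQueue()).to(directTtlExchange()).with("ttl");
    }

    @Bean
    public Binding directTtlMsgBinding() {
        return BindingBuilder.bind(directTtlMsgQueue()).to(directTtlExchange()).with("ttlmsg");
    }
}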
Consumer code
Topic mode
package com.mcode.mq.service.topic;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

/**
 * @author: xie
 * @date: 2021-09-06
 **/
@Component
// Declares the queue, the topic exchange, and the binding directly on the listener
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "order.topic.queue", durable = "true", autoDelete = "false"),
        exchange = @Exchange(value = "topic-order-exchange", type = ExchangeTypes.TOPIC),
        key = "#.order.#"
))
public class TopicOrderConsumer {

    @RabbitHandler
    public void reviceMessage(String message) {
        System.out.println("order-topic --- received order message: " + message);
    }
}
Routing mode
package com.mcode.mq.service.direct;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @author: xie
 * @date: 2021-09-06
 **/
@Component
@RabbitListener(queues = {"order.direct.queue"})
public class DirectOrderConsumer {

    @RabbitHandler
    public void reviceMessage(String message) {
        System.out.println("order-direct --- received order message: " + message);
    }
}
Broadcast (fanout) mode
package com.mcode.mq.service.fanout;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @author: xiebq
 * @date: 2021-09-06
 **/
@Component
@RabbitListener(queues = {"order.fanout.queue"})
public class FanoutOrderConsumer {

    @RabbitHandler
    public void reviceMessage(String message) {
        System.out.println("order-fanout --- received order message: " + message);
    }
}
The above is the core consumer code.
3. Reference code
The code can be downloaded from the top of the page.