1、背景

以rabbitMq作为消息队列作为服务框架中的异步、削峰、解耦。以下是使用rabbitmq的代码示例,仅供参考学习。

rabbitmq的5种消息模式:

简单模式(Simple Mode):最简单的消息模式,它包含一个生产者、一个消费者和一个队列。生产者向队列里发送消息,消费者从队列中获取消息并消费。工作队列模式(Work Queue Mode):指向多个互相竞争的消费者发送消息的模式,它包含一个生产者、多个消费者和一个队列。当消费者获取消息处理耗时任务时,空闲的消费者从队列中获取并消费消息,每个消息只能被一个消费者处理,可以实现任务的并行处理。发布/订阅模式(Publish/Subscribe Mode):指同时向多个消费者发送消息的模式(类似广播的形式),它包含一个生产者、多个消费者、多个队列和一个交换机。每个消费者都可以从自己的队列中获取消息并处理,实现一对多的消息通信。路由模式(Routing Mode):可以根据路由键选择性给多个消费者发送消息的模式,它包含一个生产者、多个消费者、多个队列和一个交换机。交换机根据消息的路由键将消息发送到绑定到交换器上的特定队列,消费者根据自己关心的路由键绑定到队列上,只接收符合自己关心的路由键的消息。主题模式(Topic Mode):可以根据路由键匹配规则选择性给多个消费者发送消息的模式,它包含一个生产者、多个消费者、多个队列和一个交换机。交换机根据消息的路由键和通配符模式将消息发送到符合匹配规则的队列,消费者根据自己关心的通配符模式绑定到队列上,只接收符合自己关心的消息。

常用的基本是后面3种模式。

2、项目搭建

基于springcloud Hoxton.SR8、springboot 2.3.2.RELEASE、springcloud-alibaba 2.2.4.RELEASE。

2.1 创建父工程

搭建maven项目,名称可以自取,把pom.xml内容拷贝过去即可。

 pom.xml

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

4.0.0

mcode-rabbitmq

pom

rabbitmq-hello

rabbitmq-demo

springboot-order-mq-producer

springboot-order-mq-consumer

rabbitmq-seckill

UTF-8

1.8

1.8

1.1.10

Hoxton.SR8

2.3.2.RELEASE

2.2.4.RELEASE

1.2.35

3.6

1.0-SNAPSHOT

2.1.0

org.springframework.cloud

spring-cloud-dependencies

${spring-cloud.version}

pom

import

org.springframework.boot

spring-boot-dependencies

${springboot.version}

pom

import

com.alibaba.cloud

spring-cloud-alibaba-dependencies

${springcloudalibaba.version}

pom

import

com.alibaba

druid

${druid.version}

com.alibaba

fastjson

${fastjson.version}

org.apache.commons

commons-lang3

${commons-lang3.version}

com.mcode

seata-common

1.0-SNAPSHOT

org.mybatis.spring.boot

mybatis-spring-boot-starter

${mybatis.version}

cn.hutool

hutool-all

5.1.4

org.projectlombok

lombok

1.18.12

org.springframework.boot

spring-boot-maven-plugin

${springboot.version}

2.2 生产者

搭建模块maven项目,名称可以自取,把pom.xml内容拷贝过去即可。

pom.xml

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

mcode-rabbitmq

com.mcode

1.0-SNAPSHOT

4.0.0

springboot-order-mq-producer

org.springframework.boot

spring-boot-starter-amqp

org.springframework.boot

spring-boot-starter-web

org.springframework.boot

spring-boot-starter-test

application.yml

# 服务端口

server:

port: 8081

# 配置rabbitmq服务,以下配置其实spring-starter已经有默认的配置了

# 所以本地服务的时候配置不配置都可以

spring:

rabbitmq:

username: guest

password: guest

virtual-host: /

host: 127.0.0.1

port: 5672

启动类

package com.mcode.mq;

import org.springframework.boot.SpringApplication;

import org.springframework.boot.autoconfigure.SpringBootApplication;

/**

* @description:

* @author: xie

* @create: 2021-09-05 12:11

**/

@SpringBootApplication

public class MqProducerApplication {

public static void main(String[] args) {

SpringApplication.run(MqProducerApplication.class,args);

}

}

mq配置(这里采用编码方式)

(1)路由模式

package com.mcode.mq.config;

import org.springframework.amqp.core.*;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

/**

* @description:

* @author: xiebq

* @create: 2021-09-05 12:51

**/

@Configuration

public class DirectRabbitmqConfig {

//1:声明注册交换

@Bean

public DirectExchange directExchange(){

return new DirectExchange("direct-order-exchange",true,false);

}

//2:声明队列

@Bean

public Queue orderDirectQueue(){

return new Queue("order.direct.queue",true);

}

@Bean

public Queue smsDirectQueue(){

return new Queue("sms.direct.queue",true);

}

@Bean

public Queue emailDirectQueue(){

return new Queue("email.direct.queue",true);

}

//3:队列绑定交换机

@Bean

public Binding orderDirectBinding(){

return BindingBuilder.bind(orderDirectQueue()).to(directExchange()).with("order");

}

@Bean

public Binding smsDirectBinding(){

return BindingBuilder.bind(smsDirectQueue()).to(directExchange()).with("sms");

}

@Bean

public Binding emailDirectBinding(){

return BindingBuilder.bind(emailDirectQueue()).to(directExchange()).with("email");

}

}

(2)广播模式

package com.mcode.mq.config;

import org.springframework.amqp.core.Binding;

import org.springframework.amqp.core.BindingBuilder;

import org.springframework.amqp.core.FanoutExchange;

import org.springframework.amqp.core.Queue;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

/**

* @description:

* @author: xie

* @create: 2021-09-05 12:51

**/

@Configuration

public class FanoutRabbitmqConfig {

//1:声明注册交换

@Bean

public FanoutExchange fanoutExchange(){

return new FanoutExchange("fanout-order-exchange",true,false);

}

//2:声明队列

@Bean

public Queue orderQueue(){

return new Queue("order.fanout.queue",true);

}

@Bean

public Queue smsQueue(){

return new Queue("sms.fanout.queue",true);

}

@Bean

public Queue emailQueue(){

return new Queue("email.fanout.queue",true);

}

//3:队列绑定交换机

@Bean

public Binding orderBinding(){

return BindingBuilder.bind(orderQueue()).to(fanoutExchange());

}

@Bean

public Binding smsBinding(){

return BindingBuilder.bind(smsQueue()).to(fanoutExchange());

}

@Bean

public Binding emailBinding(){

return BindingBuilder.bind(emailQueue()).to(fanoutExchange());

}

}

(3)过期队列

package com.mcode.mq.config;

import org.springframework.amqp.core.Binding;

import org.springframework.amqp.core.BindingBuilder;

import org.springframework.amqp.core.DirectExchange;

import org.springframework.amqp.core.Queue;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import java.util.HashMap;

import java.util.Map;

/**

* @description:

* @author: xie

* @create: 2021-09-05 12:51

* 过期时间

**/

@Configuration

public class TTLRabbitmqConfig {

//1:声明注册交换

@Bean

public DirectExchange directTtlExchange(){

return new DirectExchange("ttl-direct-exchange",true,false);

}

//2:声明队列

@Bean

public Queue directTtlQueue(){

//设置过期时间

Map args = new HashMap<>();

args.put("x-message-ttl",5000);

//配置死信队列

args.put("x-dead-letter-exchange","dead-direct-exchange");

args.put("x-dead-letter-routing-key","dead");

return new Queue("ttl.direct.queue",true,false,false,args);

}

@Bean

public Queue directTtlMsgQueue(){

//设置过期时间

Map args = new HashMap<>();

args.put("x-message-ttl",5000);

//配置死信队列

args.put("x-dead-letter-exchange","dead-direct-exchange");

args.put("x-dead-letter-routing-key","dead");

return new Queue("ttl.msg.direct.queue",true,false,false,args);

}

//3:队列绑定交换机

@Bean

public Binding directTtlBinding(){

return BindingBuilder.bind(directTtlQueue()).to(directTtlExchange()).with("ttl");

}

@Bean

public Binding directTtlMsgBinding(){

return BindingBuilder.bind(directTtlMsgQueue()).to(directTtlExchange()).with("ttlmsg");

}

}

package com.mcode.mq.config;

import org.springframework.amqp.core.Binding;

import org.springframework.amqp.core.BindingBuilder;

import org.springframework.amqp.core.DirectExchange;

import org.springframework.amqp.core.Queue;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

/**

* @description:

* @author: xie

* @create: 2021-09-05 12:51

* 死信队列配置

**/

@Configuration

public class DeadRabbitmqConfig {

//1:声明注册交换

@Bean

public DirectExchange deadExchange(){

return new DirectExchange("dead-direct-exchange",true,false);

}

//2:声明队列

@Bean

public Queue deadQueue(){

return new Queue("dead.direct.queue",true);

}

//3:队列绑定交换机

@Bean

public Binding deadBinding(){

return BindingBuilder.bind(deadQueue()).to(deadExchange()).with("dead");

}

}

发送消息

package com.mcode.mq.service;

import cn.hutool.core.lang.Snowflake;

import org.springframework.amqp.AmqpException;

import org.springframework.amqp.core.Message;

import org.springframework.amqp.core.MessagePostProcessor;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Service;

/**

* @author: xie

* @create: 2021-09-05 12:37

**/

@Service

public class OrderService {

@Autowired

private RabbitTemplate rabbitTemplate;

/**

* @Description: 生成订单

* @Param:

* @return:

* @Author: xiebq

* @Date: 2021/9/5

*/

public void makeOrder(String userId,String productId,int num){

//1:根据商品Id查询库存是否充足

//2:保存订单

Snowflake snowflake = new Snowflake(2L,1L);

String orderId = snowflake.nextIdStr();

System.out.println("生成订单成功: " + orderId);

//3:通过mq完后曾消息得分发

/**

* 参数说明

* @param1:交换机

* @param2:路由Key或者队列名称

* @param3:消息内容

*/

//************************** fanout ***********************************//

String exchangeName = "fanout-order-exchange";

String routingKey = "";

rabbitTemplate.convertAndSend(exchangeName,routingKey,orderId);

//************************** direct ***********************************//

String directExchangeName = "direct-order-exchange";

String directRoutingKey_order = "order";

String directRoutingKey_sms = "order";

rabbitTemplate.convertAndSend(directExchangeName,directRoutingKey_order,orderId);

rabbitTemplate.convertAndSend(directExchangeName,directRoutingKey_sms,orderId);

//************************** topic ***********************************//

String topicExchangeName = "topic-order-exchange";

String topicRoutingKey_order = "com.order.msg";

String topicRoutingKey_email = "com.email.msg";

String topicRoutingKey_sms = "com.sms.msg";

rabbitTemplate.convertAndSend(topicExchangeName,topicRoutingKey_order,orderId);

rabbitTemplate.convertAndSend(topicExchangeName,topicRoutingKey_email,orderId);

}

public void makeOrderTtl(String userId,String productId,int num){

Snowflake snowflake = new Snowflake(2L,1L);

String orderId = snowflake.nextIdStr();

System.out.println("生成订单成功: " + orderId);

//3:通过mq完后曾消息得分发

//************************** direct ***********************************//

String directTtlExchangeName = "ttl-direct-exchange";

String directTtlRoutingKey_order = "ttl";

rabbitTemplate.convertAndSend(directTtlExchangeName,directTtlRoutingKey_order,orderId);

}

public void makeOrderTtlMessage(String userId,String productId,int num){

Snowflake snowflake = new Snowflake(2L,1L);

String orderId = snowflake.nextIdStr();

System.out.println("生成订单成功: " + orderId);

//3:通过mq完后曾消息得分发

//************************** direct ***********************************//

String directTtlExchangeName = "ttl-direct-exchange";

String directTtlRoutingKey_order = "ttlmsg";

MessagePostProcessor postProcessor = new MessagePostProcessor() {

@Override

public Message postProcessMessage(Message message) throws AmqpException {

message.getMessageProperties().setExpiration("5000");

message.getMessageProperties().setContentEncoding("UTF-8");

return message;

}

};

rabbitTemplate.convertAndSend(directTtlExchangeName,directTtlRoutingKey_order,orderId,postProcessor);

}

}

以上是生产者的基本核心代码。

2.3 消费者

pom.xml

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

mcode-rabbitmq

com.mcode

1.0-SNAPSHOT

4.0.0

springboot-order-mq-consumer

org.springframework.boot

spring-boot-starter-amqp

org.springframework.boot

spring-boot-starter-web

org.springframework.boot

spring-boot-starter-test

application.yml

# 服务端口

server:

port: 9081

# 配置rabbitmq服务,以下配置其实spring-starter已经有默认的配置了

# 所以本地服务的时候配置不配置都可以

spring:

rabbitmq:

username: guest

password: guest

virtual-host: /

host: 127.0.0.1

port: 5672

启动类

package com.mcode.mq;

import org.springframework.boot.SpringApplication;

import org.springframework.boot.autoconfigure.SpringBootApplication;

/**

* @author: xie

* @date: 2021-09-06

**/

@SpringBootApplication

public class MqConsumerApplication {

public static void main(String[] args) {

SpringApplication.run(MqConsumerApplication.class,args);

}

}

配置类(编码方式)

package com.mcode.mq.config;

import org.springframework.amqp.core.Binding;

import org.springframework.amqp.core.BindingBuilder;

import org.springframework.amqp.core.DirectExchange;

import org.springframework.amqp.core.Queue;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

/**

* @description:

* @author: xie

* @create: 2021-09-05 12:51

* 队列的声明定义在生产者或者消费者都可以,但是相对来说还是定义在消费者这边比较好

**/

@Configuration

public class DirectRabbitmqConfig {

//1:声明注册交换

@Bean

public DirectExchange directExchange(){

return new DirectExchange("direct-order-exchange",true,false);

}

//2:声明队列

@Bean

public Queue orderDirectQueue(){

return new Queue("order.direct.queue",true);

}

@Bean

public Queue smsDirectQueue(){

return new Queue("sms.direct.queue",true);

}

@Bean

public Queue emailDirectQueue(){

return new Queue("email.direct.queue",true);

}

//3:队列绑定交换机

@Bean

public Binding orderDirectBinding(){

return BindingBuilder.bind(orderDirectQueue()).to(directExchange()).with("order");

}

@Bean

public Binding smsDirectBinding(){

return BindingBuilder.bind(smsDirectQueue()).to(directExchange()).with("sms");

}

@Bean

public Binding emailDirectBinding(){

return BindingBuilder.bind(emailDirectQueue()).to(directExchange()).with("email");

}

}

package com.mcode.mq.config;

import org.springframework.amqp.core.Binding;

import org.springframework.amqp.core.BindingBuilder;

import org.springframework.amqp.core.FanoutExchange;

import org.springframework.amqp.core.Queue;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

/**

* @description:

* @author: xie

* @create: 2021-09-05 12:51

**/

@Configuration

public class FanoutRabbitmqConfig {

//1:声明注册交换

@Bean

public FanoutExchange fanoutExchange(){

return new FanoutExchange("fanout-order-exchange",true,false);

}

//2:声明队列

@Bean

public Queue orderQueue(){

return new Queue("order.fanout.queue",true);

}

@Bean

public Queue smsQueue(){

return new Queue("sms.fanout.queue",true);

}

@Bean

public Queue emailQueue(){

return new Queue("email.fanout.queue",true);

}

//3:队列绑定交换机

@Bean

public Binding orderBinding(){

return BindingBuilder.bind(orderQueue()).to(fanoutExchange());

}

@Bean

public Binding smsBinding(){

return BindingBuilder.bind(smsQueue()).to(fanoutExchange());

}

@Bean

public Binding emailBinding(){

return BindingBuilder.bind(emailQueue()).to(fanoutExchange());

}

}

package com.mcode.mq.config;

import org.springframework.amqp.core.Binding;

import org.springframework.amqp.core.BindingBuilder;

import org.springframework.amqp.core.DirectExchange;

import org.springframework.amqp.core.Queue;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import java.util.HashMap;

import java.util.Map;

/**

* @description:

* @author: xie

* @create: 2021-09-05 12:51

* 过期时间

**/

@Configuration

public class TTLRabbitmqConfig {

//1:声明注册交换

@Bean

public DirectExchange directTtlExchange(){

return new DirectExchange("ttl-direct-exchange",true,false);

}

//2:声明队列

@Bean

public Queue directTtlQueue(){

//设置过期时间

Map args = new HashMap<>();

args.put("x-message-ttl",5000);

return new Queue("ttl.direct.queue",true,false,false,args);

}

@Bean

public Queue directTtlMsgQueue(){

//设置过期时间

Map args = new HashMap<>();

args.put("x-message-ttl",5000);

return new Queue("ttl.msg.direct.queue",true,false,false,args);

}

//3:队列绑定交换机

@Bean

public Binding directTtlBinding(){

return BindingBuilder.bind(directTtlQueue()).to(directTtlExchange()).with("ttl");

}

@Bean

public Binding directTtlMsgBinding(){

return BindingBuilder.bind(directTtlMsgQueue()).to(directTtlExchange()).with("ttlmsg");

}

}

消费代码

主题模式

package com.mcode.mq.service.topic;

import org.springframework.amqp.core.ExchangeTypes;

import org.springframework.amqp.rabbit.annotation.*;

import org.springframework.stereotype.Component;

/**

* @author: xie

* @date: 2021-09-06

**/

@Component

@RabbitListener(bindings = @QueueBinding(

value = @Queue(value = "order.topic.queue",durable = "true",autoDelete = "false"),

exchange = @Exchange(value = "topic-order-exchange",type = ExchangeTypes.TOPIC),

key = "#.order.#"

))

public class TopicOrderConsumer {

@RabbitHandler

public void reviceMessage(String message){

System.out.println("order-topic --- 接收到订单信息:" + message);

}

}

发布订阅模式

package com.mcode.mq.service.direct;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.stereotype.Component;

/**

* @author: xie

* @date: 2021-09-06

**/

@Component

@RabbitListener(queues = {"order.direct.queue"})

public class DirectOrderConsumer {

@RabbitHandler

public void reviceMessage(String message){

System.out.println("order-direct --- 接收到订单信息:" + message);

}

}

广播模式

package com.mcode.mq.service.fanout;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.stereotype.Component;

/**

* @author: xiebq

* @date: 2021-09-06

**/

@Component

@RabbitListener(queues = {"order.fanout.queue"})

public class FanoutOrderConsumer {

@RabbitHandler

public void reviceMessage(String message){

System.out.println("order-fanout --- 接收到订单信息:" + message);

}

}

以上是消费者的核心代码。

3、参考代码

        页面顶部即可下载。

相关文章

评论可见,请评论后查看内容,谢谢!!!评论后请刷新页面。