提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档

文章目录

发布订阅模式 (fanout)---案例前言@RabbitListener和@RabbitHandler的使用

1.通过Spring官网快速创建一个RabbitMQ的生产者项目2.导入项目后在application.yml文件中配置3.创建一个RabbitMqConfig配置类4. 生产者5.测试生产者创建mq是否成功访问网址: http://localhost:15672/#/queues

6.再创建一个消费者项目7. 创建消费者8.测试消费者接收信息

发布订阅模式 (fanout)—案例

前言

@RabbitListener和@RabbitHandler的使用

1.通过Spring官网快速创建一个RabbitMQ的生产者项目

2.导入项目后在application.yml文件中配置

# 服务端口

server:

port: 8081

#配置rabbitmq服务 测试不用写,默认本机

spring:

rabbitmq:

username: guest #默认账号

password: guest #默认密码

virtual-host: /

host: localhost

port: 5672

#消息确认配置项

#确认消息已发送到交换机: Exchange

publisher-confirm-type: correlated

#确认消息已发送到队列: Queue

publisher-returns: true

3.创建一个RabbitMqConfig配置类

package com.exam.RebbitMQ.config;

import org.springframework.amqp.core.Binding;

import org.springframework.amqp.core.BindingBuilder;

import org.springframework.amqp.core.FanoutExchange;

import org.springframework.amqp.core.Queue;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

@Configuration

public class RabbitMqConfig {

//1:声明注册fanout模式的交换机,参数1:对应的service的fanoutName,参数2:持久化(true,false),参数3:自动删除(false/true)

@Bean

public FanoutExchange fanoutExchange(){

return new FanoutExchange("fanout_order_exchang", true, false);

}

//2:声明队列 sms.fanout.queue,email.fanout.queue,duanxin.fanout.queue

//参数1:名字,参数2:持久化队列true

//短信队列

@Bean

public Queue smsQueue() {

System.err.println("执行了sms");

return new Queue("sms.fanout.queue",true);

}

@Bean

public Queue duanxinQueue() {

System.err.println("执行了duanxin");

return new Queue("duanxin.fanout.queue",true);

}

//邮箱队列

@Bean

public Queue emailQueue() {

System.err.println("执行了email");

return new Queue("email.fanout.queue",true);

}

//3:完成绑定关系(队列和交换机完成绑定关系)

@Bean

public Binding smsBinding() {

//把smsQueue放到fanoutExchange交换机上面

return BindingBuilder.bind(smsQueue()).to(fanoutExchange());

}

@Bean

public Binding duanxinBinding() {

//把duanxinQueue放到fanoutExchange交换机上面

return BindingBuilder.bind(duanxinQueue()).to(fanoutExchange());

}

@Bean

public Binding emailBinding() {

//把emailQueue放到fanoutExchange交换机上面

return BindingBuilder.bind(emailQueue()).to(fanoutExchange());

}

}

4. 生产者

OrderService

package com.exam.RebbitMQ.service;

public interface OrderService {

void makeOrder(String userid,String productid,int num);

}

OrderServiceImpl

package com.exam.RebbitMQ.service.Impl;

import java.util.UUID;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Service;

import com.exam.RebbitMQ.service.OrderService;

@Service

public class OrderServiceImpl implements OrderService{

@Autowired

private RabbitTemplate rabbitTemplate;

/**

* 模拟用户下单

**/

public void makeOrder(String userid,String productid,int num) {

//1.根据商品ID查询商品是否充足

//2.保存订单

String orderId = UUID.randomUUID().toString();

System.err.println("订单生成成功"+orderId);

//3.通过MQ来完成消息的分发

//参数1:交换机 参数二:路由key/queue队列名称 参数三:消息内容

String exchangName ="fanout_order_exchang";

String routingKey = "";

rabbitTemplate.convertAndSend(exchangName, routingKey, orderId);

}

}

5.测试生产者创建mq是否成功

在项目的test中发送请求

package com.huyi.rabbitmq;

import com.huyi.rabbitmq.service.OrderService;

import org.junit.jupiter.api.Test;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest

class RabbitMqApplicationTests {

@Autowired

private OrderService orderService;

@Test

void contextLoads() {

orderService.makeOrder("1","1", 18);

}

}

访问网址: http://localhost:15672/#/queues

6.再创建一个消费者项目

yml配置

# 服务端口

server:

port: 8082

#配置rabbitmq服务 测试不用写,默认本机

spring:

rabbitmq:

username: guest #默认账号

password: guest #默认密码

virtual-host: /

host: localhost

port: 5672

#消息确认配置项

#确认消息已发送到交换机: Exchange

publisher-confirm-type: correlated

#确认消息已发送到队列: Queue

publisher-returns: true

7. 创建消费者

SmsConsumerService、SmsConsumerServiceImpl

package com.huyi.rabbitmq_consumber.service;

public interface SmsConsumerService {

void reviceMessage(String message);

}

package com.huyi.rabbitmq_consumber.service.Impl;

import com.huyi.rabbitmq_consumber.service.SmsConsumerService;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.stereotype.Component;

import org.springframework.stereotype.Service;

@Component

public class SmsConsumerServiceImpl implements SmsConsumerService {

//注意:这里要和生产者RabbitMqConfig文件中的名字对应起来

@RabbitListener(queues = {"sms.fanout.queue"})

public void reviceMessage(String message) {

System.err.println("sms_fanout--接收到了订单信息");

}

}

EmailConsumerService、EmailConsumerServiceImpl

package com.huyi.rabbitmq_consumber.service;

public interface EmailConsumerService {

void reviceMessage(String message);

}

package com.huyi.rabbitmq_consumber.service.Impl;

import com.huyi.rabbitmq_consumber.service.EmailConsumerService;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.stereotype.Component;

import org.springframework.stereotype.Service;

@Component

@RabbitListener(queues = {"email.fanout.queue"})

public class EmailConsumerServiceImpl implements EmailConsumerService {

@RabbitHandler

public void reviceMessage(String message) {

System.err.println("Email_fanout--接收到了订单信息"+message);

}

}

DuanxinConsumerService、DuanxinConsumerServiceImpl

package com.huyi.rabbitmq_consumber.service;

public interface DuanxinConsumerService {

void reviceMessage(String message);

}

package com.huyi.rabbitmq_consumber.service.Impl;

import com.huyi.rabbitmq_consumber.service.DuanxinConsumerService;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.stereotype.Component;

import org.springframework.stereotype.Service;

@Component

@RabbitListener(queues = {"duanxin.fanout.queue"})

public class DuanxinConsumerServiceImpl implements DuanxinConsumerService {

@RabbitHandler

public void reviceMessage(String message) {

System.err.println("Duanxin_fanout--接收到了订单信息"+message);

}

}

8.测试消费者接收信息

启动消费者项目 启动生产者项目 查看消费者项目是否监听到了生产者的信息

精彩文章

评论可见,请评论后查看内容,谢谢!!!评论后请刷新页面。