路由模式:

idea实现路由模式

package com.aaa.test.procedure;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

/**
 * Producer for the routing (direct exchange) example.
 *
 * <p>Declares a direct exchange and two queues, binds the queues with the
 * routing keys {@code error}, {@code test} and {@code test2}, and publishes
 * one message with the routing key {@code error}.
 */
public class MyProcedureExDirect {

    private static final String EXCHANGE = "exchange_direct_test";
    private static final String QUEUE_1 = "exchange_direct_queue_1";
    private static final String QUEUE_2 = "exchange_direct_queue_2";

    /**
     * Sets up the exchange, queues and bindings, then publishes a single message.
     *
     * @throws IOException      if talking to the broker fails
     * @throws TimeoutException if connecting to or closing the broker connection times out
     */
    @Test
    public void procedure() throws IOException, TimeoutException {
        // Connection settings for the broker.
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setUsername("cjh");
        connectionFactory.setPassword("cjh");
        connectionFactory.setHost("192.168.152.32");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/cjh");

        // try-with-resources closes the channel and then the connection, even
        // when declaring, binding or publishing throws.
        try (Connection connection = connectionFactory.newConnection();
             Channel channel = connection.createChannel()) {

            // Declare the direct exchange (third argument: durable = false).
            channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT, false);

            // Declare the queues (durable, exclusive and autoDelete are all false).
            channel.queueDeclare(QUEUE_1, false, false, false, null);
            channel.queueDeclare(QUEUE_2, false, false, false, null);

            // Bind the queues to the exchange; each routing key is a concrete value.
            channel.queueBind(QUEUE_1, EXCHANGE, "error");
            channel.queueBind(QUEUE_2, EXCHANGE, "test");
            channel.queueBind(QUEUE_2, EXCHANGE, "test2");

            // Publish the message. The payload is encoded explicitly as UTF-8 so
            // the bytes do not depend on the platform default charset.
            channel.basicPublish(EXCHANGE, "error", null,
                    "我就是测试一下路由模式".getBytes(StandardCharsets.UTF_8));
        }
    }
}

消费路由消息

package org.example;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

/**
 * Consumer for the routing (direct exchange) example.
 *
 * <p>Subscribes to the queue {@code exchange_direct_queue_1} and prints the
 * body of every delivered message.
 */
public class ConsumerAppExchangeDirect {

    /**
     * Connects to the broker and registers the consumer.
     *
     * @param args unused
     * @throws IOException      if talking to the broker fails
     * @throws TimeoutException if connecting to the broker times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        // Connection settings for the broker.
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setUsername("cjh");
        connectionFactory.setPassword("cjh");
        connectionFactory.setHost("192.168.152.32");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/cjh");

        // The connection and channel are intentionally left open: closing them
        // here would stop the consumer from receiving deliveries.
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // Decode explicitly as UTF-8 so the result does not depend on the
                // platform default charset; the publisher must encode with the
                // same charset.
                String s = new String(body, StandardCharsets.UTF_8);
                System.out.println("mq中的消息************* = " + s);
            }
        };

        // Second argument: autoAck = true.
        channel.basicConsume("exchange_direct_queue_1", true, consumer);
    }
}

主题模式:

主题模式的角色和路由模式是一样的:

生产者  消费者  交换机 队列

交换机的类型topic 类型的

交换机和队列绑定的时候 routingkey的值属于通配符类型的

# 代表匹配零个或者多个单词  (多个单词之间是以.分割的)

test.#  可以匹配  test.aaa、test.aaa.bbb

* 代表的是匹配一个单词

*.* 两个单词 test.aaa  test.test

package com.aaa.test.procedure;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

/**
 * Producer for the topic exchange example.
 *
 * <p>Declares a topic exchange and two queues, binds the queues with the
 * wildcard patterns {@code test.#}, {@code *.aaa} and {@code test.*}, and
 * publishes one message with the routing key {@code test.aaa}.
 */
public class MyProcedureExTopic {

    private static final String EXCHANGE = "exchange_topic_test";
    private static final String QUEUE_1 = "exchange_topic_queue_1";
    private static final String QUEUE_2 = "exchange_topic_queue_2";

    /**
     * Sets up the exchange, queues and bindings, then publishes a single message.
     *
     * @throws IOException      if talking to the broker fails
     * @throws TimeoutException if connecting to or closing the broker connection times out
     */
    @Test
    public void procedure() throws IOException, TimeoutException {
        // Connection settings for the broker.
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setUsername("cjh");
        connectionFactory.setPassword("cjh");
        connectionFactory.setHost("192.168.152.32");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/cjh");

        // try-with-resources closes the channel and then the connection, even
        // when declaring, binding or publishing throws.
        try (Connection connection = connectionFactory.newConnection();
             Channel channel = connection.createChannel()) {

            // Declare the topic exchange (third argument: durable = false).
            channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.TOPIC, false);

            // Declare the queues (durable, exclusive and autoDelete are all false).
            channel.queueDeclare(QUEUE_1, false, false, false, null);
            channel.queueDeclare(QUEUE_2, false, false, false, null);

            // Bind the queues to the exchange; the binding keys here are
            // wildcard patterns rather than concrete values.
            channel.queueBind(QUEUE_1, EXCHANGE, "test.#");
            channel.queueBind(QUEUE_2, EXCHANGE, "*.aaa");
            channel.queueBind(QUEUE_2, EXCHANGE, "test.*");

            // Publish the message. The payload is encoded explicitly as UTF-8 so
            // the bytes do not depend on the platform default charset.
            channel.basicPublish(EXCHANGE, "test.aaa", null,
                    "我就是测试一下路由模式".getBytes(StandardCharsets.UTF_8));
        }
    }
}

整合SpringBoot

加依赖

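<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
</dependency>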

配置文件

spring:
  rabbitmq:
    username: cjh
    password: cjh
    host: 192.168.152.32
    virtual-host: /cjh  # the virtual host is configured with a leading "/"
    port: 5672

# Custom properties read via @Value("${mq...}") in the configuration and test classes.
mq:
  exchange:
    name: test_exchange_topic
  queue:
    name1: test_topic_exchange_queue_1
    name2: test_topic_exchange_queue_2

配置类

声明交换机

声明队列

交换机和队列进行绑定

package com.aaa.config; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class TopicMqConfig { // 1.交换机 @Value("${mq.exchange.name}") private String EXCHANGENAME; @Value("${mq.queue.name1}") private String QUEUENAME1; @Value("${mq.queue.name2}") private String QUEUENAME2; // 声明交换机 @Bean("ex1") public Exchange getExChange(){ Exchange exchange = ExchangeBuilder.topicExchange(EXCHANGENAME).durable(false).build(); return exchange; } // 2.队列 @Bean("queue1") public Queue getQueue1(){ Queue queue = QueueBuilder.nonDurable(QUEUENAME1).build(); return queue; } @Bean("queue2") public Queue getQueue2(){ Queue queue = QueueBuilder.nonDurable(QUEUENAME2).build(); return queue; } // 绑定交换机和队列 @Bean("binding1") public Binding bindingExQueue1(@Qualifier("ex1")Exchange exchange,@Qualifier("queue1") Queue queue){ Binding binding = BindingBuilder.bind(queue).to(exchange).with("*.*").noargs(); return binding; } @Bean("binding2") public Binding bindingExQueue2(@Qualifier("ex1")Exchange exchange,@Qualifier("queue2") Queue queue){ Binding binding = BindingBuilder.bind(queue).to(exchange).with("test.#").noargs(); return binding; }

}

生产者发布消息

package com.aaa;

import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;

import javax.annotation.Resource;

/**
 * Producer-side test that sends one message to the configured exchange.
 */
@SpringBootTest
public class MqTest {

    /** Exchange name, from {@code mq.exchange.name}. */
    @Value("${mq.exchange.name}")
    private String exchangeName;

    @Resource
    public RabbitTemplate rabbitTemplate;

    /** Sends a text message to the exchange with the routing key {@code test.topic}. */
    @Test
    void sendMsg() {
        final String routingKey = "test.topic";
        final String payload = "我就是测试一下springBoot和mq的整合";
        rabbitTemplate.convertAndSend(exchangeName, routingKey, payload);
    }
}

消费消息

消费者的服务application.yml和生产者是一样的

只需要监听队列(队列里面只要有消息就开始接收)

消息的可靠性传递

开启确认模式

package com.aaa; import org.junit.jupiter.api.Test; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.test.context.SpringBootTest; import javax.annotation.Resource; @SpringBootTest public class MqTest { @Value("${mq.exchange.name}") private String EXCHANGENAME; @Resource public RabbitTemplate rabbitTemplate; @Test void sendMsg(){ // 消息 访问成功了没 rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { /** * * @param correlationData * @param b 消息是否发送成功 * @param s */ @Override public void confirm(CorrelationData correlationData, boolean b, String s) { if (b){ System.out.println("发送成功"); }else { System.out.println("发送失败,原因是:"+s); } } }); rabbitTemplate.convertAndSend(EXCHANGENAME,"test.topic","我就是测试一下springBoot和mq的整合"); } }

Return(回退模式)

测试类

@Test void sendMsgReturn() { // 代表使用的是回退模式 rabbitTemplate.setMandatory(true); // 消息 rabbitTemplate.setReturnsCallback(returnedMessage -> System.out.println("消息回退了,回退的消息是:"+new String(returnedMessage.getMessage().getBody()))); rabbitTemplate.convertAndSend(EXCHANGENAME, "aaa.topic", "我就是测试一下springBoot和mq的整合"); }

相关阅读

评论可见,请评论后查看内容,谢谢!!!评论后请刷新页面。