目录

0.交换机种类和区别

1.声明队列和交换机以及RountKey

2.初始化循环绑定

3.声明交换机

4.监听队列

4.1 监听普通队列

4.2监听死信队列

 5.削峰填谷的实现

0.交换机种类和区别

Direct Exchange(直连交换机):

直连交换机将消息发送到与消息的路由键完全匹配的队列。它是最简单的交换机类型之一。当一个队列使用某个直连交换机绑定时,它需要指定一个绑定键(binding key),当消息的路由键与该绑定键完全匹配时,消息会被发送到该队列。 Fanout Exchange(扇出交换机):

扇出交换机会将消息发送到与其绑定的所有队列,忽略消息的路由键。当一个队列使用扇出交换机绑定时,它会接收到交换机发送的所有消息,无论消息的路由键是什么。 Topic Exchange(主题交换机):

主题交换机根据消息的路由键和绑定键之间的模式匹配来路由消息。绑定键可以使用通配符进行匹配,支持 '*' 匹配一个单词,'#' 匹配零个或多个单词,从而允许更灵活的路由规则。 Headers Exchange(标头交换机):

标头交换机根据消息的标头(headers)中的键值对来路由消息,而不是使用路由键。在将队列绑定到标头交换机时,可以指定一组标头键值对,只有当消息的标头中包含与绑定相匹配的所有键值对时,消息才会被路由到该队列。

如果满足key的前提下,绑定同一个交换机的队列都会分配到相同数量的信息

比如此时交换机有20条信息,a,b队列都会分配到20条信息

默认情况下,会轮询分配给消费者,也可以设置最多获取多少条未被消费的信息,根据消费者的消费能力来设置

1.声明队列和交换机以及RountKey

package com.example.config;

import lombok.Getter;

@Getter

public enum RabbitmqBind {

DATA_CLEAN_PROCESS_DEAD(

RabbitMqExchangeEnum.DEAD_E_DIRECT_RCP,

RabbitMqQueueConstants.Q_DATA_CLEAN_PROCESS_DEAD,

RabbitmqRoutingKey.K_DATA_CLEAN_PROCESS_DEAD,

false,

false,

null,

null

),

DATA_CLEAN_PROCESS(

RabbitMqExchangeEnum.E_DIRECT_RCP,

RabbitMqQueueConstants.Q_DATA_CLEAN_PROCESS,

RabbitmqRoutingKey.K_DATA_CLEAN_PROCESS,

true,

true,

RabbitMqExchangeEnum.DEAD_E_DIRECT_RCP,

RabbitmqRoutingKey.K_DATA_CLEAN_PROCESS_DEAD),

SMS_CLEAN_DEAD(

RabbitMqExchangeEnum.DEAD_E_DIRECT_RCP,

RabbitMqQueueConstants.Q_API_TO_DCN_SMS_DEAD,

RabbitmqRoutingKey.K_DATA_CLEAN_FINISH_DEAD,

true,

false,

null,

null

),

SMS_CLEAN(

RabbitMqExchangeEnum.E_TOPIC_RCP,

RabbitMqQueueConstants.Q_API_TO_DCN_SMS,

RabbitmqRoutingKey.K_API_TO_DCN_SMS,

true,

true,

RabbitMqExchangeEnum.DEAD_E_DIRECT_RCP,

RabbitmqRoutingKey.K_DATA_CLEAN_FINISH_DEAD

),

;

/**

* 交换机

*/

private RabbitMqExchangeEnum exchange;

/**

* 队列名称

*/

private String queueName;

/**

* 路由Key

*/

private RabbitmqRoutingKey routingKey;

/**

* 绑定标识

* 是否启用

*/

private Boolean isBind;

/**

* 是否绑定死信

*/

private Boolean isDeathBelief;

/**

* 绑定的死信交换机

*/

private RabbitMqExchangeEnum boundDeadExchange;

/**

* 死信key

*/

private RabbitmqRoutingKey deadRoutingKey;

RabbitmqBind(RabbitMqExchangeEnum exchange, String queueName, RabbitmqRoutingKey routingKey, Boolean isBind,

Boolean isDeathBelief, RabbitMqExchangeEnum boundDeadExchange, RabbitmqRoutingKey deadRoutingKey

) {

this.exchange = exchange;

this.queueName = queueName;

this.routingKey = routingKey;

this.isBind = isBind;

this.isDeathBelief = isDeathBelief;

this.boundDeadExchange = boundDeadExchange;

this.deadRoutingKey = deadRoutingKey;

}

/**

* 交换机

*/

@Getter

public enum RabbitMqExchangeEnum {

/**

* 交换机定义,类型 - 名称

*/

E_DIRECT_RCP("direct", "E_DIRECT_RCP"),

DEAD_E_DIRECT_RCP("direct", "DEAD_E_DIRECT_RCP"),

E_TOPIC_RCP("topic", "E_TOPIC_RCP"),

E_TOPIC_PAY("topic", "E_TOPIC_PAY");

private String exchangeType;

private String exchangeName;

RabbitMqExchangeEnum(String exchangeType, String exchangeName) {

this.exchangeType = exchangeType;

this.exchangeName = exchangeName;

}

}

/**

* 队列名定义

*/

public interface RabbitMqQueueConstants {

/**

* 接收清洗数据

*/

String Q_DATA_CLEAN_PROCESS = "RMPS_TO_RCP_DATA_CLEAN_PROCESS";

/**

* 清洗结束通知

*/

String Q_API_TO_DCN_SMS = "Q_API_TO_DCN_SMS";

/**

* 死信队列

*/

String Q_DATA_CLEAN_PROCESS_DEAD = "Q_DATA_CLEAN_PROCESS_DEAD";

/**

* 清洗结束通知死信队列

*/

String Q_API_TO_DCN_SMS_DEAD = "Q_API_TO_DCN_SMS_DEAD";

}

/**

* routingKey

*/

@Getter

public enum RabbitmqRoutingKey {

/**

* 路由

*/

K_DATA_CLEAN_PROCESS("K_DATA_CLEAN_PROCESS"),

K_API_TO_DCN_SMS("K_API_TO_DCN_SMS"),

// 路由绑定死信路由

DEAD("DEAD"),

//死信路由

K_DATA_CLEAN_PROCESS_DEAD("K_DATA_CLEAN_PROCESS_DEAD"),

K_DATA_CLEAN_FINISH_DEAD("K_DATA_CLEAN_FINISH_DEAD"),

;

private String keyName;

RabbitmqRoutingKey(String keyName) {

this.keyName = keyName;

}

}

}

2.初始化循环绑定

package com.example.config;

import org.springframework.amqp.core.Binding;

import org.springframework.amqp.core.Exchange;

import org.springframework.amqp.core.Queue;

import org.springframework.amqp.core.QueueBuilder;

import org.springframework.amqp.rabbit.annotation.EnableRabbit;

import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;

import org.springframework.amqp.rabbit.connection.ConnectionFactory;

import org.springframework.amqp.rabbit.core.RabbitAdmin;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;

import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;

import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import org.springframework.context.annotation.Lazy;

import javax.annotation.PostConstruct;

import javax.annotation.Resource;

import java.util.Arrays;

@Configuration

@ConditionalOnClass(EnableRabbit.class)

public class MqConfig {

@Resource

protected RabbitTemplate rabbitTemplate;

@Resource

ConnectionFactory connectionFactory;

//

// @Lazy

// @Autowired

// protected RabbitAdmin rabbitAdmin;

//

//

// public static final int DEFAULT_CONCURRENT = 10;

//

// @Bean("customContainerFactory")

// public SimpleRabbitListenerContainerFactory containerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer,

// ConnectionFactory connectionFactory) {

// SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();

// factory.setConcurrentConsumers(DEFAULT_CONCURRENT);

// factory.setMaxConcurrentConsumers(DEFAULT_CONCURRENT);

// configurer.configure(factory, connectionFactory);

// return factory;

// }

//

// @Bean

// @ConditionalOnMissingBean

// public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory) {

// return new RabbitTransactionManager(connectionFactory);

// }

//

// @Bean

// @ConditionalOnMissingBean

// public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {

// return new RabbitAdmin(connectionFactory);

// }

@PostConstruct

protected void init() {

RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);

rabbitTemplate.setChannelTransacted(true);

//创建exchange

Arrays.stream(RabbitmqBind.RabbitMqExchangeEnum.values())

.forEach(rabbitMqExchangeEnum -> {

Exchange exchange = RabbitmqExchange

.getInstanceByType(rabbitMqExchangeEnum.getExchangeType())

.createExchange(rabbitMqExchangeEnum.getExchangeName());

rabbitAdmin.declareExchange(exchange);

}

);

//创建队列并绑定exchange

Arrays.stream(RabbitmqBind.values()).forEach(RabbitmqBind -> {

if (RabbitmqBind.getIsBind()) {

if (RabbitmqBind.getIsDeathBelief()) {

//需要绑定死信交换机的队列

rabbitAdmin.declareQueue(QueueBuilder.durable(RabbitmqBind.getQueueName())

.ttl(60000).deadLetterExchange(RabbitmqBind.getBoundDeadExchange().getExchangeName())

.deadLetterRoutingKey(RabbitmqBind.getDeadRoutingKey().getKeyName()).build());

rabbitAdmin.declareBinding(new Binding(RabbitmqBind.getQueueName(),

Binding.DestinationType.QUEUE,

RabbitmqBind.getExchange().getExchangeName(),

RabbitmqBind.getRoutingKey().getKeyName(), null));

} else {

//不需要绑定死信交换机的队列

rabbitAdmin.declareQueue(new Queue(RabbitmqBind.getQueueName(),

true, false, false, null));

rabbitAdmin.declareBinding(new Binding(RabbitmqBind.getQueueName(),

Binding.DestinationType.QUEUE,

RabbitmqBind.getExchange().getExchangeName(),

RabbitmqBind.getRoutingKey().getKeyName(), null));

}

}

});

}

}

 绑定的形式由枚举类中定义

3.声明交换机

package com.example.config;

import lombok.Getter;

import lombok.extern.slf4j.Slf4j;

import org.springframework.amqp.core.DirectExchange;

import org.springframework.amqp.core.Exchange;

import org.springframework.amqp.core.TopicExchange;

import java.util.Arrays;

@Getter

@Slf4j

public enum RabbitmqExchange {

DIRECT("direct"){

@Override

public Exchange createExchange(String exchangeName) {

return new DirectExchange(exchangeName, true, false);

}

},

TOPIC("topic"){

@Override

public Exchange createExchange(String exchangeName) {

return new TopicExchange(exchangeName, true, false);

}

};

public static RabbitmqExchange getInstanceByType(String type){

return Arrays.stream(RabbitmqExchange.values()).filter(e -> e.getType().equals(type))

.findAny()

.orElseThrow(() ->

// new ProcessException("无效的exchange type")

new RuntimeException("无效的exchange type")

);

}

private String type;

RabbitmqExchange(String type) {

this.type = type;

}

public abstract Exchange createExchange(String exchangeName);

}

4.监听队列

4.1 监听普通队列

package com.example.listener;

import com.example.config.RabbitmqBind;

import lombok.extern.slf4j.Slf4j;

import org.apache.commons.lang.StringUtils;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.stereotype.Component;

@Slf4j

@Component

@RabbitListener(queues = {

RabbitmqBind.RabbitMqQueueConstants.Q_DATA_CLEAN_PROCESS }, concurrency = "1-5")

//, containerFactory = "customContainerFactory"

public class MqListener {

@RabbitHandler

public void processMessage(String message) {

log.info("DataClean recive message :{} ", message);

process(message);

}

@RabbitHandler

public void processMessage(byte[] message) {

String msg = new String(message);

log.info("DataClean recive message :{} ", msg);

process(msg);

}

/**

* 处理推送消息

* @param message

*/

private void process(String message) {

log.info("process message :{}" , message);

if(StringUtils.isBlank(message)) {

log.error("process message is blank , message:{}" , message);

return;

}

}

}

 监听并处理任务

4.2监听死信队列

package com.example.listener;

import com.example.config.RabbitmqBind;

import lombok.extern.slf4j.Slf4j;

import org.apache.commons.lang.StringUtils;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.stereotype.Component;

@Slf4j

@Component

@RabbitListener(queues = {

RabbitmqBind.RabbitMqQueueConstants.Q_API_TO_DCN_SMS_DEAD }, concurrency = "1-5")

public class DeadListener {

@RabbitHandler

public void processMessage(String message) {

log.info("DataClean recive message :{} ", message);

process(message);

}

@RabbitHandler

public void processMessage(byte[] message) {

String msg = new String(message);

log.info("DataClean recive message :{} ", msg);

process(msg);

}

/**

* 处理推送消息

* @param message

*/

private void process(String message) {

log.info("Dead process message :{}" , message);

if(StringUtils.isBlank(message)) {

log.error("Dead process message is blank , message:{}" , message);

return;

}

}

}

 5.削峰填谷的实现

把高峰期的消息填进低峰期

可以用拉取的方式来实现

或者用消费者的最大数量和最小数量来实现

channel.basicQos();//设置最大获取未确认消息的数量,实现权重

精彩内容

评论可见,请评论后查看内容,谢谢!!!评论后请刷新页面。