目录
0.交换机种类和区别
1.声明队列和交换机以及RoutingKey
2.初始化循环绑定
3.声明交换机
4.监听队列
4.1 监听普通队列
4.2监听死信队列
5.削峰填谷的实现
0.交换机种类和区别
Direct Exchange(直连交换机):
直连交换机将消息发送到绑定键与消息的路由键完全匹配的队列。它是最简单的交换机类型之一。当一个队列绑定到某个直连交换机时,需要指定一个绑定键(binding key);当消息的路由键与该绑定键完全匹配时,消息会被发送到该队列。
Fanout Exchange(扇出交换机):
扇出交换机会将消息发送到与其绑定的所有队列,忽略消息的路由键。当一个队列绑定到扇出交换机时,它会接收到交换机发送的所有消息,无论消息的路由键是什么。
Topic Exchange(主题交换机):
主题交换机根据消息的路由键和绑定键之间的模式匹配来路由消息。绑定键可以使用通配符进行匹配,支持 '*' 匹配一个单词,'#' 匹配零个或多个单词,从而允许更灵活的路由规则。
Headers Exchange(标头交换机):
标头交换机根据消息的标头(headers)中的键值对来路由消息,而不是使用路由键。在将队列绑定到标头交换机时,可以指定一组标头键值对:当 x-match=all(默认)时,只有消息的标头中包含与绑定相匹配的所有键值对,消息才会被路由到该队列;当 x-match=any 时,匹配其中任意一个键值对即可。
只要绑定键与消息的路由键匹配,绑定到同一个交换机的每个队列都会各自收到一份完整的消息副本
比如此时交换机收到20条消息,a,b两个队列各自都会收到这20条消息
默认情况下,同一个队列中的消息会轮询分配给该队列的各个消费者;也可以设置每个消费者最多持有多少条未确认(unacked)的消息(prefetch),根据消费者的消费能力来设置
1.声明队列和交换机以及RoutingKey
package com.example.config;
import lombok.Getter;
/**
 * Catalogue of queue-to-exchange bindings used by the application.
 *
 * <p>Each constant names an exchange, a queue, the routing key that links them, an
 * enable flag, and optional dead-letter settings (dead-letter exchange and routing key).
 * All state is immutable; accessors are generated by Lombok's {@code @Getter}.
 */
@Getter
public enum RabbitmqBind {

    /**
     * Dead-letter queue entry for the data-clean process queue.
     *
     * <p>NOTE(review): {@code isBind} is {@code false} here, while {@link #DATA_CLEAN_PROCESS}
     * dead-letters to this exchange/routing key. If nothing else declares and binds this
     * queue, dead-lettered messages will have no destination — confirm this is intentional.
     */
    DATA_CLEAN_PROCESS_DEAD(
            RabbitMqExchangeEnum.DEAD_E_DIRECT_RCP,
            RabbitMqQueueConstants.Q_DATA_CLEAN_PROCESS_DEAD,
            RabbitmqRoutingKey.K_DATA_CLEAN_PROCESS_DEAD,
            false,
            false,
            null,
            null
    ),

    /** Data-clean process queue on the direct exchange, with dead-lettering enabled. */
    DATA_CLEAN_PROCESS(
            RabbitMqExchangeEnum.E_DIRECT_RCP,
            RabbitMqQueueConstants.Q_DATA_CLEAN_PROCESS,
            RabbitmqRoutingKey.K_DATA_CLEAN_PROCESS,
            true,
            true,
            RabbitMqExchangeEnum.DEAD_E_DIRECT_RCP,
            RabbitmqRoutingKey.K_DATA_CLEAN_PROCESS_DEAD
    ),

    /** Dead-letter queue entry for the clean-finished notification queue. */
    SMS_CLEAN_DEAD(
            RabbitMqExchangeEnum.DEAD_E_DIRECT_RCP,
            RabbitMqQueueConstants.Q_API_TO_DCN_SMS_DEAD,
            RabbitmqRoutingKey.K_DATA_CLEAN_FINISH_DEAD,
            true,
            false,
            null,
            null
    ),

    /** Clean-finished notification queue on the topic exchange, with dead-lettering enabled. */
    SMS_CLEAN(
            RabbitMqExchangeEnum.E_TOPIC_RCP,
            RabbitMqQueueConstants.Q_API_TO_DCN_SMS,
            RabbitmqRoutingKey.K_API_TO_DCN_SMS,
            true,
            true,
            RabbitMqExchangeEnum.DEAD_E_DIRECT_RCP,
            RabbitmqRoutingKey.K_DATA_CLEAN_FINISH_DEAD
    ),
    ;

    /** Exchange the queue is bound to. */
    private final RabbitMqExchangeEnum exchange;

    /** Queue name. */
    private final String queueName;

    /** Routing key used for the queue-to-exchange binding. */
    private final RabbitmqRoutingKey routingKey;

    /** Binding flag: whether this entry is enabled. */
    private final Boolean isBind;

    /** Whether the queue is configured with a dead-letter exchange. */
    private final Boolean isDeathBelief;

    /** Dead-letter exchange for the queue; {@code null} when dead-lettering is not used. */
    private final RabbitMqExchangeEnum boundDeadExchange;

    /** Dead-letter routing key; {@code null} when dead-lettering is not used. */
    private final RabbitmqRoutingKey deadRoutingKey;

    RabbitmqBind(RabbitMqExchangeEnum exchange, String queueName, RabbitmqRoutingKey routingKey, Boolean isBind,
                 Boolean isDeathBelief, RabbitMqExchangeEnum boundDeadExchange, RabbitmqRoutingKey deadRoutingKey
    ) {
        this.exchange = exchange;
        this.queueName = queueName;
        this.routingKey = routingKey;
        this.isBind = isBind;
        this.isDeathBelief = isDeathBelief;
        this.boundDeadExchange = boundDeadExchange;
        this.deadRoutingKey = deadRoutingKey;
    }

    /**
     * Exchange definitions: each constant pairs an exchange type with an exchange name.
     */
    @Getter
    public enum RabbitMqExchangeEnum {
        E_DIRECT_RCP("direct", "E_DIRECT_RCP"),
        DEAD_E_DIRECT_RCP("direct", "DEAD_E_DIRECT_RCP"),
        E_TOPIC_RCP("topic", "E_TOPIC_RCP"),
        E_TOPIC_PAY("topic", "E_TOPIC_PAY");

        /** Exchange type identifier, e.g. {@code "direct"} or {@code "topic"}. */
        private final String exchangeType;

        /** Exchange name. */
        private final String exchangeName;

        RabbitMqExchangeEnum(String exchangeType, String exchangeName) {
            this.exchangeType = exchangeType;
            this.exchangeName = exchangeName;
        }
    }

    /**
     * Queue name constants. Kept as compile-time constants so they can be referenced
     * from annotation attributes.
     */
    public interface RabbitMqQueueConstants {
        /** Receives data to be cleaned. */
        String Q_DATA_CLEAN_PROCESS = "RMPS_TO_RCP_DATA_CLEAN_PROCESS";

        /** Clean-finished notification. */
        String Q_API_TO_DCN_SMS = "Q_API_TO_DCN_SMS";

        /** Dead-letter queue for the data-clean process queue. */
        String Q_DATA_CLEAN_PROCESS_DEAD = "Q_DATA_CLEAN_PROCESS_DEAD";

        /** Dead-letter queue for the clean-finished notification. */
        String Q_API_TO_DCN_SMS_DEAD = "Q_API_TO_DCN_SMS_DEAD";
    }

    /**
     * Routing key definitions.
     */
    @Getter
    public enum RabbitmqRoutingKey {
        // Regular routing keys
        K_DATA_CLEAN_PROCESS("K_DATA_CLEAN_PROCESS"),
        K_API_TO_DCN_SMS("K_API_TO_DCN_SMS"),
        // Routing key for binding to the dead-letter route
        DEAD("DEAD"),
        // Dead-letter routing keys
        K_DATA_CLEAN_PROCESS_DEAD("K_DATA_CLEAN_PROCESS_DEAD"),
        K_DATA_CLEAN_FINISH_DEAD("K_DATA_CLEAN_FINISH_DEAD"),
        ;

        /** Routing key string as sent to the broker. */
        private final String keyName;

        RabbitmqRoutingKey(String keyName) {
            this.keyName = keyName;
        }
    }
}
2.初始化循环绑定
package com.example.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.Arrays;
/**
 * Declares the exchanges, queues and bindings described by {@link RabbitmqBind}
 * once this configuration bean has been constructed.
 */
@Configuration
@ConditionalOnClass(EnableRabbit.class)
public class MqConfig {

    /** Value passed to {@code QueueBuilder.ttl(..)} for queues that use a dead-letter exchange. */
    private static final int DEAD_LETTER_QUEUE_TTL = 60000;

    @Resource
    protected RabbitTemplate rabbitTemplate;

    @Resource
    ConnectionFactory connectionFactory;

    // Alternative bean-based configuration, currently disabled and kept for reference:
    //
    //    @Lazy
    //    @Autowired
    //    protected RabbitAdmin rabbitAdmin;
    //
    //
    //    public static final int DEFAULT_CONCURRENT = 10;
    //
    //    @Bean("customContainerFactory")
    //    public SimpleRabbitListenerContainerFactory containerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer,
    //                                                                 ConnectionFactory connectionFactory) {
    //        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    //        factory.setConcurrentConsumers(DEFAULT_CONCURRENT);
    //        factory.setMaxConcurrentConsumers(DEFAULT_CONCURRENT);
    //        configurer.configure(factory, connectionFactory);
    //        return factory;
    //    }
    //
    //    @Bean
    //    @ConditionalOnMissingBean
    //    public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory) {
    //        return new RabbitTransactionManager(connectionFactory);
    //    }
    //
    //    @Bean
    //    @ConditionalOnMissingBean
    //    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
    //        return new RabbitAdmin(connectionFactory);
    //    }

    /**
     * Enables transacted channels on the template, then declares all exchanges followed by
     * all enabled queues and their bindings.
     */
    @PostConstruct
    protected void init() {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        rabbitTemplate.setChannelTransacted(true);

        declareExchanges(rabbitAdmin);
        declareQueuesAndBindings(rabbitAdmin);
    }

    /** Declares one exchange per {@link RabbitmqBind.RabbitMqExchangeEnum} constant. */
    private void declareExchanges(RabbitAdmin rabbitAdmin) {
        for (RabbitmqBind.RabbitMqExchangeEnum definition : RabbitmqBind.RabbitMqExchangeEnum.values()) {
            Exchange exchange = RabbitmqExchange
                    .getInstanceByType(definition.getExchangeType())
                    .createExchange(definition.getExchangeName());
            rabbitAdmin.declareExchange(exchange);
        }
    }

    /** Declares the queue and its binding for every {@link RabbitmqBind} entry whose bind flag is set. */
    private void declareQueuesAndBindings(RabbitAdmin rabbitAdmin) {
        for (RabbitmqBind bind : RabbitmqBind.values()) {
            if (!bind.getIsBind()) {
                // Disabled entries are skipped entirely: neither queue nor binding is declared.
                continue;
            }
            rabbitAdmin.declareQueue(buildQueue(bind));
            rabbitAdmin.declareBinding(new Binding(bind.getQueueName(),
                    Binding.DestinationType.QUEUE,
                    bind.getExchange().getExchangeName(),
                    bind.getRoutingKey().getKeyName(), null));
        }
    }

    /**
     * Builds the queue definition for a binding entry.
     *
     * @param bind the binding entry
     * @return a durable queue carrying TTL and dead-letter settings when the entry asks for
     *         dead-lettering; otherwise a plain queue created with the same constructor
     *         arguments as before ({@code true, false, false, null})
     */
    private Queue buildQueue(RabbitmqBind bind) {
        if (bind.getIsDeathBelief()) {
            // Queue that forwards to a dead-letter exchange.
            return QueueBuilder.durable(bind.getQueueName())
                    .ttl(DEAD_LETTER_QUEUE_TTL)
                    .deadLetterExchange(bind.getBoundDeadExchange().getExchangeName())
                    .deadLetterRoutingKey(bind.getDeadRoutingKey().getKeyName())
                    .build();
        }
        // Queue without a dead-letter exchange.
        return new Queue(bind.getQueueName(), true, false, false, null);
    }
}
绑定的形式由枚举类中定义
3.声明交换机
package com.example.config;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.TopicExchange;
import java.util.Arrays;
/**
 * Factory for Spring AMQP {@link Exchange} objects, keyed by an exchange type string.
 *
 * <p>Only the {@code "direct"} and {@code "topic"} types are supported.
 */
@Getter
@Slf4j
public enum RabbitmqExchange {

    DIRECT("direct") {
        @Override
        public Exchange createExchange(String exchangeName) {
            return new DirectExchange(exchangeName, true, false);
        }
    },

    TOPIC("topic") {
        @Override
        public Exchange createExchange(String exchangeName) {
            return new TopicExchange(exchangeName, true, false);
        }
    };

    /** Exchange type identifier matched by {@link #getInstanceByType(String)}. */
    private final String type;

    RabbitmqExchange(String type) {
        this.type = type;
    }

    /**
     * Looks up the factory constant whose type identifier equals {@code type} exactly
     * (case-sensitive).
     *
     * @param type exchange type identifier, e.g. {@code "direct"}
     * @return the matching constant
     * @throws IllegalArgumentException if no constant has this type (including {@code null});
     *         this is a {@link RuntimeException}, so existing callers are unaffected
     */
    public static RabbitmqExchange getInstanceByType(String type) {
        for (RabbitmqExchange candidate : values()) {
            if (candidate.type.equals(type)) {
                return candidate;
            }
        }
        // Include the offending value so a misconfigured type is diagnosable from the log.
        throw new IllegalArgumentException("无效的exchange type: " + type);
    }

    /**
     * Creates an exchange of this constant's kind.
     *
     * @param exchangeName name of the exchange
     * @return the new exchange object
     */
    public abstract Exchange createExchange(String exchangeName);
}
4.监听队列
4.1 监听普通队列
package com.example.listener;
import com.example.config.RabbitmqBind;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Listener for the data-clean process queue. Accepts both {@code String} and
 * {@code byte[]} payloads and routes them to a single processing method.
 */
@Slf4j
@Component
@RabbitListener(queues = {
        RabbitmqBind.RabbitMqQueueConstants.Q_DATA_CLEAN_PROCESS }, concurrency = "1-5")
//, containerFactory = "customContainerFactory"
public class MqListener {

    /**
     * Handles a text payload.
     *
     * @param message message body
     */
    @RabbitHandler
    public void processMessage(String message) {
        log.info("DataClean recive message :{} ", message);
        process(message);
    }

    /**
     * Handles a raw byte payload by decoding it to text first.
     *
     * @param message message body bytes
     */
    @RabbitHandler
    public void processMessage(byte[] message) {
        // Decode with an explicit charset; new String(byte[]) depends on the platform default.
        // NOTE(review): assumes producers encode bodies as UTF-8 — confirm.
        String msg = new String(message, java.nio.charset.StandardCharsets.UTF_8);
        log.info("DataClean recive message :{} ", msg);
        process(msg);
    }

    /**
     * Processes a pushed message. Blank messages are logged as errors and ignored;
     * no further handling is implemented here.
     *
     * @param message decoded message text
     */
    private void process(String message) {
        log.info("process message :{}" , message);
        if (StringUtils.isBlank(message)) {
            log.error("process message is blank , message:{}" , message);
            return;
        }
    }
}
监听并处理任务
4.2监听死信队列
package com.example.listener;
import com.example.config.RabbitmqBind;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Listener for the dead-letter queue of the clean-finished notification. Accepts both
 * {@code String} and {@code byte[]} payloads and routes them to a single processing method.
 */
@Slf4j
@Component
@RabbitListener(queues = {
        RabbitmqBind.RabbitMqQueueConstants.Q_API_TO_DCN_SMS_DEAD }, concurrency = "1-5")
public class DeadListener {

    /**
     * Handles a text payload.
     *
     * @param message message body
     */
    @RabbitHandler
    public void processMessage(String message) {
        log.info("DataClean recive message :{} ", message);
        process(message);
    }

    /**
     * Handles a raw byte payload by decoding it to text first.
     *
     * @param message message body bytes
     */
    @RabbitHandler
    public void processMessage(byte[] message) {
        // Decode with an explicit charset; new String(byte[]) depends on the platform default.
        // NOTE(review): assumes producers encode bodies as UTF-8 — confirm.
        String msg = new String(message, java.nio.charset.StandardCharsets.UTF_8);
        log.info("DataClean recive message :{} ", msg);
        process(msg);
    }

    /**
     * Processes a dead-lettered message. Blank messages are logged as errors and ignored;
     * no further handling is implemented here.
     *
     * @param message decoded message text
     */
    private void process(String message) {
        log.info("Dead process message :{}" , message);
        if (StringUtils.isBlank(message)) {
            log.error("Dead process message is blank , message:{}" , message);
            return;
        }
    }
}
5.削峰填谷的实现
把高峰期的消息填进低峰期
可以用拉取的方式来实现
或者用消费者的最大数量和最小数量来实现
channel.basicQos(prefetchCount);//设置每个消费者最多持有的未确认消息数量(prefetchCount),按消费能力实现权重
精彩内容
发表评论