theme: vue-pro

前言

前面章节我们至少知道了rabbitmq的几个核心组件, 比如 exchange queue 和 routing key

还有java编程方面的 channel 和 connection

但是这些还不够运用于生产环境

本章内容

消息确认机制(message confirm)消息return机制

消息确认机制(confirm)

开始confirm机制, 生产者消息投递到exchange, exchange就会立即ack给生产者, 如果无法投递到exchange, 那么生产者就是产生nack

事务机制和 publisher confirm 机制确保的是消息能够正确地发送至 RabbitMQ,这里的“发送至 RabbitMQ”的含义是指消息被正确地发往至 RabbitMQ 的交换器,如果此交换器没有匹配的队列,那么消息也会丢失。

问: 事务机制为什么不讲?

答: 事务机制效率比较慢, P发送消息需要等待C的响应, 会导致极大的耗费性能

问: return机制或者confirm机制收到消息不成功后, 你要怎么处理呢?

答: 一般处理方式是: 重试几次, 如果还不通过, 那么打印日志, 交给人工处理

这里的重试可以考虑使用指数退避(重试频率从快到慢的过程)

异步方式消息确认

详细过程是在发送消息前保存消息到一个队列中, 在拿到遇到消息发送失败后, 我们拿取到队列中的消息, 然后尝试重试补偿

注意 Confirm 机制需要主动开启, 上图缺少了这段代码: channel.confirmSelect();

问: 这里引入redis保存这消息不合适吧?

答: 看起来这里还可以将消息保存到一个高并发队列中, 开一个定时器定时解决(立马再次发送消息大概率还是会失败, 每次等他个几百毫秒吧), 失败累计次数, 直到超过域值email通知运营人员

但是使用高并发队列只不过在内存中, 进程崩溃后改消息队列中的消息也将丢失

问: 那有别的方式吗?

答: 有, 保存到mysql或者直接持久化到queue中, 等到管理员找到原因并恢复后再次发送

同步消息确认

channel.queueDeclare(QUEUE_NAME, true, false, true, null);

channel.confirmSelect();

stopWatch.start();

for (int i = 0; i < 1000; i++) {

channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,

String.format("hello%d", i).getBytes(StandardCharsets.UTF_8));

// 单个消息确认机制

boolean flag = channel.waitForConfirms();

if (flag) {

System.err.println("消息发送成功");

}

}

但是1000次循环就需要调用 1000次 waitForConfirms 这效率特别低

channel.queueDeclare(QUEUE_NAME, true, false, true, null);

channel.confirmSelect();

int batchSize = 100;

int outstandingMessageCount = 0;

stopWatch.start();

for (int i = 0; i < 1000; i++) {

channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,

String.format("hello%d", i).getBytes(StandardCharsets.UTF_8));

outstandingMessageCount++;

if (batchSize == outstandingMessageCount) {

// 这是批量消息处理

channel.waitForConfirms();

outstandingMessageCount = 0;

}

}

这里就改成100次消息发送, 处理一次同步操作消息等待

消息return机制

当前的消息通过exchange投递到queue不成功, 就会触发return机制的函数

mandatory参数的作用是什么?

文字描述: 在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的。那么如何让无法被路由的消息帮我想办法处理一下?最起码通知我一声,我好自己处理啊。通过设置 mandatory 参数可以在当消息传递过程中不可达目的地时将消息返回给生产者。

immediate参数的作用(已弃用)

RabbitMQ 3.0版本开始去掉了对immediate参数的支持,原因是因为immediate参数会影响镜像队列的性能、增加了代码的复杂度,建议采用TTL和DLX的方法替代。

消息回退机制代码实现

Producer的思路非常简单, 设置mandatory = true, 然后channel.addReturnListener

public static final String EXCHANGE = "exchange";

public static final String ROUTING_KEY = "";

public static void main(String[] args) throws Exception {

ConnectionFactory factory = RabbitUtils.INSTANCE.connectionFactory();

Connection connection = factory.newConnection();

Channel channel = connection.createChannel();

channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT, true,

true, null);

ConcurrentSkipListMap map = new ConcurrentSkipListMap<>();

channel.confirmSelect();

channel.addConfirmListener(new ConfirmListener() {

@Override

public void handleAck(long deliveryTag, boolean multiple) throws IOException {

System.err.println("消息发送成功...");

if (multiple) {

ConcurrentNavigableMap headMap = map.headMap(deliveryTag);

headMap.clear();

} else {

map.remove(deliveryTag);

}

}

@Override

public void handleNack(long deliveryTag, boolean multiple) throws IOException {

System.err.println("消息发送失败");

// 意味着这里存在多个消息被否定, 可以使用 head 拿到小于 deliveryTag 的消息, 全部进行重试

if (multiple) {

ConcurrentNavigableMap messageHeadMap = map.headMap(deliveryTag);

// 这里可以将消息保存到数据库或者redis中

messageHeadMap.forEach((id, s) -> {

System.err.println("id: " + id + " message: " + s);

});

messageHeadMap.clear();

} else {

String messageNode = map.remove(deliveryTag);

// 这里可以将消息保存到数据库或者redis中

System.err.println("message: " + messageNode);

}

}

});

channel.addReturnListener(returnMessage -> {

System.err.println("消息被return回来了...");

int replyCode = returnMessage.getReplyCode();

String replyText = returnMessage.getReplyText();

String exchange = returnMessage.getExchange();

String routingKey = returnMessage.getRoutingKey();

// AMQP.BasicProperties properties = returnMessage.getProperties();

String message = new String(returnMessage.getBody(), Charset.defaultCharset());

System.err.println("message: " + message);

System.err.println("exchange: " + exchange);

System.err.println("routingKey: " + routingKey);

System.err.println("replyText: " + replyText);

System.err.println("replyCode: " + replyCode);

});

String message = "message";

map.put(channel.getNextPublishSeqNo(), message);

channel.basicPublish(EXCHANGE, ROUTING_KEY, true, false, MessageProperties.PERSISTENT_TEXT_PLAIN,

message.getBytes(Charset.defaultCharset()));

}

Consumer:

public static final String QUEUE = "queue";

public static final String EXCHANGE = "exchange";

public static void main(String[] args) throws Exception {

ConnectionFactory factory = RabbitUtils.INSTANCE.connectionFactory();

Connection connection = factory.newConnection();

Channel channel = connection.createChannel();

channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT, true, true, null);

channel.queueDeclare(QUEUE, true, false, true, null);

channel.queueBind(QUEUE, EXCHANGE, "");

channel.basicConsume(QUEUE, false, (consumerTag, message) -> {

System.err.println("消息消费成功, consumeTag: " + consumerTag + " message: "

+ new String(message.getBody(), Charset.defaultCharset()));

channel.basicAck(message.getEnvelope().getDeliveryTag(), false);

}, consumerTag -> System.err.println("消息接受失败, consumeTag: " + consumerTag));

}

问题思考

有了 mandatory 参数和回退消息,我们获得了对无法投递消息的感知能力,有机会在生产者的消息无法被投递时发现并处理。

但有时候,我们并不知道该如何处理这些无法路由的消息,最多打个日志,然后触发报警,再来手动处理。而通过日志来处理这些无法路由的消息是很不优雅的做法,特别是当生产者所在的服务有多台机器的时候,手动复制日志会更加麻烦而且容易出错。

而且设置 mandatory 参数会增加生产者的复杂性,需要添加处理这些被退回的消息的逻辑。如果既不想丢失消息,又不想增加生产者的复杂性,该怎么做呢?

另一个return机制: exchange备机

来总结下前面的问题

消息回退增加消费者代码复杂度回退回来的消息要怎么处理? 记录日志也不知道怎么处理? 难道要手动处理?

在 RabbitMQ 中,有一种备份交换机的机制存在,可以很好的应对这个问题。什么是备份交换机呢?

备份交换机可以理解为 RabbitMQ 中交换机的“备胎”,当我们为某一个交换机声明一个对应的备份交换机时, 就是为它创建一个备胎,**当交换机接收到一条不可路由消息时,将会把这条消息转发到备份交换机中,由备份交换机来进行转发和处理,通常备份交换机的类型为 Fanout ,这样就能把所有消息都投递到与其绑定的队列中,然后我们在备份交换机下绑定一个队列,这样所有那些原交换机无法被路由的消息,就会都进入这个队列了。**当然,我们还可以建立一个报警队列,用独立的消费者来进行监测和报警。

备机架构设计

备机和return机制同时存在默认走哪个机制?

备份exchange模式优先级高于消息return模式

什么情况的消息会进入备机?

消息无法投递到queue的原因, 我想想都有哪些?

queue不存在queue满了根据 routing key 找不到 queue (和第一种很像)

消息进入备机后, 我们该做什么?

答:

保存消息报警告, 推消息告知管理员最后在解决问题后将消息转发给正常的exchange

有哪些方法保存消息?

答:

可以保存到数据库, 但有IO上限, 不是很好的选择.将消息持久化到queue中, 但是不创建自动消费的消费者, 我们可以在问题被发现并解决后再手动启动处理备份消息保存到redis中, 方案不错, 但是在整个分布式系统中引入了新的变量, 导致项目不太问题

最终我还是选择了redis, 因为业务需求, 我们需要对备份的消息进行发送和查询甚至是删除操作, 持久化到queue不好操作, 当然如果你持久化到queue那么你需要保证消息

另一个原因是redis是分布式系统不可或缺的一个组件, 早晚都会用上的

我们虽然选择了redis方案, 但不意味着第二个方案(持久化到queue)不好, 我本人更加推荐使用方案二, 但是第二种方案我们需要及时处理问题, 否则可能导致消息爆满, 当然redis方案也是

发送消息的目的地是哪里?

正常是 exchange

那么怎么发送消息?

首先我们需要对消息进行包装, 不仅仅有内容还需要有发送地址

大致结构是这样:

消息id, 消息内容, 消息创建时间, 消息优先级

消息id方式消息重复消费消息创建时间检查消息超过一定域值, 还没被消费, 后续可以做严重警告消息优先级: 可以当做消息重要性, 数字越大优先级越高

public class ConfirmConsumer {

public static final String BACKUP_EXCHANGE = "backup.exchange";

public static final String CONFIRM_EXCHANGE = "confirm.exchange";

public static final String CONFIRM_QUEUE = "confirm.queue";

public static void main(String[] args) throws Exception {

ConnectionFactory factory = RabbitUtils.INSTANCE.connectionFactory();

Connection connection = factory.newConnection();

Channel channel = connection.createChannel();

HashMap arguments = new HashMap<>();

// 将exchange无法发送的消息, 转发给备份exchange

arguments.put("alternate-exchange", BACKUP_EXCHANGE);

channel.exchangeDeclare(CONFIRM_EXCHANGE, BuiltinExchangeType.DIRECT, true, true, arguments);

channel.queueDeclare(CONFIRM_QUEUE, true, false, true, null);

channel.queueBind(CONFIRM_QUEUE, CONFIRM_EXCHANGE, "");

DeliverCallback deliverCallback = (consumerTag, message) -> {

System.err.println("消息接收时间: "

+ DatePattern.NORM_DATETIME_FORMAT.format(new Date())

+ " consumerTag: " + consumerTag + " message: " + message);

channel.basicAck(message.getEnvelope().getDeliveryTag(), false);

};

CancelCallback cancelCallback = System.err::println;

channel.basicConsume(CONFIRM_QUEUE, false, deliverCallback, cancelCallback);

}

}

注意上面这段代码:

HashMap arguments = new HashMap<>();

// 将exchange无法发送的消息, 转发给备份exchange

arguments.put("alternate-exchange", BACKUP_EXCHANGE);

channel.exchangeDeclare(CONFIRM_EXCHANGE, BuiltinExchangeType.DIRECT, true, true, arguments);

在正常的exchange上绑定备份exchange

public class BackupConsumer {

public static final String BACKUP_EXCHANGE = "backup.exchange";

public static final String BACKUP_QUEUE = "backup.queue";

public static final String CONFIRM_EXCHANGE = "confirm.exchange";

public static void main(String[] args) throws Exception {

Scanner scanner = new Scanner(System.in);

ConnectionFactory factory = RabbitUtils.INSTANCE.connectionFactory();

Connection connection = factory.newConnection();

Channel channel = connection.createChannel();

channel.exchangeDeclare(BACKUP_EXCHANGE, BuiltinExchangeType.FANOUT, true, true, null);

channel.queueDeclare(BACKUP_QUEUE, true, false, true, null);

channel.queueBind(BACKUP_QUEUE, BACKUP_EXCHANGE, "");

DeliverCallback deliverCallback = (consumerTag, message) -> {

System.err.println("消息保存, 然后问题发现, 解决问题, 最后发送消息...");

scanner.next();

System.err.println("发送消息: " + ", messageId: " + message.getProperties().getMessageId()

+ ", 消息内容: " + new String(message.getBody(), Charset.defaultCharset()));

channel.basicPublish(CONFIRM_EXCHANGE, "", message.getProperties(), message.getBody());

channel.basicAck(message.getEnvelope().getDeliveryTag(), false);

};

CancelCallback cancelCallback = System.err::println;

channel.basicConsume(BACKUP_QUEUE, false, deliverCallback, cancelCallback);

}

}

public class WarningConsumer {

public static final String BACKUP_EXCHANGE = "backup.exchange";

public static final String WARNING_QUEUE = "warning.queue";

public static void main(String[] args) throws Exception {

ConnectionFactory factory = RabbitUtils.INSTANCE.connectionFactory();

Connection connection = factory.newConnection();

Channel channel = connection.createChannel();

channel.exchangeDeclare(BACKUP_EXCHANGE, BuiltinExchangeType.FANOUT, true, true, null);

channel.queueDeclare(WARNING_QUEUE, true, false, true, null);

channel.queueBind(WARNING_QUEUE, BACKUP_EXCHANGE, "");

DeliverCallback deliverCallback = (consumerTag, message) -> {

AMQP.BasicProperties properties = message.getProperties();

Integer priority = properties.getPriority();

String messageId = properties.getMessageId();

Date timestamp = properties.getTimestamp();

System.err.println("日志警告, 消息等级: " + priority + ", 消息id: " + messageId

+ ", 消息创建时间: " + DatePattern.NORM_DATETIME_FORMAT.format(timestamp) + ", 消息内容: "

+ new String(message.getBody(), Charset.defaultCharset()));

channel.basicAck(message.getEnvelope().getDeliveryTag(), false);

};

CancelCallback cancelCallback = System.err::println;

channel.basicConsume(WARNING_QUEUE, false, deliverCallback, cancelCallback);

}

}

public class Producer {

public static final String BACKUP_EXCHANGE = "backup.exchange";

public static final String CONFIRM_EXCHANGE = "confirm.exchange";

public static void main(String[] args) throws Exception {

ConnectionFactory factory = RabbitUtils.INSTANCE.connectionFactory();

Connection connection = factory.newConnection();

Channel channel = connection.createChannel();

HashMap arguments = new HashMap<>();

// 将exchange无法发送的消息, 转发给备份exchange

arguments.put("alternate-exchange", BACKUP_EXCHANGE);

channel.exchangeDeclare(CONFIRM_EXCHANGE, BuiltinExchangeType.DIRECT, true, true, arguments);

channel.confirmSelect();

channel.addConfirmListener((deliveryTag, multiple) -> {

String dateTime = DatePattern.NORM_DATETIME_FORMAT.format(new Date());

System.err.println(deliveryTag + ": 消息应答" + " '发送'时间为: " + dateTime);

}, (deliveryTag, multiple) -> System.err.println(deliveryTag + ": 消息未得到应答"));

// 消费者提供消息超时功能

AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();

builder.contentType("text/plain");

builder.deliveryMode(2);

builder.priority(0);

for (int i = 0; i < 10; i++) {

builder.timestamp(new Date());

builder.messageId(UUID.fastUUID().toString(true));

String message = "content: " + UUID.fastUUID().toString(false) + i;

channel.basicPublish(CONFIRM_EXCHANGE, "",

builder.build(), message.getBytes(Charset.defaultCharset()));

}

}

}

记住, 这里我们的备份消费者并没有做真实的操作, 而是使用

System.err.println("消息保存, 然后问题发现, 解决问题, 最后发送消息...");

scanner.next();

channel.basicPublish(CONFIRM_EXCHANGE, "", message.getProperties(), message.getBody());

模拟了管理员找问题, 解决问题最后将消息转发给正常的exchange的过程

文章来源

评论可见,请评论后查看内容,谢谢!!!评论后请刷新页面。