1. 简介

异步消息传递是一种松散耦合的分布式通信,在实现事件驱动体系结构方面越来越流行。幸运的是,Spring框架提供了 SpringAMQP 项目,允许我们构建基于AMQP的消息传递解决方案。

另一方面,在此类环境中处理错误可能是一项艰巨的任务。因此,在本教程中,我们将介绍处理错误的不同策略。

2. 环境设置

在本教程中,我们将使用实现 AMQP 标准的RabbitMQ。此外,Spring AMQP提供了spring-rabbit模块,这使得集成变得非常容易。

让我们将 RabbitMQ 作为独立服务器运行。我们将通过执行以下命令在Docker 容器中运行它:

docker run -d -p 5672:5672 -p 15672:15672 --name my-rabbit rabbitmq:3-managementCopy

有关详细的配置和项目依赖项设置,请参阅我们的Spring AMQP文章。

3. 故障场景

通常,与其分布式性质,与单体或单打包应用程序相比,基于消息传递的系统可能发生更多类型的错误。

我们可以指出一些类型的异常:

网络或I/O 相关 – 网络连接和 I/O 操作的常规故障

与协议或基础结构相关的错误,通常表示消息传递基础结构的配置错误

代理相关 – 警告客户端与 AMQP 代理之间配置不正确的故障。例如,达到定义的限制或阈值、身份验证或无效的策略配置

应用程序和消息相关 – 通常表示违反某些业务或应用程序规则的异常

当然,此故障列表并不详尽,但包含最常见的错误类型。

我们应该注意,Spring AMQP开箱即用地处理与连接相关的低级问题,例如通过应用重试或重新排队策略。此外,大多数故障和错误都转换为AmqpException或其子类之一。

在接下来的部分中,我们将主要关注特定于应用程序的错误和高级错误,然后介绍全局错误处理策略。

4. 项目设置

现在,让我们定义一个简单的队列和交换配置来开始:

public static final String QUEUE_MESSAGES = "baeldung-messages-queue";

public static final String EXCHANGE_MESSAGES = "baeldung-messages-exchange";

@Bean

Queue messagesQueue() {

return QueueBuilder.durable(QUEUE_MESSAGES)

.build();

}

@Bean

DirectExchange messagesExchange() {

return new DirectExchange(EXCHANGE_MESSAGES);

}

@Bean

Binding bindingMessages() {

return BindingBuilder.bind(messagesQueue()).to(messagesExchange()).with(QUEUE_MESSAGES);

}Copy

接下来,让我们创建一个简单的生产者:

public void sendMessage() {

rabbitTemplate

.convertAndSend(SimpleDLQAmqpConfiguration.EXCHANGE_MESSAGES,

SimpleDLQAmqpConfiguration.QUEUE_MESSAGES, "Some message id:" + messageNumber++);

}Copy

最后,引发异常的使用者:

@RabbitListener(queues = SimpleDLQAmqpConfiguration.QUEUE_MESSAGES)

public void receiveMessage(Message message) throws BusinessException {

throw new BusinessException();

}Copy

默认情况下,所有失败的消息将立即在目标队列的头部一遍又一遍地重新排队。

让我们通过执行下一个 Maven 命令来运行我们的示例应用程序:

mvn spring-boot:run -Dstart-class=com.baeldung.springamqp.errorhandling.ErrorHandlingAppCopy

现在我们应该看到类似的结果输出:

WARN 22260 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler :

Execution of Rabbit message listener failed.

Caused by: com.baeldung.springamqp.errorhandling.errorhandler.BusinessException: nullCopy

因此,默认情况下,我们将在输出中看到无限数量的此类消息。

要更改此行为,我们有两个选项:

在侦听器端将默认重新排队拒绝选项设置为false–spring.rabbitmq.listener.simple.default-requeue-rejected=false

抛出一个AmqpRejectAndDontRequeueException –对于将来没有意义的消息,他可能很有用,因此可以丢弃它们。

现在,让我们了解如何以更智能的方式处理失败的消息。

5. 死信队列

死信队列 (DLQ) 是保存未送达或失败邮件的队列。DLQ允许我们处理错误或错误消息,监控故障模式并从系统中的异常中恢复。

更重要的是,这有助于防止队列中的无限循环,这些循环不断处理错误消息并降低系统性能。

总的来说,有两个主要概念:死信交换(DLX)和死信队列(DLQ)本身。实际上,DLX是一种正常的交换,我们可以将其定义为常见类型之一:直接,主题或扇出。

了解生产者对队列一无所知非常重要。它只知道交换,所有生成的消息都根据交换配置和消息路由密钥进行路由。

现在让我们看看如何通过应用死信队列方法来处理异常。

5.1. 基本配置

为了配置 DLQ,我们需要在定义队列时指定其他参数:

@Bean

Queue messagesQueue() {

return QueueBuilder.durable(QUEUE_MESSAGES)

.withArgument("x-dead-letter-exchange", "")

.withArgument("x-dead-letter-routing-key", QUEUE_MESSAGES_DLQ)

.build();

}

@Bean

Queue deadLetterQueue() {

return QueueBuilder.durable(QUEUE_MESSAGES_DLQ).build();

}Copy

在上面的示例中,我们使用了两个额外的参数:x-dead-letter-exchange和x-dead-letter-routeing-key。x-dead-letter-exchange选项的空字符串值告诉代理使用默认交换。

第二个参数与为简单消息设置路由密钥同样重要。此选项更改消息的初始路由密钥,以便 DLX 进一步路由。

5.2. 失败的消息路由

因此,当消息无法传递时,它将被路由到死信交换。但正如我们已经指出的,DLX是一种正常的交换。因此,如果失败的邮件路由密钥与交换不匹配,则不会将其传递到 DLQ。

Exchange: (AMQP default)

Routing Key: baeldung-messages-queue.dlqCopy

因此,如果我们在示例中省略x-dead-letter-routeing-key参数,失败的消息将卡在无限重试循环中。

此外,消息的原始元信息可在x-death标头中找到:

x-death:

count: 1

exchange: baeldung-messages-exchange

queue: baeldung-messages-queue

reason: rejected

routing-keys: baeldung-messages-queue

time: 1571232954

Copy

上述信息在 RabbitMQ 管理控制台中提供,该控制台通常在端口 15672 上本地运行。

除了这个配置,如果我们使用Spring Cloud Stream,我们甚至可以通过利用配置属性republishToDlq和autoBindDlq来简化配置过程。

5.3. 死信交换

在上一节中,我们已经看到当消息路由到死信交换时,路由密钥会发生变化。但这种行为并不总是可取的。我们可以通过自己配置 DLX 并使用扇出类型定义它来更改它:

public static final String DLX_EXCHANGE_MESSAGES = QUEUE_MESSAGES + ".dlx";

@Bean

Queue messagesQueue() {

return QueueBuilder.durable(QUEUE_MESSAGES)

.withArgument("x-dead-letter-exchange", DLX_EXCHANGE_MESSAGES)

.build();

}

@Bean

FanoutExchange deadLetterExchange() {

return new FanoutExchange(DLX_EXCHANGE_MESSAGES);

}

@Bean

Queue deadLetterQueue() {

return QueueBuilder.durable(QUEUE_MESSAGES_DLQ).build();

}

@Bean

Binding deadLetterBinding() {

return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange());

}Copy

这次我们定义了扇出类型的自定义交换,因此消息将发送到所有有界队列。此外,我们将x-dead-letter-exchange参数的值设置为我们的 DLX 的名称。同时,我们删除了x-dead-letter-routeing-key参数。

现在,如果我们运行示例,失败的消息应该传递到 DLQ,但不更改初始路由密钥:

Exchange: baeldung-messages-queue.dlx

Routing Key: baeldung-messages-queue

Copy

5.4. 处理死信队列消息 

当然,我们之所以将它们移动到死信队列,是为了让它们可以在其他时间重新处理。

让我们为死信队列定义一个侦听器:

@RabbitListener(queues = QUEUE_MESSAGES_DLQ)

public void processFailedMessages(Message message) {

log.info("Received failed message: {}", message.toString());

}Copy

如果我们现在运行我们的代码示例,我们应该看到日志输出:

WARN 11752 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler :

Execution of Rabbit message listener failed.

INFO 11752 --- [ntContainer#1-1] c.b.s.e.consumer.SimpleDLQAmqpContainer :

Received failed message:Copy

我们收到了一条失败的消息,但下一步应该怎么做?答案取决于特定的系统要求、异常类型或消息类型。

例如,我们可以将消息重新排队到原始目的地:

@RabbitListener(queues = QUEUE_MESSAGES_DLQ)

public void processFailedMessagesRequeue(Message failedMessage) {

log.info("Received failed message, requeueing: {}", failedMessage.toString());

rabbitTemplate.send(EXCHANGE_MESSAGES,

failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage);

}Copy

但此类异常逻辑与默认重试策略并无不同:

INFO 23476 --- [ntContainer#0-1] c.b.s.e.c.RoutingDLQAmqpContainer :

Received message:

WARN 23476 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler :

Execution of Rabbit message listener failed.

INFO 23476 --- [ntContainer#1-1] c.b.s.e.c.RoutingDLQAmqpContainer :

Received failed message, requeueing:Copy

常见的策略可能需要重试处理邮件n次,然后拒绝它。让我们通过利用消息标头来实现此策略:

public void processFailedMessagesRetryHeaders(Message failedMessage) {

Integer retriesCnt = (Integer) failedMessage.getMessageProperties()

.getHeaders().get(HEADER_X_RETRIES_COUNT);

if (retriesCnt == null) retriesCnt = 1;

if (retriesCnt > MAX_RETRIES_COUNT) {

log.info("Discarding message");

return;

}

log.info("Retrying message for the {} time", retriesCnt);

failedMessage.getMessageProperties()

.getHeaders().put(HEADER_X_RETRIES_COUNT, ++retriesCnt);

rabbitTemplate.send(EXCHANGE_MESSAGES,

failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage);

}Copy

首先,我们获取x 重试计数标头的值,然后将此值与允许的最大值进行比较。随后,如果计数器达到尝试限制次数,则消息将被丢弃:

WARN 1224 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler :

Execution of Rabbit message listener failed.

INFO 1224 --- [ntContainer#1-1] c.b.s.e.consumer.DLQCustomAmqpContainer :

Retrying message for the 1 time

WARN 1224 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler :

Execution of Rabbit message listener failed.

INFO 1224 --- [ntContainer#1-1] c.b.s.e.consumer.DLQCustomAmqpContainer :

Retrying message for the 2 time

WARN 1224 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler :

Execution of Rabbit message listener failed.

INFO 1224 --- [ntContainer#1-1] c.b.s.e.consumer.DLQCustomAmqpContainer :

Discarding messageCopy

我们应该补充一点,我们还可以利用x-message-ttl标头来设置消息应该被丢弃的时间。这可能有助于防止队列无限增长。

5.5. 停车场排队

另一方面,考虑一种情况,即我们不能只是丢弃一条消息,例如,它可能是银行领域的交易。或者,有时一条消息可能需要手动处理,或者我们只需要记录失败超过n 次的消息。

对于这种情况,有一个停车场队列的概念。我们可以将来自 DLQ 的所有消息(失败次数超过允许的次数)转发到停车场队列进行进一步处理。

现在让我们实现这个想法:

public static final String QUEUE_PARKING_LOT = QUEUE_MESSAGES + ".parking-lot";

public static final String EXCHANGE_PARKING_LOT = QUEUE_MESSAGES + "exchange.parking-lot";

@Bean

FanoutExchange parkingLotExchange() {

return new FanoutExchange(EXCHANGE_PARKING_LOT);

}

@Bean

Queue parkingLotQueue() {

return QueueBuilder.durable(QUEUE_PARKING_LOT).build();

}

@Bean

Binding parkingLotBinding() {

return BindingBuilder.bind(parkingLotQueue()).to(parkingLotExchange());

}Copy

其次,让我们重构侦听器逻辑以向停车场队列发送消息:

@RabbitListener(queues = QUEUE_MESSAGES_DLQ)

public void processFailedMessagesRetryWithParkingLot(Message failedMessage) {

Integer retriesCnt = (Integer) failedMessage.getMessageProperties()

.getHeaders().get(HEADER_X_RETRIES_COUNT);

if (retriesCnt == null) retriesCnt = 1;

if (retriesCnt > MAX_RETRIES_COUNT) {

log.info("Sending message to the parking lot queue");

rabbitTemplate.send(EXCHANGE_PARKING_LOT,

failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage);

return;

}

log.info("Retrying message for the {} time", retriesCnt);

failedMessage.getMessageProperties()

.getHeaders().put(HEADER_X_RETRIES_COUNT, ++retriesCnt);

rabbitTemplate.send(EXCHANGE_MESSAGES,

failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage);

}Copy

最终,我们还需要处理到达停车场队列的消息:

@RabbitListener(queues = QUEUE_PARKING_LOT)

public void processParkingLotQueue(Message failedMessage) {

log.info("Received message in parking lot queue");

// Save to DB or send a notification.

}Copy

现在我们可以将失败的消息保存到数据库,或者发送电子邮件通知。

让我们通过运行应用程序来测试此逻辑:

WARN 14768 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler :

Execution of Rabbit message listener failed.

INFO 14768 --- [ntContainer#1-1] c.b.s.e.c.ParkingLotDLQAmqpContainer :

Retrying message for the 1 time

WARN 14768 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler :

Execution of Rabbit message listener failed.

INFO 14768 --- [ntContainer#1-1] c.b.s.e.c.ParkingLotDLQAmqpContainer :

Retrying message for the 2 time

WARN 14768 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler :

Execution of Rabbit message listener failed.

INFO 14768 --- [ntContainer#1-1] c.b.s.e.c.ParkingLotDLQAmqpContainer :

Sending message to the parking lot queue

INFO 14768 --- [ntContainer#2-1] c.b.s.e.c.ParkingLotDLQAmqpContainer :

Received message in parking lot queueCopy

从输出中我们可以看到,在几次失败的尝试之后,消息被发送到停车场队列。

6. 自定义错误处理

在上一节中,我们已经了解了如何使用专用队列和交换处理故障。但是,有时我们可能需要捕获所有错误,例如记录或将它们持久保存到数据库中。

6.1. 全局错误处理程序

到目前为止,我们一直使用默认的SimpleRabbitListenerContainerFactory,而这个工厂默认使用ConditionalRejectingErrorHandler。此处理程序捕获不同的异常,并将它们转换为AmqpException层次结构中的异常之一。

值得一提的是,如果我们需要处理连接错误,那么我们需要实现ApplicationListener接口。

简单地说,ConditionalRejectingErrorHandler决定是否拒绝特定消息。当导致异常的邮件被拒绝时,它不会重新排队。

让我们定义一个自定义的错误处理程序,它只会简单地重新排队BusinessException:

public class CustomErrorHandler implements ErrorHandler {

@Override

public void handleError(Throwable t) {

if (!(t.getCause() instanceof BusinessException)) {

throw new AmqpRejectAndDontRequeueException("Error Handler converted exception to fatal", t);

}

}

}Copy

此外,当我们在侦听器方法中抛出异常时,它被包装在ListenerExecutionFailedException 中。因此,我们需要调用getCause方法来获取源异常。

6.2.致命异常策略

在后台,此处理程序使用FatalExceptionStrategy来检查异常是否应被视为致命异常。如果是这样,失败的邮件将被拒绝。

默认情况下,这些异常是致命的:

消息转换异常

消息转换异常

MethodArgumentNotValidException

方法参数类型不匹配异常

NoSuchMethodException

类投射异常

与其实现ErrorHandler接口,我们可以只提供我们的FatalExceptionStrategy:

public class CustomFatalExceptionStrategy

extends ConditionalRejectingErrorHandler.DefaultExceptionStrategy {

@Override

public boolean isFatal(Throwable t) {

return !(t.getCause() instanceof BusinessException);

}

}Copy

最后,我们需要将自定义策略传递给ConditionalRejectingErrorHandler构造函数:

@Bean

public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(

ConnectionFactory connectionFactory,

SimpleRabbitListenerContainerFactoryConfigurer configurer) {

SimpleRabbitListenerContainerFactory factory =

new SimpleRabbitListenerContainerFactory();

configurer.configure(factory, connectionFactory);

factory.setErrorHandler(errorHandler());

return factory;

}

@Bean

public ErrorHandler errorHandler() {

return new ConditionalRejectingErrorHandler(customExceptionStrategy());

}

@Bean

FatalExceptionStrategy customExceptionStrategy() {

return new CustomFatalExceptionStrategy();

}Copy

推荐阅读

评论可见,请评论后查看内容,谢谢!!!评论后请刷新页面。