0%

RabbitMQ高级篇

  • 可靠性
  • 业务幂等性
  • 延迟消息

二、高级篇

  • 消息队列存在可靠性问题,即存在消息丢失的可能性:

    • 消息从生产者发送消息,到消费者处理消息,需要经过的流程是这样的:

      消息从生产者到消费者的每一步都可能导致消息丢失:

      • 发送消息时丢失:
        • 生产者发送消息时连接 MQ 失败
        • 生产者发送消息到达 MQ 后未找到 Exchange
        • 生产者发送消息到达 MQ 的 Exchange 后,未找到合适的 Queue
        • 消息到达 MQ 后,处理消息的进程发生异常
      • MQ 导致消息丢失:
        • 消息到达 MQ,保存到队列后,尚未消费就突然宕机
      • 消费者处理消息时:
        • 消息接收后尚未处理突然宕机
        • 消息接收后处理过程中抛出异常
    • 综上所述,我们要解决消息丢失问题,保证 MQ 的可靠性,就必须从 3 个方面入手:

      • 确保生产者一定把消息发送到 MQ
      • 确保 MQ 不会将消息弄丢
      • 确保消费者一定要处理消息

2.1 生产者的可靠性

2.2.1 ★生产者重连机制

  • 首先第一种情况,就是生产者发送消息时,出现了网络故障,导致与MQ的连接中断。

    • 为了解决这个问题,SpringAMQP 提供了消息发送时的重试机制。即:当 RabbitTemplate 与 MQ 连接超时后,多次重试:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      spring:
      rabbitmq:
      connection-timeout: 1s # 设置MQ的连接超时时间
      template:
      retry:
      enabled: true # 开启超时重试机制
      initial-interval: 1000ms # 失败后的初始等待时间
      multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multiplier
      max-attempts: 3 # 最大重试次数
      • 注意:当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率。不过 SpringAMQP 提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的。如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长和重试次数,当然也可以考虑使用异步线程来执行发送消息的代码。

2.2.2 ★生产者确认机制

  • 一般情况下,只要生产者与 MQ 之间的网路连接顺畅,基本不会出现发送消息丢失的情况,因此大多数情况下我们无需考虑这种问题。不过,在少数情况下,也会出现消息发送到 MQ 之后丢失的现象,比如:

    • MQ 内部处理消息的进程发生了异常
    • 生产者发送消息到达 MQ 后未找到 Exchange
    • 生产者发送消息到达 MQ 的 Exchange 后,未找到合适的 Queue,因此无法路由

    针对上述情况,RabbitMQ 提供了生产者消息确认机制,包括 Publisher ConfirmPublisher Return 两种。在开启确认机制的情况下,当生产者发送消息给 MQ 后,MQ 会根据消息处理的情况返回不同的回执。具体如图所示:

    • 总结如下:

      • 消息投递到 MQ,但是路由失败时,通过 Publisher Return 返回异常信息,同时返回 ACK 的确认信息,代表投递成功

      • 临时消息投递到了 MQ,并且入队成功,返回 ACK,告知投递成功

        持久消息投递到了 MQ,并且入队完成持久化,返回 ACK,告知投递成功

      • 其它情况都会返回 NACK,告知投递失败

      其中 acknack 属于 Publisher Confirm 机制,ack 是投递成功,nack 是投递失败。

      return 则属于 Publisher Return 机制。默认两种机制都是关闭状态,需要通过配置文件来开启。

2.2.3 生产者确认代码实现

  1. 在 publisher 微服务的 application.yml 中添加配置:

    1
    2
    3
    4
    spring:
    rabbitmq:
    publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型
    publisher-returns: true # 开启publisher return机制
    • 这里 publisher-confirm-type 有三种模式可选:

      • none:关闭 confirm 机制
      • simple:同步阻塞等待 MQ 的回执
      • correlated:MQ 异步回调返回回执

      一般推荐使用 correlated,异步回调机制。

  2. 每个 RabbitTemplate 只能配置一个 ReturnCallback,因此我们可以在配置类中统一设置。我们在 publisher 模块定义一个配置类:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    @Slf4j
    @AllArgsConstructor
    @Configuration
    public class MqConfig {
    private final RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init(){
    rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
    @Override
    public void returnedMessage(ReturnedMessage returned) {
    log.error("触发return callback,");
    log.debug("exchange: {}", returned.getExchange());
    log.debug("routingKey: {}", returned.getRoutingKey());
    log.debug("message: {}", returned.getMessage());
    log.debug("replyCode: {}", returned.getReplyCode());
    log.debug("replyText: {}", returned.getReplyText());
    }
    });
    }
    }
  3. 由于每个消息发送时的处理逻辑不一定相同,因此 ConfirmCallback 需要在每次发消息时定义。具体来说,是在调用 RabbitTemplate 中的 convertAndSend 方法时,多传递一个参数:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    @Test
    void testPublisherConfirm() {
    // 1.创建CorrelationData
    CorrelationData cd = new CorrelationData();
    // 2.给Future添加ConfirmCallback
    cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
    @Override
    public void onFailure(Throwable ex) {
    // 2.1.Future发生异常时的处理逻辑,基本不会触发
    log.error("send message fail", ex);
    }
    @Override
    public void onSuccess(CorrelationData.Confirm result) {
    // 2.2.Future接收到回执的处理逻辑,参数中的result就是回执内容
    if(result.isAck()){ // result.isAck(),boolean类型,true代表ack回执,false代表nack回执
    log.debug("发送消息成功,收到 ack!");
    }else{ // result.getReason(),String类型,返回nack时的异常描述
    log.error("发送消息失败,收到 nack, reason : {}", result.getReason());
    }
    }
    });
    // 3.发送消息
    rabbitTemplate.convertAndSend("hmall.direct", "q", "hello", cd);
    }
    • 这里的 CorrelationData 中包含两个核心的东西:
      • id:消息的唯一标示,MQ 对不同的消息的回执以此做判断,避免混淆
      • SettableListenableFuture:回执结果的 Future 对象
        • 将来 MQ 的回执就会通过这个 Future 对象来返回,我们可以提前给 CorrelationData 中的 Future 添加回调函数来处理消息回执。

    通过测试可以知道,如果传递的 RoutingKey 是错误的,路由失败后,会触发 return callback,同时也收到了 ack。当修改为正确的 RoutingKey 以后,就不会触发 return callback 了,而是只收到 ack。而如果连交换机都是错误的,则只会收到 nack

    • 注意:开启生产者确认比较消耗 MQ 性能,一般不建议开启。而且大家思考一下触发确认的几种情况:
      • 路由失败:一般是因为 RoutingKey 错误导致,往往是编程导致
      • 交换机名称错误:同样是编程错误导致
      • MQ 内部故障:这种需要处理,但概率往往较低。因此只有对消息可靠性要求非常高的业务才需要开启,而且仅仅需要开启 ConfirmCallback 处理 nack 就可以了,对于 nack 消息可以有限次数重试,依然失败则记录异常消息。

2.2 MQ的可靠性

  • 在默认情况下,RabbitMQ 会将接收到的信息保存在内存中以降低消息收发的延迟。这样会导致两个问题:
    • 一旦 MQ 宕机,内存中的消息会丢失 —> 数据持久化
    • 内存空间有限,当消费者故障或处理过慢时,会导致消息积压,引发 MQ 阻塞 —> Lazy Queue

2.2.1 数据持久化

  • RabbitMQ 实现数据持久化包括 3 个方面:

    • 交换机持久化
    • 队列持久化
    • 消息持久化

    说明:在开启持久化机制以后,如果同时还开启了生产者确认,那么 MQ 会在消息持久化以后才发送 ACK 回执,进一步确保消息的可靠性。不过出于性能考虑,为了减少 IO 次数,发送到 MQ 的消息并不是逐条持久化到数据库的,而是每隔一段时间批量持久化。一般间隔在 100 毫秒左右,这就会导致 ACK 有一定的延迟,因此建议生产者确认全部采用异步方式。

2.2.2 Lazy Queue

  • 在默认情况下,RabbitMQ 会将接收到的信息保存在内存中以降低消息收发的延迟。但在某些特殊情况下,这会导致消息积压,比如:

    • 消费者宕机或出现网络故障
    • 消息发送量激增,超过了消费者处理速度
    • 消费者处理业务发生阻塞

    一旦出现消息堆积问题,RabbitMQ 的内存占用就会越来越高,直到触发内存预警上限。此时 RabbitMQ 会将内存消息刷到磁盘上,这个行为称为 PageOutPageOut 会耗费一段时间,并且会阻塞队列进程。因此在这个过程中 RabbitMQ 不会再处理新的消息,生产者的所有请求都会被阻塞。

    • 为了解决这个问题,从 RabbitMQ 的 3.6.0 版本开始,就增加了 Lazy Queue 的模式,也就是惰性队列。惰性队列的特征如下:

      • 接收到消息后直接存入磁盘而非内存
      • 消费者要消费消息时才会从磁盘中读取并加载到内存(也就是懒加载)
      • 支持数百万条的消息存储

      而在 3.12 版本之后,LazyQueue 已经成为所有队列的默认格式,无法更改。

  • 基于Bean的方式声明 Lazy Queue:

    1
    2
    3
    4
    5
    6
    7
    @Bean
    public Queue lazyQueue(){
    return QueueBuilder
    .durable("lazy.queue")
    .lazy() // 开启Lazy模式
    .build();
    }
  • 基于注解的方式声明 Lazy Queue:

    1
    2
    3
    4
    5
    6
    7
    8
    @RabbitListener(queuesToDeclare = @Queue(
    name = "lazy.queue",
    durable = "true",
    arguments = @Argument(name = "x-queue-mode", value = "lazy")
    ))
    public void listenLazyQueue(String msg){
    log.info("接收到 lazy.queue的消息:{}", msg);
    }

2.3 消费者的可靠性

  • 当 RabbitMQ 向消费者投递消息以后,需要知道消费者的处理状态如何。因为消息投递给消费者并不代表就一定被正确消费了,可能出现的故障有很多,比如:

    • 消息投递的过程中出现了网络故障
    • 消费者接收到消息后突然宕机
    • 消费者接收到消息后,因处理不当导致异常

    一旦发生上述情况,消息也会丢失。因此,RabbitMQ 必须知道消费者的处理状态,一旦消息处理失败才能重新投递消息。但问题来了:RabbitMQ 如何得知消费者的处理状态呢?

2.3.1 ★消费者确认机制

  • 为了确认消费者是否成功处理消息,RabbitMQ 提供了消费者确认机制(Consumer Acknowledgement)。即:当消费者处理消息结束后,应该向 RabbitMQ 发送一个回执,告知 RabbitMQ 自己消息处理状态。回执有三种可选值:

    • ack:成功处理消息,RabbitMQ 从队列中删除该消息
    • nack:消息处理失败,RabbitMQ 需要再次投递消息
    • reject:消息处理失败并拒绝该消息,RabbitMQ 从队列中删除该消息
  • SpringAMQP 已经实现了消息确认功能。并允许我们通过配置文件选择 ACK 处理方式,有三种方式:

    • none:不处理。即消息投递给消费者后立刻 ack,消息会立刻从 MQ 删除。非常不安全,不建议使用
    • manual:手动模式。需要自己在业务代码中调用 api,发送 ackreject,存在业务入侵,但更灵活
    • auto:自动模式。SpringAMQP 利用 AOP 对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回 ack. 当业务出现异常时,根据异常判断返回不同结果:
      • 如果是业务异常,会自动返回 nack
      • 如果是消息处理或校验异常,自动返回 reject
    1
    2
    3
    4
    5
    spring:
    rabbitmq:
    listener:
    simple:
    acknowledge-mode: auto

2.3.2 ★失败重试机制

  • 当消费者出现异常后,消息会不断 requeue(重入队)到队列,再重新发送给消费者。如果消费者再次执行依然出错,消息会再次 requeue 到队列,再次投递,直到消息处理成功为止。极端情况就是消费者一直无法执行成功,那么消息 requeue 就会无限循环,导致 mq 的消息处理飙升,带来不必要的压力。

    • 当然,上述极端情况发生的概率还是非常低的,不过不怕一万就怕万一。为了应对上述情况 Spring 又提供了消费者失败重试机制:在消费者出现异常时利用本地重试,而不是无限制的 requeue 到 mq 队列。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    spring:
    rabbitmq:
    listener:
    simple:
    acknowledge-mode: auto
    retry:
    enabled: true # 开启消费者失败重试
    initial-interval: 1000ms # 初识的失败等待时长为1秒
    multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
    max-attempts: 3 # 最大重试次数
    stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
    • 测试发现:

      • 消费者在失败后消息没有重新回到 MQ 无限重新投递,而是在本地重试了 3 次

      • 本地重试 3 次以后,抛出了 AmqpRejectAndDontRequeueException 异常(默认)。查看 RabbitMQ 控制台,发现消息被删除了,说明最后 SpringAMQP 返回的是 reject

        • 在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有 MessageRecovery 接口来处理,它包含三种不同的实现:

          • RejectAndDontRequeueRecoverer:重试耗尽后,直接 reject,丢弃消息。默认就是这种方式

          • ImmediateRequeueMessageRecoverer:重试耗尽后,返回 nack,消息重新入队

          • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

            比较合适的处理方案,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。代码实现如下:

            1. 在 consumer 服务中定义处理失败消息的交换机和队列:

              1
              2
              3
              4
              5
              6
              7
              8
              9
              10
              11
              12
              @Bean
              public DirectExchange errorMessageExchange(){
              return new DirectExchange("error.direct");
              }
              @Bean
              public Queue errorQueue(){
              return new Queue("error.queue", true);
              }
              @Bean
              public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
              return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");// error是routingKey
              }
            2. 定义一个 RepublishMessageRecoverer,关联队列和交换机:

              1
              2
              3
              4
              @Bean
              public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
              return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
              }
  • 结论:

    • 开启本地重试时,消息处理过程中抛出异常,不会 requeue 到队列,而是在消费者本地重试。
    • 重试达到最大次数后,根据 MessageRecovery 接口,对重试耗尽后的消息进行相应的处理。

2.4 业务幂等性

  • 何为幂等性?幂等是一个数学概念,用函数表达式来描述是这样的:f(x) = f(f(x)),例如求绝对值函数。在程序开发中,则是指同一个业务,执行一次或多次对业务状态的影响是一致的。例如:

    • 根据 id 删除数据
    • 查询数据
    • 新增数据

    数据的更新往往不是幂等的,如果重复执行可能造成不一样的后果。比如:

    • 取消订单,恢复库存的业务。如果多次恢复就会出现库存重复增加的情况
    • 退款业务。重复退款对商家而言会有经济损失。

    所以,我们要尽可能避免业务被重复执行。然而在实际业务场景中,由于意外经常会出现业务被重复执行的情况,例如:

    • 页面卡顿时频繁刷新导致表单重复提交
    • 服务间调用的重试
    • MQ 消息的重复投递

    我们在用户支付成功后会发送 MQ 消息到交易服务,修改订单状态为已支付,就可能出现消息重复投递的情况。如果消费者不做判断,很有可能导致消息被消费多次,出现业务故障。举例:

    1. 假如用户刚刚支付完成,并且投递消息到交易服务,交易服务更改订单为已支付状态。
    2. 由于某种原因,例如网络故障导致生产者没有得到确认,隔了一段时间后重新投递给交易服务。
    3. 但是,在新投递的消息被消费之前,用户选择了退款,将订单状态改为了已退款状态。
    4. 退款完成后,新投递的消息才被消费,那么订单状态会被再次改为已支付。业务异常。

    因此,我们必须想办法保证消息处理的幂等性。这里给出两种方案:

    • 唯一消息 ID
    • 业务状态判断

2.4.1 唯一消息ID

  • 这个思路非常简单:

    1. 每一条消息都生成一个唯一的 id,与消息一起投递给消费者
    2. 消费者接收到消息后处理自己的业务,业务处理成功后将消息 ID 保存到数据库
    3. 如果下次又收到相同消息,去数据库查询判断是否存在,存在则为重复消息放弃处理

    该如何给消息添加唯一 ID 呢?其实很简单,SpringAMQP 的 MessageConverter 自带了 MessageID 的功能,我们只要开启这个功能即可。以 Jackson 的消息转换器为例:

    1
    2
    3
    4
    5
    6
    7
    8
    @Bean
    public MessageConverter messageConverter(){
    // 1.定义消息转换器
    Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();
    // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
    jjmc.setCreateMessageIds(true);
    return jjmc;
    }

2.4.2 业务状态判断

  • 业务判断就是基于业务本身的逻辑或状态来判断是否是重复的请求或消息,不同的业务场景判断的思路也不一样。例如我们当前案例中,处理消息的业务逻辑是把订单状态从未支付修改为已支付。因此我们就可以在执行业务时判断订单状态是否是未支付,如果不是则证明订单已经被处理过,无需重复处理。

    • 相比较而言,消息 ID 的方案需要改造原有的数据库,且有数据库的 IO 操作,性能会下降,所以更推荐使用业务判断的方案。
  • 综上,支付服务与交易服务之间的订单状态一致性是如何保证的

    • 首先,支付服务会在用户支付成功以后利用 MQ 消息通知交易服务,完成订单状态同步。
    • 其次,为了保证 MQ 消息的可靠性,我们采用了生产者确认机制、消费者确认机制、消费者失败重试等策略,确保消息投递的可靠性。
    • 最后,我们还在交易服务设置了定时任务,交易服务定期主动查询支付服务中的订单支付状态。这样即便 MQ 通知失败,还可以利用定时任务作为兜底方案,确保订单支付状态的最终一致性。

2.5 延迟消息

2.5.1 概念

  • 延迟消息:生产者发送消息时指定一个时间,消费者不会立刻收到消息,而是在指定时间之后才收到消息。
  • 延迟任务:设置在一定时间之后才执行的任务。
    • 用延迟消息来完成延迟任务。

2.5.2 ★死信交换机

  • 当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter)

    • 消费者使用 basic.rejectbasic.nack 声明消费失败,并且消息的 requeue 参数设置为 false
    • 消息是一个过期消息(达到了队列或消息本身设置的过期时间),超时无人消费
    • 要投递的队列消息堆积满了,最早的消息可能成为死信

    如果一个队列中的消息已经成为死信,并且这个队列通过 dead-letter-exchange 属性指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机就称为死信交换机(Dead Letter Exchange,DLX)。而此时假如有队列与死信交换机绑定,则最终死信就会被投递到这个队列中。

    死信交换机的作用:

    1. 收集那些因处理失败而被拒绝的消息
    2. 收集那些因队列满了而被拒绝的消息
    3. 收集因 TTL(有效期)到期的消息
  • 利用死信交换机,我们就可以实现延迟消息。

2.5.3 延迟消息插件

  • 基于死信队列虽然可以实现延迟消息,但是太麻烦了。RabbitMQ 的官方推出了一个插件,原生支持延迟消息功能。该插件的原理是设计了一种支持延迟消息功能的交换机,当消息投递到交换机后可以暂存一定时间,到期后再投递到队列。

    • 直接看视频 P28
    • 推荐用这种方式来实现延迟消息

    注意:延迟消息插件内部会维护一个本地数据库表,同时使用 Elang Timers 功能实现计时。如果消息的延迟时间设置较长,可能会导致堆积的延迟消息非常多,会带来较大的 CPU 开销,同时延迟消息的时间会存在误差。因此,不建议设置延迟时间过长的延迟消息

---------------The End---------------