返回

RocketMQ Broker 停写功能源码剖析

后端

如何让 RocketMQ broker 平滑停写?——源码解析

在进行 RocketMQ 集群平滑升级时,无损升级的最佳实践是:

  1. 启动新 broker。
  2. 停止旧 broker 的写操作。
  3. 旧 broker 消费完所有消息(包括延时消息)后下线。

本文将深入剖析 RocketMQ broker 的停写功能源码,探究其实现原理和具体流程。

背景

当我们需要平滑升级 broker 时,采用无损升级策略可以最大程度地保障消息的可靠性和业务连续性。具体步骤如下:

  1. 启动新 broker,让其承担新消息的写入和消费。
  2. 停止旧 broker 的写操作,使其不再接收新的消息。
  3. 旧 broker 消费完所有积压的消息后,包括延时消息在内,即可安全下线。

其中,第二步——停止旧 broker 的写操作至关重要。本文将重点分析 RocketMQ 如何实现这一功能。

源码解析

RocketMQ 中,broker 停写功能主要由 SuspendFlag 类实现。SuspendFlag 是一个布尔值,当其为 true 时,表示 broker 处于停写状态,不再接收新的消息。

DefaultMessageStore 类中,有一个 suspend() 方法,用于设置 SuspendFlagtrue,从而使 broker 进入停写状态。

public void suspend() {
    this.suspendFlag.set(true);
}

SuspendFlagtrue 时,消息写入请求会被拒绝,并且会抛出 ServiceNotAvailableException 异常。

if (this.suspendFlag.get()) {
    throw new ServiceNotAvailableException(
        "Service Not Available, maybe the broker is suspending");
}

停写流程

RocketMQ broker 的停写流程如下:

  1. 客户端触发停写: 当客户端向 broker 发送消息时,如果 broker 处于停写状态,客户端会收到 ServiceNotAvailableException 异常。
  2. 客户端重试: 客户端会根据异常提示,重试发送消息。
  3. broker 拒绝重试: 如果 broker 仍处于停写状态,会继续拒绝重试的消息。
  4. 客户端标记失败: 经过多次重试失败后,客户端会将消息标记为失败,并尝试发送到其他 broker。
  5. broker 恢复写操作: 当旧 broker 消费完所有消息后,会将 SuspendFlag 重置为 false,恢复写操作。

优势

RocketMQ broker 的停写功能具有以下优势:

  • 无损升级: 通过停写旧 broker,可以确保旧 broker 上的所有消息都被消费完,避免消息丢失。
  • 平滑升级: 在停写旧 broker 期间,新 broker 可以继续接收和消费消息,不会影响业务。
  • 可控下线: 旧 broker 在消费完所有消息后才下线,可以避免因消息未消费完而导致的异常。