返回
RocketMQ Broker 停写功能源码剖析
后端
2023-11-10 20:10:41
如何让 RocketMQ broker 平滑停写?——源码解析
在进行 RocketMQ 集群平滑升级时,无损升级的最佳实践是:
- 启动新 broker。
- 停止旧 broker 的写操作。
- 旧 broker 消费完所有消息(包括延时消息)后下线。
本文将深入剖析 RocketMQ broker 的停写功能源码,探究其实现原理和具体流程。
背景
当我们需要平滑升级 broker 时,采用无损升级策略可以最大程度地保障消息的可靠性和业务连续性。具体步骤如下:
- 启动新 broker,让其承担新消息的写入和消费。
- 停止旧 broker 的写操作,使其不再接收新的消息。
- 旧 broker 消费完所有积压的消息后,包括延时消息在内,即可安全下线。
其中,第二步——停止旧 broker 的写操作至关重要。本文将重点分析 RocketMQ 如何实现这一功能。
源码解析
RocketMQ 中,broker 停写功能主要由 SuspendFlag
类实现。SuspendFlag
是一个布尔值,当其为 true
时,表示 broker 处于停写状态,不再接收新的消息。
在 DefaultMessageStore
类中,有一个 suspend()
方法,用于设置 SuspendFlag
为 true
,从而使 broker 进入停写状态。
public void suspend() {
this.suspendFlag.set(true);
}
当 SuspendFlag
为 true
时,消息写入请求会被拒绝,并且会抛出 ServiceNotAvailableException
异常。
if (this.suspendFlag.get()) {
throw new ServiceNotAvailableException(
"Service Not Available, maybe the broker is suspending");
}
停写流程
RocketMQ broker 的停写流程如下:
- 客户端触发停写: 当客户端向 broker 发送消息时,如果 broker 处于停写状态,客户端会收到
ServiceNotAvailableException
异常。 - 客户端重试: 客户端会根据异常提示,重试发送消息。
- broker 拒绝重试: 如果 broker 仍处于停写状态,会继续拒绝重试的消息。
- 客户端标记失败: 经过多次重试失败后,客户端会将消息标记为失败,并尝试发送到其他 broker。
- broker 恢复写操作: 当旧 broker 消费完所有消息后,会将
SuspendFlag
重置为false
,恢复写操作。
优势
RocketMQ broker 的停写功能具有以下优势:
- 无损升级: 通过停写旧 broker,可以确保旧 broker 上的所有消息都被消费完,避免消息丢失。
- 平滑升级: 在停写旧 broker 期间,新 broker 可以继续接收和消费消息,不会影响业务。
- 可控下线: 旧 broker 在消费完所有消息后才下线,可以避免因消息未消费完而导致的异常。