返回
Pull模式细节分享:剖析RocketMQ拉取消息调度机制
后端
2023-12-06 05:06:33
#
大家好,我是程序员大壮 。欢迎阅读「底层源码挖掘系列 」的第N篇文章。在这篇文章中,我将带大家深入剖析RocketMQ拉取消息调度机制 。
前言
在上一篇文章中,我们探讨了RocketMQ 推模式消息调度 的实现机制。本篇文章,我们将继续深入 Pull模式消息调度 的实现细节。拉取模式下,消费者需要主动从消息队列中拉取消息,并进行消费处理。这种方式虽然增加了应用层的编码复杂度,但可以提供更高的灵活性。
Pull模式的优点
相比于Push模式,Pull模式主要有以下几个优点:
- 灵活性高: 消费者可以根据自己的实际需求和处理能力,主动控制拉取消息的频率和数量。
- 减少服务器压力: 消费者只有在需要的时候才会向服务器发送拉取请求,从而减少了服务器的压力。
- 提高吞吐量: 当消息数量较大时,Pull模式可以避免服务器端由于积压的消息过多而导致的性能下降。
Pull模式的实现原理
在RocketMQ中,Pull模式的消息调度主要通过 Consumer 模块实现。消费者会周期性地向Broker发送拉取消息请求。Broker收到请求后,会根据消费者的 订阅关系 和 消费进度 ,将符合条件的消息返回给消费者。
代码分析
public class PullConsumer {
private MQClientInstance mqClientInstance;
private PullMessageService pullMessageService;
public PullConsumer(MQClientInstance mqClientInstance) {
this.mqClientInstance = mqClientInstance;
this.pullMessageService = new PullMessageService(mqClientInstance);
}
public void start() {
this.pullMessageService.start();
}
public void shutdown() {
this.pullMessageService.shutdown();
}
public PullResult pullMessage(MessageQueue messageQueue, long offset, int maxNums) {
return this.pullMessageService.pullMessage(messageQueue, offset, maxNums);
}
public long fetchConsumeOffset(MessageQueue messageQueue) {
return this.pullMessageService.fetchConsumeOffset(messageQueue);
}
public void updateConsumeOffset(MessageQueue messageQueue, long offset) {
this.pullMessageService.updateConsumeOffset(messageQueue, offset);
}
}
public class PullMessageService {
private MQClientInstance mqClientInstance;
private ConcurrentHashMap<MessageQueue, Long> offsetTable = new ConcurrentHashMap<>();
public PullMessageService(MQClientInstance mqClientInstance) {
this.mqClientInstance = mqClientInstance;
}
public void start() {
for (MessageQueue messageQueue : mqClientInstance.getTopicRouteInfoTable().values()) {
this.offsetTable.put(messageQueue, 0L);
}
this.pullMessageThread = new Thread(this::pullMessageLoop);
this.pullMessageThread.setName("PullMessageService-Thread");
this.pullMessageThread.start();
}
public void shutdown() {
this.pullMessageThread.interrupt();
}
private void pullMessageLoop() {
while (!this.pullMessageThread.isInterrupted()) {
try {
for (MessageQueue messageQueue : this.offsetTable.keySet()) {
long offset = this.offsetTable.get(messageQueue);
PullResult pullResult = this.mqClientInstance.getMQClientAPIImpl().pullMessage(
messageQueue,
offset,
32
);
this.offsetTable.put(messageQueue, pullResult.getNextBeginOffset());
// 处理消息
for (MessageExt messageExt : pullResult.getPullResultList()) {
// TODO: 处理消息
}
}
} catch (Exception e) {
// TODO: 处理异常
}
}
}
public PullResult pullMessage(MessageQueue messageQueue, long offset, int maxNums) {
return this.mqClientInstance.getMQClientAPIImpl().pullMessage(
messageQueue,
offset,
maxNums
);
}
public long fetchConsumeOffset(MessageQueue messageQueue) {
return this.offsetTable.get(messageQueue);
}
public void updateConsumeOffset(MessageQueue messageQueue, long offset) {
this.offsetTable.put(messageQueue, offset);
}
}
总结
通过以上分析,我们对RocketMQ的Pull模式消息调度机制有了更深入的了解。这种模式虽然增加了应用层的编码复杂度,但可以提供更高的灵活性。希望这篇文章能够帮助大家更好地理解RocketMQ的消息调度机制。
参考资料
版权声明
本文章由程序员大壮原创,采用 知识共享署名-非商业性使用-禁止演绎 4.0 国际许可协议进行授权。