返回

Pull模式细节分享:剖析RocketMQ拉取消息调度机制

后端


#


大家好,我是程序员大壮 。欢迎阅读「底层源码挖掘系列 」的第N篇文章。在这篇文章中,我将带大家深入剖析RocketMQ拉取消息调度机制



前言

在上一篇文章中,我们探讨了RocketMQ 推模式消息调度 的实现机制。本篇文章,我们将继续深入 Pull模式消息调度 的实现细节。拉取模式下,消费者需要主动从消息队列中拉取消息,并进行消费处理。这种方式虽然增加了应用层的编码复杂度,但可以提供更高的灵活性。


Pull模式的优点

相比于Push模式,Pull模式主要有以下几个优点:

  1. 灵活性高: 消费者可以根据自己的实际需求和处理能力,主动控制拉取消息的频率和数量。
  2. 减少服务器压力: 消费者只有在需要的时候才会向服务器发送拉取请求,从而减少了服务器的压力。
  3. 提高吞吐量: 当消息数量较大时,Pull模式可以避免服务器端由于积压的消息过多而导致的性能下降。

Pull模式的实现原理

在RocketMQ中,Pull模式的消息调度主要通过 Consumer 模块实现。消费者会周期性地向Broker发送拉取消息请求。Broker收到请求后,会根据消费者的 订阅关系消费进度 ,将符合条件的消息返回给消费者。


代码分析

public class PullConsumer {

    private MQClientInstance mqClientInstance;
    private PullMessageService pullMessageService;

    public PullConsumer(MQClientInstance mqClientInstance) {
        this.mqClientInstance = mqClientInstance;
        this.pullMessageService = new PullMessageService(mqClientInstance);
    }

    public void start() {
        this.pullMessageService.start();
    }

    public void shutdown() {
        this.pullMessageService.shutdown();
    }

    public PullResult pullMessage(MessageQueue messageQueue, long offset, int maxNums) {
        return this.pullMessageService.pullMessage(messageQueue, offset, maxNums);
    }

    public long fetchConsumeOffset(MessageQueue messageQueue) {
        return this.pullMessageService.fetchConsumeOffset(messageQueue);
    }

    public void updateConsumeOffset(MessageQueue messageQueue, long offset) {
        this.pullMessageService.updateConsumeOffset(messageQueue, offset);
    }
}
public class PullMessageService {

    private MQClientInstance mqClientInstance;
    private ConcurrentHashMap<MessageQueue, Long> offsetTable = new ConcurrentHashMap<>();

    public PullMessageService(MQClientInstance mqClientInstance) {
        this.mqClientInstance = mqClientInstance;
    }

    public void start() {
        for (MessageQueue messageQueue : mqClientInstance.getTopicRouteInfoTable().values()) {
            this.offsetTable.put(messageQueue, 0L);
        }

        this.pullMessageThread = new Thread(this::pullMessageLoop);
        this.pullMessageThread.setName("PullMessageService-Thread");
        this.pullMessageThread.start();
    }

    public void shutdown() {
        this.pullMessageThread.interrupt();
    }

    private void pullMessageLoop() {
        while (!this.pullMessageThread.isInterrupted()) {
            try {
                for (MessageQueue messageQueue : this.offsetTable.keySet()) {
                    long offset = this.offsetTable.get(messageQueue);
                    PullResult pullResult = this.mqClientInstance.getMQClientAPIImpl().pullMessage(
                            messageQueue,
                            offset,
                            32
                    );
                    this.offsetTable.put(messageQueue, pullResult.getNextBeginOffset());
                    // 处理消息
                    for (MessageExt messageExt : pullResult.getPullResultList()) {
                        // TODO: 处理消息
                    }
                }
            } catch (Exception e) {
                // TODO: 处理异常
            }
        }
    }

    public PullResult pullMessage(MessageQueue messageQueue, long offset, int maxNums) {
        return this.mqClientInstance.getMQClientAPIImpl().pullMessage(
                messageQueue,
                offset,
                maxNums
        );
    }

    public long fetchConsumeOffset(MessageQueue messageQueue) {
        return this.offsetTable.get(messageQueue);
    }

    public void updateConsumeOffset(MessageQueue messageQueue, long offset) {
        this.offsetTable.put(messageQueue, offset);
    }
}

总结

通过以上分析,我们对RocketMQ的Pull模式消息调度机制有了更深入的了解。这种模式虽然增加了应用层的编码复杂度,但可以提供更高的灵活性。希望这篇文章能够帮助大家更好地理解RocketMQ的消息调度机制。


参考资料

  1. RocketMQ文档
  2. RocketMQ源码

版权声明

本文章由程序员大壮原创,采用 知识共享署名-非商业性使用-禁止演绎 4.0 国际许可协议进行授权。