返回

RocketMQ 源码分析:消息生产者(二)负载均衡

后端

导言

RocketMQ 是一个分布式消息队列系统,广泛用于海量数据的异步解耦和通信。消息生产者负责将消息发送到消息队列中,以实现数据的可靠传递。消息生产者负载均衡是 RocketMQ 中一个关键的特性,它确保消息均匀分布到不同的消息队列中,从而提升吞吐量和可用性。

消息队列选择

在普通场景下,消息发送时选择消息队列遵循以下原则:

  • 轮询: 消息依次发送到不同的消息队列,确保消息均匀分布。
  • 哈希: 根据消息的 Hash 值选择消息队列,保证相同消息总是发送到同一个消息队列,从而实现消息有序性。

开启消息发送延迟故障的场景

如果开启了消息发送延迟故障特性,消息生产者在选择消息队列时会考虑以下因素:

  • 消息延迟级别: RocketMQ 支持三种消息延迟级别:无延迟、延时 5s、延时 10s。
  • 消息队列的延迟队列数量: 每个消息队列都有一个延迟队列,用于存储延迟消息。

具体选择规则如下:

  • 如果消息没有设置延迟,则使用普通场景的消息队列选择策略。
  • 如果消息设置了延迟,则将消息发送到延迟队列数量与消息延迟级别相同的队列中。

代码分析

RocketMQ 中负责消息队列选择的核心代码位于 DefaultMQProducerImpl.java 中的 selectMessageQueue 方法:

public MessageQueue selectMessageQueue(TopicPublishInfo topicPublishInfo) {
    if (topicPublishInfo.getMq() != null) {
        return topicPublishInfo.getMq();
    }

    List<MessageQueue> messageQueueList = getTopicRouteData(topicPublishInfo.getTopicName()).getQueueDatas();
    MessageQueue mq = randomFromList(messageQueueList);
    if (topicPublishInfo.isDelayTimeSet()) {
        int delayLevel = topicPublishInfo.getDelayTimeLevel();
        mq = selectOneMessageQueueOfDelayLevel(mq, delayLevel);
    }
    return mq;
}
  • topicPublishInfo 中包含了消息发布相关信息,包括目标主题、消息、延迟级别等。
  • getTopicRouteData 方法获取主题的路由数据,其中包括所有可用消息队列信息。
  • randomFromList 方法从消息队列列表中随机选择一个消息队列。
  • 如果开启了消息发送延迟故障特性,selectOneMessageQueueOfDelayLevel 方法会根据消息延迟级别选择一个合适的延迟队列。

总结

RocketMQ 的消息生产者负载均衡策略通过轮询和哈希的方式确保消息均匀分布,同时考虑消息延迟级别,将延迟消息发送到相应的延迟队列中。这种设计提升了 RocketMQ 的吞吐量和可用性,满足了不同场景下的消息发送需求。