返回

揭秘RocketMQ路由中心NameServer源码

后端

RocketMQ作为业界领先的消息队列中间件,在金融、电商、物流等领域广泛应用。其路由中心NameServer负责协调和管理整个集群,是RocketMQ的核心组件之一。深入理解NameServer的源码,可以帮助我们更好地掌握RocketMQ的运行机制和特性。

NameServer的功能和作用

NameServer是RocketMQ的路由中心,主要负责以下功能:

  • 集群管理: 负责整个RocketMQ集群的管理,维护集群中所有Broker的信息,包括Broker的地址、端口、存储容量等。

  • 路由服务: 为生产者和消费者提供路由服务,根据Topic信息帮助生产者找到对应的Broker,并帮助消费者找到对应的消息队列。

  • 负载均衡: 通过合理的路由策略,将消息均匀地分配到不同的Broker上,实现负载均衡。

  • 高可用: NameServer采用主备模式,当主NameServer发生故障时,备NameServer会自动切换为主NameServer,保证集群的高可用性。

NameServer的实现

NameServer的实现主要包括以下几个方面:

  • 数据结构: NameServer内部使用了一个名为“TopicRouteData”的数据结构来存储Topic信息,其中包括Topic的名称、队列数量、Broker地址、读写权限等信息。

  • 路由算法: NameServer采用轮询算法来进行路由,即根据Topic的名称依次从TopicRouteData中获取Broker地址,直到找到一个可用的Broker。

  • 负载均衡: NameServer通过维护每个Broker的负载情况,并将消息均匀地分配到不同的Broker上,实现负载均衡。

  • 高可用: NameServer采用主备模式,当主NameServer发生故障时,备NameServer会自动切换为主NameServer,保证集群的高可用性。

源码分析

为了更深入地理解NameServer的实现,我们来看一下其源码。

1. 启动类

NameServer的启动类是org.apache.rocketmq.namesrv.NamesrvStartup,其main方法负责启动NameServer。

public static void main(String[] args) {
    try {
        NamesrvStartup.main0(args);
    } catch (Throwable e) {
        System.err.println("The Name Server Boot Failure");
        e.printStackTrace();
    }
}

2. 配置文件

NameServer的配置文件位于conf/namesrv.properties,主要配置项包括:

namesrv.kvConfigPath=/Users/apple/rocketmq-data/kvConfig
#Name Server存储Topic信息
namesrv.configStorePath=/Users/apple/rocketmq-data/configStore
#Name Server的IP地址
namesrv.address=127.0.0.1:9876

3. 数据结构

NameServer内部使用了一个名为“TopicRouteData”的数据结构来存储Topic信息,其中包括Topic的名称、队列数量、Broker地址、读写权限等信息。

public class TopicRouteData {

    private String topic;

    private List<QueueData> queueDatas;

    private List<BrokerData> brokerDatas;

    private int filterServerTable;

}

4. 路由算法

NameServer采用轮询算法来进行路由,即根据Topic的名称依次从TopicRouteData中获取Broker地址,直到找到一个可用的Broker。

public BrokerData selectBrokerData(int pos, String topic) {
    TopicRouteData topicRouteData = this.topicRouteTable.get(topic);
    if (topicRouteData != null && topicRouteData.getBrokerDatas().size() > 0) {
        List<BrokerData> brokerDatas = topicRouteData.getBrokerDatas();
        int index = pos % brokerDatas.size();
        return brokerDatas.get(index);
    }

    return null;
}

5. 负载均衡

NameServer通过维护每个Broker的负载情况,并将消息均匀地分配到不同的Broker上,实现负载均衡。

public BrokerData selectBrokerData(final int pos, final String topic, final JSONTopic jsonTopic,
        final String defaultBroker, final int defaultTopicQueueNums, boolean nowriteEnable) {
    TopicRouteData topicRouteData = this.topicRouteTable.get(topic);

    if (topicRouteData != null && topicRouteData.getBrokerDatas().size() > 0) {
        List<BrokerData> brokerDatas = topicRouteData.getBrokerDatas();
        int index = pos % brokerDatas.size();
        BrokerData bd = brokerDatas.get(index);

        boolean changed = false;
        if (jsonTopic != null && jsonTopic.getWriteQueueNums() > 0) {
            if (!topicRouteData.isWriteable(bd.getCluster())) {
                changed = true;
                this.setBrokerWritable(topic, bd.getCluster(), true);
            }
        } else if (this.topicConfigManager.isWriteOnly(topic)) {
            if (topicRouteData.isWriteableAll() || !nowriteEnable) {
                changed = true;
                this.setBrokerWritable(topic, null, false);
            }
        }

        if (changed) {
            topicRouteData.sortBrokerData(jsonTopic.getWriteQueueNums(), this.topicConfigManager);
        }

        return brokerDatas.get(index);
    }

    if (defaultBroker != null) {
        TopicRouteData topicRouteDataDefault = this.topicRouteTable.get(defaultBroker);
        if (topicRouteDataDefault != null) {
            List<BrokerData> brokerDatasDefault = topicRouteDataDefault.getBrokerDatas();
            int defaultIndex = pos % brokerDatasDefault.size();
            return brokerDatasDefault.get(defaultIndex);
        }
    }

    return null;
}

6. 高可用

NameServer采用主备模式,当主NameServer发生故障时,备NameServer会自动切换为主NameServer,保证集群的高可用性。

public void switchNameServer(final List<String> nameServerAddressList) {
    NameServerConfig nameServerConfig = this.nameServerConfig;
    if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
        String oldNamesrvAddr = nameServerConfig.getNamesrvAddr();
        String newNamesrvAddr = nameServerAddressList.get(0);

        if (oldNamesrvAddr == null || oldNamesrvAddr.equals(newNamesrvAddr)) {
            return;
        }

        logger.info("Receive Register Name Server, " + nameServerAddressList);

        String masterAddr = this.namesrvAddr;
        if (masterAddr != null) {
            masterAddr = masterAddr.substring(masterAddr.indexOf("//") + 2);
        }

        if (masterAddr != null && oldNamesrvAddr.equals(masterAddr)) {
            this.namesrvAddr = newNamesrvAddr;
            logger.info("The master name server switched to " + this.namesrvAddr);
        }
    }
}

总结

通过对NameServer源码的分析,我们深入了解了RocketMQ路由中心是如何实现集群管理、路由服务、负载均衡、高可用等功能的。这些知识对于我们更好地掌握RocketMQ的运行机制和特性非常有帮助。