揭秘RocketMQ路由中心NameServer源码
2024-02-03 19:19:42
RocketMQ作为业界领先的消息队列中间件,在金融、电商、物流等领域广泛应用。其路由中心NameServer负责协调和管理整个集群,是RocketMQ的核心组件之一。深入理解NameServer的源码,可以帮助我们更好地掌握RocketMQ的运行机制和特性。
NameServer的功能和作用
NameServer是RocketMQ的路由中心,主要负责以下功能:
-
集群管理: 负责整个RocketMQ集群的管理,维护集群中所有Broker的信息,包括Broker的地址、端口、存储容量等。
-
路由服务: 为生产者和消费者提供路由服务,根据Topic信息帮助生产者找到对应的Broker,并帮助消费者找到对应的消息队列。
-
负载均衡: 通过合理的路由策略,将消息均匀地分配到不同的Broker上,实现负载均衡。
-
高可用: NameServer采用主备模式,当主NameServer发生故障时,备NameServer会自动切换为主NameServer,保证集群的高可用性。
NameServer的实现
NameServer的实现主要包括以下几个方面:
-
数据结构: NameServer内部使用了一个名为“TopicRouteData”的数据结构来存储Topic信息,其中包括Topic的名称、队列数量、Broker地址、读写权限等信息。
-
路由算法: NameServer采用轮询算法来进行路由,即根据Topic的名称依次从TopicRouteData中获取Broker地址,直到找到一个可用的Broker。
-
负载均衡: NameServer通过维护每个Broker的负载情况,并将消息均匀地分配到不同的Broker上,实现负载均衡。
-
高可用: NameServer采用主备模式,当主NameServer发生故障时,备NameServer会自动切换为主NameServer,保证集群的高可用性。
源码分析
为了更深入地理解NameServer的实现,我们来看一下其源码。
1. 启动类
NameServer的启动类是org.apache.rocketmq.namesrv.NamesrvStartup,其main方法负责启动NameServer。
public static void main(String[] args) {
try {
NamesrvStartup.main0(args);
} catch (Throwable e) {
System.err.println("The Name Server Boot Failure");
e.printStackTrace();
}
}
2. 配置文件
NameServer的配置文件位于conf/namesrv.properties,主要配置项包括:
namesrv.kvConfigPath=/Users/apple/rocketmq-data/kvConfig
#Name Server存储Topic信息
namesrv.configStorePath=/Users/apple/rocketmq-data/configStore
#Name Server的IP地址
namesrv.address=127.0.0.1:9876
3. 数据结构
NameServer内部使用了一个名为“TopicRouteData”的数据结构来存储Topic信息,其中包括Topic的名称、队列数量、Broker地址、读写权限等信息。
public class TopicRouteData {
private String topic;
private List<QueueData> queueDatas;
private List<BrokerData> brokerDatas;
private int filterServerTable;
}
4. 路由算法
NameServer采用轮询算法来进行路由,即根据Topic的名称依次从TopicRouteData中获取Broker地址,直到找到一个可用的Broker。
public BrokerData selectBrokerData(int pos, String topic) {
TopicRouteData topicRouteData = this.topicRouteTable.get(topic);
if (topicRouteData != null && topicRouteData.getBrokerDatas().size() > 0) {
List<BrokerData> brokerDatas = topicRouteData.getBrokerDatas();
int index = pos % brokerDatas.size();
return brokerDatas.get(index);
}
return null;
}
5. 负载均衡
NameServer通过维护每个Broker的负载情况,并将消息均匀地分配到不同的Broker上,实现负载均衡。
public BrokerData selectBrokerData(final int pos, final String topic, final JSONTopic jsonTopic,
final String defaultBroker, final int defaultTopicQueueNums, boolean nowriteEnable) {
TopicRouteData topicRouteData = this.topicRouteTable.get(topic);
if (topicRouteData != null && topicRouteData.getBrokerDatas().size() > 0) {
List<BrokerData> brokerDatas = topicRouteData.getBrokerDatas();
int index = pos % brokerDatas.size();
BrokerData bd = brokerDatas.get(index);
boolean changed = false;
if (jsonTopic != null && jsonTopic.getWriteQueueNums() > 0) {
if (!topicRouteData.isWriteable(bd.getCluster())) {
changed = true;
this.setBrokerWritable(topic, bd.getCluster(), true);
}
} else if (this.topicConfigManager.isWriteOnly(topic)) {
if (topicRouteData.isWriteableAll() || !nowriteEnable) {
changed = true;
this.setBrokerWritable(topic, null, false);
}
}
if (changed) {
topicRouteData.sortBrokerData(jsonTopic.getWriteQueueNums(), this.topicConfigManager);
}
return brokerDatas.get(index);
}
if (defaultBroker != null) {
TopicRouteData topicRouteDataDefault = this.topicRouteTable.get(defaultBroker);
if (topicRouteDataDefault != null) {
List<BrokerData> brokerDatasDefault = topicRouteDataDefault.getBrokerDatas();
int defaultIndex = pos % brokerDatasDefault.size();
return brokerDatasDefault.get(defaultIndex);
}
}
return null;
}
6. 高可用
NameServer采用主备模式,当主NameServer发生故障时,备NameServer会自动切换为主NameServer,保证集群的高可用性。
public void switchNameServer(final List<String> nameServerAddressList) {
NameServerConfig nameServerConfig = this.nameServerConfig;
if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
String oldNamesrvAddr = nameServerConfig.getNamesrvAddr();
String newNamesrvAddr = nameServerAddressList.get(0);
if (oldNamesrvAddr == null || oldNamesrvAddr.equals(newNamesrvAddr)) {
return;
}
logger.info("Receive Register Name Server, " + nameServerAddressList);
String masterAddr = this.namesrvAddr;
if (masterAddr != null) {
masterAddr = masterAddr.substring(masterAddr.indexOf("//") + 2);
}
if (masterAddr != null && oldNamesrvAddr.equals(masterAddr)) {
this.namesrvAddr = newNamesrvAddr;
logger.info("The master name server switched to " + this.namesrvAddr);
}
}
}
总结
通过对NameServer源码的分析,我们深入了解了RocketMQ路由中心是如何实现集群管理、路由服务、负载均衡、高可用等功能的。这些知识对于我们更好地掌握RocketMQ的运行机制和特性非常有帮助。