返回
RocketMQ 5.0 源码解读:深入 Pull 模式的奥秘
后端
2023-12-05 17:48:53
在 RocketMQ 的浩瀚代码海洋中,Pull 模式宛如一颗璀璨的明珠,吸引着我们一探究竟。作为一名技术博客创作专家,我将结合我的独特视角,为您呈现一篇别具一格的 RocketMQ 5.0 源码解读,深入剖析 Pull 模式的奥秘。
Pull 模式的独特之处
与其他两种模式不同,Pull 模式下,消费者主动从 Broker 获取消息,实现了消息主动权的转移。这种模式的优点显而易见:
- 更灵活的消费控制: 消费者可以根据自己的实际情况和需求,主动调整消息的拉取频率和数量。
- 更高的可靠性: 消费者可以控制拉取消息的时机和方式,避免了因网络波动或其他因素导致的消息丢失。
- 更适合大并发场景: 在高并发场景下,Pull 模式可以有效降低 Broker 的压力,提高系统的整体稳定性。
Pull 模式的实现原理
Pull 模式的实现主要涉及三个核心步骤:
- 消费者订阅主题: 消费者首先需要向 Broker 订阅特定的主题,以表明自己感兴趣的消息类型。
- 消费者轮询 Broker: 消费者会定期轮询 Broker,查询是否有新的消息。轮询的频率由消费者自己控制。
- Broker 返回消息: 如果 Broker 中存在符合消费者订阅条件的消息,则 Broker 会将这些消息返回给消费者。
Pull 模式的源码解析
1. 消费者订阅主题
在 com.alibaba.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl
类中,subscribe
方法负责订阅主题:
public void subscribe(String topic, String subExpression) {
TopicSubscribeInfo info = new TopicSubscribeInfo(topic, subExpression);
Set<TopicSubscribeInfo> set = new HashSet<TopicSubscribeInfo>();
set.add(info);
this.subscriptionInfoTable.put(topic, set);
}
2. 消费者轮询 Broker
在 com.alibaba.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl
类中,pull
方法负责轮询 Broker:
public PullResult pull(PullRequest pullRequest) {
long offset = pullRequest.getNextOffset();
if (offset < 0) {
offset = 0;
}
PullResult result = sendPullRequest(pullRequest.getTopic(), pullRequest.getQueueId(), offset);
result.setPullRequest(pullRequest);
return result;
}
3. Broker 返回消息
在 com.alibaba.rocketmq.broker.processor.PullMessageProcessor
类中,processRequest
方法负责处理消费者发来的 Pull 请求:
public PullMessageResponse processRequest(Object request) {
PullMessageRequest pullRequest = (PullMessageRequest) request;
long offset = pullRequest.getOffset();
long lastTimestamp = pullRequest.getLastTimestamp();
int msgSize = pullRequest.getBatchSize();
// 根据条件查询消息
SelectMessageByOffsetRequest requestOffset = new SelectMessageByOffsetRequest(pullRequest.getTopic(),
pullRequest.getQueueId(), offset, msgSize, false);
MessageQueue mq = new MessageQueue();
mq.setTopic(pullRequest.getTopic());
mq.setQueueId(pullRequest.getQueueId());
PullMessageResponse response = new PullMessageResponse();
MessageQueue offsetInfo = null;
MessageQueue messageQueue = (MessageQueue) brokerController.getMessageStore().selectOneMessageByOffset(
mq, requestOffset, offsetInfo);
if (messageQueue != null) {
response.setStartOffset(messageQueue.getOffset());
response.setMaxOffset(offsetInfo.getOffset());
response.setEndOffset(brokerController.getMessageStore().getMaxOffsetInQueue(messageQueue));
// 查询符合条件的消息
SelectMessageByOffsetRequest request = new SelectMessageByOffsetRequest(pullRequest.getTopic(),
pullRequest.getQueueId(), offset, msgSize, pullRequest.getSubExpression());
GetMessageResult result = brokerController.getMessageStore().getMessage(mq, request);
response.setMinOffset(result.getMinOffset());
if (result.getBuffer() != null) {
try {
response.setMsgFound(true);
byte[] data = new byte[result.getBuffer().remaining()];
result.getBuffer().get(data);
response.setBody(data);
response.setSuggestWhichBrokerId(result.getSuggestBrokerId());
} catch (Throwable t) {
response.setMsgFound(false);
logger.error("Failed to read data from select message by offset response", t);
} finally {
result.release();
}
} else {
response.setMsgFound(false);
}
}
return response;
}
总结
RocketMQ 5.0 的 Pull 模式通过主动拉取消息的机制,赋予消费者更多的控制权和灵活性。深入理解其实现原理,有助于我们更好地利用这一模式,打造更加健壮、高性能的消息系统。
希望这篇独具特色的技术博客,能够为您的 RocketMQ 探索之旅增添一抹亮色。欢迎交流和探讨,让我们共同在技术海洋中乘风破浪!