返回
RocketMQ消息发送之旅:从生产者到Broker服务器的旅程
后端
2023-12-04 23:48:28
在RocketMQ中,消息的发送是一个复杂的过程,涉及到多个组件和步骤。为了便于理解,我们将整个过程分为以下几个阶段:
- 消息创建 :生产者创建一条消息,并指定消息的主题和内容。
- 消息发送 :生产者将消息发送到Broker服务器。
- 消息接收 :Broker服务器接收消息,并将其存储到存储系统中。
- 消息消费 :消费者从Broker服务器订阅主题,并消费消息。
在本文中,我们将重点关注消息发送的阶段,并详细分析生产者是如何将消息发送到Broker服务器的。
消息发送流程
消息发送流程主要分为以下几个步骤:
- 生产者初始化 :生产者首先需要初始化,包括创建Producer对象、设置必要参数等。
- 发送消息 :生产者通过调用Producer对象的send方法发送消息。
- 消息序列化 :生产者将消息序列化为字节数组。
- 消息压缩 :生产者将消息字节数组压缩,以减少网络传输的开销。
- 消息签名 :生产者对消息字节数组进行签名,以确保消息的完整性。
- 消息加密 :生产者对消息字节数组进行加密,以保护消息的安全性。
- 消息发送 :生产者将消息字节数组发送到Broker服务器。
- Broker服务器接收消息 :Broker服务器接收消息字节数组,并将其存储到存储系统中。
源码分析
下面我们以RocketMQ的Java客户端为例,来分析消息发送的源码。
public void send(Message msg) {
this.send(msg, this.defaultTopic);
}
public void send(Message msg, String topic) {
this.send(msg, topic, null);
}
public void send(Message msg, String topic, String keys) {
try {
this.send(msg, topic, keys, this.defaultInstanceName);
} catch (RequestTimeoutException e) {
throw new MQClientException("send message to broker outime exception", e);
}
}
从上面的代码中,我们可以看到,生产者通过调用send方法来发送消息。send方法有三个参数:消息对象、主题和键。其中,主题是必填项,键是可选的。如果键不为空,则消息将被发送到指定的分区。
public void send(Message msg, String topic, String keys, String instanceName)
throws MQClientException, RequestTimeoutException {
MessageQueue mq = this.selectOneMessageQueue(topic, keys);
if (null == mq) {
throw new MQClientException("Not found message queue for this topic: " + topic);
}
if (msg.getDelayTimeLevel() > 0) {
if (msg.getDelayTimeLevel() > this.maxDelayTimeLevel) {
msg.setDelayTimeLevel(this.maxDelayTimeLevel);
}
if (mq.getBrokerName().equals(LocalBrokerName)) {
this.sendKernel(msg, mq);
return;
}
}
long beginTimestampFirst = System.currentTimeMillis();
long beginTimestampPrev = beginTimestampFirst;
long timeout = this.sendDefaultTimeout;
while (timeout >= 0) {
Broker broker = this.selectBroker(mq.getBrokerName());
if (null == broker) {
throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist");
}
long costTime = System.currentTimeMillis() - beginTimestampPrev;
if (timeout < costTime) {
throw new RequestTimeoutException("send message timeout, " + costTime + " ms, " + timeout + " ms");
}
try {
this.sendKernel(msg, mq, broker);
return;
} catch (RequestTimeoutException e) {
if (this.clientChannelException) {
throw e;
}
logger.warn(SEND_WARNING_MSG, mq.getBrokerName(), e.getMessage());
timeout -= System.currentTimeMillis() - beginTimestampPrev;
beginTimestampPrev = System.currentTimeMillis();
continue;
} catch (MQClientException e) {
if (this.clientChannelException && e instanceof MQBrokerException && e.getResponseCode() == ResponseCode.SYSTEM_ERROR
|| e.getResponseCode() == ResponseCode.NO_PERMISSION || e.getResponseCode() == ResponseCode.NO_TOPIC) {
throw e;
}
logger.warn(SEND_WARNING_MSG, mq.getBrokerName(), e.getMessage());
timeout -= System.currentTimeMillis() - beginTimestampPrev;
beginTimestampPrev = System.currentTimeMillis();
continue;
} catch (Exception e) {
logger.warn(SEND_WARNING_MSG, mq.getBrokerName(),
"SEND_MESSAGE_EXCEPTION: " + e.toString());
timeout -= System.currentTimeMillis() - beginTimestampPrev;
beginTimestampPrev = System.currentTimeMillis();
continue;
}
}
throw new RequestTimeoutException("send message to broker outime exception");
}
在send方法中,生产者首先会选择一个消息队列。消息队列是Broker服务器上存储消息的地方。选择消息队列的策略有很多种,比如轮询、哈希等。
private void sendKernel(Message msg, MessageQueue mq) throws MQClientException {
SendResult sendResult = this.sendKernel(msg, mq, null);
long costTime = System.currentTimeMillis() - this.beginTimestamp;
if (null == sendResult) {
this.errorMap.put(mq.getBrokerName(), new MQClientException(ResponseCode.SYSTEM_ERROR,
"Send Message Timeout"));
} else {
this.successMap.put(mq.getBrokerName(), new MQClientException(ResponseCode.SUCCESS,
"Send Message Success"));
this.counter.increaseSendMessageCount();
this.counter.recordResponseTime(costTime);
}
}
选择好消息队列后,生产者会将消息发送到消息队列中。发送消息的操作是通过调用Broker服务器上的sendKernel方法来完成的。
private SendResult sendKernel(Message msg, MessageQueue mq, Broker broker)
throws MQClientException, RequestTimeoutException {
int sysFlag = msg.getSysFlag();
if (tranMsg) {
sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
}
long bornTime = System.currentTimeMillis();
bornTime = bornTime - (bornTime % 1000);
msg.setBornTimestamp(bornTime);
msg.setFlag(sysFlag);
msg.setBodyCRC(MessageCRC.computeCrc(msg.getBody()));
// retry and batchSend
int topicLength = msg.getTopic().length();
int bodyLength = msg.getBody() != null ? msg.getBody().length : 0;
int msgLength = topicLength + bodyLength;
if (this.msgCount.get() >= this.maxMsgsInBatch || msgLength > this.maxMessageSize) {
this.executeSendKernelResult = this.executeSendKernelResult();
this.msgCount.set(0);
}
//construct an array of Message and MessageQueue
Message[] messages = this.topicMessageMap.get(mq.getTopic());
if (null == messages) {
messages = new Message[this.batchSendSize];
this.topicMessageMap.put(mq.getTopic(), messages);
}
messages[this.msgCount.get()] = msg;
this.queueOffsetTable.put(mq, this.msgCount.get());
this.msgCount.incrementAndGet();
if (this.msgCount.get() >= this.batchSendSize || msgLength > this.maxMessageSize) {
this.executeSendKernelResult = this.executeSendKernelResult();
this.msgCount.set(0);
}
return this.executeSendKernelResult;
}
在sendKernel方法中,生产者首先会对消息进行一些处理,比如设置消息的标志、计算消息的CRC校验值等。然后,生产者会将消息添加到一个消息数组中。这个消息数组是用来批量发送消息的。
private SendResult executeSendKernelResult() throws MQClientException, RequestTimeoutException {
final int batchSize = this.msgCount