返回

RocketMQ消息发送之旅:从生产者到Broker服务器的旅程

后端

在RocketMQ中,消息的发送是一个复杂的过程,涉及到多个组件和步骤。为了便于理解,我们将整个过程分为以下几个阶段:

  1. 消息创建 :生产者创建一条消息,并指定消息的主题和内容。
  2. 消息发送 :生产者将消息发送到Broker服务器。
  3. 消息接收 :Broker服务器接收消息,并将其存储到存储系统中。
  4. 消息消费 :消费者从Broker服务器订阅主题,并消费消息。

在本文中,我们将重点关注消息发送的阶段,并详细分析生产者是如何将消息发送到Broker服务器的。

消息发送流程

消息发送流程主要分为以下几个步骤:

  1. 生产者初始化 :生产者首先需要初始化,包括创建Producer对象、设置必要参数等。
  2. 发送消息 :生产者通过调用Producer对象的send方法发送消息。
  3. 消息序列化 :生产者将消息序列化为字节数组。
  4. 消息压缩 :生产者将消息字节数组压缩,以减少网络传输的开销。
  5. 消息签名 :生产者对消息字节数组进行签名,以确保消息的完整性。
  6. 消息加密 :生产者对消息字节数组进行加密,以保护消息的安全性。
  7. 消息发送 :生产者将消息字节数组发送到Broker服务器。
  8. Broker服务器接收消息 :Broker服务器接收消息字节数组,并将其存储到存储系统中。

源码分析

下面我们以RocketMQ的Java客户端为例,来分析消息发送的源码。

public void send(Message msg) {
    this.send(msg, this.defaultTopic);
}

public void send(Message msg, String topic) {
    this.send(msg, topic, null);
}

public void send(Message msg, String topic, String keys) {
    try {
        this.send(msg, topic, keys, this.defaultInstanceName);
    } catch (RequestTimeoutException e) {
        throw new MQClientException("send message to broker outime exception", e);
    }
}

从上面的代码中,我们可以看到,生产者通过调用send方法来发送消息。send方法有三个参数:消息对象、主题和键。其中,主题是必填项,键是可选的。如果键不为空,则消息将被发送到指定的分区。

public void send(Message msg, String topic, String keys, String instanceName)
        throws MQClientException, RequestTimeoutException {
    MessageQueue mq = this.selectOneMessageQueue(topic, keys);
    if (null == mq) {
        throw new MQClientException("Not found message queue for this topic: " + topic);
    }

    if (msg.getDelayTimeLevel() > 0) {
        if (msg.getDelayTimeLevel() > this.maxDelayTimeLevel) {
            msg.setDelayTimeLevel(this.maxDelayTimeLevel);
        }

        if (mq.getBrokerName().equals(LocalBrokerName)) {
            this.sendKernel(msg, mq);
            return;
        }
    }

    long beginTimestampFirst = System.currentTimeMillis();
    long beginTimestampPrev = beginTimestampFirst;
    long timeout = this.sendDefaultTimeout;
    while (timeout >= 0) {
        Broker broker = this.selectBroker(mq.getBrokerName());
        if (null == broker) {
            throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist");
        }

        long costTime = System.currentTimeMillis() - beginTimestampPrev;
        if (timeout < costTime) {
            throw new RequestTimeoutException("send message timeout, " + costTime + " ms, " + timeout + " ms");
        }

        try {
            this.sendKernel(msg, mq, broker);
            return;
        } catch (RequestTimeoutException e) {
            if (this.clientChannelException) {
                throw e;
            }

            logger.warn(SEND_WARNING_MSG, mq.getBrokerName(), e.getMessage());
            timeout -= System.currentTimeMillis() - beginTimestampPrev;
            beginTimestampPrev = System.currentTimeMillis();
            continue;
        } catch (MQClientException e) {
            if (this.clientChannelException && e instanceof MQBrokerException && e.getResponseCode() == ResponseCode.SYSTEM_ERROR
                || e.getResponseCode() == ResponseCode.NO_PERMISSION || e.getResponseCode() == ResponseCode.NO_TOPIC) {
                throw e;
            }

            logger.warn(SEND_WARNING_MSG, mq.getBrokerName(), e.getMessage());
            timeout -= System.currentTimeMillis() - beginTimestampPrev;
            beginTimestampPrev = System.currentTimeMillis();
            continue;
        } catch (Exception e) {
            logger.warn(SEND_WARNING_MSG, mq.getBrokerName(),
                "SEND_MESSAGE_EXCEPTION: " + e.toString());
            timeout -= System.currentTimeMillis() - beginTimestampPrev;
            beginTimestampPrev = System.currentTimeMillis();
            continue;
        }
    }

    throw new RequestTimeoutException("send message to broker outime exception");
}

在send方法中,生产者首先会选择一个消息队列。消息队列是Broker服务器上存储消息的地方。选择消息队列的策略有很多种,比如轮询、哈希等。

private void sendKernel(Message msg, MessageQueue mq) throws MQClientException {
    SendResult sendResult = this.sendKernel(msg, mq, null);
    long costTime = System.currentTimeMillis() - this.beginTimestamp;
    if (null == sendResult) {
        this.errorMap.put(mq.getBrokerName(), new MQClientException(ResponseCode.SYSTEM_ERROR,
            "Send Message Timeout"));
    } else {
        this.successMap.put(mq.getBrokerName(), new MQClientException(ResponseCode.SUCCESS,
            "Send Message Success"));
        this.counter.increaseSendMessageCount();
        this.counter.recordResponseTime(costTime);
    }
}

选择好消息队列后,生产者会将消息发送到消息队列中。发送消息的操作是通过调用Broker服务器上的sendKernel方法来完成的。

private SendResult sendKernel(Message msg, MessageQueue mq, Broker broker)
        throws MQClientException, RequestTimeoutException {
    int sysFlag = msg.getSysFlag();
    if (tranMsg) {
        sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
    }

    long bornTime = System.currentTimeMillis();
    bornTime = bornTime - (bornTime % 1000);
    msg.setBornTimestamp(bornTime);
    msg.setFlag(sysFlag);
    msg.setBodyCRC(MessageCRC.computeCrc(msg.getBody()));
    // retry and batchSend
    int topicLength = msg.getTopic().length();
    int bodyLength = msg.getBody() != null ? msg.getBody().length : 0;
    int msgLength = topicLength + bodyLength;
    if (this.msgCount.get() >= this.maxMsgsInBatch || msgLength > this.maxMessageSize) {
        this.executeSendKernelResult = this.executeSendKernelResult();
        this.msgCount.set(0);
    }

    //construct an array of Message and MessageQueue
    Message[] messages = this.topicMessageMap.get(mq.getTopic());
    if (null == messages) {
        messages = new Message[this.batchSendSize];
        this.topicMessageMap.put(mq.getTopic(), messages);
    }

    messages[this.msgCount.get()] = msg;
    this.queueOffsetTable.put(mq, this.msgCount.get());
    this.msgCount.incrementAndGet();

    if (this.msgCount.get() >= this.batchSendSize || msgLength > this.maxMessageSize) {
        this.executeSendKernelResult = this.executeSendKernelResult();
        this.msgCount.set(0);
    }

    return this.executeSendKernelResult;
}

在sendKernel方法中,生产者首先会对消息进行一些处理,比如设置消息的标志、计算消息的CRC校验值等。然后,生产者会将消息添加到一个消息数组中。这个消息数组是用来批量发送消息的。

private SendResult executeSendKernelResult() throws MQClientException, RequestTimeoutException {
    final int batchSize = this.msgCount