返回

RocketMQ 单个消息过大时的妥善处理方式

后端

引言

在使用 RocketMQ 进行消息传递时,有时可能会遇到单个消息大小超过限制的情况。默认情况下,RocketMQ 的 maxMessageSize 配置项限制为 4MB。当消息大小超过此限制时,将无法发送消息,并会抛出异常。

针对这种情况,本文介绍一种不更改 maxMessageSize 配置项的处理方式,即通过消息切割来实现消息的发送。这种方法可以将大消息拆分为多个小消息,并分别发送。

消息切割与发送

  1. 消息切割

    首先,需要对大消息进行切割。这里可以使用一些第三方库,例如 Apache Commons Lang 的 StringUtils 类。该类提供了 split() 方法,可以将字符串按照指定的分隔符切割成多个子字符串。

    String largeMessage = "This is a large message.";
    String[] smallMessages = StringUtils.split(largeMessage, ".");
    
  2. 消息发送

    接下来,将切割后的小消息分别发送到 RocketMQ 中。需要注意的是,在发送小消息时,需要指定一个唯一的 messageId。这有助于在接收端将小消息重新组合成大消息。

    for (String smallMessage : smallMessages) {
        Message message = new Message();
        message.setBody(smallMessage.getBytes());
        message.setMessageId(UUID.randomUUID().toString());
        producer.send(message);
    }
    

消息接收与重组

在接收端,需要对收到的消息进行重组。这里可以使用一些第三方库,例如 Apache Commons Lang 的 StringUtils 类。该类提供了 join() 方法,可以将多个子字符串按照指定的分隔符连接成一个字符串。

List<String> smallMessages = new ArrayList<>();
for (Message message : consumer.receive()) {
    smallMessages.add(new String(message.getBody()));
}
String largeMessage = StringUtils.join(smallMessages, ".");

结论

通过消息切割与发送,可以实现大消息在 RocketMQ 中的发送与接收。这种方法不需要更改 maxMessageSize 配置项,因此可以避免影响其他消息的发送。