返回
RocketMQ 单个消息过大时的妥善处理方式
后端
2024-01-12 07:55:14
引言
在使用 RocketMQ 进行消息传递时,有时可能会遇到单个消息大小超过限制的情况。默认情况下,RocketMQ 的 maxMessageSize
配置项限制为 4MB。当消息大小超过此限制时,将无法发送消息,并会抛出异常。
针对这种情况,本文介绍一种不更改 maxMessageSize
配置项的处理方式,即通过消息切割来实现消息的发送。这种方法可以将大消息拆分为多个小消息,并分别发送。
消息切割与发送
-
消息切割
首先,需要对大消息进行切割。这里可以使用一些第三方库,例如 Apache Commons Lang 的
StringUtils
类。该类提供了split()
方法,可以将字符串按照指定的分隔符切割成多个子字符串。String largeMessage = "This is a large message."; String[] smallMessages = StringUtils.split(largeMessage, ".");
-
消息发送
接下来,将切割后的小消息分别发送到 RocketMQ 中。需要注意的是,在发送小消息时,需要指定一个唯一的
messageId
。这有助于在接收端将小消息重新组合成大消息。for (String smallMessage : smallMessages) { Message message = new Message(); message.setBody(smallMessage.getBytes()); message.setMessageId(UUID.randomUUID().toString()); producer.send(message); }
消息接收与重组
在接收端,需要对收到的消息进行重组。这里可以使用一些第三方库,例如 Apache Commons Lang 的 StringUtils
类。该类提供了 join()
方法,可以将多个子字符串按照指定的分隔符连接成一个字符串。
List<String> smallMessages = new ArrayList<>();
for (Message message : consumer.receive()) {
smallMessages.add(new String(message.getBody()));
}
String largeMessage = StringUtils.join(smallMessages, ".");
结论
通过消息切割与发送,可以实现大消息在 RocketMQ 中的发送与接收。这种方法不需要更改 maxMessageSize
配置项,因此可以避免影响其他消息的发送。