返回

成为消息队列专业人士,我们需要的仅仅是RocketMQ4.6.0的消费知识

后端

集群化、推送式、并行消费:RocketMQ 普通消息消费入门指南

引言

RocketMQ 是当今流行的消息队列技术,本文将深入探讨 RocketMQ 4.6.0 普通消息消费的实现原理,让你快速上手。普通消息消费支持集群部署,即多个消费者节点同时消费同一个消息队列。在这样的场景下,消息会以推送的方式发送到消费者。此外,消费者还可以设置并行消费,即每个消费者节点同时消费多个消息队列。

创建消息队列

bin/mqadmin createTopic -n YourTopic -b 1

该命令将创建一个名为 "YourTopic" 的消息队列,其中 "b 1" 表示该队列只有一个分区。

创建消费者组

bin/mqadmin createConsumerGroup -n YourConsumerGroup

该命令将创建一个名为 "YourConsumerGroup" 的消费者组。

启动消费者

Properties properties = new Properties();
properties.put(PropertyKeyConst.ConsumerId, "YourConsumerId");
properties.put(PropertyKeyConst.NamesrvAddr, "127.0.0.1:9876");
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("YourConsumerGroup", properties);
consumer.subscribe("YourTopic", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        for (MessageExt msg : msgs) {
            System.out.println("Received message: " + new String(msg.getBody()));
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
consumer.start();

这段 Java 代码创建一个 DefaultMQPushConsumer,它是一个推送式消费者。我们将它注册为 "YourConsumerGroup" 的成员,并订阅 "YourTopic" 中所有 tag 的消息。当收到消息时,我们会打印消息内容并将其标记为已消费。

生产消息

Properties properties = new Properties();
properties.put(PropertyKeyConst.ProducerId, "YourProducerId");
properties.put(PropertyKeyConst.NamesrvAddr, "127.0.0.1:9876");
DefaultMQProducer producer = new DefaultMQProducer("YourProducerGroup", properties);
producer.start();
for (int i = 0; i < 10; i++) {
    Message msg = new Message("YourTopic", "YourTag", ("Hello RocketMQ " + i).getBytes());
    producer.send(msg);
}
producer.shutdown();

这段 Java 代码创建一个 DefaultMQProducer,它是一个生产者。我们将它配置为使用 "YourProducerGroup",并向 "YourTopic" 发送 10 条消息,每条消息都带有 "YourTag"。

验证消费结果

启动消费者后,你可以在控制台中看到消费结果,每个消费者实例都会打印收到的消息。

结论

本文详细介绍了如何实现 RocketMQ 普通消息的集群化、推送式和并行消费。通过遵循这些步骤,你就可以轻松设置一个可靠和可扩展的消息消费系统。

常见问题解答

1. 为什么要使用集群部署?

集群部署允许多个消费者节点同时消费消息,提高吞吐量并增加容错性。

2. 推送式消费和拉取式消费有什么区别?

在推送式消费中,消息队列主动将消息推送到消费者,而在拉取式消费中,消费者需要主动从消息队列拉取消息。推送式消费的效率更高,但需要消息队列支持。

3. 并行消费如何提高性能?

并行消费允许一个消费者节点同时消费多个消息队列,这可以显著提高吞吐量,特别是在消息量很大的情况下。

4. 如何调整消费者并行度?

并行度可以通过 DefaultMQPushConsumer.setConsumeThreadMin 和 DefaultMQPushConsumer.setConsumeThreadMax 方法进行调整。

5. 如何处理消费失败的消息?

如果一个消息消费失败,RocketMQ 会自动重试消费。消费失败的消息也会存储在 "RETRY" topic 中,你可以根据需要手动重新消费这些消息。