返回

RocketMQ 原生 API:掌控消息队列,实现可靠有序的通信

后端

在分布式系统中,消息队列扮演着至关重要的角色,保障了不同系统、组件间的信息传输可靠有序。Apache RocketMQ 作为一款开源的消息队列中间件,以其高吞吐、低延迟、高可靠等特性备受青睐。RocketMQ 原生 API 为开发者提供了灵活高效的操作界面,本文将深入探讨如何利用原生 API 实现顺序消息、广播消息、延迟消息和过滤消息的收发。

一、RocketMQ 原生 API 入门

RocketMQ 提供了一系列 Java 语言编写的原生 API,涵盖了消息生产、消费和管理等方方面面。在使用原生 API 之前,需要引入相关的依赖包:

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>5.0.0</version>
</dependency>

二、顺序消息的收发

顺序消息保证了消息在队列中的顺序与生产顺序一致,适用于需要严格保证消息顺序的场景,如订单处理、账务流水等。

2.1 生产者示例

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

public class OrderProducer {

    public static void main(String[] args) throws Exception {
        // 创建生产者
        DefaultMQProducer producer = new DefaultMQProducer("order_group");
        // 设置 Name Server 地址
        producer.setNamesrvAddr("localhost:9876");
        // 启动生产者
        producer.start();

        // 循环发送 10 条顺序消息
        for (int i = 0; i < 10; i++) {
            // 创建消息
            Message message = new Message("order_topic", "order_" + i);
            // 设置消息的顺序键,保证消息有序性
            message.setKeys("order_key");
            // 发送消息
            producer.send(message);
        }

        // 关闭生产者
        producer.shutdown();
    }
}

2.2 消费者示例

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class OrderConsumer {

    public static void main(String[] args) throws Exception {
        // 创建消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_group");
        // 设置 Name Server 地址
        consumer.setNamesrvAddr("localhost:9876");
        // 订阅主题
        consumer.subscribe("order_topic", "*");

        // 注册顺序消息监听器
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                for (MessageExt msg : msgs) {
                    // 处理顺序消息
                    System.out.println("Receive message: " + new String(msg.getBody()));
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

        // 启动消费者
        consumer.start();
    }
}

三、广播消息的收发

广播消息将消息发送到所有订阅该主题的消费者,适用于需要将消息广播到多个系统的场景,如系统通知、营销活动等。

3.1 生产者示例

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

public class BroadcastProducer {

    public static void main(String[] args) throws Exception {
        // 创建生产者
        DefaultMQProducer producer = new DefaultMQProducer("broadcast_group");
        // 设置 Name Server 地址
        producer.setNamesrvAddr("localhost:9876");
        // 启动生产者
        producer.start();

        // 创建广播消息
        Message message = new Message("broadcast_topic", "broadcast_message");
        // 发送广播消息
        producer.send(message);

        // 关闭生产者
        producer.shutdown();
    }
}

3.2 消费者示例

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class BroadcastConsumer {

    public static void main(String[] args) throws Exception {
        // 创建消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("broadcast_group");
        // 设置 Name Server 地址
        consumer.setNamesrvAddr("localhost:9876");
        // 订阅主题
        consumer.subscribe("broadcast_topic", "*");

        // 注册并发消息监听器
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    // 处理广播消息
                    System.out.println("Receive message: " + new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // 启动消费者
        consumer.start();
    }
}

四、延迟消息的收发

延迟消息在指定的时间后才被消费,适用于需要定时执行任务、到期提醒等场景。

4.1 生产者示例

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

public class DelayProducer {

    public static void main(String[] args) throws Exception {
        // 创建生产者
        DefaultMQProducer producer = new DefaultMQProducer("delay_group");
        // 设置 Name Server 地址
        producer.setNamesrvAddr("localhost:9876");
        // 启动生产者
        producer.start();

        // 创建延迟消息
        Message message = new Message("delay_topic", "delay_message");
        // 设置延迟等级,单位是毫秒,这里设置 10 秒后消费
        message.setDelayTimeLevel(3);

        // 发送延迟消息
        producer.send(message);

        // 关闭生产者
        producer.shutdown();
    }
}

4.2 消费者示例

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class DelayConsumer {

    public static void main(String[] args) throws Exception {
        // 创建消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("delay_group");
        // 设置 Name Server 地址
        consumer.setNamesrvAddr("localhost:9876");
        // 订阅主题
        consumer.subscribe("delay_topic", "*");

        // 注册并发消息监听器
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    // 处理延迟消息
                    System.out.println("Receive message: " + new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // 启动消费者
        consumer.start();
    }
}

五、过滤消息的收发

过滤消息通过 Tag 实现消息的精细化分类和筛选,适用于需要根据不同条件消费不同类型消息的场景。

5.1 生产者示例

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

public class FilterProducer {