返回
RocketMQ 原生 API:掌控消息队列,实现可靠有序的通信
后端
2023-11-10 02:29:15
在分布式系统中,消息队列扮演着至关重要的角色,保障了不同系统、组件间的信息传输可靠有序。Apache RocketMQ 作为一款开源的消息队列中间件,以其高吞吐、低延迟、高可靠等特性备受青睐。RocketMQ 原生 API 为开发者提供了灵活高效的操作界面,本文将深入探讨如何利用原生 API 实现顺序消息、广播消息、延迟消息和过滤消息的收发。
一、RocketMQ 原生 API 入门
RocketMQ 提供了一系列 Java 语言编写的原生 API,涵盖了消息生产、消费和管理等方方面面。在使用原生 API 之前,需要引入相关的依赖包:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>5.0.0</version>
</dependency>
二、顺序消息的收发
顺序消息保证了消息在队列中的顺序与生产顺序一致,适用于需要严格保证消息顺序的场景,如订单处理、账务流水等。
2.1 生产者示例
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
public class OrderProducer {
public static void main(String[] args) throws Exception {
// 创建生产者
DefaultMQProducer producer = new DefaultMQProducer("order_group");
// 设置 Name Server 地址
producer.setNamesrvAddr("localhost:9876");
// 启动生产者
producer.start();
// 循环发送 10 条顺序消息
for (int i = 0; i < 10; i++) {
// 创建消息
Message message = new Message("order_topic", "order_" + i);
// 设置消息的顺序键,保证消息有序性
message.setKeys("order_key");
// 发送消息
producer.send(message);
}
// 关闭生产者
producer.shutdown();
}
}
2.2 消费者示例
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class OrderConsumer {
public static void main(String[] args) throws Exception {
// 创建消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_group");
// 设置 Name Server 地址
consumer.setNamesrvAddr("localhost:9876");
// 订阅主题
consumer.subscribe("order_topic", "*");
// 注册顺序消息监听器
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
for (MessageExt msg : msgs) {
// 处理顺序消息
System.out.println("Receive message: " + new String(msg.getBody()));
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
// 启动消费者
consumer.start();
}
}
三、广播消息的收发
广播消息将消息发送到所有订阅该主题的消费者,适用于需要将消息广播到多个系统的场景,如系统通知、营销活动等。
3.1 生产者示例
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
public class BroadcastProducer {
public static void main(String[] args) throws Exception {
// 创建生产者
DefaultMQProducer producer = new DefaultMQProducer("broadcast_group");
// 设置 Name Server 地址
producer.setNamesrvAddr("localhost:9876");
// 启动生产者
producer.start();
// 创建广播消息
Message message = new Message("broadcast_topic", "broadcast_message");
// 发送广播消息
producer.send(message);
// 关闭生产者
producer.shutdown();
}
}
3.2 消费者示例
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class BroadcastConsumer {
public static void main(String[] args) throws Exception {
// 创建消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("broadcast_group");
// 设置 Name Server 地址
consumer.setNamesrvAddr("localhost:9876");
// 订阅主题
consumer.subscribe("broadcast_topic", "*");
// 注册并发消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
// 处理广播消息
System.out.println("Receive message: " + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者
consumer.start();
}
}
四、延迟消息的收发
延迟消息在指定的时间后才被消费,适用于需要定时执行任务、到期提醒等场景。
4.1 生产者示例
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
public class DelayProducer {
public static void main(String[] args) throws Exception {
// 创建生产者
DefaultMQProducer producer = new DefaultMQProducer("delay_group");
// 设置 Name Server 地址
producer.setNamesrvAddr("localhost:9876");
// 启动生产者
producer.start();
// 创建延迟消息
Message message = new Message("delay_topic", "delay_message");
// 设置延迟等级,单位是毫秒,这里设置 10 秒后消费
message.setDelayTimeLevel(3);
// 发送延迟消息
producer.send(message);
// 关闭生产者
producer.shutdown();
}
}
4.2 消费者示例
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class DelayConsumer {
public static void main(String[] args) throws Exception {
// 创建消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("delay_group");
// 设置 Name Server 地址
consumer.setNamesrvAddr("localhost:9876");
// 订阅主题
consumer.subscribe("delay_topic", "*");
// 注册并发消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
// 处理延迟消息
System.out.println("Receive message: " + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者
consumer.start();
}
}
五、过滤消息的收发
过滤消息通过 Tag 实现消息的精细化分类和筛选,适用于需要根据不同条件消费不同类型消息的场景。
5.1 生产者示例
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
public class FilterProducer {