返回
RocketMQ如何保证消费顺序?
后端
2023-12-17 11:10:12
消费任务的实现类ConsumerRequest
RocketMQ的消费任务是以ConsumerRequest的形式进行封装,用于消费者从Broker拉取消息的请求,包括以下主要属性:
- BrokerName: Broker服务器的名称。
- QueueId: 消息队列的ID。
- QueueOffset: 消息队列的偏移量,用于标识待消费消息的起始位置。
- MaxMsgs: 每次拉取消息的最大数量。
- MessageModel: 消息模型,有CLUSTERING(集群模式)和BROADCASTING(广播模式)两种。
- ConsumeType: 消费类型,有PULL(拉取模式)和PUSH(推送模式)两种。
- ConsumerGroup: 消费组的名称。
本地缓存队列ProcessQueue
RocketMQ的消费者本地维护了一个ProcessQueue,用于缓存从Broker拉取的消息,并按顺序进行消费。ProcessQueue的主要属性包括:
- QueueId: 消息队列的ID。
- Messages: 缓存的消息列表。
- LockedMsgs: 正在被消费的消息列表。
- MaxMsgs: 缓存消息的最大数量。
- ConsumeType: 消费类型,有PULL(拉取模式)和PUSH(推送模式)两种。
- MessageModel: 消息模型,有CLUSTERING(集群模式)和BROADCASTING(广播模式)两种。
并发消息的消费请求
对于并发消息,ConsumerRequest中的MessageModel设置为CLUSTERING,ConsumeType设置为PULL,表示消费者从Broker拉取消息后,可以并发地消费这些消息。此时,ProcessQueue中的Messages列表存储了拉取到的所有消息,LockedMsgs列表为空,消费者可以从Messages列表中取出任意一条消息进行消费。
顺序消息的消费
对于顺序消息,ConsumerRequest中的MessageModel设置为ORDERED,ConsumeType设置为PULL,表示消费者从Broker拉取消息后,必须按顺序消费这些消息。此时,ProcessQueue中的Messages列表存储了拉取到的所有消息,LockedMsgs列表存储了正在被消费的消息,消费者只能从LockedMsgs列表中取出消息进行消费。
当消费者消费完一条顺序消息后,会将该消息从LockedMsgs列表中删除,并从Messages列表中取出下一条消息放入LockedMsgs列表中,以此保证消息的顺序消费。
总结
RocketMQ通过使用ConsumerRequest和ProcessQueue来实现消息的顺序消费。对于并发消息,消费者可以并发地消费消息;对于顺序消息,消费者必须按顺序消费消息。