返回

RocketMQ如何保证消费顺序?

后端

消费任务的实现类ConsumerRequest

RocketMQ的消费任务是以ConsumerRequest的形式进行封装,用于消费者从Broker拉取消息的请求,包括以下主要属性:

  • BrokerName: Broker服务器的名称。
  • QueueId: 消息队列的ID。
  • QueueOffset: 消息队列的偏移量,用于标识待消费消息的起始位置。
  • MaxMsgs: 每次拉取消息的最大数量。
  • MessageModel: 消息模型,有CLUSTERING(集群模式)和BROADCASTING(广播模式)两种。
  • ConsumeType: 消费类型,有PULL(拉取模式)和PUSH(推送模式)两种。
  • ConsumerGroup: 消费组的名称。

本地缓存队列ProcessQueue

RocketMQ的消费者本地维护了一个ProcessQueue,用于缓存从Broker拉取的消息,并按顺序进行消费。ProcessQueue的主要属性包括:

  • QueueId: 消息队列的ID。
  • Messages: 缓存的消息列表。
  • LockedMsgs: 正在被消费的消息列表。
  • MaxMsgs: 缓存消息的最大数量。
  • ConsumeType: 消费类型,有PULL(拉取模式)和PUSH(推送模式)两种。
  • MessageModel: 消息模型,有CLUSTERING(集群模式)和BROADCASTING(广播模式)两种。

并发消息的消费请求

对于并发消息,ConsumerRequest中的MessageModel设置为CLUSTERING,ConsumeType设置为PULL,表示消费者从Broker拉取消息后,可以并发地消费这些消息。此时,ProcessQueue中的Messages列表存储了拉取到的所有消息,LockedMsgs列表为空,消费者可以从Messages列表中取出任意一条消息进行消费。

顺序消息的消费

对于顺序消息,ConsumerRequest中的MessageModel设置为ORDERED,ConsumeType设置为PULL,表示消费者从Broker拉取消息后,必须按顺序消费这些消息。此时,ProcessQueue中的Messages列表存储了拉取到的所有消息,LockedMsgs列表存储了正在被消费的消息,消费者只能从LockedMsgs列表中取出消息进行消费。

当消费者消费完一条顺序消息后,会将该消息从LockedMsgs列表中删除,并从Messages列表中取出下一条消息放入LockedMsgs列表中,以此保证消息的顺序消费。

总结

RocketMQ通过使用ConsumerRequest和ProcessQueue来实现消息的顺序消费。对于并发消息,消费者可以并发地消费消息;对于顺序消息,消费者必须按顺序消费消息。