手把手教你打造RabbitMQ延时队列(附订单定时取消实例)
2024-02-01 20:05:41
RabbitMQ延时队列入门
什么是RabbitMQ延时队列?
RabbitMQ延时队列是一种特殊的队列,允许您在指定的时间后才将消息传递给消费者。这对于需要在特定时间点触发某个操作的任务非常有用,例如订单定时取消、邮件发送、数据同步等。
RabbitMQ延时队列的原理
RabbitMQ延时队列的实现原理是使用死信交换机(Dead Letter Exchange, DLX)和死信队列(Dead Letter Queue, DLQ)。当您向延时队列发送消息时,消息会先被路由到DLX,然后由DLX将消息发送到DLQ。DLQ中的消息会在指定的时间后被重新发送到正常的队列,从而触发消费者的处理。
如何打造RabbitMQ延时队列
- 创建死信交换机和死信队列
首先,您需要创建一个死信交换机和死信队列。死信交换机用于将消息从正常的队列路由到DLQ,而DLQ用于存储需要延迟的消息。
# 创建死信交换机
rabbitmqadmin declare exchange dlx name=delayed_exchange type=direct
# 创建死信队列
rabbitmqadmin declare queue dlq name=dead_letter_queue durable=true
- 绑定死信交换机和死信队列
接下来,您需要将死信交换机和死信队列绑定在一起。绑定时,需要指定一个路由键,该路由键将决定哪些消息会被路由到DLQ。
# 绑定死信交换机和死信队列
rabbitmqadmin bind queue dlq exchange=delayed_exchange routing_key=delayed
- 设置消息的延迟时间
当您向延时队列发送消息时,您需要指定消息的延迟时间。延迟时间可以是绝对时间或相对时间。绝对时间是指消息被发送到DLQ的具体时间点,而相对时间是指消息在DLQ中延迟的时间。
# 设置消息的延迟时间(绝对时间)
message = {
"body": "Hello, world!",
"properties": {
"expiration": "2023-03-08T12:00:00Z"
}
}
# 将消息发送到延时队列
channel.basic_publish(exchange='', routing_key='delayed', body=json.dumps(message))
# 设置消息的延迟时间(相对时间)
message = {
"body": "Hello, world!",
"properties": {
"x-message-ttl": 3600000 # 1 小时
}
}
# 将消息发送到延时队列
channel.basic_publish(exchange='', routing_key='delayed', body=json.dumps(message))
- 消费死信队列中的消息
当消息在DLQ中延迟到期后,它将被重新发送到正常的队列。此时,您就可以消费该消息,并执行相应的操作。
# 创建消费者
consumer = channel.basic_consume(
queue='normal_queue',
on_message_callback=callback,
auto_ack=True
)
# 启动消费者
channel.start_consuming()
订单定时取消实例
现在,我们以订单定时取消为例,演示如何使用RabbitMQ延时队列实现该功能。
- 创建延时队列
首先,我们需要创建一个延时队列,并将其命名为"order_cancellation_queue"。
# 创建延时队列
rabbitmqadmin declare queue order_cancellation_queue name=order_cancellation_queue durable=true
- 绑定死信交换机和死信队列
接下来,我们需要将死信交换机和死信队列绑定在一起,并指定路由键为"order_cancellation"。
# 绑定死信交换机和死信队列
rabbitmqadmin bind queue order_cancellation_queue exchange=delayed_exchange routing_key=order_cancellation
- 设置消息的延迟时间
当我们收到一个订单时,我们需要将该订单信息发送到延时队列,并指定延迟时间为1小时。
# 设置消息的延迟时间
message = {
"body": json.dumps(order),
"properties": {
"x-message-ttl": 3600000 # 1 小时
}
}
# 将消息发送到延时队列
channel.basic_publish(exchange='', routing_key='order_cancellation', body=json.dumps(message))
- 消费死信队列中的消息
当订单在DLQ中延迟到期后,它将被重新发送到"order_cancellation_queue"队列。此时,我们可以消费该消息,并执行订单取消操作。
# 创建消费者
consumer = channel.basic_consume(
queue='order_cancellation_queue',
on_message_callback=cancel_order,
auto_ack=True
)
# 启动消费者
channel.start_consuming()
- 取消订单
在cancel_order
函数中,我们可以实现订单取消的逻辑。
def cancel_order(ch, method, properties, body):
order = json.loads(body)
# 取消订单
order_service.cancel_order(order["id"])
总结
RabbitMQ延时队列是一种非常有用的工具,可以帮助您实现各种需要延迟处理的任务。通过本指南,您已经掌握了打造RabbitMQ延时队列的步骤和方法。赶快动手实践,将延时队列应用到您的项目中吧!