返回

手把手教你打造RabbitMQ延时队列(附订单定时取消实例)

后端

RabbitMQ延时队列入门

什么是RabbitMQ延时队列?

RabbitMQ延时队列是一种特殊的队列,允许您在指定的时间后才将消息传递给消费者。这对于需要在特定时间点触发某个操作的任务非常有用,例如订单定时取消、邮件发送、数据同步等。

RabbitMQ延时队列的原理

RabbitMQ延时队列的实现原理是使用死信交换机(Dead Letter Exchange, DLX)和死信队列(Dead Letter Queue, DLQ)。当您向延时队列发送消息时,消息会先被路由到DLX,然后由DLX将消息发送到DLQ。DLQ中的消息会在指定的时间后被重新发送到正常的队列,从而触发消费者的处理。

如何打造RabbitMQ延时队列

  1. 创建死信交换机和死信队列

首先,您需要创建一个死信交换机和死信队列。死信交换机用于将消息从正常的队列路由到DLQ,而DLQ用于存储需要延迟的消息。

# 创建死信交换机
rabbitmqadmin declare exchange dlx name=delayed_exchange type=direct

# 创建死信队列
rabbitmqadmin declare queue dlq name=dead_letter_queue durable=true
  1. 绑定死信交换机和死信队列

接下来,您需要将死信交换机和死信队列绑定在一起。绑定时,需要指定一个路由键,该路由键将决定哪些消息会被路由到DLQ。

# 绑定死信交换机和死信队列
rabbitmqadmin bind queue dlq exchange=delayed_exchange routing_key=delayed
  1. 设置消息的延迟时间

当您向延时队列发送消息时,您需要指定消息的延迟时间。延迟时间可以是绝对时间或相对时间。绝对时间是指消息被发送到DLQ的具体时间点,而相对时间是指消息在DLQ中延迟的时间。

# 设置消息的延迟时间(绝对时间)
message = {
    "body": "Hello, world!",
    "properties": {
        "expiration": "2023-03-08T12:00:00Z"
    }
}

# 将消息发送到延时队列
channel.basic_publish(exchange='', routing_key='delayed', body=json.dumps(message))
# 设置消息的延迟时间(相对时间)
message = {
    "body": "Hello, world!",
    "properties": {
        "x-message-ttl": 3600000 # 1 小时
    }
}

# 将消息发送到延时队列
channel.basic_publish(exchange='', routing_key='delayed', body=json.dumps(message))
  1. 消费死信队列中的消息

当消息在DLQ中延迟到期后,它将被重新发送到正常的队列。此时,您就可以消费该消息,并执行相应的操作。

# 创建消费者
consumer = channel.basic_consume(
    queue='normal_queue',
    on_message_callback=callback,
    auto_ack=True
)

# 启动消费者
channel.start_consuming()

订单定时取消实例

现在,我们以订单定时取消为例,演示如何使用RabbitMQ延时队列实现该功能。

  1. 创建延时队列

首先,我们需要创建一个延时队列,并将其命名为"order_cancellation_queue"。

# 创建延时队列
rabbitmqadmin declare queue order_cancellation_queue name=order_cancellation_queue durable=true
  1. 绑定死信交换机和死信队列

接下来,我们需要将死信交换机和死信队列绑定在一起,并指定路由键为"order_cancellation"。

# 绑定死信交换机和死信队列
rabbitmqadmin bind queue order_cancellation_queue exchange=delayed_exchange routing_key=order_cancellation
  1. 设置消息的延迟时间

当我们收到一个订单时,我们需要将该订单信息发送到延时队列,并指定延迟时间为1小时。

# 设置消息的延迟时间
message = {
    "body": json.dumps(order),
    "properties": {
        "x-message-ttl": 3600000 # 1 小时
    }
}

# 将消息发送到延时队列
channel.basic_publish(exchange='', routing_key='order_cancellation', body=json.dumps(message))
  1. 消费死信队列中的消息

当订单在DLQ中延迟到期后,它将被重新发送到"order_cancellation_queue"队列。此时,我们可以消费该消息,并执行订单取消操作。

# 创建消费者
consumer = channel.basic_consume(
    queue='order_cancellation_queue',
    on_message_callback=cancel_order,
    auto_ack=True
)

# 启动消费者
channel.start_consuming()
  1. 取消订单

cancel_order函数中,我们可以实现订单取消的逻辑。

def cancel_order(ch, method, properties, body):
    order = json.loads(body)
    # 取消订单
    order_service.cancel_order(order["id"])

总结

RabbitMQ延时队列是一种非常有用的工具,可以帮助您实现各种需要延迟处理的任务。通过本指南,您已经掌握了打造RabbitMQ延时队列的步骤和方法。赶快动手实践,将延时队列应用到您的项目中吧!