返回

消息如何保证送达 - 之 RabbitMQ 发布确认高级

后端

我们知道,RabbitMQ提供了多种消息发布确认模式,包括Publisher Confirms和Transaction Confirms,它们可以帮助我们确保消息可靠地发送到Broker。而在实际应用中,我们可能还会遇到一些更复杂的情况,比如:

  1. 消息发送成功,但由于网络故障或其他原因,Broker没有及时将消息持久化到磁盘,导致消息丢失。
  2. 消息发送成功,但消费者在处理消息时崩溃,导致消息没有被正确处理。
  3. 消息发送成功,但消费者在处理消息时非常慢,导致消息积压。

针对这些情况,RabbitMQ提供了更多高级的发布确认机制来帮助我们处理。

1. Publisher Confirms

Publisher Confirms允许生产者在消息发送到Broker后收到确认消息,确认消息成功持久化到磁盘。如果生产者在超时时间内没有收到确认消息,它可以重新发送消息。

启用Publisher Confirms:

在生产者代码中设置channel.ConfirmSelect(true)

监听确认消息:

channel.NotifyConfirm(func(deliveryConfirmation Confirm) {
  if deliveryConfirmation.Ack {
    // 消息成功发送到Broker
  } else {
    // 消息发送失败,可以重新发送
  }
})

2. Transaction Confirms

Transaction Confirms允许生产者将多个消息打包成一个事务,并一次性提交或回滚事务。如果事务提交成功,则所有消息都将被持久化到磁盘。如果事务回滚,则所有消息都将被丢弃。

启用Transaction Confirms:

在生产者代码中设置channel.TxSelect()

开始事务:

channel.Tx()

提交事务:

channel.TxCommit()

回滚事务:

channel.TxRollback()

3. Mandatory Message Delivery

Mandatory Message Delivery确保消息要么被成功发送到队列,要么被Broker退回给生产者。如果消息被退回,生产者可以重新发送消息或将其丢弃。

启用Mandatory Message Delivery:

在生产者代码中设置channel.ConfirmSelect(true)和channel.MandatoryPublish(true)

监听返回消息:

channel.NotifyReturn(func(deliveryReturn DeliveryReturn) {
  // 消息被Broker退回,可以重新发送或丢弃
})

4. Immediate Delivery

Immediate Delivery确保消息在发送到队列之前就被持久化到磁盘。这可以防止消息丢失,但也可能会降低吞吐量。

启用Immediate Delivery:

在生产者代码中设置channel.ConfirmSelect(true)和channel.ImmediateDelivery(true)

5. Nack Requeue

Nack Requeue允许消费者在处理消息时将其重新放回队列。这可以防止消息丢失,但可能会导致消息被重复处理。

启用Nack Requeue:

在消费者代码中设置channel.Qos(1, 0, false)

处理消息:

func (d *Delivery) Ack(ack bool) error {
  if !ack {
    // 消息处理失败,重新放回队列
    return d.Nack(false, true)
  }

  // 消息处理成功,确认消息
  return d.Ack(true)
}

总结

通过使用这些高级的发布确认机制,我们可以确保消息可靠地发送到Broker并被正确处理。这对于需要确保消息可靠性的应用非常重要。