消息如何保证送达 - 之 RabbitMQ 发布确认高级
2023-11-03 20:06:50
我们知道,RabbitMQ提供了多种消息发布确认模式,包括Publisher Confirms和Transaction Confirms,它们可以帮助我们确保消息可靠地发送到Broker。而在实际应用中,我们可能还会遇到一些更复杂的情况,比如:
- 消息发送成功,但由于网络故障或其他原因,Broker没有及时将消息持久化到磁盘,导致消息丢失。
- 消息发送成功,但消费者在处理消息时崩溃,导致消息没有被正确处理。
- 消息发送成功,但消费者在处理消息时非常慢,导致消息积压。
针对这些情况,RabbitMQ提供了更多高级的发布确认机制来帮助我们处理。
1. Publisher Confirms
Publisher Confirms允许生产者在消息发送到Broker后收到确认消息,确认消息成功持久化到磁盘。如果生产者在超时时间内没有收到确认消息,它可以重新发送消息。
启用Publisher Confirms:
在生产者代码中设置channel.ConfirmSelect(true)
监听确认消息:
channel.NotifyConfirm(func(deliveryConfirmation Confirm) {
if deliveryConfirmation.Ack {
// 消息成功发送到Broker
} else {
// 消息发送失败,可以重新发送
}
})
2. Transaction Confirms
Transaction Confirms允许生产者将多个消息打包成一个事务,并一次性提交或回滚事务。如果事务提交成功,则所有消息都将被持久化到磁盘。如果事务回滚,则所有消息都将被丢弃。
启用Transaction Confirms:
在生产者代码中设置channel.TxSelect()
开始事务:
channel.Tx()
提交事务:
channel.TxCommit()
回滚事务:
channel.TxRollback()
3. Mandatory Message Delivery
Mandatory Message Delivery确保消息要么被成功发送到队列,要么被Broker退回给生产者。如果消息被退回,生产者可以重新发送消息或将其丢弃。
启用Mandatory Message Delivery:
在生产者代码中设置channel.ConfirmSelect(true)和channel.MandatoryPublish(true)
监听返回消息:
channel.NotifyReturn(func(deliveryReturn DeliveryReturn) {
// 消息被Broker退回,可以重新发送或丢弃
})
4. Immediate Delivery
Immediate Delivery确保消息在发送到队列之前就被持久化到磁盘。这可以防止消息丢失,但也可能会降低吞吐量。
启用Immediate Delivery:
在生产者代码中设置channel.ConfirmSelect(true)和channel.ImmediateDelivery(true)
5. Nack Requeue
Nack Requeue允许消费者在处理消息时将其重新放回队列。这可以防止消息丢失,但可能会导致消息被重复处理。
启用Nack Requeue:
在消费者代码中设置channel.Qos(1, 0, false)
处理消息:
func (d *Delivery) Ack(ack bool) error {
if !ack {
// 消息处理失败,重新放回队列
return d.Nack(false, true)
}
// 消息处理成功,确认消息
return d.Ack(true)
}
总结
通过使用这些高级的发布确认机制,我们可以确保消息可靠地发送到Broker并被正确处理。这对于需要确保消息可靠性的应用非常重要。