生产者消息确认机制实现RabbitMQ可靠投递
2024-01-14 08:33:09
RabbitMQ提供了可靠的消息投递机制,确保消息从生产者到消费者的可靠传输。在上一篇文章中,我们介绍了如何在 SpringBoot 中快速集成 RabbitMQ,本文将深入探讨 RabbitMQ 的消息确认机制,包括生产者消息确认机制和消费者消息确认机制。
生产者消息确认机制
生产者消息确认机制,是指生产者在发送消息后,等待并确认消息是否被服务器成功接收。
实现ConfirmCallback
要在 SpringBoot 中使用生产者消息确认机制,首先需要实现ConfirmCallback接口,该接口包含一个confirm(long deliveryTag, boolean multiple)方法,当服务器接收到消息后,会调用该方法。
public class ConfirmCallbackImpl implements ConfirmCallback {
@Override
public void confirm(long deliveryTag, boolean multiple) {
if (multiple) {
System.out.println("multiple deliveryTag已确认: " + deliveryTag);
} else {
System.out.println("deliveryTag已确认: " + deliveryTag);
}
}
}
使用ConfirmCallback
然后,在初始化RabbitTemplate时,设置ConfirmCallback接口的实现。
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setConfirmCallback(new ConfirmCallbackImpl());
消息确认超时
在某些情况下,服务器可能需要花费更多的时间来确认消息,此时,生产者可以设置一个超时时间,如果超时时间内没有收到服务器的确认,则认为消息丢失,生产者需要重新发送消息。
rabbitTemplate.setMandatory(true);
设置mandatory属性为true,如果服务器无法将消息路由到队列,则会返回NackCallback。
NackCallback
NackCallback接口包含一个handle(long deliveryTag, boolean multiple)方法,当服务器无法成功将消息路由到队列时,会调用该方法。
rabbitTemplate.setReturnCallback(new ReturnCallbackImpl());
消费者消息确认机制
消费者消息确认机制,是指消费者在收到消息后,向服务器发送确认消息,表明已收到消息。
手动确认
消费者可以手动发送确认消息,在收到消息后,调用channel.basicAck(deliveryTag, false)方法确认消息。
channel.basicConsume(queueName, false, new Consumer() {
@Override
public void handleConsume(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("收到消息: " + message);
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
异步确认
消费者也可以异步发送确认消息,在收到消息后,调用channel.basicAck(deliveryTag, true)方法确认消息。
channel.basicConsume(queueName, true, new Consumer() {
@Override
public void handleConsume(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("收到消息: " + message);
channel.basicAck(envelope.getDeliveryTag(), true);
}
});
总结
通过生产者消息确认机制和消费者消息确认机制,可以确保消息从生产者到消费者的可靠传输,防止消息丢失。在生产者消息确认机制中,可以使用ConfirmCallback接口和ReturnCallback接口来确认消息是否被服务器成功接收,以及处理无法路由的消息。在消费者消息确认机制中,可以使用手动确认和异步确认来确认已收到消息。