返回

生产者消息确认机制实现RabbitMQ可靠投递

后端

RabbitMQ提供了可靠的消息投递机制,确保消息从生产者到消费者的可靠传输。在上一篇文章中,我们介绍了如何在 SpringBoot 中快速集成 RabbitMQ,本文将深入探讨 RabbitMQ 的消息确认机制,包括生产者消息确认机制和消费者消息确认机制。

生产者消息确认机制

生产者消息确认机制,是指生产者在发送消息后,等待并确认消息是否被服务器成功接收。

实现ConfirmCallback

要在 SpringBoot 中使用生产者消息确认机制,首先需要实现ConfirmCallback接口,该接口包含一个confirm(long deliveryTag, boolean multiple)方法,当服务器接收到消息后,会调用该方法。

public class ConfirmCallbackImpl implements ConfirmCallback {

    @Override
    public void confirm(long deliveryTag, boolean multiple) {
        if (multiple) {
            System.out.println("multiple deliveryTag已确认: " + deliveryTag);
        } else {
            System.out.println("deliveryTag已确认: " + deliveryTag);
        }
    }
}

使用ConfirmCallback

然后,在初始化RabbitTemplate时,设置ConfirmCallback接口的实现。

RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setConfirmCallback(new ConfirmCallbackImpl());

消息确认超时

在某些情况下,服务器可能需要花费更多的时间来确认消息,此时,生产者可以设置一个超时时间,如果超时时间内没有收到服务器的确认,则认为消息丢失,生产者需要重新发送消息。

rabbitTemplate.setMandatory(true);

设置mandatory属性为true,如果服务器无法将消息路由到队列,则会返回NackCallback。

NackCallback

NackCallback接口包含一个handle(long deliveryTag, boolean multiple)方法,当服务器无法成功将消息路由到队列时,会调用该方法。

rabbitTemplate.setReturnCallback(new ReturnCallbackImpl());

消费者消息确认机制

消费者消息确认机制,是指消费者在收到消息后,向服务器发送确认消息,表明已收到消息。

手动确认

消费者可以手动发送确认消息,在收到消息后,调用channel.basicAck(deliveryTag, false)方法确认消息。

channel.basicConsume(queueName, false, new Consumer() {
    @Override
    public void handleConsume(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
        System.out.println("收到消息: " + message);
        channel.basicAck(envelope.getDeliveryTag(), false);
    }
});

异步确认

消费者也可以异步发送确认消息,在收到消息后,调用channel.basicAck(deliveryTag, true)方法确认消息。

channel.basicConsume(queueName, true, new Consumer() {
    @Override
    public void handleConsume(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
        System.out.println("收到消息: " + message);
        channel.basicAck(envelope.getDeliveryTag(), true);
    }
});

总结

通过生产者消息确认机制和消费者消息确认机制,可以确保消息从生产者到消费者的可靠传输,防止消息丢失。在生产者消息确认机制中,可以使用ConfirmCallback接口和ReturnCallback接口来确认消息是否被服务器成功接收,以及处理无法路由的消息。在消费者消息确认机制中,可以使用手动确认和异步确认来确认已收到消息。