返回

消息队列RabbitMQ入门:从零开始玩转简单、队列、发布确认模式

后端

RabbitMQ入门:玩转消息队列,从简单到可靠

什么是消息队列?

消息队列是计算机应用系统中的一种特殊消息传递方式,用于在应用系统之间传递消息。它就好比一个邮局,负责处理邮件的收发。在微服务架构中,消息队列是必不可少的组件之一,它可以帮助我们实现服务解耦、负载均衡、异步处理等功能。

RabbitMQ:开源消息队列之王

RabbitMQ是目前最流行的开源消息队列之一,它简单易用、功能强大,深受广大开发者的喜爱。本文将从零开始,带你一步步掌握RabbitMQ的三种核心模式——简单模式、队列模式、发布确认模式,让你快速实现数据通信!

简单模式:基础的消息发送和接收

简单模式是最基础的消息发送和接收模式,它由一个生产者和一个消费者组成。生产者将消息发送到消息队列,消费者从消息队列中接收消息。

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer {

    public static void main(String[] args) throws Exception {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 设置连接参数
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        // 创建连接
        Connection connection = factory.newConnection();
        // 创建频道
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare("hello", false, false, false, null);
        // 发送消息
        String message = "Hello World!";
        channel.basicPublish("", "hello", null, message.getBytes());
        System.out.println("消息发送成功:" + message);
        // 关闭频道和连接
        channel.close();
        connection.close();
    }
}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class Consumer {

    public static void main(String[] args) throws Exception {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 设置连接参数
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        // 创建连接
        Connection connection = factory.newConnection();
        // 创建频道
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare("hello", false, false, false, null);
        // 接收消息
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("消息接收成功:" + message);
        };
        channel.basicConsume("hello", true, deliverCallback, consumerTag -> {});
        // 关闭频道和连接
        channel.close();
        connection.close();
    }
}

队列模式:负载均衡和并发处理

队列模式与简单模式类似,但它可以同时有多个消费者。当生产者发送消息时,消息会进入队列,然后由多个消费者同时处理。这样可以实现负载均衡,提高系统的处理效率。

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer {

    public static void main(String[] args) throws Exception {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 设置连接参数
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        // 创建连接
        Connection connection = factory.newConnection();
        // 创建频道
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare("hello", false, false, false, null);
        // 发送消息
        for (int i = 0; i < 10; i++) {
            String message = "消息" + i;
            channel.basicPublish("", "hello", null, message.getBytes());
            System.out.println("消息发送成功:" + message);
        }
        // 关闭频道和连接
        channel.close();
        connection.close();
    }
}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class Consumer {

    public static void main(String[] args) throws Exception {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 设置连接参数
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        // 创建连接
        Connection connection = factory.newConnection();
        // 创建频道
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare("hello", false, false, false, null);
        // 接收消息
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("消息接收成功:" + message);
        };
        channel.basicConsume("hello", true, deliverCallback, consumerTag -> {});
        // 关闭频道和连接
        channel.close();
        connection.close();
    }
}

发布确认模式:确保消息的可靠传递

发布确认模式可以确保消息的可靠传递。当生产者发送消息时,它会等待消息队列的确认,直到确认收到消息后,生产者才会继续发送下一条消息。这样可以防止消息丢失。

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConfirmListener;

public class Producer {

    public static void main(String[] args) throws Exception {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 设置连接参数
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        // 创建连接
        Connection connection = factory.newConnection();
        // 创建频道
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare("hello", false, false, false, null);
        // 启用发布确认模式
        channel.confirmSelect();
        // 发送消息
        for (int i = 0; i < 10; i++) {
            String message = "消息" + i;
            channel.basicPublish("", "hello", null, message.getBytes());
            System.out.println("消息发送成功:" + message);
        }
        // 等待确认
        channel.waitForConfirmsOrDie();
        // 关闭频道和连接
        channel.close();
        connection.close();
    }
}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class Consumer {

    public static void main(String[] args) throws Exception {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 设置连接参数
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        // 创建连接
        Connection connection = factory.newConnection();
        // 创建频道
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare("hello", false, false, false, null);
        // 接收消息
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("消息接收成功:" + message);
        };
        channel.basicConsume("hello", true, deliverCallback, consumerTag -> {});
        // 关闭频道和连接
        channel.close();
        connection.close();
    }
}

常见问题解答

1. RabbitMQ与其他消息队列有什么区别?

RabbitMQ是一个开源的消息队列,而像Kafka这样的其他消息队列是商业化的。RabbitMQ的特点是易于使用、功能强大,而Kafka的特点是高性能和可扩展性。

2. 什么时候应该使用消息队列?

消息队列应该在以下情况下使用:

  • 服务解耦
  • 负载均衡