返回
消息队列RabbitMQ入门:从零开始玩转简单、队列、发布确认模式
后端
2023-09-15 17:22:26
RabbitMQ入门:玩转消息队列,从简单到可靠
什么是消息队列?
消息队列是计算机应用系统中的一种特殊消息传递方式,用于在应用系统之间传递消息。它就好比一个邮局,负责处理邮件的收发。在微服务架构中,消息队列是必不可少的组件之一,它可以帮助我们实现服务解耦、负载均衡、异步处理等功能。
RabbitMQ:开源消息队列之王
RabbitMQ是目前最流行的开源消息队列之一,它简单易用、功能强大,深受广大开发者的喜爱。本文将从零开始,带你一步步掌握RabbitMQ的三种核心模式——简单模式、队列模式、发布确认模式,让你快速实现数据通信!
简单模式:基础的消息发送和接收
简单模式是最基础的消息发送和接收模式,它由一个生产者和一个消费者组成。生产者将消息发送到消息队列,消费者从消息队列中接收消息。
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
public static void main(String[] args) throws Exception {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置连接参数
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
// 创建连接
Connection connection = factory.newConnection();
// 创建频道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare("hello", false, false, false, null);
// 发送消息
String message = "Hello World!";
channel.basicPublish("", "hello", null, message.getBytes());
System.out.println("消息发送成功:" + message);
// 关闭频道和连接
channel.close();
connection.close();
}
}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
public class Consumer {
public static void main(String[] args) throws Exception {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置连接参数
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
// 创建连接
Connection connection = factory.newConnection();
// 创建频道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare("hello", false, false, false, null);
// 接收消息
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("消息接收成功:" + message);
};
channel.basicConsume("hello", true, deliverCallback, consumerTag -> {});
// 关闭频道和连接
channel.close();
connection.close();
}
}
队列模式:负载均衡和并发处理
队列模式与简单模式类似,但它可以同时有多个消费者。当生产者发送消息时,消息会进入队列,然后由多个消费者同时处理。这样可以实现负载均衡,提高系统的处理效率。
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
public static void main(String[] args) throws Exception {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置连接参数
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
// 创建连接
Connection connection = factory.newConnection();
// 创建频道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare("hello", false, false, false, null);
// 发送消息
for (int i = 0; i < 10; i++) {
String message = "消息" + i;
channel.basicPublish("", "hello", null, message.getBytes());
System.out.println("消息发送成功:" + message);
}
// 关闭频道和连接
channel.close();
connection.close();
}
}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
public class Consumer {
public static void main(String[] args) throws Exception {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置连接参数
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
// 创建连接
Connection connection = factory.newConnection();
// 创建频道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare("hello", false, false, false, null);
// 接收消息
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("消息接收成功:" + message);
};
channel.basicConsume("hello", true, deliverCallback, consumerTag -> {});
// 关闭频道和连接
channel.close();
connection.close();
}
}
发布确认模式:确保消息的可靠传递
发布确认模式可以确保消息的可靠传递。当生产者发送消息时,它会等待消息队列的确认,直到确认收到消息后,生产者才会继续发送下一条消息。这样可以防止消息丢失。
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConfirmListener;
public class Producer {
public static void main(String[] args) throws Exception {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置连接参数
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
// 创建连接
Connection connection = factory.newConnection();
// 创建频道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare("hello", false, false, false, null);
// 启用发布确认模式
channel.confirmSelect();
// 发送消息
for (int i = 0; i < 10; i++) {
String message = "消息" + i;
channel.basicPublish("", "hello", null, message.getBytes());
System.out.println("消息发送成功:" + message);
}
// 等待确认
channel.waitForConfirmsOrDie();
// 关闭频道和连接
channel.close();
connection.close();
}
}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
public class Consumer {
public static void main(String[] args) throws Exception {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置连接参数
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
// 创建连接
Connection connection = factory.newConnection();
// 创建频道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare("hello", false, false, false, null);
// 接收消息
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("消息接收成功:" + message);
};
channel.basicConsume("hello", true, deliverCallback, consumerTag -> {});
// 关闭频道和连接
channel.close();
connection.close();
}
}
常见问题解答
1. RabbitMQ与其他消息队列有什么区别?
RabbitMQ是一个开源的消息队列,而像Kafka这样的其他消息队列是商业化的。RabbitMQ的特点是易于使用、功能强大,而Kafka的特点是高性能和可扩展性。
2. 什么时候应该使用消息队列?
消息队列应该在以下情况下使用:
- 服务解耦
- 负载均衡