返回
MQ 的应用场景:从初识到精通,全面了解 MQ 的使用
见解分享
2023-10-25 05:51:27
MQ:分布式系统的关键角色,解耦、异步、削锋
在分布式系统中,消息队列(MQ)扮演着至关重要的角色。其强大的解耦、异步处理和流量削锋能力,为系统带来了诸多益处,提升了可靠性、扩展性和性能。
一、系统解耦:组件之间互不干扰
MQ宛如一座桥梁,将分布式系统中的各个组件巧妙隔离开来。它们可以独立运行,互不干扰,有效解决了紧密耦合带来的脆弱性和维护难度。
// 生产者代码
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
public class Producer {
public static void main(String[] args) throws Exception {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置连接参数
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
// 创建连接
Connection connection = factory.newConnection();
// 创建信道
Channel channel = connection.createChannel();
// 设置队列名称
String queueName = "hello";
// 声明队列
channel.queueDeclare(queueName, false, false, false, null);
// 发送消息
String message = "Hello World!";
channel.basicPublish("", queueName, null, message.getBytes());
// 关闭信道和连接
channel.close();
connection.close();
}
}
// 消费者代码
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;
public class Consumer {
public static void main(String[] args) throws Exception {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置连接参数
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
// 创建连接
Connection connection = factory.newConnection();
// 创建信道
Channel channel = connection.createChannel();
// 设置队列名称
String queueName = "hello";
// 声明队列
channel.queueDeclare(queueName, false, false, false, null);
// 创建消费者
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws Exception {
String message = new String(body, "UTF-8");
System.out.println("Received message: " + message);
}
};
// 监听队列
channel.basicConsume(queueName, true, consumer);
// 保持连接
System.out.println("Waiting for messages...");
while (true) {
Thread.sleep(1000);
}
}
}
二、异步处理:提高系统吞吐量
在高并发系统中,耗时的任务往往会拖累主流程的性能。MQ可以将这些任务放入队列,让专门的消费者异步处理,从而提高系统的吞吐量。
// 生产者代码
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
public class Producer {
public static void main(String[] args) throws Exception {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置连接参数
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
// 创建连接
Connection connection = factory.newConnection();
// 创建信道
Channel channel = connection.createChannel();
// 设置队列名称
String queueName = "task_queue";
// 声明队列
channel.queueDeclare(queueName, false, false, false, null);
// 发送消息
String message = "Hello World!";
channel.basicPublish("", queueName, null, message.getBytes());
// 关闭信道和连接
channel.close();
connection.close();
}
}
// 消费者代码
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;
public class Consumer {
public static void main(String[] args) throws Exception {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置连接参数
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
// 创建连接
Connection connection = factory.newConnection();
// 创建信道
Channel channel = connection.createChannel();
// 设置队列名称
String queueName = "task_queue";
// 声明队列
channel.queueDeclare(queueName, false, false, false, null);
// 创建消费者
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws Exception {
String message = new String(body, "UTF-8");
System.out.println("Received message: " + message);
// 模拟耗时处理
Thread.sleep(1000);
// 确认处理完成
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
// 监听队列
channel.basicConsume(queueName, false, consumer);
// 保持连接
System.out.println("Waiting for messages...");
while (true) {
Thread.sleep(1000);
}
}
}
三、流量削锋:平滑系统负载
MQ可以充当流量缓冲区,削减突发流量高峰,让系统平滑地处理请求。
// 生产者代码
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
public class Producer {
public static void main(String[] args) throws Exception {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置连接参数
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
// 创建连接
Connection connection = factory.newConnection();
// 创建信道
Channel channel = connection.createChannel();
// 设置队列名称
String queueName = "peak_queue";
// 声明队列
channel.queueDeclare(queueName, false, false, false, null);
// 发送大量消息
for (int i = 0; i < 10000; i++) {
String message = "Hello World! " + i;
channel.basicPublish("", queueName, null, message.getBytes());
}
// 关闭信道和连接
channel.close();
connection.close();
}
}
// 消费者代码
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;
public class Consumer {
public static void main(String[] args) throws Exception {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置连接参数
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
// 创建连接
Connection connection = factory.newConnection();
// 创建信道
Channel channel = connection.createChannel();
// 设置队列名称
String queueName = "peak_queue";
// 声明队列
channel.queueDeclare(queueName, false, false, false, null);
// 设置预取消息数量
channel.basicQos(10);
// 创建消费者
Consumer consumer = new DefaultConsumer(channel