返回

MQ 的应用场景:从初识到精通,全面了解 MQ 的使用

见解分享

MQ:分布式系统的关键角色,解耦、异步、削锋

在分布式系统中,消息队列(MQ)扮演着至关重要的角色。其强大的解耦、异步处理和流量削锋能力,为系统带来了诸多益处,提升了可靠性、扩展性和性能。

一、系统解耦:组件之间互不干扰

MQ宛如一座桥梁,将分布式系统中的各个组件巧妙隔离开来。它们可以独立运行,互不干扰,有效解决了紧密耦合带来的脆弱性和维护难度。

// 生产者代码
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

public class Producer {
    public static void main(String[] args) throws Exception {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 设置连接参数
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        // 创建连接
        Connection connection = factory.newConnection();
        // 创建信道
        Channel channel = connection.createChannel();
        // 设置队列名称
        String queueName = "hello";
        // 声明队列
        channel.queueDeclare(queueName, false, false, false, null);
        // 发送消息
        String message = "Hello World!";
        channel.basicPublish("", queueName, null, message.getBytes());
        // 关闭信道和连接
        channel.close();
        connection.close();
    }
}
// 消费者代码
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;

public class Consumer {
    public static void main(String[] args) throws Exception {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 设置连接参数
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        // 创建连接
        Connection connection = factory.newConnection();
        // 创建信道
        Channel channel = connection.createChannel();
        // 设置队列名称
        String queueName = "hello";
        // 声明队列
        channel.queueDeclare(queueName, false, false, false, null);
        // 创建消费者
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws Exception {
                String message = new String(body, "UTF-8");
                System.out.println("Received message: " + message);
            }
        };
        // 监听队列
        channel.basicConsume(queueName, true, consumer);
        // 保持连接
        System.out.println("Waiting for messages...");
        while (true) {
            Thread.sleep(1000);
        }
    }
}

二、异步处理:提高系统吞吐量

在高并发系统中,耗时的任务往往会拖累主流程的性能。MQ可以将这些任务放入队列,让专门的消费者异步处理,从而提高系统的吞吐量。

// 生产者代码
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

public class Producer {
    public static void main(String[] args) throws Exception {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 设置连接参数
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        // 创建连接
        Connection connection = factory.newConnection();
        // 创建信道
        Channel channel = connection.createChannel();
        // 设置队列名称
        String queueName = "task_queue";
        // 声明队列
        channel.queueDeclare(queueName, false, false, false, null);
        // 发送消息
        String message = "Hello World!";
        channel.basicPublish("", queueName, null, message.getBytes());
        // 关闭信道和连接
        channel.close();
        connection.close();
    }
}
// 消费者代码
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;

public class Consumer {
    public static void main(String[] args) throws Exception {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 设置连接参数
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        // 创建连接
        Connection connection = factory.newConnection();
        // 创建信道
        Channel channel = connection.createChannel();
        // 设置队列名称
        String queueName = "task_queue";
        // 声明队列
        channel.queueDeclare(queueName, false, false, false, null);
        // 创建消费者
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws Exception {
                String message = new String(body, "UTF-8");
                System.out.println("Received message: " + message);
                // 模拟耗时处理
                Thread.sleep(1000);
                // 确认处理完成
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        // 监听队列
        channel.basicConsume(queueName, false, consumer);
        // 保持连接
        System.out.println("Waiting for messages...");
        while (true) {
            Thread.sleep(1000);
        }
    }
}

三、流量削锋:平滑系统负载

MQ可以充当流量缓冲区,削减突发流量高峰,让系统平滑地处理请求。

// 生产者代码
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

public class Producer {
    public static void main(String[] args) throws Exception {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 设置连接参数
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        // 创建连接
        Connection connection = factory.newConnection();
        // 创建信道
        Channel channel = connection.createChannel();
        // 设置队列名称
        String queueName = "peak_queue";
        // 声明队列
        channel.queueDeclare(queueName, false, false, false, null);
        // 发送大量消息
        for (int i = 0; i < 10000; i++) {
            String message = "Hello World! " + i;
            channel.basicPublish("", queueName, null, message.getBytes());
        }
        // 关闭信道和连接
        channel.close();
        connection.close();
    }
}
// 消费者代码
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;

public class Consumer {
    public static void main(String[] args) throws Exception {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 设置连接参数
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        // 创建连接
        Connection connection = factory.newConnection();
        // 创建信道
        Channel channel = connection.createChannel();
        // 设置队列名称
        String queueName = "peak_queue";
        // 声明队列
        channel.queueDeclare(queueName, false, false, false, null);
        // 设置预取消息数量
        channel.basicQos(10);
        // 创建消费者
        Consumer consumer = new DefaultConsumer(channel