返回

用RabbitMQ实现路由模式的简单指南

后端

1. 简介

路由模式跟发布订阅模式类似,然后在订阅模式的基础上加上了类型,订阅模式是分发到所有绑定到交换机的队列,路由模式只分发到绑定在交换机上面指定路由键的队列。

2. 路由模式的简单实现

我们先来介绍一下如何在Laravel中实现路由模式。首先,我们需要安装rabbitmq/laravel包。

composer require rabbitmq/laravel

安装完成后,我们需要在config/rabbitmq.php配置文件中进行一些配置。

'default' => [
    'host' => env('RABBITMQ_HOST', 'localhost'),
    'port' => env('RABBITMQ_PORT', 5672),
    'username' => env('RABBITMQ_USERNAME', 'guest'),
    'password' => env('RABBITMQ_PASSWORD', 'guest'),
    'vhost' => env('RABBITMQ_VHOST', '/'),
    'exchange' => 'direct_logs',
    'queue' => 'logs',
    'routing_key' => 'info',
    'durable' => true,
    'auto_delete' => false,
    'passive' => false,
    'exclusive' => false,
    'nowait' => false,
    'persistent' => true,
    'qos' => [
        'prefetch_size' => 0,
        'prefetch_count' => 1,
        'global' => false,
    ],
    'network_recovery_interval' => 5,
    'retry_delay' => 5,
    'max_retries' => 3,
],

接下来,我们需要在Laravel中创建一个Producer类。Producer类负责将消息发送到RabbitMQ中。

class Producer
{
    public function send($message)
    {
        $connection = new RabbitMQ();
        $channel = $connection->channel();

        $channel->exchange_declare('direct_logs', 'direct', false, false, false);

        $channel->basic_publish($message, 'direct_logs', 'info');

        echo " [x] Sent $message\n";

        $channel->close();
        $connection->close();
    }
}

最后,我们需要在Laravel中创建一个Consumer类。Consumer类负责从RabbitMQ中接收消息。

class Consumer
{
    public function receive()
    {
        $connection = new RabbitMQ();
        $channel = $connection->channel();

        $channel->exchange_declare('direct_logs', 'direct', false, false, false);

        $channel->queue_declare('logs', false, false, false, false);

        $channel->queue_bind('logs', 'direct_logs', 'info');

        $channel->basic_consume('logs', '', false, true, false, false, function ($message) use ($channel) {
            echo ' [x] Received ', $message->body, "\n";

            $channel->basic_ack($message->delivery_info['delivery_tag']);
        });

        while (count($channel->callbacks)) {
            $channel->wait();
        }

        $channel->close();
        $connection->close();
    }
}

至此,我们就完成了路由模式的简单实现。