返回

Laravel + RabbitMQ: 使用 ssi-anik/laravel-amqp 处理消息

php

Laravel 应用中处理 RabbitMQ 消息

消息队列在异步处理和微服务架构中扮演关键角色。当涉及到 Laravel 与 RabbitMQ 的集成时,选择正确的库和理解消息消费方式至关重要。这里讨论在使用 ssi-anik/laravel-amqp 包时如何高效地处理 RabbitMQ 消息,以及它与先前 vyuldashev/laravel-queue-rabbitmq 的不同之处。

理解问题根源

vyuldashev/laravel-queue-rabbitmq 过渡到 ssi-anik/laravel-amqp,最直接的感受就是消费消息方式的变化。前者提供了类似 php artisan rabbitmq:consumephp artisan queue:work 这样的命令,可以轻松地持续监控队列并处理消息。后者则未直接提供等效的命令。虽然你可以使用 Amqp::connection(...)->publish(...) 发送消息到队列,但消费消息的过程需要更深入的理解。尝试运行 amqp:consume 会产生命名空间错误,说明该命令并非由该包提供。

解决方案:自定义命令与 Laravel 队列

问题的核心在于,ssi-anik/laravel-amqp 主要专注于底层 AMQP 交互,并没有内置的守护进程式消费命令。我们通常的处理方式是构建一个自定义的 Artisan 命令或结合 Laravel 队列功能。

1. 构建自定义消费命令

为了创建一个可长时间运行的消费命令,我们需要执行以下操作:

  1. 创建 Artisan 命令: 使用 php artisan make:command ConsumeQueue 创建一个名为 ConsumeQueue 的 Artisan 命令。

  2. 实现命令逻辑:app/Console/Commands/ConsumeQueue.php 中,修改 handle() 方法来实现消息消费逻辑:

<?php

namespace App\Console\Commands;

use Illuminate\Console\Command;
use Illuminate\Support\Facades\Log;
use VladimirYuldashev\LaravelQueueRabbitMQ\Facades\RabbitMQ; // 注意引入
use Illuminate\Support\Facades\App;
class ConsumeQueue extends Command
{
    protected $signature = 'amqp:consume-custom {connection}';

    protected $description = 'Consumes messages from the specified AMQP queue';

    public function handle()
    {
        $connectionName = $this->argument('connection');

        try {

             RabbitMQ::connection($connectionName)->consume(function ($message, $channel) {

                  try {
                      //处理消息的逻辑在这里
                        Log::info('Message Received' . $message->body);
                      // 确认消息,使其从队列移除
                     $channel->basic_ack($message->delivery_info['delivery_tag']);

                  }
                   catch (\Exception $exception)
                   {
                        $channel->basic_nack($message->delivery_info['delivery_tag'], false, true);
                    Log::error("Error processing message on ". $connectionName .": ".$exception->getMessage());
                     }
           }, null , [], null, [
                  'prefetch_count' => config("queue.connections.{$connectionName}.qos_prefetch_count") ?? 1,
                  'qos_global' =>  false,
           ]);

           } catch (\Exception $exception)
          {
            Log::error("Failed to initiate listener to : ". $connectionName .": ".$exception->getMessage());
         }
       // 为了守护运行需要一个 loop

            while (true)
                {

                if (!App::runningInConsole()){
                    return false;
                }

                    // pause the application
                    sleep(1);
               }


    }
}

这里:

  • RabbitMQ::connection($connectionName)->consume() 获取指定的连接实例并启动消费流程。
  • 在闭包中编写了业务处理代码, 使用 try/catch 处理错误消息。 必须注意 basic_ackbasic_nack 使用,保证消息的可靠传输,否则消息无法删除,队列会不断增大,占用系统资源。
  • 最后通过死循环确保服务长时间运行,只有非cli环境(比如web)的时候才能退出死循环。
  1. 运行命令: 执行 php artisan amqp:consume-custom queuename (把 queuename 替换为你的连接名)即可启动消息消费者。这个消费者会一直运行,直到手动停止。

2. 利用 Laravel 队列系统

另一个可行的方案是使用 Laravel 的队列系统结合 ssi-anik/laravel-amqp 发送和接收消息。这使得我们可以充分利用 Laravel 的队列管理功能,比如重试策略和后台任务处理。

  1. 创建队列任务: 使用 php artisan make:job ProcessMessage 生成一个任务类。

  2. 配置 ProcessMessage.phpapp/Jobs/ProcessMessage.php 中,添加你的消息处理逻辑。这可以与之前自定义命令中 consume 函数内部代码类似,主要是需要确认消息处理后调用 basic_ack

connectionName=$connectionName; $this->messageBody=$messageBody; $this->deliveryTag=$deliveryTag; $this->queue = $connectionName; } /** * Execute the job. * * @return void */ public function handle() { $message= json_decode( $this->messageBody, true); $connectionName = $this->connectionName; try { Log::info('Message Received (Queue)' . $this->messageBody); //处理消息的逻辑在这里 $channel= RabbitMQ::connection($connectionName)->getChannel(); // 确认消息,使其从队列移除 $channel->basic_ack($this->deliveryTag); } catch (Exception $exception) { Log::error("Error processing message on ". $connectionName .": ".$exception->getMessage()); } } } ``` 重要: ` __construct` 方法会接收消息体的原始值,而 handle 函数是真正进行消息处理的函数,如果使用其他的序列化方式,可能需要在构造方法处对接收值进行转换。同时 handle 需要传递 `$deliveryTag`, 用于消息应答. 3. **创建消费者命令:** 修改之前的 `ConsumeQueue.php` , 添加队列dispatch代码: ```php argument('connection'); try { RabbitMQ::connection($connectionName)->consume(function ($message, $channel) use (&$connectionName) { try { ProcessMessage::dispatch($connectionName, $message->body, $message->delivery_info['delivery_tag']); } catch (\Exception $exception) { Log::error("Error Dispatch Message on queue ". $connectionName .": ".$exception->getMessage()); $channel->basic_nack($message->delivery_info['delivery_tag'], false, true); } }, null , [], null, [ 'prefetch_count' => config("queue.connections.{$connectionName}.qos_prefetch_count") ?? 1, 'qos_global' => false, ]); } catch (\Exception $exception) { Log::error("Failed to initiate listener to : ". $connectionName .": ".$exception->getMessage()); } // 为了守护运行需要一个 loop while (true) { if (!app()->runningInConsole()){ return false; } sleep(1); } } } ``` 在这里我们将接受到的消息推入消息队列。 队列系统的错误处理机制会自动处理和重试任务,这样增加了系统的鲁棒性。 4. **启动队列监听:** 执行 `php artisan queue:work` ( 或者使用 supervisor 进行守护进程启动) 运行你的队列监听器,确保在接收到 RabbitMQ 消息时能够自动触发任务。 ### 安全建议 * **连接配置:** 在 `.env` 或 `config/queue.php` 中配置你的 RabbitMQ 连接参数,保证参数安全性。 * **异常处理:** 请确保你的 `consume` 回调函数以及 Laravel 队列任务能够正确处理各种异常情况,并提供恰当的错误处理机制。 使用 try catch 捕获错误并记录日志是良好的开发习惯。 * **消息确认:** 请确保使用`basic_ack`和 `basic_nack` 方法处理接受的消息。 * **速率限制:** 配置消息消费速率,可以使用 `prefetch_count` 设置, 避免应用被过多消息淹没。 综上,尽管 `ssi-anik/laravel-amqp` 不直接提供守护式消费命令,通过构建自定义 Artisan 命令或结合 Laravel 队列系统,依然能够高效安全地消费 RabbitMQ 消息,你需要根据自身项目的具体需求选择合适的方案。 理解包的原理并根据业务进行调整是成功的关键。