Laravel + RabbitMQ: 使用 ssi-anik/laravel-amqp 处理消息
2025-01-09 20:04:57
Laravel 应用中处理 RabbitMQ 消息
消息队列在异步处理和微服务架构中扮演关键角色。当涉及到 Laravel 与 RabbitMQ 的集成时,选择正确的库和理解消息消费方式至关重要。这里讨论在使用 ssi-anik/laravel-amqp
包时如何高效地处理 RabbitMQ 消息,以及它与先前 vyuldashev/laravel-queue-rabbitmq
的不同之处。
理解问题根源
从 vyuldashev/laravel-queue-rabbitmq
过渡到 ssi-anik/laravel-amqp
,最直接的感受就是消费消息方式的变化。前者提供了类似 php artisan rabbitmq:consume
或 php artisan queue:work
这样的命令,可以轻松地持续监控队列并处理消息。后者则未直接提供等效的命令。虽然你可以使用 Amqp::connection(...)->publish(...)
发送消息到队列,但消费消息的过程需要更深入的理解。尝试运行 amqp:consume
会产生命名空间错误,说明该命令并非由该包提供。
解决方案:自定义命令与 Laravel 队列
问题的核心在于,ssi-anik/laravel-amqp
主要专注于底层 AMQP 交互,并没有内置的守护进程式消费命令。我们通常的处理方式是构建一个自定义的 Artisan 命令或结合 Laravel 队列功能。
1. 构建自定义消费命令
为了创建一个可长时间运行的消费命令,我们需要执行以下操作:
-
创建 Artisan 命令: 使用
php artisan make:command ConsumeQueue
创建一个名为ConsumeQueue
的 Artisan 命令。 -
实现命令逻辑: 在
app/Console/Commands/ConsumeQueue.php
中,修改handle()
方法来实现消息消费逻辑:
<?php
namespace App\Console\Commands;
use Illuminate\Console\Command;
use Illuminate\Support\Facades\Log;
use VladimirYuldashev\LaravelQueueRabbitMQ\Facades\RabbitMQ; // 注意引入
use Illuminate\Support\Facades\App;
class ConsumeQueue extends Command
{
protected $signature = 'amqp:consume-custom {connection}';
protected $description = 'Consumes messages from the specified AMQP queue';
public function handle()
{
$connectionName = $this->argument('connection');
try {
RabbitMQ::connection($connectionName)->consume(function ($message, $channel) {
try {
//处理消息的逻辑在这里
Log::info('Message Received' . $message->body);
// 确认消息,使其从队列移除
$channel->basic_ack($message->delivery_info['delivery_tag']);
}
catch (\Exception $exception)
{
$channel->basic_nack($message->delivery_info['delivery_tag'], false, true);
Log::error("Error processing message on ". $connectionName .": ".$exception->getMessage());
}
}, null , [], null, [
'prefetch_count' => config("queue.connections.{$connectionName}.qos_prefetch_count") ?? 1,
'qos_global' => false,
]);
} catch (\Exception $exception)
{
Log::error("Failed to initiate listener to : ". $connectionName .": ".$exception->getMessage());
}
// 为了守护运行需要一个 loop
while (true)
{
if (!App::runningInConsole()){
return false;
}
// pause the application
sleep(1);
}
}
}
这里:
RabbitMQ::connection($connectionName)->consume()
获取指定的连接实例并启动消费流程。- 在闭包中编写了业务处理代码, 使用 try/catch 处理错误消息。 必须注意
basic_ack
和basic_nack
使用,保证消息的可靠传输,否则消息无法删除,队列会不断增大,占用系统资源。 - 最后通过死循环确保服务长时间运行,只有非cli环境(比如web)的时候才能退出死循环。
- 运行命令: 执行
php artisan amqp:consume-custom queuename
(把queuename
替换为你的连接名)即可启动消息消费者。这个消费者会一直运行,直到手动停止。
2. 利用 Laravel 队列系统
另一个可行的方案是使用 Laravel 的队列系统结合 ssi-anik/laravel-amqp
发送和接收消息。这使得我们可以充分利用 Laravel 的队列管理功能,比如重试策略和后台任务处理。
-
创建队列任务: 使用
php artisan make:job ProcessMessage
生成一个任务类。 -
配置
ProcessMessage.php
: 在app/Jobs/ProcessMessage.php
中,添加你的消息处理逻辑。这可以与之前自定义命令中consume
函数内部代码类似,主要是需要确认消息处理后调用basic_ack
。