Laravel并发Upsert致MySQL死锁?解决方案全解析
2025-03-24 19:48:45
Laravel 中并发控制台命令 upsert 操作导致的序列化死锁
你在 Laravel 11 中使用 upsert
批量更新 MySQL 8 数据库时,遇到了序列化死锁 (Serialization deadlock) 的问题。即便在唯一索引中加入了 worker
字段来区分不同的控制台命令,运行多个并发的控制台命令仍然会导致死锁。 好消息,我将给你提供分析和解决方案。
一、 问题原因分析
死锁通常发生在多个事务同时尝试获取相同资源的锁,且形成循环依赖时。 具体到你当前场景, 我认为原因可能有下面几点:
upsert
的内部机制: Laravel 的upsert
方法,底层实现可能是先尝试插入 (INSERT),如果因为唯一键冲突而失败,则转为更新 (UPDATE)。 在高并发场景下,多个upsert
同时尝试插入相同记录,可能导致锁竞争。即使有worker
字段,若两条记录的其他唯一索引字段相同,还是可能发生冲突,尤其是这些冲突几乎同时发生时。- 隐式锁: MySQL (InnoDB 存储引擎) 在执行
INSERT ... ON DUPLICATE KEY UPDATE
语句时,会对涉及的行加锁, 包括 gap lock 和 next-key lock (具体锁的机制由隔离级别控制), 如果不当的设计会使隐式锁的范围变大。 这种隐式锁定行为,即便是不同的 worker,如果存在任何时间点的索引数据的更新(由于存在虚拟生成的key_hash
字段) 都可能会影响。 - 索引设计:
key_hash
列通过 MD5 哈希生成,理论上具有较好的散列性。但是,如果数据分布不均匀,或者哈希冲突的概率增加,就可能出现多个不同的输入产生相同的key_hash
值,进一步增加冲突几率。 你的数据源于Redis stream, 可能会在极短的时间生成大量具有高度相同性的数据, 此时这种风险更大。 - 事务边界过大 : 代码里,对
items
里的每一行aggregate
数据, 都开启一个数据库事物. 当items
比较多时, 这等同于items.count()
这么多个事物要开启和执行.items
中一旦数据有关联性(即便通过了你的worker区分), 事物都会变大和增加竞争锁概率.
二、 解决方案
下面是我给你提出的几个解决方案,你可以结合实际情况选择合适的方案,或者将多个方案结合使用。
1. 显式锁定 + 优化事务
-
原理: 使用
SELECT ... FOR UPDATE
显式获取锁,确保在更新之前获得对行的排他锁。 -
实现:
将你的store
方法改成如下。将事务的开启放到所有循环外层。 简化SQL的生成,避免Arr::get
private static function store(Collection $items): void { if ($items->isEmpty()) { return; } DB::transaction(function () use ($items) { $items->each(function (array $aggregate) { //简化$filteredAggregates 的获得方式, 并将循环提到这里 $filteredAggregates = collect(Arr::get($aggregate, 'aggregates', []))->map(function ($aggregate) { return collect($aggregate)->except([ 'company_id', 'affiliate_id', 'affiliate_campaign_id', 'buyer_id', 'buyer_tier_id' ])->toArray(); })->toArray(); foreach ($filteredAggregates as $aggregate) { // 取出 key_hash, 单独给一个变量.清晰一些。 $keyHash = $aggregate['key_hash']; // 假设key_hash一定存在。 // 先悲观锁定, 再更新。 $existingRecord = StatisticAggregate::lockForUpdate()->where('key_hash', $keyHash)->first(); $column = $aggregate['column'] ?? 'total_other'; if ($existingRecord) { // 存在记录,直接更新. 使用 increment $existingRecord->increment($column, $aggregate[$column] ?? 0); //假定所有值是数值型。 //确保更新了updated_at 时间. $existingRecord->updated_at = now(); $existingRecord->save(); } else { //使用insert, insert 不需要考虑$column.直接把$aggregate 传入. StatisticAggregate::create($aggregate); } } }); }, 3); // 最多尝试3次 }
-
安全建议: 使用
lockForUpdate()
时,要确保事务尽可能短,以减少锁的持有时间。 同时,监控数据库的锁等待情况。 -
进阶用法: 针对可能出现的长事务或锁等待时间过长的情况, 你可以对大事务进行分解. 对items 进行切割后批量
store
。private static function store(Collection $items): void { ... // 假定每 500 条一个批次. 减少锁持有的时间和降低单个事物的压力。 $items->chunk(500)->each(function (Collection $chunkedItems) { DB::transaction(function () use ($chunkedItems) { //原来的 items 替换成 chunkedItems 即可 }); }); .... }
2. 使用队列消化 (Queue-based Processing)
-
原理: 将
upsert
操作放入队列,由队列 worker 逐个处理,避免并发写入。这样做的目的不是消灭并发,而是把可能的“强冲突”并发给串行化执行。 -
实现:
-
修改你的控制台命令,让它不再直接执行
upsert
,而是将数据推送到一个队列中:// 在你的 console command 的 handle 方法中 while (true) { //.... 其他代码保持一致。 $entries = collect(Redis::connection('stream')->xrange(self::ingestKey($worker), '-', '+', self::$chunk)); if ($entries->isEmpty()) { return $total; } foreach ($entries as $entryId => $entryData) { // $data 现在是未序列化过后的数据 $data = unserialize($entryData['data']); // 不直接在这里调用 store 了. //StatisticAggregateStream::digest((int) $this->argument('worker')); //将每条 entry 都 push 到一个新的队列里 dispatch(new ProcessStatisticAggregate($data)); // 创建一个新的 job //删掉已经入队列的. Redis::connection('stream')->xdel(self::ingestKey($worker), [$entryId]); } // ..... }
-
创建一个新的 job (例如
ProcessStatisticAggregate
):<?php namespace App\Jobs; use Illuminate\Bus\Queueable; use Illuminate\Contracts\Queue\ShouldQueue; use Illuminate\Foundation\Bus\Dispatchable; use Illuminate\Queue\InteractsWithQueue; use Illuminate\Queue\SerializesModels; use App\Services\StatisticAggregateStream; class ProcessStatisticAggregate implements ShouldQueue { use Dispatchable, InteractsWithQueue, Queueable, SerializesModels; public $tries = 3; /** * 数据 */ protected $data; /** * Create a new job instance. * @param array $data //从redis里取出来的,经过反序列化后的原始数据. * @return void */ public function __construct(array $data) { $this->data = $data; } /** * Execute the job. * * @return void */ public function handle() { //把原来的处理逻辑放在这里,但是用单独的, 接收数组的方法 StatisticAggregateStream::storeSingleItem($this->data); } }
-
修改
StatisticAggregateStream
增加单条item
的处理函数
/** * 单个 item 的 store 处理. 提取给 queue job 使用. * @param $item 单个条目数据,格式与你传入 items 的元素相同. * */ public static function storeSingleItem(array $item): void { DB::transaction(function() use ($item){ $column = Arr::get($item, 'column', 'total_other'); $filteredAggregates = collect(Arr::get($item, 'aggregates', []))->map(function ($aggregate) { return collect($aggregate)->except([ 'company_id', 'affiliate_id', 'affiliate_campaign_id', 'buyer_id', 'buyer_tier_id' ])->toArray(); })->toArray(); //这里也不用循环,而是针对每个 $aggregate foreach($filteredAggregates as $aggregate) { //同样使用 keyHash $keyHash = $aggregate['key_hash']; //采取尝试更新或创建 $statistic = StatisticAggregate::firstOrNew(['key_hash' => $keyHash]); if (!$statistic->exists) { // 如果是新记录,填充所有数据. 由于$aggregate 已经是过滤过后的. 可以全部合并。 $statistic->fill($aggregate); } //针对既有数据, $statistic->{$column} = ($statistic->{$column} ?? 0) + ($aggregate[$column] ?? 0); //如果有column则累加. $statistic->save(); } }, 3); // 尝试次数 }
- 运行队列 worker:
php artisan queue:work
重要提醒 : 如果你的数据量比较大,或者这个queue
的处理需要优化到极致,仍然需要监控和调整queue
的worker
数量.
-
3. Redis 侧预聚合(Pre-aggregation on Redis Side)
-
原理: 既然问题源于并发的
upsert
导致的数据库锁竞争, 而且你的数据本来就来源于 Redis。那么干脆直接利用 Redis 的原子性操作和计算能力, 在 Redis 侧先完成一部分预聚合。 可以理解为合并数据到stream
之前先完成一个预计算和合并操作. -
实现步骤:
1. 修改StoreBatchStatisticAggregate
的handle
部分,增加对重复key 的数据合并:public function handle(BatchStatisticAggregateCreation $event): void { //.... 其他代码一致。 $aggregates = ['worker' => $this->getWorker(), 'column' => null, 'value' => null, 'aggregates' => []]; $mergedAggregates = []; // 用于存放合并后的 aggregates foreach (collect(Arr::get($event->data, 'aggregates', []))->toArray() as $item) { $now = Carbon::parse(Arr::get($event->data, 'now', now())); if (! Arr::get($item, 'modelable_type', null) || ! Arr::get($item, 'modelable_id', null)) { continue; } // .... aggregate 构建保持一致 $aggregate = $this->preAggregateStatistic([....]); //..... foreach ($this->periods() as $period) { $bucket = $this->bucket($period, Arr::get($item, 'bucket_date', $now)); $mergedKey = //合并唯一性 key。 要与 $table->unique 的组成部分一致 Arr::get($aggregate,'for') . '_' . Arr::get($aggregate,'product') . '_' . Arr::get($aggregate, 'country') . '_' . Arr::get($aggregate,'worker') . '_' . $bucket. '_' . $period . '_' . Arr::get($aggregate, 'modelable_type') . '_' . Arr::get($aggregate,'modelable_id') . '_' . Arr::get($event->data, 'relations.company_id', 0) . '_' . Arr::get($event->data, 'relations.affiliate_id', 0) . '_' . Arr::get($event->data, 'relations.affiliate_campaign_id', 0) . '_' . Arr::get($event->data, 'relations.buyer_id', 0) . '_' . Arr::get($event->data, 'relations.buyer_tier_id', 0) . '_' . Carbon::parse($this->bucket($period, Arr::get($item, 'bucket_date', $now)))->toDateTimeString() . '_' . Carbon::parse($this->bucketEnd($period, $bucket))->toDateTimeString(); // 判断 key 是否存在 if (isset($mergedAggregates[$mergedKey])) { // key已存在.进行数据合并 $mergedAggregates[$mergedKey][$column] = ($mergedAggregates[$mergedKey][$column] ?? 0) + ($aggregate[$column] ?? 0); }else { //如果还不存在, 加入进去. 除了 column 和value 外的,把bucket 也并入。 $mergedAggregates[$mergedKey] = collect(array_merge([ 'bucket_starts_at' => Carbon::parse($this->bucket($period, Arr::get($item, 'bucket_date', $now))), 'bucket_ends_at' => Carbon::parse($this->bucketEnd($period, $bucket)), 'bucket' => $bucket, 'period' => $period, ], $aggregate))->except([ 'column', 'value' ])->toArray(); //写入合并的 $column 数据. $mergedAggregates[$mergedKey][$column] = $aggregate[$column] ?? 0; } } } $this->storeAggregates(array_values($mergedAggregates), Arr::get($aggregates, 'column', 'total_other'), Arr::get($aggregates, 'worker', 1) ?? 1); // array_values是为了去掉$mergedKey. }
2. 修改 `StatisticAggregateStream::append`: 使用 `hIncrBy`
```php
public static function append(array $data): void
{
$worker = Arr::get($data, 'worker', 1) ?? 1;
$column = Arr::get($data, 'column', 'total_other');
// 取出 aggregates 循环
foreach(Arr::get($data, 'aggregates', []) as $aggregate) {
//合并key。 与 $table->unique 的组合字段要一致
$keyHash = //合并唯一性 key。 要与 $table->unique 的组成部分一致
Arr::get($aggregate,'for') . '_' .
Arr::get($aggregate,'product') . '_' .
Arr::get($aggregate, 'country') . '_' .
$worker . '_' .
Arr::get($aggregate, 'bucket'). '_' .
Arr::get($aggregate, 'period') . '_' .
Arr::get($aggregate, 'modelable_type') . '_' .
Arr::get($aggregate,'modelable_id') . '_' .
Arr::get($aggregate, 'company_id', 0) . '_' .
Arr::get($aggregate, 'affiliate_id', 0) . '_' .
Arr::get($aggregate, 'affiliate_campaign_id', 0) . '_' .
Arr::get($aggregate, 'buyer_id', 0) . '_' .
Arr::get($aggregate, 'buyer_tier_id', 0) . '_' .
Arr::get($aggregate, 'bucket_starts_at') . '_' .
Arr::get($aggregate, 'bucket_ends_at');
$redisKey = self::ingestKey($worker); //使用 ingestKey
$aggregate['column'] = $column;
// 使用 hset 存
Redis::connection('stream')->hset($redisKey, $keyHash, serialize($aggregate));
}
}
```
3. 修改 `digest`,取出的时候进行反序列化, 然后使用 之前的`store` 或者 `storeSingleItem` 函数.
```
public static function digest(int $worker = 1): int
{
$total = 0;
while (true) {
$entries = Redis::connection('stream')->hgetall(self::ingestKey($worker)); //一次性取出所有.
if (empty($entries)) {
return $total;
}
$total += count($entries);
$items = [];
foreach($entries as $keyHash => $serializedData) {
$items[] = unserialize($serializedData);
//删除掉 redis 里的key。
Redis::connection('stream')->hdel(self::ingestKey($worker), $keyHash);
}
// 依旧可以使用 之前的 store 函数。
self::store(collect($items));
}
}
```
* **安全建议:** 在数据进入Redis 之前,要严格校验数据,做好防御性编程。
* **优点** :这种方式最大的优点就是极大的减少了数据入库的次数, 降低了SQL 的执行频率和冲突的概率. 将绝大多数合并逻辑放在Redis侧,可以减少数据库侧的死锁可能性。
4. 优化索引和表结构
-
原理: 合理的索引和表结构可以显著提升查询和更新性能,减少锁的范围。
-
建议:
-
Review
key_hash
: 你的key_hash
虚拟列是导致隐式锁增加的原因. 尤其是其计算中包含了日期字段bucket_starts_at
,bucket_ends_at
. 即便是不同的worker
仍然可能出现key_hash
的锁定. 你可以尝试将日期部分移除, 如果能接受较低唯一性冲突(可以通过监控来判断)。
如果不能移除日期部分,建议去掉DATE_FORMAT
, 直接使用时间戳:$table->char('key_hash', 16)->charset('binary')->virtualAs(" unhex(MD5(CONCAT( `for`, '', `product`, '', `country`, '', `worker`, '', `bucket`, '', `period`, '', `modelable_type`, '', `modelable_id`, '', `company_id`, '', `affiliate_id`, '', `affiliate_campaign_id`, '', `buyer_id`, '', `buyer_tier_id`, '', `bucket_starts_at`, '', //时间戳. `bucket_ends_at` )))
");
```
- 联合索引: 目前你的唯一索引包含
worker
和key_hash
. 建议把最常用在WHERE
子句做过滤条件的, 且选择性(cardinality) 比较高的字段放在联合索引的前面. (但是务必考虑查询的多样性) - 定期维护: 对于频繁更新的表,尤其是聚合表.随着时间推移会积累大量的索引碎片.定期(比如每天) 使用
OPTIMIZE TABLE statistic_aggregates;
来整理表空间.
-
5. 更换 ID 生成策略
-
原理: 当前使用ULID 作为主键。 如果发现由于 ULID 的生成存在某种规律性,与你的聚合业务结合起来后放大了冲突,那么更换一个 ID 生成策略可能有帮助. 例如Twitter 的 Snowflake ID (雪花ID) 生成. 这种生成策略的核心是将 ID 生成分散到多个节点进行, 避免中心化生成ID, 尤其是在时间维度高度相近的情况.
-
实施 需要在
statistic_aggregate_relations
表的id
生成逻辑上做修改。如果用了自增的id。还需要在数据迁移时进行妥善处理,这里就不展开具体代码了。