返回

Laravel并发Upsert致MySQL死锁?解决方案全解析

mysql

Laravel 中并发控制台命令 upsert 操作导致的序列化死锁

你在 Laravel 11 中使用 upsert 批量更新 MySQL 8 数据库时,遇到了序列化死锁 (Serialization deadlock) 的问题。即便在唯一索引中加入了 worker 字段来区分不同的控制台命令,运行多个并发的控制台命令仍然会导致死锁。 好消息,我将给你提供分析和解决方案。

一、 问题原因分析

死锁通常发生在多个事务同时尝试获取相同资源的锁,且形成循环依赖时。 具体到你当前场景, 我认为原因可能有下面几点:

  1. upsert 的内部机制: Laravel 的 upsert 方法,底层实现可能是先尝试插入 (INSERT),如果因为唯一键冲突而失败,则转为更新 (UPDATE)。 在高并发场景下,多个 upsert 同时尝试插入相同记录,可能导致锁竞争。即使有 worker 字段,若两条记录的其他唯一索引字段相同,还是可能发生冲突,尤其是这些冲突几乎同时发生时。
  2. 隐式锁: MySQL (InnoDB 存储引擎) 在执行 INSERT ... ON DUPLICATE KEY UPDATE 语句时,会对涉及的行加锁, 包括 gap lock 和 next-key lock (具体锁的机制由隔离级别控制), 如果不当的设计会使隐式锁的范围变大。 这种隐式锁定行为,即便是不同的 worker,如果存在任何时间点的索引数据的更新(由于存在虚拟生成的key_hash字段) 都可能会影响。
  3. 索引设计: key_hash 列通过 MD5 哈希生成,理论上具有较好的散列性。但是,如果数据分布不均匀,或者哈希冲突的概率增加,就可能出现多个不同的输入产生相同的 key_hash 值,进一步增加冲突几率。 你的数据源于Redis stream, 可能会在极短的时间生成大量具有高度相同性的数据, 此时这种风险更大。
  4. 事务边界过大 : 代码里,对 items 里的每一行 aggregate 数据, 都开启一个数据库事物. 当items比较多时, 这等同于items.count() 这么多个事物要开启和执行. items中一旦数据有关联性(即便通过了你的worker区分), 事物都会变大和增加竞争锁概率.

二、 解决方案

下面是我给你提出的几个解决方案,你可以结合实际情况选择合适的方案,或者将多个方案结合使用。

1. 显式锁定 + 优化事务

  • 原理: 使用 SELECT ... FOR UPDATE 显式获取锁,确保在更新之前获得对行的排他锁。

  • 实现:
    将你的 store 方法改成如下。将事务的开启放到所有循环外层。 简化SQL的生成,避免Arr::get

    private static function store(Collection $items): void
    {
        if ($items->isEmpty()) {
            return;
        }
    
        DB::transaction(function () use ($items) {
            $items->each(function (array $aggregate) {
                  //简化$filteredAggregates 的获得方式, 并将循环提到这里
                  $filteredAggregates = collect(Arr::get($aggregate, 'aggregates', []))->map(function ($aggregate) {
                    return collect($aggregate)->except([
                        'company_id', 'affiliate_id', 'affiliate_campaign_id', 'buyer_id', 'buyer_tier_id'
                    ])->toArray();
                  })->toArray();
    
                foreach ($filteredAggregates as $aggregate) {
                     //  取出 key_hash, 单独给一个变量.清晰一些。
                    $keyHash = $aggregate['key_hash']; // 假设key_hash一定存在。
    
                     //  先悲观锁定, 再更新。
                     $existingRecord = StatisticAggregate::lockForUpdate()->where('key_hash', $keyHash)->first();
    
                      $column = $aggregate['column'] ?? 'total_other';
    
                      if ($existingRecord) {
                            // 存在记录,直接更新. 使用 increment
                            $existingRecord->increment($column, $aggregate[$column] ?? 0); //假定所有值是数值型。
    
                            //确保更新了updated_at 时间.
                            $existingRecord->updated_at = now();
                            $existingRecord->save();
    
                       } else {
                        //使用insert, insert 不需要考虑$column.直接把$aggregate 传入.
                           StatisticAggregate::create($aggregate);
    
                      }
                  }
    
            });
    
        }, 3); // 最多尝试3次
    }
    
  • 安全建议: 使用 lockForUpdate() 时,要确保事务尽可能短,以减少锁的持有时间。 同时,监控数据库的锁等待情况。

  • 进阶用法: 针对可能出现的长事务或锁等待时间过长的情况, 你可以对大事务进行分解. 对items 进行切割后批量store

     private static function store(Collection $items): void
    {
    ...
         // 假定每 500 条一个批次. 减少锁持有的时间和降低单个事物的压力。
        $items->chunk(500)->each(function (Collection $chunkedItems) {
    
               DB::transaction(function () use ($chunkedItems) {
    
                     //原来的 items 替换成 chunkedItems 即可
    
              });
    
        });
    
     ....
    }
    

2. 使用队列消化 (Queue-based Processing)

  • 原理:upsert 操作放入队列,由队列 worker 逐个处理,避免并发写入。这样做的目的不是消灭并发,而是把可能的“强冲突”并发给串行化执行。

  • 实现:

    1. 修改你的控制台命令,让它不再直接执行 upsert,而是将数据推送到一个队列中:

      // 在你的 console command 的 handle 方法中
      
       while (true) {
      
           //.... 其他代码保持一致。
      
            $entries = collect(Redis::connection('stream')->xrange(self::ingestKey($worker), '-', '+', self::$chunk));
      
            if ($entries->isEmpty()) {
                return $total;
            }
          foreach ($entries as $entryId => $entryData) {
      
              // $data 现在是未序列化过后的数据
              $data = unserialize($entryData['data']);
              // 不直接在这里调用 store 了.
             //StatisticAggregateStream::digest((int) $this->argument('worker'));
      
              //将每条 entry 都 push 到一个新的队列里
             dispatch(new ProcessStatisticAggregate($data));  // 创建一个新的 job
      
              //删掉已经入队列的.
              Redis::connection('stream')->xdel(self::ingestKey($worker), [$entryId]);
      
          }
      
           // .....
       }
      
    2. 创建一个新的 job (例如 ProcessStatisticAggregate):

      <?php
      
      namespace App\Jobs;
      
      use Illuminate\Bus\Queueable;
      use Illuminate\Contracts\Queue\ShouldQueue;
      use Illuminate\Foundation\Bus\Dispatchable;
      use Illuminate\Queue\InteractsWithQueue;
      use Illuminate\Queue\SerializesModels;
      use App\Services\StatisticAggregateStream;
      class ProcessStatisticAggregate implements ShouldQueue
      {
          use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
      
            public $tries = 3;
          /**
           * 数据
           */
          protected $data;
      
          /**
           * Create a new job instance.
           * @param array $data  //从redis里取出来的,经过反序列化后的原始数据.
           * @return void
           */
          public function __construct(array $data)
          {
              $this->data = $data;
          }
      
          /**
           * Execute the job.
           *
           * @return void
           */
          public function handle()
          {
            //把原来的处理逻辑放在这里,但是用单独的, 接收数组的方法
            StatisticAggregateStream::storeSingleItem($this->data);
          }
      }
      
      
    3. 修改 StatisticAggregateStream增加单条item 的处理函数

        /**
     * 单个 item 的 store 处理. 提取给 queue job 使用.
     * @param $item 单个条目数据,格式与你传入 items 的元素相同.
     *
     */
     public static function storeSingleItem(array $item): void
     {
    
        DB::transaction(function() use ($item){
    
              $column = Arr::get($item, 'column', 'total_other');
                $filteredAggregates = collect(Arr::get($item, 'aggregates', []))->map(function ($aggregate) {
                return collect($aggregate)->except([
                    'company_id', 'affiliate_id', 'affiliate_campaign_id', 'buyer_id', 'buyer_tier_id'
                    ])->toArray();
                })->toArray();
    
            //这里也不用循环,而是针对每个 $aggregate
             foreach($filteredAggregates as $aggregate) {
    
                    //同样使用 keyHash
                   $keyHash = $aggregate['key_hash'];
    
                  //采取尝试更新或创建
                    $statistic = StatisticAggregate::firstOrNew(['key_hash' => $keyHash]);
                    if (!$statistic->exists) {
                        // 如果是新记录,填充所有数据. 由于$aggregate 已经是过滤过后的. 可以全部合并。
                         $statistic->fill($aggregate);
    
                    }
    
                     //针对既有数据,
                     $statistic->{$column} = ($statistic->{$column} ?? 0) +  ($aggregate[$column] ?? 0); //如果有column则累加.
    
                    $statistic->save();
    
               }
    
        }, 3); // 尝试次数
    
    }
    
    1. 运行队列 worker: php artisan queue:work
      重要提醒 : 如果你的数据量比较大,或者这个queue的处理需要优化到极致,仍然需要监控和调整queueworker 数量.

3. Redis 侧预聚合(Pre-aggregation on Redis Side)

  • 原理: 既然问题源于并发的 upsert 导致的数据库锁竞争, 而且你的数据本来就来源于 Redis。那么干脆直接利用 Redis 的原子性操作和计算能力, 在 Redis 侧先完成一部分预聚合。 可以理解为合并数据到stream 之前先完成一个预计算和合并操作.

  • 实现步骤:
    1. 修改 StoreBatchStatisticAggregatehandle部分,增加对重复key 的数据合并:

    public function handle(BatchStatisticAggregateCreation $event): void
    {
    //.... 其他代码一致。
      $aggregates = ['worker' => $this->getWorker(), 'column' => null, 'value' => null, 'aggregates' => []];
        $mergedAggregates = []; // 用于存放合并后的 aggregates
    foreach (collect(Arr::get($event->data, 'aggregates', []))->toArray() as $item) {
            $now = Carbon::parse(Arr::get($event->data, 'now', now()));
    
             if (! Arr::get($item, 'modelable_type', null) || ! Arr::get($item, 'modelable_id', null)) {
               continue;
          }
        //  ....  aggregate 构建保持一致
            $aggregate = $this->preAggregateStatistic([....]);
        //.....
            foreach ($this->periods() as $period) {
                $bucket = $this->bucket($period, Arr::get($item, 'bucket_date', $now));
    
                $mergedKey =  //合并唯一性 key。 要与 $table->unique 的组成部分一致
                 Arr::get($aggregate,'for') . '_' .
                 Arr::get($aggregate,'product') . '_' .
                 Arr::get($aggregate, 'country') . '_' .
                Arr::get($aggregate,'worker') . '_' .
                  $bucket. '_' .
                 $period . '_' .
               Arr::get($aggregate, 'modelable_type') . '_' .
                Arr::get($aggregate,'modelable_id') . '_' .
              Arr::get($event->data, 'relations.company_id', 0)  . '_' .
             Arr::get($event->data, 'relations.affiliate_id', 0) . '_' .
            Arr::get($event->data, 'relations.affiliate_campaign_id', 0) . '_' .
             Arr::get($event->data, 'relations.buyer_id', 0) . '_' .
            Arr::get($event->data, 'relations.buyer_tier_id', 0) . '_' .
               Carbon::parse($this->bucket($period, Arr::get($item, 'bucket_date', $now)))->toDateTimeString() . '_' .
            Carbon::parse($this->bucketEnd($period, $bucket))->toDateTimeString();
                 // 判断 key 是否存在
                 if (isset($mergedAggregates[$mergedKey])) {
                    // key已存在.进行数据合并
                     $mergedAggregates[$mergedKey][$column] =  ($mergedAggregates[$mergedKey][$column] ?? 0) + ($aggregate[$column] ?? 0);
    
                 }else {
                  //如果还不存在, 加入进去.  除了 column 和value 外的,把bucket 也并入。
                     $mergedAggregates[$mergedKey] = collect(array_merge([
                          'bucket_starts_at' => Carbon::parse($this->bucket($period, Arr::get($item, 'bucket_date', $now))),
                        'bucket_ends_at' => Carbon::parse($this->bucketEnd($period, $bucket)),
                          'bucket' => $bucket,
                         'period' => $period,
                      ], $aggregate))->except([
                        'column', 'value'
                      ])->toArray();
                      //写入合并的 $column 数据.
                        $mergedAggregates[$mergedKey][$column] = $aggregate[$column] ?? 0;
              }
    
          }
    }
    
     $this->storeAggregates(array_values($mergedAggregates), Arr::get($aggregates, 'column', 'total_other'), Arr::get($aggregates, 'worker', 1) ?? 1); // array_values是为了去掉$mergedKey.
    
    }
    
    
2. 修改 `StatisticAggregateStream::append`: 使用 `hIncrBy`
    ```php
    public static function append(array $data): void
    {
         $worker = Arr::get($data, 'worker', 1) ?? 1;
       $column = Arr::get($data, 'column', 'total_other');

          //  取出 aggregates 循环
       foreach(Arr::get($data, 'aggregates', []) as $aggregate) {

        //合并key。 与 $table->unique  的组合字段要一致
          $keyHash =  //合并唯一性 key。 要与 $table->unique 的组成部分一致
            Arr::get($aggregate,'for') . '_' .
             Arr::get($aggregate,'product') . '_' .
             Arr::get($aggregate, 'country') . '_' .
             $worker . '_' .
               Arr::get($aggregate, 'bucket'). '_' .
             Arr::get($aggregate, 'period') . '_' .
          Arr::get($aggregate, 'modelable_type') . '_' .
             Arr::get($aggregate,'modelable_id') . '_' .
            Arr::get($aggregate, 'company_id', 0)  . '_' .
           Arr::get($aggregate, 'affiliate_id', 0) . '_' .
             Arr::get($aggregate, 'affiliate_campaign_id', 0) . '_' .
             Arr::get($aggregate, 'buyer_id', 0) . '_' .
          Arr::get($aggregate, 'buyer_tier_id', 0) . '_' .
             Arr::get($aggregate, 'bucket_starts_at') . '_' .
              Arr::get($aggregate, 'bucket_ends_at');
          $redisKey = self::ingestKey($worker); //使用 ingestKey
          $aggregate['column'] = $column;
           // 使用 hset 存
        Redis::connection('stream')->hset($redisKey, $keyHash,  serialize($aggregate));
     }

   }

    ```

3.  修改 `digest`,取出的时候进行反序列化, 然后使用 之前的`store` 或者 `storeSingleItem` 函数.

  ```
     public static function digest(int $worker = 1): int
     {
         $total = 0;

       while (true) {

         $entries = Redis::connection('stream')->hgetall(self::ingestKey($worker)); //一次性取出所有.

          if (empty($entries)) {
             return $total;
          }
        $total += count($entries);
           $items = [];

        foreach($entries as $keyHash => $serializedData) {

           $items[] = unserialize($serializedData);
             //删除掉 redis 里的key。
              Redis::connection('stream')->hdel(self::ingestKey($worker), $keyHash);

          }
        // 依旧可以使用 之前的  store 函数。
            self::store(collect($items));
     }
 }

  ```

*   **安全建议:**   在数据进入Redis 之前,要严格校验数据,做好防御性编程。
* **优点** :这种方式最大的优点就是极大的减少了数据入库的次数, 降低了SQL 的执行频率和冲突的概率. 将绝大多数合并逻辑放在Redis侧,可以减少数据库侧的死锁可能性。

4. 优化索引和表结构

  • 原理: 合理的索引和表结构可以显著提升查询和更新性能,减少锁的范围。

  • 建议:

    1. Review key_hash: 你的 key_hash 虚拟列是导致隐式锁增加的原因. 尤其是其计算中包含了日期字段 bucket_starts_at, bucket_ends_at. 即便是不同的 worker 仍然可能出现 key_hash 的锁定. 你可以尝试将日期部分移除, 如果能接受较低唯一性冲突(可以通过监控来判断)。
      如果不能移除日期部分,建议去掉 DATE_FORMAT, 直接使用时间戳:

      $table->char('key_hash', 16)->charset('binary')->virtualAs("
         unhex(MD5(CONCAT(
            `for`, '',
            `product`, '',
            `country`, '',
           `worker`, '',
           `bucket`, '',
            `period`, '',
           `modelable_type`, '',
          `modelable_id`, '',
             `company_id`, '',
             `affiliate_id`, '',
            `affiliate_campaign_id`, '',
              `buyer_id`, '',
           `buyer_tier_id`, '',
           `bucket_starts_at`, '',  //时间戳.
          `bucket_ends_at`
         )))
      

    ");

      ```
    
    1. 联合索引: 目前你的唯一索引包含 workerkey_hash. 建议把最常用在 WHERE 子句做过滤条件的, 且选择性(cardinality) 比较高的字段放在联合索引的前面. (但是务必考虑查询的多样性)
    2. 定期维护: 对于频繁更新的表,尤其是聚合表.随着时间推移会积累大量的索引碎片.定期(比如每天) 使用OPTIMIZE TABLE statistic_aggregates; 来整理表空间.

5. 更换 ID 生成策略

  • 原理: 当前使用ULID 作为主键。 如果发现由于 ULID 的生成存在某种规律性,与你的聚合业务结合起来后放大了冲突,那么更换一个 ID 生成策略可能有帮助. 例如Twitter 的 Snowflake ID (雪花ID) 生成. 这种生成策略的核心是将 ID 生成分散到多个节点进行, 避免中心化生成ID, 尤其是在时间维度高度相近的情况.

  • 实施 需要在 statistic_aggregate_relations表的id生成逻辑上做修改。如果用了自增的id。还需要在数据迁移时进行妥善处理,这里就不展开具体代码了。