返回

Firestore事务并发难题:有效避免数据丢失

python

搞定 Firestore 事务并发难题:城市数据不再神秘失踪

问题:并发写入 Firestore,城市数据莫名其妙丢了

最近在搞一个需求,需要追踪全国、各省以及省内所有城市的人口信息。最终的目标是要有个地方汇总统计总人口、省份数量和城市数量。其中,省份的数据大概长这样,一个文档包含所有省份,每个省份下有个城市列表:

{
    "Arizona": {
        "cities": [{
            "city": "Phoenix",
            "city_code": 123
        }]
    },
    "California": {
         "cities": [{
            "city": "Los Angeles",
            "city_code": 456
        }]
    }
    // ... 其他省份
}

为了处理并发请求(毕竟有俩后端服务实例同时在写 Firestore),代码里加了 Firestore 事务。逻辑是这样的:

  1. 检查省份是否存在。
  2. 不存在?那就把省份连同第一个城市一起加进去。
  3. 存在?那就用 ArrayUnion 把新城市追加到已有省份的 cities 数组里。
  4. 告诉调用方,这个省份是新增的还是本来就有的。
  5. 根据上一步的结果,再去更新全局的总人口、省份数、城市数等聚合指标。

代码瞅着像是这样:

# 假设 db 和 collection 已经初始化好
# collection 指向包含 states_doc_id 的集合
# states_doc_id 是那个包含所有省份信息的大文档的 ID
# aggregated_metrics 是处理聚合指标更新的对象

def add(population, state, city, city_code):
    is_new_state = add_state(state, city, city_code)
    # 如果 add_state 成功了,才更新聚合指标
    aggregated_metrics.update(population, is_new_state)


def add_state(state, city, city_code):
    @firestore.transactional
    def transaction_logic(transactn, states_ref) -> bool:
        """事务内的具体操作逻辑"""
        snapshot = states_ref.get(transaction=transactn)
        states_data = snapshot.to_dict() if snapshot.exists else {}

        # 用 f-string 构建字段路径是正确的
        field_path = f'{state}.cities'

        if state in states_data:
            # 省份已存在,追加城市
            transactn.update(states_ref, {
                field_path: firestore.ArrayUnion([{
                    'city': city,
                    'code': city_code, # 注意: 原始代码这里写的是 'code', 示例是 city_code, 保持一致
                }])
            })
            return False # 返回 False 表示省份不是新增的
        else:
            # 省份不存在,新增省份和城市
            # 注意:这里用 set + merge=True 效果是正确的,它会创建 state 字段(如果不存在)
            # 或者合并到现有的 document 数据中(如果文档已存在但没有 state 字段)
            transactn.set(states_ref, {
                state: {
                    'cities': [{
                        'city': city,
                        'code': city_code,
                    }]
                }}, merge=True)
            return True # 返回 True 表示省份是新增的

    states_ref = collection.document(states_doc_id)
    # 创建一个事务对象
    transaction = db.transaction()
    # 执行事务逻辑,注意这里调用的是内部定义的 transaction_logic
    is_new_state = transaction_logic(transaction, states_ref)
    # !!!注意:原始代码这里调用的是 add_state 自身,导致了递归和类型错误
    # 应该调用内部定义的、带有 @firestore.transactional 装饰器的函数
    # 或者最好使用 db.run_transaction()

    # 手动创建并传递 transaction 对象,这通常不是推荐的用法
    # 而且这里执行后没有 commit 操作,依赖于 @firestore.transactional 的魔法?
    # 但装饰器是用在内部函数上的,外部调用方式有点怪

    return is_new_state

# 原始代码的问题分析:
# 1. add_state 函数内部又定义了一个同名函数 add_state,并且用 @firestore.transactional 装饰。
#    这本身没问题,但外部调用时,`is_new_state = add_state(transaction, states_ref)`
#    调用的是内部的 add_state,而不是外部的。
# 2. 手动创建 `transaction = db.transaction()` 并将其传递给由 `@firestore.transactional`
#    装饰的函数,这种方式官方文档并不推荐,也可能干扰了库对事务生命周期(包括重试)的管理。
#    标准用法是直接用 `db.run_transaction(transaction_logic, states_ref)`。
# 3. 聚合指标更新 `aggregated_metrics.update(...)` 在事务之外执行。如果事务成功提交,
#    但服务器在执行这行代码前挂了,聚合数据就永远丢了。

期望中,Firestore 事务应该能自动处理并发冲突,失败了也会自动重试几次。但现实骨感,跑起来发现,差不多一半的城市数据都丢了!看起来像是某个实例执行 add_state 时跪了,而且说好的重试也没生效?更糟的是,后续更新聚合指标的那步也没执行。到底哪里出了岔子?

刨根问底:为啥事务没按预期工作?

检查了代码和 Firestore 事务的特性,丢数据这事儿,根子可能在下面几个地方:

  1. 事务用法不对劲,自动重试可能“被跳过”了:
    你瞅瞅原始代码里 add_state 函数那块。它先手动搞了个 transaction = db.transaction(),然后把这个 transaction 对象传给了内部那个被 @firestore.transactional 装饰的函数。这操作有点非主流。
    通常,咱们用 Firestore 事务,要么直接在函数上加 @firestore.transactional 装饰器,然后通过 db.run_transaction(your_decorated_function, ...) 来跑;要么就是写一个普通函数(不带装饰器),然后把它传给 db.run_transaction(),像这样:db.run_transaction(your_transaction_logic_function, ...)
    这两种标准姿势,Firestore 客户端库会自动帮你管理事务的开始、提交、回滚,关键是它会自动处理并发冲突 时的重试 逻辑。
    你现在手动创建 transaction 对象,再把它传来传去,很可能就绕过了或者干扰了库内置的自动重试机制。库可能觉得:“哥们,你自己管事务对象了,那重试的事我就不操心了哈”。结果就是,一旦事务因为并发冲突失败,它就真失败了,没重试,数据自然就丢了。

  2. 操作拆分引入的原子性问题:
    代码的设计是先跑 add_state 这个事务,然后(事务成功的话)再跑 aggregated_metrics.update。这两步是分开的。
    这意味着啥?add_state 的事务保证了对 states_doc_id 这一个文档的读写是原子的。但它跟后面的 aggregated_metrics.update 可不是一个整体的原子操作。
    极端情况:事务 add_state 成功提交了,省份/城市数据写进去了。但就在执行 aggregated_metrics.update 之前,服务器实例挂了、重启了、或者网络嗝屁了。那这次操作对应的聚合指标(总人口、省份数等)就永远没机会更新了,数据自然就对不上了。

  3. 高并发下的写入瓶颈与事务失败:
    虽然事务能处理并发,但如果大量的请求同时涌进来,都要去修改同一个 states_doc_id 文档,那冲突的概率会非常高。Firestore 事务每次重试前都会重新读取数据,如果冲突持续发生,重试次数是有限的(默认 5 次)。超过次数限制,事务就会彻底失败,抛出异常。如果你的代码没有很好地捕捉和处理这种最终失败,那对应的数据写入和后续的聚合更新都会丢失。一个文档扛下全国所有省市的更新,确实压力山大。

解决方案:让 Firestore 事务乖乖听话

知道了原因,咱们就能对症下药了。下面提供几种思路,可以单独用,也可以组合拳。

解决方案一:规范事务用法,让自动重试跑起来

这是最直接的修改,先保证事务本身用对了。别手动创建 transaction 对象瞎传。

原理:
利用 Firestore Python 客户端库提供的 db.run_transaction() 方法。这个方法会负责创建事务、执行你提供的业务逻辑函数、并在遇到可重试错误(比如并发冲突)时自动重试。

怎么改代码:

把原来的 add_state 函数改成下面这样:

from google.cloud import firestore
from google.cloud.firestore_v1.transaction import Transaction
from google.cloud.firestore_v1.document import DocumentReference

# 假设 db, collection, states_doc_id, aggregated_metrics 已定义

def add_state_transaction_logic(transactn: Transaction, states_ref: DocumentReference, state: str, city: str, city_code: int) -> bool:
    """
    这是包含实际读写逻辑的函数,会被 run_transaction 调用。
    注意:所有需要在事务内用到的数据,要么作为参数传进来,要么在函数内部通过事务对象获取。
    """
    snapshot = states_ref.get(transaction=transactn)
    states_data = snapshot.to_dict() if snapshot.exists else {}

    field_path = f'{state}.cities' # 使用字段路径

    if state in states_data:
        # 省份已存在,追加城市
        transactn.update(states_ref, {
            field_path: firestore.ArrayUnion([{
                'city': city,
                'code': city_code,
            }])
        })
        return False # 不是新省份
    else:
        # 省份不存在,新增省份和城市
        transactn.set(states_ref, {
            state: {
                'cities': [{
                    'city': city,
                    'code': city_code,
                }]
            }}, merge=True)
        return True # 是新省份

def add_state_and_update_metrics(population: int, state: str, city: str, city_code: int):
    """
    主函数,负责调用事务并处理后续聚合更新。
    """
    states_ref = collection.document(states_doc_id)

    try:
        # 使用 db.run_transaction() 来执行事务逻辑
        # 将需要的数据作为参数传递给 add_state_transaction_logic
        is_new_state = db.run_transaction(add_state_transaction_logic, states_ref, state, city, city_code)

        # 事务成功提交后,才执行聚合指标更新
        aggregated_metrics.update(population, is_new_state)
        print(f"Successfully added/updated city {city} in {state}. Is new state: {is_new_state}")

    except Exception as e:
        # 事务最终失败(超过重试次数或其他错误)会抛出异常
        # 需要记录日志或做其他错误处理
        print(f"Error processing state {state}, city {city}: {e}")
        # 这里要考虑失败了怎么办?是否需要补偿?或者接受数据丢失?

# 调用示例
# add_state_and_update_metrics(10000, "California", "San Diego", 789)

解释:

  1. 我们把原来嵌套的、带装饰器的函数抽出来,变成了一个独立的 add_state_transaction_logic 函数。这个函数只负责事务内的读写操作,接受 transactn 对象和业务数据作为参数。
  2. 原来的 add_state 函数改名为 add_state_and_update_metrics,它负责准备数据、获取 states_ref,然后调用 db.run_transaction()
  3. db.run_transaction() 会自动处理事务的开始、提交、回滚和重试 。你只管提供逻辑函数 add_state_transaction_logic 和它需要的参数。
  4. 重要的是加上了 try...except 块。即使有自动重试,事务也可能因为持续冲突或其他错误最终失败。你需要捕获这个异常,记录日志,决定如何处理(比如告警、放入死信队列后续处理等)。

额外建议:

  • 日志记录:except 块里详细记录错误信息和相关参数(state, city 等),方便排查是哪个操作失败了。
  • 理解重试限制: 知道 Firestore 事务默认重试 5 次。如果你的应用并发非常高,冲突特别严重,可能需要考虑优化数据模型(见方案四)来降低冲突。

解决方案二:把聚合更新也塞进事务里 (谨慎操作)

既然 add_stateaggregated_metrics.update 分开执行有原子性问题,那把它们合并到一个事务里不就完了?

原理:
Firestore 事务可以包含对多个文档的读写操作。只要总写入数据量不超过限制(当前是 1MiB)且操作都在一个事务内完成,就能保证要么全部成功,要么全部失败回滚,实现原子性。

怎么改代码:

假设你的聚合指标也存在 Firestore 的某个文档里,比如 aggregate_doc_id

# 需要获取聚合指标文档的引用
aggregate_ref = collection.document(aggregate_doc_id)

def update_state_and_aggregates_transaction(transactn: Transaction, states_ref: DocumentReference, aggregate_ref: DocumentReference, population: int, state: str, city: str, city_code: int):
    """事务内同时更新省份数据和聚合指标"""

    # 1. 处理省份数据 (同方案一的逻辑)
    snapshot = states_ref.get(transaction=transactn)
    states_data = snapshot.to_dict() if snapshot.exists else {}
    field_path = f'{state}.cities'
    is_new_state = False

    if state in states_data:
        transactn.update(states_ref, {
            field_path: firestore.ArrayUnion([{'city': city, 'code': city_code}])
        })
    else:
        transactn.set(states_ref, {
            state: { 'cities': [{'city': city, 'code': city_code}] }
        }, merge=True)
        is_new_state = True

    # 2. 处理聚合指标数据
    agg_snapshot = aggregate_ref.get(transaction=transactn)
    # 读取当前的聚合数据,如果文档不存在则初始化
    current_aggregates = agg_snapshot.to_dict() if agg_snapshot.exists else {'total_population': 0, 'total_states': 0, 'total_cities': 0}

    updates = {
        'total_population': firestore.Increment(population), # 使用 Increment 原子增加人口
        'total_cities': firestore.Increment(1) # 每次都增加一个城市
    }
    if is_new_state:
        updates['total_states'] = firestore.Increment(1) # 如果是新省份,省份数+1

    # 在事务中更新聚合指标文档
    transactn.update(aggregate_ref, updates)

    # 注意:run_transaction 期望事务函数返回一个值,如果不需要可以返回 None
    # 或者返回 is_new_state 供外部判断(虽然在这里可能意义不大了,因为聚合已在事务内完成)
    return is_new_state


def add_state_and_update_metrics_atomic(population: int, state: str, city: str, city_code: int):
    states_ref = collection.document(states_doc_id)
    aggregate_ref = collection.document(aggregate_doc_id) # 获取聚合文档引用

    try:
        # 调用包含聚合更新的事务逻辑
        db.run_transaction(update_state_and_aggregates_transaction, states_ref, aggregate_ref, population, state, city, city_code)
        print(f"Atomically added/updated city {city} in {state} and aggregates.")

    except Exception as e:
        print(f"Atomic update failed for state {state}, city {city}: {e}")
        # 错误处理...

# 调用示例
# add_state_and_update_metrics_atomic(15000, "Arizona", "Tucson", 321)

解释:

  1. 新的事务函数 update_state_and_aggregates_transaction 现在不仅操作 states_ref,还操作 aggregate_ref
  2. 它先完成省份/城市数据的读写判断和更新。
  3. 然后,它读取当前的聚合指标文档 (aggregate_ref.get)。
  4. 接着,它计算出需要对聚合指标做的改动(比如人口增加多少,城市数+1,如果是新省份则省份数+1)。这里用了 firestore.Increment(),这是 Firestore 提供的原子计数器,非常适合这种累加场景,能减少读取-计算-写入模式下的冲突。
  5. 最后,在同一个事务里更新聚合指标文档 (transactn.update(aggregate_ref, ...)).
  6. 这样,省份数据的修改和聚合指标的修改就捆绑在一起了,要么都成功,要么都失败。

风险与考量:

  • 增加事务冲突: 这个事务现在锁定了两个文档(states_doc_idaggregate_doc_id)。任何试图修改这两个文档中任意一个的其他并发事务,都会与此事务冲突。如果聚合指标更新非常频繁,这会大大增加冲突概率,可能导致更多的事务重试甚至最终失败。
  • 事务写入限制: Firestore 事务有大小限制。虽然不太可能超,但要知道有这回事。
  • 聚合文档热点: aggregate_doc_id 成了新的写入热点。所有操作都要更新它。

进阶技巧:分片计数器 (Sharded Counters)
如果聚合指标更新实在太频繁,导致 aggregate_doc_id 热点问题严重,可以考虑使用 Firestore 的分片计数器模式。简单说就是把一个总计数器拆成多个(比如 10 个)子计数器文档,每次更新随机选一个子计数器加 1。读取总数时再把所有子计数器的值加起来。这样就把写入压力分散到了多个文档上,大大降低了单点冲突。实现起来复杂一些,但能有效提升高并发写入聚合数据的性能。

解决方案三:解耦操作,拥抱最终一致性 (推荐)

这个方案试图在事务简单性和数据一致性之间找个平衡。它不强求省份数据和聚合数据在 同一时刻 完全一致,而是保证省份数据写入是原子的,并且聚合数据 最终 会被正确更新。

原理:
只让事务负责最核心、最需要强一致性的部分——也就是 states_doc_id 的更新。聚合指标的更新则交给一个可靠的、异步的机制来处理,比如 Google Cloud Functions。

操作步骤/思路:

  1. 事务部分: add_state 的事务逻辑保持简单,只负责更新 states_doc_id,跟方案一类似。关键是,事务成功后,要想办法触发后续的聚合更新。

    • 方法 A (推荐): 利用 Cloud Functions 的 Firestore 触发器。创建一个 Cloud Function,让它监听 states_doc_id 文档的 onWrite (或 onUpdate) 事件。
    • 方法 B (备选):add_state 事务成功 之后 (在 try 块的成功分支里),往一个专门的 "待处理聚合任务" 集合(比如 aggregation_tasks)写入一条记录,包含需要增加的人口、是否是新省份等信息。
  2. 异步处理部分 (Cloud Function):

    • 对于方法 A:
      • Firestore 文档 states_doc_id 每次被成功修改(无论是新增省份还是添加城市),都会触发这个 Cloud Function。
      • 函数被触发时,会收到 beforeafter 两个状态的文档快照。它可以比较这两个快照,精确计算出这次变更导致了哪些变化(比如新增了一个城市,这个城市属于哪个省,对应的人口是多少——人口信息需要想办法传递给函数,可能需要稍微调整事务写入的数据,或者从请求源头获取)。
      • Cloud Function 拿到变化信息后,再去安全地更新聚合指标文档 (aggregate_doc_id)。它甚至可以用一个新的、独立的事务 来更新聚合指标,以保证聚合更新自身的原子性(如果它需要先读后写)。
    • 对于方法 B:
      • 创建一个 Cloud Function,监听 aggregation_tasks 集合的 onCreate 事件。
      • 每当有新的任务文档写入,函数被触发。它读取任务文档中的信息(人口增量、是否新省份等)。
      • 然后,函数执行聚合指标的更新操作(同样,可以用事务保证原子性)。
      • 处理完成后,可以删除或标记这个任务文档,表示已处理。

代码思路 (基于 Cloud Functions Firestore 触发器 - 方法 A):

  • Python 后端 (add_state 部分,类似方案一):

    def add_state_trigger_cf(population: int, state: str, city: str, city_code: int):
        states_ref = collection.document(states_doc_id)
        try:
            # 事务逻辑只更新省份数据,可以返回 is_new_state
            # 这里可能需要调整 transaction_logic,让它在更新时也附带一些信息,比如本次操作的 population
            # 或者依赖 Cloud Function 从别处获取 population
            # 假设 is_new_state = db.run_transaction(...) 成功执行
    
            # 事务成功即可,聚合交给 Cloud Function
            print(f"State/city data updated for {city}, {state}. Aggregation handled by Cloud Function.")
    
        except Exception as e:
            print(f"State update failed for {state}, {city}: {e}")
            # 错误处理
    

    (需要调整事务逻辑,可能需要在写入 cities 数组时包含 population,或者有其他机制让 Cloud Function 知道 population 值)

  • Cloud Function (Python 示例):

    import functions_framework
    from google.cloud import firestore
    
    db = firestore.Client()
    aggregate_ref = db.collection('your_collection_name').document('aggregate_doc_id') # 配置你的集合和文档ID
    
    @functions_framework.cloud_event
    def handle_state_update(cloud_event):
        """
        Triggered by changes to the states document.
        Calculates changes and updates aggregates.
        """
        # 从 CloudEvent 中获取 Firestore 事件数据
        firestore_payload = cloud_event.data.get("value", {})
        before_snapshot_data = firestore_payload.get("oldValue", {}).get("fields", {})
        after_snapshot_data = firestore_payload.get("value", {}).get("fields", {}) # 注意:这里是 Firestore REST API 的结构
    
        # TODO: 复杂的比较逻辑来确定发生了什么变化
        # - 是哪个省份被修改了?
        # - 是新增城市还是新增省份?
        # - 新增/修改涉及的人口是多少?(这个数据来源需要设计,可能需要从 after_snapshot_data 解析)
    
        # 假设通过比较,我们得到了 population_delta, city_increment=1, state_increment=(1 if new_state else 0)
    
        population_delta = calculate_population_delta(before_snapshot_data, after_snapshot_data)
        city_increment, state_increment = calculate_increments(before_snapshot_data, after_snapshot_data)
    
        if city_increment > 0: # 确保确实有变化需要更新
            try:
                # 使用事务更新聚合数据,保证读写原子性
                @firestore.transactional
                def update_aggregates_in_cf(transaction, aggregate_ref):
                    snapshot = aggregate_ref.get(transaction=transaction)
                    # 确保即使文档不存在也能安全更新
                    updates = {
                        'total_population': firestore.Increment(population_delta),
                        'total_cities': firestore.Increment(city_increment),
                    }
                    if state_increment > 0:
                        updates['total_states'] = firestore.Increment(state_increment)
    
                    if snapshot.exists:
                        transaction.update(aggregate_ref, updates)
                    else:
                        # 如果聚合文档第一次创建
                        initial_data = {
                            'total_population': population_delta,
                            'total_cities': city_increment,
                            'total_states': state_increment
                        }
                        transaction.set(aggregate_ref, initial_data)
    
                # 执行事务
                transaction = db.transaction()
                update_aggregates_in_cf(transaction, aggregate_ref)
                print("Aggregates updated successfully by Cloud Function.")
    
            except Exception as e:
                print(f"Error updating aggregates in Cloud Function: {e}")
                # 需要处理 Cloud Function 内部的失败(比如重试、日志、死信队列)
    
    # TODO: 实现 calculate_population_delta 和 calculate_increments 函数
    # 这两个函数需要解析 Firestore CloudEvent 的数据格式 (不是直接的 dict)
    # 并比较 before/after 状态来确定变化。这部分会比较复杂。
    
    

解释:

  1. 后端服务的 add_state 事务变得更轻量,只负责核心数据写入。
  2. Cloud Function 成为聚合逻辑的处理器,独立于主应用运行。
  3. 利用 Firestore 触发器,保证了只要省份数据有变动,聚合逻辑就 一定 会被调用(虽然可能延迟几秒)。
  4. Cloud Function 内部可以使用事务来保证聚合更新的原子性,并且由于它处理的是单一的聚合文档,冲突概率相对方案二可能更低(取决于 CF 的并发和执行频率)。
  5. 实现了最终一致性 :省份数据和聚合数据可能在短时间内不完全同步,但系统最终会自动达到一致状态。

额外建议:

  • 幂等性: Cloud Functions 可能会因为 Firestore 的"至少一次"传递保证而被触发多次。你的聚合更新逻辑最好设计成幂等的,即执行一次和执行多次的效果相同。使用 firestore.Increment() 天然具有一定的幂等性(相对于读取-计算-写入)。如果使用方法 B(任务队列),处理完任务后务必删除或标记任务,避免重复处理。
  • 监控: 需要监控 Cloud Function 的执行情况、错误率,以及(如果使用方法 B)任务队列的积压情况。
  • 数据传递: 如何将 population 这个数据传递给 Cloud Function 需要仔细设计。是写入 states_doc_id 文档中(可能让文档更臃肿),还是 CF 从其他来源获取?

解决方案四:优化数据结构,釜底抽薪

所有问题都围绕着那个巨大的 states_doc_id 文档展开。这么多服务实例抢着更新它,不打架才怪。那换个思路,别把所有鸡蛋放一个篮子里呢?

原理:
改变 Firestore 中的数据组织方式,减少并发写入时的冲突点。

思路:

  • 每个省份一个文档: 不再用一个 states_doc_id 包含所有省份。而是创建一个 states 集合,里面的每个文档代表一个省份,文档 ID 就是省份名(比如 /states/Arizona, /states/California)。

    // Collection: states
    // Document ID: Arizona
    {
        "cities": [
            {"city": "Phoenix", "code": 123},
            {"city": "Tucson", "code": 321}
        ]
        // 可能还有该省的其他信息
    }
    

    当添加一个加州的城市时,事务只需要锁定 /states/California 这个文档。同时添加亚利桑那州和加州的城市的操作就不会互相冲突了。只有当两个请求同时修改 同一个省份 时才会冲突。这大大降低了冲突概率。
    聚合指标仍然可以放在一个单独的文档里,或者也进行分片。

  • 城市作为子集合: 更进一步,可以把城市作为省份文档下的一个子集合。

    /states/Arizona (文档)
        /cities/Phoenix (文档) { "code": 123, "population": ... }
        /cities/Tucson (文档) { "code": 321, "population": ... }
    

    这样添加一个城市只是在对应省份下创建一个新文档,写入冲突几乎只可能发生在同时创建 同一个城市 时(可以通过文档 ID 设计避免)。使用 ArrayUnion 追加数组元素,在高并发下比创建新文档更容易冲突。

影响:

  • 读取: 获取所有省份列表需要查询 states 集合。获取某个省份的所有城市,如果是子集合方式,需要查询子集合。查询全国所有城市需要用到 Collection Group Query。
  • 写入: 事务的范围变小了,冲突减少了,成功率大大提高。
  • 聚合: 聚合更新的逻辑可能需要调整,特别是如果聚合指标还依赖于省份或城市文档中的数据。配合方案三(Cloud Functions)可能效果更好。

进阶技巧:
选择哪种数据结构取决于你的主要查询模式和更新频率。如果经常要获取整个国家的省份城市概览,单个大文档可能读取方便,但写入痛苦。如果主要是针对单个省份操作,拆分文档或使用子集合能极大改善写入性能。需要权衡。

安全加固

别忘了 Firestore 安全规则!无论你用哪种方案,确保:

  • 只有你的后端服务(通过服务账号认证)有权限写入 states 集合/文档 和聚合指标文档。
  • 根据需要设置读取权限。客户端(如果直接访问 Firestore 的话)通常只给读取权限,甚至可能只允许读取特定、非敏感的数据。
  • 对聚合指标文档的写入权限要特别小心控制。

总结思考

Firestore 事务是处理并发写入的好工具,但得用对姿势。原始代码的问题主要在于可能不正确的事务调用方式干扰了自动重试,以及将原子操作拆分导致数据不一致风险。

解决这类问题,通常思路是:

  1. 保证事务正确使用: 拥抱 db.run_transaction(),处理好最终失败的异常。
  2. 界定原子边界: 明确哪些操作必须捆绑在一起执行。如果多步操作需要原子性,考虑将它们纳入同一个事务(注意性能影响),或者接受最终一致性,使用异步机制(如 Cloud Functions)解耦。
  3. 优化数据模型: 如果并发冲突是主要矛盾,思考能否通过调整数据结构(如拆分文档、使用子集合、分片计数器)来降低热点竞争。

选择哪个方案或组合,取决于你对一致性、实时性、系统复杂度和成本的具体要求。没有银弹,只有最适合你场景的折中。