返回

一维晶格KMC模拟:Gillespie算法常见错误与解决方案

python

修复一维晶格动力学蒙特卡洛模拟:常见陷阱与解决方案

我们碰到的问题是:尝试在一维晶格上使用动力学蒙特卡洛(KMC,具体指 Gillespie 算法)模拟粒子跳跃,目标是复现通过常微分方程(ODE)求解速率方程得到的稳态粒子布居数。但 KMC 模拟结果却和 ODE 对不上。

问题代码展示了两种方法:

  1. ODE 求解器 :通过 scipy.integrate.solve_ivp 解速率方程,得到了预期的稳态结果,可以作为基准。
  2. KMC 模拟 :尝试实现 Gillespie 算法,但得到的稳态布居数不正确。

模型示意图如下(源自 SciPost Phys. 16, 029 (2024)):粒子在长度为 lattice_length 的一维晶格上,可以左右跳跃,速率依赖于当前位置和邻近位置的粒子数 n(考虑了玻色子增强因子 (1+n))。左右两端连接着粒子数固定的“库”(reservoir),nLeftnRight

晶格粒子输运示意图

我们来分析 KMC 实现中可能出错的地方,并给出修正方案。

哪里出了问题?剖析 KMC 实现误区

Gillespie 算法是一种精确的随机模拟方法,它正确模拟了系统随时间演化的随机性。它包含两个关键步骤:

  1. 下一个事件发生的时间 :系统保持当前状态的时间 dt 是一个服从指数分布的随机变量,其参数为所有可能事件的总速率 R_total。计算公式为 dt = -log(U1) / R_total,其中 U1 是 (0, 1] 区间上的均匀随机数。
  2. 选择下一个发生的事件 :从所有可能的事件中,根据每个事件的速率 r_i 占总速率 R_total 的比例 (p_i = r_i / R_total),随机选择一个事件执行。

对照提供的 KMC 代码,最可能的问题出在时间推进模拟循环 的结构上。

原 KMC 代码片段:

timeSize=500
time_MC=np.linspace(0, timeSize, timeSize)
# ...

def MC_sim(dummy):
    n=np.zeros(lattice_length)
    nInTime=np.zeros((timeSize, lattice_length))

    for tIdx in range(timeSize): # <--- 这里是关键问题所在
        # ... 计算所有速率 rates ...
        sumRates = np.sum(rates)
        probabilities=rates/sumRates
        events=np.random.choice(len(rates), p=probabilities)
        # ... 更新状态 n ...
        for i in range(lattice_length):
            nInTime[tIdx, i]=n[i] # <--- 记录状态
    return n, nInTime

这段代码的 for tIdx in range(timeSize): 循环结构,本质上是在执行固定数量的蒙特卡洛“步”,而不是模拟到某个特定的 物理时间。每次循环,无论总速率 sumRates 多大,都只执行 一个 事件,并且时间好像是均匀地前进了 1 个单位(或者说,记录点均匀分布在“步数”轴上)。这违背了 Gillespie 算法的核心:时间步长 dt 是随机的,并且反比于总速率 R_total 。系统状态变化快(总速率高)时,时间步长短;变化慢(总速率低)时,时间步长长。

固定的循环次数 (timeSize) 导致:

  • 时间尺度错误 :模拟的总 物理时间 无法直接控制,且与 timeSize 没有简单的线性关系。
  • 稳态偏差 :如果模拟步数不足以让系统达到稳态,或者步数太多但时间尺度不对,平均结果自然会偏离 ODE 计算的真实稳态。
  • 状态记录问题nInTime[tIdx, i] = n[i] 记录的是第 tIdx事件发生后 的状态,但时间轴 time_MC 却是均匀的,这扭曲了真实的动力学过程。

解决方案:修正 KMC 算法实现

我们需要根据 Gillespie 算法的原理,修改 KMC 模拟函数 MC_sim

核心问题:修正时间推进机制

这是最关键的修改。我们需要引入模拟时间变量,并在每次事件后根据 dt = -log(U1) / R_total 来推进时间。模拟的停止条件应该是达到某个总的模拟时长 total_simulation_time,而不是固定的事件步数。

原理详解

  1. 计算总速率 (Propensity) :在每一步开始时,根据当前系统状态 n,计算所有可能发生的事件(粒子跳跃、与库交换)的速率 r_i。将它们加起来得到总速率 R_total = sum(r_i)
  2. 计算下一事件时间 :生成一个 (0, 1] 之间的均匀随机数 U1。计算时间增量 dt = -np.log(U1) / R_total。注意处理 R_total 可能为零的情况(系统达到冻结状态)。
  3. 选择发生的事件 :生成一个 (0, 1] 之间的均匀随机数 U2。遍历所有事件,累加它们的速率 r_i。当累加和首次超过 U2 * R_total 时,选择对应的那个事件。或者,直接使用 np.random.choice,以 rates / R_total 作为概率分布来选择事件索引。
  4. 更新状态 :根据选中的事件,更新系统的状态 n(例如,某个格点粒子数增减)。
  5. 推进时间 :将当前模拟时间 current_time 增加 dtcurrent_time += dt
  6. 循环与终止 :重复步骤 1-5,直到 current_time 达到预设的总模拟时间 total_simulation_time

代码修改示例

import numpy as np
import matplotlib.pyplot as plt
from matplotlib.pyplot import cm

# ... (保留 ODE 部分和参数定义) ...

###--------------------修正后的 KMC 模拟----------##########

def corrected_MC_sim(total_simulation_time, record_interval=None):
    """
    执行修正后的 Kinetic Monte Carlo 模拟 (Gillespie Algorithm)。

    Args:
        total_simulation_time (float): 模拟的总物理时间。
        record_interval (float, optional): 记录数据的时间间隔。
                                          如果为 None,则只返回最终状态。
                                          建议设置一个值来观察时间演化。

    Returns:
        tuple:
            - n_final (np.ndarray): 模拟结束时的晶格布居数。
            - time_points (list): 记录数据的时间点 (如果 record_interval 不为 None)。
            - n_history (list): 对应时间点的布居数历史 (如果 record_interval 不为 None)。
                                每个元素是 np.ndarray。
    """
    n = np.zeros(lattice_length) # 使用初始条件
    current_time = 0.0

    # 用于记录历史数据
    time_points = []
    n_history = []
    next_record_time = 0.0

    if record_interval is not None:
        time_points.append(current_time)
        n_history.append(n.copy())
        next_record_time = record_interval

    while current_time < total_simulation_time:
        rates = []
        event_list = []
        event_updates = [] # 存储每个事件如何改变状态 n

        # 计算边界事件速率和更新规则
        # 从格点 0 跳到左库 (减少 n[0])
        rate_0L = kappaL * n[0] * (1 + nLeft) # 注意: rate_01 在原代码意为从格点1到库,这里调整命名 L代表Left Reservoir
        if rate_0L > 1e-9: # 避免速率过小带来的数值问题
            rates.append(rate_0L)
            event_list.append("0->L")
            event_updates.append((0, -1))

        # 从左库跳到格点 0 (增加 n[0])
        rate_L0 = kappaL * nLeft * (1 + n[0])
        if rate_L0 > 1e-9:
            rates.append(rate_L0)
            event_list.append("L->0")
            event_updates.append((0, +1))

        # 从右库跳到最后一个格点 (增加 n[lattice_length-1])
        rate_RLast = kappaR * nRight * (1 + n[lattice_length - 1]) # rate_LastR 在原代码指从右库来
        if rate_RLast > 1e-9:
            rates.append(rate_RLast)
            event_list.append("R->Last")
            event_updates.append((lattice_length - 1, +1))

        # 从最后一个格点跳到右库 (减少 n[lattice_length-1])
        rate_LastR = kappaR * n[lattice_length - 1] * (1 + nRight) # rate_RLast 在原代码指跳向右库
        if rate_LastR > 1e-9:
            rates.append(rate_LastR)
            event_list.append("Last->R")
            event_updates.append((lattice_length - 1, -1))

        # 计算内部跳跃速率和更新规则
        for i in range(lattice_length - 1):
            # 从 i+1 跳到 i (左跳, 增加 n[i], 减少 n[i+1])
            rate_L_hop = GammaL * n[i + 1] * (1 + n[i])
            if rate_L_hop > 1e-9:
                rates.append(rate_L_hop)
                event_list.append(f"{i+1}->{i}")
                event_updates.append(((i, +1), (i + 1, -1)))

            # 从 i 跳到 i+1 (右跳, 减少 n[i], 增加 n[i+1])
            rate_R_hop = GammaR * n[i] * (1 + n[i + 1])
            if rate_R_hop > 1e-9:
                rates.append(rate_R_hop)
                event_list.append(f"{i}->{i+1}")
                event_updates.append(((i, -1), (i + 1, +1)))

        # 计算总速率
        sumRates = np.sum(rates)

        # 如果总速率为 0 (系统冻结),提前结束或跳过时间步
        if sumRates < 1e-10:
            # 可以选择直接跳到结束时间,或认为系统已达稳态
            # print(f"Warning: sumRates near zero at time {current_time}. Simulation might be stuck.")
            current_time = total_simulation_time # 直接跳到结束
            break # 或者直接退出循环

        # 计算时间步长 dt
        dt = -np.log(np.random.uniform(0.0, 1.0)) / sumRates

        # 记录数据(如果需要且时间到达)
        if record_interval is not None:
            while current_time + dt >= next_record_time and next_record_time <= total_simulation_time :
                 # 需要插值或者记录刚好越过记录点前的状态
                 # 简单起见,这里记录dt发生前的状态,对应上一个时间点之后、此次事件之前的状态
                 # 如果需要更精确的时间点插值会更复杂
                 time_points.append(next_record_time)
                 n_history.append(n.copy())
                 next_record_time += record_interval

        # 推进时间
        current_time += dt

        # 如果已经超过总时间,结束本次模拟
        if current_time > total_simulation_time:
            break

        # 选择事件
        probabilities = rates / sumRates
        chosen_event_index = np.random.choice(len(rates), p=probabilities)

        # 更新状态
        update_info = event_updates[chosen_event_index]
        if isinstance(update_info[0], tuple): # 内部跳跃事件
            for idx, change in update_info:
                n[idx] += change
        else: # 边界事件
            idx, change = update_info
            n[idx] += change

        # 确保粒子数非负 (虽然物理模型通常保证,但检查一下无妨)
        # n = np.maximum(n, 0) # 如果模型允许n变成负数(不太可能在此模型),需要处理

    # 添加最后的状态和时间点
    if record_interval is not None and time_points[-1] < total_simulation_time:
         time_points.append(total_simulation_time)
         n_history.append(n.copy()) # 记录最终状态

    if record_interval is not None:
        return n.copy(), time_points, n_history # 返回最终状态和历史记录
    else:
        return n.copy(), None, None # 只返回最终状态

进阶使用技巧与注意事项

  • 选择 total_simulation_time : 这个时间需要足够长,确保系统能够达到并维持在稳态。可以先运行一次较长时间的模拟,观察 n(t) 的变化,判断达到稳态所需的时间。ODE 的结果可以提供一个时间尺度的参考。
  • 处理 sumRates == 0 :在某些模型中,系统可能达到一个吸收态或冻结状态,所有速率都变为零。代码中需要有逻辑处理这种情况,避免除以零。上述代码中加入了检查 (if sumRates < 1e-10)。
  • 记录数据策略 :KMC 的时间步长是变化的。如果需要绘制 n 随时间 t 的变化,不能简单地记录每个事件后的状态。推荐方法是:
    • 固定时间间隔记录 :像上面代码示例那样,设定一个 record_interval,当 current_time 跨过某个记录时间点 next_record_time 时,记录下 刚好在那个时间点之前 的状态 n
    • 事件驱动记录 :只记录发生变化的格点的状态和时间。这更节省空间,但后续处理可能复杂些。
  • 数值精度 :当速率非常小时,dt 可能变得非常大。加入一个小的阈值(如 1e-91e-10)来忽略极小的速率,可以避免潜在的数值问题。

确保状态更新准确无误

检查 KMC 代码中更新 n 的逻辑是否完全对应所选的事件。

原理

每个事件都必须精确地反映到 n 数组的变化上。例如,一个粒子从格点 i 跳到 i+1,必须执行 n[i] -= 1n[i+1] += 1。边界事件(与库交换)也要确保只影响相应的边界格点 (n[0]n[lattice_length-1])。

代码检查

原代码的 if/elif 结构看似是正确的,但映射关系依赖 events 整数值和 event_list 字符串的隐式对应。建议使用更明确的方式关联事件和状态更新。

修正后的 corrected_MC_sim 函数使用了 event_updates 列表,直接存储每个事件对应的 (index, change)((index1, change1), (index2, change2)),这样更清晰且不易出错。务必仔细核对每个事件的 event_updates 元组是否正确反映了该事件对粒子数的影响。

安全建议

  • 索引边界 :特别注意 lattice_length - 1 作为最后一个格点的索引,避免数组越界。
  • 更新原子性 :对于跳跃事件,确保增减操作在逻辑上是“同时”完成的(即基于事件发生 的状态计算速率,然后一次性更新两个相关格点的 n 值)。

数据记录与平均

为了得到可靠的稳态结果,需要运行多次独立的 KMC 模拟,并对结果进行平均。

原理

单次 KMC 轨迹是随机的。为了得到期望的平均行为(与 ODE 结果比较),需要:

  1. 运行足够长 :确保单次模拟达到了稳态。
  2. 丢弃暂态 :忽略模拟早期(系统还未达到稳态时)的数据。
  3. 多次采样 :运行 SampleRange 次独立的 KMC 模拟。
  4. 计算平均值
    • 稳态平均值 :对每次模拟在稳态阶段的粒子数 n 进行时间平均,然后再将这些时间平均值在多次模拟间做系综平均。或者,更简单地,取每次模拟运行结束时(确保已达稳态)的 n 值,然后在多次模拟间求平均。
    • 时间演化平均 :如果想看平均的时间演化曲线,需要对齐每次模拟的时间轴(可能需要插值),然后对同一时间点的 n 值求平均。使用固定时间间隔记录 n_history 会使这一步更容易。

代码/操作步骤

# --- 运行多次模拟并进行平均 ---
SampleRange = 2000 # 与原代码保持一致,按需调整
total_simulation_time = 100.0 # 需要选择一个足够长的时间,参考 ODE 结果
record_interval = 1.0 # 每隔 1.0 个物理时间单位记录一次状态

n_final_summed = np.zeros(lattice_length)
all_time_points = []
all_n_histories = []

print(f"Running {SampleRange} KMC simulations up to time {total_simulation_time}...")
for s in range(SampleRange):
    # 为了演示时间演化,我们启用记录
    n_final, time_points, n_history = corrected_MC_sim(total_simulation_time, record_interval)

    n_final_summed += n_final
    if time_points: # 确保有记录
        all_time_points.append(time_points)
        all_n_histories.append(n_history)
    # 进度提示
    if (s + 1) % (SampleRange // 10) == 0:
        print(f"Completed {s + 1}/{SampleRange} simulations.")

# 计算稳态平均布居数
n_final_avg = n_final_summed / SampleRange

# --- 处理和绘制时间演化平均 (需要对齐时间点,这里做个简化处理) ---
# 假设所有模拟都跑到了 total_simulation_time 且 record_interval 固定
# 我们需要找到一个共同的时间网格,并对每个网格点上的布居数做平均
if all_time_points:
    # 创建一个标准的时间轴
    common_time_grid = np.arange(0, total_simulation_time + record_interval/2, record_interval) # 包含终点
    nInTimeAvg = np.zeros((len(common_time_grid), lattice_length))
    counts = np.zeros(len(common_time_grid)) # 记录每个时间点有多少次模拟贡献了数据

    for s in range(SampleRange):
        sim_times = np.array(all_time_points[s])
        sim_history = np.array(all_n_histories[s])

        # 对于每个标准时间点,找到最近的模拟记录点(或进行插值)
        # 简单方法:对每个标准时间点 t_grid,找 sim_times 中 <= t_grid 的最大值对应的索引
        indices = np.searchsorted(sim_times, common_time_grid, side='right') - 1
        indices = np.maximum(0, indices) # 处理开始时间之前的请求

        valid_mask = indices < len(sim_history) # 确保索引有效

        nInTimeAvg[valid_mask] += sim_history[indices[valid_mask]]
        counts[valid_mask] += 1

    # 防止除零
    counts[counts == 0] = 1
    nInTimeAvg /= counts[:, np.newaxis] # 广播除法

    # --- 绘图 ---
    plt.figure(figsize=(12, 7))
    cmap = cm.terrain(np.linspace(0, 0.8, lattice_length)) # 复用颜色映射
    for i in range(lattice_length):
        plt.plot(common_time_grid, nInTimeAvg[:, i], label=f'Site {i}', color=cmap[i])
    plt.legend()
    plt.title("Averaged Time Traces (KMC)")
    plt.xlabel("Time")
    plt.ylabel("Average Population")
    plt.grid(True, linestyle='--', alpha=0.6)
    plt.show()


# 绘制 KMC 稳态平均结果
plt.figure()
plt.plot(lattice, n_final_avg, 'o-', label='KMC Steady State Avg.')
plt.plot(lattice, steady_state, 's--', label='ODE Steady State') # 对比 ODE 结果
plt.xlabel("Lattice Site")
plt.ylabel("Population")
plt.title("Comparison of Steady State Populations")
plt.legend()
plt.grid(True, linestyle='--', alpha=0.6)
plt.show()

进阶技巧

  • 平衡态检测 :可以实现更复杂的逻辑来自动检测模拟是否达到稳态,例如观察某段时间内布居数的波动是否稳定在某个水平。
  • 时间相关性 :分析达到稳态后,某个格点粒子数随时间变化的自相关函数,可以了解系统的弛豫时间尺度。

对比修正后的结果

执行完上述修正后的 KMC 代码和平均过程,绘制出的 n_final_avg 应该与 ODE 解算出的 steady_state 非常接近(在统计误差范围内)。同时,绘制的 nInTimeAvg 时间演化曲线也应该展现出与 ODE 曲线相似的趋向稳态的过程。如果结果仍然不符,可能需要再次检查:

  • 速率公式 :KMC 和 ODE 中的速率项(包括 (1+n) 因子)是否完全一致?
  • 参数设置GammaL, GammaR, kappaL, kappaR, nLeft, nRight, lattice_length 等参数是否两边都设置正确?
  • 初始条件 :ODE 和 KMC 是否使用了相同的 initial_condition

通过以上步骤,重点修正时间推进机制,并辅以仔细的状态更新核对和正确的平均方法,应该能够解决 KMC 模拟结果与 ODE 不符的问题,让你在一维晶格上成功执行动力学蒙特卡洛模拟。