返回

增量计算EWM:新行数据不重算

python

基于增量数据的指数加权移动平均计算

问题:追加新行后能否继续使用 EWM 计算

指数加权移动平均 (EWM) 在量化金融中是常用的技术指标,尤其适用于处理时序数据。 使用 pandas 库可以便捷计算 EWM,通过 ewm(...).mean() 函数可以一次性完成对整个时间序列的计算。但当有新数据行追加到已有的数据集中时,如果每次都对所有数据重新计算 EWM,会导致计算资源的浪费。 本文分析了这个问题,并提供了几种有效的解决方案。

挑战:批量处理与实时处理的差异

在历史数据回溯分析中,通常可以使用 pandas 直接对整个数据集计算 EWM。 但在实时系统中,新数据逐条到达,为了性能考虑,应避免重复计算已经处理过的数据, 这要求计算方法能利用之前的 EWM 计算结果,对新数据进行增量计算。 如何将批量计算 EWM 的方法平滑过渡到实时增量计算场景,是一项挑战。

解决方案一:手动计算 EWM

EWM 的核心思想是对旧值进行衰减加权,然后加入当前值的新权重,它可以用以下公式表示:
EWMA(t) = (1 - α) * EWMA(t-1) + α * value(t)

其中 α 是平滑系数,取决于 halflife (半衰期) 参数。我们可以通过如下步骤实现对新数据行的 EWM 手动计算:

  1. 计算衰减系数: 使用 alpha = 1 - np.exp(-np.log(2) / halflife) 从半衰期参数计算 alpha 系数。
  2. 使用上一轮EWM值: 获取数据框中上一次计算出的 EWM 值(如果没有则使用新数据值本身)。
  3. 根据公式更新EWM值: 利用上一轮 EWM 值和新的数据点,计算最新的 EWM 值。
import numpy as np
import pandas as pd

def calculate_ewm_alpha(halflife):
    return 1 - np.exp(-np.log(2) / halflife)

def update_ewma(df, new_value, halflife, prev_ewma_value=None):
  """增量更新EWM,使用传入的pre_ewma_value或上一行的ewma值."""

  alpha = calculate_ewm_alpha(halflife)

  if prev_ewma_value is not None:
    last_ewma = prev_ewma_value
  else:
    last_ewma = df["ewma"].iloc[-1] if not df.empty else new_value #使用last row ewm值 or 初始化new_value

  current_ewma = (1-alpha) * last_ewma + alpha * new_value

  return current_ewma


# 假设这是已有的数据框和它的 EWM 值
dates = pd.date_range('2021-01-01', '2022-01-01')
T = (dates.max()-dates.min()).days / 365
N = len(dates)
mu = 0.03
sigma = 0.5
S0 = 100
df = pd.DataFrame(index=dates, data={'value': np.random.randn(len(dates)) * sigma* np.sqrt(T/N) + mu * T / N +S0  })
halflife = 30
df['ewma'] = df['value'].ewm(halflife=halflife).mean()

# 新加入的行
df_new = pd.DataFrame(index=[pd.Timestamp('2022-01-02')], data={'value':105})

# 更新 EWMA 值
new_ewma = update_ewma(df, df_new["value"].iloc[0], halflife) # use new_value, with or w/o last ewm

# 将新行加入 DataFrame
df = pd.concat([df, df_new])

# 将计算的 EWMA 加入到 DataFrame
df.loc[df_new.index, "ewma"] = new_ewma

print(df.tail())

使用这个方法可以精确地完成增量EWM计算,但也需要在代码中手动管理中间变量和数据流。

解决方案二:使用 Pandas 内部状态

pandas 在执行 ewm().mean() 时内部维护了一个状态。虽然 Pandas 并没有直接公开访问这个状态, 但我们可以利用 Pandas 的分组功能来“伪造”出一个增量更新的效果,这个技巧是比较高效并且推荐的:

  1. 使用初始 DataFrame 计算 EWM: 和之前一样,首先用初始数据计算完整的 EWM 值, 将会用到的 halflife 以及其内部的 com 字段(可以通过 halflife 计算),这些数据保存下来,在后续新增数据的计算中会被使用。
  2. 新建 DataFrame 并扩展: 将新行加入一个仅包含单行的DataFrame。
  3. 构造用于 pandas 拼接的列: 在新数据帧中加入 'group_id' 列,值为上一个数据的 'group_id' +1,注意此处分组ID应该比已有数据的ID值都大。
  4. 使用 apply 合并DataFrame 在已有df中,拼接新增的单行df,并且利用分组和 apply() 函数继续计算 ewma。
import numpy as np
import pandas as pd

#模拟数据
def gbm(T, N, mu, sigma, S0):        
    dt = float(T)/N
    t = np.linspace(0, T, N)
    W = np.random.standard_normal(size = N) 
    W = np.cumsum(W)*np.sqrt(dt) 
    X = (mu-0.5*sigma**2)*t + sigma*W 
    S = S0*np.exp(X)
    return S

dates = pd.date_range('2021-01-01', '2022-01-01')
T = (dates.max()-dates.min()).days / 365
N = len(dates)
mu = 0.03
sigma = 0.5
S0 = 100
df = pd.DataFrame(index=dates, data={'value': gbm(T, N, mu, sigma, S0)}) 
halflife = 30


# 计算初始 EWM
ewm_initial = df['value'].ewm(halflife=halflife)
df['ewma'] = ewm_initial.mean()


# 保存用于复现eaw内部参数的com 值,这里是利用halflife参数推算出来的
com_value = ewm_initial.com[0]

# 获取最新日期
last_date = df.index[-1]


# 模拟新数据,注意:这里需要保证 新增行index的唯一性和连贯性
df_new = pd.DataFrame(index=[last_date+ pd.Timedelta(days=1)], data={'value':105})
new_date = df_new.index[0]

# 计算当前最新的 group id,注意:  必须大于历史group id的值
new_group_id = 1 if not  ('group_id' in df.columns ) else df['group_id'].max() + 1


df_new["group_id"] = new_group_id
df["group_id"] = 0

#拼接df数据,此时还未完成计算ewma值
df_full = pd.concat([df, df_new])

def apply_ewm(group):

  if group["group_id"].max() ==0: #只执行对现有dataframe执行
      return group["value"].ewm(com =com_value).mean()
  
  if  len(group) > 1: #存在多个group
      last_ewma =  group[group["group_id"] == 0]["value"].ewm(com = com_value).mean().iloc[-1] if len(group[group["group_id"] == 0])> 0  else None #取原始数据里ewma的最后一行,没有就返回None
  else:
       last_ewma = None  #新数据为第一条,返回none

  ewm_res =  group[group["group_id"] != 0]["value"].ewm(com = com_value, initial =  last_ewma).mean()
  return pd.concat([group[group["group_id"] == 0],ewm_res.to_frame(name = "value")])  #返回合并数据


# 根据 group ID分组,然后计算 EWM
result = df_full.groupby('group_id').apply(apply_ewm).reset_index(drop=True)

# 更新新加入的 EWM 列, 根据group ID 分组再获取值
df.loc[df_new.index, "ewma"]= result["value"].loc[result["group_id"] ==new_group_id ].iloc[0]

# 打印结果
print(df.tail())

该方法避免了重新计算所有历史数据,其核心是通过groupby()结合 apply()函数模拟出增量EWM计算的行为,有效复用 pandas 内部 EWM 的计算逻辑,效率更高,且不需要我们手动维护变量。

安全建议

在实现 EWM 的增量计算时,务必:

  • 验证公式的正确性。
  • 考虑数值稳定性问题,特别是在数据量很大或数值范围很广的时候。
  • 测试各种数据情况,特别是边界条件。
  • 在生产环境使用之前,仔细审查代码并进行压力测试。

总结

本文提供了两种有效解决 pandas EWM 在增量计算场景下的问题的方案。 开发者可根据实际需求和性能要求选择合适的方案。 推荐使用第二种方案,因为它通过 pandas 的 groupby() 方法重用了其自身的 EWM 逻辑,效率较高且更为简洁。

通过本文,可以更好地处理时序数据的 EWM 计算,使你的程序更加高效。