增量计算EWM:新行数据不重算
2025-01-07 15:39:11
基于增量数据的指数加权移动平均计算
问题:追加新行后能否继续使用 EWM 计算
指数加权移动平均 (EWM) 在量化金融中是常用的技术指标,尤其适用于处理时序数据。 使用 pandas
库可以便捷计算 EWM,通过 ewm(...).mean()
函数可以一次性完成对整个时间序列的计算。但当有新数据行追加到已有的数据集中时,如果每次都对所有数据重新计算 EWM,会导致计算资源的浪费。 本文分析了这个问题,并提供了几种有效的解决方案。
挑战:批量处理与实时处理的差异
在历史数据回溯分析中,通常可以使用 pandas
直接对整个数据集计算 EWM。 但在实时系统中,新数据逐条到达,为了性能考虑,应避免重复计算已经处理过的数据, 这要求计算方法能利用之前的 EWM 计算结果,对新数据进行增量计算。 如何将批量计算 EWM 的方法平滑过渡到实时增量计算场景,是一项挑战。
解决方案一:手动计算 EWM
EWM 的核心思想是对旧值进行衰减加权,然后加入当前值的新权重,它可以用以下公式表示:
EWMA(t) = (1 - α) * EWMA(t-1) + α * value(t)
其中 α 是平滑系数,取决于 halflife
(半衰期) 参数。我们可以通过如下步骤实现对新数据行的 EWM 手动计算:
- 计算衰减系数: 使用
alpha = 1 - np.exp(-np.log(2) / halflife)
从半衰期参数计算 alpha 系数。 - 使用上一轮EWM值: 获取数据框中上一次计算出的 EWM 值(如果没有则使用新数据值本身)。
- 根据公式更新EWM值: 利用上一轮 EWM 值和新的数据点,计算最新的 EWM 值。
import numpy as np
import pandas as pd
def calculate_ewm_alpha(halflife):
return 1 - np.exp(-np.log(2) / halflife)
def update_ewma(df, new_value, halflife, prev_ewma_value=None):
"""增量更新EWM,使用传入的pre_ewma_value或上一行的ewma值."""
alpha = calculate_ewm_alpha(halflife)
if prev_ewma_value is not None:
last_ewma = prev_ewma_value
else:
last_ewma = df["ewma"].iloc[-1] if not df.empty else new_value #使用last row ewm值 or 初始化new_value
current_ewma = (1-alpha) * last_ewma + alpha * new_value
return current_ewma
# 假设这是已有的数据框和它的 EWM 值
dates = pd.date_range('2021-01-01', '2022-01-01')
T = (dates.max()-dates.min()).days / 365
N = len(dates)
mu = 0.03
sigma = 0.5
S0 = 100
df = pd.DataFrame(index=dates, data={'value': np.random.randn(len(dates)) * sigma* np.sqrt(T/N) + mu * T / N +S0 })
halflife = 30
df['ewma'] = df['value'].ewm(halflife=halflife).mean()
# 新加入的行
df_new = pd.DataFrame(index=[pd.Timestamp('2022-01-02')], data={'value':105})
# 更新 EWMA 值
new_ewma = update_ewma(df, df_new["value"].iloc[0], halflife) # use new_value, with or w/o last ewm
# 将新行加入 DataFrame
df = pd.concat([df, df_new])
# 将计算的 EWMA 加入到 DataFrame
df.loc[df_new.index, "ewma"] = new_ewma
print(df.tail())
使用这个方法可以精确地完成增量EWM计算,但也需要在代码中手动管理中间变量和数据流。
解决方案二:使用 Pandas 内部状态
pandas
在执行 ewm().mean()
时内部维护了一个状态。虽然 Pandas 并没有直接公开访问这个状态, 但我们可以利用 Pandas 的分组功能来“伪造”出一个增量更新的效果,这个技巧是比较高效并且推荐的:
- 使用初始 DataFrame 计算 EWM: 和之前一样,首先用初始数据计算完整的 EWM 值, 将会用到的
halflife
以及其内部的com
字段(可以通过 halflife 计算),这些数据保存下来,在后续新增数据的计算中会被使用。 - 新建 DataFrame 并扩展: 将新行加入一个仅包含单行的DataFrame。
- 构造用于 pandas 拼接的列: 在新数据帧中加入 'group_id' 列,值为上一个数据的 'group_id' +1,注意此处分组ID应该比已有数据的ID值都大。
- 使用 apply 合并DataFrame 在已有df中,拼接新增的单行df,并且利用分组和
apply()
函数继续计算 ewma。
import numpy as np
import pandas as pd
#模拟数据
def gbm(T, N, mu, sigma, S0):
dt = float(T)/N
t = np.linspace(0, T, N)
W = np.random.standard_normal(size = N)
W = np.cumsum(W)*np.sqrt(dt)
X = (mu-0.5*sigma**2)*t + sigma*W
S = S0*np.exp(X)
return S
dates = pd.date_range('2021-01-01', '2022-01-01')
T = (dates.max()-dates.min()).days / 365
N = len(dates)
mu = 0.03
sigma = 0.5
S0 = 100
df = pd.DataFrame(index=dates, data={'value': gbm(T, N, mu, sigma, S0)})
halflife = 30
# 计算初始 EWM
ewm_initial = df['value'].ewm(halflife=halflife)
df['ewma'] = ewm_initial.mean()
# 保存用于复现eaw内部参数的com 值,这里是利用halflife参数推算出来的
com_value = ewm_initial.com[0]
# 获取最新日期
last_date = df.index[-1]
# 模拟新数据,注意:这里需要保证 新增行index的唯一性和连贯性
df_new = pd.DataFrame(index=[last_date+ pd.Timedelta(days=1)], data={'value':105})
new_date = df_new.index[0]
# 计算当前最新的 group id,注意: 必须大于历史group id的值
new_group_id = 1 if not ('group_id' in df.columns ) else df['group_id'].max() + 1
df_new["group_id"] = new_group_id
df["group_id"] = 0
#拼接df数据,此时还未完成计算ewma值
df_full = pd.concat([df, df_new])
def apply_ewm(group):
if group["group_id"].max() ==0: #只执行对现有dataframe执行
return group["value"].ewm(com =com_value).mean()
if len(group) > 1: #存在多个group
last_ewma = group[group["group_id"] == 0]["value"].ewm(com = com_value).mean().iloc[-1] if len(group[group["group_id"] == 0])> 0 else None #取原始数据里ewma的最后一行,没有就返回None
else:
last_ewma = None #新数据为第一条,返回none
ewm_res = group[group["group_id"] != 0]["value"].ewm(com = com_value, initial = last_ewma).mean()
return pd.concat([group[group["group_id"] == 0],ewm_res.to_frame(name = "value")]) #返回合并数据
# 根据 group ID分组,然后计算 EWM
result = df_full.groupby('group_id').apply(apply_ewm).reset_index(drop=True)
# 更新新加入的 EWM 列, 根据group ID 分组再获取值
df.loc[df_new.index, "ewma"]= result["value"].loc[result["group_id"] ==new_group_id ].iloc[0]
# 打印结果
print(df.tail())
该方法避免了重新计算所有历史数据,其核心是通过groupby()
结合 apply()
函数模拟出增量EWM计算的行为,有效复用 pandas
内部 EWM 的计算逻辑,效率更高,且不需要我们手动维护变量。
安全建议
在实现 EWM 的增量计算时,务必:
- 验证公式的正确性。
- 考虑数值稳定性问题,特别是在数据量很大或数值范围很广的时候。
- 测试各种数据情况,特别是边界条件。
- 在生产环境使用之前,仔细审查代码并进行压力测试。
总结
本文提供了两种有效解决 pandas
EWM 在增量计算场景下的问题的方案。 开发者可根据实际需求和性能要求选择合适的方案。 推荐使用第二种方案,因为它通过 pandas 的 groupby()
方法重用了其自身的 EWM 逻辑,效率较高且更为简洁。
通过本文,可以更好地处理时序数据的 EWM 计算,使你的程序更加高效。