返回

突破3000订阅限制:动态管理金融工具WebSocket数据流

python

突破 3000 订阅限制:动态管理金融工具 WebSocket 数据流

当处理大量的金融工具数据流时,经常会遇到 API 的订阅限制。比如 KiteConnect API 限制 WebSocket 连接最多订阅 3000 个 instruments。 如果需要监控超过 3000 个(比如 10000 个) 并且每日变化的 instruments 列表,就需要一个有效的动态订阅/取消订阅机制。

本文将探讨如何通过轮询和分组等方法解决 KiteConnect API 3000 订阅限制的问题,实现对超大规模 instruments 列表的高效数据监控。

问题分析

KiteConnect API 的 WebSocket 接口通过 subscribeunsubscribe 函数管理订阅。直接订阅超过 3000 个 instruments 会导致错误。问题的核心在于如何在有限的订阅额度内,动态地轮换订阅列表,以覆盖所有需要监控的 instruments。 考虑到 API 有连接频率限制,同时要兼顾数据更新的实时性,需要在两者之间找到平衡点。

解决方案

以下将提供几种解决方案来应对 3000 instruments 订阅限制。

1. 轮询分组

将 10000 个 instruments 分成若干组,每组不超过 3000 个,然后周期性地轮流订阅这些分组。

  • 原理: 将 instruments 列表分割,每次只订阅其中一部分,通过循环切换实现所有 instruments 的覆盖。
  • 优点: 实现简单,易于理解和维护。
  • 缺点: 数据更新频率较低,每个 instrument 的数据更新周期与分组数量成正比。

代码示例及步骤:

import logging
import time
from kiteconnect import KiteTicker

instrument_list = list(range(1, 10001))  # 假设有 10000 个 instruments
CHUNK_SIZE = 2500 #  每组大小略小于 3000 ,预留一定缓冲空间
REFRESH_INTERVAL = 10 # 每 10 秒刷新一次

logging.basicConfig(level=logging.INFO)

kws = KiteTicker("my_api", "my_access_token")

def on_ticks(ws, ticks):
    logging.debug("Received ticks: %s", ticks)

def on_connect(ws, response):
    logging.info("Connected to Kite Ticker")
    subscribe_to_next_chunk(ws)

def on_close(ws, code, reason):
    logging.warning("Connection closed: %s - %s", code, reason)
    ws.stop()

def subscribe_to_next_chunk(ws):
    """订阅下一组 instruments"""
    start_index = ws.current_index if hasattr(ws, 'current_index') else 0
    end_index = min(start_index + CHUNK_SIZE, len(instrument_list))
    current_chunk = instrument_list[start_index:end_index]

    if ws.subscribed_tokens: # 如果存在已订阅的instruments,则先取消订阅
        ws.unsubscribe(ws.subscribed_tokens)

    ws.subscribe(current_chunk)
    ws.set_mode(ws.MODE_FULL, current_chunk) # 设置为完整模式
    ws.subscribed_tokens = current_chunk # 保存已订阅的instruments
    
    logging.info("Subscribed to instruments: %s - %s", start_index+1, end_index)

    ws.current_index = end_index
    if ws.current_index >= len(instrument_list):
        ws.current_index = 0 # 到达列表末尾则循环

    #  安排下一次订阅
    ws.next_subscribe_time = time.time() + REFRESH_INTERVAL

def on_tick(ws,ticks):
    """接收到Tick 数据后的回调函数 """
    logging.debug("Ticks received: %s", ticks)
    if time.time() >= ws.next_subscribe_time: #检查是否需要切换订阅
        subscribe_to_next_chunk(ws)

kws.on_ticks = on_tick
kws.on_connect = on_connect
kws.on_close = on_close
kws.connect()

while True:
    time.sleep(1)
    if not kws.is_connected():
      logging.info("Reconnecting...")
      kws.connect()
      time.sleep(5)

操作步骤:

  1. my_apimy_access_token 替换为你的 KiteConnect API 密钥和访问令牌。
  2. 根据实际 instrument 数量调整 CHUNK_SIZE
  3. 运行代码,程序将每 10 秒轮流订阅不同的 instruments 分组,并打印接收到的 ticks 数据。
  4. 考虑到网络波动及API的稳定性,加入了断线重连逻辑。当 Websocket 连接断开时,程序会尝试重新连接。

安全建议:

  • API 密钥和访问令牌应妥善保管,避免泄露。
  • 合理设置 REFRESH_INTERVAL ,避免过于频繁的订阅和取消订阅操作,这可能导致触发 API 的限流机制。
  • 日志记录应包含足够的信息,以便于故障排查。可以根据实际需求调整日志级别。

2. 优先级队列

如果不同的 instruments 有不同的重要程度,可以使用优先级队列来管理订阅列表,优先订阅重要的 instruments 。

  • 原理: 将 instruments 按照优先级排序,维护一个固定长度的订阅队列。每次按优先级从队列中取出 3000 个 instruments 进行订阅。
  • 优点: 可以优先保证高优先级 instruments 的数据更新频率。
  • 缺点: 实现相对复杂,需要维护一个优先级队列,并动态更新队列。 低优先级的instruments数据更新延迟会更高。

实现思路:

  1. 构建一个带优先级信息的 instrument 列表,例如 [(instrument_id, priority), ... ]。
  2. 使用 Python 的 heapq 模块实现一个优先级队列。
  3. 周期性地从队列中取出优先级最高的 3000 个 instruments 进行订阅。
  4. 根据 instruments 的优先级变化或新增 instruments,动态更新优先级队列。

限于篇幅,此处不再提供完整的代码示例。

3. 动态调整分组大小

根据 instruments 的活跃程度动态调整分组大小,活跃的 instruments 分到更小的组,保证更高的更新频率。

  • 原理: 通过监控 instruments 的交易量或价格波动等指标,判断其活跃程度。 活跃 instruments 组成较小的分组,并分配更多的订阅时间。
  • 优点: 兼顾了数据更新频率和覆盖范围,可以更有效地利用有限的订阅额度。
  • 缺点: 实现更复杂,需要额外的逻辑来判断 instruments 的活跃程度。

实现思路:

  1. 维护一个 instruments 活动度统计信息,如交易量、价格变动等。
  2. 根据活动度指标,将 instruments 分为不同的活跃度等级。
  3. 活跃度高的 instruments 分成更小的组,并增加订阅时间占比。
  4. 定期重新评估 instruments 的活动度,并调整分组和订阅策略。

相关资源:

总结:

本文介绍了三种解决 KiteConnect API 3000 订阅限制的方法。 轮询分组方法实现简单,适合对数据更新频率要求不高的场景。优先级队列方法可以保证高优先级 instruments 的数据更新,适用于需要区分 instrument 重要性的场景。动态调整分组大小方法则可以更有效地利用订阅额度,但实现也更复杂。 实际应用中需要根据具体需求选择合适的解决方案,或将多种方法结合使用,以达到最佳效果。