毫秒级响应:实时行情到策略信号的全链路架构设计
数据来了,然后呢?
这是大多数量化系统最容易被忽视的一环。
你能拿到实时行情——盘口变化、成交推送、K线合成——这些都不难。但数据到了之后,如何让策略在毫秒级别做出反应、如何避免信号抖动导致的频繁开平仓、如何在整个链路中保持稳定性和可观测性,才是真正区分“回测能跑”和“实盘能用”的分水岭。
本文不聊策略本身。我们只讨论一件事:从 TickDB 推送过来的原始行情,到你的策略引擎发出交易信号,这中间的系统架构该怎么搭。
一、为什么实时数据接入不是一个“接 API”那么简单
很多人把行情接入理解为“调一个接口,拿到数据,触发计算”。这个模型在回测环境中足够用,但在实盘中会暴露三个致命问题:
第一,推送数据的流量是不均匀的。
热门标的在财报发布前后,订单簿更新频率可能从每秒几十次飙升到每秒上千次。如果你的策略触发计算没有任何缓冲机制,高频数据会直接打爆 CPU。更糟糕的是,在波动最剧烈的时刻,你的策略反而因为计算排队而错失最佳执行窗口。
第二,策略回调的时序是不可控的。
Python 的 GIL 意味着多线程回调存在竞争条件,同一时刻多个数据到达可能导致回调乱序执行。如果策略内部维护了状态(比如持仓记录、窗口均值),乱序回调会让状态计算结果完全错误。
第三,信号抖动是真实存在的工程问题。
以一个简单的均线突破策略为例:价格在上穿均线的瞬间,行情数据可能在 10ms 内连续推送 5 次更新,其中 3 次满足突破条件、2 次不满足。如果不加处理,策略可能在同一分钟内发出 3 次相互矛盾的信号。
这三个问题的根源在于:数据推送是异步的、高频的、不可靠的,而策略计算期望是同步的、低频的、确定的。 中间必须有一层架构来完成这个转化。
二、整体架构:三层分离原则
解决上述问题的核心思路是架构分层。我不推荐把数据接收、消息队列、策略计算全部写在同一个进程里——短期看代码量少,长期看扩展性和稳定性都是灾难。
一个生产可用的实盘架构通常分为三层:
┌─────────────────────────────────────────────────────────┐
│ Layer 1: 数据接入层 │
│ TickDB WebSocket ──→ 数据接收 + 心跳保活 + 重连 │
└──────────────────────────┬──────────────────────────────┘
│ 标准化消息
▼
┌─────────────────────────────────────────────────────────┐
│ Layer 2: 消息队列层 │
│ 内存队列 / Redis Stream / ZeroMQ ──→ 缓冲 + 限频 + 分发 │
└──────────────────────────┬──────────────────────────────┘
│ 消费事件
▼
┌─────────────────────────────────────────────────────────┐
│ Layer 3: 策略引擎层 │
│ 信号生成 ──→ 防抖处理 ──→ 风控检查 ──→ 信号输出 │
└─────────────────────────────────────────────────────────┘
分层的好处是每层的关注点完全隔离:接入层负责稳定性和重连,队列层负责缓冲和限频,策略层负责业务逻辑。当 TickDB 服务端出现短暂抖动时,队列层的缓冲可以兜底;当策略计算量激增时,队列层的积压可以被监控和告警。
接下来我们逐层展开,重点看每一层的工程实现细节。
三、Layer 1:数据接入——WebSocket 稳定连接实战
3.1 为什么要用 WebSocket 而非 REST 轮询
在实盘场景下,REST 轮询有三个根本性缺陷:轮询间隔固定,无法捕捉两次轮询之间的瞬时变化;每次请求都有网络往返开销,在低延迟策略中这是不可接受的;服务端无法主动推送,大量无效请求浪费带宽和配额。
WebSocket 建立一次连接后,服务端可以主动推送数据,延迟可以控制在 100ms 以内(取决于网络质量)。对于需要捕捉订单簿深度变化或成交密度变化的策略,这是必须的。
3.2 生产级 WebSocket 客户端
以下是一个可以直接在生产环境中使用的 WebSocket 接入模块。它包含了三个核心工程能力:心跳保活、指数退避重连、以及限频自适应处理。
import os
import json
import time
import random
import threading
import websocket
from typing import Callable, Optional
from dataclasses import dataclass, field
@dataclass
class WebSocketConfig:
"""WebSocket 连接配置"""
url: str
api_key: str
ping_interval: int = 20 # 心跳间隔(秒)
ping_timeout: int = 10 # 心跳超时(秒)
connect_timeout: int = 5 # 连接超时(秒)
max_retries: int = 10
base_retry_delay: float = 1.0
max_retry_delay: float = 60.0
on_message: Optional[Callable[[dict], None]] = None
on_connect: Optional[Callable[[], None]] = None
on_disconnect: Optional[Callable[[Exception | None], None]] = None
class TickDBWebSocketClient:
"""
TickDB WebSocket 客户端(生产级)
包含:心跳保活、指数退避重连、限频自适应处理
⚠️ 高频场景建议使用 aiohttp/asyncio 重构
"""
def __init__(self, config: WebSocketConfig):
self.config = config
self._ws: Optional[websocket.WebSocketApp] = None
self._thread: Optional[threading.Thread] = None
self._running = False
self._retry_count = 0
self._last_pong_time: float = 0
self._rate_limit_until: float = 0 # 限频等待截止时间
def connect(self, subscriptions: list[dict]) -> None:
"""
启动 WebSocket 连接并订阅指定频道
Args:
subscriptions: 订阅配置列表
示例: [{"channel": "depth", "symbol": "AAPL.US"},
{"channel": "trade", "symbol": "AAPL.US"}]
"""
self._running = True
self._retry_count = 0
self._thread = threading.Thread(target=self._run_loop, args=(subscriptions,))
self._thread.daemon = True
self._thread.start()
def _run_loop(self, subscriptions: list[dict]) -> None:
while self._running and self._retry_count <= self.config.max_retries:
try:
# WebSocket 鉴权通过 URL 参数传递
ws_url = f"{self.config.url}?api_key={self.config.api_key}"
headers = {
"Content-Type": "application/json",
}
self._ws = websocket.WebSocketApp(
ws_url,
header=headers,
on_message=self._on_message,
on_error=self._on_error,
on_close=self._on_close,
on_pong=self._on_pong,
)
# 设置连接超时
self._ws.timeout = self.config.connect_timeout
# 启动心跳线程
ping_thread = threading.Thread(target=self._ping_loop, daemon=True)
ping_thread.start()
# 建立连接并发送订阅请求
self._ws.run_forever(
ping_interval=self.config.ping_interval,
ping_timeout=self.config.ping_timeout,
)
except Exception as e:
self.config.on_disconnect and self.config.on_disconnect(e)
self._schedule_reconnect(subscriptions)
def _ping_loop(self) -> None:
"""心跳保活:定期发送 ping 并检测响应"""
while self._running:
time.sleep(self.config.ping_interval)
if self._ws and self._ws.sock and self._ws.sock.connected:
try:
self._ws.sock.ping()
self._last_pong_time = time.time()
except Exception:
pass
def _on_pong(self, ws, payload=b'') -> None:
"""检测心跳响应,超时则判定连接异常"""
self._last_pong_time = time.time()
def _on_message(self, ws, raw_message: str) -> None:
"""消息处理:区分心跳响应、限频通知和业务数据"""
try:
msg = json.loads(raw_message)
# 处理限频响应(code: 3001)
if msg.get("code") == 3001:
retry_after = int(msg.get("retry_after", 5))
self._rate_limit_until = time.time() + retry_after
print(f"[TickDB] Rate limited. Waiting {retry_after}s before retry.")
time.sleep(retry_after)
return
# 业务数据传递给回调
if self.config.on_message:
self.config.on_message(msg)
except json.JSONDecodeError:
# 非 JSON 消息可能是 pong 确认帧,直接忽略
pass
def _on_error(self, ws, error) -> None:
print(f"[TickDB] WebSocket error: {error}")
def _on_close(self, ws, close_code, close_msg) -> None:
print(f"[TickDB] Connection closed: {close_code} {close_msg}")
self.config.on_disconnect and self.config.on_disconnect(None)
def _schedule_reconnect(self, subscriptions: list[dict]) -> None:
"""指数退避重连:避免频繁重试加重服务端压力"""
self._retry_count += 1
if not self._running:
return
# 指数退避 + 抖动(防止惊群效应)
delay = min(
self.config.base_retry_delay * (2 ** (self._retry_count - 1)),
self.config.max_retry_delay,
)
jitter = random.uniform(0, delay * 0.1)
total_delay = delay + jitter
print(f"[TickDB] Reconnecting in {total_delay:.1f}s (attempt {self._retry_count})")
time.sleep(total_delay)
def subscribe(self, subscriptions: list[dict]) -> None:
"""运行时动态订阅新标的"""
if self._ws and self._ws.sock and self._ws.sock.connected:
msg = json.dumps({"cmd": "subscribe", "data": subscriptions})
self._ws.send(msg)
def disconnect(self) -> None:
self._running = False
if self._ws:
self._ws.close()
if self._thread:
self._thread.join(timeout=2)
工程要点解释:
心跳保活机制不是“发个 ping 就算了”。TickDB 服务端可能因为负载过高而无法及时响应 pong,如果连续两次心跳超时,应该判定连接已失效并主动重连。在上述代码中,_on_pong 记录了最后响应时间,_ping_loop 可以扩展为检测 time.time() - self._last_pong_time > 2 * ping_interval 时触发重连。
限频自适应处理(code: 3001)是一个常被忽略但极其重要的能力。当 TickDB 返回限频错误时,盲目重试会加剧服务端压力,正确的做法是读取 retry_after 字段,等待指定时间后再重试。上述代码在 _on_message 中处理了这个逻辑,并在重连时保持这个等待时间。
环境变量存储 API Key 的原则必须贯彻:永远不要把密钥硬编码在代码中。使用 os.environ.get("TICKDB_API_KEY") 从环境变量读取,在生产环境中通过 Kubernetes Secret 或环境注入的方式配置。
四、Layer 2:消息队列——缓冲与限频的艺术
4.1 为什么需要消息队列
Layer 1 解决了“数据怎么进来”的问题,Layer 2 要解决的是“数据怎么处理”。这里的核心矛盾是:生产端(TickDB)的发送频率是不可控的,但消费端(策略引擎)的处理能力是有限的。
消息队列在这里扮演两个角色:缓冲器和稳压器。
作为缓冲器,队列可以吸收短期内的数据洪峰。假设某个标的在 1 秒内收到了 200 条 depth 更新,但策略只需要每 100ms 计算一次——队列可以把这 200 条数据合并/抽样后再交给策略,避免无意义的重复计算。
作为稳压器,队列可以屏蔽上游的抖动。即使 TickDB 服务端因为维护出现 30 秒的连接中断,队列中已有的消息仍然可以被策略正常消费,直到积压被耗尽。
4.2 队列选型:内存队列 vs Redis Stream vs ZeroMQ
| 方案 | 适用场景 | 优点 | 缺点 |
|---|---|---|---|
Python queue.Queue |
单进程、低延迟、策略数量少 | 零依赖、极低延迟 | 进程崩溃数据丢失、无法跨进程共享 |
| Redis Stream | 多进程、需要持久化、需要消费组 | 支持持久化、消费组重平衡 | 引入额外依赖、延迟略高 |
| ZeroMQ | 超低延迟、进程间通信 | 延迟极低、模式丰富 | 配置复杂、运维成本高 |
大多数个人量化开发者,从 queue.Queue 开始是合理的。 只有当你需要多策略进程共享同一个数据源时,才值得引入 Redis Stream。
4.3 基于 queue.Queue 的内存队列实现
以下是一个带容量限制和丢弃策略的内存消息队列:
import queue
import time
import threading
from dataclasses import dataclass
from enum import Enum
from typing import Optional
class OverflowStrategy(Enum):
"""队列满时的溢出策略"""
DROP_OLDEST = "drop_oldest" # 丢弃最旧的消息(默认)
DROP_LATEST = "drop_latest" # 丢弃最新消息
BLOCK = "block" # 阻塞等待(不推荐,可能导致数据滞后)
@dataclass
class MarketMessage:
"""标准化市场消息格式"""
symbol: str
channel: str # "depth", "trade", "kline"
timestamp: float # 事件时间戳(Unix 秒)
sequence: int # 序列号,用于检测乱序
data: dict # 原始数据 payload
def age_ms(self) -> float:
"""计算消息从产生到当前的延迟(毫秒)"""
return (time.time() - self.timestamp) * 1000
class MarketDataQueue:
"""
行情消息队列
支持:
- 定长环形缓冲,防止内存泄漏
- 溢出策略配置
- 生产者/消费者限速同步
"""
def __init__(
self,
maxsize: int = 1000,
overflow: OverflowStrategy = OverflowStrategy.DROP_OLDEST,
):
self._queue: queue.Queue[Optional[MarketMessage]] = queue.Queue(maxsize=maxsize)
self._overflow = overflow
self._dropped_count = 0 # 丢弃计数,用于监控
self._lock = threading.Lock()
def put(self, msg: MarketMessage, block: bool = True) -> bool:
"""
写入消息到队列
Returns:
True: 写入成功
False: 因溢出策略被丢弃
"""
try:
if self._overflow == OverflowStrategy.DROP_OLDEST:
# 非阻塞写入,队列满时丢弃最旧消息
# 使用 get_nowait 模拟环形缓冲行为
try:
self._queue.put_nowait(None) # None 作为丢弃标记
self._dropped_count += 1
except queue.Full:
# 队列已满,丢弃当前消息
self._dropped_count += 1
return False
elif self._overflow == OverflowStrategy.DROP_LATEST:
# 丢弃最新消息,保留旧消息
try:
self._queue.put_nowait(msg)
except queue.Full:
try:
self._queue.get_nowait() # 丢弃最旧消息
self._queue.put_nowait(msg)
self._dropped_count += 1
except queue.Empty:
pass
else:
self._queue.put(msg, block=block)
return True
except queue.Full:
with self._lock:
self._dropped_count += 1
return False
def get(self, timeout: float = 1.0) -> Optional[MarketMessage]:
"""从队列中取出消息,超时返回 None"""
try:
return self._queue.get(timeout=timeout)
except queue.Empty:
return None
def get_stats(self) -> dict:
"""获取队列统计信息(用于监控告警)"""
return {
"size": self._queue.qsize(),
"dropped": self._dropped_count,
"is_near_full": self._queue.qsize() > self._queue.maxsize * 0.8,
}
这里有一个容易被忽视的设计细节:溢出策略的选择。
DROP_OLDEST 适合“实时行情优先”场景——宁可丢弃旧数据,也要保证策略处理的是最新行情。但代价是,如果你的策略依赖完整的成交记录(用于订单流分析),丢弃消息会导致信号失真。
DROP_LATEST 适合“数据完整性优先”场景——保留所有历史数据,但代价是队列可能积压,在极端波动时延迟会逐渐增大。
如何选择:一个实用的经验法则是——如果你的策略是基于 snapshot(快照类指标,如均线、布林带)工作,用 DROP_OLDEST;如果你的策略是基于 tick 级别累计量(成交量加权价格、订单流不平衡)工作,用 DROP_LATEST,并配合告警机制监控积压深度。
五、Layer 3:策略引擎——信号生成与防抖处理
5.1 策略回调的设计原则
策略回调是整个链路中唯一一个与业务逻辑直接相关的环节。它的设计质量直接影响信号的质量。
一个常见的错误写法是这样的:
# ❌ 错误示例:把所有逻辑写在回调里
def on_depth_update(msg):
price = msg['data']['bid1']
volume = msg['data']['bid1_vol']
if price > ma(close_prices, 20):
send_order(...)
这个写法的三个问题:
- 无法单元测试——业务逻辑和消息接收耦合在一起
- 状态不可见——无法在外部监控策略当前的窗口状态
- 错误处理粗糙——任何异常都会导致回调崩溃,进而丢失连接
正确的做法是将策略引擎设计为一个独立组件,通过接口与数据层解耦:
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from collections import deque
import time
@dataclass
class Signal:
"""标准化信号格式"""
symbol: str
direction: str # "long" | "short" | "exit"
strength: float # 0.0 ~ 1.0,信号强度
trigger_price: float
trigger_time: float
metadata: dict = field(default_factory=dict)
class BaseStrategy(ABC):
"""
策略基类
设计原则:
1. 策略状态(窗口、缓存)对外可见
2. 信号输出通过抽象方法,不依赖具体执行层
3. 所有异常在框架层捕获,不向上传播
"""
def __init__(self, symbol: str):
self.symbol = symbol
self._state: dict = {} # 策略内部状态,可被外部查询
self._last_signal_time: float = 0
@abstractmethod
def on_market_data(self, msg: MarketMessage) -> Signal | None:
"""
处理市场数据,返回交易信号(无信号时返回 None)
⚠️ 此方法不得抛出异常,所有异常在框架层处理
"""
pass
def update_state(self, key: str, value) -> None:
"""更新策略状态(供监控和调试使用)"""
self._state[key] = value
def get_state(self) -> dict:
"""获取当前策略状态"""
return {
"symbol": self.symbol,
"last_signal_time": self._last_signal_time,
**self._state,
}
def record_signal(self, signal: Signal) -> None:
"""记录信号发送时间(用于防抖)"""
self._last_signal_time = signal.trigger_time
这样设计的核心好处是:策略变成了一个纯函数式的输入输出模块。on_market_data 接收一个 MarketMessage,输出一个 Signal 或 None。它的内部状态可以通过 get_state() 随时被外部监控抓取。这为后续的防抖处理、信号审计和策略调优提供了基础设施。
5.2 信号防抖:解决抖动问题的核心机制
回到文章开头提到的均线突破抖动问题。价格快速波动时,同一方向的信号可能在毫秒级别内被重复触发,导致策略在不该开仓的时候开仓、平仓、再开仓。
信号防抖(Debounce)是解决这个问题的主流方案。类比电子电路中的防抖概念:信号变化后,需要等待一个“稳定窗口”,只有在这个窗口内信号持续满足条件,才认为信号是真实有效的。
5.2.1 时间窗口防抖
最简单的防抖策略是基于时间的:在同一个方向上,两次信号之间的最小间隔必须大于某个阈值。
@dataclass
class DebounceConfig:
"""防抖配置"""
min_interval_sec: float = 0.5 # 同向信号最小间隔
cooldown_after_opposite: float = 0.0 # 反向信号后立即可发信号(默认关闭)
warmup_count: int = 0 # 信号前需要满足条件的最小次数
class SignalDebouncer:
"""
信号防抖器
支持三种模式:
- 时间窗口防抖:基于时间间隔
- 计数防抖:基于条件满足次数
- 复合防抖:时间 + 计数同时满足
"""
def __init__(self, config: DebounceConfig):
self.config = config
# 记录每个方向最后一次信号的发送时间
self._last_signal: dict[str, float] = {}
# 记录连续满足条件的计数
self._consecutive_count: dict[str, int] = {}
# 记录当前有效信号方向
self._active_direction: str | None = None
def should_emit(self, direction: str, current_time: float) -> bool:
"""
判断给定方向的信号是否应该发出
Args:
direction: 信号方向 ("long" | "short" | "exit")
current_time: 当前时间戳
Returns:
True: 可以发出信号
False: 触发防抖,信号被抑制
"""
last_time = self._last_signal.get(direction, 0)
interval = current_time - last_time
# 时间窗口检查
if interval < self.config.min_interval_sec:
return False
# 计数窗口检查(如果配置了 warmup_count)
if self.config.warmup_count > 0:
current_count = self._consecutive_count.get(direction, 0)
if current_count < self.config.warmup_count:
return False
return True
def record(self, direction: str, signal_time: float) -> None:
"""记录信号已发出"""
self._last_signal[direction] = signal_time
self._consecutive_count[direction] = 0
self._active_direction = direction
def tick(self, direction: str, condition_met: bool) -> None:
"""
每收到一个数据点时调用,用于更新计数窗口
Args:
direction: 当前数据的方向
condition_met: 是否满足触发条件
"""
if not condition_met:
# 条件不满足时重置计数
self._consecutive_count[direction] = 0
return
self._consecutive_count[direction] = \
self._consecutive_count.get(direction, 0) + 1
def get_cooldown_remaining(self, direction: str, current_time: float) -> float:
"""获取指定方向的剩余冷却时间(秒)"""
last_time = self._last_signal.get(direction, 0)
remaining = self.config.min_interval_sec - (current_time - last_time)
return max(0.0, remaining)
5.2.2 一个完整的均线突破策略示例
将上述组件串联起来,下面是一个使用 TickDB depth 数据和均线突破信号的完整策略框架:
import os
import time
from collections import deque
class MovingAverageBreakStrategy(BaseStrategy):
"""
均线突破策略 + 防抖处理
信号逻辑:
- 当 depth 频道的买一价突破 N 周期均线时,发送做多信号
- 当买一价跌破均线时,发送平多信号
- 防抖配置:同向信号间隔至少 2 秒,连续满足条件 3 次才触发
"""
def __init__(
self,
symbol: str,
lookback_period: int = 20,
debounce_interval: float = 2.0,
debounce_warmup: int = 3,
):
super().__init__(symbol)
self.lookback_period = lookback_period
self._price_window: deque[float] = deque(maxlen=lookback_period + 10)
self._current_ma: float = 0.0
debounce_config = DebounceConfig(
min_interval_sec=debounce_interval,
warmup_count=debounce_warmup,
)
self._debouncer = SignalDebouncer(debounce_config)
self._last_direction: str | None = None
def on_market_data(self, msg: MarketMessage) -> Signal | None:
"""处理 depth 频道数据,检测均线突破"""
if msg.channel != "depth":
return None
# 从 depth 数据中提取最佳买价
data = msg.data
bid_price = float(data.get("bid1", 0))
if bid_price <= 0:
return None
# 更新价格窗口
self._price_window.append(bid_price)
if len(self._price_window) < self.lookback_period:
self.update_state("status", "warming_up")
return None
# 计算均线
prices = list(self._price_window)[-self.lookback_period:]
self._current_ma = sum(prices) / len(prices)
self.update_state("current_ma", round(self._current_ma, 4))
self.update_state("current_price", bid_price)
self.update_state("status", "active")
# 计算突破方向
prev_prices = list(self._price_window)[-self.lookback_period - 1:-1]
prev_ma = sum(prev_prices) / len(prev_prices)
signal_direction: str | None = None
if bid_price > self._current_ma and prev_prices[-1] <= prev_ma:
signal_direction = "long"
elif bid_price < self._current_ma and prev_prices[-1] >= prev_ma:
signal_direction = "exit"
if signal_direction is None:
# 无论是否有信号,都更新计数窗口
self._debouncer.tick("long", bid_price > self._current_ma)
return None
# 防抖检查
if not self._debouncer.should_emit(signal_direction, msg.timestamp):
self.update_state("last_action", "debounced")
return None
# 构建信号
signal = Signal(
symbol=self.symbol,
direction=signal_direction,
strength=min(abs(bid_price - self._current_ma) / self._current_ma * 100, 1.0),
trigger_price=bid_price,
trigger_time=msg.timestamp,
metadata={
"ma": self._current_ma,
"break_type": "above" if signal_direction == "long" else "below",
},
)
# 记录信号,更新防抖状态
self._debouncer.record(signal_direction, msg.timestamp)
self._last_direction = signal_direction
self.update_state("last_action", f"signal_{signal_direction}")
self.update_state("last_signal_time", msg.timestamp)
return signal
防抖策略的一个关键细节:为什么需要 warmup_count?
单纯的基于时间的防抖存在一个问题——在趋势初期,价格可能快速反复穿越均线(市场噪音)。warmup_count = 3 意味着信号不仅要在时间上间隔 2 秒,还必须连续 3 次都满足突破条件。这样可以过滤掉单次虚假突破。
当然,warmup_count 也不是越高越好。如果设置过高,策略的响应速度会显著下降,在快速趋势行情中可能错过大半涨幅。实践中建议根据策略的时间周期和标的的波动特征来调参:高频策略用低 warmup_count + 低 min_interval,趋势策略可以适当提高。
5.3 策略引擎的完整运行循环
最后,把上述所有组件串联成一个可执行的运行循环:
import os
def build_engine(symbol: str):
"""构建完整的实盘引擎"""
# 1. 初始化消息队列(容量 1000,超出时丢弃最旧消息)
data_queue = MarketDataQueue(maxsize=1000, overflow=OverflowStrategy.DROP_OLDEST)
# 2. 初始化策略
strategy = MovingAverageBreakStrategy(
symbol=symbol,
lookback_period=20,
debounce_interval=2.0,
debounce_warmup=3,
)
# 3. 消费循环:独立线程,持续从队列取数据喂给策略
def consume_loop():
while True:
msg = data_queue.get(timeout=0.5)
if msg is None:
continue
try:
signal = strategy.on_market_data(msg)
if signal:
print(f"[Signal] {signal.direction.upper()} {signal.symbol} "
f"@ {signal.trigger_price:.4f} "
f"(strength={signal.strength:.3f}, "
f"MA={signal.metadata['ma']:.4f})")
# TODO: 这里接入风控层和下单层
except Exception as e:
# ⚠️ 关键:异常不能向上传播,否则会中断回调链
print(f"[Strategy] Error processing {symbol}: {e}")
consumer_thread = threading.Thread(target=consume_loop, daemon=True)
consumer_thread.start()
# 4. 初始化 WebSocket 客户端
def on_message(msg: dict):
"""将收到的消息转换为标准化格式并写入队列"""
market_msg = MarketMessage(
symbol=msg.get("symbol", symbol),
channel=msg.get("channel", ""),
timestamp=msg.get("ts", time.time()) / 1000,
sequence=msg.get("seq", 0),
data=msg.get("data", {}),
)
data_queue.put(market_msg)
ws_config = WebSocketConfig(
url="wss://api.tickdb.ai/ws/v1/market",
api_key=os.environ.get("TICKDB_API_KEY", ""),
ping_interval=20,
max_retries=10,
on_message=on_message,
)
client = TickDBWebSocketClient(ws_config)
# 5. 订阅 depth 频道
client.connect(subscriptions=[
{"channel": "depth", "symbol": symbol},
])
return {
"client": client,
"strategy": strategy,
"queue": data_queue,
}
if __name__ == "__main__":
engine = build_engine("AAPL.US")
try:
while True:
time.sleep(30)
stats = engine["queue"].get_stats()
state = engine["strategy"].get_state()
print(f"[Monitor] Queue: {stats}, Strategy: {state}")
except KeyboardInterrupt:
engine["client"].disconnect()
print("[Engine] Shutdown complete.")
三个值得强调的工程实践:
第一,异常隔离。 在 consume_loop 中,策略的 on_market_data 调用被 try/except 完全包裹。这意味着任何策略逻辑错误都不会导致消息队列消费线程崩溃。实盘中这个设计至关重要——一个未捕获的 KeyError 可能导致整个进程退出,而此时你的数据链路已经悄然中断,你却毫不知情。
第二,监控与状态暴露。 每 30 秒打印一次队列统计和策略状态。这不是多余的日志,而是生产环境的必要可观测性基础设施。在实际部署中,建议将这些指标发送到 Prometheus 或飞书告警机器人,当队列丢弃率突增或策略状态异常时第一时间收到通知。
第三,优雅关闭。 KeyboardInterrupt 触发时,先断开 WebSocket 连接,再退出主进程。这个顺序不能颠倒——直接退出进程会导致 TCP 连接被强制关闭,可能触发服务端将你的连接标记为异常断开,影响后续重连的速率限制。
六、整个链路的延迟拆解
理解整个架构后,有必要量化每个环节的延迟,这样才能知道瓶颈在哪里。
| 环节 | 典型延迟 | 可优化空间 |
|---|---|---|
| TickDB 服务端 → 网络传输 | 20-80ms(取决于地域) | 使用 TickDB 同区域节点 |
| WebSocket 接收 + JSON 解析 | 0.5-2ms | C 扩展库(基本无法优化) |
| 消息写入队列 | <0.1ms | 无 |
| 消费线程从队列取出 | <0.1ms(队列非空时) | 无 |
策略 on_market_data 计算 |
0.1-5ms(取决于策略复杂度) | 热点代码用 Cython/Numba |
| 信号防抖检查 | <0.1ms | 无 |
| 端到端总延迟(正常负载) | 30-100ms |
瓶颈分析:对于大多数策略来说,网络延迟是主要瓶颈,代码层面的优化(除非策略计算极其复杂)收益有限。但有一个例外——如果你的策略需要在单核 CPU 上处理多个标的的订阅,on_market_data 的计算时间会成为瓶颈。此时可以考虑两个方向:将策略计算搬到独立进程池,或者将热点计算用 Cython 加速。
七、部署方案速查
| 场景 | 推荐配置 | 说明 |
|---|---|---|
| 个人开发者 / 策略验证 | 单进程 + 内存队列 | 最简部署,快速验证策略逻辑 |
| 多策略并行 | 主进程 + Redis Stream + 多消费者 | 数据源统一接入,策略进程独立运行 |
| 机构级低延迟 | C++ 重写核心路径 + 内存队列 | 跳过 Python GIL 限制,追求极致低延迟 |
| 需要灾备 | 主备双链路 + 健康检查 | 任何一路断开时自动切换 |
八、结语:架构是杠杆
回到开篇的问题:数据到了之后,怎么触发策略计算?如何保证毫秒级响应?
这篇文章的核心结论不是某一段具体代码,而是三层分离的架构思想。接入层解决稳定性问题,队列层解决流量问题,策略层解决业务问题。三者各司其职,整体的可靠性和可维护性才能达到生产级别。
架构是一种杠杆。好的架构让小团队也能维护复杂的实盘系统,坏的架构让大团队在稳定性和扩展性上反复踩坑。从 TickDB 拿到数据只是第一步——你用什么样的架构去处理这些数据,才真正决定了策略能不能在实盘中活下去。
下一步行动
如果你想亲手搭建这套架构:
- 访问 tickdb.ai 注册(免费,无需信用卡)
- 在控制台生成 API Key
- 设置环境变量
TICKDB_API_KEY,复制本文代码即可运行
如果你想了解更多 TickDB 实时数据频道的使用方式:
- depth 频道:订单簿深度数据,用于计算买卖压力比
- trade 频道:逐笔成交推送,用于订单流分析(港股和数字货币)
如果你习惯用 AI 辅助开发,在 AI 助手中搜索安装 tickdb-market-data SKILL,可直接用自然语言查询 TickDB 的实时和历史行情数据。
如果你希望获得机构级的数据完整性和 SLA 保障,联系 [email protected] 了解专业版/企业版方案。
本文不构成任何投资建议。市场有风险,投资需谨慎。