毫秒级响应:实时行情到策略信号的全链路架构设计

数据来了,然后呢?

这是大多数量化系统最容易被忽视的一环。

你能拿到实时行情——盘口变化、成交推送、K线合成——这些都不难。但数据到了之后,如何让策略在毫秒级别做出反应、如何避免信号抖动导致的频繁开平仓、如何在整个链路中保持稳定性和可观测性,才是真正区分“回测能跑”和“实盘能用”的分水岭。

本文不聊策略本身。我们只讨论一件事:从 TickDB 推送过来的原始行情,到你的策略引擎发出交易信号,这中间的系统架构该怎么搭。


一、为什么实时数据接入不是一个“接 API”那么简单

很多人把行情接入理解为“调一个接口,拿到数据,触发计算”。这个模型在回测环境中足够用,但在实盘中会暴露三个致命问题:

第一,推送数据的流量是不均匀的。

热门标的在财报发布前后,订单簿更新频率可能从每秒几十次飙升到每秒上千次。如果你的策略触发计算没有任何缓冲机制,高频数据会直接打爆 CPU。更糟糕的是,在波动最剧烈的时刻,你的策略反而因为计算排队而错失最佳执行窗口。

第二,策略回调的时序是不可控的。

Python 的 GIL 意味着多线程回调存在竞争条件,同一时刻多个数据到达可能导致回调乱序执行。如果策略内部维护了状态(比如持仓记录、窗口均值),乱序回调会让状态计算结果完全错误。

第三,信号抖动是真实存在的工程问题。

以一个简单的均线突破策略为例:价格在上穿均线的瞬间,行情数据可能在 10ms 内连续推送 5 次更新,其中 3 次满足突破条件、2 次不满足。如果不加处理,策略可能在同一分钟内发出 3 次相互矛盾的信号。

这三个问题的根源在于:数据推送是异步的、高频的、不可靠的,而策略计算期望是同步的、低频的、确定的。 中间必须有一层架构来完成这个转化。


二、整体架构:三层分离原则

解决上述问题的核心思路是架构分层。我不推荐把数据接收、消息队列、策略计算全部写在同一个进程里——短期看代码量少,长期看扩展性和稳定性都是灾难。

一个生产可用的实盘架构通常分为三层:

┌─────────────────────────────────────────────────────────┐
│  Layer 1: 数据接入层                                      │
│  TickDB WebSocket ──→ 数据接收 + 心跳保活 + 重连         │
└──────────────────────────┬──────────────────────────────┘
                           │ 标准化消息
                           ▼
┌─────────────────────────────────────────────────────────┐
│  Layer 2: 消息队列层                                      │
│  内存队列 / Redis Stream / ZeroMQ ──→ 缓冲 + 限频 + 分发 │
└──────────────────────────┬──────────────────────────────┘
                           │ 消费事件
                           ▼
┌─────────────────────────────────────────────────────────┐
│  Layer 3: 策略引擎层                                      │
│  信号生成 ──→ 防抖处理 ──→ 风控检查 ──→ 信号输出         │
└─────────────────────────────────────────────────────────┘

分层的好处是每层的关注点完全隔离:接入层负责稳定性和重连,队列层负责缓冲和限频,策略层负责业务逻辑。当 TickDB 服务端出现短暂抖动时,队列层的缓冲可以兜底;当策略计算量激增时,队列层的积压可以被监控和告警。

接下来我们逐层展开,重点看每一层的工程实现细节。


三、Layer 1:数据接入——WebSocket 稳定连接实战

3.1 为什么要用 WebSocket 而非 REST 轮询

在实盘场景下,REST 轮询有三个根本性缺陷:轮询间隔固定,无法捕捉两次轮询之间的瞬时变化;每次请求都有网络往返开销,在低延迟策略中这是不可接受的;服务端无法主动推送,大量无效请求浪费带宽和配额。

WebSocket 建立一次连接后,服务端可以主动推送数据,延迟可以控制在 100ms 以内(取决于网络质量)。对于需要捕捉订单簿深度变化或成交密度变化的策略,这是必须的。

3.2 生产级 WebSocket 客户端

以下是一个可以直接在生产环境中使用的 WebSocket 接入模块。它包含了三个核心工程能力:心跳保活、指数退避重连、以及限频自适应处理。

import os
import json
import time
import random
import threading
import websocket
from typing import Callable, Optional
from dataclasses import dataclass, field


@dataclass
class WebSocketConfig:
    """WebSocket 连接配置"""
    url: str
    api_key: str
    ping_interval: int = 20  # 心跳间隔(秒)
    ping_timeout: int = 10  # 心跳超时(秒)
    connect_timeout: int = 5  # 连接超时(秒)
    max_retries: int = 10
    base_retry_delay: float = 1.0
    max_retry_delay: float = 60.0
    on_message: Optional[Callable[[dict], None]] = None
    on_connect: Optional[Callable[[], None]] = None
    on_disconnect: Optional[Callable[[Exception | None], None]] = None


class TickDBWebSocketClient:
    """
    TickDB WebSocket 客户端(生产级)
    包含:心跳保活、指数退避重连、限频自适应处理
    ⚠️ 高频场景建议使用 aiohttp/asyncio 重构
    """

    def __init__(self, config: WebSocketConfig):
        self.config = config
        self._ws: Optional[websocket.WebSocketApp] = None
        self._thread: Optional[threading.Thread] = None
        self._running = False
        self._retry_count = 0
        self._last_pong_time: float = 0
        self._rate_limit_until: float = 0  # 限频等待截止时间

    def connect(self, subscriptions: list[dict]) -> None:
        """
        启动 WebSocket 连接并订阅指定频道
        
        Args:
            subscriptions: 订阅配置列表
            示例: [{"channel": "depth", "symbol": "AAPL.US"}, 
                   {"channel": "trade", "symbol": "AAPL.US"}]
        """
        self._running = True
        self._retry_count = 0
        self._thread = threading.Thread(target=self._run_loop, args=(subscriptions,))
        self._thread.daemon = True
        self._thread.start()

    def _run_loop(self, subscriptions: list[dict]) -> None:
        while self._running and self._retry_count <= self.config.max_retries:
            try:
                # WebSocket 鉴权通过 URL 参数传递
                ws_url = f"{self.config.url}?api_key={self.config.api_key}"
                headers = {
                    "Content-Type": "application/json",
                }
                self._ws = websocket.WebSocketApp(
                    ws_url,
                    header=headers,
                    on_message=self._on_message,
                    on_error=self._on_error,
                    on_close=self._on_close,
                    on_pong=self._on_pong,
                )

                # 设置连接超时
                self._ws.timeout = self.config.connect_timeout

                # 启动心跳线程
                ping_thread = threading.Thread(target=self._ping_loop, daemon=True)
                ping_thread.start()

                # 建立连接并发送订阅请求
                self._ws.run_forever(
                    ping_interval=self.config.ping_interval,
                    ping_timeout=self.config.ping_timeout,
                )

            except Exception as e:
                self.config.on_disconnect and self.config.on_disconnect(e)
                self._schedule_reconnect(subscriptions)

    def _ping_loop(self) -> None:
        """心跳保活:定期发送 ping 并检测响应"""
        while self._running:
            time.sleep(self.config.ping_interval)
            if self._ws and self._ws.sock and self._ws.sock.connected:
                try:
                    self._ws.sock.ping()
                    self._last_pong_time = time.time()
                except Exception:
                    pass

    def _on_pong(self, ws, payload=b'') -> None:
        """检测心跳响应,超时则判定连接异常"""
        self._last_pong_time = time.time()

    def _on_message(self, ws, raw_message: str) -> None:
        """消息处理:区分心跳响应、限频通知和业务数据"""
        try:
            msg = json.loads(raw_message)

            # 处理限频响应(code: 3001)
            if msg.get("code") == 3001:
                retry_after = int(msg.get("retry_after", 5))
                self._rate_limit_until = time.time() + retry_after
                print(f"[TickDB] Rate limited. Waiting {retry_after}s before retry.")
                time.sleep(retry_after)
                return

            # 业务数据传递给回调
            if self.config.on_message:
                self.config.on_message(msg)

        except json.JSONDecodeError:
            # 非 JSON 消息可能是 pong 确认帧,直接忽略
            pass

    def _on_error(self, ws, error) -> None:
        print(f"[TickDB] WebSocket error: {error}")

    def _on_close(self, ws, close_code, close_msg) -> None:
        print(f"[TickDB] Connection closed: {close_code} {close_msg}")
        self.config.on_disconnect and self.config.on_disconnect(None)

    def _schedule_reconnect(self, subscriptions: list[dict]) -> None:
        """指数退避重连:避免频繁重试加重服务端压力"""
        self._retry_count += 1
        if not self._running:
            return

        # 指数退避 + 抖动(防止惊群效应)
        delay = min(
            self.config.base_retry_delay * (2 ** (self._retry_count - 1)),
            self.config.max_retry_delay,
        )
        jitter = random.uniform(0, delay * 0.1)
        total_delay = delay + jitter

        print(f"[TickDB] Reconnecting in {total_delay:.1f}s (attempt {self._retry_count})")
        time.sleep(total_delay)

    def subscribe(self, subscriptions: list[dict]) -> None:
        """运行时动态订阅新标的"""
        if self._ws and self._ws.sock and self._ws.sock.connected:
            msg = json.dumps({"cmd": "subscribe", "data": subscriptions})
            self._ws.send(msg)

    def disconnect(self) -> None:
        self._running = False
        if self._ws:
            self._ws.close()
        if self._thread:
            self._thread.join(timeout=2)

工程要点解释:

心跳保活机制不是“发个 ping 就算了”。TickDB 服务端可能因为负载过高而无法及时响应 pong,如果连续两次心跳超时,应该判定连接已失效并主动重连。在上述代码中,_on_pong 记录了最后响应时间,_ping_loop 可以扩展为检测 time.time() - self._last_pong_time > 2 * ping_interval 时触发重连。

限频自适应处理code: 3001)是一个常被忽略但极其重要的能力。当 TickDB 返回限频错误时,盲目重试会加剧服务端压力,正确的做法是读取 retry_after 字段,等待指定时间后再重试。上述代码在 _on_message 中处理了这个逻辑,并在重连时保持这个等待时间。

环境变量存储 API Key 的原则必须贯彻:永远不要把密钥硬编码在代码中。使用 os.environ.get("TICKDB_API_KEY") 从环境变量读取,在生产环境中通过 Kubernetes Secret 或环境注入的方式配置。


四、Layer 2:消息队列——缓冲与限频的艺术

4.1 为什么需要消息队列

Layer 1 解决了“数据怎么进来”的问题,Layer 2 要解决的是“数据怎么处理”。这里的核心矛盾是:生产端(TickDB)的发送频率是不可控的,但消费端(策略引擎)的处理能力是有限的。

消息队列在这里扮演两个角色:缓冲器稳压器

作为缓冲器,队列可以吸收短期内的数据洪峰。假设某个标的在 1 秒内收到了 200 条 depth 更新,但策略只需要每 100ms 计算一次——队列可以把这 200 条数据合并/抽样后再交给策略,避免无意义的重复计算。

作为稳压器,队列可以屏蔽上游的抖动。即使 TickDB 服务端因为维护出现 30 秒的连接中断,队列中已有的消息仍然可以被策略正常消费,直到积压被耗尽。

4.2 队列选型:内存队列 vs Redis Stream vs ZeroMQ

方案 适用场景 优点 缺点
Python queue.Queue 单进程、低延迟、策略数量少 零依赖、极低延迟 进程崩溃数据丢失、无法跨进程共享
Redis Stream 多进程、需要持久化、需要消费组 支持持久化、消费组重平衡 引入额外依赖、延迟略高
ZeroMQ 超低延迟、进程间通信 延迟极低、模式丰富 配置复杂、运维成本高

大多数个人量化开发者,从 queue.Queue 开始是合理的。 只有当你需要多策略进程共享同一个数据源时,才值得引入 Redis Stream。

4.3 基于 queue.Queue 的内存队列实现

以下是一个带容量限制和丢弃策略的内存消息队列:

import queue
import time
import threading
from dataclasses import dataclass
from enum import Enum
from typing import Optional


class OverflowStrategy(Enum):
    """队列满时的溢出策略"""
    DROP_OLDEST = "drop_oldest"      # 丢弃最旧的消息(默认)
    DROP_LATEST = "drop_latest"      # 丢弃最新消息
    BLOCK = "block"                  # 阻塞等待(不推荐,可能导致数据滞后)


@dataclass
class MarketMessage:
    """标准化市场消息格式"""
    symbol: str
    channel: str          # "depth", "trade", "kline"
    timestamp: float      # 事件时间戳(Unix 秒)
    sequence: int          # 序列号,用于检测乱序
    data: dict             # 原始数据 payload

    def age_ms(self) -> float:
        """计算消息从产生到当前的延迟(毫秒)"""
        return (time.time() - self.timestamp) * 1000


class MarketDataQueue:
    """
    行情消息队列
    
    支持:
    - 定长环形缓冲,防止内存泄漏
    - 溢出策略配置
    - 生产者/消费者限速同步
    """

    def __init__(
        self,
        maxsize: int = 1000,
        overflow: OverflowStrategy = OverflowStrategy.DROP_OLDEST,
    ):
        self._queue: queue.Queue[Optional[MarketMessage]] = queue.Queue(maxsize=maxsize)
        self._overflow = overflow
        self._dropped_count = 0  # 丢弃计数,用于监控
        self._lock = threading.Lock()

    def put(self, msg: MarketMessage, block: bool = True) -> bool:
        """
        写入消息到队列
        
        Returns:
            True: 写入成功
            False: 因溢出策略被丢弃
        """
        try:
            if self._overflow == OverflowStrategy.DROP_OLDEST:
                # 非阻塞写入,队列满时丢弃最旧消息
                # 使用 get_nowait 模拟环形缓冲行为
                try:
                    self._queue.put_nowait(None)  # None 作为丢弃标记
                    self._dropped_count += 1
                except queue.Full:
                    # 队列已满,丢弃当前消息
                    self._dropped_count += 1
                    return False
            elif self._overflow == OverflowStrategy.DROP_LATEST:
                # 丢弃最新消息,保留旧消息
                try:
                    self._queue.put_nowait(msg)
                except queue.Full:
                    try:
                        self._queue.get_nowait()  # 丢弃最旧消息
                        self._queue.put_nowait(msg)
                        self._dropped_count += 1
                    except queue.Empty:
                        pass
            else:
                self._queue.put(msg, block=block)

            return True

        except queue.Full:
            with self._lock:
                self._dropped_count += 1
            return False

    def get(self, timeout: float = 1.0) -> Optional[MarketMessage]:
        """从队列中取出消息,超时返回 None"""
        try:
            return self._queue.get(timeout=timeout)
        except queue.Empty:
            return None

    def get_stats(self) -> dict:
        """获取队列统计信息(用于监控告警)"""
        return {
            "size": self._queue.qsize(),
            "dropped": self._dropped_count,
            "is_near_full": self._queue.qsize() > self._queue.maxsize * 0.8,
        }

这里有一个容易被忽视的设计细节:溢出策略的选择。

DROP_OLDEST 适合“实时行情优先”场景——宁可丢弃旧数据,也要保证策略处理的是最新行情。但代价是,如果你的策略依赖完整的成交记录(用于订单流分析),丢弃消息会导致信号失真。

DROP_LATEST 适合“数据完整性优先”场景——保留所有历史数据,但代价是队列可能积压,在极端波动时延迟会逐渐增大。

如何选择:一个实用的经验法则是——如果你的策略是基于 snapshot(快照类指标,如均线、布林带)工作,用 DROP_OLDEST;如果你的策略是基于 tick 级别累计量(成交量加权价格、订单流不平衡)工作,用 DROP_LATEST,并配合告警机制监控积压深度。


五、Layer 3:策略引擎——信号生成与防抖处理

5.1 策略回调的设计原则

策略回调是整个链路中唯一一个与业务逻辑直接相关的环节。它的设计质量直接影响信号的质量。

一个常见的错误写法是这样的:

# ❌ 错误示例:把所有逻辑写在回调里
def on_depth_update(msg):
    price = msg['data']['bid1']
    volume = msg['data']['bid1_vol']
    if price > ma(close_prices, 20):
        send_order(...)

这个写法的三个问题:

  1. 无法单元测试——业务逻辑和消息接收耦合在一起
  2. 状态不可见——无法在外部监控策略当前的窗口状态
  3. 错误处理粗糙——任何异常都会导致回调崩溃,进而丢失连接

正确的做法是将策略引擎设计为一个独立组件,通过接口与数据层解耦

from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from collections import deque
import time


@dataclass
class Signal:
    """标准化信号格式"""
    symbol: str
    direction: str          # "long" | "short" | "exit"
    strength: float          # 0.0 ~ 1.0,信号强度
    trigger_price: float
    trigger_time: float
    metadata: dict = field(default_factory=dict)


class BaseStrategy(ABC):
    """
    策略基类
    
    设计原则:
    1. 策略状态(窗口、缓存)对外可见
    2. 信号输出通过抽象方法,不依赖具体执行层
    3. 所有异常在框架层捕获,不向上传播
    """

    def __init__(self, symbol: str):
        self.symbol = symbol
        self._state: dict = {}  # 策略内部状态,可被外部查询
        self._last_signal_time: float = 0

    @abstractmethod
    def on_market_data(self, msg: MarketMessage) -> Signal | None:
        """
        处理市场数据,返回交易信号(无信号时返回 None)
        
        ⚠️ 此方法不得抛出异常,所有异常在框架层处理
        """
        pass

    def update_state(self, key: str, value) -> None:
        """更新策略状态(供监控和调试使用)"""
        self._state[key] = value

    def get_state(self) -> dict:
        """获取当前策略状态"""
        return {
            "symbol": self.symbol,
            "last_signal_time": self._last_signal_time,
            **self._state,
        }

    def record_signal(self, signal: Signal) -> None:
        """记录信号发送时间(用于防抖)"""
        self._last_signal_time = signal.trigger_time

这样设计的核心好处是:策略变成了一个纯函数式的输入输出模块on_market_data 接收一个 MarketMessage,输出一个 SignalNone。它的内部状态可以通过 get_state() 随时被外部监控抓取。这为后续的防抖处理、信号审计和策略调优提供了基础设施。

5.2 信号防抖:解决抖动问题的核心机制

回到文章开头提到的均线突破抖动问题。价格快速波动时,同一方向的信号可能在毫秒级别内被重复触发,导致策略在不该开仓的时候开仓、平仓、再开仓。

信号防抖(Debounce)是解决这个问题的主流方案。类比电子电路中的防抖概念:信号变化后,需要等待一个“稳定窗口”,只有在这个窗口内信号持续满足条件,才认为信号是真实有效的。

5.2.1 时间窗口防抖

最简单的防抖策略是基于时间的:在同一个方向上,两次信号之间的最小间隔必须大于某个阈值。

@dataclass
class DebounceConfig:
    """防抖配置"""
    min_interval_sec: float = 0.5      # 同向信号最小间隔
    cooldown_after_opposite: float = 0.0  # 反向信号后立即可发信号(默认关闭)
    warmup_count: int = 0              # 信号前需要满足条件的最小次数


class SignalDebouncer:
    """
    信号防抖器
    
    支持三种模式:
    - 时间窗口防抖:基于时间间隔
    - 计数防抖:基于条件满足次数
    - 复合防抖:时间 + 计数同时满足
    """

    def __init__(self, config: DebounceConfig):
        self.config = config
        # 记录每个方向最后一次信号的发送时间
        self._last_signal: dict[str, float] = {}
        # 记录连续满足条件的计数
        self._consecutive_count: dict[str, int] = {}
        # 记录当前有效信号方向
        self._active_direction: str | None = None

    def should_emit(self, direction: str, current_time: float) -> bool:
        """
        判断给定方向的信号是否应该发出
        
        Args:
            direction: 信号方向 ("long" | "short" | "exit")
            current_time: 当前时间戳
            
        Returns:
            True: 可以发出信号
            False: 触发防抖,信号被抑制
        """
        last_time = self._last_signal.get(direction, 0)
        interval = current_time - last_time

        # 时间窗口检查
        if interval < self.config.min_interval_sec:
            return False

        # 计数窗口检查(如果配置了 warmup_count)
        if self.config.warmup_count > 0:
            current_count = self._consecutive_count.get(direction, 0)
            if current_count < self.config.warmup_count:
                return False

        return True

    def record(self, direction: str, signal_time: float) -> None:
        """记录信号已发出"""
        self._last_signal[direction] = signal_time
        self._consecutive_count[direction] = 0
        self._active_direction = direction

    def tick(self, direction: str, condition_met: bool) -> None:
        """
        每收到一个数据点时调用,用于更新计数窗口
        
        Args:
            direction: 当前数据的方向
            condition_met: 是否满足触发条件
        """
        if not condition_met:
            # 条件不满足时重置计数
            self._consecutive_count[direction] = 0
            return

        self._consecutive_count[direction] = \
            self._consecutive_count.get(direction, 0) + 1

    def get_cooldown_remaining(self, direction: str, current_time: float) -> float:
        """获取指定方向的剩余冷却时间(秒)"""
        last_time = self._last_signal.get(direction, 0)
        remaining = self.config.min_interval_sec - (current_time - last_time)
        return max(0.0, remaining)

5.2.2 一个完整的均线突破策略示例

将上述组件串联起来,下面是一个使用 TickDB depth 数据和均线突破信号的完整策略框架:

import os
import time
from collections import deque


class MovingAverageBreakStrategy(BaseStrategy):
    """
    均线突破策略 + 防抖处理
    
    信号逻辑:
    - 当 depth 频道的买一价突破 N 周期均线时,发送做多信号
    - 当买一价跌破均线时,发送平多信号
    - 防抖配置:同向信号间隔至少 2 秒,连续满足条件 3 次才触发
    """

    def __init__(
        self,
        symbol: str,
        lookback_period: int = 20,
        debounce_interval: float = 2.0,
        debounce_warmup: int = 3,
    ):
        super().__init__(symbol)
        self.lookback_period = lookback_period
        self._price_window: deque[float] = deque(maxlen=lookback_period + 10)
        self._current_ma: float = 0.0

        debounce_config = DebounceConfig(
            min_interval_sec=debounce_interval,
            warmup_count=debounce_warmup,
        )
        self._debouncer = SignalDebouncer(debounce_config)
        self._last_direction: str | None = None

    def on_market_data(self, msg: MarketMessage) -> Signal | None:
        """处理 depth 频道数据,检测均线突破"""
        if msg.channel != "depth":
            return None

        # 从 depth 数据中提取最佳买价
        data = msg.data
        bid_price = float(data.get("bid1", 0))
        if bid_price <= 0:
            return None

        # 更新价格窗口
        self._price_window.append(bid_price)
        if len(self._price_window) < self.lookback_period:
            self.update_state("status", "warming_up")
            return None

        # 计算均线
        prices = list(self._price_window)[-self.lookback_period:]
        self._current_ma = sum(prices) / len(prices)
        self.update_state("current_ma", round(self._current_ma, 4))
        self.update_state("current_price", bid_price)
        self.update_state("status", "active")

        # 计算突破方向
        prev_prices = list(self._price_window)[-self.lookback_period - 1:-1]
        prev_ma = sum(prev_prices) / len(prev_prices)

        signal_direction: str | None = None
        if bid_price > self._current_ma and prev_prices[-1] <= prev_ma:
            signal_direction = "long"
        elif bid_price < self._current_ma and prev_prices[-1] >= prev_ma:
            signal_direction = "exit"

        if signal_direction is None:
            # 无论是否有信号,都更新计数窗口
            self._debouncer.tick("long", bid_price > self._current_ma)
            return None

        # 防抖检查
        if not self._debouncer.should_emit(signal_direction, msg.timestamp):
            self.update_state("last_action", "debounced")
            return None

        # 构建信号
        signal = Signal(
            symbol=self.symbol,
            direction=signal_direction,
            strength=min(abs(bid_price - self._current_ma) / self._current_ma * 100, 1.0),
            trigger_price=bid_price,
            trigger_time=msg.timestamp,
            metadata={
                "ma": self._current_ma,
                "break_type": "above" if signal_direction == "long" else "below",
            },
        )

        # 记录信号,更新防抖状态
        self._debouncer.record(signal_direction, msg.timestamp)
        self._last_direction = signal_direction
        self.update_state("last_action", f"signal_{signal_direction}")
        self.update_state("last_signal_time", msg.timestamp)

        return signal

防抖策略的一个关键细节:为什么需要 warmup_count

单纯的基于时间的防抖存在一个问题——在趋势初期,价格可能快速反复穿越均线(市场噪音)。warmup_count = 3 意味着信号不仅要在时间上间隔 2 秒,还必须连续 3 次都满足突破条件。这样可以过滤掉单次虚假突破。

当然,warmup_count 也不是越高越好。如果设置过高,策略的响应速度会显著下降,在快速趋势行情中可能错过大半涨幅。实践中建议根据策略的时间周期和标的的波动特征来调参:高频策略用低 warmup_count + 低 min_interval,趋势策略可以适当提高。

5.3 策略引擎的完整运行循环

最后,把上述所有组件串联成一个可执行的运行循环:

import os


def build_engine(symbol: str):
    """构建完整的实盘引擎"""
    # 1. 初始化消息队列(容量 1000,超出时丢弃最旧消息)
    data_queue = MarketDataQueue(maxsize=1000, overflow=OverflowStrategy.DROP_OLDEST)

    # 2. 初始化策略
    strategy = MovingAverageBreakStrategy(
        symbol=symbol,
        lookback_period=20,
        debounce_interval=2.0,
        debounce_warmup=3,
    )

    # 3. 消费循环:独立线程,持续从队列取数据喂给策略
    def consume_loop():
        while True:
            msg = data_queue.get(timeout=0.5)
            if msg is None:
                continue

            try:
                signal = strategy.on_market_data(msg)
                if signal:
                    print(f"[Signal] {signal.direction.upper()} {signal.symbol} "
                          f"@ {signal.trigger_price:.4f} "
                          f"(strength={signal.strength:.3f}, "
                          f"MA={signal.metadata['ma']:.4f})")
                    # TODO: 这里接入风控层和下单层

            except Exception as e:
                # ⚠️ 关键:异常不能向上传播,否则会中断回调链
                print(f"[Strategy] Error processing {symbol}: {e}")

    consumer_thread = threading.Thread(target=consume_loop, daemon=True)
    consumer_thread.start()

    # 4. 初始化 WebSocket 客户端
    def on_message(msg: dict):
        """将收到的消息转换为标准化格式并写入队列"""
        market_msg = MarketMessage(
            symbol=msg.get("symbol", symbol),
            channel=msg.get("channel", ""),
            timestamp=msg.get("ts", time.time()) / 1000,
            sequence=msg.get("seq", 0),
            data=msg.get("data", {}),
        )
        data_queue.put(market_msg)

    ws_config = WebSocketConfig(
        url="wss://api.tickdb.ai/ws/v1/market",
        api_key=os.environ.get("TICKDB_API_KEY", ""),
        ping_interval=20,
        max_retries=10,
        on_message=on_message,
    )

    client = TickDBWebSocketClient(ws_config)

    # 5. 订阅 depth 频道
    client.connect(subscriptions=[
        {"channel": "depth", "symbol": symbol},
    ])

    return {
        "client": client,
        "strategy": strategy,
        "queue": data_queue,
    }


if __name__ == "__main__":
    engine = build_engine("AAPL.US")

    try:
        while True:
            time.sleep(30)
            stats = engine["queue"].get_stats()
            state = engine["strategy"].get_state()
            print(f"[Monitor] Queue: {stats}, Strategy: {state}")
    except KeyboardInterrupt:
        engine["client"].disconnect()
        print("[Engine] Shutdown complete.")

三个值得强调的工程实践:

第一,异常隔离。consume_loop 中,策略的 on_market_data 调用被 try/except 完全包裹。这意味着任何策略逻辑错误都不会导致消息队列消费线程崩溃。实盘中这个设计至关重要——一个未捕获的 KeyError 可能导致整个进程退出,而此时你的数据链路已经悄然中断,你却毫不知情。

第二,监控与状态暴露。 每 30 秒打印一次队列统计和策略状态。这不是多余的日志,而是生产环境的必要可观测性基础设施。在实际部署中,建议将这些指标发送到 Prometheus 或飞书告警机器人,当队列丢弃率突增或策略状态异常时第一时间收到通知。

第三,优雅关闭。 KeyboardInterrupt 触发时,先断开 WebSocket 连接,再退出主进程。这个顺序不能颠倒——直接退出进程会导致 TCP 连接被强制关闭,可能触发服务端将你的连接标记为异常断开,影响后续重连的速率限制。


六、整个链路的延迟拆解

理解整个架构后,有必要量化每个环节的延迟,这样才能知道瓶颈在哪里。

环节 典型延迟 可优化空间
TickDB 服务端 → 网络传输 20-80ms(取决于地域) 使用 TickDB 同区域节点
WebSocket 接收 + JSON 解析 0.5-2ms C 扩展库(基本无法优化)
消息写入队列 <0.1ms
消费线程从队列取出 <0.1ms(队列非空时)
策略 on_market_data 计算 0.1-5ms(取决于策略复杂度) 热点代码用 Cython/Numba
信号防抖检查 <0.1ms
端到端总延迟(正常负载) 30-100ms

瓶颈分析:对于大多数策略来说,网络延迟是主要瓶颈,代码层面的优化(除非策略计算极其复杂)收益有限。但有一个例外——如果你的策略需要在单核 CPU 上处理多个标的的订阅,on_market_data 的计算时间会成为瓶颈。此时可以考虑两个方向:将策略计算搬到独立进程池,或者将热点计算用 Cython 加速。


七、部署方案速查

场景 推荐配置 说明
个人开发者 / 策略验证 单进程 + 内存队列 最简部署,快速验证策略逻辑
多策略并行 主进程 + Redis Stream + 多消费者 数据源统一接入,策略进程独立运行
机构级低延迟 C++ 重写核心路径 + 内存队列 跳过 Python GIL 限制,追求极致低延迟
需要灾备 主备双链路 + 健康检查 任何一路断开时自动切换

八、结语:架构是杠杆

回到开篇的问题:数据到了之后,怎么触发策略计算?如何保证毫秒级响应?

这篇文章的核心结论不是某一段具体代码,而是三层分离的架构思想。接入层解决稳定性问题,队列层解决流量问题,策略层解决业务问题。三者各司其职,整体的可靠性和可维护性才能达到生产级别。

架构是一种杠杆。好的架构让小团队也能维护复杂的实盘系统,坏的架构让大团队在稳定性和扩展性上反复踩坑。从 TickDB 拿到数据只是第一步——你用什么样的架构去处理这些数据,才真正决定了策略能不能在实盘中活下去。


下一步行动

如果你想亲手搭建这套架构

  1. 访问 tickdb.ai 注册(免费,无需信用卡)
  2. 在控制台生成 API Key
  3. 设置环境变量 TICKDB_API_KEY,复制本文代码即可运行

如果你想了解更多 TickDB 实时数据频道的使用方式

  • depth 频道:订单簿深度数据,用于计算买卖压力比
  • trade 频道:逐笔成交推送,用于订单流分析(港股和数字货币)

如果你习惯用 AI 辅助开发,在 AI 助手中搜索安装 tickdb-market-data SKILL,可直接用自然语言查询 TickDB 的实时和历史行情数据。

如果你希望获得机构级的数据完整性和 SLA 保障,联系 [email protected] 了解专业版/企业版方案。


本文不构成任何投资建议。市场有风险,投资需谨慎。