凌晨三点,你的策略为什么没有触发?

凌晨 3:17 分,你设置的止损信号没有成交。

不是行情没有到,也不是策略逻辑错了。问题是:行情数据穿透了你的系统,但没有任何东西去“叫醒”策略引擎。你以为你在跑一个实时系统,实际上你只是在轮询一个 API 轮询循环,而那个循环在凌晨被 GC 冻结了 800 毫秒——刚好错过了一波瞬时流动性真空。

这不是你的策略写得不好。这是架构的问题。

实时数据从交易所到你的策略,中间有一整套“信号传导链条”:数据接收 → 消息路由 → 策略回调 → 信号防抖 → 下单执行。每个环节都有它自己的延迟预算和失败模式。当你的策略被动等待数据,而不是被数据主动驱动时,毫秒级的响应就成了一句空话。

本文拆解这条链条的完整工程实现:如何用异步消息队列承接 TickDB 的 WebSocket 推送,如何设计策略回调的线程安全架构,如何在高频信号场景下做防抖过滤,以及如何在 Linux 生产环境中真正压住毫秒级延迟。


一、为什么轮询必死:实时系统的响应模型

在讨论正确做法之前,有必要说清楚为什么常见的“定时拉取”方案从架构上就无法满足实盘需求。

1.1 轮询的本质问题

最常见的错误架构是这样的:

# ❌ 典型的轮询陷阱:你在“问”数据,而不是被“通知”
while True:
    data = requests.get("https://api.tickdb.ai/v1/market/kline/latest",
                         headers={"X-API-Key": os.environ.get("TICKDB_API_KEY")},
                         timeout=(3.05, 10))
    process(data)
    time.sleep(1)  # 不管你设多短,GC、网络抖动、服务器限频都会打乱节奏

这段代码有三个根本性缺陷:

第一,响应延迟不可控。 你设了 1 秒轮询间隔,但网络 RTT 通常在 50-200ms,加上服务器处理时间和 GC 暂停,真实响应延迟在 200ms 到 2000ms 之间剧烈波动。CTA 级别的高频策略需要 10ms 以内的响应,你完全达不到。

第二,资源浪费严重。 即使 TickDB 支持,/kline/latest 每次请求都会触发一次完整的 HTTP 连接建立(TCP 三次握手 + TLS 握手),在没有新数据时同样消耗你的 API 调用配额。

第三,无状态设计无法感知市场节奏。 轮询无法判断两次请求之间发生了什么——你拿到的只是快照,没有增量信号、没有流动性变化的上下文。

1.2 事件驱动模型的核心优势

正确的模型是发布-订阅(Pub/Sub),也叫事件驱动:

交易所/数据源
    ↓ (TCP 长连接)
TickDB WebSocket 服务器
    ↓ (推送,不等待请求)
你的消息队列 (asyncio Queue / Redis Pub/Sub / Kafka)
    ↓ (异步消费,不阻塞)
策略回调函数 (被数据主动触发)
    ↓
信号计算 + 防抖
    ↓
下单执行

在这个模型里,数据主动推入你的系统,策略引擎被数据叫醒。响应延迟取决于消息队列的消费吞吐,而不是轮询间隔。

TickDB 的 WebSocket 接口提供以下核心频道,对应不同的数据粒度:

频道 数据内容 典型延迟 适用场景
kline K 线合成数据 <100ms 均线突破、均线金叉等基于 K 线的策略
depth 订单簿深度(多档位) <100ms 盘口倾斜、流动性监测、限价单挂单
ticker 最新价、成交量、24h 涨跌 <100ms 价格突变告警、成交放量检测
trade 逐笔成交(港股/数字货币) <50ms 订单流分析、大单分割检测

⚠️ 工程提醒:trades 频道不支持美股和 A 股。若你的策略需要美股逐笔数据,请使用 ticker 频道配合历史 K 线做信号计算。具体数据能力边界请参考 TickDB 官方文档。


二、TickDB WebSocket 接入:被很多人忽略的连接质量

很多人以为 WebSocket“连上就能用”,但实际生产环境中,网络不是稳定的。AWS/GCP 的跨区网络抖动平均每月 2-3 次,交易所行情源在盘前盘后切换时连接会断开,限频机制(code: 3001)随时可能触发。如果你的连接没有健壮的重连机制,凌晨的流动性事件照样会从你眼皮底下溜走。

2.1 完整的 WebSocket 连接管理器

以下是一个生产级的 TickDB WebSocket 客户端,包含了心跳保活、指数退避重连、抖动、限频自适应等所有必要组件:

import os
import json
import time
import random
import asyncio
import websockets
import threading
from collections import deque
from typing import Callable, Optional


class TickDBWebSocketClient:
    """
    TickDB WebSocket 客户端 - 生产级实现
    
    包含:心跳保活、指数退避重连、抖动、限频自适应
    ⚠️ 高频场景建议使用 asyncio 原生实现(非同步封装版本)
    """

    def __init__(
        self,
        api_key: Optional[str] = None,
        on_message: Optional[Callable] = None,
        on_connect: Optional[Callable] = None,
        on_error: Optional[Callable] = None,
    ):
        self.api_key = api_key or os.environ.get("TICKDB_API_KEY")
        if not self.api_key:
            raise ValueError("必须提供 API Key,建议通过环境变量 TICKDB_API_KEY 设置")

        self.base_url = "wss://api.tickdb.ai/v1/ws/market"
        self.on_message = on_message
        self.on_connect = on_connect
        self.on_error = on_error

        # 重连配置
        self.base_reconnect_delay = 1.0
        self.max_reconnect_delay = 60.0
        self.reconnect_jitter_ratio = 0.1

        # 限频状态
        self.rate_limit_until: float = 0

        # 连接状态
        self._running = False
        self._ws = None
        self._thread: Optional[threading.Thread] = None
        self._lock = threading.Lock()

    def start(self):
        """启动 WebSocket 连接(在独立线程中运行)"""
        if self._running:
            return
        self._running = True
        self._thread = threading.Thread(target=self._run_loop, daemon=True)
        self._thread.start()

    def stop(self):
        """安全停止连接"""
        self._running = False
        if self._ws:
            asyncio.run(self._safe_close())

    def _calculate_reconnect_delay(self, retry: int) -> float:
        """指数退避 + 抖动,避免惊群效应"""
        delay = min(self.base_reconnect_delay * (2 ** retry), self.max_reconnect_delay)
        jitter = random.uniform(0, delay * self.reconnect_jitter_ratio)
        return delay + jitter

    async def _safe_close(self):
        """安全关闭 WebSocket 连接"""
        try:
            if self._ws and self._ws.open:
                await self._ws.close()
        except Exception:
            pass

    async def _websocket_connect(self):
        """建立 WebSocket 连接"""
        headers = []
        url = f"{self.base_url}?api_key={self.api_key}"
        self._ws = await websockets.connect(url, ping_interval=20, ping_timeout=10)
        return self._ws

    async def _send_subscribe(self, ws, channels: list[dict]):
        """订阅 TickDB 频道"""
        for channel in channels:
            subscribe_msg = {
                "cmd": "subscribe",
                "channel": channel["channel"],
                "symbols": channel["symbols"],
            }
            await ws.send(json.dumps(subscribe_msg))
            print(f"[TickDB] 已订阅频道: {channel['channel']} -> {channel['symbols']}")

    def _run_loop_sync(self):
        """同步线程入口(内部使用 asyncio 事件循环)"""
        asyncio.run(self._run_loop())

    def _run_loop(self):
        """主消息循环"""
        retry = 0

        while self._running:
            # 检查限频状态
            if time.time() < self.rate_limit_until:
                sleep_time = self.rate_limit_until - time.time()
                print(f"[TickDB] 限频等待 {sleep_time:.1f}s")
                time.sleep(sleep_time)

            try:
                ws = asyncio.run(self._websocket_connect())
                retry = 0  # 连接成功,重置重试计数

                if self.on_connect:
                    self.on_connect()

                # ⚠️ 以下为示例订阅配置,请替换为你实际的交易品种
                channels = [
                    {"channel": "ticker", "symbols": ["AAPL.US", "NVDA.US"]},
                    {"channel": "depth", "symbols": ["AAPL.US"]},
                ]
                asyncio.run(self._send_subscribe(ws, channels))

                # 消息消费循环
                while self._running:
                    try:
                        message = asyncio.run(ws.recv())
                        data = json.loads(message)

                        # ⚠️ TickDB 返回格式以实际文档为准,此处为结构示意
                        if data.get("code") == 3001:
                            retry_after = int(data.get("retry_after", 5))
                            self.rate_limit_until = time.time() + retry_after
                            print(f"[TickDB] 触发限频,等待 {retry_after}s")
                            continue

                        if self.on_message:
                            self.on_message(data)

                    except websockets.exceptions.ConnectionClosed:
                        print("[TickDB] 连接意外断开,触发重连")
                        break

            except Exception as e:
                if not self._running:
                    break

                if self.on_error:
                    self.on_error(e)

                delay = self._calculate_reconnect_delay(retry)
                print(f"[TickDB] 连接异常: {e},{delay:.1f}s 后重试 (第 {retry+1} 次)")
                retry += 1
                time.sleep(delay)

    def run_forever(self):
        """阻塞主线程运行(适用于简单脚本场景)"""
        self.start()
        try:
            while self._running:
                time.sleep(1)
        except KeyboardInterrupt:
            self.stop()

⚠️ 工程提醒:上述代码使用 threading.Thread + 内部 asyncio.run() 封装,适合策略逻辑不复杂、对延迟要求在 100ms 级别的场景。如果你需要处理高频 orderflow 数据(<10ms 延迟),建议完全迁移到 asyncio 原生架构,避免 GIL 锁竞争。

2.2 连接状态监控

WebSocket 连接断开不一定有明显报错。你需要一个独立的监控线程定期检查连接健康状态:

import time

def start_connection_monitor(client: TickDBWebSocketClient, interval: float = 30):
    """定期检查连接健康状态,连接断开超过阈值自动告警"""
    def monitor():
        last_heartbeat = time.time()
        while True:
            time.sleep(interval)
            if time.time() - last_heartbeat > interval * 3:
                print(f"[告警] WebSocket 连接已失活超过 {interval * 3:.0f}s")
                # 触发告警:飞书/邮件/Slack
            else:
                print(f"[心跳] 连接正常,上次消息 {time.time() - last_heartbeat:.1f}s 前")
    
    thread = threading.Thread(target=monitor, daemon=True)
    thread.start()
    return thread

三、策略回调的线程安全设计

WebSocket 消息消费和策略计算通常在不同的执行上下文里运行。如果你在回调中直接修改共享状态(持仓、信号标志、资金状态),Race Condition(竞态条件)会在你毫无防备时让你的账户爆仓。

3.1 危险示例 vs 安全示例

# ❌ 线程不安全:on_message 直接操作共享状态
class UnsafeStrategy:
    def __init__(self):
        self.position = 0  # 共享状态
        self.last_signal_time = 0
    
    def on_tick(self, data):
        # 多线程并发调用这里!
        if self.position == 0 and self._signal_condition(data):
            self.position = 100  # 可能被多个线程同时判断为 True
            self.last_signal_time = time.time()

# ✅ 线程安全:使用锁保护共享状态,消息入队后统一消费
class SafeStrategy:
    def __init__(self):
        self.position = 0
        self.last_signal_time = 0
        self._lock = threading.Lock()
        self._signal_queue: deque = deque()
    
    def on_tick(self, data):
        """WebSocket 回调:只做一件事,把消息安全地放入队列"""
        with self._lock:
            self._signal_queue.append({
                "data": data,
                "timestamp": time.time()
            })
    
    def process_signals(self):
        """在独立线程中处理信号队列(轮询消费)"""
        while True:
            with self._lock:
                if not self._signal_queue:
                    continue
                item = self._signal_queue.popleft()
            
            data = item["data"]
            self._calculate_and_execute(data)

⚠️ 工程提醒:上述轮询消费模型在高吞吐场景下会引入固定延迟。如果信号频率 > 100条/秒,建议使用 asyncio.Queue 替代 deque + threading.Lock,由事件循环原生驱动,消除锁竞争开销。

3.2 异步消息队列的工程实现

import asyncio
from dataclasses import dataclass, field
from typing import Any, Optional
from enum import Enum
import time


class SignalType(Enum):
    MARKET_BUY = "market_buy"
    MARKET_SELL = "market_sell"
    LIMIT_BUY = "limit_buy"
    LIMIT_SELL = "limit_sell"
    CANCEL = "cancel"


@dataclass
class Signal:
    signal_type: SignalType
    symbol: str
    quantity: float
    price: Optional[float] = None
    reason: str = ""
    metadata: dict = field(default_factory=dict)
    created_at: float = field(default_factory=time.time)
    signal_id: str = ""


class AsyncSignalQueue:
    """
    异步信号队列:解耦消息接收与策略计算
    支持信号防抖、同类型信号合并、超时丢弃
    """

    def __init__(self, deduplication_window_ms: int = 500):
        self._queue: asyncio.Queue[Signal] = asyncio.Queue()
        self._deduplication_window_ms = deduplication_window_ms
        self._last_signals: dict[str, float] = {}  # symbol -> last_signal_time
        self._running = False

    async def enqueue(self, signal: Signal) -> bool:
        """
        入队并做信号防抖。
        返回 True 表示信号被接受,返回 False 表示被去重过滤。
        """
        key = f"{signal.symbol}:{signal.signal_type.value}"
        now = time.time() * 1000
        last_time = self._last_signals.get(key, 0)

        if now - last_time < self._deduplication_window_ms:
            # ⚠️ 信号被防抖过滤(详见第四章)
            return False

        self._last_signals[key] = now
        await self._queue.put(signal)
        return True

    async def consumer(self, callback: callable):
        """
        异步消费队列中的信号。
        callback 应为 async 函数,接收 Signal 对象。
        """
        self._running = True
        while self._running:
            try:
                signal = await asyncio.wait_for(self._queue.get(), timeout=1.0)
                await callback(signal)
            except asyncio.TimeoutError:
                continue
            except Exception as e:
                print(f"[错误] 信号处理异常: {e}")

    def stop(self):
        self._running = False

四、信号防抖:为什么你的止损策略会反复触发

信号防抖不是“去重”这么简单。在高频行情中,止损信号可能在 50ms 内触发 8 次。如果这 8 次都送到了下单模块,要么你的账户被连续的市价单拉爆,要么滑点叠加到让你亏 3 倍的止损幅度。

4.1 防抖的三层过滤机制

原始信号流
    ↓
第一层:同标的同方向防抖(时间窗口)
    ↓
第二层:信号强度过滤(偏离阈值)
    ↓
第三层:前置信号确认(成交量放大、价差收窄等)
    ↓
可执行信号

第一层:时间窗口防抖。 同一个标的、同一个方向,500ms 内只允许一次信号。这是最基础的去重手段。

class SignalDebouncer:
    """
    信号防抖器 - 三层过滤机制
    
    第一层:时间窗口(防止重复触发)
    第二层:偏离阈值(避免噪声信号)
    第三层:前置条件(成交量/流动性确认)
    """

    def __init__(
        self,
        window_ms: int = 500,
        price_deviation_threshold: float = 0.002,  # 价格偏离 0.2% 以上才触发
        volume_confirmation_ratio: float = 1.5,   # 成交量超过均量 1.5 倍确认
    ):
        self.window_ms = window_ms
        self.price_deviation_threshold = price_deviation_threshold
        self.volume_confirmation_ratio = volume_confirmation_ratio

        self._last_signals: dict[str, float] = {}
        self._price_history: dict[str, deque] = {}
        self._volume_history: dict[str, deque] = {}

    def should_emit(self, symbol: str, signal_type: str,
                    current_price: float, volume: float) -> tuple[bool, str]:
        """
        判断信号是否应该发出。返回 (是否通过, 拒绝原因)。
        """
        key = f"{symbol}:{signal_type}"
        now = time.time() * 1000

        # ===== 第一层:时间窗口 =====
        last_time = self._last_signals.get(key, 0)
        if now - last_time < self.window_ms:
            return False, f"时间窗口过滤(同向信号 {now - last_time:.0f}ms 前)"

        # ===== 第二层:偏离阈值 =====
        if symbol in self._price_history and self._price_history[symbol]:
            baseline_price = self._price_history[symbol][-1]
            deviation = abs(current_price - baseline_price) / baseline_price
            if deviation < self.price_deviation_threshold:
                return False, f"偏离不足({deviation*100:.3f}% < {self.price_deviation_threshold*100:.1f}%)"

        # ===== 第三层:成交量确认 =====
        if symbol in self._volume_history and len(self._volume_history[symbol]) >= 5:
            avg_volume = sum(self._volume_history[symbol]) / len(self._volume_history[symbol])
            if volume < avg_volume * self.volume_confirmation_ratio:
                return False, f"量能不足({volume:.0f} < {avg_volume * self.volume_confirmation_ratio:.0f})"

        return True, "通过所有过滤"

    def update_price(self, symbol: str, price: float, volume: float):
        """更新基准价格和成交量历史(供下一轮判断使用)"""
        if symbol not in self._price_history:
            self._price_history[symbol] = deque(maxlen=20)
        if symbol not in self._volume_history:
            self._volume_history[symbol] = deque(maxlen=20)

        self._price_history[symbol].append(price)
        self._volume_history[symbol].append(volume)

    def record_signal(self, symbol: str, signal_type: str):
        """记录信号发出时间(触发时间窗口计时)"""
        key = f"{symbol}:{signal_type}"
        self._last_signals[key] = time.time() * 1000

4.2 集成到完整信号链路

防抖器需要与策略回调、异步队列一起协作才能真正发挥作用。以下是三者的集成示例:

class StrategyEngine:
    """
    策略引擎:整合 WebSocket 接收 → 防抖过滤 → 异步信号队列 → 策略计算 → 订单执行
    """

    def __init__(self, symbols: list[str], api_key: str):
        self.symbols = symbols
        self.api_key = api_key

        # 核心组件
        self.ws_client = TickDBWebSocketClient(
            api_key=api_key,
            on_message=self._on_tick,
        )
        self.signal_queue = AsyncSignalQueue(deduplication_window_ms=500)
        self.debouncer = SignalDebouncer(
            window_ms=500,
            price_deviation_threshold=0.002,
            volume_confirmation_ratio=1.5,
        )
        self.strategy_tasks: list[asyncio.Task] = []

    def _on_tick(self, data: dict):
        """
        WebSocket 回调:在网络线程中执行,必须是纯函数,不阻塞。
        ⚠️ 不要在这里做复杂计算,只做数据解析和队列写入。
        """
        channel = data.get("channel")
        symbol = data.get("symbol")

        if channel == "ticker":
            ticker = data.get("data", {})
            price = float(ticker.get("last", 0))
            volume = float(ticker.get("volume", 0))

            # 更新基准数据(用于防抖判断)
            self.debouncer.update_price(symbol, price, volume)

            # 生成信号候选
            should_emit, reason = self.debouncer.should_emit(
                symbol, "stop_loss", price, volume
            )

            if should_emit:
                signal = Signal(
                    signal_type=SignalType.MARKET_SELL,
                    symbol=symbol,
                    quantity=self._calculate_position_size(symbol),
                    price=None,
                    reason=f"止损触发:价格 {price},量 {volume}",
                    metadata={"trigger_price": price, "volume": volume}
                )
                # 异步入队
                asyncio.create_task(self.signal_queue.enqueue(signal))
            else:
                print(f"[防抖] 拒绝信号: {symbol} - {reason}")

    async def _signal_consumer(self, signal: Signal):
        """
        异步消费信号队列:策略计算 + 订单执行
        """
        print(f"[信号] {signal.signal_type.value} {signal.symbol} "
              f"x {signal.quantity} @ {signal.reason}")

        # ===== 策略计算层 =====
        risk_assessment = await self._assess_risk(signal)

        if not risk_assessment["proceed"]:
            print(f"[策略] 风险评估拒绝:{risk_assessment['reason']}")
            return

        # ===== 订单执行层 =====
        await self._execute_order(signal)
        self.debouncer.record_signal(signal.symbol, signal.signal_type.value)

    async def _assess_risk(self, signal: Signal) -> dict:
        """
        风险评估层:在下单前做二次确认。
        包括:流动性检查、仓位检查、订单频率检查。
        """
        # 示例:检查卖方流动性深度
        # 实际生产中应接入 depth 频道数据
        return {
            "proceed": True,
            "reason": "pass"
        }

    async def _execute_order(self, signal: Signal):
        """
        订单执行层(对接券商 API 或 TickDB 订单通道)。
        ⚠️ 此处为示意,实际生产中需要接入真实券商接口。
        """
        print(f"[执行] 下单 {signal.signal_type.value} "
              f"{signal.symbol} {signal.quantity}股")
        # 实际调用:await broker_api.send_order(signal)
        await asyncio.sleep(0.1)  # 模拟网络延迟

    async def run(self):
        """启动策略引擎(asyncio 主循环)"""
        print(f"[引擎] 启动策略引擎,监控标的: {self.symbols}")

        # 启动信号消费协程
        consumer_task = asyncio.create_task(
            self.signal_queue.consumer(self._signal_consumer)
        )

        # 启动 WebSocket 客户端(需要在新线程中运行,因为它是同步封装)
        import threading
        ws_thread = threading.Thread(
            target=self.ws_client._run_loop_sync,
            daemon=True
        )
        ws_thread.start()

        print("[引擎] 所有组件已启动,按 Ctrl+C 退出")

        try:
            while True:
                await asyncio.sleep(1)
        except KeyboardInterrupt:
            consumer_task.cancel()
            self.ws_client.stop()
            print("[引擎] 已关闭")


# ===== 启动入口 =====
if __name__ == "__main__":
    import os
    api_key = os.environ.get("TICKDB_API_KEY")
    if not api_key:
        raise RuntimeError("请设置环境变量 TICKDB_API_KEY")

    engine = StrategyEngine(
        symbols=["AAPL.US", "NVDA.US"],
        api_key=api_key,
    )
    asyncio.run(engine.run())

五、毫秒级延迟的代价:你在 Linux 生产环境需要做什么

代码写得再好,如果跑在一个默认配置的 Ubuntu 虚拟机上,GC 暂停、网络调度延迟、内核协议栈开销随时会吃掉你的 50ms 延迟预算。以下是生产环境必须检查的配置清单。

5.1 用户态配置(进程级)

# 绑定 CPU 核心,避免进程在核心之间迁移(减少缓存失效)
taskset -c 0,1 python3 strategy_engine.py

# 设置进程优先级(需要 sudo)
# -20 是最高优先级,0 是默认
sudo renice -20 -p $(pgrep -f strategy_engine.py)

5.2 内核网络配置

# 增加 UDP/TCP 接收缓冲区(默认太小)
sysctl -w net.core.rmem_max=134217728      # 128MB
sysctl -w net.core.rmem_default=67108864   # 64MB

# 开启 TCP BBR 拥塞控制(改善跨区网络 RTT)
sysctl -w net.core.default_qdisc=fq
sysctl -w net.ipv4.tcp_congestion_control=bbr

# 禁用连接跟踪(nf_conntrack,在高频连接时会产生额外延迟)
sudo sysctl -w net.netfilter.nf_conntrack_max=65536

5.3 Python 运行时优化

import gc
import sys

# 禁用 GC(在高吞吐场景下,GC 会造成不可预期的暂停)
# 改为手动控制回收时机(在盘前/盘后低峰期执行)
gc.disable()

# 或者每 N 次循环手动回收(避免长时间累积)
gc_cycles = 0
GC_INTERVAL = 10000

def on_tick(data):
    global gc_cycles
    gc_cycles += 1
    if gc_cycles % GC_INTERVAL == 0:
        gc.collect()  # 在可控时机触发,不影响行情处理

5.2 延迟预算分配参考

一个 100ms 延迟目标的系统,各环节预算大致如下:

环节 目标延迟 最大可容忍 超标后果
TickDB 服务器推送延迟 <30ms 50ms 行情源问题,联系 TickDB 支持
网络传输(AWS 同区) <10ms 20ms 检查网络路径,考虑专线
WebSocket 消息入队 <1ms 5ms 检查是否在 asyncio 事件循环内
防抖过滤 <2ms 5ms 过滤逻辑 O(n),检查历史窗口大小
策略计算 <10ms 30ms 算法优化,减少 Python 对象创建
订单发送(网络) <20ms 50ms 券商 API 延迟,检查链路
总计 <73ms <160ms 超出此范围需重新评估架构

⚠️ 工程提醒:上述预算基于单标的、低频信号场景。如果你同时监控 20+ 个标的,每个标的每秒 10+ 条 tick,你的 asyncio 事件循环可能会积压消息。确保在 _on_tick 中不做任何阻塞操作(包括日志写入),并将所有 I/O 操作放到队列中异步处理。


六、架构全览:信号传导的完整闭环

┌─────────────────────────────────────────────────────────────────┐
│                      TickDB WebSocket 服务器                      │
│                    (wss://api.tickdb.ai/v1/ws/market)            │
└─────────────────────────────────────────────────────────────────┘
                               │
                    TCP 长连接 + 心跳保活
                    自动重连 + 限频自适应
                               │
                               ▼
┌─────────────────────────────────────────────────────────────────┐
│                    WebSocket 客户端(T1:网络线程)               │
│                                                               │
│  on_message 回调 ──────────────────────────────────────┐     │
│      ↓                                                   │     │
│  仅做:JSON 解析 + 消息入 asyncio.Queue(<1ms)          │     │
│  ❌ 禁止:任何 I/O、任何计算、任何锁                     │     │
└─────────────────────────────────────────────────────────┘
                               │
                    跨线程消息传递(lock-free)
                               │
                               ▼
┌─────────────────────────────────────────────────────────────────┐
│              asyncio 事件循环(T2:策略计算线程)                  │
│                                                               │
│  AsyncSignalQueue ←──── 防抖器(SignalDebouncer)               │
│      ↓                                                     │
│  三层过滤:时间窗口 → 偏离阈值 → 量能确认                       │
│      ↓                                                     │
│  策略计算(异步):风险评估、仓位检查                           │
│      ↓                                                     │
│  信号输出 → 订单执行层                                         │
└─────────────────────────────────────────────────────────────────┘
                               │
                               ▼
┌─────────────────────────────────────────────────────────────────┐
│                         订单执行层                               │
│           券商 API / 交易所接口 / TickDB 订单通道               │
└─────────────────────────────────────────────────────────────────┘

结语

数据到了之后怎么触发策略?答案是:不要让你的策略去“拿”数据,让数据来“推”策略。

这条原则的背后是一整套工程体系:WebSocket 长连接保证实时推送,消息队列解耦接收与计算,线程安全架构消除竞态条件,三层防抖过滤消灭噪声信号,Linux 内核调优压住延迟底噪。每一个环节都有它独立的失败模式,但它们组合在一起,就构成了一个真正能跑在毫秒级响应窗口里的策略引擎。

架构对了,策略逻辑才有意义。


下一步行动

如果你希望亲手实现本文策略

  1. 访问 tickdb.ai 注册(免费,无需信用卡)
  2. 在控制台生成 API Key,设置为环境变量 TICKDB_API_KEY
  3. 复制本文代码,根据你的交易品种修改订阅列表

如果你需要 10 年全量历史 K 线数据做策略回测,联系 [email protected] 了解机构方案。

如果你在评估不同数据源的实时性和稳定性,可以对比 TickDB 与其他主流数据接口的功能差异,控制台有完整的 API 文档。


风险提示:本文不构成任何投资建议。所有策略逻辑和代码示例仅供技术参考,不构成下单依据。实盘交易涉及真实资金,风险自担。市场有风险,投资需谨慎。