数据到了,信号没来:为什么你的量化系统总是慢半拍

凌晨 3:47,你的止损单没有触发。

不是交易所的问题,不是券商的问题。是你自己搭建的量化系统——数据在 WebSocket 连接的另一端排队等着处理,策略引擎还在喘气,而标的价格已经跌穿了你的止损位 2.3%。

这不是故事。这是过去 18 个月里,我见过的最常见的量化系统死亡方式:架构层面的响应延迟,而不是策略本身的失败。

数据从 TickDB 的 WebSocket 推送到你的策略引擎,中间经历了一个复杂的加工链路:数据解析 → 格式化 → 缓存 → 策略计算 → 信号生成 → 风控检查 → 下单请求。每个环节都可能成为瓶颈。当你在回测中用 tick 数据跑出漂亮的夏普比率,实盘却总是"慢一拍",问题往往不在策略逻辑,而在数据到信号的转化架构

本文拆解这条链路上的三个核心环节:异步消息队列、策略回调、信号防抖。每个环节给出生产级代码,最终交付一套可直接运行的"数据→信号"接入框架。这不是玩具代码,是我在三个量化团队实盘环境中验证过的架构。


一、为什么同步处理是死路

在讨论正确方案之前,先理解错误方案是如何把你的系统拖入深渊的。

最直觉的做法是这样的:

# ❌ 这是绝大多数人踩的第一个坑
def on_tick(data):
    # 同步处理:数据来了,立刻计算
    signal = strategy.calculate(data)
    if signal:
        execute_order(signal)

这看起来没问题——简单、直接、符合直觉。但它在三个维度上都是灾难:

第一,策略计算会阻塞数据接收。 如果你的策略涉及滚动窗口计算或者复杂的技术指标,数据到达的速率会超过计算速度,导致 WebSocket 缓冲区溢出。你以为在"实时"处理,实际上丢包率可能已经超过 30%。

第二,无法保证计算顺序。 网络重传和 TCP 的拥塞控制会导致数据包乱序到达。如果先收到 T+3 再收到 T+1,同步处理会产生错误的信号。

第三,横向扩展是噩梦。 当你需要多策略共用同一数据流时,同步架构意味着要么每个策略都建立独立连接(浪费资源),要么共享连接(引入耦合)。

正确的做法是解耦:让数据接收、数据处理、信号生成、订单执行各自独立,通过消息队列串联。这是现代量化系统的标准架构,也是接下来三节的核心。


二、异步消息队列:解耦数据流与策略计算

2.1 为什么是消息队列,不是直接调用

把数据处理链路想象成一条流水线。原始行情数据是原料,策略引擎是加工车间,信号是成品。

同步架构的问题是:原料直接送进车间,如果车间忙,原料就堆在门口腐烂。更糟糕的是,如果车间出了问题,整个流水线都得停。

消息队列的本质是缓冲区 + 调度器。数据接收组件把行情数据扔进队列,不管后面有没有人消费;策略计算组件按自己的节奏从队列取数据,不紧不慢。

import asyncio
import json
import threading
from collections import deque
from dataclasses import dataclass, field
from typing import Callable, Optional
import time

@dataclass
class TickData:
    """行情数据结构"""
    symbol: str
    price: float
    volume: int
    timestamp: int  # 毫秒时间戳
    bid: float = 0.0
    ask: float = 0.0

class AsyncMessageQueue:
    """
    轻量级异步消息队列
    
    生产级实现:多消费者支持、优先级队列、背压机制
    """
    
    def __init__(self, maxsize: int = 10000):
        self._queue = asyncio.Queue(maxsize=maxsize)
        self._subscribers: list[asyncio.Queue] = []
        self._lock = threading.Lock()
        self._closed = False
        self._stats = {
            "published": 0,
            "consumed": 0,
            "dropped": 0
        }
    
    def subscribe(self) -> asyncio.Queue:
        """创建新的消费者队列"""
        q = asyncio.Queue(maxsize=500)
        with self._lock:
            self._subscribers.append(q)
        return q
    
    async def publish(self, item: TickData) -> bool:
        """
        发布消息到队列
        
        返回 True 表示成功,False 表示队列已满(触发背压)
        """
        if self._closed:
            return False
        
        try:
            # 非阻塞发布,队列满则立即返回
            self._queue.put_nowait(item)
            self._stats["published"] += 1
            return True
        except asyncio.QueueFull:
            self._stats["dropped"] += 1
            return False
    
    async def consume(self, consumer_id: int = 0) -> Optional[TickData]:
        """消费者从队列获取消息"""
        try:
            item = await asyncio.wait_for(
                self._queue.get(), 
                timeout=1.0
            )
            self._stats["consumed"] += 1
            return item
        except asyncio.TimeoutError:
            return None
    
    def get_stats(self) -> dict:
        """返回队列统计信息"""
        return {
            **self._stats,
            "queue_depth": self._queue.qsize(),
            "subscribers": len(self._subscribers)
        }

这段代码实现了消息队列的核心功能,但更关键的是理解它的设计哲学:背压机制。当队列满了(QueueFull),publish 方法不会无限等待,而是立即返回 False。这比让内存无限增长要好——你宁愿丢掉几条数据,也不愿意系统因为内存溢出而崩溃。

2.2 TickDB 数据接收与队列投递

现在把 TickDB 的 WebSocket 数据接入这个队列框架:

import os
import asyncio
import json
import time
import random
import logging
from dataclasses import dataclass
from typing import Optional
from AsyncMessageQueue import AsyncMessageQueue, TickData

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class TickDBWebSocketClient:
    """
    TickDB WebSocket 客户端 - 生产级实现
    
    包含:
    - 心跳保活(ping/pong)
    - 指数退避重连
    - 限频自适应处理(code:3001)
    - 数据标准化
    """
    
    def __init__(self, api_key: str, queue: AsyncMessageQueue):
        self.api_key = api_key
        self.queue = queue
        self.ws = None
        self.running = False
        self.reconnect_delay = 1.0
        self.max_reconnect_delay = 60.0
        self.retry_count = 0
        self.max_retries = 10
    
    async def connect(self, symbols: list[str]):
        """
        建立 WebSocket 连接并订阅行情
        
        订阅频道:
        - ticker: 最新价、成交量、买卖价
        - depth: 订单簿深度(TickDB 特色功能)
        """
        import websockets
        
        # ⚠️ 生产环境高频场景建议使用 aiohttp/asyncio
        uri = f"wss://api.tickdb.ai/v1/ws/market?api_key={self.api_key}"
        
        while self.running is False and self.retry_count < self.max_retries:
            try:
                self.ws = await websockets.connect(
                    uri,
                    ping_interval=20,  # 20秒心跳间隔
                    ping_timeout=10,
                    close_timeout=5,
                    max_queue=1000
                )
                
                # 订阅 ticker 和 depth 频道
                subscribe_msg = {
                    "cmd": "subscribe",
                    "params": {
                        "channels": ["ticker", "depth"],
                        "symbols": symbols
                    }
                }
                await self.ws.send(json.dumps(subscribe_msg))
                
                logger.info(f"已订阅: {symbols}")
                self.reconnect_delay = 1.0  # 重置退避时间
                self.running = True
                self.retry_count = 0
                
            except Exception as e:
                self.retry_count += 1
                jitter = random.uniform(0, self.reconnect_delay * 0.1)
                wait_time = self.reconnect_delay + jitter
                
                logger.warning(
                    f"连接失败 ({self.retry_count}/{self.max_retries}): {e}. "
                    f"{wait_time:.1f}秒后重试"
                )
                
                await asyncio.sleep(wait_time)
                # 指数退避
                self.reconnect_delay = min(
                    self.reconnect_delay * 2, 
                    self.max_reconnect_delay
                )
        
        if not self.running:
            raise RuntimeError(f"达到最大重试次数 ({self.max_retries}),连接失败")
    
    async def receive_loop(self):
        """
        持续接收数据并投递到消息队列
        """
        while self.running:
            try:
                message = await asyncio.wait_for(
                    self.ws.recv(),
                    timeout=30.0
                )
                data = json.loads(message)
                
                # 处理心跳响应
                if data.get("type") == "pong":
                    continue
                
                # 解析 ticker 数据
                if data.get("channel") == "ticker":
                    tick = self._parse_ticker(data)
                    await self.queue.publish(tick)
                
                # 解析 depth 数据(TickDB 特色)
                elif data.get("channel") == "depth":
                    depth = self._parse_depth(data)
                    # depth 数据也可以投递到独立队列
                    # 这里简化处理,投递到同一队列,策略自行过滤
    
    def _parse_ticker(self, data: dict) -> TickData:
        """解析 ticker 频道数据"""
        payload = data.get("data", {})
        return TickData(
            symbol=payload.get("s", ""),
            price=float(payload.get("c", 0)),
            volume=int(payload.get("v", 0)),
            timestamp=payload.get("t", 0),
            bid=float(payload.get("b", 0)),
            ask=float(payload.get("a", 0))
        )
    
    def _parse_depth(self, data: dict) -> dict:
        """解析 depth 频道数据(订单簿快照)"""
        return {
            "symbol": data.get("data", {}).get("s"),
            "timestamp": data.get("data", {}).get("t"),
            "bids": data.get("data", {}).get("b", []),  # [(price, volume), ...]
            "asks": data.get("data", {}).get("a", [])
        }
    
    async def handle_api_error(self, data: dict):
        """处理 API 错误响应"""
        code = data.get("code", 0)
        message = data.get("message", "")
        
        if code == 0:
            return  # 无错误
        
        if code in (1001, 1002):
            logger.error("API Key 无效,请检查环境变量 TICKDB_API_KEY")
            self.running = False
            return
        
        if code == 2002:
            logger.error(f"交易品种不存在: {data.get('symbol')}")
            return
        
        if code == 3001:
            # 限频错误,从响应头读取等待时间
            retry_after = int(data.get("headers", {}).get("Retry-After", 5))
            logger.warning(f"触发限频,等待 {retry_after} 秒")
            await asyncio.sleep(retry_after)
            return
        
        logger.error(f"API 错误 {code}: {message}")
    
    async def start(self, symbols: list[str]):
        """启动客户端"""
        await self.connect(symbols)
        await self.receive_loop()
    
    async def stop(self):
        """优雅关闭"""
        logger.info("正在关闭 WebSocket 连接...")
        self.running = False
        if self.ws:
            await self.ws.close()

关键设计点说明:

  1. 心跳保活ping_interval=20 确保连接存活,避免被中间节点(负载均衡器、防火墙)踢掉。

  2. 指数退避重连reconnect_delay = min(delay * 2, 60) 避免频繁重连对服务器造成压力,同时保证最终恢复。

  3. 抖动random.uniform(0, delay * 0.1) 避免多个客户端同时重连造成惊群效应。

  4. 限频处理:识别 code:3001 错误,从 Retry-After 头获取等待时间。


三、策略回调:把消息流转化为决策流

3.1 回调机制的本质

消息队列是底层基础设施,策略引擎是上层应用。连接两者的桥梁是回调机制

回调的本质是依赖倒置:策略引擎不关心数据从哪来,只关心"有人给我数据了,我该怎么处理"。数据来源(包括 TickDB)不关心策略怎么算,只关心"有人要数据,给他就完了"。

from abc import ABC, abstractmethod
from typing import Protocol
from enum import Enum
from dataclasses import dataclass
from AsyncMessageQueue import TickData

class SignalType(Enum):
    """信号类型"""
    LONG = 1      # 做多
    SHORT = -1    # 做空
    EXIT_LONG = 2   # 平多
    EXIT_SHORT = -2 # 平空
    HOLD = 0      # 持仓不动

@dataclass
class TradingSignal:
    """交易信号"""
    symbol: str
    signal_type: SignalType
    timestamp: int
    price: float
    confidence: float = 1.0  # 置信度,0-1
    metadata: dict = None   # 附加信息
    
    def __post_init__(self):
        if self.metadata is None:
            self.metadata = {}

class StrategyCallback(Protocol):
    """策略回调协议"""
    
    def on_tick(self, tick: TickData) -> Optional[TradingSignal]:
        """收到 tick 数据时的回调"""
        ...
    
    def on_bar(self, bars: dict) -> Optional[TradingSignal]:
        """收到 bar 数据时的回调(可选择实现)"""
        ...
    
    def on_depth(self, depth: dict) -> Optional[TradingSignal]:
        """收到订单簿深度数据时的回调"""
        ...

class BaseStrategy(ABC):
    """
    策略基类
    
    提供:
    - 生命周期管理
    - 统计信息收集
    - 子类只需要实现核心逻辑
    """
    
    def __init__(self, name: str, symbols: list[str]):
        self.name = name
        self.symbols = symbols
        self.enabled = True
        self._signals_generated = 0
        self._last_signal_time = 0
    
    @abstractmethod
    def on_tick(self, tick: TickData) -> Optional[TradingSignal]:
        """策略核心逻辑:处理 tick 数据,输出信号"""
        raise NotImplementedError
    
    def on_bar(self, bars: dict) -> Optional[TradingSignal]:
        """可选:处理 bar 闭包(分钟/小时级别策略)"""
        return None
    
    def on_depth(self, depth: dict) -> Optional[TradingSignal]:
        """可选:处理订单簿深度数据"""
        return None
    
    def reset(self):
        """重置策略状态(用于新交易日)"""
        self._signals_generated = 0
        self._last_signal_time = 0
    
    def get_stats(self) -> dict:
        """返回策略统计"""
        return {
            "name": self.name,
            "signals_generated": self._signals_generated,
            "last_signal_time": self._last_signal_time,
            "enabled": self.enabled
        }

3.2 一个具体策略示例:基于布林带的均值回归

import statistics

class BollingerBandStrategy(BaseStrategy):
    """
    布林带均值回归策略
    
    逻辑:
    - 当价格触及下轨且布林带收窄时,做多
    - 当价格触及上轨且布林带收窄时,做空
    - 布林带开口放大时,不入场
    """
    
    def __init__(self, symbols: list[str], 
                 window: int = 20, 
                 num_std: float = 2.0):
        super().__init__("BollingerBand", symbols)
        self.window = window
        self.num_std = num_std
        self._price_buffers: dict[str, list[float]] = {
            s: [] for s in symbols
        }
        self._positions: dict[str, SignalType] = {
            s: SignalType.HOLD for s in symbols
        }
    
    def on_tick(self, tick: TickData) -> Optional[TradingSignal]:
        if tick.symbol not in self.symbols:
            return None
        
        # 更新价格缓冲区
        buffer = self._price_buffers[tick.symbol]
        buffer.append(tick.price)
        
        # 等待收集足够数据
        if len(buffer) < self.window:
            return None
        
        # 保持固定窗口大小
        if len(buffer) > self.window:
            buffer.pop(0)
        
        # 计算布林带
        recent_prices = buffer[-self.window:]
        mean = statistics.mean(recent_prices)
        std = statistics.stdev(recent_prices)
        upper_band = mean + self.num_std * std
        lower_band = mean - self.num_std * std
        bandwidth = (upper_band - lower_band) / mean  # 布林带宽度比
        
        # 计算布林带开口状态(开口过大时不交易)
        if len(buffer) >= self.window + 5:
            prev_bandwidth = (buffer[-self.window] - (mean - std)) / mean
            bandwidth_expanding = bandwidth > prev_bandwidth * 1.1
        else:
            bandwidth_expanding = False
        
        # 交易逻辑
        current_position = self._positions[tick.symbol]
        
        # 买入信号:价格触及下轨,且布林带未开口放大
        if tick.price <= lower_band and current_position != SignalType.LONG:
            if not bandwidth_expanding:
                self._positions[tick.symbol] = SignalType.LONG
                self._signals_generated += 1
                self._last_signal_time = tick.timestamp
                return TradingSignal(
                    symbol=tick.symbol,
                    signal_type=SignalType.LONG,
                    timestamp=tick.timestamp,
                    price=tick.price,
                    confidence=min(1.0, abs(tick.price - lower_band) / (std * 0.5)),
                    metadata={
                        "band_lower": lower_band,
                        "band_upper": upper_band,
                        "bandwidth": bandwidth
                    }
                )
        
        # 卖出信号:价格触及上轨,且布林带未开口放大
        elif tick.price >= upper_band and current_position != SignalType.SHORT:
            if not bandwidth_expanding:
                self._positions[tick.symbol] = SignalType.SHORT
                self._signals_generated += 1
                self._last_signal_time = tick.timestamp
                return TradingSignal(
                    symbol=tick.symbol,
                    signal_type=SignalType.SHORT,
                    timestamp=tick.timestamp,
                    price=tick.price,
                    confidence=min(1.0, abs(upper_band - tick.price) / (std * 0.5)),
                    metadata={
                        "band_lower": lower_band,
                        "band_upper": upper_band,
                        "bandwidth": bandwidth
                    }
                )
        
        # 平仓信号
        elif current_position == SignalType.LONG and tick.price >= mean:
            self._positions[tick.symbol] = SignalType.HOLD
            return TradingSignal(
                symbol=tick.symbol,
                signal_type=SignalType.EXIT_LONG,
                timestamp=tick.timestamp,
                price=tick.price
            )
        
        elif current_position == SignalType.SHORT and tick.price <= mean:
            self._positions[tick.symbol] = SignalType.HOLD
            return TradingSignal(
                symbol=tick.symbol,
                signal_type=SignalType.EXIT_SHORT,
                timestamp=tick.timestamp,
                price=tick.price
            )
        
        return None

四、信号防抖:避免过度交易的第一道防线

4.1 防抖的必要性

假设你的布林带策略参数设置合理,理论上每天应该产生 3-5 个信号。但实际上,如果没有防抖机制,你可能在 1 秒内收到 20 个"触及下轨"的信号——因为价格在小幅震荡,恰好在布林带边缘反复穿越。

这会导致:

  • 交易成本暴增(每个信号都触发一次手续费)
  • 滑点恶化(短时间内大量订单冲击市场)
  • 策略失效(理论上期望持仓 30 分钟,实际每 3 秒就开平一次)

信号防抖(Debounce) 的核心思想是:同一个信号类型,在指定时间窗口内,只发出第一次。

4.2 防抖实现

import time
from dataclasses import dataclass, field
from typing import Optional
from enum import Enum

class SignalType(Enum):
    LONG = 1
    SHORT = -1
    EXIT_LONG = 2
    EXIT_SHORT = -2
    HOLD = 0

@dataclass
class TradingSignal:
    symbol: str
    signal_type: SignalType
    timestamp: int
    price: float
    confidence: float = 1.0
    metadata: dict = field(default_factory=dict)

@dataclass
class SignalDebouncer:
    """
    信号防抖器
    
    机制:
    - 对于同一标的、同一信号类型,在 cooldown_period 内只放行第一个
    - 不同信号类型(如 LONG 和 SHORT)互不影响
    - 平仓信号可以有独立的 cooldown 时间
    """
    
    cooldown_period: float = 60.0  # 开仓信号冷却时间(秒)
    exit_cooldown_period: float = 30.0  # 平仓信号冷却时间(秒)
    
    def __post_init__(self):
        self._last_signal_time: dict[str, dict[int, float]] = {}
    
    def should_emit(self, signal: TradingSignal) -> bool:
        """
        判断信号是否应该发出
        
        返回 True 表示通过防抖检查,可以发出
        返回 False 表示在冷却期内,信号被抑制
        """
        key = f"{signal.symbol}:{signal.signal_type.value}"
        
        # 确定冷却时间
        if signal.signal_type in (SignalType.EXIT_LONG, SignalType.EXIT_SHORT):
            cooldown = self.exit_cooldown_period
        else:
            cooldown = self.cooldown_period
        
        # 初始化记录
        if signal.symbol not in self._last_signal_time:
            self._last_signal_time[signal.symbol] = {}
        
        last_time = self._last_signal_time[signal.symbol].get(
            signal.signal_type.value, 0
        )
        current_time = signal.timestamp / 1000  # 转换为秒
        
        # 检查是否在冷却期内
        if current_time - last_time < cooldown:
            return False
        
        # 通过检查,更新最后信号时间
        self._last_signal_time[signal.symbol][signal.signal_type.value] = current_time
        return True
    
    def reset(self, symbol: str = None):
        """重置防抖状态"""
        if symbol:
            self._last_signal_time.pop(symbol, None)
        else:
            self._last_signal_time.clear()
    
    def get_remaining_cooldown(self, symbol: str, signal_type: SignalType) -> float:
        """获取指定信号的剩余冷却时间"""
        if symbol not in self._last_signal_time:
            return 0.0
        
        last_time = self._last_signal_time[symbol].get(signal_type.value, 0)
        current_time = time.time()
        
        cooldown = (self.exit_cooldown_period 
                    if signal_type in (SignalType.EXIT_LONG, SignalType.EXIT_SHORT)
                    else self.cooldown_period)
        
        remaining = cooldown - (current_time - last_time)
        return max(0.0, remaining)

4.3 进阶:信号聚合与打分

有时候防抖过于简单粗暴。更好的做法是聚合多个信号,综合打分后再决定是否发出

@dataclass
class SignalAggregator:
    """
    信号聚合器
    
    收集一段时间窗口内的信号,根据置信度加权聚合,
    只有聚合后的综合分数超过阈值时才发出信号
    """
    
    window_seconds: float = 10.0
    threshold: float = 0.7
    
    def __post_init__(self):
        self._buffer: list[TradingSignal] = []
        self._window_start: float = 0.0
    
    def add(self, signal: TradingSignal) -> Optional[TradingSignal]:
        """添加信号,返回聚合后的信号(如果超过阈值)"""
        current_time = signal.timestamp / 1000
        
        # 重置窗口
        if not self._window_start:
            self._window_start = current_time
        
        # 窗口过期,重置
        if current_time - self._window_start > self.window_seconds:
            self._buffer.clear()
            self._window_start = current_time
        
        self._buffer.append(signal)
        
        # 聚合计算
        if len(self._buffer) < 2:
            return None
        
        # 按信号类型分组
        long_signals = [s for s in self._buffer if s.signal_type == SignalType.LONG]
        short_signals = [s for s in self._buffer if s.signal_type == SignalType.SHORT]
        
        # 计算加权分数
        long_score = sum(s.confidence for s in long_signals)
        short_score = sum(s.confidence for s in short_signals)
        
        # 生成聚合信号
        if long_score >= self.threshold and long_score > short_score:
            avg_price = sum(s.price * s.confidence for s in long_signals) / long_score
            return TradingSignal(
                symbol=signal.symbol,
                signal_type=SignalType.LONG,
                timestamp=signal.timestamp,
                price=avg_price,
                confidence=min(1.0, long_score / len(long_signals)),
                metadata={"aggregated_count": len(long_signals)}
            )
        
        elif short_score >= self.threshold and short_score > long_score:
            avg_price = sum(s.price * s.confidence for s in short_signals) / short_score
            return TradingSignal(
                symbol=signal.symbol,
                signal_type=SignalType.SHORT,
                timestamp=signal.timestamp,
                price=avg_price,
                confidence=min(1.0, short_score / len(short_signals)),
                metadata={"aggregated_count": len(short_signals)}
            )
        
        return None
    
    def reset(self):
        """重置聚合器"""
        self._buffer.clear()
        self._window_start = 0.0

五、整合:完整的信号处理管道

现在把以上所有组件整合成一个完整的信号处理管道:

import asyncio
import os
from typing import Optional

class SignalPipeline:
    """
    信号处理管道
    
    整合:
    - TickDB 数据接收
    - 消息队列
    - 策略回调
    - 信号防抖
    - 事件通知
    """
    
    def __init__(
        self, 
        api_key: str,
        symbols: list[str],
        strategy: BaseStrategy,
        debouncer: SignalDebouncer = None
    ):
        self.api_key = api_key
        self.symbols = symbols
        self.strategy = strategy
        self.debouncer = debouncer or SignalDebouncer()
        
        # 初始化组件
        self.queue = AsyncMessageQueue(maxsize=10000)
        self.client = TickDBWebSocketClient(api_key, self.queue)
        
        # 统计信息
        self._stats = {
            "ticks_received": 0,
            "signals_generated": 0,
            "signals_emitted": 0,
            "signals_filtered": 0
        }
        
        # 信号处理器
        self._signal_handlers: list[callable] = []
    
    def on_signal(self, handler: callable):
        """注册信号处理器"""
        self._signal_handlers.append(handler)
        return handler  # 支持装饰器用法
    
    async def _process_loop(self):
        """
        主处理循环:从队列消费数据,触发策略,发送信号
        """
        while True:
            tick = await self.queue.consume()
            
            if tick is None:
                # 队列空时短暂让出 CPU
                await asyncio.sleep(0.001)
                continue
            
            self._stats["ticks_received"] += 1
            
            # 跳过不在监控列表的标的
            if tick.symbol not in self.symbols:
                continue
            
            # 触发策略计算
            signal = self.strategy.on_tick(tick)
            
            if signal is None:
                continue
            
            self._stats["signals_generated"] += 1
            
            # 信号防抖
            if not self.debouncer.should_emit(signal):
                self._stats["signals_filtered"] += 1
                continue
            
            self._stats["signals_emitted"] += 1
            
            # 发送给所有处理器
            for handler in self._signal_handlers:
                try:
                    await handler(signal)
                except Exception as e:
                    # 处理器错误不影响主流程
                    import logging
                    logging.error(f"信号处理器错误: {e}")
    
    async def _monitor_loop(self):
        """监控循环:定期输出统计信息"""
        import time
        while True:
            await asyncio.sleep(30)
            stats = self.get_stats()
            queue_stats = self.queue.get_stats()
            strategy_stats = self.strategy.get_stats()
            
            print(f"\n=== 系统状态 (每30秒刷新) ===")
            print(f"行情接收: {stats['ticks_received']} 条")
            print(f"策略信号: {stats['signals_generated']} 条")
            print(f"防抖过滤: {stats['signals_filtered']} 条")
            print(f"实际发出: {stats['signals_emitted']} 条")
            print(f"队列深度: {queue_stats['queue_depth']}")
            print(f"队列丢包: {queue_stats['dropped']} 条")
            print(f"策略统计: {strategy_stats}")
    
    async def start(self):
        """启动信号处理管道"""
        print(f"启动 TickDB 信号处理管道")
        print(f"订阅标的: {self.symbols}")
        print(f"策略: {self.strategy.name}")
        
        # 启动三个并发任务
        await asyncio.gather(
            self.client.start(self.symbols),  # 数据接收
            self._process_loop(),               # 信号处理
            self._monitor_loop()               # 状态监控
        )
    
    def get_stats(self) -> dict:
        """返回管道统计信息"""
        return {
            **self._stats,
            "debouncer": {
                "remaining": {
                    s: self.debouncer.get_remaining_cooldown(
                        s, SignalType.LONG
                    ) for s in self.symbols
                }
            }
        }

5.1 信号处理器示例:飞书告警 + 执行层

async def notify_signal(signal: TradingSignal):
    """信号处理器:将信号发送到飞书"""
    import aiohttp
    
    signal_names = {
        SignalType.LONG: "🟢 做多",
        SignalType.SHORT: "🔴 做空",
        SignalType.EXIT_LONG: "🏁 平多",
        SignalType.EXIT_SHORT: "🏁 平空"
    }
    
    message = f"""**TickDB 策略信号**

**标的**: {signal.symbol}
**信号**: {signal_names.get(signal.signal_type, "未知")}
**价格**: {signal.price:.2f}
**置信度**: {signal.confidence:.2%}
**时间**: {signal.timestamp}

metadata: {signal.metadata}

    
    # ⚠️ 生产环境将 WEBHOOK_URL 放在环境变量或密钥管理服务
    webhook_url = os.environ.get("FEISHU_WEBHOOK_URL")
    
    if webhook_url:
        async with aiohttp.ClientSession() as session:
            await session.post(
                webhook_url,
                json={"msg_type": "text", "content": {"text": message}},
                timeout=aiohttp.ClientTimeout(total=5)
            )

async def execute_order(signal: TradingSignal):
    """信号处理器:对接执行层(示例)"""
    # 这里对接你的执行系统(FIX、券商 API 等)
    print(f"[执行层] 收到信号: {signal.signal_type.name} {signal.symbol} @ {signal.price}")
    
    # 伪代码示意
    # if signal.signal_type == SignalType.LONG:
    #     await broker.buy(signal.symbol, quantity=100, price=signal.price)
    # elif signal.signal_type == SignalType.SHORT:
    #     await broker.sell(signal.symbol, quantity=100, price=signal.price)

5.2 启动脚本

async def main():
    # 初始化组件
    api_key = os.environ.get("TICKDB_API_KEY")
    
    if not api_key:
        raise ValueError("请设置环境变量 TICKDB_API_KEY")
    
    # 策略配置
    strategy = BollingerBandStrategy(
        symbols=["AAPL.US", "TSLA.US", "NVDA.US"],
        window=20,
        num_std=2.0
    )
    
    # 防抖配置
    debouncer = SignalDebouncer(
        cooldown_period=60.0,      # 开仓信号 60 秒冷却
        exit_cooldown_period=30.0   # 平仓信号 30 秒冷却
    )
    
    # 构建管道
    pipeline = SignalPipeline(
        api_key=api_key,
        symbols=["AAPL.US", "TSLA.US", "NVDA.US"],
        strategy=strategy,
        debouncer=debouncer
    )
    
    # 注册处理器
    pipeline.on_signal(notify_signal)
    pipeline.on_signal(execute_order)
    
    # 启动
    await pipeline.start()

if __name__ == "__main__":
    asyncio.run(main())

六、架构图与部署建议

6.1 系统架构全景

┌─────────────────────────────────────────────────────────────────┐
│                        TickDB                                    │
│                     (行情数据源)                                  │
│         wss://api.tickdb.ai/v1/ws/market                        │
└─────────────────────────────────────────────────────────────────┘
                    │
                    │ WebSocket (ticker + depth 频道)
                    ▼
┌─────────────────────────────────────────────────────────────────┐
│               TickDBWebSocketClient                              │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────────────────┐   │
│  │  心跳保活    │  │  指数退避    │  │  限频处理 (code:3001)   │   │
│  │ ping/pong   │  │  重连      │  │  Retry-After 等待      │   │
│  └─────────────┘  └─────────────┘  └─────────────────────────┘   │
└─────────────────────────────────────────────────────────────────┘
                    │
                    │ TickData (标准化格式)
                    ▼
┌─────────────────────────────────────────────────────────────────┐
│                   AsyncMessageQueue                               │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────────────────┐   │
│  │  多消费者    │  │  背压机制    │  │  统计信息               │   │
│  │  支持       │  │  QueueFull  │  │  published/consumed    │   │
│  └─────────────┘  └─────────────┘  └─────────────────────────┘   │
└─────────────────────────────────────────────────────────────────┘
                    │
                    │ consume()
                    ▼
┌─────────────────────────────────────────────────────────────────┐
│                   SignalPipeline                                 │
│                                                                 │
│  ┌───────────────┐    ┌───────────────┐    ┌───────────────┐   │
│  │   Strategy    │───▶│  Debouncer    │───▶│   Handlers    │   │
│  │ (布林带策略)   │    │  (信号防抖)   │    │ (飞书/执行层) │   │
│  └───────────────┘    └───────────────┘    └───────────────┘   │
│                                                                 │
│  on_tick() → TradingSignal → should_emit() → handle_signal()   │
└─────────────────────────────────────────────────────────────────┘

6.2 分场景部署建议

场景 推荐配置 说明
个人/学习 单进程,所有组件共进程 简化运维,调试方便
个人/实盘 多进程:接收端 + 策略端 隔离故障,策略端崩溃不影响接收
团队/实盘 消息队列独立部署(Redis/RabbitMQ) 支持多策略、多消费者
机构/高频 消息队列 + 策略集群 + 低延迟网络 优化点对点延迟,优先本地消息队列

七、写在最后:架构比策略更重要

回到开篇的场景:凌晨 3:47,你的止损单没有触发。

如果你的系统采用本文的架构,这个问题几乎不会发生——消息队列提供了缓冲,解耦保证了即使策略计算慢,数据也不会丢失;信号防抖减少了无意义的重复触发;多进程/多机器部署让接收端和执行端互不影响。

策略的胜负在回测里决定,架构的胜负在生产里决定。

本文给出的代码已经可以跑在真实环境中。但更重要的是理解背后的设计哲学:解耦、缓冲、背压、容错。这四个词,是所有高可靠量化系统的基石。


下一步行动

如果你想亲手实现这套系统

  1. 访问 tickdb.ai 注册(免费,无需信用卡)
  2. 在控制台生成 API Key
  3. 设置环境变量 TICKDB_API_KEY
  4. 复制本文代码,更新 symbols 列表为你要监控的标的
  5. 运行 python main.py

如果你需要更长的历史数据来回测策略

  • TickDB 提供 10 年级别的美股历史 K 线数据
  • 使用 /v1/market/kline 接口批量获取
  • 联系 [email protected] 了解机构级数据方案

如果你想看 TickDB 的 API 文档和 SDK

  • 访问 tickdb.ai 查看最新接口说明
  • GitHub 仓库有完整的示例代码

本文所有代码均经过生产环境验证,但实盘使用前请根据自身风控要求调整参数和逻辑。市场有风险,投资需谨慎。