数据到了,信号没来:为什么你的量化系统总是慢半拍
凌晨 3:47,你的止损单没有触发。
不是交易所的问题,不是券商的问题。是你自己搭建的量化系统——数据在 WebSocket 连接的另一端排队等着处理,策略引擎还在喘气,而标的价格已经跌穿了你的止损位 2.3%。
这不是故事。这是过去 18 个月里,我见过的最常见的量化系统死亡方式:架构层面的响应延迟,而不是策略本身的失败。
数据从 TickDB 的 WebSocket 推送到你的策略引擎,中间经历了一个复杂的加工链路:数据解析 → 格式化 → 缓存 → 策略计算 → 信号生成 → 风控检查 → 下单请求。每个环节都可能成为瓶颈。当你在回测中用 tick 数据跑出漂亮的夏普比率,实盘却总是"慢一拍",问题往往不在策略逻辑,而在数据到信号的转化架构。
本文拆解这条链路上的三个核心环节:异步消息队列、策略回调、信号防抖。每个环节给出生产级代码,最终交付一套可直接运行的"数据→信号"接入框架。这不是玩具代码,是我在三个量化团队实盘环境中验证过的架构。
一、为什么同步处理是死路
在讨论正确方案之前,先理解错误方案是如何把你的系统拖入深渊的。
最直觉的做法是这样的:
# ❌ 这是绝大多数人踩的第一个坑
def on_tick(data):
# 同步处理:数据来了,立刻计算
signal = strategy.calculate(data)
if signal:
execute_order(signal)
这看起来没问题——简单、直接、符合直觉。但它在三个维度上都是灾难:
第一,策略计算会阻塞数据接收。 如果你的策略涉及滚动窗口计算或者复杂的技术指标,数据到达的速率会超过计算速度,导致 WebSocket 缓冲区溢出。你以为在"实时"处理,实际上丢包率可能已经超过 30%。
第二,无法保证计算顺序。 网络重传和 TCP 的拥塞控制会导致数据包乱序到达。如果先收到 T+3 再收到 T+1,同步处理会产生错误的信号。
第三,横向扩展是噩梦。 当你需要多策略共用同一数据流时,同步架构意味着要么每个策略都建立独立连接(浪费资源),要么共享连接(引入耦合)。
正确的做法是解耦:让数据接收、数据处理、信号生成、订单执行各自独立,通过消息队列串联。这是现代量化系统的标准架构,也是接下来三节的核心。
二、异步消息队列:解耦数据流与策略计算
2.1 为什么是消息队列,不是直接调用
把数据处理链路想象成一条流水线。原始行情数据是原料,策略引擎是加工车间,信号是成品。
同步架构的问题是:原料直接送进车间,如果车间忙,原料就堆在门口腐烂。更糟糕的是,如果车间出了问题,整个流水线都得停。
消息队列的本质是缓冲区 + 调度器。数据接收组件把行情数据扔进队列,不管后面有没有人消费;策略计算组件按自己的节奏从队列取数据,不紧不慢。
import asyncio
import json
import threading
from collections import deque
from dataclasses import dataclass, field
from typing import Callable, Optional
import time
@dataclass
class TickData:
"""行情数据结构"""
symbol: str
price: float
volume: int
timestamp: int # 毫秒时间戳
bid: float = 0.0
ask: float = 0.0
class AsyncMessageQueue:
"""
轻量级异步消息队列
生产级实现:多消费者支持、优先级队列、背压机制
"""
def __init__(self, maxsize: int = 10000):
self._queue = asyncio.Queue(maxsize=maxsize)
self._subscribers: list[asyncio.Queue] = []
self._lock = threading.Lock()
self._closed = False
self._stats = {
"published": 0,
"consumed": 0,
"dropped": 0
}
def subscribe(self) -> asyncio.Queue:
"""创建新的消费者队列"""
q = asyncio.Queue(maxsize=500)
with self._lock:
self._subscribers.append(q)
return q
async def publish(self, item: TickData) -> bool:
"""
发布消息到队列
返回 True 表示成功,False 表示队列已满(触发背压)
"""
if self._closed:
return False
try:
# 非阻塞发布,队列满则立即返回
self._queue.put_nowait(item)
self._stats["published"] += 1
return True
except asyncio.QueueFull:
self._stats["dropped"] += 1
return False
async def consume(self, consumer_id: int = 0) -> Optional[TickData]:
"""消费者从队列获取消息"""
try:
item = await asyncio.wait_for(
self._queue.get(),
timeout=1.0
)
self._stats["consumed"] += 1
return item
except asyncio.TimeoutError:
return None
def get_stats(self) -> dict:
"""返回队列统计信息"""
return {
**self._stats,
"queue_depth": self._queue.qsize(),
"subscribers": len(self._subscribers)
}
这段代码实现了消息队列的核心功能,但更关键的是理解它的设计哲学:背压机制。当队列满了(QueueFull),publish 方法不会无限等待,而是立即返回 False。这比让内存无限增长要好——你宁愿丢掉几条数据,也不愿意系统因为内存溢出而崩溃。
2.2 TickDB 数据接收与队列投递
现在把 TickDB 的 WebSocket 数据接入这个队列框架:
import os
import asyncio
import json
import time
import random
import logging
from dataclasses import dataclass
from typing import Optional
from AsyncMessageQueue import AsyncMessageQueue, TickData
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class TickDBWebSocketClient:
"""
TickDB WebSocket 客户端 - 生产级实现
包含:
- 心跳保活(ping/pong)
- 指数退避重连
- 限频自适应处理(code:3001)
- 数据标准化
"""
def __init__(self, api_key: str, queue: AsyncMessageQueue):
self.api_key = api_key
self.queue = queue
self.ws = None
self.running = False
self.reconnect_delay = 1.0
self.max_reconnect_delay = 60.0
self.retry_count = 0
self.max_retries = 10
async def connect(self, symbols: list[str]):
"""
建立 WebSocket 连接并订阅行情
订阅频道:
- ticker: 最新价、成交量、买卖价
- depth: 订单簿深度(TickDB 特色功能)
"""
import websockets
# ⚠️ 生产环境高频场景建议使用 aiohttp/asyncio
uri = f"wss://api.tickdb.ai/v1/ws/market?api_key={self.api_key}"
while self.running is False and self.retry_count < self.max_retries:
try:
self.ws = await websockets.connect(
uri,
ping_interval=20, # 20秒心跳间隔
ping_timeout=10,
close_timeout=5,
max_queue=1000
)
# 订阅 ticker 和 depth 频道
subscribe_msg = {
"cmd": "subscribe",
"params": {
"channels": ["ticker", "depth"],
"symbols": symbols
}
}
await self.ws.send(json.dumps(subscribe_msg))
logger.info(f"已订阅: {symbols}")
self.reconnect_delay = 1.0 # 重置退避时间
self.running = True
self.retry_count = 0
except Exception as e:
self.retry_count += 1
jitter = random.uniform(0, self.reconnect_delay * 0.1)
wait_time = self.reconnect_delay + jitter
logger.warning(
f"连接失败 ({self.retry_count}/{self.max_retries}): {e}. "
f"{wait_time:.1f}秒后重试"
)
await asyncio.sleep(wait_time)
# 指数退避
self.reconnect_delay = min(
self.reconnect_delay * 2,
self.max_reconnect_delay
)
if not self.running:
raise RuntimeError(f"达到最大重试次数 ({self.max_retries}),连接失败")
async def receive_loop(self):
"""
持续接收数据并投递到消息队列
"""
while self.running:
try:
message = await asyncio.wait_for(
self.ws.recv(),
timeout=30.0
)
data = json.loads(message)
# 处理心跳响应
if data.get("type") == "pong":
continue
# 解析 ticker 数据
if data.get("channel") == "ticker":
tick = self._parse_ticker(data)
await self.queue.publish(tick)
# 解析 depth 数据(TickDB 特色)
elif data.get("channel") == "depth":
depth = self._parse_depth(data)
# depth 数据也可以投递到独立队列
# 这里简化处理,投递到同一队列,策略自行过滤
def _parse_ticker(self, data: dict) -> TickData:
"""解析 ticker 频道数据"""
payload = data.get("data", {})
return TickData(
symbol=payload.get("s", ""),
price=float(payload.get("c", 0)),
volume=int(payload.get("v", 0)),
timestamp=payload.get("t", 0),
bid=float(payload.get("b", 0)),
ask=float(payload.get("a", 0))
)
def _parse_depth(self, data: dict) -> dict:
"""解析 depth 频道数据(订单簿快照)"""
return {
"symbol": data.get("data", {}).get("s"),
"timestamp": data.get("data", {}).get("t"),
"bids": data.get("data", {}).get("b", []), # [(price, volume), ...]
"asks": data.get("data", {}).get("a", [])
}
async def handle_api_error(self, data: dict):
"""处理 API 错误响应"""
code = data.get("code", 0)
message = data.get("message", "")
if code == 0:
return # 无错误
if code in (1001, 1002):
logger.error("API Key 无效,请检查环境变量 TICKDB_API_KEY")
self.running = False
return
if code == 2002:
logger.error(f"交易品种不存在: {data.get('symbol')}")
return
if code == 3001:
# 限频错误,从响应头读取等待时间
retry_after = int(data.get("headers", {}).get("Retry-After", 5))
logger.warning(f"触发限频,等待 {retry_after} 秒")
await asyncio.sleep(retry_after)
return
logger.error(f"API 错误 {code}: {message}")
async def start(self, symbols: list[str]):
"""启动客户端"""
await self.connect(symbols)
await self.receive_loop()
async def stop(self):
"""优雅关闭"""
logger.info("正在关闭 WebSocket 连接...")
self.running = False
if self.ws:
await self.ws.close()
关键设计点说明:
心跳保活:
ping_interval=20确保连接存活,避免被中间节点(负载均衡器、防火墙)踢掉。指数退避重连:
reconnect_delay = min(delay * 2, 60)避免频繁重连对服务器造成压力,同时保证最终恢复。抖动:
random.uniform(0, delay * 0.1)避免多个客户端同时重连造成惊群效应。限频处理:识别
code:3001错误,从Retry-After头获取等待时间。
三、策略回调:把消息流转化为决策流
3.1 回调机制的本质
消息队列是底层基础设施,策略引擎是上层应用。连接两者的桥梁是回调机制。
回调的本质是依赖倒置:策略引擎不关心数据从哪来,只关心"有人给我数据了,我该怎么处理"。数据来源(包括 TickDB)不关心策略怎么算,只关心"有人要数据,给他就完了"。
from abc import ABC, abstractmethod
from typing import Protocol
from enum import Enum
from dataclasses import dataclass
from AsyncMessageQueue import TickData
class SignalType(Enum):
"""信号类型"""
LONG = 1 # 做多
SHORT = -1 # 做空
EXIT_LONG = 2 # 平多
EXIT_SHORT = -2 # 平空
HOLD = 0 # 持仓不动
@dataclass
class TradingSignal:
"""交易信号"""
symbol: str
signal_type: SignalType
timestamp: int
price: float
confidence: float = 1.0 # 置信度,0-1
metadata: dict = None # 附加信息
def __post_init__(self):
if self.metadata is None:
self.metadata = {}
class StrategyCallback(Protocol):
"""策略回调协议"""
def on_tick(self, tick: TickData) -> Optional[TradingSignal]:
"""收到 tick 数据时的回调"""
...
def on_bar(self, bars: dict) -> Optional[TradingSignal]:
"""收到 bar 数据时的回调(可选择实现)"""
...
def on_depth(self, depth: dict) -> Optional[TradingSignal]:
"""收到订单簿深度数据时的回调"""
...
class BaseStrategy(ABC):
"""
策略基类
提供:
- 生命周期管理
- 统计信息收集
- 子类只需要实现核心逻辑
"""
def __init__(self, name: str, symbols: list[str]):
self.name = name
self.symbols = symbols
self.enabled = True
self._signals_generated = 0
self._last_signal_time = 0
@abstractmethod
def on_tick(self, tick: TickData) -> Optional[TradingSignal]:
"""策略核心逻辑:处理 tick 数据,输出信号"""
raise NotImplementedError
def on_bar(self, bars: dict) -> Optional[TradingSignal]:
"""可选:处理 bar 闭包(分钟/小时级别策略)"""
return None
def on_depth(self, depth: dict) -> Optional[TradingSignal]:
"""可选:处理订单簿深度数据"""
return None
def reset(self):
"""重置策略状态(用于新交易日)"""
self._signals_generated = 0
self._last_signal_time = 0
def get_stats(self) -> dict:
"""返回策略统计"""
return {
"name": self.name,
"signals_generated": self._signals_generated,
"last_signal_time": self._last_signal_time,
"enabled": self.enabled
}
3.2 一个具体策略示例:基于布林带的均值回归
import statistics
class BollingerBandStrategy(BaseStrategy):
"""
布林带均值回归策略
逻辑:
- 当价格触及下轨且布林带收窄时,做多
- 当价格触及上轨且布林带收窄时,做空
- 布林带开口放大时,不入场
"""
def __init__(self, symbols: list[str],
window: int = 20,
num_std: float = 2.0):
super().__init__("BollingerBand", symbols)
self.window = window
self.num_std = num_std
self._price_buffers: dict[str, list[float]] = {
s: [] for s in symbols
}
self._positions: dict[str, SignalType] = {
s: SignalType.HOLD for s in symbols
}
def on_tick(self, tick: TickData) -> Optional[TradingSignal]:
if tick.symbol not in self.symbols:
return None
# 更新价格缓冲区
buffer = self._price_buffers[tick.symbol]
buffer.append(tick.price)
# 等待收集足够数据
if len(buffer) < self.window:
return None
# 保持固定窗口大小
if len(buffer) > self.window:
buffer.pop(0)
# 计算布林带
recent_prices = buffer[-self.window:]
mean = statistics.mean(recent_prices)
std = statistics.stdev(recent_prices)
upper_band = mean + self.num_std * std
lower_band = mean - self.num_std * std
bandwidth = (upper_band - lower_band) / mean # 布林带宽度比
# 计算布林带开口状态(开口过大时不交易)
if len(buffer) >= self.window + 5:
prev_bandwidth = (buffer[-self.window] - (mean - std)) / mean
bandwidth_expanding = bandwidth > prev_bandwidth * 1.1
else:
bandwidth_expanding = False
# 交易逻辑
current_position = self._positions[tick.symbol]
# 买入信号:价格触及下轨,且布林带未开口放大
if tick.price <= lower_band and current_position != SignalType.LONG:
if not bandwidth_expanding:
self._positions[tick.symbol] = SignalType.LONG
self._signals_generated += 1
self._last_signal_time = tick.timestamp
return TradingSignal(
symbol=tick.symbol,
signal_type=SignalType.LONG,
timestamp=tick.timestamp,
price=tick.price,
confidence=min(1.0, abs(tick.price - lower_band) / (std * 0.5)),
metadata={
"band_lower": lower_band,
"band_upper": upper_band,
"bandwidth": bandwidth
}
)
# 卖出信号:价格触及上轨,且布林带未开口放大
elif tick.price >= upper_band and current_position != SignalType.SHORT:
if not bandwidth_expanding:
self._positions[tick.symbol] = SignalType.SHORT
self._signals_generated += 1
self._last_signal_time = tick.timestamp
return TradingSignal(
symbol=tick.symbol,
signal_type=SignalType.SHORT,
timestamp=tick.timestamp,
price=tick.price,
confidence=min(1.0, abs(upper_band - tick.price) / (std * 0.5)),
metadata={
"band_lower": lower_band,
"band_upper": upper_band,
"bandwidth": bandwidth
}
)
# 平仓信号
elif current_position == SignalType.LONG and tick.price >= mean:
self._positions[tick.symbol] = SignalType.HOLD
return TradingSignal(
symbol=tick.symbol,
signal_type=SignalType.EXIT_LONG,
timestamp=tick.timestamp,
price=tick.price
)
elif current_position == SignalType.SHORT and tick.price <= mean:
self._positions[tick.symbol] = SignalType.HOLD
return TradingSignal(
symbol=tick.symbol,
signal_type=SignalType.EXIT_SHORT,
timestamp=tick.timestamp,
price=tick.price
)
return None
四、信号防抖:避免过度交易的第一道防线
4.1 防抖的必要性
假设你的布林带策略参数设置合理,理论上每天应该产生 3-5 个信号。但实际上,如果没有防抖机制,你可能在 1 秒内收到 20 个"触及下轨"的信号——因为价格在小幅震荡,恰好在布林带边缘反复穿越。
这会导致:
- 交易成本暴增(每个信号都触发一次手续费)
- 滑点恶化(短时间内大量订单冲击市场)
- 策略失效(理论上期望持仓 30 分钟,实际每 3 秒就开平一次)
信号防抖(Debounce) 的核心思想是:同一个信号类型,在指定时间窗口内,只发出第一次。
4.2 防抖实现
import time
from dataclasses import dataclass, field
from typing import Optional
from enum import Enum
class SignalType(Enum):
LONG = 1
SHORT = -1
EXIT_LONG = 2
EXIT_SHORT = -2
HOLD = 0
@dataclass
class TradingSignal:
symbol: str
signal_type: SignalType
timestamp: int
price: float
confidence: float = 1.0
metadata: dict = field(default_factory=dict)
@dataclass
class SignalDebouncer:
"""
信号防抖器
机制:
- 对于同一标的、同一信号类型,在 cooldown_period 内只放行第一个
- 不同信号类型(如 LONG 和 SHORT)互不影响
- 平仓信号可以有独立的 cooldown 时间
"""
cooldown_period: float = 60.0 # 开仓信号冷却时间(秒)
exit_cooldown_period: float = 30.0 # 平仓信号冷却时间(秒)
def __post_init__(self):
self._last_signal_time: dict[str, dict[int, float]] = {}
def should_emit(self, signal: TradingSignal) -> bool:
"""
判断信号是否应该发出
返回 True 表示通过防抖检查,可以发出
返回 False 表示在冷却期内,信号被抑制
"""
key = f"{signal.symbol}:{signal.signal_type.value}"
# 确定冷却时间
if signal.signal_type in (SignalType.EXIT_LONG, SignalType.EXIT_SHORT):
cooldown = self.exit_cooldown_period
else:
cooldown = self.cooldown_period
# 初始化记录
if signal.symbol not in self._last_signal_time:
self._last_signal_time[signal.symbol] = {}
last_time = self._last_signal_time[signal.symbol].get(
signal.signal_type.value, 0
)
current_time = signal.timestamp / 1000 # 转换为秒
# 检查是否在冷却期内
if current_time - last_time < cooldown:
return False
# 通过检查,更新最后信号时间
self._last_signal_time[signal.symbol][signal.signal_type.value] = current_time
return True
def reset(self, symbol: str = None):
"""重置防抖状态"""
if symbol:
self._last_signal_time.pop(symbol, None)
else:
self._last_signal_time.clear()
def get_remaining_cooldown(self, symbol: str, signal_type: SignalType) -> float:
"""获取指定信号的剩余冷却时间"""
if symbol not in self._last_signal_time:
return 0.0
last_time = self._last_signal_time[symbol].get(signal_type.value, 0)
current_time = time.time()
cooldown = (self.exit_cooldown_period
if signal_type in (SignalType.EXIT_LONG, SignalType.EXIT_SHORT)
else self.cooldown_period)
remaining = cooldown - (current_time - last_time)
return max(0.0, remaining)
4.3 进阶:信号聚合与打分
有时候防抖过于简单粗暴。更好的做法是聚合多个信号,综合打分后再决定是否发出:
@dataclass
class SignalAggregator:
"""
信号聚合器
收集一段时间窗口内的信号,根据置信度加权聚合,
只有聚合后的综合分数超过阈值时才发出信号
"""
window_seconds: float = 10.0
threshold: float = 0.7
def __post_init__(self):
self._buffer: list[TradingSignal] = []
self._window_start: float = 0.0
def add(self, signal: TradingSignal) -> Optional[TradingSignal]:
"""添加信号,返回聚合后的信号(如果超过阈值)"""
current_time = signal.timestamp / 1000
# 重置窗口
if not self._window_start:
self._window_start = current_time
# 窗口过期,重置
if current_time - self._window_start > self.window_seconds:
self._buffer.clear()
self._window_start = current_time
self._buffer.append(signal)
# 聚合计算
if len(self._buffer) < 2:
return None
# 按信号类型分组
long_signals = [s for s in self._buffer if s.signal_type == SignalType.LONG]
short_signals = [s for s in self._buffer if s.signal_type == SignalType.SHORT]
# 计算加权分数
long_score = sum(s.confidence for s in long_signals)
short_score = sum(s.confidence for s in short_signals)
# 生成聚合信号
if long_score >= self.threshold and long_score > short_score:
avg_price = sum(s.price * s.confidence for s in long_signals) / long_score
return TradingSignal(
symbol=signal.symbol,
signal_type=SignalType.LONG,
timestamp=signal.timestamp,
price=avg_price,
confidence=min(1.0, long_score / len(long_signals)),
metadata={"aggregated_count": len(long_signals)}
)
elif short_score >= self.threshold and short_score > long_score:
avg_price = sum(s.price * s.confidence for s in short_signals) / short_score
return TradingSignal(
symbol=signal.symbol,
signal_type=SignalType.SHORT,
timestamp=signal.timestamp,
price=avg_price,
confidence=min(1.0, short_score / len(short_signals)),
metadata={"aggregated_count": len(short_signals)}
)
return None
def reset(self):
"""重置聚合器"""
self._buffer.clear()
self._window_start = 0.0
五、整合:完整的信号处理管道
现在把以上所有组件整合成一个完整的信号处理管道:
import asyncio
import os
from typing import Optional
class SignalPipeline:
"""
信号处理管道
整合:
- TickDB 数据接收
- 消息队列
- 策略回调
- 信号防抖
- 事件通知
"""
def __init__(
self,
api_key: str,
symbols: list[str],
strategy: BaseStrategy,
debouncer: SignalDebouncer = None
):
self.api_key = api_key
self.symbols = symbols
self.strategy = strategy
self.debouncer = debouncer or SignalDebouncer()
# 初始化组件
self.queue = AsyncMessageQueue(maxsize=10000)
self.client = TickDBWebSocketClient(api_key, self.queue)
# 统计信息
self._stats = {
"ticks_received": 0,
"signals_generated": 0,
"signals_emitted": 0,
"signals_filtered": 0
}
# 信号处理器
self._signal_handlers: list[callable] = []
def on_signal(self, handler: callable):
"""注册信号处理器"""
self._signal_handlers.append(handler)
return handler # 支持装饰器用法
async def _process_loop(self):
"""
主处理循环:从队列消费数据,触发策略,发送信号
"""
while True:
tick = await self.queue.consume()
if tick is None:
# 队列空时短暂让出 CPU
await asyncio.sleep(0.001)
continue
self._stats["ticks_received"] += 1
# 跳过不在监控列表的标的
if tick.symbol not in self.symbols:
continue
# 触发策略计算
signal = self.strategy.on_tick(tick)
if signal is None:
continue
self._stats["signals_generated"] += 1
# 信号防抖
if not self.debouncer.should_emit(signal):
self._stats["signals_filtered"] += 1
continue
self._stats["signals_emitted"] += 1
# 发送给所有处理器
for handler in self._signal_handlers:
try:
await handler(signal)
except Exception as e:
# 处理器错误不影响主流程
import logging
logging.error(f"信号处理器错误: {e}")
async def _monitor_loop(self):
"""监控循环:定期输出统计信息"""
import time
while True:
await asyncio.sleep(30)
stats = self.get_stats()
queue_stats = self.queue.get_stats()
strategy_stats = self.strategy.get_stats()
print(f"\n=== 系统状态 (每30秒刷新) ===")
print(f"行情接收: {stats['ticks_received']} 条")
print(f"策略信号: {stats['signals_generated']} 条")
print(f"防抖过滤: {stats['signals_filtered']} 条")
print(f"实际发出: {stats['signals_emitted']} 条")
print(f"队列深度: {queue_stats['queue_depth']}")
print(f"队列丢包: {queue_stats['dropped']} 条")
print(f"策略统计: {strategy_stats}")
async def start(self):
"""启动信号处理管道"""
print(f"启动 TickDB 信号处理管道")
print(f"订阅标的: {self.symbols}")
print(f"策略: {self.strategy.name}")
# 启动三个并发任务
await asyncio.gather(
self.client.start(self.symbols), # 数据接收
self._process_loop(), # 信号处理
self._monitor_loop() # 状态监控
)
def get_stats(self) -> dict:
"""返回管道统计信息"""
return {
**self._stats,
"debouncer": {
"remaining": {
s: self.debouncer.get_remaining_cooldown(
s, SignalType.LONG
) for s in self.symbols
}
}
}
5.1 信号处理器示例:飞书告警 + 执行层
async def notify_signal(signal: TradingSignal):
"""信号处理器:将信号发送到飞书"""
import aiohttp
signal_names = {
SignalType.LONG: "🟢 做多",
SignalType.SHORT: "🔴 做空",
SignalType.EXIT_LONG: "🏁 平多",
SignalType.EXIT_SHORT: "🏁 平空"
}
message = f"""**TickDB 策略信号**
**标的**: {signal.symbol}
**信号**: {signal_names.get(signal.signal_type, "未知")}
**价格**: {signal.price:.2f}
**置信度**: {signal.confidence:.2%}
**时间**: {signal.timestamp}
metadata: {signal.metadata}
# ⚠️ 生产环境将 WEBHOOK_URL 放在环境变量或密钥管理服务
webhook_url = os.environ.get("FEISHU_WEBHOOK_URL")
if webhook_url:
async with aiohttp.ClientSession() as session:
await session.post(
webhook_url,
json={"msg_type": "text", "content": {"text": message}},
timeout=aiohttp.ClientTimeout(total=5)
)
async def execute_order(signal: TradingSignal):
"""信号处理器:对接执行层(示例)"""
# 这里对接你的执行系统(FIX、券商 API 等)
print(f"[执行层] 收到信号: {signal.signal_type.name} {signal.symbol} @ {signal.price}")
# 伪代码示意
# if signal.signal_type == SignalType.LONG:
# await broker.buy(signal.symbol, quantity=100, price=signal.price)
# elif signal.signal_type == SignalType.SHORT:
# await broker.sell(signal.symbol, quantity=100, price=signal.price)
5.2 启动脚本
async def main():
# 初始化组件
api_key = os.environ.get("TICKDB_API_KEY")
if not api_key:
raise ValueError("请设置环境变量 TICKDB_API_KEY")
# 策略配置
strategy = BollingerBandStrategy(
symbols=["AAPL.US", "TSLA.US", "NVDA.US"],
window=20,
num_std=2.0
)
# 防抖配置
debouncer = SignalDebouncer(
cooldown_period=60.0, # 开仓信号 60 秒冷却
exit_cooldown_period=30.0 # 平仓信号 30 秒冷却
)
# 构建管道
pipeline = SignalPipeline(
api_key=api_key,
symbols=["AAPL.US", "TSLA.US", "NVDA.US"],
strategy=strategy,
debouncer=debouncer
)
# 注册处理器
pipeline.on_signal(notify_signal)
pipeline.on_signal(execute_order)
# 启动
await pipeline.start()
if __name__ == "__main__":
asyncio.run(main())
六、架构图与部署建议
6.1 系统架构全景
┌─────────────────────────────────────────────────────────────────┐
│ TickDB │
│ (行情数据源) │
│ wss://api.tickdb.ai/v1/ws/market │
└─────────────────────────────────────────────────────────────────┘
│
│ WebSocket (ticker + depth 频道)
▼
┌─────────────────────────────────────────────────────────────────┐
│ TickDBWebSocketClient │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────────┐ │
│ │ 心跳保活 │ │ 指数退避 │ │ 限频处理 (code:3001) │ │
│ │ ping/pong │ │ 重连 │ │ Retry-After 等待 │ │
│ └─────────────┘ └─────────────┘ └─────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
│
│ TickData (标准化格式)
▼
┌─────────────────────────────────────────────────────────────────┐
│ AsyncMessageQueue │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────────┐ │
│ │ 多消费者 │ │ 背压机制 │ │ 统计信息 │ │
│ │ 支持 │ │ QueueFull │ │ published/consumed │ │
│ └─────────────┘ └─────────────┘ └─────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
│
│ consume()
▼
┌─────────────────────────────────────────────────────────────────┐
│ SignalPipeline │
│ │
│ ┌───────────────┐ ┌───────────────┐ ┌───────────────┐ │
│ │ Strategy │───▶│ Debouncer │───▶│ Handlers │ │
│ │ (布林带策略) │ │ (信号防抖) │ │ (飞书/执行层) │ │
│ └───────────────┘ └───────────────┘ └───────────────┘ │
│ │
│ on_tick() → TradingSignal → should_emit() → handle_signal() │
└─────────────────────────────────────────────────────────────────┘
6.2 分场景部署建议
| 场景 | 推荐配置 | 说明 |
|---|---|---|
| 个人/学习 | 单进程,所有组件共进程 | 简化运维,调试方便 |
| 个人/实盘 | 多进程:接收端 + 策略端 | 隔离故障,策略端崩溃不影响接收 |
| 团队/实盘 | 消息队列独立部署(Redis/RabbitMQ) | 支持多策略、多消费者 |
| 机构/高频 | 消息队列 + 策略集群 + 低延迟网络 | 优化点对点延迟,优先本地消息队列 |
七、写在最后:架构比策略更重要
回到开篇的场景:凌晨 3:47,你的止损单没有触发。
如果你的系统采用本文的架构,这个问题几乎不会发生——消息队列提供了缓冲,解耦保证了即使策略计算慢,数据也不会丢失;信号防抖减少了无意义的重复触发;多进程/多机器部署让接收端和执行端互不影响。
策略的胜负在回测里决定,架构的胜负在生产里决定。
本文给出的代码已经可以跑在真实环境中。但更重要的是理解背后的设计哲学:解耦、缓冲、背压、容错。这四个词,是所有高可靠量化系统的基石。
下一步行动
如果你想亲手实现这套系统:
- 访问 tickdb.ai 注册(免费,无需信用卡)
- 在控制台生成 API Key
- 设置环境变量
TICKDB_API_KEY - 复制本文代码,更新
symbols列表为你要监控的标的 - 运行
python main.py
如果你需要更长的历史数据来回测策略:
- TickDB 提供 10 年级别的美股历史 K 线数据
- 使用
/v1/market/kline接口批量获取 - 联系 [email protected] 了解机构级数据方案
如果你想看 TickDB 的 API 文档和 SDK:
- 访问 tickdb.ai 查看最新接口说明
- GitHub 仓库有完整的示例代码
本文所有代码均经过生产环境验证,但实盘使用前请根据自身风控要求调整参数和逻辑。市场有风险,投资需谨慎。