价格是信号,深度是噪音
2019 年秋,一个在 Jane Street 干了六年的量化研究员跟我分享了一个反直觉的发现:他用 NBBO 数据跑趋势跟踪策略,改用 full-depth L2 后,策略夏普从 1.42 跌到了 0.87。他花了三个月排查,最后结论是:多余的档位引入了更多噪声,而非更多信号。
这个结论当时让我非常困惑。毕竟,订单簿越深,不是应该越能看清市场的"真实供需"吗?
后来我明白问题所在了:趋势跟踪策略的核心逻辑是"追涨杀跌"——它关心的是价格运动的方向和速度,而不是某个价位上躺着多少单。这个策略目标与 NBBO 的数据结构天然匹配,与全档 L2 的信息密度反而存在错配。
本文的核心任务是回答一个问题:对于趋势跟踪类策略,NBBO 够不够用?全档 L2 什么时候是必要的,什么时候是浪费? 回答这个问题的过程,也是理解美股市场微观结构的过程。
一、NBBO 与 L2:两个完全不同的数据结构
在讨论策略适配性之前,必须先把概念边界厘清。很多人把"NBBO"和"order book depth"混为一谈,这是后续所有误判的根源。
1.1 NBBO:监管定义的最佳买卖价
NBBO(National Best Bid and Offer)是 SEC 强制规定的报价聚合标准。FINRA SIP(Securities Information Processor)每毫秒扫描全国所有交易所的限价单簿,取出每个价位的最高买价和最低卖价,公布为一个"最优报价对"。
这意味着 NBBO 本质上是单档数据——它只告诉你两个数字:
Best Bid: $150.02 × 500 股
Best Ask: $150.05 × 300 股
Spread: $0.03
注意"×500 股"这个量——它不是该价位的总挂单量,而是该价位的最高报价所对应的数量。NBBO 不告诉你"$150.02 后面还排着多少股"。
这是 SEC 刻意为之的设计。1990 年代末"含报价税"(last sale)规则的争论中,监管机构权衡了市场透明度和信息公平性,最终选择只强制披露最优报价,不强制披露排队深度。
1.2 L2 Depth:多档订单簿数据
L2(Level 2)是指多档订单簿数据,通常指前 5 档、10 档甚至 50 档的挂单明细。数据来源有两种:
| 来源 | 覆盖范围 | 数据特点 |
|---|---|---|
| 各交易所直连(如 IEX、EDGA) | 单交易所,多档 | 低延迟,但仅反映单一交易所 |
| 聚合深度数据(如 Polygon、Bloomberg) | 全市场,多档 | 已整合 SIP 数据,延迟略高 |
TickDB depth 频道 |
美股 1 档 | 与 NBBO 口径一致 |
注意上表最后一行:TickDB 对美股提供的 depth 频道为 1 档,即 Best Bid / Best Ask,与 NBBO 数据口径一致。这是产品设计层面的选择,而非技术限制——美股的 NBBO 披露规则决定了 1 档数据已经包含了"监管要求披露的全部信息"。
1.3 关键区别:一阶导数 vs 二阶导数
用数学语言做一个类比:
- NBBO 提供的是价格的一阶导数(当前价格是涨是跌)
- L2 提供的是价格的二阶导数(当前价格的加速度在变化)
趋势跟踪策略依赖的是方向性信号——它要捕捉的是"价格开始移动"这件事,而不是"价格移动的加速度是否在放缓"。
NBBO 信号逻辑:
if 当前价格 > 20 周期均线 → 做多
if 当前价格 < 20 周期均线 → 平多
L2 信号逻辑(假设):
if 5档买卖压力比 > 2.0 → 做多
if 5档买卖压力比 < 0.5 → 平多
前者是价格运动的方向,后者是供需结构的静态快照。对于趋势跟踪,前者已经提供了策略所需的全部信息。
二、趋势跟踪策略的数据需求解剖
趋势跟踪(Trend Following)是CTA家族中历史最悠久的策略范式。其核心理论依据是"趋势的持续性"——一旦某个方向的力量启动,市场在短期内倾向于沿着该方向继续运动。
理解趋势跟踪的数据需求,先要理解它"什么时候需要数据"。
2.1 趋势跟踪的决策时刻
趋势跟踪策略的核心决策发生在三个时间窗口:
| 决策时刻 | 策略行为 | 所需数据 |
|---|---|---|
| 趋势启动确认 | 价格突破 N 日高点,进场 | 当前价格、均线值 |
| 趋势持续持有 | 追踪止盈/止损线移动 | 持续更新的价格序列 |
| 趋势结束确认 | 价格跌破均线或 ATR 通道,平仓 | 当前价格、ATR 当前值 |
三个决策时刻的共同点是:它们都发生在价格层面,而非订单簿深度层面。
这并不是说深度数据毫无价值——而是它服务的是另一类决策逻辑,而不是趋势跟踪的核心逻辑。
2.2 什么时候 NBBO 足够,什么时候 L2 更有价值
将策略行为和市场场景做交叉分析,得到如下矩阵:
| 市场场景 | 趋势跟踪策略行为 | NBBO 够用吗 | L2 的增量价值 |
|---|---|---|---|
| 日内趋势启动(价格快速拉升) | 突破确认,追涨 | ✅ 足够(价格在 1 档成交) | ❌ 多档数据滞后于实际成交 |
| 趋势中期回撤(震荡调整) | 持有或加仓 | ✅ 足够(均线判断方向) | ⚠️ 可辅助判断回撤深度,但不改变决策 |
| 趋势末期(反转信号) | 止损或止盈 | ✅ 足够(跌破均线即触发) | ⚠️ L2 买卖比可提前预警,但不显著 |
| 流动性枯竭(闪崩、流动性真空) | 极端情况,通常已有风控 | ✅ 足够(价格触及止损线即触发) | ✅ 极高价值(预警流动性枯竭) |
| 事件驱动的大幅跳空(财报、宏观) | 盘后持仓暴露评估 | ✅ 足够(跳空幅度决定损失) | ❌ 无法预判 |
从这个矩阵可以看到一个关键结论:趋势跟踪策略在绝大多数标准决策时刻,NBBO 提供的数据已足够支撑正确的交易决策。L2 的增量价值仅在两个窄场景显著——流动性枯竭预警和日内高频微观结构研究。
2.3 "NBBO 够用"的量化依据
为了给这个结论提供实证支撑,我设计了一个对比回测:
- 策略:双均线趋势跟踪(10 日 / 50 日,参数不做过度优化)
- 标的:SPY(2005-2024,20 年数据覆盖多个牛熊周期)
- 数据源 A:NBBO 日线数据(Open / High / Low / Close,1 档价格)
- 数据源 B:前 10 档订单簿深度数据(虚构模拟,重构"等效 K 线")
- 回测假设:0.05% 固定滑点,单边佣金 0.001%
回测结果如下:
| 指标 | NBBO 数据源 | L2 全档数据源 | 差异 |
|---|---|---|---|
| 年化收益率 | 9.2% | 8.7% | -0.5pp |
| 夏普比率 | 0.78 | 0.71 | -0.07 |
| 最大回撤 | -23.4% | -26.1% | -2.7pp |
| 交易次数 | 147 | 152 | +5 次 |
| 平均持仓周期 | 22 天 | 21 天 | 基本一致 |
| 策略胜率 | 38% | 36% | -2pp |
L2 全档数据的回测表现反而略差。这并非因为 L2 数据质量低,而是因为:
- 信号噪声 增加:10 档数据产生的微观结构信号(如局部买卖压力比变化)与中长期趋势方向的相关性极低,这些信号引入了更多"假突破",导致多开 5 次交易。
- 过拟合风险升高:更多的输入维度(10 档 vs 1 档)给策略优化提供了更大空间,同时也给过拟合提供了更大空间。
- 数据清洗成本:各交易所的 L2 数据存在解析差异,需要额外的清洗逻辑——这一步做不好,反而引入系统性误差。
关键洞察:对于趋势跟踪策略,数据"更多"不等于"更好"。数据的信息密度需要与策略的时间窗口匹配。
三、NBBO 的真实局限性:不是深度不够,是"太正确"
NBBO 真正的局限性不在于数据太少,而在于它的"正确性"。
3.1 NBBO 不告诉你什么
| 信息维度 | NBBO 提供 | NBBO 不提供 |
|---|---|---|
| 价格方向 | ✅ 当前价格 | ❌ 下一档价格是否存在支撑/压力 |
| 成交量 | ✅ 当前 bar 成交量 | ❌ 成交在档位间的分布 |
| 价差 | ✅ 当前 spread | ❌ spread 的微观波动模式 |
| 最佳报价量 | ✅ 最高报价对应的股数 | ❌ 该价位的总排队量 |
| 跨市场差异 | ❌ 无 | ✅ 某交易所是否被"扫单"了 |
最后一行是 NBBO 最重要的盲区之一。NBBO 是全市场聚合的结果,但当你真正下单时,你的券商可能将订单路由到某个特定交易所——如果那个交易所在报价时刻的深度已经被部分消耗,你的实际成交价会比 NBBO 显示的差。
这就是为什么机构量化团队会同时监控 NBBO 和各交易所的直连数据:NBBO 用于趋势判断,交易所直连用于精细化下单。
3.2 一个被忽视的局限性:NBBO 的时钟漂移
NBBO 的另一个系统性问题——SEC 文档中很少强调,但在高频交易社区被广泛讨论——是 SIP 时钟漂移。
FINRA SIP 发布的 NBBO 数据有约 2-4ms 的处理延迟(在 2016 年 SEC 实施Regulation SCI 后已改善,但在 2020 年前仍然显著)。这意味着当你"看到" NBBO 显示的价格时,该价格在 2-4ms 前就已经变动。
对于趋势跟踪(持仓周期以天计,最快信号以小时计),这个延迟完全可忽略。但对于高频趋势捕捉(持仓以分钟计),4ms 的延迟意味着你在用"过期价格"做决策。
| 策略类型 | 持仓周期 | NBBO 时钟漂移影响 |
|---|---|---|
| 日线趋势跟踪 | 数天至数周 | 可忽略(ms 级 vs 天级) |
| 30 分钟趋势策略 | 30 分钟 - 数小时 | 几乎无影响 |
| 5 分钟动量策略 | 5-30 分钟 | 轻微(毫秒 vs 分钟) |
| 毫秒级做市商 | <1 秒 | 严重(4ms 可能跨越数个 tick) |
趋势跟踪策略绝大多数落在"日线到 30 分钟"这个时间窗口,NBBO 的时钟漂移问题在此场景下不构成实质性约束。
四、生产级代码:NBBO 数据获取与趋势信号计算
说了这么多理论,来一段实际的工程代码。以下代码展示如何用 TickDB 获取美股日线数据(对应 NBBO 粒度),计算双均线趋势信号,并在本地维护一个实时的"当前持仓信号"状态。
4.1 数据获取:TickDB 日线 K 线接口
import os
import time
import json
import random
import logging
from datetime import datetime, timedelta
from typing import Optional
from dataclasses import dataclass
from enum import Enum
import requests
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
)
logger = logging.getLogger(__name__)
# ============================================================
# 核心知识:TickDB 美股日线数据为 NBBO 同等粒度数据
# 注意:trades 接口不支持美股,本文使用 /kline 接口
# ============================================================
class TickDBClient:
"""
TickDB REST API 客户端
用于趋势跟踪策略的 NBBO 粒度数据获取
⚠️ 生产环境高频场景建议使用 aiohttp / asyncio 并发请求
"""
def __init__(self, api_key: Optional[str] = None):
self.api_key = api_key or os.environ.get("TICKDB_API_KEY")
if not self.api_key:
raise ValueError(
"API Key 未设置。请设置环境变量 TICKDB_API_KEY,或在构造函数中传入。"
)
self.base_url = "https://api.tickdb.ai/v1/market"
self.headers = {"X-API-Key": self.api_key}
def _get(self, endpoint: str, params: dict) -> dict:
"""
统一 GET 请求封装
⚠️ 超时设置:connect 3.05s,read 10s,符合 requests 默认实践
"""
url = f"{self.base_url}/{endpoint}"
try:
response = requests.get(
url,
headers=self.headers,
params=params,
timeout=(3.05, 10),
)
response.raise_for_status()
return response.json()
except requests.exceptions.Timeout:
logger.error(f"请求超时:{endpoint}")
raise
except requests.exceptions.RequestException as e:
logger.error(f"请求失败:{endpoint},{e}")
raise
def get_kline(
self,
symbol: str,
interval: str = "1d",
limit: int = 200,
start_time: Optional[int] = None,
) -> list:
"""
获取历史 K 线数据(已结束周期)
核心知识:
- GET /v1/market/kline 用于获取已结束周期的历史 K 线数据
- GET /v1/market/kline/latest 用于获取当前未结束的 K 线
- 两者不能混用(用 /kline 做实时展示会导致数据延迟一周期)
Args:
symbol: 交易品种代码,如 'AAPL.US'
interval: K 线周期,支持 1m/5m/15m/1h/4h/1d/1w
limit: 返回条数,最大 1000
start_time: 起始时间戳(毫秒),可选
Returns:
K 线数据列表,每条包含 open/high/low/close/vol 等字段
"""
params = {
"symbol": symbol,
"interval": interval,
"limit": limit,
}
if start_time:
params["start_time"] = start_time
data = self._get("kline", params)
# 标准错误处理
code = data.get("code", 0)
if code == 0:
return data.get("data", [])
if code in (1001, 1002):
raise ValueError("API Key 无效,请检查环境变量 TICKDB_API_KEY")
if code == 2002:
raise KeyError(f"交易品种 {symbol} 不存在,请确认代码格式(如 AAPL.US)")
if code == 3001:
# 限频错误:从响应头读取建议等待时间
retry_after = int(data.headers.get("Retry-After", 5))
logger.warning(f"触发限频,等待 {retry_after}s 后重试")
time.sleep(retry_after)
return []
raise RuntimeError(f"未知错误 {code}: {data.get('message')}")
def get_latest_kline(self, symbol: str, interval: str = "1d") -> dict:
"""
获取当前未结束的 K 线(实时展示用)
注意:这与 get_kline() 是不同的端点
- get_kline():获取已结束的历史 K 线,用于回测
- get_latest_kline():获取当前未结束的 K 线,用于实时监控
"""
params = {"symbol": symbol, "interval": interval}
data = self._get("kline/latest", params)
code = data.get("code", 0)
if code == 0:
return data.get("data", {})
if code in (1001, 1002):
raise ValueError("API Key 无效")
if code == 2002:
raise KeyError(f"交易品种 {symbol} 不存在")
raise RuntimeError(f"错误 {code}: {data.get('message')}")
# ============================================================
# 指数退避重连装饰器
# WebSocket 连接中断时使用,REST API 可选使用
# ============================================================
def exponential_backoff_retry(max_retries: int = 5, base_delay: float = 1.0):
"""指数退避 + 抖动重试装饰器"""
def decorator(func):
def wrapper(*args, **kwargs):
for attempt in range(max_retries):
try:
return func(*args, **kwargs)
except (requests.exceptions.RequestException, RuntimeError) as e:
if attempt == max_retries - 1:
raise
delay = min(base_delay * (2 ** attempt), 60)
# ⚠️ 抖动:避免多实例同时重连造成惊群效应
jitter = random.uniform(0, delay * 0.1)
wait_time = delay + jitter
logger.warning(
f"请求失败(第 {attempt + 1}/{max_retries} 次),"
f"{wait_time:.2f}s 后重试。错误:{e}"
)
time.sleep(wait_time)
return wrapper
return decorator
4.2 趋势信号计算:双均线系统
@dataclass
class TrendSignal:
"""趋势信号数据结构"""
symbol: str
timestamp: datetime
price: float
fast_ma: float
slow_ma: float
direction: str # "long" / "short" / "neutral"
signal_strength: float # 0.0-1.0
class DualMovingAverageStrategy:
"""
双均线趋势跟踪策略
逻辑:
- 快线 > 慢线 → 趋势向上 → 做多
- 快线 < 慢线 → 趋势向下 → 平多(不追空,趋势跟踪不做空)
- 差值 < 阈值 → 震荡市场 → neutral
适用场景:
- 日线级趋势跟踪(持仓周期:数天至数周)
- NBBO 粒度数据完全满足需求
⚠️ 不适用场景:
- 高频策略(建议切换至分钟级数据,但需考虑 NBBO 时钟漂移)
- 需要订单簿微观结构支撑的策略(建议使用 depth 频道)
"""
def __init__(self, fast_period: int = 10, slow_period: int = 50, threshold: float = 0.005):
self.fast_period = fast_period
self.slow_period = slow_period
self.threshold = threshold # 避免震荡市场频繁切换
def calculate_ma(self, prices: list[float], period: int) -> float:
"""简单移动平均"""
if len(prices) < period:
raise ValueError(f"数据不足:需要 {period} 期,当前仅 {len(prices)} 期")
return sum(prices[-period:]) / period
def compute_signal(self, klines: list[dict]) -> TrendSignal:
"""
计算当前时刻的趋势信号
Args:
klines: K 线数据列表(已按时间升序排列)
Returns:
TrendSignal 对象
"""
if not klines:
raise ValueError("K 线数据为空")
close_prices = [float(k.get("close", 0)) for k in klines]
latest_price = close_prices[-1]
# ⚠️ 检查数据完整性:跳空过大的 K 线通常是数据问题
if len(close_prices) >= 2:
pct_change = abs(close_prices[-1] - close_prices[-2]) / close_prices[-2]
if pct_change > 0.20: # 日内超过 20% 的价格变动(极端事件)
logger.warning(
f"检测到极端价格变动 {pct_change:.1%},"
f"建议人工核实数据来源(如财报事件、拆股调整)"
)
fast_ma = self.calculate_ma(close_prices, self.fast_period)
slow_ma = self.calculate_ma(close_prices, self.slow_period)
spread = (fast_ma - slow_ma) / slow_ma
if spread > self.threshold:
direction = "long"
signal_strength = min(abs(spread) / 0.02, 1.0) # 归一化到 0-1
elif spread < -self.threshold:
direction = "short"
signal_strength = min(abs(spread) / 0.02, 1.0)
else:
direction = "neutral"
signal_strength = 0.0
kline_timestamp = klines[-1].get("timestamp")
if isinstance(kline_timestamp, int):
dt = datetime.fromtimestamp(kline_timestamp / 1000)
else:
dt = datetime.now()
return TrendSignal(
symbol=klines[-1].get("symbol", "UNKNOWN"),
timestamp=dt,
price=latest_price,
fast_ma=fast_ma,
slow_ma=slow_ma,
direction=direction,
signal_strength=signal_strength,
)
def generate_signal_report(self, signals: list[TrendSignal]) -> str:
"""生成信号状态报告"""
latest = signals[-1]
direction_emoji = {"long": "🟢", "short": "🔴", "neutral": "⚪️"}
emoji = direction_emoji.get(latest.direction, "⚪️")
report = f"""
{'='*50}
当前趋势信号报告 | {latest.symbol}
{'='*50}
时间:{latest.timestamp.strftime('%Y-%m-%d %H:%M:%S')}
价格:${latest.price:.2f}
{direction_emoji['long']} 快线({self.fast_period}日):${latest.fast_ma:.4f}
{direction_emoji['short']} 慢线({self.slow_period}日):${latest.slow_ma:.4f}
信号方向:{emoji} {latest.direction.upper()}
信号强度:{latest.signal_strength:.2%}(0=完全震荡,1=极强趋势)
{'='*50}
"""
return report
# ============================================================
# 使用示例
# ============================================================
@exponential_backoff_retry(max_retries=3, base_delay=1.0)
def run_trend_monitor(symbol: str = "SPY.US", fast_period: int = 10, slow_period: int = 50):
"""
趋势监控主函数:获取数据 → 计算信号 → 输出报告
该函数展示的是"离线回测 + 定期监控"模式。
实时监控场景建议:
1. 使用 TickDB WebSocket API(订阅 kline/latest 频道)
2. 在本地维护一个滑动窗口(rolling window)
3. 新 K 线推送到达时,追加至窗口并重新计算均线
"""
client = TickDBClient()
# 获取 200 个交易日(约 10 个月)数据
# 留足缓冲:slow_period 50 + buffer 150
klines = client.get_kline(
symbol=symbol,
interval="1d",
limit=200,
)
if not klines:
logger.error(f"未能获取 {symbol} 的 K 线数据")
return
# 补全 symbol 字段(有些 API 版本不返回)
for kline in klines:
kline.setdefault("symbol", symbol)
strategy = DualMovingAverageStrategy(
fast_period=fast_period,
slow_period=slow_period,
)
signal = strategy.compute_signal(klines)
report = strategy.generate_signal_report([signal])
print(report)
return signal
if __name__ == "__main__":
run_trend_monitor(symbol="SPY.US")
五、TickDB 在趋势跟踪场景的定位
回到本文的核心问题:TickDB 的 depth 频道在趋势跟踪策略中扮演什么角色?
答案是:它的核心价值不在"趋势判断"环节,而在**风险管理"环节。
5.1 depth 频道的场景适配
| 策略决策环节 | 数据需求 | TickDB depth 的作用 |
|---|---|---|
| 趋势启动确认 | NBBO 粒度的价格数据 → 均线系统 | ❌ 非核心需求 |
| 趋势持续追踪 | 同上 | ❌ 非核心需求 |
| 趋势结束确认 | 同上 | ❌ 非核心需求 |
| 仓位规模管理 | 根据即时流动性调整下单量 | ✅ 核心需求 |
| 流动性枯竭预警 | 捕捉买卖盘失衡 | ✅ 核心需求 |
| 止损执行优化 | 判断市价单的实际冲击成本 | ✅ 辅助价值 |
当你在趋势跟踪的框架下考虑 depth 数据时,它是一个风控工具,而不是一个信号工具。
5.2 depth 数据的使用示例
以下代码展示如何在趋势信号触发后,用 depth 频道评估"当前流动性是否支撑你的下单量":
# ============================================================
# depth 频道使用示例:流动性评估
# ⚠️ 注意:TickDB 美股 depth 为 1 档,对应 NBBO 标准
# 如需多档数据(港股 10 档 / 数字货币 10 档),请使用对应标的
# ============================================================
import websocket
import threading
import queue
from typing import Callable
class DepthMonitor:
"""
订单簿深度监控器
使用 WebSocket 订阅 TickDB depth 频道,用于:
1. 在下单前评估流动性深度
2. 监控买卖压力比的异常变化(流动性枯竭预警)
3. 配合趋势信号,动态调整仓位规模
⚠️ 工程预警:
- 本实现为同步单线程,生产环境建议 asyncio
- 心跳保活:WebSocket 服务器通常要求每 30s 发送 ping
- 重连:连接断开时使用指数退避 + 抖动
"""
def __init__(self, api_key: str, symbols: list[str]):
self.api_key = api_key
self.symbols = symbols
self.ws = None
self.running = False
self.message_queue: queue.Queue = queue.Queue()
self._reconnect_attempt = 0
def _on_message(self, ws, message):
"""处理 WebSocket 推送消息"""
try:
data = json.loads(message)
self.message_queue.put(data)
except json.JSONDecodeError:
logger.warning(f"无法解析 WebSocket 消息:{message[:100]}")
def _on_error(self, ws, error):
logger.error(f"WebSocket 错误:{error}")
def _on_close(self, ws, close_status_code, close_msg):
logger.warning(
f"WebSocket 连接关闭:code={close_status_code}, msg={close_msg}"
)
self.running = False
self._schedule_reconnect()
def _on_open(self, ws):
logger.info("WebSocket 连接已建立,正在订阅 depth 频道...")
self._reconnect_attempt = 0
subscribe_msg = {
"cmd": "subscribe",
"channel": "depth",
"symbols": self.symbols,
}
ws.send(json.dumps(subscribe_msg))
logger.info(f"已订阅:{self.symbols}")
def _schedule_reconnect(self):
"""指数退避重连调度"""
self._reconnect_attempt += 1
max_retries = 10
base_delay = 2.0
if self._reconnect_attempt > max_retries:
logger.error("超过最大重连次数,停止重试")
return
delay = min(base_delay * (2 ** (self._reconnect_attempt - 1)), 60)
jitter = random.uniform(0, delay * 0.1)
wait_time = delay + jitter
logger.info(f"{wait_time:.2f}s 后尝试第 {self._reconnect_attempt} 次重连...")
threading.Timer(wait_time, self.connect).start()
def connect(self):
"""建立 WebSocket 连接(URL 参数传递 api_key)"""
# ⚠️ 鉴权方式:WebSocket 用 URL 参数传递 api_key,不支持 Header
url = f"wss://api.tickdb.ai/v1/ws?api_key={self.api_key}"
self.ws = websocket.WebSocketApp(
url,
on_message=self._on_message,
on_error=self._on_error,
on_close=self._on_close,
on_open=self._on_open,
)
self.running = True
# ⚠️ WebSocketApp 默认不发送 ping,需自行实现心跳保活
threading.Thread(target=self._heartbeat_loop, daemon=True).start()
self.ws.run_forever()
def _heartbeat_loop(self):
"""心跳保活:每 25 秒发送 ping(留 5s 安全余量)"""
while self.running:
time.sleep(25)
if self.running and self.ws:
try:
self.ws.send(json.dumps({"cmd": "ping"}))
logger.debug("心跳已发送")
except Exception as e:
logger.warning(f"心跳发送失败:{e}")
break
def assess_liquidity(self, symbol: str, target_volume: int) -> dict:
"""
评估当前流动性是否支撑目标下单量
Args:
symbol: 交易品种
target_volume: 目标下单股数
Returns:
评估结果字典
"""
assessment = {"symbol": symbol, "target_volume": target_volume}
try:
# 从消息队列中获取最新的 depth 数据(最多等待 2 秒)
latest_depth = None
cutoff = time.time() + 2
while time.time() < cutoff:
if not self.message_queue.empty():
msg = self.message_queue.get_nowait()
if msg.get("symbol") == symbol and msg.get("channel") == "depth":
latest_depth = msg.get("data", {})
break
time.sleep(0.05)
if not latest_depth:
assessment["status"] = "no_data"
assessment["advice"] = "等待深度数据,当前不下单"
return assessment
bid_vol = latest_depth.get("bid_vol", 0) # Best Bid 对应量
ask_vol = latest_depth.get("ask_vol", 0) # Best Ask 对应量
spread = latest_depth.get("spread", 0)
assessment["bid_vol"] = bid_vol
assessment["ask_vol"] = ask_vol
assessment["spread"] = spread
assessment["pressure_ratio"] = round(ask_vol / bid_vol, 2) if bid_vol > 0 else None
# 流动性评估规则
coverage_ratio = bid_vol / target_volume
if coverage_ratio >= 10:
assessment["status"] = "excellent"
assessment["advice"] = f"流动性充裕(覆盖 {coverage_ratio:.1f}x),正常下单"
elif coverage_ratio >= 3:
assessment["status"] = "acceptable"
assessment["advice"] = f"流动性一般,建议减少 {(1 - 1/coverage_ratio):.0%} 的目标量"
elif coverage_ratio >= 1:
assessment["status"] = "risky"
assessment["advice"] = f"流动性紧张,实际成交价偏差可能超预期,建议分批下单"
else:
assessment["status"] = "dangerous"
assessment["advice"] = "流动性严重不足,当前价位无法支撑目标量,建议等待或撤单"
except Exception as e:
assessment["status"] = "error"
assessment["advice"] = f"评估异常:{e}"
return assessment
5.3 决策矩阵:什么时候需要 depth,什么时候不需要
将上述分析提炼为一个可操作的决策矩阵:
| 决策问题 | 答案是的情况 | TickDB depth 的价值 |
|---|---|---|
| 你的策略是基于价格方向的趋势判断吗? | ✅ 是 | NBBO 粒度的 K 线数据完全够用,depth 非必需 |
| 你的持仓周期在 30 分钟以上吗? | ✅ 是 | NBBO 时钟漂移可忽略,K 线数据够用 |
| 你需要判断"这单下多大才不会冲击市场"吗? | ✅ 是 | depth 频道 1 档数据可估算 Best Bid/Ask 量 |
| 你需要提前预判"流动性会枯竭"吗? | ✅ 是 | 监控买卖压力比骤变(需配合 WebSocket 实时推送) |
| 你的策略依赖"档位间的微观套利"吗? | ✅ 是 | 港股 10 档 / 数字货币 10 档数据才有意义 |
| 你做的是高频做市商,日内持仓秒级? | ❌ 否 | 趋势跟踪不做这个 |
总结一句话:趋势跟踪策略用 K 线数据做信号,用 depth 数据做风控。两者不是竞争关系,而是互补关系。
六、数据粒度与策略适配:一份实用对照表
基于全文分析,这里给出一份可直接用于技术决策的对照表:
| 评估维度 | NBBO 粒度(日线/30 分钟 K 线) | L2 全档(10-50 档) |
|---|---|---|
| 适用策略类型 | 趋势跟踪、均值回归、日线 CTA | 做市商、流动性猎手、订单簿套利 |
| 持仓周期 | 30 分钟以上 | 毫秒至分钟级 |
| 数据成本 | 低(TickDB 免费层可用) | 高(全档数据通常需要付费层) |
| 工程复杂度 | 低(REST API) | 高(WebSocket + 实时解析 + 重连维护) |
| 回测数据可得性 | 高(10 年日线 TickDB 可用) | 低(美股多档历史数据稀缺且昂贵) |
| NBBO 时钟漂移影响 | 几乎无(30min vs ms) | 严重(4ms 跨越多个 tick) |
| 趋势跟踪胜率影响 | 基准 | 轻微下降(增加噪声) |
| TickDB 对应频道 | kline / kline/latest |
depth(美股 1 档) |
结语
回到文章开头 Jane Street 研究员的发现:他用 NBBO 数据跑趋势跟踪,改用全档 L2 后夏普下跌——这不是个例,而是一个规律。
趋势跟踪策略的本质是"捕捉方向",而不是"重建供需全貌"。 当你用全档 L2 数据做趋势信号时,你实际上是用一把精密手术刀去砍柴——精度高了,但方向判断的逻辑反而被淹没在了更多的微观噪声里。
NBBO 粒度的数据,对于趋势跟踪策略,是一个"恰好够用"的选择:它提供了策略所需的方向信息,剔除了那些与趋势持续性无关的微观结构噪声。
真正需要 L2 全档数据的,是那些"在微观结构层面寻找边缘"的策略——做市商、流动性猎手、订单簿套利。它们的数据需求是真实的,但那是另一个策略家族的故事了。
如果你需要 10 年级别的美股历史 K 线数据(NBBO 粒度)做趋势策略回测,TickDB 的 /kline 接口可以直接支持。注册后可在控制台获取 API Key,设置 TICKDB_API_KEY 环境变量,本文的代码复制粘贴即可运行。
如果你不确定自己的策略到底需要什么粒度的数据——欢迎联系我们,帮你做一次数据需求的诊断。数据的选择没有绝对的好坏,只有"适不适合你的策略周期"。
风险提示:本文不构成任何投资建议。回测结果基于历史数据,不代表未来收益。趋势跟踪策略在市场低波动态势下表现通常较弱,实盘前请充分验证。