订单簿买卖压力比:从 depth 数据到交易信号的完整实现
价格是结果,订单簿是原因。
每一个 TickDB depth 频道推送的快照,都是交易战场上多空双方实时摆出的阵地图——买一卖一所呈现的不只是当前价格,更隐含着市场参与者的心理边界、聪明钱的布局意图,以及流动性在微观层面收缩或扩张的节奏。
对于量化开发者而言,订单簿是一座富矿。问题在于,这座矿藏的 raw ore(原始矿石)并不能直接投入策略:高频推送的多档挂单量,如果逐帧处理,数据噪声会淹没真实信号;如果仅看买卖价差,又丢失了深度结构中埋藏的信息。
本文的目标,是将多档挂单量压缩成一个可回测、可落地、有物理直觉支撑的因子:买卖压力比(Bid-Ask Pressure Ratio)。我们从订单簿数据的结构解读出发,推导加权压力比的数学构造,给出生产级的 WebSocket 订阅代码与滑动窗口计算逻辑,最后将它嵌入回测框架,完成一个完整的事件驱动策略回测闭环。
一、为什么只看买卖价差远远不够
在进入技术细节之前,有必要先说清楚为什么我们需要多档数据,而不能止步于买卖价差(Bid-Ask Spread)。
1.1 买卖价差的局限
买卖价差反映的是最优报价之间的距离,可以衡量当前市场摩擦的宽窄,但它是标量,丢失了方向信息:
spread = ask[0] - bid[0]
假设两个时刻的 spread 都是 $0.02,但我们无法区分以下两种情况:
| 时刻 | 盘口状态 | 买卖价差 | 实际情况 |
|---|---|---|---|
| T1 | 买一 100.00(5000股) vs 卖一 100.02(5000股) | 0.02 | 多空均衡 |
| T2 | 买一 100.00(2000股) vs 卖一 100.02(8000股) | 0.02 | 空方压力明显更大 |
T1 和 T2 的 spread 相同,但市场格局截然不同。价差不会告诉我们这些。只有多档挂单量才能揭示深度结构中的偏压。
1.2 depth 频道的结构优势
TickDB 的 depth 频道按档位推送订单簿快照,每个档位包含 bid_price、bid_volume、ask_price、ask_volume。以港股为例,最大可获取 10 档深度:
depth snapshot:
bids: [ {price: 100.00, volume: 2000}, {price: 99.98, volume: 1500}, ... ]
asks: [ {price: 100.02, volume: 8000}, {price: 100.04, volume: 3000}, ... ]
这 10 档数据,构成了我们构建买卖压力比的核心原料。
二、买卖压力比的数学构造
2.1 最简单的版本:单档压力比
最直觉的定义就是买一量除以卖一量:
$$
P_{simple} = \frac{Vol_{bid[0]}}{Vol_{ask[0]}}
$$
但这个指标有明显的缺陷:单档数据极易被大单冲击产生尖峰噪声。例如某机构投资者在买一位置下了一个 50000 股的 iceberg order,你的压力比瞬间从 1.0 飙升到 5.0,但这个大单可能只是一分钟内唯一的一笔成交,并不能反映真实的市场偏向。
2.2 加权压力比:引入档位衰减
一个更稳健的设计是引入档位衰减权重。直觉上,越靠近最优报价的挂单对短期价格影响越大,越远端的挂单影响力越弱。我们用指数衰减来建模这个直觉:
$$
P_{weighted} = \frac{\sum_{i=0}^{N-1} Vol_{bid[i]} \cdot e^{-\lambda \cdot i}}{\sum_{i=0}^{N-1} Vol_{ask[i]} \cdot e^{-\lambda \cdot i}}
$$
其中:
- $N$ 是档位数(TickDB 港股最大 10 档)
- $\lambda$ 是衰减系数,控制档位权重的衰减速度
- $i=0$ 对应最优档(买一/卖一),$i=1$ 对应买二/卖二,以此类推
当 $\lambda = 0$ 时,退化为简单求和;当 $\lambda$ 增大时,前档权重急剧增大,远档几乎不贡献。
实践中 $\lambda$ 的经验取值:
| λ 取值 | 前三档权重占比 | 适用场景 |
|---|---|---|
| 0.1 | ~74% | 趋势跟随,偏短线 |
| 0.3 | ~58% | 中性,通用场景 |
| 0.5 | ~46% | 趋势反转,偏长线 |
2.3 距离调整因子:消除价格水平差异
跨股票比较压力比时还存在一个问题:不同股票的价格水平差异巨大。一只 $500 的股票买一量 1000 股和一只 $5 的股票买一量 1000 股,绝对数量相同但相对市场影响力完全不同。
我们引入一个距离调整因子:
$$
D = \frac{mid_price}{best_price \cdot N}
$$
修正后的加权压力比:
$$
P_{adjusted} = P_{weighted} \times (1 + D)
$$
在实际实现中,更常见的做法是使用相对档位深度——将每一档的挂单量标准化为该档价格的百分比偏移量:
# 简化的距离调整实现
def normalize_depth(depth_snapshot, side='bid'):
"""
将挂单量按价格偏移比例归一化
depth_snapshot: TickDB depth 频道推送的单个快照
"""
entries = depth_snapshot[side] # list of {price, volume}
if not entries:
return []
best_price = entries[0]['price']
result = []
for entry in entries:
# 计算相对于最优档的价格偏移(用基点 bps 表示)
offset_bps = ((entry['price'] - best_price) / best_price) * 10000
# 归一化量:volume / best_price(每股价格归一化)
normalized_vol = entry['volume'] / best_price
result.append({
'offset_bps': offset_bps,
'normalized_vol': normalized_vol
})
return result
2.4 物理直觉
为什么加权压力比有效?它的物理直觉在于:订单簿是未成交订单的快照,而未成交订单反映了交易者愿意以特定价格成交的意愿强度。
当买方持续在更高价位堆积挂单(压力比 > 1),说明聪明钱在逢低布局——他们在用未成交的限价单表达"这个价格我愿意买"的立场。反之,卖方堆积意味着"这个价格我愿意卖"的看空意图。多档加权的目的是过滤噪声,捕获更持续的结构性偏压。
三、生产级订阅代码
3.1 WebSocket 订阅 depth 频道
以下是完整的 WebSocket 订阅代码,严格遵循 TickDB 的鉴权规范和工程健壮性要求:
import os
import json
import time
import random
import logging
import threading
from datetime import datetime
from websocket import create_connection, WebSocketApp, WebSocketException
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s"
)
logger = logging.getLogger(__name__)
class DepthSubscriber:
"""
TickDB WebSocket 订阅器:depth 频道
工程特性:
- 心跳保活(ping/pong)
- 指数退避 + 抖动重连
- 限频处理(code:3001 + Retry-After)
- 线程安全的数据传递
"""
def __init__(self, symbol: str, depth: int = 10):
self.symbol = symbol
self.depth = depth # 港股最大10档,美股1档
self.api_key = os.environ.get("TICKDB_API_KEY")
if not self.api_key:
raise ValueError("请设置环境变量 TICKDB_API_KEY")
self.ws = None
self.running = False
self._last_depth = None
self._lock = threading.Lock()
self._retry_count = 0
self._max_retries = 10
self._base_delay = 1
self._max_delay = 60
# 回调函数注册表
self._callbacks = []
def _get_ws_url(self) -> str:
"""构造 WebSocket URL(api_key 作为 URL 参数)"""
return f"wss://api.tickdb.ai/ws?api_key={self.api_key}"
def subscribe(self):
"""启动订阅(心跳 + 订阅命令)"""
self.ws = create_connection(self._get_ws_url())
self.running = True
self._retry_count = 0
# 订阅 depth 频道(可指定档位数)
sub_cmd = {
"cmd": "subscribe",
"param": {
"symbol": self.symbol,
"depth": self.depth,
"channel": "depth"
}
}
self.ws.send(json.dumps(sub_cmd))
logger.info(f"订阅成功:{self.symbol} depth × {self.depth} 档")
# 启动心跳和接收线程
threading.Thread(target=self._heartbeat_loop, daemon=True).start()
threading.Thread(target=self._receive_loop, daemon=True).start()
def _heartbeat_loop(self):
"""心跳保活:每 15 秒发送一次 ping"""
while self.running and self.ws and self.ws.connected:
try:
self.ws.send(json.dumps({"cmd": "ping"}))
time.sleep(15)
except Exception:
break
def _receive_loop(self):
"""接收并分发 depth 快照"""
while self.running:
try:
raw = self.ws.recv()
msg = json.loads(raw)
# 处理限频响应
if msg.get("code") == 3001:
retry_after = int(msg.get("headers", {}).get(
"Retry-After",
self._base_delay * 2
))
logger.warning(f"触发限频,等待 {retry_after}s")
time.sleep(retry_after)
continue
# 处理 pong
if msg.get("type") == "pong":
continue
# 分发 depth 数据
data = msg.get("data", {})
if data.get("symbol") == self.symbol:
with self._lock:
self._last_depth = data
for cb in self._callbacks:
try:
cb(data)
except Exception as e:
logger.error(f"回调执行失败:{e}")
except WebSocketException as e:
logger.warning(f"WebSocket 断开:{e},进入重连流程")
self.ws = None
self._reconnect()
except Exception as e:
logger.error(f"接收异常:{e}")
time.sleep(1)
def _reconnect(self):
"""指数退避 + 抖动重连"""
if self._retry_count >= self._max_retries:
logger.error("重试次数耗尽,订阅终止")
self.running = False
return
delay = min(self._base_delay * (2 ** self._retry_count), self._max_delay)
jitter = random.uniform(0, delay * 0.1)
wait = delay + jitter
self._retry_count += 1
logger.info(f"重连倒计时:{wait:.1f}s(第 {self._retry_count} 次)")
time.sleep(wait)
try:
self.ws = create_connection(self._get_ws_url())
self.running = True
self._retry_count = 0 # 重置重试计数
self.ws.send(json.dumps({
"cmd": "subscribe",
"param": {
"symbol": self.symbol,
"depth": self.depth,
"channel": "depth"
}
}))
logger.info("重连成功")
threading.Thread(target=self._heartbeat_loop, daemon=True).start()
threading.Thread(target=self._receive_loop, daemon=True).start()
except Exception as e:
logger.error(f"重连失败:{e}")
self._reconnect() # 递归重试
def on_depth(self, callback):
"""注册 depth 快照回调"""
self._callbacks.append(callback)
def get_latest(self):
"""获取最新快照(线程安全)"""
with self._lock:
return self._last_depth
def stop(self):
"""停止订阅"""
self.running = False
if self.ws:
self.ws.close()
logger.info("订阅已停止")
# ⚠️ 生产环境高频场景建议使用 aiohttp/asyncio 异步架构
# ⚠️ 多标的场景建议使用连接池管理而非单实例
3.2 回调中计算加权压力比
订阅器收到快照后,在回调中计算压力比:
import math
def compute_weighted_pressure_ratio(depth_data: dict, lambda_decay: float = 0.3) -> dict:
"""
计算加权买卖压力比
参数:
depth_data: TickDB depth 频道推送的快照
lambda_decay: 档位衰减系数(默认0.3,通用场景)
返回:
{
'weighted_pressure': float, # 加权压力比
'simple_pressure': float, # 简单压力比(买一/卖一)
'bid_total_weighted': float, # 加权买方总量
'ask_total_weighted': float, # 加权卖方总量
'timestamp': str
}
"""
bids = depth_data.get("bids", [])
asks = depth_data.get("asks", [])
if not bids or not asks:
return {"error": "depth 数据不完整"}
# 加权求和
bid_weighted = sum(
entry["volume"] * math.exp(-lambda_decay * i)
for i, entry in enumerate(bids)
)
ask_weighted = sum(
entry["volume"] * math.exp(-lambda_decay * i)
for i, entry in enumerate(asks)
)
# 简单压力比(单档)
simple_bid = bids[0]["volume"]
simple_ask = asks[0]["volume"]
weighted_pressure = bid_weighted / ask_weighted if ask_weighted > 0 else 0
simple_pressure = simple_bid / simple_ask if simple_ask > 0 else 0
return {
"weighted_pressure": round(weighted_pressure, 4),
"simple_pressure": round(simple_pressure, 4),
"bid_total_weighted": round(bid_weighted, 2),
"ask_total_weighted": round(ask_weighted, 2),
"timestamp": depth_data.get("timestamp", datetime.now().isoformat())
}
# 示例回调:实时打印压力比
def on_depth_snapshot(depth_data: dict):
result = compute_weighted_pressure_ratio(depth_data, lambda_decay=0.3)
if "error" not in result:
logger.info(
f"[{result['timestamp']}] "
f"加权压力比={result['weighted_pressure']:.3f} | "
f"简单压力比={result['simple_pressure']:.3f}"
)
# 启动订阅
if __name__ == "__main__":
subscriber = DepthSubscriber(symbol="700.HK", depth=10)
subscriber.on_depth(on_depth_snapshot)
subscriber.subscribe()
try:
while True:
time.sleep(30)
except KeyboardInterrupt:
subscriber.stop()
四、滑动窗口与动态阈值
4.1 为什么需要滑动窗口
单点压力比仍然有噪声。一个更好的做法是计算滑动窗口内的压力比均值或中位数,以过滤瞬时冲击。
from collections import deque
from statistics import median
class PressureRatioWindow:
"""
滑动窗口版买卖压力比
- 维护固定大小的历史快照队列
- 支持均值和中位数两种聚合方式
- 提供偏离度计算(当前值距均值的标准差倍数)
"""
def __init__(self, window_size: int = 60, lambda_decay: float = 0.3):
"""
参数:
window_size: 滑动窗口大小(快照数量)
lambda_decay: 档位衰减系数
"""
self.window_size = window_size
self.lambda_decay = lambda_decay
self._history = deque(maxlen=window_size)
def update(self, depth_data: dict) -> dict:
"""更新窗口,返回当前窗口统计量"""
result = compute_weighted_pressure_ratio(depth_data, self.lambda_decay)
if "error" not in result:
self._history.append(result["weighted_pressure"])
return self._compute_stats()
def _compute_stats(self) -> dict:
"""计算窗口内统计量"""
if len(self._history) < 10: # 窗口预热期
return {"status": "warming", "sample_size": len(self._history)}
samples = list(self._history)
mean = sum(samples) / len(samples)
variance = sum((x - mean) ** 2 for x in samples) / len(samples)
std = math.sqrt(variance)
current = samples[-1] if samples else 1.0
deviation = (current - mean) / std if std > 0 else 0
return {
"status": "ready",
"current": round(current, 4),
"mean": round(mean, 4),
"std": round(std, 4),
"median": round(median(samples), 4),
"deviation": round(deviation, 3), # 标准差倍数
"sample_size": len(samples)
}
4.2 动态阈值:从历史分位数构建
有了滑动窗口的统计量后,我们需要一个信号判定规则。最实用的方法是基于历史分位数构建动态阈值:
class DynamicThreshold:
"""
基于历史分位数的动态阈值生成器
- 自动适应不同标的的压力比分布
- 支持上下不对称阈值(适用于趋势跟踪/均值回归不同方向)
"""
def __init__(self, percentiles: tuple = (20, 80)):
self.percentiles = percentiles # (下限分位数, 上限分位数)
self._samples = deque(maxlen=500)
def add(self, value: float):
self._samples.append(value)
def get_thresholds(self) -> dict:
"""返回上下阈值"""
if len(self._samples) < 30:
return {"status": "insufficient_data"}
sorted_samples = sorted(self._samples)
n = len(sorted_samples)
lower_pct = self.percentiles[0]
upper_pct = self.percentiles[1]
lower_idx = int(n * lower_pct / 100)
upper_idx = int(n * upper_pct / 100)
return {
"status": "ready",
"lower": round(sorted_samples[lower_idx], 4),
"upper": round(sorted_samples[upper_idx], 4),
"lower_pct": lower_pct,
"upper_pct": upper_pct
}
def generate_signal(pressure_stats: dict, threshold: dict) -> str:
"""
基于压力比偏离度和动态阈值生成交易信号
信号逻辑:
- deviation > +2σ 且 pressure > upper_threshold → 空方强势,偏空
- deviation < -2σ 且 pressure < lower_threshold → 买方蓄力,偏多
- 其他 → 中性
"""
if pressure_stats.get("status") != "ready":
return "neutral"
deviation = pressure_stats.get("deviation", 0)
current = pressure_stats.get("current", 1.0)
if threshold.get("status") != "ready":
return "neutral"
lower = threshold["lower"]
upper = threshold["upper"]
# 偏离度 + 绝对值双重确认
if deviation > 2.0 and current > upper:
return "bearish" # 卖压显著偏高
elif deviation < -2.0 and current < lower:
return "bullish" # 买压显著蓄力
else:
return "neutral"
五、回测框架集成
5.1 历史 K 线数据获取
将上述信号生成逻辑嵌入回测框架,首先需要获取历史数据。TickDB 的 /v1/market/kline 接口提供清洗对齐的历史 K 线:
import os
import requests
from typing import List, Dict
def fetch_historical_klines(
symbol: str,
interval: str = "1m",
start_time: int = None,
end_time: int = None,
limit: int = 1000
) -> List[Dict]:
"""
获取历史 K 线数据(用于回测)
参数:
symbol: 交易品种代码
interval: K 线周期(1m/5m/15m/1h/4h/1d)
start_time: 毫秒级时间戳
end_time: 毫秒级时间戳
limit: 单次最大返回条数
注意:
- 使用 /v1/market/kline 获取已结束周期
- 不使用 /kline/latest 做回测(其数据为当前未结束周期)
"""
api_key = os.environ.get("TICKDB_API_KEY")
if not api_key:
raise ValueError("请设置环境变量 TICKDB_API_KEY")
headers = {"X-API-Key": api_key}
params = {
"symbol": symbol,
"interval": interval,
"limit": limit,
}
if start_time:
params["start"] = start_time
if end_time:
params["end"] = end_time
response = requests.get(
"https://api.tickdb.ai/v1/market/kline",
headers=headers,
params=params,
timeout=(3.05, 10)
)
if response.status_code != 200:
raise RuntimeError(f"HTTP {response.status_code}: {response.text}")
result = response.json()
if result.get("code") == 0:
return result.get("data", [])
elif result.get("code") == 3001:
retry_after = int(response.headers.get("Retry-After", 5))
time.sleep(retry_after)
return fetch_historical_klines(symbol, interval, start_time, end_time, limit)
else:
raise RuntimeError(f"API 错误 {result.get('code')}: {result.get('message')}")
5.2 事件驱动回测引擎
完整的回测引擎将三个组件串联:历史 K 线数据加载 → 逐 K 线推进 → 订单簿快照模拟 → 压力比计算 → 信号判定 → 交易执行。
from dataclasses import dataclass
from enum import Enum
class Signal(Enum):
BULLISH = "bullish"
BEARISH = "bearish"
NEUTRAL = "neutral"
@dataclass
class Trade:
entry_time: str
entry_price: float
direction: str # "long" or "short"
size: float
class PressureRatioBacktester:
"""
买卖压力比事件驱动回测引擎
设计说明:
- 使用 1 分钟 K 线作为事件触发器(每根 K 线收盘时更新订单簿快照模拟)
- 订单簿快照通过模拟生成(实际回测中可替换为真实 depth 历史数据)
- 支持买入持有基准对比
"""
def __init__(
self,
symbol: str,
initial_capital: float = 100_000,
position_size: float = 0.1, # 每笔仓位占资金比例
lambda_decay: float = 0.3,
window_size: int = 60
):
self.symbol = symbol
self.initial_capital = initial_capital
self.position_size = position_size
self.pressure_window = PressureRatioWindow(window_size, lambda_decay)
self.threshold_manager = DynamicThreshold(percentiles=(20, 80))
self.cash = initial_capital
self.position = 0.0
self.trades: List[Trade] = []
self.equity_curve = []
self.daily_pnl = []
def _simulate_depth_snapshot(self, kline: dict, pressure_ratio: float) -> dict:
"""
模拟 depth 快照
⚠️ 生产回测应使用真实 depth 历史数据,此处为演示模拟逻辑
"""
close_price = kline["close"]
base_vol = 5000
# 简单模拟:压力比 > 1 时买方挂单更多,反之卖方更多
if pressure_ratio > 1.5:
bid_vol = base_vol * 2
ask_vol = base_vol * 0.6
elif pressure_ratio < 0.7:
bid_vol = base_vol * 0.6
ask_vol = base_vol * 2
else:
bid_vol = base_vol
ask_vol = base_vol
return {
"symbol": self.symbol,
"timestamp": kline["timestamp"],
"bids": [
{"price": close_price - 0.01 * i, "volume": int(bid_vol * 0.8 ** i)}
for i in range(10)
],
"asks": [
{"price": close_price + 0.01 * i, "volume": int(ask_vol * 0.8 ** i)}
for i in range(10)
]
}
def run(self, klines: List[dict]):
"""执行回测"""
for kline in klines:
# 1. 更新压力比窗口
snapshot = self._simulate_depth_snapshot(
kline,
self.pressure_window.get_latest() if self.pressure_window._history else 1.0
)
stats = self.pressure_window.update(snapshot)
# 2. 更新阈值管理器(收集足够样本)
if stats.get("status") == "ready":
self.threshold_manager.add(stats["current"])
threshold = self.threshold_manager.get_thresholds()
# 3. 生成信号
signal = generate_signal(stats, threshold)
# 4. 执行交易(仅在信号切换时操作)
current_price = kline["close"]
self._execute_trade(signal, current_price, kline["timestamp"])
# 5. 更新权益
self._update_equity(current_price)
return self._generate_report()
def _execute_trade(self, signal: Signal, price: float, timestamp: str):
"""执行交易(仅在信号切换时)"""
if signal == Signal.BULLISH and self.position <= 0:
size = (self.cash * self.position_size) / price
self.cash -= size * price
self.position += size
self.trades.append(Trade(timestamp, price, "long", size))
elif signal == Signal.BEARISH and self.position >= 0:
size = (self.cash * self.position_size) / price
self.cash += size * price
self.position -= size
self.trades.append(Trade(timestamp, price, "short", size))
def _update_equity(self, current_price: float):
portfolio_value = self.cash + self.position * current_price
self.equity_curve.append(portfolio_value)
def _generate_report(self) -> dict:
"""生成回测报告"""
equity = self.equity_curve
returns = [(equity[i] - equity[i-1]) / equity[i-1]
for i in range(1, len(equity))]
total_return = (equity[-1] - self.initial_capital) / self.initial_capital
sharpe = (sum(returns) / len(returns) / (sum((r - sum(returns)/len(returns))**2 for r in returns)/len(returns))**0.5
* (252 * 390) ** 0.5) if len(returns) > 1 and sum((r - sum(returns)/len(returns))**2 for r in returns) > 0 else 0
peak = max(equity)
trough = min(e for e in equity if e <= peak)
max_drawdown = (peak - trough) / peak if peak > 0 else 0
return {
"total_return": round(total_return * 100, 2),
"sharpe_ratio": round(sharpe, 2),
"max_drawdown": round(max_drawdown * 100, 2),
"num_trades": len(self.trades),
"final_equity": round(equity[-1], 2)
}
# ⚠️ 重要提示:上述回测存在以下局限性:
# - 订单簿快照为模拟生成,未使用真实 depth 历史数据
# - 未考虑交易滑点和市场冲击成本
# - 未模拟流动性枯竭场景
# - 建议在实际使用前进行更长时间跨度的验证
5.3 回测运行示例
if __name__ == "__main__":
# 获取最近 30 天 1 分钟 K 线数据
import time
end_ts = int(time.time() * 1000)
start_ts = end_ts - 30 * 24 * 3600 * 1000
klines = fetch_historical_klines(
symbol="700.HK",
interval="1m",
start_time=start_ts,
end_time=end_ts,
limit=5000
)
if not klines:
print("未获取到 K 线数据,请检查 symbol 和时间范围")
else:
tester = PressureRatioBacktester(
symbol="700.HK",
initial_capital=100_000,
lambda_decay=0.3,
window_size=60
)
report = tester.run(klines)
print("=" * 40)
print("回测报告:买卖压力比策略")
print("=" * 40)
for k, v in report.items():
print(f" {k}: {v}")
六、实盘信号系统
6.1 与实盘数据流对接
回测验证通过后,实盘部署只需要将模拟的 _simulate_depth_snapshot 替换为真实的订阅器回调:
def build_real_time_signal_engine(symbol: str):
"""
实盘信号引擎:订阅 TickDB depth → 计算压力比 → 信号判定 → 告警
"""
subscriber = DepthSubscriber(symbol=symbol, depth=10)
pressure_window = PressureRatioWindow(window_size=60, lambda_decay=0.3)
threshold_manager = DynamicThreshold(percentiles=(20, 80))
def signal_callback(depth_data: dict):
# 计算压力比
stats = pressure_window.update(depth_data)
# 收集样本建立阈值
if stats.get("status") == "ready":
threshold_manager.add(stats["current"])
# 判定信号
threshold = threshold_manager.get_thresholds()
signal = generate_signal(stats, threshold)
# 告警(可替换为飞书/Webhook/钉钉)
if signal != "neutral":
logger.warning(
f"⚠️ 信号触发:{signal.upper()} | "
f"压力比={stats.get('current')} | "
f"偏离度={stats.get('deviation')}σ"
)
# 触发告警(示例)
_send_alert(symbol, signal, stats, threshold)
subscriber.on_depth(signal_callback)
subscriber.subscribe()
return subscriber
def _send_alert(symbol: str, signal: str, stats: dict, threshold: dict):
"""告警发送(示例:飞书 Webhook)"""
import urllib.request
import urllib.parse
webhook_url = os.environ.get("FEISHU_WEBHOOK_URL")
if not webhook_url:
logger.debug("未配置飞书 Webhook,跳过告警")
return
content = {
"msg_type": "text",
"content": {
"text": (
f"【TickDB 告警】{symbol}\n"
f"信号:{signal}\n"
f"加权压力比:{stats.get('current')}\n"
f"偏离度:{stats.get('deviation')}σ\n"
f"阈值区间:[{threshold.get('lower')}, {threshold.get('upper')}]"
)
}
}
try:
req = urllib.request.Request(
webhook_url,
data=urllib.parse.urlencode(content).encode("utf-8"),
headers={"Content-Type": "application/json"}
)
urllib.request.urlopen(req, timeout=5)
except Exception as e:
logger.error(f"告警发送失败:{e}")
七、参数敏感度分析
在将策略投入实盘前,对关键参数进行敏感度分析是必要的。以下是主要参数的影响方向和经验参考:
| 参数 | 影响方向 | 参考范围 | 注意事项 |
|---|---|---|---|
| λ(衰减系数) | λ↑ → 前档权重↑ → 信号更灵敏但噪声增加 | 0.1~0.5 | 需结合标的特性回测确定 |
| 窗口大小 | 窗口↑ → 信号越平滑但滞后越大 | 30~120 快照 | 高频场景可用较小窗口 |
| 分位数阈值 | 下限↓/上限↑ → 信号触发更困难 | (15,85)~(30,70) | 趋势跟踪可收紧反转阈值 |
| 偏离度阈值 | σ↑ → 信号触发条件更严格 | 1.5σ~3σ | 高波动标的需提高阈值 |
建议使用 TickDB 历史数据对上述参数进行网格搜索,取夏普比率最优的参数组合作为实盘初始参数,并设置定期重校准机制。
八、下一步行动
如果你希望亲手运行本文代码:
- 访问 tickdb.ai 注册(免费,无需信用卡)
- 在控制台生成 API Key 并设置环境变量
TICKDB_API_KEY - 复制本文代码,更换 symbol(如
700.HK)即可运行
如果你在研究港股订单簿结构,TickDB 的 depth 频道支持港股 10 档深度,是构建压力比因子的理想数据源。对比之下,美股 depth 仅支持 1 档,建议使用港股或数字货币标的进行此类策略开发。
如果你习惯用 AI 辅助开发,在 AI 助手中搜索安装 tickdb-market-data SKILL,可直接在对话中调用 TickDB API 获取数据和构建信号。
如果你希望获取完整的回测历史数据,联系 [email protected] 了解专业版/企业版方案,包含 10 年级别的历史 K 线数据,支持跨周期策略回测。
本文不构成任何投资建议。市场有风险,投资需谨慎。