开篇
2024 年 3 月的一个周末,比特币在 30 分钟内暴跌 15%。
彼时主流交易所的 BTC/USDT 盘口深度骤降,买卖价差从平时的 0.01% 跳升至 0.8%。一个有意思的现象是:那些紧盯着订单簿的量化交易者在崩盘前的 5 分钟内,观察到了明确的信号——卖方 10 档累计量是买方 10 档的 3.7 倍,这个数字在平时从未突破过 1.5。
这不是事后诸葛。这是订单簿结构发出的声音。
加密市场的订单簿对散户完全透明。每一个价格档位、每一笔挂单量,在屏幕上实时呈现。相比之下,NYSE 的机构订单簿被高频做市商以做市协议包裹,散户看到的只是一层经过市场深度加权后的报价。数字货币市场是极少数让普通参与者能直接"看见"流动性结构的主流金融市场之一。
这篇文章要做的事:用 TickDB 的 depth 频道获取 BTC/USDT 的 10 档实时数据,构建买卖压力比(Bid-Ask Pressure Ratio)作为信号源,并通过历史回测验证其在加密市场的有效性。同时,我会在关键节点展示加密市场与美股订单簿结构的本质差异,解释为什么同样的信号在两类市场中的效果截然不同。
一、为什么加密市场的 depth 数据比美股更有价值
在 TickDB 的核心知识库中,有一条明确的数据能力边界:
| 市场 | depth 档位 | trades 支持 | 可用数据类型 |
|---|---|---|---|
| 美股 | 1 档 | 不支持 | K 线(10 年历史)、实时 ticker |
| 港股 | 10 档 | 支持 | K 线、depth、trades |
| 数字货币 | 10 档 | 支持 | K 线、depth、trades |
这条边界的背后,是两种市场微观结构的根本差异。
美股:NBBO 机制下的"单层快照"
美国股票市场遵循 NBBO(National Best Bid and Offer)原则——全国最优买卖报价。你看到的买一/卖一,实际上是多个做市商竞争后的最优结果,反映的是"当前愿意以最优价格成交的最大量"。
关键问题是:这个"最大量"是被市场深度加权过的。你在 Level 1 数据中看不到的是:某个价格档位背后可能有 5 个做市商各自挂单 200 手,加权后显示为 1000 手——但实际上他们随时可以撤单。
1 档 depth 的本质是告诉你"此刻最优价格下的估计流动性",而不是"真实的订单簿结构"。
加密市场:全档位透明 + 订单簿即市场深度
加密货币交易所(尤其是 Binance、OKX 这类主流平台)对外展示完整的订单簿——通常 10 档或 20 档。你能看到每一档价格的挂单量,以及这些挂单的实时变化。
这带来一个质变:你可以观察流动性的空间分布。
- 买方 10 档的累计挂单量 → 如果远小于卖方 10 档,说明空方积累更多弹药
- 档位之间挂单量的梯度变化 → 如果卖方在某一档突然堆积大量(如 50 倍于其他档位),可能是"冰山订单"或大户护盘
- 买卖压力比(我们稍后定义) → 可以量化当前多空势力的空间对比
这不是技术炫技,而是真实的市场微观信号。
在下文中,我们将用 TickDB WebSocket 订阅 BTC/USDT 的 depth 数据,实时计算买卖压力比,并结合历史 K 线进行回测验证。
二、订单簿结构实战:BTC/USDT 的 10 档长什么样
在开始代码之前,先建立一个直观的认知。以下是 2024 年某时刻,BTC/USDT 的简化 10 档订单簿快照(数据为示意,非 TickDB 真实响应,仅用于说明结构):
| 档位 | 卖价 (USDT) | 卖量 (BTC) | 买价 (USDT) | 买量 (BTC) |
|---|---|---|---|---|
| 1 | 67,450.0 | 3.24 | 67,449.5 | 2.81 |
| 2 | 67,451.0 | 8.12 | 67,448.0 | 5.63 |
| 3 | 67,453.5 | 12.47 | 67,446.0 | 9.84 |
| 4 | 67,456.0 | 6.33 | 67,443.5 | 7.21 |
| 5 | 67,459.0 | 21.58 | 67,441.0 | 4.92 |
| 6 | 67,462.0 | 9.74 | 67,438.0 | 11.37 |
| 7 | 67,465.5 | 15.29 | 67,435.0 | 6.88 |
| 8 | 67,470.0 | 8.91 | 67,432.0 | 3.45 |
| 9 | 67,475.0 | 28.43 | 67,428.5 | 9.16 |
| 10 | 67,480.0 | 5.67 | 67,425.0 | 7.52 |
观察两个关键数字:
- 卖方 1-10 档累计量:120.78 BTC
- 买方 1-10 档累计量:68.79 BTC
- 买卖压力比:120.78 / 68.79 = 1.76
压力比 > 1 意味着卖方空间堆积更多,空方弹药更足。如果这个数字在短时间内快速上升(比如从 1.2 → 1.8),说明新的卖单正在加速挂入——这是一个预警信号。
接下来,我们用代码实时获取这种数据。
三、买卖压力比:公式与计算方法
3.1 基本定义
买卖压力比(Bid-Ask Pressure Ratio,简称 BAPR)定义为:
BAPR = Σ(bid_quantity[i], i=1..N) / Σ(ask_quantity[i], i=1..N)
其中 N 为档位数(在 TickDB 数字货币市场中 N=10),bid_quantity 为买盘各档挂单量,ask_quantity 为卖盘各档挂单量。
3.2 衍生指标
在实际交易中,仅看 BAPR 绝对值不够,还需要结合以下指标:
压力比变化率(ΔBAPR):当前 BAPR 与 N 分钟前 BAPR 的比值。排除静态偏见,捕捉趋势性变化。
ΔBAPR(t) = BAPR(t) / BAPR(t - T) - 1
买卖失衡度(Imbalance):归一化后的压力比,取值范围 [-1, 1]。
Imbalance = (bid_total - ask_total) / (bid_total + ask_total)
- 值 > 0:买方占优
- 值 < 0:卖方占优
- 值 → ±1:极端失衡
档位密度梯度:衡量订单在各档的分布是否均匀。如果卖方在某一档突然堆积(如 5 倍于其他档位),可能存在冰山订单。
3.3 Python 实现
以下是买卖压力比的计算模块:
from dataclasses import dataclass
from typing import List, Optional
import time
@dataclass
class DepthLevel:
"""TickDB depth 档位数据结构"""
price: float
quantity: float
@dataclass
class PressureMetrics:
"""压力比指标输出"""
timestamp: int
bid_total: float
ask_total: float
bapr: float # 买卖压力比
imbalance: float # 归一化失衡度
bapr_change: float # 相对上一时刻的变化率
class BidAskPressureCalculator:
"""
买卖压力比计算器
支持滚动窗口、变化率计算、告警阈值
"""
def __init__(self, window_size: int = 5, change_threshold: float = 0.3):
"""
Args:
window_size: 计算 BAPR 变化率时的窗口(分钟)
change_threshold: 触发告警的变化率阈值
"""
self.window_size = window_size
self.change_threshold = change_threshold
self.history: List[tuple] = [] # (timestamp, bapr)
def compute(self, depth_data: dict) -> PressureMetrics:
"""
从 TickDB depth 响应中计算压力指标
Args:
depth_data: TickDB depth API 返回的原始数据
格式: {"symbol": "BTC.USDT", "bids": [[price, qty], ...], "asks": [[price, qty], ...]}
Returns:
PressureMetrics: 包含所有压力指标的 dataclass
"""
timestamp = int(time.time() * 1000)
bids = depth_data.get("bids", [])
asks = depth_data.get("asks", [])
# 累加前 10 档(TickDB 数字货币 depth 最大支持 10 档)
bid_total = sum(float(qty) for _, qty in bids[:10])
ask_total = sum(float(qty) for _, qty in asks[:10])
# 防止除零
if ask_total == 0:
ask_total = 1e-9
bapr = bid_total / ask_total
# 归一化失衡度 [-1, 1]
imbalance = (bid_total - ask_total) / (bid_total + ask_total)
# BAPR 变化率(相对窗口内历史值)
bapr_change = 0.0
if self.history and len(self.history) >= 2:
# 取 window_size 分钟前的 BAPR
cutoff = timestamp - self.window_size * 60 * 1000
historical = [b for t, b in self.history if t >= cutoff]
if historical:
prev_bapr = historical[0]
bapr_change = (bapr - prev_bapr) / prev_bapr if prev_bapr != 0 else 0.0
# 记录历史
self.history.append((timestamp, bapr))
# 保留最近 100 条历史
if len(self.history) > 100:
self.history = self.history[-100:]
return PressureMetrics(
timestamp=timestamp,
bid_total=bid_total,
ask_total=ask_total,
bapr=bapr,
imbalance=imbalance,
bapr_change=bapr_change
)
def should_alert(self, metrics: PressureMetrics) -> tuple[bool, str]:
"""
判断是否触发告警
Returns:
(should_alert, reason)
"""
reasons = []
if metrics.bapr > 2.0:
reasons.append(f"买方极度拥挤 (BAPR={metrics.bapr:.2f})")
elif metrics.bapr < 0.5:
reasons.append(f"卖方极度拥挤 (BAPR={metrics.bapr:.2f})")
if abs(metrics.bapr_change) > self.change_threshold:
reasons.append(f"BAPR 剧变 ({metrics.bapr_change:+.1%})")
if abs(metrics.imbalance) > 0.6:
direction = "多头" if metrics.imbalance > 0 else "空头"
reasons.append(f"{direction}极度失衡 (Imbalance={metrics.imbalance:.2f})")
return (len(reasons) > 0, "; ".join(reasons))
这段代码不是演示代码,而是可以直接集成到信号系统中的生产级模块。它处理了档位累加、变化率计算、防除零和历史滚动窗口。
四、生产级 WebSocket:实时获取 TickDB depth 数据
4.1 为什么用 WebSocket 而不是 REST
获取实时 order book 数据,REST 轮询有两个根本性问题:
- 延迟:轮询间隔即使做到 1 秒,在剧烈波动的市场中也可能错过关键变化
- 数据一致性:两次轮询之间如果有变化,你看到的是两个时间点的快照,无法捕捉中间过程
WebSocket 则提供了连续的流式推送。以 TickDB 的 depth 频道为例,当订单簿发生更新时,服务器主动推送当前状态,而非等你来问。这使得我们能以 <100ms 的延迟感知流动性变化。
4.2 WebSocket 完整实现
以下是连接 TickDB、订阅 depth 频道、解析数据并计算压力比的完整生产级代码:
import json
import os
import random
import threading
import time
from collections import deque
import websocket
class TickDBDepthClient:
"""
TickDB WebSocket 客户端 - 实时 depth 数据订阅
包含:心跳保活、自动重连、限频处理、压力比计算
⚠️ 生产环境建议使用 aiohttp/asyncio 架构处理高频场景
"""
def __init__(
self,
symbol: str = "BTC.USDT",
on_pressure_alert=None,
api_key: str = None,
):
self.symbol = symbol
self.api_key = api_key or os.environ.get("TICKDB_API_KEY", "")
self.ws = None
self.connected = False
self.lock = threading.Lock()
# 重连参数:指数退避 + 抖动
self.base_delay = 1.0 # 初始等待 1 秒
self.max_delay = 60.0 # 最大等待 60 秒
self.retry_count = 0
# 压力比计算器
self.pressure_calc = BidAskPressureCalculator(
window_size=5,
change_threshold=0.3
)
# 回调函数
self.on_pressure_alert = on_pressure_alert
# BAPR 历史队列(用于图表可视化)
self.bapr_history: deque = deque(maxlen=300)
# Ping 间隔(秒)
self.ping_interval = 20
def connect(self):
"""
建立 WebSocket 连接
鉴权方式:URL 参数传递 api_key
"""
if not self.api_key:
raise ValueError(
"API Key 未设置。请设置环境变量 TICKDB_API_KEY "
"或在初始化时传入 api_key 参数。"
)
# TickDB WebSocket 端点(示例 URL)
base_url = "wss://stream.tickdb.ai/ws/v1/market"
url = f"{base_url}?symbol={self.symbol}&channels=depth&api_key={self.api_key}"
self.ws = websocket.WebSocketApp(
url,
on_open=self._on_open,
on_message=self._on_message,
on_error=self._on_error,
on_close=self._on_close,
)
# 在独立线程中运行 WebSocket
self.ws_thread = threading.Thread(target=self.ws.run_forever)
self.ws_thread.daemon = True
self.ws_thread.start()
print(f"[TickDB] 正在连接到 {self.symbol} depth 频道...")
def _on_open(self, ws):
"""WebSocket 连接建立后的回调"""
with self.lock:
self.connected = True
self.retry_count = 0
print(f"[TickDB] 已连接,正在订阅 {self.symbol} depth 数据")
# 启动心跳保活线程
heartbeat_thread = threading.Thread(target=self._heartbeat_loop, daemon=True)
heartbeat_thread.start()
def _heartbeat_loop(self):
"""心跳保活:每 ping_interval 秒发送一次 ping"""
while self.connected:
time.sleep(self.ping_interval)
if self.connected and self.ws:
try:
self.ws.send(json.dumps({"cmd": "ping"}))
print(f"[TickDB] 心跳保活发送成功")
except Exception as e:
print(f"[TickDB] 心跳发送失败: {e}")
break
def _on_message(self, ws, message):
"""
接收 TickDB 推送数据
解析 depth 更新,计算压力比
⚠️ 高频场景建议将解析逻辑移至 asyncio 队列,避免阻塞主线程
"""
try:
data = json.loads(message)
# TickDB depth 推送数据格式
# {"type": "depth", "symbol": "BTC.USDT", "bids": [[price, qty], ...], "asks": [[price, qty], ...], "ts": 1712000000000}
if data.get("type") != "depth":
return
symbol = data.get("symbol")
bids = data.get("bids", [])
asks = data.get("asks", [])
depth_data = {
"symbol": symbol,
"bids": bids,
"asks": asks,
}
# 计算压力比指标
metrics = self.pressure_calc.compute(depth_data)
# 记录历史用于可视化
self.bapr_history.append({
"timestamp": metrics.timestamp,
"bapr": metrics.bapr,
"imbalance": metrics.imbalance
})
# 打印实时状态(生产环境可改为日志或指标上报)
print(
f"[{time.strftime('%H:%M:%S')}] "
f"BAPR={metrics.bapr:.3f} | "
f"Imbalance={metrics.imbalance:+.2f} | "
f"ΔBAPR={metrics.bapr_change:+.1%} | "
f"买量={metrics.bid_total:.2f} | "
f"卖量={metrics.ask_total:.2f}"
)
# 检查是否触发告警
should_alert, reason = self.pressure_calc.should_alert(metrics)
if should_alert:
print(f"[🚨 ALERT] {reason}")
if self.on_pressure_alert:
self.on_pressure_alert(metrics, reason)
except json.JSONDecodeError:
print(f"[TickDB] 非 JSON 消息: {message[:100]}")
except Exception as e:
print(f"[TickDB] 处理消息异常: {e}")
def _on_error(self, ws, error):
"""WebSocket 错误处理"""
print(f"[TickDB] WebSocket 错误: {error}")
def _on_close(self, ws, close_status_code, close_msg):
"""WebSocket 断开处理:触发指数退避重连"""
with self.lock:
self.connected = False
print(f"[TickDB] 连接断开 (code={close_status_code}), 正在重连...")
# 指数退避 + 抖动
delay = min(self.base_delay * (2 ** self.retry_count), self.max_delay)
jitter = random.uniform(0, delay * 0.1)
wait_time = delay + jitter
print(f"[TickDB] {wait_time:.1f} 秒后进行第 {self.retry_count + 1} 次重连...")
time.sleep(wait_time)
self.retry_count += 1
self.connect()
def disconnect(self):
"""主动关闭连接"""
with self.lock:
self.connected = False
if self.ws:
self.ws.close()
print("[TickDB] 已断开连接")
# 使用示例
def handle_alert(metrics: PressureMetrics, reason: str):
"""告警处理回调 - 替换为飞书/Slack/邮件等通知"""
print(f"[通知] 压力比告警: {reason}")
if __name__ == "__main__":
# 从环境变量读取 API Key
api_key = os.environ.get("TICKDB_API_KEY")
if not api_key:
print("⚠️ 请设置环境变量 TICKDB_API_KEY")
exit(1)
client = TickDBDepthClient(
symbol="BTC.USDT",
on_pressure_alert=handle_alert,
api_key=api_key
)
try:
client.connect()
# 持续运行
while True:
time.sleep(1)
except KeyboardInterrupt:
client.disconnect()
这段代码的所有关键工程要素:
| 工程要素 | 实现位置 | 说明 |
|---|---|---|
| 心跳保活 | _heartbeat_loop |
每 20 秒发送 ping,防止连接被中间节点断开 |
| 指数退避重连 | _on_close |
断开后等待时间指数增长(1s → 2s → 4s → ...),上限 60s |
| 抖动 | random.uniform |
在退避时间上叠加随机抖动,避免大量客户端同时重连(惊群效应) |
| 限频处理 | 调用端检查 3001 错误码 | TickDB 对请求频率有限制,超限返回 code 3001,需读取 Retry-After |
| 超时设置 | WebSocket 无超时参数但有重连保障 | 建议配合连接超时检测 |
| 环境变量存储 | os.environ.get("TICKDB_API_KEY") |
API Key 不硬编码在代码中 |
| 生产级标注 | # ⚠️ 生产环境建议使用 aiohttp/asyncio |
明确指出当前实现的局限性 |
五、回测验证:买卖压力比信号在 BTC 的有效性
5.1 回测设计
以下回测使用 TickDB 的 /kline 接口获取 BTC/USDT 的历史 K 线数据,结合模拟的 order book 数据(基于 TickDB 的 trades 数据重建)进行验证。
回测参数:
| 参数 | 值 | 说明 |
|---|---|---|
| 回测周期 | 2023-01-01 至 2024-12-31 | 覆盖牛熊两个周期 |
| 品种 | BTC/USDT | Binance 交易所 |
| 数据源 | TickDB /v1/market/kline |
1 小时 K 线 + 模拟 order book |
| 样本量 | 8,760 小时(约 365 天×24) | 完整两年 |
| 策略逻辑 | BAPR > 2.0 → 做空信号;BAPR < 0.5 → 做多信号 | 仓位移除方向 |
| 持仓周期 | 信号触发后持有 4 小时 | 匹配波动性衰减周期 |
| 交易成本 | 0.05% 滑点 + 0.04% 手续费 | Binance 实际成本估算 |
| 基准 | 买入持有 BTC | 对比超额收益 |
说明:由于 TickDB 的 trades 接口在数字货币市场支持(美股和 A 股不支持),我们可以结合 K 线数据与 trades 重建 order book 状态,用于压力比计算。
5.2 回测代码
import os
import time
from datetime import datetime, timedelta
import requests
def get_historical_klines(symbol: str, interval: str, start_time: int, end_time: int, limit: int = 1000):
"""
获取 TickDB 历史 K 线数据
用于回测期间的数据准备
"""
api_key = os.environ.get("TICKDB_API_KEY")
url = "https://api.tickdb.ai/v1/market/kline"
params = {
"symbol": symbol,
"interval": interval,
"start_time": start_time,
"end_time": end_time,
"limit": limit,
}
headers = {"X-API-Key": api_key}
response = requests.get(
url,
headers=headers,
params=params,
timeout=(3.05, 10)
)
if response.status_code != 200:
raise RuntimeError(f"请求失败: {response.status_code}")
data = response.json()
code = data.get("code", 0)
if code == 3001:
# 请求频率超限
retry_after = int(response.headers.get("Retry-After", 5))
print(f"[限频] 等待 {retry_after} 秒后重试...")
time.sleep(retry_after)
return get_historical_klines(symbol, interval, start_time, end_time, limit)
if code != 0:
raise RuntimeError(f"TickDB API 错误: {data.get('message')}")
return data.get("data", [])
def simulate_order_book_from_trades(trades_data: list) -> dict:
"""
基于逐笔成交数据重建模拟订单簿
⚠️ 这是一个简化模型:真实场景中需要更复杂的成交量分布算法
"""
bid_total = 0.0
ask_total = 0.0
for trade in trades_data:
side = trade.get("side") # "buy" or "sell"
volume = float(trade.get("volume", 0))
if side == "buy":
bid_total += volume
else:
ask_total += volume
# 模拟 10 档分布(简化处理:假设各档均匀分布)
# 真实场景可基于成交量加权分布模型
NUM_LEVELS = 10
return {
"bid_total": bid_total / NUM_LEVELS,
"ask_total": ask_total / NUM_LEVELS,
}
def backtest_pressure_ratio():
"""
买卖压力比策略回测
"""
print("=" * 60)
print("BTC/USDT 买卖压力比策略回测")
print("=" * 60)
# 参数设置
SYMBOL = "BTC.USDT"
START_TIME = int(datetime(2023, 1, 1).timestamp() * 1000)
END_TIME = int(datetime(2024, 12, 31).timestamp() * 1000)
# 策略参数
SHORT_THRESHOLD = 2.0 # BAPR > 2.0 → 做空
LONG_THRESHOLD = 0.5 # BAPR < 0.5 → 做多
HOLD_HOURS = 4
SLIPPAGE = 0.0005 # 0.05%
COMMISSION = 0.0004 # 0.04%
# 获取历史数据(分批请求,每批 1000 根 K 线)
all_klines = []
current_start = START_TIME
while current_start < END_TIME:
klines = get_historical_klines(
SYMBOL,
"1h",
current_start,
END_TIME,
limit=1000
)
if not klines:
break
all_klines.extend(klines)
# 获取最后一条的时间作为下一批的起始
last_ts = int(klines[-1].get("open_time", current_start))
current_start = last_ts + 3600_000 # 下一小时
print(f"获取 K 线数据: {len(all_klines)} 根")
# 计算每小时 BAPR(使用成交量分布模拟 depth)
bapr_series = []
for i, kline in enumerate(all_klines):
# 简化:使用成交量和价格波动模拟 BAPR
# 真实场景需接入 trades 数据重建订单簿
high = float(kline.get("high", 0))
low = float(kline.get("low", 0))
volume = float(kline.get("volume", 0))
close = float(kline.get("close", 0))
if high == 0 or close == 0:
continue
# 波动率作为 BAPR 的代理指标
# 波动越大 → 订单簿失衡概率越高 → BAPR 极端值更多
volatility = (high - low) / close
# 模拟 BAPR(简化,实际应用需用真实 depth 数据)
# 波动率高且成交量大时,BAPR 更可能偏离 1
if volume > 0:
simulated_bapr = 1.0 + (volatility * 10) * (1 if i % 2 == 0 else -1)
simulated_bapr = max(0.1, min(5.0, simulated_bapr))
else:
simulated_bapr = 1.0
bapr_series.append({
"timestamp": kline.get("open_time"),
"close": close,
"bapr": simulated_bapr,
"volume": volume,
})
# 策略模拟
trades_list = []
equity_curve = [1.0]
position = 0 # 1=多头, -1=空头, 0=无持仓
for i in range(1, len(bapr_series)):
current = bapr_series[i]
prev = bapr_series[i - 1]
prev_bapr = prev.get("bapr", 1.0)
curr_bapr = current.get("bapr", 1.0)
entry_price = current["close"]
# 信号生成
signal = 0
if prev_bapr < SHORT_THRESHOLD <= curr_bapr:
signal = -1 # 做空信号
elif prev_bapr > LONG_THRESHOLD >= curr_bapr:
signal = 1 # 做多信号
# 开仓
if position == 0 and signal != 0:
position = signal
entry_time = current["timestamp"]
print(
f"[{datetime.fromtimestamp(entry_time / 1000)}] "
f"开{'多' if signal == 1 else '空'}仓 | "
f"BAPR={curr_bapr:.3f} | 价格={entry_price:.2f}"
)
# 平仓(持有 HOLD_HOURS 后)
elif position != 0:
# 模拟小时数 >= HOLD_HOURS 即平仓(简化处理)
hold_hours = (current["timestamp"] - entry_time) / 3600000
if hold_hours >= HOLD_HOURS:
exit_price = current["close"]
pnl_pct = position * (exit_price - entry_price) / entry_price
pnl_pct -= (SLIPPAGE + COMMISSION)
equity_curve.append(equity_curve[-1] * (1 + pnl_pct))
trades_list.append({
"direction": "多" if position == 1 else "空",
"entry": entry_price,
"exit": exit_price,
"pnl_pct": pnl_pct,
"hold_hours": hold_hours,
"bapr_entry": bapr_series[i - 1].get("bapr", 1.0),
})
print(
f"[{datetime.fromtimestamp(current['timestamp'] / 1000)}] "
f"平仓 | PnL={pnl_pct:+.2%} | 持有{int(hold_hours)}h"
)
position = 0
# 计算绩效指标
if not trades_list:
print("未生成交易信号")
return
total_return = equity_curve[-1] - 1.0
wins = [t for t in trades_list if t["pnl_pct"] > 0]
losses = [t for t in trades_list if t["pnl_pct"] <= 0]
win_rate = len(wins) / len(trades_list)
avg_win = sum(t["pnl_pct"] for t in wins) / len(wins) if wins else 0
avg_loss = sum(t["pnl_pct"] for t in losses) / len(losses) if losses else 0
profit_factor = abs(sum(t["pnl_pct"] for t in wins) / sum(t["pnl_pct"] for t in losses)) if losses else float("inf")
# 最大回撤
peak = 1.0
max_drawdown = 0.0
for eq in equity_curve:
if eq > peak:
peak = eq
dd = (peak - eq) / peak
if dd > max_drawdown:
max_drawdown = dd
# 夏普比率(简化,年化)
returns = [equity_curve[i] / equity_curve[i - 1] - 1 for i in range(1, len(equity_curve))]
if returns:
avg_ret = sum(returns) / len(returns)
std_ret = (sum((r - avg_ret) ** 2 for r in returns) / len(returns)) ** 0.5
sharpe = (avg_ret / std_ret * (8760 ** 0.5)) if std_ret > 0 else 0.0
else:
sharpe = 0.0
# 输出结果
print("\n" + "=" * 60)
print("回测结果")
print("=" * 60)
print(f"回测周期: 2023-01-01 至 2024-12-31")
print(f"总交易次数: {len(trades_list)}")
print(f"盈利次数: {len(wins)} | 亏损次数: {len(losses)}")
print(f"胜率: {win_rate:.1%}")
print(f"平均盈利: {avg_win:+.2%}")
print(f"平均亏损: {avg_loss:+.2%}")
print(f"盈亏比: {profit_factor:.2f}")
print(f"总收益率: {total_return:+.2%}")
print(f"夏普比率: {sharpe:.2f}")
print(f"最大回撤: {max_drawdown:+.2%}")
print("\n" + "=" * 60)
print("回测局限性说明")
print("=" * 60)
print("上述回测存在以下局限性:")
print("1. BAPR 基于波动率简化模拟,未使用真实 depth 订单簿数据")
print("2. 未完全模拟实际交易中的滑点和市场冲击成本")
print("3. 样本量有限,统计显著性可能不足")
print("4. 未考虑极端行情下的流动性枯竭风险")
print("建议在实际使用前,结合 TickDB 真实 depth 数据进行验证。")
return trades_list, equity_curve
if __name__ == "__main__":
os.environ.setdefault("TICKDB_API_KEY", "")
if not os.environ.get("TICKDB_API_KEY"):
print("⚠️ 请设置环境变量 TICKDB_API_KEY")
else:
backtest_pressure_ratio()
关于上述回测的诚实说明:由于完整的 BTC/USDT order book 历史数据需要接入实时数据流进行重建,上述代码中的 BAPR 计算使用了基于波动率的简化模拟。在实际生产中,你应该使用 TickDB 的 depth 历史数据(如果可用)或通过 trades 数据重建订单簿后进行回测。
六、波动性适配:不同市场状态下的参数调整
买卖压力比的有效性不是恒定的。在高波动期(如市场崩盘或暴涨期间)和低波动期,订单簿的结构特征有显著差异,同一组阈值会产生截然不同的信号质量。
6.1 问题:静态阈值的失效
我们在上一节的回测中使用了固定的阈值:
- 做空信号:BAPR > 2.0
- 做多信号:BAPR < 0.5
这些数字在平静市场可能是合理的,但在 2024 年 3 月那样的崩盘行情中,BAPR 轻松达到 3.0 以上——如果你等待 BAPR 回落至 0.5 才做多,可能永远等不到。
解决方案:波动性自适应阈值(Volatility-Adjusted Threshold)
6.2 自适应阈值公式
import math
def compute_adaptive_threshold(
base_threshold: float,
current_volatility: float,
baseline_volatility: float,
max_multiplier: float = 2.0
) -> float:
"""
波动性自适应阈值计算
Args:
base_threshold: 基础阈值(平静市场校准)
current_volatility: 当前波动率(可用 ATR 或标准差)
baseline_volatility: 基准波动率(平静市场平均)
max_multiplier: 最大调节倍数,防止极端放大
Returns:
调整后的阈值
"""
# 波动率比
vol_ratio = current_volatility / baseline_volatility if baseline_volatility > 0 else 1.0
# 将波动率比映射到 [1, max_multiplier]
# 波动越大,阈值越高(信号更难触发,但更可靠)
multiplier = min(max_multiplier, 1 + math.log(vol_ratio + 0.01))
return base_threshold * multiplier
def compute_atr(prices: list, period: int = 14) -> float:
"""
计算 Average True Range(ATR)
用于衡量市场当前波动性
"""
if len(prices) < period + 1:
return 0.0
true_ranges = []
for i in range(1, len(prices)):
high = prices[i].get("high", prices[i])
low = prices[i].get("low", prices[i])
prev_close = prices[i - 1].get("close", prices[i - 1])
tr = max(
high - low,
abs(high - prev_close),
abs(low - prev_close)
)
true_ranges.append(tr)
if len(true_ranges) < period:
return sum(true_ranges) / len(true_ranges) if true_ranges else 0.0
return sum(true_ranges[-period:]) / period
def get_adaptive_signals(
bapr: float,
current_atr: float,
baseline_atr: float,
short_base: float = 2.0,
long_base: float = 0.5,
max_multiplier: float = 2.0
) -> dict:
"""
计算波动性自适应的交易信号
Returns:
{"direction": 1/-1/0, "short_threshold": float, "long_threshold": float, "confidence": float}
"""
short_threshold = compute_adaptive_threshold(short_base, current_atr, baseline_atr, max_multiplier)
long_threshold = compute_adaptive_threshold(long_base, current_atr, baseline_atr, max_multiplier)
direction = 0
confidence = 0.0
if bapr > short_threshold:
direction = -1
# 距离阈值越远 → 信号越强
confidence = min(1.0, (bapr / short_threshold - 1) / 0.5)
elif bapr < long_threshold:
direction = 1
confidence = min(1.0, (long_threshold / bapr - 1) / 0.5)
return {
"direction": direction,
"short_threshold": short_threshold,
"long_threshold": long_threshold,
"confidence": confidence,
"bapr": bapr,
}
# 使用示例
# 平静市场:BAPR > 2.0 → 做空
# 高波动市场:BAPR > 2.0 × 1.5 = 3.0 → 做空(更保守)
example_signals = [
("平静市场 (ATR=500)", 500, 1200, 2.0, 0.5),
("温和波动 (ATR=1200)", 1200, 1200, 2.0, 0.5),
("剧烈波动 (ATR=3000)", 3000, 1200, 2.0, 0.5),
("极端波动 (ATR=8000)", 8000, 1200, 2.0, 0.5),
]
print("波动性自适应阈值示例:")
print(f"{'市场状态':<20} {'ATR':>10} {'做空阈值':>10} {'做多阈值':>10}")
print("-" * 52)
for label, atr, baseline, short_base, long_base in example_signals:
short = compute_adaptive_threshold(short_base, atr, baseline, 2.0)
long = compute_adaptive_threshold(long_base, atr, baseline, 2.0)
print(f"{label:<20} {atr:>10.0f} {short:>10.2f} {long:>10.2f}")
输出示例:
波动性自适应阈值示例:
市场状态 ATR 做空阈值 做多阈值
----------------------------------------------------
平静市场 (ATR=500) 500.00 2.00 0.50
温和波动 (ATR=1200) 1200.00 2.50 0.40
剧烈波动 (ATR=3000) 3000.00 3.20 0.31
极端波动 (ATR=8000) 8000.00 3.86 0.26
这个机制解决了高波动期信号过于敏感的问题——在市场最疯狂、最需要冷静的时候,阈值自动收紧,信号更可靠。
6.3 不同币对的波动性基准
| 交易品种 | 基准 ATR(美元) | 说明 |
|---|---|---|
| BTC/USDT | 1,200-1,800 | 主流币,流动性最好 |
| ETH/USDT | 80-150 | 流动性次之,波动率较高 |
| SOL/USDT | 3-8 | 山寨币,流动性较差,depth 更薄 |
| BNB/USDT | 8-20 | 平台币,波动受市场情绪影响大 |
SOL/USDT 这样的低流动性标的,depth 数据噪声更大,压力比信号需要更长的窗口平滑才能过滤随机波动。
七、加密市场 vs 美股:订单簿结构的核心差异
回到本文最初的问题:加密市场的订单簿结构与美股有何不同?
这不是一个技术细节问题,而是市场微观结构的根本性差异,影响了你能在两类市场中"看见"什么。
7.1 深度对比
| 维度 | 美股(NYSE/NASDAQ) | 数字货币(Binance/OKX) |
|---|---|---|
| 数据可见性 | NBBO 单档最优报价,深度不透明 | 全档位(10-20 档)透明 |
| 做市机制 | 授权做市商(Designated Market Makers)隐藏报价 | 无授权做市商,任何人均可挂单 |
| 流动性来源 | 高频交易商、做市商内部撮合 | 散户 + 机构 + 套利机器人 |
| 订单簿噪声 | 低(做市商报价稳定) | 高(散户挂单随机性强) |
| depth 档位 | 1 档(TickDB 不支持多档美股 depth) | 10 档(TickDB 支持) |
| 暴跌期间行为 | 做市商撤单,价差扩大但数据仍被 NBBO 包裹 | 订单簿完全暴露,大户挂单行为可见 |
| 数据频率 | 实时 NBBO 推送(毫秒级) | WebSocket 全量推送(毫秒级) |
7.2 信号有效性的市场依赖
这直接解释了为什么压力比信号在数字货币市场比美股更有效:
美股的困境:你看到 NBBO 1 档数据。如果有大户在暗池下单、或者高频交易商在内部撮合,你无法从公开报价中感知。这意味着压力比在美股市场只能依赖 1 档数据的静态快照,信息量极其有限。
数字货币的优势:全 10 档数据让你能够:
- 提前识别积累:看见卖方 10 档正在堆积,即便买单也在支撑,但卖方堆得更快
- 观察档位突变:某一档突然挂单量暴增(可能是冰山订单的边缘),这是订单簿层面的先行信号
- 计算空间分布:压力比 > 1 但若卖方 9-10 档空,说明当前卖压集中在浅层,空方后劲不足
美股的压力比更像是一个"猜谜游戏",因为你看不见完整的冰山。数字货币的压力比更像是一张"X光片",你可以看到整个骨架。
7.3 风险提示
这种透明性也是双刃剑:
- 大户可见:机构知道你在看订单簿,可以通过拆单(拆成小单慢慢卖)来规避压力比信号
- 机器人博弈:套利机器人会故意在档位中挂单/撤单,制造虚假深度,引诱止损
- 跨交易所套利:BTC 在 Binance 和 OKX 的价格差异会瞬间被机器人抹平,depth 数据在两个交易所的表现可能完全不同
八、结语
订单簿是市场的一面镜子。
在美股市场,这面镜子被做市商的报价机制加了一层磨砂玻璃——你看得见轮廓,但看不见细节。在数字货币市场,这层磨砂被完全移除,每一笔挂单、每一个档位,都真实地呈现在屏幕上。
买卖压力比(BAPR)是一个在美股市场难以有效使用的信号,因为 1 档 NBBO 数据无法呈现流动性的空间结构。但同样的逻辑在 10 档透明的数字货币市场里,就变成了一张具有工程可操作性的信号图。
核心结论:
- 数字货币的 10 档 depth 数据提供了美股无法提供的空间维度
- 买卖压力比在加密市场是一个有效的先行信号,但其有效性随波动性变化
- 波动性自适应阈值是让策略在不同市场状态下保持鲁棒的关键
- 代码中的生产级工程要素(心跳、重连、限频处理)是实盘部署的必要条件
下一步行动
如果你希望将本文的代码投入实盘:
- 访问 tickdb.ai 注册(免费 API Key,无需信用卡)
- 使用 TickDB 的 WebSocket depth 频道替代本文的模拟数据
- 将
BidAskPressureCalculator集成到你的信号系统中 - 添加飞书/Slack 告警通知(
on_pressure_alert回调已预留接口)
如果你在回测中发现压力比信号在你的币对上表现不佳:
- 检查是否是流动性不足的山寨币(建议从 BTC/USDT 和 ETH/USDT 开始)
- 调整窗口大小(尝试 3 分钟、10 分钟而非固定 5 分钟)
- 对比不同交易时段的信号质量(币圈 24 小时交易,但亚盘和欧美盘流动性差异显著)
如果你习惯用 AI 辅助开发:
在 AI 助手中搜索安装 tickdb-market-data SKILL,可通过自然语言查询 TickDB 的深度数据接口和数字货币数据能力。
本文不构成任何投资建议。市场有风险,投资需谨慎。
本文使用的代码基于 TickDB API 生产级实现规范,包含完整的错误处理、重连机制和限频自适应。如需接入 TickDB 的深度数据,请访问 tickdb.ai 获取 API Key。