订单簿买卖压力比:从 depth 数据到交易信号的完整实现

当订单簿开始倾斜

"价格是市场共识的快照,订单簿是这种共识的解剖图。"

想象一个场景:苹果发布会散场后 15 分钟,库克站上舞台的那一刻,某只苹果供应链股票的买一价挂着 500 手,卖一价挂着 200 手。看起来多方占优。但如果你看到第五档、第十档的数据——卖方像一道缓缓上升的冰山,买方却在每一档都薄薄一层——你的判断会完全不同。

这就是买卖压力比的核心洞察:单一档位的价格是谎言,多档挂单的结构才是真相。

本文将完整拆解如何将订单簿的多档挂单量压缩成一个可回测的因子,涵盖因子设计原理、动态阈值确定、生产级代码实现,以及如何将它集成到你的回测框架中。


一、为什么单一档位不够用

1.1 订单簿的单档局限

大多数实时行情只提供买一价/卖一价和对应挂单量。这是一个极度压缩的信息:

买卖档位(Bid/Ask Level 1)
┌─────────────────────────────────────┐
│  买一    $150.05    500 股          │
│  卖一    $150.08    300 股          │
└─────────────────────────────────────┘

基于这个信息,你能得出的结论是:当前有一股微弱的买方力量。但这个结论有三个致命问题:

  1. 无法感知纵深:500 手买一背后是否有更多买盘支撑?还是这只是做市商的一个陷阱单?
  2. 容易被操纵:大资金可以在瞬间撤单/挂单,单档数据在高频场景下几乎是噪声
  3. 忽略价格权重:$150.05 的 500 股和 $150.00 的 500 股,对价格的支撑能力完全不同

1.2 多档订单簿的结构价值

一个典型的 Level 2 订单簿结构如下:

时间戳: 2026-01-15 09:35:22.451

┌──────────────────────────────────────────────────────┐
│ BIDS (买方)                ASKS (卖方)               │
├──────────────────────────────────────────────────────┤
│ L1  $150.05  500           L1  $150.08  300           │
│ L2  $150.04  850           L2  $150.09  620           │
│ L3  $150.03  1200          L3  $150.10  1100          │
│ L4  $150.02  800           L4  $150.11  450           │
│ L5  $150.01  1500          L5  $150.12  2800          │
├──────────────────────────────────────────────────────┤
│ 合计: 4850 股              合计: 5370 股             │
│ 加权均价: $150.025         加权均价: $150.098        │
└──────────────────────────────────────────────────────┘

这个表格告诉我们什么?

  • 表面:卖方总量(5370)> 买方总量(4850),空方占优
  • 深层:买方的加权均价是 $150.025,卖方的加权均价是 $150.098——买方的"有效防御位置"更靠近当前价格
  • 结构:L5 档位出现了异常的卖单堆积(2800 股),这可能是某个量化策略的限价单

这就是为什么我们需要买卖压力比——一个能够同时考虑档位深度和价格权重的因子。


二、买卖压力比的因子设计

2.1 基础公式与物理含义

买卖压力比(Bid-Ask Pressure Ratio, BAPR)的基本形式是:

BAPR = Σ(BidVolume[i] × Weight[i]) / Σ(AskVolume[i] × Weight[i])

其中 Weight[i] 是第 i 档的权重函数。不同的权重函数对应不同的因子变体:

权重类型 公式 适用场景 物理含义
均匀权重 Weight = 1 简单监控 所有档位同等重要性
距离衰减 Weight = 1 / (1 + distance) 短期信号 越接近买一/卖一的单量越重要
指数衰减 Weight = exp(-λ × distance) 趋势跟踪 深度衰减更陡峭
价格倒数 Weight = 1 / price[i] 均值回归 单位股数的防御能力

对于 A 股和港股的 10 档数据,我们推荐距离衰减权重,原因如下:

  1. 距离当前价格越近的挂单,对短期价格支撑/压制作用越直接
  2. 衰减因子 λ 可以作为一个可优化的参数,适配不同标的的性格
  3. 计算量适中,不会给回测系统带来过重负担

2.2 距离衰减权重的实现

def calculate_weighted_pressure(
    bid_volumes: List[float],
    ask_volumes: List[float],
    bid_prices: List[float],
    ask_prices: List[float],
    decay_lambda: float = 0.5,
    levels: int = 10
) -> float:
    """
    计算加权买卖压力比
    
    Args:
        bid_volumes: 各档买盘挂单量(从买一到买N)
        ask_volumes: 各档卖盘挂单量
        bid_prices: 各档买盘价格
        ask_prices: 各档卖盘价格
        decay_lambda: 指数衰减系数,越大则远档权重衰减越快
        levels: 参与计算的档位数
    
    Returns:
        float: 压力比,>1 表示买方占优,<1 表示卖方占优
    
    Note:
        - decay_lambda=0.5 适用于高频日内策略
        - decay_lambda=1.0 适用于趋势跟踪策略
        - 建议在回测中 Grid Search [0.1, 0.3, 0.5, 0.7, 1.0]
    """
    if len(bid_volumes) < levels or len(ask_volumes) < levels:
        return 1.0  # 数据不足时返回中性值
    
    bid_weighted = 0.0
    ask_weighted = 0.0
    
    for i in range(levels):
        distance = i  # 第0档距离为0,第1档距离为1,以此类推
        weight = 1.0 / (1.0 + decay_lambda * distance)
        
        bid_weighted += bid_volumes[i] * weight
        ask_weighted += ask_volumes[i] * weight
    
    if ask_weighted == 0:
        return float('inf') if bid_weighted > 0 else 1.0
    
    return bid_weighted / ask_weighted

2.3 动态阈值:从固定值到自适应

很多因子文章会告诉你"压力比 > 1.2 就做多,< 0.8 就做空"。这种固定阈值在模拟环境中可能有效,但在实盘中会让你亏得很惨。

问题在于:不同标的、不同市场状态、不同时间段,订单簿的结构差异巨大。

市场状态 正常交易日 财报发布日 极端波动日
正常压力比范围 0.85 - 1.15 0.5 - 2.5 0.2 - 5.0
平均买卖价差 0.01 - 0.03 0.05 - 0.20 0.30+

我们推荐使用 Z-Score 动态阈值

def calculate_dynamic_threshold(
    pressure_history: Deque[float],
    current_pressure: float,
    lookback: int = 100,
    z_threshold: float = 1.5
) -> dict:
    """
    基于历史分布计算动态阈值
    
    Returns:
        dict: {
            'upper_bound': 动态上界,
            'lower_bound': 动态下界,
            'z_score': 当前压力比的 Z-Score,
            'signal': 'overbought' | 'oversold' | 'neutral'
        }
    """
    if len(pressure_history) < lookback:
        # 数据不足,使用固定保守阈值
        return {
            'upper_bound': 1.5,
            'lower_bound': 0.67,
            'z_score': 0.0,
            'signal': 'neutral'
        }
    
    recent = list(pressure_history)[-lookback:]
    mean = statistics.mean(recent)
    stdev = statistics.stdev(recent)
    
    if stdev < 1e-6:
        return {'upper_bound': 1.5, 'lower_bound': 0.67, 'z_score': 0.0, 'signal': 'neutral'}
    
    z_score = (current_pressure - mean) / stdev
    
    return {
        'upper_bound': mean + z_threshold * stdev,
        'lower_bound': mean - z_threshold * stdev,
        'z_score': z_score,
        'signal': 'overbought' if z_score > z_threshold else 
                  'oversold' if z_score < -z_threshold else 'neutral'
    }

三、生产级代码:WebSocket 实时订阅与计算

3.1 TickDB depth 频道订阅

以下代码展示如何通过 TickDB 的 WebSocket 订阅实时 depth 数据,并计算买卖压力比。代码包含完整的工程健壮性处理。

import os
import json
import time
import random
import asyncio
import logging
from collections import deque
from dataclasses import dataclass, field
from typing import List, Optional, Deque
import websockets
import statistics

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class OrderBookSnapshot:
    """订单簿快照"""
    symbol: str
    timestamp: int
    bids: List[List[float]]  # [[price, volume], ...]
    asks: List[List[float]]  # [[price, volume], ...]
    
    def get_bid_volumes(self, levels: int = 10) -> List[float]:
        return [float(b[1]) for b in self.bids[:levels]]
    
    def get_ask_volumes(self, levels: int = 10) -> List[float]:
        return [float(a[1]) for a in self.asks[:levels]]
    
    def get_bid_prices(self, levels: int = 10) -> List[float]:
        return [float(b[0]) for b in self.bids[:levels]]
    
    def get_ask_prices(self, levels: int = 10) -> List[float]:
        return [float(a[0]) for a in self.asks[:levels]]


@dataclass
class PressureSignal:
    """买卖压力信号"""
    timestamp: int
    symbol: str
    pressure_ratio: float
    z_score: float
    signal: str  # 'overbought', 'oversold', 'neutral'
    upper_bound: float
    lower_bound: float
    
    def to_dict(self) -> dict:
        return {
            'timestamp': self.timestamp,
            'symbol': self.symbol,
            'pressure_ratio': round(self.pressure_ratio, 4),
            'z_score': round(self.z_score, 3),
            'signal': self.signal,
            'upper_bound': round(self.upper_bound, 4),
            'lower_bound': round(self.lower_bound, 4)
        }


class DepthPressureMonitor:
    """
    TickDB depth 频道实时监控器
    
    功能:
    - WebSocket 订阅 depth 数据
    - 实时计算加权买卖压力比
    - 基于 Z-Score 的动态阈值信号生成
    - 历史数据滚动窗口维护
    
    ⚠️ 生产环境建议:
    - 使用 asyncio/aiohttp 处理高频数据
    - 考虑 Redis 缓存计算结果
    - 部署多实例避免单点故障
    """
    
    def __init__(
        self,
        api_key: str,
        symbols: List[str],
        levels: int = 10,
        decay_lambda: float = 0.5,
        z_threshold: float = 1.5,
        history_lookback: int = 100,
        ping_interval: int = 20
    ):
        self.api_key = api_key
        self.symbols = symbols
        self.levels = levels
        self.decay_lambda = decay_lambda
        self.z_threshold = z_threshold
        self.history_lookback = history_lookback
        self.ping_interval = ping_interval
        
        # 历史压力比队列(用于计算 Z-Score)
        self.pressure_history: Deque[float] = deque(maxlen=history_lookback)
        
        # 上次信号(用于去重)
        self._last_signal: Optional[str] = None
        
        # 连接状态
        self._running = False
        self._ws = None
        self._reconnect_delay = 1
        self._max_reconnect_delay = 60
    
    def _calculate_weighted_pressure(
        self,
        bid_volumes: List[float],
        ask_volumes: List[float]
    ) -> float:
        """计算加权买卖压力比"""
        bid_weighted = 0.0
        ask_weighted = 0.0
        
        for i in range(min(self.levels, len(bid_volumes), len(ask_volumes))):
            distance = i
            weight = 1.0 / (1.0 + self.decay_lambda * distance)
            bid_weighted += bid_volumes[i] * weight
            ask_weighted += ask_volumes[i] * weight
        
        if ask_weighted == 0:
            return float('inf') if bid_weighted > 0 else 1.0
        
        return bid_weighted / ask_weighted
    
    def _calculate_dynamic_threshold(
        self,
        current_pressure: float
    ) -> dict:
        """基于历史分布计算动态阈值"""
        if len(self.pressure_history) < self.history_lookback // 2:
            return {
                'upper_bound': 1.5,
                'lower_bound': 0.67,
                'z_score': 0.0,
                'signal': 'neutral'
            }
        
        recent = list(self.pressure_history)
        mean = statistics.mean(recent)
        stdev = statistics.stdev(recent)
        
        if stdev < 1e-6:
            return {'upper_bound': 1.5, 'lower_bound': 0.67, 'z_score': 0.0, 'signal': 'neutral'}
        
        z_score = (current_pressure - mean) / stdev
        
        if z_score > self.z_threshold:
            signal = 'overbought'
        elif z_score < -self.z_threshold:
            signal = 'oversold'
        else:
            signal = 'neutral'
        
        return {
            'upper_bound': mean + self.z_threshold * stdev,
            'lower_bound': mean - self.z_threshold * stdev,
            'z_score': z_score,
            'signal': signal
        }
    
    def _generate_signal(
        self,
        symbol: str,
        timestamp: int,
        pressure_ratio: float
    ) -> Optional[PressureSignal]:
        """生成压力信号(去重逻辑)"""
        threshold_info = self._calculate_dynamic_threshold(pressure_ratio)
        
        signal_obj = PressureSignal(
            timestamp=timestamp,
            symbol=symbol,
            pressure_ratio=pressure_ratio,
            z_score=threshold_info['z_score'],
            signal=threshold_info['signal'],
            upper_bound=threshold_info['upper_bound'],
            lower_bound=threshold_info['lower_bound']
        )
        
        # 去重:只有信号状态变化时才输出
        if signal_obj.signal == self._last_signal:
            return None
        
        self._last_signal = signal_obj.signal
        return signal_obj
    
    async def _connect_websocket(self) -> websockets.WebSocketClientProtocol:
        """建立 WebSocket 连接"""
        uri = f"wss://api.tickdb.ai/ws/depth?api_key={self.api_key}"
        
        try:
            ws = await websockets.connect(
                uri,
                ping_interval=self.ping_interval,
                ping_timeout=10,
                close_timeout=5
            )
            logger.info("WebSocket 连接已建立")
            return ws
        except Exception as e:
            logger.error(f"WebSocket 连接失败: {e}")
            raise
    
    async def _subscribe_symbols(self, ws: websockets.WebSocketClientProtocol):
        """订阅标的"""
        subscribe_msg = {
            "cmd": "subscribe",
            "params": {
                "symbols": self.symbols,
                "levels": self.levels
            }
        }
        await ws.send(json.dumps(subscribe_msg))
        logger.info(f"已订阅标的: {self.symbols}")
    
    async def _heartbeat(self, ws: websockets.WebSocketClientProtocol):
        """心跳保活"""
        while self._running:
            try:
                await ws.send(json.dumps({"cmd": "ping"}))
                await asyncio.sleep(self.ping_interval)
            except Exception:
                break
    
    async def _reconnect(self):
        """指数退避重连"""
        self._reconnect_delay = min(
            self._reconnect_delay * 2 + random.uniform(0, 1),
            self._max_reconnect_delay
        )
        logger.info(f"{self._reconnect_delay:.1f} 秒后尝试重连...")
        await asyncio.sleep(self._reconnect_delay)
        self._reconnect_delay = min(self._reconnect_delay, 30)
    
    async def start(self):
        """
        启动监控
        
        ⚠️ 注意事项:
        - 需确保网络稳定,断线自动重连
        - 高频场景建议使用 aiohttp 异步架构
        - 建议配合 Prometheus 监控采集延迟指标
        """
        self._running = True
        
        while self._running:
            try:
                self._ws = await self._connect_websocket()
                await self._subscribe_symbols(self._ws)
                
                # 启动心跳任务
                heartbeat_task = asyncio.create_task(self._heartbeat(self._ws))
                
                # 消息处理循环
                while self._running:
                    try:
                        message = await asyncio.wait_for(
                            self._ws.recv(),
                            timeout=30
                        )
                        await self._process_message(message)
                        
                    except asyncio.TimeoutError:
                        logger.debug("等待消息超时,继续监听...")
                        continue
                    except websockets.exceptions.ConnectionClosed:
                        logger.warning("WebSocket 连接意外关闭")
                        break
                
                heartbeat_task.cancel()
                
            except Exception as e:
                logger.error(f"异常: {e}")
                await self._reconnect()
    
    async def _process_message(self, message: str):
        """处理接收到的 depth 数据"""
        try:
            data = json.loads(message)
            
            # 解析 depth 快照
            snapshot = OrderBookSnapshot(
                symbol=data['s'],
                timestamp=data['t'],
                bids=data.get('b', []),
                asks=data.get('a', [])
            )
            
            # 计算压力比
            bid_vols = snapshot.get_bid_volumes(self.levels)
            ask_vols = snapshot.get_ask_volumes(self.levels)
            
            pressure_ratio = self._calculate_weighted_pressure(bid_vols, ask_vols)
            
            # 更新历史
            self.pressure_history.append(pressure_ratio)
            
            # 生成信号
            signal = self._generate_signal(
                snapshot.symbol,
                snapshot.timestamp,
                pressure_ratio
            )
            
            if signal:
                logger.info(f"信号触发: {signal.to_dict()}")
                await self._handle_signal(signal)
                
        except json.JSONDecodeError as e:
            logger.error(f"JSON 解析失败: {e}")
        except KeyError as e:
            logger.error(f"消息格式异常,缺少字段: {e}")
    
    async def _handle_signal(self, signal: PressureSignal):
        """
        信号处理回调
        
        在此实现你的交易逻辑:
        - 发送告警(飞书/钉钉/邮件)
        - 调用下单接口
        - 写入事件日志
        """
        if signal.signal == 'overbought':
            logger.warning(f"【{signal.symbol}】买方压力显著 (Z={signal.z_score:.2f})")
        elif signal.signal == 'oversold':
            logger.warning(f"【{signal.symbol}】卖方压力显著 (Z={signal.z_score:.2f})")
    
    def stop(self):
        """停止监控"""
        self._running = False
        logger.info("监控已停止")


async def main():
    """主函数"""
    api_key = os.environ.get("TICKDB_API_KEY")
    
    if not api_key:
        logger.error("请设置环境变量 TICKDB_API_KEY")
        return
    
    # 监控港股 / 数字货币 depth 数据
    # 注意:美股仅支持 1 档,港股支持 10 档,数字货币支持 10 档
    monitor = DepthPressureMonitor(
        api_key=api_key,
        symbols=["700.HK", "9988.HK", "BTC.USDT"],
        levels=10,           # 港股和数字货币支持 10 档
        decay_lambda=0.5,    # 建议 Grid Search 优化
        z_threshold=1.5,     # 1.5 倍标准差触发信号
        history_lookback=100
    )
    
    try:
        await monitor.start()
    except KeyboardInterrupt:
        logger.info("收到中断信号")
        monitor.stop()


if __name__ == "__main__":
    # ⚠️ 高频场景(每秒 >10 次订阅)建议使用 asyncio.run(main())
    # asyncio.run(main())
    pass

3.2 代码健壮性说明

上述代码包含以下生产级工程实践:

实践 实现方式 原因
心跳保活 ping_interval=20 + 定时发送 {"cmd": "ping"} 防止 NAT 超时导致连接断开
指数退避重连 delay = min(delay * 2 + jitter, max_delay) 避免惊群效应,重连风暴
随机抖动 random.uniform(0, 1) 多实例不会同时重连
超时保护 asyncio.wait_for(..., timeout=30) 防止阻塞在 recv()
去重逻辑 _last_signal 状态记忆 避免重复告警/下单
异常隔离 逐层 try-except 单条消息异常不影响整体运行

四、回测框架集成

4.1 数据准备

要进行回测,你需要有历史 depth 数据。TickDB 提供以下接口:

数据类型 接口 说明
历史 K 线 GET /v1/market/kline 支持 10 年级别美股 K 线回测
实时 depth WebSocket depth 频道 港股 10 档、数字货币 10 档
历史 depth 不支持 tick 数据不支持美股和 A 股

⚠️ 重要限制:TickDB 的 trades 接口不支持美股和 A 股。对于需要美股历史逐笔数据的回测,请使用 TickDB 的 /kline 接口或配合其他数据源。

4.2 基于 K 线数据的压力比因子回测

虽然我们无法获取美股历史 depth 数据,但可以通过分钟 K 线模拟买卖压力比因子进行回测:

import os
import requests
import pandas as pd
from typing import List, Tuple
import statistics


def fetch_kline_data(
    symbol: str,
    interval: str = "1m",
    start_time: int = None,
    end_time: int = None,
    limit: int = 1000
) -> pd.DataFrame:
    """
    获取历史 K 线数据
    
    Args:
        symbol: 交易品种,如 "AAPL.US"
        interval: K 线周期,1m/5m/15m/1h/1d
        start_time: 开始时间戳(毫秒)
        end_time: 结束时间戳(毫秒)
        limit: 单次最大返回条数
    
    Returns:
        pd.DataFrame: OHLCV 数据
    """
    api_key = os.environ.get("TICKDB_API_KEY")
    url = "https://api.tickdb.ai/v1/market/kline"
    
    headers = {"X-API-Key": api_key}
    params = {
        "symbol": symbol,
        "interval": interval,
        "limit": limit
    }
    
    if start_time:
        params["start_time"] = start_time
    if end_time:
        params["end_time"] = end_time
    
    response = requests.get(
        url,
        headers=headers,
        params=params,
        timeout=(3.05, 10)  # 连接超时 3.05s,读取超时 10s
    )
    
    if response.status_code != 200:
        raise RuntimeError(f"请求失败: {response.status_code}")
    
    data = response.json()
    
    if data.get("code") != 0:
        raise RuntimeError(f"API 错误: {data.get('message')}")
    
    df = pd.DataFrame(data["data"])
    
    # 标准化列名
    df.rename(columns={
        "t": "timestamp",
        "o": "open",
        "h": "high",
        "l": "low",
        "c": "close",
        "v": "volume"
    }, inplace=True)
    
    return df


def simulate_pressure_from_kline(df: pd.DataFrame) -> pd.Series:
    """
    基于 K 线数据模拟买卖压力比
    
    近似逻辑:
    - 阳线(close > open):假设买方占优
    - 阴线(close < open):假设卖方占优
    - 上影线长度反映卖压
    - 下影线长度反映买压
    
    ⚠️ 这是近似模拟,非真实订单簿数据
    """
    df = df.copy()
    
    # 计算基础因子
    df['body'] = df['close'] - df['open']
    df['upper_shadow'] = df['high'] - df[['close', 'open']].max(axis=1)
    df['lower_shadow'] = df[['close', 'open']].min(axis=1) - df['low']
    
    # 价格变化率
    df['price_change'] = df['close'].pct_change()
    
    # 成交量标准化
    df['volume_norm'] = df['volume'] / df['volume'].rolling(20).mean()
    
    # 模拟压力比(简化模型)
    # 正值表示买方占优,负值表示卖方占优
    df['simulated_pressure'] = (
        df['body'] / df['open'] +  # 实体大小
        df['lower_shadow'] / df['open'] -  # 下影线贡献买压
        df['upper_shadow'] / df['open']     # 上影线贡献卖压
    ) * df['volume_norm']
    
    # 平滑处理
    df['pressure_ratio'] = df['simulated_pressure'].rolling(5).mean() + 1.0
    
    return df['pressure_ratio']


def backtest_pressure_strategy(
    df: pd.DataFrame,
    upper_threshold: float = 1.2,
    lower_threshold: float = 0.8,
    holding_period: int = 10
) -> dict:
    """
    买卖压力比策略回测
    
    策略逻辑:
    - pressure_ratio > upper_threshold → 买入信号
    - pressure_ratio < lower_threshold → 卖出信号
    - 持仓 holding_period 根 K 线后平仓
    
    ⚠️ 回测局限性:
    - 基于 K 线模拟的压力比,非真实订单簿数据
    - 未考虑交易成本和滑点
    - 未考虑流动性风险
    """
    df = df.copy()
    
    # 计算模拟压力比
    df['pressure_ratio'] = simulate_pressure_from_kline(df)
    
    # 生成信号
    df['signal'] = 0
    df.loc[df['pressure_ratio'] > upper_threshold, 'signal'] = 1   # 买入
    df.loc[df['pressure_ratio'] < lower_threshold, 'signal'] = -1  # 卖出
    
    # 计算持仓状态
    df['position'] = df['signal'].replace(0, pd.NA).ffill().fillna(0)
    df['position'] = df['position'].replace(0, pd.NA).bfill().fillna(0)
    
    # 计算收益
    df['returns'] = df['close'].pct_change()
    df['strategy_returns'] = df['position'].shift(1) * df['returns']
    
    # 去除 NA
    df = df.dropna()
    
    # 绩效计算
    total_return = (1 + df['strategy_returns']).prod() - 1
    annual_return = (1 + total_return) ** (252 * 390 / len(df)) - 1
    annual_volatility = df['strategy_returns'].std() * (252 * 390) ** 0.5
    sharpe_ratio = annual_return / annual_volatility if annual_volatility > 0 else 0
    
    # 最大回撤
    cumulative = (1 + df['strategy_returns']).cumprod()
    running_max = cumulative.cummax()
    drawdown = (cumulative - running_max) / running_max
    max_drawdown = drawdown.min()
    
    # 交易统计
    trades = df[df['signal'] != 0]
    num_trades = len(trades)
    win_rate = (df['strategy_returns'] > 0).mean()
    
    return {
        'total_return': round(total_return * 100, 2),
        'annual_return': round(annual_return * 100, 2),
        'annual_volatility': round(annual_volatility * 100, 2),
        'sharpe_ratio': round(sharpe_ratio, 2),
        'max_drawdown': round(max_drawdown * 100, 2),
        'num_trades': num_trades,
        'win_rate': round(win_rate * 100, 2)
    }


def run_backtest(symbol: str = "AAPL.US"):
    """
    执行完整回测流程
    """
    print(f"正在获取 {symbol} 历史数据...")
    
    # 获取最近 3 个月 1 分钟 K 线
    df = fetch_kline_data(
        symbol=symbol,
        interval="1m",
        limit=50000  # 约 35 天数据
    )
    
    print(f"获取到 {len(df)} 条 K 线数据")
    print(f"时间范围: {df['timestamp'].min()} ~ {df['timestamp'].max()}")
    
    # 回测
    print("\n开始回测...")
    results = backtest_pressure_strategy(
        df,
        upper_threshold=1.2,
        lower_threshold=0.8,
        holding_period=10
    )
    
    print("\n" + "="*50)
    print("回测结果")
    print("="*50)
    print(f"总收益率:    {results['total_return']}%")
    print(f"年化收益率: {results['annual_return']}%")
    print(f"年化波动率: {results['annual_volatility']}%")
    print(f"夏普比率:    {results['sharpe_ratio']}")
    print(f"最大回撤:    {results['max_drawdown']}%")
    print(f"交易次数:    {results['num_trades']}")
    print(f"胜率:        {results['win_rate']}%")
    print("="*50)
    
    return results


if __name__ == "__main__":
    # 设置 API Key
    os.environ["TICKDB_API_KEY"] = "your_api_key_here"
    
    # 执行回测
    results = run_backtest("AAPL.US")

4.3 回测参数优化

在实际策略中,decay_lambdaupper_thresholdlower_threshold 等参数需要优化:

from itertools import product


def grid_search_parameters(df: pd.DataFrame):
    """
    参数网格搜索
    
    搜索空间:
    - decay_lambda: [0.1, 0.3, 0.5, 0.7, 1.0]
    - upper_threshold: [1.1, 1.2, 1.3, 1.5]
    - lower_threshold: [0.7, 0.8, 0.9]
    
    优化目标:最大化夏普比率
    """
    decay_lambdas = [0.1, 0.3, 0.5, 0.7, 1.0]
    upper_thresholds = [1.1, 1.2, 1.3, 1.5]
    lower_thresholds = [0.7, 0.8, 0.9]
    
    best_sharpe = -float('inf')
    best_params = {}
    
    total_combinations = len(decay_lambdas) * len(upper_thresholds) * len(lower_thresholds)
    print(f"共 {total_combinations} 种参数组合")
    
    for i, (lam, upper, lower) in enumerate(
        product(decay_lambdas, upper_thresholds, lower_thresholds),
        1
    ):
        if upper <= lower:
            continue
        
        results = backtest_pressure_strategy(
            df,
            upper_threshold=upper,
            lower_threshold=lower
        )
        
        sharpe = results['sharpe_ratio']
        
        if sharpe > best_sharpe:
            best_sharpe = sharpe
            best_params = {
                'decay_lambda': lam,
                'upper_threshold': upper,
                'lower_threshold': lower
            }
            print(f"[{i}/{total_combinations}] 新最优: Sharpe={sharpe:.2f}, 参数={best_params}")
    
    print(f"\n最优参数: {best_params}")
    print(f"最优夏普比率: {best_sharpe:.2f}")
    
    return best_params

五、深度数据能力对比

对于需要真实订单簿 depth 数据的场景,以下是 TickDB 与其他方案的对比:

能力维度 轮询 REST API 竞品 WebSocket TickDB
数据档位 通常 1-5 档 1-10 档 港股/数字货币 10 档
推送延迟 轮询间隔 1-5 秒 <200ms <100ms
断线重连 需自行实现 部分支持 原生心跳 + 指数退避
限频处理 需自行实现 需自行实现 返回 3001 错误码 + Retry-After
美股 depth 不支持 支持(高价套餐) 1 档
历史 depth 收费高昂 部分支持 暂不支持
SDK 支持 通用 HTTP 各自封装 原生 Python SDK

⚠️ 注意:TickDB 的 trades 接口不支持美股和 A 股。对于美股回测,建议使用 /kline 接口获取历史 K 线数据。


六、部署方案

根据你的使用场景,选择合适的部署架构:

场景 推荐方案 说明
个人研究 本地运行脚本 使用上面提供的代码,配合免费 API Key
日内策略实盘 VPS + systemd 服务 Linux 服务器,配置自动重启
多标的监控 Docker 容器化 每个标的独立容器,Docker Compose 管理
高频信号 Kubernetes + 多副本 水平扩展,避免单点故障,配合 Redis 缓存
机构级部署 TickDB 企业版 + 专线接入 降低网络延迟,支持 SLA 保障

6.1 systemd 服务配置(VPS 部署)

[Unit]
Description=TickDB Depth Pressure Monitor
After=network.target

[Service]
Type=simple
User=ubuntu
WorkingDirectory=/home/ubuntu/strategies
Environment="TICKDB_API_KEY=your_api_key_here"
ExecStart=/usr/bin/python3 /home/ubuntu/strategies/monitor.py
Restart=always
RestartSec=10

[Install]
WantedBy=multi-user.target

部署命令:

sudo cp tickdb-monitor.service /etc/systemd/system/
sudo systemctl daemon-reload
sudo systemctl enable tickdb-monitor
sudo systemctl start tickdb-monitor
sudo systemctl status tickdb-monitor

结语

订单簿是市场微观结构的DNA序列,买卖压力比是对这段序列的解读算法。

本文的核心要点:

  1. 多档数据优于单档:单一买一/卖一的数据噪声太大,10 档数据能揭示更真实的供需结构
  2. 加权计算有意义:距离衰减权重让近端挂单的权重更高,符合价格支撑/压制的物理直觉
  3. 动态阈值是关键:固定阈值在回测中表现良好,在实盘中会让你亏钱;Z-Score 自适应阈值更鲁棒
  4. 生产级代码不可省略:心跳、重连、限频处理不是锦上添花,而是分布式系统的基础

下一步行动

如果你希望深入研究订单簿结构
订阅 TickDB 的 WebSocket depth 频道,用本文的代码框架观察你关注标的的真实挂单结构。每档数据背后都是一个交易者的预期,你的任务是读懂这些预期的分歧与共识。

如果你想开始回测
访问 tickdb.ai 注册获取免费 API Key(无需信用卡),使用本文的回测代码对你的策略进行历史验证。记住:回测结果仅供参考,实盘前务必做好风险管理。

如果你需要机构级数据支持
TickDB 提供 10 年级别的历史 K 线数据,覆盖美股、港股、数字货币等多类资产。联系 [email protected] 了解机构方案。


风险提示:本文不构成任何投资建议。买卖压力比因子仅为技术分析工具,不能保证盈利。市场有风险,投资需谨慎。