订单簿买卖压力比:从 depth 数据到交易信号的完整实现

价格是结果,订单簿是原因。

每一个 TickDB depth 频道推送的快照,都是交易战场上多空双方实时摆出的阵地图——买一卖一所呈现的不只是当前价格,更隐含着市场参与者的心理边界、聪明钱的布局意图,以及流动性在微观层面收缩或扩张的节奏。

对于量化开发者而言,订单簿是一座富矿。问题在于,这座矿藏的 raw ore(原始矿石)并不能直接投入策略:高频推送的多档挂单量,如果逐帧处理,数据噪声会淹没真实信号;如果仅看买卖价差,又丢失了深度结构中埋藏的信息。

本文的目标,是将多档挂单量压缩成一个可回测、可落地、有物理直觉支撑的因子:买卖压力比(Bid-Ask Pressure Ratio)。我们从订单簿数据的结构解读出发,推导加权压力比的数学构造,给出生产级的 WebSocket 订阅代码与滑动窗口计算逻辑,最后将它嵌入回测框架,完成一个完整的事件驱动策略回测闭环。


一、为什么只看买卖价差远远不够

在进入技术细节之前,有必要先说清楚为什么我们需要多档数据,而不能止步于买卖价差(Bid-Ask Spread)。

1.1 买卖价差的局限

买卖价差反映的是最优报价之间的距离,可以衡量当前市场摩擦的宽窄,但它是标量,丢失了方向信息:

spread = ask[0] - bid[0]

假设两个时刻的 spread 都是 $0.02,但我们无法区分以下两种情况:

时刻 盘口状态 买卖价差 实际情况
T1 买一 100.00(5000股) vs 卖一 100.02(5000股) 0.02 多空均衡
T2 买一 100.00(2000股) vs 卖一 100.02(8000股) 0.02 空方压力明显更大

T1 和 T2 的 spread 相同,但市场格局截然不同。价差不会告诉我们这些。只有多档挂单量才能揭示深度结构中的偏压。

1.2 depth 频道的结构优势

TickDB 的 depth 频道按档位推送订单簿快照,每个档位包含 bid_pricebid_volumeask_priceask_volume。以港股为例,最大可获取 10 档深度:

depth snapshot:
  bids: [ {price: 100.00, volume: 2000}, {price: 99.98, volume: 1500}, ... ]
  asks: [ {price: 100.02, volume: 8000}, {price: 100.04, volume: 3000}, ... ]

这 10 档数据,构成了我们构建买卖压力比的核心原料。


二、买卖压力比的数学构造

2.1 最简单的版本:单档压力比

最直觉的定义就是买一量除以卖一量:

$$
P_{simple} = \frac{Vol_{bid[0]}}{Vol_{ask[0]}}
$$

但这个指标有明显的缺陷:单档数据极易被大单冲击产生尖峰噪声。例如某机构投资者在买一位置下了一个 50000 股的 iceberg order,你的压力比瞬间从 1.0 飙升到 5.0,但这个大单可能只是一分钟内唯一的一笔成交,并不能反映真实的市场偏向。

2.2 加权压力比:引入档位衰减

一个更稳健的设计是引入档位衰减权重。直觉上,越靠近最优报价的挂单对短期价格影响越大,越远端的挂单影响力越弱。我们用指数衰减来建模这个直觉:

$$
P_{weighted} = \frac{\sum_{i=0}^{N-1} Vol_{bid[i]} \cdot e^{-\lambda \cdot i}}{\sum_{i=0}^{N-1} Vol_{ask[i]} \cdot e^{-\lambda \cdot i}}
$$

其中:

  • $N$ 是档位数(TickDB 港股最大 10 档)
  • $\lambda$ 是衰减系数,控制档位权重的衰减速度
  • $i=0$ 对应最优档(买一/卖一),$i=1$ 对应买二/卖二,以此类推

当 $\lambda = 0$ 时,退化为简单求和;当 $\lambda$ 增大时,前档权重急剧增大,远档几乎不贡献。

实践中 $\lambda$ 的经验取值

λ 取值 前三档权重占比 适用场景
0.1 ~74% 趋势跟随,偏短线
0.3 ~58% 中性,通用场景
0.5 ~46% 趋势反转,偏长线

2.3 距离调整因子:消除价格水平差异

跨股票比较压力比时还存在一个问题:不同股票的价格水平差异巨大。一只 $500 的股票买一量 1000 股和一只 $5 的股票买一量 1000 股,绝对数量相同但相对市场影响力完全不同。

我们引入一个距离调整因子

$$
D = \frac{mid_price}{best_price \cdot N}
$$

修正后的加权压力比:

$$
P_{adjusted} = P_{weighted} \times (1 + D)
$$

在实际实现中,更常见的做法是使用相对档位深度——将每一档的挂单量标准化为该档价格的百分比偏移量:

# 简化的距离调整实现
def normalize_depth(depth_snapshot, side='bid'):
    """
    将挂单量按价格偏移比例归一化
    depth_snapshot: TickDB depth 频道推送的单个快照
    """
    entries = depth_snapshot[side]  # list of {price, volume}
    if not entries:
        return []

    best_price = entries[0]['price']
    result = []
    for entry in entries:
        # 计算相对于最优档的价格偏移(用基点 bps 表示)
        offset_bps = ((entry['price'] - best_price) / best_price) * 10000
        # 归一化量:volume / best_price(每股价格归一化)
        normalized_vol = entry['volume'] / best_price
        result.append({
            'offset_bps': offset_bps,
            'normalized_vol': normalized_vol
        })
    return result

2.4 物理直觉

为什么加权压力比有效?它的物理直觉在于:订单簿是未成交订单的快照,而未成交订单反映了交易者愿意以特定价格成交的意愿强度

当买方持续在更高价位堆积挂单(压力比 > 1),说明聪明钱在逢低布局——他们在用未成交的限价单表达"这个价格我愿意买"的立场。反之,卖方堆积意味着"这个价格我愿意卖"的看空意图。多档加权的目的是过滤噪声,捕获更持续的结构性偏压。


三、生产级订阅代码

3.1 WebSocket 订阅 depth 频道

以下是完整的 WebSocket 订阅代码,严格遵循 TickDB 的鉴权规范和工程健壮性要求:

import os
import json
import time
import random
import logging
import threading
from datetime import datetime
from websocket import create_connection, WebSocketApp, WebSocketException

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s"
)
logger = logging.getLogger(__name__)


class DepthSubscriber:
    """
    TickDB WebSocket 订阅器:depth 频道
    工程特性:
    - 心跳保活(ping/pong)
    - 指数退避 + 抖动重连
    - 限频处理(code:3001 + Retry-After)
    - 线程安全的数据传递
    """

    def __init__(self, symbol: str, depth: int = 10):
        self.symbol = symbol
        self.depth = depth  # 港股最大10档,美股1档
        self.api_key = os.environ.get("TICKDB_API_KEY")
        if not self.api_key:
            raise ValueError("请设置环境变量 TICKDB_API_KEY")

        self.ws = None
        self.running = False
        self._last_depth = None
        self._lock = threading.Lock()
        self._retry_count = 0
        self._max_retries = 10
        self._base_delay = 1
        self._max_delay = 60

        # 回调函数注册表
        self._callbacks = []

    def _get_ws_url(self) -> str:
        """构造 WebSocket URL(api_key 作为 URL 参数)"""
        return f"wss://api.tickdb.ai/ws?api_key={self.api_key}"

    def subscribe(self):
        """启动订阅(心跳 + 订阅命令)"""
        self.ws = create_connection(self._get_ws_url())
        self.running = True
        self._retry_count = 0

        # 订阅 depth 频道(可指定档位数)
        sub_cmd = {
            "cmd": "subscribe",
            "param": {
                "symbol": self.symbol,
                "depth": self.depth,
                "channel": "depth"
            }
        }
        self.ws.send(json.dumps(sub_cmd))
        logger.info(f"订阅成功:{self.symbol} depth × {self.depth} 档")

        # 启动心跳和接收线程
        threading.Thread(target=self._heartbeat_loop, daemon=True).start()
        threading.Thread(target=self._receive_loop, daemon=True).start()

    def _heartbeat_loop(self):
        """心跳保活:每 15 秒发送一次 ping"""
        while self.running and self.ws and self.ws.connected:
            try:
                self.ws.send(json.dumps({"cmd": "ping"}))
                time.sleep(15)
            except Exception:
                break

    def _receive_loop(self):
        """接收并分发 depth 快照"""
        while self.running:
            try:
                raw = self.ws.recv()
                msg = json.loads(raw)

                # 处理限频响应
                if msg.get("code") == 3001:
                    retry_after = int(msg.get("headers", {}).get(
                        "Retry-After",
                        self._base_delay * 2
                    ))
                    logger.warning(f"触发限频,等待 {retry_after}s")
                    time.sleep(retry_after)
                    continue

                # 处理 pong
                if msg.get("type") == "pong":
                    continue

                # 分发 depth 数据
                data = msg.get("data", {})
                if data.get("symbol") == self.symbol:
                    with self._lock:
                        self._last_depth = data
                    for cb in self._callbacks:
                        try:
                            cb(data)
                        except Exception as e:
                            logger.error(f"回调执行失败:{e}")

            except WebSocketException as e:
                logger.warning(f"WebSocket 断开:{e},进入重连流程")
                self.ws = None
                self._reconnect()
            except Exception as e:
                logger.error(f"接收异常:{e}")
                time.sleep(1)

    def _reconnect(self):
        """指数退避 + 抖动重连"""
        if self._retry_count >= self._max_retries:
            logger.error("重试次数耗尽,订阅终止")
            self.running = False
            return

        delay = min(self._base_delay * (2 ** self._retry_count), self._max_delay)
        jitter = random.uniform(0, delay * 0.1)
        wait = delay + jitter

        self._retry_count += 1
        logger.info(f"重连倒计时:{wait:.1f}s(第 {self._retry_count} 次)")

        time.sleep(wait)

        try:
            self.ws = create_connection(self._get_ws_url())
            self.running = True
            self._retry_count = 0  # 重置重试计数

            self.ws.send(json.dumps({
                "cmd": "subscribe",
                "param": {
                    "symbol": self.symbol,
                    "depth": self.depth,
                    "channel": "depth"
                }
            }))
            logger.info("重连成功")

            threading.Thread(target=self._heartbeat_loop, daemon=True).start()
            threading.Thread(target=self._receive_loop, daemon=True).start()

        except Exception as e:
            logger.error(f"重连失败:{e}")
            self._reconnect()  # 递归重试

    def on_depth(self, callback):
        """注册 depth 快照回调"""
        self._callbacks.append(callback)

    def get_latest(self):
        """获取最新快照(线程安全)"""
        with self._lock:
            return self._last_depth

    def stop(self):
        """停止订阅"""
        self.running = False
        if self.ws:
            self.ws.close()
            logger.info("订阅已停止")


# ⚠️ 生产环境高频场景建议使用 aiohttp/asyncio 异步架构
# ⚠️ 多标的场景建议使用连接池管理而非单实例

3.2 回调中计算加权压力比

订阅器收到快照后,在回调中计算压力比:

import math


def compute_weighted_pressure_ratio(depth_data: dict, lambda_decay: float = 0.3) -> dict:
    """
    计算加权买卖压力比

    参数:
        depth_data: TickDB depth 频道推送的快照
        lambda_decay: 档位衰减系数(默认0.3,通用场景)

    返回:
        {
            'weighted_pressure': float,  # 加权压力比
            'simple_pressure': float,     # 简单压力比(买一/卖一)
            'bid_total_weighted': float,  # 加权买方总量
            'ask_total_weighted': float,  # 加权卖方总量
            'timestamp': str
        }
    """
    bids = depth_data.get("bids", [])
    asks = depth_data.get("asks", [])

    if not bids or not asks:
        return {"error": "depth 数据不完整"}

    # 加权求和
    bid_weighted = sum(
        entry["volume"] * math.exp(-lambda_decay * i)
        for i, entry in enumerate(bids)
    )
    ask_weighted = sum(
        entry["volume"] * math.exp(-lambda_decay * i)
        for i, entry in enumerate(asks)
    )

    # 简单压力比(单档)
    simple_bid = bids[0]["volume"]
    simple_ask = asks[0]["volume"]

    weighted_pressure = bid_weighted / ask_weighted if ask_weighted > 0 else 0
    simple_pressure = simple_bid / simple_ask if simple_ask > 0 else 0

    return {
        "weighted_pressure": round(weighted_pressure, 4),
        "simple_pressure": round(simple_pressure, 4),
        "bid_total_weighted": round(bid_weighted, 2),
        "ask_total_weighted": round(ask_weighted, 2),
        "timestamp": depth_data.get("timestamp", datetime.now().isoformat())
    }


# 示例回调:实时打印压力比
def on_depth_snapshot(depth_data: dict):
    result = compute_weighted_pressure_ratio(depth_data, lambda_decay=0.3)
    if "error" not in result:
        logger.info(
            f"[{result['timestamp']}] "
            f"加权压力比={result['weighted_pressure']:.3f} | "
            f"简单压力比={result['simple_pressure']:.3f}"
        )


# 启动订阅
if __name__ == "__main__":
    subscriber = DepthSubscriber(symbol="700.HK", depth=10)
    subscriber.on_depth(on_depth_snapshot)
    subscriber.subscribe()

    try:
        while True:
            time.sleep(30)
    except KeyboardInterrupt:
        subscriber.stop()

四、滑动窗口与动态阈值

4.1 为什么需要滑动窗口

单点压力比仍然有噪声。一个更好的做法是计算滑动窗口内的压力比均值或中位数,以过滤瞬时冲击。

from collections import deque
from statistics import median


class PressureRatioWindow:
    """
    滑动窗口版买卖压力比
    - 维护固定大小的历史快照队列
    - 支持均值和中位数两种聚合方式
    - 提供偏离度计算(当前值距均值的标准差倍数)
    """

    def __init__(self, window_size: int = 60, lambda_decay: float = 0.3):
        """
        参数:
            window_size: 滑动窗口大小(快照数量)
            lambda_decay: 档位衰减系数
        """
        self.window_size = window_size
        self.lambda_decay = lambda_decay
        self._history = deque(maxlen=window_size)

    def update(self, depth_data: dict) -> dict:
        """更新窗口,返回当前窗口统计量"""
        result = compute_weighted_pressure_ratio(depth_data, self.lambda_decay)
        if "error" not in result:
            self._history.append(result["weighted_pressure"])

        return self._compute_stats()

    def _compute_stats(self) -> dict:
        """计算窗口内统计量"""
        if len(self._history) < 10:  # 窗口预热期
            return {"status": "warming", "sample_size": len(self._history)}

        samples = list(self._history)
        mean = sum(samples) / len(samples)
        variance = sum((x - mean) ** 2 for x in samples) / len(samples)
        std = math.sqrt(variance)

        current = samples[-1] if samples else 1.0
        deviation = (current - mean) / std if std > 0 else 0

        return {
            "status": "ready",
            "current": round(current, 4),
            "mean": round(mean, 4),
            "std": round(std, 4),
            "median": round(median(samples), 4),
            "deviation": round(deviation, 3),  # 标准差倍数
            "sample_size": len(samples)
        }

4.2 动态阈值:从历史分位数构建

有了滑动窗口的统计量后,我们需要一个信号判定规则。最实用的方法是基于历史分位数构建动态阈值:

class DynamicThreshold:
    """
    基于历史分位数的动态阈值生成器
    - 自动适应不同标的的压力比分布
    - 支持上下不对称阈值(适用于趋势跟踪/均值回归不同方向)
    """

    def __init__(self, percentiles: tuple = (20, 80)):
        self.percentiles = percentiles  # (下限分位数, 上限分位数)
        self._samples = deque(maxlen=500)

    def add(self, value: float):
        self._samples.append(value)

    def get_thresholds(self) -> dict:
        """返回上下阈值"""
        if len(self._samples) < 30:
            return {"status": "insufficient_data"}

        sorted_samples = sorted(self._samples)
        n = len(sorted_samples)

        lower_pct = self.percentiles[0]
        upper_pct = self.percentiles[1]

        lower_idx = int(n * lower_pct / 100)
        upper_idx = int(n * upper_pct / 100)

        return {
            "status": "ready",
            "lower": round(sorted_samples[lower_idx], 4),
            "upper": round(sorted_samples[upper_idx], 4),
            "lower_pct": lower_pct,
            "upper_pct": upper_pct
        }


def generate_signal(pressure_stats: dict, threshold: dict) -> str:
    """
    基于压力比偏离度和动态阈值生成交易信号

    信号逻辑:
    - deviation > +2σ 且 pressure > upper_threshold → 空方强势,偏空
    - deviation < -2σ 且 pressure < lower_threshold → 买方蓄力,偏多
    - 其他 → 中性
    """
    if pressure_stats.get("status") != "ready":
        return "neutral"

    deviation = pressure_stats.get("deviation", 0)
    current = pressure_stats.get("current", 1.0)

    if threshold.get("status") != "ready":
        return "neutral"

    lower = threshold["lower"]
    upper = threshold["upper"]

    # 偏离度 + 绝对值双重确认
    if deviation > 2.0 and current > upper:
        return "bearish"   # 卖压显著偏高
    elif deviation < -2.0 and current < lower:
        return "bullish"   # 买压显著蓄力
    else:
        return "neutral"

五、回测框架集成

5.1 历史 K 线数据获取

将上述信号生成逻辑嵌入回测框架,首先需要获取历史数据。TickDB 的 /v1/market/kline 接口提供清洗对齐的历史 K 线:

import os
import requests
from typing import List, Dict


def fetch_historical_klines(
    symbol: str,
    interval: str = "1m",
    start_time: int = None,
    end_time: int = None,
    limit: int = 1000
) -> List[Dict]:
    """
    获取历史 K 线数据(用于回测)

    参数:
        symbol: 交易品种代码
        interval: K 线周期(1m/5m/15m/1h/4h/1d)
        start_time: 毫秒级时间戳
        end_time: 毫秒级时间戳
        limit: 单次最大返回条数

    注意:
        - 使用 /v1/market/kline 获取已结束周期
        - 不使用 /kline/latest 做回测(其数据为当前未结束周期)
    """
    api_key = os.environ.get("TICKDB_API_KEY")
    if not api_key:
        raise ValueError("请设置环境变量 TICKDB_API_KEY")

    headers = {"X-API-Key": api_key}
    params = {
        "symbol": symbol,
        "interval": interval,
        "limit": limit,
    }
    if start_time:
        params["start"] = start_time
    if end_time:
        params["end"] = end_time

    response = requests.get(
        "https://api.tickdb.ai/v1/market/kline",
        headers=headers,
        params=params,
        timeout=(3.05, 10)
    )

    if response.status_code != 200:
        raise RuntimeError(f"HTTP {response.status_code}: {response.text}")

    result = response.json()
    if result.get("code") == 0:
        return result.get("data", [])
    elif result.get("code") == 3001:
        retry_after = int(response.headers.get("Retry-After", 5))
        time.sleep(retry_after)
        return fetch_historical_klines(symbol, interval, start_time, end_time, limit)
    else:
        raise RuntimeError(f"API 错误 {result.get('code')}: {result.get('message')}")

5.2 事件驱动回测引擎

完整的回测引擎将三个组件串联:历史 K 线数据加载 → 逐 K 线推进 → 订单簿快照模拟 → 压力比计算 → 信号判定 → 交易执行。

from dataclasses import dataclass
from enum import Enum


class Signal(Enum):
    BULLISH = "bullish"
    BEARISH = "bearish"
    NEUTRAL = "neutral"


@dataclass
class Trade:
    entry_time: str
    entry_price: float
    direction: str  # "long" or "short"
    size: float


class PressureRatioBacktester:
    """
    买卖压力比事件驱动回测引擎

    设计说明:
    - 使用 1 分钟 K 线作为事件触发器(每根 K 线收盘时更新订单簿快照模拟)
    - 订单簿快照通过模拟生成(实际回测中可替换为真实 depth 历史数据)
    - 支持买入持有基准对比
    """

    def __init__(
        self,
        symbol: str,
        initial_capital: float = 100_000,
        position_size: float = 0.1,  # 每笔仓位占资金比例
        lambda_decay: float = 0.3,
        window_size: int = 60
    ):
        self.symbol = symbol
        self.initial_capital = initial_capital
        self.position_size = position_size

        self.pressure_window = PressureRatioWindow(window_size, lambda_decay)
        self.threshold_manager = DynamicThreshold(percentiles=(20, 80))

        self.cash = initial_capital
        self.position = 0.0
        self.trades: List[Trade] = []
        self.equity_curve = []
        self.daily_pnl = []

    def _simulate_depth_snapshot(self, kline: dict, pressure_ratio: float) -> dict:
        """
        模拟 depth 快照
        ⚠️ 生产回测应使用真实 depth 历史数据,此处为演示模拟逻辑
        """
        close_price = kline["close"]
        base_vol = 5000

        # 简单模拟:压力比 > 1 时买方挂单更多,反之卖方更多
        if pressure_ratio > 1.5:
            bid_vol = base_vol * 2
            ask_vol = base_vol * 0.6
        elif pressure_ratio < 0.7:
            bid_vol = base_vol * 0.6
            ask_vol = base_vol * 2
        else:
            bid_vol = base_vol
            ask_vol = base_vol

        return {
            "symbol": self.symbol,
            "timestamp": kline["timestamp"],
            "bids": [
                {"price": close_price - 0.01 * i, "volume": int(bid_vol * 0.8 ** i)}
                for i in range(10)
            ],
            "asks": [
                {"price": close_price + 0.01 * i, "volume": int(ask_vol * 0.8 ** i)}
                for i in range(10)
            ]
        }

    def run(self, klines: List[dict]):
        """执行回测"""
        for kline in klines:
            # 1. 更新压力比窗口
            snapshot = self._simulate_depth_snapshot(
                kline,
                self.pressure_window.get_latest() if self.pressure_window._history else 1.0
            )
            stats = self.pressure_window.update(snapshot)

            # 2. 更新阈值管理器(收集足够样本)
            if stats.get("status") == "ready":
                self.threshold_manager.add(stats["current"])
            threshold = self.threshold_manager.get_thresholds()

            # 3. 生成信号
            signal = generate_signal(stats, threshold)

            # 4. 执行交易(仅在信号切换时操作)
            current_price = kline["close"]
            self._execute_trade(signal, current_price, kline["timestamp"])

            # 5. 更新权益
            self._update_equity(current_price)

        return self._generate_report()

    def _execute_trade(self, signal: Signal, price: float, timestamp: str):
        """执行交易(仅在信号切换时)"""
        if signal == Signal.BULLISH and self.position <= 0:
            size = (self.cash * self.position_size) / price
            self.cash -= size * price
            self.position += size
            self.trades.append(Trade(timestamp, price, "long", size))

        elif signal == Signal.BEARISH and self.position >= 0:
            size = (self.cash * self.position_size) / price
            self.cash += size * price
            self.position -= size
            self.trades.append(Trade(timestamp, price, "short", size))

    def _update_equity(self, current_price: float):
        portfolio_value = self.cash + self.position * current_price
        self.equity_curve.append(portfolio_value)

    def _generate_report(self) -> dict:
        """生成回测报告"""
        equity = self.equity_curve
        returns = [(equity[i] - equity[i-1]) / equity[i-1]
                   for i in range(1, len(equity))]

        total_return = (equity[-1] - self.initial_capital) / self.initial_capital
        sharpe = (sum(returns) / len(returns) / (sum((r - sum(returns)/len(returns))**2 for r in returns)/len(returns))**0.5
                  * (252 * 390) ** 0.5) if len(returns) > 1 and sum((r - sum(returns)/len(returns))**2 for r in returns) > 0 else 0

        peak = max(equity)
        trough = min(e for e in equity if e <= peak)
        max_drawdown = (peak - trough) / peak if peak > 0 else 0

        return {
            "total_return": round(total_return * 100, 2),
            "sharpe_ratio": round(sharpe, 2),
            "max_drawdown": round(max_drawdown * 100, 2),
            "num_trades": len(self.trades),
            "final_equity": round(equity[-1], 2)
        }


# ⚠️ 重要提示:上述回测存在以下局限性:
# - 订单簿快照为模拟生成,未使用真实 depth 历史数据
# - 未考虑交易滑点和市场冲击成本
# - 未模拟流动性枯竭场景
# - 建议在实际使用前进行更长时间跨度的验证

5.3 回测运行示例

if __name__ == "__main__":
    # 获取最近 30 天 1 分钟 K 线数据
    import time
    end_ts = int(time.time() * 1000)
    start_ts = end_ts - 30 * 24 * 3600 * 1000

    klines = fetch_historical_klines(
        symbol="700.HK",
        interval="1m",
        start_time=start_ts,
        end_time=end_ts,
        limit=5000
    )

    if not klines:
        print("未获取到 K 线数据,请检查 symbol 和时间范围")
    else:
        tester = PressureRatioBacktester(
            symbol="700.HK",
            initial_capital=100_000,
            lambda_decay=0.3,
            window_size=60
        )
        report = tester.run(klines)

        print("=" * 40)
        print("回测报告:买卖压力比策略")
        print("=" * 40)
        for k, v in report.items():
            print(f"  {k}: {v}")

六、实盘信号系统

6.1 与实盘数据流对接

回测验证通过后,实盘部署只需要将模拟的 _simulate_depth_snapshot 替换为真实的订阅器回调:

def build_real_time_signal_engine(symbol: str):
    """
    实盘信号引擎:订阅 TickDB depth → 计算压力比 → 信号判定 → 告警
    """
    subscriber = DepthSubscriber(symbol=symbol, depth=10)
    pressure_window = PressureRatioWindow(window_size=60, lambda_decay=0.3)
    threshold_manager = DynamicThreshold(percentiles=(20, 80))

    def signal_callback(depth_data: dict):
        # 计算压力比
        stats = pressure_window.update(depth_data)

        # 收集样本建立阈值
        if stats.get("status") == "ready":
            threshold_manager.add(stats["current"])

        # 判定信号
        threshold = threshold_manager.get_thresholds()
        signal = generate_signal(stats, threshold)

        # 告警(可替换为飞书/Webhook/钉钉)
        if signal != "neutral":
            logger.warning(
                f"⚠️ 信号触发:{signal.upper()} | "
                f"压力比={stats.get('current')} | "
                f"偏离度={stats.get('deviation')}σ"
            )

            # 触发告警(示例)
            _send_alert(symbol, signal, stats, threshold)

    subscriber.on_depth(signal_callback)
    subscriber.subscribe()
    return subscriber


def _send_alert(symbol: str, signal: str, stats: dict, threshold: dict):
    """告警发送(示例:飞书 Webhook)"""
    import urllib.request
    import urllib.parse

    webhook_url = os.environ.get("FEISHU_WEBHOOK_URL")
    if not webhook_url:
        logger.debug("未配置飞书 Webhook,跳过告警")
        return

    content = {
        "msg_type": "text",
        "content": {
            "text": (
                f"【TickDB 告警】{symbol}\n"
                f"信号:{signal}\n"
                f"加权压力比:{stats.get('current')}\n"
                f"偏离度:{stats.get('deviation')}σ\n"
                f"阈值区间:[{threshold.get('lower')}, {threshold.get('upper')}]"
            )
        }
    }

    try:
        req = urllib.request.Request(
            webhook_url,
            data=urllib.parse.urlencode(content).encode("utf-8"),
            headers={"Content-Type": "application/json"}
        )
        urllib.request.urlopen(req, timeout=5)
    except Exception as e:
        logger.error(f"告警发送失败:{e}")

七、参数敏感度分析

在将策略投入实盘前,对关键参数进行敏感度分析是必要的。以下是主要参数的影响方向和经验参考:

参数 影响方向 参考范围 注意事项
λ(衰减系数) λ↑ → 前档权重↑ → 信号更灵敏但噪声增加 0.1~0.5 需结合标的特性回测确定
窗口大小 窗口↑ → 信号越平滑但滞后越大 30~120 快照 高频场景可用较小窗口
分位数阈值 下限↓/上限↑ → 信号触发更困难 (15,85)~(30,70) 趋势跟踪可收紧反转阈值
偏离度阈值 σ↑ → 信号触发条件更严格 1.5σ~3σ 高波动标的需提高阈值

建议使用 TickDB 历史数据对上述参数进行网格搜索,取夏普比率最优的参数组合作为实盘初始参数,并设置定期重校准机制。


八、下一步行动

如果你希望亲手运行本文代码

  1. 访问 tickdb.ai 注册(免费,无需信用卡)
  2. 在控制台生成 API Key 并设置环境变量 TICKDB_API_KEY
  3. 复制本文代码,更换 symbol(如 700.HK)即可运行

如果你在研究港股订单簿结构,TickDB 的 depth 频道支持港股 10 档深度,是构建压力比因子的理想数据源。对比之下,美股 depth 仅支持 1 档,建议使用港股或数字货币标的进行此类策略开发。

如果你习惯用 AI 辅助开发,在 AI 助手中搜索安装 tickdb-market-data SKILL,可直接在对话中调用 TickDB API 获取数据和构建信号。

如果你希望获取完整的回测历史数据,联系 [email protected] 了解专业版/企业版方案,包含 10 年级别的历史 K 线数据,支持跨周期策略回测。


本文不构成任何投资建议。市场有风险,投资需谨慎。