订单簿失衡:价格变动前的无声预兆

市场收盘后,你打开某只热门股的分时图,发现它在下午两点突然突破了日内阻力位,涨幅 3%。你打开订单簿回放,想看看突破的瞬间发生了什么——却只看到一串冰冷的数字。

但如果你在价格突破前两分钟打开过订单簿,你会看到完全不同的画面:买盘深度在悄悄积累,卖盘像融化的冰山一样逐渐变薄,买卖压力比从 1.2 跳到 2.8。价格还没有动,但天平已经倾斜了。

这不是巧合。这是订单簿失衡——价格变动前的无声预兆。


一、为什么价格是结果,订单簿才是原因

每一个刚入门量化的开发者都听过一句话:“价格是由供需决定的。” 这话没错,但太粗糙了。

真实的金融市场里,“供需”的具象化就是订单簿。每一笔买价(bid)和卖价(ask)都是市场参与者用真金白银表达的意见。买一价上的 10,000 股,不是抽象的“需求”,而是 10,000 股的买入意愿被定价在那个价格。

当订单簿出现失衡——某一侧的压力显著超过另一侧——价格向压力大的方向移动,只是时间问题。

1.1 订单簿失衡的三种形态

失衡类型 特征 价格预示
买卖压力失衡 买方深度 vs 卖方深度的比值大幅偏离 1 短期方向性移动概率上升
价格梯度失衡 订单簿斜率变陡(深度集中在某一区域) 突破或反转的阻力/支撑发生变化
时间失衡 大量订单在同一时刻挂出(冰山订单拆解) 短期流动性结构被操纵

理解了这三种形态,你就拥有了量化交易中最接近“预知”能力的工具。


二、买卖压力比:订单簿失衡的第一把标尺

买卖压力比(Bid-Ask Pressure Ratio)是衡量订单簿失衡最直接的指标。它的计算逻辑很简单:

买卖压力比 = Σ(前 N 档买盘量) / Σ(前 N 档卖盘量)
  • 比值 > 1:买方压力占优,价格更可能向上移动
  • 比值 < 1:卖方压力占优,价格更可能向下移动
  • 比值剧烈变化:从 1.2 骤升至 2.8,往往预示着短期方向性机会

2.1 真实的订单簿数据长什么样

以下是一段模拟数据,展示了某只股票在财报发布前 30 秒的订单簿状态:

时间 买一量 买二量 买三量 卖一量 卖二量 卖三量 压力比 (3档)
14:29:45 12,500 8,200 6,400 11,800 9,100 7,300 1.06
14:29:50 15,300 9,800 7,100 11,200 8,400 6,900 1.22
14:29:55 21,600 12,400 9,200 9,800 7,200 5,600 1.67
14:30:00 35,200 18,900 14,500 8,100 5,300 4,200 2.48
14:30:05 财报发布 - - - - - -
14:30:10 12,400 7,200 4,100 45,600 28,900 21,300 0.24

看看这段数据里的信息量:

  • 14:29:55:压力比从 1.06 跳到 1.67,买盘在加速积累。这时候价格还没有动——但聪明钱已经在行动。
  • 14:30:00:压力比飙到 2.48,4 倍于正常水平。这不是散户行为,而是机构在财报前大规模建仓的痕迹。
  • 14:30:10:财报发布后,卖盘爆炸式增长(机构出货或利空消息),压力比瞬间跌至 0.24。如果你有实时监控能力,在 14:29:55 就应该开始关注这只股票。

这就是订单簿失衡的力量:价格还没有告诉你任何事情,订单簿已经说了很多。


三、订单簿斜率:失衡的第二个维度

买卖压力比告诉你“谁更有力量”,订单簿斜率告诉你“力量分布的形状”。

3.1 什么是订单簿斜率

想象一个简单的场景:

卖盘:8.01 → 50,000 股
卖盘:8.02 → 30,000 股
卖盘:8.03 → 20,000 股
卖盘:8.04 → 15,000 股

订单簿的形状是从上到下递减的——价格越高,愿意卖的人越多。这是正常状态。

但如果斜率发生变化呢?

卖盘:8.01 → 50,000 股
卖盘:8.02 → 48,000 股  ← 几乎没减少,斜率变平
卖盘:8.03 → 45,000 股

斜率变平意味着什么?意味着在这个价格区间,流动性很厚,想突破需要吃掉大量订单。而如果斜率变陡:

卖盘:8.01 → 50,000 股
卖盘:8.02 → 15,000 股  ← 急剧减少,斜率变陡
卖盘:8.03 → 12,000 股

斜率变陡意味着流动性稀薄,价格很可能快速穿过这个区域。

3.2 斜率异常的实战含义

斜率状态 买盘侧 卖盘侧 市场含义
正常斜率 平缓递减 平缓递减 多空均衡,波动有限
买盘斜率变陡 浅档堆积大量订单 平缓 支撑被加强,价格很难跌破该价位
买盘斜率变平 浅档订单稀疏 平缓 支撑正在被抽走,向下突破概率上升
卖盘斜率变陡 平缓 浅档堆积大量订单 阻力被加强,价格很难突破该价位
卖盘斜率变平 平缓 浅档订单稀疏 阻力正在被抽走,向上突破概率上升

突破前的订单簿斜率变化,是识别真假突破的核心工具。如果价格“突破”了阻力位,但卖盘斜率并没有变平(卖盘阻力依然很厚),这往往是假突破;反之,如果阻力位附近的订单簿斜率提前变平,价格虽然还没动,但真突破可能在几秒后到来。


四、冰山订单:隐藏的不对称信息

冰山订单(Iceberg Order)是机构用来隐藏真实意图的工具:一次性挂出大额卖单,但只在成交时显示实际成交量,未成交部分对市场不可见。

举个例子:

显示:卖一 8.01 → 500 股
隐藏:实际在队列中 45,000 股

成交 500 股后,新的 500 股出现在卖一
再成交 500 股,又补上 500 股
……直到 45,000 股全部成交

散户看到的是“每 500 股成交一次”,完全不知道背后还有 45,000 股在排队。

4.1 冰山订单的识别模式

虽然没有官方 API 能告诉你“这里是冰山订单”,但你可以通过时间序列分析来推断:

特征 描述
固定频率补单 每隔固定时间(如 200ms),卖一量回充到相同水平
量级不匹配 成交时成交量远小于订单簿显示的挂单量
斜率异常 大量订单集中在某一个价位,几乎没有梯度
重复出现 同一交易员 / 账号在多个价位挂出冰山

4.2 冰山订单的实战应用

当你识别出冰山订单时,你可以:

  1. 预估总量:通过补单频率和量级,计算冰山订单的总规模
  2. 预判价格压力:如果冰山在卖方,价格上方有大量隐藏卖压,可能压制上涨
  3. 寻找反向机会:如果冰山卖单接近被完全消耗,这是潜在的方向转折点

当然,识别冰山需要高频数据和精细的时间序列分析。对于个人量化开发者而言,TickDB 的 depth 频道(支持港股 10 档、数字货币 10 档深度)已经能提供足够精细的数据来分析订单簿结构。


五、生产级代码:用 TickDB 实时监控订单簿失衡

说了这么多理论,是时候上代码了。以下是一个完整的生产级 WebSocket 连接,订阅 TickDB 的 depth 频道,计算买卖压力比,并在失衡时触发告警。

import os
import time
import random
import json
import logging
from datetime import datetime
from threading import Thread
from collections import deque

import websocket

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(message)s'
)
logger = logging.getLogger(__name__)


class OrderBookMonitor:
    """
    订单簿失衡实时监控系统
    支持 TickDB WebSocket depth 频道,实时计算买卖压力比
    """

    def __init__(self, symbols: list[str], pressure_threshold: float = 2.0, window_size: int = 10):
        self.symbols = symbols
        self.pressure_threshold = pressure_threshold  # 压力比告警阈值
        self.window_size = window_size  # 滑动窗口大小
        self.api_key = os.environ.get("TICKDB_API_KEY")
        if not self.api_key:
            raise ValueError("请设置环境变量 TICKDB_API_KEY")

        self.ws = None
        self.running = False
        self.reconnect_delay = 1  # 初始重连延迟(秒)

        # 存储订单簿快照
        # 结构:{symbol: {"bids": [(price, qty), ...], "asks": [(price, qty), ...]}}
        self.order_books = {sym: {"bids": [], "asks": []} for sym in symbols}

        # 压力比历史(用于滑动窗口计算)
        self.pressure_history = {sym: deque(maxlen=window_size) for sym in symbols}

    def calculate_pressure_ratio(self, symbol: str, levels: int = 5) -> float:
        """
        计算买卖压力比(Bid-Ask Pressure Ratio)
        使用前 N 档的挂单量计算比值
        """
        book = self.order_books.get(symbol)
        if not book or not book["bids"] or not book["asks"]:
            return 1.0

        bid_volume = sum(qty for _, qty in book["bids"][:levels])
        ask_volume = sum(qty for _, qty in book["asks"][:levels])

        if ask_volume == 0:
            return float('inf')  # 避免除零

        return bid_volume / ask_volume

    def parse_depth_message(self, msg: dict) -> dict:
        """解析 TickDB depth 频道消息"""
        # TickDB depth 消息结构示例:
        # {"code": 0, "channel": "depth", "data": {
        #   "symbol": "700.HK",
        #   "bids": [[98.50, 100], [98.45, 200]],
        #   "asks": [[98.55, 150], [98.60, 180]]
        # }}
        channel = msg.get("channel", "")
        if channel != "depth":
            return {}

        data = msg.get("data", {})
        symbol = data.get("symbol", "")

        if symbol not in self.symbols:
            return {}

        bids = data.get("bids", [])  # [[price, qty], ...]
        asks = data.get("asks", [])

        # 更新本地订单簿快照
        self.order_books[symbol] = {
            "bids": [(float(p), float(q)) for p, q in bids],
            "asks": [(float(p), float(q)) for p, q in asks]
        }

        # 计算当前压力比
        current_ratio = self.calculate_pressure_ratio(symbol)
        self.pressure_history[symbol].append(current_ratio)

        # 计算滑动窗口平均压力比
        history = list(self.pressure_history[symbol])
        avg_ratio = sum(history) / len(history) if history else 1.0

        return {
            "symbol": symbol,
            "current_ratio": current_ratio,
            "avg_ratio": avg_ratio,
            "bid_top5_volume": sum(q for _, q in bids[:5]) if bids else 0,
            "ask_top5_volume": sum(q for _, q in asks[:5]) if asks else 0,
            "timestamp": datetime.now().isoformat()
        }

    def detect_imbalance(self, symbol: str) -> dict:
        """检测订单簿失衡状态"""
        current = self.calculate_pressure_ratio(symbol)

        # 计算斜率指标(简化版:比较相邻档位的量级变化)
        book = self.order_books[symbol]
        bid_slope = self._calc_slope(book["bids"][:5])
        ask_slope = self._calc_slope(book["asks"][:5])

        # 判断失衡类型
        imbalance_type = "balanced"
        if current > self.pressure_threshold:
            imbalance_type = "bullish_pressure"
        elif current < 1 / self.pressure_threshold:
            imbalance_type = "bearish_pressure"

        return {
            "symbol": symbol,
            "pressure_ratio": current,
            "bid_slope": bid_slope,
            "ask_slope": ask_slope,
            "imbalance_type": imbalance_type,
            "alert": imbalance_type != "balanced"
        }

    def _calc_slope(self, levels: list[tuple[float, float]]) -> float:
        """
        计算订单簿斜率(简化版)
        返回:档位间平均量级变化率
        正值表示递减(正常),负值表示递增(异常堆积)
        """
        if len(levels) < 2:
            return 0.0

        total_change = 0.0
        for i in range(len(levels) - 1):
            prev_qty = levels[i][1]
            next_qty = levels[i + 1][1]
            if prev_qty > 0:
                change = (next_qty - prev_qty) / prev_qty
                total_change += change

        return total_change / (len(levels) - 1)

    def send_alert(self, symbol: str, alert_data: dict):
        """触发告警通知"""
        logger.warning(
            f"⚠️ 订单簿失衡告警 | {symbol} | "
            f"压力比: {alert_data['pressure_ratio']:.2f} | "
            f"类型: {alert_data['imbalance_type']} | "
            f"买盘斜率: {alert_data['bid_slope']:.4f} | "
            f"卖盘斜率: {alert_data['ask_slope']:.4f}"
        )

    def on_message(self, ws, message):
        """处理 WebSocket 消息"""
        try:
            msg = json.loads(message)

            # 处理错误码
            if msg.get("code") == 3001:
                retry_after = int(ws.headers.get("Retry-After", 5))
                logger.warning(f"触发限频,等待 {retry_after} 秒")
                time.sleep(retry_after)
                return

            if msg.get("code") and msg.get("code") != 0:
                logger.error(f"TickDB 错误: code={msg['code']}, message={msg.get('message')}")
                return

            # 解析深度数据
            depth_data = self.parse_depth_message(msg)
            if not depth_data:
                return

            # 检测失衡
            imbalance = self.detect_imbalance(depth_data["symbol"])
            if imbalance["alert"]:
                self.send_alert(imbalance["symbol"], imbalance)

        except json.JSONDecodeError:
            logger.error("无法解析消息格式")
        except Exception as e:
            logger.error(f"消息处理异常: {e}")

    def on_ping(self, ws, data):
        """处理心跳 ping"""
        logger.debug("收到心跳 ping")

    def on_pong(self, ws, data):
        """处理心跳 pong"""
        logger.debug("心跳响应正常")

    def on_error(self, ws, error):
        """WebSocket 错误回调"""
        logger.error(f"WebSocket 错误: {error}")

    def on_close(self, ws, close_status_code, close_msg):
        """WebSocket 关闭回调"""
        logger.warning(f"连接断开: status={close_status_code}, msg={close_msg}")

    def on_open(self, ws):
        """WebSocket 连接建立后,订阅 depth 频道"""
        logger.info("连接已建立,开始订阅订单簿深度数据")

        for symbol in self.symbols:
            subscribe_msg = {
                "cmd": "subscribe",
                "channel": "depth",
                "symbol": symbol
            }
            ws.send(json.dumps(subscribe_msg))
            logger.info(f"已订阅 {symbol} 的 depth 频道")

        # 启动心跳保活线程
        self.running = True
        Thread(target=self._heartbeat_loop, daemon=True).start()

    def _heartbeat_loop(self):
        """心跳保活线程"""
        while self.running:
            time.sleep(30)  # 每 30 秒发送一次心跳
            if self.ws and self.ws.sock and self.ws.sock.connected:
                try:
                    self.ws.send(json.dumps({"cmd": "ping"}))
                    logger.debug("心跳已发送")
                except Exception as e:
                    logger.error(f"心跳发送失败: {e}")

    def connect(self):
        """建立 WebSocket 连接"""
        ws_url = f"wss://api.tickdb.ai/ws?api_key={self.api_key}"

        self.ws = websocket.WebSocketApp(
            ws_url,
            on_message=self.on_message,
            on_ping=self.on_ping,
            on_pong=self.on_pong,
            on_error=self.on_error,
            on_close=self.on_close
        )
        self.ws.on_open = self.on_open

        logger.info("正在连接 TickDB WebSocket...")
        self.ws.run_forever()

    def reconnect_with_backoff(self):
        """指数退避重连"""
        delay = self.reconnect_delay
        max_delay = 60

        while True:
            logger.info(f"{delay} 秒后尝试重连...")
            time.sleep(delay)

            try:
                self.connect()
                self.reconnect_delay = 1  # 重连成功后重置延迟
                break
            except Exception as e:
                logger.error(f"重连失败: {e}")
                # 指数退避 + 抖动
                delay = min(delay * 2, max_delay)
                jitter = random.uniform(0, delay * 0.1)
                delay += jitter
                self.reconnect_delay = delay

    def run(self):
        """启动监控"""
        try:
            self.connect()
        except KeyboardInterrupt:
            logger.info("用户中断,关闭监控")
            self.running = False
        except Exception as e:
            logger.error(f"连接异常: {e}")
            self.reconnect_with_backoff()


if __name__ == "__main__":
    # ⚠️ 生产环境高频场景建议使用 aiohttp/asyncio
    # 本示例为演示用,生产部署请参考 TickDB 官方异步示例

    monitor = OrderBookMonitor(
        symbols=["700.HK", "9988.HK", "BTC.USDT"],
        pressure_threshold=2.0,  # 压力比超过 2 倍触发告警
        window_size=10
    )

    monitor.run()

代码核心逻辑解读

模块 功能 关键设计
calculate_pressure_ratio 计算买卖压力比 使用前 N 档的量级求和,避免单档异常干扰
_calc_slope 计算订单簿斜率 通过档位间量级变化率判断堆积/稀疏状态
detect_imbalance 综合判断失衡类型 结合压力比和斜率,避免单一指标的误判
心跳保活 维持连接 独立线程定期发送 ping,避免被服务器断开
指数退避重连 异常恢复 含抖动的指数退避,避免惊群效应

数据支持说明:以上代码使用 TickDB WebSocket depth 频道获取实时订单簿深度数据。TickDB 支持港股 10 档深度和数字货币 10 档深度,可覆盖大部分订单簿分析需求。


六、从订单簿失衡到交易决策:闭环思路

光有数据还不够,你需要把订单簿失衡转化成可执行的交易逻辑。以下是一个简化的闭环框架:

订单簿监控 → 失衡检测 → 信号生成 → 阈值确认 → 执行/过滤
     ↑                                            ↓
     ←←←←←←←←← 绩效回测 ←←←←←←←←←←←←←←←←←←←←←←←

6.1 失衡信号生成规则(示例)

条件 信号 方向
压力比 > 2.0 且买盘斜率变平 买盘支撑衰减,向上突破概率高
压力比 < 0.5 且卖盘斜率变陡 卖压堆积,向下突破概率高
压力比剧烈波动(标准差 > 阈值) 流动性结构不稳定,预判方向难 过滤

6.2 实战中需要注意的坑

  1. 不要用单一指标:压力比高不等于一定涨,还可能是买家被套后挂摊平单
  2. 滑点问题:失衡信号触发后,实际成交价可能已经移动,模拟盘和实盘差距可能很大
  3. 信息衰减:订单簿状态以毫秒计变化,信号生成和执行之间不能有延迟
  4. 区分真实失衡 vs 虚假挂单:有些交易员会用大量小额挂单制造虚假深度,然后快速撤单

七、结语:看见别人看不见的信号

订单簿失衡不是万能的预测工具。但它是目前散户和机构之间,少数信息不对等可以被人为抹平的领域。

机构有彭博终端,有 Level 2 数据,有专人盯着订单簿。但你也可以用 TickDB 的 depth 频道,实时获取港股和数字货币的 10 档订单簿深度,计算买卖压力比,监控斜率变化,在价格动之前看到信号。

这不是魔法。这是数据。


下一步行动

如果你想亲手实现本文的监控逻辑

  1. 访问 tickdb.ai 注册(免费,无需信用卡)
  2. 在控制台生成 API Key
  3. 设置环境变量 TICKDB_API_KEY,复制本文代码即可运行

如果你想回测订单簿失衡信号的有效性

  • 使用 TickDB /v1/market/kline 接口获取历史 K 线数据
  • 配合 /v1/market/depth 获取历史订单簿快照(若支持)
  • 在 Python 中构建你自己的信号回测框架

如果你习惯用 AI 辅助开发

  • 在 AI 助手中搜索安装 tickdb-market-data SKILL
  • 用自然语言描述你的策略思路,SKILL 可协助生成代码框架

风险提示:本文不构成任何投资建议。订单簿失衡分析仅作为技术研究用途,实际交易请充分考虑市场风险、流动性和交易成本。回测结果不代表未来收益。