订单簿失衡:价格变动前的无声预兆
市场收盘后,你打开某只热门股的分时图,发现它在下午两点突然突破了日内阻力位,涨幅 3%。你打开订单簿回放,想看看突破的瞬间发生了什么——却只看到一串冰冷的数字。
但如果你在价格突破前两分钟打开过订单簿,你会看到完全不同的画面:买盘深度在悄悄积累,卖盘像融化的冰山一样逐渐变薄,买卖压力比从 1.2 跳到 2.8。价格还没有动,但天平已经倾斜了。
这不是巧合。这是订单簿失衡——价格变动前的无声预兆。
一、为什么价格是结果,订单簿才是原因
每一个刚入门量化的开发者都听过一句话:“价格是由供需决定的。” 这话没错,但太粗糙了。
真实的金融市场里,“供需”的具象化就是订单簿。每一笔买价(bid)和卖价(ask)都是市场参与者用真金白银表达的意见。买一价上的 10,000 股,不是抽象的“需求”,而是 10,000 股的买入意愿被定价在那个价格。
当订单簿出现失衡——某一侧的压力显著超过另一侧——价格向压力大的方向移动,只是时间问题。
1.1 订单簿失衡的三种形态
| 失衡类型 | 特征 | 价格预示 |
|---|---|---|
| 买卖压力失衡 | 买方深度 vs 卖方深度的比值大幅偏离 1 | 短期方向性移动概率上升 |
| 价格梯度失衡 | 订单簿斜率变陡(深度集中在某一区域) | 突破或反转的阻力/支撑发生变化 |
| 时间失衡 | 大量订单在同一时刻挂出(冰山订单拆解) | 短期流动性结构被操纵 |
理解了这三种形态,你就拥有了量化交易中最接近“预知”能力的工具。
二、买卖压力比:订单簿失衡的第一把标尺
买卖压力比(Bid-Ask Pressure Ratio)是衡量订单簿失衡最直接的指标。它的计算逻辑很简单:
买卖压力比 = Σ(前 N 档买盘量) / Σ(前 N 档卖盘量)
- 比值 > 1:买方压力占优,价格更可能向上移动
- 比值 < 1:卖方压力占优,价格更可能向下移动
- 比值剧烈变化:从 1.2 骤升至 2.8,往往预示着短期方向性机会
2.1 真实的订单簿数据长什么样
以下是一段模拟数据,展示了某只股票在财报发布前 30 秒的订单簿状态:
| 时间 | 买一量 | 买二量 | 买三量 | 卖一量 | 卖二量 | 卖三量 | 压力比 (3档) |
|---|---|---|---|---|---|---|---|
| 14:29:45 | 12,500 | 8,200 | 6,400 | 11,800 | 9,100 | 7,300 | 1.06 |
| 14:29:50 | 15,300 | 9,800 | 7,100 | 11,200 | 8,400 | 6,900 | 1.22 |
| 14:29:55 | 21,600 | 12,400 | 9,200 | 9,800 | 7,200 | 5,600 | 1.67 |
| 14:30:00 | 35,200 | 18,900 | 14,500 | 8,100 | 5,300 | 4,200 | 2.48 |
| 14:30:05 | 财报发布 | - | - | - | - | - | - |
| 14:30:10 | 12,400 | 7,200 | 4,100 | 45,600 | 28,900 | 21,300 | 0.24 |
看看这段数据里的信息量:
- 14:29:55:压力比从 1.06 跳到 1.67,买盘在加速积累。这时候价格还没有动——但聪明钱已经在行动。
- 14:30:00:压力比飙到 2.48,4 倍于正常水平。这不是散户行为,而是机构在财报前大规模建仓的痕迹。
- 14:30:10:财报发布后,卖盘爆炸式增长(机构出货或利空消息),压力比瞬间跌至 0.24。如果你有实时监控能力,在 14:29:55 就应该开始关注这只股票。
这就是订单簿失衡的力量:价格还没有告诉你任何事情,订单簿已经说了很多。
三、订单簿斜率:失衡的第二个维度
买卖压力比告诉你“谁更有力量”,订单簿斜率告诉你“力量分布的形状”。
3.1 什么是订单簿斜率
想象一个简单的场景:
卖盘:8.01 → 50,000 股
卖盘:8.02 → 30,000 股
卖盘:8.03 → 20,000 股
卖盘:8.04 → 15,000 股
订单簿的形状是从上到下递减的——价格越高,愿意卖的人越多。这是正常状态。
但如果斜率发生变化呢?
卖盘:8.01 → 50,000 股
卖盘:8.02 → 48,000 股 ← 几乎没减少,斜率变平
卖盘:8.03 → 45,000 股
斜率变平意味着什么?意味着在这个价格区间,流动性很厚,想突破需要吃掉大量订单。而如果斜率变陡:
卖盘:8.01 → 50,000 股
卖盘:8.02 → 15,000 股 ← 急剧减少,斜率变陡
卖盘:8.03 → 12,000 股
斜率变陡意味着流动性稀薄,价格很可能快速穿过这个区域。
3.2 斜率异常的实战含义
| 斜率状态 | 买盘侧 | 卖盘侧 | 市场含义 |
|---|---|---|---|
| 正常斜率 | 平缓递减 | 平缓递减 | 多空均衡,波动有限 |
| 买盘斜率变陡 | 浅档堆积大量订单 | 平缓 | 支撑被加强,价格很难跌破该价位 |
| 买盘斜率变平 | 浅档订单稀疏 | 平缓 | 支撑正在被抽走,向下突破概率上升 |
| 卖盘斜率变陡 | 平缓 | 浅档堆积大量订单 | 阻力被加强,价格很难突破该价位 |
| 卖盘斜率变平 | 平缓 | 浅档订单稀疏 | 阻力正在被抽走,向上突破概率上升 |
突破前的订单簿斜率变化,是识别真假突破的核心工具。如果价格“突破”了阻力位,但卖盘斜率并没有变平(卖盘阻力依然很厚),这往往是假突破;反之,如果阻力位附近的订单簿斜率提前变平,价格虽然还没动,但真突破可能在几秒后到来。
四、冰山订单:隐藏的不对称信息
冰山订单(Iceberg Order)是机构用来隐藏真实意图的工具:一次性挂出大额卖单,但只在成交时显示实际成交量,未成交部分对市场不可见。
举个例子:
显示:卖一 8.01 → 500 股
隐藏:实际在队列中 45,000 股
成交 500 股后,新的 500 股出现在卖一
再成交 500 股,又补上 500 股
……直到 45,000 股全部成交
散户看到的是“每 500 股成交一次”,完全不知道背后还有 45,000 股在排队。
4.1 冰山订单的识别模式
虽然没有官方 API 能告诉你“这里是冰山订单”,但你可以通过时间序列分析来推断:
| 特征 | 描述 |
|---|---|
| 固定频率补单 | 每隔固定时间(如 200ms),卖一量回充到相同水平 |
| 量级不匹配 | 成交时成交量远小于订单簿显示的挂单量 |
| 斜率异常 | 大量订单集中在某一个价位,几乎没有梯度 |
| 重复出现 | 同一交易员 / 账号在多个价位挂出冰山 |
4.2 冰山订单的实战应用
当你识别出冰山订单时,你可以:
- 预估总量:通过补单频率和量级,计算冰山订单的总规模
- 预判价格压力:如果冰山在卖方,价格上方有大量隐藏卖压,可能压制上涨
- 寻找反向机会:如果冰山卖单接近被完全消耗,这是潜在的方向转折点
当然,识别冰山需要高频数据和精细的时间序列分析。对于个人量化开发者而言,TickDB 的 depth 频道(支持港股 10 档、数字货币 10 档深度)已经能提供足够精细的数据来分析订单簿结构。
五、生产级代码:用 TickDB 实时监控订单簿失衡
说了这么多理论,是时候上代码了。以下是一个完整的生产级 WebSocket 连接,订阅 TickDB 的 depth 频道,计算买卖压力比,并在失衡时触发告警。
import os
import time
import random
import json
import logging
from datetime import datetime
from threading import Thread
from collections import deque
import websocket
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s [%(levelname)s] %(message)s'
)
logger = logging.getLogger(__name__)
class OrderBookMonitor:
"""
订单簿失衡实时监控系统
支持 TickDB WebSocket depth 频道,实时计算买卖压力比
"""
def __init__(self, symbols: list[str], pressure_threshold: float = 2.0, window_size: int = 10):
self.symbols = symbols
self.pressure_threshold = pressure_threshold # 压力比告警阈值
self.window_size = window_size # 滑动窗口大小
self.api_key = os.environ.get("TICKDB_API_KEY")
if not self.api_key:
raise ValueError("请设置环境变量 TICKDB_API_KEY")
self.ws = None
self.running = False
self.reconnect_delay = 1 # 初始重连延迟(秒)
# 存储订单簿快照
# 结构:{symbol: {"bids": [(price, qty), ...], "asks": [(price, qty), ...]}}
self.order_books = {sym: {"bids": [], "asks": []} for sym in symbols}
# 压力比历史(用于滑动窗口计算)
self.pressure_history = {sym: deque(maxlen=window_size) for sym in symbols}
def calculate_pressure_ratio(self, symbol: str, levels: int = 5) -> float:
"""
计算买卖压力比(Bid-Ask Pressure Ratio)
使用前 N 档的挂单量计算比值
"""
book = self.order_books.get(symbol)
if not book or not book["bids"] or not book["asks"]:
return 1.0
bid_volume = sum(qty for _, qty in book["bids"][:levels])
ask_volume = sum(qty for _, qty in book["asks"][:levels])
if ask_volume == 0:
return float('inf') # 避免除零
return bid_volume / ask_volume
def parse_depth_message(self, msg: dict) -> dict:
"""解析 TickDB depth 频道消息"""
# TickDB depth 消息结构示例:
# {"code": 0, "channel": "depth", "data": {
# "symbol": "700.HK",
# "bids": [[98.50, 100], [98.45, 200]],
# "asks": [[98.55, 150], [98.60, 180]]
# }}
channel = msg.get("channel", "")
if channel != "depth":
return {}
data = msg.get("data", {})
symbol = data.get("symbol", "")
if symbol not in self.symbols:
return {}
bids = data.get("bids", []) # [[price, qty], ...]
asks = data.get("asks", [])
# 更新本地订单簿快照
self.order_books[symbol] = {
"bids": [(float(p), float(q)) for p, q in bids],
"asks": [(float(p), float(q)) for p, q in asks]
}
# 计算当前压力比
current_ratio = self.calculate_pressure_ratio(symbol)
self.pressure_history[symbol].append(current_ratio)
# 计算滑动窗口平均压力比
history = list(self.pressure_history[symbol])
avg_ratio = sum(history) / len(history) if history else 1.0
return {
"symbol": symbol,
"current_ratio": current_ratio,
"avg_ratio": avg_ratio,
"bid_top5_volume": sum(q for _, q in bids[:5]) if bids else 0,
"ask_top5_volume": sum(q for _, q in asks[:5]) if asks else 0,
"timestamp": datetime.now().isoformat()
}
def detect_imbalance(self, symbol: str) -> dict:
"""检测订单簿失衡状态"""
current = self.calculate_pressure_ratio(symbol)
# 计算斜率指标(简化版:比较相邻档位的量级变化)
book = self.order_books[symbol]
bid_slope = self._calc_slope(book["bids"][:5])
ask_slope = self._calc_slope(book["asks"][:5])
# 判断失衡类型
imbalance_type = "balanced"
if current > self.pressure_threshold:
imbalance_type = "bullish_pressure"
elif current < 1 / self.pressure_threshold:
imbalance_type = "bearish_pressure"
return {
"symbol": symbol,
"pressure_ratio": current,
"bid_slope": bid_slope,
"ask_slope": ask_slope,
"imbalance_type": imbalance_type,
"alert": imbalance_type != "balanced"
}
def _calc_slope(self, levels: list[tuple[float, float]]) -> float:
"""
计算订单簿斜率(简化版)
返回:档位间平均量级变化率
正值表示递减(正常),负值表示递增(异常堆积)
"""
if len(levels) < 2:
return 0.0
total_change = 0.0
for i in range(len(levels) - 1):
prev_qty = levels[i][1]
next_qty = levels[i + 1][1]
if prev_qty > 0:
change = (next_qty - prev_qty) / prev_qty
total_change += change
return total_change / (len(levels) - 1)
def send_alert(self, symbol: str, alert_data: dict):
"""触发告警通知"""
logger.warning(
f"⚠️ 订单簿失衡告警 | {symbol} | "
f"压力比: {alert_data['pressure_ratio']:.2f} | "
f"类型: {alert_data['imbalance_type']} | "
f"买盘斜率: {alert_data['bid_slope']:.4f} | "
f"卖盘斜率: {alert_data['ask_slope']:.4f}"
)
def on_message(self, ws, message):
"""处理 WebSocket 消息"""
try:
msg = json.loads(message)
# 处理错误码
if msg.get("code") == 3001:
retry_after = int(ws.headers.get("Retry-After", 5))
logger.warning(f"触发限频,等待 {retry_after} 秒")
time.sleep(retry_after)
return
if msg.get("code") and msg.get("code") != 0:
logger.error(f"TickDB 错误: code={msg['code']}, message={msg.get('message')}")
return
# 解析深度数据
depth_data = self.parse_depth_message(msg)
if not depth_data:
return
# 检测失衡
imbalance = self.detect_imbalance(depth_data["symbol"])
if imbalance["alert"]:
self.send_alert(imbalance["symbol"], imbalance)
except json.JSONDecodeError:
logger.error("无法解析消息格式")
except Exception as e:
logger.error(f"消息处理异常: {e}")
def on_ping(self, ws, data):
"""处理心跳 ping"""
logger.debug("收到心跳 ping")
def on_pong(self, ws, data):
"""处理心跳 pong"""
logger.debug("心跳响应正常")
def on_error(self, ws, error):
"""WebSocket 错误回调"""
logger.error(f"WebSocket 错误: {error}")
def on_close(self, ws, close_status_code, close_msg):
"""WebSocket 关闭回调"""
logger.warning(f"连接断开: status={close_status_code}, msg={close_msg}")
def on_open(self, ws):
"""WebSocket 连接建立后,订阅 depth 频道"""
logger.info("连接已建立,开始订阅订单簿深度数据")
for symbol in self.symbols:
subscribe_msg = {
"cmd": "subscribe",
"channel": "depth",
"symbol": symbol
}
ws.send(json.dumps(subscribe_msg))
logger.info(f"已订阅 {symbol} 的 depth 频道")
# 启动心跳保活线程
self.running = True
Thread(target=self._heartbeat_loop, daemon=True).start()
def _heartbeat_loop(self):
"""心跳保活线程"""
while self.running:
time.sleep(30) # 每 30 秒发送一次心跳
if self.ws and self.ws.sock and self.ws.sock.connected:
try:
self.ws.send(json.dumps({"cmd": "ping"}))
logger.debug("心跳已发送")
except Exception as e:
logger.error(f"心跳发送失败: {e}")
def connect(self):
"""建立 WebSocket 连接"""
ws_url = f"wss://api.tickdb.ai/ws?api_key={self.api_key}"
self.ws = websocket.WebSocketApp(
ws_url,
on_message=self.on_message,
on_ping=self.on_ping,
on_pong=self.on_pong,
on_error=self.on_error,
on_close=self.on_close
)
self.ws.on_open = self.on_open
logger.info("正在连接 TickDB WebSocket...")
self.ws.run_forever()
def reconnect_with_backoff(self):
"""指数退避重连"""
delay = self.reconnect_delay
max_delay = 60
while True:
logger.info(f"{delay} 秒后尝试重连...")
time.sleep(delay)
try:
self.connect()
self.reconnect_delay = 1 # 重连成功后重置延迟
break
except Exception as e:
logger.error(f"重连失败: {e}")
# 指数退避 + 抖动
delay = min(delay * 2, max_delay)
jitter = random.uniform(0, delay * 0.1)
delay += jitter
self.reconnect_delay = delay
def run(self):
"""启动监控"""
try:
self.connect()
except KeyboardInterrupt:
logger.info("用户中断,关闭监控")
self.running = False
except Exception as e:
logger.error(f"连接异常: {e}")
self.reconnect_with_backoff()
if __name__ == "__main__":
# ⚠️ 生产环境高频场景建议使用 aiohttp/asyncio
# 本示例为演示用,生产部署请参考 TickDB 官方异步示例
monitor = OrderBookMonitor(
symbols=["700.HK", "9988.HK", "BTC.USDT"],
pressure_threshold=2.0, # 压力比超过 2 倍触发告警
window_size=10
)
monitor.run()
代码核心逻辑解读
| 模块 | 功能 | 关键设计 |
|---|---|---|
calculate_pressure_ratio |
计算买卖压力比 | 使用前 N 档的量级求和,避免单档异常干扰 |
_calc_slope |
计算订单簿斜率 | 通过档位间量级变化率判断堆积/稀疏状态 |
detect_imbalance |
综合判断失衡类型 | 结合压力比和斜率,避免单一指标的误判 |
| 心跳保活 | 维持连接 | 独立线程定期发送 ping,避免被服务器断开 |
| 指数退避重连 | 异常恢复 | 含抖动的指数退避,避免惊群效应 |
数据支持说明:以上代码使用 TickDB WebSocket depth 频道获取实时订单簿深度数据。TickDB 支持港股 10 档深度和数字货币 10 档深度,可覆盖大部分订单簿分析需求。
六、从订单簿失衡到交易决策:闭环思路
光有数据还不够,你需要把订单簿失衡转化成可执行的交易逻辑。以下是一个简化的闭环框架:
订单簿监控 → 失衡检测 → 信号生成 → 阈值确认 → 执行/过滤
↑ ↓
←←←←←←←←← 绩效回测 ←←←←←←←←←←←←←←←←←←←←←←←
6.1 失衡信号生成规则(示例)
| 条件 | 信号 | 方向 |
|---|---|---|
| 压力比 > 2.0 且买盘斜率变平 | 买盘支撑衰减,向上突破概率高 | 多 |
| 压力比 < 0.5 且卖盘斜率变陡 | 卖压堆积,向下突破概率高 | 空 |
| 压力比剧烈波动(标准差 > 阈值) | 流动性结构不稳定,预判方向难 | 过滤 |
6.2 实战中需要注意的坑
- 不要用单一指标:压力比高不等于一定涨,还可能是买家被套后挂摊平单
- 滑点问题:失衡信号触发后,实际成交价可能已经移动,模拟盘和实盘差距可能很大
- 信息衰减:订单簿状态以毫秒计变化,信号生成和执行之间不能有延迟
- 区分真实失衡 vs 虚假挂单:有些交易员会用大量小额挂单制造虚假深度,然后快速撤单
七、结语:看见别人看不见的信号
订单簿失衡不是万能的预测工具。但它是目前散户和机构之间,少数信息不对等可以被人为抹平的领域。
机构有彭博终端,有 Level 2 数据,有专人盯着订单簿。但你也可以用 TickDB 的 depth 频道,实时获取港股和数字货币的 10 档订单簿深度,计算买卖压力比,监控斜率变化,在价格动之前看到信号。
这不是魔法。这是数据。
下一步行动
如果你想亲手实现本文的监控逻辑:
- 访问 tickdb.ai 注册(免费,无需信用卡)
- 在控制台生成 API Key
- 设置环境变量
TICKDB_API_KEY,复制本文代码即可运行
如果你想回测订单簿失衡信号的有效性:
- 使用 TickDB
/v1/market/kline接口获取历史 K 线数据 - 配合
/v1/market/depth获取历史订单簿快照(若支持) - 在 Python 中构建你自己的信号回测框架
如果你习惯用 AI 辅助开发:
- 在 AI 助手中搜索安装
tickdb-market-dataSKILL - 用自然语言描述你的策略思路,SKILL 可协助生成代码框架
风险提示:本文不构成任何投资建议。订单簿失衡分析仅作为技术研究用途,实际交易请充分考虑市场风险、流动性和交易成本。回测结果不代表未来收益。