文章
价格之上,还有暗流
2010年5月6日,美股市场发生了著名的"闪电崩盘"(Flash Crash)。道琼斯工业平均指数在几分钟内暴跌近1000点,然后又几乎完全反弹回来。事后的调查报告中,有一幅图让所有量化研究员屏住了呼吸——那张图不是价格走势图,而是一张密密麻麻的散点图,每个点代表一笔成交,横轴是时间,纵轴是成交量,点被标记为红色(卖出)或蓝色(买入)。在那几十秒的崩盘窗口中,蓝色点几乎完全消失,红色点像决堤的洪水一样涌出——事后证明,大量算法驱动的自动卖出单是触发流动性真空的直接原因。
那一刻,市场参与者意识到一件事:知道价格是多少,只是理解市场的三分之一。知道谁在买、谁在卖,才是真正看懂市场的钥匙。
但残酷的现实是:你在 TickDB trades 接口拿到的原始逐笔成交数据长这样:
{
"price": 42153.50,
"quantity": 0.8231,
"timestamp": 1714567890123
}
没有买方、没有卖方、没有方向。只有时间、价格和量。
这就是逐笔成交方向推断(Tick Direction Inference)要解决的核心问题:仅凭"发生了什么",还原"谁主导了这件事"。
一、为什么方向推断是刚需
在解释方法之前,有必要先说清楚为什么这件事值得专门写一篇长文。方向推断的实用价值远超学术趣味。
微观结构因子。许多量化因子建立在订单流之上,比如:
- Order Flow Imbalance(OFI):统计单位时间内主动买入量与主动卖出量的差值,替代无法实时获取的订单簿快照
- VPIN(Volume-synchronized Probability of Informed Trading):衡量知情交易者比例,用于预测流动性危机
- 买卖压力比:主动买入量除以主动卖出量,用于判断短期供需拐点
以上所有因子的起点,都是"这笔成交是买还是卖"。
事件驱动择时。在财报发布、LME 库存突变、CPI 数据公布等事件前后,主动买入和主动卖出的切换节奏本身就是市场情绪的温度计。纯价格指标在这类场景下经常给出滞后信号,而订单流可以在价格反映之前先给出拐点提示。
套利策略的生存门槛。统计套利和做市策略需要估计短期价格变动方向。如果把方向搞反了,模型不仅不会赚钱,还会成为其他参与者的流动性来源。
所以问题不是"要不要做方向推断",而是"怎么做对方向推断"。
二、Lee-Ready 算法:方法论的起点
方向推断不是 TickDB 独有的问题,而是整个金融工程领域的经典难题。学术圈和工业界尝试过多种方法,其中Lee-Ready 算法(Lee & Ready, 1991)是目前被引用最广泛、工程落地最成熟的方法。
2.1 核心思想
Lee-Ready 的核心逻辑只有一条:
如果一笔成交的价格大于等于当时的卖一价(ask),则推断为主动买入(Buy)。
如果一笔成交的价格小于等于当时的买一价(bid),则推断为主动卖出(Sell)。
背后的直觉很好理解:当你愿意以卖一价(或者更高)成交时,说明你是主动进攻的那一方——你是买方,你推高了价格。如果你是被动等待的那一方,你的价格不会超过买一价,你的成交价会等于或低于买一价。
用伪代码表示:
if trade_price >= ask:
direction = BUY
elif trade_price <= bid:
direction = SELL
else:
# 成交价在 bid 和 ask 之间(spread 内)
# 需要特殊处理
direction = UNCLASSIFIED
这就是 Lee-Ready 的第一条规则:报价比对法(Quote Rule)。
2.2 中点价成交:第二条规则
这里有一个关键问题:如果成交价恰好等于买卖价差的中点价(mid-price = (bid + ask) / 2),应该归为买方还是卖方?
理论上,中点价成交意味着没有人在主动推价格——买方和卖方恰好在同一价格"碰头"。这种情况下用报价比对法无法判断方向。
Lee-Ready 的第二条规则处理这个问题:中点规则(Tick Rule)——用一个额外的信息来判断:如果这笔成交的价格高于上一笔确认方向的成交价格,则推断为买方主导;如果低于上一笔确认方向的成交价格,则推断为卖方主导。
if trade_price == mid_price:
if trade_price > last_classified_price:
direction = BUY
elif trade_price < last_classified_price:
direction = SELL
else:
direction = last_classified_direction # 价格没变,沿用上一个方向
2.3 时间倒填问题:第三条规则
还有一个棘手的问题:在实时场景中,我们拿到一笔成交数据的时候,对应的报价可能还没有更新——即"时间倒填"(trade-shipping)现象。成交发生在撮合引擎中,但行情广播到终端有一定延迟(通常几十毫秒)。
Lee-Ready 论文的处理方式是:用当前时刻最近的一笔报价(而非同时间戳的报价)来比对。具体来说,在判断第 $i$ 笔成交的方向时,应该用 $i$ 时刻之后最近的 ask 和 bid。
在工程实践中,这意味着你的订阅系统需要同时维护一个最新的报价快照,与成交数据配对使用。TickDB 的 depth 频道可以提供这个能力(适用于港股 10 档、加密货币 10 档)。
三、生产级实现
光有算法思路不够,代码要能跑在生产环境里。下面给出完整的生产级实现,包含:同时订阅成交流和报价快照、数据对齐、时间倒填处理、中点价兜底逻辑,以及完整的工程健壮性保障。
3.1 订阅层:同时获取 trades 和 depth
import os
import json
import time
import random
import asyncio
import threading
from dataclasses import dataclass, field
from typing import Optional, Callable
from datetime import datetime
import requests
# ========== 配置 ==========
API_KEY = os.environ.get("TICKDB_API_KEY")
if not API_KEY:
raise EnvironmentError("请设置环境变量 TICKDB_API_KEY")
BASE_URL = "https://api.tickdb.ai/v1/market"
HEADERS = {"X-API-Key": API_KEY}
# ========== 数据结构 ==========
@dataclass
class QuoteSnapshot:
"""报价快照"""
bid_price: float
bid_quantity: float
ask_price: float
ask_quantity: float
timestamp: int # 毫秒时间戳
@property
def mid_price(self) -> float:
return (self.bid_price + self.ask_price) / 2
@dataclass
class ClassifiedTrade:
"""已分类的逐笔成交"""
price: float
quantity: float
timestamp: int
direction: str # BUY / SELL / UNCLASSIFIED
bid_price: float
ask_price: float
spread: float
@dataclass
class LeeReadyState:
"""Lee-Ready 算法状态机"""
last_bid: float = 0.0
last_ask: float = 0.0
last_classified_price: float = 0.0
last_classified_direction: str = "UNCLASSIFIED"
quote_snapshot: Optional[QuoteSnapshot] = None
# ========== WebSocket 连接(成交流) ==========
def start_trades_websocket(symbol: str, on_trade: Callable):
"""
订阅 TickDB trades 频道
⚠️ trades 接口支持数字货币和港股,不支持美股/A股
"""
ws_url = f"wss://api.tickdb.ai/v1/market/ws?api_key={API_KEY}&symbol={symbol}&channel=trades"
retry_delay = 1.0
max_delay = 32.0
max_retries = 10
for attempt in range(max_retries):
try:
import websocket
def on_message(ws, message):
data = json.loads(message)
# TickDB trades 推送格式: {"price": ..., "quantity": ..., "timestamp": ...}
trade = {
"price": data.get("price"),
"quantity": data.get("quantity"),
"timestamp": data.get("timestamp")
}
on_trade(trade)
def on_ping(ws, data):
ws.send(json.dumps({"cmd": "pong"}))
ws = websocket.WebSocketApp(
ws_url,
on_message=on_message,
on_ping=on_ping,
on_error=lambda ws, err: print(f"[WS Error] {err}")
)
thread = threading.Thread(target=ws.run_forever)
thread.daemon = True
thread.start()
print(f"[WS] trades 订阅已启动: {symbol}")
return ws
except Exception as e:
jitter = random.uniform(0, retry_delay * 0.1)
sleep_time = retry_delay + jitter
print(f"[WS] 连接失败 ({attempt+1}/{max_retries}),{sleep_time:.2f}s 后重试: {e}")
time.sleep(sleep_time)
retry_delay = min(retry_delay * 2, max_delay)
raise RuntimeError("WebSocket 连接重试次数耗尽")
# ========== 报价快照轮询(depth) ==========
def poll_depth_snapshot(symbol: str, interval_ms: int = 100):
"""
轮询 TickDB depth 频道获取最新报价快照
⚠️ depth 频道:港股 10 档 / 加密货币 10 档 / 美股 1 档
⚠️ 高频场景建议使用 WebSocket push,此处为兼容低频场景的轮询方案
"""
latest_quote = {}
def fetch_depth():
try:
resp = requests.get(
f"{BASE_URL}/depth",
headers=HEADERS,
params={"symbol": symbol},
timeout=(3.05, 10)
)
data = resp.json()
if data.get("code") == 0:
bids = data.get("data", {}).get("bids", [])
asks = data.get("data", {}).get("asks", [])
if bids and asks:
latest_quote["snapshot"] = QuoteSnapshot(
bid_price=float(bids[0][0]),
bid_quantity=float(bids[0][1]),
ask_price=float(asks[0][0]),
ask_quantity=float(asks[0][1]),
timestamp=int(data.get("ts", 0))
)
else:
handle_api_error(data)
except requests.exceptions.Timeout:
print("[API] depth 轮询超时,跳过本次")
except Exception as e:
print(f"[API] depth 轮询异常: {e}")
def poll_loop():
while True:
fetch_depth()
time.sleep(interval_ms / 1000)
thread = threading.Thread(target=poll_loop)
thread.daemon = True
thread.start()
def get_snapshot():
return latest_quote.get("snapshot")
return get_snapshot
# ========== Lee-Ready 核心分类逻辑 ==========
DIRECTION_BUY = "BUY"
DIRECTION_SELL = "SELL"
DIRECTION_UNCLASSIFIED = "UNCLASSIFIED"
def classify_trade_direction(trade: dict, state: LeeReadyState) -> str:
"""
Lee-Ready 算法核心实现
规则顺序:
1. Quote Rule(报价比对):price >= ask → BUY, price <= bid → SELL
2. Tick Rule(tick 方向递延):price == mid → 对比上一笔确认方向
3. 中点价格兜底:无法判断时沿用上一笔确认方向
⚠️ 仅适用于港股和加密货币(trades 接口支持)
"""
price = float(trade["price"])
timestamp = trade["timestamp"]
if state.quote_snapshot is None:
return DIRECTION_UNCLASSIFIED
bid = state.quote_snapshot.bid_price
ask = state.quote_snapshot.ask_price
if bid == 0 or ask == 0:
return DIRECTION_UNCLASSIFIED
# 更新状态机的报价记录(处理时间倒填:用最新报价而非同时间戳报价)
state.last_bid = bid
state.last_ask = ask
spread = ask - bid
mid = (bid + ask) / 2
# 规则一:报价比对
if price >= ask:
state.last_classified_price = price
state.last_classified_direction = DIRECTION_BUY
return DIRECTION_BUY
if price <= bid:
state.last_classified_price = price
state.last_classified_direction = DIRECTION_SELL
return DIRECTION_SELL
# 规则二:成交价在 bid-ask 之内(中点附近)
# Lee-Ready 原版规则:用上一笔确认方向的成交价来判断
if price > state.last_classified_price:
state.last_classified_price = price
state.last_classified_direction = DIRECTION_BUY
return DIRECTION_BUY
if price < state.last_classified_price:
state.last_classified_price = price
state.last_classified_direction = DIRECTION_SELL
return DIRECTION_SELL
# 规则三:价格与上一笔相同,沿用上一笔方向
if state.last_classified_direction != DIRECTION_UNCLASSIFIED:
return state.last_classified_direction
return DIRECTION_UNCLASSIFIED
def build_classified_trade(trade: dict, direction: str, state: LeeReadyState) -> ClassifiedTrade:
"""构建已分类成交记录"""
return ClassifiedTrade(
price=float(trade["price"]),
quantity=float(trade["quantity"]),
timestamp=trade["timestamp"],
direction=direction,
bid_price=state.last_bid,
ask_price=state.last_ask,
spread=state.last_ask - state.last_bid
)
# ========== 主程序 ==========
def run(symbol: str):
"""
演示:订阅逐笔成交 + 实时分类方向
"""
state = LeeReadyState()
classified_trades = []
counter = 0
def on_trade(trade):
nonlocal counter
direction = classify_trade_direction(trade, state)
classified = build_classified_trade(trade, direction, state)
classified_trades.append(classified)
counter += 1
# 每 20 笔打印一次摘要
if counter % 20 == 0:
buys = sum(1 for t in classified_trades[-20:] if t.direction == DIRECTION_BUY)
sells = sum(1 for t in classified_trades[-20:] if t.direction == DIRECTION_SELL)
print(f"[{datetime.fromtimestamp(trade['timestamp']/1000):%H:%M:%S}] "
f"最近 20 笔: BUY={buys} SELL={sells} "
f"价格={classified.price} 方向={direction}")
# ⚠️ 请确认 symbol 在 trades 接口支持范围内(数字货币/港股)
# 示例符号:BTC.USDT, ETH.USDT, 9988.HK
print(f"[初始化] 订阅 {symbol} 的逐笔成交并实时分类方向")
get_snapshot = poll_depth_snapshot(symbol)
# 将快照注入状态机(实际生产中建议异步队列处理)
def refresh_quote():
snapshot = get_snapshot()
if snapshot:
state.quote_snapshot = snapshot
start_trades_websocket(symbol, on_trade)
if __name__ == "__main__":
# ⚠️ 示例 symbol,需确保该标的在 TickDB trades 支持范围内
run("BTC.USDT")
3.2 代码要点说明
为什么同时需要 trades 和 depth 两个频道? Lee-Ready 算法的输入是成交价配对同时间的 bid/ask 价格。depth 频道提供报价快照,trades 频道提供逐笔成交,两者组合才能完成方向推断。
时间倒填的实际工程处理:生产环境中,不能简单地把同一时刻的报价快照和成交配对,因为行情传输存在延迟。上面的代码通过维护一个"始终持有最新报价"的快照来解决这个问题,同时记录下当前成交时刻的 bid/ask 到结果字段中,方便后续回溯时验证。
为什么不直接用 depth 的 bids[0] 和 asks[0] 判断买卖方向? depth 频道的快照反映的是"挂了哪些单",而不是"哪些单刚刚成交了"。逐笔成交的 price 字段和快照 bid/ask 的实时比对,才是 Lee-Ready 方法的核心。
四、方向推断后的衍生指标
方向分类完成后,我们终于有了一张可以使用的"谁在买卖"的数据表。接下来介绍几个基于分类结果构建的实用指标。
4.1 订单流失衡(Order Flow Imbalance, OFI)
OFI 是单位时间内主动买入量与主动卖出量的差值,公式:
$$
\text{OFI}(t) = \sum_{i \in [t-\Delta t, t]} q_i \cdot \mathbb{1}{d_i = \text{BUY}} - \sum{i \in [t-\Delta t, t]} q_i \cdot \mathbb{1}_{d_i = \text{SELL}}
$$
其中 $q_i$ 是成交量,$d_i$ 是分类方向。
from collections import deque
from dataclasses import dataclass
@dataclass
class OFIWindow:
"""OFI 滑动窗口计算器"""
window_seconds: int = 60
classified_trades: deque = field(default_factory=deque)
def add_trade(self, trade: ClassifiedTrade):
self.classified_trades.append(trade)
cutoff = trade.timestamp - self.window_seconds * 1000
while self.classified_trades and self.classified_trades[0].timestamp < cutoff:
self.classified_trades.popleft()
def compute_ofi(self) -> float:
buy_volume = sum(t.quantity for t in self.classified_trades if t.direction == DIRECTION_BUY)
sell_volume = sum(t.quantity for t in self.classified_trades if t.direction == DIRECTION_SELL)
return buy_volume - sell_volume
def ofi_ratio(self) -> float:
"""
OFI 比率:正向 OFI 占比(0.5 = 多空平衡)
用于跨标的口味归一化
"""
buy_volume = sum(t.quantity for t in self.classified_trades if t.direction == DIRECTION_BUY)
sell_volume = sum(t.quantity for t in self.classified_trades if t.direction == DIRECTION_SELL)
total = buy_volume + sell_volume
if total == 0:
return 0.5
return buy_volume / total
4.2 买卖压力比(Buy/Sell Pressure Ratio)
$$
\text{PressureRatio} = \frac{\sum_{i \in \text{window}} q_i \cdot \mathbb{1}{d_i=\text{BUY}}}{\sum{i \in \text{window}} q_i \cdot \mathbb{1}_{d_i=\text{SELL}}}
$$
def compute_pressure_ratio(window_trades: list[ClassifiedTrade]) -> float:
buy_q = sum(t.quantity for t in window_trades if t.direction == DIRECTION_BUY)
sell_q = sum(t.quantity for t in window_trades if t.direction == DIRECTION_SELL)
if sell_q == 0:
return float('inf') if buy_q > 0 else 1.0
return buy_q / sell_q
def detect_flow_shift(classified_trades: list[ClassifiedTrade], lookback: int = 50, threshold: float = 2.0) -> str:
"""
监测订单流方向切换
当压力比从 >threshold 快速跌至 <1/threshold 时,
视为卖方压力骤增的信号
⚠️ 仅作为辅助参考,不构成交易建议
"""
if len(classified_trades) < lookback:
return "INSUFFICIENT_DATA"
recent = classified_trades[-lookback:]
ratio = compute_pressure_ratio(recent)
if ratio > threshold:
return "STRONG_BUY_PRESSURE"
elif ratio < 1 / threshold:
return "STRONG_SELL_PRESSURE"
else:
return "BALANCED"
4.3 成交价分布偏离
统计在窗口期内,主动买入的平均成交价与主动卖出的平均成交价之差(相对于中间价的百分比),用于判断哪一方在更有利的位置成交。
def compute_trade_price_spread(classified_trades: list[ClassifiedTrade]) -> dict:
buys = [t for t in classified_trades if t.direction == DIRECTION_BUY]
sells = [t for t in classified_trades if t.direction == DIRECTION_SELL]
buy_avg = sum(t.price * t.quantity for t in buys) / sum(t.quantity for t in buys) if buys else 0
sell_avg = sum(t.price * t.quantity for t in sells) / sum(t.quantity for t in sells) if sells else 0
# 相对于 mid-price 的偏离
mid_avg = (buy_avg + sell_avg) / 2 if (buy_avg and sell_avg) else 0
return {
"buy_avg_偏离mid_bps": (buy_avg - mid_avg) / mid_avg * 10000 if mid_avg else 0,
"sell_avg_偏离mid_bps": (sell_avg - mid_avg) / mid_avg * 10000 if mid_avg else 0,
"buy_avg": buy_avg,
"sell_avg": sell_avg,
}
五、方法局限性与应对
Lee-Ready 算法不是银弹,理解它的局限性是正确使用它的前提。
5.1 底层数据要求
Lee-Ready 的前提假设是:每一笔成交都能配对到撮合时刻的 bid/ask 快照。这意味着:
- 必须有
depth数据源:如果你的标的只有 K 线数据没有订单簿数据,Lee-Ready 无法适用 - TickDB 的数据覆盖边界:trades 接口支持港股和数字货币,不支持美股和 A 股。对于美股历史回测,建议使用 TickDB 的
/v1/market/kline接口(10 年级别历史 K 线数据)配合量价关系因子,而非逐笔成交方向推断 - 数据对齐延迟:WebSocket 推送的 trades 和 depth 可能存在几十毫秒的时间差,上游数据源的时序质量直接影响分类准确率
5.2 算法本身的不确定性
| 场景 | Lee-Ready 的判断 | 潜在问题 |
|---|---|---|
| 成交价 == mid 且无历史方向 | UNCLASSIFIED | 高频小额成交可能导致大量无法分类 |
| bid-ask spread = 0(极度深度市场) | 报价比对失效 | 极端情况下两个方向同时满足 |
| 算法驱动的大宗成交 | 方向可能反映的是机器行为而非意图 | 单纯方向分类无法区分人工和算法 |
5.3 替代方案速查
当 Lee-Ready 条件不满足时,可以考虑以下替代方法:
| 方法 | 数据要求 | 适用场景 |
|---|---|---|
| 成交量加权均价变化方向 | 只需价格和量 | 无法获取订单簿时的近似方案 |
| ER(Ellis, Michaely, O'Hara)算法 | 成交价 + 成交量 + 报价 | 与 Lee-Ready 类似,但权重计算方式不同 |
| PIN 模型(Probability of Informed Trading) | 成交量时间序列 | 预测流动性风险,不适合实时分类 |
| 市场微观结构噪声过滤 | 需结合波动率阈值 | 过滤 spread 内的小幅震荡成交 |
六、在 TickDB 中完整使用这套方案
上面给出的代码是完整的独立模块,可以直接作为 TickDB 数据流的一个处理环节接入你的量化系统。以下是各接口的使用场景对照:
| 数据需求 | TickDB 接口 | 说明 |
|---|---|---|
| 逐笔成交流(实时) | WebSocket / trades |
港股和数字货币可用 |
| 订单簿快照(实时) | WebSocket / depth |
港股 10 档 / 加密 10 档 / 美股 1 档 |
| 历史 K 线回测 | GET /v1/market/kline |
10 年级别,清洗对齐,支持多周期 |
| 当前 K 线 | GET /v1/market/kline/latest |
实时展示用,勿用于回测 |
| 交易品种查询 | GET /v1/symbols/available |
判断哪些标的在支持范围内 |
结语:看见暗流
回到开头的那张散点图。量化研究的核心能力之一,就是把"价格是结果,订单簿是原因"这件事看穿。Lee-Ready 算法给了我们一把在逐笔成交级别还原市场参与意图的钥匙——不是完美答案,但是是目前为止工程界共识最强的起点。
方向分类只是微观结构分析的第一步。有了方向数据,你才可能构建 OFI、压力比、VPIN 这些因子;有了因子,你才有可能捕捉到那些价格还没有反映、但订单流已经在说的信号。
市场里真正的Alpha,往往藏在价格图表看不见的地方——在逐笔成交里,在买卖盘口的消长里,在那张被人忽略的订单簿里。
下一步行动
如果你想亲手实现本文的分类算法:
- 访问 tickdb.ai 注册(免费,无需信用卡)
- 在控制台生成 API Key
- 设置环境变量
TICKDB_API_KEY,复制本文代码即可运行(推荐先用BTC.USDT等数字货币标的测试)
如果你关心的是美股历史回测:
TickDB 的 /v1/market/kline 接口提供 10 年级别的历史 K 线数据,支持多周期回测。注意美股不支持逐笔成交方向推断,建议使用量价类因子替代。
如果你在搭建订单流分析系统:建议同时订阅 depth + trades 双频道,参考本文代码架构,将分类结果写入时序数据库(如 TimescaleDB/InfluxDB),后续用 ClickHouse 或 DuckDB 做 OLAP 分析。
风险提示:本文不构成任何投资建议。Lee-Ready 算法是一种统计推断方法,存在固有误差,分类结果不应直接作为交易信号使用。算法在实际市场中的表现受数据质量、市场制度和流动性状况等多重因素影响。市场有风险,投资需谨慎。