逐笔成交方向推断:用 tick 数据还原每笔交易的买卖方向
一、为什么这件事不简单
你有 10 年的美股 tick 数据,每一笔成交记录包含:时间戳、成交价、成交量。
你的任务是:给每一笔成交打上标签——这笔是主动买入还是主动卖出。
听起来是个简单的分类问题。价格上涨 → 买方推动 → 主动买入;价格下跌 → 卖方推动 → 主动卖出。
但实际数据会立刻给你一个下马威:超过 40% 的成交恰好发生在买卖报价的中点,此时价格既不上涨也不下跌,涨跌判断法(Tick Test)完全失效。
这就是订单流分析的第一个门槛:tick 数据只告诉你「发生了什么」,不告诉你「谁在推动」。
本文系统讲解目前最通用的解决方案——Lee-Ready 算法及其改进版本,并给出生产级的 Python 实现。
二、tick 数据的结构与局限
2.1 一条 tick 记录里有什么
典型的 tick 成交数据(基于 NYSE TAQ 数据集):
timestamp | price | volume | exchange
1710489600.123456 | 185.42 | 100 | N
1710489600.124789 | 185.43 | 200 | N
1710489600.130012 | 185.43 | 50 | N
一条 tick 记录包含时间戳、成交价、成交量,可能还有交易所代码。不包含方向信息。
这就是订单流分析的根本挑战:交易所不公开逐笔买卖方向(因为这会被高频交易者用于信息套利),我们必须从成交价与报价的关系中间接推断。
2.2 三个关键时间点你需要知道
在讨论算法之前,有三个时间概念必须区分清楚:
- Trade time:这笔交易的实际成交时间
- Quote time:当前最优买卖报价的发布时间
- Quote condition:报价是否处于「可交易」状态(NORMAL、CLOSING、OUT_OF_SEQUENCE)
TAQ 数据集提供了 trade 和 quote 两套独立的时间序列,你需要把它们对齐才能做报价比对。如果 quote 时间与 trade 时间差超过 5 秒,这笔成交不能用于方向推断。
三、报价比对法:Lee-Ready 算法的核心逻辑
3.1 基本思路
Lee-Ready 算法的核心假设只有一条:每一笔成交要么来自买方市价单,要么来自卖方市价单。
买方市价单以卖价(ask)成交 → 主动买入
卖方市价单以买价(bid)成交 → 主动卖出
因此,算法的本质是:对于每一笔成交,找到同一时刻的买卖报价,判断成交价更接近哪个报价。
3.2 分类规则
| 成交价位置 | 初始判断 | 说明 |
|---|---|---|
| = Ask | 主动买入 | 以卖价成交,一定是买方推动 |
| = Bid | 主动卖出 | 以买价成交,一定是卖方推动 |
| 在 Ask 和 Bid 之间 | 进入中立分类 | 无法直接判断 |
中立分类是 Lee-Ready 算法的核心创新点。对于无法直接归类的成交,使用价格变动方向作为次级判断:
if price > last_price:
标记为 主动买入
elif price < last_price:
标记为 主动卖出
else:
标记为 中立(不可用)
3.3 关键细节:成交价格与报价的时间对齐
这是实现中最容易出错的地方。
Lee-Ready 原文规定:使用成交时刻之前最近的一次报价快照(last quote before the trade)。
# 伪代码
for each trade:
# 找到成交时刻之前最近的有效报价
quotes_before_trade = quotes[quotes.timestamp < trade.timestamp]
if quotes_before_trade is empty:
skip this trade # 没有报价数据,无法判断
nearest_quote = quotes_before_trade.iloc[-1]
if trade.price == nearest_quote.ask:
direction = "buy"
elif trade.price == nearest_quote.bid:
direction = "sell"
else:
# 中立区间:用价格变动方向
direction = infer_from_tick_test(trade, last_trade)
四、生产级实现
4.1 数据模型
from dataclasses import dataclass
from typing import Optional, Literal
from datetime import datetime
@dataclass
class Quote:
"""单条报价快照"""
timestamp: float # Unix 时间戳,精确到微秒
bid: float # 买一价
ask: float # 卖一价
bid_size: int # 买一量
ask_size: int # 卖一量
@property
def mid(self) -> float:
return (self.bid + self.ask) / 2
@property
def spread(self) -> float:
return self.ask - self.bid
def is_valid(self, max_age_seconds: float = 5.0) -> bool:
"""报价是否在有效时间内"""
return self.spread > 0 # 买卖价差必须为正
@dataclass
class Tick:
"""单条成交记录"""
timestamp: float
price: float
volume: int
def __repr__(self):
return f"Tick(t={self.timestamp:.3f}, p={self.price}, v={self.volume})"
@dataclass
class AnnotatedTick:
"""标注了方向的结果"""
tick: Tick
direction: Literal["buy", "sell", "neutral"]
confidence: float # 0~1,越高越确定
quote_used: Optional[Quote] = None
@property
def is_deterministic(self) -> bool:
"""是否为确定分类(非中立区间成交)"""
return self.direction in ("buy", "sell")
4.2 Lee-Ready 算法核心实现
import bisect
from typing import List, Tuple, Optional
class LeeReadyClassifier:
"""
Lee-Ready 算法实现
规则:
1. 成交价 = ask → 主动买入(确定)
2. 成交价 = bid → 主动卖出(确定)
3. 成交价在 ask 和 bid 之间 → 用价格变动方向推断
4. 价格未变化 → 标记为 neutral(不可用)
参考:Lee, C.M. & Ready, M.J. (1991). Inferring Trade Direction
from Intraday Data. Journal of Finance, 46(2), 733-746.
"""
def __init__(self, quote_max_age: float = 5.0):
"""
Args:
quote_max_age: 报价最大有效期(秒),超过此时间的报价不用于判断
"""
self.quote_max_age = quote_max_age
def classify(
self,
trade: Tick,
quote_at_trade: Optional[Quote],
last_trade: Optional[Tick] = None
) -> AnnotatedTick:
"""
对单笔成交进行方向分类
Args:
trade: 当前成交
quote_at_trade: 成交时刻的最近有效报价
last_trade: 前一笔成交(用于 tick test)
"""
if quote_at_trade is None:
return AnnotatedTick(
tick=trade,
direction="neutral",
confidence=0.0
)
# 规则1 & 2:成交价精确匹配买卖价 → 确定分类
if trade.price == quote_at_trade.ask:
return AnnotatedTick(
tick=trade,
direction="buy",
confidence=1.0,
quote_used=quote_at_trade
)
if trade.price == quote_at_trade.bid:
return AnnotatedTick(
tick=trade,
direction="sell",
confidence=1.0,
quote_used=quote_at_trade
)
# 规则3:成交价在买卖价之间 → 使用 tick test
# ⚠️ Lee-Ready 原版使用成交量加权平均价格(VWAP),
# 但对于单笔成交,VWAP ≈ 成交价,直接使用即可
if last_trade is not None:
if trade.price > last_trade.price:
direction = "buy"
confidence = 0.7 # 中等置信度
elif trade.price < last_trade.price:
direction = "sell"
confidence = 0.7
else:
direction = "neutral"
confidence = 0.0
else:
# 第一笔成交,无法做 tick test
direction = "neutral"
confidence = 0.0
return AnnotatedTick(
tick=trade,
direction=direction,
confidence=confidence,
quote_used=quote_at_trade
)
4.3 批量处理与报价对齐
这是性能的关键瓶颈。暴力遍历的复杂度是 O(n × m),我们可以利用报价和成交都是时间有序的特点,将复杂度降至 O(n + m)。
class TradeQuoteAligner:
"""
时间对齐器:将成交数据与报价数据对齐
使用二分查找而非线性扫描,保证大规模数据处理的效率
⚠️ 数据量超过 100 万条时,线性扫描会成为显著瓶颈
"""
def __init__(self, quotes: List[Quote]):
# 按时间排序报价
self.quotes = sorted(quotes, key=lambda q: q.timestamp)
self.quote_timestamps = [q.timestamp for q in self.quotes]
def find_nearest_quote_before(self, trade_time: float) -> Optional[Quote]:
"""
找到成交时刻之前最近的报价
使用 bisect_right:
- trade_time = 1000.0
- quote_timestamps = [900, 950, 1000, 1050]
- bisect_right → 索引 3(第一个 > 1000.0 的位置)
- 目标索引 = 3 - 1 = 2,即 timestamp=1000 的报价
⚠️ 注意:bisect_right 在精确时间相等时返回右边界,
因此需要减 1 来获取「之前」的报价
"""
# 找到第一个 > trade_time 的位置
idx = bisect.bisect_right(self.quote_timestamps, trade_time)
if idx == 0:
return None # 没有更早的报价
# 取前一个报价
return self.quotes[idx - 1]
def classify_batch(
trades: List[Tick],
quotes: List[Quote],
quote_max_age: float = 5.0
) -> List[AnnotatedTick]:
"""
批量分类
时间复杂度:O((n + m) log m),n=成交笔数,m=报价笔数
空间复杂度:O(n),存储结果
"""
aligner = TradeQuoteAligner(quotes)
classifier = LeeReadyClassifier(quote_max_age=quote_max_age)
results = []
last_trade = None
for trade in trades:
quote = aligner.find_nearest_quote_before(trade.timestamp)
annotated = classifier.classify(trade, quote, last_trade)
results.append(annotated)
last_trade = trade
return results
4.4 实证检验:验证分类准确性
def compute_direction_statistics(
annotated_trades: List[AnnotatedTick]
) -> dict:
"""
统计分类结果,用于验证算法有效性
"""
total = len(annotated_trades)
buy_count = sum(1 for t in annotated_trades if t.direction == "buy")
sell_count = sum(1 for t in annotated_trades if t.direction == "sell")
neutral_count = sum(1 for t in annotated_trades if t.direction == "neutral")
deterministic = sum(1 for t in annotated_trades if t.is_deterministic)
return {
"total": total,
"buy_ratio": buy_count / total,
"sell_ratio": sell_count / total,
"neutral_ratio": neutral_count / total,
"deterministic_ratio": deterministic / total,
"buy_count": buy_count,
"sell_count": sell_count,
"neutral_count": neutral_count,
}
# 示例输出(基于美股某日的 1 分钟数据)
example_stats = compute_direction_statistics([])
# 典型美股数据分布:
# buy_ratio ≈ 52-54%(买卖不完全平衡)
# deterministic_ratio ≈ 55-65%(其余 35-45% 需要 tick test)
# neutral_ratio ≈ 3-8%(价格恰好未变化)
五、算法的局限性与改进方向
5.1 Lee-Ready 的三个已知缺陷
缺陷一:tick test 依赖于成交序列的连续性
如果数据存在跳票(比如交易所维护导致的 1 小时数据缺失),缺失期间的价格变动无法追踪,tick test 就会失效。通常的解决方案是:设置最大允许间隔(如 30 秒),超过则重置 tick test。
MAX_TICK_TEST_GAP = 30.0 # 秒,超过此间隔重置判断
def safe_classify(trade, quote, last_trade, last_trade_time=None):
# 检查时间连续性
if last_trade_time is not None:
time_gap = trade.timestamp - last_trade_time
if time_gap > MAX_TICK_TEST_GAP:
# 重置:无法用 tick test,用报价法判断
last_trade = None # 强制跳过 tick test
缺陷二:价格恰好在买卖价中点时,完全依赖 tick test
这种情况在数据中占比不低(10-25%,取决于股票流动性)。tick test 的局限在于:短时间内可能连续出现零价格变动,导致大量 neutral 标签。
改进方案:用 成交量加权 替代单笔价格比较。Lee-Ready 原文即建议使用过去几笔成交的成交量加权平均价格(VWAP):
def vwap_tick_test(trade: Tick, window: List[Tick], window_vwap: float) -> str:
"""
基于 VWAP 的 tick test
原理:如果当前成交价 > 过去 N 笔的 VWAP,
说明价格呈上涨趋势,倾向于买方推动
"""
if trade.price > window_vwap:
return "buy"
elif trade.price < window_vwap:
return "sell"
else:
return "neutral"
缺陷三:无法处理盘前/盘后交易
美股盘前 4:00-9:30 和盘后 16:00-20:00 的报价质量较差,买卖价差扩大,部分成交不在正常报价区间内。建议对这些时段的数据单独处理或过滤。
5.2 改进版本:FINRA + 机器学习
FINRA 禁忌法(FINRA BOT Prohibition)
FINRA 规则 5210 禁止「装饰交易」(wash trading),即同一机构以相同价格同时买卖同一标的。在实际数据中,Lee-Ready 发现这种「同一价格连续买卖」的模式会导致方向推断错误。改进方法是在遇到此类模式时,追溯到更早的报价。
基于订单簿状态的概率推断
更前沿的方法是结合 depth 频道的订单簿状态。如果当前卖盘深度远大于买盘深度,且价格没有上涨,这可能意味着买方力量正在衰减,而非单纯的方向信息。
def probabilistic_direction(
trade_price: float,
quote: Quote,
order_book_state: dict # 来自 depth 频道的买卖盘深度
) -> Tuple[str, float]:
"""
概率推断版本
结合报价位置和订单簿状态,计算方向概率
返回:(方向, 置信度)
"""
spread = quote.ask - quote.bid
# 计算成交价在买卖价之间的相对位置
# 0.0 = bid(确定卖出),1.0 = ask(确定买入)
relative_position = (trade_price - quote.bid) / spread if spread > 0 else 0.5
# 订单簿压力调整
buy_depth_ratio = order_book_state.get("buy_depth", 0) / max(
order_book_state.get("buy_depth", 1) + order_book_state.get("sell_depth", 1), 1
)
# 综合概率(简化版,实际需要校准)
buy_probability = relative_position * 0.7 + buy_depth_ratio * 0.3
if buy_probability > 0.65:
return ("buy", buy_probability)
elif buy_probability < 0.35:
return ("sell", 1 - buy_probability)
else:
return ("neutral", 0.5)
六、应用场景:你能用方向数据做什么
6.1 订单流因子
最直接的应用是构建买卖方向的累积指标:
import numpy as np
def compute_order_flow_imbalance(
annotated_trades: List[AnnotatedTick],
window_seconds: int = 60
) -> np.ndarray:
"""
计算订单流不平衡(OFI)
OFI = (窗口内主动买入量 - 主动卖出量) / 总成交量
值域:[-1, 1]
> 0 表示买方主导,< 0 表示卖方主导
"""
timestamps = np.array([t.tick.timestamp for t in annotated_trades])
volumes = np.array([t.tick.volume for t in annotated_trades])
directions = np.array([
1 if t.direction == "buy" else (-1 if t.direction == "sell" else 0)
for t in annotated_trades
])
ofi_values = []
start_time = timestamps[0]
for i, ts in enumerate(timestamps):
if ts - start_time > window_seconds:
# 窗口滑动
mask = (timestamps >= start_time) & (timestamps < ts)
window_volumes = volumes[mask]
window_directions = directions[mask]
buy_vol = np.sum(window_volumes[window_directions == 1])
sell_vol = np.sum(window_volumes[window_directions == -1])
total_vol = np.sum(window_volumes)
ofi = (buy_vol - sell_vol) / total_vol if total_vol > 0 else 0
ofi_values.append((ts, ofi))
start_time = ts
return np.array(ofi_values)
6.2 VPIN:毒性流量检测
Volume-Synchronized Probability of Informed Trading(交易量同步知情交易概率)是 2013 年由 Easley、Lopez de Prado、O'Hara 提出的指标,用于衡量订单流中包含「知情交易者」的概率。
def compute_vpin(
annotated_trades: List[AnnotatedTick],
bucket_size: int = 50 # 每 bucket 的交易笔数
) -> list:
"""
VPIN 计算
VPIN = |buy_volume - sell_volume| / bucket_total_volume
⚠️ 此为简化版。原始论文使用固定成交量 bucket 而非固定笔数,
这样能更好地处理大单和小单的权重差异
"""
vpin_values = []
for i in range(0, len(annotated_trades) - bucket_size, bucket_size):
bucket = annotated_trades[i:i + bucket_size]
buy_vol = sum(
t.tick.volume for t in bucket if t.direction == "buy"
)
sell_vol = sum(
t.tick.volume for t in bucket if t.direction == "sell"
)
total_vol = buy_vol + sell_vol
if total_vol > 0:
vpin = abs(buy_vol - sell_vol) / total_vol
vpin_values.append((bucket[-1].tick.timestamp, vpin))
return vpin_values
VPIN 的核心逻辑:高 VPIN 意味着大单倾向于单边交易,可能是机构知情交易者在建仓或平仓,这往往是价格即将剧烈波动的先兆。2010 年 Flash Crash 的事后分析显示,VPIN 在崩盘前 20 分钟已经显著升高。
6.3 订单流重构与异常检测
有了方向标注后的另一个应用是对比「预期成交分布」与「实际成交分布」。如果某段时间内主动卖出比例异常高于历史均值,可能意味着:
- 机构正在悄悄减仓(无声出货)
- 期权到期日附近的 gamma 挤压即将发生
- 量化因子正在失效,触发了程序化抛售
def detect_order_flow_anomaly(
ofi_series: np.ndarray,
lookback: int = 20,
threshold_std: float = 2.0
) -> List[dict]:
"""
基于 OFI 的异常检测
检测 OFI 偏离历史均值超过 N 个标准差的时点
这些时点通常对应流动性结构的突变
"""
anomalies = []
rolling_mean = np.convolve(
ofi_series[:, 1],
np.ones(lookback)/lookback,
mode='valid'
)
rolling_std = np.array([
np.std(ofi_series[max(0, i-lookback):i+1, 1])
for i in range(lookback, len(ofi_series))
])
for i in range(lookback, len(ofi_series)):
current_ofi = ofi_series[i, 1]
expected = rolling_mean[i - lookback]
std = rolling_std[i - lookback]
z_score = (current_ofi - expected) / std if std > 0 else 0
if abs(z_score) > threshold_std:
anomalies.append({
"timestamp": ofi_series[i, 0],
"ofi": current_ofi,
"z_score": z_score,
"direction": "overbought" if z_score > 0 else "oversold"
})
return anomalies
七、数据质量检查清单
在正式使用方向标注数据之前,必须做以下检查:
def validate_classification_results(
annotated_trades: List[AnnotatedTick]
) -> dict:
"""
分类结果质量验证
⚠️ 以下任一指标异常都意味着算法或数据存在问题,
需要在分析前修复
"""
stats = compute_direction_statistics(annotated_trades)
checks = {
# 买入/卖出比应该接近 1(允许 0.8-1.2 的范围)
"buy_sell_ratio_plausible":
0.8 <= stats["buy_ratio"] / stats["sell_ratio"] <= 1.2,
# 中立标签占比不应超过 15%(过高说明数据质量差或时间间隔太大)
"neutral_ratio_acceptable":
stats["neutral_ratio"] < 0.15,
# 确定分类占比应该 > 50%(否则大部分成交依赖 tick test)
"deterministic_ratio_adequate":
stats["deterministic_ratio"] > 0.50,
# 置信度分布检查
"avg_confidence": np.mean([t.confidence for t in annotated_trades])
}
return {
"stats": stats,
"checks": checks,
"is_usable": all(checks.values()) if "avg_confidence" not in checks else all(v for k, v in checks.items() if k != "avg_confidence")
}
结语
逐笔成交方向推断是订单流分析的起点,而非终点。它的价值不在于「给每笔成交打标签」这件事本身,而在于:当你拥有了方向信息,订单簿的动态、流动性的转移、机构行为的痕迹才开始变得可见。
Lee-Ready 算法诞生于 1991 年,至今仍是该领域最通用的基准方法。改进版本(如基于 VWAP 的 tick test、概率推断、FINRA 禁忌法)都在解决它的局部缺陷,但核心思路——「报价比对 + 价格变动方向」——依然是最稳健的方案。
最后提醒:本文算法基于公开市场数据的合理推断,不构成任何投资建议。市场有风险,投资需谨慎。