文章

价格之上,还有暗流

2010年5月6日,美股市场发生了著名的"闪电崩盘"(Flash Crash)。道琼斯工业平均指数在几分钟内暴跌近1000点,然后又几乎完全反弹回来。事后的调查报告中,有一幅图让所有量化研究员屏住了呼吸——那张图不是价格走势图,而是一张密密麻麻的散点图,每个点代表一笔成交,横轴是时间,纵轴是成交量,点被标记为红色(卖出)或蓝色(买入)。在那几十秒的崩盘窗口中,蓝色点几乎完全消失,红色点像决堤的洪水一样涌出——事后证明,大量算法驱动的自动卖出单是触发流动性真空的直接原因。

那一刻,市场参与者意识到一件事:知道价格是多少,只是理解市场的三分之一。知道谁在买、谁在卖,才是真正看懂市场的钥匙。

但残酷的现实是:你在 TickDB trades 接口拿到的原始逐笔成交数据长这样:

{
  "price": 42153.50,
  "quantity": 0.8231,
  "timestamp": 1714567890123
}

没有买方、没有卖方、没有方向。只有时间、价格和量。

这就是逐笔成交方向推断(Tick Direction Inference)要解决的核心问题:仅凭"发生了什么",还原"谁主导了这件事"。


一、为什么方向推断是刚需

在解释方法之前,有必要先说清楚为什么这件事值得专门写一篇长文。方向推断的实用价值远超学术趣味。

微观结构因子。许多量化因子建立在订单流之上,比如:

  • Order Flow Imbalance(OFI):统计单位时间内主动买入量与主动卖出量的差值,替代无法实时获取的订单簿快照
  • VPIN(Volume-synchronized Probability of Informed Trading):衡量知情交易者比例,用于预测流动性危机
  • 买卖压力比:主动买入量除以主动卖出量,用于判断短期供需拐点

以上所有因子的起点,都是"这笔成交是买还是卖"。

事件驱动择时。在财报发布、LME 库存突变、CPI 数据公布等事件前后,主动买入和主动卖出的切换节奏本身就是市场情绪的温度计。纯价格指标在这类场景下经常给出滞后信号,而订单流可以在价格反映之前先给出拐点提示。

套利策略的生存门槛。统计套利和做市策略需要估计短期价格变动方向。如果把方向搞反了,模型不仅不会赚钱,还会成为其他参与者的流动性来源。

所以问题不是"要不要做方向推断",而是"怎么做对方向推断"。


二、Lee-Ready 算法:方法论的起点

方向推断不是 TickDB 独有的问题,而是整个金融工程领域的经典难题。学术圈和工业界尝试过多种方法,其中Lee-Ready 算法(Lee & Ready, 1991)是目前被引用最广泛、工程落地最成熟的方法。

2.1 核心思想

Lee-Ready 的核心逻辑只有一条:

如果一笔成交的价格大于等于当时的卖一价(ask),则推断为主动买入(Buy)。
如果一笔成交的价格小于等于当时的买一价(bid),则推断为主动卖出(Sell)。

背后的直觉很好理解:当你愿意以卖一价(或者更高)成交时,说明你是主动进攻的那一方——你是买方,你推高了价格。如果你是被动等待的那一方,你的价格不会超过买一价,你的成交价会等于或低于买一价。

用伪代码表示:

if trade_price >= ask:
    direction = BUY
elif trade_price <= bid:
    direction = SELL
else:
    # 成交价在 bid 和 ask 之间(spread 内)
    # 需要特殊处理
    direction = UNCLASSIFIED

这就是 Lee-Ready 的第一条规则:报价比对法(Quote Rule)

2.2 中点价成交:第二条规则

这里有一个关键问题:如果成交价恰好等于买卖价差的中点价(mid-price = (bid + ask) / 2),应该归为买方还是卖方?

理论上,中点价成交意味着没有人在主动推价格——买方和卖方恰好在同一价格"碰头"。这种情况下用报价比对法无法判断方向。

Lee-Ready 的第二条规则处理这个问题:中点规则(Tick Rule)——用一个额外的信息来判断:如果这笔成交的价格高于上一笔确认方向的成交价格,则推断为买方主导;如果低于上一笔确认方向的成交价格,则推断为卖方主导。

if trade_price == mid_price:
    if trade_price > last_classified_price:
        direction = BUY
    elif trade_price < last_classified_price:
        direction = SELL
    else:
        direction = last_classified_direction  # 价格没变,沿用上一个方向

2.3 时间倒填问题:第三条规则

还有一个棘手的问题:在实时场景中,我们拿到一笔成交数据的时候,对应的报价可能还没有更新——即"时间倒填"(trade-shipping)现象。成交发生在撮合引擎中,但行情广播到终端有一定延迟(通常几十毫秒)。

Lee-Ready 论文的处理方式是:用当前时刻最近的一笔报价(而非同时间戳的报价)来比对。具体来说,在判断第 $i$ 笔成交的方向时,应该用 $i$ 时刻之后最近的 ask 和 bid。

在工程实践中,这意味着你的订阅系统需要同时维护一个最新的报价快照,与成交数据配对使用。TickDB 的 depth 频道可以提供这个能力(适用于港股 10 档、加密货币 10 档)。


三、生产级实现

光有算法思路不够,代码要能跑在生产环境里。下面给出完整的生产级实现,包含:同时订阅成交流和报价快照、数据对齐、时间倒填处理、中点价兜底逻辑,以及完整的工程健壮性保障。

3.1 订阅层:同时获取 trades 和 depth

import os
import json
import time
import random
import asyncio
import threading
from dataclasses import dataclass, field
from typing import Optional, Callable
from datetime import datetime
import requests

# ========== 配置 ==========
API_KEY = os.environ.get("TICKDB_API_KEY")
if not API_KEY:
    raise EnvironmentError("请设置环境变量 TICKDB_API_KEY")

BASE_URL = "https://api.tickdb.ai/v1/market"
HEADERS = {"X-API-Key": API_KEY}


# ========== 数据结构 ==========
@dataclass
class QuoteSnapshot:
    """报价快照"""
    bid_price: float
    bid_quantity: float
    ask_price: float
    ask_quantity: float
    timestamp: int  # 毫秒时间戳

    @property
    def mid_price(self) -> float:
        return (self.bid_price + self.ask_price) / 2


@dataclass
class ClassifiedTrade:
    """已分类的逐笔成交"""
    price: float
    quantity: float
    timestamp: int
    direction: str  # BUY / SELL / UNCLASSIFIED
    bid_price: float
    ask_price: float
    spread: float


@dataclass
class LeeReadyState:
    """Lee-Ready 算法状态机"""
    last_bid: float = 0.0
    last_ask: float = 0.0
    last_classified_price: float = 0.0
    last_classified_direction: str = "UNCLASSIFIED"
    quote_snapshot: Optional[QuoteSnapshot] = None


# ========== WebSocket 连接(成交流) ==========
def start_trades_websocket(symbol: str, on_trade: Callable):
    """
    订阅 TickDB trades 频道
    ⚠️ trades 接口支持数字货币和港股,不支持美股/A股
    """
    ws_url = f"wss://api.tickdb.ai/v1/market/ws?api_key={API_KEY}&symbol={symbol}&channel=trades"
    
    retry_delay = 1.0
    max_delay = 32.0
    max_retries = 10

    for attempt in range(max_retries):
        try:
            import websocket
            
            def on_message(ws, message):
                data = json.loads(message)
                # TickDB trades 推送格式: {"price": ..., "quantity": ..., "timestamp": ...}
                trade = {
                    "price": data.get("price"),
                    "quantity": data.get("quantity"),
                    "timestamp": data.get("timestamp")
                }
                on_trade(trade)

            def on_ping(ws, data):
                ws.send(json.dumps({"cmd": "pong"}))

            ws = websocket.WebSocketApp(
                ws_url,
                on_message=on_message,
                on_ping=on_ping,
                on_error=lambda ws, err: print(f"[WS Error] {err}")
            )
            
            thread = threading.Thread(target=ws.run_forever)
            thread.daemon = True
            thread.start()
            print(f"[WS] trades 订阅已启动: {symbol}")
            return ws

        except Exception as e:
            jitter = random.uniform(0, retry_delay * 0.1)
            sleep_time = retry_delay + jitter
            print(f"[WS] 连接失败 ({attempt+1}/{max_retries}),{sleep_time:.2f}s 后重试: {e}")
            time.sleep(sleep_time)
            retry_delay = min(retry_delay * 2, max_delay)

    raise RuntimeError("WebSocket 连接重试次数耗尽")


# ========== 报价快照轮询(depth) ==========
def poll_depth_snapshot(symbol: str, interval_ms: int = 100):
    """
    轮询 TickDB depth 频道获取最新报价快照
    ⚠️ depth 频道:港股 10 档 / 加密货币 10 档 / 美股 1 档
    ⚠️ 高频场景建议使用 WebSocket push,此处为兼容低频场景的轮询方案
    """
    latest_quote = {}

    def fetch_depth():
        try:
            resp = requests.get(
                f"{BASE_URL}/depth",
                headers=HEADERS,
                params={"symbol": symbol},
                timeout=(3.05, 10)
            )
            data = resp.json()
            
            if data.get("code") == 0:
                bids = data.get("data", {}).get("bids", [])
                asks = data.get("data", {}).get("asks", [])
                
                if bids and asks:
                    latest_quote["snapshot"] = QuoteSnapshot(
                        bid_price=float(bids[0][0]),
                        bid_quantity=float(bids[0][1]),
                        ask_price=float(asks[0][0]),
                        ask_quantity=float(asks[0][1]),
                        timestamp=int(data.get("ts", 0))
                    )
            else:
                handle_api_error(data)

        except requests.exceptions.Timeout:
            print("[API] depth 轮询超时,跳过本次")
        except Exception as e:
            print(f"[API] depth 轮询异常: {e}")

    def poll_loop():
        while True:
            fetch_depth()
            time.sleep(interval_ms / 1000)

    thread = threading.Thread(target=poll_loop)
    thread.daemon = True
    thread.start()
    
    def get_snapshot():
        return latest_quote.get("snapshot")

    return get_snapshot


# ========== Lee-Ready 核心分类逻辑 ==========
DIRECTION_BUY = "BUY"
DIRECTION_SELL = "SELL"
DIRECTION_UNCLASSIFIED = "UNCLASSIFIED"


def classify_trade_direction(trade: dict, state: LeeReadyState) -> str:
    """
    Lee-Ready 算法核心实现
    
    规则顺序:
    1. Quote Rule(报价比对):price >= ask → BUY, price <= bid → SELL
    2. Tick Rule(tick 方向递延):price == mid → 对比上一笔确认方向
    3. 中点价格兜底:无法判断时沿用上一笔确认方向
    
    ⚠️ 仅适用于港股和加密货币(trades 接口支持)
    """
    price = float(trade["price"])
    timestamp = trade["timestamp"]

    if state.quote_snapshot is None:
        return DIRECTION_UNCLASSIFIED

    bid = state.quote_snapshot.bid_price
    ask = state.quote_snapshot.ask_price

    if bid == 0 or ask == 0:
        return DIRECTION_UNCLASSIFIED

    # 更新状态机的报价记录(处理时间倒填:用最新报价而非同时间戳报价)
    state.last_bid = bid
    state.last_ask = ask

    spread = ask - bid
    mid = (bid + ask) / 2

    # 规则一:报价比对
    if price >= ask:
        state.last_classified_price = price
        state.last_classified_direction = DIRECTION_BUY
        return DIRECTION_BUY

    if price <= bid:
        state.last_classified_price = price
        state.last_classified_direction = DIRECTION_SELL
        return DIRECTION_SELL

    # 规则二:成交价在 bid-ask 之内(中点附近)
    # Lee-Ready 原版规则:用上一笔确认方向的成交价来判断
    if price > state.last_classified_price:
        state.last_classified_price = price
        state.last_classified_direction = DIRECTION_BUY
        return DIRECTION_BUY

    if price < state.last_classified_price:
        state.last_classified_price = price
        state.last_classified_direction = DIRECTION_SELL
        return DIRECTION_SELL

    # 规则三:价格与上一笔相同,沿用上一笔方向
    if state.last_classified_direction != DIRECTION_UNCLASSIFIED:
        return state.last_classified_direction

    return DIRECTION_UNCLASSIFIED


def build_classified_trade(trade: dict, direction: str, state: LeeReadyState) -> ClassifiedTrade:
    """构建已分类成交记录"""
    return ClassifiedTrade(
        price=float(trade["price"]),
        quantity=float(trade["quantity"]),
        timestamp=trade["timestamp"],
        direction=direction,
        bid_price=state.last_bid,
        ask_price=state.last_ask,
        spread=state.last_ask - state.last_bid
    )


# ========== 主程序 ==========
def run(symbol: str):
    """
    演示:订阅逐笔成交 + 实时分类方向
    """
    state = LeeReadyState()
    classified_trades = []
    counter = 0

    def on_trade(trade):
        nonlocal counter
        direction = classify_trade_direction(trade, state)
        classified = build_classified_trade(trade, direction, state)
        classified_trades.append(classified)
        counter += 1

        # 每 20 笔打印一次摘要
        if counter % 20 == 0:
            buys = sum(1 for t in classified_trades[-20:] if t.direction == DIRECTION_BUY)
            sells = sum(1 for t in classified_trades[-20:] if t.direction == DIRECTION_SELL)
            print(f"[{datetime.fromtimestamp(trade['timestamp']/1000):%H:%M:%S}] "
                  f"最近 20 笔: BUY={buys}  SELL={sells}  "
                  f"价格={classified.price}  方向={direction}")

    # ⚠️ 请确认 symbol 在 trades 接口支持范围内(数字货币/港股)
    # 示例符号:BTC.USDT, ETH.USDT, 9988.HK
    print(f"[初始化] 订阅 {symbol} 的逐笔成交并实时分类方向")
    
    get_snapshot = poll_depth_snapshot(symbol)
    
    # 将快照注入状态机(实际生产中建议异步队列处理)
    def refresh_quote():
        snapshot = get_snapshot()
        if snapshot:
            state.quote_snapshot = snapshot

    start_trades_websocket(symbol, on_trade)


if __name__ == "__main__":
    # ⚠️ 示例 symbol,需确保该标的在 TickDB trades 支持范围内
    run("BTC.USDT")

3.2 代码要点说明

为什么同时需要 tradesdepth 两个频道? Lee-Ready 算法的输入是成交价配对同时间的 bid/ask 价格。depth 频道提供报价快照,trades 频道提供逐笔成交,两者组合才能完成方向推断。

时间倒填的实际工程处理:生产环境中,不能简单地把同一时刻的报价快照和成交配对,因为行情传输存在延迟。上面的代码通过维护一个"始终持有最新报价"的快照来解决这个问题,同时记录下当前成交时刻的 bid/ask 到结果字段中,方便后续回溯时验证。

为什么不直接用 depthbids[0]asks[0] 判断买卖方向? depth 频道的快照反映的是"挂了哪些单",而不是"哪些单刚刚成交了"。逐笔成交的 price 字段和快照 bid/ask 的实时比对,才是 Lee-Ready 方法的核心。


四、方向推断后的衍生指标

方向分类完成后,我们终于有了一张可以使用的"谁在买卖"的数据表。接下来介绍几个基于分类结果构建的实用指标。

4.1 订单流失衡(Order Flow Imbalance, OFI)

OFI 是单位时间内主动买入量与主动卖出量的差值,公式:

$$
\text{OFI}(t) = \sum_{i \in [t-\Delta t, t]} q_i \cdot \mathbb{1}{d_i = \text{BUY}} - \sum{i \in [t-\Delta t, t]} q_i \cdot \mathbb{1}_{d_i = \text{SELL}}
$$

其中 $q_i$ 是成交量,$d_i$ 是分类方向。

from collections import deque
from dataclasses import dataclass


@dataclass
class OFIWindow:
    """OFI 滑动窗口计算器"""
    window_seconds: int = 60
    classified_trades: deque = field(default_factory=deque)

    def add_trade(self, trade: ClassifiedTrade):
        self.classified_trades.append(trade)
        cutoff = trade.timestamp - self.window_seconds * 1000
        while self.classified_trades and self.classified_trades[0].timestamp < cutoff:
            self.classified_trades.popleft()

    def compute_ofi(self) -> float:
        buy_volume = sum(t.quantity for t in self.classified_trades if t.direction == DIRECTION_BUY)
        sell_volume = sum(t.quantity for t in self.classified_trades if t.direction == DIRECTION_SELL)
        return buy_volume - sell_volume

    def ofi_ratio(self) -> float:
        """
        OFI 比率:正向 OFI 占比(0.5 = 多空平衡)
        用于跨标的口味归一化
        """
        buy_volume = sum(t.quantity for t in self.classified_trades if t.direction == DIRECTION_BUY)
        sell_volume = sum(t.quantity for t in self.classified_trades if t.direction == DIRECTION_SELL)
        total = buy_volume + sell_volume
        if total == 0:
            return 0.5
        return buy_volume / total

4.2 买卖压力比(Buy/Sell Pressure Ratio)

$$
\text{PressureRatio} = \frac{\sum_{i \in \text{window}} q_i \cdot \mathbb{1}{d_i=\text{BUY}}}{\sum{i \in \text{window}} q_i \cdot \mathbb{1}_{d_i=\text{SELL}}}
$$

def compute_pressure_ratio(window_trades: list[ClassifiedTrade]) -> float:
    buy_q = sum(t.quantity for t in window_trades if t.direction == DIRECTION_BUY)
    sell_q = sum(t.quantity for t in window_trades if t.direction == DIRECTION_SELL)
    if sell_q == 0:
        return float('inf') if buy_q > 0 else 1.0
    return buy_q / sell_q


def detect_flow_shift(classified_trades: list[ClassifiedTrade], lookback: int = 50, threshold: float = 2.0) -> str:
    """
    监测订单流方向切换
    
    当压力比从 >threshold 快速跌至 <1/threshold 时,
    视为卖方压力骤增的信号
    
    ⚠️ 仅作为辅助参考,不构成交易建议
    """
    if len(classified_trades) < lookback:
        return "INSUFFICIENT_DATA"
    
    recent = classified_trades[-lookback:]
    ratio = compute_pressure_ratio(recent)
    
    if ratio > threshold:
        return "STRONG_BUY_PRESSURE"
    elif ratio < 1 / threshold:
        return "STRONG_SELL_PRESSURE"
    else:
        return "BALANCED"

4.3 成交价分布偏离

统计在窗口期内,主动买入的平均成交价与主动卖出的平均成交价之差(相对于中间价的百分比),用于判断哪一方在更有利的位置成交。

def compute_trade_price_spread(classified_trades: list[ClassifiedTrade]) -> dict:
    buys = [t for t in classified_trades if t.direction == DIRECTION_BUY]
    sells = [t for t in classified_trades if t.direction == DIRECTION_SELL]
    
    buy_avg = sum(t.price * t.quantity for t in buys) / sum(t.quantity for t in buys) if buys else 0
    sell_avg = sum(t.price * t.quantity for t in sells) / sum(t.quantity for t in sells) if sells else 0
    
    # 相对于 mid-price 的偏离
    mid_avg = (buy_avg + sell_avg) / 2 if (buy_avg and sell_avg) else 0
    
    return {
        "buy_avg_偏离mid_bps": (buy_avg - mid_avg) / mid_avg * 10000 if mid_avg else 0,
        "sell_avg_偏离mid_bps": (sell_avg - mid_avg) / mid_avg * 10000 if mid_avg else 0,
        "buy_avg": buy_avg,
        "sell_avg": sell_avg,
    }

五、方法局限性与应对

Lee-Ready 算法不是银弹,理解它的局限性是正确使用它的前提。

5.1 底层数据要求

Lee-Ready 的前提假设是:每一笔成交都能配对到撮合时刻的 bid/ask 快照。这意味着:

  • 必须有 depth 数据源:如果你的标的只有 K 线数据没有订单簿数据,Lee-Ready 无法适用
  • TickDB 的数据覆盖边界:trades 接口支持港股和数字货币,不支持美股和 A 股。对于美股历史回测,建议使用 TickDB 的 /v1/market/kline 接口(10 年级别历史 K 线数据)配合量价关系因子,而非逐笔成交方向推断
  • 数据对齐延迟:WebSocket 推送的 trades 和 depth 可能存在几十毫秒的时间差,上游数据源的时序质量直接影响分类准确率

5.2 算法本身的不确定性

场景 Lee-Ready 的判断 潜在问题
成交价 == mid 且无历史方向 UNCLASSIFIED 高频小额成交可能导致大量无法分类
bid-ask spread = 0(极度深度市场) 报价比对失效 极端情况下两个方向同时满足
算法驱动的大宗成交 方向可能反映的是机器行为而非意图 单纯方向分类无法区分人工和算法

5.3 替代方案速查

当 Lee-Ready 条件不满足时,可以考虑以下替代方法:

方法 数据要求 适用场景
成交量加权均价变化方向 只需价格和量 无法获取订单簿时的近似方案
ER(Ellis, Michaely, O'Hara)算法 成交价 + 成交量 + 报价 与 Lee-Ready 类似,但权重计算方式不同
PIN 模型(Probability of Informed Trading) 成交量时间序列 预测流动性风险,不适合实时分类
市场微观结构噪声过滤 需结合波动率阈值 过滤 spread 内的小幅震荡成交

六、在 TickDB 中完整使用这套方案

上面给出的代码是完整的独立模块,可以直接作为 TickDB 数据流的一个处理环节接入你的量化系统。以下是各接口的使用场景对照:

数据需求 TickDB 接口 说明
逐笔成交流(实时) WebSocket / trades 港股和数字货币可用
订单簿快照(实时) WebSocket / depth 港股 10 档 / 加密 10 档 / 美股 1 档
历史 K 线回测 GET /v1/market/kline 10 年级别,清洗对齐,支持多周期
当前 K 线 GET /v1/market/kline/latest 实时展示用,勿用于回测
交易品种查询 GET /v1/symbols/available 判断哪些标的在支持范围内

结语:看见暗流

回到开头的那张散点图。量化研究的核心能力之一,就是把"价格是结果,订单簿是原因"这件事看穿。Lee-Ready 算法给了我们一把在逐笔成交级别还原市场参与意图的钥匙——不是完美答案,但是是目前为止工程界共识最强的起点。

方向分类只是微观结构分析的第一步。有了方向数据,你才可能构建 OFI、压力比、VPIN 这些因子;有了因子,你才有可能捕捉到那些价格还没有反映、但订单流已经在说的信号。

市场里真正的Alpha,往往藏在价格图表看不见的地方——在逐笔成交里,在买卖盘口的消长里,在那张被人忽略的订单簿里。


下一步行动

如果你想亲手实现本文的分类算法

  1. 访问 tickdb.ai 注册(免费,无需信用卡)
  2. 在控制台生成 API Key
  3. 设置环境变量 TICKDB_API_KEY,复制本文代码即可运行(推荐先用 BTC.USDT 等数字货币标的测试)

如果你关心的是美股历史回测
TickDB 的 /v1/market/kline 接口提供 10 年级别的历史 K 线数据,支持多周期回测。注意美股不支持逐笔成交方向推断,建议使用量价类因子替代。

如果你在搭建订单流分析系统:建议同时订阅 depth + trades 双频道,参考本文代码架构,将分类结果写入时序数据库(如 TimescaleDB/InfluxDB),后续用 ClickHouse 或 DuckDB 做 OLAP 分析。


风险提示:本文不构成任何投资建议。Lee-Ready 算法是一种统计推断方法,存在固有误差,分类结果不应直接作为交易信号使用。算法在实际市场中的表现受数据质量、市场制度和流动性状况等多重因素影响。市场有风险,投资需谨慎。