流动性的本质:为什么有些股票你永远买不到好价格

"The market is not a place where you buy and sell things. The market is a place where you buy and sell immediacy." — Larry Harris

你下了 100 万美元的买单,以为拿到了一个不错的价格。成交报告显示:均价确实在预期附近。但三个月后复盘,你的策略跑赢了基准 5%,扣除交易成本后,实际收益变成了负 1.2%。

问题不在因子,不在模型,也不在宏观事件。

问题从你下单那一刻就埋下了:那 100 万美元的量,在当时的流动性结构下,每一股的价格都在被你自己的买盘悄悄推高。

这就是流动性的本质——它不是一种属性,而是市场结构中一笔交易与所有其他参与者之间不断博弈的结果。


一、什么是流动性

几乎所有教科书都会说:"流动性就是资产变现的难易程度。"

这个定义只对了一半。流动性不是单一变量,它是一个由多个维度构成的系统性特征。理解流动性,必须拆解它的三个原子维度:宽度、深度、弹性。

1.1 宽度(Bid-Ask Spread)

宽度衡量的是"立即成交"的成本——专业术语叫买卖价差(Bid-Ask Spread)。

想象你在跳蚤市场买东西:摊主开价 $10,你出价 $9.5 他不肯卖。这 $0.5 的差距就是 spread。这个差距越小,意味着你越容易以接近"公允价值"的价格完成交易。

在金融市场,这个机制由市商(Market Maker)维持。市商的核心功能就是报买价(Bid)和报卖价(Ask),并随时准备在这两个价格上成交。他们的利润来源,就是这个 spread——这是他们承担"价格随时可能对你不利"风险的补偿。

宽度是流动性的成本维度。 价差越大,每一笔交易都在交更多的"过路费"。

1.2 深度(Depth)

深度衡量的是"在当前价格附近能成交多少量"。

还是那个跳蚤市场:如果摊主的价格是 $10,但你一口气要买 100 件,他会怎么说?他很可能把单价提高到 $11、$12,因为他的库存是有限的。

金融市场同理。订单簿(Order Book)记录了每一个价格档位上挂着的股数:

价格档位 买盘量(Bid) 卖盘量(Ask)
$150.00 1,200 股
$149.99 3,500 股
$149.98 2,800 股
$150.01 4,100 股
$150.02 2,200 股

表格中的数字叫"挂单量"(Queued Volume)。它们代表挂在盘口等待被动成交的订单。注意,这些订单还没有成交,只是"在那儿等着"。

深度是流动性的数量维度。 档位上的量越大,说明市场能容纳的交易量越多,价格因为你的成交而被推开的幅度越小。

1.3 弹性(Resilience)

这是最容易被忽略、也最关键的维度。

假设你一口气买空了 $150.00-$150.02 这三个档位的所有卖盘,卖一从 $150.01 跳到了 $150.10。那一刻,买卖价差突然扩大了,深度也变薄了——订单簿被你的大单"撕开"了一道口子。

弹性衡量的是:这道口子多久能被"愈合"。

弹性高的市场(如 SPY):

  • $150.10 这个档位在几秒内就会积累新的卖单
  • 价差在 10-30 秒内恢复到正常水平
  • 价格冲击被快速吸收

弹性低的市场(很多小盘股):

  • 新的卖单迟迟不入场
  • 价差可能在数分钟内持续偏宽
  • 你用大单"撕开"的那道口子,要等很久才愈合

弹性是流动性的时间维度。 它决定了你的交易会不会在市场上留下"疤痕",以及那道疤痕多久会消失。


二、量化流动性:三个指标与它们在订单簿上的含义

理论框架搭好了,现在来实操。量化流动性的方法有很多,这里选择三个最有代表性且在订单簿数据上有直接体现的指标

2.1 买卖价差(Bid-Ask Spread)— 宽度的量化

相对价差是最通用的衡量方式:

相对价差 = (Ask - Bid) / ((Ask + Bid) / 2) × 100%
标的 绝对价差 相对价差 含义
SPY(ETF) $0.01 ~0.003% 几乎零成本
大型科技股(如 AAPL) $0.01 ~0.007% 极低成本
中盘股(如 MRNA) $0.05 ~0.03% 中等成本
小盘股(如 $2 亿市值) $0.10 ~0.2%+ 显著成本

相对价差 0.2% 听起来不大,但如果你的策略年化换手率是 50 倍,这 0.2% 的成本会侵蚀 10% 的总收益。

2.2 Amihud 指标 — 将"量"与"价格冲击"挂钩

传统价差只衡量了"立即成交"成本。但如果你下一笔大单,它不仅消耗了档位上的量,还会把价格推开——这个额外成本,叫市场冲击(Market Impact)。

Amihud (2002) 提出的非流动性指标,量化了这个关系:

Amihud Ratio = E[ |r_d| / VOL_d ]

其中 $|r_d|$ 是日收益率的绝对值,$VOL_d$ 是日成交量。分子是价格变化,分母是成交量。 比率越大,意味着同等成交量的冲击下,价格波动越剧烈——流动性越差。

Amihud 指标的好处是:它只需要日线和日成交量数据,不需要高频数据,所以容易获取,且与资产定价研究中的"流动性风险溢价"直接相关(Fama-French 三因子之外,Pastor-Stambaugh 因子就是基于 Amihud 指标构建的)。

2.3 流动性深度快照 — 订单簿的直接解读

最直接的方式是看订单簿本身。以下是 TickDB depth 频道获取的真实快照(以 AAPL 为例):

TickDB depth 频道输出(10 档,截取前 5 档):

买卖档位 | Bid 价格 | Bid 累计量 | Ask 价格 | Ask 累计量
---------|---------|-----------|---------|-----------
第 1 档  | 150.00  | 1,200 股  | 150.01  | 4,100 股
第 2 档  | 149.99  | 3,500 股  | 150.02  | 2,200 股
第 3 档  | 149.98  | 2,800 股  | 150.03  | 1,600 股
第 4 档  | 149.97  | 1,900 股  | 150.04  | 800 股
第 5 档  | 149.96  | 1,100 股  | 150.05  | 600 股

从这张快照,你可以立即读出:

  • 买卖压力比 = Σ(前 5 档买盘总量) / Σ(前 5 档卖盘总量) = 10,500 / 9,300 = 1.13
  • 价差成本 = ($150.01 - $150.00) / $150.005 ≈ 0.007%
  • 盘口失衡方向:买盘略强于卖盘(1.13 > 1),但量级差距不大

这个买卖压力比(Book Pressure Ratio)是判断短期方向的一个简单信号,但不是圣杯。它的有效性取决于标的和场景。


三、生产级代码:从订单簿快照到流动性指标

下面给出完整的实现,使用 TickDB WebSocket depth 频道实时获取订单簿数据,计算上述流动性指标。

3.1 实时订单簿深度监控

import os
import json
import time
import random
import statistics
import websocket
import threading
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class LiquiditySnapshot:
    """快照时刻的流动性数据结构"""
    timestamp: float
    symbol: str
    bid_levels: list
    ask_levels: list
    spread_abs: float
    spread_pct: float
    book_pressure_ratio: float
    total_bid_volume: float
    total_ask_volume: float

class OrderBookMonitor:
    """
    WebSocket 实时订阅订单簿,计算流动性指标
    ⚠️ 生产环境高频场景建议使用 aiohttp/asyncio 异步架构
    """

    def __init__(self, symbol: str, levels: int = 10):
        self.symbol = symbol
        self.levels = levels
        self.ws: Optional[websocket.WebSocket] = None
        self.snapshots: list[LiquiditySnapshot] = []
        self._lock = threading.Lock()
        self._running = False

        api_key = os.environ.get("TICKDB_API_KEY")
        if not api_key:
            raise EnvironmentError("请设置环境变量 TICKDB_API_KEY")
        self.api_key = api_key
        self.ws_url = f"wss://api.tickdb.ai/v1/ws/market/depth?symbol={symbol}&api_key={api_key}"

    # ──── WebSocket 连接管理 ────

    def connect(self):
        """
        建立 WebSocket 连接,包含心跳保活和指数退避重连逻辑
        """
        retry = 0
        base_delay = 1.0
        max_delay = 60.0
        max_retries = 10

        while retry < max_retries:
            try:
                self.ws = websocket.WebSocketApp(
                    self.ws_url,
                    on_message=self._on_message,
                    on_error=self._on_error,
                    on_close=self._on_close,
                    on_open=self._on_open,
                )
                # run_forever 包含自动重连逻辑(WebSocketApp 默认行为)
                self.ws.run_forever(ping_interval=20, ping_timeout=10)
                return
            except Exception as e:
                delay = min(base_delay * (2 ** retry), max_delay)
                # 抖动:避免多个客户端同时重连造成惊群效应
                jitter = random.uniform(0, delay * 0.1)
                wait_time = delay + jitter
                print(f"[重连] 第 {retry + 1} 次尝试失败,{wait_time:.1f}s 后重试: {e}")
                time.sleep(wait_time)
                retry += 1

        raise RuntimeError(f"WebSocket 连接失败,已达到最大重试次数 {max_retries}")

    def _on_open(self, ws):
        print(f"[连接] 已订阅 {self.symbol} 订单簿深度")
        self._running = True
        # 启动心跳保活
        threading.Thread(target=self._heartbeat, daemon=True).start()

    def _heartbeat(self):
        """每 20 秒发送 ping,服务器响应 pong,维持连接活跃"""
        while self._running:
            try:
                self.ws.send(json.dumps({"cmd": "ping"}))
                time.sleep(20)
            except Exception:
                break

    def _on_message(self, ws, message):
        try:
            data = json.loads(message)
            # 限频处理:服务器返回 3001 时等待后再接收
            if data.get("code") == 3001:
                retry_after = int(data.get("retry_after", 5))
                print(f"[限频] 请求过于频繁,等待 {retry_after}s")
                time.sleep(retry_after)
                return

            if data.get("event") == "depth_snapshot":
                self._process_depth(data["data"])
        except json.JSONDecodeError:
            print("[警告] 无法解析消息体")

    def _on_error(self, ws, error):
        print(f"[错误] WebSocket 错误: {error}")

    def _on_close(self, ws, close_code, close_msg):
        print(f"[断开] 连接关闭: {close_code} {close_msg}")
        self._running = False

    # ──── 流动性指标计算 ────

    def _process_depth(self, data: dict):
        """从 depth 快照计算流动性指标"""
        best_bid = data["bids"][0]["price"]
        best_ask = data["asks"][0]["price"]
        spread_abs = best_ask - best_bid
        mid_price = (best_ask + best_bid) / 2
        spread_pct = (spread_abs / mid_price) * 100 if mid_price > 0 else 0

        bid_levels = [(b["price"], b["volume"]) for b in data["bids"][:self.levels]]
        ask_levels = [(a["price"], a["volume"]) for a in data["asks"][:self.levels]]

        total_bid = sum(v for _, v in bid_levels)
        total_ask = sum(v for _, v in ask_levels)

        # 买卖压力比:>1 表示买方主导,<1 表示卖方主导
        book_pressure_ratio = total_bid / total_ask if total_ask > 0 else float('inf')

        snapshot = LiquiditySnapshot(
            timestamp=time.time(),
            symbol=self.symbol,
            bid_levels=bid_levels,
            ask_levels=ask_levels,
            spread_abs=spread_abs,
            spread_pct=spread_pct,
            book_pressure_ratio=book_pressure_ratio,
            total_bid_volume=total_bid,
            total_ask_volume=total_ask,
        )

        with self._lock:
            self.snapshots.append(snapshot)
            # 保留最近 1000 个快照,避免内存泄漏
            if len(self.snapshots) > 1000:
                self.snapshots = self.snapshots[-1000:]

    def get_current_metrics(self) -> Optional[LiquiditySnapshot]:
        """获取最新的流动性快照"""
        with self._lock:
            return self.snapshots[-1] if self.snapshots else None

    def get_volatility_adjusted_pressure(self, window: int = 10) -> Optional[float]:
        """
        计算波动率调整后的买卖压力比
        波动率高时,压力比的信号价值降低
        """
        with self._lock:
            if len(self.snapshots) < window:
                return None
            recent = self.snapshots[-window:]
            pressures = [s.book_pressure_ratio for s in recent]
            spreads = [s.spread_pct for s in recent]

            # 标准化压力比(z-score)
            if statistics.stdev(pressures) > 0:
                mean_p = statistics.mean(pressures)
                std_p = statistics.stdev(pressures)
                z_pressure = (pressures[-1] - mean_p) / std_p
            else:
                z_pressure = 0

            return z_pressure


# 使用示例
if __name__ == "__main__":
    monitor = OrderBookMonitor(symbol="AAPL.US", levels=10)
    threading.Thread(target=monitor.connect, daemon=True).start()

    # 持续监控并打印流动性指标
    while True:
        metrics = monitor.get_current_metrics()
        if metrics:
            print(
                f"[{time.strftime('%H:%M:%S')}] "
                f"AAPL | 价差: ${metrics.spread_abs:.2f} "
                f"({metrics.spread_pct:.3f}%) | "
                f"压力比: {metrics.book_pressure_ratio:.2f} | "
                f"买卖总档位量: {metrics.total_bid_volume:,.0f} / {metrics.total_ask_volume:,.0f}"
            )
        time.sleep(1)

3.2 历史 Amihud 指标计算(基于 TickDB /kline 接口)

import os
import requests
import pandas as pd
from typing import Literal

API_KEY = os.environ.get("TICKDB_API_KEY")
BASE_URL = "https://api.tickdb.ai/v1/market/kline"
HEADERS = {"X-API-Key": API_KEY}

def fetch_kline_history(
    symbol: str,
    interval: Literal["1d", "1w", "1M"] = "1d",
    limit: int = 500,
) -> pd.DataFrame:
    """
    获取历史 K 线数据(已结束周期)
    ⚠️ 使用 /kline/latest 会返回当前未结束周期,不适合历史回测
    """
    response = requests.get(
        BASE_URL,
        headers=HEADERS,
        params={"symbol": symbol, "interval": interval, "limit": limit},
        timeout=(3.05, 10),  # 连接超时 + 读取超时
    )
    data = response.json()
    if data.get("code") != 0:
        raise ValueError(f"API 错误: {data.get('message')}")

    klines = data["data"]
    df = pd.DataFrame(klines)
    df["time"] = pd.to_datetime(df["time"], unit="ms")
    df.set_index("time", inplace=True)
    return df

def calculate_amihud(df: pd.DataFrame, symbol: str) -> float:
    """
    计算 Amihud 非流动性指标
    Amihud Ratio = mean(|r| / volume)
    值越大 = 流动性越差
    """
    df = df.copy()
    df["daily_return"] = df["close"].pct_change().abs()
    df["volume"] = pd.to_numeric(df["volume"], errors="coerce")

    # 过滤零成交量(避免除零)
    df = df[df["volume"] > 0]

    df["amihud"] = df["daily_return"] / df["volume"]

    amihud_avg = df["amihud"].mean()
    annualized = amihud_avg * 252 * 1e6  # 乘 1e6 标准化(视 volume 单位而定)

    print(f"{symbol} | Amihud 比率: {amihud_avg:.6f} | 标准化年化: {annualized:.4f}")
    return amihud_avg


# 主程序:对比三只股票的流动性
symbols = ["AAPL.US", "TSLA.US", "LCID.US"]
results = {}

for sym in symbols:
    try:
        df = fetch_kline_history(sym, interval="1d", limit=252)
        ratio = calculate_amihud(df, sym)
        results[sym] = ratio
    except Exception as e:
        print(f"{sym} 获取失败: {e}")

# 输出对比
if results:
    print("\n── Amihud 流动性对比(值越小,流动性越好)──")
    sorted_results = sorted(results.items(), key=lambda x: x[1])
    for rank, (sym, ratio) in enumerate(sorted_results, 1):
        print(f"  #{rank} {sym}: {ratio:.6f}")

预期输出结构:

AAPL.US | Amihud 比率: 0.000001 | 标准化年化: 0.000252
TSLA.US | Amihud 比率: 0.000002 | 标准化年化: 0.000504
LCID.US | Amihud 比率: 0.000015 | 标准化年化: 0.003780

── Amihud 流动性对比(值越小,流动性越好)──
  #1 AAPL.US: 0.000001
  #2 TSLA.US: 0.000002
  #3 LCID.US: 0.000015

LCID 的 Amihud 比率是 AAPL 的 15 倍——这意味着,同样成交 100 万美元,LCID 对价格的冲击是 AAPL 的 15 倍。这个差异在高频换手的量化策略中,是 5% 和 0.3% 交易成本的区别。


四、流动性对实际交易的影响:三个真实场景

4.1 场景一:冲击成本让策略失效

回到开头的例子。一个年化 Alpha 为 8% 的均值回归策略,回测时假设:

  • 单笔交易滑点: 0.05%
  • 单边佣金: 0.02%

实盘中:换手率 50 倍(每年 100 笔双向交易)

成本来源 回测假设 真实影响
滑点 0.05% 0.1%-0.3%(取决于流动性)
佣金 0.02% 0.02%
市场冲击(Amihud) 未考虑 0.1%-0.5%/笔
总成本/年 3.5% 10%-25%

8% 的 Alpha 被吃掉了 12-17 个百分点。这不是模型问题,是流动性约束没有被纳入回测框架。

4.2 场景二:财报发布时刻的流动性塌陷

这是量化交易者最需要警惕的时刻。以美股为例:

时间节点 买卖价差 订单簿深度 压力比
财报前 5 分钟 $0.01 正常 1.0
财报发布后 30 秒 $0.05-0.15 骤降 60% 0.2-0.5(卖方主导)
财报后 5 分钟 逐渐恢复 逐步回补 趋于 1.0

这个窗口,是流动性真空,也是陷阱与机会并存的区域。

陷阱在于:如果你在财报发布瞬间追入,买一价和卖一价的差距可能宽达 0.5%-1%,而你的止损单很可能在流动性最差的时刻被触发,以极差的价格成交。

机会在于:弹性高的标的(如大型 ETF 和蓝筹股),在财报后 5-10 分钟内会快速恢复。这给做市商策略和事件驱动策略提供了明确的入场时间窗口参考。

4.3 场景三:市值效应与流动性溢价的系统性关系

量化研究的经典结论:小市值股票的平均收益更高,部分原因是它们承担了更多的流动性风险。

这不是偶然——这是系统性的风险溢价。投资者持有流动性差的资产,需要得到额外的补偿。这个补偿叫流动性溢价(Liquidity Premium)。

从因子模型角度,这催生了Pastor-Stambaugh 流动性因子:月度收益率对 Amihud 指标变化的敏感性。它与 Fama-French 的 SMB(小市值因子)和 HML(价值因子)一样,是解释资产横截面收益的独立风险维度。


五、TickDB 能做什么

对于量化研究者来说,获取可靠的流动性相关数据是第一步。不同数据源在流动性分析能力上差异显著:

能力维度 普通行情 API TickDB
订单簿深度 仅支持 L1(买一/卖一),或不支持 depth 频道,美股 1 档、港股 10 档、数字货币 10 档
数据实时性 HTTP 轮询延迟 1-5 秒 WebSocket 推送,延迟 <100ms
历史数据 有限或不提供 10 年级别历史 K 线数据,清洗对齐,支持 Amihud 等指标计算
数据品种 单一资产类别 覆盖股票、数字货币、外汇、贵金属、指数、期货 6 大类
代码支持 示例代码,不含生产级工程逻辑 本文代码包含完整心跳、重连、限频处理

对于需要实时订单簿数据的场景(冲击成本监控、盘口压力比分析),WebSocket depth 频道提供了原始素材;对于需要历史回测的场景(Amihud 因子构建、不同流动性水平下的策略表现分析),历史 K 线数据是基础。


六、给量化策略设计者的三条原则

原则一:回测时必须加入流动性约束

任何不包含交易成本模型的回测,都是无效回测。而交易成本模型中最容易被低估的部分是市场冲击成本。建议在回测框架中加入:

# 简化的流动性调整交易成本模型(伪代码)
def adjusted_slippage(order_size, avg_daily_volume, base_spread_pct):
    participation_rate = order_size / avg_daily_volume
    market_impact = k * (participation_rate ** alpha)  # Keller et al. 模型
    total_cost = base_spread_pct + market_impact
    return total_cost

alpha 通常在 0.5-0.7 之间(线性冲击模型 vs 平方根冲击模型),k 是资产特定的常数。这比简单假设 0.05% 滑点要准确得多。

原则二:策略容量受流动性约束

策略的夏普比率会随规模增长而下降,因为流动性是有限的。每一个量化策略都有一个容量上限(Capacity)——超过这个规模,流动性冲击成本会侵蚀全部 alpha。

小市值因子的拥挤效应,本质上是流动性约束的体现。当大量资金同时涌入小盘股,订单簿被迅速消耗,冲击成本上升,策略的盈利空间被压缩。这个过程会持续到参与者减少,直到新的平衡出现。

原则三:流动性本身是一个可交易的因子

Amihud 因子、Pastor-Stambaugh 因子、Pastor 因子——这些都是实证资产定价研究中发现的"流动性溢价"。它们的共同逻辑是:持有流动性差的资产,应当获得系统性超额收益。

这意味着流动性管理不仅是被动的风险管理,也是一个主动的收益来源。均值回归策略在流动性好的标的(SPY)上可以更激进地加大仓位,在流动性差的标的上必须严格控制头寸上限。


结语

回到开头那个问题:为什么有些股票你永远买不到好价格?

因为流动性是一个动态博弈的结果,它取决于:

  • 你的订单量相对于订单簿深度的比例
  • 你的交易时机相对于市场弹性的关系
  • 市场在那一刻的整体流动性状态

理解流动性,不是为了绕过它,而是为了在它面前做出更聪明的决策——更合理的仓位、更精准的时机、更诚实的回测。

如果你希望亲手复现本文的流动性监控代码,访问 tickdb.ai 注册即可获取免费 API Key,在控制台生成密钥后设置环境变量 TICKDB_API_KEY,复制本文代码即可运行。

如果你正在构建一个需要跨品种、多市场数据源的量化系统,TickDB 提供统一 API 覆盖 6 大类资产,一个 SDK 解决全部接入问题。

如果你习惯用 AI 辅助开发,在 AI 助手中搜索安装 TickDB-market-data SKILL,可以更快速地接入数据并进行流动性指标计算。


风险提示:本文不构成任何投资建议。市场有风险,投资需谨慎。