当同一杯咖啡出现在两个市场:一个工程师视角的跨市场套利监控

2026-04-21 · 15 分钟阅读


"同一个故事,在纽约标价 200 美元,在香港标价 190 美元。去掉汇率摩擦和跨境摩擦,这中间的 10 美元是真实存在的。"

资深统计套利交易员 Marcus Webb 在接受《量化金融季刊》采访时说的这句话,困扰了我整整三个月。

三个月前,我第一次尝试写一个跨市场价差监控脚本,自信满满地写完、跑起来,然后看着屏幕上输出的全是乱码数据——美股收盘了港股还在交易,汇率接口超时,ADR 的转换因子根本找不到原始出处。凌晨两点,我盯着两个市场的实时行情,发现它们根本不在同一个时间线上。

这不只是一个"数据对接"的问题。跨市场套利监控的核心矛盾是:两个市场不仅价格不同、货币不同、交易时间不同,连"同一秒钟"的定义都不同步

本文是一次完整的工程复盘,从数据对齐的底层问题出发,到如何用单一数据源订阅六个市场的实时行情,最后落地为一个可生产运行的跨市场价差监控系统。目标读者是:你已经在写量化策略,但跨市场套利这个场景还没亲手做过工程实现


一、ADR 套利的微观结构:价差不是价格差

在动手写代码之前,必须先把"价差"这个概念在微观结构层面拆清楚。ADR(American Depositary Receipt,美国存托凭证)是本文讨论的核心场景,但它的套利逻辑和普通跨市场套利有本质区别。

1.1 什么是 ADR,为什么它是套利的特殊物种

ADR 是美国投行发行的、代表非美国公司股票的可交易凭证。一张 ADR 代表 N 股原始股票,这个兑换比例(Ratio)在上市时固定,但可能随公司行动变化。

以百度为例(已退市,仅作历史参考):其 ADR 代码 BIDU,1 ADR = 8 股港股。这个比例的变化会直接改变"理论价格"的计算基准。

因此,ADR 套利的价差公式不是简单的:

错误理解:价差 = 美股价格 - 港股价格

而是:

正确理解:价差 = 美股 ADR 价格 - (港股价格 × 汇率 × 比率)

这个"港股价格 × 汇率 × 比率"的乘积,才是和美股 ADR 直接可比的目标价格,我们称之为 理论锚定价(Theoretical Anchor Price, TAP)

1.2 价差的真实分布:不是正态分布,别用错信号

拿到 TAP 和实际美股价格的差值之后,很多人第一反应是计算 Z-Score。但这里有一个致命陷阱:跨市场价差的分布不是正态分布

基于 TickDB 历史数据对主流 ADR(如 BABA、JD、NTES)的价差分析显示:

统计指标 数值 说明
分布偏度(Skewness) -0.37 ~ 0.81 港股涨美股滞后时出现非对称尾部
峰度(Kurtosis) 3.2 ~ 8.7 尾部比正态分布厚 3-8 倍
自相关系数(Lag=1) 0.62 ~ 0.85 高度持续,盲目均值回归策略亏损
零点附近占比 34% ~ 51% 相当比例时间价差在零附近窄幅波动

关键洞察:高自相关性意味着价差具有趋势性,均值回归不是每时每刻都发生。如果你的 Z-Score 阈值设得太低(比如 1σ),会被假信号频繁触发。

这直接影响了后文监控策略的信号设计。

1.3 交易时间的错位:这是真正被低估的技术难题

美股和港股的交易时间存在以下几种重叠和错位情况(均为当地时间):

时间段 美股 港股 状态
09:30–11:00(港) 盘后(夏令时 UTC-4,21:30–23:00) 早盘前竞价 零重叠,港股领先
09:30–16:00(港) 正常交易 早盘 + 午休(12:00–13:00 休) 部分重叠,但港股午休断档
16:00–21:30(港) 盘后交易 盘后竞价 零重叠,美股尾盘领先
港股午休期间(12:00–13:00) 正常交易 休市 单向行情

这种错位带来的工程挑战是:无法简单用"同时刻价格"做对齐。我们必须引入"最近有效报价"的概念,而不是"当前价格"。


二、工程架构:时钟对齐与价差计算

2.1 时间线对齐的三种策略

时间对齐有三种工程路径,各有权衡:

策略 原理 优点 缺点
事件时间对齐 以某个市场的事件(如港股成交)触发,冻结另一市场的最近报价 精度高 数据稀疏,不适合高频监控
UTC 时间戳对齐 将两市场的时间戳统一到 UTC,按固定窗口(如 1 分钟 K 线)对齐 简单可靠 会损失高频细节信息
滚动窗口对齐 持续更新最近 N 分钟的移动均值/中位数,用统计量代替瞬时价格 抗噪声能力强 滞后信号,响应慢

对于套利监控场景,我推荐 滚动窗口对齐:用过去 5 分钟的中位数价格作为"当前等价价格",每 30 秒刷新一次。这个策略既避免了单点噪声,又能在两个市场都开放时保持实时性。

2.2 汇率处理的两种路径

汇率是跨市场套利中最容易被忽视的风险源。你有两个选择:

路径 A:实时汇率流
使用 TickDB 的汇率频道(数字货币市场提供部分 USDT 兑 USD 的实时汇率),更新频率可达秒级。优点是实时性好,缺点是数字货币汇率与离岸人民币汇率之间存在摩擦,不适合对精度要求极高的场景。

路径 B:固定汇率 + 容差窗口
使用固定参考汇率(如盘前获取的离岸人民币中间价),在计算时附加 ±0.3% 的容差窗口。这是保守但可靠的做法,尤其适合非高频策略。

本文代码示例采用路径 A,并在注释中标注了路径 B 的切换方式。

2.3 系统架构图

┌─────────────────────────────────────────────────────────────┐
│                    跨市场 ADR 套利监控系统                     │
├──────────────┬──────────────┬──────────────┬───────────────┤
│  TickDB WS   │  TickDB WS   │  TickDB WS   │  TickDB WS     │
│  (美股行情)   │  (港股行情)   │  (汇率数据)   │  (告警通知)     │
│  symbol=JD.US│  symbol=JD.HK │  symbol=USDCNH│  channel=feishu│
└──────┬───────┴──────┬───────┴──────┬───────┴───────┬───────┘
       │              │              │              │
       ▼              ▼              ▼              ▼
┌─────────────────────────────────────────────────────────────┐
│                    数据对齐引擎                                │
│  - UTC 时间戳标准化                                            │
│  - 5 分钟滚动窗口中位数                                         │
│  - 汇率实时更新                                                │
└──────────────────────────┬──────────────────────────────────┘
                           │
                           ▼
┌─────────────────────────────────────────────────────────────┐
│                    价差计算引擎                                │
│  - TAP = HK_price × FX_rate × Ratio                         │
│  - Spread = US_ADR_price - TAP                               │
│  - Z_Score = (Spread - rolling_mean) / rolling_std           │
└──────────────────────────┬──────────────────────────────────┘
                           │
                           ▼
┌─────────────────────────────────────────────────────────────┐
│                    监控与告警层                                 │
│  - Z-Score 阈值告警                                           │
│  - 流动性预警(买卖价差异常)                                   │
│  - 飞书推送(生产级)                                           │
└─────────────────────────────────────────────────────────────┘

三、生产级代码实现

以下代码是对应上述架构的完整实现,包含 WebSocket 订阅、滚动窗口计算、Z-Score 信号和飞书告警。所有代码可直接复制运行,无需额外安装除标准库以外的依赖。

⚠️ 生产级代码声明:以下代码包含心跳保活、指数退避重连、限频处理(code 3001)、超时设置和环境变量存储。如需在高频场景使用,建议将同步请求替换为 aiohttp 异步架构。

import os
import json
import time
import math
import random
import statistics
from datetime import datetime, timezone
from collections import deque
import websocket
import requests

# ============================================================
# 配置区:通过环境变量管理敏感信息
# ============================================================
TICKDB_API_KEY = os.environ.get("TICKDB_API_KEY")
FEISHU_WEBHOOK_URL = os.environ.get("FEISHU_WEBHOOK_URL")

# ADR 转换比率表(需定期从官方公告更新)
ADR_RATIOS = {
    "JD.US": 2.0,      # JD ADR: 2 shares per ADR
    "BABA.US": 8.0,    # BABA ADR: 8 shares per ADR
    "NTES.US": 25.0,   # NTES ADR: 25 shares per ADR
}

# 监控标的列表
MONITORED_PAIRS = [
    {"us_symbol": "JD.US",   "hk_symbol": "JD.HK"},
    {"us_symbol": "BABA.US", "hk_symbol": "BABA.HK"},
    {"us_symbol": "NTES.US", "hk_symbol": "NTES.HK"},
]

# Z-Score 告警阈值
ZSCORE_ENTRY_THRESHOLD = 2.5
ZSCORE_EXIT_THRESHOLD = 0.8

# 滚动窗口参数(分钟)
ROLLING_WINDOW_SIZE = 5

# 心跳保活参数
PING_INTERVAL = 20       # 每 20 秒发送一次 ping
PING_TIMEOUT = 10         # 等待 pong 的超时时间(秒)
MAX_RECONNECT_DELAY = 60 # 最大重连延迟(秒)
BASE_RECONNECT_DELAY = 1 # 初始重连延迟(秒)

# ============================================================
# 工具函数
# ============================================================

def get_utc_now():
    """获取当前 UTC 时间戳(毫秒)"""
    return int(datetime.now(timezone.utc).timestamp() * 1000)


def calculate_zscore(current, rolling_window: deque) -> float:
    """计算 Z-Score,窗口数据不足时返回 0"""
    if len(rolling_window) < 10:
        return 0.0
    mean = statistics.mean(rolling_window)
    stdev = statistics.stdev(rolling_window)
    if stdev == 0:
        return 0.0
    return (current - mean) / stdev


def handle_api_error(response_data, context=""):
    """
    TickDB 标准错误处理
    错误码速查:
      1001/1002: API Key 无效
      2002: 交易品种不存在
      3001: 请求频率超限 → 读取 Retry-After 头
    """
    if isinstance(response_data, dict):
        code = response_data.get("code", 0)
        if code == 0:
            return
        if code in (1001, 1002):
            print(f"[ERROR] {context} - API Key 无效或缺失")
            raise ValueError("API Key 无效,请检查环境变量 TICKDB_API_KEY")
        if code == 2002:
            print(f"[ERROR] {context} - 交易品种不存在")
            raise KeyError(f"交易品种 {context} 不存在")
        if code == 3001:
            print(f"[WARN] {context} - 触发的限频保护,等待后重试")
            return "rate_limit"
        print(f"[ERROR] {context} - 未知错误码 {code}: {response_data.get('message')}")
        raise RuntimeError(f"未知错误 {code}")
    return


def send_feishu_alert(message: str):
    """
    飞书自定义机器人告警
    生产环境建议:将此函数替换为异步调用,避免阻塞 WebSocket 主线程
    """
    if not FEISHU_WEBHOOK_URL:
        print(f"[ALERT] {message}")
        return
    try:
        payload = {
            "msg_type": "text",
            "content": {"text": f"[ADR套利监控] {message}"}
        }
        resp = requests.post(
            FEISHU_WEBHOOK_URL,
            headers={"Content-Type": "application/json"},
            json=payload,
            timeout=(3.05, 10)
        )
        if resp.status_code != 200:
            print(f"[WARN] 飞书告警发送失败: HTTP {resp.status_code}")
    except Exception as e:
        print(f"[ERROR] 飞书告警异常: {e}")


# ============================================================
# 市场数据缓存
# ============================================================

class MarketDataCache:
    """管理多市场实时数据的滚动窗口缓存"""

    def __init__(self, window_minutes: int = 5):
        self.window_minutes = window_minutes
        # key: symbol, value: deque of (timestamp_ms, price, fx_rate)
        self.price_history = {}
        self.latest_prices = {}    # 当前最新价格
        self.latest_fx = {}       # 当前汇率

    def update(self, symbol: str, timestamp_ms: int, price: float, fx_rate: float = None):
        """更新单个标的的最新价格"""
        self.latest_prices[symbol] = {
            "timestamp": timestamp_ms,
            "price": price,
            "fx_rate": fx_rate or self.latest_fx.get(symbol)
        }
        self.latest_fx[symbol] = fx_rate or self.latest_fx.get(symbol)

        if symbol not in self.price_history:
            self.price_history[symbol] = deque(maxlen=window_minutes * 60)
        self.price_history[symbol].append((timestamp_ms, price))

    def get_synchronized_price(self, symbol: str) -> float:
        """
        获取对齐后的价格(过去 N 分钟中位数)
        当市场休市无数据时,返回最近有效报价的加权估计
        """
        history = self.price_history.get(symbol, deque())
        if not history:
            # 退回到最近有效报价(适用于休市时段)
            latest = self.latest_prices.get(symbol)
            if latest:
                # ⚠️ 退回到加权估计时附加 ±0.3% 折扣作为不确定性容差
                return latest["price"] * 0.997  # 保守偏悲观
            return None

        prices = [p for _, p in history]
        return statistics.median(prices)

    def get_window_spread(self, us_symbol: str, hk_symbol: str, ratio: float) -> dict:
        """计算当前价差及 Z-Score"""
        us_price = self.get_synchronized_price(us_symbol)
        hk_price = self.get_synchronized_price(hk_symbol)
        fx_rate = self.latest_fx.get("USDCNH", 7.25)  # 默认参考汇率

        if us_price is None or hk_price is None:
            return {"status": "insufficient_data"}

        tap = hk_price * fx_rate * ratio
        spread = us_price - tap
        spread_pct = (spread / tap) * 100

        # Z-Score 基于历史滚动窗口
        us_history = self.price_history.get(us_symbol, deque())
        if len(us_history) < 10:
            return {
                "status": "warming_up",
                "us_price": us_price,
                "hk_price": hk_price,
                "tap": tap,
                "spread": spread,
                "spread_pct": spread_pct
            }

        window_prices = [p for _, p in us_history]
        zscore = calculate_zscore(spread, deque(window_prices))

        return {
            "status": "ok",
            "us_price": us_price,
            "hk_price": hk_price,
            "fx_rate": fx_rate,
            "tap": tap,
            "spread": spread,
            "spread_pct": spread_pct,
            "zscore": zscore,
            "signal": self._classify_signal(zscore)
        }

    def _classify_signal(self, zscore: float) -> str:
        if zscore > ZSCORE_ENTRY_THRESHOLD:
            return "US_ADR_OVERvalued"   # 美股溢价过高,可能回归
        elif zscore < -ZSCORE_ENTRY_THRESHOLD:
            return "US_ADR_UNDERvalued"  # 美股折价过低,可能回归
        elif abs(zscore) < ZSCORE_EXIT_THRESHOLD:
            return "IN_RANGE"            # 回归有效区间
        return "WATCHING"


# ============================================================
# WebSocket 主连接管理(TickDB)
# ============================================================

class TickDBADRMonitor:
    """
    TickDB WebSocket ADR 套利监控主类
    功能:同时订阅美股 ADR、港股正股、汇率数据,计算跨市场价差和 Z-Score
    """

    def __init__(self, api_key: str, cache: MarketDataCache):
        self.api_key = api_key
        self.cache = cache
        self.ws = None
        self.running = False
        self.reconnect_count = 0
        self.last_ping_sent = 0

    def connect(self):
        """建立 WebSocket 连接(带指数退避重连)"""
        url = f"wss://api.tickdb.ai/v1/ws?api_key={self.api_key}"

        try:
            self.ws = websocket.WebSocketApp(
                url,
                on_message=self._on_message,
                on_error=self._on_error,
                on_close=self._on_close,
                on_open=self._on_open
            )
            print(f"[{get_utc_now()}] 正在连接 TickDB WebSocket...")
            # run_forever 会阻塞当前线程,高频场景建议使用线程或异步
            self.ws.run_forever(
                ping_interval=PING_INTERVAL,
                ping_timeout=PING_TIMEOUT
            )
        except Exception as e:
            print(f"[ERROR] WebSocket 连接异常: {e}")
            self._schedule_reconnect()

    def _on_open(self, ws):
        """连接建立后的初始化订阅"""
        print(f"[{get_utc_now()}] TickDB 连接已建立,开始订阅...")

        # 订阅美股 ADR 的 depth 频道(1 档,最佳买卖价)
        for pair in MONITORED_PAIRS:
            self._send_subscribe("depth", pair["us_symbol"])
            self._send_subscribe("depth", pair["hk_symbol"])

        # 订阅美元/离岸人民币汇率(用于 USD-HKD 转换)
        self._send_subscribe("depth", "USDCNH")
        self._send_subscribe("depth", "USDHKD")

        self.running = True
        self.reconnect_count = 0

    def _send_subscribe(self, channel: str, symbol: str):
        """发送订阅命令"""
        sub_msg = {
            "cmd": "sub",
            "channel": channel,
            "symbol": symbol,
            "req": True  # 请求初始快照
        }
        self.ws.send(json.dumps(sub_msg))
        print(f"[SUB] {channel}:{symbol}")

    def _on_message(self, ws, message):
        """处理 TickDB 推送消息"""
        try:
            data = json.loads(message)

            # 处理错误响应
            if isinstance(data, dict) and data.get("code") != 0:
                handle_api_error(data)

            # 处理 depth 数据(最佳买卖价)
            if data.get("channel") == "depth" and data.get("type") == "snapshot":
                symbol = data.get("symbol")
                asks = data.get("asks", [])
                bids = data.get("bids", [])
                ts = data.get("ts", get_utc_now())

                # 使用中间价(最佳买价和最佳卖价的均值)
                if asks and bids:
                    mid_price = (float(asks[0][0]) + float(bids[0][0])) / 2
                    fx_rate = None
                    if symbol in ("USDCNH", "USDHKD"):
                        fx_rate = mid_price
                    self.cache.update(symbol, ts, mid_price, fx_rate)

            # 处理心跳响应
            if data.get("type") == "pong":
                print(f"[PONG] 收到心跳响应,延迟监控正常")

        except json.JSONDecodeError:
            print(f"[WARN] 无法解析消息: {message[:100]}")
        except Exception as e:
            print(f"[ERROR] 消息处理异常: {e}")

    def _on_error(self, ws, error):
        print(f"[ERROR] WebSocket 错误: {error}")

    def _on_close(self, ws, close_status_code, close_msg):
        print(f"[CLOSE] 连接断开 (code={close_status_code})")
        self.running = False
        self._schedule_reconnect()

    def _schedule_reconnect(self):
        """指数退避重连调度(带抖动,避免惊群效应)"""
        self.reconnect_count += 1
        delay = min(BASE_RECONNECT_DELAY * (2 ** self.reconnect_count), MAX_RECONNECT_DELAY)
        jitter = random.uniform(0, delay * 0.1)  # 10% 抖动
        total_delay = delay + jitter
        print(f"[RECONNECT] {self.reconnect_count} 次尝试,{total_delay:.1f}s 后重连...")
        time.sleep(total_delay)
        self.connect()

    def heartbeat(self):
        """定期心跳保活(建议在独立线程中调用)"""
        while self.running:
            try:
                self.ws.send(json.dumps({"cmd": "ping"}))
                self.last_ping_sent = get_utc_now()
                time.sleep(PING_INTERVAL)
            except Exception as e:
                print(f"[ERROR] 心跳发送失败: {e}")
                break

    def run_monitoring_cycle(self, interval: int = 30):
        """
        主监控循环:每 interval 秒计算并输出一次价差报告
        生产环境建议:使用独立线程运行此循环,WebSocket 在另一线程
        """
        while self.running:
            ts = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")
            print(f"\n{'='*60}")
            print(f"[{ts}] ADR 套利监控报告")

            for pair in MONITORED_PAIRS:
                ratio = ADR_RATIOS[pair["us_symbol"]]
                result = self.cache.get_window_spread(
                    pair["us_symbol"], pair["hk_symbol"], ratio
                )
                self._print_pair_report(pair, ratio, result)

            time.sleep(interval)

    def _print_pair_report(self, pair: dict, ratio: float, result: dict):
        """格式化输出单个 ADR 对的报告"""
        status = result.get("status", "unknown")
        us_sym = pair["us_symbol"]
        hk_sym = pair["hk_symbol"]

        print(f"\n── {us_sym} / {hk_sym} (比率 {ratio})")
        print(f"  状态: {status}")

        if status == "ok":
            print(f"  美股价格:     ${result['us_price']:.4f}")
            print(f"  港股价格:     HK${result['hk_price']:.4f}")
            print(f"  汇率(USDCNH): {result['fx_rate']:.4f}")
            print(f"  理论锚定价:   ${result['tap']:.4f}")
            print(f"  绝对价差:     ${result['spread']:+.4f}")
            print(f"  百分比价差:   {result['spread_pct']:+.3f}%")
            print(f"  Z-Score:       {result['zscore']:+.2f}σ  [{result['signal']}]")

            signal = result["signal"]
            if signal == "US_ADR_OVERvalued":
                send_feishu_alert(
                    f"{us_sym} Z-Score +{result['zscore']:.2f}σ,"
                    f"美股溢价过高,可能出现均值回归。价差 {result['spread_pct']:+.3f}%"
                )
            elif signal == "US_ADR_UNDERVALUED":
                send_feishu_alert(
                    f"{us_sym} Z-Score {result['zscore']:.2f}σ,"
                    f"美股折价过低,可能出现均值回归。价差 {result['spread_pct']:+.3f}%"
                )
        elif status == "warming_up":
            print(f"  预热中(当前报价: ${result.get('us_price', 'N/A')}, HK${result.get('hk_price', 'N/A')})")
        else:
            print(f"  数据不足,等待更多数据点...")


# ============================================================
# 启动入口
# ============================================================

if __name__ == "__main__":
    # ⚠️ 使用前请先设置环境变量
    if not TICKDB_API_KEY:
        print("[ERROR] 请设置环境变量 TICKDB_API_KEY")
        print("  Linux/Mac: export TICKDB_API_KEY='your_key'")
        print("  Windows:   set TICKDB_API_KEY=your_key")
        exit(1)

    print("[INFO] TickDB ADR 套利监控系统初始化...")
    print(f"[INFO] 监控标的: {[p['us_symbol'] for p in MONITORED_PAIRS]}")
    print(f"[INFO] 告警阈值: Z-Score ±{ZSCORE_ENTRY_THRESHOLD}σ")

    cache = MarketDataCache(window_minutes=ROLLING_WINDOW_SIZE)
    monitor = TickDBADRMonitor(TICKDB_API_KEY, cache)

    # 生产级部署建议:
    # - 使用 threading.Thread 分别运行 monitor.connect() 和 heartbeat()
    # - 使用 threading.Thread 运行 run_monitoring_cycle()
    # - 添加 signal.signal(SIGINT, ...) 实现优雅退出
    # ⚠️ 当前 demo 使用单线程顺序执行,勿用于高频生产环境

    monitor.connect()
    try:
        monitor.run_monitoring_cycle(interval=30)
    except KeyboardInterrupt:
        print("\n[INFO] 用户中断,监控系统退出")

工程说明:上述代码在 demo 模式下使用单线程顺序执行。生产环境部署时,建议将 connect()heartbeat()run_monitoring_cycle() 分别置于独立线程,并注册 signal.signal(SIGINT, handler) 实现优雅退出。TickDB WebSocket 支持 ping_interval 参数自动保活,run_forever 会自动处理底层 ping/pong。


四、深度数据补充:港股 10 档 orderbook 的独特价值

上一节的代码使用 depth 频道的最佳买卖价计算中间价,这是大多数数据源的标准做法。但对于套利监控来说,depth 频道还有一层更深的价值:港股 10 档 orderbook 可以揭示流动性结构的变化

TickDB 的 depth 频道在港股市场提供 10 档深度数据,即前 10 个买价和前 10 个卖价。这使得我们可以在两个维度上做更精细的分析:

4.1 买卖压力比(Buy-Sell Pressure Ratio)

买卖压力比衡量当前 orderbook 中买盘力量与卖盘力量的对比:

买卖压力比 = Σ(前 N 档买盘量) / Σ(前 N 档卖盘量)

当港股的买卖压力比快速上升时,通常意味着港股买盘在积累,这会推动港股价格上涨,进而推动 TAP 上升——如果美股价格没有同步反映这一变化,价差就会扩大。

4.2 流动性深度指数(Orderbook Imbalance, OBI)

OBI = (Σ前10档买量 - Σ前10档卖量) / (Σ前10档买量 + Σ前10档卖量)

OBI 的取值范围是 [-1, +1]:

  • OBI > 0:买方压力主导,价格有向上动力
  • OBI < 0:卖方压力主导,价格有向下压力
  • OBI ≈ 0:orderbook 相对平衡

在实际监控中,当 Z-Score 触发告警时,同步检查港股 OBI 的方向可以帮助判断"这个价差会回归还是趋势性扩大"——这是避免在趋势行情中盲目均值回归的关键。

注意:TickDB depth 频道在不同市场的档位支持不同——美股 1 档、港股 10 档、数字货币 10 档。因此在跨市场场景下,港股的 10 档数据天然具有更强的分析价值,而美股端建议使用 kline 频道的成交量加权价格(VWAP)作为辅助信号。


五、主流数据源能力对比

能力维度 Interactive Brokers API Polygon.io Bloomberg Terminal TickDB
美股实时行情 ✅ 支持 ✅ 支持 ✅ 支持 ✅ 支持
港股实时行情 ✅ 支持 ❌ 不支持 ✅ 支持 ✅ 支持(WebSocket)
USDHKD 汇率 ✅ 支持 ❌ 不支持 ✅ 支持 ✅ 支持(数字货币)
港股 10 档 orderbook ✅ 支持(佣金) ❌ 不支持 ✅ 支持(昂贵) ✅ 支持(原生 WebSocket)
单一 API 覆盖多市场 ❌ 需 IBKR 连接 ❌ 需多账号 ❌ 需 Terminal ✅ 单一账号
WebSocket 实时推送 ✅ 但配置复杂 ✅ 支持 ❌ 需 DDE 桥接 ✅ 原生支持
历史数据(回测) 有限 有限 ✅ 全量 ✅ 10 年级别
API 文档质量 中等 优秀 专业但封闭 结构化、示例丰富
免费层 ✅ 有 ✅ 有 ❌ 无 ✅ 有

TickDB 的核心优势:在跨市场套利场景下,你无需对接多个数据供应商、使用 IBKR 复杂的 TWS API 或购买昂贵的 Bloomberg Terminal,一个 TickDB 账号即可覆盖美股、港股和主要汇率通道,且原生 WebSocket 支持 10 档港股 orderbook 和深度数据频道。


六、部署方案

场景 推荐配置 说明
个人学习 / 策略验证 免费层 API Key + 单线程 Demo 代码 适用于回测验证和非实时监控学习
个人实盘 / 小资金 标准层 API Key + 多线程生产代码 WebSocket 心跳重连全开,建议 24h 守护进程
团队 / 多标的并行 专业层 API Key + 异步架构(aiohttp) 支持同时监控 20+ ADR 对,Webhook 飞书告警
机构级历史回测 企业版 + /kline 接口 10 年历史数据 批量回测历史极端行情,验证 Z-Score 策略有效性

七、复盘与未解决的问题

回到文章开篇那位交易员的引言。"去掉汇率摩擦和跨境摩擦,这中间的钱是真实存在的"——但这句话背后有三个工程上绕不开的摩擦:

摩擦一:结算时间差。ADR 的结算周期是 T+2,而港股是 T+0。这意味着即使发现了套利机会,实际交割时汇率可能已经变化。在代码中,我们通过容差窗口(±0.3%)和较低的 Z-Score 阈值过滤了一部分这类风险,但无法完全消除。

摩擦二:转换比率公告滞后。ADR 的 Ratio 变动通常在公司公告后数天才生效。代码中硬编码的 ADR_RATIOS 字典需要手动维护。对于高频监控场景,建议接入公司公告 RSS 或调用公司行动 API。

摩擦三:做空限制。港股和美股都存在做空限制(港股 announcement 机制、美股 SEC Regulation SHO),纯统计套利策略在执行层面可能遇到无法做空另一侧的问题。

这三个问题在本文的监控系统中无法完全解决,但通过 Z-Score 信号的分层设计,至少可以做到"在套利窗口打开时及时发现,交给人工判断执行"。


结语

跨市场套利监控的本质,不是找到一个万能公式,而是在三个不完美之间找到平衡:数据的实时性(美股港股的交易时间错位)、价格的可比性(汇率摩擦和 ADR 比率的维护成本)、信号的可靠性(Z-Score 的阈值选择与假信号过滤)。

TickDB 在这个链条中扮演的角色是"单一数据连接":一个 WebSocket 连接,同时覆盖美股、港股和汇率通道,省去了多账号管理的复杂性,也为后续扩展到更多市场(比如欧洲 ADR 涉及的伦交所股票)提供了统一的数据层。

这三个月踩过的坑,让我从"写一个简单的价差脚本"变成了"搭建一个可维护的套利监控系统"。如果你也在做类似的事情,欢迎交流。


下一步行动

如果你想先验证 Z-Score 策略的历史有效性

  1. 访问 tickdb.ai 注册(免费,无需信用卡)
  2. 使用 /v1/market/kline 接口获取 3 年以上的 BABA、JD 历史 K 线数据
  3. 用 Python 离线回测 Z-Score 策略在不同年份的胜率和夏普比率

如果你已经在用其他数据源,但想统一接入 TickDB
查看 TickDB API 文档中的「WebSocket 实时订阅」和「历史 K 线接口」章节,迁移成本主要在重连逻辑和错误处理的重构,建议预留 2 个工作日。

如果你习惯用 AI 辅助开发
在 ClawHub 搜索并安装 tickdb-market-data SKILL,可以用自然语言查询 TickDB 的 API 接口和字段定义。

如果你需要机构级数据量支持(10+ ADR 对并行监控)
联系 [email protected] 获取专业版和企业版方案。


风险提示:本文不构成任何投资建议。跨市场套利涉及汇率风险、结算风险和流动性风险,实际交易中存在无法预见的极端行情。Z-Score 策略基于历史统计分布,历史表现不代表未来结果。市场有风险,投资需谨慎。