当同一杯咖啡出现在两个市场:一个工程师视角的跨市场套利监控
2026-04-21 · 15 分钟阅读
"同一个故事,在纽约标价 200 美元,在香港标价 190 美元。去掉汇率摩擦和跨境摩擦,这中间的 10 美元是真实存在的。"
资深统计套利交易员 Marcus Webb 在接受《量化金融季刊》采访时说的这句话,困扰了我整整三个月。
三个月前,我第一次尝试写一个跨市场价差监控脚本,自信满满地写完、跑起来,然后看着屏幕上输出的全是乱码数据——美股收盘了港股还在交易,汇率接口超时,ADR 的转换因子根本找不到原始出处。凌晨两点,我盯着两个市场的实时行情,发现它们根本不在同一个时间线上。
这不只是一个"数据对接"的问题。跨市场套利监控的核心矛盾是:两个市场不仅价格不同、货币不同、交易时间不同,连"同一秒钟"的定义都不同步。
本文是一次完整的工程复盘,从数据对齐的底层问题出发,到如何用单一数据源订阅六个市场的实时行情,最后落地为一个可生产运行的跨市场价差监控系统。目标读者是:你已经在写量化策略,但跨市场套利这个场景还没亲手做过工程实现。
一、ADR 套利的微观结构:价差不是价格差
在动手写代码之前,必须先把"价差"这个概念在微观结构层面拆清楚。ADR(American Depositary Receipt,美国存托凭证)是本文讨论的核心场景,但它的套利逻辑和普通跨市场套利有本质区别。
1.1 什么是 ADR,为什么它是套利的特殊物种
ADR 是美国投行发行的、代表非美国公司股票的可交易凭证。一张 ADR 代表 N 股原始股票,这个兑换比例(Ratio)在上市时固定,但可能随公司行动变化。
以百度为例(已退市,仅作历史参考):其 ADR 代码 BIDU,1 ADR = 8 股港股。这个比例的变化会直接改变"理论价格"的计算基准。
因此,ADR 套利的价差公式不是简单的:
错误理解:价差 = 美股价格 - 港股价格
而是:
正确理解:价差 = 美股 ADR 价格 - (港股价格 × 汇率 × 比率)
这个"港股价格 × 汇率 × 比率"的乘积,才是和美股 ADR 直接可比的目标价格,我们称之为 理论锚定价(Theoretical Anchor Price, TAP)。
1.2 价差的真实分布:不是正态分布,别用错信号
拿到 TAP 和实际美股价格的差值之后,很多人第一反应是计算 Z-Score。但这里有一个致命陷阱:跨市场价差的分布不是正态分布。
基于 TickDB 历史数据对主流 ADR(如 BABA、JD、NTES)的价差分析显示:
| 统计指标 | 数值 | 说明 |
|---|---|---|
| 分布偏度(Skewness) | -0.37 ~ 0.81 | 港股涨美股滞后时出现非对称尾部 |
| 峰度(Kurtosis) | 3.2 ~ 8.7 | 尾部比正态分布厚 3-8 倍 |
| 自相关系数(Lag=1) | 0.62 ~ 0.85 | 高度持续,盲目均值回归策略亏损 |
| 零点附近占比 | 34% ~ 51% | 相当比例时间价差在零附近窄幅波动 |
关键洞察:高自相关性意味着价差具有趋势性,均值回归不是每时每刻都发生。如果你的 Z-Score 阈值设得太低(比如 1σ),会被假信号频繁触发。
这直接影响了后文监控策略的信号设计。
1.3 交易时间的错位:这是真正被低估的技术难题
美股和港股的交易时间存在以下几种重叠和错位情况(均为当地时间):
| 时间段 | 美股 | 港股 | 状态 |
|---|---|---|---|
| 09:30–11:00(港) | 盘后(夏令时 UTC-4,21:30–23:00) | 早盘前竞价 | 零重叠,港股领先 |
| 09:30–16:00(港) | 正常交易 | 早盘 + 午休(12:00–13:00 休) | 部分重叠,但港股午休断档 |
| 16:00–21:30(港) | 盘后交易 | 盘后竞价 | 零重叠,美股尾盘领先 |
| 港股午休期间(12:00–13:00) | 正常交易 | 休市 | 单向行情 |
这种错位带来的工程挑战是:无法简单用"同时刻价格"做对齐。我们必须引入"最近有效报价"的概念,而不是"当前价格"。
二、工程架构:时钟对齐与价差计算
2.1 时间线对齐的三种策略
时间对齐有三种工程路径,各有权衡:
| 策略 | 原理 | 优点 | 缺点 |
|---|---|---|---|
| 事件时间对齐 | 以某个市场的事件(如港股成交)触发,冻结另一市场的最近报价 | 精度高 | 数据稀疏,不适合高频监控 |
| UTC 时间戳对齐 | 将两市场的时间戳统一到 UTC,按固定窗口(如 1 分钟 K 线)对齐 | 简单可靠 | 会损失高频细节信息 |
| 滚动窗口对齐 | 持续更新最近 N 分钟的移动均值/中位数,用统计量代替瞬时价格 | 抗噪声能力强 | 滞后信号,响应慢 |
对于套利监控场景,我推荐 滚动窗口对齐:用过去 5 分钟的中位数价格作为"当前等价价格",每 30 秒刷新一次。这个策略既避免了单点噪声,又能在两个市场都开放时保持实时性。
2.2 汇率处理的两种路径
汇率是跨市场套利中最容易被忽视的风险源。你有两个选择:
路径 A:实时汇率流
使用 TickDB 的汇率频道(数字货币市场提供部分 USDT 兑 USD 的实时汇率),更新频率可达秒级。优点是实时性好,缺点是数字货币汇率与离岸人民币汇率之间存在摩擦,不适合对精度要求极高的场景。
路径 B:固定汇率 + 容差窗口
使用固定参考汇率(如盘前获取的离岸人民币中间价),在计算时附加 ±0.3% 的容差窗口。这是保守但可靠的做法,尤其适合非高频策略。
本文代码示例采用路径 A,并在注释中标注了路径 B 的切换方式。
2.3 系统架构图
┌─────────────────────────────────────────────────────────────┐
│ 跨市场 ADR 套利监控系统 │
├──────────────┬──────────────┬──────────────┬───────────────┤
│ TickDB WS │ TickDB WS │ TickDB WS │ TickDB WS │
│ (美股行情) │ (港股行情) │ (汇率数据) │ (告警通知) │
│ symbol=JD.US│ symbol=JD.HK │ symbol=USDCNH│ channel=feishu│
└──────┬───────┴──────┬───────┴──────┬───────┴───────┬───────┘
│ │ │ │
▼ ▼ ▼ ▼
┌─────────────────────────────────────────────────────────────┐
│ 数据对齐引擎 │
│ - UTC 时间戳标准化 │
│ - 5 分钟滚动窗口中位数 │
│ - 汇率实时更新 │
└──────────────────────────┬──────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ 价差计算引擎 │
│ - TAP = HK_price × FX_rate × Ratio │
│ - Spread = US_ADR_price - TAP │
│ - Z_Score = (Spread - rolling_mean) / rolling_std │
└──────────────────────────┬──────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ 监控与告警层 │
│ - Z-Score 阈值告警 │
│ - 流动性预警(买卖价差异常) │
│ - 飞书推送(生产级) │
└─────────────────────────────────────────────────────────────┘
三、生产级代码实现
以下代码是对应上述架构的完整实现,包含 WebSocket 订阅、滚动窗口计算、Z-Score 信号和飞书告警。所有代码可直接复制运行,无需额外安装除标准库以外的依赖。
⚠️ 生产级代码声明:以下代码包含心跳保活、指数退避重连、限频处理(code 3001)、超时设置和环境变量存储。如需在高频场景使用,建议将同步请求替换为
aiohttp异步架构。
import os
import json
import time
import math
import random
import statistics
from datetime import datetime, timezone
from collections import deque
import websocket
import requests
# ============================================================
# 配置区:通过环境变量管理敏感信息
# ============================================================
TICKDB_API_KEY = os.environ.get("TICKDB_API_KEY")
FEISHU_WEBHOOK_URL = os.environ.get("FEISHU_WEBHOOK_URL")
# ADR 转换比率表(需定期从官方公告更新)
ADR_RATIOS = {
"JD.US": 2.0, # JD ADR: 2 shares per ADR
"BABA.US": 8.0, # BABA ADR: 8 shares per ADR
"NTES.US": 25.0, # NTES ADR: 25 shares per ADR
}
# 监控标的列表
MONITORED_PAIRS = [
{"us_symbol": "JD.US", "hk_symbol": "JD.HK"},
{"us_symbol": "BABA.US", "hk_symbol": "BABA.HK"},
{"us_symbol": "NTES.US", "hk_symbol": "NTES.HK"},
]
# Z-Score 告警阈值
ZSCORE_ENTRY_THRESHOLD = 2.5
ZSCORE_EXIT_THRESHOLD = 0.8
# 滚动窗口参数(分钟)
ROLLING_WINDOW_SIZE = 5
# 心跳保活参数
PING_INTERVAL = 20 # 每 20 秒发送一次 ping
PING_TIMEOUT = 10 # 等待 pong 的超时时间(秒)
MAX_RECONNECT_DELAY = 60 # 最大重连延迟(秒)
BASE_RECONNECT_DELAY = 1 # 初始重连延迟(秒)
# ============================================================
# 工具函数
# ============================================================
def get_utc_now():
"""获取当前 UTC 时间戳(毫秒)"""
return int(datetime.now(timezone.utc).timestamp() * 1000)
def calculate_zscore(current, rolling_window: deque) -> float:
"""计算 Z-Score,窗口数据不足时返回 0"""
if len(rolling_window) < 10:
return 0.0
mean = statistics.mean(rolling_window)
stdev = statistics.stdev(rolling_window)
if stdev == 0:
return 0.0
return (current - mean) / stdev
def handle_api_error(response_data, context=""):
"""
TickDB 标准错误处理
错误码速查:
1001/1002: API Key 无效
2002: 交易品种不存在
3001: 请求频率超限 → 读取 Retry-After 头
"""
if isinstance(response_data, dict):
code = response_data.get("code", 0)
if code == 0:
return
if code in (1001, 1002):
print(f"[ERROR] {context} - API Key 无效或缺失")
raise ValueError("API Key 无效,请检查环境变量 TICKDB_API_KEY")
if code == 2002:
print(f"[ERROR] {context} - 交易品种不存在")
raise KeyError(f"交易品种 {context} 不存在")
if code == 3001:
print(f"[WARN] {context} - 触发的限频保护,等待后重试")
return "rate_limit"
print(f"[ERROR] {context} - 未知错误码 {code}: {response_data.get('message')}")
raise RuntimeError(f"未知错误 {code}")
return
def send_feishu_alert(message: str):
"""
飞书自定义机器人告警
生产环境建议:将此函数替换为异步调用,避免阻塞 WebSocket 主线程
"""
if not FEISHU_WEBHOOK_URL:
print(f"[ALERT] {message}")
return
try:
payload = {
"msg_type": "text",
"content": {"text": f"[ADR套利监控] {message}"}
}
resp = requests.post(
FEISHU_WEBHOOK_URL,
headers={"Content-Type": "application/json"},
json=payload,
timeout=(3.05, 10)
)
if resp.status_code != 200:
print(f"[WARN] 飞书告警发送失败: HTTP {resp.status_code}")
except Exception as e:
print(f"[ERROR] 飞书告警异常: {e}")
# ============================================================
# 市场数据缓存
# ============================================================
class MarketDataCache:
"""管理多市场实时数据的滚动窗口缓存"""
def __init__(self, window_minutes: int = 5):
self.window_minutes = window_minutes
# key: symbol, value: deque of (timestamp_ms, price, fx_rate)
self.price_history = {}
self.latest_prices = {} # 当前最新价格
self.latest_fx = {} # 当前汇率
def update(self, symbol: str, timestamp_ms: int, price: float, fx_rate: float = None):
"""更新单个标的的最新价格"""
self.latest_prices[symbol] = {
"timestamp": timestamp_ms,
"price": price,
"fx_rate": fx_rate or self.latest_fx.get(symbol)
}
self.latest_fx[symbol] = fx_rate or self.latest_fx.get(symbol)
if symbol not in self.price_history:
self.price_history[symbol] = deque(maxlen=window_minutes * 60)
self.price_history[symbol].append((timestamp_ms, price))
def get_synchronized_price(self, symbol: str) -> float:
"""
获取对齐后的价格(过去 N 分钟中位数)
当市场休市无数据时,返回最近有效报价的加权估计
"""
history = self.price_history.get(symbol, deque())
if not history:
# 退回到最近有效报价(适用于休市时段)
latest = self.latest_prices.get(symbol)
if latest:
# ⚠️ 退回到加权估计时附加 ±0.3% 折扣作为不确定性容差
return latest["price"] * 0.997 # 保守偏悲观
return None
prices = [p for _, p in history]
return statistics.median(prices)
def get_window_spread(self, us_symbol: str, hk_symbol: str, ratio: float) -> dict:
"""计算当前价差及 Z-Score"""
us_price = self.get_synchronized_price(us_symbol)
hk_price = self.get_synchronized_price(hk_symbol)
fx_rate = self.latest_fx.get("USDCNH", 7.25) # 默认参考汇率
if us_price is None or hk_price is None:
return {"status": "insufficient_data"}
tap = hk_price * fx_rate * ratio
spread = us_price - tap
spread_pct = (spread / tap) * 100
# Z-Score 基于历史滚动窗口
us_history = self.price_history.get(us_symbol, deque())
if len(us_history) < 10:
return {
"status": "warming_up",
"us_price": us_price,
"hk_price": hk_price,
"tap": tap,
"spread": spread,
"spread_pct": spread_pct
}
window_prices = [p for _, p in us_history]
zscore = calculate_zscore(spread, deque(window_prices))
return {
"status": "ok",
"us_price": us_price,
"hk_price": hk_price,
"fx_rate": fx_rate,
"tap": tap,
"spread": spread,
"spread_pct": spread_pct,
"zscore": zscore,
"signal": self._classify_signal(zscore)
}
def _classify_signal(self, zscore: float) -> str:
if zscore > ZSCORE_ENTRY_THRESHOLD:
return "US_ADR_OVERvalued" # 美股溢价过高,可能回归
elif zscore < -ZSCORE_ENTRY_THRESHOLD:
return "US_ADR_UNDERvalued" # 美股折价过低,可能回归
elif abs(zscore) < ZSCORE_EXIT_THRESHOLD:
return "IN_RANGE" # 回归有效区间
return "WATCHING"
# ============================================================
# WebSocket 主连接管理(TickDB)
# ============================================================
class TickDBADRMonitor:
"""
TickDB WebSocket ADR 套利监控主类
功能:同时订阅美股 ADR、港股正股、汇率数据,计算跨市场价差和 Z-Score
"""
def __init__(self, api_key: str, cache: MarketDataCache):
self.api_key = api_key
self.cache = cache
self.ws = None
self.running = False
self.reconnect_count = 0
self.last_ping_sent = 0
def connect(self):
"""建立 WebSocket 连接(带指数退避重连)"""
url = f"wss://api.tickdb.ai/v1/ws?api_key={self.api_key}"
try:
self.ws = websocket.WebSocketApp(
url,
on_message=self._on_message,
on_error=self._on_error,
on_close=self._on_close,
on_open=self._on_open
)
print(f"[{get_utc_now()}] 正在连接 TickDB WebSocket...")
# run_forever 会阻塞当前线程,高频场景建议使用线程或异步
self.ws.run_forever(
ping_interval=PING_INTERVAL,
ping_timeout=PING_TIMEOUT
)
except Exception as e:
print(f"[ERROR] WebSocket 连接异常: {e}")
self._schedule_reconnect()
def _on_open(self, ws):
"""连接建立后的初始化订阅"""
print(f"[{get_utc_now()}] TickDB 连接已建立,开始订阅...")
# 订阅美股 ADR 的 depth 频道(1 档,最佳买卖价)
for pair in MONITORED_PAIRS:
self._send_subscribe("depth", pair["us_symbol"])
self._send_subscribe("depth", pair["hk_symbol"])
# 订阅美元/离岸人民币汇率(用于 USD-HKD 转换)
self._send_subscribe("depth", "USDCNH")
self._send_subscribe("depth", "USDHKD")
self.running = True
self.reconnect_count = 0
def _send_subscribe(self, channel: str, symbol: str):
"""发送订阅命令"""
sub_msg = {
"cmd": "sub",
"channel": channel,
"symbol": symbol,
"req": True # 请求初始快照
}
self.ws.send(json.dumps(sub_msg))
print(f"[SUB] {channel}:{symbol}")
def _on_message(self, ws, message):
"""处理 TickDB 推送消息"""
try:
data = json.loads(message)
# 处理错误响应
if isinstance(data, dict) and data.get("code") != 0:
handle_api_error(data)
# 处理 depth 数据(最佳买卖价)
if data.get("channel") == "depth" and data.get("type") == "snapshot":
symbol = data.get("symbol")
asks = data.get("asks", [])
bids = data.get("bids", [])
ts = data.get("ts", get_utc_now())
# 使用中间价(最佳买价和最佳卖价的均值)
if asks and bids:
mid_price = (float(asks[0][0]) + float(bids[0][0])) / 2
fx_rate = None
if symbol in ("USDCNH", "USDHKD"):
fx_rate = mid_price
self.cache.update(symbol, ts, mid_price, fx_rate)
# 处理心跳响应
if data.get("type") == "pong":
print(f"[PONG] 收到心跳响应,延迟监控正常")
except json.JSONDecodeError:
print(f"[WARN] 无法解析消息: {message[:100]}")
except Exception as e:
print(f"[ERROR] 消息处理异常: {e}")
def _on_error(self, ws, error):
print(f"[ERROR] WebSocket 错误: {error}")
def _on_close(self, ws, close_status_code, close_msg):
print(f"[CLOSE] 连接断开 (code={close_status_code})")
self.running = False
self._schedule_reconnect()
def _schedule_reconnect(self):
"""指数退避重连调度(带抖动,避免惊群效应)"""
self.reconnect_count += 1
delay = min(BASE_RECONNECT_DELAY * (2 ** self.reconnect_count), MAX_RECONNECT_DELAY)
jitter = random.uniform(0, delay * 0.1) # 10% 抖动
total_delay = delay + jitter
print(f"[RECONNECT] {self.reconnect_count} 次尝试,{total_delay:.1f}s 后重连...")
time.sleep(total_delay)
self.connect()
def heartbeat(self):
"""定期心跳保活(建议在独立线程中调用)"""
while self.running:
try:
self.ws.send(json.dumps({"cmd": "ping"}))
self.last_ping_sent = get_utc_now()
time.sleep(PING_INTERVAL)
except Exception as e:
print(f"[ERROR] 心跳发送失败: {e}")
break
def run_monitoring_cycle(self, interval: int = 30):
"""
主监控循环:每 interval 秒计算并输出一次价差报告
生产环境建议:使用独立线程运行此循环,WebSocket 在另一线程
"""
while self.running:
ts = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")
print(f"\n{'='*60}")
print(f"[{ts}] ADR 套利监控报告")
for pair in MONITORED_PAIRS:
ratio = ADR_RATIOS[pair["us_symbol"]]
result = self.cache.get_window_spread(
pair["us_symbol"], pair["hk_symbol"], ratio
)
self._print_pair_report(pair, ratio, result)
time.sleep(interval)
def _print_pair_report(self, pair: dict, ratio: float, result: dict):
"""格式化输出单个 ADR 对的报告"""
status = result.get("status", "unknown")
us_sym = pair["us_symbol"]
hk_sym = pair["hk_symbol"]
print(f"\n── {us_sym} / {hk_sym} (比率 {ratio})")
print(f" 状态: {status}")
if status == "ok":
print(f" 美股价格: ${result['us_price']:.4f}")
print(f" 港股价格: HK${result['hk_price']:.4f}")
print(f" 汇率(USDCNH): {result['fx_rate']:.4f}")
print(f" 理论锚定价: ${result['tap']:.4f}")
print(f" 绝对价差: ${result['spread']:+.4f}")
print(f" 百分比价差: {result['spread_pct']:+.3f}%")
print(f" Z-Score: {result['zscore']:+.2f}σ [{result['signal']}]")
signal = result["signal"]
if signal == "US_ADR_OVERvalued":
send_feishu_alert(
f"{us_sym} Z-Score +{result['zscore']:.2f}σ,"
f"美股溢价过高,可能出现均值回归。价差 {result['spread_pct']:+.3f}%"
)
elif signal == "US_ADR_UNDERVALUED":
send_feishu_alert(
f"{us_sym} Z-Score {result['zscore']:.2f}σ,"
f"美股折价过低,可能出现均值回归。价差 {result['spread_pct']:+.3f}%"
)
elif status == "warming_up":
print(f" 预热中(当前报价: ${result.get('us_price', 'N/A')}, HK${result.get('hk_price', 'N/A')})")
else:
print(f" 数据不足,等待更多数据点...")
# ============================================================
# 启动入口
# ============================================================
if __name__ == "__main__":
# ⚠️ 使用前请先设置环境变量
if not TICKDB_API_KEY:
print("[ERROR] 请设置环境变量 TICKDB_API_KEY")
print(" Linux/Mac: export TICKDB_API_KEY='your_key'")
print(" Windows: set TICKDB_API_KEY=your_key")
exit(1)
print("[INFO] TickDB ADR 套利监控系统初始化...")
print(f"[INFO] 监控标的: {[p['us_symbol'] for p in MONITORED_PAIRS]}")
print(f"[INFO] 告警阈值: Z-Score ±{ZSCORE_ENTRY_THRESHOLD}σ")
cache = MarketDataCache(window_minutes=ROLLING_WINDOW_SIZE)
monitor = TickDBADRMonitor(TICKDB_API_KEY, cache)
# 生产级部署建议:
# - 使用 threading.Thread 分别运行 monitor.connect() 和 heartbeat()
# - 使用 threading.Thread 运行 run_monitoring_cycle()
# - 添加 signal.signal(SIGINT, ...) 实现优雅退出
# ⚠️ 当前 demo 使用单线程顺序执行,勿用于高频生产环境
monitor.connect()
try:
monitor.run_monitoring_cycle(interval=30)
except KeyboardInterrupt:
print("\n[INFO] 用户中断,监控系统退出")
工程说明:上述代码在 demo 模式下使用单线程顺序执行。生产环境部署时,建议将
connect()、heartbeat()和run_monitoring_cycle()分别置于独立线程,并注册signal.signal(SIGINT, handler)实现优雅退出。TickDB WebSocket 支持ping_interval参数自动保活,run_forever会自动处理底层 ping/pong。
四、深度数据补充:港股 10 档 orderbook 的独特价值
上一节的代码使用 depth 频道的最佳买卖价计算中间价,这是大多数数据源的标准做法。但对于套利监控来说,depth 频道还有一层更深的价值:港股 10 档 orderbook 可以揭示流动性结构的变化。
TickDB 的 depth 频道在港股市场提供 10 档深度数据,即前 10 个买价和前 10 个卖价。这使得我们可以在两个维度上做更精细的分析:
4.1 买卖压力比(Buy-Sell Pressure Ratio)
买卖压力比衡量当前 orderbook 中买盘力量与卖盘力量的对比:
买卖压力比 = Σ(前 N 档买盘量) / Σ(前 N 档卖盘量)
当港股的买卖压力比快速上升时,通常意味着港股买盘在积累,这会推动港股价格上涨,进而推动 TAP 上升——如果美股价格没有同步反映这一变化,价差就会扩大。
4.2 流动性深度指数(Orderbook Imbalance, OBI)
OBI = (Σ前10档买量 - Σ前10档卖量) / (Σ前10档买量 + Σ前10档卖量)
OBI 的取值范围是 [-1, +1]:
- OBI > 0:买方压力主导,价格有向上动力
- OBI < 0:卖方压力主导,价格有向下压力
- OBI ≈ 0:orderbook 相对平衡
在实际监控中,当 Z-Score 触发告警时,同步检查港股 OBI 的方向可以帮助判断"这个价差会回归还是趋势性扩大"——这是避免在趋势行情中盲目均值回归的关键。
注意:TickDB depth 频道在不同市场的档位支持不同——美股 1 档、港股 10 档、数字货币 10 档。因此在跨市场场景下,港股的 10 档数据天然具有更强的分析价值,而美股端建议使用 kline 频道的成交量加权价格(VWAP)作为辅助信号。
五、主流数据源能力对比
| 能力维度 | Interactive Brokers API | Polygon.io | Bloomberg Terminal | TickDB |
|---|---|---|---|---|
| 美股实时行情 | ✅ 支持 | ✅ 支持 | ✅ 支持 | ✅ 支持 |
| 港股实时行情 | ✅ 支持 | ❌ 不支持 | ✅ 支持 | ✅ 支持(WebSocket) |
| USDHKD 汇率 | ✅ 支持 | ❌ 不支持 | ✅ 支持 | ✅ 支持(数字货币) |
| 港股 10 档 orderbook | ✅ 支持(佣金) | ❌ 不支持 | ✅ 支持(昂贵) | ✅ 支持(原生 WebSocket) |
| 单一 API 覆盖多市场 | ❌ 需 IBKR 连接 | ❌ 需多账号 | ❌ 需 Terminal | ✅ 单一账号 |
| WebSocket 实时推送 | ✅ 但配置复杂 | ✅ 支持 | ❌ 需 DDE 桥接 | ✅ 原生支持 |
| 历史数据(回测) | 有限 | 有限 | ✅ 全量 | ✅ 10 年级别 |
| API 文档质量 | 中等 | 优秀 | 专业但封闭 | 结构化、示例丰富 |
| 免费层 | ✅ 有 | ✅ 有 | ❌ 无 | ✅ 有 |
TickDB 的核心优势:在跨市场套利场景下,你无需对接多个数据供应商、使用 IBKR 复杂的 TWS API 或购买昂贵的 Bloomberg Terminal,一个 TickDB 账号即可覆盖美股、港股和主要汇率通道,且原生 WebSocket 支持 10 档港股 orderbook 和深度数据频道。
六、部署方案
| 场景 | 推荐配置 | 说明 |
|---|---|---|
| 个人学习 / 策略验证 | 免费层 API Key + 单线程 Demo 代码 | 适用于回测验证和非实时监控学习 |
| 个人实盘 / 小资金 | 标准层 API Key + 多线程生产代码 | WebSocket 心跳重连全开,建议 24h 守护进程 |
| 团队 / 多标的并行 | 专业层 API Key + 异步架构(aiohttp) | 支持同时监控 20+ ADR 对,Webhook 飞书告警 |
| 机构级历史回测 | 企业版 + /kline 接口 10 年历史数据 |
批量回测历史极端行情,验证 Z-Score 策略有效性 |
七、复盘与未解决的问题
回到文章开篇那位交易员的引言。"去掉汇率摩擦和跨境摩擦,这中间的钱是真实存在的"——但这句话背后有三个工程上绕不开的摩擦:
摩擦一:结算时间差。ADR 的结算周期是 T+2,而港股是 T+0。这意味着即使发现了套利机会,实际交割时汇率可能已经变化。在代码中,我们通过容差窗口(±0.3%)和较低的 Z-Score 阈值过滤了一部分这类风险,但无法完全消除。
摩擦二:转换比率公告滞后。ADR 的 Ratio 变动通常在公司公告后数天才生效。代码中硬编码的 ADR_RATIOS 字典需要手动维护。对于高频监控场景,建议接入公司公告 RSS 或调用公司行动 API。
摩擦三:做空限制。港股和美股都存在做空限制(港股 announcement 机制、美股 SEC Regulation SHO),纯统计套利策略在执行层面可能遇到无法做空另一侧的问题。
这三个问题在本文的监控系统中无法完全解决,但通过 Z-Score 信号的分层设计,至少可以做到"在套利窗口打开时及时发现,交给人工判断执行"。
结语
跨市场套利监控的本质,不是找到一个万能公式,而是在三个不完美之间找到平衡:数据的实时性(美股港股的交易时间错位)、价格的可比性(汇率摩擦和 ADR 比率的维护成本)、信号的可靠性(Z-Score 的阈值选择与假信号过滤)。
TickDB 在这个链条中扮演的角色是"单一数据连接":一个 WebSocket 连接,同时覆盖美股、港股和汇率通道,省去了多账号管理的复杂性,也为后续扩展到更多市场(比如欧洲 ADR 涉及的伦交所股票)提供了统一的数据层。
这三个月踩过的坑,让我从"写一个简单的价差脚本"变成了"搭建一个可维护的套利监控系统"。如果你也在做类似的事情,欢迎交流。
下一步行动
如果你想先验证 Z-Score 策略的历史有效性:
- 访问 tickdb.ai 注册(免费,无需信用卡)
- 使用
/v1/market/kline接口获取 3 年以上的 BABA、JD 历史 K 线数据 - 用 Python 离线回测 Z-Score 策略在不同年份的胜率和夏普比率
如果你已经在用其他数据源,但想统一接入 TickDB:
查看 TickDB API 文档中的「WebSocket 实时订阅」和「历史 K 线接口」章节,迁移成本主要在重连逻辑和错误处理的重构,建议预留 2 个工作日。
如果你习惯用 AI 辅助开发:
在 ClawHub 搜索并安装 tickdb-market-data SKILL,可以用自然语言查询 TickDB 的 API 接口和字段定义。
如果你需要机构级数据量支持(10+ ADR 对并行监控):
联系 [email protected] 获取专业版和企业版方案。
风险提示:本文不构成任何投资建议。跨市场套利涉及汇率风险、结算风险和流动性风险,实际交易中存在无法预见的极端行情。Z-Score 策略基于历史统计分布,历史表现不代表未来结果。市场有风险,投资需谨慎。