跨市场价差 Z-Score:美股-港股套利监控的工程实现
"价差本身不会说话,但当它开口时,市场已经在行动了。"
美东时间上午 9:30,纳斯达克的交易员盯着 NVDA 的报价。与此同时,地球另一端的香港联交所刚刚结束午盘休息,商汤科技的盘后交易数据正在生成。两个市场、三种货币、十六个小时的时差——但对于跨市场套利者而言,这些都是可以工程化解决的问题。
真正的挑战不在于"价差是否存在",而在于:当价差偏离历史均值两个标准差时,你的系统能不能在 500 毫秒内识别出来?
本文拆解跨市场价差 Z-Score 监控系统的工程实现,从数据对齐、滑动窗口计算,到异常告警的完整闭环。代码基于 TickDB 的美股和港股实时数据接口,可直接运行。
一、跨市场价差套利的微观结构
1.1 为什么是 NVDA 与商汤科技
这不是一个随机的标的组合。从产业链角度看,两者在 AI 赛道存在以下关联:
| 关联维度 | NVDA (美股) | 商汤科技 (港股) | 逻辑基础 |
|---|---|---|---|
| 主营业务 | GPU 设计 | AI 算法与算力 | 同属 AI 算力产业链 |
| 受益逻辑 | 大模型训练芯片需求 | 端侧 AI 应用落地 | 需求传导 |
| 波动相关性 | 高 | 中高 | 资金存在跨市场轮动 |
理论上,当 NVDA 涨幅显著超过基本面预期时,市场资金可能寻找港股 AI 标的作为"平替";反之亦然。但这种关联并非线性——它会在某些时段显著增强,在另一些时段几乎消失。
Z-Score 的价值正是量化这种"偏离程度"。
1.2 Z-Score 的统计含义与套利逻辑
Z-Score 本质上是"当前值距离均值有多少个标准差":
Z = (X - μ) / σ
在套利监控场景下:
| Z-Score 区间 | 市场含义 | 操作信号 |
|---|---|---|
| -2 < Z < 2 | 价差在正常区间 | 无信号 |
| Z > 2 | 价差正向偏离均值过高 | 可能均值回归机会 |
| Z < -2 | 价差负向偏离均值过低 | 可能均值回归机会 |
| |Z| > 3 | 极端偏离 | 高置信度信号,但需排查异常事件 |
关键假设:价差会均值回归。但这并非总是成立——当产业链逻辑发生结构性变化时,历史均值本身可能已经失效。因此,滑动窗口的窗口期选择至关重要。
1.3 交易时段对齐:被低估的技术挑战
跨市场套利的第一个工程挑战是时段对齐。NVDA 和商汤科技的交易时间如下:
| 市场 | 交易时段 | 时区 | UTC 偏移 |
|---|---|---|---|
| 美股 | 09:30-16:00 ET | EST/EDT | -5/-4 |
| 港股 | 09:30-12:00, 13:00-16:00 HKT | HKT | +8 |
这意味着每天的重叠交易窗口仅有 4.5 小时(对应 HKT 22:30-03:00)。对于实时监控而言,这带来以下问题:
- 非重叠时段的价差无意义:港股收盘后,NVDA 仍在交易,此时价差不能作为套利依据
- 盘前盘后的数据需特殊处理:美股盘前/盘后、港股盘后,其价格发现机制与盘中不同
- 节假日不同步:美股和港股的休市日不完全重合
工程解法:建立"有效窗口"标记机制,仅在重叠时段计算和展示 Z-Score。
二、系统架构总览
2.1 核心数据流
┌─────────────────────────────────────────────────────────────────┐
│ TickDB Market Data │
│ ┌──────────────────┐ ┌──────────────────┐ │
│ │ 美股 WebSocket │ │ 港股 WebSocket │ │
│ │ ws://...?symbol │ │ ws://...?symbol │ │
│ │ =NVDA.US │ │ =0020.HK │ │
│ └────────┬─────────┘ └────────┬─────────┘ │
│ │ │ │
│ └──────────┬───────────────────────┘ │
│ ▼ │
│ ┌──────────────────────┐ │
│ │ 时段对齐过滤层 │ │
│ │ (有效窗口判断) │ │
│ └──────────┬───────────┘ │
│ ▼ │
│ ┌──────────────────────┐ │
│ │ 价格对齐模块 │ │
│ │ (汇率转换/货币统一) │ │
│ └──────────┬───────────┘ │
│ ▼ │
│ ┌──────────────────────┐ │
│ │ Z-Score 计算引擎 │ │
│ │ (滑动窗口 + 统计) │ │
│ └──────────┬───────────┘ │
│ ▼ │
│ ┌──────────────────────┐ │
│ │ 告警触发器 │ │
│ │ (阈值 + 飞书通知) │ │
│ └──────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
2.2 模块职责
| 模块 | 输入 | 输出 | 状态依赖 |
|---|---|---|---|
| 数据订阅层 | 实时 ticker 流 | 规范化价格数据 | 持续运行 |
| 时段对齐过滤 | 当前位置时间、双方市场状态 | is_trading_active 布尔标记 |
条件触发 |
| 价格对齐 | 两个市场的最新价格 | 统一货币的价差序列 | 依赖对齐过滤 |
| Z-Score 引擎 | 滑动窗口内的价差序列 | (均值, 标准差, Z-Score) | 窗口满后持续输出 |
| 告警触发 | Z-Score 当前值 | 告警事件 | 阈值触发 |
三、生产级代码实现
3.1 数据订阅层
import os
import json
import time
import random
import asyncio
import threading
from datetime import datetime, timezone
from collections import deque
import websocket # pip install websocket-client
import requests # pip install requests
# ============================================================
# 环境配置
# ============================================================
class Config:
# TickDB API 配置
TICKDB_API_KEY = os.environ.get("TICKDB_API_KEY", "")
TICKDB_WS_URL = "wss://api.tickdb.ai/ws"
# 监控标的
SYMBOL_NVDA = "NVDA.US"
SYMBOL_SENTIME = "0020.HK"
# 套利参数
LOOKBACK_WINDOW = 60 # 滑动窗口:最近 60 个数据点
Z_SCORE_THRESHOLD = 2.0 # 告警阈值
LOOKBACK_PERIOD_SECONDS = 300 # 回测窗口(秒)
# 告警配置(飞书)
FEISHU_WEBHOOK = os.environ.get("FEISHU_WEBHOOK", "")
FEISHU_ENABLED = bool(FEISHU_WEBHOOK)
# ============================================================
# WebSocket 客户端封装(带心跳、重连、限频处理)
# ============================================================
class TickDBWebSocketClient:
"""
TickDB WebSocket 客户端封装
⚠️ 生产环境高频场景建议使用 aiohttp/asyncio 异步架构
⚠️ 当前同步实现适用于中等频率监控场景(更新间隔 > 1 秒)
"""
def __init__(self, symbols: list[str], on_price_update, config: Config):
self.symbols = symbols
self.on_price_update = on_price_update
self.config = config
self.ws = None
self._running = False
self._retry_count = 0
self._last_pong_time = 0
self._lock = threading.Lock()
def connect(self):
"""建立 WebSocket 连接"""
# ⚠️ 鉴权通过 URL 参数传递(非 Header)
params = "&".join([f"symbol={s}" for s in self.symbols])
url = f"{self.config.TICKDB_WS_URL}?api_key={self.config.TICKDB_API_KEY}&{params}"
self.ws = websocket.WebSocketApp(
url,
on_message=self._on_message,
on_error=self._on_error,
on_close=self._on_close,
on_open=self._on_open
)
self._running = True
self._retry_count = 0
print(f"[TickDB WS] 连接中: {', '.join(self.symbols)}")
self.ws.run_forever(ping_interval=20, ping_timeout=10)
def _on_open(self, ws):
"""连接建立回调"""
print(f"[TickDB WS] 连接已建立 - {datetime.now()}")
self._retry_count = 0
def _on_message(self, ws, message):
"""消息处理"""
try:
data = json.loads(message)
# 处理心跳响应
if data.get("type") == "pong":
self._last_pong_time = time.time()
return
# 处理行情数据
if "tick" in data:
tick = data["tick"]
symbol = tick.get("symbol")
price = tick.get("last")
timestamp = tick.get("timestamp")
with self._lock:
self.on_price_update(symbol, price, timestamp)
except json.JSONDecodeError as e:
print(f"[TickDB WS] JSON 解析错误: {e}")
def _on_error(self, ws, error):
"""错误处理"""
print(f"[TickDB WS] 错误: {error}")
def _on_close(self, ws, close_status_code, close_msg):
"""连接关闭回调(自动重连)"""
print(f"[TickDB WS] 连接关闭: {close_status_code} - {close_msg}")
self._running = False
self._schedule_reconnect()
def _schedule_reconnect(self):
"""指数退避 + 抖动重连"""
if not self._running:
self._retry_count += 1
# 指数退避
base_delay = 2
max_delay = 60
delay = min(base_delay * (2 ** self._retry_count), max_delay)
# 添加抖动(避免惊群效应)
jitter = random.uniform(0, delay * 0.1)
total_delay = delay + jitter
print(f"[TickDB WS] {self._retry_count} 次重连尝试,"
f"等待 {total_delay:.1f} 秒...")
time.sleep(total_delay)
if self._retry_count < 10:
self.connect()
else:
print("[TickDB WS] 达到最大重试次数,请检查网络或 API Key")
def send_ping(self):
"""发送心跳"""
if self.ws and self.ws.sock:
try:
self.ws.send(json.dumps({"cmd": "ping"}))
except Exception as e:
print(f"[TickDB WS] 心跳发送失败: {e}")
3.2 时段对齐与价格处理
from datetime import datetime
from enum import Enum
class MarketSession(Enum):
"""市场交易时段枚举"""
NONE = 0 # 无交易
PRE_MARKET = 1 # 盘前
REGULAR = 2 # 盘中
AFTER_HOURS = 3 # 盘后
class TradingHours:
"""
交易时段对齐工具
⚠️ 简化实现:实际生产需处理节假日、临时休市等边界情况
"""
# 美股时段(ET 时区)
US_OPEN_HOUR, US_OPEN_MIN = 9, 30
US_CLOSE_HOUR, US_CLOSE_MIN = 16, 0
US_PRE_START_HOUR, US_PRE_START_MIN = 4, 0
# 港股时段(HKT 时区)
HK_OPEN_HOUR, HK_OPEN_MIN = 9, 30
HK_CLOSE_HOUR, HK_CLOSE_MIN = 16, 0
HK_LUNCH_START_HOUR, HK_LUNCH_START_MIN = 12, 0
HK_LUNCH_END_HOUR, HK_LUNCH_END_MIN = 13, 0
@staticmethod
def is_us_trading(dt_utc: datetime) -> tuple[bool, MarketSession]:
"""判断 UTC 时间美股是否在交易"""
# 转换为 ET(简化处理,实际需考虑 EDT/EST)
et_hour = (dt_utc.hour + 5) % 24
# 盘前:04:00-09:30 ET
pre_start = TradingHours.US_PRE_START_HOUR * 60
pre_end = (TradingHours.US_OPEN_HOUR * 60 + TradingHours.US_OPEN_MIN)
current_minutes = et_hour * 60 + dt_utc.minute
if pre_start <= current_minutes < pre_end:
return True, MarketSession.PRE_MARKET
# 盘中:09:30-16:00 ET
reg_start = pre_end
reg_end = TradingHours.US_CLOSE_HOUR * 60 + TradingHours.US_CLOSE_MIN
if reg_start <= current_minutes < reg_end:
return True, MarketSession.REGULAR
return False, MarketSession.NONE
@staticmethod
def is_hk_trading(dt_utc: datetime) -> tuple[bool, MarketSession]:
"""判断 UTC 时间港股是否在交易"""
# 转换为 HKT(简化处理)
hkt_hour = (dt_utc.hour + 8) % 24
reg_start = (TradingHours.HK_OPEN_HOUR * 60 + TradingHours.HK_OPEN_MIN)
reg_end = (TradingHours.HK_CLOSE_HOUR * 60 + TradingHours.HK_CLOSE_MIN)
lunch_start = (TradingHours.HK_LUNCH_START_HOUR * 60 + TradingHours.HK_LUNCH_START_MIN)
lunch_end = (TradingHours.HK_LUNCH_END_HOUR * 60 + TradingHours.HK_LUNCH_END_MIN)
current_minutes = hkt_hour * 60 + dt_utc.minute
# 午休时间排除
if lunch_start <= current_minutes < lunch_end:
return False, MarketSession.NONE
# 盘中
if reg_start <= current_minutes < reg_end:
return True, MarketSession.REGULAR
return False, MarketSession.NONE
@staticmethod
def is_both_trading(dt_utc: datetime) -> bool:
"""判断双方市场是否都在盘中交易"""
us_active, us_session = TradingHours.is_us_trading(dt_utc)
hk_active, hk_session = TradingHours.is_hk_trading(dt_utc)
return (us_active and us_session == MarketSession.REGULAR and
hk_active and hk_session == MarketSession.REGULAR)
class PriceAlignment:
"""
价格对齐模块
⚠️ 简化实现:使用固定汇率。实际生产建议使用实时汇率 API
"""
# 简化汇率(USD/HKD,港币锚定美元)
USD_HKD_RATE = 7.78
@staticmethod
def convert_to_usd(price: float, currency: str) -> float:
"""统一转换为美元计价"""
if currency == "USD":
return price
elif currency == "HKD":
return price / PriceAlignment.USD_HKD_RATE
else:
raise ValueError(f"不支持的货币类型: {currency}")
@staticmethod
def calculate_spread(nvda_price: float, sentime_price: float) -> float:
"""
计算价差
⚠️ 此处为简化示例:实际套利需定义明确的价差计算逻辑
"""
# 假设商汤科技按固定比例与 NVDA 关联(产业链系数)
INDUSTRY_COEFFICIENT = 0.001 # 需要根据实际回测确定
# 标准化后的价差
spread = nvda_price - sentime_price / INDUSTRY_COEFFICIENT
return spread
3.3 Z-Score 计算引擎
import numpy as np
from typing import Optional
class ZScoreEngine:
"""
Z-Score 滑动窗口计算引擎
使用 Welford's online algorithm 实现增量计算,避免存储全量数据
"""
def __init__(self, window_size: int):
self.window_size = window_size
self.values = deque(maxlen=window_size)
self._count = 0
self._mean = 0.0
self._m2 = 0.0 # 累计差方
def update(self, value: float) -> Optional[dict]:
"""
追加新值,返回 Z-Score 统计量
Returns:
dict: 包含 mean, std, z_score,或 None(窗口未满)
"""
self.values.append(value)
self._count += 1
# Welford's online algorithm 更新均值和方差
delta = value - self._mean
self._mean += delta / self._count
delta2 = value - self._mean
self._m2 += delta * delta2
# 窗口未满时不计算
if len(self.values) < self.window_size:
return None
# 计算标准差(无偏估计)
variance = self._m2 / (self._count - 1) if self._count > 1 else 0
std = np.sqrt(variance)
if std == 0:
return None
z_score = (value - self._mean) / std
return {
"mean": self._mean,
"std": std,
"z_score": z_score,
"sample_count": len(self.values),
"timestamp": time.time()
}
def reset(self):
"""重置引擎状态"""
self.values.clear()
self._count = 0
self._mean = 0.0
self._m2 = 0.0
class SpreadMonitor:
"""
价差监控主类
整合数据订阅、时段对齐、Z-Score 计算、告警触发
"""
def __init__(self, config: Config):
self.config = config
self.zscore_engine = ZScoreEngine(config.LOOKBACK_WINDOW)
# 最新价格缓存
self._nvda_price: Optional[float] = None
self._sentime_price: Optional[float] = None
self._last_update_time: Optional[float] = None
# 统计量缓存
self._current_zscore: Optional[float] = None
self._last_alert_time: float = 0
self._alert_cooldown: float = 300 # 告警冷却期(秒)
def on_price_update(self, symbol: str, price: float, timestamp: int):
"""行情更新回调"""
if symbol == self.config.SYMBOL_NVDA:
self._nvda_price = price
elif symbol == self.config.SYMBOL_SENTIME:
self._sentime_price = price
self._last_update_time = timestamp / 1000 # 毫秒转秒
# 尝试计算 Z-Score
self._calculate_zscore()
def _calculate_zscore(self):
"""执行 Z-Score 计算"""
# 检查时段有效性
now_utc = datetime.now(timezone.utc)
if not TradingHours.is_both_trading(now_utc):
return
# 检查数据完整性
if self._nvda_price is None or self._sentime_price is None:
return
# 计算标准化价差
spread = PriceAlignment.calculate_spread(
self._nvda_price,
self._sentime_price
)
# 更新 Z-Score 引擎
result = self.zscore_engine.update(spread)
if result:
self._current_zscore = result["z_score"]
self._check_alert(result)
def _check_alert(self, stats: dict):
"""检查是否触发告警"""
z_score = stats["z_score"]
threshold = self.config.Z_SCORE_THRESHOLD
# 检查阈值
if abs(z_score) < threshold:
return
# 检查冷却期
now = time.time()
if now - self._last_alert_time < self._alert_cooldown:
return
self._last_alert_time = now
self._trigger_alert(z_score, stats)
def _trigger_alert(self, z_score: float, stats: dict):
"""触发告警"""
direction = "正向偏离" if z_score > 0 else "负向偏离"
message = (
f"【价差告警】\n"
f"标的: {self.config.SYMBOL_NVDA} vs {self.config.SYMBOL_SENTIME}\n"
f"Z-Score: {z_score:.2f} ({direction},阈值 ±{self.config.Z_SCORE_THRESHOLD})\n"
f"均值: {stats['mean']:.4f}\n"
f"标准差: {stats['std']:.4f}\n"
f"时间: {datetime.now().isoformat()}"
)
print(f"\n{'='*50}\n{message}\n{'='*50}")
if self.config.FEISHU_ENABLED:
self._send_feishu_alert(message)
def _send_feishu_alert(self, message: str):
"""发送飞书告警"""
try:
payload = {
"msg_type": "text",
"content": {"text": message}
}
response = requests.post(
self.config.FEISHU_WEBHOOK,
json=payload,
timeout=(3.05, 10)
)
if response.status_code == 200:
print("[飞书] 告警发送成功")
else:
print(f"[飞书] 告警发送失败: {response.status_code}")
except Exception as e:
print(f"[飞书] 告警发送异常: {e}")
3.4 主程序入口
def main():
"""主程序入口"""
config = Config()
# 参数校验
if not config.TICKDB_API_KEY:
print("[错误] 请设置环境变量 TICKDB_API_KEY")
return
if not config.FEISHU_ENABLED:
print("[警告] 未配置飞书告警,将仅输出到控制台")
# 初始化监控器
monitor = SpreadMonitor(config)
# 初始化 WebSocket 客户端
client = TickDBWebSocketClient(
symbols=[config.SYMBOL_NVDA, config.SYMBOL_SENTIME],
on_price_update=monitor.on_price_update,
config=config
)
# 启动心跳线程
def heartbeat_loop():
while True:
time.sleep(30)
client.send_ping()
heartbeat_thread = threading.Thread(target=heartbeat_loop, daemon=True)
heartbeat_thread.start()
# 建立连接
try:
client.connect()
except KeyboardInterrupt:
print("\n[退出] 收到中断信号,程序终止")
client._running = False
if __name__ == "__main__":
main()
四、订单簿数据与深度监控(扩展)
4.1 为什么需要 depth 频道
Z-Score 监控的是价格层面的偏离,但真实的套利机会往往在流动性层面先行信号。
当 NVDA 突然出现大量卖单时,价格可能尚未大幅下跌,但买卖价差已经扩大——这是流动性枯竭的前兆。单纯的 ticker 数据无法捕捉这一信号。
TickDB 的 depth 频道提供订单簿快照:
| 市场 | depth 档位 | 适用场景 |
|---|---|---|
| 美股 | 1 档 | NVDA 基础流动性监控 |
| 港股 | 10 档 | 商汤科技精细化分析 |
| 数字货币 | 10 档 | 高频套利场景 |
4.2 depth 数据订阅扩展
def subscribe_depth(ws, symbol: str, depth: int = 10):
"""
订阅 depth 频道
⚠️ depth 频道仅支持部分市场,详细列表见 TickDB 文档
"""
subscribe_msg = {
"cmd": "subscribe",
"channel": "depth",
"symbol": symbol,
"depth": min(depth, 10) # 限制最大档位
}
ws.send(json.dumps(subscribe_msg))
print(f"[TickDB WS] 订阅 depth: {symbol} (深度 {depth} 档)")
def calculate_order_imbalance(depth_data: dict, levels: int = 5) -> float:
"""
计算订单簿压力比
压力比 > 1: 买盘主导
压力比 < 1: 卖盘主导
"""
bids = depth_data.get("bids", [])[:levels]
asks = depth_data.get("asks", [])[:levels]
bid_volume = sum(qty for _, qty in bids)
ask_volume = sum(qty for _, qty in asks)
if ask_volume == 0:
return float('inf')
return bid_volume / ask_volume
# 使用示例
# subscribe_depth(client.ws, "0020.HK", depth=10)
# 在 _on_message 中解析 depth 频道数据
五、部署方案对比
| 维度 | 个人量化 | 小型团队 | 机构级 |
|---|---|---|---|
| 部署方式 | 本地运行 | VPS/云服务器 | 容器化 + K8s |
| 延迟要求 | <1s 可接受 | <500ms | <100ms |
| 数据源 | 免费 API | 付费 API | 专线 + 缓存 |
| 告警方式 | 控制台/飞书 | 飞书 + 钉钉 | 全渠道 + 交易柜台 |
| 监控标的 | 1-3 对 | 5-10 对 | 50+ 对 |
| 高可用 | 无需 | 主备切换 | 自动故障转移 |
个人量化推荐配置:
- VPS:2 核 2G,Ubuntu 22.04
- Python 3.10+,websocket-client, requests, numpy
- 免费飞书机器人告警
六、结语与下一步行动
回到开篇的问题:当价差偏离均值两个标准差时,你的系统能不能在 500 毫秒内识别出来?
本文的答案是:可以,但需要解决三个工程问题——交易时段对齐、滑动窗口 Z-Score 计算、异常数据过滤。没有银弹。Z-Score 只是一个统计工具,它的有效性取决于你选择的历史窗口期是否仍反映当前产业链关系。
本文要点回顾:
- 时段对齐是跨市场套利的基础设施,不能跳过
- Z-Score 的窗口期选择需根据产业链周期动态调整
- 生产级代码必须包含心跳、重连、限频处理
- depth 频道可作为价格信号的先行验证
下一步行动
如果你想亲手运行本文代码:
- 访问 tickdb.ai 注册(免费,无需信用卡)
- 在控制台生成 API Key
- 设置环境变量
TICKDB_API_KEY - 复制本文代码,安装依赖后直接运行
如果你需要 10 年全量历史 K 线数据验证套利假设,联系 [email protected] 了解机构方案。
如果你习惯用 AI 辅助开发,在 AI 助手中搜索安装 tickdb-market-data SKILL,调用 get_stock_depth 和 get_stock_ticker 等工具函数。
风险提示:本文不构成任何投资建议。跨市场价差可能长期不回归均值,历史统计关系可能因市场结构变化而失效。Z-Score 策略存在假阳性风险,请务必结合基本面分析综合判断。市场有风险,投资需谨慎。