跨市场价差 Z-Score:美股-港股套利监控的工程实现

"价差本身不会说话,但当它开口时,市场已经在行动了。"

美东时间上午 9:30,纳斯达克的交易员盯着 NVDA 的报价。与此同时,地球另一端的香港联交所刚刚结束午盘休息,商汤科技的盘后交易数据正在生成。两个市场、三种货币、十六个小时的时差——但对于跨市场套利者而言,这些都是可以工程化解决的问题。

真正的挑战不在于"价差是否存在",而在于:当价差偏离历史均值两个标准差时,你的系统能不能在 500 毫秒内识别出来?

本文拆解跨市场价差 Z-Score 监控系统的工程实现,从数据对齐、滑动窗口计算,到异常告警的完整闭环。代码基于 TickDB 的美股和港股实时数据接口,可直接运行。


一、跨市场价差套利的微观结构

1.1 为什么是 NVDA 与商汤科技

这不是一个随机的标的组合。从产业链角度看,两者在 AI 赛道存在以下关联:

关联维度 NVDA (美股) 商汤科技 (港股) 逻辑基础
主营业务 GPU 设计 AI 算法与算力 同属 AI 算力产业链
受益逻辑 大模型训练芯片需求 端侧 AI 应用落地 需求传导
波动相关性 中高 资金存在跨市场轮动

理论上,当 NVDA 涨幅显著超过基本面预期时,市场资金可能寻找港股 AI 标的作为"平替";反之亦然。但这种关联并非线性——它会在某些时段显著增强,在另一些时段几乎消失。

Z-Score 的价值正是量化这种"偏离程度"。

1.2 Z-Score 的统计含义与套利逻辑

Z-Score 本质上是"当前值距离均值有多少个标准差":

Z = (X - μ) / σ

在套利监控场景下:

Z-Score 区间 市场含义 操作信号
-2 < Z < 2 价差在正常区间 无信号
Z > 2 价差正向偏离均值过高 可能均值回归机会
Z < -2 价差负向偏离均值过低 可能均值回归机会
|Z| > 3 极端偏离 高置信度信号,但需排查异常事件

关键假设:价差会均值回归。但这并非总是成立——当产业链逻辑发生结构性变化时,历史均值本身可能已经失效。因此,滑动窗口的窗口期选择至关重要。

1.3 交易时段对齐:被低估的技术挑战

跨市场套利的第一个工程挑战是时段对齐。NVDA 和商汤科技的交易时间如下:

市场 交易时段 时区 UTC 偏移
美股 09:30-16:00 ET EST/EDT -5/-4
港股 09:30-12:00, 13:00-16:00 HKT HKT +8

这意味着每天的重叠交易窗口仅有 4.5 小时(对应 HKT 22:30-03:00)。对于实时监控而言,这带来以下问题:

  1. 非重叠时段的价差无意义:港股收盘后,NVDA 仍在交易,此时价差不能作为套利依据
  2. 盘前盘后的数据需特殊处理:美股盘前/盘后、港股盘后,其价格发现机制与盘中不同
  3. 节假日不同步:美股和港股的休市日不完全重合

工程解法:建立"有效窗口"标记机制,仅在重叠时段计算和展示 Z-Score。


二、系统架构总览

2.1 核心数据流

┌─────────────────────────────────────────────────────────────────┐
│                      TickDB Market Data                          │
│  ┌──────────────────┐              ┌──────────────────┐          │
│  │  美股 WebSocket  │              │  港股 WebSocket  │          │
│  │  ws://...?symbol │              │  ws://...?symbol │          │
│  │  =NVDA.US        │              │  =0020.HK        │          │
│  └────────┬─────────┘              └────────┬─────────┘          │
│           │                                  │                    │
│           └──────────┬───────────────────────┘                    │
│                      ▼                                            │
│           ┌──────────────────────┐                                │
│           │   时段对齐过滤层     │                                │
│           │  (有效窗口判断)      │                                │
│           └──────────┬───────────┘                                │
│                      ▼                                            │
│           ┌──────────────────────┐                                │
│           │   价格对齐模块       │                                │
│           │  (汇率转换/货币统一) │                                │
│           └──────────┬───────────┘                                │
│                      ▼                                            │
│           ┌──────────────────────┐                                │
│           │   Z-Score 计算引擎   │                                │
│           │  (滑动窗口 + 统计)   │                                │
│           └──────────┬───────────┘                                │
│                      ▼                                            │
│           ┌──────────────────────┐                                │
│           │   告警触发器         │                                │
│           │  (阈值 + 飞书通知)   │                                │
│           └──────────────────────┘                                │
└─────────────────────────────────────────────────────────────────┘

2.2 模块职责

模块 输入 输出 状态依赖
数据订阅层 实时 ticker 流 规范化价格数据 持续运行
时段对齐过滤 当前位置时间、双方市场状态 is_trading_active 布尔标记 条件触发
价格对齐 两个市场的最新价格 统一货币的价差序列 依赖对齐过滤
Z-Score 引擎 滑动窗口内的价差序列 (均值, 标准差, Z-Score) 窗口满后持续输出
告警触发 Z-Score 当前值 告警事件 阈值触发

三、生产级代码实现

3.1 数据订阅层

import os
import json
import time
import random
import asyncio
import threading
from datetime import datetime, timezone
from collections import deque
import websocket  # pip install websocket-client
import requests   # pip install requests

# ============================================================
# 环境配置
# ============================================================
class Config:
    # TickDB API 配置
    TICKDB_API_KEY = os.environ.get("TICKDB_API_KEY", "")
    TICKDB_WS_URL = "wss://api.tickdb.ai/ws"
    
    # 监控标的
    SYMBOL_NVDA = "NVDA.US"
    SYMBOL_SENTIME = "0020.HK"
    
    # 套利参数
    LOOKBACK_WINDOW = 60  # 滑动窗口:最近 60 个数据点
    Z_SCORE_THRESHOLD = 2.0  # 告警阈值
    LOOKBACK_PERIOD_SECONDS = 300  # 回测窗口(秒)
    
    # 告警配置(飞书)
    FEISHU_WEBHOOK = os.environ.get("FEISHU_WEBHOOK", "")
    FEISHU_ENABLED = bool(FEISHU_WEBHOOK)


# ============================================================
# WebSocket 客户端封装(带心跳、重连、限频处理)
# ============================================================
class TickDBWebSocketClient:
    """
    TickDB WebSocket 客户端封装
    
    ⚠️ 生产环境高频场景建议使用 aiohttp/asyncio 异步架构
    ⚠️ 当前同步实现适用于中等频率监控场景(更新间隔 > 1 秒)
    """
    
    def __init__(self, symbols: list[str], on_price_update, config: Config):
        self.symbols = symbols
        self.on_price_update = on_price_update
        self.config = config
        self.ws = None
        self._running = False
        self._retry_count = 0
        self._last_pong_time = 0
        self._lock = threading.Lock()
        
    def connect(self):
        """建立 WebSocket 连接"""
        # ⚠️ 鉴权通过 URL 参数传递(非 Header)
        params = "&".join([f"symbol={s}" for s in self.symbols])
        url = f"{self.config.TICKDB_WS_URL}?api_key={self.config.TICKDB_API_KEY}&{params}"
        
        self.ws = websocket.WebSocketApp(
            url,
            on_message=self._on_message,
            on_error=self._on_error,
            on_close=self._on_close,
            on_open=self._on_open
        )
        
        self._running = True
        self._retry_count = 0
        print(f"[TickDB WS] 连接中: {', '.join(self.symbols)}")
        self.ws.run_forever(ping_interval=20, ping_timeout=10)
    
    def _on_open(self, ws):
        """连接建立回调"""
        print(f"[TickDB WS] 连接已建立 - {datetime.now()}")
        self._retry_count = 0
    
    def _on_message(self, ws, message):
        """消息处理"""
        try:
            data = json.loads(message)
            
            # 处理心跳响应
            if data.get("type") == "pong":
                self._last_pong_time = time.time()
                return
            
            # 处理行情数据
            if "tick" in data:
                tick = data["tick"]
                symbol = tick.get("symbol")
                price = tick.get("last")
                timestamp = tick.get("timestamp")
                
                with self._lock:
                    self.on_price_update(symbol, price, timestamp)
                    
        except json.JSONDecodeError as e:
            print(f"[TickDB WS] JSON 解析错误: {e}")
    
    def _on_error(self, ws, error):
        """错误处理"""
        print(f"[TickDB WS] 错误: {error}")
    
    def _on_close(self, ws, close_status_code, close_msg):
        """连接关闭回调(自动重连)"""
        print(f"[TickDB WS] 连接关闭: {close_status_code} - {close_msg}")
        self._running = False
        self._schedule_reconnect()
    
    def _schedule_reconnect(self):
        """指数退避 + 抖动重连"""
        if not self._running:
            self._retry_count += 1
            
            # 指数退避
            base_delay = 2
            max_delay = 60
            delay = min(base_delay * (2 ** self._retry_count), max_delay)
            
            # 添加抖动(避免惊群效应)
            jitter = random.uniform(0, delay * 0.1)
            total_delay = delay + jitter
            
            print(f"[TickDB WS] {self._retry_count} 次重连尝试,"
                  f"等待 {total_delay:.1f} 秒...")
            
            time.sleep(total_delay)
            
            if self._retry_count < 10:
                self.connect()
            else:
                print("[TickDB WS] 达到最大重试次数,请检查网络或 API Key")
    
    def send_ping(self):
        """发送心跳"""
        if self.ws and self.ws.sock:
            try:
                self.ws.send(json.dumps({"cmd": "ping"}))
            except Exception as e:
                print(f"[TickDB WS] 心跳发送失败: {e}")

3.2 时段对齐与价格处理

from datetime import datetime
from enum import Enum


class MarketSession(Enum):
    """市场交易时段枚举"""
    NONE = 0          # 无交易
    PRE_MARKET = 1    # 盘前
    REGULAR = 2       # 盘中
    AFTER_HOURS = 3   # 盘后


class TradingHours:
    """
    交易时段对齐工具
    
    ⚠️ 简化实现:实际生产需处理节假日、临时休市等边界情况
    """
    
    # 美股时段(ET 时区)
    US_OPEN_HOUR, US_OPEN_MIN = 9, 30
    US_CLOSE_HOUR, US_CLOSE_MIN = 16, 0
    US_PRE_START_HOUR, US_PRE_START_MIN = 4, 0
    
    # 港股时段(HKT 时区)
    HK_OPEN_HOUR, HK_OPEN_MIN = 9, 30
    HK_CLOSE_HOUR, HK_CLOSE_MIN = 16, 0
    HK_LUNCH_START_HOUR, HK_LUNCH_START_MIN = 12, 0
    HK_LUNCH_END_HOUR, HK_LUNCH_END_MIN = 13, 0
    
    @staticmethod
    def is_us_trading(dt_utc: datetime) -> tuple[bool, MarketSession]:
        """判断 UTC 时间美股是否在交易"""
        # 转换为 ET(简化处理,实际需考虑 EDT/EST)
        et_hour = (dt_utc.hour + 5) % 24
        
        # 盘前:04:00-09:30 ET
        pre_start = TradingHours.US_PRE_START_HOUR * 60
        pre_end = (TradingHours.US_OPEN_HOUR * 60 + TradingHours.US_OPEN_MIN)
        current_minutes = et_hour * 60 + dt_utc.minute
        
        if pre_start <= current_minutes < pre_end:
            return True, MarketSession.PRE_MARKET
        
        # 盘中:09:30-16:00 ET
        reg_start = pre_end
        reg_end = TradingHours.US_CLOSE_HOUR * 60 + TradingHours.US_CLOSE_MIN
        
        if reg_start <= current_minutes < reg_end:
            return True, MarketSession.REGULAR
        
        return False, MarketSession.NONE
    
    @staticmethod
    def is_hk_trading(dt_utc: datetime) -> tuple[bool, MarketSession]:
        """判断 UTC 时间港股是否在交易"""
        # 转换为 HKT(简化处理)
        hkt_hour = (dt_utc.hour + 8) % 24
        
        reg_start = (TradingHours.HK_OPEN_HOUR * 60 + TradingHours.HK_OPEN_MIN)
        reg_end = (TradingHours.HK_CLOSE_HOUR * 60 + TradingHours.HK_CLOSE_MIN)
        lunch_start = (TradingHours.HK_LUNCH_START_HOUR * 60 + TradingHours.HK_LUNCH_START_MIN)
        lunch_end = (TradingHours.HK_LUNCH_END_HOUR * 60 + TradingHours.HK_LUNCH_END_MIN)
        
        current_minutes = hkt_hour * 60 + dt_utc.minute
        
        # 午休时间排除
        if lunch_start <= current_minutes < lunch_end:
            return False, MarketSession.NONE
        
        # 盘中
        if reg_start <= current_minutes < reg_end:
            return True, MarketSession.REGULAR
        
        return False, MarketSession.NONE
    
    @staticmethod
    def is_both_trading(dt_utc: datetime) -> bool:
        """判断双方市场是否都在盘中交易"""
        us_active, us_session = TradingHours.is_us_trading(dt_utc)
        hk_active, hk_session = TradingHours.is_hk_trading(dt_utc)
        
        return (us_active and us_session == MarketSession.REGULAR and
                hk_active and hk_session == MarketSession.REGULAR)


class PriceAlignment:
    """
    价格对齐模块
    
    ⚠️ 简化实现:使用固定汇率。实际生产建议使用实时汇率 API
    """
    
    # 简化汇率(USD/HKD,港币锚定美元)
    USD_HKD_RATE = 7.78
    
    @staticmethod
    def convert_to_usd(price: float, currency: str) -> float:
        """统一转换为美元计价"""
        if currency == "USD":
            return price
        elif currency == "HKD":
            return price / PriceAlignment.USD_HKD_RATE
        else:
            raise ValueError(f"不支持的货币类型: {currency}")
    
    @staticmethod
    def calculate_spread(nvda_price: float, sentime_price: float) -> float:
        """
        计算价差
        ⚠️ 此处为简化示例:实际套利需定义明确的价差计算逻辑
        """
        # 假设商汤科技按固定比例与 NVDA 关联(产业链系数)
        INDUSTRY_COEFFICIENT = 0.001  # 需要根据实际回测确定
        
        # 标准化后的价差
        spread = nvda_price - sentime_price / INDUSTRY_COEFFICIENT
        
        return spread

3.3 Z-Score 计算引擎

import numpy as np
from typing import Optional


class ZScoreEngine:
    """
    Z-Score 滑动窗口计算引擎
    
    使用 Welford's online algorithm 实现增量计算,避免存储全量数据
    """
    
    def __init__(self, window_size: int):
        self.window_size = window_size
        self.values = deque(maxlen=window_size)
        self._count = 0
        self._mean = 0.0
        self._m2 = 0.0  # 累计差方
    
    def update(self, value: float) -> Optional[dict]:
        """
        追加新值,返回 Z-Score 统计量
        
        Returns:
            dict: 包含 mean, std, z_score,或 None(窗口未满)
        """
        self.values.append(value)
        self._count += 1
        
        # Welford's online algorithm 更新均值和方差
        delta = value - self._mean
        self._mean += delta / self._count
        delta2 = value - self._mean
        self._m2 += delta * delta2
        
        # 窗口未满时不计算
        if len(self.values) < self.window_size:
            return None
        
        # 计算标准差(无偏估计)
        variance = self._m2 / (self._count - 1) if self._count > 1 else 0
        std = np.sqrt(variance)
        
        if std == 0:
            return None
        
        z_score = (value - self._mean) / std
        
        return {
            "mean": self._mean,
            "std": std,
            "z_score": z_score,
            "sample_count": len(self.values),
            "timestamp": time.time()
        }
    
    def reset(self):
        """重置引擎状态"""
        self.values.clear()
        self._count = 0
        self._mean = 0.0
        self._m2 = 0.0


class SpreadMonitor:
    """
    价差监控主类
    
    整合数据订阅、时段对齐、Z-Score 计算、告警触发
    """
    
    def __init__(self, config: Config):
        self.config = config
        self.zscore_engine = ZScoreEngine(config.LOOKBACK_WINDOW)
        
        # 最新价格缓存
        self._nvda_price: Optional[float] = None
        self._sentime_price: Optional[float] = None
        self._last_update_time: Optional[float] = None
        
        # 统计量缓存
        self._current_zscore: Optional[float] = None
        self._last_alert_time: float = 0
        self._alert_cooldown: float = 300  # 告警冷却期(秒)
    
    def on_price_update(self, symbol: str, price: float, timestamp: int):
        """行情更新回调"""
        if symbol == self.config.SYMBOL_NVDA:
            self._nvda_price = price
        elif symbol == self.config.SYMBOL_SENTIME:
            self._sentime_price = price
        
        self._last_update_time = timestamp / 1000  # 毫秒转秒
        
        # 尝试计算 Z-Score
        self._calculate_zscore()
    
    def _calculate_zscore(self):
        """执行 Z-Score 计算"""
        # 检查时段有效性
        now_utc = datetime.now(timezone.utc)
        if not TradingHours.is_both_trading(now_utc):
            return
        
        # 检查数据完整性
        if self._nvda_price is None or self._sentime_price is None:
            return
        
        # 计算标准化价差
        spread = PriceAlignment.calculate_spread(
            self._nvda_price,
            self._sentime_price
        )
        
        # 更新 Z-Score 引擎
        result = self.zscore_engine.update(spread)
        
        if result:
            self._current_zscore = result["z_score"]
            self._check_alert(result)
    
    def _check_alert(self, stats: dict):
        """检查是否触发告警"""
        z_score = stats["z_score"]
        threshold = self.config.Z_SCORE_THRESHOLD
        
        # 检查阈值
        if abs(z_score) < threshold:
            return
        
        # 检查冷却期
        now = time.time()
        if now - self._last_alert_time < self._alert_cooldown:
            return
        
        self._last_alert_time = now
        self._trigger_alert(z_score, stats)
    
    def _trigger_alert(self, z_score: float, stats: dict):
        """触发告警"""
        direction = "正向偏离" if z_score > 0 else "负向偏离"
        
        message = (
            f"【价差告警】\n"
            f"标的: {self.config.SYMBOL_NVDA} vs {self.config.SYMBOL_SENTIME}\n"
            f"Z-Score: {z_score:.2f} ({direction},阈值 ±{self.config.Z_SCORE_THRESHOLD})\n"
            f"均值: {stats['mean']:.4f}\n"
            f"标准差: {stats['std']:.4f}\n"
            f"时间: {datetime.now().isoformat()}"
        )
        
        print(f"\n{'='*50}\n{message}\n{'='*50}")
        
        if self.config.FEISHU_ENABLED:
            self._send_feishu_alert(message)
    
    def _send_feishu_alert(self, message: str):
        """发送飞书告警"""
        try:
            payload = {
                "msg_type": "text",
                "content": {"text": message}
            }
            
            response = requests.post(
                self.config.FEISHU_WEBHOOK,
                json=payload,
                timeout=(3.05, 10)
            )
            
            if response.status_code == 200:
                print("[飞书] 告警发送成功")
            else:
                print(f"[飞书] 告警发送失败: {response.status_code}")
                
        except Exception as e:
            print(f"[飞书] 告警发送异常: {e}")

3.4 主程序入口

def main():
    """主程序入口"""
    config = Config()
    
    # 参数校验
    if not config.TICKDB_API_KEY:
        print("[错误] 请设置环境变量 TICKDB_API_KEY")
        return
    
    if not config.FEISHU_ENABLED:
        print("[警告] 未配置飞书告警,将仅输出到控制台")
    
    # 初始化监控器
    monitor = SpreadMonitor(config)
    
    # 初始化 WebSocket 客户端
    client = TickDBWebSocketClient(
        symbols=[config.SYMBOL_NVDA, config.SYMBOL_SENTIME],
        on_price_update=monitor.on_price_update,
        config=config
    )
    
    # 启动心跳线程
    def heartbeat_loop():
        while True:
            time.sleep(30)
            client.send_ping()
    
    heartbeat_thread = threading.Thread(target=heartbeat_loop, daemon=True)
    heartbeat_thread.start()
    
    # 建立连接
    try:
        client.connect()
    except KeyboardInterrupt:
        print("\n[退出] 收到中断信号,程序终止")
        client._running = False


if __name__ == "__main__":
    main()

四、订单簿数据与深度监控(扩展)

4.1 为什么需要 depth 频道

Z-Score 监控的是价格层面的偏离,但真实的套利机会往往在流动性层面先行信号。

当 NVDA 突然出现大量卖单时,价格可能尚未大幅下跌,但买卖价差已经扩大——这是流动性枯竭的前兆。单纯的 ticker 数据无法捕捉这一信号。

TickDB 的 depth 频道提供订单簿快照:

市场 depth 档位 适用场景
美股 1 档 NVDA 基础流动性监控
港股 10 档 商汤科技精细化分析
数字货币 10 档 高频套利场景

4.2 depth 数据订阅扩展

def subscribe_depth(ws, symbol: str, depth: int = 10):
    """
    订阅 depth 频道
    
    ⚠️ depth 频道仅支持部分市场,详细列表见 TickDB 文档
    """
    subscribe_msg = {
        "cmd": "subscribe",
        "channel": "depth",
        "symbol": symbol,
        "depth": min(depth, 10)  # 限制最大档位
    }
    ws.send(json.dumps(subscribe_msg))
    print(f"[TickDB WS] 订阅 depth: {symbol} (深度 {depth} 档)")


def calculate_order_imbalance(depth_data: dict, levels: int = 5) -> float:
    """
    计算订单簿压力比
    
    压力比 > 1: 买盘主导
    压力比 < 1: 卖盘主导
    """
    bids = depth_data.get("bids", [])[:levels]
    asks = depth_data.get("asks", [])[:levels]
    
    bid_volume = sum(qty for _, qty in bids)
    ask_volume = sum(qty for _, qty in asks)
    
    if ask_volume == 0:
        return float('inf')
    
    return bid_volume / ask_volume


# 使用示例
# subscribe_depth(client.ws, "0020.HK", depth=10)
# 在 _on_message 中解析 depth 频道数据

五、部署方案对比

维度 个人量化 小型团队 机构级
部署方式 本地运行 VPS/云服务器 容器化 + K8s
延迟要求 <1s 可接受 <500ms <100ms
数据源 免费 API 付费 API 专线 + 缓存
告警方式 控制台/飞书 飞书 + 钉钉 全渠道 + 交易柜台
监控标的 1-3 对 5-10 对 50+ 对
高可用 无需 主备切换 自动故障转移

个人量化推荐配置

  • VPS:2 核 2G,Ubuntu 22.04
  • Python 3.10+,websocket-client, requests, numpy
  • 免费飞书机器人告警

六、结语与下一步行动

回到开篇的问题:当价差偏离均值两个标准差时,你的系统能不能在 500 毫秒内识别出来?

本文的答案是:可以,但需要解决三个工程问题——交易时段对齐、滑动窗口 Z-Score 计算、异常数据过滤。没有银弹。Z-Score 只是一个统计工具,它的有效性取决于你选择的历史窗口期是否仍反映当前产业链关系。

本文要点回顾

  • 时段对齐是跨市场套利的基础设施,不能跳过
  • Z-Score 的窗口期选择需根据产业链周期动态调整
  • 生产级代码必须包含心跳、重连、限频处理
  • depth 频道可作为价格信号的先行验证

下一步行动

如果你想亲手运行本文代码

  1. 访问 tickdb.ai 注册(免费,无需信用卡)
  2. 在控制台生成 API Key
  3. 设置环境变量 TICKDB_API_KEY
  4. 复制本文代码,安装依赖后直接运行

如果你需要 10 年全量历史 K 线数据验证套利假设,联系 [email protected] 了解机构方案。

如果你习惯用 AI 辅助开发,在 AI 助手中搜索安装 tickdb-market-data SKILL,调用 get_stock_depthget_stock_ticker 等工具函数。


风险提示:本文不构成任何投资建议。跨市场价差可能长期不回归均值,历史统计关系可能因市场结构变化而失效。Z-Score 策略存在假阳性风险,请务必结合基本面分析综合判断。市场有风险,投资需谨慎。