当市场失灵时,赚钱的机会就来了

2010 年 5 月 6 日,道琼斯指数在几分钟内暴跌 9%,然后几乎同样迅速地反弹回来。事后人们称之为“闪电崩盘”,媒体热衷于渲染其戏剧性。但量化交易者注意到了另一件事:在那片混乱中,一些历史上的“同涨同跌”兄弟,突然走出了完全不同的轨迹——一个跌 30%,另一个只跌 5%。短暂的价格失调,持续几分钟,然后回归正常。

如果你在那个瞬间持有前一类股票、做空后一类股票,你就是那场崩盘中少数赚钱的人。

这不是运气。这是统计套利的核心逻辑:当资产价格短暂偏离历史均衡关系时,买入被低估的、卖出被高估的,等待均值回归。 在数学上,我们把这种均衡关系叫做“协整”。

问题是:如何系统性地找到这种关系?如何让机器在价差偏离时自动发出信号?

本文给出完整答案:从数千只美股中筛选协整配对,基于 Z-Score 实时监控价差偏离,并在生产环境中部署实时监控代码。


一、协整配对交易:原理与适用场景

1.1 什么是协整,为什么它比相关性更重要

大多数新手会直接用相关性来筛选配对。这个思路有缺陷。

相关性描述的是“两个变量同向或反向移动的趋势”,但它不捕捉“偏离后会不会回归”。两个不相关甚至负相关的序列也可能存在协整关系。

协整描述的是“两个时间序列之间的长期均衡关系”。即使短期内它们各自剧烈波动,只要存在一个线性组合使得残差序列保持平稳,它们就是协整的。

用一个经典的比喻:遛狗的人走得很快,狗有时跑在前面,有时落在后面,但狗不会跑太远——协整关系就是那根狗绳。

在金融市场中,协整关系通常来自:

  • 同行业公司:面对相同的供需基本面,价格驱动因素接近
  • 上下游产业链:成本传导机制导致的定价联动
  • 跨市场 ETF:追踪相同或相似指数的基金

1.2 协整配对交易的收益来源

假设两只股票 A 和 B 存在协整关系,我们可以定义价差(spread):

spread = A - β × B

其中 β 是通过回归得到的对冲比率。

正常情况下,spread 在零附近波动。当某只股票受到临时冲击(财报、新闻、大单)时,spread 短暂偏离。此时:

  • 买入相对低估的股票
  • 卖空相对高估的股票
  • 等待 spread 回归零

收益来源有两层:

  1. 均值回归收益:价差从偏离点回归零
  2. 对冲后的 Alpha:在市场 Beta 中性情况下获取绝对收益

这正是机构量化团队偏爱的策略类型——不依赖大盘方向,在震荡市和分化市中表现稳健。


二、从 5000 只股票中筛选配对:三层漏斗模型

数千只美股全部做协整检验,计算量巨大。我们需要一套高效的筛选漏斗。

2.1 第一层:候选池构建(快速过滤)

排除以下标的,减少无效计算:

  • 流动性不足:日均成交量低于 100 万美元
  • 价格过低:股价低于 $5(容易被操纵,且统计特性不稳定)
  • 特殊状态:停牌、退市、近期 IPO(数据历史不够)
def build_candidate_pool(stocks_df, min_volume=1e6, min_price=5, min_history_days=252):
    """
    构建候选池:流动性 + 价格 + 历史数据三重过滤
    """
    filtered = stocks_df[
        (stocks_df['avg_daily_volume_usd'] >= min_volume) &
        (stocks_df['price'] >= min_price) &
        (stocks_df['history_days'] >= min_history_days)
    ]
    return filtered.index.tolist()

2.2 第二层:相关性初筛(预排序)

计算日收益率的皮尔逊相关系数,保留相关系数 > 0.7 的配对候选。这一步的目的是把完全不相关的标的排除掉,减少后续协整检验的配对数量。

def correlation_filter(returns_df, candidate_pairs, min_corr=0.7):
    """
    收益率相关性初筛
    """
    n = len(candidate_pairs)
    valid_pairs = []
    
    for i in range(n):
        for j in range(i + 1, n):
            stock_a, stock_b = candidate_pairs[i], candidate_pairs[j]
            corr = returns_df[stock_a].corr(returns_df[stock_b])
            if corr > min_corr:
                valid_pairs.append({
                    'stock_a': stock_a,
                    'stock_b': stock_b,
                    'correlation': corr
                })
    
    # 按相关系数降序排列
    return sorted(valid_pairs, key=lambda x: x['correlation'], reverse=True)

为什么不用相关性作为最终筛选标准?
相关系数高不等于协整。2010 年闪电崩盘期间,高相关性的两只股票同时暴跌,spread 彻底失控。只有协整检验能真正捕捉“偏离后会不会回归”这一核心问题。

2.3 第三层:Engle-Granger 两步法协整检验

这是配对筛选的黄金标准,由 Engle 和 Granger 1987 年提出,两位经济学家因此获得诺贝尔奖。

第一步:回归

用最小二乘法(OLS)估计对冲比率 β:

A_t = α + β × B_t + ε_t

第二步:检验残差平稳性

对残差序列 ε_t 进行 ADF(Augmented Dickey-Fuller)检验。如果 p-value < 0.05,拒绝“存在单位根”的原假设,认为残差平稳,协整关系成立。

import numpy as np
from statsmodels.tsa.stattools import adfuller, coint

def engle_granger_test(stock_a_data, stock_b_data):
    """
    Engle-Granger 两步法协整检验
    
    参数:
        stock_a_data: pandas Series,价格序列 A
        stock_b_data: pandas Series,价格序列 B
    
    返回:
        dict: 包含 t-stat、p-value、协整系数 β
    """
    # 方法一:statsmodels 内置的协整检验(自动两步法)
    score, pvalue, _ = coint(stock_a_data, stock_b_data)
    
    # 方法二:手动两步法(展示原理)
    # 第一步:OLS 回归 A = α + βB + ε
    X = np.column_stack([np.ones(len(stock_b_data)), stock_b_data])
    beta = np.linalg.lstsq(X, stock_a_data, rcond=None)[0]
    alpha, hedge_ratio = beta[0], beta[1]
    
    # 第二步:ADF 检验残差
    spread = stock_a_data - hedge_ratio * stock_b_data - alpha
    adf_result = adfuller(spread, maxlag=1, regression='c')
    
    return {
        'pvalue': pvalue,
        'adf_stat': adf_result[0],
        'critical_values': adf_result[4],
        'alpha': alpha,
        'hedge_ratio': hedge_ratio,
        'is_cointegrated': pvalue < 0.05
    }

多周期对比的重要性
协整关系可能随时间失效。建议至少检验三个时间窗口:

  • 最近 60 个交易日(短期)
  • 最近 120 个交易日(中期)
  • 最近 252 个交易日(长期)

只有三个窗口都显著的配对,才进入核心池。


三、价差监控:Z-Score 动态阈值

3.1 Z-Score 的计算方式

找到协整配对后,下一步是定义“价差偏离多少算异常”。

Z-Score 表示当前价差距离历史均值多少个标准差:

Z = (spread - μ) / σ

其中 μ 和 σ 由滚动窗口计算,推荐窗口长度 20-60 天。

class SpreadMonitor:
    """
    协整配对价差实时监控器
    """
    def __init__(self, hedge_ratio, lookback=60):
        self.hedge_ratio = hedge_ratio
        self.lookback = lookback
        self.spread_history = []
    
    def update(self, price_a, price_b):
        """更新价差,计算 Z-Score"""
        spread = price_a - self.hedge_ratio * price_b
        
        # 滑动窗口更新历史
        self.spread_history.append(spread)
        if len(self.spread_history) > self.lookback:
            self.spread_history.pop(0)
        
        if len(self.spread_history) < 20:
            return None
        
        # 计算均值和标准差
        mean = np.mean(self.spread_history)
        std = np.std(self.spread_history)
        
        if std < 1e-8:
            return None
        
        z_score = (spread - mean) / std
        return z_score
    
    def generate_signal(self, z_score, upper_threshold=2.0, lower_threshold=-2.0):
        """基于 Z-Score 生成交易信号"""
        if z_score is None:
            return 'WAITING'
        
        if z_score > upper_threshold:
            return 'SELL_SPREAD'  # 价差过高,卖出 A 做多 B
        elif z_score < lower_threshold:
            return 'BUY_SPREAD'   # 价差过低,做多 A 卖出 B
        else:
            return 'NEUTRAL'

3.2 为什么静态阈值不够用

2 倍标准差的静态阈值在多数场景下有效,但存在一个致命问题:不同配对的波动率差异巨大

EG 能源股与新能源股票的价差波动率,可能是消费股的 3-5 倍。如果用同一阈值,前者频繁触发假信号,后者几乎不触发。

解决方案:波动率自适应阈值

def adaptive_threshold(z_score, current_vol, historical_avg_vol):
    """
    基于波动率调整阈值
    
    当当前波动率高于历史均值时,自动扩大阈值范围
    避免在市场高波动期被噪声触发
    """
    vol_ratio = current_vol / historical_avg_vol
    
    # 波动率放大系数,最小 1.0(不做缩小)
    multiplier = max(1.0, vol_ratio)
    
    base_upper = 2.0
    base_lower = -2.0
    
    return base_upper * multiplier, base_lower * multiplier

3.3 非线性 Z-Score:Hurst 指数辅助判断

传统 Z-Score 假设均值回归必然发生,但历史数据中会出现“价差持续偏离”的情况。

Hurst 指数(H)可以帮助判断均值回归的可靠性:

  • H < 0.5:趋势反转特性,均值回归可信
  • H = 0.5:随机游走,无明确规律
  • H > 0.5:趋势延续,均值回归不可靠
from scipy.stats import linregress

def calculate_hurst_exponent(price_series, max_lag=20):
    """
    计算 Hurst 指数
    
    方法:R/S 分析(重标极差分析)
    """
    lags = range(2, max_lag)
    tau = []
    rpt = []
    
    for lag in lags:
        # 分割成子序列,计算 R/S
        n_subseq = len(price_series) // lag
        rs_values = []
        
        for i in range(n_subseq):
            subseq = price_series[i * lag:(i + 1) * lag]
            mean_sub = np.mean(subseq)
            
            # 累积离差
            cumdev = np.cumsum(subseq - mean_sub)
            R = np.max(cumdev) - np.min(cumdev)
            S = np.std(subseq)
            
            if S > 0:
                rs_values.append(R / S)
        
        if rs_values:
            tau.append(lag)
            rpt.append(np.mean(rs_values))
    
    # 双对数回归:log(R/S) = H * log(n) + c
    slope, _, _, _, _ = linregress(np.log(tau), np.log(rpt))
    return slope

当 Hurst 指数 > 0.55 时,考虑将该配对移入观察名单而非核心池,直到更多数据确认均值回归特性。


四、仓位管理与风险控制

4.1 布林带确认入场时机

纯 Z-Score 信号可能存在假突破。建议用布林带(Bollinger Bands)做二次确认:

  • 当 Z-Score 触发信号时,检查价差是否同时突破布林带
  • 布林带中轨对齐 20 日均值,带宽基于 20 日标准差
  • 双重确认减少假信号
def bollinger_confirm(spread, window=20, num_std=2):
    """布林带确认"""
    if len(spread) < window:
        return False
    
    recent = spread[-window:]
    mean = np.mean(recent)
    std = np.std(recent)
    
    upper_band = mean + num_std * std
    lower_band = mean - num_std * std
    
    current = spread[-1]
    
    return current > upper_band or current < lower_band

4.2 动态止损与时间止损

即使协整检验通过,价差也可能长时间不回归。需要设置双重保险:

价格止损:价差反向移动超过 2 倍 ATR(Average True Range)时平仓

def calculate_atr(entry_spread, current_spread, atr_multiplier=2.0):
    """
    ATR 动态止损
    
    当价差向不利方向移动 2 倍 ATR 时触发止损
    """
    spread_change = abs(current_spread - entry_spread)
    
    # 简化版 ATR:使用历史波动率估算
    # 生产环境建议使用实际 High/Low/Close 数据计算真实波幅
    historical_vol = np.std(entry_spread[-60:]) if len(entry_spread) >= 60 else 0.01
    
    return spread_change > atr_multiplier * historical_vol

时间止损:入场后 10 个交易日价差未回归,强制平仓(即使小幅亏损)

def time_stop_loss(entry_date, current_date, max_holding_days=10):
    """时间止损"""
    holding_days = (current_date - entry_date).days
    return holding_days > max_holding_days

五、回测验证:三大核心指标

5.1 最小回测要求

指标 最低标准 优秀标准
回测周期 1 年 3 年(至少含一个完整牛熊)
配对数量 ≥5 对 ≥15 对(分散单一配对失效风险)
交易次数 ≥50 笔 ≥150 笔(统计显著性)

5.2 核心评估指标

夏普比率(Sharpe Ratio)
年化收益 / 年化波动率。统计套利目标区间:1.5 - 3.0(超过 3.0 需警惕过拟合)。

最大回撤(Max Drawdown)
任意时点账户从峰值到谷值的最大跌幅。机构要求通常 < 15%。

胜率与盈亏比

  • 胜率:盈利交易笔数 / 总交易笔数
  • 平均盈利 / 平均亏损:盈亏比

统计套利通常是“高胜率、中盈亏比”模式,胜率 > 55%、盈亏比 > 1.0 是合理预期。

5.3 回测局限性披露模板

回测局限性说明:上述结果基于历史数据模拟,未完全计入以下因素:实际交易中的滑点与市场冲击成本(已假设 0.05% 固定滑点);流动性枯竭时无法以目标价格成交的风险;协整关系随时间的结构性变化。建议在实盘前至少进行 3 个月的模拟盘验证。


六、生产级监控代码:Python + WebSocket 实战

6.1 系统架构

数据层:TickDB WebSocket 流 → 解析层:价格缓存 + spread 计算 → 决策层:Z-Score + 信号生成 → 告警层:飞书/Webhook

6.2 WebSocket 实时数据订阅

import os
import time
import random
import json
import threading
import logging
from datetime import datetime
import websocket

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# 协整配对配置
COINTEGRATED_PAIRS = {
    'BRK.B.US': {'pair': 'SPY.US', 'hedge_ratio': 3.2, 'lookback': 60},
    'XOM.US': {'pair': 'CVX.US', 'hedge_ratio': 1.1, 'lookback': 60},
}

class SpreadMonitor:
    """协整价差实时监控器"""
    
    def __init__(self, symbol_a, symbol_b, hedge_ratio, lookback=60):
        self.symbol_a = symbol_a
        self.symbol_b = symbol_b
        self.hedge_ratio = hedge_ratio
        
        self.price_a = None
        self.price_b = None
        self.spread_history = []
        self.lookback = lookback
        
        self.threshold_upper = 2.0
        self.threshold_lower = -2.0
    
    def update_price(self, price_a, price_b, timestamp):
        """更新价格并计算 Z-Score"""
        self.price_a = price_a
        self.price_b = price_b
        
        # 计算当前价差
        spread = price_a - self.hedge_ratio * price_b
        self.spread_history.append({'time': timestamp, 'spread': spread})
        
        # 保持滑动窗口
        if len(self.spread_history) > self.lookback:
            self.spread_history.pop(0)
        
        if len(self.spread_history) < 20:
            return None
        
        # 计算 Z-Score
        spreads = [s['spread'] for s in self.spread_history]
        mean = sum(spreads) / len(spreads)
        variance = sum((x - mean) ** 2 for x in spreads) / len(spreads)
        std = variance ** 0.5
        
        if std < 1e-8:
            return None
        
        z_score = (spread - mean) / std
        return z_score
    
    def generate_signal(self, z_score):
        """生成交易信号"""
        if z_score is None:
            return 'WAITING', 0
        
        signal_map = {
            (self.threshold_upper, float('inf')): ('SELL_SPREAD', z_score),
            (-float('inf'), self.threshold_lower): ('BUY_SPREAD', z_score),
        }
        
        for (low, high), (signal, z) in signal_map.items():
            if low <= z_score <= high:
                return signal, z
        
        return 'NEUTRAL', z_score


class TickDBWebSocketClient:
    """TickDB WebSocket 客户端 - 生产级实现"""
    
    def __init__(self, api_key, symbols, on_spread_alert=None):
        self.api_key = api_key
        self.base_url = "wss://stream.tickdb.ai/v1/market/ws"
        
        # 所有待订阅的交易品种
        all_symbols = set(symbols)
        for symbol_pair in COINTEGRATED_PAIRS.values():
            all_symbols.add(symbol_pair['pair'])
        self.symbols = list(all_symbols)
        
        # 监控器字典
        self.monitors = {
            symbol: SpreadMonitor(
                symbol,
                config['pair'],
                config['hedge_ratio'],
                config['lookback']
            )
            for symbol, config in COINTEGRATED_PAIRS.items()
        }
        
        self.on_spread_alert = on_spread_alert
        self.ws = None
        self.reconnect_delay = 1
        self.max_reconnect_delay = 60
        self.retry_count = 0
        self.max_retries = 10
        self.running = False
        
        # 接收缓冲
        self.price_cache = {}
    
    def connect(self):
        """建立 WebSocket 连接"""
        try:
            # 鉴权:API Key 通过 URL 参数传递
            url = f"{self.base_url}?api_key={self.api_key}"
            
            self.ws = websocket.WebSocketApp(
                url,
                on_message=self.on_message,
                on_error=self.on_error,
                on_close=self.on_close,
                on_open=self.on_open
            )
            
            logger.info(f"正在连接 TickDB WebSocket...")
            self.running = True
            
            # 启动心跳保活线程
            heartbeat_thread = threading.Thread(target=self.heartbeat_loop, daemon=True)
            heartbeat_thread.start()
            
            self.ws.run_forever(ping_interval=30)
            
        except Exception as e:
            logger.error(f"连接失败: {e}")
            self.schedule_reconnect()
    
    def on_open(self, ws):
        """连接建立后发送订阅请求"""
        logger.info("WebSocket 连接已建立,发送订阅请求...")
        
        # 构建订阅消息
        subscribe_msg = {
            "cmd": "subscribe",
            "params": {
                "channels": ["trades"],
                "symbols": self.symbols
            }
        }
        
        ws.send(json.dumps(subscribe_msg))
        logger.info(f"已订阅 {len(self.symbols)} 个交易品种")
    
    def on_message(self, ws, message):
        """处理接收到的消息"""
        try:
            data = json.loads(message)
            
            # 处理心跳响应
            if data.get('type') == 'pong':
                return
            
            # 处理数据消息
            if data.get('type') == 'trade' or 'data' in data:
                self.process_trade_data(data)
            
        except json.JSONDecodeError as e:
            logger.error(f"消息解析失败: {e}")
    
    def process_trade_data(self, data):
        """处理成交数据,计算价差"""
        # 提取价格信息(根据实际 API 响应格式调整)
        # 注意:美股不支持 tick 级别成交数据,这里用快照演示
        records = data.get('data', [])
        
        for record in records:
            symbol = record.get('s')
            price = record.get('p')  # 价格
            
            if symbol and price:
                self.price_cache[symbol] = {
                    'price': price,
                    'timestamp': record.get('t')
                }
        
        # 检查是否所有标的都有数据,更新监控器
        ready = all(sym in self.price_cache for sym in self.symbols)
        
        if ready:
            # 更新每个配对的监控器
            for symbol, monitor in self.monitors.items():
                if symbol in self.price_cache and monitor.symbol_b in self.price_cache:
                    price_a = self.price_cache[symbol]['price']
                    price_b = self.price_cache[monitor.symbol_b]['price']
                    timestamp = self.price_cache[symbol]['timestamp']
                    
                    z_score = monitor.update_price(price_a, price_b, timestamp)
                    signal, z = monitor.generate_signal(z_score)
                    
                    if signal != 'WAITING' and signal != 'NEUTRAL':
                        logger.warning(
                            f"⚠️ 信号触发 [{symbol} vs {monitor.symbol_b}] "
                            f"信号: {signal} | Z-Score: {z:.3f}"
                        )
                        
                        # 触发告警回调
                        if self.on_spread_alert:
                            self.on_spread_alert({
                                'symbol_a': symbol,
                                'symbol_b': monitor.symbol_b,
                                'signal': signal,
                                'z_score': z,
                                'hedge_ratio': monitor.hedge_ratio,
                                'timestamp': timestamp
                            })
    
    def heartbeat_loop(self):
        """心跳保活循环"""
        while self.running:
            time.sleep(25)
            if self.ws and self.running:
                try:
                    self.ws.send(json.dumps({"cmd": "ping"}))
                    logger.debug("心跳已发送")
                except Exception as e:
                    logger.error(f"心跳发送失败: {e}")
    
    def on_error(self, ws, error):
        """错误处理"""
        logger.error(f"WebSocket 错误: {error}")
    
    def on_close(self, ws, close_status_code, close_msg):
        """连接关闭处理"""
        logger.warning(f"连接关闭: {close_status_code} - {close_msg}")
        self.running = False
        self.schedule_reconnect()
    
    def schedule_reconnect(self):
        """安排重连(指数退避 + 抖动)"""
        if self.retry_count >= self.max_retries:
            logger.error("达到最大重试次数,停止重连")
            return
        
        # 指数退避
        delay = min(self.reconnect_delay * (2 ** self.retry_count), self.max_reconnect_delay)
        
        # 添加抖动(随机 0-10%)
        jitter = random.uniform(0, delay * 0.1)
        delay = delay + jitter
        
        logger.info(f"{delay:.1f} 秒后尝试重连...")
        
        timer = threading.Timer(delay, self.reconnect)
        timer.daemon = True
        timer.start()
    
    def reconnect(self):
        """重新连接"""
        self.retry_count += 1
        self.reconnect_delay = 1
        self.connect()
    
    def stop(self):
        """停止客户端"""
        self.running = False
        if self.ws:
            self.ws.close()
        logger.info("客户端已停止")


def feishu_alert(spread_signal):
    """飞书告警(示例)"""
    import requests
    
    webhook_url = os.environ.get('FEISHU_WEBHOOK_URL')
    if not webhook_url:
        logger.warning("未配置飞书 Webhook,跳过告警")
        return
    
    message = {
        "msg_type": "interactive",
        "card": {
            "header": {
                "title": f"📊 套利信号触发",
                "style": "warning"
            },
            "elements": [
                {"tag": "div", "text": f"配对:{spread_signal['symbol_a']} / {spread_signal['symbol_b']}"},
                {"tag": "div", "text": f"信号:{spread_signal['signal']}"},
                {"tag": "div", "text": f"Z-Score:{spread_signal['z_score']:.3f}"},
                {"tag": "hr"},
                {"tag": "div", "text": f"时间:{spread_signal['timestamp']}"}
            ]
        }
    }
    
    try:
        response = requests.post(
            webhook_url,
            json=message,
            headers={"Content-Type": "application/json"},
            timeout=10
        )
        logger.info(f"飞书告警已发送: {response.status_code}")
    except Exception as e:
        logger.error(f"飞书告警失败: {e}")


def main():
    # 从环境变量读取 API Key
    api_key = os.environ.get("TICKDB_API_KEY")
    if not api_key:
        raise ValueError("请设置环境变量 TICKDB_API_KEY")
    
    # 订阅标的列表
    symbols = list(COINTEGRATED_PAIRS.keys())
    
    # 初始化客户端
    client = TickDBWebSocketClient(
        api_key=api_key,
        symbols=symbols,
        on_spread_alert=feishu_alert
    )
    
    logger.info("启动 TickDB 协整监控服务...")
    
    try:
        client.connect()
    except KeyboardInterrupt:
        logger.info("收到中断信号,停止服务...")
        client.stop()


if __name__ == "__main__":
    main()

代码关键设计说明

  • 心跳保活ping_interval=30 + 手动心跳线程,防止连接被中间设备断开
  • 指数退避重连delay = min(base * (2 ** retry), max_delay),避免高频重试
  • 抖动random.uniform(0, delay * 0.1),避免惊群效应
  • 限频处理:实际 TickDB 会返回 code:3001,这里预留了框架,实际需根据错误码处理
  • 环境变量os.environ.get("TICKDB_API_KEY"),不硬编码凭证
  • 异步提醒:生产环境高频场景建议将 requests.post 改为异步(aiohttp)

6.3 历史 K 线数据获取(回测用)

import requests
import os

def fetch_historical_klines(symbol, interval='1d', limit=252):
    """
    获取历史 K 线数据用于回测
    
    注意:美股不支持 tick 级别历史数据,使用 K 线数据
    """
    api_key = os.environ.get("TICKDB_API_KEY")
    
    headers = {"X-API-Key": api_key}
    
    params = {
        "symbol": symbol,
        "interval": interval,
        "limit": limit
    }
    
    try:
        response = requests.get(
            "https://api.tickdb.ai/v1/market/kline",
            headers=headers,
            params=params,
            timeout=(3.05, 10)  # 连接超时 3.05s,读取超时 10s
        )
        
        if response.status_code != 200:
            raise RuntimeError(f"HTTP {response.status_code}")
        
        data = response.json()
        
        if data.get('code') == 0:
            return data.get('data', [])
        
        # 错误码处理
        error_code = data.get('code')
        if error_code == 3001:
            retry_after = int(response.headers.get('Retry-After', 5))
            raise Exception(f"限频,请在 {retry_after} 秒后重试")
        elif error_code == 2002:
            raise KeyError(f"交易品种 {symbol} 不存在,请检查 symbol 格式")
        else:
            raise RuntimeError(f"API 错误 {error_code}: {data.get('message')}")
    
    except requests.exceptions.Timeout:
        raise RuntimeError("请求超时,请检查网络连接")
    except requests.exceptions.RequestException as e:
        raise RuntimeError(f"网络错误: {e}")

七、策略监控与持续迭代

7.1 协整关系的动态维护

协整不是一成不变的。随着市场结构变化、行业周期轮动、宏观事件冲击,历史上的有效配对可能失效。

建议建立以下维护机制:

检查频率 检查内容 处理方式
每日 当前 Z-Score 与入场时相比变化 记录,但不触发操作
每周 滚动协整检验 p-value p-value > 0.1 时移入观察名单
每月 候选池更新(剔除流动性下降标的) 增删配对,调整权重
每季度 全量回测 + 配对重排 重新运行三层漏斗,更新核心池

7.2 多配对组合管理

单一配对风险集中,建议同时监控 5-10 个有效配对,分配等权或风险平价权重。

class PairPortfolio:
    """配对组合管理器"""
    
    def __init__(self, max_pairs=10):
        self.max_pairs = max_pairs
        self.pairs = {}  # {symbol_pair: position_info}
        self.max_position_per_pair = 0.15  # 单个配对最大仓位 15%
    
    def add_pair(self, pair_key, hedge_ratio, entry_spread):
        if len(self.pairs) >= self.max_pairs:
            return False
        
        self.pairs[pair_key] = {
            'hedge_ratio': hedge_ratio,
            'entry_spread': entry_spread,
            'entry_time': datetime.now(),
            'size': 0
        }
        return True
    
    def calculate_position_size(self, pair_key, portfolio_capital, current_vol):
        """
        风险平价仓位计算
        
        根据当前波动率动态调整仓位
        波动率越高,仓位越小
        """
        target_vol = 0.02  # 目标波动率 2%
        
        if current_vol > 0:
            position_size = target_vol / current_vol
        else:
            position_size = self.max_position_per_pair
        
        # 限制最大仓位
        max_size = self.max_position_per_pair * portfolio_capital
        return min(position_size, max_size)

八、结语

协整配对交易是量化领域最经典的策略之一。它的优势在于不依赖大盘方向、Beta 中性、在震荡市中天然有利;它的挑战在于协整关系需要持续监控、仓位管理需要动态调整、实盘系统需要高可用性

本文给出了完整的工程实现路线:从三层漏斗筛选配对、用 Z-Score 监控价差、在生产环境用 WebSocket 实时推送信号。核心代码可以直接运行,监控逻辑可以根据实际需求调整阈值和告警方式。

策略的可持续性,取决于你对市场微观结构的理解深度,以及你对系统工程的敬畏程度。


下一步行动

如果你想亲手实现本文策略

  1. 访问 tickdb.ai 注册(免费,无需信用卡)
  2. 在控制台生成 API Key
  3. 设置环境变量 TICKDB_API_KEY,复制本文代码即可运行

如果你需要 10 年历史 K 线数据验证配对有效性,联系 [email protected] 了解机构方案。

如果你习惯用 AI 辅助开发,在 AI 助手中搜索安装 tickdb-market-data SKILL,直接对话调用 TickDB 数据接口。

如果你对本文的协整检验逻辑感兴趣,欢迎进一步探讨 Hurst 指数与协整稳定性分析。


风险提示:本文不构成任何投资建议。统计套利策略存在模型风险、市场风险与流动性风险,历史回测结果不代表未来收益。实盘部署前请充分评估风险承受能力。