当市场失灵时,赚钱的机会就来了
2010 年 5 月 6 日,道琼斯指数在几分钟内暴跌 9%,然后几乎同样迅速地反弹回来。事后人们称之为“闪电崩盘”,媒体热衷于渲染其戏剧性。但量化交易者注意到了另一件事:在那片混乱中,一些历史上的“同涨同跌”兄弟,突然走出了完全不同的轨迹——一个跌 30%,另一个只跌 5%。短暂的价格失调,持续几分钟,然后回归正常。
如果你在那个瞬间持有前一类股票、做空后一类股票,你就是那场崩盘中少数赚钱的人。
这不是运气。这是统计套利的核心逻辑:当资产价格短暂偏离历史均衡关系时,买入被低估的、卖出被高估的,等待均值回归。 在数学上,我们把这种均衡关系叫做“协整”。
问题是:如何系统性地找到这种关系?如何让机器在价差偏离时自动发出信号?
本文给出完整答案:从数千只美股中筛选协整配对,基于 Z-Score 实时监控价差偏离,并在生产环境中部署实时监控代码。
一、协整配对交易:原理与适用场景
1.1 什么是协整,为什么它比相关性更重要
大多数新手会直接用相关性来筛选配对。这个思路有缺陷。
相关性描述的是“两个变量同向或反向移动的趋势”,但它不捕捉“偏离后会不会回归”。两个不相关甚至负相关的序列也可能存在协整关系。
协整描述的是“两个时间序列之间的长期均衡关系”。即使短期内它们各自剧烈波动,只要存在一个线性组合使得残差序列保持平稳,它们就是协整的。
用一个经典的比喻:遛狗的人走得很快,狗有时跑在前面,有时落在后面,但狗不会跑太远——协整关系就是那根狗绳。
在金融市场中,协整关系通常来自:
- 同行业公司:面对相同的供需基本面,价格驱动因素接近
- 上下游产业链:成本传导机制导致的定价联动
- 跨市场 ETF:追踪相同或相似指数的基金
1.2 协整配对交易的收益来源
假设两只股票 A 和 B 存在协整关系,我们可以定义价差(spread):
spread = A - β × B
其中 β 是通过回归得到的对冲比率。
正常情况下,spread 在零附近波动。当某只股票受到临时冲击(财报、新闻、大单)时,spread 短暂偏离。此时:
- 买入相对低估的股票
- 卖空相对高估的股票
- 等待 spread 回归零
收益来源有两层:
- 均值回归收益:价差从偏离点回归零
- 对冲后的 Alpha:在市场 Beta 中性情况下获取绝对收益
这正是机构量化团队偏爱的策略类型——不依赖大盘方向,在震荡市和分化市中表现稳健。
二、从 5000 只股票中筛选配对:三层漏斗模型
数千只美股全部做协整检验,计算量巨大。我们需要一套高效的筛选漏斗。
2.1 第一层:候选池构建(快速过滤)
排除以下标的,减少无效计算:
- 流动性不足:日均成交量低于 100 万美元
- 价格过低:股价低于 $5(容易被操纵,且统计特性不稳定)
- 特殊状态:停牌、退市、近期 IPO(数据历史不够)
def build_candidate_pool(stocks_df, min_volume=1e6, min_price=5, min_history_days=252):
"""
构建候选池:流动性 + 价格 + 历史数据三重过滤
"""
filtered = stocks_df[
(stocks_df['avg_daily_volume_usd'] >= min_volume) &
(stocks_df['price'] >= min_price) &
(stocks_df['history_days'] >= min_history_days)
]
return filtered.index.tolist()
2.2 第二层:相关性初筛(预排序)
计算日收益率的皮尔逊相关系数,保留相关系数 > 0.7 的配对候选。这一步的目的是把完全不相关的标的排除掉,减少后续协整检验的配对数量。
def correlation_filter(returns_df, candidate_pairs, min_corr=0.7):
"""
收益率相关性初筛
"""
n = len(candidate_pairs)
valid_pairs = []
for i in range(n):
for j in range(i + 1, n):
stock_a, stock_b = candidate_pairs[i], candidate_pairs[j]
corr = returns_df[stock_a].corr(returns_df[stock_b])
if corr > min_corr:
valid_pairs.append({
'stock_a': stock_a,
'stock_b': stock_b,
'correlation': corr
})
# 按相关系数降序排列
return sorted(valid_pairs, key=lambda x: x['correlation'], reverse=True)
为什么不用相关性作为最终筛选标准?
相关系数高不等于协整。2010 年闪电崩盘期间,高相关性的两只股票同时暴跌,spread 彻底失控。只有协整检验能真正捕捉“偏离后会不会回归”这一核心问题。
2.3 第三层:Engle-Granger 两步法协整检验
这是配对筛选的黄金标准,由 Engle 和 Granger 1987 年提出,两位经济学家因此获得诺贝尔奖。
第一步:回归
用最小二乘法(OLS)估计对冲比率 β:
A_t = α + β × B_t + ε_t
第二步:检验残差平稳性
对残差序列 ε_t 进行 ADF(Augmented Dickey-Fuller)检验。如果 p-value < 0.05,拒绝“存在单位根”的原假设,认为残差平稳,协整关系成立。
import numpy as np
from statsmodels.tsa.stattools import adfuller, coint
def engle_granger_test(stock_a_data, stock_b_data):
"""
Engle-Granger 两步法协整检验
参数:
stock_a_data: pandas Series,价格序列 A
stock_b_data: pandas Series,价格序列 B
返回:
dict: 包含 t-stat、p-value、协整系数 β
"""
# 方法一:statsmodels 内置的协整检验(自动两步法)
score, pvalue, _ = coint(stock_a_data, stock_b_data)
# 方法二:手动两步法(展示原理)
# 第一步:OLS 回归 A = α + βB + ε
X = np.column_stack([np.ones(len(stock_b_data)), stock_b_data])
beta = np.linalg.lstsq(X, stock_a_data, rcond=None)[0]
alpha, hedge_ratio = beta[0], beta[1]
# 第二步:ADF 检验残差
spread = stock_a_data - hedge_ratio * stock_b_data - alpha
adf_result = adfuller(spread, maxlag=1, regression='c')
return {
'pvalue': pvalue,
'adf_stat': adf_result[0],
'critical_values': adf_result[4],
'alpha': alpha,
'hedge_ratio': hedge_ratio,
'is_cointegrated': pvalue < 0.05
}
多周期对比的重要性
协整关系可能随时间失效。建议至少检验三个时间窗口:
- 最近 60 个交易日(短期)
- 最近 120 个交易日(中期)
- 最近 252 个交易日(长期)
只有三个窗口都显著的配对,才进入核心池。
三、价差监控:Z-Score 动态阈值
3.1 Z-Score 的计算方式
找到协整配对后,下一步是定义“价差偏离多少算异常”。
Z-Score 表示当前价差距离历史均值多少个标准差:
Z = (spread - μ) / σ
其中 μ 和 σ 由滚动窗口计算,推荐窗口长度 20-60 天。
class SpreadMonitor:
"""
协整配对价差实时监控器
"""
def __init__(self, hedge_ratio, lookback=60):
self.hedge_ratio = hedge_ratio
self.lookback = lookback
self.spread_history = []
def update(self, price_a, price_b):
"""更新价差,计算 Z-Score"""
spread = price_a - self.hedge_ratio * price_b
# 滑动窗口更新历史
self.spread_history.append(spread)
if len(self.spread_history) > self.lookback:
self.spread_history.pop(0)
if len(self.spread_history) < 20:
return None
# 计算均值和标准差
mean = np.mean(self.spread_history)
std = np.std(self.spread_history)
if std < 1e-8:
return None
z_score = (spread - mean) / std
return z_score
def generate_signal(self, z_score, upper_threshold=2.0, lower_threshold=-2.0):
"""基于 Z-Score 生成交易信号"""
if z_score is None:
return 'WAITING'
if z_score > upper_threshold:
return 'SELL_SPREAD' # 价差过高,卖出 A 做多 B
elif z_score < lower_threshold:
return 'BUY_SPREAD' # 价差过低,做多 A 卖出 B
else:
return 'NEUTRAL'
3.2 为什么静态阈值不够用
2 倍标准差的静态阈值在多数场景下有效,但存在一个致命问题:不同配对的波动率差异巨大。
EG 能源股与新能源股票的价差波动率,可能是消费股的 3-5 倍。如果用同一阈值,前者频繁触发假信号,后者几乎不触发。
解决方案:波动率自适应阈值。
def adaptive_threshold(z_score, current_vol, historical_avg_vol):
"""
基于波动率调整阈值
当当前波动率高于历史均值时,自动扩大阈值范围
避免在市场高波动期被噪声触发
"""
vol_ratio = current_vol / historical_avg_vol
# 波动率放大系数,最小 1.0(不做缩小)
multiplier = max(1.0, vol_ratio)
base_upper = 2.0
base_lower = -2.0
return base_upper * multiplier, base_lower * multiplier
3.3 非线性 Z-Score:Hurst 指数辅助判断
传统 Z-Score 假设均值回归必然发生,但历史数据中会出现“价差持续偏离”的情况。
Hurst 指数(H)可以帮助判断均值回归的可靠性:
- H < 0.5:趋势反转特性,均值回归可信
- H = 0.5:随机游走,无明确规律
- H > 0.5:趋势延续,均值回归不可靠
from scipy.stats import linregress
def calculate_hurst_exponent(price_series, max_lag=20):
"""
计算 Hurst 指数
方法:R/S 分析(重标极差分析)
"""
lags = range(2, max_lag)
tau = []
rpt = []
for lag in lags:
# 分割成子序列,计算 R/S
n_subseq = len(price_series) // lag
rs_values = []
for i in range(n_subseq):
subseq = price_series[i * lag:(i + 1) * lag]
mean_sub = np.mean(subseq)
# 累积离差
cumdev = np.cumsum(subseq - mean_sub)
R = np.max(cumdev) - np.min(cumdev)
S = np.std(subseq)
if S > 0:
rs_values.append(R / S)
if rs_values:
tau.append(lag)
rpt.append(np.mean(rs_values))
# 双对数回归:log(R/S) = H * log(n) + c
slope, _, _, _, _ = linregress(np.log(tau), np.log(rpt))
return slope
当 Hurst 指数 > 0.55 时,考虑将该配对移入观察名单而非核心池,直到更多数据确认均值回归特性。
四、仓位管理与风险控制
4.1 布林带确认入场时机
纯 Z-Score 信号可能存在假突破。建议用布林带(Bollinger Bands)做二次确认:
- 当 Z-Score 触发信号时,检查价差是否同时突破布林带
- 布林带中轨对齐 20 日均值,带宽基于 20 日标准差
- 双重确认减少假信号
def bollinger_confirm(spread, window=20, num_std=2):
"""布林带确认"""
if len(spread) < window:
return False
recent = spread[-window:]
mean = np.mean(recent)
std = np.std(recent)
upper_band = mean + num_std * std
lower_band = mean - num_std * std
current = spread[-1]
return current > upper_band or current < lower_band
4.2 动态止损与时间止损
即使协整检验通过,价差也可能长时间不回归。需要设置双重保险:
价格止损:价差反向移动超过 2 倍 ATR(Average True Range)时平仓
def calculate_atr(entry_spread, current_spread, atr_multiplier=2.0):
"""
ATR 动态止损
当价差向不利方向移动 2 倍 ATR 时触发止损
"""
spread_change = abs(current_spread - entry_spread)
# 简化版 ATR:使用历史波动率估算
# 生产环境建议使用实际 High/Low/Close 数据计算真实波幅
historical_vol = np.std(entry_spread[-60:]) if len(entry_spread) >= 60 else 0.01
return spread_change > atr_multiplier * historical_vol
时间止损:入场后 10 个交易日价差未回归,强制平仓(即使小幅亏损)
def time_stop_loss(entry_date, current_date, max_holding_days=10):
"""时间止损"""
holding_days = (current_date - entry_date).days
return holding_days > max_holding_days
五、回测验证:三大核心指标
5.1 最小回测要求
| 指标 | 最低标准 | 优秀标准 |
|---|---|---|
| 回测周期 | 1 年 | 3 年(至少含一个完整牛熊) |
| 配对数量 | ≥5 对 | ≥15 对(分散单一配对失效风险) |
| 交易次数 | ≥50 笔 | ≥150 笔(统计显著性) |
5.2 核心评估指标
夏普比率(Sharpe Ratio)
年化收益 / 年化波动率。统计套利目标区间:1.5 - 3.0(超过 3.0 需警惕过拟合)。
最大回撤(Max Drawdown)
任意时点账户从峰值到谷值的最大跌幅。机构要求通常 < 15%。
胜率与盈亏比
- 胜率:盈利交易笔数 / 总交易笔数
- 平均盈利 / 平均亏损:盈亏比
统计套利通常是“高胜率、中盈亏比”模式,胜率 > 55%、盈亏比 > 1.0 是合理预期。
5.3 回测局限性披露模板
回测局限性说明:上述结果基于历史数据模拟,未完全计入以下因素:实际交易中的滑点与市场冲击成本(已假设 0.05% 固定滑点);流动性枯竭时无法以目标价格成交的风险;协整关系随时间的结构性变化。建议在实盘前至少进行 3 个月的模拟盘验证。
六、生产级监控代码:Python + WebSocket 实战
6.1 系统架构
数据层:TickDB WebSocket 流 → 解析层:价格缓存 + spread 计算 → 决策层:Z-Score + 信号生成 → 告警层:飞书/Webhook
6.2 WebSocket 实时数据订阅
import os
import time
import random
import json
import threading
import logging
from datetime import datetime
import websocket
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# 协整配对配置
COINTEGRATED_PAIRS = {
'BRK.B.US': {'pair': 'SPY.US', 'hedge_ratio': 3.2, 'lookback': 60},
'XOM.US': {'pair': 'CVX.US', 'hedge_ratio': 1.1, 'lookback': 60},
}
class SpreadMonitor:
"""协整价差实时监控器"""
def __init__(self, symbol_a, symbol_b, hedge_ratio, lookback=60):
self.symbol_a = symbol_a
self.symbol_b = symbol_b
self.hedge_ratio = hedge_ratio
self.price_a = None
self.price_b = None
self.spread_history = []
self.lookback = lookback
self.threshold_upper = 2.0
self.threshold_lower = -2.0
def update_price(self, price_a, price_b, timestamp):
"""更新价格并计算 Z-Score"""
self.price_a = price_a
self.price_b = price_b
# 计算当前价差
spread = price_a - self.hedge_ratio * price_b
self.spread_history.append({'time': timestamp, 'spread': spread})
# 保持滑动窗口
if len(self.spread_history) > self.lookback:
self.spread_history.pop(0)
if len(self.spread_history) < 20:
return None
# 计算 Z-Score
spreads = [s['spread'] for s in self.spread_history]
mean = sum(spreads) / len(spreads)
variance = sum((x - mean) ** 2 for x in spreads) / len(spreads)
std = variance ** 0.5
if std < 1e-8:
return None
z_score = (spread - mean) / std
return z_score
def generate_signal(self, z_score):
"""生成交易信号"""
if z_score is None:
return 'WAITING', 0
signal_map = {
(self.threshold_upper, float('inf')): ('SELL_SPREAD', z_score),
(-float('inf'), self.threshold_lower): ('BUY_SPREAD', z_score),
}
for (low, high), (signal, z) in signal_map.items():
if low <= z_score <= high:
return signal, z
return 'NEUTRAL', z_score
class TickDBWebSocketClient:
"""TickDB WebSocket 客户端 - 生产级实现"""
def __init__(self, api_key, symbols, on_spread_alert=None):
self.api_key = api_key
self.base_url = "wss://stream.tickdb.ai/v1/market/ws"
# 所有待订阅的交易品种
all_symbols = set(symbols)
for symbol_pair in COINTEGRATED_PAIRS.values():
all_symbols.add(symbol_pair['pair'])
self.symbols = list(all_symbols)
# 监控器字典
self.monitors = {
symbol: SpreadMonitor(
symbol,
config['pair'],
config['hedge_ratio'],
config['lookback']
)
for symbol, config in COINTEGRATED_PAIRS.items()
}
self.on_spread_alert = on_spread_alert
self.ws = None
self.reconnect_delay = 1
self.max_reconnect_delay = 60
self.retry_count = 0
self.max_retries = 10
self.running = False
# 接收缓冲
self.price_cache = {}
def connect(self):
"""建立 WebSocket 连接"""
try:
# 鉴权:API Key 通过 URL 参数传递
url = f"{self.base_url}?api_key={self.api_key}"
self.ws = websocket.WebSocketApp(
url,
on_message=self.on_message,
on_error=self.on_error,
on_close=self.on_close,
on_open=self.on_open
)
logger.info(f"正在连接 TickDB WebSocket...")
self.running = True
# 启动心跳保活线程
heartbeat_thread = threading.Thread(target=self.heartbeat_loop, daemon=True)
heartbeat_thread.start()
self.ws.run_forever(ping_interval=30)
except Exception as e:
logger.error(f"连接失败: {e}")
self.schedule_reconnect()
def on_open(self, ws):
"""连接建立后发送订阅请求"""
logger.info("WebSocket 连接已建立,发送订阅请求...")
# 构建订阅消息
subscribe_msg = {
"cmd": "subscribe",
"params": {
"channels": ["trades"],
"symbols": self.symbols
}
}
ws.send(json.dumps(subscribe_msg))
logger.info(f"已订阅 {len(self.symbols)} 个交易品种")
def on_message(self, ws, message):
"""处理接收到的消息"""
try:
data = json.loads(message)
# 处理心跳响应
if data.get('type') == 'pong':
return
# 处理数据消息
if data.get('type') == 'trade' or 'data' in data:
self.process_trade_data(data)
except json.JSONDecodeError as e:
logger.error(f"消息解析失败: {e}")
def process_trade_data(self, data):
"""处理成交数据,计算价差"""
# 提取价格信息(根据实际 API 响应格式调整)
# 注意:美股不支持 tick 级别成交数据,这里用快照演示
records = data.get('data', [])
for record in records:
symbol = record.get('s')
price = record.get('p') # 价格
if symbol and price:
self.price_cache[symbol] = {
'price': price,
'timestamp': record.get('t')
}
# 检查是否所有标的都有数据,更新监控器
ready = all(sym in self.price_cache for sym in self.symbols)
if ready:
# 更新每个配对的监控器
for symbol, monitor in self.monitors.items():
if symbol in self.price_cache and monitor.symbol_b in self.price_cache:
price_a = self.price_cache[symbol]['price']
price_b = self.price_cache[monitor.symbol_b]['price']
timestamp = self.price_cache[symbol]['timestamp']
z_score = monitor.update_price(price_a, price_b, timestamp)
signal, z = monitor.generate_signal(z_score)
if signal != 'WAITING' and signal != 'NEUTRAL':
logger.warning(
f"⚠️ 信号触发 [{symbol} vs {monitor.symbol_b}] "
f"信号: {signal} | Z-Score: {z:.3f}"
)
# 触发告警回调
if self.on_spread_alert:
self.on_spread_alert({
'symbol_a': symbol,
'symbol_b': monitor.symbol_b,
'signal': signal,
'z_score': z,
'hedge_ratio': monitor.hedge_ratio,
'timestamp': timestamp
})
def heartbeat_loop(self):
"""心跳保活循环"""
while self.running:
time.sleep(25)
if self.ws and self.running:
try:
self.ws.send(json.dumps({"cmd": "ping"}))
logger.debug("心跳已发送")
except Exception as e:
logger.error(f"心跳发送失败: {e}")
def on_error(self, ws, error):
"""错误处理"""
logger.error(f"WebSocket 错误: {error}")
def on_close(self, ws, close_status_code, close_msg):
"""连接关闭处理"""
logger.warning(f"连接关闭: {close_status_code} - {close_msg}")
self.running = False
self.schedule_reconnect()
def schedule_reconnect(self):
"""安排重连(指数退避 + 抖动)"""
if self.retry_count >= self.max_retries:
logger.error("达到最大重试次数,停止重连")
return
# 指数退避
delay = min(self.reconnect_delay * (2 ** self.retry_count), self.max_reconnect_delay)
# 添加抖动(随机 0-10%)
jitter = random.uniform(0, delay * 0.1)
delay = delay + jitter
logger.info(f"{delay:.1f} 秒后尝试重连...")
timer = threading.Timer(delay, self.reconnect)
timer.daemon = True
timer.start()
def reconnect(self):
"""重新连接"""
self.retry_count += 1
self.reconnect_delay = 1
self.connect()
def stop(self):
"""停止客户端"""
self.running = False
if self.ws:
self.ws.close()
logger.info("客户端已停止")
def feishu_alert(spread_signal):
"""飞书告警(示例)"""
import requests
webhook_url = os.environ.get('FEISHU_WEBHOOK_URL')
if not webhook_url:
logger.warning("未配置飞书 Webhook,跳过告警")
return
message = {
"msg_type": "interactive",
"card": {
"header": {
"title": f"📊 套利信号触发",
"style": "warning"
},
"elements": [
{"tag": "div", "text": f"配对:{spread_signal['symbol_a']} / {spread_signal['symbol_b']}"},
{"tag": "div", "text": f"信号:{spread_signal['signal']}"},
{"tag": "div", "text": f"Z-Score:{spread_signal['z_score']:.3f}"},
{"tag": "hr"},
{"tag": "div", "text": f"时间:{spread_signal['timestamp']}"}
]
}
}
try:
response = requests.post(
webhook_url,
json=message,
headers={"Content-Type": "application/json"},
timeout=10
)
logger.info(f"飞书告警已发送: {response.status_code}")
except Exception as e:
logger.error(f"飞书告警失败: {e}")
def main():
# 从环境变量读取 API Key
api_key = os.environ.get("TICKDB_API_KEY")
if not api_key:
raise ValueError("请设置环境变量 TICKDB_API_KEY")
# 订阅标的列表
symbols = list(COINTEGRATED_PAIRS.keys())
# 初始化客户端
client = TickDBWebSocketClient(
api_key=api_key,
symbols=symbols,
on_spread_alert=feishu_alert
)
logger.info("启动 TickDB 协整监控服务...")
try:
client.connect()
except KeyboardInterrupt:
logger.info("收到中断信号,停止服务...")
client.stop()
if __name__ == "__main__":
main()
代码关键设计说明:
- 心跳保活:
ping_interval=30+ 手动心跳线程,防止连接被中间设备断开 - 指数退避重连:
delay = min(base * (2 ** retry), max_delay),避免高频重试 - 抖动:
random.uniform(0, delay * 0.1),避免惊群效应 - 限频处理:实际 TickDB 会返回
code:3001,这里预留了框架,实际需根据错误码处理 - 环境变量:
os.environ.get("TICKDB_API_KEY"),不硬编码凭证 - 异步提醒:生产环境高频场景建议将
requests.post改为异步(aiohttp)
6.3 历史 K 线数据获取(回测用)
import requests
import os
def fetch_historical_klines(symbol, interval='1d', limit=252):
"""
获取历史 K 线数据用于回测
注意:美股不支持 tick 级别历史数据,使用 K 线数据
"""
api_key = os.environ.get("TICKDB_API_KEY")
headers = {"X-API-Key": api_key}
params = {
"symbol": symbol,
"interval": interval,
"limit": limit
}
try:
response = requests.get(
"https://api.tickdb.ai/v1/market/kline",
headers=headers,
params=params,
timeout=(3.05, 10) # 连接超时 3.05s,读取超时 10s
)
if response.status_code != 200:
raise RuntimeError(f"HTTP {response.status_code}")
data = response.json()
if data.get('code') == 0:
return data.get('data', [])
# 错误码处理
error_code = data.get('code')
if error_code == 3001:
retry_after = int(response.headers.get('Retry-After', 5))
raise Exception(f"限频,请在 {retry_after} 秒后重试")
elif error_code == 2002:
raise KeyError(f"交易品种 {symbol} 不存在,请检查 symbol 格式")
else:
raise RuntimeError(f"API 错误 {error_code}: {data.get('message')}")
except requests.exceptions.Timeout:
raise RuntimeError("请求超时,请检查网络连接")
except requests.exceptions.RequestException as e:
raise RuntimeError(f"网络错误: {e}")
七、策略监控与持续迭代
7.1 协整关系的动态维护
协整不是一成不变的。随着市场结构变化、行业周期轮动、宏观事件冲击,历史上的有效配对可能失效。
建议建立以下维护机制:
| 检查频率 | 检查内容 | 处理方式 |
|---|---|---|
| 每日 | 当前 Z-Score 与入场时相比变化 | 记录,但不触发操作 |
| 每周 | 滚动协整检验 p-value | p-value > 0.1 时移入观察名单 |
| 每月 | 候选池更新(剔除流动性下降标的) | 增删配对,调整权重 |
| 每季度 | 全量回测 + 配对重排 | 重新运行三层漏斗,更新核心池 |
7.2 多配对组合管理
单一配对风险集中,建议同时监控 5-10 个有效配对,分配等权或风险平价权重。
class PairPortfolio:
"""配对组合管理器"""
def __init__(self, max_pairs=10):
self.max_pairs = max_pairs
self.pairs = {} # {symbol_pair: position_info}
self.max_position_per_pair = 0.15 # 单个配对最大仓位 15%
def add_pair(self, pair_key, hedge_ratio, entry_spread):
if len(self.pairs) >= self.max_pairs:
return False
self.pairs[pair_key] = {
'hedge_ratio': hedge_ratio,
'entry_spread': entry_spread,
'entry_time': datetime.now(),
'size': 0
}
return True
def calculate_position_size(self, pair_key, portfolio_capital, current_vol):
"""
风险平价仓位计算
根据当前波动率动态调整仓位
波动率越高,仓位越小
"""
target_vol = 0.02 # 目标波动率 2%
if current_vol > 0:
position_size = target_vol / current_vol
else:
position_size = self.max_position_per_pair
# 限制最大仓位
max_size = self.max_position_per_pair * portfolio_capital
return min(position_size, max_size)
八、结语
协整配对交易是量化领域最经典的策略之一。它的优势在于不依赖大盘方向、Beta 中性、在震荡市中天然有利;它的挑战在于协整关系需要持续监控、仓位管理需要动态调整、实盘系统需要高可用性。
本文给出了完整的工程实现路线:从三层漏斗筛选配对、用 Z-Score 监控价差、在生产环境用 WebSocket 实时推送信号。核心代码可以直接运行,监控逻辑可以根据实际需求调整阈值和告警方式。
策略的可持续性,取决于你对市场微观结构的理解深度,以及你对系统工程的敬畏程度。
下一步行动
如果你想亲手实现本文策略:
- 访问 tickdb.ai 注册(免费,无需信用卡)
- 在控制台生成 API Key
- 设置环境变量
TICKDB_API_KEY,复制本文代码即可运行
如果你需要 10 年历史 K 线数据验证配对有效性,联系 [email protected] 了解机构方案。
如果你习惯用 AI 辅助开发,在 AI 助手中搜索安装 tickdb-market-data SKILL,直接对话调用 TickDB 数据接口。
如果你对本文的协整检验逻辑感兴趣,欢迎进一步探讨 Hurst 指数与协整稳定性分析。
风险提示:本文不构成任何投资建议。统计套利策略存在模型风险、市场风险与流动性风险,历史回测结果不代表未来收益。实盘部署前请充分评估风险承受能力。