当市场分裂时,价差会说话
2008 年 9 月 16 日,雷曼兄弟申请破产后的第三天。大量机构投资者被迫去杠杆化,清算所有可变现资产。在这场集体性抛售中,一只名为 "LEH" 的股票——雷曼兄弟——在 30 分钟内跌去近半,与此同时,大量与之毫无业务关联的股票也在同步暴跌。
但一位遵循特定规则的量化交易者此刻看到的是机会,而非恐慌。
她知道,LEH 与同板块的其他股票之间并没有经济意义上的联动。它们被同一波流动性危机裹挟,而不是被同一条基本面的逻辑线牵引。这种"假性联动"是均值回归的燃料。而均值回归,正是统计套利的基石。
这不是 2008 年独有的现象。2021 年 1 月 GameStop 逼空期间,大量" meme 股"同步异动;2022 年美联储激进加息时,高股息板块与成长板块的负相关短暂失效——这些时刻都在重复同一个规律:系统性风险会制造假性联动,摧毁相关但摧毁不了协整。
本文拆解统计套利中最经典的策略——协整配对交易——的完整工程实现:从数千只美股中筛选协整对,到实时监控价差并生成交易信号,再到如何用 TickDB 的历史 K 线数据完成策略回测验证。
一、为什么是协整,不是相关
在动手之前,必须把最重要的概念理清楚——协整(Cointegration)和相关(Correlation)不是一回事。
相关性衡量的是两个序列变化趋势是否一致。X 涨,Y 也倾向于涨,它们就是相关的。但这种关系是脆弱的:如果市场情绪逆转,两个相关资产可以同步下跌,毫无"纠偏"机制。
协整性描述的是一种更深层的约束关系:两个(或多个)变量在短期内可能各自游走,但它们之间的线性组合是一个平稳序列。换句话说,偏离是暂时的,系统有一股"引力"把它们拉回均衡位置。
用一个经典比喻:一只喝醉的人和一只狗同时走路。喝醉的人步伐和狗的步伐高度相关——都在随机游走。但喝醉的人和狗之间的距离呢?那个距离是均值回归的,因为两个人/狗最终都会朝家的方向走。这个"距离"就是协整关系。
| 属性 | 相关性 | 协整性 |
|---|---|---|
| 衡量对象 | 收益率的同向性 | 价差的平稳性 |
| 稳定性 | 时变,市场结构改变后可能失效 | 更深层的经济联系,稳定性更强 |
| 适用场景 | 趋势类策略 | 均值回归类策略 |
| 计算方法 | Pearson 相关系数 | Engle-Granger / Johansen 检验 |
| 是否可套利 | 不直接 | 是 |
对于配对交易而言,我们追求的正是协整性而非相关性。两只股票价格"一起涨"不构成交易理由,但"它们之间存在平稳的价差序列"才是。
二、协整检验:从数千只股票到候选配对
2.1 筛选框架
面对数千只美股,穷举所有配对是不现实的(n 只股票需要 n(n-1)/2 对,全部检验成本极高)。需要一个多阶段筛选流程:
阶段一:预处理
├── 过滤低流动性标的(日均成交量 < 50 万股)
├── 过滤高波动异常值(日收益率标准差 > 5% 的标的排除)
└── 统一价格量纲(取对数收益率)
阶段二:粗筛——距离法
├── 计算历史价格序列之间的欧氏距离(或标准化距离)
├── 取距离最小的 Top-N 候选对(如 Top 500)
└── 理由:协整序列的距离通常较小
阶段三:精筛——统计检验
├── 对候选对逐一进行 ADF 检验(协整检验的核心)
├── 记录 p-value、临界值、平稳性结论
└── 选取 p-value < 0.05 的配对
2.2 ADF 检验原理
Augmented Dickey-Fuller(ADF)检验是协整检验的基石。它检验以下假设:
- H0(零假设):序列存在单位根,即非平稳
- H1(备择假设):序列是平稳的(或者说不存在单位根)
若拒绝 H0,则价差序列是平稳的——协整关系成立。
ADF 检验的核心回归方程:
$$\Delta y_t = \alpha + \beta t + \gamma y_{t-1} + \sum_{i=1}^{p} \delta_i \Delta y_{t-i} + \epsilon_t$$
检验的核心是 $\gamma = 0$ 是否成立。若 $\gamma \neq 0$,则 $y_{t-1}$ 对序列的平稳性有贡献,即协整成立。
2.3 候选对筛选实现
以下代码展示从标的池到候选协整对的完整筛选流程:
import os
import json
import time
import random
import numpy as np
import pandas as pd
from scipy import stats
from statsmodels.tsa.stattools import adfuller
# ─────────────────────────────────────────────
# TickDB 历史 K 线数据获取
# ─────────────────────────────────────────────
def fetch_historical_klines(symbol: str, interval: str = "1d", limit: int = 500) -> pd.DataFrame:
"""
从 TickDB 获取指定标的的历史 K 线数据。
interval: 1m/5m/1h/4h/1d/week/month
返回 DataFrame,列:[timestamp, open, high, low, close, volume]
"""
api_key = os.environ.get("TICKDB_API_KEY")
if not api_key:
raise ValueError("请设置环境变量 TICKDB_API_KEY")
import requests
url = "https://api.tickdb.ai/v1/market/kline"
headers = {"X-API-Key": api_key}
params = {
"symbol": symbol,
"interval": interval,
"limit": limit
}
try:
response = requests.get(
url,
headers=headers,
params=params,
timeout=(3.05, 10)
)
if response.status_code == 429:
retry_after = int(response.headers.get("Retry-After", 5))
raise RateLimitError(f"限频,等待 {retry_after} 秒")
data = response.json()
if data.get("code") == 3001:
retry_after = int(response.headers.get("Retry-After", 5))
raise RateLimitError(f"限频,等待 {retry_after} 秒")
if data.get("code") != 0:
raise RuntimeError(f"API 错误 {data.get('code')}: {data.get('message')}")
klines = data["data"]
df = pd.DataFrame(klines)
# 字段映射
df["timestamp"] = pd.to_datetime(df["ts"], unit="ms")
df = df[["timestamp", "open", "high", "low", "close", "volume"]]
df = df.sort_values("timestamp").reset_index(drop=True)
return df
except requests.exceptions.RequestException as e:
raise RuntimeError(f"网络请求失败: {e}")
class RateLimitError(Exception):
"""限频异常,用于触发重连退避"""
pass
def fetch_multiple_symbols(symbols: list, interval: str = "1d", limit: int = 500, max_retries: int = 3) -> dict:
"""
批量获取多只标的的 K 线数据。
⚠️ 高频调用需配合限频退避逻辑。
"""
results = {}
base_delay = 2.0
max_delay = 60.0
for i, symbol in enumerate(symbols):
retry_count = 0
while retry_count < max_retries:
try:
df = fetch_historical_klines(symbol, interval, limit)
results[symbol] = df
break
except RateLimitError as e:
wait_time = int(str(e).split("等待 ")[-1].rstrip(" 秒"))
print(f"[{symbol}] 触发限频,等待 {wait_time} 秒")
time.sleep(wait_time)
retry_count += 1
except Exception as e:
print(f"[{symbol}] 获取失败: {e}")
results[symbol] = None
break
# 指数退避 + 抖动:避免大量请求在同一时刻发起
delay = min(base_delay * (2 ** retry_count), max_delay)
jitter = random.uniform(0, delay * 0.1)
time.sleep(delay + jitter)
return results
# ─────────────────────────────────────────────
# 协整检验核心函数
# ─────────────────────────────────────────────
def cointegration_test(series1: pd.Series, series2: pd.Series) -> dict:
"""
对两个价格序列进行协整检验。
返回检验结果字典,包含 ADF 统计量、p-value、临界值和结论。
"""
if len(series1) != len(series2):
raise ValueError("两个序列长度必须相同")
# Engle-Granger 两步法:
# 第一步:用 OLS 回归 y = α + β * x + ε,求出残差序列 ε(即价差 spread)
slope, intercept, _, _, _ = stats.linregress(series2, series1)
spread = series1 - (intercept + slope * series2)
# 第二步:对残差序列 ε 做 ADF 检验
adf_result = adfuller(spread, maxlag=1, regression="c")
adf_stat = adf_result[0]
p_value = adf_result[1]
critical_values = adf_result[4]
# 临界值取 5% 显著性水平
is_cointegrated = p_value < 0.05 and adf_stat < critical_values["5%"]
return {
"spread_mean": spread.mean(),
"spread_std": spread.std(),
"hedge_ratio": slope,
"intercept": intercept,
"adf_statistic": adf_stat,
"p_value": p_value,
"critical_values": critical_values,
"is_cointegrated": is_cointegrated,
"sample_size": len(spread)
}
def screen_pairs(universe: list, interval: str = "1d", lookback: int = 500,
top_candidate: int = 200) -> pd.DataFrame:
"""
从股票池中筛选候选协整对。
参数:
universe: 标的代码列表,如 ["AAPL.US", "MSFT.US"]
interval: K 线周期
lookback: 历史数据量
top_candidate: 粗筛阶段保留的候选对数量
"""
print(f"正在获取 {len(universe)} 只标的的历史数据...")
all_prices = fetch_multiple_symbols(universe, interval, limit=lookback)
print(f"数据获取完成,有效标的: {sum(1 for v in all_prices.values() if v is not None)} 只")
# 构建价格矩阵(收盘价)
price_data = {}
for symbol, df in all_prices.items():
if df is not None and len(df) >= lookback * 0.9:
price_data[symbol] = df["close"].values
symbols = list(price_data.keys())
print(f"进入粗筛阶段,共 {len(symbols)} 只有效标的")
# 粗筛:计算标准化价格距离,保留 Top-N 候选对
n = len(symbols)
distances = []
for i in range(n):
for j in range(i + 1, n):
s1 = symbols[i]
s2 = symbols[j]
# 标准化后计算欧氏距离
p1 = (price_data[s1] - price_data[s1].mean()) / (price_data[s1].std() + 1e-8)
p2 = (price_data[s2] - price_data[s2].mean()) / (price_data[s2].std() + 1e-8)
dist = np.sqrt(np.mean((p1 - p2) ** 2))
distances.append({
"symbol1": s1,
"symbol2": s2,
"distance": dist
})
# 按距离升序排列,取前 top_candidate
distances_df = pd.DataFrame(distances)
distances_df = distances_df.sort_values("distance").head(top_candidate)
print(f"粗筛完成,保留 {len(distances_df)} 对候选对")
# 精筛:对候选对逐一做 ADF 协整检验
results = []
for _, row in distances_df.iterrows():
s1_prices = pd.Series(price_data[row["symbol1"]])
s2_prices = pd.Series(price_data[row["symbol2"]])
try:
result = cointegration_test(s1_prices, s2_prices)
result["symbol1"] = row["symbol1"]
result["symbol2"] = row["symbol2"]
results.append(result)
except Exception as e:
print(f"协整检验失败 {row['symbol1']}/{row['symbol2']}: {e}")
results_df = pd.DataFrame(results)
# 过滤出通过协整检验的配对
valid_pairs = results_df[results_df["is_cointegrated"] == True].copy()
valid_pairs = valid_pairs.sort_values("p_value").reset_index(drop=True)
print(f"\n协整检验完成,共 {len(valid_pairs)} 对通过检验 (p < 0.05)")
print(f"Top 5 候选配对(按 p-value 排序):")
print(valid_pairs[["symbol1", "symbol2", "p_value", "hedge_ratio"]].head())
return valid_pairs
# ─────────────────────────────────────────────
# 示例:运行筛选流程
# ─────────────────────────────────────────────
if __name__ == "__main__":
# 注意:实盘中 universe 应为完整的可交易标的列表
# 这里以几只代表性股票作为演示
demo_universe = [
"AAPL.US", "MSFT.US", "GOOGL.US", "AMZN.US",
"XOM.US", "CVX.US", "JPM.US", "BAC.US",
"JNJ.US", "PFE.US", "UNH.US", "MRK.US"
]
candidate_pairs = screen_pairs(
universe=demo_universe,
interval="1d",
lookback=500,
top_candidate=30
)
运行以上脚本后,会输出类似以下格式的候选配对表格:
| symbol1 | symbol2 | p_value | adf_statistic | hedge_ratio | spread_std |
|---|---|---|---|---|---|
| XOM.US | CVX.US | 0.0023 | -3.894 | 1.231 | 2.14 |
| JNJ.US | PFE.US | 0.0087 | -3.452 | 0.876 | 1.89 |
| JPM.US | BAC.US | 0.0142 | -3.218 | 1.089 | 3.05 |
其中,XOM(埃克森美孚)与 CVX(雪佛龙)构成一对经典的能源板块配对——两者业务高度同质,原油价格是共同驱动因子,均值回归特性稳定。
三、价差 Z-Score:信号从何而来
找到协整对只是第一步。交易的核心在于:价差偏离均衡多少个标准差时入场?偏离多少时离场?
这就要引入 Z-Score(标准分数):
$$Z_t = \frac{Spread_t - \mu_{spread}}{\sigma_{spread}}$$
其中 $\mu_{spread}$ 和 $\sigma_{spread}$ 通常用滚动窗口估算,而非全历史均值(因为协整关系可能缓慢漂移)。
3.1 经典阈值设置
| Z-Score 区间 | 信号含义 | 操作 |
|---|---|---|
| Z > +2.0 | 价差偏高,上轨突破 | 卖出 symbol1,买入 symbol2(空头对冲) |
| Z < -2.0 | 价差偏低,下轨突破 | 买入 symbol1,卖出 symbol2(多头对冲) |
| Z ∈ (-0.5, +0.5) | 价差接近均衡 | 平仓,持有现金 |
| Z 回归至 ±0.5 | 止盈信号 | 价差收敛时锁定利润 |
| Z 突破 ±3.0 | 止损信号 | 极端偏离,市场逻辑可能已变化 |
这些阈值不是固定的。不同资产类别、不同波动环境下,最优阈值差异显著。下一节中的回测模块提供了参数优化的框架。
3.2 Z-Score 实时计算
class SpreadMonitor:
"""
实时监控协整对的价差 Z-Score。
适用于 WebSocket 推送的 tick 数据(港股、数字货币)。
⚠️ 美股逐笔成交不支持,请使用 TickDB /kline/latest 端点获取实时 K 线。
"""
def __init__(self, symbol1: str, symbol2: str,
hedge_ratio: float, intercept: float,
window: int = 20):
self.symbol1 = symbol1
self.symbol2 = symbol2
self.hedge_ratio = hedge_ratio
self.intercept = intercept
self.window = window
# 滚动均值和标准差的历史缓冲区
self.price1_buffer = []
self.price2_buffer = []
self.spread_history = []
# Z-Score 阈值
self.entry_threshold = 2.0
self.exit_threshold = 0.5
self.stop_loss_threshold = 3.0
def update(self, price1: float, price2: float) -> dict:
"""
更新最新价格,计算实时 Z-Score。
返回信号字典,包含 Z-Score、信号类型和建议操作。
"""
self.price1_buffer.append(price1)
self.price2_buffer.append(price2)
# 维持滚动窗口大小
if len(self.price1_buffer) > self.window * 2:
self.price1_buffer.pop(0)
self.price2_buffer.pop(0)
if len(self.price1_buffer) < self.window:
return {"status": "warming_up", "z_score": None}
# 计算当前价差
current_spread = price1 - (self.intercept + self.hedge_ratio * price2)
# 计算滚动均值和标准差
recent_spread_history = self.spread_history[-self.window:] if self.spread_history else []
if len(recent_spread_history) < self.window:
self.spread_history.append(current_spread)
return {"status": "warming_up", "z_score": None}
spread_mean = np.mean(recent_spread_history)
spread_std = np.std(recent_spread_history) + 1e-8
z_score = (current_spread - spread_mean) / spread_std
# 更新价差历史
self.spread_history.append(current_spread)
if len(self.spread_history) > self.window * 3:
self.spread_history.pop(0)
# 生成交易信号
signal = self._generate_signal(z_score)
return {
"status": "ready",
"z_score": round(z_score, 3),
"spread": round(current_spread, 4),
"signal": signal,
"timestamp": time.strftime("%Y-%m-%d %H:%M:%S")
}
def _generate_signal(self, z_score: float) -> dict:
"""根据 Z-Score 生成交易信号"""
if z_score > self.stop_loss_threshold:
return {
"action": "STOP_LOSS",
"direction": "short_spread",
"description": f"Z={z_score:.2f} 突破止损阈值 {self.stop_loss_threshold},强制平仓"
}
elif z_score > self.entry_threshold:
return {
"action": "ENTRY",
"direction": "short_spread",
"description": f"Z={z_score:.2f} > +{self.entry_threshold},卖出 {self.symbol1},买入 {self.symbol2}"
}
elif z_score < -self.entry_threshold:
return {
"action": "ENTRY",
"direction": "long_spread",
"description": f"Z={z_score:.2f} < -{self.entry_threshold},买入 {self.symbol1},卖出 {self.symbol2}"
}
elif abs(z_score) < self.exit_threshold:
return {
"action": "EXIT",
"direction": "neutral",
"description": f"Z={z_score:.2f} 回归均衡,平仓"
}
else:
return {
"action": "HOLD",
"direction": "neutral",
"description": f"Z={z_score:.2f} 处于中性区间,观望"
}
# ─────────────────────────────────────────────
# WebSocket 实时推送订阅(适用于港股、数字货币)
# ─────────────────────────────────────────────
import websocket
import threading
import queue
class RealtimeSpreadWatcher:
"""
通过 WebSocket 订阅 TickDB 实时行情,驱动 SpreadMonitor。
适用于支持 trades 频道的标的(港股、数字货币)。
⚠️ 美股不支持 tick 级逐笔,此处示例代码适用于港股/数字货币。
"""
RECONNECT_BASE_DELAY = 2.0
RECONNECT_MAX_DELAY = 60.0
MAX_RECONNECT_ATTEMPTS = 10
def __init__(self, symbols: list, spread_monitor: SpreadMonitor):
self.symbols = symbols
self.monitor = spread_monitor
self.ws = None
self.reconnect_attempts = 0
self.latest_prices = {sym: None for sym in symbols}
self._msg_queue = queue.Queue()
self._running = False
def start(self, api_key: str):
"""启动 WebSocket 连接"""
self.api_key = api_key
self._running = True
self._connect()
# 启动独立线程处理消息
thread = threading.Thread(target=self._message_loop, daemon=True)
thread.start()
def _connect(self):
"""建立 WebSocket 连接"""
try:
symbols_param = ",".join(self.symbols)
# ⚠️ API Key 以 URL 参数形式传递(TickDB WebSocket 规范)
ws_url = f"wss://api.tickdb.ai/ws?api_key={self.api_key}&symbols={symbols_param}&channels=trades"
self.ws = websocket.WebSocketApp(
ws_url,
on_open=self._on_open,
on_message=self._on_message,
on_error=self._on_error,
on_close=self._on_close
)
thread = threading.Thread(target=self.ws.run_forever, daemon=True)
thread.start()
except Exception as e:
print(f"WebSocket 连接建立失败: {e}")
self._schedule_reconnect()
def _on_open(self, ws):
"""连接建立成功"""
print(f"[{time.strftime('%H:%M:%S')}] WebSocket 连接已建立,开始接收实时数据")
self.reconnect_attempts = 0
# 发送心跳保活(TickDB WebSocket 规范)
ws.send(json.dumps({"cmd": "ping"}))
def _on_message(self, ws, message):
"""接收并分发消息"""
self._msg_queue.put(message)
def _on_error(self, ws, error):
"""错误处理"""
print(f"WebSocket 错误: {error}")
def _on_close(self, ws, code, reason):
"""连接关闭后触发重连"""
print(f"WebSocket 关闭 (code={code}, reason={reason})")
self._schedule_reconnect()
def _schedule_reconnect(self):
"""指数退避重连"""
if not self._running:
return
delay = min(
self.RECONNECT_BASE_DELAY * (2 ** self.reconnect_attempts),
self.RECONNECT_MAX_DELAY
)
# 加抖动避免惊群
jitter = random.uniform(0, delay * 0.1)
wait_time = delay + jitter
print(f"[{time.strftime('%H:%M:%S')}] {wait_time:.1f} 秒后尝试重连 (第 {self.reconnect_attempts + 1} 次)")
time.sleep(wait_time)
if self.reconnect_attempts < self.MAX_RECONNECT_ATTEMPTS:
self.reconnect_attempts += 1
self._connect()
else:
print("达到最大重连次数,请检查网络或 API Key")
def _message_loop(self):
"""独立线程:处理消息队列,驱动 Z-Score 计算"""
last_heartbeat = time.time()
while self._running:
try:
message = self._msg_queue.get(timeout=1.0)
data = json.loads(message)
# 心跳保活:每 30 秒检测一次 ping/pong
if time.time() - last_heartbeat > 30:
if self.ws and self.ws.sock and self.ws.sock.connected:
self.ws.send(json.dumps({"cmd": "ping"}))
last_heartbeat = time.time()
# 处理 trades 频道数据
if data.get("channel") == "trades" and "data" in data:
for trade in data["data"]:
symbol = trade.get("s") or trade.get("symbol")
price = float(trade.get("p") or trade.get("price"))
if symbol in self.latest_prices:
self.latest_prices[symbol] = price
# 当两个标的都有最新价格时,计算 Z-Score
p1 = self.latest_prices.get(self.monitor.symbol1)
p2 = self.latest_prices.get(self.monitor.symbol2)
if p1 is not None and p2 is not None:
result = self.monitor.update(p1, p2)
if result["status"] == "ready":
self._log_signal(result)
except queue.Empty:
continue
except json.JSONDecodeError:
continue
def _log_signal(self, result: dict):
"""记录信号到控制台(生产环境应接入告警系统)"""
signal = result["signal"]
timestamp = result["timestamp"]
if signal["action"] in ("ENTRY", "STOP_LOSS"):
print(f"[{timestamp}] 🔔 {signal['action']} | "
f"Z={result['z_score']} | {signal['description']}")
else:
print(f"[{timestamp}] Z={result['z_score']} | {signal['description']}")
def stop(self):
"""停止 WebSocket 连接"""
self._running = False
if self.ws:
self.ws.close()
四、回测框架:让历史说话
策略写完了,但纸上谈兵没有意义。任何量化策略在上线前,必须经过回测验证。以下是一个完整的回测引擎:
class PairBacktester:
"""
配对交易策略回测引擎。
支持参数扫描(阈值优化)和多指标绩效评估。
"""
def __init__(self, symbol1: str, symbol2: str,
hedge_ratio: float, intercept: float,
entry_threshold: float = 2.0,
exit_threshold: float = 0.5,
stop_loss: float = 3.0,
window: int = 20,
initial_capital: float = 100_000.0,
commission: float = 0.001,
slippage: float = 0.0005):
self.symbol1 = symbol1
self.symbol2 = symbol2
self.hedge_ratio = hedge_ratio
self.intercept = intercept
self.entry_threshold = entry_threshold
self.exit_threshold = exit_threshold
self.stop_loss = stop_loss
self.window = window
self.initial_capital = initial_capital
self.commission = commission # 佣金率
self.slippage = slippage # 滑点率
def run(self, df1: pd.DataFrame, df2: pd.DataFrame) -> dict:
"""
执行回测。
参数:
df1, df2: 包含 timestamp, close, volume 列的 DataFrame
返回:
回测绩效指标字典
"""
# 合并数据
merged = pd.merge(
df1[["timestamp", "close"]].rename(columns={"close": "p1"}),
df2[["timestamp", "close"]].rename(columns={"close": "p2"}),
on="timestamp", how="inner"
).reset_index(drop=True)
# 计算价差
merged["spread"] = (
merged["p1"] - (self.intercept + self.hedge_ratio * merged["p2"])
)
# 计算 Z-Score
merged["z_score"] = (
(merged["spread"] - merged["spread"].rolling(self.window).mean())
/ (merged["spread"].rolling(self.window).std() + 1e-8)
)
# 模拟交易
position = 0 # +1: long spread, -1: short spread, 0: flat
entry_z = 0
pnl_list = []
equity = self.initial_capital
equity_curve = []
for i, row in merged.iterrows():
if pd.isna(row["z_score"]):
equity_curve.append(equity)
continue
z = row["z_score"]
price1 = row["p1"]
price2 = row["p2"]
# 开仓逻辑
if position == 0:
if z > self.entry_threshold:
position = -1
entry_z = z
entry_price1 = price1 * (1 + self.slippage)
entry_price2 = price2 * (1 - self.slippage)
elif z < -self.entry_threshold:
position = 1
entry_z = z
entry_price1 = price1 * (1 - self.slippage)
entry_price2 = price2 * (1 + self.slippage)
# 平仓逻辑
elif position != 0:
should_exit = False
# 止盈
if abs(z) < self.exit_threshold:
should_exit = True
# 止损
if abs(z) > self.stop_loss:
should_exit = True
if should_exit:
exit_price1 = price1 * (1 - self.slippage) if position == 1 else price1 * (1 + self.slippage)
exit_price2 = price2 * (1 + self.slippage) if position == 1 else price2 * (1 - self.slippage)
# 计算利润(按一股配对为单位)
# long spread: 盈利 = (p1上涨 - entry_p1) - hedge_ratio * (p2下跌 - entry_p2)
if position == 1:
pnl_per_unit = (exit_price1 - entry_price1) - self.hedge_ratio * (entry_price2 - exit_price2)
else:
pnl_per_unit = (entry_price1 - exit_price1) - self.hedge_ratio * (exit_price2 - entry_price2)
# 考虑佣金
trade_value = abs(entry_price1) + abs(entry_price2) * self.hedge_ratio
net_pnl = pnl_per_unit - trade_value * self.commission
equity += net_pnl
pnl_list.append(net_pnl)
position = 0
equity_curve.append(equity)
# 计算绩效指标
returns = pd.Series(equity_curve).pct_change().dropna()
equity_series = pd.Series(equity_curve)
# 最大回撤
rolling_max = equity_series.expanding().max()
drawdowns = equity_series - rolling_max
max_drawdown = drawdowns.min()
max_drawdown_pct = (max_drawdown / rolling_max[drawdowns.idxmin()]) * 100
# 夏普比率(年化)
annual_return = returns.mean() * 252
annual_vol = returns.std() * np.sqrt(252)
sharpe = annual_return / annual_vol if annual_vol != 0 else 0.0
# 索提诺比率
downside_returns = returns[returns < 0]
downside_vol = downside_returns.std() * np.sqrt(252)
sortino = annual_return / downside_vol if downside_vol != 0 else 0.0
# 胜率
wins = sum(1 for p in pnl_list if p > 0)
total_trades = len(pnl_list)
win_rate = wins / total_trades if total_trades > 0 else 0.0
# 平均盈利/平均亏损
avg_win = np.mean([p for p in pnl_list if p > 0]) if wins > 0 else 0.0
avg_loss = abs(np.mean([p for p in pnl_list if p < 0])) if (total_trades - wins) > 0 else 0.0
profit_factor = avg_win / avg_loss if avg_loss != 0 else float("inf")
return {
"initial_capital": self.initial_capital,
"final_equity": round(equity, 2),
"total_return": round((equity - self.initial_capital) / self.initial_capital * 100, 2),
"total_trades": total_trades,
"win_rate": round(win_rate, 3),
"profit_factor": round(profit_factor, 2),
"avg_win": round(avg_win, 2),
"avg_loss": round(avg_loss, 2),
"sharpe_ratio": round(sharpe, 3),
"sortino_ratio": round(sortino, 3),
"max_drawdown": round(max_drawdown, 2),
"max_drawdown_pct": round(max_drawdown_pct, 2),
"commission_rate": self.commission,
"slippage_rate": self.slippage,
"equity_curve": equity_curve
}
# ─────────────────────────────────────────────
# 示例:回测 XOM-CVX 配对
# ─────────────────────────────────────────────
if __name__ == "__main__":
df_xom = fetch_historical_klines("XOM.US", "1d", limit=1000)
df_cvx = fetch_historical_klines("CVX.US", "1d", limit=1000)
# 使用协整检验得到的参数
test_result = cointegration_test(df_xom["close"], df_cvx["close"])
backtester = PairBacktester(
symbol1="XOM.US",
symbol2="CVX.US",
hedge_ratio=test_result["hedge_ratio"],
intercept=test_result["intercept"],
entry_threshold=2.0,
exit_threshold=0.5,
stop_loss=3.0,
window=20,
initial_capital=100_000,
commission=0.001,
slippage=0.0005
)
results = backtester.run(df_xom, df_cvx)
print("\n" + "=" * 60)
print("配对交易回测结果")
print("=" * 60)
for key, value in results.items():
if key != "equity_curve":
print(f" {key:<25}: {value}")
五、回测结果解读:三个核心问题
拿到回测报告后,最重要的是回答三个问题:
5.1 问题一:策略盈利吗?
关注总收益率和夏普比率。如果回测周期覆盖了至少一个完整的牛熊周期(2008、2020、2022 等),且夏普比率 > 1.0,则策略具有初步的盈利基础。
| 夏普比率 | 评价 | 注意事项 |
|---|---|---|
| < 0.5 | 差 | 风险调整后无正阿尔法 |
| 0.5 - 1.0 | 一般 | 可接受,但成本敏感 |
| 1.0 - 2.0 | 良好 | 具备实际使用价值 |
| > 2.0 | 优秀 | 需警惕过拟合风险 |
5.2 问题二:最大回撤能承受吗?
配对交易通常被宣传为"低回撤",但这是有前提的。以下几个因素会导致回撤超出预期:
- 协整关系失效:两只股票的基本面逻辑发生根本变化(如一家公司被收购),价差不再回归。
- 极端事件冲击:2008 年、2020 年 3 月的市场流动性枯竭会导致价差急剧扩大,触及止损阈值前就损失惨重。
- 参数过拟合:用全部历史数据优化 entry/exit 阈值,会严重高估策略表现。
建议:最大回撤应控制在初始资本的 15% 以内,且回撤持续时间应不超过 30 个交易日。
5.3 问题三:交易成本敏感吗?
配对交易是均值回归策略,利润来自无数个小额的收敛交易。如果交易成本(佣金 + 滑点)超过每笔交易的平均利润,策略将无法盈利。
一个简单的检验:
$$\text{滑点损耗} = 2 \times \text{滑点率} \times \text{平均持仓金额}$$
如果这个损耗接近或超过平均单笔利润,则策略对执行质量极度敏感,不适合低流动性标的。
回测局限性说明:上述回测结果基于历史数据模拟,不构成未来收益保证。回测中存在以下局限性:未完全模拟实际交易中的滑点和市场冲击成本(已假设 0.05% 固定滑点);未考虑极端行情下的流动性枯竭风险;样本量有限,统计显著性可能不足。建议在实际使用前进行更长时间跨度的验证。
六、工程化注意事项
6.1 美股数据使用边界
必须强调一个关键的技术事实:
| 数据类型 | 是否支持 | 说明 |
|---|---|---|
| 历史 K 线数据 | ✅ 10 年级别,清洗对齐 | 适用于跨周期策略回测 |
| 实时 K 线 | ✅ /kline/latest 端点 |
可用于美股实时监控 |
| 逐笔成交(trades) | ❌ 不支持 | 美股和 A 股不提供 tick 级逐笔 |
对于美股的实时监控,建议使用 TickDB /kline/latest 端点轮询(建议频率不超过 1 次/秒,配合 3001 限频处理),而非尝试 WebSocket 订阅 trades 频道。
6.2 仓位管理的黄金法则
配对交易中,永远不要"All in"一对配对。建议单对配比不超过总资本的 10%,同时运行不超过 5 对配对(分散单个配对失效的风险)。
6.3 协整关系的动态维护
协整关系不是永久的。建议每个季度重新运行一次筛选流程,检查已有配对的协整 p-value 是否恶化。如果某对的 p-value 从 0.002 上升到 0.15,应当考虑剔除并寻找替代配对。
结语
统计套利是一门关于耐心的生意。
它不追逐涨停板,不预测美联储决议,不押注宏观叙事。它安静地在两只股票的价格缝隙中,寻找那个总会回归的均衡点。
但这份耐心需要工程上的严谨支撑。筛选流程中的每一步——从协整检验到 Z-Score 阈值——都值得反复推敲。回测不是为了证明策略"有效",而是为了理解策略"在什么条件下"有效,以及"在什么条件下"会失效。
如果你希望亲手运行本文的筛选流程并验证配对效果,可以使用 TickDB 的历史 K 线数据接口获取 10 年级别的清洗数据,进行完整的回测验证。
下一步行动
如果你想亲手实现配对筛选流程:
- 访问 tickdb.ai 注册(免费,无需信用卡)
- 在控制台生成 API Key
- 设置环境变量
TICKDB_API_KEY,复制本文代码即可运行
如果你需要机构级的历史全量数据(用于更严苛的协整验证和参数优化),联系 [email protected] 了解专业版数据方案。
如果你习惯用 AI 辅助开发,在 AI 助手中搜索安装 tickdb-market-data SKILL,可以让 AI 帮你生成配对筛选和数据获取的辅助代码。
本文不构成任何投资建议。市场有风险,投资需谨慎。