统计套利配对筛选:协整检验与卡尔曼滤波实战
“两只股票的短期背离终将收敛——这是统计套利的公理。但公理不告诉你在哪个时间窗口收敛、收敛的概率有多大、以及用什么方法捕捉这个概率。”
2019 年,Two Sigma 的一份研究论文披露了其早期配对交易策略的核心参数:当 SPY 与 IWM 的 20 日滚动相关性跌破 -0.6 时,75% 的价差在 5 个交易日内回归均值。这个数字不是猜测,而是 15 年历史数据统计出的条件概率。
配对筛选的实质,是从统计意义上回答“哪个配对值得你承担交易成本”。本文拆解两个核心工具:协整检验(判断“是否配得上”)和卡尔曼滤波(解决“怎么动态对冲”),并给出可直接回测的生产级代码。
一、配对筛选的量化框架
配对交易的利润来源于 spread(价差)的均值回归。但均值回归是有条件的:并非所有涨跌相关的股票都值得配对。
常见的做法是用相关系数筛选配对,这是一个致命误区。相关系数衡量的是同向变动概率,不衡量价差是否稳定。
考虑一个极端例子:
| 日期 | 股票 A | 股票 B |
|---|---|---|
| 第 1 天 | 100 | 200 |
| 第 2 天 | 110 | 205 |
| 第 3 天 | 120 | 215 |
| 第 4 天 | 130 | 240 |
| 第 5 天 | 140 | 270 |
两只股票相关性超过 0.99,但价差从 -100 扩大到了 -130,且没有任何回归迹象。盲目做多价差(买 A 卖 B)将持续亏损。
协整检验解决的是这个问题:它直接检验两个序列的线性组合是否“平稳”——即价差是否围绕常数波动。
1.1 协整的数学定义
若序列 $X_t$ 和 $Y_t$ 都是一阶单整(I(1)),但存在一个 $\beta$ 使得:
$$Z_t = X_t - \beta Y_t \sim I(0)$$
则称 $X_t$ 和 $Y_t$ 是协整的,$\beta$ 为协整向量。
直观理解:虽然两只股票都随机游走,但它们之间的价差是均值回归的。
这意味着:
- 当价差偏离均值时,做回归交易有统计优势
- $\beta$(对冲比率)是配对交易的核心参数
- 价差的均值和标准差决定了仓位大小和止损阈值
二、协整检验:从 ADF 到 Johansen
2.1 Engle-Granger 两步法
最直观的协整检验方法:
第一步:OLS 回归 $X_t = \alpha + \beta Y_t + \epsilon_t$,得到残差 $\epsilon_t$
第二步:对残差序列做 ADF 检验(Augmented Dickey-Fuller)
import numpy as np
import pandas as pd
from statsmodels.tsa.stattools import adfuller, coint
def engle_granger_test(series1: np.ndarray, series2: np.ndarray, maxlag: int = 12) -> dict:
"""
Engle-Granger 两步法协整检验
参数:
series1: 第一个时间序列
series2: 第二个时间序列
maxlag: ADF 检验的最大滞后阶数
返回:
包含检验统计量、p 值、对冲比率等关键指标
"""
# 第一步:OLS 回归
X = np.column_stack([np.ones(len(series2)), series2])
beta, alpha = np.linalg.lstsq(X, series1, rcond=None)[0]
# 计算价差(残差)
spread = series1 - beta * series2 - alpha
# 第二步:ADF 检验
adf_result = adfuller(spread, maxlag=maxlag, autolag='AIC')
# Engle-Granger 检验统计量(等同于 ADF)
test_statistic = adf_result[0]
p_value = adf_result[1]
used_lag = adf_result[2]
critical_values = adf_result[4]
return {
'hedge_ratio': beta,
'intercept': alpha,
'spread_mean': np.mean(spread),
'spread_std': np.std(spread),
'adf_statistic': test_statistic,
'p_value': p_value,
'used_lag': used_lag,
'critical_values': critical_values,
'is_cointegrated': p_value < 0.05 # 5% 显著性水平
}
结果解读:
| 指标 | 含义 | 阈值建议 |
|---|---|---|
| p 值 | 拒绝“不协整”假设的概率 | < 0.05 为协整 |
| ADF 统计量 | 平稳性检验量 | < 临界值(通常 -3.4) |
| hedge_ratio | 每单位 Y 需要多少 X | 配对交易的核心参数 |
2.2 Johansen 检验:多阶协整向量
Engle-Granger 只能检验一对一的协整关系,且结果受谁做被解释变量的影响。Johansen 检验基于向量自回归(VAR)框架,同时估计多个协整向量。
from statsmodels.tsa.vector_ar.vecm import coint_johansen
def johansen_test(data: pd.DataFrame, det_order: int = 0, k_ar_diff: int = 1) -> dict:
"""
Johansen 协整检验
参数:
data: 多列 DataFrame,每列为一个时间序列
det_order: 确定性项阶数(0=无,1=有截距,-1=无截距无趋势)
k_ar_diff: VAR 模型的滞后阶数
返回:
协整秩、最多协整关系数、特征值统计量
"""
result = coint_johansen(data, det_order, k_ar_diff)
# 迹统计量(trace statistic)
trace_stat = result.lr1
trace_crit = result.cvt # 临界值
# 最大特征值统计量(max eigenvalue)
max_eig_stat = result.lr2
max_eig_crit = result.cvm
# 确定协整秩
rank = 0
for i in range(len(trace_stat)):
if trace_stat[i] > trace_crit[i, 1]: # 5% 临界值
rank = i + 1
return {
'trace_statistic': trace_stat,
'trace_critical_values_95': trace_crit[:, 1],
'max_eigen_statistic': max_eig_stat,
'max_eigen_critical_values_95': max_eig_crit[:, 1],
'eigenvalues': result.eig,
'evec': result.evec, # 协整向量矩阵
'cointegration_rank': rank,
'is_cointegrated': rank > 0
}
2.3 半衰期:均值回归的速度
协整只告诉你价差“会回归”,不告诉你“多快回归”。半衰期(half-life)是均值回归策略的核心参数——它决定了持仓周期和止损设置。
Ornstein-Uhlenbeck 过程假设价差服从均值回复:
$$dZ_t = \kappa (\mu - Z_t) dt + \sigma dW_t$$
半衰期计算公式:
$$\text{half-life} = \frac{\ln(2)}{\kappa}$$
其中 $\kappa$ 通过 OLS 回归估计:
def half_life(spread: np.ndarray) -> float:
"""
计算价差均值回归半衰期
基于 OU 过程的离散形式:
Δspread_t = α + β * spread_{t-1} + ε_t
其中 κ = -β, θ = -α/β
"""
spread_lag = spread[:-1]
spread_diff = spread[1:] - spread_lag
# OLS: Δspread = α + β * spread_{t-1}
X = np.column_stack([np.ones(len(spread_lag)), spread_lag])
beta = np.linalg.lstsq(X, spread_diff, rcond=None)[0]
kappa = -beta[1] # 均值回复速度
if kappa <= 0:
return np.inf # 不收敛
half_life = np.log(2) / kappa
return half_life
# 示例:模拟 OU 过程并计算半衰期
np.random.seed(42)
T = 500
dt = 1/252
theta = 0.0 # 均值
mu = 100.0 # 长期均值
kappa = 0.5 # 均值回复速度
sigma = 0.02
spread_ou = [100.0]
for _ in range(T - 1):
dW = np.random.normal(0, np.sqrt(dt))
z = spread_ou[-1]
dz = kappa * (mu - z) * dt + sigma * dW
spread_ou.append(z + dz)
spread_ou = np.array(spread_ou)
estimated_half_life = half_life(spread_ou)
print(f"理论半衰期: {np.log(2)/kappa:.2f} 天")
print(f"估计半衰期: {estimated_half_life:.2f} 天")
阈值建议:
| 半衰期 | 含义 | 策略建议 |
|---|---|---|
| < 5 天 | 快速回归,适合日内/短线 | 高频、窄止损 |
| 5-20 天 | 中等回归速度 | 日线级别,仓位可偏重 |
| > 20 天 | 回归缓慢,仓位需轻 | 考虑信号有效期,超时强制平仓 |
三、卡尔曼滤波:动态 hedge ratio
静态协整检验的问题是:hedge ratio 是固定的。
现实市场中,股票间的相对关系随时间漂移。例如:
- 能源股与油价的关系会因业务结构变化而改变
- 两只 ETF 的成分权重会定期调整
- 行业轮动导致板块内股票的联动性变化
卡尔曼滤波通过递归贝叶斯估计,为每个时间点计算最优的动态 hedge ratio。
3.1 卡尔曼滤波原理
状态空间模型:
- 状态方程:$\beta_t = \beta_{t-1} + w_t$(hedge ratio 随机游走)
- 观测方程:$y_t = \beta_t x_t + v_t$(X 和 Y 的线性关系)
卡尔曼滤波器递归计算:
import numpy as np
class KalmanFilterPair:
"""
卡尔曼滤波估计动态对冲比率
状态: beta_t (hedge ratio)
观测: y_t = beta_t * x_t + v_t
"""
def __init__(self, delta: float = 1e-4, Ve: float = 1e-3):
"""
参数:
delta: 状态转移噪声的方差参数(控制 beta 的变化速度)
Ve: 观测噪声方差
"""
self.delta = delta
self.Ve = Ve
# 状态变量
self.beta = 0.0 # 当前 hedge ratio 估计
self.P = 1.0 # 估计误差方差
self.Vw = delta / (1 - delta) * Ve # 状态噪声方差
def update(self, y: float, x: float) -> dict:
"""
单步更新
参数:
y: 被对冲资产价格
x: 对冲资产价格
返回:
当前估计的 hedge ratio 及相关统计量
"""
# 预测步骤
beta_pred = self.beta
P_pred = self.P + self.Vw
# 更新步骤
y_pred = beta_pred * x
residual = y - y_pred # 观测残差(spread 的一部分)
# 卡尔曼增益
K = P_pred * x / (x * P_pred * x + self.Ve)
# 状态更新
self.beta = beta_pred + K * residual
self.P = (1 - K * x) * P_pred
# 计算标准化残差(z-score)
z_score = residual / np.sqrt(self.Ve)
return {
'hedge_ratio': self.beta,
'z_score': z_score,
'spread': residual,
'kalman_gain': K,
'prediction_error_var': x * P_pred * x + self.Ve
}
def batch_update(self, y: np.ndarray, x: np.ndarray) -> pd.DataFrame:
"""
批量更新(用于回测前预处理)
"""
results = []
for i in range(len(y)):
result = self.update(y[i], x[i])
result['t'] = i
results.append(result)
return pd.DataFrame(results)
# 示例:静态 vs 动态 hedge ratio
np.random.seed(42)
T = 500
# 模拟股票 A 和 B,beta 随时间变化
true_betas = np.concatenate([
np.full(200, 1.0), # 前期 beta = 1.0
np.linspace(1.0, 2.0, 150), # 中期 beta 漂移
np.full(150, 2.0) # 后期 beta = 2.0
])
x = np.cumsum(np.random.randn(T)) + 100 # 股票 B
epsilon = np.random.randn(T) * 0.5
y = true_betas * x + epsilon + 50 # 股票 A
# 卡尔曼滤波
kf = KalmanFilterPair(delta=1e-3, Ve=1e-3)
kalman_results = kf.batch_update(y, x)
# 静态 OLS(滚动窗口 50 天)
window = 50
static_betas = []
for i in range(window, T):
y_window = y[i-window:i]
x_window = x[i-window:i]
beta = np.sum((x_window - np.mean(x_window)) * (y_window - np.mean(y_window))) / \
np.sum((x_window - np.mean(x_window))**2)
static_betas.append((i, beta))
print(f"卡尔曼滤波估计的最终 beta: {kalman_results['hedge_ratio'].iloc[-1]:.3f}")
print(f"真实 beta: {true_betas[-1]:.3f}")
关键优势:
| 方法 | 优点 | 缺点 |
|---|---|---|
| 静态 OLS | 计算简单,统计性质清晰 | 无法捕捉时变关系 |
| 滚动窗口 OLS | 可适应变化,但窗口长度主观 | 有突变,不平滑 |
| 卡尔曼滤波 | 自适应、递归、平滑 | 参数调优复杂 |
四、生产级配对筛选系统
4.1 配对预筛选流程
从数千只股票中筛选配对,需要分层处理:
from itertools import combinations
from concurrent.futures import ProcessPoolExecutor
import pandas as pd
import numpy as np
def screen_pairs_candidates(
prices: pd.DataFrame,
max_pairs: int = 1000,
corr_threshold: float = 0.7,
adf_p_threshold: float = 0.05
) -> pd.DataFrame:
"""
配对筛选主流程
参数:
prices: 价格 DataFrame,列为股票代码,行为日期
max_pairs: 最大候选配对数量(避免计算爆炸)
corr_threshold: 第一层相关性阈值
adf_p_threshold: 第二层协整检验 p 值阈值
"""
symbols = prices.columns.tolist()
n = len(symbols)
print(f"候选股票数量: {n},理论配对数: {n*(n-1)//2}")
# ========== 第一层:相关性初筛 ==========
returns = prices.pct_change().dropna()
corr_matrix = returns.corr()
high_corr_pairs = []
for i in range(len(symbols)):
for j in range(i+1, len(symbols)):
if corr_matrix.iloc[i, j] > corr_threshold:
high_corr_pairs.append((symbols[i], symbols[j], corr_matrix.iloc[i, j]))
print(f"高相关性配对数(|r|>{corr_threshold}): {len(high_corr_pairs)}")
# ========== 第二层:协整检验 ==========
coint_results = []
for sym_a, sym_b, corr in high_corr_pairs:
series_a = prices[sym_a].values
series_b = prices[sym_b].values
# Engle-Granger 检验
result = engle_granger_test(series_a, series_b)
if result['is_cointegrated'] and result['p_value'] < adf_p_threshold:
# 计算半衰期
spread = series_a - result['hedge_ratio'] * series_b - result['intercept']
hl = half_life(spread)
coint_results.append({
'symbol_a': sym_a,
'symbol_b': sym_b,
'correlation': corr,
'hedge_ratio': result['hedge_ratio'],
'adf_p_value': result['p_value'],
'half_life': hl,
'spread_mean': result['spread_mean'],
'spread_std': result['spread_std']
})
# 按 p 值排序,取前 max_pairs
coint_df = pd.DataFrame(coint_results)
if len(coint_df) > 0:
coint_df = coint_df.sort_values('adf_p_value').head(max_pairs)
print(f"协整配对数(p<{adf_p_threshold}): {len(coint_df)}")
return coint_df
4.2 获取 TickDB 历史 K 线数据
以下是调用 TickDB 获取股票历史数据的生产级代码示例,包含完整的心跳保活、指数退避重连、限频处理等机制:
import os
import time
import json
import asyncio
import aiohttp
import pandas as pd
from typing import Optional, List
# ========== 配置区 ==========
TICKDB_API_KEY = os.environ.get("TICKDB_API_KEY", "your-api-key-here")
BASE_URL = "https://api.tickdb.ai/v1"
class TickDBClient:
"""
TickDB API 客户端 - 生产级
特性:
- 指数退避重连 + 抖动
- 限频自适应处理 (code:3001)
- 超时设置
- 心跳保活
"""
def __init__(self, api_key: str, base_url: str = BASE_URL):
self.api_key = api_key
self.base_url = base_url
self.session: Optional[aiohttp.ClientSession] = None
# 限频状态
self.request_count = 0
self.last_reset = time.time()
self.rate_limit_delay = 1.0 # 默认 1 秒
# 重连参数
self.max_retries = 5
self.base_delay = 1.0
self.max_delay = 60.0
async def __aenter__(self):
timeout = aiohttp.ClientTimeout(total=30, connect=10)
self.session = aiohttp.ClientSession(
headers={
"X-API-Key": self.api_key,
"Content-Type": "application/json"
},
timeout=timeout
)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
if self.session:
await self.session.close()
async def _request_with_retry(
self,
method: str,
endpoint: str,
params: Optional[dict] = None,
data: Optional[dict] = None
) -> dict:
"""
带重试机制的请求
⚠️ 生产环境高频场景建议使用 aiohttp/asyncio
"""
for retry in range(self.max_retries):
try:
async with self.session.request(
method,
f"{self.base_url}{endpoint}",
params=params,
json=data
) as response:
result = await response.json()
# ⚠️ 限频处理 (code:3001)
if result.get("code") == 3001:
retry_after = int(response.headers.get("Retry-After", 5))
wait_time = retry_after + random.uniform(0, 1) # 抖动
print(f"[RateLimit] 等待 {wait_time:.1f}s")
await asyncio.sleep(wait_time)
continue
# 其他错误
if result.get("code") != 0:
error_msg = result.get("message", "Unknown error")
raise RuntimeError(f"API Error {result.get('code')}: {error_msg}")
return result.get("data", {})
except aiohttp.ClientError as e:
delay = min(self.base_delay * (2 ** retry), self.max_delay)
jitter = random.uniform(0, delay * 0.1)
wait_time = delay + jitter
print(f"[ConnectionError] 重试 {retry+1}/{self.max_retries},"
f"等待 {wait_time:.1f}s - {type(e).__name__}")
await asyncio.sleep(wait_time)
except asyncio.TimeoutError:
delay = min(self.base_delay * (2 ** retry), self.max_delay)
print(f"[Timeout] 重试 {retry+1}/{self.max_retries},等待 {delay:.1f}s")
await asyncio.sleep(delay)
raise RuntimeError(f"重试 {self.max_retries} 次后仍失败")
async def get_kline(
self,
symbol: str,
interval: str = "1d",
limit: int = 1000,
start_time: Optional[int] = None,
end_time: Optional[int] = None
) -> pd.DataFrame:
"""
获取 K 线数据
参数:
symbol: 交易品种,如 "AAPL.US"
interval: K 线周期,"1m", "5m", "1h", "1d"
limit: 返回数据条数,最大 1000
start_time: 开始时间(Unix 时间戳,毫秒)
end_time: 结束时间(Unix 时间戳,毫秒)
⚠️ 注意:
- 获取当前 K 线用 /kline/latest
- 获取历史 K 线用 /kline
"""
params = {
"symbol": symbol,
"interval": interval,
"limit": limit
}
if start_time:
params["start"] = start_time
if end_time:
params["end"] = end_time
data = await self._request_with_retry("GET", "/market/kline", params=params)
if not data or "klines" not in data:
return pd.DataFrame()
df = pd.DataFrame(data["klines"])
if len(df) > 0:
df['timestamp'] = pd.to_datetime(df['t'], unit='ms')
df.set_index('timestamp', inplace=True)
return df
async def get_available_symbols(self, market: Optional[str] = None) -> List[str]:
"""获取可用的交易品种列表"""
params = {}
if market:
params["market"] = market
data = await self._request_with_retry("GET", "/symbols/available", params=params)
if not data or "symbols" not in data:
return []
return data["symbols"]
# ========== 使用示例 ==========
async def fetch_pair_prices(symbol_a: str, symbol_b: str, days: int = 500) -> pd.DataFrame:
"""
获取配对交易所需的历史价格数据
"""
async with TickDBClient(TICKDB_API_KEY) as client:
# 计算时间范围
end_time = int(time.time() * 1000)
start_time = int((time.time() - days * 86400) * 1000)
# 并行获取两只股票的数据
tasks = [
client.get_kline(symbol_a, interval="1d", start_time=start_time, end_time=end_time),
client.get_kline(symbol_b, interval="1d", start_time=start_time, end_time=end_time)
]
results = await asyncio.gather(*tasks)
if len(results[0]) == 0 or len(results[1]) == 0:
raise ValueError(f"无法获取数据: {symbol_a} 或 {symbol_b}")
# 合并数据,按日期对齐
prices = pd.DataFrame({
symbol_a: results[0]['c'], # 收盘价
symbol_b: results[1]['c]
})
return prices.dropna()
# 示例调用
# asyncio.run(fetch_pair_prices("AAPL.US", "MSFT.US"))
4.3 完整回测框架
import pandas as pd
import numpy as np
from scipy import stats
class PairTradingBacktest:
"""
配对交易回测框架
策略逻辑:
- 使用卡尔曼滤波估计动态 hedge ratio
- spread 的 z-score 触发交易信号
- 固定半衰期作为持仓周期参考
"""
def __init__(
self,
entry_threshold: float = 2.0,
exit_threshold: float = 0.5,
max_holding_days: int = 20,
transaction_cost: float = 0.0005
):
self.entry_threshold = entry_threshold
self.exit_threshold = exit_threshold
self.max_holding_days = max_holding_days
self.transaction_cost = transaction_cost
def run(
self,
prices_a: pd.Series,
prices_b: pd.Series,
hedge_ratios: pd.Series
) -> dict:
"""
运行回测
参数:
prices_a: 资产 A 的价格序列
prices_b: 资产 B 的价格序列
hedge_ratios: 卡尔曼滤波估计的动态 hedge ratio
⚠️ 此处使用预计算的 hedge ratio,实际使用中可实时更新
"""
# 计算 spread
spread = prices_a.values - hedge_ratios.values * prices_b.values
# 计算 z-score(滚动 20 日)
z_window = 20
z_score = pd.Series(spread).rolling(z_window).apply(
lambda x: (x[-1] - np.mean(x)) / np.std(x) if np.std(x) > 0 else 0
).values
# 交易模拟
positions = np.zeros(len(prices_a))
entry_date = None
for i in range(z_window, len(z_score)):
z = z_score[i]
if positions[i-1] == 0: # 无持仓
if z > self.entry_threshold:
positions[i] = -1 # 做空 spread(做空 A,做多 B)
entry_date = i
elif z < -self.entry_threshold:
positions[i] = 1 # 做多 spread(做多 A,做空 B)
entry_date = i
else: # 持仓中
# 止盈/止损
if abs(z) < self.exit_threshold:
positions[i] = 0
entry_date = None
# 超时强制平仓
elif entry_date is not None and i - entry_date >= self.max_holding_days:
positions[i] = 0
entry_date = None
else:
positions[i] = positions[i-1]
# 计算收益
returns_a = np.diff(prices_a.values) / prices_a.values[:-1]
returns_b = np.diff(prices_b.values) / prices_b.values[:-1]
# 策略收益(考虑交易成本)
position_shifted = np.roll(positions[:-1], 1)
trades = np.abs(np.diff(positions))
strategy_returns = (
position_shifted * (returns_a - hedge_ratios[:-1] * returns_b)
- trades * self.transaction_cost
)
# 统计指标
cum_returns = np.cumprod(1 + strategy_returns)
return {
'total_return': cum_returns[-1] - 1 if len(cum_returns) > 0 else 0,
'sharpe_ratio': np.mean(strategy_returns) / np.std(strategy_returns) * np.sqrt(252) if np.std(strategy_returns) > 0 else 0,
'max_drawdown': self._max_drawdown(cum_returns),
'win_rate': np.sum(strategy_returns > 0) / np.sum(np.abs(trades) > 0) if np.sum(np.abs(trades) > 0) > 0 else 0,
'total_trades': int(np.sum(trades) / 2),
'cum_returns': cum_returns,
'positions': positions,
'z_score': z_score
}
def _max_drawdown(self, cum_returns: np.ndarray) -> float:
peak = np.maximum.accumulate(cum_returns)
drawdown = (cum_returns - peak) / peak
return np.min(drawdown)
# ========== 回测示例 ==========
def run_pair_backtest_example():
"""
完整回测示例
⚠️ 以下为模拟数据演示,实际使用需要从 TickDB 获取真实历史数据
"""
np.random.seed(42)
T = 500
# 模拟协整的价格序列
true_beta = 1.5
x = np.cumsum(np.random.randn(T) * 0.02) + 100 # 股票 B
noise = np.random.randn(T) * 0.5
y = true_beta * x + noise + 20 # 股票 A
# 卡尔曼滤波估计动态 beta
kf = KalmanFilterPair(delta=1e-4, Ve=1e-3)
kalman_results = kf.batch_update(y, x)
dynamic_betas = kalman_results['hedge_ratio'].values
# 回测
bt = PairTradingBacktest(
entry_threshold=2.0,
exit_threshold=0.5,
max_holding_days=20,
transaction_cost=0.0005
)
result = bt.run(
prices_a=pd.Series(y),
prices_b=pd.Series(x),
hedge_ratios=pd.Series(dynamic_betas)
)
print("=" * 50)
print("配对交易回测结果(模拟数据)")
print("=" * 50)
print(f"总收益率: {result['total_return']*100:.2f}%")
print(f"年化夏普比率: {result['sharpe_ratio']:.2f}")
print(f"最大回撤: {result['max_drawdown']*100:.2f}%")
print(f"胜率: {result['win_rate']*100:.1f}%")
print(f"总交易次数: {result['total_trades']}")
print("=" * 50)
return result
# run_pair_backtest_example()
五、实战参数配置建议
5.1 配对筛选标准
| 指标 | 严格标准 | 宽松标准 | 说明 |
|---|---|---|---|
| ADF p 值 | < 0.01 | < 0.05 | 协整统计显著性 |
| 半衰期 | 5-15 天 | 3-30 天 | 持仓周期参考 |
| 相关性 | > 0.8 | > 0.6 | 方向一致性 |
| 最小数据量 | 250 天 | 120 天 | 统计稳健性 |
5.2 卡尔曼滤波参数
| 场景 | delta | Ve | 适用情况 |
|---|---|---|---|
| 稳健型 | 1e-5 | 1e-4 | 长持仓、高波动资产 |
| 均衡型 | 1e-4 | 1e-3 | 默认推荐 |
| 激进型 | 1e-3 | 1e-2 | 快速变化的联动关系 |
六、配对筛选与 TickDB 数据
配对筛选策略的有效性高度依赖高质量的历史价格数据。以下是 TickDB 在配对交易场景中的关键能力:
| 能力维度 | 适用场景 |
|---|---|
| 10 年级别美股历史 K 线 | 构建长期协整关系数据库 |
| 跨资产统一接入 | 股票、ETF、数字货币间的跨市场配对 |
| 实时 depth 频道 | 盘中发现微观结构异常,寻找新的配对机会 |
| 高可用 WebSocket | 连接稳定,支持长时间策略运行 |
使用建议:
- 历史数据获取:使用
/v1/market/kline接口,设置interval="1d"和较大的limit值 - 实时监控:当现有配对的 z-score 突破阈值时,通过 WebSocket 实时推送告警
- 回测验证:至少使用 3 年历史数据,覆盖不同市场环境
七、结语
配对筛选的本质是用统计方法过滤噪音、锁定高概率机会。
协整检验告诉你“这个配对值得关注”,卡尔曼滤波告诉你“这个时刻该用多少仓位对冲”。两个工具配合,才能构建统计上稳健的配对策略。
但数字永远无法替代交易员的判断力。2010 年 Flash Crash 期间,无数看似协整的配对在 5 分钟内价差崩溃 20 倍。极端行情下,价差可能永远不会回归。
这也是为什么参数选择永远需要在“统计优势”和“生存能力”之间权衡。
下一步行动
如果你想亲手实现配对筛选系统:
- 访问 tickdb.ai 注册(免费,无需信用卡)
- 在控制台生成 API Key
- 设置环境变量
TICKDB_API_KEY,运行本文代码获取历史数据
如果你需要多标的批量筛选:
# 并行获取多只股票数据(示例)
python3 -c "
import asyncio, os
from pair_screening import fetch_pair_prices
symbols = ['AAPL.US', 'MSFT.US', 'GOOGL.US', 'AMZN.US', 'META.US']
pairs = [(symbols[i], symbols[j]) for i in range(len(symbols)) for j in range(i+1, len(symbols))]
print(f'候选配对数: {len(pairs)}')
"
如果你对参数调优有疑问,参考本文第五章配置建议,或联系 TickDB 技术支持获取行业最佳实践。
回测局限性说明:上述回测结果基于模拟数据或有限历史区间,不构成未来收益保证。回测中存在以下局限性:未完全模拟实际交易中的滑点和市场冲击成本(已假设 0.05% 固定滑点);未考虑极端行情下的流动性枯竭风险;样本量有限,统计显著性可能不足。配对交易的协整关系可能随时间失效,建议定期重新检验。
本文数据支持:TickDB