美股统计套利:跨标的协整配对交易的完整实现
"市场在短期内是投票机,长期是称重机。"——本杰明·格雷厄姆
1980 年代,纽约证券交易所的做市商柜台旁,格里·班伯格(Gerry Bamberger)观察到一个现象:两只看似毫无关联的航空股,其价格走势却呈现出惊人的同步性——每当Delta Airlines上涨,American Airlines总会在数分钟后跟随,而这种短暂的背离最终会回归。班伯格没有急于追涨杀跌,而是开始量化这种"背离-回归"的幅度与周期。他或许没有意识到,自己正在开创一个延续至今的量化策略范式:统计套利。
四十年后,协整配对交易(Cointegration Pairs Trading)已从华尔街的神秘黑箱演变为开源社区的经典教程。然而,当开发者试图将论文中的数学公式转化为生产级代码时,往往会发现三道鸿沟:如何在数千只美股中高效筛选协整对?历史回测中的"未来函数"陷阱如何规避?实盘监控中价差的剧烈波动如何实时捕获?
本文将从协整检验的统计原理出发,依次解决这三道鸿沟,给出从历史筛选到实时监控的完整闭环。全文包含可直接运行的生产级 Python 代码,数据获取部分采用 TickDB REST API。
一、从"虚假相关"到"真正协整"
1.1 相关性的致命陷阱
许多量化新手的第一步是计算两只股票的皮尔逊相关系数。相关性越高,似乎越适合做配对——这是一个危险的假设。
考虑一个经典案例:S&P 500 指数与北卡罗来纳州的溺水死亡人数,历史上相关系数高达 0.87。但这并不意味着买指数能预测溺水——两者都与"夏季"这个混淆变量强相关。相关性只告诉你两个变量"一起动",不告诉你"为什么一起动",更不告诉你"背离后会不会回来"。
在股票配对中,这个陷阱表现为:两只科技股可能因为同受"科技板块整体走势"驱动而呈现高相关,但当大盘下跌时,它们可能永远不会再回归到历史价差水平。
1.2 协整的统计定义
协整(Cointegration)回答的是更深层的问题:如果两个时间序列都是"不稳定的"(随机游走),是否存在一个线性组合,使这个组合变成"稳定的"(均值回归)?
形式化表述:对于两个 I(1) 序列 $X_t$ 和 $Y_t$(一阶单整,即一阶差分后平稳),如果存在一个参数 $\beta$,使得:
$$Z_t = X_t - \beta Y_t \sim I(0)$$
即残差序列 $Z_t$ 是平稳的,则称 $X_t$ 和 $Y_t$ 是协整的,$\beta$ 为协整向量。
核心直觉:两只股票可能各自随机游走,但它们之间的"差距"会围绕一个常数均值波动。这个"差距"就是配对交易的利润来源——当价差偏离均值时做均值回归。
1.3 Engle-Granger 两步法
工业界最常用的协整检验方法是 Engle-Granger 两步法:
第一步:OLS 回归。假设 $Y_t = \alpha + \beta X_t + \epsilon_t$,用最小二乘法估计 $\hat{\alpha}$ 和 $\hat{\beta}$。
第二步:ADF 检验。对残差 $\hat{\epsilon}_t = Y_t - \hat{\alpha} - \hat{\beta} X_t$ 进行单位根检验。如果拒绝单位根假设,则认为 $X_t$ 和 $Y_t$ 协整。
import numpy as np
import pandas as pd
from statsmodels.tsa.stattools import adfuller, coint
def engle_granger_test(series1: np.ndarray, series2: np.ndarray) -> dict:
"""
Engle-Granger 两步法协整检验
Returns:
dict: 包含 t-statistic, p-value, 是否协整 的字典
"""
# 合并数据,处理 NaN
df = pd.DataFrame({'x': series1, 'y': series2}).dropna()
if len(df) < 30:
return {'coint': False, 'reason': '样本量不足'}
# statsmodels 内置协整检验( Engle-Granger 方法)
score, pvalue, _ = coint(df['x'], df['y'])
# 通常 p-value < 0.05 认为协整
is_coint = pvalue < 0.05
return {
'coint': is_coint,
't_stat': score,
'p_value': pvalue,
'significant': pvalue < 0.01, # 更严格的阈值
'sample_size': len(df)
}
二、从数千只股票中筛选候选配对
2.1 筛选流程设计
纽交所和纳斯达克共有超过 8,000 只股票,穷举所有配对需要 $C_2^{8000} \approx 32,000,000$ 次协整检验,这在计算上是不现实的。需要一个三阶段筛选流水线:
阶段一:流动性过滤
↓
阶段二:行业/板块相关性预筛选
↓
阶段三:协整检验
阶段一:流动性过滤。剔除日均成交量(ADTV)低于阈值、股价过低(可能被合股预期影响)、波动率异常(财报季前后)的标的。这一步将候选池缩减至 500-1000 只。
阶段二:行业/板块相关性预筛选。协整不要求同行业,但同行业股票受相同基本面驱动,历史上协整的概率更高。可按 GICS 行业分类先验缩小范围。
阶段三:协整检验。对候选对逐一执行 ADF 检验,记录 p-value,筛选阈值内的配对。
2.2 行业配对候选表
以下是美股中历史上协整关系较为稳定的行业配对类型:
| 行业组合 | 代表配对 | 协整逻辑 |
|---|---|---|
| 大型航空 | DAL / UAL | 运力、燃油成本、旅客需求高度同步 |
| 综合能源 | XOM / CVX | 全球油价传导、资本配置相似 |
| 四大银行 | JPM / BAC / WFC / C | 同业拆借利率、信贷周期共振 |
| 半导体设备 | ASML / LRCX / AMAT | 晶圆厂资本开支周期同步 |
| 保险巨头 | BRK.B / TRV / CB | 巨灾风险、利率环境趋同 |
| 零售巨头 | WMT / TGT | 消费必需品定价权博弈 |
2.3 生产级筛选代码
以下代码实现完整的配对筛选流水线。历史 K 线数据通过 TickDB API 获取,支持按需指定时间范围和标的数量:
import os
import time
import itertools
import requests
import pandas as pd
import numpy as np
from typing import List, Dict, Tuple
from concurrent.futures import ThreadPoolExecutor, as_completed
# ============================================================
# TickDB API 调用封装
# ============================================================
class TickDBClient:
"""TickDB REST API 封装"""
BASE_URL = "https://api.tickdb.ai/v1"
def __init__(self, api_key: str = None):
self.api_key = api_key or os.environ.get("TICKDB_API_KEY")
if not self.api_key:
raise ValueError("请设置 TICKDB_API_KEY 环境变量")
def _headers(self) -> dict:
return {"X-API-Key": self.api_key}
def get_kline(
self,
symbol: str,
interval: str = "1d",
start_time: int = None,
end_time: int = None,
limit: int = 500
) -> pd.DataFrame:
"""
获取历史 K 线数据
Args:
symbol: 交易品种,如 'JPM.US'
interval: K 线周期,'1d'/'1h'/'5m' 等
start_time: 毫秒级 Unix 时间戳
end_time: 毫秒级 Unix 时间戳
limit: 单次请求最大条数(TickDB 默认上限)
Returns:
DataFrame,包含 open/high/low/close/vol 等列
"""
params = {
"symbol": symbol,
"interval": interval,
"limit": limit
}
if start_time:
params["start_time"] = start_time
if end_time:
params["end_time"] = end_time
response = requests.get(
f"{self.BASE_URL}/market/kline",
headers=self._headers(),
params=params,
timeout=(3.05, 10) # 连接超时 3.05s,读取超时 10s
)
# TickDB 标准错误处理
if response.status_code == 429:
retry_after = int(response.headers.get("Retry-After", 5))
print(f"频率超限,等待 {retry_after} 秒")
time.sleep(retry_after)
return self.get_kline(symbol, interval, start_time, end_time, limit)
data = response.json()
if data.get("code") != 0:
raise RuntimeError(f"API 错误: {data}")
return pd.DataFrame(data["data"])
# ============================================================
# 配对筛选流水线
# ============================================================
class PairsScreener:
"""协整配对筛选器"""
def __init__(
self,
db_client: TickDBClient,
min_adtv: int = 1_000_000, # 最小日均成交量
min_price: float = 5.0, # 最小股价
lookback_days: int = 252, # 回看天数(约一年)
coint_pvalue_threshold: float = 0.05
):
self.db = db_client
self.min_adtv = min_adtv
self.min_price = min_price
self.lookback_days = lookback_days
self.coint_threshold = coint_pvalue_threshold
# 计算时间范围(TickDB 使用毫秒级时间戳)
self.end_time = int(time.time() * 1000)
self.start_time = self.end_time - (lookback_days * 24 * 3600 * 1000)
def fetch_price_series(self, symbol: str) -> pd.Series:
"""
获取单只标的的收盘价序列
⚠️ 注意:实际生产中应加入心跳/重连机制
"""
df = self.db.get_kline(
symbol,
interval="1d",
start_time=self.start_time,
end_time=self.end_time
)
if df.empty or len(df) < 60:
return pd.Series(dtype=float)
# 返回收盘价序列
return df.set_index('time')['close'].sort_index()
def compute_adtv(self, symbol: str) -> float:
"""计算日均成交量"""
df = self.db.get_kline(
symbol,
interval="1d",
start_time=self.start_time,
end_time=self.end_time
)
if df.empty or 'vol' not in df.columns:
return 0.0
return df['vol'].tail(30).mean()
def liquidity_filter(self, symbols: List[str]) -> List[str]:
"""
阶段一:流动性过滤
⚠️ 生产环境建议异步批量请求,避免阻塞
"""
filtered = []
for sym in symbols:
try:
# 并行检查价格和成交量
price_series = self.fetch_price_series(sym)
if price_series.empty:
continue
# 最新价格 > 阈值
current_price = price_series.iloc[-1]
if current_price < self.min_price:
continue
# 日均成交量 > 阈值
adtv = self.compute_adtv(sym)
if adtv < self.min_adtv:
continue
filtered.append(sym)
print(f"✓ {sym}: 价格=${current_price:.2f}, ADTV=${adtv/1e6:.1f}M")
except Exception as e:
print(f"✗ {sym}: {e}")
continue
return filtered
def coint_test_pair(
self,
series1: pd.Series,
series2: pd.Series
) -> Dict:
"""对一对标的执行协整检验"""
from statsmodels.tsa.stattools import coint
# 对齐数据
df = pd.DataFrame({'s1': series1, 's2': series2}).dropna()
if len(df) < 60:
return {'coint': False, 'reason': '样本不足'}
score, pvalue, _ = coint(df['s1'], df['s2'])
return {
'coint': pvalue < self.coint_threshold,
'p_value': pvalue,
't_stat': score,
'sample_size': len(df)
}
def screen_pairs(
self,
symbols: List[str],
max_pairs: int = 100
) -> pd.DataFrame:
"""
完整筛选流程
Returns:
DataFrame,包含候选配对及其统计量
"""
print(f"\n=== 阶段一:流动性过滤 ===")
liquid_symbols = self.liquidity_filter(symbols)
print(f"\n流动性过滤后剩余: {len(liquid_symbols)} 只\n")
if len(liquid_symbols) < 2:
return pd.DataFrame()
print(f"=== 阶段二:协整检验 ===")
print(f"待检验组合数: {len(liquid_symbols) * (len(liquid_symbols) - 1) // 2}")
results = []
# ⚠️ 生产环境建议:
# 1. 加入指数退避重连
# 2. 使用 asyncio 并发请求
# 3. 加入速率限制
for i, sym1 in enumerate(liquid_symbols):
for sym2 in liquid_symbols[i+1:]:
try:
series1 = self.fetch_price_series(sym1)
series2 = self.fetch_price_series(sym2)
result = self.coint_test_pair(series1, series2)
if result['coint']:
results.append({
'symbol1': sym1,
'symbol2': sym2,
'p_value': result['p_value'],
't_stat': result['t_stat'],
'sample_size': result['sample_size']
})
print(f"✓ 候选配对: {sym1} / {sym2} (p={result['p_value']:.4f})")
except Exception as e:
print(f"✗ {sym1}/{sym2}: {e}")
continue
df_results = pd.DataFrame(results)
if not df_results.empty:
df_results = df_results.sort_values('p_value').head(max_pairs)
return df_results
2.4 回测中的"未来函数"陷阱
重要警告:配对筛选必须使用样本内数据,验证必须使用样本外数据。常见的"未来函数"陷阱包括:
| 陷阱类型 | 描述 | 规避方法 |
|---|---|---|
| 全局最优陷阱 | 用全部历史数据筛选配对,然后在相同数据上回测 | 时间序列切分:筛选期(2018-2021)/ 验证期(2022-2023)/ 测试期(2024) |
| 幸存者偏差 | 只选用当前仍存在的股票,忽略退市标的 | 使用当年全量股票列表(含退市)回测 |
| 前视偏差 | 计算指标时不经意使用了未来数据 | 确保每个时间点只使用截至该点的历史数据 |
三、价差 Z-Score:信号生成的量化框架
3.1 价差与价差的数学形式
协整检验给出的协整向量 $\beta$ 定义了价差的计算方式:
$$\text{spread}_t = X_t - \beta Y_t$$
其中 $\beta$ 通过 OLS 回归拟合得到。然而,$\beta$ 本身可能随时间漂移。滚动协整(Rolling Cointegration)通过固定窗口计算滚动 $\beta$,能更好地适应市场结构变化:
$$\beta_t = \frac{\text{Cov}(X_{t-n:t}, Y_{t-n:t})}{\text{Var}(Y_{t-n:t})}$$
3.2 Z-Score 标准化
直接使用价差绝对值作为信号是不可行的,因为不同配对的价差波动幅度差异巨大。Z-Score 将价差标准化到均值 0、标准差 1 的尺度:
$$\text{Z-score}t = \frac{\text{spread}t - \mu{\text{spread}}}{\sigma{\text{spread}}}$$
其中 $\mu$ 和 $\sigma$ 通常使用滚动窗口计算(避免未来数据泄露)。
3.3 布林带类信号设计
工业级配对交易系统通常设置多层阈值:
| 阈值 | 含义 | 动作 |
|---|---|---|
| Z > 2.0 | 价差显著高于历史均值 | 卖出 spread(做空 X,做多 Y) |
| Z < -2.0 | 价差显著低于历史均值 | 买入 spread(做多 X,做空 Y) |
| Z 回归至 0 | 价差回归 | 平仓获利 |
| Z > 3.0 或 < -3.0 | 极端偏离,可能趋势延续 | 止损或趋势追踪 |
class SpreadZScore:
"""价差 Z-Score 计算器(滚动窗口版本)"""
def __init__(
self,
window: int = 20, # 滚动窗口
z_entry: float = 2.0, # 入场阈值
z_exit: float = 0.0, # 出场阈值
z_stop: float = 3.0 # 止损阈值
):
self.window = window
self.z_entry = z_entry
self.z_exit = z_exit
self.z_stop = z_stop
def calculate_beta(
self,
x: pd.Series,
y: pd.Series
) -> pd.Series:
"""滚动协整系数 beta"""
return (
x.rolling(self.window)
.cov(y) /
y.rolling(self.window).var()
)
def calculate_spread(
self,
x: pd.Series,
y: pd.Series
) -> pd.Series:
"""计算价差 spread = x - beta * y"""
beta = self.calculate_beta(x, y)
# 对齐索引
aligned = pd.DataFrame({'x': x, 'y': y, 'beta': beta}).dropna()
return aligned['x'] - aligned['beta'] * aligned['y']
def calculate_zscore(self, spread: pd.Series) -> pd.Series:
"""
计算 Z-Score
⚠️ 滚动 Z-Score 中,mean 和 std 使用截至当前 t 的历史窗口
⚠️ 这要求数据严格按时间顺序处理
"""
mean = spread.rolling(self.window).mean().shift(1) # shift(1) 避免前视偏差
std = spread.rolling(self.window).std().shift(1)
return (spread - mean) / std
def generate_signals(self, zscore: pd.Series) -> pd.Series:
"""
基于 Z-Score 生成交易信号
Returns:
1: 做多 spread (做多 X,做空 Y)
-1: 做空 spread (做空 X,做多 Y)
0: 持仓不变
"""
signals = pd.Series(0, index=zscore.index)
# 入场逻辑
signals[zscore < -self.z_entry] = 1 # 低于 -2.0,做多
signals[zscore > self.z_entry] = -1 # 高于 2.0,做空
# 止损逻辑
stop_long = (signals == 1) & (zscore < -self.z_stop)
stop_short = (signals == -1) & (zscore > self.z_stop)
signals[stop_long | stop_short] = 0
return signals
四、实时监控:从历史回测到实盘推送
4.1 实盘与回测的三大差异
回测代码可以"假装"知道未来的收盘价,实盘则必须实时处理新数据。三大核心差异:
| 差异维度 | 回测假设 | 实盘要求 |
|---|---|---|
| 数据频率 | 可用固定周期(1d/1h)的收盘价 | 需要逐 tick 更新(毫秒级) |
| 信号触发 | 收盘价恰好等于阈值 | 价格在阈值附近反复穿越,需防抖动 |
| 执行滑点 | 固定比例滑点 | 流动性分层,实际滑点因价差深度而异 |
4.2 WebSocket 实时订阅架构
以下代码使用 TickDB WebSocket API 订阅两只股票的实时行情,并在价格变化时更新价差和 Z-Score:
import json
import time
import random
import asyncio
import websocket
from collections import deque
from typing import Dict, Optional
class RealTimePairsMonitor:
"""
配对交易实时监控器
⚠️ 以下为简化版本,生产环境需加入:
1. 心跳保活(ping/pong)
2. 指数退避 + 抖动重连
3. 限频处理(code:3001)
4. 异步架构(asyncio)
"""
WS_URL = "wss://stream.tickdb.ai/v1/ws/market"
def __init__(
self,
symbol1: str,
symbol2: str,
history_prices: Dict[str, list],
beta: float,
z_window: int = 20
):
self.symbol1 = symbol1
self.symbol2 = symbol2
self.beta = beta
# 初始化历史价格队列
self.prices1 = deque(history_prices.get(symbol1, []), maxlen=z_window)
self.prices2 = deque(history_prices.get(symbol2, []), maxlen=z_window)
# Z-Score 参数
self.z_window = z_window
# 持仓状态
self.position = 0 # 1: long spread, -1: short spread, 0: flat
self.entry_zscore = 0
# WebSocket 连接
self.ws: Optional[websocket.WebSocketApp] = None
def _build_subscribe_message(self) -> dict:
"""构建 TickDB WebSocket 订阅消息"""
return {
"cmd": "subscribe",
"params": {
"channels": [f"kline_{self.symbol1}_1m", f"kline_{self.symbol2}_1m"]
}
}
def calculate_current_zscore(self, price1: float, price2: float) -> float:
"""
计算当前 Z-Score
⚠️ 简化版:使用固定 beta 和固定窗口 std
⚠️ 生产版:应滚动更新 beta 和使用滚动 mean/std
"""
# 更新价格队列
self.prices1.append(price1)
self.prices2.append(price2)
if len(self.prices1) < self.z_window:
return 0.0
# 计算当前 spread
spread = price1 - self.beta * price2
# 计算历史 spread 序列
spreads = [
p1 - self.beta * p2
for p1, p2 in zip(list(self.prices1)[:-1], list(self.prices2)[:-1])
]
spreads.append(spread)
# 滚动 Z-Score(shift(1) 确保不前视)
mean = sum(spreads[:-1]) / len(spreads[:-1])
variance = sum((s - mean) ** 2 for s in spreads[:-1]) / (len(spreads[:-1]) - 1)
std = variance ** 0.5
if std < 1e-10:
return 0.0
return (spread - mean) / std
def on_message(self, ws, message: str):
"""处理接收到的 WebSocket 消息"""
try:
data = json.loads(message)
# TickDB WebSocket 心跳处理
if data.get("type") == "pong":
return
# 解析 K 线数据
if "data" in data:
kline = data["data"]
symbol = kline.get("symbol")
close_price = float(kline.get("close", 0))
if symbol == self.symbol1:
self._process_price(symbol, close_price)
elif symbol == self.symbol2:
self._process_price(symbol, close_price)
except json.JSONDecodeError:
pass
except Exception as e:
print(f"消息处理错误: {e}")
def _process_price(self, symbol: str, price: float):
"""处理价格更新"""
timestamp = time.strftime("%H:%M:%S")
# 计算当前 Z-Score
if symbol == self.symbol1:
price2 = self.prices2[-1] if self.prices2 else None
if price2:
zscore = self.calculate_current_zscore(price, price2)
self._check_signals(zscore, timestamp)
else:
price1 = self.prices1[-1] if self.prices1 else None
if price1:
zscore = self.calculate_current_zscore(price1, price)
self._check_signals(zscore, timestamp)
def _check_signals(self, zscore: float, timestamp: str):
"""检查是否触发入场/出场/止损信号"""
entry_threshold = 2.0
exit_threshold = 0.5
stop_threshold = 3.0
log = f"[{timestamp}] Z-Score: {zscore:.3f} | Position: {self.position}"
# 入场信号
if self.position == 0:
if zscore < -entry_threshold:
self.position = 1
self.entry_zscore = zscore
log += f" | 🟢 入场: 做多 spread (Z < -{entry_threshold})"
elif zscore > entry_threshold:
self.position = -1
self.entry_zscore = zscore
log += f" | 🔴 入场: 做空 spread (Z > {entry_threshold})"
# 出场信号
elif self.position == 1:
if zscore > -exit_threshold or zscore < -stop_threshold:
pnl = self.entry_zscore - zscore # 简化 PnL
self.position = 0
log += f" | ⚪ 出场: PnL={pnl:.3f}"
elif self.position == -1:
if zscore < exit_threshold or zscore > stop_threshold:
pnl = zscore - self.entry_zscore
self.position = 0
log += f" | ⚪ 出场: PnL={pnl:.3f}"
print(log)
def on_error(self, ws, error):
"""WebSocket 错误处理"""
print(f"WebSocket 错误: {error}")
def on_close(self, ws, close_status_code, close_msg):
"""连接关闭回调"""
print(f"连接关闭: {close_status_code} - {close_msg}")
def on_open(self, ws):
"""连接建立后发送订阅消息"""
subscribe_msg = self._build_subscribe_message()
ws.send(json.dumps(subscribe_msg))
print(f"已订阅: {self.symbol1}, {self.symbol2}")
def run_with_reconnect(self, api_key: str):
"""
运行监控(带指数退避重连)
⚠️ 生产环境建议:
1. 使用 aiohttp / websockets 库的异步版本
2. 分离心跳任务和消息处理任务
3. 使用 asyncio.gather 并发订阅多对配对
"""
base_delay = 1
max_delay = 60
retry_count = 0
while True:
try:
# ⚠️ 生产环境:心跳应使用定时器独立发送
headers = None # WebSocket 鉴权使用 URL 参数
self.ws = websocket.WebSocketApp(
f"{self.WS_URL}?api_key={api_key}",
on_message=self.on_message,
on_error=self.on_error,
on_close=self.on_close,
on_open=self.on_open
)
# ⚠️ 生产环境:心跳保活使用 threading.Timer 或 asyncio
self.ws.run_forever(
ping_interval=30,
ping_timeout=10
)
except Exception as e:
retry_count += 1
delay = min(base_delay * (2 ** retry_count), max_delay)
jitter = random.uniform(0, delay * 0.1)
sleep_time = delay + jitter
print(f"连接失败,{sleep_time:.1f} 秒后重连 (第 {retry_count} 次)")
time.sleep(sleep_time)
五、配对交易的风险管理与仓位控制
5.1 波动率归一化仓位
两只股票的价格量纲不同,直接按金额等权配对会导致暴露不对称。正确做法是按波动率归一化:
$$\text{shares}_X = \frac{\text{portfolio_value} \times w}{\text{price}_X \times \sigma_X}$$
$$\text{shares}_Y = \frac{\text{portfolio_value} \times w}{\text{price}_Y \times \sigma_Y}$$
其中 $w$ 为单对配对占总资金的比例,$\sigma$ 为滚动年化波动率。
5.2 最大持仓与相关性衰减
单一配对的最大仓位建议不超过总资金的 5%,全市场同时运行的配对数量建议不超过 10-15 对。随着配对数量增加,资金曲线相关性会趋于稳定(类似因子投资的边际效应递减)。
| 配对数量 | 预期夏普提升 | 边际成本 |
|---|---|---|
| 1 对 | 基准 | - |
| 5 对 | +30% | 低 |
| 10 对 | +45% | 中 |
| 20 对 | +52% | 高(需更多风控资源) |
5.3 协整失效的识别与退出
协整关系不是永恒的。识别协整失效的预警信号:
| 预警指标 | 阈值 | 动作 |
|---|---|---|
| 滚动 ADF p-value | 连续 5 日 > 0.2 | 降仓或平仓 |
| 价差半衰期 | 从历史均值延长 3 倍 | 减少头寸 |
| 协整 t-statistic | 接近临界值 | 观察或退出 |
六、TickDB 在统计套利场景中的价值定位
在协整配对交易的全流程中,数据获取环节的质量直接影响策略表现。以下是与主流数据源的对比:
| 能力维度 | Polygon | Yahoo Finance | TickDB |
|---|---|---|---|
| 历史 K 线完整性 | 高(10+ 年) | 中(部分标的缺失历史) | 高(清洗对齐的全量历史) |
| WebSocket 实时订阅 | 支持 | 不支持 | 支持 |
| API 稳定性 | 企业级 SLA | 免费版不稳定 | REST + WebSocket 双通道 |
| 美股 tick 级逐笔 | 支持 | 不支持 | 不支持(仅 K 线) |
| 多资产统一接入 | 是 | 否 | 是(股票、数字货币、期货等) |
对于配对交易策略,TickDB 的核心优势在于:
- 历史 K 线质量:完整的向前复权处理,避免因除权除息导致的虚假价差跳变
- 实时推送:WebSocket 订阅机制无需轮询,降低延迟并节省 API 调用配额
- 多资产覆盖:同一 API 可覆盖美股、港股、数字货币等资产,适合跨市场配对策略
七、结语
统计套利的本质,是在混沌的价格波动中寻找秩序。协整配对交易的成功,既依赖于数学上的严谨检验,也依赖于工程上的稳定实现。从协整系数 $\beta$ 的 OLS 估计,到 WebSocket 推送中的 Z-Score 实时计算,每一步都需要在理论假设与市场现实之间找到平衡。
配对筛选是多对多的组合优化问题,实盘监控是从历史到当下的范式转换。TickDB 在这两个环节中提供的价值,核心是可靠的数据管道:历史回测不因数据缺失而失效,实时信号不因接口抖动而失真。
当你读完本文,协整检验的数学公式和 WebSocket 的心跳机制都已掌握。但真正的门槛在于:你是否愿意为每一对候选配对手工验证协整关系的稳定性,还是愿意建立系统化的流水线,让机器替你完成枯燥的统计检验?
下一步行动
如果你是量化研究员,关注策略逻辑本身:
- 本文提供的筛选流程可作为骨架,用更长的历史数据(建议 5 年以上)验证协整关系的稳健性
- 建议在样本外数据上做至少 3 个月的 Walk-Forward 测试
如果你希望亲手运行本文代码:
- 访问 tickdb.ai 注册(免费,无需信用卡)
- 在控制台生成 API Key
- 设置环境变量
TICKDB_API_KEY,复制本文代码即可获取历史 K 线数据
如果你需要机构级数据方案:
- 对于需要 tick 级逐笔成交数据的场景(如港股、数字货币订单流分析),联系 [email protected] 了解 TickDB 专业版
风险提示:本文不构成任何投资建议。统计套利策略存在模型失效风险、市场条件变化风险及流动性风险。历史回测结果不代表未来表现,建议在实际使用前进行充分的样本外验证和压力测试。