原油期货展期收益的量化捕获
当账面利润在展期日蒸发
"原油期货的持有者发现,最危险的时刻不是价格暴跌那天,而是合约到期前的那个周四下午。"
2019年9月14日,沙特阿美油田遇袭的消息引爆市场。WTI原油期货单日暴涨14.68%,创下2008年以来最大单日涨幅。然而,对于持有期货多头合约的投资者而言,当周的账户净值却可能只上涨了3%——其余11%的涨幅在展期过程中被无声地吞噬。
这并非某个投资者的操作失误,而是期货合约与生俱来的数学特征:近月合约与远月合约之间的价差,决定了展期是收益的来源还是损耗。
本文拆解期货展期的收益机制,给出量化的展期收益计算框架,并提供可回测的生产级代码。
一、期货曲线的结构:展期收益从何而来
1.1 两个术语的精确区分
在讨论展期策略之前,必须明确两个常被混淆的概念:
展期(Roll):将到期的近月合约平仓,同时开仓下月合约的操作行为。这是一个时间节点上的操作。
展期收益(Roll Yield):由于期货曲线形态不同,展期操作本身可能产生正收益或负收益。这是一个数值结果。
理解两者区别至关重要:一个成熟的展期策略,其目标不是“按时展期”,而是“在最优曲线形态下展期”。
1.2 期货曲线的三种形态
| 曲线形态 | 市场结构 | 近月 vs 远月 | 展期收益 | 典型场景 |
|---|---|---|---|---|
| 正向市场(Contango) | 现货<期货 | 近月低价,远月高价 | 负收益 | 正常市场、产能过剩、存储成本高 |
| 反向市场(Backwardation) | 现货>期货 | 近月高价,远月低价 | 正收益 | 供应紧张、现货溢价、短期需求爆发 |
| 平坦曲线(Flat) | 现货≈期货 | 价格接近 | 几乎为零 | 供需平衡、过渡期 |
正向市场(Contango)示意:
时间 ──────────────────────────────────────────►
价格 ↑
远月合约 ████████████████
▏
▏ 展期成本:买入远月比卖出近月贵
▏
近月合约 ████████
反向市场(Backwardation)示意:
时间 ──────────────────────────────────────────►
价格 ↑
近月合约 ████████████████
▏
▏ 展期收益:卖出近月比买入远月贵
▏
远月合约 ████████
1.3 展期收益的数学表达
对于多头持仓,展期收益的计算公式为:
Roll Yield (Long) = (近月结算价 - 远月开仓价) / 近月结算价 × (持仓天数 / 合约天数)
等价变形:
Roll Yield (Long) ≈ -(远月升水 - 近月升水) / 近月价格 × 时间比率
简化理解:当市场为 Contango 时(远月 > 近月),多头展期相当于“以更高价格买入远月”,产生负收益;当市场为 Backwardation 时(近月 > 远月),多头展期相当于“以更高价格卖出近月”,产生正收益。
1.4 WTI 原油期货的历史曲线形态分布
| 年份 | Contango 天数占比 | Backwardation 天数占比 | 平均展期收益(多头年化) |
|---|---|---|---|
| 2015 | 68% | 32% | -4.2% |
| 2016 | 45% | 55% | +2.1% |
| 2017 | 72% | 28% | -5.8% |
| 2018 | 58% | 42% | -1.3% |
| 2019 | 61% | 39% | -2.7% |
| 2020(COVID) | 89% | 11% | -18.4% |
| 2021 | 35% | 65% | +8.9% |
| 2022 | 42% | 58% | +5.2% |
关键洞察:2020年是极端案例——原油期货在4月20日出现历史上首次负价格,Contango 曲线极度陡峭,持有实物原油的存储成本飙升,多头持有者的展期损耗触目惊心。而2021年能源危机期间,Backwardation 成为主导,展期收益成为重要的超额收益来源。
二、展期收益的驱动因素:为什么曲线会变形
2.1 供需基本面驱动
| 驱动因素 | Contango 强化条件 | Backwardation 强化条件 |
|---|---|---|
| 供给侧 | OPEC+增产、页岩油产能释放、库存高企 | 突发事件导致产能中断、制裁减产 |
| 需求侧 | 经济衰退预期、暖冬、炼厂检修季 | 经济复苏、寒冬、航煤需求爆发 |
| 存储成本 | 库存容量紧张、浮仓成本上升 | 库存充足、仓储充裕 |
2.2 时间节点效应
期货合约的展期并非均匀发生,而是集中在特定时间窗口:
近月合约到期前 5-7 个交易日:此时近月合约流动性下降,大额展期操作可能导致价格冲击。
主力合约切换日:当月合约持仓量开始向次月合约转移,市场流动性结构发生变化。
库存报告周(周三 EIA 发布):库存数据超预期会直接影响现货溢价预期,从而影响曲线形态。
WTI 原油期货主力合约切换时间线(以 CL 合约为例):
月初 ────────────────► 月末
│ │
│ 旧主力合约持仓量 │
│ ████████████░░░░░░░ │
│ 开始向次月合约转移 │
│ │
│ 新主力合约持仓量 │
│ ░░░░░░░████████████ │
│ 逐渐成为主力 │
│ │
◄──► 关键展期窗口:到期前 5-7 个交易日
此时近月流动性下降,冲击成本上升
2.3 波动率对展期策略的影响
展期收益具有显著的“路径依赖”特征。期权市场中的隐含波动率(IV)可以作为曲线即将变化的先行指标:
- 高 IV + Contango:通常预示后续曲线可能陡峭化,展期成本上升
- 低 IV + Backwardation:曲线可能趋于平坦,正展期收益收窄
- IV 期限结构倒挂:短期波动率 > 长期波动率,暗示市场担忧短期供应冲击
三、量化展期策略的三阶段设计
3.1 事前:曲线形态识别与展期窗口计算
展期决策的第一步是判断当前曲线是否支持展期操作。这需要两个核心指标:
展期成本率(Roll Cost Rate):
Roll Cost Rate = (次月合约价格 - 近月合约价格) / 近月合约价格 × (365 / 距到期天数)
时间价值损耗率(Time Decay Rate):
Time Decay Rate = (近月合约价格 - 现货价格) / 近月合约价格 × (365 / 距到期天数)
3.2 事中:动态调整展期时机
传统的“固定日期展期”存在明显缺陷:所有持有者在同一天操作,造成近月合约流动性枯竭和价格冲击。
改进策略一:时间加权展期
- 在到期前 7-10 个交易日内,按天数平均分配展期量
- 优点:降低单日冲击成本
- 缺点:可能错过最优曲线形态窗口
改进策略二:阈值触发展期
- 仅当展期成本率低于阈值(如年化 5%)时触发展期
- 优点:选择性展期,保留正收益窗口
- 缺点:可能错过展期窗口导致被动展期(近月合约流动性极差时)
改进策略三:信号驱动展期
- 结合库存数据、隐含波动率、曲线斜率变化构建综合信号
- 优点:可系统性捕获曲线变化带来的超额收益
- 缺点:策略复杂度高,需持续维护
3.3 事后:展期效果归因分析
每次展期操作后,应记录以下数据用于后续分析:
| 指标 | 计算方式 | 分析价值 |
|---|---|---|
| 实际展期收益率 | (平仓盈亏 + 开仓盈亏) / 持仓市值 | 评估策略有效性 |
| 曲线斜率变化 | 次月-近月价差的变化幅度 | 判断策略对曲线的影响 |
| 冲击成本 | 实际成交价 vs 报价中价的差值 | 优化下单算法 |
| 机会成本 | 若未展期持有至到期的理论收益 | 评估择时效果 |
四、生产级代码:原油期货展期回测系统
以下代码实现完整的展期回测框架,包括数据获取、曲线计算、信号生成和绩效归因。
4.1 环境配置与依赖
import os
import time
import json
import math
import random
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass, field
from enum import Enum
import requests
import pandas as pd
import numpy as np
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# ============================================================
# ⚠️ 生产环境配置说明:
# 1. API Key 必须通过环境变量存储,禁止硬编码
# 2. 高频调用场景建议使用 aiohttp 异步架构
# 3. 建议配合 Redis 缓存合约元数据,减少重复查询
# ============================================================
class FuturesCurveShape(Enum):
"""期货曲线形态枚举"""
CONTANGO = "contango"
BACKWARDATION = "backwardation"
FLAT = "flat"
@dataclass
class ContractMetadata:
"""合约元数据"""
symbol: str # 合约代码,如 CL.NYM 或 sc.INE
name: str # 合约名称
exchange: str # 交易所
multiplier: float # 合约乘数
unit: str # 计价单位
tick_size: float # 最小跳动
first_trade_day: datetime # 首个交易日
last_trade_day: datetime # 最后交易日
delivery_months: List[str] = field(default_factory=list) # 交易月份列表
@dataclass
class CurveSnapshot:
"""某一时点的期货曲线快照"""
timestamp: datetime
near_contract: str # 近月合约代码
far_contract: str # 远月合约代码
near_price: float
far_price: float
spread: float # 远月-近月价差
spread_pct: float # 价差百分比
shape: FuturesCurveShape
roll_cost_annualized: float # 年化展期成本
@dataclass
class RollTransaction:
"""展期交易记录"""
roll_date: datetime
near_contract_close: float # 近月平仓价
far_contract_open: float # 远月开仓价
roll_yield: float # 本次展期收益率
roll_yield_annualized: float # 年化展期收益率
position_size: float # 持仓量(合约数)
notional_value: float # 名义本金
days_to_expiry: int # 距到期天数
@dataclass
class BacktestResult:
"""回测结果"""
total_return: float
annualized_return: float
volatility: float
sharpe_ratio: float
max_drawdown: float
max_drawdown_duration: int # 最大回撤持续天数
roll_yield_contribution: float # 展期收益贡献
price_return_contribution: float # 价格收益贡献
num_rolls: int
avg_roll_yield: float
4.2 数据获取层:TickDB API 封装
class TickDBFuturesClient:
"""
TickDB 期货数据客户端
功能:
- 获取期货 K 线数据(用于计算收益率)
- 获取期货品种元数据(合约规则、到期日)
- WebSocket 实时行情订阅(生产环境使用)
⚠️ 注意事项:
- 原油期货使用 sc.INE(上期所)或 CL.NYM(NYMEX)
- 不同交易所的合约乘数和报价单位可能不同
- 历史 K 线数据使用 /kline 接口,当前价格使用 /kline/latest
"""
BASE_URL = "https://api.tickdb.ai/v1"
MAX_RETRIES = 3
BASE_DELAY = 1.0
MAX_DELAY = 32.0
# 常用原油期货合约映射
CRUDE_OIL_CONTRACTS = {
"WTI": "CL.NYM", # NYMEX WTI 原油
"Brent": "BZ.NYM", # NYMEX Brent 原油
"上期所原油": "sc.INE" # 上期所 SC 原油
}
def __init__(self, api_key: Optional[str] = None):
"""
初始化客户端
Args:
api_key: TickDB API Key,若为 None 则从环境变量读取
"""
self.api_key = api_key or os.environ.get("TICKDB_API_KEY")
if not self.api_key:
raise ValueError("API Key 未设置,请设置环境变量 TICKDB_API_KEY")
self.session = requests.Session()
self.session.headers.update({
"X-API-Key": self.api_key,
"Content-Type": "application/json"
})
# 缓存合约元数据,避免重复查询
self._contract_cache: Dict[str, ContractMetadata] = {}
self._cache_expiry: Optional[datetime] = None
self._cache_ttl = timedelta(hours=1)
logger.info(f"TickDB 客户端初始化完成,数据源:TickDB")
def _request_with_retry(
self,
method: str,
endpoint: str,
params: Optional[Dict] = None,
data: Optional[Dict] = None,
retry_count: int = 0
) -> Dict:
"""
带重试机制的请求方法
⚠️ 工程要点:
- 指数退避 + 抖动:避免惊群效应
- 限频处理:识别 3001 错误码,等待 Retry-After
- 超时设置:防止挂起
"""
url = f"{self.BASE_URL}{endpoint}"
try:
response = self.session.request(
method=method,
url=url,
params=params,
json=data,
timeout=(3.05, 10) # ⚠️ 必设超时:connect 3.05s, read 10s
)
# 处理限频
if response.status_code == 429:
retry_after = int(response.headers.get("Retry-After", 5))
logger.warning(f"请求频率超限,等待 {retry_after} 秒")
time.sleep(retry_after)
return self._request_with_retry(
method, endpoint, params, data, retry_count
)
result = response.json()
# 处理 API 错误码
if result.get("code") == 3001:
retry_after = int(response.headers.get("Retry-After", 5))
logger.warning(f"API 限频 (3001),等待 {retry_after} 秒后重试")
time.sleep(retry_after)
return self._request_with_retry(
method, endpoint, params, data, retry_count
)
if result.get("code") == 1001:
raise ValueError("API Key 无效,请检查环境变量 TICKDB_API_KEY")
if result.get("code") not in (0, 200):
raise RuntimeError(
f"API 错误: code={result.get('code')}, "
f"message={result.get('message')}"
)
return result.get("data", result)
except requests.exceptions.Timeout:
logger.warning(f"请求超时({retry_count + 1}/{self.MAX_RETRIES})")
if retry_count < self.MAX_RETRIES:
delay = min(
self.BASE_DELAY * (2 ** retry_count) + random.uniform(0, 1),
self.MAX_DELAY
)
time.sleep(delay)
return self._request_with_retry(
method, endpoint, params, data, retry_count + 1
)
raise
except requests.exceptions.RequestException as e:
logger.error(f"网络请求失败: {e}")
raise
def get_historical_klines(
self,
symbol: str,
interval: str = "1d",
start_time: Optional[datetime] = None,
end_time: Optional[datetime] = None,
limit: int = 1000
) -> pd.DataFrame:
"""
获取期货历史 K 线数据
⚠️ 重要说明:
- interval: "1m","5m","1h","4h","1d","1w"
- 获取已结束周期的 K 线使用 /market/kline(而非 /kline/latest)
- 原油期货 K 线数据支持回测周期通常为数年
Args:
symbol: 合约代码,如 "CL.NYM"
interval: K 线周期
start_time: 开始时间
end_time: 结束时间
limit: 单次最大获取条数
Returns:
DataFrame,包含 timestamp, open, high, low, close, volume
"""
params = {
"symbol": symbol,
"interval": interval,
"limit": limit
}
if start_time:
params["start_time"] = int(start_time.timestamp() * 1000)
if end_time:
params["end_time"] = int(end_time.timestamp() * 1000)
data = self._request_with_retry("GET", "/market/kline", params=params)
if not data or "klines" not in data:
logger.warning(f"未获取到 {symbol} 的 K 线数据")
return pd.DataFrame()
df = pd.DataFrame(data["klines"])
df["timestamp"] = pd.to_datetime(df["timestamp"], unit="ms")
# 数值列转换
for col in ["open", "high", "low", "close", "volume"]:
df[col] = df[col].astype(float)
return df.sort_values("timestamp").reset_index(drop=True)
def get_latest_price(self, symbol: str) -> Optional[float]:
"""
获取期货最新价格(实时接口)
⚠️ 注意:此接口用于实时监控,回测应使用 get_historical_klines
"""
data = self._request_with_retry("GET", "/market/kline/latest", {
"symbol": symbol,
"interval": "1d"
})
if data and "klines" in data and len(data["klines"]) > 0:
return float(data["klines"][-1]["close"])
return None
def get_nearby_contracts(
self,
underlying: str = "WTI"
) -> List[str]:
"""
获取近月合约序列
对于原油期货,TickDB 通常以连续合约形式提供数据,
但其底层对应的是不同到期月份的期货合约。
⚠️ 注意:不同数据源的合约代码格式可能不同
- 上期所原油:sc2301, sc2302, ..., sc2712
- NYMEX WTI:CLZ23, CLF24, ...(Z=12月, F=1月...)
Returns:
近月合约代码列表,如 ["CL.NYM", "CL+1.NYM", "CL+2.NYM"]
注:部分数据源可能不直接支持 +N 格式,需通过 /symbols 获取
"""
symbol = self.CRUDE_OIL_CONTRACTS.get(underlying)
if not symbol:
raise ValueError(f"不支持的原油品种: {underlying}")
# 尝试获取可用合约列表
try:
data = self._request_with_retry("GET", "/symbols/available", {
"category": "futures"
})
if data and "symbols" in data:
# 过滤出原油相关合约
oil_symbols = [
s for s in data["symbols"]
if "crude" in s.lower() or s.startswith("CL") or s.startswith("sc")
]
logger.info(f"获取到 {len(oil_symbols)} 个原油相关合约")
return oil_symbols[:6] # 返回近 6 个月合约
except Exception as e:
logger.warning(f"获取可用合约列表失败: {e},使用默认序列")
# 回退方案:构造近月合约序列
now = datetime.now()
contracts = []
for i in range(6):
month_offset = (now.month + i - 1) % 12 + 1
year_offset = now.year + (now.month + i - 1) // 12
if underlying == "WTI":
contracts.append(f"CL{year_offset % 100:02d}{month_offset:02d}.NYM")
elif underlying == "上期所原油":
contracts.append(f"sc{year_offset % 100:02d}{month_offset:02d}.INE")
return contracts
def batch_get_prices(
self,
symbols: List[str],
retry_count: int = 0
) -> Dict[str, float]:
"""
批量获取合约当前价格
⚠️ 生产环境优化:
- 此方法在高频场景下会产生 N 次 API 调用
- 建议在部署时改用 WebSocket 批量订阅
- 当前实现适用于日级别回测的低频场景
"""
prices = {}
for symbol in symbols:
try:
price = self.get_latest_price(symbol)
if price:
prices[symbol] = price
time.sleep(0.1) # ⚠️ 简单限速,避免触发限频
except Exception as e:
logger.warning(f"获取 {symbol} 价格失败: {e}")
return prices
4.3 展期计算引擎
class RollYieldCalculator:
"""
展期收益计算引擎
核心功能:
1. 计算期货曲线形态(Contango/Backwardation)
2. 计算展期成本/收益
3. 生成展期信号
4. 模拟展期操作
"""
def __init__(self, threshold_contango: float = 0.02):
"""
Args:
threshold_contango: Contango 判定阈值(年化),超过此值判定为陡峭 Contango
"""
self.threshold_contango = threshold_contango
def calculate_curve_snapshot(
self,
near_price: float,
far_price: float,
days_to_expiry: int,
timestamp: Optional[datetime] = None
) -> CurveSnapshot:
"""
计算某一时点的期货曲线快照
Args:
near_price: 近月合约价格
far_price: 远月合约价格
days_to_expiry: 距近月合约到期天数
timestamp: 快照时间
Returns:
CurveSnapshot 对象
"""
spread = far_price - near_price
spread_pct = spread / near_price if near_price > 0 else 0
# 判断曲线形态
if abs(spread_pct) < 0.001: # 约等于 0
shape = FuturesCurveShape.FLAT
elif spread_pct > 0:
shape = FuturesCurveShape.CONTANGO
else:
shape = FuturesCurveShape.BACKWARDATION
# 年化展期成本(假设多头持仓)
roll_cost_annualized = (
-spread_pct * 365 / days_to_expiry
if days_to_expiry > 0 else 0
)
return CurveSnapshot(
timestamp=timestamp or datetime.now(),
near_contract="",
far_contract="",
near_price=near_price,
far_price=far_price,
spread=spread,
spread_pct=spread_pct,
shape=shape,
roll_cost_annualized=roll_cost_annualized
)
def calculate_roll_transaction(
self,
near_close: float,
far_open: float,
position_size: float,
multiplier: float = 1000,
timestamp: Optional[datetime] = None,
days_to_expiry: int = 30
) -> RollTransaction:
"""
计算单次展期交易的收益
Args:
near_close: 近月合约平仓价格
far_open: 远月合约开仓价格
position_size: 持仓手数
multiplier: 合约乘数(每手对应的桶数/桶)
timestamp: 展期时间
days_to_expiry: 距到期天数
Returns:
RollTransaction 对象
"""
# 近月合约平仓盈亏
# 注意:这里简化处理,假设近月开仓价 = 近月当前价
near_pnl = 0 # 实际计算需要记录开仓价
# 远月合约开仓成本
far_cost = far_open * position_size * multiplier
# 展期收益率:近月卖出价 vs 远月买入价
# 若 far_open < near_close,则展期产生正收益
roll_yield = (near_close - far_open) / near_close
roll_yield_annualized = roll_yield * 365 / max(days_to_expiry, 1)
notional_value = near_close * position_size * multiplier
return RollTransaction(
roll_date=timestamp or datetime.now(),
near_contract_close=near_close,
far_contract_open=far_open,
roll_yield=roll_yield,
roll_yield_annualized=roll_yield_annualized,
position_size=position_size,
notional_value=notional_value,
days_to_expiry=days_to_expiry
)
def generate_roll_signal(
self,
curve_snapshot: CurveSnapshot,
iv_ratio: Optional[float] = None,
inventory_change: Optional[float] = None
) -> Dict:
"""
生成展期信号
结合曲线形态、隐含波动率、库存变化等因素,
生成综合展期建议。
Args:
curve_snapshot: 曲线快照
iv_ratio: 隐含波动率比率(短期IV/长期IV),>1 表示短期波动高
inventory_change: 库存变化率(周环比),负值表示去库
Returns:
信号字典,包含 action, weight, reason
"""
signal = {
"action": "hold", # hold, partial_roll, full_roll, skip_roll
"weight": 1.0, # 建议展期比例
"reason": [],
"risk_level": "normal" # low, normal, high
}
# 信号1:基于曲线形态
if curve_snapshot.shape == FuturesCurveShape.CONTANGO:
if curve_snapshot.roll_cost_annualized > self.threshold_contango:
signal["action"] = "partial_roll"
signal["weight"] = 0.3
signal["reason"].append(
f"Contango 陡峭(年化 {curve_snapshot.roll_cost_annualized:.1%}),"
f"展期成本过高,建议减少持仓"
)
signal["risk_level"] = "high"
else:
signal["reason"].append(
f"Contango 平缓(年化 {curve_snapshot.roll_cost_annualized:.1%}),"
f"可接受展期成本"
)
elif curve_snapshot.shape == FuturesCurveShape.BACKWARDATION:
signal["action"] = "full_roll"
signal["weight"] = 1.0
signal["reason"].append(
f"Backwardation 结构,远月贴水,展期产生正收益"
)
if curve_snapshot.spread_pct < -0.02:
signal["risk_level"] = "low" # 强Backwardation,低风险
else: # FLAT
signal["reason"].append("曲线平坦,展期影响较小")
# 信号2:隐含波动率加成
if iv_ratio is not None:
if iv_ratio > 1.5:
signal["reason"].append(
f"短期波动率高(IV比率={iv_ratio:.2f}),"
f"市场可能酝酿大幅波动"
)
signal["risk_level"] = "high"
elif iv_ratio < 0.8:
signal["reason"].append(
f"波动率处于低位,展期相对安全"
)
# 信号3:库存变化加成
if inventory_change is not None:
if inventory_change < -0.05: # 去库超过5%
if signal["action"] == "hold":
signal["action"] = "partial_roll"
signal["weight"] = 0.5
signal["reason"].append(
f"库存去化({inventory_change:.1%}),"
f"Backwardation 可能强化"
)
elif inventory_change > 0.05: # 累库超过5%
if signal["action"] in ("full_roll", "partial_roll"):
signal["weight"] = max(0.2, signal["weight"] - 0.3)
signal["reason"].append(
f"库存累积({inventory_change:.1%}),"
f"Backwardation 可能收窄"
)
signal["risk_level"] = "high"
return signal
4.4 回测引擎
class RollYieldBacktester:
"""
展期策略回测引擎
支持三种回测模式:
1. 固定日期展期(Benchmark)
2. 阈值触发展期
3. 多因子信号展期
"""
def __init__(
self,
initial_capital: float = 1_000_000,
contract_multiplier: float = 1000,
slippage: float = 0.0005,
commission: float = 2.0
):
"""
Args:
initial_capital: 初始资金
contract_multiplier: 合约乘数(WTI=1000桶/手)
slippage: 滑点(假设 5 个跳动)
commission: 单边手续费(美元/手)
"""
self.initial_capital = initial_capital
self.contract_multiplier = contract_multiplier
self.slippage = slippage
self.commission = commission
def run_backtest(
self,
price_data: pd.DataFrame,
roll_calendar: List[datetime],
mode: str = "fixed",
roll_threshold: float = 0.03,
signal_data: Optional[pd.DataFrame] = None
) -> BacktestResult:
"""
运行回测
Args:
price_data: 历史价格数据,需包含 near_price, far_price 列
roll_calendar: 预设展期日期列表
mode: 回测模式(fixed/threshold/signal)
roll_threshold: 阈值模式下的触发阈值(年化展期成本)
signal_data: 信号数据(用于 signal 模式),需包含 roll_signal 列
Returns:
BacktestResult 对象
"""
df = price_data.copy()
df = df.sort_values("timestamp").reset_index(drop=True)
# 初始化
capital = self.initial_capital
position_value = 0
cash = capital
daily_returns = []
roll_transactions = []
roll_yield_cumulative = 0
price_return_cumulative = 0
position_size = 0
current_roll_idx = 0
for i, row in df.iterrows():
date = row["timestamp"]
near_price = row["near_price"]
far_price = row.get("far_price", near_price)
# 每日结算:假设按近月合约价格计算持仓盈亏
if position_size > 0 and i > 0:
prev_price = df.iloc[i-1]["near_price"]
daily_pnl = (near_price - prev_price) * position_size * self.contract_multiplier
daily_return = daily_pnl / (position_size * prev_price * self.contract_multiplier)
daily_returns.append(daily_return)
# 累计价格收益
price_return_cumulative *= (1 + daily_return)
# 检查是否触发展期
should_roll = False
roll_ratio = 1.0
if mode == "fixed":
# 固定日期展期:检查是否在展期窗口
if current_roll_idx < len(roll_calendar):
roll_date = roll_calendar[current_roll_idx]
days_to_roll = (roll_date - date).days
if 0 <= days_to_roll <= 3: # 到期前3天内展期
should_roll = True
current_roll_idx += 1
elif mode == "threshold":
# 阈值触发:展期成本超过阈值时触发
if "days_to_expiry" in row:
days_to_exp = row["days_to_expiry"]
if days_to_exp <= 10: # 临近到期
spread_pct = (far_price - near_price) / near_price
roll_cost_annualized = -spread_pct * 365 / max(days_to_exp, 1)
if roll_cost_annualized > roll_threshold:
should_roll = True
elif mode == "signal" and signal_data is not None:
# 信号驱动:基于外部信号
sig_row = signal_data[signal_data["timestamp"] == date]
if not sig_row.empty:
signal = sig_row.iloc[0].get("roll_signal", "hold")
if signal in ("full_roll", "partial_roll"):
should_roll = True
roll_ratio = sig_row.iloc[0].get("weight", 1.0)
# 执行展期
if should_roll and position_size > 0:
# 平仓近月(扣除滑点和手续费)
close_price = near_price * (1 - self.slippage)
close_pnl = (close_price - row.get("entry_price", near_price)) * position_size * self.contract_multiplier
# 开仓远月(扣除滑点和手续费)
open_price = far_price * (1 + self.slippage)
new_position_value = position_size * open_price * self.contract_multiplier
# 计算展期收益率
roll_yield = (close_price - open_price) / close_price
roll_yield_cumulative *= (1 + roll_yield * roll_ratio)
# 记录展期交易
roll_tx = RollTransaction(
roll_date=date,
near_contract_close=close_price,
far_contract_open=open_price,
roll_yield=roll_yield * roll_ratio,
roll_yield_annualized=roll_yield * roll_ratio * 365 / 30,
position_size=position_size,
notional_value=new_position_value,
days_to_expiry=row.get("days_to_expiry", 30)
)
roll_transactions.append(roll_tx)
# 更新持仓成本
row["entry_price"] = open_price
# 计算绩效指标
total_return = (capital + sum(daily_returns) * capital) / capital - 1
annualized_return = (1 + total_return) ** (252 / len(df)) - 1
volatility = np.std(daily_returns) * np.sqrt(252) if daily_returns else 0
sharpe_ratio = annualized_return / volatility if volatility > 0 else 0
# 最大回撤
cumulative = np.cumprod([1 + r for r in daily_returns]) if daily_returns else [1]
running_max = np.maximum.accumulate(cumulative)
drawdown = (cumulative - running_max) / running_max
max_drawdown = abs(np.min(drawdown)) if len(drawdown) > 0 else 0
# 回撤持续天数
dd_start = np.where(drawdown == -max_drawdown)[0]
dd_duration = len(dd_start) if dd_start.size > 0 else 0
# 平均展期收益率
avg_roll_yield = (
np.mean([tx.roll_yield for tx in roll_transactions])
if roll_transactions else 0
)
return BacktestResult(
total_return=total_return,
annualized_return=annualized_return,
volatility=volatility,
sharpe_ratio=sharpe_ratio,
max_drawdown=max_drawdown,
max_drawdown_duration=dd_duration,
roll_yield_contribution=roll_yield_cumulative - 1,
price_return_contribution=price_return_cumulative - 1,
num_rolls=len(roll_transactions),
avg_roll_yield=avg_roll_yield
)
def compare_strategies(
self,
price_data: pd.DataFrame,
roll_calendar: List[datetime],
signal_data: Optional[pd.DataFrame] = None
) -> pd.DataFrame:
"""
对比不同展期策略的效果
Returns:
对比表格 DataFrame
"""
strategies = {
"固定日期展期": ("fixed", None),
"阈值触发展期 (3%)": ("threshold", 0.03),
"阈值触发展期 (5%)": ("threshold", 0.05),
"信号驱动展期": ("signal", None),
}
results = []
for name, (mode, threshold) in strategies.items():
result = self.run_backtest(
price_data=price_data,
roll_calendar=roll_calendar,
mode=mode,
roll_threshold=threshold,
signal_data=signal_data
)
results.append({
"策略": name,
"总收益率": f"{result.total_return:.2%}",
"年化收益率": f"{result.annualized_return:.2%}",
"年化波动率": f"{result.volatility:.2%}",
"夏普比率": f"{result.sharpe_ratio:.2f}",
"最大回撤": f"{result.max_drawdown:.2%}",
"展期收益贡献": f"{result.roll_yield_contribution:.2%}",
"展期次数": result.num_rolls,
"平均展期收益": f"{result.avg_roll_yield:.4f}"
})
return pd.DataFrame(results)
4.5 回测示例
def run_crude_oil_roll_backtest():
"""
原油期货展期策略回测示例
⚠️ 前置条件:
1. 设置环境变量 TICKDB_API_KEY
2. 确保网络可访问 TickDB API
"""
# 初始化客户端
client = TickDBFuturesClient()
# 获取历史数据(以 WTI 原油为例)
logger.info("正在获取 WTI 原油期货历史数据...")
# 回测周期:2018-01-01 至 2023-12-31
start_date = datetime(2018, 1, 1)
end_date = datetime(2023, 12, 31)
# 获取近月合约数据(主力连续合约)
near_contract = "CL.NYM"
near_data = client.get_historical_klines(
symbol=near_contract,
interval="1d",
start_time=start_date,
end_time=end_date,
limit=2000
)
if near_data.empty:
logger.error("未获取到数据,请检查 API Key 和网络连接")
return
logger.info(f"获取到 {len(near_data)} 条近月合约 K 线数据")
# 构造远月合约数据(简化处理:使用次月合约)
# ⚠️ 实际应用中应获取真实次月合约数据
far_contract = "CL+1.NYM"
far_data = client.get_historical_klines(
symbol=far_contract,
interval="1d",
start_time=start_date,
end_time=end_date,
limit=2000
)
# 合并数据
price_data = near_data[["timestamp", "close"]].copy()
price_data.columns = ["timestamp", "near_price"]
if not far_data.empty:
far_data = far_data[["timestamp", "close"]].copy()
far_data.columns = ["timestamp", "far_price"]
price_data = price_data.merge(far_data, on="timestamp", how="left")
price_data["far_price"] = price_data["far_price"].fillna(price_data["near_price"])
# 估算距到期天数(简化:每月15日左右到期)
price_data["days_to_expiry"] = (
(price_data["timestamp"].dt.month % 12 + 1) * 30 -
price_data["timestamp"].dt.day
).clip(lower=1)
# 构造展期日历(每月底展期)
roll_calendar = []
current = start_date.replace(day=1)
while current <= end_date:
# 月底前3天展期
next_month = current + timedelta(days=32)
roll_date = datetime(next_month.year, next_month.month, 25)
if roll_date <= end_date:
roll_calendar.append(roll_date)
current = next_month
logger.info(f"展期日历包含 {len(roll_calendar)} 次预设展期")
# 初始化回测引擎
backtester = RollYieldBacktester(
initial_capital=1_000_000,
contract_multiplier=1000, # WTI 每手1000桶
slippage=0.0003, # 假设3个跳动滑点
commission=3.5 # NYMEX 单边手续费
)
# 运行对比回测
logger.info("开始运行策略对比回测...")
results = backtester.compare_strategies(
price_data=price_data,
roll_calendar=roll_calendar
)
print("\n" + "="*80)
print("原油期货展期策略回测结果 (2018-2023)")
print("="*80)
print(results.to_string(index=False))
print("="*80)
# 详细分析:展期收益 vs 价格收益
logger.info("详细分析展期收益贡献...")
detailed_result = backtester.run_backtest(
price_data=price_data,
roll_calendar=roll_calendar,
mode="threshold",
roll_threshold=0.05
)
print(f"""
展期收益分解:
- 总收益率:{detailed_result.total_return:.2%}
- 价格收益贡献:{detailed_result.price_return_contribution:.2%}
- 展期收益贡献:{detailed_result.roll_yield_contribution:.2%}
- 展期次数:{detailed_result.num_rolls}
- 平均展期收益:{detailed_result.avg_roll_yield:.4f} ({detailed_result.avg_roll_yield*100:.2f}%)
""")
return results
# 入口
if __name__ == "__main__":
# ⚠️ 运行前请确保已设置环境变量
# export TICKDB_API_KEY="your_api_key_here"
try:
results = run_crude_oil_roll_backtest()
except ValueError as e:
print(f"配置错误: {e}")
print("请确保已设置 TICKDB_API_KEY 环境变量")
except Exception as e:
logger.error(f"回测失败: {e}")
raise
五、核心指标解读:展期策略的绩效归因
5.1 展期收益 vs 价格收益的分解
展期策略的总收益可分解为两个来源:
Total Return = Price Return + Roll Yield Return
其中:
- Price Return:近月合约价格变动带来的收益
- Roll Yield Return:展期操作本身产生的收益(或损耗)
理想情况(Backwardation 市场):
- 价格下跌,但展期收益为正,抵消部分损失
- 总收益可能优于单纯持有近月合约
不利情况(陡峭 Contango 市场):
- 价格下跌,展期成本叠加,损失放大
- 2020年4月的极端行情中,持有原油期货多头并按时展期的投资者,在油价反弹之前已经遭受双重打击
5.2 展期策略有效性检验
| 检验维度 | 检验方法 | 合格标准 |
|---|---|---|
| 展期择时效果 | 对比阈值触发 vs 固定日期的展期收益差异 | 阈值触发展期收益显著更高(p<0.05) |
| 曲线预测能力 | 展期信号与实际曲线形态的领先滞后关系 | 信号提前1-3天发出,准确率>60% |
| 冲击成本控制 | 展期日实际成交价与报价中价的差异 | 单次冲击成本<0.05% |
| 策略稳健性 | 不同年份、不同品种的表现一致性 | 年化收益标准差<年化收益均值 |
六、原油期货主要品种对比
| 品种 | 交易所 | 合约代码 | 合约乘数 | 报价单位 | 主力合约 | 特点 |
|---|---|---|---|---|---|---|
| WTI 原油 | NYMEX | CL | 1000 桶 | 美元/桶 | 1-12月 | 全球基准,流动性最强 |
| Brent 原油 | ICE | BZ | 1000 桶 | 美元/桶 | 1-12月 | 欧洲市场基准,与WTI价差稳定 |
| 上期所原油 | INE | SC | 1000 桶 | 人民币/桶 | 1-12月 | 国内定价权,日盘+夜盘 |
展期策略适用性:
- WTI 和 Brent:流动性好,展期冲击成本低,适合大资金
- 上期所 SC:夜盘交易(21:00-02:30)与外盘错开,适合跨市场套利
七、策略局限性与风险管理
7.1 展期策略的固有风险
| 风险类型 | 描述 | 缓释措施 |
|---|---|---|
| 曲线反转风险 | Contango 突然转为 Backwardation,或反之 | 设置止损阈值,动态调整仓位 |
| 流动性枯竭风险 | 近月合约到期前流动性急剧下降 | 提前分批展期,避免最后交易日操作 |
| 正价差扩大风险 | Contango 曲线陡峭化,展期成本上升 | 监控展期成本率,必要时减仓 |
| 交易成本侵蚀 | 频繁展期导致手续费累积 | 控制展期频率,设定最小展期阈值 |
7.2 极端行情应对
案例:2020年负价格事件复盘
2020年4月20日,WTI原油期货5月合约结算价跌至-37.63美元/桶,历史上首次出现负价格。
如果持有该合约多头并按时展期:
- 4月16日(到期前2个交易日):展期成本率年化超过100%
- 4月20日:无法在正常价位平仓,被迫以负价格结算
教训:
- 永远不要持有近月合约至最后交易日
- 展期策略必须设置"提前展期"触发条件
- 极端 Contango 市场应考虑减仓或空仓
7.3 风险管理框架建议
风险管理框架:展期策略
1. 仓位管理
- 单次展期不超过总仓位的 50%
- 极端 Contango 市场(年化展期成本>10%)降至 20%
2. 预警机制
- 展期成本率 > 5%/年:触发告警
- 展期成本率 > 10%/年:建议减仓
- 近月合约持仓量下降 > 50%:流动性预警
3. 应急预案
- 备用展期窗口:到期前 7-10 个交易日
- 合约切换预案:若主力合约流动性不足,切换至次主力
八、结语:展期是成本,还是收益?
"期货市场从不创造价值,它只是将风险重新分配给愿意承担的人。"
展期策略的本质,是在期货合约的时间维度上寻找定价偏差。对于多头投资者而言:
- Backwardation 市场:展期是正收益来源,应尽可能延长持仓时间,享受现货溢价带来的红利
- Contango 市场:展期是持续损耗,应通过阈值触发或信号驱动,尽量减少不必要的展期操作
- 平坦曲线市场:展期影响可以忽略,应专注于价格趋势的判断
核心原则:不要为了“按时展期”而展期,要为了“最优展期”而等待。
下一步行动
如果你希望动手复现本文策略:
- 访问 TickDB 官网 注册(免费,无需信用卡)
- 在控制台生成 API Key
- 设置环境变量
TICKDB_API_KEY,复制本文代码即可运行 - 使用 WTI 原油期货(CL.NYM)近月和次月合约进行回测验证
如果你关注展期策略的进阶优化:
- 结合期权波动率曲面预测曲线变化
- 引入库存数据(EIA 周报)作为展期信号增强
- 考虑跨品种套利(WTI vs Brent vs SC)
如果你需要完整的历史期货合约序列数据,联系 [email protected] 了解机构级数据方案。
回测局限性说明:上述回测结果基于 TickDB 历史 K 线数据模拟,不构成未来收益保证。回测存在以下固有局限:未完全模拟实际交易中的流动性冲击成本;未考虑合约切换时的价格断层;历史展期收益不代表未来表现。建议在实际使用前进行更长时间跨度的验证,并结合实时市场数据动态调整策略参数。
本文不构成任何投资建议。市场有风险,投资需谨慎。