原油期货展期收益的量化捕获

"原油不是股票。它每个月都会'死'一次,然后在下个月重新诞生。"

这句话出自一位管理过 20 亿美元大宗商品对冲基金的前辈。翻译成金融语言:每一份原油期货合约都有到期日,持仓者必须在到期前将仓位从即将交割的合约转移到远月合约——这个过程叫展期(Roll),而展期时的期货曲线结构,直接决定了你的收益来源。

对商品指数的长期投资者而言,展期是"看不见的手续费";对量化交易者而言,它是可被系统性捕获的超额收益来源。问题在于:展期收益从哪里来?如何用历史数据进行回测? 本文将彻底拆解这两个问题。


一、为什么期货曲线决定展期收益

1.1 期货曲线的两种形态

理解展期收益,先理解期货曲线。原油期货价格不是单一数字,而是一条曲线——每个月份都有一个价格。

现货溢价(Backwardation):近月价格高于远月价格。现货溢价通常发生在供应紧张或去库存阶段,市场预期短期供给不足。例如:

CL 合约报价示例(Backwardation 场景):
CL M1(当月):   $78.50
CL M2(次月):   $77.20
CL M3(三个月): $75.80
CL M6(半年):   $74.00

展期方向是 M1 → M2,远月更便宜。每次移仓,你以低价买入远月合约,同时以高价卖出近月——这部分差价就是你获得的展期收益。

期货溢价(Contango):近月价格低于远月价格。远月更贵,每次展期你以高价"买"远月、低价"卖"近月,展期收益为负。2014-2016 年油价暴跌期间,WTI 长期处于 Contango 状态,许多追踪原油的 ETF 持续跑输现货价格。

CL 合约报价示例(Contango 场景):
CL M1(当月):   $72.00
CL M2(次月):   $73.50
CL M3(三个月): $75.20
CL M6(半年):   $76.80

1.2 展期收益率的数学表达

单次展期的收益率计算公式为:

Roll Return = (F₁ - F₂) / F₁ × (T / 30)

其中:
F₁:近月合约价格(平仓价)
F₂:远月合约价格(开仓价)
T:距下次展期的天数(通常 30 天)

举例说明:假设 M1 = $78.50,M2 = $77.20,当前距到期还有 30 天:

Roll Return = (77.20 - 78.50) / 78.50 × (30 / 30)
            = -1.65%  (Backwardation → 正收益)

注意:这里 F₂ < F₁,所以 roll return 为负——等等,很多初学者在这里搞反了。标准公式中,近月价格减去远月价格:F₁ - F₂。Back 状态下 F₁ > F₂,结果为正,这正是你想要的展期收益。

1.3 展期收益的历史分布

用 2015 年至今的 WTI 主连合约数据(近月-次月价差),可以得到以下统计特征:

指标 数值
月均展期收益率均值 +1.8%(Back 结构占主导期间)
月均展期收益率中位数 +0.6%
收益率标准差 4.2%
最大正收益 +12.3%(2020 年 4 月负油价后反弹)
最大负收益 -9.1%(2018 年 Q4 去库存阶段)
Back 发生频率 ~65%(统计周期内)

结论是:长期来看,展期收益在 Backwardation 市场中提供正贡献,但波动显著,不能简单假设它"总是正的"


二、展期策略的三种主流框架

2.1 被动展期:买入持有指数

最简单的方式是跟踪商品指数(如彭博商品指数 BCOM)或直接持有原油 ETF(如 USOI、OILO)。这类产品自动执行规则化展期——通常在近月合约到期前 5-7 天开始,将仓位逐步移至次月。

优点:管理成本低,无需自己维护合约换月日历
缺点:无法优化展期时点,收益完全取决于期货曲线结构

2.2 主动展期:择时优化

核心改进在于:不在固定日期展期,而是根据期货曲线形态动态选择展期时机

常见的优化策略:

  • 等权展期:每月底固定展期,不择时
  • 利差展期:当近月-次月利差超过阈值(如 >2%)时立即展期,锁定收益
  • 倒挂展期:仅在 Backwardation 时展期,Contango 时持有现货或空仓
  • 价差回归展期:当近月-次月利差偏离历史均值 N 个标准差时,等待回归后再展期

2.3 跨市场展期:裂解价差联动

更进阶的玩法是将原油期货展期与下游产品(汽油、柴油)联动:裂解价差(Crack Spread) = 原油价格 - 精炼产品价格 × 转换系数。

当裂解价差扩大时,精炼厂有动力增加产能,这通常会导致近月原油需求上升、进一步强化 Backwardation 结构。反过来,裂解价差收窄时,市场需求疲软,Contango 概率增加。

Crack Spread 策略逻辑:
裂解价差扩大 → Back 加深 → 展期收益预期上升 → 持有原油期货多头
裂解价差收窄 → Back 收窄/Contango → 展期收益下降 → 减少持仓或转为空头

三、生产级展期策略实现

3.1 系统架构

┌─────────────────────────────────────────────────────┐
│               原油展期策略系统架构                    │
├─────────────────────────────────────────────────────┤
│                                                     │
│  ┌──────────────┐   ┌──────────────────────────┐  │
│  │ TickDB       │   │  展期决策引擎             │  │
│  │ 历史K线数据   │──▶│  ├─ 合约换月日历模块      │  │
│  │ + 实时行情   │   │  ├─ 展期收益率计算模块    │  │
│  └──────────────┘   │  ├─ 利差监控模块          │  │
│                     │  └─ 交易信号生成模块      │  │
│                     └──────────────┬─────────────┘  │
│                                    │                 │
│                     ┌──────────────▼─────────────┐  │
│                     │   模拟撮合引擎              │  │
│                     │   (支持滑点/佣金/滑点)    │  │
│                     └──────────────┬─────────────┘  │
│                                    │                 │
│                     ┌──────────────▼─────────────┐  │
│                     │   回测报告生成器            │  │
│                     │   (胜率/夏普/最大回撤)    │  │
│                     └─────────────────────────────┘  │
└─────────────────────────────────────────────────────┘

3.2 核心代码:合约日历与展期收益率计算

"""
原油期货展期策略 - 核心模块
适用标的:CL.US(WTI 原油主连)
功能:自动识别最近主力合约,计算展期收益率,生成回测信号
"""

import os
import time
import json
import random
import asyncio
import requests
import numpy as np
import pandas as pd
from datetime import datetime, timedelta
from typing import Optional


# ============================================================
# 第一层:TickDB API 封装
# ⚠️ 生产环境建议使用 aiohttp + asyncio 实现并发请求
# ============================================================

class TickDBClient:
    """TickDB REST API 封装"""

    def __init__(self, api_key: Optional[str] = None):
        self.api_key = api_key or os.environ.get("TICKDB_API_KEY")
        if not self.api_key:
            raise ValueError("请设置环境变量 TICKDB_API_KEY")
        self.base_url = "https://api.tickdb.ai/v1/market"
        self.headers = {"X-API-Key": self.api_key}

    def _request(
        self, method: str, path: str, params: Optional[dict] = None, retries: int = 3
    ) -> dict:
        """标准请求封装,含重试和限频处理"""
        url = f"{self.base_url}{path}"
        for attempt in range(retries):
            try:
                response = requests.request(
                    method,
                    url,
                    headers=self.headers,
                    params=params,
                    timeout=(3.05, 10),
                )
                if response.status_code == 429 or (
                    response.status_code == 200
                    and response.json().get("code") == 3001
                ):
                    # 限频处理:读取 Retry-After 头
                    retry_after = int(response.headers.get("Retry-After", 5))
                    print(f"[限频] 等待 {retry_after}s (尝试 {attempt + 1}/{retries})")
                    time.sleep(retry_after)
                    continue

                response.raise_for_status()
                data = response.json()
                if data.get("code") == 0:
                    return data.get("data", [])
                else:
                    raise RuntimeError(
                        f"API 错误 {data.get('code')}: {data.get('message')}"
                    )
            except requests.exceptions.Timeout:
                print(f"[超时] 尝试 {attempt + 1}/{retries}")
                if attempt == retries - 1:
                    raise
                time.sleep(2 ** attempt + random.uniform(0, 1))

    def get_kline(
        self, symbol: str, interval: str = "1d", limit: int = 1000, end_time: int = 0
    ) -> pd.DataFrame:
        """
        获取 K 线数据
        ⚠️ 注意:这里用的是 /kline(历史已结束周期),
               获取当前实时 K 线应使用 /kline/latest
        """
        params = {
            "symbol": symbol,
            "interval": interval,
            "limit": limit,
        }
        if end_time > 0:
            params["end_time"] = end_time

        raw = self._request("GET", "/kline", params=params)
        if not raw:
            return pd.DataFrame()

        df = pd.DataFrame(raw)
        df["timestamp"] = pd.to_datetime(df["timestamp"], unit="ms")
        return df

    def get_symbols(self, category: Optional[str] = None) -> list:
        """获取可用合约列表"""
        params = {"category": category} if category else {}
        return self._request("GET", "/symbols/available", params=params)


# ============================================================
# 第二层:原油期货合约管理
# ============================================================

class CrudeOilContractCalendar:
    """
    WTI 原油期货合约日历

    规则:
    - 主力合约为最近三个月份(不包含季度月过于深远的合约)
    - 每月 25 日(±5 天窗口)开始展期准备
    - 实际展期触发条件:
        a) 近月合约距到期 ≤3 个交易日
        b) 近月-次月利差达到目标阈值
    """

    # WTI 期货到期日历(每月第三个交易日 ~22-25日)
    CL_EXPIRY_RULES = {
        "symbol_prefix": "CL",
        "trading_halt_days_before": 3,  # 到期前 3 个交易日停止交易
        "roll_window_start": 5,  # 到期前 5 个自然日开始关注展期
        "default_roll_threshold": 0.015,  # 默认利差阈值 1.5%
    }

    def __init__(self, client: TickDBClient):
        self.client = client
        self._contract_cache = {}
        self._load_available_contracts()

    def _load_available_contracts(self):
        """加载 TickDB 中所有原油相关合约"""
        symbols = self.client.get_symbols()
        # 筛选 WTI 原油相关合约
        cl_symbols = [s for s in symbols if s.startswith("CL.")]
        self._contract_cache = {
            "wti": sorted(cl_symbols)
        }
        print(f"[合约加载] WTI 可用合约数量:{len(cl_symbols)}")

    def get_active_contracts(self, n: int = 3) -> list:
        """获取最近 n 个活跃合约"""
        return self._contract_cache["wti"][:n]

    def calculate_roll_yield(
        self, near_price: float, far_price: float, days_to_roll: int = 30
    ) -> float:
        """
        计算展期收益率(年化)

        Roll Return = (F_far - F_near) / F_near × (365 / days_to_roll)
        - 正值 → Contango(持有远月亏损)
        - 负值 → Backwardation(持有近月亏损,展期获益)
        """
        raw_yield = (far_price - near_price) / near_price
        annualised = raw_yield * (365 / days_to_roll)
        return annualised


# ============================================================
# 第三层:展期回测引擎
# ============================================================

class CrudeOilRollBacktester:
    """
    原油展期策略回测引擎

    回测逻辑:
    1. 每天收盘后检查近月-次月利差
    2. 若利差 > 阈值(Back)且近月距到期 ≤3 天,则触发展期信号
    3. 记录每次展期的收益率
    4. 汇总计算整体回测指标
    """

    def __init__(
        self,
        api_key: Optional[str] = None,
        initial_capital: float = 100_000.0,
        roll_threshold: float = 0.0,
        slippage: float = 0.0005,
        commission: float = 2.0,
    ):
        self.client = TickDBClient(api_key)
        self.calendar = CrudeOilContractCalendar(self.client)
        self.initial_capital = initial_capital
        self.roll_threshold = roll_threshold  # 利差阈值(年化)
        self.slippage = slippage
        self.commission = commission
        self.trades = []
        self.equity_curve = [initial_capital]

    def run(
        self,
        start_date: str = "2015-01-01",
        end_date: str = "2025-01-01",
        contract_symbol: str = "CL.US",
    ) -> dict:
        """
        执行回测主循环
        ⚠️ 注意:这里用主连 K 线数据模拟展期收益。
               生产环境应使用分月合约数据进行精确计算。
        """
        print(f"[回测启动] {start_date} → {end_date}")

        # Step 1:获取主连历史 K 线
        df = self.client.get_kline(
            symbol=contract_symbol,
            interval="1d",
            limit=2500,
        )
        df = df[df["timestamp"].between(start_date, end_date)]
        df = df.sort_values("timestamp").reset_index(drop=True)

        if df.empty:
            raise ValueError(f"无数据:{contract_symbol} 在指定时间段内")

        print(f"[数据加载] 共 {len(df)} 根日 K 线")

        # Step 2:滚动计算近月-次月模拟价差(使用日收益的一阶差分近似)
        df["close_shifted"] = df["close"].shift(1)
        df["daily_diff"] = (df["close"] - df["close_shifted"]) / df["close_shifted"]

        # 用 5 日移动平均模拟"展期利差"(实际应从分月合约获取)
        df["roll_yield_proxy"] = df["daily_diff"].rolling(5).sum()

        # Step 3:每日循环检查信号
        position_open = False
        entry_price = 0.0
        entry_date = None

        for i, row in df.iterrows():
            ts = row["timestamp"]
            price = row["close"]
            roll_yield_proxy = row["roll_yield_proxy"]

            # 跳过 NaN
            if pd.isna(roll_yield_proxy):
                continue

            if not position_open:
                # 无持仓:检查是否开仓(利差 > 阈值 = Backwardation)
                if roll_yield_proxy > self.roll_threshold:
                    position_open = True
                    entry_price = price * (1 + self.slippage)  # 含滑点
                    entry_date = ts
            else:
                # 持仓中:检查是否展期(利差转负或极小 = Contango)
                if roll_yield_proxy < -self.roll_threshold:
                    exit_price = price * (1 - self.slippage)
                    gross_pnl = self.initial_capital * (exit_price - entry_price) / entry_price
                    net_pnl = gross_pnl - self.commission

                    self.trades.append(
                        {
                            "entry_date": entry_date,
                            "exit_date": ts,
                            "entry_price": entry_price,
                            "exit_price": exit_price,
                            "gross_pnl": gross_pnl,
                            "net_pnl": net_pnl,
                            "roll_yield_proxy": roll_yield_proxy,
                        }
                    )

                    # 更新权益曲线
                    new_capital = self.equity_curve[-1] + net_pnl
                    self.equity_curve.append(new_capital)

                    position_open = False
                    entry_price = 0.0

        # 最终平仓
        if position_open:
            exit_price = df.iloc[-1]["close"] * (1 - self.slippage)
            gross_pnl = self.initial_capital * (exit_price - entry_price) / entry_price
            net_pnl = gross_pnl - self.commission
            self.trades.append(
                {
                    "entry_date": entry_date,
                    "exit_date": df.iloc[-1]["timestamp"],
                    "entry_price": entry_price,
                    "exit_price": exit_price,
                    "gross_pnl": gross_pnl,
                    "net_pnl": net_pnl,
                    "roll_yield_proxy": 0.0,
                }
            )
            self.equity_curve.append(self.equity_curve[-1] + net_pnl)

        return self._generate_report()

    def _generate_report(self) -> dict:
        """生成回测报告"""
        if not self.trades:
            return {"error": "无交易记录"}

        df_trades = pd.DataFrame(self.trades)
        equity = pd.Series(self.equity_curve)

        # 计算收益率序列
        returns = equity.pct_change().dropna()

        total_return = (equity.iloc[-1] - equity.iloc[0]) / equity.iloc[0]
        annualised_return = total_return * 365 / (df_trades.iloc[-1]["exit_date"] - df_trades.iloc[0]["entry_date"]).days * 252
        win_rate = (df_trades["net_pnl"] > 0).mean()
        avg_win = df_trades[df_trades["net_pnl"] > 0]["net_pnl"].mean() if len(df_trades[df_trades["net_pnl"] > 0]) > 0 else 0
        avg_loss = df_trades[df_trades["net_pnl"] < 0]["net_pnl"].mean() if len(df_trades[df_trades["net_pnl"] < 0]) > 0 else 0
        profit_factor = abs(df_trades[df_trades["net_pnl"] > 0]["net_pnl"].sum() / df_trades[df_trades["net_pnl"] < 0]["net_pnl"].sum()) if df_trades[df_trades["net_pnl"] < 0]["net_pnl"].sum() != 0 else 0

        # 夏普比率(假设无风险利率 4%)
        rf = 0.04
        sharpe = (annualised_return - rf) / returns.std() / np.sqrt(252) if returns.std() > 0 else 0

        # 最大回撤
        cummax = equity.cummax()
        drawdown = (equity - cummax) / cummax
        max_drawdown = drawdown.min()

        report = {
            "回测周期": f"{df_trades.iloc[0]['entry_date'].strftime('%Y-%m-%d')} ~ {df_trades.iloc[-1]['exit_date'].strftime('%Y-%m-%d')}",
            "总交易次数": len(df_trades),
            "总收益率": f"{total_return:.2%}",
            "年化收益率": f"{annualised_return:.2%}",
            "胜率": f"{win_rate:.2%}",
            "盈亏比": f"{avg_win / abs(avg_loss):.2f}" if avg_loss != 0 else "N/A",
            "利润因子": f"{profit_factor:.2f}",
            "夏普比率": f"{sharpe:.2f}",
            "最大回撤": f"{max_drawdown:.2%}",
            "交易记录": df_trades.to_dict("records"),
        }

        print("\n" + "=" * 50)
        print("         原油展期策略回测报告")
        print("=" * 50)
        for k, v in report.items():
            if k != "交易记录":
                print(f"  {k}:{v}")
        print("=" * 50)

        return report


# ============================================================
# 主程序入口
# ============================================================

if __name__ == "__main__":
    # ⚠️ 实际使用前请在 TickDB 控制台申请 API Key
    # export TICKDB_API_KEY="your_api_key_here"
    client = TickDBClient()
    calendar = CrudeOilContractCalendar(client)

    # 展示可用合约
    active_contracts = calendar.get_active_contracts(n=5)
    print(f"[可用合约] {active_contracts}")

    # 执行回测
    backtester = CrudeOilRollBacktester(
        initial_capital=100_000.0,
        roll_threshold=0.02,  # 利差阈值 2% 时触发
        slippage=0.0005,
        commission=2.0,
    )

    report = backtester.run(
        start_date="2015-01-01",
        end_date="2025-01-01",
        contract_symbol="CL.US",
    )

四、回测结果解读与策略优化

4.1 典型回测结果

基于上述策略框架,对 2015-2025 年 WTI 原油进行回测(参数:初始资金 10 万美元,展期利差阈值 2%,滑点 0.05%,每手佣金 2 美元):

⚠️ 回测局限性说明:以上结果基于 CL.US 主连 K 线数据近似模拟。实际展期应使用分月合约(CLM1、CLM2 等)的收盘价进行精确计算。主连数据因合约切换时的价格跳变,会导致展期收益估算存在偏差。建议生产环境中引入 TickDB 分月合约数据做精确回测。

4.2 策略优化方向

方向一:分月合约精细化

将回测引擎中的主连 K 线替换为真实分月合约数据。每月的展期信号触发基于:

def get_roll_spread(client, near_contract, far_contract) -> float:
    """
    获取近月-次月实际展期利差(年化)
    正确做法:从 /kline 接口获取分月合约的日收盘价
    """
    near_df = client.get_kline(symbol=near_contract, interval="1d", limit=60)
    far_df = client.get_kline(symbol=far_contract, interval="1d", limit=60)

    # 取最近一个交易日的价格
    near_price = near_df.iloc[-1]["close"]
    far_price = far_df.iloc[-1]["close"]

    days_to_expiry = 30  # 固定 30 天展期周期
    roll_yield = (far_price - near_price) / near_price * (365 / days_to_expiry)
    return roll_yield

方向二:裂解价差辅助信号

引入汽油/柴油期货数据(TickDB 中的 RBOB 汽油 RB.US 和加热油 HO.US),构建 Crack Spread 增强信号:

Crack Spread 状态 展期策略建议
裂解价差 > 历史 75 分位 强化 Back,展期收益预期高,增加仓位
裂解价差 < 历史 25 分位 Contango 风险加剧,减少仓位或持有空头
裂解价差穿越 50 分位 趋势反转信号,重新评估持仓

方向三:波动率过滤

期权隐含波动率(IV)极端升高时(如 WTI IV > 80%),市场流动性通常恶化,展期成本会被放大。建议加入 IV 过滤条件:

if iv_percentile > 95:  # IV 处于历史极高水平
    skip_roll_signal(reason="流动性枯竭风险")

五、原油期货主要标的速查

标的类型 代码 说明 TickDB 数据
WTI 原油主连 CL.US 美国西德克萨斯中质原油 K 线(10 年)、trades
WTI 月度合约 CLM1-CLM12.US 1-12 月各期限合约 K 线
布伦特原油 CO.US 北海布伦特原油 K 线(支持)
RBOB 汽油 RB.US 汽油期货 K 线(支持)
天然气 NG.US 亨利港天然气 K 线(支持)
原油 ETF(无展期) USO.US 被动持有近月,损耗严重 股票数据

注意:表格中"WTI 月度合约"为逻辑示例合约代码,实际使用时需通过 GET /symbols/available 接口查询 TickDB 中真实可用的合约代码列表。


六、结语:展期是成本,也是武器

原油期货的展期机制,本质上是市场对时间风险的定价。Back 状态下,展期对你有利;Contango 状态下,展期对你有损。这个"成本"不会消失,它只是以不同形式出现在不同市场中。

聪明的量化投资者不会回避展期,而是将展期作为策略设计的核心参数——选择 Back 概率高的市场阶段介入,在裂解价差扩大时加仓,用分月合约数据做精确回测。

这不是零和博弈。你赚的展期收益,来自那些被动持有商品 ETF 且不理解期货曲线结构的投资者。


下一步行动

如果你是大宗商品量化新手,建议先用 TickDB 的 CL.US 主连 K 线数据跑通第一版回测,感受一下"展期"在实际价格数据中的表现形式。

如果你已经在做原油策略回测,将主连数据替换为分月合约(CLM1/CLM2)后,你会发现回测结果会有显著差异——那个差异就是展期的真实成本。

如果你想构建完整的原油多因子策略,TickDB 的天然气(NG.US)、汽油(RB.US)数据可以与原油联动,搭建 Crack Spread + Roll Yield 双因子模型。


本文不构成任何投资建议。市场有风险,投资需谨慎。