格兰杰因果检验:美股与加密货币谁领先谁?

"相关系数是表象,领先滞后才是本质。"

2020 年 3 月 12 日,加密市场迎来"黑色星期四"。BTC 在 24 小时内跌幅超过 40%,多交易所出现技术故障。清算所 Margin Call 的连锁反应让传统市场参与者第一次认真看待加密资产的"领先性"。但问题是:这是统计上显著的领先,还是仅仅是巧合?

四年后的今天,机构量化团队在配置加密资产时面临的第一个问题是:当美股期货在盘后出现异动时,BTC 会在次日跟随,还是反之? 回答这个问题,需要的不是直觉,而是严格的统计检验。

本文将完整复现格兰杰因果检验的工程实现:从数据获取、平稳性检验、模型定阶,到最终的结果解读。代码基于 Python,包含完整的统计检验流程,所有数据通过 TickDB API 获取。


一、格兰杰因果检验的本质:它检验的是什么?

格兰杰因果检验(Granger Causality Test)名字中的"因果"二字容易造成误解。它的检验对象是预测能力

核心命题:如果 X 的历史信息能够显著提升对 Y 的预测精度,那么 X"格兰杰导致"Y。

注意这里的三个限定:

  1. 基于历史信息:它不使用未来数据,只检验"已知信息能否预测"
  2. "导致"不是"造成":格兰杰因果是统计关联,而非物理意义上的因果
  3. 受时间方向约束:Y 的过去值无法格兰杰导致 Y 本身——你不能用今天的价格预测今天的价格

形式化地表达,格兰杰因果检验的原假设是:

$$
H_0: \beta_1 = \beta_2 = ... = \beta_p = 0
$$

其中 $\beta_i$ 是 Y 对 X 滞后项的回归系数。如果拒绝原假设,我们就说 X 对 Y 具有格兰杰因果关系。

检验统计量服从 F 分布:

$$
F = \frac{(RSS_{restricted} - RSS_{full})/p}{RSS_{full}/(T-2p-1)}
$$

其中 $RSS$ 是残差平方和,$p$ 是滞后阶数,$T$ 是样本量。


二、数据获取:TickDB 的 REST API 封装

在进行统计检验之前,我们需要可靠的数据源。TickDB 提供 10 年级别的历史 K 线数据,涵盖美股和主流加密货币,适合跨资产关联分析。

以下代码封装了 TickDB 的 K 线数据获取,包含完整的错误处理和限频应对:

import os
import time
import requests
import pandas as pd
from typing import Optional
from datetime import datetime, timedelta


class TickDBClient:
    """TickDB REST API 客户端 - 包含重试机制和限频处理"""
    
    def __init__(self, base_url: str = "https://api.tickdb.ai/v1"):
        self.api_key = os.environ.get("TICKDB_API_KEY")
        if not self.api_key:
            raise ValueError("请设置环境变量 TICKDB_API_KEY")
        self.base_url = base_url
        self.session = requests.Session()
        self.session.headers.update({"X-API-Key": self.api_key})
    
    def _request_with_retry(
        self, 
        method: str, 
        endpoint: str, 
        params: Optional[dict] = None,
        max_retries: int = 3
    ) -> dict:
        """带限频处理的请求封装"""
        for attempt in range(max_retries):
            try:
                url = f"{self.base_url}{endpoint}"
                response = self.session.request(
                    method,
                    url,
                    params=params,
                    timeout=(3.05, 10)  # 连接超时, 读取超时
                )
                
                # 限频处理 (code:3001)
                if response.status_code == 429:
                    retry_after = int(response.headers.get("Retry-After", 5))
                    print(f"[限频] 等待 {retry_after} 秒后重试...")
                    time.sleep(retry_after)
                    continue
                
                data = response.json()
                code = data.get("code", 0)
                
                if code == 0:
                    return data.get("data", [])
                elif code == 3001:
                    retry_after = int(response.headers.get("Retry-After", 5))
                    print(f"[限频] 等待 {retry_after} 秒后重试...")
                    time.sleep(retry_after)
                    continue
                elif code in (1001, 1002):
                    raise ValueError("API Key 无效,请检查环境变量")
                else:
                    raise RuntimeError(f"API 错误 {code}: {data.get('message')}")
                    
            except requests.exceptions.Timeout:
                delay = min(2 ** attempt * 0.5, 10)  # 指数退避
                jitter = 0.1 * delay * (0.5 - 0.5)  # 轻微抖动
                print(f"[超时] {delay:.1f} 秒后重试...")
                time.sleep(delay + jitter)
                
        raise RuntimeError("达到最大重试次数")
    
    def get_klines(
        self,
        symbol: str,
        interval: str = "1h",
        limit: int = 1000,
        start_time: Optional[int] = None,
        end_time: Optional[int] = None
    ) -> pd.DataFrame:
        """
        获取历史 K 线数据
        
        Args:
            symbol: 交易品种,如 "BTC.USDT", "SPY.US"
            interval: K 线周期,1m/5m/15m/30m/1h/4h/1d
            limit: 获取数量上限
            start_time: 开始时间戳(毫秒)
            end_time: 结束时间戳(毫秒)
        
        Returns:
            DataFrame with columns: timestamp, open, high, low, close, volume
        """
        params = {
            "symbol": symbol,
            "interval": interval,
            "limit": limit
        }
        if start_time:
            params["start_time"] = start_time
        if end_time:
            params["end_time"] = end_time
        
        data = self._request_with_retry("GET", "/market/kline", params=params)
        
        if not data:
            return pd.DataFrame()
        
        df = pd.DataFrame(data)
        df["timestamp"] = pd.to_datetime(df["timestamp"], unit="ms")
        df.set_index("timestamp", inplace=True)
        
        # 确保数值类型
        numeric_cols = ["open", "high", "low", "close", "volume"]
        for col in numeric_cols:
            if col in df.columns:
                df[col] = pd.to_numeric(df[col], errors="coerce")
        
        return df

⚠️ 工程提示:生产环境中建议使用 aiohttp 进行异步数据获取,特别是需要同时拉取多个交易品种数据时。


三、跨资产数据同步与预处理

我们选择 BTC/USDT 和 SPY(标普 500 ETF)作为研究对象,使用 4 小时周期的 K 线数据。4 小时周期能够在保留日内信息的同时,过滤掉部分市场微观噪声。

import numpy as np
from statsmodels.tsa.stattools import adfuller
from statsmodels.tsa.api import VAR
from statsmodels.stats.test_stattools import grangercausalitytests


def fetch_aligned_data(
    client: TickDBClient,
    symbols: list[str],
    interval: str = "4h",
    start_time: int,
    end_time: int
) -> pd.DataFrame:
    """
    获取并对齐多个资产的数据
    
    以时间戳为基准内连接,确保两个序列在相同时间点都有数据
    """
    dfs = {}
    
    for symbol in symbols:
        df = client.get_klines(
            symbol=symbol,
            interval=interval,
            start_time=start_time,
            end_time=end_time
        )
        if df.empty:
            raise ValueError(f"无法获取 {symbol} 的数据")
        dfs[symbol] = df[["close"]].rename(columns={"close": symbol})
    
    # 按时间戳内连接
    aligned = pd.concat(dfs.values(), axis=1)
    aligned = aligned.dropna()  # 删除任一资产缺失数据的时点
    
    print(f"数据时间范围: {aligned.index.min()} 至 {aligned.index.max()}")
    print(f"数据点数量: {len(aligned)}")
    
    return aligned


# 计算收益率(对价格序列进行对数差分)
def compute_returns(df: pd.DataFrame) -> pd.DataFrame:
    """计算对数收益率"""
    returns = np.log(df / df.shift(1)).dropna()
    returns.columns = [f"{col}_ret" for col in returns.columns]
    return returns


# 平稳性检验 (ADF 检验)
def adf_test(series: pd.Series, name: str) -> dict:
    """增广迪基-富勒检验"""
    result = adfuller(series, autolag="AIC")
    
    output = {
        "series": name,
        "adf_statistic": result[0],
        "p_value": result[1],
        "critical_values": result[4],
        "is_stationary": result[1] < 0.05
    }
    
    print(f"\n{'='*50}")
    print(f"ADF 检验结果: {name}")
    print(f"ADF 统计量: {result[0]:.4f}")
    print(f"P 值: {result[1]:.4f}")
    print(f"1% 临界值: {result[4]['1%']:.4f}")
    print(f"5% 临界值: {result[4]['5%']:.4f}")
    print(f"10% 临界值: {result[4]['10%']:.4f}")
    print(f"结论: {'平稳' if result[1] < 0.05 else '非平稳'} (5% 显著性水平)")
    print(f"{'='*50}")
    
    return output

为什么使用对数收益率而非价格本身?

价格序列通常是非平稳的(存在单位根),直接检验会产生"伪回归"问题。对数收益率是价格一阶对数差分,接近收益率概念,通常是平稳的。


四、VAR 模型与滞后阶数选择

格兰杰因果检验基于向量自回归(VAR)模型。VAR 将多个时间序列整合为一个系统,每个序列的当前值由自身和所有其他序列的滞后值解释:

$$
\begin{bmatrix} y_{1,t} \ y_{2,t} \end{bmatrix} = \sum_{i=1}^{p} A_i \begin{bmatrix} y_{1,t-i} \ y_{2,t-i} \end{bmatrix} + \begin{bmatrix} u_{1,t} \ u_{2,t} \end{bmatrix}
$$

其中 $p$ 是滞后阶数,需要通过信息准则确定。

from statsmodels.tsa.api import VAR
from statsmodels.tsa.stattools import adfuller


def select_optimal_lag(returns_df: pd.DataFrame, max_lag: int = 24) -> dict:
    """
    使用信息准则选择 VAR 模型的最优滞后阶数
    
    AIC: 倾向于选择较复杂模型
    BIC: 惩罚项更强,倾向于选择较简单模型
    """
    model = VAR(returns_df)
    
    # 滞后阶数选择
    lag_order = model.select_order(maxlags=max_lag)
    
    print("\nVAR 滞后阶数选择:")
    print(f"AIC 建议阶数: {lag_order.aic}")
    print(f"BIC 建议阶数: {lag_order.bic}")
    print(f"HQIC 建议阶数: {lag_order.hqic}")
    print(f"FPE 建议阶数: {lag_order.fpe}")
    
    # 默认使用 AIC 建议的阶数
    optimal_lag = lag_order.aic
    
    return {
        "optimal_lag": optimal_lag,
        "aic": lag_order.aic,
        "bic": lag_order.bic,
        "hqic": lag_order.hqic,
        "fpe": lag_order.fpe
    }


def estimate_var_and_test_granger(
    returns_df: pd.DataFrame,
    max_lag: int = 24
) -> dict:
    """
    完整流程: 平稳性检验 -> 阶数选择 -> 格兰杰因果检验
    """
    results = {}
    
    # Step 1: 分别检验两个序列的平稳性
    print("\n" + "="*60)
    print("第一步: 平稳性检验 (ADF Test)")
    print("="*60)
    
    for col in returns_df.columns:
        adf_result = adf_test(returns_df[col], col)
        results[col] = adf_result
        
        if not adf_result["is_stationary"]:
            print(f"\n⚠️ 警告: {col} 收益率序列非平稳")
            print("建议: 检验是否存在 ARCH 效应,或考虑差分阶数更高的变换")
    
    # Step 2: 选择最优滞后阶数
    print("\n" + "="*60)
    print("第二步: VAR 模型滞后阶数选择")
    print("="*60)
    
    lag_selection = select_optimal_lag(returns_df, max_lag)
    optimal_lag = lag_selection["optimal_lag"]
    
    if optimal_lag == 0:
        print("警告: 最优阶数为 0,可能意味着序列间没有显著的动态关系")
        optimal_lag = 1  # 设置最小阶数继续检验
    
    results["lag_selection"] = lag_selection
    
    # Step 3: 拟合 VAR 模型
    print("\n" + "="*60)
    print("第三步: VAR 模型拟合")
    print("="*60)
    
    model = VAR(returns_df)
    var_result = model.fit(optimal_lag)
    
    print(f"\nVAR({optimal_lag}) 模型摘要:")
    print(f"R-squared (BTC): {var_result.rsquared['BTC_USDT_ret']:.4f}")
    print(f"R-squared (SPY): {var_result.rsquared['SPY.US_ret']:.4f}")
    print(f"AIC: {var_result.aic:.4f}")
    print(f"BIC: {var_result.bic:.4f}")
    
    results["var_model"] = var_result
    
    # Step 4: 格兰杰因果检验
    print("\n" + "="*60)
    print("第四步: 格兰杰因果检验")
    print("="*60)
    
    # 注意: grangercausalitytests 的输入顺序
    # 第一个参数是 "被预测变量" 的 DataFrame
    # 第二个参数 maxlag 是检验的最大滞后阶数
    
    print("\n检验一: BTC 是否格兰杰导致 SPY")
    print("(BTC 的过去值能否提升 SPY 的预测精度)")
    gc_btc_to_spy = grangercausalitytests(
        returns_df[["SPY.US_ret", "BTC_USDT_ret"]],  # [被预测变量, 预测变量]
        maxlag=optimal_lag,
        verbose=True
    )
    
    print("\n" + "-"*60)
    print("\n检验二: SPY 是否格兰杰导致 BTC")
    print("(SPY 的过去值能否提升 BTC 的预测精度)")
    gc_spy_to_btc = grangercausalitytests(
        returns_df[["BTC_USDT_ret", "SPY.US_ret"]],  # [被预测变量, 预测变量]
        maxlag=optimal_lag,
        verbose=True
    )
    
    results["gc_btc_to_spy"] = gc_btc_to_spy
    results["gc_spy_to_btc"] = gc_spy_to_btc
    
    return results

五、结果解读:如何从 F 统计量到交易决策

格兰杰因果检验输出的是每个滞后阶数下的 F 统计量和 P 值。解读结果时需要关注三个维度:

5.1 统计显著性

P 值 < 0.05 通常作为"显著"的判断标准。但这只是一个参考——在样本量极大时,即使经济意义微弱的关系也可能统计显著。

def extract_gc_summary(gc_result: dict, optimal_lag: int) -> pd.DataFrame:
    """
    从格兰杰因果检验结果中提取汇总表
    
    gc_result: grangercausalitytests 的返回值
    optimal_lag: 最优滞后阶数
    """
    summary_rows = []
    
    for lag in range(1, optimal_lag + 1):
        ssr_ftest = gc_result[lag][0]["ssr_ftest"]
        
        summary_rows.append({
            "lag": lag,
            "f_statistic": ssr_ftest[0],
            "p_value": ssr_ftest[1],
            "significance": "***" if ssr_ftest[1] < 0.01 
                           else "**" if ssr_ftest[1] < 0.05 
                           else "*" if ssr_ftest[1] < 0.1 
                           else ""
        })
    
    summary_df = pd.DataFrame(summary_rows)
    return summary_df


def interpret_results(gc_btc_to_spy: dict, gc_spy_to_btc: dict, optimal_lag: int):
    """
    综合解读格兰杰因果检验结果
    
    Returns:
        dict with interpretation
    """
    # 提取最优阶数的检验结果
    btctosspy = gc_btc_to_spy[optimal_lag][0]["ssr_ftest"]
    spytobtc = gc_spy_to_btc[optimal_lag][0]["ssr_ftest"]
    
    interpretation = {
        "btc_granger_causes_spy": {
            "f_stat": btctosspy[0],
            "p_value": btctosspy[1],
            "significant": btctosspy[1] < 0.05,
            "interpretation": ""
        },
        "spy_granger_causes_btc": {
            "f_stat": spytobtc[0],
            "p_value": spytobtc[1],
            "significant": spytobtc[1] < 0.05,
            "interpretation": ""
        },
        "relationship": ""
    }
    
    # 解读 BTC -> SPY
    if interpretation["btc_granger_causes_spy"]["significant"]:
        interpretation["btc_granger_causes_spy"]["interpretation"] = \
            "BTC 的历史信息对 SPY 有显著的预测能力"
    else:
        interpretation["btc_granger_causes_spy"]["interpretation"] = \
            "未发现 BTC 对 SPY 的格兰杰因果关系"
    
    # 解读 SPY -> BTC
    if interpretation["spy_granger_causes_btc"]["significant"]:
        interpretation["spy_granger_causes_btc"]["interpretation"] = \
            "SPY 的历史信息对 BTC 有显著的预测能力"
    else:
        interpretation["spy_granger_causes_btc"]["interpretation"] = \
            "未发现 SPY 对 BTC 的格兰杰因果关系"
    
    # 综合判断
    if btctosspy[1] < 0.05 and spytobtc[1] < 0.05:
        interpretation["relationship"] = "双向格兰杰因果:存在反馈机制,两者相互影响"
    elif btctosspy[1] < 0.05 and spytobtc[1] >= 0.05:
        interpretation["relationship"] = "单向格兰杰因果:BTC 领先 SPY,BTC 是先行指标"
    elif btctosspy[1] >= 0.05 and spytobtc[1] < 0.05:
        interpretation["relationship"] = "单向格兰杰因果:SPY 领先 BTC,传统市场是先行指标"
    else:
        interpretation["relationship"] = "无格兰杰因果:两者在统计上是独立的,无法相互预测"
    
    return interpretation

5.2 经济显著性

P 值显著后,需要进一步考察"预测能力的实际幅度"。这涉及比较加入 X 的滞后项后,Y 的预测误差方差降低了多少。

def compute_prediction_improvement(
    returns_df: pd.DataFrame,
    target_col: str,
    predictor_col: str,
    optimal_lag: int
) -> dict:
    """
    计算加入预测变量后,目标变量的预测改进幅度
    
    比较受约束模型(仅用自身滞后)和非受约束模型(加入预测变量滞后)
    """
    y = returns_df[target_col]
    x = returns_df[predictor_col]
    
    # 非受约束模型: y ~ y(-1)...y(-p) + x(-1)...x(-p)
    # 受约束模型: y ~ y(-1)...y(-p)
    
    # 使用 VAR 的似然比检验结果
    model = VAR(returns_df)
    var_result = model.fit(optimal_lag)
    
    # 计算预测误差
    forecast = var_result.forecast(returns_df.values[-optimal_lag:], steps=1)
    
    # 简化版:计算加入 X 后 R² 的变化
    # (实际工程中应使用更严谨的预测误差比较)
    
    return {
        "target": target_col,
        "predictor": predictor_col,
        "note": "需要更详细的预测误差分析以量化经济显著性"
    }

5.3 时变性

格兰杰因果关系可能是时变的——2020 年存在的关系在 2024 年可能消失。滚动窗口检验可以揭示这种动态变化:

def rolling_granger_test(
    returns_df: pd.DataFrame,
    window_size: int = 500,
    step_size: int = 100,
    max_lag: int = 4
) -> pd.DataFrame:
    """
    滚动窗口格兰杰因果检验
    
    揭示领先滞后关系的时变性
    """
    results = []
    
    for start in range(0, len(returns_df) - window_size, step_size):
        end = start + window_size
        window_data = returns_df.iloc[start:end]
        
        # 检查数据量是否足够
        if len(window_data) < window_size * 0.8:
            continue
        
        # 重新进行 ADF 检验
        btc_adf = adfuller(window_data.iloc[:, 0])
        spy_adf = adfuller(window_data.iloc[:, 1])
        
        if btc_adf[1] > 0.05 or spy_adf[1] > 0.05:
            continue  # 跳过非平稳窗口
        
        try:
            # 选择滞后阶数
            model = VAR(window_data)
            lag_order = model.select_order(maxlags=max_lag)
            optimal = max(lag_order.aic, 1)
            
            # 格兰杰检验
            gc_result = grangercausalitytests(
                window_data,
                maxlag=optimal,
                verbose=False
            )
            
            # 记录最优滞后阶数的结果
            btctosspy = gc_result[optimal][0]["ssr_ftest"]
            spytobtc = gc_result[optimal][0]["ssr_ftest"]
            
            results.append({
                "window_end": window_data.index[-1],
                "lag": optimal,
                "btc_to_spy_p": btctosspy[1],
                "spy_to_btc_p": spytobtc[1],
                "btc_to_spy_f": btctosspy[0],
                "spy_to_btc_f": spytobtc[0],
                "dominant_direction": "BTC->SPY" if btctosspy[1] < spytobtc[1] 
                                     else "SPY->BTC" if spytobtc[1] < btctosspy[1] 
                                     else "None"
            })
        except Exception as e:
            print(f"窗口 {window_data.index[0]} 到 {window_data.index[-1]} 检验失败: {e}")
            continue
    
    return pd.DataFrame(results)

六、完整分析流程示例

以下是一个完整的分析流程,从数据获取到结果解读:

def main():
    """
    完整流程: 获取数据 -> 预处理 -> 平稳性检验 -> VAR 定阶 -> 格兰杰因果检验
    """
    # 配置
    START_TIME = int((datetime.now() - timedelta(days=730)).timestamp() * 1000)
    END_TIME = int(datetime.now().timestamp() * 1000)
    
    # 初始化客户端
    client = TickDBClient()
    
    # 获取数据
    print("正在获取数据...")
    data = fetch_aligned_data(
        client,
        symbols=["BTC.USDT", "SPY.US"],
        interval="4h",
        start_time=START_TIME,
        end_time=END_TIME
    )
    
    # 计算收益率
    returns = compute_returns(data)
    print(f"\n收益率数据形状: {returns.shape}")
    print("\n收益率统计摘要:")
    print(returns.describe())
    
    # 执行完整分析
    results = estimate_var_and_test_granger(returns, max_lag=24)
    
    # 解读结果
    optimal_lag = results["lag_selection"]["optimal_lag"]
    interpretation = interpret_results(
        results["gc_btc_to_spy"],
        results["gc_spy_to_btc"],
        optimal_lag
    )
    
    print("\n" + "="*60)
    print("最终结论")
    print("="*60)
    print(f"BTC -> SPY: {interpretation['btc_granger_causes_spy']['interpretation']}")
    print(f"  F统计量: {interpretation['btc_granger_causes_spy']['f_stat']:.4f}")
    print(f"  P值: {interpretation['btc_granger_causes_spy']['p_value']:.4f}")
    print()
    print(f"SPY -> BTC: {interpretation['spy_granger_causes_btc']['interpretation']}")
    print(f"  F统计量: {interpretation['spy_granger_causes_btc']['f_stat']:.4f}")
    print(f"  P值: {interpretation['spy_granger_causes_btc']['p_value']:.4f}")
    print()
    print(f"综合判断: {interpretation['relationship']}")


if __name__ == "__main__":
    main()

七、研究的局限性与注意事项

7.1 统计局限

局限 说明 缓解方法
伪回归 非平稳序列可能产生虚假相关 ADF 检验、协整检验
因果不等于预测能力 格兰杰因果是预测能力的必要条件非充分条件 样本外预测验证
线性假设 VAR 假设线性关系,可能遗漏非线性动态 检查残差分布、考虑非线性模型
滞后阶数选择敏感性 不同信息准则可能给出不同阶数 比较多个阶数的结果稳健性

7.2 市场结构变化

加密市场与传统市场之间的关系在监管政策变化、机构参与度提升后可能发生结构性变化。历史检验结果不构成未来预测保证。

7.3 数据频率敏感性

我们选择了 4 小时周期。在 1 小时、日线等不同周期上,领先滞后关系可能不同。高频数据中还存在隔夜收益率跳跃、非交易时间等信息不对称问题。


八、结语

格兰杰因果检验是量化研究中验证领先滞后关系的利器,但它不是万能的魔法。统计显著性不等于经济显著性,过去的关系不等于未来的规律。

本文的核心价值在于提供了一套完整的工程实现流程:从数据获取、平稳性检验、模型定阶,到格兰杰因果检验和结果解读。代码中的每个模块都可以独立使用或替换。

在实际策略开发中,建议将格兰杰因果检验作为因子候选的筛选工具,而非直接信号。真正有价值的因子需要满足:统计上显著、逻辑上可解释、样本外稳健。


下一步行动

如果你希望亲手复现本文的分析流程

  1. 访问 tickdb.ai 注册(免费,无需信用卡)
  2. 在控制台生成 API Key
  3. 设置环境变量 TICKDB_API_KEY,复制本文代码即可运行
  4. 尝试更换交易品种(ETH、BTC、QQQ)或时间周期

如果你对跨资产关联分析有更深入的需求

  • 协整检验:发现均值回归关系
  • DCC-GARCH:捕捉动态相关性
  • Impulse Response:量化冲击传导路径

如果你习惯用 AI 辅助开发,在 AI 助手中搜索安装 tickdb-market-data SKILL,快速调用 TickDB 的数据接口。


回测局限性说明:本文展示的是统计检验方法论,未进行完整的回测验证。实际交易中需要考虑交易成本、滑点、流动性冲击等因素。历史数据中的领先滞后关系可能因市场结构变化而失效。