收盘钟声响起的那一刻,你的策略才刚开始被审判

"你的因子在今天贡献了多少 alpha?"

这不是一个能靠"感觉"回答的问题。职业量化团队的答案来自盘后工作流:数据清洗、归因分析、信号预计算、风险检查——一套完整的自动化流水线,在纽交所 4:00 ET 收盘后立即启动,到次日凌晨 5:00 前完成所有准备工作。对于一个有竞争力的团队,这套系统的工程质量,直接决定了明天早上你坐在屏幕前时,手里握的是一把磨好的刀,还是一堆待处理的乱麻。

本文拆解一个典型的职业级盘后自动化工作流,从任务调度设计、数据 ETL 管道、策略归因分析脚本,到明日信号预计算,提供可直接落地的代码架构。目标不是"演示",是"能上线跑三年的系统"。


一、为什么盘后不是"收尾"而是"第二轮开始"

很多人以为收盘后就是整理数据、写写复盘笔记。但职业团队的盘后工作流,本质上是第二轮策略生产:用今天的全量数据对策略进行事后检验,从中提取改进信号,并为明日的实盘提供结构化的输入。

这条流水线的价值在于三个"早":

  • 早归档:今天产生的每一笔成交记录、每一个 order book 快照,必须在数据结构化后存档。手工操作意味着遗漏和延迟,自动化的归档在收盘后 15 分钟内完成。
  • 早归因:在隔夜信息(美股盘后财报发布、宏观经济数据)影响次日开盘之前,团队需要知道今天因子体系里谁在赚钱、谁在失血。归因报告的及时性决定了你调整组合的速度。
  • 早预计算:次日开盘前的宏观事件、盘前财报、板块异动信息,可以在盘后预先编码为"信号提醒",而不是开盘后手忙脚乱地临时分析。

这三个"早",依赖一套自动化工作流。人工盘后复盘的团队,永远比自动化团队慢半拍——而且慢的那半拍,通常是在凌晨 1:00 到 3:00 之间最疲劳的时候。


二、盘后工作流全景:五个阶段的时间窗口

一个完整的盘后工作流,按时间顺序分为五个阶段:

阶段 时间窗口(EST) 核心任务 自动化要求
T+0 归档 16:00 – 16:30 TickDB/API → 数据湖,生成原始表 脚本自动,失败告警
T+1 清洗 16:30 – 17:30 缺失值处理、异常值剔除、价格对齐 可配置规则,幂等执行
T+2 归因 17:30 – 19:00 因子暴露、收益归因、风险归因 全量报告 + 增量告警
T+3 预计算 19:00 – 23:00 明日信号预计算、事件编码、板块扫描 批量任务,并发执行
T+4 质检 23:00 – 05:00 数据一致性检查、模型回滚准备、次日白板 静默运行,出错钉钉

时间窗口的设计遵循一个原则:越早完成的任务,越优先自动化。归档和清洗是后续所有步骤的数据基础,任何延迟都会级联放大。

接下来逐阶段展开架构和代码。


三、T+0 数据归档:TickDB API 的定时拉取与持久化

3.1 归档架构设计

盘后归档的本质是从数据源(TickDB REST API)拉取当天完整的行情数据,写入本地数据湖(PostgreSQL 或 DuckDB)。架构上需要注意三个问题:

  1. 拉取时机:纽交所 16:00 收盘,但部分资产(期权、期货)的数据可能在 16:15 之后才完全结算。策略需要等数据完全写入后再归档,避免截断。
  2. 增量 vs 全量:日频因子归因需要当天全量数据;高频因子可以用增量归档节省带宽。但对新手团队,建议全量归档以保证一致性。
  3. 幂等性:同一个任务重新运行不应重复写入数据,需要在入库层做唯一键约束或 upsert 逻辑。

3.2 TickDB 历史 K 线拉取脚本

归档的第一步是从 TickDB 获取当日的 K 线数据。以下代码展示了生产级的定时拉取实现,包含重试机制、超时设置和环境变量管理:

import os
import time
import json
import logging
import requests
from datetime import datetime, timezone, timedelta
from pathlib import Path
import duckdb

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[logging.StreamHandler(), logging.FileHandler("/var/log/tickdb_archive.log")]
)
logger = logging.getLogger(__name__)

# ── 配置 ──────────────────────────────────────────────────────────
API_KEY = os.environ.get("TICKDB_API_KEY")
BASE_URL = "https://api.tickdb.ai/v1/market"
WATCHLIST = ["AAPL.US", "MSFT.US", "SPY.US", "QQQ.US", "NVDA.US"]

HEADERS = {"X-API-Key": API_KEY}
RETRY_CONFIG = {"max_retries": 3, "base_delay": 2, "max_delay": 30}


def fetch_klines(symbol: str, interval: str = "1m", limit: int = 500,
                 start_time: int = None, end_time: int = None) -> list:
    """
    从 TickDB 获取 K 线数据,支持时间范围过滤。
    
    ⚠️ 注意:若需要获取历史大时间范围,请使用循环翻页(page_token)
    而非一次性拉取大量数据,避免触发限频。
    """
    params = {"symbol": symbol, "interval": interval, "limit": limit}
    if start_time:
        params["start_time"] = start_time
    if end_time:
        params["end_time"] = end_time

    for attempt in range(RETRY_CONFIG["max_retries"]):
        try:
            resp = requests.get(
                f"{BASE_URL}/kline",
                headers=HEADERS,
                params=params,
                timeout=(3.05, 10)  # 连接超时 3.05s,读取超时 10s
            )

            if resp.status_code == 200:
                data = resp.json()
                if data.get("code") == 0:
                    return data["data"]
                elif data.get("code") == 3001:
                    retry_after = int(resp.headers.get("Retry-After", 5))
                    logger.warning(f"限频: 等待 {retry_after}s (attempt {attempt + 1})")
                    time.sleep(retry_after)
                    continue
                else:
                    logger.error(f"API 错误 {data.get('code')}: {data.get('message')}")
                    return []
            else:
                logger.warning(f"HTTP {resp.status_code} (attempt {attempt + 1})")

        except requests.exceptions.Timeout:
            logger.warning(f"请求超时 (attempt {attempt + 1})")
        except requests.exceptions.RequestException as e:
            logger.error(f"网络异常: {e}")
            return []

        delay = min(RETRY_CONFIG["base_delay"] * (2 ** attempt), RETRY_CONFIG["max_delay"])
        jitter = time.uniform(0, delay * 0.1)
        time.sleep(delay + jitter)

    logger.error(f"拉取 {symbol} K线失败,已达最大重试次数")
    return []


def store_to_duckdb(symbol: str, interval: str, records: list, db_path: str = "data_lake.duckdb"):
    """将 K 线记录写入 DuckDB,启用 upsert 语义避免重复写入"""
    if not records:
        return 0

    con = duckdb.connect(db_path, read_only=False)
    con.execute("""
        CREATE TABLE IF NOT EXISTS kline_archive (
            symbol VARCHAR,
            interval VARCHAR,
            open_time BIGINT,
            close_time BIGINT,
            open DOUBLE, high DOUBLE, low DOUBLE, close DOUBLE,
            volume DOUBLE,
            insert_ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            PRIMARY KEY (symbol, interval, open_time)
        )
    """)

    rows = [
        (
            symbol,
            interval,
            r["open_time"],
            r["close_time"],
            r["open"],
            r["high"],
            r["low"],
            r["close"],
            r["volume"],
        )
        for r in records
    ]

    con.executemany(
        "INSERT OR REPLACE INTO kline_archive "
        "(symbol, interval, open_time, close_time, open, high, low, close, volume) "
        "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
        rows
    )
    con.close()
    return len(rows)


def run_daily_archive(target_date: datetime = None):
    """
    每日归档主流程:
    1. 计算目标日期的时间戳范围(16:00 – 16:15 EST,等结算完成)
    2. 按标的拉取当日 K 线
    3. 写入本地 DuckDB
    """
    if target_date is None:
        target_date = datetime.now(timezone.utc) - timedelta(days=1)

    # 转换为毫秒时间戳(UTC,16:00 EST = 21:00 UTC)
    start_ts = int((target_date.replace(hour=21, minute=0, second=0)).timestamp() * 1000)
    end_ts = int((target_date.replace(hour=21, minute=15, second=0)).timestamp() * 1000)

    total_stored = 0
    failed_symbols = []

    for symbol in WATCHLIST:
        logger.info(f"归档 {symbol} ({target_date.strftime('%Y-%m-%d')})")
        records = fetch_klines(symbol, interval="1m", start_time=start_ts, end_time=end_ts)

        if records:
            count = store_to_duckdb(symbol, "1m", records)
            total_stored += count
            logger.info(f"  → 写入 {count} 条记录")
        else:
            failed_symbols.append(symbol)
            logger.warning(f"  → {symbol} 无数据或拉取失败")

    # 归档完成后发送告警
    if failed_symbols:
        logger.error(f"归档完成,但以下标的失败: {failed_symbols}")
    else:
        logger.info(f"✅ 归档完成,共写入 {total_stored} 条记录")

    return {"total": total_stored, "failed": failed_symbols}


if __name__ == "__main__":
    run_daily_archive()

⚠️ 生产环境提醒:上述代码使用同步 requests 库,在高频归档场景(50+ 标的、1 分钟间隔)下建议替换为 aiohttp + asyncio,并发拉取可将总耗时从串行的 N×2s 降低到并发运行的约 2-3s。但对日频归档(每日一次),同步版本已足够稳定。

3.3 定时调度配置

归档脚本通过系统级定时任务(cron)触发。生产环境建议用 systemd timer 替代 cron,好处是可以配置依赖关系、在失败后自动重启:

# /etc/systemd/system/tickdb-archive.service
[Unit]
Description=TickDB Daily Archive
After=network-online.target
Wants=network-online.target

[Service]
Type=oneshot
Environment="TICKDB_API_KEY=%ENV{TICKDB_API_KEY}"
ExecStart=/opt/quant/venv/bin/python /opt/quant/scripts/daily_archive.py
User=quant
StandardOutput=journal
StandardError=journal

[Install]
WantedBy=multi-user.target
# /etc/systemd/system/tickdb-archive.timer
[Unit]
Description=TickDB Daily Archive Timer (16:20 EST)
Requires=tickdb-archive.service

[Timer]
OnCalendar=*-*-* 21:20:00 UTC
Persistent=true
RandomizedDelaySec=60

[Install]
WantedBy=timers.target

RandomizedDelaySec=60 是工程上的一个小技巧:在多实例部署时,避免所有 worker 同时在 21:20:00 准时发起请求,造成瞬时流量尖峰。


四、T+1 数据清洗:构建一致性数据管道

4.1 清洗规则设计

原始归档数据包含三类需要处理的问题:

问题类型 典型表现 处理策略
缺失值 某分钟 K 线缺失(如交易所短暂故障) 前向填充(ffill),缺失超过 3 分钟则标记为无效
异常值 价格为 0、成交量为负、价格跳变 > 20σ 标记为 NaN,参与计算时跳过
浮点精度 价格为 152.3000000000001 入库前统一 round 到精度 4 位

清洗管道的核心原则是可追溯:每条被清洗过的数据必须记录原因,便于事后回溯和规则调整。

4.2 清洗 ETL 脚本

import duckdb
import pandas as pd
import numpy as np
from datetime import datetime, timezone
from pathlib import Path
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def load_raw_data(symbol: str, date: str, db_path: str = "data_lake.duckdb") -> pd.DataFrame:
    con = duckdb.connect(db_path, read_only=True)
    df = con.execute(f"""
        SELECT * FROM kline_archive
        WHERE symbol = '{symbol}'
          AND DATE(from_unixtime(open_time / 1000)) = '{date}'
        ORDER BY open_time
    """).df()
    con.close()
    return df


def clean_kline(df: pd.DataFrame, max_price_jump: float = 0.2) -> pd.DataFrame:
    """
    标准化 K 线清洗流程:
    1. 标记缺失时间窗口
    2. 剔除异常价格(跳变超过 max_price_jump)
    3. 前向填充缺失值
    4. 记录清洗日志
    """
    if df.empty:
        return df

    df = df.copy()
    df["open_dt"] = pd.to_datetime(df["open_time"], unit="ms", utc=True)
    df["close_dt"] = pd.to_datetime(df["close_time"], unit="ms", utc=True)

    # ── 1. 检测时间序列缺失 ──────────────────────────────────────────
    df["expected_interval"] = df["open_time"].diff()
    df["missing_flag"] = df["expected_interval"] > 65_000  # 超过 1 分钟(留 5s 容差)

    missing_count = df["missing_flag"].sum()
    if missing_count > 0:
        logger.warning(f"检测到 {missing_count} 个时间窗口缺失")

    # ── 2. 异常价格检测(价格跳变)─────────────────────────────────
    df["price_pct_change"] = df["close"].pct_change().abs()
    df["price_spike_flag"] = df["price_pct_change"] > max_price_jump

    spikes = df[df["price_spike_flag"]]
    if not spikes.empty:
        logger.warning(f"检测到 {len(spikes)} 个价格跳变异常: {spikes[['open_dt', 'close', 'price_pct_change']].values}")

    # 异常值替换为 NaN(不直接删除,保留行位置)
    df.loc[df["price_spike_flag"], ["open", "high", "low", "close"]] = np.nan

    # ── 3. 浮点精度归一化 ──────────────────────────────────────────
    price_cols = ["open", "high", "low", "close"]
    df[price_cols] = df[price_cols].round(4)

    # ── 4. 前向填充缺失值 ──────────────────────────────────────────
    # 仅填充价格列,成交量不填充(因为成交量缺失意味着没有成交)
    df[price_cols] = df[price_cols].fillna(method="ffill", limit=1)

    # ── 5. 汇总清洗元数据 ──────────────────────────────────────────
    clean_stats = {
        "total_rows": len(df),
        "missing_windows": int(missing_count),
        "price_spikes": int(len(spikes)),
        "null_after_clean": int(df[price_cols].isnull().sum().sum()),
    }
    logger.info(f"清洗完成: {clean_stats}")

    # 保留清洗元数据供归因分析使用
    df.attrs["clean_stats"] = clean_stats
    return df


def save_clean_data(df: pd.DataFrame, symbol: str, date: str, db_path: str = "data_lake.duckdb"):
    """写入清洗后的数据表(含清洗元数据)"""
    if df.empty:
        return

    con = duckdb.connect(db_path, read_only=False)

    # 清洗后的数据写单独表,便于区分原始数据和分析数据
    con.execute(f"""
        CREATE TABLE IF NOT EXISTS kline_clean (
            symbol VARCHAR, interval VARCHAR, open_time BIGINT,
            close_time BIGINT, open DOUBLE, high DOUBLE, low DOUBLE,
            close DOUBLE, volume DOUBLE, missing_flag BOOLEAN,
            price_spike_flag BOOLEAN, clean_dt TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    """)

    insert_df = df.drop(columns=["open_dt", "close_dt", "expected_interval",
                                  "price_pct_change"], errors="ignore")
    con.execute(f"DELETE FROM kline_clean WHERE symbol = '{symbol}' "
                f"AND DATE(from_unixtime(open_time / 1000)) = '{date}'")
    con.append("kline_clean", insert_df)
    con.close()
    logger.info(f"✅ {symbol} {date} 清洗数据已写入")


if __name__ == "__main__":
    import sys
    symbol = sys.argv[1] if len(sys.argv) > 1 else "AAPL.US"
    date = sys.argv[2] if len(sys.argv) > 2 else (
        datetime.now(timezone.utc) - pd.Timedelta(days=1)
    ).strftime("%Y-%m-%d")

    raw = load_raw_data(symbol, date)
    clean = clean_kline(raw)
    save_clean_data(clean, symbol, date)

清洗脚本的关键设计决策:不直接删除异常行,而是标记后保留。这在归因分析中非常重要——你需要区分"今天某因子数据有问题"和"今天某因子策略失效",两者需要不同的响应方式。


五、T+2 策略归因分析:Brinson 模型简化实现

5.1 归因的目标:回答三个问题

盘后归因的核心目标是回答:

  1. 总量问题:今天组合的总收益是多少?跑赢基准多少?
  2. 结构问题:收益来自行业配置(allocation effect)还是个股选择(selection effect)?
  3. 因子问题:今天赚钱的因子是哪个?亏损的因子是否与预期一致?

职业团队的归因报告通常在收盘后 2-3 小时内生成,在夜盘(美股盘后无夜盘,以期货为参考)开始前送达组合经理手中。

5.2 Brinson 模型简化实现

Brinson 归因将超额收益分解为三个效应。这里提供一个简化的日频 Brinson 模型实现,代码可直接接入 T+1 清洗后的数据:

import duckdb
import pandas as pd
import numpy as np
from dataclasses import dataclass
from typing import Optional
from datetime import datetime, timezone


@dataclass
class AttributionResult:
    """归因结果数据结构"""
    symbol: str
    sector: str
    weight: float
    actual_return: float
    benchmark_return: float
    allocation_effect: float   # 配置效应:被动持有该行业带来的超额收益
    selection_effect: float    # 选择效应:选股能力带来的超额收益
    interaction_effect: float  # 交互效应:配置×选股共同贡献
    total_effect: float        # 总超额收益 = allocation + selection + interaction


def compute_daily_attribution(
    portfolio_weights: dict,   # {symbol: weight}
    actual_returns: dict,      # {symbol: daily_return}
    sector_map: dict,          # {symbol: sector}
    benchmark_returns: dict,   # {symbol: benchmark_return}
    benchmark_sector_weights: dict,  # {sector: benchmark_weight}
    risk_free_rate: float = 0.0
) -> pd.DataFrame:
    """
    简化 Brinson 归因(日频版):

    对于每个持仓标的,计算其在总组合超额收益中的贡献分解。
    适用于股票组合的日频归因,不涉及期货、外汇等衍生品。

    公式:
        allocation_effect = (w_p - w_b) * r_b_sector
        selection_effect  = w_p * (r_p - r_b_sector)
        interaction       = (w_p - w_b) * (r_p - r_b_sector)
        total_effect      = allocation_effect + selection_effect + interaction

    其中 w_p 为组合中该标的权重,w_b 为基准中该行业权重,
    r_p 为标的实际收益,r_b_sector 为基准中该行业收益。
    """

    records = []

    for symbol, w_p in portfolio_weights.items():
        r_p = actual_returns.get(symbol, 0.0)
        sector = sector_map.get(symbol, "Unknown")
        r_b_sector = benchmark_returns.get(sector, 0.0)
        w_b = benchmark_sector_weights.get(sector, 0.0)

        allocation_effect = (w_p - w_b) * r_b_sector
        selection_effect = w_p * (r_p - r_b_sector)
        interaction = (w_p - w_b) * (r_p - r_b_sector)
        total_effect = allocation_effect + selection_effect + interaction

        records.append(AttributionResult(
            symbol=symbol, sector=sector,
            weight=w_p, actual_return=r_p,
            benchmark_return=r_b_sector,
            allocation_effect=allocation_effect,
            selection_effect=selection_effect,
            interaction_effect=interaction,
            total_effect=total_effect,
        ))

    df = pd.DataFrame(records)

    # 汇总统计
    total_excess = df["total_effect"].sum()
    allocation_total = df["allocation_effect"].sum()
    selection_total = df["selection_effect"].sum()
    interaction_total = df["interaction_effect"].sum()

    logger.info(f"""
    ╔══════════════════════════════════════╗
    ║       {datetime.now(timezone.utc).strftime('%Y-%m-%d')} 归因报告        ║
    ╠══════════════════════════════════════╣
    ║ 配置效应:     {allocation_total*100:>+.4f}%
    ║ 选股效应:     {selection_total*100:>+.4f}%
    ║ 交互效应:     {interaction_total*100:>+.4f}%
    ║ ────────────────────────────────║
    ║ 总超额收益:   {total_excess*100:>+.4f}%
    ╚══════════════════════════════════════╝
    """)

    return df.sort_values("total_effect", ascending=False)


def generate_attribution_report(
    date: str,
    db_path: str = "data_lake.duckdb",
    output_path: str = "reports/"
) -> str:
    """
    生成日频归因报告主流程:
    1. 从数据湖加载当日收盘价和基准数据
    2. 计算持仓收益
    3. 执行 Brinson 归因
    4. 输出 Markdown 格式报告
    """
    Path(output_path).mkdir(exist_ok=True)

    con = duckdb.connect(db_path, read_only=True)

    # 示例数据(实际场景从组合管理系统拉取持仓)
    # 这里用模拟数据演示归因框架
    portfolio_weights = {
        "AAPL.US": 0.20, "MSFT.US": 0.15, "NVDA.US": 0.18,
        "SPY.US": 0.25, "QQQ.US": 0.12, "TSLA.US": 0.10,
    }
    sector_map = {
        "AAPL.US": "Technology", "MSFT.US": "Technology", "NVDA.US": "Technology",
        "SPY.US": "Broad Market", "QQQ.US": "Technology",
        "TSLA.US": "Consumer Discretionary",
    }
    # 模拟当日收益
    actual_returns = {
        "AAPL.US": 0.023, "MSFT.US": 0.015, "NVDA.US": 0.047,
        "SPY.US": 0.012, "QQQ.US": 0.018, "TSLA.US": -0.031,
    }
    benchmark_returns = {
        "Technology": 0.020, "Consumer Discretionary": -0.010, "Broad Market": 0.012,
    }
    benchmark_sector_weights = {
        "Technology": 0.30, "Consumer Discretionary": 0.12, "Broad Market": 0.58,
    }

    con.close()

    result_df = compute_daily_attribution(
        portfolio_weights, actual_returns, sector_map,
        benchmark_returns, benchmark_sector_weights
    )

    # 生成 Markdown 报告
    report_path = f"{output_path}attribution_{date}.md"
    with open(report_path, "w") as f:
        f.write(f"# 策略归因报告 — {date}\n\n")
        f.write("## 超额收益分解\n\n")
        f.write("| 标的 | 行业 | 权重 | 实际收益 | 基准收益 | 配置效应 | 选股效应 | 交互效应 | 总效应 |\n")
        f.write("|------|------|------|---------|---------|---------|---------|---------|--------|\n")

        for _, row in result_df.iterrows():
            f.write(f"| {row['symbol']} | {row['sector']} | {row['weight']:.2%} | "
                    f"{row['actual_return']:+.2%} | {row['benchmark_return']:+.2%} | "
                    f"{row['allocation_effect']:+.4f} | {row['selection_effect']:+.4f} | "
                    f"{row['interaction_effect']:+.4f} | **{row['total_effect']:+.4f}** |\n")

        f.write("\n## 汇总\n\n")
        f.write(f"- 配置效应合计:{result_df['allocation_effect'].sum():+.4f}\n")
        f.write(f"- 选股效应合计:{result_df['selection_effect'].sum():+.4f}\n")
        f.write(f"- 交互效应合计:{result_df['interaction_effect'].sum():+.4f}\n")
        f.write(f"- **总超额收益:{result_df['total_effect'].sum():+.4f}**\n\n")
        f.write("---\n*报告生成时间:" + datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC") + "*\n")

    return report_path


if __name__ == "__main__":
    import sys
    date = sys.argv[1] if len(sys.argv) > 1 else (
        datetime.now(timezone.utc) - pd.Timedelta(days=1)
    ).strftime("%Y-%m-%d")
    path = generate_attribution_report(date)
    print(f"✅ 归因报告已生成: {path}")

⚠️ 归因模型的局限性:上述 Brinson 模型是简化版,适用于日频权益类组合。实际职业团队的归因系统通常包含:

  • 多因子归因( Barra / Axioma 模型):将收益分解到市值、动量、波动率等风险因子
  • 交易归因:区分持仓收益和交易执行收益(区分"选对了但买贵了"的情况)
  • 前推偏差检测:通过归因数据检测策略是否在近期样本上过度拟合
    本文的简化实现作为归因框架的起点,覆盖 80% 的日常归因需求。

六、T+3 明日信号预计算:事件驱动与数据准备

6.1 预计算的内容

盘后第三阶段是"为明天做准备"。职业团队在收盘后会做以下预计算:

事件编码:今晚有哪些重要信息会影响次日开盘?典型的预计算包括:

事件类型 数据来源 编码内容
盘后财报发布 新闻 API / 财报数据库 股票代码、预期 EPS、实际 EPS、Revenue surprise
宏观数据发布 Fed / BLS / ECB 官网 时间(EST)、指标名称、市场预期值、前值
期权到期日 CBOE 官网 到期日期、Gamma 集中区
指数成分调整 FTSE Russell / S&P 官网 调入/调出标的、调仓时间

技术面预扫描:收盘后对全市场标的运行"今日异动扫描",识别以下模式:

  • 突破 20 日/50 日均线的标的(供趋势策略使用)
  • 成交量突增 > 3σ 的标的(供事件驱动策略使用)
  • 波动率创季度新高的标的(供期权做市策略使用)

组合再平衡信号:根据当日归因结果,生成组合调整建议,供次日开盘前决策。

6.2 事件扫描与编码脚本

import requests
import json
import logging
from datetime import datetime, timezone, timedelta
from dataclasses import dataclass, asdict
from typing import List, Optional

logger = logging.getLogger(__name__)


@dataclass
class EarningsEvent:
    """盘后财报事件数据结构"""
    symbol: str
    company_name: str
    report_date: str          # 预期发布日期 (YYYY-MM-DD)
    expected_eps: Optional[float]
    actual_eps: Optional[float]
    expected_revenue: Optional[float]
    actual_revenue: Optional[float]
    surprise_pct: Optional[float]  # 实际 vs 预期偏离
    market_reaction_estimate: str  # "beat", "miss", "in-line", "pending"


@dataclass
class MacroEvent:
    """宏观数据事件"""
    event_name: str
    country: str
    release_time: str         # EST 时间
    indicator_name: str
    previous_value: str
    consensus: str
    impact_level: str         # "high", "medium", "low"


class PostMarketEventScanner:
    """
    盘后事件扫描器:整合财报、宏观数据,生成次日事件日历。
    
    ⚠️ 生产提示:此处使用模拟数据演示。生产环境应接入
    可靠数据源(如 Benzinga Pro、Alpha Vantage、Intrinio 等),
    并实现增量更新而非全量拉取。
    """

    def __init__(self, api_key: str = None):
        self.api_key = api_key or os.environ.get("TICKDB_API_KEY")

    def scan_earnings(self, date_range: tuple) -> List[EarningsEvent]:
        """
        扫描指定日期范围内的财报发布事件。
        
        Args:
            date_range: (start_date, end_date),格式 "YYYY-MM-DD"
        """
        # 示例:这里用模拟数据演示框架
        # 生产环境接入财报 API,按 symbol + date_range 过滤
        mock_events = [
            EarningsEvent(
                symbol="META.US", company_name="Meta Platforms",
                report_date=date_range[1], expected_eps=5.02, actual_eps=None,
                expected_revenue=41.5e9, actual_revenue=None,
                surprise_pct=None, market_reaction_estimate="pending"
            ),
            EarningsEvent(
                symbol="AMZN.US", company_name="Amazon",
                report_date=date_range[1], expected_eps=1.46, actual_eps=None,
                expected_revenue=143.2e9, actual_revenue=None,
                surprise_pct=None, market_reaction_estimate="pending"
            ),
        ]

        logger.info(f"扫描到 {len(mock_events)} 个待发布财报事件")
        return mock_events

    def scan_macro_events(self, target_date: str) -> List[MacroEvent]:
        """
        扫描次日宏观数据发布日程。
        
        宏观事件对期货(ES / NQ)和外汇市场有直接影响,
        是 CTA 和宏观策略盘前预计算的核心输入。
        """
        mock_macro = [
            MacroEvent(
                event_name="CPI 月率",
                country="US",
                release_time="08:30 EST",
                indicator_name="Consumer Price Index (MoM)",
                previous_value="0.2%",
                consensus="0.3%",
                impact_level="high"
            ),
            MacroEvent(
                event_name="FOMC 会议纪要",
                country="US",
                release_time="14:00 EST",
                indicator_name="FOMC Minutes",
                previous_value="维持利率不变",
                consensus="N/A",
                impact_level="high"
            ),
        ]
        logger.info(f"扫描到 {len(mock_macro)} 个宏观事件")
        return mock_macro

    def generate_tomorrow_brief(self, target_date: str) -> dict:
        """
        生成次日简报(明日开盘前必读):
        1. 事件日历(含时间线)
        2. 异动标的列表
        3. 盘前重点关注
        """
        earnings = self.scan_earnings((target_date, target_date))
        macro = self.scan_macro_events(target_date)

        brief = {
            "date": target_date,
            "earnings": [asdict(e) for e in earnings],
            "macro": [asdict(m) for m in macro],
            "generated_at": datetime.now(timezone.utc).isoformat(),
            "priority_items": self._rank_priority(earnings, macro),
        }
        return brief

    def _rank_priority(self, earnings: List, macro: List) -> List[str]:
        """按影响力排序次日关注事项"""
        items = []
        for e in earnings:
            items.append(f"📋 [{e.symbol}] 财报待发布(预期 EPS ${e.expected_eps})")
        for m in macro:
            items.append(f"📊 [{m.country}] {m.event_name} @ {m.release_time} "
                         f"(预期 {m.consensus}, 前值 {m.previous_value})")
        return sorted(items, key=lambda x: "📊" in x, reverse=True)


if __name__ == "__main__":
    import os
    os.environ.setdefault("TICKDB_API_KEY", "your_key_here")

    tomorrow = (datetime.now(timezone.utc) + timedelta(days=1)).strftime("%Y-%m-%d")
    scanner = PostMarketEventScanner()
    brief = scanner.generate_tomorrow_brief(tomorrow)

    print(json.dumps(brief, indent=2, ensure_ascii=False))

6.3 盘后预计算的整体调度架构

归因分析和预计算脚本由统一的调度器管理。推荐使用 AirflowDagster 管理复杂的依赖关系。以下是一个简化的 Airflow DAG 示例,展示了五个阶段的依赖链:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.empty import EmptyOperator

default_args = {
    "owner": "quant-team",
    "depends_on_past": False,
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
}

with DAG(
    dag_id="post_market_workflow",
    default_args=default_args,
    schedule="0 21 * * 1-5",  # 每周一至周五 21:00 UTC (16:00 EST)
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=["daily", "post-market", "attribution"],
) as dag:

    start = EmptyOperator(task_id="start")

    # T+0: 数据归档
    archive = PythonOperator(
        task_id="archive_tickdb_data",
        python_callable=run_daily_archive,
    )

    # T+1: 数据清洗
    clean = PythonOperator(
        task_id="clean_and_validate",
        python_callable=lambda: [clean_kline(load_raw_data(s)) for s in WATCHLIST],
    )

    # T+2: 策略归因
    attribute = PythonOperator(
        task_id="compute_attribution",
        python_callable=lambda: generate_attribution_report(
            datetime.now().strftime("%Y-%m-%d")
        ),
    )

    # T+3: 事件预扫描
    prescan = PythonOperator(
        task_id="scan_tomorrow_events",
        python_callable=lambda: scanner.generate_tomorrow_brief(
            (datetime.now() + timedelta(days=1)).strftime("%Y-%m-%d")
        ),
    )

    # T+4: 质检与告警
    quality_check = PythonOperator(
        task_id="data_quality_check",
        python_callable=data_quality_report,
    )

    notify = PythonOperator(
        task_id="notify_team",
        python_callable=send_summary_to_slack,
        trigger_rule="all_done",  # 即使上游有失败也发送通知(带告警)
    )

    end = EmptyOperator(task_id="end")

    # 依赖链
    start >> archive >> clean >> [attribute, prescan] >> quality_check >> notify >> end

工程提示:在 DAG 调度中,clean 任务的输入依赖 archive 的输出。如果归档失败,清洗任务应该失败并阻止下游——这是 DAG 依赖管理的核心价值。trigger_rule="all_done" 仅用于通知任务,确保团队总是能收到当日状态的汇报,即使中间某个步骤出错。


七、T+4 质检与告警:让流水线自己说话

7.1 数据质量检查清单

盘后流水线的最后一个环节是质检。职业团队的质检脚本检查以下项目:

def data_quality_report(symbols: list, date: str, db_path: str = "data_lake.duckdb") -> dict:
    """
    盘后数据质量检查:
    返回检测结果字典,并在发现异常时记录告警。
    """
    checks = {
        "missing_data": [],
        "zero_volume_days": [],
        "price_spikes": [],
        "completeness_pct": {},
    }

    con = duckdb.connect(db_path, read_only=True)

    for symbol in symbols:
        # 1. 数据完整性:当日应有 390 分钟 K 线(美股 9:30-16:00)
        row_count = con.execute(f"""
            SELECT COUNT(*) FROM kline_clean
            WHERE symbol = '{symbol}'
              AND DATE(from_unixtime(open_time / 1000)) = '{date}'
        """).fetchone()[0]

        expected = 390
        completeness = row_count / expected * 100
        checks["completeness_pct"][symbol] = completeness

        if completeness < 95:
            checks["missing_data"].append({
                "symbol": symbol,
                "expected": expected,
                "actual": row_count,
                "gap": expected - row_count,
            })

        # 2. 零成交量检测
        zero_vol = con.execute(f"""
            SELECT COUNT(*) FROM kline_clean
            WHERE symbol = '{symbol}'
              AND DATE(from_unixtime(open_time / 1000)) = '{date}'
              AND volume = 0
        """).fetchone()[0]
        if zero_vol > 5:
            checks["zero_volume_days"].append({"symbol": symbol, "count": zero_vol})

        # 3. 价格异常检测(已在清洗阶段标记)
        spikes = con.execute(f"""
            SELECT COUNT(*) FROM kline_clean
            WHERE symbol = '{symbol}'
              AND DATE(from_unixtime(open_time / 1000)) = '{date}'
              AND price_spike_flag = True
        """).fetchone()[0]
        if spikes > 0:
            checks["price_spikes"].append({"symbol": symbol, "count": spikes})

    con.close()

    # 汇总告警
    alert_level = "ERROR" if checks["missing_data"] else \
                  "WARNING" if any(checks.values()) else "OK"

    logger.log(
        level=logging.ERROR if alert_level == "ERROR" else logging.WARNING,
        msg=f"数据质检 [{alert_level}]: "
            f"缺失 {len(checks['missing_data'])} 标的, "
            f"零成交量 {len(checks['zero_volume_days'])} 标的, "
            f"价格异常 {len(checks['price_spikes'])} 标的"
    )

    return {"status": alert_level, "checks": checks, "date": date}

7.2 告警路由

异常级别 触发条件 路由
ERROR 归档完全失败、核心标的数据缺失 立即钉钉/飞书通知 + 触发值班人员手机
WARNING 部分数据不完整、价格异常 异步消息,次日早会处理
OK 所有检查通过 仅写入日志,不打扰团队

告警的分级设计是工程上的重要取舍:把最重要的注意力留给最重要的问题。职业团队在凌晨 2:00 被一条 ERROR 叫醒,和被一条 WARNING 叫醒,团队的反应和处理质量完全不同。


八、部署方案:按团队规模选型

场景 数据量 推荐方案 说明
个人量化 < 20 标的 cron + DuckDB 轻量,零运维,本地运行
小团队(2-5 人) 20-100 标的 Airflow + PostgreSQL DAG 可视化,失败告警,协作友好
机构级 100+ 标的,多策略 Dagster + 数据仓库(Snowflake/BigQuery) 资产隔离,权限管理,审计日志

对于个人量化者,DuckDB 完全够用——它是一个嵌入式分析数据库,不需要独立部署服务器,读写性能在百万级日频数据上毫无压力。对于机构团队,PostgreSQL + Airflow 是行业标准组合,经历过大量生产环境的验证。


结语

收盘钟声敲响的那一刻,职业量化团队的工作才刚刚进入第二轮。

数据归档把今天的每一笔行情固化为可追溯的记录;数据清洗剔除噪声、保留信号;归因分析回答"今天赚了还是亏了,为什么";预计算在隔夜信息冲击次日开盘之前,提前为团队准备好行动指南;质检确保整个流水线的数据可信。

这五个环节中,每一个都可以手工完成——但手工意味着遗漏、延迟和疲劳期错误。自动化不是"省事",是"在凌晨 2:00 依然保持和下午 2:00 一样的质量"。

你的盘后工作流自动化了吗?


下一步行动

如果你在个人量化阶段,想快速搭建轻量盘后流水线:

  1. 注册 TickDB(免费层,API Key 即开即用)
  2. 将本文归档脚本中的 WATCHLIST 替换为你的持仓标的
  3. 配置 Linux cron 定时任务:20 21 * * 1-5 /opt/quant/venv/bin/python /opt/quant/scripts/daily_archive.py
  4. 本地 DuckDB 文件自动积累历史数据,可直接用 DuckDB SQL 做因子回测

如果你在团队协作阶段,需要 DAG 可视化和多人告警:

  1. 参考本文 DAG 示例,在 Airflow 中配置 post_market_workflow
  2. send_summary_to_slack 替换为你的团队通知渠道
  3. 将 DuckDB 替换为 PostgreSQL,启用远程访问供团队成员查询

如果你在机构阶段,需要企业级数据治理和权限管理:

  1. 联系 [email protected] 获取历史全量 K 线数据和机构 API 方案
  2. 部署 Dagster 替换 Airflow(资产隔离、代码版本溯源)
  3. 将 DuckDB 替换为 Snowflake / BigQuery,对接 BI 工具生成归因看板

如果你习惯用 AI 辅助开发,在 AI 助手中搜索安装 tickdb-market-data SKILL,用自然语言描述你的策略思路,AI 将协助生成 TickDB 数据拉取引擎。


本文不构成任何投资建议。市场有风险,投资需谨慎。