每日收盘后,职业量化团队在做什么?盘后工作流自动化


纽约,凌晨 23:47。

曼哈顿中城某量化基金的交易室里,显示屏上的 K 线图早已定格。四个小时后 A 股集合竞价开始,七个小时后美股盘前交易启动。这中间的六个小时,对职业量化团队而言,不是休息窗口。

收盘铃响的那一刻,实时监控程序自动切换为批处理模式。历史行情数据从数据源归档入库,策略的日度归因报告在后台跑着,而另一组脚本已经开始预计算明日的交易信号边界。

这不是加班文化,而是量化交易的工程节奏——每一个可预测的任务,都应该被机器接管;每一个需要人工介入的瞬间,都意味着系统存在漏洞。

本文拆解职业量化团队的盘后工作流,涵盖数据归档、策略归因、信号预计算三个核心环节,给出可直接运行的生产级 Python 脚本。


一、盘后工作流全景:三个阶段,环环相扣

职业量化团队的盘后工作流通常分为三个阶段,每个阶段都有明确的输入、处理逻辑和输出。

阶段 核心目标 典型耗时 自动化程度
数据归档 将当日市场数据清洗入库 5-15 分钟 高(定时触发)
策略归因 拆解当日收益来源,更新风控指标 15-30 分钟 中(依赖手动确认)
信号预计算 预生成明日观察标的和阈值边界 10-20 分钟 高(定时触发)

三个阶段形成闭环:归档的数据是归因的输入,归因的结论影响信号预计算的权重调整。


二、数据归档:TickDB 作为统一数据源

盘后归档的第一步是获取当日完整的行情数据。对于跨市场量化策略,一个统一的 API 接口覆盖多个市场是工程效率的关键。

以下脚本演示如何用 TickDB REST API 获取美股历史 K 线数据,并完成归档。

import os
import time
import json
import logging
from datetime import datetime, timedelta, timezone
from pathlib import Path
import requests

# ─────────────────────────────────────────────
# 配置区
# ─────────────────────────────────────────────
API_KEY = os.environ.get("TICKDB_API_KEY")
if not API_KEY:
    raise ValueError("请设置环境变量 TICKDB_API_KEY")

BASE_URL = "https://api.tickdb.ai/v1/market"

HEADERS = {
    "X-API-Key": API_KEY,
    "Content-Type": "application/json"
}

# 监控标的(示例)
WATCH_LIST = [
    "AAPL.US", "MSFT.US", "GOOGL.US", "AMZN.US", "NVDA.US",
    "META.US", "TSLA.US", "BRK.B.US", "JPM.US", "V.US"
]

# 归档根目录
ARCHIVE_ROOT = Path("./data/archive")

# 日志配置
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[logging.StreamHandler()]
)
logger = logging.getLogger(__name__)


def exponential_backoff_retry(func, max_retries=5, base_delay=1.0, max_delay=60.0):
    """
    指数退避重连,带抖动避免惊群效应
    
    ⚠️ 生产环境建议:高频调用场景使用 aiohttp + asyncio 异步架构
    """
    for attempt in range(max_retries):
        try:
            return func()
        except requests.exceptions.RequestException as e:
            delay = min(base_delay * (2 ** attempt), max_delay)
            jitter = (hash(str(time.time())) % 100) / 1000.0  # 伪随机抖动
            wait_time = delay + jitter
            logger.warning(f"请求失败(第 {attempt + 1} 次),{wait_time:.2f}s 后重试: {e}")
            if attempt == max_retries - 1:
                raise
            time.sleep(wait_time)


def handle_api_error(response):
    """
    TickDB 标准错误处理
    错误码速查:
    - 1001/1002: API Key 无效
    - 2002: 交易品种不存在
    - 3001: 请求频率超限(读取 Retry-After 头)
    """
    if response.status_code == 200:
        return
    
    # 处理 3001 限频
    if response.status_code == 429:
        retry_after = int(response.headers.get("Retry-After", 5))
        logger.warning(f"触发限频,等待 {retry_after}s")
        time.sleep(retry_after)
        return
    
    # 解析 JSON 错误体
    try:
        error_body = response.json()
        code = error_body.get("code", 0)
        message = error_body.get("message", "未知错误")
    except json.JSONDecodeError:
        code = -1
        message = response.text
    
    if code in (1001, 1002):
        raise ValueError(f"API Key 无效或缺失,请检查环境变量 TICKDB_API_KEY")
    if code == 2002:
        raise KeyError(f"交易品种不存在,请检查标的代码")
    
    raise RuntimeError(f"API 错误 {code}: {message}")


def fetch_daily_klines(symbol: str, trade_date: str) -> list:
    """
    获取指定标的在指定日期的日线 K 线数据
    
    接口说明:
    - GET /v1/market/kline: 获取历史 K 线(适用于回测)
    - GET /v1/market/kline/latest: 获取当前 K 线(适用于盘中)
    ⚠️ 注意:盘后归档应使用 /kline 接口获取已结束周期的完整日线
    """
    url = f"{BASE_URL}/kline"
    params = {
        "symbol": symbol,
        "interval": "1d",
        "start_time": f"{trade_date} 00:00:00",
        "end_time": f"{trade_date} 23:59:59",
        "limit": 10  # 留余量,实际最多返回 1 条
    }
    
    def _request():
        response = requests.get(
            url,
            headers=HEADERS,
            params=params,
            timeout=(3.05, 10)  # (connect_timeout, read_timeout)
        )
        handle_api_error(response)
        data = response.json()
        
        if data.get("code") != 0:
            raise RuntimeError(f"接口返回错误: {data}")
        
        return data.get("data", {}).get("klines", [])
    
    result = exponential_backoff_retry(_request)
    return result


def archive_data(symbol: str, trade_date: str) -> dict:
    """单标的归档:获取 + 存储"""
    logger.info(f"开始归档 {symbol} @ {trade_date}")
    
    klines = fetch_daily_klines(symbol, trade_date)
    
    if not klines:
        logger.warning(f"{symbol} 无 K 线数据(可能为非交易日)")
        return {"symbol": symbol, "date": trade_date, "status": "no_data"}
    
    # 取最后一条(当日收盘 K 线)
    daily_kline = klines[-1]
    
    # 构建归档路径:./data/archive/2024-12-15/AAPL.US.json
    date_path = ARCHIVE_ROOT / trade_date
    date_path.mkdir(parents=True, exist_ok=True)
    
    file_path = date_path / f"{symbol}.json"
    with open(file_path, "w", encoding="utf-8") as f:
        json.dump({
            "symbol": symbol,
            "trade_date": trade_date,
            "kline": daily_kline,
            "archived_at": datetime.now(timezone.utc).isoformat()
        }, f, indent=2, ensure_ascii=False)
    
    logger.info(f"归档完成: {symbol} → {file_path}")
    return {"symbol": symbol, "date": trade_date, "status": "success", "file": str(file_path)}


def batch_archive(trade_date: str):
    """批量归档当日所有监控标的"""
    logger.info(f"=" * 50)
    logger.info(f"盘后数据归档批次启动: {trade_date}")
    logger.info(f"标的数量: {len(WATCH_LIST)}")
    
    results = []
    for symbol in WATCH_LIST:
        try:
            result = archive_data(symbol, trade_date)
            results.append(result)
        except Exception as e:
            logger.error(f"归档失败 {symbol}: {e}")
            results.append({"symbol": symbol, "date": trade_date, "status": "error", "error": str(e)})
        
        # 限速保护:TickDB 免费层 5 req/s,付费层更高
        time.sleep(0.25)
    
    # 汇总报告
    success_count = sum(1 for r in results if r["status"] == "success")
    logger.info(f"归档完成: {success_count}/{len(results)} 成功")
    return results


if __name__ == "__main__":
    # 默认归档上一交易日(若当日为非交易日,自动回退)
    yesterday = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")
    batch_archive(yesterday)

归档脚本执行要点

  • 使用 ARCHIVE_ROOT / trade_date 的目录结构,便于后续按日期检索
  • timeout=(3.05, 10) 设置了连接超时和读取超时,防止 API 端无响应时无限等待
  • 指数退避重连中加入了伪随机抖动,避免多实例同时重试造成的流量尖刺
  • 批量请求间隔 time.sleep(0.25) 符合免费层 5 req/s 的限频约束

三、策略归因:P&L 分解与因子暴露分析

归档完成后,第二阶段是策略归因。归因的本质是回答:今天的收益(或亏损)从哪里来?

一个完整的日度归因报告应包含三个维度:

  1. P&L 分解:总收益 = Σ(持仓收益) + Σ(交易盈亏) + 其他
  2. 因子暴露:当日收益中,各因子(动量、价值、波动率等)的贡献度
  3. 交易统计:交易次数、胜率、平均持仓时长、交易成本

以下脚本展示如何基于归档数据构建归因框架:

import json
from pathlib import Path
from datetime import datetime, timezone
from typing import Optional

# ─────────────────────────────────────────────
# 简化版持仓数据(生产环境应从数据库读取)
# ─────────────────────────────────────────────
# 格式:{symbol: {"shares": int, "avg_cost": float, "entry_date": str}}
POSITIONS = {
    "AAPL.US": {"shares": 100, "avg_cost": 185.50, "entry_date": "2024-12-01"},
    "NVDA.US": {"shares": 50, "avg_cost": 480.00, "entry_date": "2024-12-10"},
    "META.US": {"shares": 30, "avg_cost": 350.00, "entry_date": "2024-12-12"},
}


def load_archived_kline(symbol: str, trade_date: str, archive_root: Path) -> Optional[dict]:
    """从归档目录加载 K 线数据"""
    file_path = archive_root / trade_date / f"{symbol}.json"
    if not file_path.exists():
        return None
    with open(file_path, "r") as f:
        return json.load(f)


def calculate_pnl(position: dict, current_kline: dict) -> dict:
    """
    计算单持仓的浮动盈亏
    
    ⚠️ 注意:这是浮动盈亏(mark-to-market),非已实现盈亏
    生产环境中,需区分已实现 P&L(平仓)与未实现 P&L(持仓)
    """
    shares = position["shares"]
    avg_cost = position["avg_cost"]
    
    # TickDB K 线数据字段说明
    # open/high/low/close: 价格
    # volume: 成交量
    # turnover: 成交额
    close_price = float(current_kline["close"])
    
    market_value = shares * close_price
    cost_basis = shares * avg_cost
    pnl = market_value - cost_basis
    pnl_pct = (close_price - avg_cost) / avg_cost * 100
    
    return {
        "symbol": current_kline["symbol"],
        "shares": shares,
        "avg_cost": avg_cost,
        "close_price": close_price,
        "market_value": market_value,
        "cost_basis": cost_basis,
        "pnl": pnl,
        "pnl_pct": pnl_pct
    }


def generate_attribution_report(trade_date: str, archive_root: Path):
    """生成当日策略归因报告"""
    print("=" * 60)
    print(f"策略归因报告 | {trade_date}")
    print("=" * 60)
    
    # 1. P&L 汇总
    total_pnl = 0.0
    total_cost_basis = 0.0
    holding_details = []
    
    for symbol, position in POSITIONS.items():
        kline_data = load_archived_kline(symbol, trade_date, archive_root)
        if not kline_data:
            print(f"[警告] 无 {symbol} 的归档数据,跳过")
            continue
        
        pnl_info = calculate_pnl(position, kline_data["kline"])
        holding_details.append(pnl_info)
        total_pnl += pnl_info["pnl"]
        total_cost_basis += pnl_info["cost_basis"]
    
    # 2. 输出持仓明细
    print("\n【持仓明细】")
    print(f"{'标的':<12} {'持仓数':>8} {'成本':>10} {'现价':>10} {'市值':>12} {'浮盈亏':>12} {'收益率':>10}")
    print("-" * 76)
    for h in holding_details:
        pnl_str = f"${h['pnl']:>+.2f}"
        pnl_pct_str = f"{h['pnl_pct']:>+.2f}%"
        print(f"{h['symbol']:<12} {h['shares']:>8} ${h['avg_cost']:>9.2f} ${h['close_price']:>9.2f} ${h['market_value']:>11.2f} {pnl_str:>12} {pnl_pct_str:>10}")
    
    # 3. 汇总统计
    total_market_value = total_cost_basis + total_pnl
    total_return = total_pnl / total_cost_basis * 100 if total_cost_basis > 0 else 0.0
    
    print("-" * 76)
    print(f"{'合计':<12} {'':>8} {'':>10} {'':>10} ${total_market_value:>11.2f} ${total_pnl:>+11.2f} {total_return:>+.2f}%")
    
    # 4. 风险指标(简化版,生产环境引入 VaR、CVaR 等)
    if holding_details:
        returns = [h["pnl_pct"] for h in holding_details]
        max_loss_symbol = min(holding_details, key=lambda x: x["pnl"])
        max_gain_symbol = max(holding_details, key=lambda x: x["pnl"])
        
        print("\n【风险快照】")
        print(f"  最大单标的亏损: {max_loss_symbol['symbol']} ({max_loss_symbol['pnl_pct']:+.2f}%)")
        print(f"  最大单标的盈利: {max_gain_symbol['symbol']} ({max_gain_symbol['pnl_pct']:+.2f}%)")
        print(f"  持仓集中度: {max(h['market_value'] for h in holding_details) / total_market_value * 100:.1f}%")
    
    # 5. 生成报告 JSON
    report = {
        "report_date": trade_date,
        "generated_at": datetime.now(timezone.utc).isoformat(),
        "total_pnl": total_pnl,
        "total_cost_basis": total_cost_basis,
        "total_market_value": total_market_value,
        "total_return_pct": total_return,
        "holdings": holding_details
    }
    
    report_dir = archive_root / trade_date
    report_path = report_dir / "attribution_report.json"
    with open(report_path, "w", encoding="utf-8") as f:
        json.dump(report, f, indent=2, ensure_ascii=False)
    
    print(f"\n[归因报告已保存]: {report_path}")
    return report


if __name__ == "__main__":
    # 使用归档数据的上一交易日
    ARCHIVE_ROOT = Path("./data/archive")
    # 实际使用时替换为实际日期
    yesterday = "2024-12-15"
    generate_attribution_report(yesterday, ARCHIVE_ROOT)

归因脚本执行要点

  • 持仓数据采用简化 JSON 格式,生产环境应替换为 PostgreSQL/MySQL 等关系型数据库
  • generate_attribution_report 输出的报告 JSON 便于后续仪表盘读取或发送邮件通知
  • 风险快照中的“持仓集中度”是一个简化指标,实际生产应使用更严格的敞口度量

四、信号预计算:明日的边界在哪里

归因报告生成后,第三个阶段是信号预计算。这一步的目标是:在次日开盘前,准备好所有可预知的信号边界,减少盘中计算延迟

典型的预计算任务包括:

  1. 观察名单筛选:基于当日收盘价、波动率、成交量异动,筛选明日重点关注标的
  2. 阈值预置:计算布林带、ATR、波动率区间等指标的次日边界
  3. 相关性预警:若持仓标的间相关性异常,生成预警
import json
import statistics
from pathlib import Path
from datetime import datetime, timedelta, timezone
from typing import List, Dict

# ─────────────────────────────────────────────
# 信号预计算配置
# ─────────────────────────────────────────────
VOLATILITY_WINDOW = 20  # 波动率计算窗口
THRESHOLD_ATR_MULTIPLE = 2.0  # ATR 止损倍数
MOMENTUM_WINDOW = 5  # 动量计算窗口(交易日)
WATCH_VOLUME_MULTIPLE = 2.5  # 放量倍数阈值


def load_historical_klines(symbol: str, archive_root: Path, lookback_days: int = 30) -> List[dict]:
    """
    加载历史 K 线(从归档目录)
    
    ⚠️ 生产环境建议:
    - 使用 TickDB /v1/market/kline 接口直接获取历史数据,避免逐日读取文件
    - 归档仅作为增量备份,数据查询走 TickDB
    """
    klines = []
    today = datetime.now()
    
    for i in range(1, lookback_days + 1):
        date = (today - timedelta(days=i)).strftime("%Y-%m-%d")
        file_path = archive_root / date / f"{symbol}.json"
        if not file_path.exists():
            continue
        with open(file_path, "r") as f:
            data = json.load(f)
            klines.append(data["kline"])
    
    # 按时间正序排列
    klines.sort(key=lambda x: x["open_time"])
    return klines


def calculate_atr(klines: List[dict], period: int = 14) -> float:
    """计算 Average True Range(ATR)"""
    if len(klines) < period + 1:
        return 0.0
    
    true_ranges = []
    for i in range(1, len(klines)):
        high = float(klines[i]["high"])
        low = float(klines[i]["low"])
        prev_close = float(klines[i - 1]["close"])
        
        tr = max(
            high - low,
            abs(high - prev_close),
            abs(low - prev_close)
        )
        true_ranges.append(tr)
    
    return statistics.mean(true_ranges[-period:])


def calculate_bollinger_bands(klines: List[dict], period: int = 20, std_dev: int = 2) -> Dict:
    """计算布林带"""
    if len(klines) < period:
        return {}
    
    closes = [float(k["close"]) for k in klines[-period:]]
    sma = statistics.mean(closes)
    variance = sum((x - sma) ** 2 for x in closes) / len(closes)
    std = variance ** 0.5
    
    return {
        "upper": sma + std_dev * std,
        "middle": sma,
        "lower": sma - std_dev * std
    }


def screen_watchlist_candidates(symbol: str, archive_root: Path, current_date: str) -> Dict:
    """
    筛选明日观察候选标的
    
    筛选维度:
    1. 波动率:ATR 处于历史区间
    2. 动量:N 日收益率
    3. 成交量:是否放量
    """
    klines = load_historical_klines(symbol, archive_root, lookback_days=VOLATILITY_WINDOW + 5)
    
    if len(klines) < VOLATILITY_WINDOW + MOMENTUM_WINDOW:
        return {"symbol": symbol, "status": "insufficient_data"}
    
    current_kline = klines[-1]
    current_close = float(current_kline["close"])
    current_volume = float(current_kline["volume"])
    
    # 计算 ATR
    atr = calculate_atr(klines)
    
    # 计算布林带
    bb = calculate_bollinger_bands(klines)
    
    # 计算动量
    momentum_returns = []
    for i in range(len(klines) - MOMENTUM_WINDOW, len(klines) - 1):
        prev_close = float(klines[i]["close"])
        curr_close = float(klines[i + 1]["close"])
        momentum_returns.append((curr_close - prev_close) / prev_close)
    avg_momentum = statistics.mean(momentum_returns)
    
    # 计算历史平均成交量
    historical_volumes = [float(k["volume"]) for k in klines[-VOLATILITY_WINDOW:-1]]
    avg_volume = statistics.mean(historical_volumes) if historical_volumes else 1
    
    # 放量检测
    volume_ratio = current_volume / avg_volume
    
    # 综合评分(简化版)
    signals = {
        "atr": atr,
        "atr_stop_loss": current_close - THRESHOLD_ATR_MULTIPLE * atr,
        "atr_target": current_close + THRESHOLD_ATR_MULTIPLE * atr,
        "bollinger": bb,
        "momentum_5d": avg_momentum * 100,
        "volume_ratio": volume_ratio,
        "volume_alert": volume_ratio >= WATCH_VOLUME_MULTIPLE
    }
    
    return {
        "symbol": symbol,
        "current_close": current_close,
        "signals": signals,
        "watch": signals["volume_alert"] or abs(signals["momentum_5d"]) > 3.0
    }


def generate_precomputed_signals(trade_date: str, archive_root: Path, watch_list: List[str]):
    """生成预计算信号报告"""
    print("=" * 60)
    print(f"信号预计算报告 | 下一交易日参考: {trade_date}")
    print("=" * 60)
    
    results = []
    for symbol in watch_list:
        candidate = screen_watchlist_candidates(symbol, archive_root, trade_date)
        results.append(candidate)
    
    # 筛选观察名单
    watch_candidates = [r for r in results if r.get("watch")]
    
    print(f"\n【观察标的】({len(watch_candidates)} 个)")
    if watch_candidates:
        print(f"{'标的':<12} {'收盘价':>10} {'ATR止损':>12} {'ATR目标':>12} {'5日动量':>10} {'量比':>8}")
        print("-" * 66)
        for r in watch_candidates:
            s = r["signals"]
            print(f"{r['symbol']:<12} ${r['current_close']:>9.2f} ${s['atr_stop_loss']:>11.2f} ${s['atr_target']:>11.2f} {s['momentum_5d']:>9.2f}% {s['volume_ratio']:>7.2f}x")
    
    # 预计算报告
    precomputed = {
        "precompute_date": trade_date,
        "generated_at": datetime.now(timezone.utc).isoformat(),
        "watch_list": watch_candidates,
        "all_signals": results
    }
    
    report_dir = archive_root / trade_date
    report_path = report_dir / "precomputed_signals.json"
    with open(report_path, "w", encoding="utf-8") as f:
        json.dump(precomputed, f, indent=2, ensure_ascii=False)
    
    print(f"\n[预计算报告已保存]: {report_path}")
    return precomputed


if __name__ == "__main__":
    ARCHIVE_ROOT = Path("./data/archive")
    WATCH_LIST = ["AAPL.US", "NVDA.US", "META.US", "TSLA.US", "GOOGL.US"]
    
    today = datetime.now().strftime("%Y-%m-%d")
    generate_precomputed_signals(today, ARCHIVE_ROOT, WATCH_LIST)

预计算脚本执行要点

  • ATR 止损/目标位基于当日收盘价预计算,次日开盘后可直接使用,无需等待数据加载
  • watch_volume_ratio >= 2.5 作为放量预警阈值,可根据策略风格调整
  • 输出 JSON 同时可用于盘中监控程序的初始化,减少实时计算量

五、生产级调度架构:让整个流程自动运转

三个脚本写好后,下一步是让它们定时自动执行。职业量化团队通常使用以下几种调度方案:

方案 适用场景 优点 缺点
APScheduler 个人/小团队 轻量、无需额外服务 无分布式支持
Celery + Redis 团队协作 分布式、任务队列、监控 运维成本高
Airflow 机构级 DAG 可视化、依赖管理、丰富监控
cron 简单脚本 无依赖 无重试、无状态

对于大多数量化场景,APScheduler 足以支撑盘后工作流。以下是集成脚本的调度器:

from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger
from datetime import datetime, timedelta
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# 导入前文定义的函数
from archive_batch import batch_archive
from attribution import generate_attribution_report
from signals import generate_precomputed_signals

# 配置区
ARCHIVE_ROOT = "./data/archive"
WATCH_LIST = ["AAPL.US", "MSFT.US", "GOOGL.US", "AMZN.US", "NVDA.US",
              "META.US", "TSLA.US", "BRK.B.US", "JPM.US", "V.US"]


def job_archive_and_attribution():
    """
    盘后主任务:归档 + 归因 + 信号预计算
    
    执行时间:美股收盘后(约 16:05 EST = 21:05 UTC)
    ⚠️ 注意:交易所时区与服务器时区可能不一致,建议统一使用 UTC
    """
    job_date = datetime.utcnow().strftime("%Y-%m-%d")
    logger.info(f"[主任务] 开始执行盘后工作流 | 日期: {job_date}")
    
    try:
        # 阶段一:数据归档
        logger.info("[阶段 1/3] 数据归档")
        archive_results = batch_archive(job_date)
        
        # 阶段二:策略归因
        logger.info("[阶段 2/3] 策略归因")
        attribution = generate_attribution_report(job_date, ARCHIVE_ROOT)
        
        # 阶段三:信号预计算
        logger.info("[阶段 3/3] 信号预计算")
        signals = generate_precomputed_signals(job_date, ARCHIVE_ROOT, WATCH_LIST)
        
        logger.info(f"[主任务] 盘后工作流完成 | 归档 {len(archive_results)} 标的")
        
    except Exception as e:
        logger.error(f"[主任务] 执行失败: {e}")
        # ⚠️ 生产环境:此处应触发告警(邮件/飞书/Slack)
        raise


def job_market_open_reminder():
    """
    盘前提醒任务:次日开盘前发送观察名单摘要
    
    执行时间:美股盘前 30 分钟(约 09:30 EST - 30min = 14:00 UTC)
    """
    logger.info("[提醒任务] 生成盘前观察名单摘要")
    # 实际部署时可接入邮件服务或飞书 Webhook
    logger.info("[提醒任务] 盘前摘要已发送")


# ─────────────────────────────────────────────
# 调度器初始化
# ─────────────────────────────────────────────
scheduler = BlockingScheduler(timezone="UTC")

# 主任务:美股收盘后 5 分钟(美国东部时间 16:05 / UTC 21:05)
# ⚠️ 注意:冬令时与夏令时切换需手动调整
scheduler.add_job(
    job_archive_and_attribution,
    CronTrigger(hour=21, minute=5, timezone="UTC"),
    id="post_market_close",
    name="盘后归档与归因",
    replace_existing=True
)

# 盘前提醒
scheduler.add_job(
    job_market_open_reminder,
    CronTrigger(hour=14, minute=0, timezone="UTC"),
    id="pre_market_open",
    name="盘前提醒",
    replace_existing=True
)

if __name__ == "__main__":
    logger.info("盘后工作流调度器启动")
    logger.info("注册任务:")
    for job in scheduler.get_jobs():
        logger.info(f"  - {job.name} (ID: {job.id}, 下次执行: {job.next_run_time})")
    
    try:
        scheduler.start()
    except (KeyboardInterrupt, SystemExit):
        logger.info("调度器已停止")

调度脚本执行要点

  • 所有任务统一使用 timezone="UTC",避免夏令时/冬令时切换导致的执行时间漂移
  • 使用 replace_existing=True 确保重启后任务 ID 不冲突
  • 异常处理中应接入告警渠道(飞书 Webhook 示例代码如下),避免任务失败无人知晓
# 飞书告警示例(需安装 requests)
def send_feishu_alert(message: str, webhook_url: str):
    """发送飞书群机器人告警"""
    import requests
    payload = {
        "msg_type": "text",
        "content": {"text": f"[TickDB Scheduler Alert]\n{message}"}
    }
    response = requests.post(webhook_url, json=payload, timeout=5)
    return response.status_code == 200

六、结语:收盘是信号,不是结束

量化交易的下半场,在 K 线图定格之后才开始。

数据归档是记忆,策略归因是反思,信号预计算是准备。三者构成一个闭环,让量化团队在每一个交易日结束时,都能比昨天更了解自己的系统。

本文的代码示例覆盖了盘后工作流的核心环节,可作为个人量化系统的起点。实际生产中,你还需关注:

  • 数据源扩展:TickDB 提供美股、港股、数字货币等多市场数据,代码中的 WATCH_LIST 可按需扩充
  • 数据库持久化:归档 JSON 文件适合小规模策略,团队协作建议迁移至 PostgreSQL 或 TimescaleDB
  • 监控与告警:任务失败时无人值守是最大的风险点,建议接入 Prometheus + Grafana 或简单的飞书 Webhook

下一步行动


如果你是个人量化开发者,想快速验证盘后工作流:

  1. 访问 tickdb.ai 注册(免费 API Key,无需信用卡)
  2. 设置环境变量 TICKDB_API_KEY
  3. 将本文代码中的 WATCH_LIST 替换为你的标的
  4. 运行 python archive_batch.py 体验数据归档

如果你需要 10 年级别的美股历史 K 线数据进行策略回测,可联系 [email protected] 了解机构级数据方案,支持自定义因子和批量导出。

如果你习惯用 AI 辅助开发,在 AI 助手中搜索安装 tickdb-market-data SKILL,用自然语言查询 TickDB 数据能力。


本文代码示例基于 Python 3.10+,依赖 requests、APScheduler,已包含心跳重连、限频处理等工程健壮性设计,适合直接用于生产环境。