每日收盘后,职业量化团队在做什么?

曼哈顿下城,凌晨十二点。

交易所的终端早已归于沉寂,而第七层楼的服务器机房依然亮着微弱的蓝光。交易员早已离开,但算法工程师老陈还坐在工位前——他不需要盯盘,他要盯的是后台。

过去六年,他构建了一套盘后自动化工作流。每天下午四点零一分,最后一笔撮合完成后,这套系统就会自动启动:数据从交易所归档入库,因子开始重新计算,策略归因报告在凌晨两点前生成,而到第二天开盘前,所有的预计算信号已经就绪。

"手动跑这套流程需要三个人全职处理,"他说,"现在我一个人花十分钟检查日志就够了。"

这不是个例。在全球主要量化基金,盘后自动化早已是基础设施的核心组成。它不是"省事",而是"必须"——手动处理千亿级组合的数据,光是归因分析就足以让分析师通宵达旦,而第二天开盘的信号预计算又接踵而至。

本文拆解职业量化团队盘后工作流的完整架构,给出可落地的生产级代码实现,并探讨如何将 TickDB 作为数据中枢串联整个流程。


一、盘后工作流全景图

在进入代码细节之前,需要先理解盘后工作流的全貌。职业团队的盘后任务不是杂乱的脚本集合,而是一条编排精密的数据流水线。

1.1 标准四阶段模型

阶段 时间窗口 核心任务 产出物
数据归档 16:01 - 16:30 原始数据清洗、入库、校验 清洗后的 OHLCV、成交明细、订单簿快照
因子再计算 16:30 - 18:00 日内因子修正、Alpha 因子重算 因子值文件、因子 IC 分析
策略归因 18:00 - 02:00 PnL 分解、风险归因、异常检测 归因报告、告警通知
信号预计算 02:00 - 08:30 次日候选标的筛选、信号初始化 明日信号池、配置文件

这四个阶段存在严格依赖:数据归档没完成,因子再计算就是空中楼阁;归因分析没跑完,预计算信号就没有修正依据。大多数团队的问题在于,阶段之间的数据传递依赖手工操作或半自动化脚本,一旦某个环节出错,往往到第二天才发现。

1.2 依赖关系与容错设计

一个健壮的盘后工作流必须解决三个问题:

依赖编排:任务 B 必须在任务 A 完成且数据校验通过后才能启动。传统的 cron 定时无法满足这种跨任务的依赖要求,必须使用 DAG(有向无环图)调度框架。

异常隔离:某个阶段失败不应阻塞后续阶段,但应自动重试并记录断点。比如因子计算失败,应该在重试三次后发送告警,然后继续执行归因(使用已缓存的旧因子值),而不是整个流程卡死。

数据校验:入库数据必须经过校验才能流入下一阶段。常见的校验包括:价格跳变检测(单日涨跌幅 > 20% 的标的需人工确认)、成交量异常(低于历史均值 5 个标准差)、字段完整性(OHLC 关系校验)。

┌─────────────┐    ┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│  数据归档   │───▶│ 因子再计算  │───▶│  策略归因   │───▶│ 信号预计算  │
│ 16:01-16:30 │    │ 16:30-18:00 │    │ 18:00-02:00 │    │ 02:00-08:30 │
└─────────────┘    └─────────────┘    └─────────────┘    └─────────────┘
       │                  │                 │                  │
       ▼                  ▼                 ▼                  ▼
   数据校验           IC检测           告警通知            配置文件

二、核心痛点拆解

在设计盘后工作流之前,需要先理解手工流程中最容易出问题的环节。

2.1 数据归档的三大隐患

隐患一:交易所数据格式不一致。 美股、港股、数字货币的数据源格式差异极大。手动处理时,工程师需要针对每个数据源写适配脚本,版本一多就成了技术债务。

隐患二:网络抖动导致数据丢失。 交易所的 FTP 或 WebSocket 可能在收盘后的几分钟内出现短暂断连。如果缺乏自动补数机制,部分数据就成了空白。

隐患三:时间戳时区混乱。 交易所数据可能使用本地时间、UTC 或 Unix 时间戳。跨时区拼接时,一个细节错误可能导致整个因子数据集错位数小时。

2.2 归因分析的计算陷阱

陷阱一:复利 vs 单利。 日收益率和年化收益率的计算方式不同,在展示给客户时若混用,会造成严重误解。

陷阱二:benchmark 选择。 使用不同 benchmark 会让同一策略呈现出截然不同的归因结果。究竟是相对沪深 300 超额,还是绝对收益?

陷阱三:幸存者偏差。 历史回测只包含存活标的,但实盘中退市标的的亏损同样计入成本。归因脚本若不考虑这一项,报告会显著高估收益。

隐患/陷阱 手工处理风险 自动化解决思路
数据格式不一致 工程师逐一适配,维护成本高 统一抽象层,按市场定义标准化解析器
网络抖动丢数 人工补数,效率低且易出错 自动重试 + 完整性校验 + 告警
时区混乱 调试困难,数据追查耗时 统一转换为 UTC 存储,返回时按需转换
复利/单利混淆 归因报告失真 脚本内强制标注计算方法,禁止混用
benchmark 漂移 选择任意性,结论不稳定 配置化,一套策略对应固定 benchmark
幸存者偏差 回测高估收益 归因时引入已退市标的成本

三、调度框架设计与实现

3.1 为什么不用 Cron

传统的 cron 定时任务在盘后场景有三个致命缺陷:

  • 无依赖管理cron 无法表达"任务 B 必须在任务 A 成功后才能启动"的语义。
  • 无失败重试:任务失败后只能靠外部监控脚本补救。
  • 无状态持久化:重启后无法续接断点,所有任务从头跑。

职业团队通常使用 DAG 调度框架。Python 生态中,Apache Airflow 是最成熟的选择,支持任务编排、失败重试、Web UI 监控、DAG 版本管理。

3.2 DAG 架构设计

以下是一个简化版的盘后工作流 DAG 设计:

# daily_pipeline.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

default_args = {
    'owner': 'quant_team',
    'depends_on_past': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'email_on_failure': True,
}

with DAG(
    dag_id='daily_pipeline_v2',
    default_args=default_args,
    start_date=datetime(2026, 4, 1),
    schedule_interval='0 16 * * 1-5',  # 工作日 16:00
    catchup=False,
    max_active_runs=1,
) as dag:

    # 阶段一:数据归档
    archive_equity = PythonOperator(
        task_id='archive_us_equity',
        python_callable=run_equity_archive,
    )

    archive_crypto = PythonOperator(
        task_id='archive_crypto',
        python_callable=run_crypto_archive,
    )

    # 并行归档完成后,执行数据校验
    data_validation = PythonOperator(
        task_id='validate_archives',
        python_callable=validate_all_archives,
    )

    # 阶段二:因子再计算
    factor_recalculation = PythonOperator(
        task_id='recalculate_factors',
        python_callable=recalculate_alpha_factors,
    )

    # 阶段三:策略归因
    attribution_report = PythonOperator(
        task_id='generate_attribution',
        python_callable=generate_attribution_report,
        execution_timeout=timedelta(hours=6),
    )

    # 阶段四:信号预计算(跨天触发)
    signal_precompute = PythonOperator(
        task_id='precompute_signals',
        python_callable=precompute_next_signals,
    )

    # 依赖关系编排
    [archive_equity, archive_crypto] >> data_validation >> factor_recalculation >> attribution_report >> signal_precompute

3.3 生产级数据归档脚本

以下是 TickDB 数据归档的完整实现,包含网络抖动重试、完整性校验、环境变量管理:

# archive_tickdb.py
import os
import time
import hashlib
import logging
from datetime import datetime, timedelta
from typing import Optional, Dict, List
import requests

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class TickDBArchiver:
    """TickDB 盘后数据归档器 - 生产级实现"""
    
    BASE_URL = "https://api.tickdb.ai/v1"
    
    def __init__(self, symbols: List[str], lookback_days: int = 5):
        self.api_key = os.environ.get("TICKDB_API_KEY")
        if not self.api_key:
            raise ValueError("环境变量 TICKDB_API_KEY 未设置")
        
        self.symbols = symbols
        self.lookback_days = lookback_days
        self.session = requests.Session()
        self.session.headers.update({"X-API-Key": self.api_key})
        
    def _request_with_retry(
        self,
        method: str,
        endpoint: str,
        params: Optional[Dict] = None,
        max_retries: int = 5,
        base_delay: float = 2.0,
    ) -> Dict:
        """
        带指数退避和抖动的重试机制
        处理 TickDB 限频错误码 3001
        """
        for attempt in range(max_retries):
            try:
                response = self.session.request(
                    method=method,
                    url=f"{self.BASE_URL}{endpoint}",
                    params=params,
                    timeout=(3.05, 30),  # 连接超时 3.05s,读超时 30s
                )
                
                if response.status_code == 200:
                    return response.json()
                
                # 解析错误码
                try:
                    error_body = response.json()
                    error_code = error_body.get("code", 0)
                except Exception:
                    error_code = 0
                
                # 限频错误:等待 server 指定的 retry_after
                if error_code == 3001:
                    retry_after = int(response.headers.get("Retry-After", 60))
                    logger.warning(f"触发限频 (3001),等待 {retry_after}s 后重试...")
                    time.sleep(retry_after)
                    continue
                
                # API Key 无效 (1001/1002)
                if error_code in (1001, 1002):
                    raise ValueError(f"API Key 无效: {error_body.get('message')}")
                
                # 其他错误,重试
                raise RuntimeError(f"请求失败 [{error_code}]: {error_body.get('message', 'unknown')}")
                
            except requests.exceptions.RequestException as e:
                delay = min(base_delay * (2 ** attempt), 60)
                jitter = 0.1 * delay * (hash(time.time()) % 100) / 100  # 抖动
                sleep_time = delay + jitter
                logger.warning(f"请求异常 ({attempt+1}/{max_retries}): {e},{sleep_time:.1f}s 后重试")
                time.sleep(sleep_time)
        
        raise RuntimeError(f"达到最大重试次数 ({max_retries})")
    
    def fetch_historical_klines(self, symbol: str, interval: str = "1d") -> List[Dict]:
        """
        获取历史 K 线数据用于归档
        """
        end_date = datetime.now().strftime("%Y-%m-%d")
        start_date = (datetime.now() - timedelta(days=self.lookback_days)).strftime("%Y-%m-%d")
        
        all_data = []
        cursor = None
        
        while True:
            params = {
                "symbol": symbol,
                "interval": interval,
                "start": start_date,
                "end": end_date,
            }
            if cursor:
                params["cursor"] = cursor
            
            result = self._request_with_retry("GET", "/market/kline", params=params)
            data = result.get("data", {}).get("klines", [])
            
            if not data:
                break
                
            all_data.extend(data)
            
            # 检查是否还有下一页
            next_cursor = result.get("data", {}).get("next_cursor")
            if not next_cursor:
                break
            cursor = next_cursor
            
            # ⚠️ 注意:分页请求需要遵守限频,建议加上短暂延迟
            time.sleep(0.2)
        
        return all_data
    
    def validate_data(self, symbol: str, klines: List[Dict]) -> bool:
        """
        数据校验:
        1. 价格跳变检测(单日涨跌幅 > 20%)
        2. OHLC 关系校验(High >= max(Open, Close) 等)
        3. 成交量异常检测(低于历史均值 5 个标准差)
        """
        if len(klines) < 2:
            return True
        
        prices = [float(k["close"]) for k in klines]
        
        for i in range(1, len(klines)):
            change = abs(prices[i] - prices[i-1]) / prices[i-1]
            
            # 跳变检测
            if change > 0.20:
                logger.error(
                    f"数据异常 [{symbol}] {klines[i]['timestamp']}: "
                    f"单日变动 {change*100:.2f}%"
                )
                return False
            
            # OHLC 校验
            o, h, l, c = (
                float(klines[i]["open"]),
                float(klines[i]["high"]),
                float(klines[i]["low"]),
                float(klines[i]["close"]),
            )
            if not (h >= max(o, c) and l <= min(o, c)):
                logger.error(f"OHLC 关系错误 [{symbol}] at {klines[i]['timestamp']}")
                return False
        
        return True
    
    def archive(self, output_dir: str = "./archive") -> Dict[str, int]:
        """执行归档任务"""
        results = {}
        
        for symbol in self.symbols:
            logger.info(f"开始归档 {symbol}...")
            
            try:
                data = self.fetch_historical_klines(symbol)
                
                if not self.validate_data(symbol, data):
                    logger.warning(f"{symbol} 数据校验失败,跳过归档")
                    results[symbol] = 0
                    continue
                
                # 写入归档文件(以日期和标的命名)
                filename = f"{output_dir}/{symbol.replace('/', '_')}_{datetime.now().date()}.json"
                os.makedirs(output_dir, exist_ok=True)
                
                with open(filename, "w") as f:
                    import json
                    json.dump({
                        "symbol": symbol,
                        "archived_at": datetime.now().isoformat(),
                        "count": len(data),
                        "data": data,
                    }, f)
                
                results[symbol] = len(data)
                logger.info(f"✓ {symbol} 归档完成,共 {len(data)} 条")
                
            except Exception as e:
                logger.error(f"✗ {symbol} 归档失败: {e}")
                results[symbol] = -1
        
        return results


def run_equity_archive():
    """Airflow 调用的入口函数"""
    archiver = TickDBArchiver(
        symbols=["AAPL.US", "NVDA.US", "TSLA.US", "MSFT.US", "GOOGL.US"],
        lookback_days=5,
    )
    results = archiver.archive(output_dir="/data/archive/equity")
    
    failed = [s for s, c in results.items() if c < 0]
    if failed:
        raise RuntimeError(f"归档失败: {', '.join(failed)}")
    
    return results


if __name__ == "__main__":
    archiver = TickDBArchiver(symbols=["AAPL.US", "NVDA.US"])
    results = archiver.archive()
    print(f"归档结果: {results}")

四、策略归因分析脚本

归因是盘后工作流中最复杂的环节。一个完整的归因脚本需要分解收益来源、计算风险指标、检测异常信号,并生成可读的报告。

4.1 归因模型设计

职业团队通常使用 Brinson 模型Factor-based 归因。以下示例展示的是基于因子的归因框架,将每日 PnL 分解为 alpha 因子贡献、风险敞口贡献、交易成本三个维度:

# attribution.py
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from pathlib import Path


class PortfolioAttributor:
    """组合归因分析器"""
    
    def __init__(self, benchmark: str = "SPY"):
        self.benchmark = benchmark
        self.results = {}
    
    def load_positions(self, date: str, db_path: str = "./data") -> pd.DataFrame:
        """加载当日持仓快照"""
        # 实际场景中从数据库读取,这里简化处理
        positions_file = Path(db_path) / f"positions_{date}.csv"
        return pd.read_csv(positions_file)
    
    def load_factors(self, date: str, db_path: str = "./data") -> pd.DataFrame:
        """加载因子值(由因子计算任务产出)"""
        factors_file = Path(db_path) / f"factors_{date}.csv"
        return pd.read_csv(factors_file)
    
    def calculate_factor_exposure(
        self,
        positions: pd.DataFrame,
        factors: pd.DataFrame,
    ) -> pd.DataFrame:
        """计算因子敞口:每只股票在每个因子上的暴露"""
        merged = positions.merge(
            factors, left_on="symbol", right_on="symbol", how="left"
        )
        
        # 因子暴露 = 持仓权重 × 因子值
        factor_cols = [c for c in factors.columns if c not in ("symbol", "date")]
        for factor in factor_cols:
            merged[f"{factor}_exposure"] = merged["weight"] * merged[factor]
        
        return merged
    
    def decompose_pnl(
        self,
        exposure: pd.DataFrame,
        factor_returns: pd.DataFrame,
        trading_cost_bps: float = 2.0,
    ) -> dict:
        """PnL 分解"""
        factor_cols = [c for c in exposure.columns if c.endswith("_exposure")]
        
        attribution = {
            "total_pnl": 0.0,
            "alpha_contribution": 0.0,
            "risk_contribution": 0.0,
            "transaction_cost": 0.0,
        }
        
        for factor in factor_cols:
            factor_name = factor.replace("_exposure", "")
            
            # 找到对应的因子收益率
            if factor_name in factor_returns.columns:
                factor_ret = factor_returns[factor_name].iloc[-1]
                factor_exposure = exposure[factor].sum()
                contribution = factor_exposure * factor_ret
                attribution[factor_name] = contribution
                attribution["total_pnl"] += contribution
        
        # 交易成本估算(基于换手率)
        avg_turnover = exposure.get("turnover", 0.1).mean()
        attribution["transaction_cost"] = -avg_turnover * trading_cost_bps / 10000
        
        return attribution
    
    def detect_anomalies(
        self,
        exposure: pd.DataFrame,
        daily_pnl: float,
        historical_pnl: pd.Series,
    ) -> list:
        """异常检测:超出 2 个标准差的 PnL 或因子暴露"""
        anomalies = []
        
        # PnL 异常
        if len(historical_pnl) > 20:
            mean, std = historical_pnl.mean(), historical_pnl.std()
            z_score = (daily_pnl - mean) / std
            if abs(z_score) > 2:
                anomalies.append({
                    "type": "pnl_outlier",
                    "z_score": round(z_score, 2),
                    "message": f"当日 PnL {daily_pnl:.2f} 超出 {z_score:.1f} 个标准差",
                })
        
        # 单只标的集中度风险
        max_weight = exposure["weight"].max()
        if max_weight > 0.20:  # 单只持仓超过 20%
            symbols = exposure.nlargest(3, "weight")["symbol"].tolist()
            anomalies.append({
                "type": "concentration",
                "symbols": symbols,
                "max_weight": round(max_weight, 4),
                "message": f"持仓集中度过高:{symbols[0]} 占比 {max_weight*100:.1f}%",
            })
        
        return anomalies
    
    def generate_report(self, date: str, output_dir: str = "./reports") -> str:
        """生成归因报告"""
        positions = self.load_positions(date)
        factors = self.load_factors(date)
        
        exposure = self.calculate_factor_exposure(positions, factors)
        
        # 模拟因子收益率数据(实际从因子数据库读取)
        factor_returns = pd.DataFrame({
            "momentum": [0.012],
            "value": [-0.005],
            "size": [0.003],
        })
        
        attribution = self.decompose_pnl(exposure, factor_returns)
        historical_pnl = pd.Series([1000, 1200, -500, 800, 600])  # 简化
        anomalies = self.detect_anomalies(exposure, attribution["total_pnl"], historical_pnl)
        
        # 生成 Markdown 报告
        report_path = Path(output_dir) / f"attribution_{date}.md"
        report_path.parent.mkdir(parents=True, exist_ok=True)
        
        with open(report_path, "w") as f:
            f.write(f"# 归因报告 - {date}\n\n")
            f.write(f"**基准**: {self.benchmark}\n\n")
            
            f.write("## 收益分解\n\n")
            f.write(f"| 贡献项 | 金额 |\n")
            f.write(f"|--------|------|\n")
            for key, value in attribution.items():
                if key != "total_pnl":
                    f.write(f"| {key} | {value:,.2f} |\n")
            f.write(f"| **总计** | **{attribution['total_pnl']:,.2f}** |\n\n")
            
            if anomalies:
                f.write("## 异常告警\n\n")
                for a in anomalies:
                    f.write(f"- 🔴 **[{a['type']}]** {a['message']}\n")
            else:
                f.write("## 异常告警\n\n✅ 无异常\n")
        
        return str(report_path)


def run_attribution():
    """Airflow 调用的入口函数"""
    date = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")
    
    attributor = PortfolioAttributor(benchmark="SPY")
    report_path = attributor.generate_report(
        date,
        output_dir="/data/reports",
    )
    
    print(f"✓ 归因报告已生成: {report_path}")
    return report_path

4.2 归因报告结构

一份合格的归因报告应当包含:

  • PnL 汇总:当日总收益、相对基准超额、分解到因子层面的贡献
  • 风险指标:持仓集中度、波动率、最大回撤
  • 异常告警:PnL 极端值、单只标的集中度超限、因子暴露漂移
  • 交易成本:佣金、滑点估算(基于成交记录)

五、信号预计算与数据导出

盘后工作流的最后一步是将预计算信号导出到配置目录,供次日实盘系统读取。

5.1 信号池管理

# signal_precompute.py
import json
import os
from datetime import datetime
from pathlib import Path
from typing import Dict, List
import pandas as pd


class SignalPrecomputer:
    """信号预计算器 - 为次日实盘准备候选标的池"""
    
    def __init__(
        self,
        config_dir: str = "./config",
        min_volume: float = 1_000_000,  # 最小日均成交量(美元)
        max_positions: int = 50,
    ):
        self.config_dir = config_dir
        self.min_volume = min_volume
        self.max_positions = max_positions
    
    def load_factor_data(self, date: str) -> pd.DataFrame:
        """加载因子数据(由因子计算任务产出)"""
        # 实际场景:从 TickDB 获取最新因子值
        # 这里简化处理
        return pd.DataFrame({
            "symbol": ["AAPL.US", "NVDA.US", "TSLA.US"],
            "momentum": [0.05, 0.08, 0.12],
            "volume_ratio": [1.2, 1.5, 2.1],
            "volatility": [0.15, 0.25, 0.35],
        })
    
    def compute_signals(self, factors: pd.DataFrame) -> pd.DataFrame:
        """
        复合信号计算:
        1. 动量因子(历史 N 日收益率)
        2. 成交量异动因子(今日/均值比)
        3. 波动率过滤(排除极端波动标的)
        """
        scores = factors.copy()
        
        # 综合评分:动量权重 0.4,量比权重 0.3,波动率逆向权重 0.3
        scores["composite_score"] = (
            scores["momentum"] * 0.4
            + scores["volume_ratio"] * 0.3
            - scores["volatility"] * 0.3
        )
        
        # 按评分排序,取前 N 只
        top_signals = scores.nlargest(self.max_positions, "composite_score")
        
        return top_signals
    
    def export_signal_config(self, signals: pd.DataFrame, date: str) -> str:
        """导出为次日实盘系统可读的配置"""
        # 格式:每行一个标的,权重按复合评分归一化
        total_score = signals["composite_score"].sum()
        
        config = {
            "generated_at": datetime.now().isoformat(),
            "trading_date": self._next_trading_day(date),
            "signals": [
                {
                    "symbol": row["symbol"],
                    "weight": round(row["composite_score"] / total_score, 6),
                    "score": round(row["composite_score"], 4),
                }
                for _, row in signals.iterrows()
            ],
        }
        
        config_path = Path(self.config_dir) / f"signals_{date}.json"
        config_path.parent.mkdir(parents=True, exist_ok=True)
        
        with open(config_path, "w") as f:
            json.dump(config, f, indent=2)
        
        return str(config_path)
    
    def _next_trading_day(self, date: str) -> str:
        """计算下一个交易日(简化实现,假设不含周末)"""
        from datetime import datetime, timedelta
        d = datetime.strptime(date, "%Y-%m-%d")
        next_d = d + timedelta(days=1)
        
        # 跳过周末
        if next_d.weekday() == 5:  # 周六
            next_d += timedelta(days=2)
        elif next_d.weekday() == 6:  # 周日
            next_d += timedelta(days=1)
        
        return next_d.strftime("%Y-%m-%d")


def precompute_next_signals():
    """Airflow 调用的入口函数"""
    date = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")
    
    precomputer = SignalPrecomputer(config_dir="/data/config")
    factors = precomputer.load_factor_data(date)
    signals = precomputer.compute_signals(factors)
    config_path = precomputer.export_signal_config(signals, date)
    
    print(f"✓ 信号配置已导出: {config_path}")
    print(f"  共 {len(signals)} 只候选标的")
    return config_path

5.2 数据导出规范

信号配置导出后,实盘系统会在开盘前 30 分钟启动,加载配置并初始化订单簿订阅。以下是关键导出规范:

字段 类型 说明
generated_at ISO8601 生成时间,用于判断数据新鲜度
trading_date YYYY-MM-DD 目标交易日
symbol string 标的代码(格式:AAPL.US
weight float 目标权重(归一化后,总和 = 1)
score float 复合评分(用于优先级排序)

六、TickDB 在盘后工作流中的角色

回到一个核心问题:TickDB 在这套工作流中扮演什么角色?

6.1 数据中枢定位

TickDB 的核心价值在于提供统一的历史 K 线数据源,覆盖美股、港股、数字货币等多个市场。盘后工作流中,数据归档环节可以直接调用 TickDB 的 /market/kline 接口获取历史数据,省去维护多数据源的复杂度。

对于因子再计算环节,TickDB 提供的 kline 接口可以快速回溯历史数据,用于因子验证和滚动窗口计算。

6.2 能力边界说明

场景 是否支持 说明
获取美股历史 K 线 ✅ 支持 10 年级别,涵盖主要标的
获取美股 tick 级逐笔 ❌ 不支持 trades 接口不支持美股
获取港股 depth 订单簿 ✅ 支持 10 档深度
数字货币实时订阅 ✅ 支持 depth + trades + kline

这意味着,如果你需要美股的逐笔成交数据用于高频策略归因,TickDB 目前无法覆盖。但对于日线级别的因子计算和盘后归因,TickDB 是可靠的数据源。

6.3 数据质量对比

维度 传统数据源(Polygon/Tushare) TickDB
历史 K 线完整性 依赖供应商,维护成本高 统一 API,10 年美股数据
实时性 轮询延迟 1-5s WebSocket 推送 <100ms
多市场覆盖 需要多个供应商 单一 API 覆盖 6 类资产
API 稳定性 各家标准不一 REST + WebSocket 标准化接口

七、部署与监控建议

7.1 分层部署方案

场景 推荐配置 说明
个人量化研究者 Airflow 单机部署 + 本地 SQLite 轻量级,满足策略验证需求
小型量化团队(<10 人) Airflow 单机 + PostgreSQL 支持多任务并发,团队协作
机构级量化团队 Airflow 集群 + 高可用 Postgres 支持 SLA,任务 SLA 99.9%

7.2 监控与告警

盘后工作流的监控至少需要覆盖:

  • 任务成功率:当日所有任务是否全部成功
  • 数据完整性:归档数据条数是否符合预期
  • 运行延迟:每个阶段耗时是否超出 SLA
  • 异常检测:归因报告是否生成,告警列表是否为空

推荐使用 Airflow 内置的告警机制结合飞书/钉钉 Webhook:

# alert.py
import requests
import os

def send_alert(message: str, channel: str = "feishu"):
    webhook_url = os.environ.get(f"{channel.upper()}_WEBHOOK_URL")
    if not webhook_url:
        return
    
    payload = {
        "msg_type": "text",
        "content": {"text": f"⚠️ 盘后工作流告警\n{message}"},
    }
    requests.post(webhook_url, json=payload, timeout=5)

结语

盘后工作流的本质,是把"靠时间和人力堆出来的重复劳动"转化为"可复现、可监控、可优化的自动化流水线"。

职业团队与个人投资者的差距,往往不在于策略本身,而在于执行的一致性。手工操作会疲劳、会出错、会在凌晨两点选择性忽略异常;而自动化系统会按部就班地执行每一个检查,即使是最微小的数据跳变也会记录在案。

本文给出的代码架构可以在 1-2 周内完成原型验证。关键不是技术难度,而是把流程固化下来的决心

当你的盘后工作流稳定运行之后,你会发现:收盘不是结束,而是下一轮竞争的准备。


下一步行动

如果你希望快速验证本文的工作流架构

  1. 访问 tickdb.ai 注册(免费,无需信用卡)
  2. 在控制台获取 API Key
  3. 设置环境变量 TICKDB_API_KEY
  4. 运行本文的归档脚本 archive_tickdb.py

如果你需要多市场历史数据覆盖(港股、数字货币、美股 10 年 K 线):

  • 标准层已包含美股、港股、数字货币的 /kline 接口
  • 历史数据深度和标的数量因订阅计划而异,访问官网对比

如果你需要更长的历史回测数据做因子验证

  • 联系 [email protected] 获取机构级数据方案
  • 支持 10 年美股全市场历史 K 线,可用于因子 IC 分析和滚动回测

如果你习惯用 AI 辅助开发

  • 在 ClawHub 搜索并安装 tickdb-market-data SKILL
  • 可以用自然语言查询 TickDB 的数据能力并生成示例代码

风险提示:本文不构成任何投资建议。策略回测结果基于历史数据,不代表未来收益。市场有风险,投资需谨慎。