收盘钟声响起的那一刻,你的策略才刚开始被审判
"你的因子在今天贡献了多少 alpha?"
这不是一个能靠"感觉"回答的问题。职业量化团队的答案来自盘后工作流:数据清洗、归因分析、信号预计算、风险检查——一套完整的自动化流水线,在纽交所 4:00 ET 收盘后立即启动,到次日凌晨 5:00 前完成所有准备工作。对于一个有竞争力的团队,这套系统的工程质量,直接决定了明天早上你坐在屏幕前时,手里握的是一把磨好的刀,还是一堆待处理的乱麻。
本文拆解一个典型的职业级盘后自动化工作流,从任务调度设计、数据 ETL 管道、策略归因分析脚本,到明日信号预计算,提供可直接落地的代码架构。目标不是"演示",是"能上线跑三年的系统"。
一、为什么盘后不是"收尾"而是"第二轮开始"
很多人以为收盘后就是整理数据、写写复盘笔记。但职业团队的盘后工作流,本质上是第二轮策略生产:用今天的全量数据对策略进行事后检验,从中提取改进信号,并为明日的实盘提供结构化的输入。
这条流水线的价值在于三个"早":
- 早归档:今天产生的每一笔成交记录、每一个 order book 快照,必须在数据结构化后存档。手工操作意味着遗漏和延迟,自动化的归档在收盘后 15 分钟内完成。
- 早归因:在隔夜信息(美股盘后财报发布、宏观经济数据)影响次日开盘之前,团队需要知道今天因子体系里谁在赚钱、谁在失血。归因报告的及时性决定了你调整组合的速度。
- 早预计算:次日开盘前的宏观事件、盘前财报、板块异动信息,可以在盘后预先编码为"信号提醒",而不是开盘后手忙脚乱地临时分析。
这三个"早",依赖一套自动化工作流。人工盘后复盘的团队,永远比自动化团队慢半拍——而且慢的那半拍,通常是在凌晨 1:00 到 3:00 之间最疲劳的时候。
二、盘后工作流全景:五个阶段的时间窗口
一个完整的盘后工作流,按时间顺序分为五个阶段:
| 阶段 | 时间窗口(EST) | 核心任务 | 自动化要求 |
|---|---|---|---|
| T+0 归档 | 16:00 – 16:30 | TickDB/API → 数据湖,生成原始表 | 脚本自动,失败告警 |
| T+1 清洗 | 16:30 – 17:30 | 缺失值处理、异常值剔除、价格对齐 | 可配置规则,幂等执行 |
| T+2 归因 | 17:30 – 19:00 | 因子暴露、收益归因、风险归因 | 全量报告 + 增量告警 |
| T+3 预计算 | 19:00 – 23:00 | 明日信号预计算、事件编码、板块扫描 | 批量任务,并发执行 |
| T+4 质检 | 23:00 – 05:00 | 数据一致性检查、模型回滚准备、次日白板 | 静默运行,出错钉钉 |
时间窗口的设计遵循一个原则:越早完成的任务,越优先自动化。归档和清洗是后续所有步骤的数据基础,任何延迟都会级联放大。
接下来逐阶段展开架构和代码。
三、T+0 数据归档:TickDB API 的定时拉取与持久化
3.1 归档架构设计
盘后归档的本质是从数据源(TickDB REST API)拉取当天完整的行情数据,写入本地数据湖(PostgreSQL 或 DuckDB)。架构上需要注意三个问题:
- 拉取时机:纽交所 16:00 收盘,但部分资产(期权、期货)的数据可能在 16:15 之后才完全结算。策略需要等数据完全写入后再归档,避免截断。
- 增量 vs 全量:日频因子归因需要当天全量数据;高频因子可以用增量归档节省带宽。但对新手团队,建议全量归档以保证一致性。
- 幂等性:同一个任务重新运行不应重复写入数据,需要在入库层做唯一键约束或 upsert 逻辑。
3.2 TickDB 历史 K 线拉取脚本
归档的第一步是从 TickDB 获取当日的 K 线数据。以下代码展示了生产级的定时拉取实现,包含重试机制、超时设置和环境变量管理:
import os
import time
import json
import logging
import requests
from datetime import datetime, timezone, timedelta
from pathlib import Path
import duckdb
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
handlers=[logging.StreamHandler(), logging.FileHandler("/var/log/tickdb_archive.log")]
)
logger = logging.getLogger(__name__)
# ── 配置 ──────────────────────────────────────────────────────────
API_KEY = os.environ.get("TICKDB_API_KEY")
BASE_URL = "https://api.tickdb.ai/v1/market"
WATCHLIST = ["AAPL.US", "MSFT.US", "SPY.US", "QQQ.US", "NVDA.US"]
HEADERS = {"X-API-Key": API_KEY}
RETRY_CONFIG = {"max_retries": 3, "base_delay": 2, "max_delay": 30}
def fetch_klines(symbol: str, interval: str = "1m", limit: int = 500,
start_time: int = None, end_time: int = None) -> list:
"""
从 TickDB 获取 K 线数据,支持时间范围过滤。
⚠️ 注意:若需要获取历史大时间范围,请使用循环翻页(page_token)
而非一次性拉取大量数据,避免触发限频。
"""
params = {"symbol": symbol, "interval": interval, "limit": limit}
if start_time:
params["start_time"] = start_time
if end_time:
params["end_time"] = end_time
for attempt in range(RETRY_CONFIG["max_retries"]):
try:
resp = requests.get(
f"{BASE_URL}/kline",
headers=HEADERS,
params=params,
timeout=(3.05, 10) # 连接超时 3.05s,读取超时 10s
)
if resp.status_code == 200:
data = resp.json()
if data.get("code") == 0:
return data["data"]
elif data.get("code") == 3001:
retry_after = int(resp.headers.get("Retry-After", 5))
logger.warning(f"限频: 等待 {retry_after}s (attempt {attempt + 1})")
time.sleep(retry_after)
continue
else:
logger.error(f"API 错误 {data.get('code')}: {data.get('message')}")
return []
else:
logger.warning(f"HTTP {resp.status_code} (attempt {attempt + 1})")
except requests.exceptions.Timeout:
logger.warning(f"请求超时 (attempt {attempt + 1})")
except requests.exceptions.RequestException as e:
logger.error(f"网络异常: {e}")
return []
delay = min(RETRY_CONFIG["base_delay"] * (2 ** attempt), RETRY_CONFIG["max_delay"])
jitter = time.uniform(0, delay * 0.1)
time.sleep(delay + jitter)
logger.error(f"拉取 {symbol} K线失败,已达最大重试次数")
return []
def store_to_duckdb(symbol: str, interval: str, records: list, db_path: str = "data_lake.duckdb"):
"""将 K 线记录写入 DuckDB,启用 upsert 语义避免重复写入"""
if not records:
return 0
con = duckdb.connect(db_path, read_only=False)
con.execute("""
CREATE TABLE IF NOT EXISTS kline_archive (
symbol VARCHAR,
interval VARCHAR,
open_time BIGINT,
close_time BIGINT,
open DOUBLE, high DOUBLE, low DOUBLE, close DOUBLE,
volume DOUBLE,
insert_ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (symbol, interval, open_time)
)
""")
rows = [
(
symbol,
interval,
r["open_time"],
r["close_time"],
r["open"],
r["high"],
r["low"],
r["close"],
r["volume"],
)
for r in records
]
con.executemany(
"INSERT OR REPLACE INTO kline_archive "
"(symbol, interval, open_time, close_time, open, high, low, close, volume) "
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
rows
)
con.close()
return len(rows)
def run_daily_archive(target_date: datetime = None):
"""
每日归档主流程:
1. 计算目标日期的时间戳范围(16:00 – 16:15 EST,等结算完成)
2. 按标的拉取当日 K 线
3. 写入本地 DuckDB
"""
if target_date is None:
target_date = datetime.now(timezone.utc) - timedelta(days=1)
# 转换为毫秒时间戳(UTC,16:00 EST = 21:00 UTC)
start_ts = int((target_date.replace(hour=21, minute=0, second=0)).timestamp() * 1000)
end_ts = int((target_date.replace(hour=21, minute=15, second=0)).timestamp() * 1000)
total_stored = 0
failed_symbols = []
for symbol in WATCHLIST:
logger.info(f"归档 {symbol} ({target_date.strftime('%Y-%m-%d')})")
records = fetch_klines(symbol, interval="1m", start_time=start_ts, end_time=end_ts)
if records:
count = store_to_duckdb(symbol, "1m", records)
total_stored += count
logger.info(f" → 写入 {count} 条记录")
else:
failed_symbols.append(symbol)
logger.warning(f" → {symbol} 无数据或拉取失败")
# 归档完成后发送告警
if failed_symbols:
logger.error(f"归档完成,但以下标的失败: {failed_symbols}")
else:
logger.info(f"✅ 归档完成,共写入 {total_stored} 条记录")
return {"total": total_stored, "failed": failed_symbols}
if __name__ == "__main__":
run_daily_archive()
⚠️ 生产环境提醒:上述代码使用同步
requests库,在高频归档场景(50+ 标的、1 分钟间隔)下建议替换为aiohttp+asyncio,并发拉取可将总耗时从串行的 N×2s 降低到并发运行的约 2-3s。但对日频归档(每日一次),同步版本已足够稳定。
3.3 定时调度配置
归档脚本通过系统级定时任务(cron)触发。生产环境建议用 systemd timer 替代 cron,好处是可以配置依赖关系、在失败后自动重启:
# /etc/systemd/system/tickdb-archive.service
[Unit]
Description=TickDB Daily Archive
After=network-online.target
Wants=network-online.target
[Service]
Type=oneshot
Environment="TICKDB_API_KEY=%ENV{TICKDB_API_KEY}"
ExecStart=/opt/quant/venv/bin/python /opt/quant/scripts/daily_archive.py
User=quant
StandardOutput=journal
StandardError=journal
[Install]
WantedBy=multi-user.target
# /etc/systemd/system/tickdb-archive.timer
[Unit]
Description=TickDB Daily Archive Timer (16:20 EST)
Requires=tickdb-archive.service
[Timer]
OnCalendar=*-*-* 21:20:00 UTC
Persistent=true
RandomizedDelaySec=60
[Install]
WantedBy=timers.target
RandomizedDelaySec=60 是工程上的一个小技巧:在多实例部署时,避免所有 worker 同时在 21:20:00 准时发起请求,造成瞬时流量尖峰。
四、T+1 数据清洗:构建一致性数据管道
4.1 清洗规则设计
原始归档数据包含三类需要处理的问题:
| 问题类型 | 典型表现 | 处理策略 |
|---|---|---|
| 缺失值 | 某分钟 K 线缺失(如交易所短暂故障) | 前向填充(ffill),缺失超过 3 分钟则标记为无效 |
| 异常值 | 价格为 0、成交量为负、价格跳变 > 20σ | 标记为 NaN,参与计算时跳过 |
| 浮点精度 | 价格为 152.3000000000001 | 入库前统一 round 到精度 4 位 |
清洗管道的核心原则是可追溯:每条被清洗过的数据必须记录原因,便于事后回溯和规则调整。
4.2 清洗 ETL 脚本
import duckdb
import pandas as pd
import numpy as np
from datetime import datetime, timezone
from pathlib import Path
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def load_raw_data(symbol: str, date: str, db_path: str = "data_lake.duckdb") -> pd.DataFrame:
con = duckdb.connect(db_path, read_only=True)
df = con.execute(f"""
SELECT * FROM kline_archive
WHERE symbol = '{symbol}'
AND DATE(from_unixtime(open_time / 1000)) = '{date}'
ORDER BY open_time
""").df()
con.close()
return df
def clean_kline(df: pd.DataFrame, max_price_jump: float = 0.2) -> pd.DataFrame:
"""
标准化 K 线清洗流程:
1. 标记缺失时间窗口
2. 剔除异常价格(跳变超过 max_price_jump)
3. 前向填充缺失值
4. 记录清洗日志
"""
if df.empty:
return df
df = df.copy()
df["open_dt"] = pd.to_datetime(df["open_time"], unit="ms", utc=True)
df["close_dt"] = pd.to_datetime(df["close_time"], unit="ms", utc=True)
# ── 1. 检测时间序列缺失 ──────────────────────────────────────────
df["expected_interval"] = df["open_time"].diff()
df["missing_flag"] = df["expected_interval"] > 65_000 # 超过 1 分钟(留 5s 容差)
missing_count = df["missing_flag"].sum()
if missing_count > 0:
logger.warning(f"检测到 {missing_count} 个时间窗口缺失")
# ── 2. 异常价格检测(价格跳变)─────────────────────────────────
df["price_pct_change"] = df["close"].pct_change().abs()
df["price_spike_flag"] = df["price_pct_change"] > max_price_jump
spikes = df[df["price_spike_flag"]]
if not spikes.empty:
logger.warning(f"检测到 {len(spikes)} 个价格跳变异常: {spikes[['open_dt', 'close', 'price_pct_change']].values}")
# 异常值替换为 NaN(不直接删除,保留行位置)
df.loc[df["price_spike_flag"], ["open", "high", "low", "close"]] = np.nan
# ── 3. 浮点精度归一化 ──────────────────────────────────────────
price_cols = ["open", "high", "low", "close"]
df[price_cols] = df[price_cols].round(4)
# ── 4. 前向填充缺失值 ──────────────────────────────────────────
# 仅填充价格列,成交量不填充(因为成交量缺失意味着没有成交)
df[price_cols] = df[price_cols].fillna(method="ffill", limit=1)
# ── 5. 汇总清洗元数据 ──────────────────────────────────────────
clean_stats = {
"total_rows": len(df),
"missing_windows": int(missing_count),
"price_spikes": int(len(spikes)),
"null_after_clean": int(df[price_cols].isnull().sum().sum()),
}
logger.info(f"清洗完成: {clean_stats}")
# 保留清洗元数据供归因分析使用
df.attrs["clean_stats"] = clean_stats
return df
def save_clean_data(df: pd.DataFrame, symbol: str, date: str, db_path: str = "data_lake.duckdb"):
"""写入清洗后的数据表(含清洗元数据)"""
if df.empty:
return
con = duckdb.connect(db_path, read_only=False)
# 清洗后的数据写单独表,便于区分原始数据和分析数据
con.execute(f"""
CREATE TABLE IF NOT EXISTS kline_clean (
symbol VARCHAR, interval VARCHAR, open_time BIGINT,
close_time BIGINT, open DOUBLE, high DOUBLE, low DOUBLE,
close DOUBLE, volume DOUBLE, missing_flag BOOLEAN,
price_spike_flag BOOLEAN, clean_dt TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
insert_df = df.drop(columns=["open_dt", "close_dt", "expected_interval",
"price_pct_change"], errors="ignore")
con.execute(f"DELETE FROM kline_clean WHERE symbol = '{symbol}' "
f"AND DATE(from_unixtime(open_time / 1000)) = '{date}'")
con.append("kline_clean", insert_df)
con.close()
logger.info(f"✅ {symbol} {date} 清洗数据已写入")
if __name__ == "__main__":
import sys
symbol = sys.argv[1] if len(sys.argv) > 1 else "AAPL.US"
date = sys.argv[2] if len(sys.argv) > 2 else (
datetime.now(timezone.utc) - pd.Timedelta(days=1)
).strftime("%Y-%m-%d")
raw = load_raw_data(symbol, date)
clean = clean_kline(raw)
save_clean_data(clean, symbol, date)
清洗脚本的关键设计决策:不直接删除异常行,而是标记后保留。这在归因分析中非常重要——你需要区分"今天某因子数据有问题"和"今天某因子策略失效",两者需要不同的响应方式。
五、T+2 策略归因分析:Brinson 模型简化实现
5.1 归因的目标:回答三个问题
盘后归因的核心目标是回答:
- 总量问题:今天组合的总收益是多少?跑赢基准多少?
- 结构问题:收益来自行业配置(allocation effect)还是个股选择(selection effect)?
- 因子问题:今天赚钱的因子是哪个?亏损的因子是否与预期一致?
职业团队的归因报告通常在收盘后 2-3 小时内生成,在夜盘(美股盘后无夜盘,以期货为参考)开始前送达组合经理手中。
5.2 Brinson 模型简化实现
Brinson 归因将超额收益分解为三个效应。这里提供一个简化的日频 Brinson 模型实现,代码可直接接入 T+1 清洗后的数据:
import duckdb
import pandas as pd
import numpy as np
from dataclasses import dataclass
from typing import Optional
from datetime import datetime, timezone
@dataclass
class AttributionResult:
"""归因结果数据结构"""
symbol: str
sector: str
weight: float
actual_return: float
benchmark_return: float
allocation_effect: float # 配置效应:被动持有该行业带来的超额收益
selection_effect: float # 选择效应:选股能力带来的超额收益
interaction_effect: float # 交互效应:配置×选股共同贡献
total_effect: float # 总超额收益 = allocation + selection + interaction
def compute_daily_attribution(
portfolio_weights: dict, # {symbol: weight}
actual_returns: dict, # {symbol: daily_return}
sector_map: dict, # {symbol: sector}
benchmark_returns: dict, # {symbol: benchmark_return}
benchmark_sector_weights: dict, # {sector: benchmark_weight}
risk_free_rate: float = 0.0
) -> pd.DataFrame:
"""
简化 Brinson 归因(日频版):
对于每个持仓标的,计算其在总组合超额收益中的贡献分解。
适用于股票组合的日频归因,不涉及期货、外汇等衍生品。
公式:
allocation_effect = (w_p - w_b) * r_b_sector
selection_effect = w_p * (r_p - r_b_sector)
interaction = (w_p - w_b) * (r_p - r_b_sector)
total_effect = allocation_effect + selection_effect + interaction
其中 w_p 为组合中该标的权重,w_b 为基准中该行业权重,
r_p 为标的实际收益,r_b_sector 为基准中该行业收益。
"""
records = []
for symbol, w_p in portfolio_weights.items():
r_p = actual_returns.get(symbol, 0.0)
sector = sector_map.get(symbol, "Unknown")
r_b_sector = benchmark_returns.get(sector, 0.0)
w_b = benchmark_sector_weights.get(sector, 0.0)
allocation_effect = (w_p - w_b) * r_b_sector
selection_effect = w_p * (r_p - r_b_sector)
interaction = (w_p - w_b) * (r_p - r_b_sector)
total_effect = allocation_effect + selection_effect + interaction
records.append(AttributionResult(
symbol=symbol, sector=sector,
weight=w_p, actual_return=r_p,
benchmark_return=r_b_sector,
allocation_effect=allocation_effect,
selection_effect=selection_effect,
interaction_effect=interaction,
total_effect=total_effect,
))
df = pd.DataFrame(records)
# 汇总统计
total_excess = df["total_effect"].sum()
allocation_total = df["allocation_effect"].sum()
selection_total = df["selection_effect"].sum()
interaction_total = df["interaction_effect"].sum()
logger.info(f"""
╔══════════════════════════════════════╗
║ {datetime.now(timezone.utc).strftime('%Y-%m-%d')} 归因报告 ║
╠══════════════════════════════════════╣
║ 配置效应: {allocation_total*100:>+.4f}%
║ 选股效应: {selection_total*100:>+.4f}%
║ 交互效应: {interaction_total*100:>+.4f}%
║ ────────────────────────────────║
║ 总超额收益: {total_excess*100:>+.4f}%
╚══════════════════════════════════════╝
""")
return df.sort_values("total_effect", ascending=False)
def generate_attribution_report(
date: str,
db_path: str = "data_lake.duckdb",
output_path: str = "reports/"
) -> str:
"""
生成日频归因报告主流程:
1. 从数据湖加载当日收盘价和基准数据
2. 计算持仓收益
3. 执行 Brinson 归因
4. 输出 Markdown 格式报告
"""
Path(output_path).mkdir(exist_ok=True)
con = duckdb.connect(db_path, read_only=True)
# 示例数据(实际场景从组合管理系统拉取持仓)
# 这里用模拟数据演示归因框架
portfolio_weights = {
"AAPL.US": 0.20, "MSFT.US": 0.15, "NVDA.US": 0.18,
"SPY.US": 0.25, "QQQ.US": 0.12, "TSLA.US": 0.10,
}
sector_map = {
"AAPL.US": "Technology", "MSFT.US": "Technology", "NVDA.US": "Technology",
"SPY.US": "Broad Market", "QQQ.US": "Technology",
"TSLA.US": "Consumer Discretionary",
}
# 模拟当日收益
actual_returns = {
"AAPL.US": 0.023, "MSFT.US": 0.015, "NVDA.US": 0.047,
"SPY.US": 0.012, "QQQ.US": 0.018, "TSLA.US": -0.031,
}
benchmark_returns = {
"Technology": 0.020, "Consumer Discretionary": -0.010, "Broad Market": 0.012,
}
benchmark_sector_weights = {
"Technology": 0.30, "Consumer Discretionary": 0.12, "Broad Market": 0.58,
}
con.close()
result_df = compute_daily_attribution(
portfolio_weights, actual_returns, sector_map,
benchmark_returns, benchmark_sector_weights
)
# 生成 Markdown 报告
report_path = f"{output_path}attribution_{date}.md"
with open(report_path, "w") as f:
f.write(f"# 策略归因报告 — {date}\n\n")
f.write("## 超额收益分解\n\n")
f.write("| 标的 | 行业 | 权重 | 实际收益 | 基准收益 | 配置效应 | 选股效应 | 交互效应 | 总效应 |\n")
f.write("|------|------|------|---------|---------|---------|---------|---------|--------|\n")
for _, row in result_df.iterrows():
f.write(f"| {row['symbol']} | {row['sector']} | {row['weight']:.2%} | "
f"{row['actual_return']:+.2%} | {row['benchmark_return']:+.2%} | "
f"{row['allocation_effect']:+.4f} | {row['selection_effect']:+.4f} | "
f"{row['interaction_effect']:+.4f} | **{row['total_effect']:+.4f}** |\n")
f.write("\n## 汇总\n\n")
f.write(f"- 配置效应合计:{result_df['allocation_effect'].sum():+.4f}\n")
f.write(f"- 选股效应合计:{result_df['selection_effect'].sum():+.4f}\n")
f.write(f"- 交互效应合计:{result_df['interaction_effect'].sum():+.4f}\n")
f.write(f"- **总超额收益:{result_df['total_effect'].sum():+.4f}**\n\n")
f.write("---\n*报告生成时间:" + datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC") + "*\n")
return report_path
if __name__ == "__main__":
import sys
date = sys.argv[1] if len(sys.argv) > 1 else (
datetime.now(timezone.utc) - pd.Timedelta(days=1)
).strftime("%Y-%m-%d")
path = generate_attribution_report(date)
print(f"✅ 归因报告已生成: {path}")
⚠️ 归因模型的局限性:上述 Brinson 模型是简化版,适用于日频权益类组合。实际职业团队的归因系统通常包含:
- 多因子归因( Barra / Axioma 模型):将收益分解到市值、动量、波动率等风险因子
- 交易归因:区分持仓收益和交易执行收益(区分"选对了但买贵了"的情况)
- 前推偏差检测:通过归因数据检测策略是否在近期样本上过度拟合
本文的简化实现作为归因框架的起点,覆盖 80% 的日常归因需求。
六、T+3 明日信号预计算:事件驱动与数据准备
6.1 预计算的内容
盘后第三阶段是"为明天做准备"。职业团队在收盘后会做以下预计算:
事件编码:今晚有哪些重要信息会影响次日开盘?典型的预计算包括:
| 事件类型 | 数据来源 | 编码内容 |
|---|---|---|
| 盘后财报发布 | 新闻 API / 财报数据库 | 股票代码、预期 EPS、实际 EPS、Revenue surprise |
| 宏观数据发布 | Fed / BLS / ECB 官网 | 时间(EST)、指标名称、市场预期值、前值 |
| 期权到期日 | CBOE 官网 | 到期日期、Gamma 集中区 |
| 指数成分调整 | FTSE Russell / S&P 官网 | 调入/调出标的、调仓时间 |
技术面预扫描:收盘后对全市场标的运行"今日异动扫描",识别以下模式:
- 突破 20 日/50 日均线的标的(供趋势策略使用)
- 成交量突增 > 3σ 的标的(供事件驱动策略使用)
- 波动率创季度新高的标的(供期权做市策略使用)
组合再平衡信号:根据当日归因结果,生成组合调整建议,供次日开盘前决策。
6.2 事件扫描与编码脚本
import requests
import json
import logging
from datetime import datetime, timezone, timedelta
from dataclasses import dataclass, asdict
from typing import List, Optional
logger = logging.getLogger(__name__)
@dataclass
class EarningsEvent:
"""盘后财报事件数据结构"""
symbol: str
company_name: str
report_date: str # 预期发布日期 (YYYY-MM-DD)
expected_eps: Optional[float]
actual_eps: Optional[float]
expected_revenue: Optional[float]
actual_revenue: Optional[float]
surprise_pct: Optional[float] # 实际 vs 预期偏离
market_reaction_estimate: str # "beat", "miss", "in-line", "pending"
@dataclass
class MacroEvent:
"""宏观数据事件"""
event_name: str
country: str
release_time: str # EST 时间
indicator_name: str
previous_value: str
consensus: str
impact_level: str # "high", "medium", "low"
class PostMarketEventScanner:
"""
盘后事件扫描器:整合财报、宏观数据,生成次日事件日历。
⚠️ 生产提示:此处使用模拟数据演示。生产环境应接入
可靠数据源(如 Benzinga Pro、Alpha Vantage、Intrinio 等),
并实现增量更新而非全量拉取。
"""
def __init__(self, api_key: str = None):
self.api_key = api_key or os.environ.get("TICKDB_API_KEY")
def scan_earnings(self, date_range: tuple) -> List[EarningsEvent]:
"""
扫描指定日期范围内的财报发布事件。
Args:
date_range: (start_date, end_date),格式 "YYYY-MM-DD"
"""
# 示例:这里用模拟数据演示框架
# 生产环境接入财报 API,按 symbol + date_range 过滤
mock_events = [
EarningsEvent(
symbol="META.US", company_name="Meta Platforms",
report_date=date_range[1], expected_eps=5.02, actual_eps=None,
expected_revenue=41.5e9, actual_revenue=None,
surprise_pct=None, market_reaction_estimate="pending"
),
EarningsEvent(
symbol="AMZN.US", company_name="Amazon",
report_date=date_range[1], expected_eps=1.46, actual_eps=None,
expected_revenue=143.2e9, actual_revenue=None,
surprise_pct=None, market_reaction_estimate="pending"
),
]
logger.info(f"扫描到 {len(mock_events)} 个待发布财报事件")
return mock_events
def scan_macro_events(self, target_date: str) -> List[MacroEvent]:
"""
扫描次日宏观数据发布日程。
宏观事件对期货(ES / NQ)和外汇市场有直接影响,
是 CTA 和宏观策略盘前预计算的核心输入。
"""
mock_macro = [
MacroEvent(
event_name="CPI 月率",
country="US",
release_time="08:30 EST",
indicator_name="Consumer Price Index (MoM)",
previous_value="0.2%",
consensus="0.3%",
impact_level="high"
),
MacroEvent(
event_name="FOMC 会议纪要",
country="US",
release_time="14:00 EST",
indicator_name="FOMC Minutes",
previous_value="维持利率不变",
consensus="N/A",
impact_level="high"
),
]
logger.info(f"扫描到 {len(mock_macro)} 个宏观事件")
return mock_macro
def generate_tomorrow_brief(self, target_date: str) -> dict:
"""
生成次日简报(明日开盘前必读):
1. 事件日历(含时间线)
2. 异动标的列表
3. 盘前重点关注
"""
earnings = self.scan_earnings((target_date, target_date))
macro = self.scan_macro_events(target_date)
brief = {
"date": target_date,
"earnings": [asdict(e) for e in earnings],
"macro": [asdict(m) for m in macro],
"generated_at": datetime.now(timezone.utc).isoformat(),
"priority_items": self._rank_priority(earnings, macro),
}
return brief
def _rank_priority(self, earnings: List, macro: List) -> List[str]:
"""按影响力排序次日关注事项"""
items = []
for e in earnings:
items.append(f"📋 [{e.symbol}] 财报待发布(预期 EPS ${e.expected_eps})")
for m in macro:
items.append(f"📊 [{m.country}] {m.event_name} @ {m.release_time} "
f"(预期 {m.consensus}, 前值 {m.previous_value})")
return sorted(items, key=lambda x: "📊" in x, reverse=True)
if __name__ == "__main__":
import os
os.environ.setdefault("TICKDB_API_KEY", "your_key_here")
tomorrow = (datetime.now(timezone.utc) + timedelta(days=1)).strftime("%Y-%m-%d")
scanner = PostMarketEventScanner()
brief = scanner.generate_tomorrow_brief(tomorrow)
print(json.dumps(brief, indent=2, ensure_ascii=False))
6.3 盘后预计算的整体调度架构
归因分析和预计算脚本由统一的调度器管理。推荐使用 Airflow 或 Dagster 管理复杂的依赖关系。以下是一个简化的 Airflow DAG 示例,展示了五个阶段的依赖链:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.empty import EmptyOperator
default_args = {
"owner": "quant-team",
"depends_on_past": False,
"retries": 2,
"retry_delay": timedelta(minutes=5),
}
with DAG(
dag_id="post_market_workflow",
default_args=default_args,
schedule="0 21 * * 1-5", # 每周一至周五 21:00 UTC (16:00 EST)
start_date=datetime(2024, 1, 1),
catchup=False,
tags=["daily", "post-market", "attribution"],
) as dag:
start = EmptyOperator(task_id="start")
# T+0: 数据归档
archive = PythonOperator(
task_id="archive_tickdb_data",
python_callable=run_daily_archive,
)
# T+1: 数据清洗
clean = PythonOperator(
task_id="clean_and_validate",
python_callable=lambda: [clean_kline(load_raw_data(s)) for s in WATCHLIST],
)
# T+2: 策略归因
attribute = PythonOperator(
task_id="compute_attribution",
python_callable=lambda: generate_attribution_report(
datetime.now().strftime("%Y-%m-%d")
),
)
# T+3: 事件预扫描
prescan = PythonOperator(
task_id="scan_tomorrow_events",
python_callable=lambda: scanner.generate_tomorrow_brief(
(datetime.now() + timedelta(days=1)).strftime("%Y-%m-%d")
),
)
# T+4: 质检与告警
quality_check = PythonOperator(
task_id="data_quality_check",
python_callable=data_quality_report,
)
notify = PythonOperator(
task_id="notify_team",
python_callable=send_summary_to_slack,
trigger_rule="all_done", # 即使上游有失败也发送通知(带告警)
)
end = EmptyOperator(task_id="end")
# 依赖链
start >> archive >> clean >> [attribute, prescan] >> quality_check >> notify >> end
工程提示:在 DAG 调度中,
clean任务的输入依赖archive的输出。如果归档失败,清洗任务应该失败并阻止下游——这是 DAG 依赖管理的核心价值。trigger_rule="all_done"仅用于通知任务,确保团队总是能收到当日状态的汇报,即使中间某个步骤出错。
七、T+4 质检与告警:让流水线自己说话
7.1 数据质量检查清单
盘后流水线的最后一个环节是质检。职业团队的质检脚本检查以下项目:
def data_quality_report(symbols: list, date: str, db_path: str = "data_lake.duckdb") -> dict:
"""
盘后数据质量检查:
返回检测结果字典,并在发现异常时记录告警。
"""
checks = {
"missing_data": [],
"zero_volume_days": [],
"price_spikes": [],
"completeness_pct": {},
}
con = duckdb.connect(db_path, read_only=True)
for symbol in symbols:
# 1. 数据完整性:当日应有 390 分钟 K 线(美股 9:30-16:00)
row_count = con.execute(f"""
SELECT COUNT(*) FROM kline_clean
WHERE symbol = '{symbol}'
AND DATE(from_unixtime(open_time / 1000)) = '{date}'
""").fetchone()[0]
expected = 390
completeness = row_count / expected * 100
checks["completeness_pct"][symbol] = completeness
if completeness < 95:
checks["missing_data"].append({
"symbol": symbol,
"expected": expected,
"actual": row_count,
"gap": expected - row_count,
})
# 2. 零成交量检测
zero_vol = con.execute(f"""
SELECT COUNT(*) FROM kline_clean
WHERE symbol = '{symbol}'
AND DATE(from_unixtime(open_time / 1000)) = '{date}'
AND volume = 0
""").fetchone()[0]
if zero_vol > 5:
checks["zero_volume_days"].append({"symbol": symbol, "count": zero_vol})
# 3. 价格异常检测(已在清洗阶段标记)
spikes = con.execute(f"""
SELECT COUNT(*) FROM kline_clean
WHERE symbol = '{symbol}'
AND DATE(from_unixtime(open_time / 1000)) = '{date}'
AND price_spike_flag = True
""").fetchone()[0]
if spikes > 0:
checks["price_spikes"].append({"symbol": symbol, "count": spikes})
con.close()
# 汇总告警
alert_level = "ERROR" if checks["missing_data"] else \
"WARNING" if any(checks.values()) else "OK"
logger.log(
level=logging.ERROR if alert_level == "ERROR" else logging.WARNING,
msg=f"数据质检 [{alert_level}]: "
f"缺失 {len(checks['missing_data'])} 标的, "
f"零成交量 {len(checks['zero_volume_days'])} 标的, "
f"价格异常 {len(checks['price_spikes'])} 标的"
)
return {"status": alert_level, "checks": checks, "date": date}
7.2 告警路由
| 异常级别 | 触发条件 | 路由 |
|---|---|---|
| ERROR | 归档完全失败、核心标的数据缺失 | 立即钉钉/飞书通知 + 触发值班人员手机 |
| WARNING | 部分数据不完整、价格异常 | 异步消息,次日早会处理 |
| OK | 所有检查通过 | 仅写入日志,不打扰团队 |
告警的分级设计是工程上的重要取舍:把最重要的注意力留给最重要的问题。职业团队在凌晨 2:00 被一条 ERROR 叫醒,和被一条 WARNING 叫醒,团队的反应和处理质量完全不同。
八、部署方案:按团队规模选型
| 场景 | 数据量 | 推荐方案 | 说明 |
|---|---|---|---|
| 个人量化 | < 20 标的 | cron + DuckDB | 轻量,零运维,本地运行 |
| 小团队(2-5 人) | 20-100 标的 | Airflow + PostgreSQL | DAG 可视化,失败告警,协作友好 |
| 机构级 | 100+ 标的,多策略 | Dagster + 数据仓库(Snowflake/BigQuery) | 资产隔离,权限管理,审计日志 |
对于个人量化者,DuckDB 完全够用——它是一个嵌入式分析数据库,不需要独立部署服务器,读写性能在百万级日频数据上毫无压力。对于机构团队,PostgreSQL + Airflow 是行业标准组合,经历过大量生产环境的验证。
结语
收盘钟声敲响的那一刻,职业量化团队的工作才刚刚进入第二轮。
数据归档把今天的每一笔行情固化为可追溯的记录;数据清洗剔除噪声、保留信号;归因分析回答"今天赚了还是亏了,为什么";预计算在隔夜信息冲击次日开盘之前,提前为团队准备好行动指南;质检确保整个流水线的数据可信。
这五个环节中,每一个都可以手工完成——但手工意味着遗漏、延迟和疲劳期错误。自动化不是"省事",是"在凌晨 2:00 依然保持和下午 2:00 一样的质量"。
你的盘后工作流自动化了吗?
下一步行动
如果你在个人量化阶段,想快速搭建轻量盘后流水线:
- 注册 TickDB(免费层,API Key 即开即用)
- 将本文归档脚本中的
WATCHLIST替换为你的持仓标的 - 配置 Linux cron 定时任务:
20 21 * * 1-5 /opt/quant/venv/bin/python /opt/quant/scripts/daily_archive.py - 本地 DuckDB 文件自动积累历史数据,可直接用 DuckDB SQL 做因子回测
如果你在团队协作阶段,需要 DAG 可视化和多人告警:
- 参考本文 DAG 示例,在 Airflow 中配置
post_market_workflow - 将
send_summary_to_slack替换为你的团队通知渠道 - 将 DuckDB 替换为 PostgreSQL,启用远程访问供团队成员查询
如果你在机构阶段,需要企业级数据治理和权限管理:
- 联系 [email protected] 获取历史全量 K 线数据和机构 API 方案
- 部署 Dagster 替换 Airflow(资产隔离、代码版本溯源)
- 将 DuckDB 替换为 Snowflake / BigQuery,对接 BI 工具生成归因看板
如果你习惯用 AI 辅助开发,在 AI 助手中搜索安装 tickdb-market-data SKILL,用自然语言描述你的策略思路,AI 将协助生成 TickDB 数据拉取引擎。
本文不构成任何投资建议。市场有风险,投资需谨慎。