每日收盘后,职业量化团队在做什么?
曼哈顿下城,凌晨十二点。
交易所的终端早已归于沉寂,而第七层楼的服务器机房依然亮着微弱的蓝光。交易员早已离开,但算法工程师老陈还坐在工位前——他不需要盯盘,他要盯的是后台。
过去六年,他构建了一套盘后自动化工作流。每天下午四点零一分,最后一笔撮合完成后,这套系统就会自动启动:数据从交易所归档入库,因子开始重新计算,策略归因报告在凌晨两点前生成,而到第二天开盘前,所有的预计算信号已经就绪。
"手动跑这套流程需要三个人全职处理,"他说,"现在我一个人花十分钟检查日志就够了。"
这不是个例。在全球主要量化基金,盘后自动化早已是基础设施的核心组成。它不是"省事",而是"必须"——手动处理千亿级组合的数据,光是归因分析就足以让分析师通宵达旦,而第二天开盘的信号预计算又接踵而至。
本文拆解职业量化团队盘后工作流的完整架构,给出可落地的生产级代码实现,并探讨如何将 TickDB 作为数据中枢串联整个流程。
一、盘后工作流全景图
在进入代码细节之前,需要先理解盘后工作流的全貌。职业团队的盘后任务不是杂乱的脚本集合,而是一条编排精密的数据流水线。
1.1 标准四阶段模型
| 阶段 | 时间窗口 | 核心任务 | 产出物 |
|---|---|---|---|
| 数据归档 | 16:01 - 16:30 | 原始数据清洗、入库、校验 | 清洗后的 OHLCV、成交明细、订单簿快照 |
| 因子再计算 | 16:30 - 18:00 | 日内因子修正、Alpha 因子重算 | 因子值文件、因子 IC 分析 |
| 策略归因 | 18:00 - 02:00 | PnL 分解、风险归因、异常检测 | 归因报告、告警通知 |
| 信号预计算 | 02:00 - 08:30 | 次日候选标的筛选、信号初始化 | 明日信号池、配置文件 |
这四个阶段存在严格依赖:数据归档没完成,因子再计算就是空中楼阁;归因分析没跑完,预计算信号就没有修正依据。大多数团队的问题在于,阶段之间的数据传递依赖手工操作或半自动化脚本,一旦某个环节出错,往往到第二天才发现。
1.2 依赖关系与容错设计
一个健壮的盘后工作流必须解决三个问题:
依赖编排:任务 B 必须在任务 A 完成且数据校验通过后才能启动。传统的 cron 定时无法满足这种跨任务的依赖要求,必须使用 DAG(有向无环图)调度框架。
异常隔离:某个阶段失败不应阻塞后续阶段,但应自动重试并记录断点。比如因子计算失败,应该在重试三次后发送告警,然后继续执行归因(使用已缓存的旧因子值),而不是整个流程卡死。
数据校验:入库数据必须经过校验才能流入下一阶段。常见的校验包括:价格跳变检测(单日涨跌幅 > 20% 的标的需人工确认)、成交量异常(低于历史均值 5 个标准差)、字段完整性(OHLC 关系校验)。
┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ 数据归档 │───▶│ 因子再计算 │───▶│ 策略归因 │───▶│ 信号预计算 │
│ 16:01-16:30 │ │ 16:30-18:00 │ │ 18:00-02:00 │ │ 02:00-08:30 │
└─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘
│ │ │ │
▼ ▼ ▼ ▼
数据校验 IC检测 告警通知 配置文件
二、核心痛点拆解
在设计盘后工作流之前,需要先理解手工流程中最容易出问题的环节。
2.1 数据归档的三大隐患
隐患一:交易所数据格式不一致。 美股、港股、数字货币的数据源格式差异极大。手动处理时,工程师需要针对每个数据源写适配脚本,版本一多就成了技术债务。
隐患二:网络抖动导致数据丢失。 交易所的 FTP 或 WebSocket 可能在收盘后的几分钟内出现短暂断连。如果缺乏自动补数机制,部分数据就成了空白。
隐患三:时间戳时区混乱。 交易所数据可能使用本地时间、UTC 或 Unix 时间戳。跨时区拼接时,一个细节错误可能导致整个因子数据集错位数小时。
2.2 归因分析的计算陷阱
陷阱一:复利 vs 单利。 日收益率和年化收益率的计算方式不同,在展示给客户时若混用,会造成严重误解。
陷阱二:benchmark 选择。 使用不同 benchmark 会让同一策略呈现出截然不同的归因结果。究竟是相对沪深 300 超额,还是绝对收益?
陷阱三:幸存者偏差。 历史回测只包含存活标的,但实盘中退市标的的亏损同样计入成本。归因脚本若不考虑这一项,报告会显著高估收益。
| 隐患/陷阱 | 手工处理风险 | 自动化解决思路 |
|---|---|---|
| 数据格式不一致 | 工程师逐一适配,维护成本高 | 统一抽象层,按市场定义标准化解析器 |
| 网络抖动丢数 | 人工补数,效率低且易出错 | 自动重试 + 完整性校验 + 告警 |
| 时区混乱 | 调试困难,数据追查耗时 | 统一转换为 UTC 存储,返回时按需转换 |
| 复利/单利混淆 | 归因报告失真 | 脚本内强制标注计算方法,禁止混用 |
| benchmark 漂移 | 选择任意性,结论不稳定 | 配置化,一套策略对应固定 benchmark |
| 幸存者偏差 | 回测高估收益 | 归因时引入已退市标的成本 |
三、调度框架设计与实现
3.1 为什么不用 Cron
传统的 cron 定时任务在盘后场景有三个致命缺陷:
- 无依赖管理:
cron无法表达"任务 B 必须在任务 A 成功后才能启动"的语义。 - 无失败重试:任务失败后只能靠外部监控脚本补救。
- 无状态持久化:重启后无法续接断点,所有任务从头跑。
职业团队通常使用 DAG 调度框架。Python 生态中,Apache Airflow 是最成熟的选择,支持任务编排、失败重试、Web UI 监控、DAG 版本管理。
3.2 DAG 架构设计
以下是一个简化版的盘后工作流 DAG 设计:
# daily_pipeline.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
default_args = {
'owner': 'quant_team',
'depends_on_past': False,
'retries': 3,
'retry_delay': timedelta(minutes=5),
'email_on_failure': True,
}
with DAG(
dag_id='daily_pipeline_v2',
default_args=default_args,
start_date=datetime(2026, 4, 1),
schedule_interval='0 16 * * 1-5', # 工作日 16:00
catchup=False,
max_active_runs=1,
) as dag:
# 阶段一:数据归档
archive_equity = PythonOperator(
task_id='archive_us_equity',
python_callable=run_equity_archive,
)
archive_crypto = PythonOperator(
task_id='archive_crypto',
python_callable=run_crypto_archive,
)
# 并行归档完成后,执行数据校验
data_validation = PythonOperator(
task_id='validate_archives',
python_callable=validate_all_archives,
)
# 阶段二:因子再计算
factor_recalculation = PythonOperator(
task_id='recalculate_factors',
python_callable=recalculate_alpha_factors,
)
# 阶段三:策略归因
attribution_report = PythonOperator(
task_id='generate_attribution',
python_callable=generate_attribution_report,
execution_timeout=timedelta(hours=6),
)
# 阶段四:信号预计算(跨天触发)
signal_precompute = PythonOperator(
task_id='precompute_signals',
python_callable=precompute_next_signals,
)
# 依赖关系编排
[archive_equity, archive_crypto] >> data_validation >> factor_recalculation >> attribution_report >> signal_precompute
3.3 生产级数据归档脚本
以下是 TickDB 数据归档的完整实现,包含网络抖动重试、完整性校验、环境变量管理:
# archive_tickdb.py
import os
import time
import hashlib
import logging
from datetime import datetime, timedelta
from typing import Optional, Dict, List
import requests
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class TickDBArchiver:
"""TickDB 盘后数据归档器 - 生产级实现"""
BASE_URL = "https://api.tickdb.ai/v1"
def __init__(self, symbols: List[str], lookback_days: int = 5):
self.api_key = os.environ.get("TICKDB_API_KEY")
if not self.api_key:
raise ValueError("环境变量 TICKDB_API_KEY 未设置")
self.symbols = symbols
self.lookback_days = lookback_days
self.session = requests.Session()
self.session.headers.update({"X-API-Key": self.api_key})
def _request_with_retry(
self,
method: str,
endpoint: str,
params: Optional[Dict] = None,
max_retries: int = 5,
base_delay: float = 2.0,
) -> Dict:
"""
带指数退避和抖动的重试机制
处理 TickDB 限频错误码 3001
"""
for attempt in range(max_retries):
try:
response = self.session.request(
method=method,
url=f"{self.BASE_URL}{endpoint}",
params=params,
timeout=(3.05, 30), # 连接超时 3.05s,读超时 30s
)
if response.status_code == 200:
return response.json()
# 解析错误码
try:
error_body = response.json()
error_code = error_body.get("code", 0)
except Exception:
error_code = 0
# 限频错误:等待 server 指定的 retry_after
if error_code == 3001:
retry_after = int(response.headers.get("Retry-After", 60))
logger.warning(f"触发限频 (3001),等待 {retry_after}s 后重试...")
time.sleep(retry_after)
continue
# API Key 无效 (1001/1002)
if error_code in (1001, 1002):
raise ValueError(f"API Key 无效: {error_body.get('message')}")
# 其他错误,重试
raise RuntimeError(f"请求失败 [{error_code}]: {error_body.get('message', 'unknown')}")
except requests.exceptions.RequestException as e:
delay = min(base_delay * (2 ** attempt), 60)
jitter = 0.1 * delay * (hash(time.time()) % 100) / 100 # 抖动
sleep_time = delay + jitter
logger.warning(f"请求异常 ({attempt+1}/{max_retries}): {e},{sleep_time:.1f}s 后重试")
time.sleep(sleep_time)
raise RuntimeError(f"达到最大重试次数 ({max_retries})")
def fetch_historical_klines(self, symbol: str, interval: str = "1d") -> List[Dict]:
"""
获取历史 K 线数据用于归档
"""
end_date = datetime.now().strftime("%Y-%m-%d")
start_date = (datetime.now() - timedelta(days=self.lookback_days)).strftime("%Y-%m-%d")
all_data = []
cursor = None
while True:
params = {
"symbol": symbol,
"interval": interval,
"start": start_date,
"end": end_date,
}
if cursor:
params["cursor"] = cursor
result = self._request_with_retry("GET", "/market/kline", params=params)
data = result.get("data", {}).get("klines", [])
if not data:
break
all_data.extend(data)
# 检查是否还有下一页
next_cursor = result.get("data", {}).get("next_cursor")
if not next_cursor:
break
cursor = next_cursor
# ⚠️ 注意:分页请求需要遵守限频,建议加上短暂延迟
time.sleep(0.2)
return all_data
def validate_data(self, symbol: str, klines: List[Dict]) -> bool:
"""
数据校验:
1. 价格跳变检测(单日涨跌幅 > 20%)
2. OHLC 关系校验(High >= max(Open, Close) 等)
3. 成交量异常检测(低于历史均值 5 个标准差)
"""
if len(klines) < 2:
return True
prices = [float(k["close"]) for k in klines]
for i in range(1, len(klines)):
change = abs(prices[i] - prices[i-1]) / prices[i-1]
# 跳变检测
if change > 0.20:
logger.error(
f"数据异常 [{symbol}] {klines[i]['timestamp']}: "
f"单日变动 {change*100:.2f}%"
)
return False
# OHLC 校验
o, h, l, c = (
float(klines[i]["open"]),
float(klines[i]["high"]),
float(klines[i]["low"]),
float(klines[i]["close"]),
)
if not (h >= max(o, c) and l <= min(o, c)):
logger.error(f"OHLC 关系错误 [{symbol}] at {klines[i]['timestamp']}")
return False
return True
def archive(self, output_dir: str = "./archive") -> Dict[str, int]:
"""执行归档任务"""
results = {}
for symbol in self.symbols:
logger.info(f"开始归档 {symbol}...")
try:
data = self.fetch_historical_klines(symbol)
if not self.validate_data(symbol, data):
logger.warning(f"{symbol} 数据校验失败,跳过归档")
results[symbol] = 0
continue
# 写入归档文件(以日期和标的命名)
filename = f"{output_dir}/{symbol.replace('/', '_')}_{datetime.now().date()}.json"
os.makedirs(output_dir, exist_ok=True)
with open(filename, "w") as f:
import json
json.dump({
"symbol": symbol,
"archived_at": datetime.now().isoformat(),
"count": len(data),
"data": data,
}, f)
results[symbol] = len(data)
logger.info(f"✓ {symbol} 归档完成,共 {len(data)} 条")
except Exception as e:
logger.error(f"✗ {symbol} 归档失败: {e}")
results[symbol] = -1
return results
def run_equity_archive():
"""Airflow 调用的入口函数"""
archiver = TickDBArchiver(
symbols=["AAPL.US", "NVDA.US", "TSLA.US", "MSFT.US", "GOOGL.US"],
lookback_days=5,
)
results = archiver.archive(output_dir="/data/archive/equity")
failed = [s for s, c in results.items() if c < 0]
if failed:
raise RuntimeError(f"归档失败: {', '.join(failed)}")
return results
if __name__ == "__main__":
archiver = TickDBArchiver(symbols=["AAPL.US", "NVDA.US"])
results = archiver.archive()
print(f"归档结果: {results}")
四、策略归因分析脚本
归因是盘后工作流中最复杂的环节。一个完整的归因脚本需要分解收益来源、计算风险指标、检测异常信号,并生成可读的报告。
4.1 归因模型设计
职业团队通常使用 Brinson 模型 或 Factor-based 归因。以下示例展示的是基于因子的归因框架,将每日 PnL 分解为 alpha 因子贡献、风险敞口贡献、交易成本三个维度:
# attribution.py
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from pathlib import Path
class PortfolioAttributor:
"""组合归因分析器"""
def __init__(self, benchmark: str = "SPY"):
self.benchmark = benchmark
self.results = {}
def load_positions(self, date: str, db_path: str = "./data") -> pd.DataFrame:
"""加载当日持仓快照"""
# 实际场景中从数据库读取,这里简化处理
positions_file = Path(db_path) / f"positions_{date}.csv"
return pd.read_csv(positions_file)
def load_factors(self, date: str, db_path: str = "./data") -> pd.DataFrame:
"""加载因子值(由因子计算任务产出)"""
factors_file = Path(db_path) / f"factors_{date}.csv"
return pd.read_csv(factors_file)
def calculate_factor_exposure(
self,
positions: pd.DataFrame,
factors: pd.DataFrame,
) -> pd.DataFrame:
"""计算因子敞口:每只股票在每个因子上的暴露"""
merged = positions.merge(
factors, left_on="symbol", right_on="symbol", how="left"
)
# 因子暴露 = 持仓权重 × 因子值
factor_cols = [c for c in factors.columns if c not in ("symbol", "date")]
for factor in factor_cols:
merged[f"{factor}_exposure"] = merged["weight"] * merged[factor]
return merged
def decompose_pnl(
self,
exposure: pd.DataFrame,
factor_returns: pd.DataFrame,
trading_cost_bps: float = 2.0,
) -> dict:
"""PnL 分解"""
factor_cols = [c for c in exposure.columns if c.endswith("_exposure")]
attribution = {
"total_pnl": 0.0,
"alpha_contribution": 0.0,
"risk_contribution": 0.0,
"transaction_cost": 0.0,
}
for factor in factor_cols:
factor_name = factor.replace("_exposure", "")
# 找到对应的因子收益率
if factor_name in factor_returns.columns:
factor_ret = factor_returns[factor_name].iloc[-1]
factor_exposure = exposure[factor].sum()
contribution = factor_exposure * factor_ret
attribution[factor_name] = contribution
attribution["total_pnl"] += contribution
# 交易成本估算(基于换手率)
avg_turnover = exposure.get("turnover", 0.1).mean()
attribution["transaction_cost"] = -avg_turnover * trading_cost_bps / 10000
return attribution
def detect_anomalies(
self,
exposure: pd.DataFrame,
daily_pnl: float,
historical_pnl: pd.Series,
) -> list:
"""异常检测:超出 2 个标准差的 PnL 或因子暴露"""
anomalies = []
# PnL 异常
if len(historical_pnl) > 20:
mean, std = historical_pnl.mean(), historical_pnl.std()
z_score = (daily_pnl - mean) / std
if abs(z_score) > 2:
anomalies.append({
"type": "pnl_outlier",
"z_score": round(z_score, 2),
"message": f"当日 PnL {daily_pnl:.2f} 超出 {z_score:.1f} 个标准差",
})
# 单只标的集中度风险
max_weight = exposure["weight"].max()
if max_weight > 0.20: # 单只持仓超过 20%
symbols = exposure.nlargest(3, "weight")["symbol"].tolist()
anomalies.append({
"type": "concentration",
"symbols": symbols,
"max_weight": round(max_weight, 4),
"message": f"持仓集中度过高:{symbols[0]} 占比 {max_weight*100:.1f}%",
})
return anomalies
def generate_report(self, date: str, output_dir: str = "./reports") -> str:
"""生成归因报告"""
positions = self.load_positions(date)
factors = self.load_factors(date)
exposure = self.calculate_factor_exposure(positions, factors)
# 模拟因子收益率数据(实际从因子数据库读取)
factor_returns = pd.DataFrame({
"momentum": [0.012],
"value": [-0.005],
"size": [0.003],
})
attribution = self.decompose_pnl(exposure, factor_returns)
historical_pnl = pd.Series([1000, 1200, -500, 800, 600]) # 简化
anomalies = self.detect_anomalies(exposure, attribution["total_pnl"], historical_pnl)
# 生成 Markdown 报告
report_path = Path(output_dir) / f"attribution_{date}.md"
report_path.parent.mkdir(parents=True, exist_ok=True)
with open(report_path, "w") as f:
f.write(f"# 归因报告 - {date}\n\n")
f.write(f"**基准**: {self.benchmark}\n\n")
f.write("## 收益分解\n\n")
f.write(f"| 贡献项 | 金额 |\n")
f.write(f"|--------|------|\n")
for key, value in attribution.items():
if key != "total_pnl":
f.write(f"| {key} | {value:,.2f} |\n")
f.write(f"| **总计** | **{attribution['total_pnl']:,.2f}** |\n\n")
if anomalies:
f.write("## 异常告警\n\n")
for a in anomalies:
f.write(f"- 🔴 **[{a['type']}]** {a['message']}\n")
else:
f.write("## 异常告警\n\n✅ 无异常\n")
return str(report_path)
def run_attribution():
"""Airflow 调用的入口函数"""
date = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")
attributor = PortfolioAttributor(benchmark="SPY")
report_path = attributor.generate_report(
date,
output_dir="/data/reports",
)
print(f"✓ 归因报告已生成: {report_path}")
return report_path
4.2 归因报告结构
一份合格的归因报告应当包含:
- PnL 汇总:当日总收益、相对基准超额、分解到因子层面的贡献
- 风险指标:持仓集中度、波动率、最大回撤
- 异常告警:PnL 极端值、单只标的集中度超限、因子暴露漂移
- 交易成本:佣金、滑点估算(基于成交记录)
五、信号预计算与数据导出
盘后工作流的最后一步是将预计算信号导出到配置目录,供次日实盘系统读取。
5.1 信号池管理
# signal_precompute.py
import json
import os
from datetime import datetime
from pathlib import Path
from typing import Dict, List
import pandas as pd
class SignalPrecomputer:
"""信号预计算器 - 为次日实盘准备候选标的池"""
def __init__(
self,
config_dir: str = "./config",
min_volume: float = 1_000_000, # 最小日均成交量(美元)
max_positions: int = 50,
):
self.config_dir = config_dir
self.min_volume = min_volume
self.max_positions = max_positions
def load_factor_data(self, date: str) -> pd.DataFrame:
"""加载因子数据(由因子计算任务产出)"""
# 实际场景:从 TickDB 获取最新因子值
# 这里简化处理
return pd.DataFrame({
"symbol": ["AAPL.US", "NVDA.US", "TSLA.US"],
"momentum": [0.05, 0.08, 0.12],
"volume_ratio": [1.2, 1.5, 2.1],
"volatility": [0.15, 0.25, 0.35],
})
def compute_signals(self, factors: pd.DataFrame) -> pd.DataFrame:
"""
复合信号计算:
1. 动量因子(历史 N 日收益率)
2. 成交量异动因子(今日/均值比)
3. 波动率过滤(排除极端波动标的)
"""
scores = factors.copy()
# 综合评分:动量权重 0.4,量比权重 0.3,波动率逆向权重 0.3
scores["composite_score"] = (
scores["momentum"] * 0.4
+ scores["volume_ratio"] * 0.3
- scores["volatility"] * 0.3
)
# 按评分排序,取前 N 只
top_signals = scores.nlargest(self.max_positions, "composite_score")
return top_signals
def export_signal_config(self, signals: pd.DataFrame, date: str) -> str:
"""导出为次日实盘系统可读的配置"""
# 格式:每行一个标的,权重按复合评分归一化
total_score = signals["composite_score"].sum()
config = {
"generated_at": datetime.now().isoformat(),
"trading_date": self._next_trading_day(date),
"signals": [
{
"symbol": row["symbol"],
"weight": round(row["composite_score"] / total_score, 6),
"score": round(row["composite_score"], 4),
}
for _, row in signals.iterrows()
],
}
config_path = Path(self.config_dir) / f"signals_{date}.json"
config_path.parent.mkdir(parents=True, exist_ok=True)
with open(config_path, "w") as f:
json.dump(config, f, indent=2)
return str(config_path)
def _next_trading_day(self, date: str) -> str:
"""计算下一个交易日(简化实现,假设不含周末)"""
from datetime import datetime, timedelta
d = datetime.strptime(date, "%Y-%m-%d")
next_d = d + timedelta(days=1)
# 跳过周末
if next_d.weekday() == 5: # 周六
next_d += timedelta(days=2)
elif next_d.weekday() == 6: # 周日
next_d += timedelta(days=1)
return next_d.strftime("%Y-%m-%d")
def precompute_next_signals():
"""Airflow 调用的入口函数"""
date = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")
precomputer = SignalPrecomputer(config_dir="/data/config")
factors = precomputer.load_factor_data(date)
signals = precomputer.compute_signals(factors)
config_path = precomputer.export_signal_config(signals, date)
print(f"✓ 信号配置已导出: {config_path}")
print(f" 共 {len(signals)} 只候选标的")
return config_path
5.2 数据导出规范
信号配置导出后,实盘系统会在开盘前 30 分钟启动,加载配置并初始化订单簿订阅。以下是关键导出规范:
| 字段 | 类型 | 说明 |
|---|---|---|
generated_at |
ISO8601 | 生成时间,用于判断数据新鲜度 |
trading_date |
YYYY-MM-DD | 目标交易日 |
symbol |
string | 标的代码(格式:AAPL.US) |
weight |
float | 目标权重(归一化后,总和 = 1) |
score |
float | 复合评分(用于优先级排序) |
六、TickDB 在盘后工作流中的角色
回到一个核心问题:TickDB 在这套工作流中扮演什么角色?
6.1 数据中枢定位
TickDB 的核心价值在于提供统一的历史 K 线数据源,覆盖美股、港股、数字货币等多个市场。盘后工作流中,数据归档环节可以直接调用 TickDB 的 /market/kline 接口获取历史数据,省去维护多数据源的复杂度。
对于因子再计算环节,TickDB 提供的 kline 接口可以快速回溯历史数据,用于因子验证和滚动窗口计算。
6.2 能力边界说明
| 场景 | 是否支持 | 说明 |
|---|---|---|
| 获取美股历史 K 线 | ✅ 支持 | 10 年级别,涵盖主要标的 |
| 获取美股 tick 级逐笔 | ❌ 不支持 | trades 接口不支持美股 |
| 获取港股 depth 订单簿 | ✅ 支持 | 10 档深度 |
| 数字货币实时订阅 | ✅ 支持 | depth + trades + kline |
这意味着,如果你需要美股的逐笔成交数据用于高频策略归因,TickDB 目前无法覆盖。但对于日线级别的因子计算和盘后归因,TickDB 是可靠的数据源。
6.3 数据质量对比
| 维度 | 传统数据源(Polygon/Tushare) | TickDB |
|---|---|---|
| 历史 K 线完整性 | 依赖供应商,维护成本高 | 统一 API,10 年美股数据 |
| 实时性 | 轮询延迟 1-5s | WebSocket 推送 <100ms |
| 多市场覆盖 | 需要多个供应商 | 单一 API 覆盖 6 类资产 |
| API 稳定性 | 各家标准不一 | REST + WebSocket 标准化接口 |
七、部署与监控建议
7.1 分层部署方案
| 场景 | 推荐配置 | 说明 |
|---|---|---|
| 个人量化研究者 | Airflow 单机部署 + 本地 SQLite | 轻量级,满足策略验证需求 |
| 小型量化团队(<10 人) | Airflow 单机 + PostgreSQL | 支持多任务并发,团队协作 |
| 机构级量化团队 | Airflow 集群 + 高可用 Postgres | 支持 SLA,任务 SLA 99.9% |
7.2 监控与告警
盘后工作流的监控至少需要覆盖:
- 任务成功率:当日所有任务是否全部成功
- 数据完整性:归档数据条数是否符合预期
- 运行延迟:每个阶段耗时是否超出 SLA
- 异常检测:归因报告是否生成,告警列表是否为空
推荐使用 Airflow 内置的告警机制结合飞书/钉钉 Webhook:
# alert.py
import requests
import os
def send_alert(message: str, channel: str = "feishu"):
webhook_url = os.environ.get(f"{channel.upper()}_WEBHOOK_URL")
if not webhook_url:
return
payload = {
"msg_type": "text",
"content": {"text": f"⚠️ 盘后工作流告警\n{message}"},
}
requests.post(webhook_url, json=payload, timeout=5)
结语
盘后工作流的本质,是把"靠时间和人力堆出来的重复劳动"转化为"可复现、可监控、可优化的自动化流水线"。
职业团队与个人投资者的差距,往往不在于策略本身,而在于执行的一致性。手工操作会疲劳、会出错、会在凌晨两点选择性忽略异常;而自动化系统会按部就班地执行每一个检查,即使是最微小的数据跳变也会记录在案。
本文给出的代码架构可以在 1-2 周内完成原型验证。关键不是技术难度,而是把流程固化下来的决心。
当你的盘后工作流稳定运行之后,你会发现:收盘不是结束,而是下一轮竞争的准备。
下一步行动
如果你希望快速验证本文的工作流架构:
- 访问 tickdb.ai 注册(免费,无需信用卡)
- 在控制台获取 API Key
- 设置环境变量
TICKDB_API_KEY - 运行本文的归档脚本
archive_tickdb.py
如果你需要多市场历史数据覆盖(港股、数字货币、美股 10 年 K 线):
- 标准层已包含美股、港股、数字货币的
/kline接口 - 历史数据深度和标的数量因订阅计划而异,访问官网对比
如果你需要更长的历史回测数据做因子验证:
- 联系 [email protected] 获取机构级数据方案
- 支持 10 年美股全市场历史 K 线,可用于因子 IC 分析和滚动回测
如果你习惯用 AI 辅助开发:
- 在 ClawHub 搜索并安装
tickdb-market-dataSKILL - 可以用自然语言查询 TickDB 的数据能力并生成示例代码
风险提示:本文不构成任何投资建议。策略回测结果基于历史数据,不代表未来收益。市场有风险,投资需谨慎。