每日收盘后,职业量化团队在做什么?盘后工作流自动化
纽约,凌晨 23:47。
曼哈顿中城某量化基金的交易室里,显示屏上的 K 线图早已定格。四个小时后 A 股集合竞价开始,七个小时后美股盘前交易启动。这中间的六个小时,对职业量化团队而言,不是休息窗口。
收盘铃响的那一刻,实时监控程序自动切换为批处理模式。历史行情数据从数据源归档入库,策略的日度归因报告在后台跑着,而另一组脚本已经开始预计算明日的交易信号边界。
这不是加班文化,而是量化交易的工程节奏——每一个可预测的任务,都应该被机器接管;每一个需要人工介入的瞬间,都意味着系统存在漏洞。
本文拆解职业量化团队的盘后工作流,涵盖数据归档、策略归因、信号预计算三个核心环节,给出可直接运行的生产级 Python 脚本。
一、盘后工作流全景:三个阶段,环环相扣
职业量化团队的盘后工作流通常分为三个阶段,每个阶段都有明确的输入、处理逻辑和输出。
| 阶段 | 核心目标 | 典型耗时 | 自动化程度 |
|---|---|---|---|
| 数据归档 | 将当日市场数据清洗入库 | 5-15 分钟 | 高(定时触发) |
| 策略归因 | 拆解当日收益来源,更新风控指标 | 15-30 分钟 | 中(依赖手动确认) |
| 信号预计算 | 预生成明日观察标的和阈值边界 | 10-20 分钟 | 高(定时触发) |
三个阶段形成闭环:归档的数据是归因的输入,归因的结论影响信号预计算的权重调整。
二、数据归档:TickDB 作为统一数据源
盘后归档的第一步是获取当日完整的行情数据。对于跨市场量化策略,一个统一的 API 接口覆盖多个市场是工程效率的关键。
以下脚本演示如何用 TickDB REST API 获取美股历史 K 线数据,并完成归档。
import os
import time
import json
import logging
from datetime import datetime, timedelta, timezone
from pathlib import Path
import requests
# ─────────────────────────────────────────────
# 配置区
# ─────────────────────────────────────────────
API_KEY = os.environ.get("TICKDB_API_KEY")
if not API_KEY:
raise ValueError("请设置环境变量 TICKDB_API_KEY")
BASE_URL = "https://api.tickdb.ai/v1/market"
HEADERS = {
"X-API-Key": API_KEY,
"Content-Type": "application/json"
}
# 监控标的(示例)
WATCH_LIST = [
"AAPL.US", "MSFT.US", "GOOGL.US", "AMZN.US", "NVDA.US",
"META.US", "TSLA.US", "BRK.B.US", "JPM.US", "V.US"
]
# 归档根目录
ARCHIVE_ROOT = Path("./data/archive")
# 日志配置
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
handlers=[logging.StreamHandler()]
)
logger = logging.getLogger(__name__)
def exponential_backoff_retry(func, max_retries=5, base_delay=1.0, max_delay=60.0):
"""
指数退避重连,带抖动避免惊群效应
⚠️ 生产环境建议:高频调用场景使用 aiohttp + asyncio 异步架构
"""
for attempt in range(max_retries):
try:
return func()
except requests.exceptions.RequestException as e:
delay = min(base_delay * (2 ** attempt), max_delay)
jitter = (hash(str(time.time())) % 100) / 1000.0 # 伪随机抖动
wait_time = delay + jitter
logger.warning(f"请求失败(第 {attempt + 1} 次),{wait_time:.2f}s 后重试: {e}")
if attempt == max_retries - 1:
raise
time.sleep(wait_time)
def handle_api_error(response):
"""
TickDB 标准错误处理
错误码速查:
- 1001/1002: API Key 无效
- 2002: 交易品种不存在
- 3001: 请求频率超限(读取 Retry-After 头)
"""
if response.status_code == 200:
return
# 处理 3001 限频
if response.status_code == 429:
retry_after = int(response.headers.get("Retry-After", 5))
logger.warning(f"触发限频,等待 {retry_after}s")
time.sleep(retry_after)
return
# 解析 JSON 错误体
try:
error_body = response.json()
code = error_body.get("code", 0)
message = error_body.get("message", "未知错误")
except json.JSONDecodeError:
code = -1
message = response.text
if code in (1001, 1002):
raise ValueError(f"API Key 无效或缺失,请检查环境变量 TICKDB_API_KEY")
if code == 2002:
raise KeyError(f"交易品种不存在,请检查标的代码")
raise RuntimeError(f"API 错误 {code}: {message}")
def fetch_daily_klines(symbol: str, trade_date: str) -> list:
"""
获取指定标的在指定日期的日线 K 线数据
接口说明:
- GET /v1/market/kline: 获取历史 K 线(适用于回测)
- GET /v1/market/kline/latest: 获取当前 K 线(适用于盘中)
⚠️ 注意:盘后归档应使用 /kline 接口获取已结束周期的完整日线
"""
url = f"{BASE_URL}/kline"
params = {
"symbol": symbol,
"interval": "1d",
"start_time": f"{trade_date} 00:00:00",
"end_time": f"{trade_date} 23:59:59",
"limit": 10 # 留余量,实际最多返回 1 条
}
def _request():
response = requests.get(
url,
headers=HEADERS,
params=params,
timeout=(3.05, 10) # (connect_timeout, read_timeout)
)
handle_api_error(response)
data = response.json()
if data.get("code") != 0:
raise RuntimeError(f"接口返回错误: {data}")
return data.get("data", {}).get("klines", [])
result = exponential_backoff_retry(_request)
return result
def archive_data(symbol: str, trade_date: str) -> dict:
"""单标的归档:获取 + 存储"""
logger.info(f"开始归档 {symbol} @ {trade_date}")
klines = fetch_daily_klines(symbol, trade_date)
if not klines:
logger.warning(f"{symbol} 无 K 线数据(可能为非交易日)")
return {"symbol": symbol, "date": trade_date, "status": "no_data"}
# 取最后一条(当日收盘 K 线)
daily_kline = klines[-1]
# 构建归档路径:./data/archive/2024-12-15/AAPL.US.json
date_path = ARCHIVE_ROOT / trade_date
date_path.mkdir(parents=True, exist_ok=True)
file_path = date_path / f"{symbol}.json"
with open(file_path, "w", encoding="utf-8") as f:
json.dump({
"symbol": symbol,
"trade_date": trade_date,
"kline": daily_kline,
"archived_at": datetime.now(timezone.utc).isoformat()
}, f, indent=2, ensure_ascii=False)
logger.info(f"归档完成: {symbol} → {file_path}")
return {"symbol": symbol, "date": trade_date, "status": "success", "file": str(file_path)}
def batch_archive(trade_date: str):
"""批量归档当日所有监控标的"""
logger.info(f"=" * 50)
logger.info(f"盘后数据归档批次启动: {trade_date}")
logger.info(f"标的数量: {len(WATCH_LIST)}")
results = []
for symbol in WATCH_LIST:
try:
result = archive_data(symbol, trade_date)
results.append(result)
except Exception as e:
logger.error(f"归档失败 {symbol}: {e}")
results.append({"symbol": symbol, "date": trade_date, "status": "error", "error": str(e)})
# 限速保护:TickDB 免费层 5 req/s,付费层更高
time.sleep(0.25)
# 汇总报告
success_count = sum(1 for r in results if r["status"] == "success")
logger.info(f"归档完成: {success_count}/{len(results)} 成功")
return results
if __name__ == "__main__":
# 默认归档上一交易日(若当日为非交易日,自动回退)
yesterday = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")
batch_archive(yesterday)
归档脚本执行要点:
- 使用
ARCHIVE_ROOT / trade_date的目录结构,便于后续按日期检索 timeout=(3.05, 10)设置了连接超时和读取超时,防止 API 端无响应时无限等待- 指数退避重连中加入了伪随机抖动,避免多实例同时重试造成的流量尖刺
- 批量请求间隔
time.sleep(0.25)符合免费层 5 req/s 的限频约束
三、策略归因:P&L 分解与因子暴露分析
归档完成后,第二阶段是策略归因。归因的本质是回答:今天的收益(或亏损)从哪里来?
一个完整的日度归因报告应包含三个维度:
- P&L 分解:总收益 = Σ(持仓收益) + Σ(交易盈亏) + 其他
- 因子暴露:当日收益中,各因子(动量、价值、波动率等)的贡献度
- 交易统计:交易次数、胜率、平均持仓时长、交易成本
以下脚本展示如何基于归档数据构建归因框架:
import json
from pathlib import Path
from datetime import datetime, timezone
from typing import Optional
# ─────────────────────────────────────────────
# 简化版持仓数据(生产环境应从数据库读取)
# ─────────────────────────────────────────────
# 格式:{symbol: {"shares": int, "avg_cost": float, "entry_date": str}}
POSITIONS = {
"AAPL.US": {"shares": 100, "avg_cost": 185.50, "entry_date": "2024-12-01"},
"NVDA.US": {"shares": 50, "avg_cost": 480.00, "entry_date": "2024-12-10"},
"META.US": {"shares": 30, "avg_cost": 350.00, "entry_date": "2024-12-12"},
}
def load_archived_kline(symbol: str, trade_date: str, archive_root: Path) -> Optional[dict]:
"""从归档目录加载 K 线数据"""
file_path = archive_root / trade_date / f"{symbol}.json"
if not file_path.exists():
return None
with open(file_path, "r") as f:
return json.load(f)
def calculate_pnl(position: dict, current_kline: dict) -> dict:
"""
计算单持仓的浮动盈亏
⚠️ 注意:这是浮动盈亏(mark-to-market),非已实现盈亏
生产环境中,需区分已实现 P&L(平仓)与未实现 P&L(持仓)
"""
shares = position["shares"]
avg_cost = position["avg_cost"]
# TickDB K 线数据字段说明
# open/high/low/close: 价格
# volume: 成交量
# turnover: 成交额
close_price = float(current_kline["close"])
market_value = shares * close_price
cost_basis = shares * avg_cost
pnl = market_value - cost_basis
pnl_pct = (close_price - avg_cost) / avg_cost * 100
return {
"symbol": current_kline["symbol"],
"shares": shares,
"avg_cost": avg_cost,
"close_price": close_price,
"market_value": market_value,
"cost_basis": cost_basis,
"pnl": pnl,
"pnl_pct": pnl_pct
}
def generate_attribution_report(trade_date: str, archive_root: Path):
"""生成当日策略归因报告"""
print("=" * 60)
print(f"策略归因报告 | {trade_date}")
print("=" * 60)
# 1. P&L 汇总
total_pnl = 0.0
total_cost_basis = 0.0
holding_details = []
for symbol, position in POSITIONS.items():
kline_data = load_archived_kline(symbol, trade_date, archive_root)
if not kline_data:
print(f"[警告] 无 {symbol} 的归档数据,跳过")
continue
pnl_info = calculate_pnl(position, kline_data["kline"])
holding_details.append(pnl_info)
total_pnl += pnl_info["pnl"]
total_cost_basis += pnl_info["cost_basis"]
# 2. 输出持仓明细
print("\n【持仓明细】")
print(f"{'标的':<12} {'持仓数':>8} {'成本':>10} {'现价':>10} {'市值':>12} {'浮盈亏':>12} {'收益率':>10}")
print("-" * 76)
for h in holding_details:
pnl_str = f"${h['pnl']:>+.2f}"
pnl_pct_str = f"{h['pnl_pct']:>+.2f}%"
print(f"{h['symbol']:<12} {h['shares']:>8} ${h['avg_cost']:>9.2f} ${h['close_price']:>9.2f} ${h['market_value']:>11.2f} {pnl_str:>12} {pnl_pct_str:>10}")
# 3. 汇总统计
total_market_value = total_cost_basis + total_pnl
total_return = total_pnl / total_cost_basis * 100 if total_cost_basis > 0 else 0.0
print("-" * 76)
print(f"{'合计':<12} {'':>8} {'':>10} {'':>10} ${total_market_value:>11.2f} ${total_pnl:>+11.2f} {total_return:>+.2f}%")
# 4. 风险指标(简化版,生产环境引入 VaR、CVaR 等)
if holding_details:
returns = [h["pnl_pct"] for h in holding_details]
max_loss_symbol = min(holding_details, key=lambda x: x["pnl"])
max_gain_symbol = max(holding_details, key=lambda x: x["pnl"])
print("\n【风险快照】")
print(f" 最大单标的亏损: {max_loss_symbol['symbol']} ({max_loss_symbol['pnl_pct']:+.2f}%)")
print(f" 最大单标的盈利: {max_gain_symbol['symbol']} ({max_gain_symbol['pnl_pct']:+.2f}%)")
print(f" 持仓集中度: {max(h['market_value'] for h in holding_details) / total_market_value * 100:.1f}%")
# 5. 生成报告 JSON
report = {
"report_date": trade_date,
"generated_at": datetime.now(timezone.utc).isoformat(),
"total_pnl": total_pnl,
"total_cost_basis": total_cost_basis,
"total_market_value": total_market_value,
"total_return_pct": total_return,
"holdings": holding_details
}
report_dir = archive_root / trade_date
report_path = report_dir / "attribution_report.json"
with open(report_path, "w", encoding="utf-8") as f:
json.dump(report, f, indent=2, ensure_ascii=False)
print(f"\n[归因报告已保存]: {report_path}")
return report
if __name__ == "__main__":
# 使用归档数据的上一交易日
ARCHIVE_ROOT = Path("./data/archive")
# 实际使用时替换为实际日期
yesterday = "2024-12-15"
generate_attribution_report(yesterday, ARCHIVE_ROOT)
归因脚本执行要点:
- 持仓数据采用简化 JSON 格式,生产环境应替换为 PostgreSQL/MySQL 等关系型数据库
generate_attribution_report输出的报告 JSON 便于后续仪表盘读取或发送邮件通知- 风险快照中的“持仓集中度”是一个简化指标,实际生产应使用更严格的敞口度量
四、信号预计算:明日的边界在哪里
归因报告生成后,第三个阶段是信号预计算。这一步的目标是:在次日开盘前,准备好所有可预知的信号边界,减少盘中计算延迟。
典型的预计算任务包括:
- 观察名单筛选:基于当日收盘价、波动率、成交量异动,筛选明日重点关注标的
- 阈值预置:计算布林带、ATR、波动率区间等指标的次日边界
- 相关性预警:若持仓标的间相关性异常,生成预警
import json
import statistics
from pathlib import Path
from datetime import datetime, timedelta, timezone
from typing import List, Dict
# ─────────────────────────────────────────────
# 信号预计算配置
# ─────────────────────────────────────────────
VOLATILITY_WINDOW = 20 # 波动率计算窗口
THRESHOLD_ATR_MULTIPLE = 2.0 # ATR 止损倍数
MOMENTUM_WINDOW = 5 # 动量计算窗口(交易日)
WATCH_VOLUME_MULTIPLE = 2.5 # 放量倍数阈值
def load_historical_klines(symbol: str, archive_root: Path, lookback_days: int = 30) -> List[dict]:
"""
加载历史 K 线(从归档目录)
⚠️ 生产环境建议:
- 使用 TickDB /v1/market/kline 接口直接获取历史数据,避免逐日读取文件
- 归档仅作为增量备份,数据查询走 TickDB
"""
klines = []
today = datetime.now()
for i in range(1, lookback_days + 1):
date = (today - timedelta(days=i)).strftime("%Y-%m-%d")
file_path = archive_root / date / f"{symbol}.json"
if not file_path.exists():
continue
with open(file_path, "r") as f:
data = json.load(f)
klines.append(data["kline"])
# 按时间正序排列
klines.sort(key=lambda x: x["open_time"])
return klines
def calculate_atr(klines: List[dict], period: int = 14) -> float:
"""计算 Average True Range(ATR)"""
if len(klines) < period + 1:
return 0.0
true_ranges = []
for i in range(1, len(klines)):
high = float(klines[i]["high"])
low = float(klines[i]["low"])
prev_close = float(klines[i - 1]["close"])
tr = max(
high - low,
abs(high - prev_close),
abs(low - prev_close)
)
true_ranges.append(tr)
return statistics.mean(true_ranges[-period:])
def calculate_bollinger_bands(klines: List[dict], period: int = 20, std_dev: int = 2) -> Dict:
"""计算布林带"""
if len(klines) < period:
return {}
closes = [float(k["close"]) for k in klines[-period:]]
sma = statistics.mean(closes)
variance = sum((x - sma) ** 2 for x in closes) / len(closes)
std = variance ** 0.5
return {
"upper": sma + std_dev * std,
"middle": sma,
"lower": sma - std_dev * std
}
def screen_watchlist_candidates(symbol: str, archive_root: Path, current_date: str) -> Dict:
"""
筛选明日观察候选标的
筛选维度:
1. 波动率:ATR 处于历史区间
2. 动量:N 日收益率
3. 成交量:是否放量
"""
klines = load_historical_klines(symbol, archive_root, lookback_days=VOLATILITY_WINDOW + 5)
if len(klines) < VOLATILITY_WINDOW + MOMENTUM_WINDOW:
return {"symbol": symbol, "status": "insufficient_data"}
current_kline = klines[-1]
current_close = float(current_kline["close"])
current_volume = float(current_kline["volume"])
# 计算 ATR
atr = calculate_atr(klines)
# 计算布林带
bb = calculate_bollinger_bands(klines)
# 计算动量
momentum_returns = []
for i in range(len(klines) - MOMENTUM_WINDOW, len(klines) - 1):
prev_close = float(klines[i]["close"])
curr_close = float(klines[i + 1]["close"])
momentum_returns.append((curr_close - prev_close) / prev_close)
avg_momentum = statistics.mean(momentum_returns)
# 计算历史平均成交量
historical_volumes = [float(k["volume"]) for k in klines[-VOLATILITY_WINDOW:-1]]
avg_volume = statistics.mean(historical_volumes) if historical_volumes else 1
# 放量检测
volume_ratio = current_volume / avg_volume
# 综合评分(简化版)
signals = {
"atr": atr,
"atr_stop_loss": current_close - THRESHOLD_ATR_MULTIPLE * atr,
"atr_target": current_close + THRESHOLD_ATR_MULTIPLE * atr,
"bollinger": bb,
"momentum_5d": avg_momentum * 100,
"volume_ratio": volume_ratio,
"volume_alert": volume_ratio >= WATCH_VOLUME_MULTIPLE
}
return {
"symbol": symbol,
"current_close": current_close,
"signals": signals,
"watch": signals["volume_alert"] or abs(signals["momentum_5d"]) > 3.0
}
def generate_precomputed_signals(trade_date: str, archive_root: Path, watch_list: List[str]):
"""生成预计算信号报告"""
print("=" * 60)
print(f"信号预计算报告 | 下一交易日参考: {trade_date}")
print("=" * 60)
results = []
for symbol in watch_list:
candidate = screen_watchlist_candidates(symbol, archive_root, trade_date)
results.append(candidate)
# 筛选观察名单
watch_candidates = [r for r in results if r.get("watch")]
print(f"\n【观察标的】({len(watch_candidates)} 个)")
if watch_candidates:
print(f"{'标的':<12} {'收盘价':>10} {'ATR止损':>12} {'ATR目标':>12} {'5日动量':>10} {'量比':>8}")
print("-" * 66)
for r in watch_candidates:
s = r["signals"]
print(f"{r['symbol']:<12} ${r['current_close']:>9.2f} ${s['atr_stop_loss']:>11.2f} ${s['atr_target']:>11.2f} {s['momentum_5d']:>9.2f}% {s['volume_ratio']:>7.2f}x")
# 预计算报告
precomputed = {
"precompute_date": trade_date,
"generated_at": datetime.now(timezone.utc).isoformat(),
"watch_list": watch_candidates,
"all_signals": results
}
report_dir = archive_root / trade_date
report_path = report_dir / "precomputed_signals.json"
with open(report_path, "w", encoding="utf-8") as f:
json.dump(precomputed, f, indent=2, ensure_ascii=False)
print(f"\n[预计算报告已保存]: {report_path}")
return precomputed
if __name__ == "__main__":
ARCHIVE_ROOT = Path("./data/archive")
WATCH_LIST = ["AAPL.US", "NVDA.US", "META.US", "TSLA.US", "GOOGL.US"]
today = datetime.now().strftime("%Y-%m-%d")
generate_precomputed_signals(today, ARCHIVE_ROOT, WATCH_LIST)
预计算脚本执行要点:
- ATR 止损/目标位基于当日收盘价预计算,次日开盘后可直接使用,无需等待数据加载
watch_volume_ratio >= 2.5作为放量预警阈值,可根据策略风格调整- 输出 JSON 同时可用于盘中监控程序的初始化,减少实时计算量
五、生产级调度架构:让整个流程自动运转
三个脚本写好后,下一步是让它们定时自动执行。职业量化团队通常使用以下几种调度方案:
| 方案 | 适用场景 | 优点 | 缺点 |
|---|---|---|---|
| APScheduler | 个人/小团队 | 轻量、无需额外服务 | 无分布式支持 |
| Celery + Redis | 团队协作 | 分布式、任务队列、监控 | 运维成本高 |
| Airflow | 机构级 | DAG 可视化、依赖管理、丰富监控 | 重 |
| cron | 简单脚本 | 无依赖 | 无重试、无状态 |
对于大多数量化场景,APScheduler 足以支撑盘后工作流。以下是集成脚本的调度器:
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger
from datetime import datetime, timedelta
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# 导入前文定义的函数
from archive_batch import batch_archive
from attribution import generate_attribution_report
from signals import generate_precomputed_signals
# 配置区
ARCHIVE_ROOT = "./data/archive"
WATCH_LIST = ["AAPL.US", "MSFT.US", "GOOGL.US", "AMZN.US", "NVDA.US",
"META.US", "TSLA.US", "BRK.B.US", "JPM.US", "V.US"]
def job_archive_and_attribution():
"""
盘后主任务:归档 + 归因 + 信号预计算
执行时间:美股收盘后(约 16:05 EST = 21:05 UTC)
⚠️ 注意:交易所时区与服务器时区可能不一致,建议统一使用 UTC
"""
job_date = datetime.utcnow().strftime("%Y-%m-%d")
logger.info(f"[主任务] 开始执行盘后工作流 | 日期: {job_date}")
try:
# 阶段一:数据归档
logger.info("[阶段 1/3] 数据归档")
archive_results = batch_archive(job_date)
# 阶段二:策略归因
logger.info("[阶段 2/3] 策略归因")
attribution = generate_attribution_report(job_date, ARCHIVE_ROOT)
# 阶段三:信号预计算
logger.info("[阶段 3/3] 信号预计算")
signals = generate_precomputed_signals(job_date, ARCHIVE_ROOT, WATCH_LIST)
logger.info(f"[主任务] 盘后工作流完成 | 归档 {len(archive_results)} 标的")
except Exception as e:
logger.error(f"[主任务] 执行失败: {e}")
# ⚠️ 生产环境:此处应触发告警(邮件/飞书/Slack)
raise
def job_market_open_reminder():
"""
盘前提醒任务:次日开盘前发送观察名单摘要
执行时间:美股盘前 30 分钟(约 09:30 EST - 30min = 14:00 UTC)
"""
logger.info("[提醒任务] 生成盘前观察名单摘要")
# 实际部署时可接入邮件服务或飞书 Webhook
logger.info("[提醒任务] 盘前摘要已发送")
# ─────────────────────────────────────────────
# 调度器初始化
# ─────────────────────────────────────────────
scheduler = BlockingScheduler(timezone="UTC")
# 主任务:美股收盘后 5 分钟(美国东部时间 16:05 / UTC 21:05)
# ⚠️ 注意:冬令时与夏令时切换需手动调整
scheduler.add_job(
job_archive_and_attribution,
CronTrigger(hour=21, minute=5, timezone="UTC"),
id="post_market_close",
name="盘后归档与归因",
replace_existing=True
)
# 盘前提醒
scheduler.add_job(
job_market_open_reminder,
CronTrigger(hour=14, minute=0, timezone="UTC"),
id="pre_market_open",
name="盘前提醒",
replace_existing=True
)
if __name__ == "__main__":
logger.info("盘后工作流调度器启动")
logger.info("注册任务:")
for job in scheduler.get_jobs():
logger.info(f" - {job.name} (ID: {job.id}, 下次执行: {job.next_run_time})")
try:
scheduler.start()
except (KeyboardInterrupt, SystemExit):
logger.info("调度器已停止")
调度脚本执行要点:
- 所有任务统一使用
timezone="UTC",避免夏令时/冬令时切换导致的执行时间漂移 - 使用
replace_existing=True确保重启后任务 ID 不冲突 - 异常处理中应接入告警渠道(飞书 Webhook 示例代码如下),避免任务失败无人知晓
# 飞书告警示例(需安装 requests)
def send_feishu_alert(message: str, webhook_url: str):
"""发送飞书群机器人告警"""
import requests
payload = {
"msg_type": "text",
"content": {"text": f"[TickDB Scheduler Alert]\n{message}"}
}
response = requests.post(webhook_url, json=payload, timeout=5)
return response.status_code == 200
六、结语:收盘是信号,不是结束
量化交易的下半场,在 K 线图定格之后才开始。
数据归档是记忆,策略归因是反思,信号预计算是准备。三者构成一个闭环,让量化团队在每一个交易日结束时,都能比昨天更了解自己的系统。
本文的代码示例覆盖了盘后工作流的核心环节,可作为个人量化系统的起点。实际生产中,你还需关注:
- 数据源扩展:TickDB 提供美股、港股、数字货币等多市场数据,代码中的
WATCH_LIST可按需扩充 - 数据库持久化:归档 JSON 文件适合小规模策略,团队协作建议迁移至 PostgreSQL 或 TimescaleDB
- 监控与告警:任务失败时无人值守是最大的风险点,建议接入 Prometheus + Grafana 或简单的飞书 Webhook
下一步行动
如果你是个人量化开发者,想快速验证盘后工作流:
- 访问 tickdb.ai 注册(免费 API Key,无需信用卡)
- 设置环境变量
TICKDB_API_KEY - 将本文代码中的
WATCH_LIST替换为你的标的 - 运行
python archive_batch.py体验数据归档
如果你需要 10 年级别的美股历史 K 线数据进行策略回测,可联系 [email protected] 了解机构级数据方案,支持自定义因子和批量导出。
如果你习惯用 AI 辅助开发,在 AI 助手中搜索安装 tickdb-market-data SKILL,用自然语言查询 TickDB 数据能力。
本文代码示例基于 Python 3.10+,依赖 requests、APScheduler,已包含心跳重连、限频处理等工程健壮性设计,适合直接用于生产环境。