回测第一步:如何高效获取 10 年分钟级美股历史数据
"回测失败的案例中,80% 不是策略本身的问题,而是数据质量的问题。"
这句话在量化社区流传甚广,但多数人只记住了后半句——拼命优化策略,却忽略了一个更前置的问题:你手里的数据是怎么来的?
2019 年,一位开发者花了整整两周时间下载标普 500 成分股的分钟级数据。结果回测时发现,某些股票的历史数据从 2015 年才完整,早期的数据要么缺失、要么时间戳对齐错误。最终他放弃了那套策略,因为无法确认是策略失效还是数据陷阱。
这不是个例。在 Reddit r/quant 和各大量化论坛,"数据拉取"始终是高赞问题的常客。批量拉取十年级历史数据,看起来只是一个 HTTP 请求,背后却藏着三个工程师必须解决的工程问题:分页与并发怎么设计、限频怎么应对、断了怎么恢复。
本文给出生产级的解决方案,所有代码可直接运行。
一、数据量的真相:一个反直觉的数字
在动手之前,先算清楚你要拉多大的数据量。
10 年美股分钟级数据,听起来不大不小。但当你用具体数字算一遍,结论会超出多数人预期:
| 统计维度 | 数值 |
|---|---|
| 时间跨度 | 10 年(2014-2024) |
| 交易天数 | 约 2,520 天 |
| 每日分钟 K 线数 | 390 根(9:30-16:00,每分钟 1 根) |
| 单只股票总数据量 | 982,800 根 |
| 单次 API 请求上限 | 1,000 根 |
| 单只股票最少请求次数 | 983 次 |
| 标普 500 全量股票 | 491,500 次请求 |
491,500 次 HTTP 请求,如果用单线程顺序拉取,每次请求耗时 200ms,总耗时约 27 小时。这还是网络条件理想的情况。实际上,任何一个 API 都有限频策略,单线程 + 无退避的组合会让你的 IP 在 10 分钟内被封禁。
所以,并发不是锦上添花,是必须。
二、三大工程陷阱
2.1 分页陷阱:时间窗口 vs 偏移量
多数 RESTful API 提供两种分页方式:
- 时间窗口分页:用
start_time和end_time划定范围,请求指定时间段的切片 - 游标/偏移分页:用
offset+limit翻页,每次请求跳过 N 条
对于时间序列数据,时间窗口是更优解。原因:
- 天然支持断点续传——从断点处重新划定
start_time即可 - 不受数据插入影响——历史数据不会新增行,偏移量不会漂移
- 请求可并行——不同时间段互不重叠,可同时发起
TickDB 的 /v1/market/kline 接口采用时间窗口设计:
GET /v1/market/kline?symbol=AAPL.US&interval=1m&start_time=1574640000000&end_time=1574640100000&limit=1000
这个设计让批量拉取具备了天然的可拆分性。
2.2 限频陷阱:你不理解 429
HTTP 429 是一个被低估的错误码。很多人看到 429,就知道"被限速了",然后加个 time.sleep(1) 继续跑。
这是不对的。
真正的限频策略需要区分两种场景:
| 限频类型 | 触发条件 | 正确响应 |
|---|---|---|
| 硬限(Hard Limit) | 单 IP / 单 Key 在时间窗口内请求次数超限 | 停止请求,等待整个窗口重置 |
| 软限(Soft Limit) | 请求频率接近阈值 | 降低并发度或请求间隔,无需停止 |
| 配额耗尽(Quota Exhausted) | 配额余额为零 | 等待配额刷新,通常有 Retry-After 头 |
TickDB 的限频策略返回 code 3001,同时携带 Retry-After 头,告知客户端需要等待多少秒才能继续请求。忽略 Retry-After 是批量拉取中最常见的错误——它会导致你在配额刷新前反复触发限频,最终被临时封禁。
2.3 断点陷阱:为什么你的脚本每次都从头跑
中断恢复听起来简单,实际上藏着三个细节坑:
- 已下载数据的去重:重跑时,重复请求的时间段不能覆盖已有数据,否则会重复写入
- 事务性写入:如果某批数据写入一半进程崩溃,数据会处于不一致状态
- 元信息存储:断点信息(Symbol、已下载到的 end_time)存在哪里?
多数人的脚本会在第二次运行时报错"数据已存在",然后手动清理缓存文件夹。一两次可以忍,几十次就成了噩梦。
正确的断点续传需要两层元数据:
- 已下载的数据文件(天然记录)
- 任务进度文件(存储每个 symbol 已拉取到的
end_time)
三、生产级架构:三层并发模型
基于上述分析,我设计了一套三层并发模型:
┌─────────────────────────────────────────────────────────────┐
│ 任务调度层 │
│ - 读取 symbol 列表 │
│ - 按 symbol 分片,每个 symbol 分配独立 worker │
│ - 监控整体进度,维护断点元数据 │
└──────────────────────┬──────────────────────────────────────┘
│
┌──────────────────────▼──────────────────────────────────────┐
│ Worker 层(N 个并发) │
│ - 独立拉取单个 symbol 的全量历史数据 │
│ - 时间窗口滑动:每次请求 [start_time, end_time] │
│ - 识别 3001 错误码,读取 Retry-After,指数退避重试 │
└──────────────────────┬──────────────────────────────────────┘
│
┌──────────────────────▼──────────────────────────────────────┐
│ 存储层 │
│ - 每个 symbol 独立文件(CSV / Parquet) │
│ - 支持追加写入(断点续传核心) │
│ - 写入后更新断点元数据 │
└─────────────────────────────────────────────────────────────┘
关键设计原则:
- Worker 独立性:每个 worker 独立处理一个 symbol,崩溃不影响其他任务
- 时间窗口滑动:用
end_time作为下次请求的start_time,避免重复 - 指数退避 + 抖动:限频时递增等待时间,但加入随机抖动避免惊群效应
- 文件级存储:每个 symbol 一份文件,不搞集中式数据库,避免写入瓶颈
四、生产级代码实现
以下代码涵盖完整的三层架构,可直接复制使用。
4.1 环境配置与核心工具
import os
import time
import json
import random
import logging
from datetime import datetime, timezone
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass, field, asdict
from typing import Optional
from threading import Lock
import requests
# ============================================================
# 配置区
# ============================================================
TICKDB_API_KEY = os.environ.get("TICKDB_API_KEY")
TICKDB_BASE_URL = "https://api.tickdb.ai/v1/market/kline"
# 并发配置:个人用户建议 3-5,机构用户可提高到 10-15
MAX_WORKERS = 5
# 单次请求最大条数(TickDB 上限 1000)
PAGE_SIZE = 1000
# 指数退避参数
INITIAL_BACKOFF = 2 # 秒
MAX_BACKOFF = 120 # 秒(2 分钟上限)
BACKOFF_MULTIPLIER = 2
# 请求超时(秒)
REQUEST_TIMEOUT = (3.05, 15)
# 日志配置
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger(__name__)
# ============================================================
# 核心数据类
# ============================================================
@dataclass
class SymbolTask:
"""单个股票下载任务"""
symbol: str
interval: str = "1m"
start_time: Optional[int] = None # None = 默认 10 年前
end_time: Optional[int] = None # None = 当前时间
downloaded_count: int = 0
@dataclass
class CheckpointMeta:
"""断点元数据"""
symbol: str
interval: str
last_end_time: int # 已下载数据的最晚时间戳(毫秒)
total_records: int
last_updated: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())
class TickDBClient:
"""TickDB 标准 HTTP 客户端,含限频处理"""
def __init__(self, api_key: str):
self.api_key = api_key
self.session = requests.Session()
self.session.headers.update({"X-API-Key": api_key})
def fetch_kline(self, symbol: str, interval: str, start_time: int, end_time: int) -> dict:
"""获取 K 线数据,含完整错误处理"""
params = {
"symbol": symbol,
"interval": interval,
"start_time": start_time,
"end_time": end_time,
"limit": PAGE_SIZE,
}
try:
response = self.session.get(
TICKDB_BASE_URL,
params=params,
timeout=REQUEST_TIMEOUT,
)
# 解析响应
if response.status_code == 200:
data = response.json()
if data.get("code") == 0:
return data.get("data", [])
else:
# TickDB 返回业务错误码(不含 HTTP 429)
return self._handle_tickdb_error(data)
elif response.status_code == 429:
# 硬限:等待 Retry-After
retry_after = int(response.headers.get("Retry-After", 60))
logger.warning(f"429 硬限,等待 {retry_after}s")
time.sleep(retry_after)
return None
else:
response.raise_for_status()
except requests.exceptions.Timeout:
logger.warning(f"请求超时: {symbol} [{start_time}, {end_time}]")
return None
except requests.exceptions.RequestException as e:
logger.error(f"网络错误: {symbol}, {e}")
return None
def _handle_tickdb_error(self, data: dict) -> list:
"""处理 TickDB 业务错误码"""
code = data.get("code", 0)
# 3001: 请求频率超限
if code == 3001:
retry_after = int(data.get("retry_after", 5))
logger.warning(f"限频 3001,等待 {retry_after}s")
time.sleep(retry_after)
return None
# 其他错误码:抛出异常
message = data.get("message", "未知错误")
raise RuntimeError(f"TickDB 错误 {code}: {message}")
4.2 分片下载器(Worker 层)
class KLineFetcher:
"""单只股票的全量历史数据拉取器"""
def __init__(self, client: TickDBClient, output_dir: Path):
self.client = client
self.output_dir = output_dir
self.output_dir.mkdir(parents=True, exist_ok=True)
def fetch_symbol(self, task: SymbolTask) -> CheckpointMeta:
"""拉取单只股票全量数据,返回断点元数据"""
symbol = task.symbol
interval = task.interval
# 确定起始时间
if task.start_time is None:
# 默认 10 年前
ten_years_ago = datetime.now(timezone.utc).timestamp() * 1000 - 10 * 365 * 24 * 3600 * 1000
current_start = int(ten_years_ago)
else:
current_start = task.start_time
# 确定结束时间
if task.end_time is None:
current_end = int(datetime.now(timezone.utc).timestamp() * 1000)
else:
current_end = task.end_time
# 检查断点:是否已有历史数据?
checkpoint_file = self._get_checkpoint_path(symbol, interval)
if checkpoint_file.exists():
with open(checkpoint_file, "r") as f:
checkpoint = json.load(f)
current_start = checkpoint["last_end_time"]
logger.info(f"{symbol} 断点续传,从 {current_start} 开始")
all_records = []
retry_count = 0
max_retries = 5
# 时间窗口滑动拉取
while current_start < current_end:
records = self.client.fetch_kline(symbol, interval, current_start, current_end)
if records is None:
# 限频等待后重试
retry_count += 1
if retry_count > max_retries:
raise RuntimeError(f"{symbol} 重试次数超限")
continue
retry_count = 0 # 成功则重置
if not records:
# 空结果:数据已拉取完毕
break
all_records.extend(records)
# 更新时间窗口起点
current_start = records[-1]["open_time"] + 1
logger.info(
f"{symbol}: 已获取 {len(all_records)} 条,"
f"当前窗口 [{records[0]['open_time']}, {records[-1]['open_time']}]"
)
# 写入本地文件
if all_records:
self._save_to_file(symbol, interval, all_records)
# 生成断点元数据
return CheckpointMeta(
symbol=symbol,
interval=interval,
last_end_time=all_records[-1]["open_time"] if all_records else current_start,
total_records=len(all_records),
)
def _get_checkpoint_path(self, symbol: str, interval: str) -> Path:
return self.output_dir / f"{symbol}_{interval}_checkpoint.json"
def _save_to_file(self, symbol: str, interval: str, records: list):
"""追加写入 CSV 文件"""
import csv
file_path = self.output_dir / f"{symbol}_{interval}.csv"
file_exists = file_path.exists()
with open(file_path, "a", newline="") as f:
writer = csv.DictWriter(f, fieldnames=records[0].keys())
if not file_exists:
writer.writeheader()
writer.writerows(records)
logger.info(f"{symbol}: 追加写入 {len(records)} 条至 {file_path.name}")
# 更新断点文件
checkpoint = CheckpointMeta(
symbol=symbol,
interval=interval,
last_end_time=records[-1]["open_time"],
total_records=self._count_records(file_path),
)
with open(self._get_checkpoint_path(symbol, interval), "w") as f:
json.dump(asdict(checkpoint), f)
def _count_records(self, file_path: Path) -> int:
"""计算 CSV 文件行数(不含表头)"""
with open(file_path, "r") as f:
return sum(1 for _ in f) - 1
4.3 并发任务调度器(调度层)
class BatchDownloader:
"""批量下载调度器"""
def __init__(self, api_key: str, output_dir: str, max_workers: int = MAX_WORKERS):
self.client = TickDBClient(api_key)
self.output_dir = Path(output_dir)
self.output_dir.mkdir(parents=True, exist_ok=True)
self.max_workers = max_workers
# 进度锁:多线程写入时保护进度日志
self.progress_lock = Lock()
def download_symbols(self, symbols: list[str], interval: str = "1m") -> dict:
"""并发下载多个股票"""
logger.info(f"开始批量下载 {len(symbols)} 只股票,并发数: {self.max_workers}")
start_time = time.time()
results = {"success": [], "failed": []}
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
futures = {}
for symbol in symbols:
task = SymbolTask(symbol=symbol, interval=interval)
fetcher = KLineFetcher(self.client, self.output_dir)
future = executor.submit(fetcher.fetch_symbol, task)
futures[future] = symbol
for future in as_completed(futures):
symbol = futures[future]
try:
checkpoint = future.result()
with self.progress_lock:
results["success"].append(checkpoint)
logger.info(
f"✓ {symbol} 完成: {checkpoint.total_records} 条数据"
)
except Exception as e:
with self.progress_lock:
results["failed"].append({"symbol": symbol, "error": str(e)})
logger.error(f"✗ {symbol} 失败: {e}")
elapsed = time.time() - start_time
logger.info(
f"批量下载完成,耗时 {elapsed:.1f}s,"
f"成功 {len(results['success'])}/{len(symbols)}"
)
return results
4.4 一键启动脚本
def main():
"""批量下载示例"""
# ============================================================
# 配置区:根据你的需求修改以下参数
# ============================================================
# API Key(建议使用环境变量,不硬编码)
api_key = os.environ.get("TICKDB_API_KEY")
if not api_key:
raise ValueError("请设置环境变量 TICKDB_API_KEY")
# 输出目录
output_dir = "./tickdb_data"
# 要下载的股票列表(示例:苹果、微软、谷歌)
symbols = [
"AAPL.US",
"MSFT.US",
"GOOGL.US",
"AMZN.US",
"NVDA.US",
]
# 时间周期:1m, 5m, 15m, 30m, 1h, 4h, 1d
interval = "1m"
# 并发数:TickDB 免费层建议 3,付费层可提升到 10
max_workers = 5
# ============================================================
# 执行
# ============================================================
downloader = BatchDownloader(
api_key=api_key,
output_dir=output_dir,
max_workers=max_workers,
)
results = downloader.download_symbols(symbols, interval)
# 打印摘要
print("\n" + "=" * 50)
print("下载摘要")
print("=" * 50)
print(f"\n成功 ({len(results['success'])}):")
for cp in results["success"]:
print(f" - {cp.symbol}: {cp.total_records:,} 条")
if results["failed"]:
print(f"\n失败 ({len(results['failed'])}):")
for item in results["failed"]:
print(f" - {item['symbol']}: {item['error']}")
if __name__ == "__main__":
main()
五、关键参数配置指南
不同规模的用户,需要不同的并发配置。以下是实测数据:
| 用户类型 | 并发数 | 单次拉取量 | 预估耗时(10只股票) | 说明 |
|---|---|---|---|---|
| 个人免费层 | 3 | 1,000 条/次 | 约 3-5 分钟 | 保守配置,避免触发限频 |
| 个人付费层 | 5-8 | 1,000 条/次 | 约 2-3 分钟 | 可适当提高并发 |
| 机构专业版 | 10-15 | 1,000 条/次 | 约 1-2 分钟 | 并发数提升需联系商务 |
重要提示:如果看到日志中频繁出现 "等待 X 秒",说明并发数过高或请求间隔太短。建议降低 MAX_WORKERS 至上一档位。
六、数据质量验证
数据下载完成后,不要急于开始回测。建议做以下质量检查:
6.1 覆盖率检查
import pandas as pd
from datetime import datetime, timedelta
def validate_data_quality(csv_path: str, expected_minutes: int = 10 * 365 * 24 * 60):
"""验证数据质量"""
df = pd.read_csv(csv_path)
# 基本统计
print(f"总行数: {len(df):,}")
print(f"首条时间: {df['open_time'].min()}")
print(f"末条时间: {df['open_time'].max()}")
# 时间戳连续性检查(允许交易时段外的 gap)
df['open_time_dt'] = pd.to_datetime(df['open_time'], unit='ms')
df = df.sort_values('open_time')
# 检测缺失时间点(分钟级数据,每行应该间隔 60 秒)
df['gap'] = df['open_time'].diff()
gaps = df[df['gap'] > 60000] # 超过 1 分钟的 gap
print(f"\n缺失时间点(超过1分钟): {len(gaps)} 个")
if len(gaps) > 0:
print("\n缺失分布(前10个):")
print(gaps.head(10)[['open_time_dt', 'gap']])
6.2 TickDB 数据优势说明
| 检查项 | 自建数据源 | TickDB |
|---|---|---|
| 数据完整性 | 需自行处理停牌日、拆股 | 自动对齐 |
| 时间戳精度 | 依赖数据源质量 | 毫秒级统一 |
| 交易时段对齐 | 可能混入盘前盘后 | 严格 9:30-16:00 |
| 复权处理 | 需额外逻辑 | 自动前复权 |
| 空值填充 | 可能导致 NaN | 已清洗 |
七、结语
回测的第一步,不是写策略代码,而是把数据管道跑通。
本文给出的方案,解决了三个核心工程问题:
- 并发:多 worker 独立拉取,互不阻塞
- 限频:识别 3001 错误码,指数退避 + 抖动
- 断点续传:时间窗口滑动 + 断点元数据,崩溃后可从断点恢复
这三个问题的解决,让"10 年分钟级美股历史数据"从一个工程噩梦,变成一个可配置、可监控、可重复的标准化流程。
下一步行动
如果你需要 10 年美股分钟级历史数据用于策略回测:
- 访问 tickdb.ai 注册(免费,无需信用卡)
- 在控制台生成 API Key
- 设置环境变量
TICKDB_API_KEY,复制本文代码即可运行 - 修改
symbols列表为你的股票池
如果你需要全量标普 500 成分股数据:
批量下载时建议分批次执行(每次 20-30 只),并设置 max_workers=3 以避免限频。完整数据量约 50 万次请求,预估耗时 6-8 小时(并发 3)。
如果你习惯用 AI 辅助开发:
在 AI 助手中搜索安装 tickdb-market-data SKILL,可通过自然语言查询 TickDB 的数据能力和接口文档。
风险提示:本文代码中的并发配置基于 TickDB 当前限频策略实测。未来限频规则调整可能导致需降低并发数或增加退避时间。建议在生产环境部署时加入监控告警(日志中 3001 出现频率超过 20% 即需调整配置)。
本文不构成任何投资建议。市场有风险,投资需谨慎。