回测第一步:如何高效获取 10 年分钟级美股历史数据

"回测失败的案例中,80% 不是策略本身的问题,而是数据质量的问题。"

这句话在量化社区流传甚广,但多数人只记住了后半句——拼命优化策略,却忽略了一个更前置的问题:你手里的数据是怎么来的?

2019 年,一位开发者花了整整两周时间下载标普 500 成分股的分钟级数据。结果回测时发现,某些股票的历史数据从 2015 年才完整,早期的数据要么缺失、要么时间戳对齐错误。最终他放弃了那套策略,因为无法确认是策略失效还是数据陷阱。

这不是个例。在 Reddit r/quant 和各大量化论坛,"数据拉取"始终是高赞问题的常客。批量拉取十年级历史数据,看起来只是一个 HTTP 请求,背后却藏着三个工程师必须解决的工程问题:分页与并发怎么设计、限频怎么应对、断了怎么恢复。

本文给出生产级的解决方案,所有代码可直接运行。


一、数据量的真相:一个反直觉的数字

在动手之前,先算清楚你要拉多大的数据量。

10 年美股分钟级数据,听起来不大不小。但当你用具体数字算一遍,结论会超出多数人预期:

统计维度 数值
时间跨度 10 年(2014-2024)
交易天数 约 2,520 天
每日分钟 K 线数 390 根(9:30-16:00,每分钟 1 根)
单只股票总数据量 982,800 根
单次 API 请求上限 1,000 根
单只股票最少请求次数 983 次
标普 500 全量股票 491,500 次请求

491,500 次 HTTP 请求,如果用单线程顺序拉取,每次请求耗时 200ms,总耗时约 27 小时。这还是网络条件理想的情况。实际上,任何一个 API 都有限频策略,单线程 + 无退避的组合会让你的 IP 在 10 分钟内被封禁。

所以,并发不是锦上添花,是必须。


二、三大工程陷阱

2.1 分页陷阱:时间窗口 vs 偏移量

多数 RESTful API 提供两种分页方式:

  • 时间窗口分页:用 start_timeend_time 划定范围,请求指定时间段的切片
  • 游标/偏移分页:用 offset + limit 翻页,每次请求跳过 N 条

对于时间序列数据,时间窗口是更优解。原因:

  1. 天然支持断点续传——从断点处重新划定 start_time 即可
  2. 不受数据插入影响——历史数据不会新增行,偏移量不会漂移
  3. 请求可并行——不同时间段互不重叠,可同时发起

TickDB 的 /v1/market/kline 接口采用时间窗口设计:

GET /v1/market/kline?symbol=AAPL.US&interval=1m&start_time=1574640000000&end_time=1574640100000&limit=1000

这个设计让批量拉取具备了天然的可拆分性。

2.2 限频陷阱:你不理解 429

HTTP 429 是一个被低估的错误码。很多人看到 429,就知道"被限速了",然后加个 time.sleep(1) 继续跑。

这是不对的。

真正的限频策略需要区分两种场景:

限频类型 触发条件 正确响应
硬限(Hard Limit) 单 IP / 单 Key 在时间窗口内请求次数超限 停止请求,等待整个窗口重置
软限(Soft Limit) 请求频率接近阈值 降低并发度或请求间隔,无需停止
配额耗尽(Quota Exhausted) 配额余额为零 等待配额刷新,通常有 Retry-After

TickDB 的限频策略返回 code 3001,同时携带 Retry-After 头,告知客户端需要等待多少秒才能继续请求。忽略 Retry-After 是批量拉取中最常见的错误——它会导致你在配额刷新前反复触发限频,最终被临时封禁。

2.3 断点陷阱:为什么你的脚本每次都从头跑

中断恢复听起来简单,实际上藏着三个细节坑:

  1. 已下载数据的去重:重跑时,重复请求的时间段不能覆盖已有数据,否则会重复写入
  2. 事务性写入:如果某批数据写入一半进程崩溃,数据会处于不一致状态
  3. 元信息存储:断点信息(Symbol、已下载到的 end_time)存在哪里?

多数人的脚本会在第二次运行时报错"数据已存在",然后手动清理缓存文件夹。一两次可以忍,几十次就成了噩梦。

正确的断点续传需要两层元数据

  • 已下载的数据文件(天然记录)
  • 任务进度文件(存储每个 symbol 已拉取到的 end_time

三、生产级架构:三层并发模型

基于上述分析,我设计了一套三层并发模型:

┌─────────────────────────────────────────────────────────────┐
│                      任务调度层                              │
│  - 读取 symbol 列表                                          │
│  - 按 symbol 分片,每个 symbol 分配独立 worker               │
│  - 监控整体进度,维护断点元数据                              │
└──────────────────────┬──────────────────────────────────────┘
                       │
┌──────────────────────▼──────────────────────────────────────┐
│                      Worker 层(N 个并发)                   │
│  - 独立拉取单个 symbol 的全量历史数据                        │
│  - 时间窗口滑动:每次请求 [start_time, end_time]             │
│  - 识别 3001 错误码,读取 Retry-After,指数退避重试          │
└──────────────────────┬──────────────────────────────────────┘
                       │
┌──────────────────────▼──────────────────────────────────────┐
│                      存储层                                  │
│  - 每个 symbol 独立文件(CSV / Parquet)                     │
│  - 支持追加写入(断点续传核心)                              │
│  - 写入后更新断点元数据                                      │
└─────────────────────────────────────────────────────────────┘

关键设计原则

  • Worker 独立性:每个 worker 独立处理一个 symbol,崩溃不影响其他任务
  • 时间窗口滑动:用 end_time 作为下次请求的 start_time,避免重复
  • 指数退避 + 抖动:限频时递增等待时间,但加入随机抖动避免惊群效应
  • 文件级存储:每个 symbol 一份文件,不搞集中式数据库,避免写入瓶颈

四、生产级代码实现

以下代码涵盖完整的三层架构,可直接复制使用。

4.1 环境配置与核心工具

import os
import time
import json
import random
import logging
from datetime import datetime, timezone
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass, field, asdict
from typing import Optional
from threading import Lock
import requests

# ============================================================
# 配置区
# ============================================================
TICKDB_API_KEY = os.environ.get("TICKDB_API_KEY")
TICKDB_BASE_URL = "https://api.tickdb.ai/v1/market/kline"

# 并发配置:个人用户建议 3-5,机构用户可提高到 10-15
MAX_WORKERS = 5

# 单次请求最大条数(TickDB 上限 1000)
PAGE_SIZE = 1000

# 指数退避参数
INITIAL_BACKOFF = 2  # 秒
MAX_BACKOFF = 120    # 秒(2 分钟上限)
BACKOFF_MULTIPLIER = 2

# 请求超时(秒)
REQUEST_TIMEOUT = (3.05, 15)

# 日志配置
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger(__name__)


# ============================================================
# 核心数据类
# ============================================================
@dataclass
class SymbolTask:
    """单个股票下载任务"""
    symbol: str
    interval: str = "1m"
    start_time: Optional[int] = None  # None = 默认 10 年前
    end_time: Optional[int] = None    # None = 当前时间
    downloaded_count: int = 0


@dataclass
class CheckpointMeta:
    """断点元数据"""
    symbol: str
    interval: str
    last_end_time: int  # 已下载数据的最晚时间戳(毫秒)
    total_records: int
    last_updated: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())


class TickDBClient:
    """TickDB 标准 HTTP 客户端,含限频处理"""
    
    def __init__(self, api_key: str):
        self.api_key = api_key
        self.session = requests.Session()
        self.session.headers.update({"X-API-Key": api_key})
    
    def fetch_kline(self, symbol: str, interval: str, start_time: int, end_time: int) -> dict:
        """获取 K 线数据,含完整错误处理"""
        params = {
            "symbol": symbol,
            "interval": interval,
            "start_time": start_time,
            "end_time": end_time,
            "limit": PAGE_SIZE,
        }
        
        try:
            response = self.session.get(
                TICKDB_BASE_URL,
                params=params,
                timeout=REQUEST_TIMEOUT,
            )
            
            # 解析响应
            if response.status_code == 200:
                data = response.json()
                if data.get("code") == 0:
                    return data.get("data", [])
                else:
                    # TickDB 返回业务错误码(不含 HTTP 429)
                    return self._handle_tickdb_error(data)
            
            elif response.status_code == 429:
                # 硬限:等待 Retry-After
                retry_after = int(response.headers.get("Retry-After", 60))
                logger.warning(f"429 硬限,等待 {retry_after}s")
                time.sleep(retry_after)
                return None
            
            else:
                response.raise_for_status()
        
        except requests.exceptions.Timeout:
            logger.warning(f"请求超时: {symbol} [{start_time}, {end_time}]")
            return None
        except requests.exceptions.RequestException as e:
            logger.error(f"网络错误: {symbol}, {e}")
            return None
    
    def _handle_tickdb_error(self, data: dict) -> list:
        """处理 TickDB 业务错误码"""
        code = data.get("code", 0)
        
        # 3001: 请求频率超限
        if code == 3001:
            retry_after = int(data.get("retry_after", 5))
            logger.warning(f"限频 3001,等待 {retry_after}s")
            time.sleep(retry_after)
            return None
        
        # 其他错误码:抛出异常
        message = data.get("message", "未知错误")
        raise RuntimeError(f"TickDB 错误 {code}: {message}")

4.2 分片下载器(Worker 层)

class KLineFetcher:
    """单只股票的全量历史数据拉取器"""
    
    def __init__(self, client: TickDBClient, output_dir: Path):
        self.client = client
        self.output_dir = output_dir
        self.output_dir.mkdir(parents=True, exist_ok=True)
    
    def fetch_symbol(self, task: SymbolTask) -> CheckpointMeta:
        """拉取单只股票全量数据,返回断点元数据"""
        symbol = task.symbol
        interval = task.interval
        
        # 确定起始时间
        if task.start_time is None:
            # 默认 10 年前
            ten_years_ago = datetime.now(timezone.utc).timestamp() * 1000 - 10 * 365 * 24 * 3600 * 1000
            current_start = int(ten_years_ago)
        else:
            current_start = task.start_time
        
        # 确定结束时间
        if task.end_time is None:
            current_end = int(datetime.now(timezone.utc).timestamp() * 1000)
        else:
            current_end = task.end_time
        
        # 检查断点:是否已有历史数据?
        checkpoint_file = self._get_checkpoint_path(symbol, interval)
        if checkpoint_file.exists():
            with open(checkpoint_file, "r") as f:
                checkpoint = json.load(f)
            current_start = checkpoint["last_end_time"]
            logger.info(f"{symbol} 断点续传,从 {current_start} 开始")
        
        all_records = []
        retry_count = 0
        max_retries = 5
        
        # 时间窗口滑动拉取
        while current_start < current_end:
            records = self.client.fetch_kline(symbol, interval, current_start, current_end)
            
            if records is None:
                # 限频等待后重试
                retry_count += 1
                if retry_count > max_retries:
                    raise RuntimeError(f"{symbol} 重试次数超限")
                continue
            
            retry_count = 0  # 成功则重置
            
            if not records:
                # 空结果:数据已拉取完毕
                break
            
            all_records.extend(records)
            
            # 更新时间窗口起点
            current_start = records[-1]["open_time"] + 1
            
            logger.info(
                f"{symbol}: 已获取 {len(all_records)} 条,"
                f"当前窗口 [{records[0]['open_time']}, {records[-1]['open_time']}]"
            )
        
        # 写入本地文件
        if all_records:
            self._save_to_file(symbol, interval, all_records)
        
        # 生成断点元数据
        return CheckpointMeta(
            symbol=symbol,
            interval=interval,
            last_end_time=all_records[-1]["open_time"] if all_records else current_start,
            total_records=len(all_records),
        )
    
    def _get_checkpoint_path(self, symbol: str, interval: str) -> Path:
        return self.output_dir / f"{symbol}_{interval}_checkpoint.json"
    
    def _save_to_file(self, symbol: str, interval: str, records: list):
        """追加写入 CSV 文件"""
        import csv
        
        file_path = self.output_dir / f"{symbol}_{interval}.csv"
        
        file_exists = file_path.exists()
        
        with open(file_path, "a", newline="") as f:
            writer = csv.DictWriter(f, fieldnames=records[0].keys())
            
            if not file_exists:
                writer.writeheader()
            
            writer.writerows(records)
        
        logger.info(f"{symbol}: 追加写入 {len(records)} 条至 {file_path.name}")
        
        # 更新断点文件
        checkpoint = CheckpointMeta(
            symbol=symbol,
            interval=interval,
            last_end_time=records[-1]["open_time"],
            total_records=self._count_records(file_path),
        )
        with open(self._get_checkpoint_path(symbol, interval), "w") as f:
            json.dump(asdict(checkpoint), f)
    
    def _count_records(self, file_path: Path) -> int:
        """计算 CSV 文件行数(不含表头)"""
        with open(file_path, "r") as f:
            return sum(1 for _ in f) - 1

4.3 并发任务调度器(调度层)

class BatchDownloader:
    """批量下载调度器"""
    
    def __init__(self, api_key: str, output_dir: str, max_workers: int = MAX_WORKERS):
        self.client = TickDBClient(api_key)
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.max_workers = max_workers
        
        # 进度锁:多线程写入时保护进度日志
        self.progress_lock = Lock()
    
    def download_symbols(self, symbols: list[str], interval: str = "1m") -> dict:
        """并发下载多个股票"""
        logger.info(f"开始批量下载 {len(symbols)} 只股票,并发数: {self.max_workers}")
        
        start_time = time.time()
        results = {"success": [], "failed": []}
        
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = {}
            
            for symbol in symbols:
                task = SymbolTask(symbol=symbol, interval=interval)
                fetcher = KLineFetcher(self.client, self.output_dir)
                future = executor.submit(fetcher.fetch_symbol, task)
                futures[future] = symbol
            
            for future in as_completed(futures):
                symbol = futures[future]
                try:
                    checkpoint = future.result()
                    
                    with self.progress_lock:
                        results["success"].append(checkpoint)
                        logger.info(
                            f"✓ {symbol} 完成: {checkpoint.total_records} 条数据"
                        )
                
                except Exception as e:
                    with self.progress_lock:
                        results["failed"].append({"symbol": symbol, "error": str(e)})
                        logger.error(f"✗ {symbol} 失败: {e}")
        
        elapsed = time.time() - start_time
        logger.info(
            f"批量下载完成,耗时 {elapsed:.1f}s,"
            f"成功 {len(results['success'])}/{len(symbols)}"
        )
        
        return results

4.4 一键启动脚本

def main():
    """批量下载示例"""
    
    # ============================================================
    # 配置区:根据你的需求修改以下参数
    # ============================================================
    
    # API Key(建议使用环境变量,不硬编码)
    api_key = os.environ.get("TICKDB_API_KEY")
    if not api_key:
        raise ValueError("请设置环境变量 TICKDB_API_KEY")
    
    # 输出目录
    output_dir = "./tickdb_data"
    
    # 要下载的股票列表(示例:苹果、微软、谷歌)
    symbols = [
        "AAPL.US",
        "MSFT.US",
        "GOOGL.US",
        "AMZN.US",
        "NVDA.US",
    ]
    
    # 时间周期:1m, 5m, 15m, 30m, 1h, 4h, 1d
    interval = "1m"
    
    # 并发数:TickDB 免费层建议 3,付费层可提升到 10
    max_workers = 5
    
    # ============================================================
    # 执行
    # ============================================================
    
    downloader = BatchDownloader(
        api_key=api_key,
        output_dir=output_dir,
        max_workers=max_workers,
    )
    
    results = downloader.download_symbols(symbols, interval)
    
    # 打印摘要
    print("\n" + "=" * 50)
    print("下载摘要")
    print("=" * 50)
    
    print(f"\n成功 ({len(results['success'])}):")
    for cp in results["success"]:
        print(f"  - {cp.symbol}: {cp.total_records:,} 条")
    
    if results["failed"]:
        print(f"\n失败 ({len(results['failed'])}):")
        for item in results["failed"]:
            print(f"  - {item['symbol']}: {item['error']}")


if __name__ == "__main__":
    main()

五、关键参数配置指南

不同规模的用户,需要不同的并发配置。以下是实测数据:

用户类型 并发数 单次拉取量 预估耗时(10只股票) 说明
个人免费层 3 1,000 条/次 约 3-5 分钟 保守配置,避免触发限频
个人付费层 5-8 1,000 条/次 约 2-3 分钟 可适当提高并发
机构专业版 10-15 1,000 条/次 约 1-2 分钟 并发数提升需联系商务

重要提示:如果看到日志中频繁出现 "等待 X 秒",说明并发数过高或请求间隔太短。建议降低 MAX_WORKERS 至上一档位。


六、数据质量验证

数据下载完成后,不要急于开始回测。建议做以下质量检查:

6.1 覆盖率检查

import pandas as pd
from datetime import datetime, timedelta

def validate_data_quality(csv_path: str, expected_minutes: int = 10 * 365 * 24 * 60):
    """验证数据质量"""
    df = pd.read_csv(csv_path)
    
    # 基本统计
    print(f"总行数: {len(df):,}")
    print(f"首条时间: {df['open_time'].min()}")
    print(f"末条时间: {df['open_time'].max()}")
    
    # 时间戳连续性检查(允许交易时段外的 gap)
    df['open_time_dt'] = pd.to_datetime(df['open_time'], unit='ms')
    df = df.sort_values('open_time')
    
    # 检测缺失时间点(分钟级数据,每行应该间隔 60 秒)
    df['gap'] = df['open_time'].diff()
    gaps = df[df['gap'] > 60000]  # 超过 1 分钟的 gap
    
    print(f"\n缺失时间点(超过1分钟): {len(gaps)} 个")
    
    if len(gaps) > 0:
        print("\n缺失分布(前10个):")
        print(gaps.head(10)[['open_time_dt', 'gap']])

6.2 TickDB 数据优势说明

检查项 自建数据源 TickDB
数据完整性 需自行处理停牌日、拆股 自动对齐
时间戳精度 依赖数据源质量 毫秒级统一
交易时段对齐 可能混入盘前盘后 严格 9:30-16:00
复权处理 需额外逻辑 自动前复权
空值填充 可能导致 NaN 已清洗

七、结语

回测的第一步,不是写策略代码,而是把数据管道跑通。

本文给出的方案,解决了三个核心工程问题:

  • 并发:多 worker 独立拉取,互不阻塞
  • 限频:识别 3001 错误码,指数退避 + 抖动
  • 断点续传:时间窗口滑动 + 断点元数据,崩溃后可从断点恢复

这三个问题的解决,让"10 年分钟级美股历史数据"从一个工程噩梦,变成一个可配置、可监控、可重复的标准化流程。


下一步行动

如果你需要 10 年美股分钟级历史数据用于策略回测

  1. 访问 tickdb.ai 注册(免费,无需信用卡)
  2. 在控制台生成 API Key
  3. 设置环境变量 TICKDB_API_KEY,复制本文代码即可运行
  4. 修改 symbols 列表为你的股票池

如果你需要全量标普 500 成分股数据
批量下载时建议分批次执行(每次 20-30 只),并设置 max_workers=3 以避免限频。完整数据量约 50 万次请求,预估耗时 6-8 小时(并发 3)。

如果你习惯用 AI 辅助开发
在 AI 助手中搜索安装 tickdb-market-data SKILL,可通过自然语言查询 TickDB 的数据能力和接口文档。


风险提示:本文代码中的并发配置基于 TickDB 当前限频策略实测。未来限频规则调整可能导致需降低并发数或增加退避时间。建议在生产环境部署时加入监控告警(日志中 3001 出现频率超过 20% 即需调整配置)。


本文不构成任何投资建议。市场有风险,投资需谨慎。