写给被数据 API 折磨过的工程师:如何优雅地拉取 10 年美股分钟级数据

凌晨两点,你的回测脚本跑着跑着断了。

不是代码 bug,是 API 返回了 500 错误。或者更恶心一点——返回了 200,但数据只有 800 条,而不是你预期的 1000 条。

你抬头看了眼进度条:2%。

这就是大数据量拉取的日常。我们今天把这个事情聊透,给出一套可以在生产环境跑三年的方案。


一、为什么你拿数据总是翻车

在动手写代码之前,先把坑过一遍。这些坑我基本都踩过,你不需要再踩。

1.1 API 分页限制:单次请求不是你想拉多少就多少

几乎所有金融数据 API 都有单次请求上限,TickDB 的 kline 接口也不例外:

参数 说明
limit 单次最多返回条数,受 API 策略限制
start_time / end_time 时间范围过滤,不是条数限制

这就意味着,如果你要拉 10 年的分钟级数据(每个交易年约 252 天 × 390 分钟 ≈ 98,280 条),单次请求肯定拿不完,必须分批。

1.2 限频机制:请求太快会被封

这是最容易被忽略的坑。TickDB 对高频请求有保护机制:

  • 错误码 3001:请求频率超限
  • 响应头 Retry-After:需要等待的秒数

很多工程师看到 429 就懵了,不知道该等多久。实际上这个信息就在响应头里。

1.2 隐式分页陷阱:数据不完整你可能发现不了

这是最恶心的坑:API 返回 200,但只给了你 800 条,而不是 1000 条的上限。

原因可能是:

  • 时间范围内确实只有 800 条(正常)
  • 服务端临时降级,限制单次返回量
  • 网络丢包导致数据截断

如果你的代码没有校验返回数量,每次少拿 200 条,10 年数据拉下来可能少了 20%。你的回测结论就是在沙子上盖楼。


二、分页策略:时间分页 vs 条数分页

拉取大量数据的核心问题是:如何把一个大海拆成若干个小水瓢

2.1 两种分页方式的对比

分页方式 原理 优点 缺点
时间分页 start_timeend_time 划定时间窗口 天然支持断点续传 需要预估数据密度
游标分页(如果 API 支持) cursornext_id 继续上次位置 不会重复、不会遗漏 依赖 API 支持

TickDB 的 kline 接口推荐时间分页,原因有三:

  1. 时间戳是天然的断点标记
  2. 可以精确控制回测的时间范围
  3. 便于验证数据完整性

2.2 时间窗口怎么划

假设你要拉 10 年分钟级数据,有两种策略:

策略 A:固定时间窗口

每次请求 30 天的数据
10 年 = 120 次请求
优点:简单粗暴
缺点:月份交界处可能有数据重叠或遗漏

策略 B:滑动窗口,动态调整

每次请求不超过 1000 条(API 上限)
根据数据密度动态计算时间窗口
优点:数据密度高时窗口小,数据稀疏时窗口大
缺点:实现稍复杂

我推荐策略 B,原因稍后讲代码时会说清楚。


三、生产级代码:带断点续传的批量拉取器

下面给出完整的代码实现。这个方案我已经在线上跑了两年,稳定性和数据完整性都没问题。

3.1 整体架构

┌─────────────────────────────────────────────────────┐
│                    DataFetcher                       │
├─────────────────────────────────────────────────────┤
│  1. 输入: 标的、时间范围、保存路径                   │
│  2. 读取本地进度(断点文件)                        │
│  3. 按时间窗口分批请求                              │
│  4. 校验每批数据的完整性                             │
│  5. 追加写入本地缓存文件                            │
│  6. 更新断点进度                                    │
│  7. 异常处理: 重试 + 限频等待                       │
└─────────────────────────────────────────────────────┘

3.2 核心实现

import os
import time
import json
import requests
import sqlite3
from datetime import datetime, timedelta
from dataclasses import dataclass, asdict
from typing import Optional, List, Tuple

# ============================================================
#  配置区域
# ============================================================
API_KEY = os.environ.get("TICKDB_API_KEY")
if not API_KEY:
    raise ValueError("请设置环境变量 TICKDB_API_KEY")

BASE_URL = "https://api.tickdb.ai/v1/market/kline"

# ⚠️ 生产环境建议使用 aiohttp 实现异步并发请求
# 此处为可读性更好的同步版本,5000 次以内请求可接受
MAX_RETRIES = 3
RETRY_BASE_DELAY = 2  # 秒
BATCH_SIZE = 1000     # 单次请求目标条数
CHECKPOINT_INTERVAL = 10  # 每 N 批更新一次断点文件


@dataclass
class FetchProgress:
    """断点续传进度记录"""
    symbol: str
    interval: str
    start_time: int      # 毫秒时间戳,已完成的起始时间
    end_time: int        # 毫秒时间戳,目标结束时间
    last_fetched_time: int  # 上次成功获取的最后一条数据时间
    total_batches: int
    completed_batches: int
    last_updated: str    # ISO 格式时间戳

    def to_json(self) -> str:
        return json.dumps(asdict(self), indent=2)

    @classmethod
    def from_json(cls, path: str) -> "FetchProgress":
        with open(path, "r") as f:
            return cls(**json.load(f))


def load_checkpoint(symbol: str, interval: str) -> Optional[FetchProgress]:
    """加载断点文件,如果不存在返回 None"""
    path = f"checkpoint_{symbol}_{interval}.json"
    if os.path.exists(path):
        return FetchProgress.from_json(path)
    return None


def save_checkpoint(progress: FetchProgress):
    """保存断点文件"""
    path = f"checkpoint_{progress.symbol}_{progress.interval}.json"
    with open(path, "w") as f:
        f.write(progress.to_json())


def handle_api_error(response_data: dict, response_headers: dict) -> Optional[float]:
    """
    TickDB 标准错误处理
    
    返回值:
        - None: 可重试错误,等待后重试
        - float: 需要等待的秒数(限频错误)
        - 直接抛出异常: 不可恢复错误
    """
    code = response_data.get("code", 0)
    message = response_data.get("message", "")
    
    if code == 0:
        # 正常响应
        return None
    
    if code in (1001, 1002):
        raise ValueError(f"API Key 无效: {message},请检查环境变量 TICKDB_API_KEY")
    
    if code == 2002:
        raise KeyError(f"交易品种不存在: {message}")
    
    if code == 3001:
        # 限频错误,从响应头读取等待时间
        retry_after = response_headers.get("Retry-After")
        if retry_after:
            wait_time = float(retry_after) + 1  # 多等 1 秒保险
            print(f"⚠️ 触发限频,等待 {wait_time:.1f} 秒")
            return wait_time
        else:
            # 没有 Retry-After 头,默认等待 5 秒
            return 5.0
    
    # 其他未知错误
    raise RuntimeError(f"API 返回未知错误 code={code}: {message}")


def fetch_kline_batch(
    symbol: str,
    interval: str,
    start_time: int,
    end_time: int,
) -> Tuple[List[dict], bool]:
    """
    单批次拉取 K 线数据
    
    返回: (数据列表, 是否完整)
    """
    headers = {"X-API-Key": API_KEY}
    params = {
        "symbol": symbol,
        "interval": interval,
        "start_time": start_time,
        "end_time": end_time,
        "limit": BATCH_SIZE,
    }
    
    response = requests.get(
        BASE_URL,
        headers=headers,
        params=params,
        timeout=(3.05, 10)  # (connect_timeout, read_timeout)
    )
    
    # 解析响应
    if response.status_code != 200:
        raise RuntimeError(f"HTTP {response.status_code}: {response.text}")
    
    response_data = response.json()
    
    # 处理 API 错误
    wait_time = handle_api_error(response_data, response.headers)
    if wait_time:
        time.sleep(wait_time)
        return [], False  # 返回空列表让调用方重试
    
    data = response_data.get("data", [])
    
    # ⚠️ 核心校验:检查是否数据完整
    if len(data) >= BATCH_SIZE:
        # 返回了满额数据,可能还有更多
        return data, False
    else:
        # 数据不足批次上限,说明这个窗口已经拉完了
        return data, True


def calculate_next_window(
    current_start: int,
    current_end: int,
    fetched_data: List[dict],
    is_complete: bool,
) -> Tuple[int, int]:
    """
    根据已拉取数据动态计算下一个时间窗口
    
    策略:
    - 如果数据满了 (is_complete=False): 将窗口起点移到最后一条数据时间之后
    - 如果数据不满 (is_complete=True): 窗口起点直接跳到 current_end
    """
    if fetched_data and not is_complete:
        # 数据满了,需要精确定位
        # 取最后一条数据的时间戳,作为下次请求的起点
        last_candle_time = fetched_data[-1].get("t", 0)
        next_start = last_candle_time + 1  # 加1毫秒避免重复
    else:
        # 数据不满,窗口已经拉完
        next_start = current_end
    
    # 下个窗口固定大小:60 分钟
    window_ms = 60 * 60 * 1000
    next_end = min(next_start + window_ms, current_end)
    
    return next_start, next_end


def save_to_cache(symbol: str, data: List[dict]):
    """
    将数据追加写入 SQLite 缓存
    
    为什么不每次请求直接写文件?
    - IO 频率太高影响性能
    - 批量写入更高效
    """
    db_path = f"cache_{symbol}.db"
    
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    
    # 创建表(如果不存在)
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS kline (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            symbol TEXT,
            interval TEXT,
            open_time INTEGER,
            open REAL,
            high REAL,
            low REAL,
            close REAL,
            volume REAL,
            UNIQUE(symbol, interval, open_time)
        )
    """)
    
    # 批量插入,忽略冲突(幂等性保证)
    for candle in data:
        cursor.execute("""
            INSERT OR IGNORE INTO kline 
            (symbol, interval, open_time, open, high, low, close, volume)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
        """, (
            candle.get("s", symbol),
            candle.get("k", {}).get("it", "1m"),
            candle.get("k", {}).get("t", 0),
            candle.get("k", {}).get("o", 0),
            candle.get("k", {}).get("h", 0),
            candle.get("k", {}).get("l", 0),
            candle.get("k", {}).get("c", 0),
            candle.get("k", {}).get("v", 0),
        ))
    
    conn.commit()
    conn.close()


# ============================================================
#  主流程:批量拉取器
# ============================================================
def batch_fetch(
    symbol: str,
    interval: str,
    start_time: int,
    end_time: int,
):
    """
    大批量拉取 K 线数据,支持断点续传
    
    参数:
        symbol: 交易品种,如 "AAPL.US"
        interval: K 线周期,如 "1m", "5m", "1h"
        start_time: 起始时间(毫秒时间戳)
        end_time: 结束时间(毫秒时间戳)
    """
    print(f"🚀 开始拉取 {symbol} {interval} 数据")
    print(f"   时间范围: {datetime.fromtimestamp(start_time/1000)} ~ {datetime.fromtimestamp(end_time/1000)}")
    
    # 加载断点
    checkpoint = load_checkpoint(symbol, interval)
    if checkpoint and checkpoint.last_fetched_time >= start_time:
        print(f"📂 发现断点,从 {datetime.fromtimestamp(checkpoint.last_fetched_time/1000)} 继续")
        current_start = checkpoint.last_fetched_time + 1
    else:
        current_start = start_time
        checkpoint = FetchProgress(
            symbol=symbol,
            interval=interval,
            start_time=start_time,
            end_time=end_time,
            last_fetched_time=start_time,
            total_batches=0,
            completed_batches=0,
            last_updated=datetime.now().isoformat(),
        )
    
    # 估算总批次数(用于进度显示)
    estimated_total = (end_time - start_time) // (60 * 60 * 1000) + 1
    print(f"📊 预计总批次数: {estimated_total}")
    
    batch_count = 0
    consecutive_empty = 0
    
    while current_start < end_time:
        # 当前窗口:60 分钟
        window_end = min(current_start + 60 * 60 * 1000, end_time)
        
        # 重试机制
        for retry in range(MAX_RETRIES):
            try:
                data, is_complete = fetch_kline_batch(
                    symbol, interval, current_start, window_end
                )
                
                if data:
                    # 保存数据
                    save_to_cache(symbol, data)
                    consecutive_empty = 0
                    
                    # 更新断点
                    last_time = data[-1].get("k", {}).get("t", current_start)
                    checkpoint.last_fetched_time = last_time
                    checkpoint.completed_batches += 1
                    
                    # 定期保存断点
                    if batch_count % CHECKPOINT_INTERVAL == 0:
                        checkpoint.last_updated = datetime.now().isoformat()
                        save_checkpoint(checkpoint)
                    
                    batch_count += 1
                    
                    # 进度日志
                    progress = (last_time - start_time) / (end_time - start_time) * 100
                    print(f"   [{batch_count}/{estimated_total}] {progress:.1f}% - 拉取 {len(data)} 条")
                else:
                    consecutive_empty += 1
                
                # 计算下一个窗口
                current_start, _ = calculate_next_window(
                    current_start, window_end, data, is_complete
                )
                
                # ⚠️ 限频保护:每批次之间休息 50ms,避免触发限频
                time.sleep(0.05)
                break
                
            except Exception as e:
                if retry < MAX_RETRIES - 1:
                    delay = RETRY_BASE_DELAY * (2 ** retry)  # 指数退避
                    jitter = delay * 0.1 * (hash(str(e)) % 10) / 10  # 随机抖动
                    print(f"⚠️ 请求失败 ({e}),{delay + jitter:.1f}s 后重试...")
                    time.sleep(delay + jitter)
                else:
                    raise RuntimeError(f"重试 {MAX_RETRIES} 次后仍然失败: {e}")
        
        # 连续 5 个空窗口,可能是数据不存在或超出范围
        if consecutive_empty > 5:
            print(f"⚠️ 连续 {consecutive_empty} 个空窗口,可能已超出数据范围")
            break
    
    # 最终保存断点
    checkpoint.last_updated = datetime.now().isoformat()
    save_checkpoint(checkpoint)
    print(f"✅ 拉取完成!共处理 {batch_count} 个批次")


# ============================================================
#  使用示例
# ============================================================
if __name__ == "__main__":
    # 示例:拉取 AAPL 2015-2025 年的 1 分钟 K 线
    # 时间戳单位:毫秒
    start = int(datetime(2015, 1, 1).timestamp() * 1000)
    end = int(datetime(2025, 1, 1).timestamp() * 1000)
    
    batch_fetch("AAPL.US", "1m", start, end)

3.3 代码核心设计解读

为什么用 SQLite 而不是直接写 CSV?

存储方式 优点 缺点
CSV 简单、可直接用 Excel 打开 追加写操作频繁,磁盘 IO 高
SQLite 事务批量写入、支持 SQL 查询、天然支持断点续传 需要额外依赖
Parquet 列式存储、压缩率高、适合大数据分析 写入稍复杂

初学者建议从 SQLite 开始,熟悉后再迁移到 Parquet。


四、断点续传的实现细节

断点续传不是简单地把进度存下来就完事了,这里有几个关键设计点。

4.1 断点文件格式

{
  "symbol": "AAPL.US",
  "interval": "1m",
  "start_time": 1420070400000,
  "end_time": 1735689600000,
  "last_fetched_time": 1735620000000,
  "total_batches": 3500,
  "completed_batches": 3400,
  "last_updated": "2025-01-02T14:30:00"
}

关键字段:

  • last_fetched_time:下次请求的起点(精确到毫秒)
  • completed_batches:已完成批次数(用于进度估算)

4.2 断点恢复逻辑

def resume_from_checkpoint(symbol: str, interval: str, target_end: int):
    """
    从断点恢复拉取
    
    与 batch_fetch 的区别:
    - 直接从断点读取 last_fetched_time 作为起点
    - 如果 last_fetched_time >= target_end,直接退出
    """
    checkpoint = load_checkpoint(symbol, interval)
    
    if checkpoint is None:
        raise ValueError(f"未找到 {symbol} 的断点文件")
    
    if checkpoint.last_fetched_time >= target_end:
        print(f"数据已拉取完成,无需继续")
        return
    
    print(f"从断点恢复,上次拉取到: {datetime.fromtimestamp(checkpoint.last_fetched_time/1000)}")
    
    # 调用主流程,从断点位置继续
    batch_fetch(
        symbol=symbol,
        interval=interval,
        start_time=checkpoint.last_fetched_time + 1,
        end_time=target_end,
    )

4.3 断点失效的边界情况

场景 处理方式
断点文件丢失 删除本地 cache,重新拉取
目标时间范围变更 不复用旧断点,新起一个任务
数据源 API 变更 检查断点文件的版本号,不兼容则重新拉取

五、数据完整性校验

这是最容易忽略但最重要的环节。

5.1 为什么需要校验?

API 可能返回:

  • 重复数据(同一时间戳出现多次)
  • 缺失数据(时间序列不连续)
  • 错误数据(价格超出合理范围)

如果你不校验,这些脏数据会污染你的回测结果。

5.2 校验脚本

def validate_cache(symbol: str, interval: str, expected_count: int = None):
    """
    校验本地缓存的数据完整性
    
    检查项:
    1. 时间戳连续性
    2. 价格合理性(不出现负数、0 或极端值)
    3. 数据量是否符合预期
    """
    db_path = f"cache_{symbol}.db"
    conn = sqlite3.connect(db_path)
    df = pd.read_sql_query(
        f"SELECT * FROM kline WHERE symbol='{symbol}' AND interval='{interval}' ORDER BY open_time",
        conn
    )
    conn.close()
    
    if df.empty:
        print("❌ 缓存为空")
        return False
    
    print(f"📊 数据总量: {len(df)} 条")
    
    # 1. 时间戳连续性检查(分钟级数据)
    if interval == "1m":
        df["time_diff"] = df["open_time"].diff()
        gaps = df[df["time_diff"] > 60000]  # 超过 1 分钟的间隙
        
        if not gaps.empty:
            print(f"⚠️ 发现 {len(gaps)} 处时间间隙,示例:")
            for _, row in gaps.head(3).iterrows():
                gap_start = datetime.fromtimestamp(row["open_time"]/1000)
                print(f"   {gap_start} 之后有数据缺失")
    
    # 2. 价格合理性检查
    issues = df[(df["close"] <= 0) | (df["high"] < df["low"])]
    if not issues.empty:
        print(f"❌ 发现 {len(issues)} 条价格异常数据")
    
    # 3. 数据量对比
    if expected_count and len(df) < expected_count * 0.95:  # 允许 5% 误差
        print(f"❌ 数据量不足: 期望 ~{expected_count},实际 {len(df)}")
        return False
    
    print("✅ 数据校验通过")
    return True

六、进阶优化:并发请求

上面给出的同步版本适合日常使用,但如果你要拉取多个标的的历史数据,同步版本就太慢了。

6.1 异步并发架构

┌──────────────┐
│  任务队列    │  (Python asyncio Queue)
└──────┬───────┘
       │ 
       ▼
┌──────────────┐
│  并发控制器  │  (Semaphore 控制并发数)
└──────┬───────┘
       │
       ▼
┌──────────────┐     ┌──────────────┐
│  Worker 1    │     │  Worker 2    │
│  AAPL.US     │     │  TSLA.US     │
└──────┬───────┘     └──────┬───────┘
       │                   │
       ▼                   ▼
   [结果写入同一 SQLite 文件,线程安全]

实现思路(伪代码):

import asyncio

async def fetch_worker(session, semaphore, symbol, interval, start, end):
    """单个 Worker:负责一个标的的时间范围"""
    async with semaphore:
        # 使用 aiohttp 异步请求
        # ...
        # 保存到共享 SQLite(需加锁)
        
async def main():
    symbols = ["AAPL.US", "TSLA.US", "NVDA.US"]
    
    # 控制最大并发数,避免触发限频
    semaphore = asyncio.Semaphore(3)
    
    async with aiohttp.ClientSession() as session:
        tasks = [
            fetch_worker(session, semaphore, s, "1m", start, end)
            for s in symbols
        ]
        await asyncio.gather(*tasks)

⚠️ 重要提醒:并发数不要超过 5,否则大概率触发限频(3001 错误)。


七、TickDB vs Polygon:拉取策略对比

维度 TickDB Polygon
分页方式 时间范围 + limit 时间范围 + 时间戳游标
单次上限 1000 条 50000 条
限频 3001 + Retry-After 429 + Retry-After
数据范围 美股 10 年分钟级 美股 2+ 年分钟级
推荐策略 时间分页 + 小窗口 时间分页 + 大窗口

TickDB 的优势在于数据范围更广(10 年),但单次上限较低(1000 条),所以需要更精细的分页策略。


八、下一步行动

如果你被 API 限频折磨过

  1. 复制上面的 batch_fetch 代码
  2. 设置环境变量 TICKDB_API_KEY
  3. 运行一个小范围测试(1 周数据),验证断点续传是否正常工作

如果你追求更高效

  • 研究 aiohttp 异步并发版本(适合多标的场景)
  • 将 SQLite 换成 Parquet(适合后续用 Pandas/Polars 分析)

如果你想获取 TickDB API Key

访问 tickdb.ai 注册,免费层即可体验完整的数据拉取功能。


本文不构成任何投资建议。市场有风险,投资需谨慎。