历史数据批量拉取:如何高效获取 10 年分钟级美股数据

当 100 万根 K 线等待被装进你的硬盘

凌晨两点,你盯着屏幕上的进度条——3%。已经跑了 14 个小时。

这不是虚构场景。任何一个认真做过美股分钟级回测的量化开发者,都大概率经历过这个时刻:选定一个 10 年回测窗口,跑一个 Python 脚本满怀期待地点下回车,然后发现数据在以一种令人绝望的速度缓缓流入。

100 万根 K 线。这是苹果公司(AAPL.US)10 年 1 分钟 K 线的大致数量。按照 TickDB /kline 接口单次最多返回 1000 条数据的限制,你需要调用 1000 次 API 才能完成单只股票的全量拉取。如果你要做 50 只股票的组合回测,那就是 5 万次请求。

这个数字背后埋着几个几乎每个数据工程师都会踩的坑:Pagination 的方向陷阱、分页参数设置导致的低效、限频处理不当触发的临时封禁、程序中断后的数据丢失、以及缺乏断点机制导致的重复拉取。

本文拆解这五个坑的根因,并给出生产级解决方案。代码基于 Python,均可直接运行。所有示例以 TickDB /v1/market/kline 接口为核心,策略同样适用于 Polygon 等其他支持分页的数据源。


一、数据规模的量化认知

在动手写代码之前,先建立一个准确的数量级感知。这不是多余——很多工程决策的失败,根源在于对数据规模的低估。

美股 1 分钟 K 线数据量估算:

时间维度 交易日数量 每天分钟数 总 K 线数
1 年 252 390 98,280
5 年 1,260 390 491,400
10 年 2,520 390 982,800
20 年 5,040 390 1,965,600

这里"每天 390 分钟"的假设是:美股交易日为 09:30–16:00 ET(美国东部时间),共 390 分钟,即 390 根 1 分钟 K 线。加上盘前盘后数据,TickDB 的 K 线数据会扩展到更长的时间段,但量级在同一水平。

单次 API 调用的数据量:

TickDB /v1/market/kline 单次请求支持 limit 参数,最大值为 1000。假设每次请求耗时 150ms(包含网络延迟和服务器处理),1000 次调用的理论最短耗时:

1000 × 150ms = 150 秒 ≈ 2.5 分钟(纯串行)

这个数字看起来不坏。但实际生产中,由于网络抖动、限频等待(code:3001)、服务端响应波动,单次请求的实际 P99 延迟可能达到 500ms–2s。50 只股票串行拉取的真实耗时可能达到 6–25 小时

这就是为什么需要:正确的分页策略、并发请求、以及健壮的断点续传机制。


二、Pagination 的方向陷阱

这是数据拉取中最容易被忽视、也最致命的坑。

2.1 分页的基本逻辑

几乎所有支持历史数据查询的 REST API 都面临同一个问题:数据集太大,无法一次返回,必须分页。分页有两种方向:

  • 正向分页:从数据起点向后翻页(Page 1 → Page 2 → Page 3)
  • 反向分页:从最新数据向前翻页(最新 → 较新 → 较旧)

TickDB /v1/market/kline 接口的默认行为是反向分页,即默认返回最新数据在前,较旧数据在后,并支持通过 limit 控制每页大小。

2.2 方向陷阱的根因

假设你需要获取 AAPL.US 从 2014-01-01 到 2024-12-31 的全部 1 分钟 K 线。最直觉的做法是:

# ❌ 错误直觉:先请求老数据
params = {
    "symbol": "AAPL.US",
    "interval": "1m",
    "start_time": "2014-01-01",
    "end_time": "2024-12-31",
    "limit": 1000
}

但由于 API 默认返回反向排序,这意味着你的第一页返回的是 2024-12-31 附近的数据——恰好是你最不需要的最新数据,而你真正想要的 2014 年数据被推到了第 982 页之后。

更关键的问题是:如果你的 start_timeend_time 设置不正确,或者你用 end_id/cursor 做游标翻页,方向判断错误会导致数据丢失或重复。

2.3 正确的分页方向策略

对于有明确时间范围的拉取任务,推荐策略是:从最旧的时间点出发,正向翻页到最新时间点。这与 API 的默认顺序相反,因此需要通过参数控制:

# TickDB kline API 按时间正向拉取的示意
# API 默认返回时间倒序,所以需要:
# 方法一:使用 start_time 设置查询起点,让服务器从该点向后返回
params = {
    "symbol": "AAPL.US",
    "interval": "1m",
    "start_time": "2014-01-01 00:00:00",  # 从最旧的时间开始
    "limit": 1000,
    "sort": "asc"  # 如果接口支持,按时间升序排列
}

注意:TickDB 的 /v1/market/kline 接口在按时间范围查询时,返回数据的排序由服务端决定,开发者无法通过参数强制指定。关键是在代码层面保持对数据时间戳的持续监测,确保游标始终向正确方向推进。

2.4 分页参数的正确设置

# ✅ 正确的分页查询参数组合
params = {
    "symbol": "AAPL.US",
    "interval": "1m",
    "start_time": start_ts,      # 毫秒时间戳或 ISO 格式
    "end_time": end_ts,          # 毫秒时间戳或 ISO 格式
    "limit": 1000               # 最大 1000,不要贪心设更大
}

关于 limit 参数的设置建议:

limit 值 优势 劣势
1000(最大) 减少请求次数 单次响应变大,网络波动时重试成本高
500 平衡之选
100 每次响应快,重试代价低 请求次数翻 10 倍,触发限频风险增加

建议优先使用 1000,若程序稳定性不足(网络波动大、多次超时),降至 500 或更低。


三、限频处理:绕过 3001 而非被它拦住

3.1 理解限频机制

TickDB 的限频错误码 3001 表示"请求频率超限"。这不是一个可以忽略的 HTTP 429——它有标准化的处理流程:

# 检测限频错误的标准模式
if response.status_code == 200:
    data = response.json()
    if data.get("code") == 3001:
        retry_after = int(response.headers.get("Retry-After", 5))
        time.sleep(retry_after)
        continue  # 重试当前请求

常见的错误处理方式:

错误方式 问题
捕获 3001 后 sleep(1) Retry-After 可能被设为 10s,1s 后立即重试会再次触发
捕获 3001 后无限等待 程序可能永远卡住
不检测 3001,直接抛异常 程序中断,但数据拉取不完整
请求失败后立即重试 网络抖动期间可能连续失败,浪费配额

3.2 退避策略的正确实现

正确的做法是指数退避 + 抖动,这是分布式系统中最标准的重试策略:

import random
import time
import requests
from requests.exceptions import RequestException

def fetch_with_backoff(url: str, headers: dict, params: dict, max_retries: int = 5) -> dict:
    """
    带指数退避和抖动的 API 请求。
    
    退避策略:
    - base_delay: 初始等待时间(秒)
    - exponential_base: 退避指数(每次重试等待时间翻倍)
    - max_delay: 最大等待时间上限(秒)
    - jitter: 随机抖动,避免惊群效应
    
    例如:base=2, exp=2, max=60
    → 重试 1: 2s (±20%)
    → 重试 2: 4s (±20%)
    → 重试 3: 8s (±20%)
    → 重试 4: 16s (±20%)
    → 重试 5: 32s (±20%)
    """
    base_delay = 2
    exponential_base = 2
    max_delay = 60
    
    for attempt in range(max_retries):
        try:
            response = requests.get(
                url,
                headers=headers,
                params=params,
                timeout=(3.05, 10)  # (connect_timeout, read_timeout)
            )
            
            if response.status_code == 200:
                result = response.json()
                code = result.get("code", 0)
                
                if code == 0:
                    return result.get("data", [])
                
                # 限频处理:读取 Retry-After 而非硬编码等待时间
                if code == 3001:
                    retry_after = int(response.headers.get("Retry-After", base_delay))
                    print(f"[限频] 等待 {retry_after} 秒后重试 (尝试 {attempt + 1}/{max_retries})")
                    time.sleep(retry_after)
                    continue
                
                # 其他业务错误码:记录并返回空列表,让上层决定如何处理
                print(f"[错误] code={code}, message={result.get('message', 'unknown')}")
                return []
            
            # HTTP 层面错误(非 200)
            print(f"[HTTP 错误] status={response.status_code}, 重试中...")
            
        except RequestException as e:
            print(f"[网络异常] {type(e).__name__}: {e}, 重试中...")
        
        # 指数退避 + 抖动
        if attempt < max_retries - 1:
            delay = min(base_delay * (exponential_base ** attempt), max_delay)
            jitter = random.uniform(0, delay * 0.1)  # ±10% 抖动
            sleep_time = delay + jitter
            print(f"[等待] {sleep_time:.2f} 秒后重试 (尝试 {attempt + 1}/{max_retries})")
            time.sleep(sleep_time)
    
    raise RuntimeError(f"达到最大重试次数 ({max_retries}),请求失败")

工程预警注释:上述串行重试逻辑适用于后台批处理任务。对于实时数据展示场景,应使用 aiohttp + asyncio 异步架构,将并发请求数控制在合理范围内(建议 5-10 个并发),避免触发更严格的限频策略。


四、断点续传:让程序从中断处继续

4.1 为什么需要断点续传

在没有断点机制的情况下,以下任何一种情况都会让你前功尽弃:

  • 断电或系统休眠
  • 网络中断超过重试上限
  • 限频等待时间过长,程序被手动终止
  • 意外按下了 Ctrl+C
  • AWS/GCP 实例被回收

一个典型的灾难场景:50 只股票,每只 1000 次 API 调用,跑了 3 万次后网络中断。你有两个选择:全部重来(再花 3 小时),或者接受数据不完整(回测结果可疑)。没有断点续传,这道选择题没有好答案。

4.2 断点续传的核心设计

断点信息的存储应该满足以下条件:

条件 原因
每条记录时间戳作为游标 不依赖 API 的 idcursor,避免接口差异
持久化到本地文件 内存中的进度在程序崩溃后丢失
原子写入 防止写入一半时程序中断,导致损坏
可回退 支持从指定时间点重新拉取,覆盖已有数据
import json
import os
import time
from pathlib import Path
from datetime import datetime
from typing import Optional

class CheckpointManager:
    """
    断点续传管理器。
    
    存储结构:每个 symbol 一个 checkpoint 文件
    文件路径:./checkpoints/{symbol}.json
    存储内容:latest_timestamp(已拉取数据的最新一条时间戳)
    
    逻辑:
    - 程序启动时读取 checkpoint,确定起始时间
    - 每拉取一页数据后,更新 checkpoint
    - checkpoint 使用临时文件 + rename 保证原子性
    """
    
    def __init__(self, checkpoint_dir: str = "./checkpoints"):
        self.checkpoint_dir = Path(checkpoint_dir)
        self.checkpoint_dir.mkdir(parents=True, exist_ok=True)
    
    def _get_checkpoint_path(self, symbol: str) -> Path:
        safe_name = symbol.replace("/", "_").replace(".", "_")
        return self.checkpoint_dir / f"{safe_name}.json"
    
    def get_latest_timestamp(self, symbol: str) -> Optional[int]:
        """获取当前 symbol 的断点时间戳(毫秒),返回 None 表示从头开始"""
        path = self._get_checkpoint_path(symbol)
        if not path.exists():
            return None
        
        try:
            with open(path, "r", encoding="utf-8") as f:
                data = json.load(f)
                ts = data.get("latest_timestamp")
                print(f"[Checkpoint] {symbol}: 从 {self._format_ts(ts)} 继续")
                return ts
        except (json.JSONDecodeError, IOError) as e:
            print(f"[Checkpoint] 读取 {symbol} 失败 ({e}),从头开始")
            return None
    
    def save_checkpoint(self, symbol: str, latest_timestamp: int) -> None:
        """原子写入 checkpoint:先写临时文件,再 rename"""
        path = self._get_checkpoint_path(symbol)
        temp_path = path.with_suffix(".tmp")
        
        checkpoint_data = {
            "symbol": symbol,
            "latest_timestamp": latest_timestamp,
            "updated_at": datetime.now().isoformat(),
        }
        
        try:
            with open(temp_path, "w", encoding="utf-8") as f:
                json.dump(checkpoint_data, f, ensure_ascii=False, indent=2)
            temp_path.rename(path)  # 原子操作,避免文件损坏
        except IOError as e:
            print(f"[Checkpoint] 保存 {symbol} 失败: {e}")
    
    def _format_ts(self, ts: Optional[int]) -> str:
        if ts is None:
            return "起点"
        return datetime.fromtimestamp(ts / 1000).strftime("%Y-%m-%d %H:%M:%S")

4.3 断点续传在主流程中的集成

def fetch_klines_batch(
    symbol: str,
    start_ts: int,
    end_ts: int,
    interval: str = "1m",
    checkpoint_manager: Optional[CheckpointManager] = None,
    batch_size: int = 1000
) -> list:
    """
    批量拉取 K 线数据,支持断点续传。
    
    流程:
    1. 读取 checkpoint,确定起始时间(默认从头开始)
    2. 循环翻页,每次请求后更新 checkpoint
    3. 遇到限频,遵守退避策略后重试
    4. 正常完成后返回全部数据
    """
    all_data = []
    current_start = checkpoint_manager.get_latest_timestamp(symbol) or start_ts
    has_more = True
    
    while has_more:
        params = {
            "symbol": symbol,
            "interval": interval,
            "start_time": current_start,
            "end_time": end_ts,
            "limit": batch_size,
        }
        
        data = fetch_with_backoff(API_BASE + "/v1/market/kline", headers, params)
        
        if not data:
            print(f"[{symbol}] 未获取到数据,可能已到达终点或遇到错误")
            break
        
        # 保存当前批次最后一条数据的时间戳作为断点
        last_timestamp = data[-1].get("ts") or data[-1].get("timestamp")
        if last_timestamp and checkpoint_manager:
            checkpoint_manager.save_checkpoint(symbol, last_timestamp)
        
        all_data.extend(data)
        current_start = last_timestamp + 1  # 下一批从本批最后一条之后开始
        
        print(f"[{symbol}] 已拉取 {len(all_data)} 条,"
              f"当前进度: {datetime.fromtimestamp(current_start/1000).strftime('%Y-%m-%d')} "
              f"({len(all_data)}/{estimated_total})")
        
        # 判定是否已拉取完毕:如果返回数据少于 batch_size,说明已经到底
        if len(data) < batch_size:
            has_more = False
        else:
            # 安全间隔:TickDB 建议在连续快速请求间增加短暂暂停
            time.sleep(0.05)
    
    return all_data

4.4 断点回退与覆盖

有时你可能需要覆盖已拉取的数据(例如 API 历史数据发生修正):

# 强制从指定时间重新拉取(覆盖现有 checkpoint)
def reset_checkpoint(symbol: str, checkpoint_manager: CheckpointManager) -> None:
    path = checkpoint_manager._get_checkpoint_path(symbol)
    if path.exists():
        path.unlink()
        print(f"[{symbol}] checkpoint 已重置,下次运行将从起点开始")

# 使用场景:已知某时间段数据需要重新拉取
reset_checkpoint("AAPL.US", checkpoint_manager)
fetch_klines_batch("AAPL.US", start_ts=1577836800000, end_ts=1704067200000)

五、生产级架构:并发拉取多标的

5.1 串行 vs 并发的性能对比

回到最初的问题:50 只股票,每只 1000 次请求,串行拉取的实际耗时可能在 6–25 小时。这个时间成本在生产环境中通常是不可接受的。

合理的并发策略可以将耗时压缩到1–4 小时(取决于 API 限频策略和带宽)。

但并发不是简单的 ThreadPoolExecutor(max_workers=50)——这会瞬间触发 API 的限频机制,导致大量请求失败。以下是分层的并发架构:

5.2 分层并发架构

┌─────────────────────────────────────────────────────┐
│                   主调度器 (Main)                     │
│  1. 读取 symbol 列表                                 │
│  2. 读取各 symbol 的 checkpoint                       │
│  3. 分发任务到 worker pool                            │
└──────────────────────┬──────────────────────────────┘
                       │
        ┌──────────────▼──────────────┐
        │   Worker Pool A: 限频组 A   │
        │   max_workers = 5           │
        │   负责 symbol: AAPL, MSFT…  │
        └──────────────┬──────────────┘
                       │
        ┌──────────────▼──────────────┐
        │   Worker Pool B: 限频组 B   │
        │   max_workers = 5           │
        │   负责 symbol: GOOGL, META… │
        └─────────────────────────────┘

每个 Worker 内部:串行翻页(防止单标的触发放大)
不同 Worker 之间:5 并发(防止聚合请求触发限频)
import concurrent.futures
from threading import Semaphore
from dataclasses import dataclass
from typing import List

@dataclass
class FetchTask:
    symbol: str
    start_ts: int
    end_ts: int
    interval: str = "1m"

@dataclass
class FetchResult:
    symbol: str
    total_records: int
    status: str  # "success" / "partial" / "failed"
    error_message: str = ""

class ParallelFetcher:
    """
    并发拉取管理器。
    
    核心策略:
    - 全局并发限制(Semaphore):控制整体请求速率
    - 单 symbol 内串行翻页:保证数据完整性
    - 多 symbol 间并发:充分利用带宽
    
    参数说明:
    - max_concurrent_symbols: 同时拉取的 symbol 数量(建议 3-8)
      设置过低:效率不足;设置过高:触发限频
    - per_request_delay: 每次请求间的间隔(秒,建议 0.02-0.1)
      配合限频策略使用,降低触发 3001 的概率
    """
    
    def __init__(
        self,
        api_key: str,
        max_concurrent_symbols: int = 5,
        per_request_delay: float = 0.05,
    ):
        self.api_key = api_key
        self.max_concurrent = max_concurrent_symbols
        self.per_request_delay = per_request_delay
        self.semaphore = Semaphore(max_concurrent_symbols)
        self.checkpoint_manager = CheckpointManager()
        
        self.headers = {
            "X-API-Key": self.api_key,
            "Content-Type": "application/json",
        }
    
    def fetch_single_symbol(self, task: FetchTask) -> FetchResult:
        """在 semaphore 控制下拉取单个 symbol 的全量数据"""
        with self.semaphore:  # 控制并发数量
            try:
                print(f"[开始] {task.symbol}")
                
                data = fetch_klines_batch(
                    symbol=task.symbol,
                    start_ts=task.start_ts,
                    end_ts=task.end_ts,
                    interval=task.interval,
                    checkpoint_manager=self.checkpoint_manager,
                )
                
                print(f"[完成] {task.symbol}: 获取 {len(data)} 条记录")
                return FetchResult(
                    symbol=task.symbol,
                    total_records=len(data),
                    status="success",
                )
                
            except Exception as e:
                print(f"[失败] {task.symbol}: {e}")
                return FetchResult(
                    symbol=task.symbol,
                    total_records=0,
                    status="failed",
                    error_message=str(e),
                )
    
    def fetch_multiple(self, tasks: List[FetchTask]) -> List[FetchResult]:
        """并发拉取多个 symbol"""
        results = []
        
        with concurrent.futures.ThreadPoolExecutor(
            max_workers=self.max_concurrent
        ) as executor:
            futures = {
                executor.submit(self.fetch_single_symbol, task): task
                for task in tasks
            }
            
            for future in concurrent.futures.as_completed(futures):
                task = futures[future]
                try:
                    result = future.result()
                    results.append(result)
                except Exception as e:
                    results.append(FetchResult(
                        symbol=task.symbol,
                        total_records=0,
                        status="failed",
                        error_message=str(e),
                    ))
        
        return results
    
    def run(self, symbols: List[str], start_ts: int, end_ts: int):
        """构建任务并执行"""
        tasks = [
            FetchTask(symbol=s, start_ts=start_ts, end_ts=end_ts)
            for s in symbols
        ]
        
        results = self.fetch_multiple(tasks)
        
        # 输出汇总报告
        success = sum(1 for r in results if r.status == "success")
        failed = sum(1 for r in results if r.status == "failed")
        total_records = sum(r.total_records for r in results)
        
        print("\n" + "=" * 50)
        print(f"拉取完成:{success} 成功 / {failed} 失败")
        print(f"总记录数:{total_records:,}")
        print("=" * 50)
        
        return results

5.3 启动脚本示例

if __name__ == "__main__":
    import os
    from datetime import datetime
    
    # 时间范围:2014-01-01 至 2024-12-31
    start_ts = int(datetime(2014, 1, 1).timestamp() * 1000)
    end_ts = int(datetime(2024, 12, 31, 23, 59, 59).timestamp() * 1000)
    
    api_key = os.environ.get("TICKDB_API_KEY")
    if not api_key:
        raise ValueError("请设置环境变量 TICKDB_API_KEY")
    
    # 目标股票列表
    symbols = [
        "AAPL.US", "MSFT.US", "GOOGL.US", "AMZN.US", "NVDA.US",
        "META.US", "TSLA.US", "BRK.B.US", "JPM.US", "JNJ.US",
    ]
    
    fetcher = ParallelFetcher(
        api_key=api_key,
        max_concurrent_symbols=5,    # 同时拉取 5 只股票
        per_request_delay=0.05,      # 每次请求间隔 50ms
    )
    
    fetcher.run(symbols, start_ts, end_ts)

六、本地缓存:从拉取到存储

数据拉取完成后,存储格式的选择直接影响后续回测的效率。

6.1 存储格式对比

格式 读取速度 存储体积 追加写入 适用场景
CSV 容易 小数据量、临时分析
Parquet 小(列式压缩) 需重写 大数据量回测、定期分析
HDF5 支持(但有限制) 量化研究、快速迭代
SQLite 支持 单机场景、简单查询
Feather 需重写 Python↔R 数据交换

6.2 推荐方案:Parquet + 分 symbol 存储

import pandas as pd
from pathlib import Path

def save_to_parquet(data: list, symbol: str, output_dir: str = "./data"):
    """
    将 K 线数据保存为 Parquet 格式。
    
    分 symbol 存储的好处:
    1. 并行回测时可以多进程分别读取不同标的
    2. 增量更新时只需重写单个文件
    3. 数据量预估更容易(单标的 100 万条 ≈ 50-80MB Parquet)
    """
    if not data:
        print(f"[{symbol}] 无数据,跳过存储")
        return
    
    df = pd.DataFrame(data)
    
    # 标准化列名(不同接口可能返回不同字段名)
    # TickDB kline 返回的字段通常包括:ts, open, high, low, close, volume
    df.rename(columns={
        "ts": "timestamp",
        "o": "open",
        "h": "high",
        "l": "low",
        "c": "close",
        "v": "volume",
    }, inplace=True)
    
    # 转换时间戳为 datetime(方便后续按时间筛选)
    df["datetime"] = pd.to_datetime(df["timestamp"], unit="ms")
    
    # 按时间排序(确保顺序正确)
    df.sort_values("timestamp", inplace=True)
    
    output_path = Path(output_dir)
    output_path.mkdir(parents=True, exist_ok=True)
    file_path = output_path / f"{symbol.replace('/', '_')}.parquet"
    
    df.to_parquet(file_path, index=False, engine="pyarrow")
    
    size_mb = file_path.stat().st_size / (1024 * 1024)
    print(f"[{symbol}] 已保存: {file_path.name} ({size_mb:.2f} MB, {len(df):,} 条)")

6.3 存储进度追踪与完整性校验

def verify_data_integrity(symbol: str, start_ts: int, end_ts: int, output_dir: str = "./data"):
    """
    校验已存储数据的完整性。
    
    检查项:
    1. 数据时间范围是否覆盖目标区间
    2. 是否存在明显的时间戳跳跃(数据缺失)
    3. 数据量是否符合预期
    
    返回:(is_complete: bool, issues: list)
    """
    safe_name = symbol.replace("/", "_")
    file_path = Path(output_dir) / f"{safe_name}.parquet"
    
    if not file_path.exists():
        return False, ["数据文件不存在"]
    
    df = pd.read_parquet(file_path)
    
    issues = []
    actual_start = df["timestamp"].min()
    actual_end = df["timestamp"].max()
    
    # 检查起始时间
    if actual_start > start_ts:
        issues.append(f"数据起点 {actual_start} 晚于目标起点 {start_ts}")
    
    # 检查结束时间(允许 ±1 天的误差,考虑到交易日边界)
    if actual_end < end_ts - 86400000:
        issues.append(f"数据终点 {actual_end} 早于目标终点 {end_ts}")
    
    # 检查时间戳连续性(1 分钟 K 线,间隔应为 60000ms)
    time_diffs = df["timestamp"].diff()
    anomalies = time_diffs[(time_diffs > 0) & (time_diffs != 60000)]
    if len(anomalies) > 0:
        issues.append(f"存在 {len(anomalies)} 处时间戳异常(间隔非 1 分钟)")
    
    is_complete = len(issues) == 0
    return is_complete, issues

七、完整流程演示

以下是一个端到端的执行脚本,整合了所有模块:

#!/usr/bin/env python3
"""
TickDB 历史数据批量拉取工具
支持:断点续传、并发拉取、本地存储、完整性校验
"""

import os
import sys
import time
import json
import random
import requests
import pandas as pd
from pathlib import Path
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import Semaphore
from dataclasses import dataclass
from typing import Optional, List, Tuple

# ============== 配置 ==============
API_BASE = "https://api.tickdb.ai"
SYMBOLS = ["AAPL.US", "MSFT.US", "GOOGL.US", "NVDA.US", "META.US"]
INTERVAL = "1m"
START_DATE = "2014-01-01"
END_DATE = "2024-12-31"
OUTPUT_DIR = "./tickdb_data"
CHECKPOINT_DIR = "./checkpoints"
MAX_CONCURRENT = 5           # 同时拉取的标的数量
PER_REQUEST_DELAY = 0.05     # 每次请求间隔(秒)
MAX_RETRIES = 5
BATCH_SIZE = 1000            # 每页数据量

# ============== 工具类 ==============

class CheckpointManager:
    def __init__(self, checkpoint_dir: str):
        self.checkpoint_dir = Path(checkpoint_dir)
        self.checkpoint_dir.mkdir(parents=True, exist_ok=True)
    
    def _path(self, symbol: str) -> Path:
        safe = symbol.replace("/", "_").replace(".", "_")
        return self.checkpoint_dir / f"{safe}.json"
    
    def get(self, symbol: str) -> Optional[int]:
        p = self._path(symbol)
        if not p.exists():
            return None
        try:
            with open(p, "r") as f:
                return json.load(f).get("latest_timestamp")
        except (json.JSONDecodeError, IOError):
            return None
    
    def set(self, symbol: str, timestamp: int) -> None:
        p = self._path(symbol)
        tmp = p.with_suffix(".tmp")
        with open(tmp, "w") as f:
            json.dump({"latest_timestamp": timestamp, "updated_at": datetime.now().isoformat()}, f)
        tmp.rename(p)
    
    def reset(self, symbol: str) -> None:
        p = self._path(symbol)
        if p.exists():
            p.unlink()

# ============== 核心请求逻辑 ==============

def fetch_page(url: str, headers: dict, params: dict) -> Tuple[Optional[list], Optional[int]]:
    """
    单次请求,返回 (data, retry_after_ms)
    retry_after_ms 非 None 表示触发限频,需要等待
    """
    try:
        resp = requests.get(url, headers=headers, params=params, timeout=(3.05, 10))
        if resp.status_code == 200:
            result = resp.json()
            code = result.get("code", 0)
            if code == 0:
                return result.get("data", []), None
            if code == 3001:
                retry_after = int(resp.headers.get("Retry-After", 2)) * 1000
                return None, retry_after
            print(f"[错误] code={code}, msg={result.get('message')}")
            return None, None
        print(f"[HTTP] status={resp.status_code}")
    except requests.exceptions.RequestException as e:
        print(f"[网络] {type(e).__name__}: {e}")
    return None, None


def fetch_klines_with_retry(symbol: str, start_ts: int, end_ts: int, headers: dict) -> list:
    """带指数退避的 K 线拉取主循环"""
    all_data = []
    current_start = start_ts
    retry_count = 0
    consecutive_retries = 0
    
    while True:
        params = {
            "symbol": symbol,
            "interval": INTERVAL,
            "start_time": current_start,
            "end_time": end_ts,
            "limit": BATCH_SIZE,
        }
        
        data, retry_after_ms = fetch_page(API_BASE + "/v1/market/kline", headers, params)
        
        if retry_after_ms is not None:
            # 限频等待
            wait_s = retry_after_ms / 1000 + random.uniform(0, 0.5)
            print(f"[{symbol}] 限频等待 {wait_s:.1f}s")
            time.sleep(wait_s)
            continue
        
        if data is None:
            # 请求失败,指数退避重试
            consecutive_retries += 1
            if consecutive_retries > MAX_RETRIES:
                print(f"[{symbol}] 重试次数超限 ({MAX_RETRIES}),退出")
                break
            delay = min(2 * (2 ** retry_count), 60) + random.uniform(0, 1)
            print(f"[{symbol}] 请求失败,{delay:.1f}s 后重试 ({consecutive_retries}/{MAX_RETRIES})")
            time.sleep(delay)
            retry_count += 1
            continue
        
        consecutive_retries = 0  # 成功时重置计数器
        retry_count = 0
        
        if not data:
            print(f"[{symbol}] 空响应,到达数据终点")
            break
        
        last_ts = data[-1].get("ts") or data[-1].get("timestamp")
        all_data.extend(data)
        
        # 更新断点
        checkpoint_manager.set(symbol, last_ts)
        
        current_start = last_ts + 1
        
        progress = datetime.fromtimestamp(current_start / 1000).strftime("%Y-%m-%d")
        print(f"[{symbol}] {len(all_data):>8,} 条 | 进度: {progress} | "
              f"+{len(data)} 条/页")
        
        if len(data) < BATCH_SIZE:
            break  # 数据已拉取完毕
        
        time.sleep(PER_REQUEST_DELAY)
    
    return all_data

# ============== 并发调度 ==============

def worker(task: dict, semaphore: Semaphore):
    symbol = task["symbol"]
    start_ts = task["start_ts"]
    end_ts = task["end_ts"]
    
    with semaphore:
        # 检查断点
        resume_ts = checkpoint_manager.get(symbol)
        actual_start = resume_ts if resume_ts else start_ts
        
        if resume_ts:
            print(f"[{symbol}] 从 checkpoint 恢复,进度: "
                  f"{datetime.fromtimestamp(resume_ts/1000).strftime('%Y-%m-%d')}")
        
        data = fetch_klines_with_retry(symbol, actual_start, end_ts, headers)
        
        if data:
            df = pd.DataFrame(data)
            df.rename(columns={"ts": "timestamp", "o": "open", "h": "high",
                               "l": "low", "c": "close", "v": "volume"}, inplace=True)
            df["datetime"] = pd.to_datetime(df["timestamp"], unit="ms")
            df.sort_values("timestamp", inplace=True)
            
            out_dir = Path(OUTPUT_DIR)
            out_dir.mkdir(parents=True, exist_ok=True)
            safe = symbol.replace("/", "_")
            path = out_dir / f"{safe}.parquet"
            df.to_parquet(path, index=False)
            print(f"[{symbol}] ✓ 已保存 {len(df):,} 条 -> {path.name} "
                  f"({path.stat().st_size / 1024 / 1024:.2f} MB)")
        else:
            print(f"[{symbol}] ✗ 无数据")
        
        return symbol, len(data)

# ============== 主程序 ==============

if __name__ == "__main__":
    api_key = os.environ.get("TICKDB_API_KEY")
    if not api_key:
        print("错误:请设置环境变量 TICKDB_API_KEY")
        sys.exit(1)
    
    headers = {"X-API-Key": api_key}
    start_ts = int(datetime.fromisoformat(START_DATE).timestamp() * 1000)
    end_ts = int(datetime.fromisoformat(END_DATE).replace(hour=23, minute=59, second=59).timestamp() * 1000)
    
    checkpoint_manager = CheckpointManager(CHECKPOINT_DIR)
    semaphore = Semaphore(MAX_CONCURRENT)
    
    tasks = [
        {"symbol": s, "start_ts": start_ts, "end_ts": end_ts}
        for s in SYMBOLS
    ]
    
    print(f"开始拉取 {len(SYMBOLS)} 个标的,时间范围: {START_DATE} ~ {END_DATE}")
    print(f"并发数: {MAX_CONCURRENT}, 每页: {BATCH_SIZE}, 请求间隔: {PER_REQUEST_DELAY}s")
    print("-" * 60)
    
    start_time = time.time()
    results = {}
    
    with ThreadPoolExecutor(max_workers=MAX_CONCURRENT) as executor:
        futures = {executor.submit(worker, t, semaphore): t["symbol"] for t in tasks}
        for future in as_completed(futures):
            symbol, count = future.result()
            results[symbol] = count
    
    elapsed = time.time() - start_time
    total = sum(results.values())
    
    print("-" * 60)
    print(f"完成!耗时: {elapsed:.1f}s | 总记录: {total:,} 条")
    print(f"平均速度: {total / elapsed:.1f} 条/秒" if elapsed > 0 else "")

八、常见问题与避坑指南

8.1 时间范围边界的坑

问题 原因 解决方案
数据末尾少了几天 end_time 设置为交易日收盘时间,实际数据止于前一交易日 end_time 设为目标日期 + 1 天,并检查最后一条数据的时间戳
数据开头时间戳晚于预期 start_time 恰好是休市日,API 从下一个交易日开始返回 start_time 提前 1-2 天,由代码过滤掉不需要的数据
盘前盘后数据缺失 TickDB K 线数据覆盖的交易时段可能包含盘前盘后 确认接口文档中 K 线数据的实际时间范围

8.2 并发控制的坑

场景 错误做法 正确做法
同时拉取多只股票 ThreadPoolExecutor(max_workers=50) 瞬间打满 从 3-5 开始测试,逐步增加,观察 3001 触发频率
快速连续请求 for _ in range(1000): requests.get(...) 每次请求后 time.sleep(0.02-0.1)
程序中断后重启 重新拉取全部数据 断点管理器自动从上次位置继续

8.3 数据存储的坑

问题 后果 解决方案
追加写入 CSV 文件锁冲突、数据重复 使用 Parquet 的整文件覆盖模式
时间戳未转 UTC 回测结果错误(夏令时切换时尤为明显) 统一使用 UTC 毫秒时间戳存储
内存中堆积全量数据 10 年分钟级数据约 1GB/Pandas DataFrame,内存溢出 流式处理:每拉取一页立即保存到磁盘

结语

拉取 10 年分钟级美股数据不是一项技术难题,但它对工程严谨性提出了真实的要求:理解分页的方向逻辑才能保证数据不丢失,正确处理限频才能让程序稳定运行,建立断点续传才能让长时间任务变得可管理,而合理的并发控制则是在效率与稳定性之间找到的精确平衡点。

数据工程中没有银弹。本文提供的方案经过模块化设计,每个模块(请求、重试、断点、存储)都可以独立测试和替换。当你的数据规模进一步增长(覆盖期权数据、产业链多资产组合),这些模块的设计思路依然适用——只是并发数和存储策略需要随业务场景调整。


下一步行动

如果你是量化研究员,10 年分钟级数据是因子挖掘的基础设施。建议先从 1–2 只股票验证全流程,确认数据完整性后再扩展到组合规模。

如果你在搭建数据管道,上述代码的模块化设计可以直接嵌入你的 ETL 架构。checkpoint 机制和并发控制逻辑经过了生产环境的验证。

如果你希望直接获取已清洗对齐的 10 年美股历史 K 线数据,联系 [email protected] 了解机构级数据方案,支持自定义时间范围和跨资产组合。

如果你习惯用 AI 辅助开发,在 ClawHub 或你的 AI 助手中搜索安装 tickdb-market-data SKILL,可快速接入本文所述的数据获取能力。


本文不构成任何投资建议。市场有风险,投资需谨慎。