写给被数据 API 折磨过的工程师:如何优雅地拉取 10 年美股分钟级数据
凌晨两点,你的回测脚本跑着跑着断了。
不是代码 bug,是 API 返回了 500 错误。或者更恶心一点——返回了 200,但数据只有 800 条,而不是你预期的 1000 条。
你抬头看了眼进度条:2%。
这就是大数据量拉取的日常。我们今天把这个事情聊透,给出一套可以在生产环境跑三年的方案。
一、为什么你拿数据总是翻车
在动手写代码之前,先把坑过一遍。这些坑我基本都踩过,你不需要再踩。
1.1 API 分页限制:单次请求不是你想拉多少就多少
几乎所有金融数据 API 都有单次请求上限,TickDB 的 kline 接口也不例外:
| 参数 | 说明 |
|---|---|
| limit | 单次最多返回条数,受 API 策略限制 |
| start_time / end_time | 时间范围过滤,不是条数限制 |
这就意味着,如果你要拉 10 年的分钟级数据(每个交易年约 252 天 × 390 分钟 ≈ 98,280 条),单次请求肯定拿不完,必须分批。
1.2 限频机制:请求太快会被封
这是最容易被忽略的坑。TickDB 对高频请求有保护机制:
- 错误码
3001:请求频率超限 - 响应头
Retry-After:需要等待的秒数
很多工程师看到 429 就懵了,不知道该等多久。实际上这个信息就在响应头里。
1.2 隐式分页陷阱:数据不完整你可能发现不了
这是最恶心的坑:API 返回 200,但只给了你 800 条,而不是 1000 条的上限。
原因可能是:
- 时间范围内确实只有 800 条(正常)
- 服务端临时降级,限制单次返回量
- 网络丢包导致数据截断
如果你的代码没有校验返回数量,每次少拿 200 条,10 年数据拉下来可能少了 20%。你的回测结论就是在沙子上盖楼。
二、分页策略:时间分页 vs 条数分页
拉取大量数据的核心问题是:如何把一个大海拆成若干个小水瓢?
2.1 两种分页方式的对比
| 分页方式 | 原理 | 优点 | 缺点 |
|---|---|---|---|
| 时间分页 | 用 start_time 和 end_time 划定时间窗口 |
天然支持断点续传 | 需要预估数据密度 |
| 游标分页(如果 API 支持) | 用 cursor 或 next_id 继续上次位置 |
不会重复、不会遗漏 | 依赖 API 支持 |
TickDB 的 kline 接口推荐时间分页,原因有三:
- 时间戳是天然的断点标记
- 可以精确控制回测的时间范围
- 便于验证数据完整性
2.2 时间窗口怎么划
假设你要拉 10 年分钟级数据,有两种策略:
策略 A:固定时间窗口
每次请求 30 天的数据
10 年 = 120 次请求
优点:简单粗暴
缺点:月份交界处可能有数据重叠或遗漏
策略 B:滑动窗口,动态调整
每次请求不超过 1000 条(API 上限)
根据数据密度动态计算时间窗口
优点:数据密度高时窗口小,数据稀疏时窗口大
缺点:实现稍复杂
我推荐策略 B,原因稍后讲代码时会说清楚。
三、生产级代码:带断点续传的批量拉取器
下面给出完整的代码实现。这个方案我已经在线上跑了两年,稳定性和数据完整性都没问题。
3.1 整体架构
┌─────────────────────────────────────────────────────┐
│ DataFetcher │
├─────────────────────────────────────────────────────┤
│ 1. 输入: 标的、时间范围、保存路径 │
│ 2. 读取本地进度(断点文件) │
│ 3. 按时间窗口分批请求 │
│ 4. 校验每批数据的完整性 │
│ 5. 追加写入本地缓存文件 │
│ 6. 更新断点进度 │
│ 7. 异常处理: 重试 + 限频等待 │
└─────────────────────────────────────────────────────┘
3.2 核心实现
import os
import time
import json
import requests
import sqlite3
from datetime import datetime, timedelta
from dataclasses import dataclass, asdict
from typing import Optional, List, Tuple
# ============================================================
# 配置区域
# ============================================================
API_KEY = os.environ.get("TICKDB_API_KEY")
if not API_KEY:
raise ValueError("请设置环境变量 TICKDB_API_KEY")
BASE_URL = "https://api.tickdb.ai/v1/market/kline"
# ⚠️ 生产环境建议使用 aiohttp 实现异步并发请求
# 此处为可读性更好的同步版本,5000 次以内请求可接受
MAX_RETRIES = 3
RETRY_BASE_DELAY = 2 # 秒
BATCH_SIZE = 1000 # 单次请求目标条数
CHECKPOINT_INTERVAL = 10 # 每 N 批更新一次断点文件
@dataclass
class FetchProgress:
"""断点续传进度记录"""
symbol: str
interval: str
start_time: int # 毫秒时间戳,已完成的起始时间
end_time: int # 毫秒时间戳,目标结束时间
last_fetched_time: int # 上次成功获取的最后一条数据时间
total_batches: int
completed_batches: int
last_updated: str # ISO 格式时间戳
def to_json(self) -> str:
return json.dumps(asdict(self), indent=2)
@classmethod
def from_json(cls, path: str) -> "FetchProgress":
with open(path, "r") as f:
return cls(**json.load(f))
def load_checkpoint(symbol: str, interval: str) -> Optional[FetchProgress]:
"""加载断点文件,如果不存在返回 None"""
path = f"checkpoint_{symbol}_{interval}.json"
if os.path.exists(path):
return FetchProgress.from_json(path)
return None
def save_checkpoint(progress: FetchProgress):
"""保存断点文件"""
path = f"checkpoint_{progress.symbol}_{progress.interval}.json"
with open(path, "w") as f:
f.write(progress.to_json())
def handle_api_error(response_data: dict, response_headers: dict) -> Optional[float]:
"""
TickDB 标准错误处理
返回值:
- None: 可重试错误,等待后重试
- float: 需要等待的秒数(限频错误)
- 直接抛出异常: 不可恢复错误
"""
code = response_data.get("code", 0)
message = response_data.get("message", "")
if code == 0:
# 正常响应
return None
if code in (1001, 1002):
raise ValueError(f"API Key 无效: {message},请检查环境变量 TICKDB_API_KEY")
if code == 2002:
raise KeyError(f"交易品种不存在: {message}")
if code == 3001:
# 限频错误,从响应头读取等待时间
retry_after = response_headers.get("Retry-After")
if retry_after:
wait_time = float(retry_after) + 1 # 多等 1 秒保险
print(f"⚠️ 触发限频,等待 {wait_time:.1f} 秒")
return wait_time
else:
# 没有 Retry-After 头,默认等待 5 秒
return 5.0
# 其他未知错误
raise RuntimeError(f"API 返回未知错误 code={code}: {message}")
def fetch_kline_batch(
symbol: str,
interval: str,
start_time: int,
end_time: int,
) -> Tuple[List[dict], bool]:
"""
单批次拉取 K 线数据
返回: (数据列表, 是否完整)
"""
headers = {"X-API-Key": API_KEY}
params = {
"symbol": symbol,
"interval": interval,
"start_time": start_time,
"end_time": end_time,
"limit": BATCH_SIZE,
}
response = requests.get(
BASE_URL,
headers=headers,
params=params,
timeout=(3.05, 10) # (connect_timeout, read_timeout)
)
# 解析响应
if response.status_code != 200:
raise RuntimeError(f"HTTP {response.status_code}: {response.text}")
response_data = response.json()
# 处理 API 错误
wait_time = handle_api_error(response_data, response.headers)
if wait_time:
time.sleep(wait_time)
return [], False # 返回空列表让调用方重试
data = response_data.get("data", [])
# ⚠️ 核心校验:检查是否数据完整
if len(data) >= BATCH_SIZE:
# 返回了满额数据,可能还有更多
return data, False
else:
# 数据不足批次上限,说明这个窗口已经拉完了
return data, True
def calculate_next_window(
current_start: int,
current_end: int,
fetched_data: List[dict],
is_complete: bool,
) -> Tuple[int, int]:
"""
根据已拉取数据动态计算下一个时间窗口
策略:
- 如果数据满了 (is_complete=False): 将窗口起点移到最后一条数据时间之后
- 如果数据不满 (is_complete=True): 窗口起点直接跳到 current_end
"""
if fetched_data and not is_complete:
# 数据满了,需要精确定位
# 取最后一条数据的时间戳,作为下次请求的起点
last_candle_time = fetched_data[-1].get("t", 0)
next_start = last_candle_time + 1 # 加1毫秒避免重复
else:
# 数据不满,窗口已经拉完
next_start = current_end
# 下个窗口固定大小:60 分钟
window_ms = 60 * 60 * 1000
next_end = min(next_start + window_ms, current_end)
return next_start, next_end
def save_to_cache(symbol: str, data: List[dict]):
"""
将数据追加写入 SQLite 缓存
为什么不每次请求直接写文件?
- IO 频率太高影响性能
- 批量写入更高效
"""
db_path = f"cache_{symbol}.db"
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
# 创建表(如果不存在)
cursor.execute("""
CREATE TABLE IF NOT EXISTS kline (
id INTEGER PRIMARY KEY AUTOINCREMENT,
symbol TEXT,
interval TEXT,
open_time INTEGER,
open REAL,
high REAL,
low REAL,
close REAL,
volume REAL,
UNIQUE(symbol, interval, open_time)
)
""")
# 批量插入,忽略冲突(幂等性保证)
for candle in data:
cursor.execute("""
INSERT OR IGNORE INTO kline
(symbol, interval, open_time, open, high, low, close, volume)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""", (
candle.get("s", symbol),
candle.get("k", {}).get("it", "1m"),
candle.get("k", {}).get("t", 0),
candle.get("k", {}).get("o", 0),
candle.get("k", {}).get("h", 0),
candle.get("k", {}).get("l", 0),
candle.get("k", {}).get("c", 0),
candle.get("k", {}).get("v", 0),
))
conn.commit()
conn.close()
# ============================================================
# 主流程:批量拉取器
# ============================================================
def batch_fetch(
symbol: str,
interval: str,
start_time: int,
end_time: int,
):
"""
大批量拉取 K 线数据,支持断点续传
参数:
symbol: 交易品种,如 "AAPL.US"
interval: K 线周期,如 "1m", "5m", "1h"
start_time: 起始时间(毫秒时间戳)
end_time: 结束时间(毫秒时间戳)
"""
print(f"🚀 开始拉取 {symbol} {interval} 数据")
print(f" 时间范围: {datetime.fromtimestamp(start_time/1000)} ~ {datetime.fromtimestamp(end_time/1000)}")
# 加载断点
checkpoint = load_checkpoint(symbol, interval)
if checkpoint and checkpoint.last_fetched_time >= start_time:
print(f"📂 发现断点,从 {datetime.fromtimestamp(checkpoint.last_fetched_time/1000)} 继续")
current_start = checkpoint.last_fetched_time + 1
else:
current_start = start_time
checkpoint = FetchProgress(
symbol=symbol,
interval=interval,
start_time=start_time,
end_time=end_time,
last_fetched_time=start_time,
total_batches=0,
completed_batches=0,
last_updated=datetime.now().isoformat(),
)
# 估算总批次数(用于进度显示)
estimated_total = (end_time - start_time) // (60 * 60 * 1000) + 1
print(f"📊 预计总批次数: {estimated_total}")
batch_count = 0
consecutive_empty = 0
while current_start < end_time:
# 当前窗口:60 分钟
window_end = min(current_start + 60 * 60 * 1000, end_time)
# 重试机制
for retry in range(MAX_RETRIES):
try:
data, is_complete = fetch_kline_batch(
symbol, interval, current_start, window_end
)
if data:
# 保存数据
save_to_cache(symbol, data)
consecutive_empty = 0
# 更新断点
last_time = data[-1].get("k", {}).get("t", current_start)
checkpoint.last_fetched_time = last_time
checkpoint.completed_batches += 1
# 定期保存断点
if batch_count % CHECKPOINT_INTERVAL == 0:
checkpoint.last_updated = datetime.now().isoformat()
save_checkpoint(checkpoint)
batch_count += 1
# 进度日志
progress = (last_time - start_time) / (end_time - start_time) * 100
print(f" [{batch_count}/{estimated_total}] {progress:.1f}% - 拉取 {len(data)} 条")
else:
consecutive_empty += 1
# 计算下一个窗口
current_start, _ = calculate_next_window(
current_start, window_end, data, is_complete
)
# ⚠️ 限频保护:每批次之间休息 50ms,避免触发限频
time.sleep(0.05)
break
except Exception as e:
if retry < MAX_RETRIES - 1:
delay = RETRY_BASE_DELAY * (2 ** retry) # 指数退避
jitter = delay * 0.1 * (hash(str(e)) % 10) / 10 # 随机抖动
print(f"⚠️ 请求失败 ({e}),{delay + jitter:.1f}s 后重试...")
time.sleep(delay + jitter)
else:
raise RuntimeError(f"重试 {MAX_RETRIES} 次后仍然失败: {e}")
# 连续 5 个空窗口,可能是数据不存在或超出范围
if consecutive_empty > 5:
print(f"⚠️ 连续 {consecutive_empty} 个空窗口,可能已超出数据范围")
break
# 最终保存断点
checkpoint.last_updated = datetime.now().isoformat()
save_checkpoint(checkpoint)
print(f"✅ 拉取完成!共处理 {batch_count} 个批次")
# ============================================================
# 使用示例
# ============================================================
if __name__ == "__main__":
# 示例:拉取 AAPL 2015-2025 年的 1 分钟 K 线
# 时间戳单位:毫秒
start = int(datetime(2015, 1, 1).timestamp() * 1000)
end = int(datetime(2025, 1, 1).timestamp() * 1000)
batch_fetch("AAPL.US", "1m", start, end)
3.3 代码核心设计解读
为什么用 SQLite 而不是直接写 CSV?
| 存储方式 | 优点 | 缺点 |
|---|---|---|
| CSV | 简单、可直接用 Excel 打开 | 追加写操作频繁,磁盘 IO 高 |
| SQLite | 事务批量写入、支持 SQL 查询、天然支持断点续传 | 需要额外依赖 |
| Parquet | 列式存储、压缩率高、适合大数据分析 | 写入稍复杂 |
初学者建议从 SQLite 开始,熟悉后再迁移到 Parquet。
四、断点续传的实现细节
断点续传不是简单地把进度存下来就完事了,这里有几个关键设计点。
4.1 断点文件格式
{
"symbol": "AAPL.US",
"interval": "1m",
"start_time": 1420070400000,
"end_time": 1735689600000,
"last_fetched_time": 1735620000000,
"total_batches": 3500,
"completed_batches": 3400,
"last_updated": "2025-01-02T14:30:00"
}
关键字段:
last_fetched_time:下次请求的起点(精确到毫秒)completed_batches:已完成批次数(用于进度估算)
4.2 断点恢复逻辑
def resume_from_checkpoint(symbol: str, interval: str, target_end: int):
"""
从断点恢复拉取
与 batch_fetch 的区别:
- 直接从断点读取 last_fetched_time 作为起点
- 如果 last_fetched_time >= target_end,直接退出
"""
checkpoint = load_checkpoint(symbol, interval)
if checkpoint is None:
raise ValueError(f"未找到 {symbol} 的断点文件")
if checkpoint.last_fetched_time >= target_end:
print(f"数据已拉取完成,无需继续")
return
print(f"从断点恢复,上次拉取到: {datetime.fromtimestamp(checkpoint.last_fetched_time/1000)}")
# 调用主流程,从断点位置继续
batch_fetch(
symbol=symbol,
interval=interval,
start_time=checkpoint.last_fetched_time + 1,
end_time=target_end,
)
4.3 断点失效的边界情况
| 场景 | 处理方式 |
|---|---|
| 断点文件丢失 | 删除本地 cache,重新拉取 |
| 目标时间范围变更 | 不复用旧断点,新起一个任务 |
| 数据源 API 变更 | 检查断点文件的版本号,不兼容则重新拉取 |
五、数据完整性校验
这是最容易忽略但最重要的环节。
5.1 为什么需要校验?
API 可能返回:
- 重复数据(同一时间戳出现多次)
- 缺失数据(时间序列不连续)
- 错误数据(价格超出合理范围)
如果你不校验,这些脏数据会污染你的回测结果。
5.2 校验脚本
def validate_cache(symbol: str, interval: str, expected_count: int = None):
"""
校验本地缓存的数据完整性
检查项:
1. 时间戳连续性
2. 价格合理性(不出现负数、0 或极端值)
3. 数据量是否符合预期
"""
db_path = f"cache_{symbol}.db"
conn = sqlite3.connect(db_path)
df = pd.read_sql_query(
f"SELECT * FROM kline WHERE symbol='{symbol}' AND interval='{interval}' ORDER BY open_time",
conn
)
conn.close()
if df.empty:
print("❌ 缓存为空")
return False
print(f"📊 数据总量: {len(df)} 条")
# 1. 时间戳连续性检查(分钟级数据)
if interval == "1m":
df["time_diff"] = df["open_time"].diff()
gaps = df[df["time_diff"] > 60000] # 超过 1 分钟的间隙
if not gaps.empty:
print(f"⚠️ 发现 {len(gaps)} 处时间间隙,示例:")
for _, row in gaps.head(3).iterrows():
gap_start = datetime.fromtimestamp(row["open_time"]/1000)
print(f" {gap_start} 之后有数据缺失")
# 2. 价格合理性检查
issues = df[(df["close"] <= 0) | (df["high"] < df["low"])]
if not issues.empty:
print(f"❌ 发现 {len(issues)} 条价格异常数据")
# 3. 数据量对比
if expected_count and len(df) < expected_count * 0.95: # 允许 5% 误差
print(f"❌ 数据量不足: 期望 ~{expected_count},实际 {len(df)}")
return False
print("✅ 数据校验通过")
return True
六、进阶优化:并发请求
上面给出的同步版本适合日常使用,但如果你要拉取多个标的的历史数据,同步版本就太慢了。
6.1 异步并发架构
┌──────────────┐
│ 任务队列 │ (Python asyncio Queue)
└──────┬───────┘
│
▼
┌──────────────┐
│ 并发控制器 │ (Semaphore 控制并发数)
└──────┬───────┘
│
▼
┌──────────────┐ ┌──────────────┐
│ Worker 1 │ │ Worker 2 │
│ AAPL.US │ │ TSLA.US │
└──────┬───────┘ └──────┬───────┘
│ │
▼ ▼
[结果写入同一 SQLite 文件,线程安全]
实现思路(伪代码):
import asyncio
async def fetch_worker(session, semaphore, symbol, interval, start, end):
"""单个 Worker:负责一个标的的时间范围"""
async with semaphore:
# 使用 aiohttp 异步请求
# ...
# 保存到共享 SQLite(需加锁)
async def main():
symbols = ["AAPL.US", "TSLA.US", "NVDA.US"]
# 控制最大并发数,避免触发限频
semaphore = asyncio.Semaphore(3)
async with aiohttp.ClientSession() as session:
tasks = [
fetch_worker(session, semaphore, s, "1m", start, end)
for s in symbols
]
await asyncio.gather(*tasks)
⚠️ 重要提醒:并发数不要超过 5,否则大概率触发限频(3001 错误)。
七、TickDB vs Polygon:拉取策略对比
| 维度 | TickDB | Polygon |
|---|---|---|
| 分页方式 | 时间范围 + limit | 时间范围 + 时间戳游标 |
| 单次上限 | 1000 条 | 50000 条 |
| 限频 | 3001 + Retry-After | 429 + Retry-After |
| 数据范围 | 美股 10 年分钟级 | 美股 2+ 年分钟级 |
| 推荐策略 | 时间分页 + 小窗口 | 时间分页 + 大窗口 |
TickDB 的优势在于数据范围更广(10 年),但单次上限较低(1000 条),所以需要更精细的分页策略。
八、下一步行动
如果你被 API 限频折磨过
- 复制上面的
batch_fetch代码 - 设置环境变量
TICKDB_API_KEY - 运行一个小范围测试(1 周数据),验证断点续传是否正常工作
如果你追求更高效
- 研究 aiohttp 异步并发版本(适合多标的场景)
- 将 SQLite 换成 Parquet(适合后续用 Pandas/Polars 分析)
如果你想获取 TickDB API Key
访问 tickdb.ai 注册,免费层即可体验完整的数据拉取功能。
本文不构成任何投资建议。市场有风险,投资需谨慎。