“你的策略回测失败,不是因为逻辑有问题,而是因为数据还没跑完。”
凌晨两点,一位量化研究员看着终端里报错的数据拉取任务,开始从头下载——这是他本周第三次因为断网导致数据拉取中断。他的策略模型在本地测试了三个月,一切正常,但上线后收益曲线和回测结果差了 37%。事后排查发现,训练数据和实盘数据的采样频率不一致。问题不是策略本身,而是回测数据的获取与管理机制。
这不是个例。在 TickDB 支持的数百个量化开发者中,数据获取是回测失败的前三大原因之一:样本量不足、断点丢失、限频被封。本篇文章拆解分钟级历史数据批量拉取的核心工程挑战,给出生产级的完整解决方案。
一、回测数据的三大工程陷阱
在进入代码层面之前,先厘清为什么“拉数据”这件事能绊倒这么多有经验的开发者。
1.1 陷阱一:分页机制的理解偏差
大多数行情 API 对单次请求返回的数据量都有上限。以 TickDB 为例,单次请求最大 limit 为 1000 条记录。对于 10 年分钟级美股数据,单只股票的历史记录约在 52 万条(250 个交易日 × 390 分钟 × 10 年,再扣除非交易时段)。这意味着即使 API 支持,你也需要发起至少 520 次请求才能获取单只股票的全量数据。
常见错误:直接用循环请求,没有正确处理分页游标(cursor)。当网络超时或限频触发时,游标重置导致数据重复或遗漏。
1.2 陷阱二:限频策略的忽视
TickDB 对不同接口有不同的限频规则:
| 接口 | 限频规则 | 超限后果 |
|---|---|---|
/kline (历史K线) |
20 次/秒 | 触发 code:3001,等待 Retry-After |
/ticker (实时行情) |
10 次/秒 | 触发 code:3001 |
/symbols/available |
5 次/秒 | 触发 code:3001 |
常见错误:无差别并发请求,导致大量请求同时触发限频,轻则延长拉取时间,重则被临时封禁 IP。
1.3 陷阱三:断点续传的缺失
拉取 10 年分钟级数据,单只股票耗时约 5-15 分钟(取决于网络和限频)。在这个时间窗口内,网络波动、进程崩溃、机器休眠都是极高概率事件。
常见错误:没有本地缓存机制,每次重跑都从头开始。这不是“努力”可以弥补的工程缺陷。
二、分片并发架构设计
解决上述三个陷阱的核心思路是:分片并发拉取 + 断点状态持久化 + 限频自适应调度。
2.1 架构概览
┌─────────────────────────────────────────────────────────────┐
│ 数据拉取调度器 (Orchestrator) │
├─────────────────────────────────────────────────────────────┤
│ 1. 读取本地缓存的进度状态 │
│ 2. 计算需要拉取的分片区间 │
│ 3. 分配任务给 Worker Pool │
│ 4. 监控限频状态,动态调整并发度 │
│ 5. 写入本地缓存(每个分片完成后) │
└─────────────────────────────────────────────────────────────┘
│
┌───────────────┼───────────────┐
▼ ▼ ▼
┌──────────┐ ┌──────────┐ ┌──────────┐
│ Worker 1 │ │ Worker 2 │ │ Worker N │
│ 分片 A │ │ 分片 B │ │ 分片 ... │
└──────────┘ └──────────┘ └──────────┘
│ │ │
└───────────────┼───────────────┘
▼
┌─────────────────────────────────────────┐
│ 本地缓存层 (SQLite/LevelDB) │
│ - 已拉取分片记录 │
│ - 最后成功请求的时间戳 │
│ - 当前分页游标 │
└─────────────────────────────────────────┘
2.2 分片策略:按时间窗口还是按游标?
有两种主流分片策略:
| 策略 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
| 时间窗口分片 | 断点明确、易于并行 | 需提前知道数据边界 | 固定回测周期 |
| 游标分片 | 无需知道数据边界 | 断点状态更复杂 | 实时增量更新 |
本文采用时间窗口分片,因为回测场景通常是“某年到某年”的固定周期,更符合量化研究者的实际需求。
分片大小建议:
| 数据周期 | 推荐分片大小 | 单分片记录数 |
|---|---|---|
| 1 分钟 K 线 | 7 天 | ~19,250 条 |
| 5 分钟 K 线 | 30 天 | ~19,500 条 |
| 60 分钟 K 线 | 90 天 | ~14,625 条 |
三、生产级代码实现
以下代码是完整的 TickDB 历史数据拉取器,包含所有生产级要素:
3.1 依赖与环境配置
# requirements.txt
# requests>=2.31.0
# tenacity>=8.2.3
# sqlite3 (Python 内置)
# python-dotenv>=1.0.0
import os
import time
import json
import sqlite3
import requests
import random
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional, List, Tuple
from dataclasses import dataclass
from tenacity import retry, stop_after_attempt, wait_exponential
# ⚠️ 生产环境高频场景建议使用 aiohttp/asyncio 异步架构
# 以下同步代码适合单账号、日拉取量 < 10 万条的场景
# .env
TICKDB_API_KEY=your_api_key_here
TICKDB_BASE_URL=https://api.tickdb.ai/v1
# 数据存储路径
DATA_DIR=./tickdb_data
3.2 断点续传核心:进度状态管理
@dataclass
class FetchProgress:
"""单只股票的拉取进度状态"""
symbol: str
interval: str # 1m, 5m, 15m, 1h, etc.
start_time: int # Unix timestamp ms
end_time: int # Unix timestamp ms
last_cursor: Optional[int] = None # 最后成功请求的时间戳
last_fetched_time: Optional[int] = None
status: str = "pending" # pending / fetching / completed / failed
retry_count: int = 0
class ProgressStore:
"""基于 SQLite 的断点续传状态存储"""
def __init__(self, db_path: str = "./fetch_progress.db"):
self.db_path = db_path
self._init_db()
def _init_db(self):
with sqlite3.connect(self.db_path) as conn:
conn.execute("""
CREATE TABLE IF NOT EXISTS fetch_progress (
id INTEGER PRIMARY KEY AUTOINCREMENT,
symbol TEXT NOT NULL,
interval TEXT NOT NULL,
start_time INTEGER NOT NULL,
end_time INTEGER NOT NULL,
last_cursor INTEGER,
last_fetched_time INTEGER,
status TEXT DEFAULT 'pending',
retry_count INTEGER DEFAULT 0,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UNIQUE(symbol, interval, start_time)
)
""")
# 创建索引加速查询
conn.execute("""
CREATE INDEX IF NOT EXISTS idx_symbol_interval_status
ON fetch_progress(symbol, interval, status)
""")
def get_progress(self, symbol: str, interval: str,
start_time: int, end_time: int) -> Optional[FetchProgress]:
"""查询是否存在未完成的拉取任务"""
with sqlite3.connect(self.db_path) as conn:
row = conn.execute("""
SELECT symbol, interval, start_time, end_time,
last_cursor, last_fetched_time, status, retry_count
FROM fetch_progress
WHERE symbol = ? AND interval = ?
AND start_time = ? AND end_time = ?
""", (symbol, interval, start_time, end_time)).fetchone()
if row:
return FetchProgress(*row)
return None
def save_progress(self, progress: FetchProgress):
"""保存拉取进度(每个分片完成后调用)"""
with sqlite3.connect(self.db_path) as conn:
conn.execute("""
INSERT OR REPLACE INTO fetch_progress
(symbol, interval, start_time, end_time, last_cursor,
last_fetched_time, status, retry_count, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
""", (
progress.symbol, progress.interval, progress.start_time,
progress.end_time, progress.last_cursor, progress.last_fetched_time,
progress.status, progress.retry_count
))
def mark_completed(self, symbol: str, interval: str,
start_time: int, end_time: int):
"""标记某时间窗口的数据拉取完成"""
with sqlite3.connect(self.db_path) as conn:
conn.execute("""
UPDATE fetch_progress
SET status = 'completed', updated_at = CURRENT_TIMESTAMP
WHERE symbol = ? AND interval = ?
AND start_time = ? AND end_time = ?
""", (symbol, interval, start_time, end_time))
3.3 TickDB API 客户端:限频自适应处理
class TickDBClient:
"""TickDB API 客户端,含限频处理与指数退避重连"""
RATE_LIMIT_CODES = {3001}
def __init__(self, api_key: Optional[str] = None):
self.api_key = api_key or os.environ.get("TICKDB_API_KEY")
if not self.api_key:
raise ValueError("API Key 未设置,请设置 TICKDB_API_KEY 环境变量")
self.base_url = os.environ.get("TICKDB_BASE_URL",
"https://api.tickdb.ai/v1")
self.headers = {"X-API-Key": self.api_key}
# 限频状态追踪
self.last_request_time = 0
self.min_request_interval = 0.05 # 20 次/秒 = 间隔 50ms
def _adaptive_rate_limit(self):
"""自适应限频:根据距上次请求的时间动态等待"""
elapsed = time.time() - self.last_request_time
if elapsed < self.min_request_interval:
time.sleep(self.min_request_interval - elapsed)
self.last_request_time = time.time()
def _handle_error(self, response: dict, status_code: int) -> Optional[dict]:
"""统一错误处理,返回 None 表示需要重试"""
code = response.get("code", 0)
if code == 0:
return response.get("data")
# 限频处理
if code == 3001:
retry_after = int(response.headers.get("Retry-After",
self.min_request_interval * 2))
print(f"⚠️ 触发限频,等待 {retry_after:.2f} 秒")
time.sleep(retry_after)
return None
# 可纠正错误,可重试
if code in (1003, 5000):
return None
# 不可纠正错误,抛出异常
error_messages = {
1001: "API Key 无效",
1002: "API Key 缺失",
2002: "交易品种不存在",
4001: "参数错误",
}
raise RuntimeError(f"API 错误 {code}: {error_messages.get(code, '未知错误')}")
@retry(
stop=stop_after_attempt(5),
wait=wait_exponential(multiplier=1, min=1, max=60)
)
def get_kline(self, symbol: str, interval: str,
start_time: int, end_time: int,
limit: int = 1000) -> Tuple[List[dict], Optional[int]]:
"""
获取 K 线数据
Returns:
(data_list, next_cursor): 数据列表和下次请求的游标
next_cursor 为 None 表示数据拉取完成
"""
self._adaptive_rate_limit()
params = {
"symbol": symbol,
"interval": interval,
"start_time": start_time,
"end_time": end_time,
"limit": limit,
}
# ⚠️ 生产环境建议添加请求超时,防止连接挂死
response = requests.get(
f"{self.base_url}/market/kline",
headers=self.headers,
params=params,
timeout=(3.05, 10) # (connect_timeout, read_timeout)
)
# 处理限频响应头
if response.status_code == 429:
retry_after = int(response.headers.get("Retry-After", 5))
print(f"⚠️ HTTP 429 限频,等待 {retry_after} 秒")
time.sleep(retry_after)
raise Exception("Rate limited") # 触发 tenacity 重试
try:
data = response.json()
except json.JSONDecodeError:
raise Exception(f"响应解析失败: {response.text[:200]}")
result = self._handle_error(data, response.status_code)
if result is None:
raise Exception("重试后仍未成功") # 触发 tenacity 重试
klines = result.get("klines", [])
# 获取下一页游标
next_cursor = result.get("next_cursor")
return klines, next_cursor
def get_available_symbols(self, category: str = "US") -> List[str]:
"""获取可用交易品种列表"""
self._adaptive_rate_limit()
response = requests.get(
f"{self.base_url}/symbols/available",
headers=self.headers,
params={"category": category},
timeout=(3.05, 10)
)
data = response.json()
result = self._handle_error(data, response.status_code)
if result is None:
raise RuntimeError("获取交易品种列表失败")
return result.get("symbols", [])
3.4 分片拉取调度器
class DataFetcher:
"""历史数据拉取调度器"""
# 单分片时间窗口(毫秒)
SLICE_WINDOWS = {
"1m": 7 * 24 * 3600 * 1000, # 7 天
"5m": 30 * 24 * 3600 * 1000, # 30 天
"15m": 30 * 24 * 3600 * 1000, # 30 天
"1h": 90 * 24 * 3600 * 1000, # 90 天
}
def __init__(self, client: TickDBClient, progress_store: ProgressStore):
self.client = client
self.progress_store = progress_store
self.data_dir = Path(os.environ.get("DATA_DIR", "./tickdb_data"))
self.data_dir.mkdir(exist_ok=True)
def _generate_slices(self, symbol: str, interval: str,
start_date: str, end_date: str) -> List[Tuple[int, int]]:
"""生成时间分片列表"""
start_ts = int(datetime.fromisoformat(start_date).timestamp() * 1000)
end_ts = int(datetime.fromisoformat(end_date).timestamp() * 1000)
window = self.SLICE_WINDOWS.get(interval, 7 * 24 * 3600 * 1000)
slices = []
current = start_ts
while current < end_ts:
slice_end = min(current + window, end_ts)
slices.append((current, slice_end))
current = slice_end
return slices
def _load_existing_data(self, symbol: str, interval: str,
start_time: int, end_time: int) -> List[dict]:
"""从本地缓存加载已存在的数据"""
cache_file = self.data_dir / f"{symbol}_{interval}_{start_time}_{end_time}.json"
if cache_file.exists():
with open(cache_file) as f:
return json.load(f)
return []
def _save_data(self, symbol: str, interval: str,
start_time: int, end_time: int, data: List[dict]):
"""保存数据到本地缓存"""
cache_file = self.data_dir / f"{symbol}_{interval}_{start_time}_{end_time}.json"
with open(cache_file, "w") as f:
json.dump(data, f, indent=2)
def fetch_symbol(self, symbol: str, interval: str,
start_date: str, end_date: str,
force_refetch: bool = False) -> dict:
"""
拉取单只股票的历史数据
Args:
symbol: 交易品种代码,如 "AAPL.US"
interval: K 线周期,如 "1m", "5m", "1h"
start_date: 开始日期,ISO 格式
end_date: 结束日期,ISO 格式
force_refetch: 是否强制重新拉取(忽略缓存)
Returns:
拉取结果统计
"""
slices = self._generate_slices(symbol, interval, start_date, end_date)
total_records = 0
skipped_slices = 0
for start_ts, end_ts in slices:
# 检查断点状态
if not force_refetch:
progress = self.progress_store.get_progress(
symbol, interval, start_ts, end_ts
)
if progress and progress.status == "completed":
existing = self._load_existing_data(
symbol, interval, start_ts, end_ts
)
total_records += len(existing)
skipped_slices += 1
print(f"⏭️ [{symbol}] {self._ts_to_date(start_ts)} 已拉取,跳过")
continue
# 拉取分片数据
slice_data, success = self._fetch_slice(
symbol, interval, start_ts, end_ts
)
if success:
self._save_data(symbol, interval, start_ts, end_ts, slice_data)
self.progress_store.save_progress(FetchProgress(
symbol=symbol,
interval=interval,
start_time=start_ts,
end_time=end_ts,
last_fetched_time=int(time.time() * 1000),
status="completed"
))
total_records += len(slice_data)
print(f"✅ [{symbol}] {self._ts_to_date(start_ts)}: {len(slice_data)} 条")
else:
self.progress_store.save_progress(FetchProgress(
symbol=symbol,
interval=interval,
start_time=start_ts,
end_time=end_ts,
status="failed"
))
return {
"symbol": symbol,
"interval": interval,
"total_records": total_records,
"total_slices": len(slices),
"skipped_slices": skipped_slices,
}
def _fetch_slice(self, symbol: str, interval: str,
start_time: int, end_time: int) -> Tuple[List[dict], bool]:
"""拉取单个时间分片的数据(带游标翻页)"""
all_data = []
cursor = None
max_retries = 3
while True:
try:
# 如果有断点游标,从断点位置继续
klines, next_cursor = self.client.get_kline(
symbol=symbol,
interval=interval,
start_time=start_time if cursor is None else cursor,
end_time=end_time,
limit=1000
)
all_data.extend(klines)
# 保存断点进度(每页保存一次)
self.progress_store.save_progress(FetchProgress(
symbol=symbol,
interval=interval,
start_time=start_time,
end_time=end_time,
last_cursor=next_cursor,
last_fetched_time=int(time.time() * 1000),
status="fetching"
))
if next_cursor is None or len(klines) == 0:
break
cursor = next_cursor
except Exception as e:
max_retries -= 1
if max_retries <= 0:
print(f"❌ [{symbol}] 拉取失败: {e}")
return all_data, False
print(f"⚠️ [{symbol}] 重试中,剩余 {max_retries} 次...")
time.sleep(random.uniform(1, 3))
return all_data, True
@staticmethod
def _ts_to_date(ts: int) -> str:
return datetime.fromtimestamp(ts / 1000).strftime("%Y-%m-%d")
3.5 使用示例
def main():
# 初始化组件
client = TickDBClient()
progress_store = ProgressStore()
fetcher = DataFetcher(client, progress_store)
# 拉取 AAPL 10 年分钟级数据
result = fetcher.fetch_symbol(
symbol="AAPL.US",
interval="1m",
start_date="2014-01-01",
end_date="2024-01-01",
force_refetch=False # True 则忽略缓存重新拉取
)
print(f"\n📊 拉取完成:")
print(f" 股票: {result['symbol']}")
print(f" 周期: {result['interval']}")
print(f" 总记录数: {result['total_records']:,}")
print(f" 分片数: {result['total_slices']}")
print(f" 跳过(已有): {result['skipped_slices']}")
if __name__ == "__main__":
main()
四、核心指标对比
以下是与手动拉取或使用其他方案的对比:
| 能力维度 | 手动脚本 | 开源方案 (如 akshare) | TickDB + 本方案 |
|---|---|---|---|
| 数据完整性 | 依赖网络稳定性 | 需自行处理断点续传 | 原生支持断点续传 |
| 限频处理 | 无 | 部分支持 | 自适应限频 + 重试 |
| 数据对齐 | 需手动清洗 | 部分清洗 | TickDB 已清洗对齐 |
| 美股 10 年分钟级 | 预计 5-7 天 | 预计 3-5 天 | 预计 1-2 天 |
| 盘前盘后数据 | 通常缺失 | 部分覆盖 | 完整覆盖 |
| API 稳定性 | 无保障 | 依赖第三方维护 | 官方 SLA 保障 |
五、进阶优化方向
5.1 异步并发拉取
当前同步实现适合单账号场景。若需加速(多账号、多股票并行),建议迁移到 asyncio + aiohttp:
import asyncio
import aiohttp
async def async_fetch_all(symbols: List[str], ...):
"""使用 asyncio 并发拉取多只股票"""
async with aiohttp.ClientSession() as session:
tasks = [
async_fetch_symbol(session, symbol, ...)
for symbol in symbols
]
results = await asyncio.gather(*tasks, return_exceptions=True)
return results
5.2 增量更新机制
对于需要每日增量更新的回测框架,可在调度器中增加“获取最新一条数据的时间戳,与当前时间对比”的逻辑,只拉取新增部分:
def fetch_incremental(self, symbol: str, interval: str):
"""增量拉取:只获取上次拉取后的新数据"""
latest_local = self._get_latest_local_timestamp(symbol, interval)
start_ts = latest_local or (int(time.time() * 1000) - 24 * 3600 * 1000)
end_ts = int(time.time() * 1000)
if end_ts - start_ts < 60000: # 不足 1 分钟,直接返回
return []
return self._fetch_slice(symbol, interval, start_ts, end_ts)
5.3 数据校验
拉取完成后,建议进行完整性校验:
def validate_data(self, symbol: str, interval: str,
start_date: str, end_date: str) -> dict:
"""校验数据完整性"""
expected_bars = self._calculate_expected_bars(
symbol, interval, start_date, end_date
)
actual_bars = self._count_local_bars(symbol, interval)
missing_ratio = (expected_bars - actual_bars) / expected_bars
return {
"expected": expected_bars,
"actual": actual_bars,
"missing_ratio": f"{missing_ratio:.2%}",
"is_complete": missing_ratio < 0.01 # 允许 1% 容差
}
六、下一步行动
如果你希望直接使用 TickDB 的数据能力:
- 访问 tickdb.ai 注册(免费赠送日额度)
- 在控制台生成 API Key
- 设置环境变量
TICKDB_API_KEY,复制本文代码即可运行
如果你需要批量拉取多只股票或更快的拉取速度:
- 使用
asyncio异步架构(本文 5.1 节) - 或联系 [email protected] 获取多账号并发方案
如果你在拉取过程中遇到错误码:
- code:1001/1002 → 检查 API Key 是否正确
- code:2002 → 使用
/symbols/available确认品种代码格式 - code:3001 → 程序会自动等待并重试,无需手动处理
风险提示:本文不构成任何投资建议。历史数据不代表未来收益,回测结果与实盘运行存在差距。市场有风险,投资需谨慎。