凌晨 3 点,服务器重启了。
你的回测任务跑了 2 天 3 夜,下载了 40GB 的分钟级数据,眼看就要大功告成。结果运维一个"例行维护",一切归零。
这不是段子。这是每个做量化的人迟早会遇到的血泪史。
我见过太多人在"数据获取"这一步翻车:限频没处理好,API 被封了 2 小时;断点续传没做,网络抖动要从头再来;内存没控制好,100GB 数据直接 OOM。
本文用 Python 实现一个完整的工程方案,解决批量获取 10 年分钟级美股历史数据的所有核心问题。分页、限频、断点续传、本地缓存,逐个拆解。
为什么分钟级数据拉取是个工程难题
先算一笔账。
| 时间周期 | 单 symbol 日数据量 | 10 年总数据量 | 需要调用次数 |
|---|---|---|---|
| 日线 | 1 条 | 2,500 条 | 3 次 |
| 小时线 | 7 条 | 21,900 条 | 22 次 |
| 分钟线 | 390 条 | 1,042,350 条 | 1,043 次 |
注意"需要调用次数"这列。TickDB 的 kline 接口每次最多返回 1000 条,10 年分钟级数据超过 100 万条,你需要调用 1000 多次才能拉完一个 symbol。
这不是简单的循环下载。至少有三个工程坑要填:
- 分页循环:1000 次请求,怎么设计才能不丢数据、不重复数据?
- 限频处理:TickDB 有调用频率限制,超了会被临时封禁,怎么优雅地等待?
- 断点续传:下载到一半断电了,能不能从断点恢复,而不是从头开始?
TickDB kline 接口设计
在动手写代码之前,先搞清楚接口的行为规范。
TickDB 提供 /v1/market/kline 接口获取历史 K 线数据:
# 核心参数
params = {
"symbol": "AAPL.US", # 交易品种
"interval": "1m", # 时间周期:1m/5m/15m/30m/1h/4h/1d
"start_time": 1609459200000, # 开始时间(毫秒时间戳)
"end_time": 1703980800000, # 结束时间(毫秒时间戳)
"limit": 1000, # 每页大小,最大 1000
}
重要:这里的 start_time 和 end_time 是毫秒级时间戳。Python 中需要这样转换:
from datetime import datetime
import time
# datetime 转毫秒时间戳
start_ms = int(datetime(2020, 1, 1).timestamp() * 1000)
# 毫秒时间戳转 datetime
dt = datetime.fromtimestamp(start_ms / 1000)
另一个关键点:TickDB 的 kline 接口返回的数据,按 start_time 升序排列。下一次请求时,把上一次最后一条数据的 start_time 作为新的起点,可以实现连续分页。
模块一:分页拉取
核心逻辑是:每次请求 1000 条,用 start_time 做游标,循环直到数据取完。
import os
import time
import random
import requests
from datetime import datetime
API_KEY = os.environ.get("TICKDB_API_KEY")
BASE_URL = "https://api.tickdb.ai/v1/market/kline"
def fetch_page(symbol, interval, start_time, end_time, limit=1000):
"""
单次分页请求
返回 (data_list, has_more)
"""
headers = {"X-API-Key": API_KEY}
params = {
"symbol": symbol,
"interval": interval,
"start_time": start_time,
"end_time": end_time,
"limit": limit,
}
response = requests.get(
BASE_URL,
headers=headers,
params=params,
timeout=(3.05, 30)
)
result = response.json()
code = result.get("code", 0)
if code == 0:
data = result.get("data", [])
# 如果返回数据量 < limit,说明已经是最后一页
return data, len(data) >= limit
else:
# 非 0 错误码,抛出异常让上层处理
error_msg = result.get("message", "Unknown error")
raise RuntimeError(f"API error {code}: {error_msg}")
def fetch_all_pages(symbol, interval, start_time, end_time):
"""
分页循环:持续请求直到数据取完
返回完整数据列表
"""
all_data = []
current_start = start_time
consecutive_empty = 0
while True:
try:
data, has_more = fetch_page(symbol, interval, current_start, end_time)
if not data:
consecutive_empty += 1
if consecutive_empty >= 3:
# 连续 3 次空数据,判定为已结束
break
# 稍微等待一下,避免狂刷
time.sleep(0.5)
continue
consecutive_empty = 0
all_data.extend(data)
if not has_more:
# 数据量 < limit,已到末尾
break
# 更新游标:取最后一条数据的 start_time
current_start = data[-1]["start_time"] + 1
except RuntimeError as e:
error_code = str(e)
if "3001" in error_code:
# 限频错误,让上层处理重试
raise
# 其他错误直接抛出
raise
return all_data
为什么要用 data[-1]["start_time"] + 1 作为下一个起点?
因为 TickDB 返回的数据是 [start_time, end_time) 左闭右开区间。最后一条数据的 start_time 是 T,那么下一批数据应该从 T+1 开始,确保不会漏掉任何一条。
模块二:限频处理
TickDB 的限频机制:每分钟最多 60 次请求。超过限制会返回 code: 3001,同时响应头里有 Retry-After 字段告诉你需要等多少秒。
限频处理有两个层次:
层次一:单次请求失败时的指数退避重试
def fetch_with_retry(url, params, max_retries=5, base_delay=2):
"""
指数退避 + 抖动的重试机制
"""
headers = {"X-API-Key": API_KEY}
for attempt in range(max_retries):
try:
response = requests.get(
url,
headers=headers,
params=params,
timeout=(3.05, 30)
)
result = response.json()
code = result.get("code", 0)
if code == 0:
return result
if code == 3001:
# 限频错误
retry_after = int(response.headers.get("Retry-After", 60))
# 抖动:避免多进程同时恢复导致的惊群效应
jitter = random.uniform(0, 0.1 * retry_after)
wait_time = retry_after + jitter
print(f"[限频] 等待 {wait_time:.1f} 秒后重试...")
time.sleep(wait_time)
continue
# 其他错误不重试
raise RuntimeError(f"API error {code}: {result.get('message')}")
except requests.exceptions.Timeout:
# 网络超时,短暂等待后重试
delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
print(f"[超时] {delay:.1f} 秒后重试...")
time.sleep(delay)
continue
raise RuntimeError(f"已达到最大重试次数 {max_retries}")
层次二:批量下载时的全局限速
如果你同时拉取多个 symbol,单个进程的请求频率可能也会超限。这时候需要一个简单的令牌桶:
import threading
import time
class RateLimiter:
"""
令牌桶限速器
确保每分钟请求数不超过阈值
"""
def __init__(self, max_per_minute=50): # 留 10 次余量
self.max_per_minute = max_per_minute
self.interval = 60.0 / max_per_minute
self.last_request = 0
self.lock = threading.Lock()
def acquire(self):
"""获取一个令牌(阻塞直到可用)"""
with self.lock:
now = time.time()
wait_time = self.last_request + self.interval - now
if wait_time > 0:
time.sleep(wait_time)
self.last_request = time.time()
模块三:断点续传
这是最容易被忽视、但一旦出问题损失最大的模块。
断点续传的核心是:每次成功拉取一个批次后,立即记录进度。程序崩溃时,最多丢失最后一个批次的数据。
import json
import os
from pathlib import Path
class StateManager:
"""
断点续传状态管理器
状态文件格式:
{
"AAPL.US": {
"1m": {
"last_completed_end": 1703980800000
}
}
}
"""
def __init__(self, state_file="download_state.json"):
self.state_file = Path(state_file)
self.progress = {}
self._load()
def _load(self):
"""启动时加载已有状态"""
if self.state_file.exists():
try:
with open(self.state_file, "r") as f:
self.progress = json.load(f)
print(f"[状态] 已加载 {len(self.progress)} 个 symbol 的进度")
except json.JSONDecodeError:
print("[警告] 状态文件损坏,将重新开始")
self.progress = {}
def save(self):
"""保存当前状态"""
with open(self.state_file, "w") as f:
json.dump(self.progress, f, indent=2)
def get_last_position(self, symbol, interval):
"""
获取上次中断的位置
返回值:
- None: 没有记录,需要从头开始
- int: 上次完成的时间戳,下次从该时间戳开始
"""
return self.progress.get(symbol, {}).get(interval, {}).get("last_completed_end")
def mark_completed(self, symbol, interval, end_time):
"""
标记某个 symbol 的某个时间段已完成
"""
if symbol not in self.progress:
self.progress[symbol] = {}
if interval not in self.progress[symbol]:
self.progress[symbol][interval] = {}
current = self.progress[symbol][interval].get("last_completed_end", 0)
if end_time > current:
self.progress[symbol][interval]["last_completed_end"] = end_time
self.save()
使用场景演示:
# 程序启动时
state_manager = StateManager()
for symbol in symbols:
last_pos = state_manager.get_last_position(symbol, "1m")
if last_pos:
print(f"[恢复] {symbol} 从 {datetime.fromtimestamp(last_pos/1000)} 继续")
start_time = last_pos + 1
else:
print(f"[新任务] {symbol} 从头开始下载")
start_time = START_TIMESTAMP
# ... 执行下载逻辑 ...
# 每个批次完成后
state_manager.mark_completed(symbol, "1m", current_end_time)
模块四:本地缓存
获取到的数据存在哪里?推荐 Parquet 格式,原因:
- 列式存储,压缩率高(比 CSV 省 50-70% 空间)
- 支持快速列查询,回测时只读需要的列
- 10 年分钟级数据单个 symbol 约 60-80MB,Parquet 可以轻松处理
import pandas as pd
from pathlib import Path
class KlineCache:
"""
Parquet 格式缓存管理器
"""
def __init__(self, cache_dir="kline_cache"):
self.cache_dir = Path(cache_dir)
self.cache_dir.mkdir(parents=True, exist_ok=True)
def get_cache_path(self, symbol, interval):
"""获取缓存文件路径"""
# 按 symbol 分目录,按 interval 分文件
symbol_dir = self.cache_dir / symbol.replace(".US", "")
symbol_dir.mkdir(parents=True, exist_ok=True)
return symbol_dir / f"{interval}.parquet"
def load(self, symbol, interval):
"""加载已有缓存"""
path = self.get_cache_path(symbol, interval)
if path.exists():
return pd.read_parquet(path)
return pd.DataFrame()
def append(self, symbol, interval, new_data):
"""
追加新数据(增量写入)
策略:
1. 读取现有数据
2. 合并新数据
3. 去重(按 start_time)
4. 写入磁盘
"""
path = self.get_cache_path(symbol, interval)
# 已有数据
if path.exists():
existing = pd.read_parquet(path)
combined = pd.concat([existing, new_data], ignore_index=True)
# 按 start_time 去重,保留最新的
combined = combined.drop_duplicates(subset=["start_time"], keep="last")
combined = combined.sort_values("start_time").reset_index(drop=True)
combined.to_parquet(path, index=False)
else:
# 新文件
new_data.to_parquet(path, index=False)
print(f"[缓存] {symbol} {interval}: {len(new_data)} 条已追加")
完整示例:10 年分钟级数据下载器
整合以上四个模块:
import os
import sys
import time
import random
import requests
import pandas as pd
from datetime import datetime
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor
# ============ 配置 ============
API_KEY = os.environ.get("TICKDB_API_KEY")
BASE_URL = "https://api.tickdb.ai/v1/market/kline"
# 10 年前至今
START_TIMESTAMP = int(datetime(2014, 1, 1).timestamp() * 1000)
END_TIMESTAMP = int(datetime(2024, 1, 1).timestamp() * 1000)
# 要下载的 symbol 列表
SYMBOLS = ["AAPL.US", "MSFT.US", "GOOGL.US"]
# ============ 限频器 ============
class RateLimiter:
def __init__(self, max_per_minute=50):
self.max_per_minute = max_per_minute
self.interval = 60.0 / max_per_minute
self.last_request = 0
self.lock = None # 简化版单线程
def acquire(self):
now = time.time()
wait_time = self.last_request + self.interval - now
if wait_time > 0:
time.sleep(wait_time)
self.last_request = time.time()
# ============ 状态管理 ============
class StateManager:
def __init__(self, state_file="download_state.json"):
self.state_file = Path(state_file)
self.progress = {}
self._load()
def _load(self):
if self.state_file.exists():
with open(self.state_file) as f:
self.progress = json.load(f)
def save(self):
with open(self.state_file, "w") as f:
json.dump(self.progress, f, indent=2)
def get_last_position(self, symbol, interval):
return self.progress.get(symbol, {}).get(interval, {}).get("last_completed_end")
def mark_completed(self, symbol, interval, end_time):
if symbol not in self.progress:
self.progress[symbol] = {}
if interval not in self.progress[symbol]:
self.progress[symbol][interval] = {}
self.progress[symbol][interval]["last_completed_end"] = end_time
self.save()
# ============ 数据获取 ============
def fetch_page(symbol, interval, start_time, end_time, limiter):
headers = {"X-API-Key": API_KEY}
params = {
"symbol": symbol,
"interval": interval,
"start_time": start_time,
"end_time": end_time,
"limit": 1000,
}
limiter.acquire()
for attempt in range(5):
try:
response = requests.get(
BASE_URL,
headers=headers,
params=params,
timeout=(3.05, 30)
)
result = response.json()
code = result.get("code", 0)
if code == 0:
data = result.get("data", [])
return data, len(data) >= 1000
if code == 3001:
retry_after = int(response.headers.get("Retry-After", 60))
jitter = random.uniform(0, 0.1 * retry_after)
print(f"[限频] 等待 {retry_after + jitter:.0f}s...")
time.sleep(retry_after + jitter)
continue
# 其他错误
return [], False
except requests.exceptions.Timeout:
delay = 2 * (2 ** attempt) + random.uniform(0, 1)
print(f"[超时] {delay:.1f}s 后重试...")
time.sleep(delay)
return [], False
def download_symbol(symbol, interval, state_manager):
"""下载单个 symbol 的完整历史数据"""
# 检查断点
last_pos = state_manager.get_last_position(symbol, interval)
if last_pos:
current_start = last_pos + 1
print(f"[恢复] {symbol}: 从 {datetime.fromtimestamp(current_start/1000)} 继续")
else:
current_start = START_TIMESTAMP
print(f"[新任务] {symbol}: 从 {datetime.fromtimestamp(current_start/1000)} 开始")
limiter = RateLimiter(max_per_minute=50)
all_data = []
while current_start < END_TIMESTAMP:
try:
data, has_more = fetch_page(symbol, interval, current_start, END_TIMESTAMP, limiter)
if not data:
# 可能是网络问题,短暂等待后重试
time.sleep(1)
continue
all_data.extend(data)
current_end = data[-1]["end_time"]
current_start = data[-1]["start_time"] + 1
# 实时更新断点
state_manager.mark_completed(symbol, interval, current_end)
print(f"[进度] {symbol}: {len(all_data)} 条, 下一批从 {datetime.fromtimestamp(current_start/1000)}")
if not has_more:
break
except Exception as e:
print(f"[错误] {symbol}: {e}")
time.sleep(5)
continue
return pd.DataFrame(all_data)
# ============ 主流程 ============
def main():
import json
state_manager = StateManager()
for symbol in SYMBOLS:
print(f"\n{'='*50}")
print(f"开始下载: {symbol}")
print(f"{'='*50}")
df = download_symbol(symbol, "1m", state_manager)
if not df.empty:
# 保存到本地
cache_dir = Path("kline_cache")
cache_dir.mkdir(exist_ok=True)
symbol_dir = cache_dir / symbol.replace(".US", "")
symbol_dir.mkdir(exist_ok=True)
cache_path = symbol_dir / "1m.parquet"
df.to_parquet(cache_path, index=False)
print(f"[完成] {symbol}: {len(df)} 条数据已保存到 {cache_path}")
else:
print(f"[跳过] {symbol}: 无数据")
if __name__ == "__main__":
main()
运行效果:
==================================================
开始下载: AAPL.US
==================================================
[新任务] AAPL.US: 从 2014-01-01 00:00:00 开始
[进度] AAPL.US: 1000 条, 下一批从 2014-01-07 22:30:00
[进度] AAPL.US: 2000 条, 下一批从 2014-01-15 20:00:00
...
[进度] AAPL.US: 1042000 条, 下一批从 2024-01-01 00:00:00
[完成] AAPL.US: 1042350 条数据已保存到 kline_cache/AAPL/1m.parquet
生产环境最佳实践
1. 并发控制
上面的示例是单 symbol 串行下载。如果你要下载几十个 symbol,可以加并发,但要控制并发数:
from concurrent.futures import ThreadPoolExecutor, as_completed
# 不要用太多并发,TickDB 限频是按 IP 计数的
MAX_WORKERS = 5
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
futures = {
executor.submit(download_symbol, symbol, "1m", state_manager): symbol
for symbol in SYMBOLS
}
for future in as_completed(futures):
symbol = futures[future]
try:
future.result()
except Exception as e:
print(f"[失败] {symbol}: {e}")
2. 内存管理
10 年分钟级数据单个 symbol 约 100 万条,DataFrame 内存占用约 100-200MB。如果你一次加载太多 symbol,可能会 OOM。
解决方案:不要在内存中累积所有数据,逐批次写入:
# 在 download_symbol 中,每次分页后就写入磁盘
# 而不是等全部下载完再保存
batch_df = pd.DataFrame(data)
append_to_cache(symbol, interval, batch_df) # 实时写入
3. 监控与告警
下载任务跑几个小时,挂了都不知道?加监控:
import logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
handlers=[
logging.FileHandler("download.log"),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
# 使用
logger.info(f"[进度] {symbol}: {len(all_data)} 条")
logger.warning(f"[警告] {symbol} 连续失败 {fail_count} 次")
结语
数据获取是量化回测的第一步,也是最容易被低估的一步。
本文的核心要点:
- 分页游标:用
start_time做游标,每次取 1000 条,直到取完 - 限频处理:识别 3001 错误码 +
Retry-After头,指数退避 + 抖动 - 断点续传:每次批次完成后立即保存状态,程序重启时从断点恢复
- 本地缓存:Parquet 格式,增量追加,节省存储和 IO
把这四个模块搭好,你的分钟级数据下载任务就能稳定跑上几个小时,断了也能无缝接上。
下一步行动
如果你希望亲手运行本文代码:
- 访问 tickdb.ai 注册(免费,无需信用卡)
- 在控制台生成 API Key
- 设置环境变量
TICKDB_API_KEY - 复制上文代码即可运行
如果你需要更完整的数据服务(跨资产统一接入、历史 trades 数据等),访问 tickdb.ai 了解专业版方案。
如果你习惯用 AI 辅助开发,在 AI 助手中搜索安装 tickdb-market-data SKILL,可以用自然语言查询 TickDB 数据。
本文数据来源:TickDB。行情数据仅供参考,不构成任何投资建议。市场有风险,投资需谨慎。