当日行情自动归档:用 TickDB 历史接口构建本地行情数据库
"数据的价值在于被使用,而使用的前提是被保存。"
凌晨 1 点,你从睡梦中被手机震醒——不是因为策略亏损,而是脚本报错了。日志显示凌晨 0:15 分,某接口返回空数据,但你没有做空值容错,程序直接崩溃。第二天醒来,你发现当天 23:00-00:15 的夜盘数据全部丢失。
这不是段子。这是每一个尝试构建本地行情数据库的量化开发者,都可能遇到的实际问题。
行情归档看似简单——拉数据、存进去、明天再来。但当你真正开始做,会发现三个隐藏的工程陷阱:边界条件处理(收盘后数据何时可用)、增量更新机制(避免重复拉取)、去重与幂等性(确保数据唯一性)。本文用 Python + SQLite 演示一套生产级的行情归档方案,覆盖从接口调用到本地入库的完整闭环。
为什么需要本地行情数据库
你可能会问:TickDB 本身不是提供实时和历史数据吗?为什么还要存到本地?
这是一个合理的质疑。答案是三个场景:
场景一:降低 API 调用成本。 如果你每天运行 20 次策略,每次策略需要过去 5 天的分钟级数据,每次调用都从 TickDB 拉取,则每天产生 100 次 API 请求。一个月下来是 3000 次。但如果你把数据本地缓存,策略只需查本地数据库,API 调用降至每月 30 次(每天一次归档)。
场景二:加速策略回测。 本地 SQLite 查询延迟在亚毫秒级,而网络 API 即使有 CDN 优化也至少几十毫秒。一次回测跑 1000 根 K 线,本地查询比远程 API 节省数秒到数十秒。
场景三:离线分析能力。 当你需要在地铁里、飞机上复盘策略,或进行跨品种相关性分析时,本地数据库让你不依赖网络。
这不是"重复造轮子"。这是以本地缓存换速度,以磁盘空间换 API 额度的经典工程权衡。
架构设计:三步走的归档流水线
行情归档的本质是一个 ETL 管道:Extract(提取)→ Transform(转换)→ Load(加载)。但与通用 ETL 不同,金融行情数据有独特的约束:
| 约束 | 说明 | 设计应对 |
|---|---|---|
| 数据不可变性 | K 线一旦生成,数值不会改变 | 直接 Upsert,覆盖而非追加 |
| 时间单调性 | K 线时间戳严格递增 | 依赖时间戳做增量边界 |
| 接口限频 | TickDB 有 rate limit | 两次请求间隔 ≥0.2 秒,加 Retry-After 处理 |
| 数据延迟 | 收盘后 K 线需等待撮合清算 | 归档任务在次日 0:30 执行,给足延迟窗口 |
基于以上约束,我们设计如下流水线:
┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ 定时触发 │ ──▶ │ 获取归档点 │ ──▶ │ 分页拉取 │ ──▶ │ 批量入库 │
│ (次日0:30) │ │ (上次最新时间)│ │ (每次500条) │ │ (Upsert) │
└─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘
│ │ │ │
▼ ▼ ▼ ▼
Cron/DAG SQLite元数据表 限频控制+重试 唯一约束去重
核心设计原则:
- 增量拉取:只拉取"上次归档时间"到"当前时间"之间的数据,避免全量重复
- Upsert 而非 Insert:用
INSERT OR REPLACE确保同一条 K 线只保留最新值 - 分页拉取:TickDB 单次返回上限 500 条,分页避免超限
- 幂等执行:无论脚本运行多少次,结果一致
生产级代码:完整实现
以下代码可直接运行,适配 Python 3.9+,依赖仅 requests 和 sqlite3(标准库)。
1. 配置文件与常量定义
import os
import sqlite3
import time
import logging
from datetime import datetime, timedelta
from typing import Optional
import requests
# ==================== 配置区 ====================
TICKDB_API_KEY = os.environ.get("TICKDB_API_KEY")
if not TICKDB_API_KEY:
raise ValueError("请设置环境变量 TICKDB_API_KEY")
# TickDB API 配置
BASE_URL = "https://api.tickdb.ai/v1"
HEADERS = {"X-API-Key": TICKDB_API_KEY}
# 本地数据库路径
DB_PATH = "market_data.db"
# 归档标的列表(可扩展)
SYMBOLS = [
"BTC.USDT", # 数字货币,7x24
"AAPL.US", # 美股,需注意美股只在交易日有数据
]
# 每页拉取数量(TickDB 上限 500)
PAGE_SIZE = 500
# 请求间隔(秒),留足限频余量
REQUEST_INTERVAL = 0.25
# 日志配置
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
handlers=[
logging.StreamHandler(),
logging.FileHandler("archiver.log", encoding="utf-8"),
],
)
logger = logging.getLogger(__name__)
2. 数据库初始化
def init_database(db_path: str) -> None:
"""初始化 SQLite 数据库,创建行情表和元数据表"""
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
# 行情主表:存储分钟级 K 线数据
cursor.execute("""
CREATE TABLE IF NOT EXISTS kline_1m (
symbol TEXT NOT NULL,
timestamp INTEGER NOT NULL, -- 毫秒级时间戳
open REAL NOT NULL,
high REAL NOT NULL,
low REAL NOT NULL,
close REAL NOT NULL,
volume REAL NOT NULL,
created_at TEXT DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (symbol, timestamp)
)
""")
# 索引:加速时间范围查询
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_kline_timestamp
ON kline_1m (symbol, timestamp)
""")
# 元数据表:记录每个标的的归档进度
cursor.execute("""
CREATE TABLE IF NOT EXISTS archive_meta (
symbol TEXT PRIMARY KEY,
last_archived_ts INTEGER, -- 上次归档的最新时间戳
last_archived_at TEXT,
total_records INTEGER DEFAULT 0
)
""")
conn.commit()
conn.close()
logger.info(f"数据库初始化完成: {db_path}")
3. 核心 API 调用函数(含错误处理)
def fetch_kline_page(
symbol: str,
interval: str = "1m",
start_time: Optional[int] = None,
end_time: Optional[int] = None,
limit: int = PAGE_SIZE,
timeout: tuple = (3.05, 10) # 连接超时 3.05s,读取超时 10s
) -> dict:
"""
调用 TickDB /v1/market/kline 接口获取 K 线数据
Args:
symbol: 交易品种,如 "BTC.USDT"
interval: K 线周期,"1m", "5m", "1h" 等
start_time: 开始时间(毫秒时间戳),None 表示从头开始
end_time: 结束时间(毫秒时间戳),None 表示截至最新
limit: 每页条数,最大 500
timeout: 请求超时配置
Returns:
API 响应字典
"""
params = {
"symbol": symbol,
"interval": interval,
"limit": limit,
}
if start_time is not None:
params["start"] = start_time
if end_time is not None:
params["end"] = end_time
try:
response = requests.get(
f"{BASE_URL}/market/kline",
headers=HEADERS,
params=params,
timeout=timeout
)
response.raise_for_status()
return response.json()
except requests.exceptions.Timeout:
logger.error(f"请求超时: {symbol} [{start_time} - {end_time}]")
raise
except requests.exceptions.HTTPError as e:
logger.error(f"HTTP 错误: {e.response.status_code} {e.response.text}")
raise
def handle_api_error(response: dict, retry_count: int = 0) -> Optional[dict]:
"""
处理 TickDB API 错误码
错误码参考:
- 0: 成功
- 1001/1002: API Key 无效
- 2002: 品种不存在
- 3001: 请求频率超限,需要等待 Retry-After
"""
code = response.get("code", 0)
if code == 0:
return response.get("data")
if code in (1001, 1002):
raise ValueError(f"API Key 无效,请检查环境变量 TICKDB_API_KEY")
if code == 2002:
raise KeyError(f"交易品种不存在: {response.get('symbol')}")
if code == 3001:
# 限频错误:读取 Retry-After 头等待
retry_after = int(response.headers.get("Retry-After", 5))
logger.warning(f"触发限频 (code:3001),等待 {retry_after} 秒后重试")
time.sleep(retry_after)
return None # 返回 None 表示需要重试
# 其他未知错误
raise RuntimeError(f"未知错误 code:{code} msg:{response.get('message')}")
4. 增量归档核心逻辑
def get_last_archived_timestamp(db_path: str, symbol: str) -> Optional[int]:
"""从元数据表获取某标的上次归档的最新时间戳"""
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
cursor.execute(
"SELECT last_archived_ts FROM archive_meta WHERE symbol = ?",
(symbol,)
)
row = cursor.fetchone()
conn.close()
if row is None:
# 首次归档,返回 90 天前的毫秒时间戳(覆盖足够的历史窗口)
ninety_days_ago = int((datetime.now() - timedelta(days=90)).timestamp() * 1000)
return ninety_days_ago
return row[0]
def archive_symbol(db_path: str, symbol: str, interval: str = "1m") -> int:
"""
归档单个标的的 K 线数据(增量模式)
流程:
1. 获取上次归档的最新时间戳作为起始点
2. 分页拉取数据,直到返回空数据
3. 批量 Upsert 到 SQLite
4. 更新元数据表
Returns:
本次归档的记录数
"""
start_ts = get_last_archived_timestamp(db_path, symbol)
end_ts = int(datetime.now().timestamp() * 1000)
logger.info(f"开始归档 {symbol}: [{start_ts}] -> [{end_ts}]")
all_records = []
current_start = start_ts
page_count = 0
max_pages = 1000 # 防止无限循环
while page_count < max_pages:
# 拉取当前页
try:
resp = fetch_kline_page(
symbol=symbol,
interval=interval,
start_time=current_start,
end_time=end_ts,
limit=PAGE_SIZE
)
# 处理错误(限频等)
if resp is None:
continue # 重试当前页
data = handle_api_error(resp)
if not data:
break # 无数据,退出循环
# data 是 K 线数组,格式:[timestamp, open, high, low, close, volume]
if not isinstance(data, list) or len(data) == 0:
break
all_records.extend(data)
page_count += 1
# 更新起始点:取本页最后一条的时间戳 + 1ms(避免重复)
current_start = data[-1][0] + 1
logger.debug(f" 第 {page_count} 页: {len(data)} 条, 累计 {len(all_records)} 条")
# 限频控制
time.sleep(REQUEST_INTERVAL)
# 如果返回不足一页,说明已到最新
if len(data) < PAGE_SIZE:
break
except Exception as e:
logger.error(f"拉取失败: {symbol} page={page_count} error={e}")
raise
if not all_records:
logger.info(f"{symbol} 无新数据需要归档")
return 0
# 批量 Upsert 到 SQLite
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
upsert_sql = """
INSERT OR REPLACE INTO kline_1m (symbol, timestamp, open, high, low, close, volume)
VALUES (?, ?, ?, ?, ?, ?, ?)
"""
records_to_insert = [(symbol, r[0], r[1], r[2], r[3], r[4], r[5]) for r in all_records]
cursor.executemany(upsert_sql, records_to_insert)
# 更新元数据
latest_ts = all_records[-1][0]
cursor.execute("""
INSERT OR REPLACE INTO archive_meta (symbol, last_archived_ts, last_archived_at, total_records)
VALUES (?, ?, datetime('now'), (
SELECT COALESCE(MAX(total_records), 0) + ?
FROM archive_meta WHERE symbol = ?
))
""", (symbol, latest_ts, len(records_to_insert), symbol))
conn.commit()
conn.close()
logger.info(f"归档完成 {symbol}: 本次新增 {len(records_to_insert)} 条,最新时间戳 {latest_ts}")
return len(records_to_insert)
def archive_all(db_path: str, symbols: list, interval: str = "1m") -> dict:
"""归档所有标的,返回各标的归档数量"""
results = {}
for symbol in symbols:
try:
count = archive_symbol(db_path, symbol, interval)
results[symbol] = {"status": "success", "count": count}
except Exception as e:
logger.error(f"归档 {symbol} 失败: {e}")
results[symbol] = {"status": "error", "error": str(e)}
# 标的之间也做限频控制
time.sleep(REQUEST_INTERVAL)
return results
5. 主程序入口
def main():
"""每日归档主入口"""
logger.info("=" * 50)
logger.info(f"行情归档任务启动: {datetime.now().isoformat()}")
# 初始化数据库
init_database(DB_PATH)
# 执行归档
results = archive_all(DB_PATH, SYMBOLS, interval="1m")
# 输出汇总
logger.info("归档结果汇总:")
total = 0
for symbol, result in results.items():
if result["status"] == "success":
logger.info(f" {symbol}: +{result['count']} 条")
total += result["count"]
else:
logger.error(f" {symbol}: {result['error']}")
logger.info(f"本次归档总计: {total} 条")
logger.info("=" * 50)
if __name__ == "__main__":
main()
增量更新的关键设计
上述代码的核心在于增量边界的确定。让我展开解释几个关键细节。
边界时间戳的选择
# 增量拉取的起始点
start_ts = get_last_archived_timestamp(db_path, symbol)
首次运行时,archive_meta 表无记录,函数返回 90 天前的时间戳。这确保:
- 新用户能快速补齐近期历史数据(而非从零开始)
- 90 天足够覆盖大多数策略的回测窗口
- 不必拉取过久远的数据,节省 API 额度
后续运行时,返回上次归档的 last_archived_ts,确保每次只拉取新增数据。
去重机制:Upsert 的幂等性
cursor.execute("""
INSERT OR REPLACE INTO kline_1m (symbol, timestamp, open, high, low, close, volume)
VALUES (?, ?, ?, ?, ?, ?, ?)
""", ...)
INSERT OR REPLACE 是 SQLite 的 Upsert 语法。当 PRIMARY KEY (symbol, timestamp) 冲突时,会用新值替换旧值。这意味着:
- 同一时间戳的 K 线,无论脚本运行多少次,结果一致
- 如果 TickDB 的历史数据发生修正(如交易所调整),本地数据会自动同步
⚠️ 工程预警:TickDB 的 /v1/market/kline 接口返回的是已结束周期的 K 线。以 1m 周期为例,12:01:00 的 K 线在 12:02:00 之后才会生成。因此,如果你设置 end_time = 当前时间,可能会丢失最后一根正在生成的 K 线。建议归档任务在次日执行,给足数据生成窗口。
分页拉取的终止条件
if len(data) < PAGE_SIZE:
break
当返回的条数小于单页上限时,说明已拉到最新数据。这是标准的光标分页模式,无需传递 page 参数。
但这里有一个边界情况:如果恰好在归档运行时,新的 K 线数据正在生成,len(data) == PAGE_SIZE 会导致继续拉取下一页,而下一页返回空数组,while 循环退出。这是一种最终一致性的设计:允许短暂的数据不完整,但确保下次归档时会补充完整。
从 SQLite 到 ClickHouse:规模化扩展
SQLite 适合个人开发者和小规模数据。当你需要存储多个标的 × 多年历史 × 多周期时,SQLite 的单文件写入锁会成为瓶颈。以下是迁移到 ClickHouse 的核心改动点:
| 维度 | SQLite | ClickHouse |
|---|---|---|
| 表引擎 | MergeTree (自动) | MergeTree (手动指定) |
| 批量写入 | executemany |
INSERT INTO ... VALUES 批量 |
| 去重 | PRIMARY KEY + INSERT OR REPLACE |
ALTER TABLE ... DELETE WHERE + 物化视图 |
| 查询性能 | 适合 <100 万行 | 适合 >10 亿行 |
| 部署复杂度 | 单文件 | 需要 ClickHouse 集群 |
ClickHouse 版本的核心 SQL:
-- 创建分布式表(多节点场景)
CREATE TABLE IF NOT EXISTS kline_1m
(
symbol String,
timestamp UInt64,
open Float64,
high Float64,
low Float64,
close Float64,
volume Float64
)
ENGINE = MergeTree()
ORDER BY (symbol, timestamp)
SETTINGS index_granularity = 8192;
-- 批量写入(Python 端构造 VALUES 字符串)
INSERT INTO kline_1m (symbol, timestamp, open, high, low, close, volume) VALUES
('BTC.USDT', 1704067200000, 42000.5, 42100.0, 41950.0, 42050.0, 1250.5),
('BTC.USDT', 1704067260000, 42050.0, 42200.0, 42030.0, 42180.0, 1380.2);
⚠️ 生产提示:ClickHouse 的写入是异步合并的,新写入的数据需要数秒到数十秒才能被查询到。如果你需要严格的一致性读取,考虑使用 ReplacingMergeTree 引擎并在查询时加 FINAL 关键字。
部署方案:定时任务配置
Linux Crontab
# 每天凌晨 0:30 执行归档脚本
30 0 * * * cd /opt/archiver && /usr/bin/python3 main.py >> /var/log/archiver.log 2>&1
Docker 容器化
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir requests
COPY . .
CMD ["python", "main.py"]
# docker-compose.yml
version: "3.8"
services:
archiver:
build: .
container_name: tickdb-archiver
environment:
- TICKDB_API_KEY=${TICKDB_API_KEY}
volumes:
- ./data:/app/data
- ./logs:/app/logs
restart: unless-stopped
cron:
schedule: "30 0 * * *"
# 注:docker-compose 原生不支持 cron,需使用外部调度器或使用包含 cron 的镜像
生产级调度建议
对于企业级场景,推荐使用:
- Airflow:支持任务依赖、可视化 DAG、失败告警
- Dagster:现代 Python-first 调度框架,原生支持 dbt 和数据资产
- Prefect:云原生,自动化重试和状态管理
数据验证:归档完整性检查
归档完成后,建议增加数据完整性校验:
def validate_archived_data(db_path: str, symbol: str, date: str) -> dict:
"""
校验指定日期的数据完整性
Returns:
校验结果字典
"""
conn = sqlite3.connect(db_path)
# 统计该日期的记录数
cursor = conn.cursor()
cursor.execute("""
SELECT COUNT(*), MIN(timestamp), MAX(timestamp)
FROM kline_1m
WHERE symbol = ?
AND timestamp >= ?
AND timestamp < ?
""", (
symbol,
int(datetime.strptime(date, "%Y-%m-%d").timestamp() * 1000),
int((datetime.strptime(date, "%Y-%m-%d") + timedelta(days=1)).timestamp() * 1000),
))
row = cursor.fetchone()
conn.close()
count, min_ts, max_ts = row
# 美股交易日约 390 分钟(9:30 - 16:00)
# 数字货币 7x24 约 1440 分钟
expected_minutes = 1440 if "USDT" in symbol else 390
return {
"symbol": symbol,
"date": date,
"actual_records": count,
"expected_minutes": expected_minutes,
"completeness": f"{count / expected_minutes * 100:.1f}%",
"time_range": f"{datetime.fromtimestamp(min_ts/1000) if min_ts else None} - {datetime.fromtimestamp(max_ts/1000) if max_ts else None}",
}
⚠️ 注意:数字货币 7×24 不代表每天恰好 1440 条记录。网络中断、交易所维护等情况也会导致数据缺失。建议设定可接受的完整性阈值(如 >99.5%),低于阈值时触发告警。
结语
行情归档是一个看似简单、实则充满边界细节的工程问题。本文覆盖了:
- 如何用 TickDB
/v1/market/kline接口增量拉取数据 - 如何用 SQLite 做幂等性存储和增量边界管理
- 如何处理限频(3001 错误码)和超时
- 如何扩展到 ClickHouse 以支撑规模化场景
- 如何做数据完整性校验
这套方案的核心是增量边界的设计哲学:记住上次跑到哪里,下次从那里继续。这不仅是归档的哲学,也是大多数批处理 ETL 的通用范式。
下一步行动
如果你想亲手运行本文代码:
- 访问 tickdb.ai 注册(免费,无需信用卡)
- 在控制台生成 API Key
- 设置环境变量
TICKDB_API_KEY,复制本文代码即可运行
如果你需要管理多个标的、多周期、多市场的行情数据,考虑将 SQLite 迁移到 ClickHouse 或 TimescaleDB,并使用 Airflow 做任务编排。联系我获取 TickDB 企业版方案。
如果你习惯用 AI 辅助开发,在 AI 助手中搜索安装 tickdb-market-data SKILL,调用归档逻辑时直接说"帮我归档最近 30 天的 AAPL 1 分钟数据"。
本文不构成任何投资建议。市场有风险,投资需谨慎。