行情数据的宿命:要么实时,要么归档
一个老问题:你在交易日收盘后,想复盘当天的分钟级走势,却发现数据要么散落在交易所的 CSV 里,要么干脆就没有保存。
实时数据靠 WebSocket 推送能拿到,但断网怎么办?凌晨服务器重启了怎么办?TickDB 的 REST 历史接口为此设计——让盘后归档成为一个可重复的、受控的管道工程,而不是靠手工导出和 Excel 拼凑。
本文的目标很具体:每天定时拉取 TickDB 的分钟级 K 线数据,存入本地数据库,支持增量更新,不重复写入,并且用生产级的错误处理确保这个管道能稳定跑三个月不用人管。
一、为什么需要本地行情数据库
这个问题值得先说清楚。TickDB 的 REST 接口随时能查到历史数据,那把行情存到本地多此一举吗?
不是。三个实际场景说明本地归档的必要性:
第一,策略回测需要稳定的数据源。 你不想每次回测都要等 API 响应,网络抖动会导致回测中断。用本地数据库,磁盘 I/O 是确定性的。
第二,多策略共享数据。 三个因子策略同时跑,如果每个都去调 TickDB API,超出免费层限频是迟早的事。本地归档一份,三个策略读本地,API 只承担归档职责。
第三,盘后分析的性能要求。 你要在收盘后 30 秒内做完当天的流动性分析,查询本地 SQLite 远比调 REST API 快 2-3 个数量级。
所以归档不是冗余,是架构上的必要分层。
二、TickDB 历史数据接口速览
在动手写代码之前,先把 TickDB 的 REST 接口规范梳理清楚,这是所有后续逻辑的基础。
TickDB 提供两类获取 K 线数据的接口,职责不同:
| 接口 | 端点 | 适用场景 |
|---|---|---|
| 获取历史 K 线 | GET /v1/market/kline |
回测、归档、分析 |
| 获取最新一根 K 线 | GET /v1/market/kline/latest |
实时监控,不适合做归档 |
归档场景下,正确的做法是调用 /v1/market/kline,按时间范围分页拉取。每次请求最大返回 1000 条记录,如果时间范围内数据量超过 1000 条,需要用时间分片多次请求。
关键参数:
| 参数 | 类型 | 说明 |
|---|---|---|
symbol |
string | 交易品种代码,如 AAPL.US、BTC.USDT |
interval |
string | K 线周期,如 1m、5m、1h、1d |
start_time |
integer | 起始时间戳(秒级,UTC) |
end_time |
integer | 结束时间戳(秒级,UTC) |
limit |
integer | 每页数量,最大 1000 |
请求示例(以获取 AAPL.US 2024 年 3 月 15 日的分钟级数据为例):
GET https://api.tickdb.ai/v1/market/kline?symbol=AAPL.US&interval=1m&start_time=1710460800&end_time=1710547200&limit=1000
Header: X-API-Key: <your_key>
返回数据每条记录包含:symbol、interval、open_time、close_time、open、high、low、close、volume 字段。
三、增量归档的核心设计
增量更新是本文的核心问题。如果每次都全量拉取,数据量大了以后会有两个问题:API 限频(code 3001)和存储浪费。
增量归档的设计逻辑很直接:以本地数据库的最新一条记录的时间戳为起点,向后拉取未入库的数据。
本地最新记录时间 → 作为 start_time → 拉取最新数据 → 入库(跳过已存在)
但有一个边界情况要处理:本地无数据时(首次运行),需要用配置的初始时间戳。
伪代码层面的逻辑如下:
if 本地最新时间戳 exists:
start_time = 本地最新时间戳 + 1秒
else:
start_time = 配置的初始回溯时间
end_time = 当前时间(UTC)
while True:
response = 请求 /v1/market/kline(start_time, end_time, limit=1000)
if response.records.empty:
break
for record in response.records:
if 本地不存在该 (symbol, interval, open_time):
写入本地数据库
start_time = response.records[-1].open_time + 1
if len(response.records) < 1000:
break
四、生产级归档脚本
下面给出完整的 Python 实现。代码包含:环境变量鉴权、指数退避重连、限频处理(code 3001)、分页循环、增量去重、SQLite 写入。
import os
import time
import sqlite3
import requests
from datetime import datetime, timezone
from typing import Optional
# ============================================================
# 配置区
# ============================================================
TICKDB_API_KEY = os.environ.get("TICKDB_API_KEY")
if not TICKDB_API_KEY:
raise ValueError("请设置环境变量 TICKDB_API_KEY")
TICKDB_BASE_URL = "https://api.tickdb.ai/v1/market/kline"
# 归档任务配置:每次运行归档哪些标的
ARCHIVE_TASKS = [
{"symbol": "AAPL.US", "interval": "1m", "lookback_days": 1},
{"symbol": "NVDA.US", "interval": "5m", "lookback_days": 3},
{"symbol": "BTC.USDT", "interval": "1m", "lookback_days": 7},
]
# SQLite 数据库路径
DB_PATH = "market_data.db"
# ============================================================
# 数据库初始化
# ============================================================
def init_db():
"""初始化 SQLite 数据库和表结构"""
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS kline_1m (
id INTEGER PRIMARY KEY AUTOINCREMENT,
symbol TEXT NOT NULL,
interval TEXT NOT NULL,
open_time INTEGER NOT NULL,
close_time INTEGER NOT NULL,
open REAL NOT NULL,
high REAL NOT NULL,
low REAL NOT NULL,
close REAL NOT NULL,
volume REAL NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UNIQUE(symbol, interval, open_time)
)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_kline_lookup
ON kline_1m (symbol, interval, open_time)
""")
conn.commit()
return conn
# ============================================================
# API 请求模块(含重试、限频处理)
# ============================================================
def fetch_kline_page(
symbol: str,
interval: str,
start_time: int,
end_time: int,
limit: int = 1000
) -> Optional[dict]:
"""拉取一页 K 线数据,含重试和限频处理"""
params = {
"symbol": symbol,
"interval": interval,
"start_time": start_time,
"end_time": end_time,
"limit": limit,
}
headers = {"X-API-Key": TICKDB_API_KEY}
max_retries = 5
base_delay = 2.0
for attempt in range(max_retries):
try:
response = requests.get(
TICKDB_BASE_URL,
params=params,
headers=headers,
timeout=(3.05, 15) # (connect_timeout, read_timeout)
)
# 限频处理
if response.status_code == 429:
retry_after = int(response.headers.get("Retry-After", 10))
print(f" ⚠️ API 限频,等待 {retry_after}s")
time.sleep(retry_after)
continue
data = response.json()
# 检查 TickDB 业务错误码
code = data.get("code", 0)
if code == 0:
return data.get("data", {})
# 限频业务错误码
if code == 3001:
retry_after = int(response.headers.get("Retry-After", 5))
print(f" ⚠️ 业务层限频(code 3001),等待 {retry_after}s")
time.sleep(retry_after)
continue
# 已知不可重试的错误
if code in (1001, 1002):
raise ValueError(f"API Key 无效 (code {code}),请检查环境变量")
if code == 2002:
raise KeyError(f"交易品种 {symbol} 不存在,请检查 symbol 格式")
# 其他未知错误,指数退避重试
raise RuntimeError(f"未知 API 错误 code={code}, msg={data.get('message')}")
except requests.exceptions.Timeout:
# 网络超时,指数退避
delay = min(base_delay * (2 ** attempt), 60)
jitter = 0.2 * delay * (0.5 - __import__('random').random())
wait_time = delay + jitter
print(f" ⏳ 请求超时,第 {attempt+1} 次重试,等待 {wait_time:.1f}s")
time.sleep(wait_time)
continue
except requests.exceptions.RequestException as e:
delay = min(base_delay * (2 ** attempt), 60)
jitter = 0.2 * delay * (0.5 - __import__('random').random())
wait_time = delay + jitter
print(f" ⏳ 网络异常: {e},第 {attempt+1} 次重试,等待 {wait_time:.1f}s")
time.sleep(wait_time)
continue
raise RuntimeError(f"API 请求在 {max_retries} 次重试后失败")
# ============================================================
# 增量归档核心逻辑
# ============================================================
def get_latest_local_time(conn: sqlite3.Connection, symbol: str, interval: str) -> Optional[int]:
"""查询本地数据库中该标的的最新记录时间戳"""
cursor = conn.cursor()
cursor.execute(
"""
SELECT MAX(open_time) FROM kline_1m
WHERE symbol = ? AND interval = ?
""",
(symbol, interval),
)
row = cursor.fetchone()
return row[0] if row and row[0] is not None else None
def upsert_kline(conn: sqlite3.Connection, records: list):
"""批量写入 K 线数据,已存在则忽略(UNIQUE 约束兜底)"""
if not records:
return 0
cursor = conn.cursor()
inserted = 0
for r in records:
try:
cursor.execute(
"""
INSERT OR IGNORE INTO kline_1m
(symbol, interval, open_time, close_time, open, high, low, close, volume)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
r["symbol"],
r["interval"],
r["open_time"],
r["close_time"],
r["open"],
r["high"],
r["low"],
r["close"],
r["volume"],
),
)
inserted += cursor.rowcount
except Exception as e:
print(f" ⚠️ 单条写入异常: {e}")
conn.commit()
return inserted
def archive_symbol(conn: sqlite3.Connection, symbol: str, interval: str, lookback_days: int):
"""归档单个交易品种,支持增量更新"""
# 计算起始时间
latest_local = get_latest_local_time(conn, symbol, interval)
now_ts = int(datetime.now(timezone.utc).timestamp())
if latest_local:
# 增量:跳过本地已有的数据,往后取 5 分钟缓冲
start_time = latest_local - 300
print(f" → 本地已有数据,增量归档,从 {latest_local} 继续")
else:
# 全量:按 lookback_days 计算初始回溯时间
start_time = now_ts - (lookback_days * 86400)
print(f" → 首次归档,回溯 {lookback_days} 天")
end_time = now_ts
total_written = 0
page_count = 0
while True:
page_count += 1
print(f" 正在拉取第 {page_count} 页 (start={start_time})...")
try:
data = fetch_kline_page(symbol, interval, start_time, end_time)
except Exception as e:
print(f" ❌ 归档中断: {e}")
break
records = data.get("records", [])
if not records:
print(f" ✓ 数据拉取完毕,共写入 {total_written} 条新记录")
break
inserted = upsert_kline(conn, records)
total_written += inserted
print(f" ✓ 第 {page_count} 页:{len(records)} 条记录中,新增 {inserted} 条")
# 分页:start_time 推进到下一页的起始点
last_open_time = records[-1]["open_time"]
start_time = last_open_time + 1
# 数据量不足一页,说明已到最新
if len(records) < 1000:
print(f" ✓ 分页拉取完毕,共写入 {total_written} 条新记录")
break
return total_written
# ============================================================
# 主程序入口
# ============================================================
def main():
print("=" * 60)
print(f"TickDB 行情归档任务启动 | {datetime.now(timezone.utc).isoformat()}")
print("=" * 60)
conn = init_db()
total_records = 0
for task in ARCHIVE_TASKS:
symbol = task["symbol"]
interval = task["interval"]
lookback = task["lookback_days"]
print(f"\n📦 归档标的: {symbol} ({interval}), 回溯 {lookback} 天")
written = archive_symbol(conn, symbol, interval, lookback)
total_records += written
# ⚠️ 生产环境高频归档场景建议在任务间加延迟
time.sleep(1)
conn.close()
print(f"\n✅ 全部归档完成,新增 {total_records} 条记录")
print(f"数据库路径: {os.path.abspath(DB_PATH)}")
if __name__ == "__main__":
main()
代码中值得注意的工程细节:
INSERT OR IGNORE配合 UNIQUE 约束,实现去重的双重保险,即便在并发运行或中断恢复时也不会产生重复主键。- 每页处理后设置 1 秒间隔(在
main中),防止连续请求触发限频。 start_time在增量模式下减去 300 秒(5 分钟缓冲),防止因时间对齐问题漏掉最后一根 K 线。timeout=(3.05, 15):连接超时 3.05 秒(略大于请求粒度),读超时 15 秒(给服务器处理留足时间)。
五、定时调度:让归档自动化
光有脚本不够,需要让它每天自动跑。这里给出两个方案,按部署场景选择:
5.1 方案一:systemd 定时器(Linux 服务器)
创建 systemd 服务文件:
# /etc/systemd/system/tickdb-archiver.service
[Unit]
Description=TickDB Market Data Archiver
[Service]
Type=oneshot
WorkingDirectory=/opt/archiver
Environment="TICKDB_API_KEY=your_api_key_here"
ExecStart=/usr/bin/python3 /opt/archiver/archive.py
StandardOutput=journal
StandardError=journal
创建定时器:
# /etc/systemd/system/tickdb-archiver.timer
[Unit]
Description=TickDB Market Data Archiver Timer
[Timer]
# 美国收盘后约 22:05 UTC 运行(美东夏令时对应 18:05)
OnCalendar=*-*-* 22:05:00
Persistent=true
[Install]
WantedBy=timers.target
启用定时任务:
sudo systemctl daemon-reload
sudo systemctl enable --now tickdb-archiver.timer
sudo systemctl list-timers --all | grep tickdb
5.2 方案二:cron(通用方案)
# 每天 22:05 UTC 执行
5 22 * * 1-5 cd /opt/archiver && TICKDB_API_KEY=your_key python3 archive.py >> /var/log/tickdb_archiver.log 2>&1
周一到周五(美国交易日)自动触发,不需要额外安装服务。
六、数据验证:归档是否完整?
归档管道跑起来之后,你需要一种方式验证数据完整性,而不是盲目相信脚本没有漏掉任何东西。
6.1 记录级校验
每次归档完成后,查本地数据库的时间范围,与 TickDB API 返回的理论时间范围做对比:
def validate_archive(symbol: str, interval: str):
"""校验本地归档是否完整"""
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
# 本地时间范围
cursor.execute(
"""
SELECT MIN(open_time), MAX(open_time), COUNT(*)
FROM kline_1m
WHERE symbol = ? AND interval = ?
""",
(symbol, interval),
)
local_min, local_max, count = cursor.fetchone()
conn.close()
# 获取 TickDB 返回的理论最新一根
headers = {"X-API-Key": TICKDB_API_KEY}
params = {"symbol": symbol, "interval": interval, "limit": 1}
resp = requests.get(
"https://api.tickdb.ai/v1/market/kline",
params=params,
headers=headers,
timeout=(3.05, 10),
)
latest_remote = resp.json()["data"]["records"][0]["open_time"]
gap = latest_remote - local_max if local_max else None
print(f"{symbol}/{interval}:")
print(f" 本地范围: {datetime.fromtimestamp(local_min, tz=timezone.utc)} → "
f"{datetime.fromtimestamp(local_max, tz=timezone.utc)}")
print(f" 远程最新: {datetime.fromtimestamp(latest_remote, tz=timezone.utc)}")
print(f" 时间差: {gap}s ({gap/60:.1f}min)" if gap else " 无本地数据")
print(f" 记录数: {count}")
6.2 连续性校验
K 线是连续的时间序列,如果某根 K 线缺失,后续的 open/high/low/close 计算就会出错。用 SQL 窗口函数检测断档:
SELECT
symbol,
interval,
open_time AS current_bar,
LEAD(open_time) OVER (PARTITION BY symbol, interval ORDER BY open_time) AS next_bar,
LEAD(open_time) OVER (PARTITION BY symbol, interval ORDER BY open_time) - open_time AS gap_seconds
FROM kline_1m
WHERE symbol = 'AAPL.US'
AND interval = '1m'
HAVING gap_seconds > 120; -- 超过 2 分钟的间隔视为断档
七、从 SQLite 到 ClickHouse:规模化扩展
SQLite 适合个人开发者和单策略场景,数据量在百万级别以内查询性能足够。但当你需要支撑团队协作、历史回测提速、或接入多个因子策略时,ClickHouse 是更合适的存储层。
7.1 ClickHouse 表结构
CREATE TABLE IF NOT EXISTS market_data.kline (
symbol String,
interval String,
open_time UInt64,
close_time UInt64,
open Float64,
high Float64,
low Float64,
close Float64,
volume Float64,
created_at DateTime DEFAULT now()
)
ENGINE = MergeTree()
PARTITION BY toYYYYMM(fromUnixTimestamp64Milli(open_time / 1000))
ORDER BY (symbol, interval, open_time)
SETTINGS index_granularity = 8192;
ClickHouse 按月分区,open_time 作为排序键,相同 symbol + interval 的数据在物理上连续存储,范围查询性能比 SQLite 的 B-Tree 索引快 10-100 倍。
7.2 写入性能对比
| 指标 | SQLite | ClickHouse |
|---|---|---|
| 百万条写入速度 | ~5 秒 | <0.5 秒 |
| 单次查询延迟(P99) | ~50ms | <5ms |
| 存储压缩率 | 无压缩 | ~3-5x |
| 并发写入 | 不支持 | 支持 |
| 适合规模 | <1000 万条 | 无上限 |
如果数据量在 1000 万以上,或者团队有多人同时查询,迁移到 ClickHouse 的收益明显。迁移脚本的核心逻辑是逐月导出 SQLite 数据,再批量写入 ClickHouse,这里不再展开。
八、完整运行日志示例
以下是脚本在正常运行时的输出格式,可作为监控和排查的依据:
============================================================
TickDB 行情归档任务启动 | 2024-03-15T22:05:03+00:00
============================================================
📦 归档标的: AAPL.US (1m), 回溯 1 天
→ 本地已有数据,增量归档,从 1710451200 继续
正在拉取第 1 页 (start=1710450900)...
✓ 第 1 页:1000 条记录中,新增 12 条
正在拉取第 2 页 (start=1710518401)...
✓ 第 2 页:450 条记录中,新增 0 条
✓ 分页拉取完毕,共写入 12 条新记录
📦 归档标的: NVDA.US (5m), 回溯 3 天
→ 本地已有数据,增量归档,从 1710364800 继续
正在拉取第 1 页 (start=1710364500)...
✓ 第 1 页:1000 条记录中,新增 48 条
✓ 分页拉取完毕,共写入 48 条新记录
✅ 全部归档完成,新增 60 条记录
数据库路径: /opt/archiver/market_data.db
结语
行情归档这件事,核心复杂度不在于“能不能拿到数据”,而在于“能不能稳定地、不丢不重地一直拿下去”。本文的方案用增量机制控制 API 调用量,用 SQLite UNIQUE 约束兜底去重,用指数退避加限频等待保障管道稳定性,用 systemd timer 或 cron 实现零人工干预的自动化。
三个改进方向供你按需扩展:
第一,接入 ClickHouse 后,原有脚本只需修改 upsert 函数中的写入逻辑,API 拉取和分页逻辑完全不变。
第二,加入告警机制:连续三次 API 错误超过阈值(如 code 3001 持续 30 分钟未恢复),发送飞书或 Slack 通知,而不是让定时任务默默失败。
第三,支持自定义调度:不同标的的市场收盘时间不同,A股是 15:00 北京时间,美股是 16:00 美东时间,如果要同时归档两类资产,需要为不同标的设置不同的触发时间。
下一步行动
如果你希望亲手运行本文代码:
- 访问 tickdb.ai 注册(免费,无需信用卡)
- 在控制台生成 API Key
- 设置环境变量
TICKDB_API_KEY,复制本文代码即可运行
如果你习惯用 AI 辅助开发,在 AI 助手中搜索安装 tickdb-market-data SKILL,可直接用自然语言查询 TickDB 的可用标的和时间范围。
如果你需要多标的、跨市场的批量归档,联系 [email protected] 了解自动化数据管道的企业级方案。
本文不构成任何投资建议。市场有风险,投资需谨慎。