"你的策略回测跑过了 2020 年 3 月的流动性黑洞吗?"
如果你做过实盘,就知道这道题有多残酷——当时纳斯达克 10 天内触发 4 次熔断,无数量化策略在 Yahoo Finance 的"干净"数据里回测出漂亮的曲线,一上实盘就被打得找不着北。
问题不在策略本身,而在数据源。多数免费数据有这三个致命伤:隔夜跳空填充假数据、分钟级数据缺失或不同步、历史深度不够。TickDB 提供的分钟级历史 K 线数据覆盖 A 股、港股、数字货币等多个市场,10 年级别、清洗对齐,可以从根本上解决这个痛点。
但光有数据源不够。你需要一个每日自动归档流程:盘后自动拉取当日分钟级数据,写入本地数据库,支持后续回测复用。这篇文章给出完整的生产级实现,包括 REST API 拉取模块、本地数据库设计、增量更新逻辑,以及去重方案。
一、为什么"每日归档"比实时流更划算
实时 WebSocket 流适合监控场景,但做回测分析时,每天收盘后批量拉取历史 K 线反而是更合理的选择,理由有三:
第一,带宽成本低。 盘后归档是一次性拉取固定时间范围的数据,不依赖长连接,不存在断线重连的运维开销。
第二,数据完整性高。 分钟级 K 线由 TickDB 服务端在收盘后合成,相比实时推送的逐根累积,缺失值的概率更低。
第三,回测窗口灵活。 归档完成后,本地数据库就是你自己的行情源,可以无限次查询而不产生 API 调用成本,适合因子研究阶段的反复迭代。
核心逻辑很清晰:每日定时触发 → REST API 拉取当日 K 线 → 入库 SQLite/ClickHouse → 增量去重。下面逐一展开。
二、系统架构总览
┌─────────────────────────────────────────────────────┐
│ 调度层(Scheduler) │
│ 每日 16:05(A 股)/ 01:05(数字货币)触发归档任务 │
└──────────────────────┬──────────────────────────────┘
│ 触发
▼
┌─────────────────────────────────────────────────────┐
│ TickDB API Client │
│ REST 调用 /v1/market/kline │
│ 处理限频(3001 + Retry-After) │
│ 处理时间边界:当日开盘~收盘 │
└──────────────────────┬──────────────────────────────┘
│ 原始 K 线列表
▼
┌─────────────────────────────────────────────────────┐
│ 数据持久化层 │
│ SQLite(个人/轻量)/ ClickHouse(团队/大规模) │
│ UPSERT 语义:ON CONFLICT 去重 │
└──────────────────────┬──────────────────────────────┘
│ 归档结果
▼
┌─────────────────────────────────────────────────────┐
│ 调度层(Scheduler) │
│ 记录本次归档时间戳下次增量起始点 │
└─────────────────────────────────────────────────────┘
三层职责分离的好处是:换数据源只需改 API Client,换存储引擎只需改持久化层,调度逻辑保持不动。
三、生产级 TickDB API 调用模块
3.1 接口选型与参数设计
TickDB 提供两个 K 线相关接口:
GET /v1/market/kline:获取已结束周期的历史 K 线,用于归档GET /v1/market/kline/latest:获取当前实时 K 线,不适合归档场景
归档应使用 /v1/market/kline,核心参数如下:
| 参数 | 类型 | 说明 | 示例 |
|---|---|---|---|
symbol |
string | 交易品种代码 | AAPL.US、BTC.USDT |
interval |
string | K 线周期 | 1m、5m、1h、1d |
start_time |
int | 起始时间戳(秒) | 1743782400 |
end_time |
int | 结束时间戳(秒) | 1743868800 |
limit |
int | 最大条数(上限 1000) | 1000 |
⚠️ 注意:分钟级数据单次最多返回 1000 条。对于高频场景(如 1 分钟 A 股全天 240 根),需要分页拉取,每次请求用
end_time控制时间窗口。
3.2 API Client 完整实现
以下代码覆盖了生产级的所有关键要素:环境变量鉴权、限频退避、指数退避重连、抖动、超时设置、分页拉取。
import os
import time
import logging
import random
from datetime import datetime, timezone
from typing import Optional
import requests
logger = logging.getLogger(__name__)
class TickDBKlineClient:
"""TickDB 历史 K 线拉取客户端(生产级)"""
BASE_URL = "https://api.tickdb.ai/v1/market/kline"
MAX_RETRY = 5
BASE_DELAY = 1.0
MAX_DELAY = 60.0
def __init__(self, api_key: Optional[str] = None):
self.api_key = api_key or os.environ.get("TICKDB_API_KEY")
if not self.api_key:
raise ValueError("API Key 未设置。请设置环境变量 TICKDB_API_KEY")
self.headers = {"X-API-Key": self.api_key}
def _request(self, params: dict, retry_count: int = 0) -> dict:
"""单次请求封装,含超时和限频处理"""
response = requests.get(
self.BASE_URL,
headers=self.headers,
params=params,
timeout=(3.05, 10) # (connect_timeout, read_timeout)
)
# 限频处理:TickDB 返回 3001 时附带 Retry-After 头
if response.status_code == 429 or (
response.status_code == 200
and response.json().get("code") == 3001
):
retry_after = int(
response.headers.get("Retry-After", 5)
)
logger.warning(
f"触发限频,等待 {retry_after}s 后重试(第 {retry_count + 1} 次)"
)
time.sleep(retry_after)
return self._request(params, retry_count)
response.raise_for_status()
data = response.json()
if data.get("code") == 0:
return data.get("data", [])
# 非限频错误:指数退避重连
if retry_count < self.MAX_RETRY:
delay = min(self.BASE_DELAY * (2 ** retry_count), self.MAX_DELAY)
# 添加抖动,避免多实例同时重连的惊群效应
jitter = random.uniform(0, delay * 0.1)
logger.warning(
f"请求失败(code={data.get('code')}),{delay + jitter:.1f}s 后重试"
)
time.sleep(delay + jitter)
return self._request(params, retry_count + 1)
raise RuntimeError(
f"K 线请求失败,已达最大重试次数: code={data.get('code')}, "
f"message={data.get('message')}"
)
def fetch_klines(
self,
symbol: str,
interval: str,
start_time: int,
end_time: int,
) -> list[dict]:
"""
分页拉取指定时间范围的 K 线数据。
Args:
symbol: 交易品种,如 'AAPL.US'
interval: K 线周期,如 '1m', '5m', '1h'
start_time: 起始时间戳(秒)
end_time: 结束时间戳(秒)
Returns:
所有 K 线组成的列表,按时间升序排列
"""
all_klines = []
cursor = start_time
while cursor < end_time:
# TickDB 单次最多返回 1000 条,分页拉取
params = {
"symbol": symbol,
"interval": interval,
"start_time": cursor,
"end_time": end_time,
"limit": 1000,
}
klines = self._request(params)
if not klines:
break
all_klines.extend(klines)
last_close_time = klines[-1]["close_time"]
# 边界保护:避免死循环
if last_close_time <= cursor:
logger.warning(
f"分页游标停滞(cursor={cursor}, "
f"last_close_time={last_close_time}),跳过"
)
break
cursor = last_close_time + 1
# ⚠️ 高频场景下建议使用 aiohttp 异步并发拉取多标的
# 当前串行实现适合每日归档的低频场景
logger.info(
f"[{symbol}] 拉取 {interval} K 线 "
f"{datetime.fromtimestamp(start_time, tz=timezone.utc)} ~ "
f"{datetime.fromtimestamp(end_time, tz=timezone.utc)},"
f"共 {len(all_klines)} 条"
)
return all_klines
3.3 时间窗口的精确控制
归档时间窗口的设置有讲究。不同市场的交易时段不同:
| 市场 | 代码示例 | 盘后归档时间(建议) | 时间窗口边界 |
|---|---|---|---|
| A 股 | 600519.SH |
15:05 触发 | 09:30–15:00 |
| 港股 | 0700.HK |
16:05 触发 | 09:30–16:00 |
| 美股 | AAPL.US |
次日 01:05 触发 | 09:30–16:00 EST |
| 数字货币 | BTC.USDT |
任意时刻(24h) | 00:00–24:00 UTC |
from datetime import datetime, timezone, timedelta
def get_trading_day_params(symbol: str) -> tuple[int, int]:
"""返回当日交易时段的时间戳边界"""
now = datetime.now(timezone.utc)
# 简化示例:以 UTC 00:00 为数字货币的"日"分界
# 实际项目中建议用 pandas_market_calendars 读取各市场日历
start_of_day = now.replace(hour=0, minute=0, second=0, microsecond=0)
end_of_day = start_of_day + timedelta(days=1)
# 美股(AAPL.US)转 EST 时区,交易日为前一日
if symbol.endswith(".US") and symbol not in ("BTC.USDT", "ETH.USDT"):
# 实际应调用 market_calendars,这里仅作示意
pass
return int(start_of_day.timestamp()), int(end_of_day.timestamp())
⚠️ 生产提示:时间窗口的精确计算强烈建议依赖
pandas_market_calendars或各交易所官方日历 API,避免节假日的空数据污染归档结果。A 股还要处理盘中休市(11:30–13:00)和熔断断点。
四、数据库设计:SQLite 与 ClickHouse 双轨方案
4.1 数据模型
无论用 SQLite 还是 ClickHouse,底层数据模型一致:
CREATE TABLE IF NOT EXISTS kline_archive (
symbol TEXT NOT NULL,
interval TEXT NOT NULL,
open_time INTEGER NOT NULL, -- 开盘时间戳(秒)
close_time INTEGER NOT NULL, -- 收盘时间戳(秒)
open_price REAL NOT NULL,
high_price REAL NOT NULL,
low_price REAL NOT NULL,
close_price REAL NOT NULL,
volume REAL NOT NULL,
updated_at INTEGER NOT NULL, -- 入库时间戳
PRIMARY KEY (symbol, interval, open_time)
) PARTITIONED BY (interval);
设计要点:
open_time作为主键的一部分,实现天然去重close_time单独存储,方便区间查询(不用基于 open_time 推算)updated_at记录入库时间,支持审计和增量判断- ClickHouse 使用
PARTITIONED BY按周期分区,查询效率远高于 SQLite 全表扫描
4.2 SQLite 方案
import sqlite3
from pathlib import Path
from contextlib import contextmanager
from typing import Generator
class SQLiteArchiver:
"""SQLite 本地行情归档器"""
def __init__(self, db_path: str = "kline_archive.db"):
self.db_path = db_path
self._init_schema()
@contextmanager
def _conn(self) -> Generator[sqlite3.Connection, None, None]:
conn = sqlite3.connect(self.db_path)
conn.execute("PRAGMA journal_mode=WAL") # 写入并发安全
try:
yield conn
finally:
conn.close()
def _init_schema(self):
with self._conn() as conn:
conn.execute("""
CREATE TABLE IF NOT EXISTS kline_archive (
symbol TEXT NOT NULL,
interval TEXT NOT NULL,
open_time INTEGER NOT NULL,
close_time INTEGER NOT NULL,
open_price REAL NOT NULL,
high_price REAL NOT NULL,
low_price REAL NOT NULL,
close_price REAL NOT NULL,
volume REAL NOT NULL,
updated_at INTEGER NOT NULL,
PRIMARY KEY (symbol, interval, open_time)
)
""")
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_symbol_interval "
"ON kline_archive (symbol, interval, open_time DESC)"
)
def upsert(self, klines: list[dict]):
"""批量 UPSERT,含去重逻辑"""
if not klines:
return
now = int(time.time())
rows = [
(
k["symbol"],
k["interval"],
k["open_time"],
k["close_time"],
k["open"],
k["high"],
k["low"],
k["close"],
k["volume"],
now,
)
for k in klines
]
with self._conn() as conn:
conn.executemany(
"""
INSERT INTO kline_archive
(symbol, interval, open_time, close_time,
open_price, high_price, low_price,
close_price, volume, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(symbol, interval, open_time) DO UPDATE SET
open_price = excluded.open_price,
high_price = excluded.high_price,
low_price = excluded.low_price,
close_price = excluded.close_price,
volume = excluded.volume,
updated_at = excluded.updated_at
""",
rows,
)
conn.commit()
logger.info(
f"归档完成:{len(rows)} 条写入/更新,"
f"数据库 {self.db_path}"
)
def query_range(
self, symbol: str, interval: str,
start_time: int, end_time: int
) -> list[dict]:
"""区间查询,用于回测"""
with self._conn() as conn:
conn.row_factory = sqlite3.Row
rows = conn.execute(
"""
SELECT * FROM kline_archive
WHERE symbol = ? AND interval = ?
AND open_time >= ? AND open_time < ?
ORDER BY open_time ASC
""",
(symbol, interval, start_time, end_time),
).fetchall()
return [dict(r) for r in rows]
⚠️ 生产提示:SQLite 在多进程并发写入时存在锁竞争。如果调度系统从多台机器触发归档,应将
journal_mode设为WAL或切换至 ClickHouse。
4.3 ClickHouse 方案(团队级)
from clickhouse_driver import Client
class ClickHouseArchiver:
"""ClickHouse 团队行情归档器"""
def __init__(self, host: str, port: int,
database: str = "market_data"):
self.client = Client(host=host, port=port, database=database)
self._init_schema()
def _init_schema(self):
"""创建分区表,支持按 interval 快速过滤"""
self.client.execute("""
CREATE TABLE IF NOT EXISTS kline_archive (
symbol String,
interval String,
open_time UInt32,
close_time UInt32,
open_price Float64,
high_price Float64,
low_price Float64,
close_price Float64,
volume Float64,
updated_at UInt32
)
ENGINE = ReplacingMergeTree(updated_at)
ORDER BY (symbol, interval, open_time)
PARTITION BY interval
""")
def upsert(self, klines: list[dict]):
"""ClickHouse ReplacingMergeTree 自动去重"""
if not klines:
return
now = int(time.time())
rows = [
(
k["symbol"], k["interval"],
k["open_time"], k["close_time"],
k["open"], k["high"], k["low"], k["close"],
k["volume"], now,
)
for k in klines
]
self.client.execute(
"INSERT INTO kline_archive VALUES",
rows,
)
logger.info(
f"ClickHouse 归档完成:{len(rows)} 条,数据库 {self.client.database}"
)
关键差异:SQLite 用
ON CONFLICT实时去重,ClickHouse 用ReplacingMergeTree后台异步去重(建议定期执行OPTIMIZE TABLE kline_archive)。
五、增量更新逻辑
增量归档的核心原则是:只拉取本地数据库中尚未存储的时间段。
import sqlite3
from datetime import datetime, timezone
class IncrementalArchiver:
"""增量归档协调器"""
def __init__(self, client: TickDBKlineClient,
archiver: SQLiteArchiver,
state_db: str = "archive_state.db"):
self.client = client
self.archiver = archiver
self.state_db = state_db
self._init_state_table()
def _init_state_table(self):
"""维护每个标的的最后归档时间戳"""
conn = sqlite3.connect(self.state_db)
conn.execute("""
CREATE TABLE IF NOT EXISTS archive_progress (
symbol TEXT PRIMARY KEY,
interval TEXT NOT NULL,
last_end_time INTEGER NOT NULL
)
""")
conn.close()
def _get_last_end_time(self, symbol: str,
interval: str) -> int:
conn = sqlite3.connect(self.state_db)
row = conn.execute(
"SELECT last_end_time FROM archive_progress "
"WHERE symbol = ? AND interval = ?",
(symbol, interval),
).fetchone()
conn.close()
return row[0] if row else 0
def _save_last_end_time(self, symbol: str,
interval: str, end_time: int):
conn = sqlite3.connect(self.state_db)
conn.execute(
"""
INSERT INTO archive_progress (symbol, interval, last_end_time)
VALUES (?, ?, ?)
ON CONFLICT(symbol) DO UPDATE SET
last_end_time = excluded.last_end_time,
interval = excluded.interval
""",
(symbol, interval, end_time),
)
conn.commit()
conn.close()
def archive(self, symbol: str, interval: str,
start_time: int, end_time: int):
"""
增量归档主流程。
增量逻辑:
1. 读取本地存储的最新 close_time 作为本次起始点
2. 仅拉取增量区间的新 K 线
3. 写入本地数据库
4. 更新归档进度状态
"""
# 增量起始点:优先使用上次归档时间,若无则使用传入的 start_time
incremental_start = max(
self._get_last_end_time(symbol, interval),
start_time,
)
if incremental_start >= end_time:
logger.info(
f"[{symbol}] {interval} 已是最新,无需归档 "
f"(最新时间:{datetime.fromtimestamp(incremental_start, tz=timezone.utc)})"
)
return
logger.info(
f"[{symbol}] 增量归档:"
f"{datetime.fromtimestamp(incremental_start, tz=timezone.utc)} ~ "
f"{datetime.fromtimestamp(end_time, tz=timezone.utc)}"
)
# 拉取增量数据
klines = self.client.fetch_klines(
symbol=symbol,
interval=interval,
start_time=incremental_start,
end_time=end_time,
)
if not klines:
logger.warning(f"[{symbol}] 拉取结果为空,请检查时间窗口")
return
# 入库(含去重)
self.archiver.upsert(klines)
# 更新归档状态
last_close_time = klines[-1]["close_time"]
self._save_last_end_time(symbol, interval, last_close_time)
logger.info(
f"[{symbol}] 归档完成:{len(klines)} 条,"
f"下次起始点 {datetime.fromtimestamp(last_close_time, tz=timezone.utc)}"
)
增量关键点:
_get_last_end_time读取的是close_time而非open_time。因为 K 线是左闭右开区间[open_time, close_time),下一根 K 线的起始点应从最后一根的close_time + 1开始拉取,防止边界重复。
六、调度与自动化
6.1 APScheduler 定时归档
from apscheduler.schedulers.blocking import BlockingScheduler
def daily_archive_job():
"""
每日盘后归档任务(示例配置)。
实际部署时建议根据各市场的交易时段分别配置调度。
"""
targets = [
("AAPL.US", "1m"),
("BTC.USDT", "1m"),
("600519.SH", "5m"),
]
now = int(datetime.now(timezone.utc).timestamp())
client = TickDBKlineClient()
archiver = SQLiteArchiver()
orchestrator = IncrementalArchiver(client, archiver)
for symbol, interval in targets:
try:
start_of_day = now - 86400 # 简化:拉取 24h 窗口
orchestrator.archive(
symbol=symbol,
interval=interval,
start_time=start_of_day,
end_time=now,
)
except Exception as e:
logger.error(f"[{symbol}] 归档异常:{e}", exc_info=True)
scheduler = BlockingScheduler()
# 美股盘后:周一至周五 01:05 UTC
scheduler.add_job(
daily_archive_job,
"cron",
day_of_week="mon-fri",
hour=1,
minute=5,
id="daily_archive",
)
# ⚠️ 建议在容器/Kubernetes 中运行,并配置健康检查探针
# 避免调度进程挂掉导致归档漏跑
scheduler.start()
6.2 容器化部署建议
| 组件 | 建议配置 |
|---|---|
| 调度进程 | 使用 Supervisor 管理,配置 autorestart=true |
| 数据持久化 | SQLite 建议挂载 NFS 或 hostPath,避免容器重建丢失 |
| 环境变量 | 通过 Kubernetes Secret 或 Docker Secret 注入 TICKDB_API_KEY |
| 监控 | 导出 Prometheus 指标(归档条数、耗时、失败率),接入飞书/Slack 告警 |
| 日志 | 结构化日志(JSON 格式),方便日志收集系统解析 |
七、完整运行示例
"""
TickDB 每日行情归档完整示例
运行前提:
1. 设置环境变量 TICKDB_API_KEY
2. pip install requests apscheduler
运行方式:
python archive_runner.py
"""
import os
import logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
def main():
if not os.environ.get("TICKDB_API_KEY"):
raise EnvironmentError(
"请设置 TICKDB_API_KEY 环境变量后重试"
)
# 初始化各层组件
api_client = TickDBKlineClient()
archiver = SQLiteArchiver("kline_archive.db")
orchestrator = IncrementalArchiver(api_client, archiver)
# 定义归档标的(A 股建议用 5m,数字货币建议用 1m)
symbols = [
("AAPL.US", "1m"),
("BTC.USDT", "1m"),
("600519.SH", "5m"),
]
now_ts = int(datetime.now(timezone.utc).timestamp())
day_start = now_ts - 86400 # 拉取过去 24h 窗口
for symbol, interval in symbols:
try:
orchestrator.archive(
symbol=symbol,
interval=interval,
start_time=day_start,
end_time=now_ts,
)
except Exception as e:
logging.error(
f"[{symbol}] 归档失败: {e}", exc_info=True
)
# 验证入库结果
sample = archiver.query_range(
symbol="AAPL.US",
interval="1m",
start_time=day_start,
end_time=now_ts,
)
logging.info(f"验证:AAPL.US 1m 共查询到 {len(sample)} 条")
if __name__ == "__main__":
main()
运行后在 kline_archive.db 中即可查询本地归档数据:
# 示例:查询过去 7 日 AAPL.US 1 分钟数据用于回测
import sqlite3
conn = sqlite3.connect("kline_archive.db")
conn.row_factory = sqlite3.Row
rows = conn.execute(
"""
SELECT open_time, close_price, volume
FROM kline_archive
WHERE symbol = 'AAPL.US'
AND interval = '1m'
AND open_time >= ? AND open_time < ?
ORDER BY open_time ASC
""",
(int((datetime.now().timestamp() - 7 * 86400)),
int(datetime.now().timestamp())),
).fetchall()
print(f"回测样本量:{len(rows)} 根 K 线")
for r in rows[:5]:
print(f" {r['open_time']}: close={r['close_price']}")
八、结语
本地行情数据库是量化系统的基础设施,而"每日归档"是维护这套基础设施最轻量的方式——不需要搭建流处理管道,不需要维护 WebSocket 长连接,只需要一个定时任务加一套入库逻辑。
用 TickDB REST 接口拉取历史 K 线,配合 SQLite(个人)或 ClickHouse(团队)的 UPSERT 语义,完整的增量归档链路只需 100 行左右的代码,就能跑在任意一台能联网的机器上。
归档只是起点。数据进了本地数据库之后,下一步是构建你自己的因子库、回测引擎和实时监控面板——这些内容会在 TickDB 后续的文章中逐一展开。
下一步行动
如果你想立刻跑起来:
- 访问 tickdb.ai 注册账号(免费层含 API 调用额度)
- 在控制台生成 API Key
- 设置环境变量
TICKDB_API_KEY - 复制本文代码,安装依赖后运行即可
如果你需要更长的历史数据用于回测:联系 [email protected] 获取机构版数据方案,包含 10 年级别分钟级历史 K 线,覆盖美股/A 股/港股/数字货币等多个市场。
如果你习惯用 AI 辅助开发:在 ClawHub 搜索安装 tickdb-market-data SKILL,可直接通过对话方式调用 TickDB 接口生成归档脚本。
免责声明:本文所有代码和方案仅供技术参考,不构成任何投资建议。历史行情数据不代表未来表现,实际使用前请自行验证数据准确性。市场有风险,投资需谨慎。