"你的策略回测跑过了 2020 年 3 月的流动性黑洞吗?"

如果你做过实盘,就知道这道题有多残酷——当时纳斯达克 10 天内触发 4 次熔断,无数量化策略在 Yahoo Finance 的"干净"数据里回测出漂亮的曲线,一上实盘就被打得找不着北。

问题不在策略本身,而在数据源。多数免费数据有这三个致命伤:隔夜跳空填充假数据分钟级数据缺失或不同步历史深度不够。TickDB 提供的分钟级历史 K 线数据覆盖 A 股、港股、数字货币等多个市场,10 年级别、清洗对齐,可以从根本上解决这个痛点。

但光有数据源不够。你需要一个每日自动归档流程:盘后自动拉取当日分钟级数据,写入本地数据库,支持后续回测复用。这篇文章给出完整的生产级实现,包括 REST API 拉取模块、本地数据库设计、增量更新逻辑,以及去重方案。


一、为什么"每日归档"比实时流更划算

实时 WebSocket 流适合监控场景,但做回测分析时,每天收盘后批量拉取历史 K 线反而是更合理的选择,理由有三:

第一,带宽成本低。 盘后归档是一次性拉取固定时间范围的数据,不依赖长连接,不存在断线重连的运维开销。

第二,数据完整性高。 分钟级 K 线由 TickDB 服务端在收盘后合成,相比实时推送的逐根累积,缺失值的概率更低。

第三,回测窗口灵活。 归档完成后,本地数据库就是你自己的行情源,可以无限次查询而不产生 API 调用成本,适合因子研究阶段的反复迭代。

核心逻辑很清晰:每日定时触发 → REST API 拉取当日 K 线 → 入库 SQLite/ClickHouse → 增量去重。下面逐一展开。


二、系统架构总览

┌─────────────────────────────────────────────────────┐
│                    调度层(Scheduler)                 │
│  每日 16:05(A 股)/ 01:05(数字货币)触发归档任务      │
└──────────────────────┬──────────────────────────────┘
                       │ 触发
                       ▼
┌─────────────────────────────────────────────────────┐
│                  TickDB API Client                   │
│  REST 调用 /v1/market/kline                          │
│  处理限频(3001 + Retry-After)                       │
│  处理时间边界:当日开盘~收盘                          │
└──────────────────────┬──────────────────────────────┘
                       │ 原始 K 线列表
                       ▼
┌─────────────────────────────────────────────────────┐
│                  数据持久化层                         │
│  SQLite(个人/轻量)/ ClickHouse(团队/大规模)        │
│  UPSERT 语义:ON CONFLICT 去重                        │
└──────────────────────┬──────────────────────────────┘
                       │ 归档结果
                       ▼
┌─────────────────────────────────────────────────────┐
│                  调度层(Scheduler)                  │
│  记录本次归档时间戳下次增量起始点                       │
└─────────────────────────────────────────────────────┘

三层职责分离的好处是:换数据源只需改 API Client,换存储引擎只需改持久化层,调度逻辑保持不动。


三、生产级 TickDB API 调用模块

3.1 接口选型与参数设计

TickDB 提供两个 K 线相关接口:

  • GET /v1/market/kline:获取已结束周期的历史 K 线,用于归档
  • GET /v1/market/kline/latest:获取当前实时 K 线,不适合归档场景

归档应使用 /v1/market/kline,核心参数如下:

参数 类型 说明 示例
symbol string 交易品种代码 AAPL.USBTC.USDT
interval string K 线周期 1m5m1h1d
start_time int 起始时间戳(秒) 1743782400
end_time int 结束时间戳(秒) 1743868800
limit int 最大条数(上限 1000) 1000

⚠️ 注意:分钟级数据单次最多返回 1000 条。对于高频场景(如 1 分钟 A 股全天 240 根),需要分页拉取,每次请求用 end_time 控制时间窗口。

3.2 API Client 完整实现

以下代码覆盖了生产级的所有关键要素:环境变量鉴权限频退避指数退避重连抖动超时设置分页拉取

import os
import time
import logging
import random
from datetime import datetime, timezone
from typing import Optional
import requests

logger = logging.getLogger(__name__)


class TickDBKlineClient:
    """TickDB 历史 K 线拉取客户端(生产级)"""

    BASE_URL = "https://api.tickdb.ai/v1/market/kline"
    MAX_RETRY = 5
    BASE_DELAY = 1.0
    MAX_DELAY = 60.0

    def __init__(self, api_key: Optional[str] = None):
        self.api_key = api_key or os.environ.get("TICKDB_API_KEY")
        if not self.api_key:
            raise ValueError("API Key 未设置。请设置环境变量 TICKDB_API_KEY")
        self.headers = {"X-API-Key": self.api_key}

    def _request(self, params: dict, retry_count: int = 0) -> dict:
        """单次请求封装,含超时和限频处理"""
        response = requests.get(
            self.BASE_URL,
            headers=self.headers,
            params=params,
            timeout=(3.05, 10)  # (connect_timeout, read_timeout)
        )

        # 限频处理:TickDB 返回 3001 时附带 Retry-After 头
        if response.status_code == 429 or (
            response.status_code == 200
            and response.json().get("code") == 3001
        ):
            retry_after = int(
                response.headers.get("Retry-After", 5)
            )
            logger.warning(
                f"触发限频,等待 {retry_after}s 后重试(第 {retry_count + 1} 次)"
            )
            time.sleep(retry_after)
            return self._request(params, retry_count)

        response.raise_for_status()
        data = response.json()

        if data.get("code") == 0:
            return data.get("data", [])

        # 非限频错误:指数退避重连
        if retry_count < self.MAX_RETRY:
            delay = min(self.BASE_DELAY * (2 ** retry_count), self.MAX_DELAY)
            # 添加抖动,避免多实例同时重连的惊群效应
            jitter = random.uniform(0, delay * 0.1)
            logger.warning(
                f"请求失败(code={data.get('code')}),{delay + jitter:.1f}s 后重试"
            )
            time.sleep(delay + jitter)
            return self._request(params, retry_count + 1)

        raise RuntimeError(
            f"K 线请求失败,已达最大重试次数: code={data.get('code')}, "
            f"message={data.get('message')}"
        )

    def fetch_klines(
        self,
        symbol: str,
        interval: str,
        start_time: int,
        end_time: int,
    ) -> list[dict]:
        """
        分页拉取指定时间范围的 K 线数据。

        Args:
            symbol: 交易品种,如 'AAPL.US'
            interval: K 线周期,如 '1m', '5m', '1h'
            start_time: 起始时间戳(秒)
            end_time: 结束时间戳(秒)

        Returns:
            所有 K 线组成的列表,按时间升序排列
        """
        all_klines = []
        cursor = start_time

        while cursor < end_time:
            # TickDB 单次最多返回 1000 条,分页拉取
            params = {
                "symbol": symbol,
                "interval": interval,
                "start_time": cursor,
                "end_time": end_time,
                "limit": 1000,
            }

            klines = self._request(params)
            if not klines:
                break

            all_klines.extend(klines)
            last_close_time = klines[-1]["close_time"]

            # 边界保护:避免死循环
            if last_close_time <= cursor:
                logger.warning(
                    f"分页游标停滞(cursor={cursor}, "
                    f"last_close_time={last_close_time}),跳过"
                )
                break

            cursor = last_close_time + 1

            # ⚠️ 高频场景下建议使用 aiohttp 异步并发拉取多标的
            # 当前串行实现适合每日归档的低频场景

        logger.info(
            f"[{symbol}] 拉取 {interval} K 线 "
            f"{datetime.fromtimestamp(start_time, tz=timezone.utc)} ~ "
            f"{datetime.fromtimestamp(end_time, tz=timezone.utc)},"
            f"共 {len(all_klines)} 条"
        )
        return all_klines

3.3 时间窗口的精确控制

归档时间窗口的设置有讲究。不同市场的交易时段不同:

市场 代码示例 盘后归档时间(建议) 时间窗口边界
A 股 600519.SH 15:05 触发 09:30–15:00
港股 0700.HK 16:05 触发 09:30–16:00
美股 AAPL.US 次日 01:05 触发 09:30–16:00 EST
数字货币 BTC.USDT 任意时刻(24h) 00:00–24:00 UTC
from datetime import datetime, timezone, timedelta

def get_trading_day_params(symbol: str) -> tuple[int, int]:
    """返回当日交易时段的时间戳边界"""
    now = datetime.now(timezone.utc)

    # 简化示例:以 UTC 00:00 为数字货币的"日"分界
    # 实际项目中建议用 pandas_market_calendars 读取各市场日历
    start_of_day = now.replace(hour=0, minute=0, second=0, microsecond=0)
    end_of_day = start_of_day + timedelta(days=1)

    # 美股(AAPL.US)转 EST 时区,交易日为前一日
    if symbol.endswith(".US") and symbol not in ("BTC.USDT", "ETH.USDT"):
        # 实际应调用 market_calendars,这里仅作示意
        pass

    return int(start_of_day.timestamp()), int(end_of_day.timestamp())

⚠️ 生产提示:时间窗口的精确计算强烈建议依赖 pandas_market_calendars 或各交易所官方日历 API,避免节假日的空数据污染归档结果。A 股还要处理盘中休市(11:30–13:00)和熔断断点。


四、数据库设计:SQLite 与 ClickHouse 双轨方案

4.1 数据模型

无论用 SQLite 还是 ClickHouse,底层数据模型一致:

CREATE TABLE IF NOT EXISTS kline_archive (
    symbol      TEXT    NOT NULL,
    interval    TEXT    NOT NULL,
    open_time   INTEGER NOT NULL,  -- 开盘时间戳(秒)
    close_time  INTEGER NOT NULL, -- 收盘时间戳(秒)
    open_price  REAL    NOT NULL,
    high_price  REAL    NOT NULL,
    low_price   REAL    NOT NULL,
    close_price REAL    NOT NULL,
    volume      REAL    NOT NULL,
    updated_at  INTEGER NOT NULL,  -- 入库时间戳
    PRIMARY KEY (symbol, interval, open_time)
) PARTITIONED BY (interval);

设计要点

  • open_time 作为主键的一部分,实现天然去重
  • close_time 单独存储,方便区间查询(不用基于 open_time 推算)
  • updated_at 记录入库时间,支持审计和增量判断
  • ClickHouse 使用 PARTITIONED BY 按周期分区,查询效率远高于 SQLite 全表扫描

4.2 SQLite 方案

import sqlite3
from pathlib import Path
from contextlib import contextmanager
from typing import Generator


class SQLiteArchiver:
    """SQLite 本地行情归档器"""

    def __init__(self, db_path: str = "kline_archive.db"):
        self.db_path = db_path
        self._init_schema()

    @contextmanager
    def _conn(self) -> Generator[sqlite3.Connection, None, None]:
        conn = sqlite3.connect(self.db_path)
        conn.execute("PRAGMA journal_mode=WAL")  # 写入并发安全
        try:
            yield conn
        finally:
            conn.close()

    def _init_schema(self):
        with self._conn() as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS kline_archive (
                    symbol TEXT NOT NULL,
                    interval TEXT NOT NULL,
                    open_time INTEGER NOT NULL,
                    close_time INTEGER NOT NULL,
                    open_price REAL NOT NULL,
                    high_price REAL NOT NULL,
                    low_price REAL NOT NULL,
                    close_price REAL NOT NULL,
                    volume REAL NOT NULL,
                    updated_at INTEGER NOT NULL,
                    PRIMARY KEY (symbol, interval, open_time)
                )
            """)
            conn.execute(
                "CREATE INDEX IF NOT EXISTS idx_symbol_interval "
                "ON kline_archive (symbol, interval, open_time DESC)"
            )

    def upsert(self, klines: list[dict]):
        """批量 UPSERT,含去重逻辑"""
        if not klines:
            return

        now = int(time.time())
        rows = [
            (
                k["symbol"],
                k["interval"],
                k["open_time"],
                k["close_time"],
                k["open"],
                k["high"],
                k["low"],
                k["close"],
                k["volume"],
                now,
            )
            for k in klines
        ]

        with self._conn() as conn:
            conn.executemany(
                """
                INSERT INTO kline_archive
                    (symbol, interval, open_time, close_time,
                     open_price, high_price, low_price,
                     close_price, volume, updated_at)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                ON CONFLICT(symbol, interval, open_time) DO UPDATE SET
                    open_price = excluded.open_price,
                    high_price = excluded.high_price,
                    low_price  = excluded.low_price,
                    close_price = excluded.close_price,
                    volume = excluded.volume,
                    updated_at = excluded.updated_at
                """,
                rows,
            )
            conn.commit()

        logger.info(
            f"归档完成:{len(rows)} 条写入/更新,"
            f"数据库 {self.db_path}"
        )

    def query_range(
        self, symbol: str, interval: str,
        start_time: int, end_time: int
    ) -> list[dict]:
        """区间查询,用于回测"""
        with self._conn() as conn:
            conn.row_factory = sqlite3.Row
            rows = conn.execute(
                """
                SELECT * FROM kline_archive
                WHERE symbol = ? AND interval = ?
                  AND open_time >= ? AND open_time < ?
                ORDER BY open_time ASC
                """,
                (symbol, interval, start_time, end_time),
            ).fetchall()
            return [dict(r) for r in rows]

⚠️ 生产提示:SQLite 在多进程并发写入时存在锁竞争。如果调度系统从多台机器触发归档,应将 journal_mode 设为 WAL 或切换至 ClickHouse。

4.3 ClickHouse 方案(团队级)

from clickhouse_driver import Client


class ClickHouseArchiver:
    """ClickHouse 团队行情归档器"""

    def __init__(self, host: str, port: int,
                 database: str = "market_data"):
        self.client = Client(host=host, port=port, database=database)
        self._init_schema()

    def _init_schema(self):
        """创建分区表,支持按 interval 快速过滤"""
        self.client.execute("""
            CREATE TABLE IF NOT EXISTS kline_archive (
                symbol     String,
                interval   String,
                open_time  UInt32,
                close_time UInt32,
                open_price Float64,
                high_price Float64,
                low_price  Float64,
                close_price Float64,
                volume     Float64,
                updated_at  UInt32
            )
            ENGINE = ReplacingMergeTree(updated_at)
            ORDER BY (symbol, interval, open_time)
            PARTITION BY interval
        """)

    def upsert(self, klines: list[dict]):
        """ClickHouse ReplacingMergeTree 自动去重"""
        if not klines:
            return

        now = int(time.time())
        rows = [
            (
                k["symbol"], k["interval"],
                k["open_time"], k["close_time"],
                k["open"], k["high"], k["low"], k["close"],
                k["volume"], now,
            )
            for k in klines
        ]

        self.client.execute(
            "INSERT INTO kline_archive VALUES",
            rows,
        )
        logger.info(
            f"ClickHouse 归档完成:{len(rows)} 条,数据库 {self.client.database}"
        )

关键差异:SQLite 用 ON CONFLICT 实时去重,ClickHouse 用 ReplacingMergeTree 后台异步去重(建议定期执行 OPTIMIZE TABLE kline_archive)。


五、增量更新逻辑

增量归档的核心原则是:只拉取本地数据库中尚未存储的时间段

import sqlite3
from datetime import datetime, timezone


class IncrementalArchiver:
    """增量归档协调器"""

    def __init__(self, client: TickDBKlineClient,
                 archiver: SQLiteArchiver,
                 state_db: str = "archive_state.db"):
        self.client = client
        self.archiver = archiver
        self.state_db = state_db
        self._init_state_table()

    def _init_state_table(self):
        """维护每个标的的最后归档时间戳"""
        conn = sqlite3.connect(self.state_db)
        conn.execute("""
            CREATE TABLE IF NOT EXISTS archive_progress (
                symbol    TEXT PRIMARY KEY,
                interval  TEXT NOT NULL,
                last_end_time INTEGER NOT NULL
            )
        """)
        conn.close()

    def _get_last_end_time(self, symbol: str,
                           interval: str) -> int:
        conn = sqlite3.connect(self.state_db)
        row = conn.execute(
            "SELECT last_end_time FROM archive_progress "
            "WHERE symbol = ? AND interval = ?",
            (symbol, interval),
        ).fetchone()
        conn.close()
        return row[0] if row else 0

    def _save_last_end_time(self, symbol: str,
                            interval: str, end_time: int):
        conn = sqlite3.connect(self.state_db)
        conn.execute(
            """
            INSERT INTO archive_progress (symbol, interval, last_end_time)
            VALUES (?, ?, ?)
            ON CONFLICT(symbol) DO UPDATE SET
                last_end_time = excluded.last_end_time,
                interval = excluded.interval
            """,
            (symbol, interval, end_time),
        )
        conn.commit()
        conn.close()

    def archive(self, symbol: str, interval: str,
                start_time: int, end_time: int):
        """
        增量归档主流程。

        增量逻辑:
        1. 读取本地存储的最新 close_time 作为本次起始点
        2. 仅拉取增量区间的新 K 线
        3. 写入本地数据库
        4. 更新归档进度状态
        """
        # 增量起始点:优先使用上次归档时间,若无则使用传入的 start_time
        incremental_start = max(
            self._get_last_end_time(symbol, interval),
            start_time,
        )

        if incremental_start >= end_time:
            logger.info(
                f"[{symbol}] {interval} 已是最新,无需归档 "
                f"(最新时间:{datetime.fromtimestamp(incremental_start, tz=timezone.utc)})"
            )
            return

        logger.info(
            f"[{symbol}] 增量归档:"
            f"{datetime.fromtimestamp(incremental_start, tz=timezone.utc)} ~ "
            f"{datetime.fromtimestamp(end_time, tz=timezone.utc)}"
        )

        # 拉取增量数据
        klines = self.client.fetch_klines(
            symbol=symbol,
            interval=interval,
            start_time=incremental_start,
            end_time=end_time,
        )

        if not klines:
            logger.warning(f"[{symbol}] 拉取结果为空,请检查时间窗口")
            return

        # 入库(含去重)
        self.archiver.upsert(klines)

        # 更新归档状态
        last_close_time = klines[-1]["close_time"]
        self._save_last_end_time(symbol, interval, last_close_time)

        logger.info(
            f"[{symbol}] 归档完成:{len(klines)} 条,"
            f"下次起始点 {datetime.fromtimestamp(last_close_time, tz=timezone.utc)}"
        )

增量关键点_get_last_end_time 读取的是 close_time 而非 open_time。因为 K 线是左闭右开区间 [open_time, close_time),下一根 K 线的起始点应从最后一根的 close_time + 1 开始拉取,防止边界重复。


六、调度与自动化

6.1 APScheduler 定时归档

from apscheduler.schedulers.blocking import BlockingScheduler


def daily_archive_job():
    """
    每日盘后归档任务(示例配置)。
    实际部署时建议根据各市场的交易时段分别配置调度。
    """
    targets = [
        ("AAPL.US",  "1m"),
        ("BTC.USDT", "1m"),
        ("600519.SH", "5m"),
    ]

    now = int(datetime.now(timezone.utc).timestamp())

    client = TickDBKlineClient()
    archiver = SQLiteArchiver()
    orchestrator = IncrementalArchiver(client, archiver)

    for symbol, interval in targets:
        try:
            start_of_day = now - 86400  # 简化:拉取 24h 窗口
            orchestrator.archive(
                symbol=symbol,
                interval=interval,
                start_time=start_of_day,
                end_time=now,
            )
        except Exception as e:
            logger.error(f"[{symbol}] 归档异常:{e}", exc_info=True)


scheduler = BlockingScheduler()

# 美股盘后:周一至周五 01:05 UTC
scheduler.add_job(
    daily_archive_job,
    "cron",
    day_of_week="mon-fri",
    hour=1,
    minute=5,
    id="daily_archive",
)

# ⚠️ 建议在容器/Kubernetes 中运行,并配置健康检查探针
# 避免调度进程挂掉导致归档漏跑
scheduler.start()

6.2 容器化部署建议

组件 建议配置
调度进程 使用 Supervisor 管理,配置 autorestart=true
数据持久化 SQLite 建议挂载 NFS 或 hostPath,避免容器重建丢失
环境变量 通过 Kubernetes Secret 或 Docker Secret 注入 TICKDB_API_KEY
监控 导出 Prometheus 指标(归档条数、耗时、失败率),接入飞书/Slack 告警
日志 结构化日志(JSON 格式),方便日志收集系统解析

七、完整运行示例

"""
 TickDB 每日行情归档完整示例

 运行前提:
 1. 设置环境变量 TICKDB_API_KEY
 2. pip install requests apscheduler

 运行方式:
 python archive_runner.py
"""

import os
import logging

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)


def main():
    if not os.environ.get("TICKDB_API_KEY"):
        raise EnvironmentError(
            "请设置 TICKDB_API_KEY 环境变量后重试"
        )

    # 初始化各层组件
    api_client = TickDBKlineClient()
    archiver   = SQLiteArchiver("kline_archive.db")
    orchestrator = IncrementalArchiver(api_client, archiver)

    # 定义归档标的(A 股建议用 5m,数字货币建议用 1m)
    symbols = [
        ("AAPL.US",    "1m"),
        ("BTC.USDT",   "1m"),
        ("600519.SH",  "5m"),
    ]

    now_ts = int(datetime.now(timezone.utc).timestamp())
    day_start = now_ts - 86400  # 拉取过去 24h 窗口

    for symbol, interval in symbols:
        try:
            orchestrator.archive(
                symbol=symbol,
                interval=interval,
                start_time=day_start,
                end_time=now_ts,
            )
        except Exception as e:
            logging.error(
                f"[{symbol}] 归档失败: {e}", exc_info=True
            )

    # 验证入库结果
    sample = archiver.query_range(
        symbol="AAPL.US",
        interval="1m",
        start_time=day_start,
        end_time=now_ts,
    )
    logging.info(f"验证:AAPL.US 1m 共查询到 {len(sample)} 条")


if __name__ == "__main__":
    main()

运行后在 kline_archive.db 中即可查询本地归档数据:

# 示例:查询过去 7 日 AAPL.US 1 分钟数据用于回测
import sqlite3

conn = sqlite3.connect("kline_archive.db")
conn.row_factory = sqlite3.Row
rows = conn.execute(
    """
    SELECT open_time, close_price, volume
    FROM kline_archive
    WHERE symbol = 'AAPL.US'
      AND interval = '1m'
      AND open_time >= ? AND open_time < ?
    ORDER BY open_time ASC
    """,
    (int((datetime.now().timestamp() - 7 * 86400)), 
     int(datetime.now().timestamp())),
).fetchall()

print(f"回测样本量:{len(rows)} 根 K 线")
for r in rows[:5]:
    print(f"  {r['open_time']}: close={r['close_price']}")

八、结语

本地行情数据库是量化系统的基础设施,而"每日归档"是维护这套基础设施最轻量的方式——不需要搭建流处理管道,不需要维护 WebSocket 长连接,只需要一个定时任务加一套入库逻辑。

用 TickDB REST 接口拉取历史 K 线,配合 SQLite(个人)或 ClickHouse(团队)的 UPSERT 语义,完整的增量归档链路只需 100 行左右的代码,就能跑在任意一台能联网的机器上。

归档只是起点。数据进了本地数据库之后,下一步是构建你自己的因子库、回测引擎和实时监控面板——这些内容会在 TickDB 后续的文章中逐一展开。


下一步行动

如果你想立刻跑起来

  1. 访问 tickdb.ai 注册账号(免费层含 API 调用额度)
  2. 在控制台生成 API Key
  3. 设置环境变量 TICKDB_API_KEY
  4. 复制本文代码,安装依赖后运行即可

如果你需要更长的历史数据用于回测:联系 [email protected] 获取机构版数据方案,包含 10 年级别分钟级历史 K 线,覆盖美股/A 股/港股/数字货币等多个市场。

如果你习惯用 AI 辅助开发:在 ClawHub 搜索安装 tickdb-market-data SKILL,可直接通过对话方式调用 TickDB 接口生成归档脚本。


免责声明:本文所有代码和方案仅供技术参考,不构成任何投资建议。历史行情数据不代表未来表现,实际使用前请自行验证数据准确性。市场有风险,投资需谨慎。