行情数据的宿命:要么实时,要么归档

一个老问题:你在交易日收盘后,想复盘当天的分钟级走势,却发现数据要么散落在交易所的 CSV 里,要么干脆就没有保存。

实时数据靠 WebSocket 推送能拿到,但断网怎么办?凌晨服务器重启了怎么办?TickDB 的 REST 历史接口为此设计——让盘后归档成为一个可重复的、受控的管道工程,而不是靠手工导出和 Excel 拼凑。

本文的目标很具体:每天定时拉取 TickDB 的分钟级 K 线数据,存入本地数据库,支持增量更新,不重复写入,并且用生产级的错误处理确保这个管道能稳定跑三个月不用人管。


一、为什么需要本地行情数据库

这个问题值得先说清楚。TickDB 的 REST 接口随时能查到历史数据,那把行情存到本地多此一举吗?

不是。三个实际场景说明本地归档的必要性:

第一,策略回测需要稳定的数据源。 你不想每次回测都要等 API 响应,网络抖动会导致回测中断。用本地数据库,磁盘 I/O 是确定性的。

第二,多策略共享数据。 三个因子策略同时跑,如果每个都去调 TickDB API,超出免费层限频是迟早的事。本地归档一份,三个策略读本地,API 只承担归档职责。

第三,盘后分析的性能要求。 你要在收盘后 30 秒内做完当天的流动性分析,查询本地 SQLite 远比调 REST API 快 2-3 个数量级。

所以归档不是冗余,是架构上的必要分层。


二、TickDB 历史数据接口速览

在动手写代码之前,先把 TickDB 的 REST 接口规范梳理清楚,这是所有后续逻辑的基础。

TickDB 提供两类获取 K 线数据的接口,职责不同:

接口 端点 适用场景
获取历史 K 线 GET /v1/market/kline 回测、归档、分析
获取最新一根 K 线 GET /v1/market/kline/latest 实时监控,不适合做归档

归档场景下,正确的做法是调用 /v1/market/kline,按时间范围分页拉取。每次请求最大返回 1000 条记录,如果时间范围内数据量超过 1000 条,需要用时间分片多次请求。

关键参数

参数 类型 说明
symbol string 交易品种代码,如 AAPL.USBTC.USDT
interval string K 线周期,如 1m5m1h1d
start_time integer 起始时间戳(秒级,UTC)
end_time integer 结束时间戳(秒级,UTC)
limit integer 每页数量,最大 1000

请求示例(以获取 AAPL.US 2024 年 3 月 15 日的分钟级数据为例):

GET https://api.tickdb.ai/v1/market/kline?symbol=AAPL.US&interval=1m&start_time=1710460800&end_time=1710547200&limit=1000
Header: X-API-Key: <your_key>

返回数据每条记录包含:symbolintervalopen_timeclose_timeopenhighlowclosevolume 字段。


三、增量归档的核心设计

增量更新是本文的核心问题。如果每次都全量拉取,数据量大了以后会有两个问题:API 限频(code 3001)和存储浪费。

增量归档的设计逻辑很直接:以本地数据库的最新一条记录的时间戳为起点,向后拉取未入库的数据。

本地最新记录时间 → 作为 start_time → 拉取最新数据 → 入库(跳过已存在)

但有一个边界情况要处理:本地无数据时(首次运行),需要用配置的初始时间戳。

伪代码层面的逻辑如下:

if 本地最新时间戳 exists:
    start_time = 本地最新时间戳 + 1秒
else:
    start_time = 配置的初始回溯时间

end_time = 当前时间(UTC)

while True:
    response = 请求 /v1/market/kline(start_time, end_time, limit=1000)
    if response.records.empty:
        break
    for record in response.records:
        if 本地不存在该 (symbol, interval, open_time):
            写入本地数据库
    start_time = response.records[-1].open_time + 1
    if len(response.records) < 1000:
        break

四、生产级归档脚本

下面给出完整的 Python 实现。代码包含:环境变量鉴权、指数退避重连、限频处理(code 3001)、分页循环、增量去重、SQLite 写入。

import os
import time
import sqlite3
import requests
from datetime import datetime, timezone
from typing import Optional

# ============================================================
# 配置区
# ============================================================
TICKDB_API_KEY = os.environ.get("TICKDB_API_KEY")
if not TICKDB_API_KEY:
    raise ValueError("请设置环境变量 TICKDB_API_KEY")

TICKDB_BASE_URL = "https://api.tickdb.ai/v1/market/kline"

# 归档任务配置:每次运行归档哪些标的
ARCHIVE_TASKS = [
    {"symbol": "AAPL.US", "interval": "1m",  "lookback_days": 1},
    {"symbol": "NVDA.US", "interval": "5m",  "lookback_days": 3},
    {"symbol": "BTC.USDT", "interval": "1m", "lookback_days": 7},
]

# SQLite 数据库路径
DB_PATH = "market_data.db"

# ============================================================
# 数据库初始化
# ============================================================
def init_db():
    """初始化 SQLite 数据库和表结构"""
    conn = sqlite3.connect(DB_PATH)
    cursor = conn.cursor()
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS kline_1m (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            symbol TEXT NOT NULL,
            interval TEXT NOT NULL,
            open_time INTEGER NOT NULL,
            close_time INTEGER NOT NULL,
            open REAL NOT NULL,
            high REAL NOT NULL,
            low REAL NOT NULL,
            close REAL NOT NULL,
            volume REAL NOT NULL,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            UNIQUE(symbol, interval, open_time)
        )
    """)
    cursor.execute("""
        CREATE INDEX IF NOT EXISTS idx_kline_lookup
        ON kline_1m (symbol, interval, open_time)
    """)
    conn.commit()
    return conn

# ============================================================
# API 请求模块(含重试、限频处理)
# ============================================================
def fetch_kline_page(
    symbol: str,
    interval: str,
    start_time: int,
    end_time: int,
    limit: int = 1000
) -> Optional[dict]:
    """拉取一页 K 线数据,含重试和限频处理"""
    params = {
        "symbol": symbol,
        "interval": interval,
        "start_time": start_time,
        "end_time": end_time,
        "limit": limit,
    }
    headers = {"X-API-Key": TICKDB_API_KEY}

    max_retries = 5
    base_delay = 2.0

    for attempt in range(max_retries):
        try:
            response = requests.get(
                TICKDB_BASE_URL,
                params=params,
                headers=headers,
                timeout=(3.05, 15)  # (connect_timeout, read_timeout)
            )

            # 限频处理
            if response.status_code == 429:
                retry_after = int(response.headers.get("Retry-After", 10))
                print(f"  ⚠️ API 限频,等待 {retry_after}s")
                time.sleep(retry_after)
                continue

            data = response.json()

            # 检查 TickDB 业务错误码
            code = data.get("code", 0)
            if code == 0:
                return data.get("data", {})

            # 限频业务错误码
            if code == 3001:
                retry_after = int(response.headers.get("Retry-After", 5))
                print(f"  ⚠️ 业务层限频(code 3001),等待 {retry_after}s")
                time.sleep(retry_after)
                continue

            # 已知不可重试的错误
            if code in (1001, 1002):
                raise ValueError(f"API Key 无效 (code {code}),请检查环境变量")
            if code == 2002:
                raise KeyError(f"交易品种 {symbol} 不存在,请检查 symbol 格式")

            # 其他未知错误,指数退避重试
            raise RuntimeError(f"未知 API 错误 code={code}, msg={data.get('message')}")

        except requests.exceptions.Timeout:
            # 网络超时,指数退避
            delay = min(base_delay * (2 ** attempt), 60)
            jitter = 0.2 * delay * (0.5 - __import__('random').random())
            wait_time = delay + jitter
            print(f"  ⏳ 请求超时,第 {attempt+1} 次重试,等待 {wait_time:.1f}s")
            time.sleep(wait_time)
            continue

        except requests.exceptions.RequestException as e:
            delay = min(base_delay * (2 ** attempt), 60)
            jitter = 0.2 * delay * (0.5 - __import__('random').random())
            wait_time = delay + jitter
            print(f"  ⏳ 网络异常: {e},第 {attempt+1} 次重试,等待 {wait_time:.1f}s")
            time.sleep(wait_time)
            continue

    raise RuntimeError(f"API 请求在 {max_retries} 次重试后失败")

# ============================================================
# 增量归档核心逻辑
# ============================================================
def get_latest_local_time(conn: sqlite3.Connection, symbol: str, interval: str) -> Optional[int]:
    """查询本地数据库中该标的的最新记录时间戳"""
    cursor = conn.cursor()
    cursor.execute(
        """
        SELECT MAX(open_time) FROM kline_1m
        WHERE symbol = ? AND interval = ?
        """,
        (symbol, interval),
    )
    row = cursor.fetchone()
    return row[0] if row and row[0] is not None else None

def upsert_kline(conn: sqlite3.Connection, records: list):
    """批量写入 K 线数据,已存在则忽略(UNIQUE 约束兜底)"""
    if not records:
        return 0
    cursor = conn.cursor()
    inserted = 0
    for r in records:
        try:
            cursor.execute(
                """
                INSERT OR IGNORE INTO kline_1m
                (symbol, interval, open_time, close_time, open, high, low, close, volume)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    r["symbol"],
                    r["interval"],
                    r["open_time"],
                    r["close_time"],
                    r["open"],
                    r["high"],
                    r["low"],
                    r["close"],
                    r["volume"],
                ),
            )
            inserted += cursor.rowcount
        except Exception as e:
            print(f"    ⚠️ 单条写入异常: {e}")
    conn.commit()
    return inserted

def archive_symbol(conn: sqlite3.Connection, symbol: str, interval: str, lookback_days: int):
    """归档单个交易品种,支持增量更新"""
    # 计算起始时间
    latest_local = get_latest_local_time(conn, symbol, interval)
    now_ts = int(datetime.now(timezone.utc).timestamp())

    if latest_local:
        # 增量:跳过本地已有的数据,往后取 5 分钟缓冲
        start_time = latest_local - 300
        print(f"  → 本地已有数据,增量归档,从 {latest_local} 继续")
    else:
        # 全量:按 lookback_days 计算初始回溯时间
        start_time = now_ts - (lookback_days * 86400)
        print(f"  → 首次归档,回溯 {lookback_days} 天")

    end_time = now_ts
    total_written = 0
    page_count = 0

    while True:
        page_count += 1
        print(f"  正在拉取第 {page_count} 页 (start={start_time})...")

        try:
            data = fetch_kline_page(symbol, interval, start_time, end_time)
        except Exception as e:
            print(f"  ❌ 归档中断: {e}")
            break

        records = data.get("records", [])
        if not records:
            print(f"  ✓ 数据拉取完毕,共写入 {total_written} 条新记录")
            break

        inserted = upsert_kline(conn, records)
        total_written += inserted
        print(f"  ✓ 第 {page_count} 页:{len(records)} 条记录中,新增 {inserted} 条")

        # 分页:start_time 推进到下一页的起始点
        last_open_time = records[-1]["open_time"]
        start_time = last_open_time + 1

        # 数据量不足一页,说明已到最新
        if len(records) < 1000:
            print(f"  ✓ 分页拉取完毕,共写入 {total_written} 条新记录")
            break

    return total_written

# ============================================================
# 主程序入口
# ============================================================
def main():
    print("=" * 60)
    print(f"TickDB 行情归档任务启动 | {datetime.now(timezone.utc).isoformat()}")
    print("=" * 60)

    conn = init_db()
    total_records = 0

    for task in ARCHIVE_TASKS:
        symbol = task["symbol"]
        interval = task["interval"]
        lookback = task["lookback_days"]

        print(f"\n📦 归档标的: {symbol} ({interval}), 回溯 {lookback} 天")
        written = archive_symbol(conn, symbol, interval, lookback)
        total_records += written

        # ⚠️ 生产环境高频归档场景建议在任务间加延迟
        time.sleep(1)

    conn.close()
    print(f"\n✅ 全部归档完成,新增 {total_records} 条记录")
    print(f"数据库路径: {os.path.abspath(DB_PATH)}")

if __name__ == "__main__":
    main()

代码中值得注意的工程细节

  • INSERT OR IGNORE 配合 UNIQUE 约束,实现去重的双重保险,即便在并发运行或中断恢复时也不会产生重复主键。
  • 每页处理后设置 1 秒间隔(在 main 中),防止连续请求触发限频。
  • start_time 在增量模式下减去 300 秒(5 分钟缓冲),防止因时间对齐问题漏掉最后一根 K 线。
  • timeout=(3.05, 15):连接超时 3.05 秒(略大于请求粒度),读超时 15 秒(给服务器处理留足时间)。

五、定时调度:让归档自动化

光有脚本不够,需要让它每天自动跑。这里给出两个方案,按部署场景选择:

5.1 方案一:systemd 定时器(Linux 服务器)

创建 systemd 服务文件:

# /etc/systemd/system/tickdb-archiver.service
[Unit]
Description=TickDB Market Data Archiver

[Service]
Type=oneshot
WorkingDirectory=/opt/archiver
Environment="TICKDB_API_KEY=your_api_key_here"
ExecStart=/usr/bin/python3 /opt/archiver/archive.py
StandardOutput=journal
StandardError=journal

创建定时器:

# /etc/systemd/system/tickdb-archiver.timer
[Unit]
Description=TickDB Market Data Archiver Timer

[Timer]
# 美国收盘后约 22:05 UTC 运行(美东夏令时对应 18:05)
OnCalendar=*-*-* 22:05:00
Persistent=true

[Install]
WantedBy=timers.target

启用定时任务:

sudo systemctl daemon-reload
sudo systemctl enable --now tickdb-archiver.timer
sudo systemctl list-timers --all | grep tickdb

5.2 方案二:cron(通用方案)

# 每天 22:05 UTC 执行
5 22 * * 1-5 cd /opt/archiver && TICKDB_API_KEY=your_key python3 archive.py >> /var/log/tickdb_archiver.log 2>&1

周一到周五(美国交易日)自动触发,不需要额外安装服务。


六、数据验证:归档是否完整?

归档管道跑起来之后,你需要一种方式验证数据完整性,而不是盲目相信脚本没有漏掉任何东西。

6.1 记录级校验

每次归档完成后,查本地数据库的时间范围,与 TickDB API 返回的理论时间范围做对比:

def validate_archive(symbol: str, interval: str):
    """校验本地归档是否完整"""
    conn = sqlite3.connect(DB_PATH)
    cursor = conn.cursor()

    # 本地时间范围
    cursor.execute(
        """
        SELECT MIN(open_time), MAX(open_time), COUNT(*)
        FROM kline_1m
        WHERE symbol = ? AND interval = ?
        """,
        (symbol, interval),
    )
    local_min, local_max, count = cursor.fetchone()
    conn.close()

    # 获取 TickDB 返回的理论最新一根
    headers = {"X-API-Key": TICKDB_API_KEY}
    params = {"symbol": symbol, "interval": interval, "limit": 1}
    resp = requests.get(
        "https://api.tickdb.ai/v1/market/kline",
        params=params,
        headers=headers,
        timeout=(3.05, 10),
    )
    latest_remote = resp.json()["data"]["records"][0]["open_time"]

    gap = latest_remote - local_max if local_max else None

    print(f"{symbol}/{interval}:")
    print(f"  本地范围: {datetime.fromtimestamp(local_min, tz=timezone.utc)} → "
          f"{datetime.fromtimestamp(local_max, tz=timezone.utc)}")
    print(f"  远程最新: {datetime.fromtimestamp(latest_remote, tz=timezone.utc)}")
    print(f"  时间差: {gap}s ({gap/60:.1f}min)" if gap else "  无本地数据")
    print(f"  记录数: {count}")

6.2 连续性校验

K 线是连续的时间序列,如果某根 K 线缺失,后续的 open/high/low/close 计算就会出错。用 SQL 窗口函数检测断档:

SELECT
    symbol,
    interval,
    open_time AS current_bar,
    LEAD(open_time) OVER (PARTITION BY symbol, interval ORDER BY open_time) AS next_bar,
    LEAD(open_time) OVER (PARTITION BY symbol, interval ORDER BY open_time) - open_time AS gap_seconds
FROM kline_1m
WHERE symbol = 'AAPL.US'
  AND interval = '1m'
HAVING gap_seconds > 120;  -- 超过 2 分钟的间隔视为断档

七、从 SQLite 到 ClickHouse:规模化扩展

SQLite 适合个人开发者和单策略场景,数据量在百万级别以内查询性能足够。但当你需要支撑团队协作、历史回测提速、或接入多个因子策略时,ClickHouse 是更合适的存储层。

7.1 ClickHouse 表结构

CREATE TABLE IF NOT EXISTS market_data.kline (
    symbol       String,
    interval     String,
    open_time    UInt64,
    close_time   UInt64,
    open         Float64,
    high         Float64,
    low          Float64,
    close        Float64,
    volume       Float64,
    created_at   DateTime DEFAULT now()
)
ENGINE = MergeTree()
PARTITION BY toYYYYMM(fromUnixTimestamp64Milli(open_time / 1000))
ORDER BY (symbol, interval, open_time)
SETTINGS index_granularity = 8192;

ClickHouse 按月分区,open_time 作为排序键,相同 symbol + interval 的数据在物理上连续存储,范围查询性能比 SQLite 的 B-Tree 索引快 10-100 倍。

7.2 写入性能对比

指标 SQLite ClickHouse
百万条写入速度 ~5 秒 <0.5 秒
单次查询延迟(P99) ~50ms <5ms
存储压缩率 无压缩 ~3-5x
并发写入 不支持 支持
适合规模 <1000 万条 无上限

如果数据量在 1000 万以上,或者团队有多人同时查询,迁移到 ClickHouse 的收益明显。迁移脚本的核心逻辑是逐月导出 SQLite 数据,再批量写入 ClickHouse,这里不再展开。


八、完整运行日志示例

以下是脚本在正常运行时的输出格式,可作为监控和排查的依据:

============================================================
TickDB 行情归档任务启动 | 2024-03-15T22:05:03+00:00
============================================================

📦 归档标的: AAPL.US (1m), 回溯 1 天
  → 本地已有数据,增量归档,从 1710451200 继续
  正在拉取第 1 页 (start=1710450900)...
  ✓ 第 1 页:1000 条记录中,新增 12 条
  正在拉取第 2 页 (start=1710518401)...
  ✓ 第 2 页:450 条记录中,新增 0 条
  ✓ 分页拉取完毕,共写入 12 条新记录

📦 归档标的: NVDA.US (5m), 回溯 3 天
  → 本地已有数据,增量归档,从 1710364800 继续
  正在拉取第 1 页 (start=1710364500)...
  ✓ 第 1 页:1000 条记录中,新增 48 条
  ✓ 分页拉取完毕,共写入 48 条新记录

✅ 全部归档完成,新增 60 条记录
数据库路径: /opt/archiver/market_data.db

结语

行情归档这件事,核心复杂度不在于“能不能拿到数据”,而在于“能不能稳定地、不丢不重地一直拿下去”。本文的方案用增量机制控制 API 调用量,用 SQLite UNIQUE 约束兜底去重,用指数退避加限频等待保障管道稳定性,用 systemd timer 或 cron 实现零人工干预的自动化。

三个改进方向供你按需扩展:

第一,接入 ClickHouse 后,原有脚本只需修改 upsert 函数中的写入逻辑,API 拉取和分页逻辑完全不变。

第二,加入告警机制:连续三次 API 错误超过阈值(如 code 3001 持续 30 分钟未恢复),发送飞书或 Slack 通知,而不是让定时任务默默失败。

第三,支持自定义调度:不同标的的市场收盘时间不同,A股是 15:00 北京时间,美股是 16:00 美东时间,如果要同时归档两类资产,需要为不同标的设置不同的触发时间。


下一步行动

如果你希望亲手运行本文代码

  1. 访问 tickdb.ai 注册(免费,无需信用卡)
  2. 在控制台生成 API Key
  3. 设置环境变量 TICKDB_API_KEY,复制本文代码即可运行

如果你习惯用 AI 辅助开发,在 AI 助手中搜索安装 tickdb-market-data SKILL,可直接用自然语言查询 TickDB 的可用标的和时间范围。

如果你需要多标的、跨市场的批量归档,联系 [email protected] 了解自动化数据管道的企业级方案。


本文不构成任何投资建议。市场有风险,投资需谨慎。