当日行情自动归档:用 TickDB 历史接口构建本地行情数据库

"数据的价值在于被使用,而使用的前提是被保存。"

凌晨 1 点,你从睡梦中被手机震醒——不是因为策略亏损,而是脚本报错了。日志显示凌晨 0:15 分,某接口返回空数据,但你没有做空值容错,程序直接崩溃。第二天醒来,你发现当天 23:00-00:15 的夜盘数据全部丢失。

这不是段子。这是每一个尝试构建本地行情数据库的量化开发者,都可能遇到的实际问题。

行情归档看似简单——拉数据、存进去、明天再来。但当你真正开始做,会发现三个隐藏的工程陷阱:边界条件处理(收盘后数据何时可用)、增量更新机制(避免重复拉取)、去重与幂等性(确保数据唯一性)。本文用 Python + SQLite 演示一套生产级的行情归档方案,覆盖从接口调用到本地入库的完整闭环。

为什么需要本地行情数据库

你可能会问:TickDB 本身不是提供实时和历史数据吗?为什么还要存到本地?

这是一个合理的质疑。答案是三个场景:

场景一:降低 API 调用成本。 如果你每天运行 20 次策略,每次策略需要过去 5 天的分钟级数据,每次调用都从 TickDB 拉取,则每天产生 100 次 API 请求。一个月下来是 3000 次。但如果你把数据本地缓存,策略只需查本地数据库,API 调用降至每月 30 次(每天一次归档)。

场景二:加速策略回测。 本地 SQLite 查询延迟在亚毫秒级,而网络 API 即使有 CDN 优化也至少几十毫秒。一次回测跑 1000 根 K 线,本地查询比远程 API 节省数秒到数十秒。

场景三:离线分析能力。 当你需要在地铁里、飞机上复盘策略,或进行跨品种相关性分析时,本地数据库让你不依赖网络。

这不是"重复造轮子"。这是以本地缓存换速度,以磁盘空间换 API 额度的经典工程权衡。

架构设计:三步走的归档流水线

行情归档的本质是一个 ETL 管道:Extract(提取)→ Transform(转换)→ Load(加载)。但与通用 ETL 不同,金融行情数据有独特的约束:

约束 说明 设计应对
数据不可变性 K 线一旦生成,数值不会改变 直接 Upsert,覆盖而非追加
时间单调性 K 线时间戳严格递增 依赖时间戳做增量边界
接口限频 TickDB 有 rate limit 两次请求间隔 ≥0.2 秒,加 Retry-After 处理
数据延迟 收盘后 K 线需等待撮合清算 归档任务在次日 0:30 执行,给足延迟窗口

基于以上约束,我们设计如下流水线:

┌─────────────┐     ┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│  定时触发   │ ──▶ │  获取归档点  │ ──▶ │  分页拉取   │ ──▶ │  批量入库   │
│ (次日0:30) │     │ (上次最新时间)│     │ (每次500条) │     │ (Upsert)   │
└─────────────┘     └─────────────┘     └─────────────┘     └─────────────┘
       │                   │                   │                   │
       ▼                   ▼                   ▼                   ▼
   Cron/DAG          SQLite元数据表       限频控制+重试         唯一约束去重

核心设计原则:

  1. 增量拉取:只拉取"上次归档时间"到"当前时间"之间的数据,避免全量重复
  2. Upsert 而非 Insert:用 INSERT OR REPLACE 确保同一条 K 线只保留最新值
  3. 分页拉取:TickDB 单次返回上限 500 条,分页避免超限
  4. 幂等执行:无论脚本运行多少次,结果一致

生产级代码:完整实现

以下代码可直接运行,适配 Python 3.9+,依赖仅 requestssqlite3(标准库)。

1. 配置文件与常量定义

import os
import sqlite3
import time
import logging
from datetime import datetime, timedelta
from typing import Optional
import requests

# ==================== 配置区 ====================
TICKDB_API_KEY = os.environ.get("TICKDB_API_KEY")
if not TICKDB_API_KEY:
    raise ValueError("请设置环境变量 TICKDB_API_KEY")

# TickDB API 配置
BASE_URL = "https://api.tickdb.ai/v1"
HEADERS = {"X-API-Key": TICKDB_API_KEY}

# 本地数据库路径
DB_PATH = "market_data.db"

# 归档标的列表(可扩展)
SYMBOLS = [
    "BTC.USDT",  # 数字货币,7x24
    "AAPL.US",   # 美股,需注意美股只在交易日有数据
]

# 每页拉取数量(TickDB 上限 500)
PAGE_SIZE = 500

# 请求间隔(秒),留足限频余量
REQUEST_INTERVAL = 0.25

# 日志配置
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler("archiver.log", encoding="utf-8"),
    ],
)
logger = logging.getLogger(__name__)

2. 数据库初始化

def init_database(db_path: str) -> None:
    """初始化 SQLite 数据库,创建行情表和元数据表"""
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    
    # 行情主表:存储分钟级 K 线数据
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS kline_1m (
            symbol TEXT NOT NULL,
            timestamp INTEGER NOT NULL,  -- 毫秒级时间戳
            open REAL NOT NULL,
            high REAL NOT NULL,
            low REAL NOT NULL,
            close REAL NOT NULL,
            volume REAL NOT NULL,
            created_at TEXT DEFAULT CURRENT_TIMESTAMP,
            PRIMARY KEY (symbol, timestamp)
        )
    """)
    
    # 索引:加速时间范围查询
    cursor.execute("""
        CREATE INDEX IF NOT EXISTS idx_kline_timestamp 
        ON kline_1m (symbol, timestamp)
    """)
    
    # 元数据表:记录每个标的的归档进度
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS archive_meta (
            symbol TEXT PRIMARY KEY,
            last_archived_ts INTEGER,  -- 上次归档的最新时间戳
            last_archived_at TEXT,
            total_records INTEGER DEFAULT 0
        )
    """)
    
    conn.commit()
    conn.close()
    logger.info(f"数据库初始化完成: {db_path}")

3. 核心 API 调用函数(含错误处理)

def fetch_kline_page(
    symbol: str,
    interval: str = "1m",
    start_time: Optional[int] = None,
    end_time: Optional[int] = None,
    limit: int = PAGE_SIZE,
    timeout: tuple = (3.05, 10)  # 连接超时 3.05s,读取超时 10s
) -> dict:
    """
    调用 TickDB /v1/market/kline 接口获取 K 线数据
    
    Args:
        symbol: 交易品种,如 "BTC.USDT"
        interval: K 线周期,"1m", "5m", "1h" 等
        start_time: 开始时间(毫秒时间戳),None 表示从头开始
        end_time: 结束时间(毫秒时间戳),None 表示截至最新
        limit: 每页条数,最大 500
        timeout: 请求超时配置
    
    Returns:
        API 响应字典
    """
    params = {
        "symbol": symbol,
        "interval": interval,
        "limit": limit,
    }
    if start_time is not None:
        params["start"] = start_time
    if end_time is not None:
        params["end"] = end_time
    
    try:
        response = requests.get(
            f"{BASE_URL}/market/kline",
            headers=HEADERS,
            params=params,
            timeout=timeout
        )
        response.raise_for_status()
        return response.json()
    
    except requests.exceptions.Timeout:
        logger.error(f"请求超时: {symbol} [{start_time} - {end_time}]")
        raise
    
    except requests.exceptions.HTTPError as e:
        logger.error(f"HTTP 错误: {e.response.status_code} {e.response.text}")
        raise


def handle_api_error(response: dict, retry_count: int = 0) -> Optional[dict]:
    """
    处理 TickDB API 错误码
    
    错误码参考:
    - 0: 成功
    - 1001/1002: API Key 无效
    - 2002: 品种不存在
    - 3001: 请求频率超限,需要等待 Retry-After
    """
    code = response.get("code", 0)
    
    if code == 0:
        return response.get("data")
    
    if code in (1001, 1002):
        raise ValueError(f"API Key 无效,请检查环境变量 TICKDB_API_KEY")
    
    if code == 2002:
        raise KeyError(f"交易品种不存在: {response.get('symbol')}")
    
    if code == 3001:
        # 限频错误:读取 Retry-After 头等待
        retry_after = int(response.headers.get("Retry-After", 5))
        logger.warning(f"触发限频 (code:3001),等待 {retry_after} 秒后重试")
        time.sleep(retry_after)
        return None  # 返回 None 表示需要重试
    
    # 其他未知错误
    raise RuntimeError(f"未知错误 code:{code} msg:{response.get('message')}")

4. 增量归档核心逻辑

def get_last_archived_timestamp(db_path: str, symbol: str) -> Optional[int]:
    """从元数据表获取某标的上次归档的最新时间戳"""
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    cursor.execute(
        "SELECT last_archived_ts FROM archive_meta WHERE symbol = ?",
        (symbol,)
    )
    row = cursor.fetchone()
    conn.close()
    
    if row is None:
        # 首次归档,返回 90 天前的毫秒时间戳(覆盖足够的历史窗口)
        ninety_days_ago = int((datetime.now() - timedelta(days=90)).timestamp() * 1000)
        return ninety_days_ago
    
    return row[0]


def archive_symbol(db_path: str, symbol: str, interval: str = "1m") -> int:
    """
    归档单个标的的 K 线数据(增量模式)
    
    流程:
    1. 获取上次归档的最新时间戳作为起始点
    2. 分页拉取数据,直到返回空数据
    3. 批量 Upsert 到 SQLite
    4. 更新元数据表
    
    Returns:
        本次归档的记录数
    """
    start_ts = get_last_archived_timestamp(db_path, symbol)
    end_ts = int(datetime.now().timestamp() * 1000)
    
    logger.info(f"开始归档 {symbol}: [{start_ts}] -> [{end_ts}]")
    
    all_records = []
    current_start = start_ts
    page_count = 0
    max_pages = 1000  # 防止无限循环
    
    while page_count < max_pages:
        # 拉取当前页
        try:
            resp = fetch_kline_page(
                symbol=symbol,
                interval=interval,
                start_time=current_start,
                end_time=end_ts,
                limit=PAGE_SIZE
            )
            
            # 处理错误(限频等)
            if resp is None:
                continue  # 重试当前页
            
            data = handle_api_error(resp)
            if not data:
                break  # 无数据,退出循环
            
            # data 是 K 线数组,格式:[timestamp, open, high, low, close, volume]
            if not isinstance(data, list) or len(data) == 0:
                break
            
            all_records.extend(data)
            page_count += 1
            
            # 更新起始点:取本页最后一条的时间戳 + 1ms(避免重复)
            current_start = data[-1][0] + 1
            
            logger.debug(f"  第 {page_count} 页: {len(data)} 条, 累计 {len(all_records)} 条")
            
            # 限频控制
            time.sleep(REQUEST_INTERVAL)
            
            # 如果返回不足一页,说明已到最新
            if len(data) < PAGE_SIZE:
                break
            
        except Exception as e:
            logger.error(f"拉取失败: {symbol} page={page_count} error={e}")
            raise
    
    if not all_records:
        logger.info(f"{symbol} 无新数据需要归档")
        return 0
    
    # 批量 Upsert 到 SQLite
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    
    upsert_sql = """
        INSERT OR REPLACE INTO kline_1m (symbol, timestamp, open, high, low, close, volume)
        VALUES (?, ?, ?, ?, ?, ?, ?)
    """
    
    records_to_insert = [(symbol, r[0], r[1], r[2], r[3], r[4], r[5]) for r in all_records]
    cursor.executemany(upsert_sql, records_to_insert)
    
    # 更新元数据
    latest_ts = all_records[-1][0]
    cursor.execute("""
        INSERT OR REPLACE INTO archive_meta (symbol, last_archived_ts, last_archived_at, total_records)
        VALUES (?, ?, datetime('now'), (
            SELECT COALESCE(MAX(total_records), 0) + ? 
            FROM archive_meta WHERE symbol = ?
        ))
    """, (symbol, latest_ts, len(records_to_insert), symbol))
    
    conn.commit()
    conn.close()
    
    logger.info(f"归档完成 {symbol}: 本次新增 {len(records_to_insert)} 条,最新时间戳 {latest_ts}")
    return len(records_to_insert)


def archive_all(db_path: str, symbols: list, interval: str = "1m") -> dict:
    """归档所有标的,返回各标的归档数量"""
    results = {}
    for symbol in symbols:
        try:
            count = archive_symbol(db_path, symbol, interval)
            results[symbol] = {"status": "success", "count": count}
        except Exception as e:
            logger.error(f"归档 {symbol} 失败: {e}")
            results[symbol] = {"status": "error", "error": str(e)}
        
        # 标的之间也做限频控制
        time.sleep(REQUEST_INTERVAL)
    
    return results

5. 主程序入口

def main():
    """每日归档主入口"""
    logger.info("=" * 50)
    logger.info(f"行情归档任务启动: {datetime.now().isoformat()}")
    
    # 初始化数据库
    init_database(DB_PATH)
    
    # 执行归档
    results = archive_all(DB_PATH, SYMBOLS, interval="1m")
    
    # 输出汇总
    logger.info("归档结果汇总:")
    total = 0
    for symbol, result in results.items():
        if result["status"] == "success":
            logger.info(f"  {symbol}: +{result['count']} 条")
            total += result["count"]
        else:
            logger.error(f"  {symbol}: {result['error']}")
    
    logger.info(f"本次归档总计: {total} 条")
    logger.info("=" * 50)


if __name__ == "__main__":
    main()

增量更新的关键设计

上述代码的核心在于增量边界的确定。让我展开解释几个关键细节。

边界时间戳的选择

# 增量拉取的起始点
start_ts = get_last_archived_timestamp(db_path, symbol)

首次运行时,archive_meta 表无记录,函数返回 90 天前的时间戳。这确保:

  1. 新用户能快速补齐近期历史数据(而非从零开始)
  2. 90 天足够覆盖大多数策略的回测窗口
  3. 不必拉取过久远的数据,节省 API 额度

后续运行时,返回上次归档的 last_archived_ts,确保每次只拉取新增数据。

去重机制:Upsert 的幂等性

cursor.execute("""
    INSERT OR REPLACE INTO kline_1m (symbol, timestamp, open, high, low, close, volume)
    VALUES (?, ?, ?, ?, ?, ?, ?)
""", ...)

INSERT OR REPLACE 是 SQLite 的 Upsert 语法。当 PRIMARY KEY (symbol, timestamp) 冲突时,会用新值替换旧值。这意味着:

  • 同一时间戳的 K 线,无论脚本运行多少次,结果一致
  • 如果 TickDB 的历史数据发生修正(如交易所调整),本地数据会自动同步

⚠️ 工程预警:TickDB 的 /v1/market/kline 接口返回的是已结束周期的 K 线。以 1m 周期为例,12:01:00 的 K 线在 12:02:00 之后才会生成。因此,如果你设置 end_time = 当前时间,可能会丢失最后一根正在生成的 K 线。建议归档任务在次日执行,给足数据生成窗口。

分页拉取的终止条件

if len(data) < PAGE_SIZE:
    break

当返回的条数小于单页上限时,说明已拉到最新数据。这是标准的光标分页模式,无需传递 page 参数。

但这里有一个边界情况:如果恰好在归档运行时,新的 K 线数据正在生成,len(data) == PAGE_SIZE 会导致继续拉取下一页,而下一页返回空数组,while 循环退出。这是一种最终一致性的设计:允许短暂的数据不完整,但确保下次归档时会补充完整。

从 SQLite 到 ClickHouse:规模化扩展

SQLite 适合个人开发者和小规模数据。当你需要存储多个标的 × 多年历史 × 多周期时,SQLite 的单文件写入锁会成为瓶颈。以下是迁移到 ClickHouse 的核心改动点:

维度 SQLite ClickHouse
表引擎 MergeTree (自动) MergeTree (手动指定)
批量写入 executemany INSERT INTO ... VALUES 批量
去重 PRIMARY KEY + INSERT OR REPLACE ALTER TABLE ... DELETE WHERE + 物化视图
查询性能 适合 <100 万行 适合 >10 亿行
部署复杂度 单文件 需要 ClickHouse 集群

ClickHouse 版本的核心 SQL:

-- 创建分布式表(多节点场景)
CREATE TABLE IF NOT EXISTS kline_1m
(
    symbol String,
    timestamp UInt64,
    open Float64,
    high Float64,
    low Float64,
    close Float64,
    volume Float64
)
ENGINE = MergeTree()
ORDER BY (symbol, timestamp)
SETTINGS index_granularity = 8192;

-- 批量写入(Python 端构造 VALUES 字符串)
INSERT INTO kline_1m (symbol, timestamp, open, high, low, close, volume) VALUES
('BTC.USDT', 1704067200000, 42000.5, 42100.0, 41950.0, 42050.0, 1250.5),
('BTC.USDT', 1704067260000, 42050.0, 42200.0, 42030.0, 42180.0, 1380.2);

⚠️ 生产提示:ClickHouse 的写入是异步合并的,新写入的数据需要数秒到数十秒才能被查询到。如果你需要严格的一致性读取,考虑使用 ReplacingMergeTree 引擎并在查询时加 FINAL 关键字。

部署方案:定时任务配置

Linux Crontab

# 每天凌晨 0:30 执行归档脚本
30 0 * * * cd /opt/archiver && /usr/bin/python3 main.py >> /var/log/archiver.log 2>&1

Docker 容器化

FROM python:3.11-slim

WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir requests

COPY . .

CMD ["python", "main.py"]
# docker-compose.yml
version: "3.8"
services:
  archiver:
    build: .
    container_name: tickdb-archiver
    environment:
      - TICKDB_API_KEY=${TICKDB_API_KEY}
    volumes:
      - ./data:/app/data
      - ./logs:/app/logs
    restart: unless-stopped
    cron:
      schedule: "30 0 * * *"
      # 注:docker-compose 原生不支持 cron,需使用外部调度器或使用包含 cron 的镜像

生产级调度建议

对于企业级场景,推荐使用:

  • Airflow:支持任务依赖、可视化 DAG、失败告警
  • Dagster:现代 Python-first 调度框架,原生支持 dbt 和数据资产
  • Prefect:云原生,自动化重试和状态管理

数据验证:归档完整性检查

归档完成后,建议增加数据完整性校验:

def validate_archived_data(db_path: str, symbol: str, date: str) -> dict:
    """
    校验指定日期的数据完整性
    
    Returns:
        校验结果字典
    """
    conn = sqlite3.connect(db_path)
    
    # 统计该日期的记录数
    cursor = conn.cursor()
    cursor.execute("""
        SELECT COUNT(*), MIN(timestamp), MAX(timestamp)
        FROM kline_1m
        WHERE symbol = ?
          AND timestamp >= ?
          AND timestamp < ?
    """, (
        symbol,
        int(datetime.strptime(date, "%Y-%m-%d").timestamp() * 1000),
        int((datetime.strptime(date, "%Y-%m-%d") + timedelta(days=1)).timestamp() * 1000),
    ))
    
    row = cursor.fetchone()
    conn.close()
    
    count, min_ts, max_ts = row
    
    # 美股交易日约 390 分钟(9:30 - 16:00)
    # 数字货币 7x24 约 1440 分钟
    expected_minutes = 1440 if "USDT" in symbol else 390
    
    return {
        "symbol": symbol,
        "date": date,
        "actual_records": count,
        "expected_minutes": expected_minutes,
        "completeness": f"{count / expected_minutes * 100:.1f}%",
        "time_range": f"{datetime.fromtimestamp(min_ts/1000) if min_ts else None} - {datetime.fromtimestamp(max_ts/1000) if max_ts else None}",
    }

⚠️ 注意:数字货币 7×24 不代表每天恰好 1440 条记录。网络中断、交易所维护等情况也会导致数据缺失。建议设定可接受的完整性阈值(如 >99.5%),低于阈值时触发告警。

结语

行情归档是一个看似简单、实则充满边界细节的工程问题。本文覆盖了:

  • 如何用 TickDB /v1/market/kline 接口增量拉取数据
  • 如何用 SQLite 做幂等性存储和增量边界管理
  • 如何处理限频(3001 错误码)和超时
  • 如何扩展到 ClickHouse 以支撑规模化场景
  • 如何做数据完整性校验

这套方案的核心是增量边界的设计哲学:记住上次跑到哪里,下次从那里继续。这不仅是归档的哲学,也是大多数批处理 ETL 的通用范式。


下一步行动

如果你想亲手运行本文代码

  1. 访问 tickdb.ai 注册(免费,无需信用卡)
  2. 在控制台生成 API Key
  3. 设置环境变量 TICKDB_API_KEY,复制本文代码即可运行

如果你需要管理多个标的、多周期、多市场的行情数据,考虑将 SQLite 迁移到 ClickHouse 或 TimescaleDB,并使用 Airflow 做任务编排。联系我获取 TickDB 企业版方案。

如果你习惯用 AI 辅助开发,在 AI 助手中搜索安装 tickdb-market-data SKILL,调用归档逻辑时直接说"帮我归档最近 30 天的 AAPL 1 分钟数据"。


本文不构成任何投资建议。市场有风险,投资需谨慎。