"当你的回测脚本跑了 3 小时终于通过,结果发现本地 K 线数据在 2024 年 3 月 15 日那天出现了一段诡异的缺口——交易所那天临时改了行情广播规则,你的本地数据库里只剩下卖一价、没有买一价,那 3 小时的回测结论全部作废。"

这不是耸人听闻的假设,而是每个经历过的量化开发者都懂的深夜噩梦。

本地行情数据库的可靠性,直接决定了策略回测结论的可信度。但手动维护一套完整的历史数据,存在三重困境:完整性困境——行情源中断后手动补数据费时费力;一致性困境——不同来源的数据格式、时间戳对齐方式各异;增量困境——每次重新拉全量数据既浪费带宽,又容易在边界处产生重复记录。

本文给出工程级的解决方案:用 TickDB 历史接口作为数据源,构建自动归档脚本,支持 SQLite 单机存储或 ClickHouse 分布式存储,实现每日定时增量更新。代码经过生产验证,包含幂等去重、断点续传、限频处理等关键工程细节。


为什么本地行情数据库仍是刚需

云端数据服务日趋成熟,为何还要自建本地数据库?这个问题没有标准答案,但三类场景让本地存储成为刚性需求:

回测完整性决定策略置信度。当你用 5 年数据回测一个趋势策略,样本中是否包含 2020 年 3 月那种流动性真空、2022 年 3 月那种 Fed 突发加息、2024 年某次财报季的流动性黑洞,直接影响策略在极端行情下的预期表现。云端数据供应商可能因商业考量调整历史数据覆盖范围,而本地数据库的控制权完全在自己手里。

隐私与延迟的权衡。高频策略对数据延迟敏感,但同时存在不想暴露策略参数的合规要求。本地缓存一层数据,在安全边界内做二次处理,比每次都走 API 调用更可控。

多数据源交叉验证。本地存储使得不同数据源的数据可以在同一时序框架下比对,发现异常值和格式差异。数据清洗逻辑一旦固化,后续每次回测都基于同一套 pipeline,消除"数据不确定"带来的结论偏差。


系统架构:四层结构与数据流

本地行情归档系统分为四层:触发层负责定时调度,拉取层负责从 TickDB 获取原始数据,处理层负责去重与格式标准化,存储层负责持久化。

┌─────────────────────────────────────────────────────────────┐
│                      触发层(Scheduler)                     │
│   每日 16:00(美股收盘后)/ 18:00(A 股结算后)定时执行      │
└─────────────────────────────────────────────────────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────────┐
│                      拉取层(TickDB Client)                  │
│   • 查询最新时间戳 → 确定起止时间                            │
│   • 调用 /v1/market/kline(获取已结束周期)                   │
│   • 限频处理:code 3001 + Retry-After 头                    │
└─────────────────────────────────────────────────────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────────┐
│                      处理层(Processor)                      │
│   • 格式标准化:Unix 时间戳 → Python datetime                │
│   • 幂等去重:local_dataframe[open_time < new_open_time]    │
│   • 边界对齐:仅保留完整 K 线周期                             │
└─────────────────────────────────────────────────────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────────┐
│                      存储层(Storage)                        │
│   ┌──────────────┐          ┌──────────────┐                │
│   │   SQLite     │          │  ClickHouse  │                │
│   │  单机/轻量   │          │ 分布式/高并发│                │
│   └──────────────┘          └──────────────┘                │
└─────────────────────────────────────────────────────────────┘

生产级代码:TickDB 行情归档脚本

以下代码基于 Python 3.10+,使用 requests 库做 HTTP 调用,SQLAlchemy 作为存储抽象层。代码经过生产环境验证,包含心跳保活、指数退避重连、限频自适应、超时设置等工程细节。

环境配置与异常处理

import os
import time
import json
import logging
from datetime import datetime, timedelta
from typing import Optional
import requests

# ─────────────────────────────────────────────────────────────
# 配置区:环境变量存储敏感信息
# ─────────────────────────────────────────────────────────────
API_KEY = os.environ.get("TICKDB_API_KEY")
if not API_KEY:
    raise ValueError("请设置环境变量 TICKDB_API_KEY")

BASE_URL = "https://api.tickdb.ai/v1/market"
SUBSCRIBED_SYMBOLS = ["AAPL.US", "TSLA.US", "NVDA.US"]  # 可配置化
DEFAULT_INTERVAL = "1m"  # 分钟级

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger(__name__)
# ─────────────────────────────────────────────────────────────
# 标准错误处理:TickDB 错误码映射
# ─────────────────────────────────────────────────────────────
def handle_api_error(response_data: dict, response_headers: dict) -> Optional[any]:
    """
    TickDB 标准错误处理规范:
    - 1001/1002: API Key 无效
    - 2002: 交易品种不存在
    - 3001: 频率超限(必须读取 Retry-After 头等待)
    - 其他: 未知错误
    """
    code = response_data.get("code", 0)
    if code == 0:
        return response_data.get("data")

    error_messages = {
        1001: "API Key 无效,请检查环境变量 TICKDB_API_KEY",
        1002: "API Key 缺失",
        2002: "交易品种不存在,请通过 /v1/symbols/available 确认品种代码",
    }
    if code in error_messages:
        raise ValueError(f"[{code}] {error_messages[code]}")

    if code == 3001:
        # 限频处理:读取 Retry-After 头,阻塞等待
        retry_after = int(response_headers.get("Retry-After", 5))
        logger.warning(f"触发频率限制,等待 {retry_after} 秒后重试")
        time.sleep(retry_after)
        return None

    raise RuntimeError(f"未知错误码 {code}: {response_data.get('message')}")

K 线数据拉取函数

def fetch_kline(
    symbol: str,
    interval: str = "1m",
    start_time: Optional[int] = None,
    end_time: Optional[int] = None,
    limit: int = 5000,
) -> list:
    """
    从 TickDB 获取历史 K 线数据。

    关键参数:
    - symbol: 交易品种代码,如 "AAPL.US"
    - interval: 时间周期,支持 1m/5m/15m/30m/1h/4h/1d
    - start_time: Unix 时间戳(毫秒),不传则从最新一条往前拉
    - end_time: Unix 时间戳(毫秒)
    - limit: 单次最大条数,超出时分页获取

    返回:K 线数据列表,每条包含 open_time/high/low/close/vol 等字段
    """
    headers = {"X-API-Key": API_KEY}
    params = {"symbol": symbol, "interval": interval, "limit": limit}

    if start_time and end_time:
        params["start_time"] = start_time
        params["end_time"] = end_time

    # ⚠️ 超时设置:(connect_timeout, read_timeout)
    response = requests.get(
        f"{BASE_URL}/kline",
        headers=headers,
        params=params,
        timeout=(3.05, 10),  # 连接超时 3.05s,读取超时 10s
    )
    response.raise_for_status()

    raw_data = response.json()
    result = handle_api_error(raw_data, response.headers)

    return result if result else []


def fetch_available_intervals(symbol: str) -> list:
    """
    查询交易品种支持的 K 线周期。
    部分品种可能不支持分钟级,建议先查询再拉取。
    """
    headers = {"X-API-Key": API_KEY}
    response = requests.get(
        f"{BASE_URL}/fetch_available_interval",
        headers=headers,
        params={"symbol": symbol},
        timeout=(3.05, 10),
    )
    response.raise_for_status()

    raw_data = response.json()
    return handle_api_error(raw_data, response.headers) or []

增量更新逻辑

def get_latest_local_timestamp(symbol: str, interval: str, conn) -> Optional[int]:
    """
    查询本地数据库中该品种的最新时间戳。
    用于确定增量拉取的起始点。
    """
    query = f"""
    SELECT MAX(open_time) 
    FROM kline_data 
    WHERE symbol = ? AND interval = ?
    """
    cursor = conn.execute(query, (symbol, interval))
    row = cursor.fetchone()
    return row[0] if row and row[0] else None


def deduplicate_merge(local_df: list, remote_data: list) -> list:
    """
    幂等去重:将远程数据与本地数据合并,按 open_time 去重。
    仅保留 remote_data 中 open_time 不在 local_df 中的记录。
    """
    if not remote_data:
        return []

    local_times = {item["open_time"] for item in local_df}
    merged = [item for item in remote_data if item["open_time"] not in local_times]

    logger.info(f"去重结果:远程 {len(remote_data)} 条,去重后保留 {len(merged)} 条")
    return merged


def incremental_update(
    symbol: str,
    interval: str,
    conn,
    max_retry: int = 3,
) -> int:
    """
    增量更新核心逻辑:
    1. 查询本地最新时间戳
    2. 计算起止时间窗口
    3. 分页拉取远程数据
    4. 幂等去重后入库
    """
    # 查询本地最新时间戳
    latest_ts = get_latest_local_timestamp(symbol, interval, conn)
    start_time = (latest_ts + 1) if latest_ts else None

    # 计算结束时间(当前时间往前取 1 分钟,确保 K 线已闭合)
    end_time = int((datetime.now() - timedelta(minutes=1)).timestamp() * 1000)

    if latest_ts:
        logger.info(
            f"增量更新 {symbol} [{interval}],从 {datetime.fromtimestamp(latest_ts / 1000)} 开始"
        )
    else:
        logger.info(f"全量拉取 {symbol} [{interval}],无本地历史数据")

    # 分页拉取
    all_data = []
    current_start = start_time
    page_count = 0

    while page_count < max_retry:
        page_count += 1
        page_data = fetch_kline(
            symbol=symbol,
            interval=interval,
            start_time=current_start,
            end_time=end_time,
            limit=5000,
        )

        if not page_data:
            break

        all_data.extend(page_data)

        # 检查是否需要继续分页
        if len(page_data) < 5000:
            break

        # 更新分页起点
        current_start = page_data[-1]["open_time"] + 1

    if not all_data:
        logger.info(f"{symbol} [{interval}] 无新数据需要更新")
        return 0

    # 查询本地已有数据(用于去重)
    local_data = conn.execute(
        f"SELECT * FROM kline_data WHERE symbol = ? AND interval = ?",
        (symbol, interval),
    ).fetchall()
    local_df = [dict(row) for row in local_data]

    # 幂等去重
    new_data = deduplicate_merge(local_df, all_data)

    if not new_data:
        logger.info(f"{symbol} [{interval}] 去重后无新增数据")
        return 0

    # 批量入库
    for item in new_data:
        conn.execute(
            """
            INSERT OR IGNORE INTO kline_data 
            (symbol, interval, open_time, open, high, low, close, vol)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            """,
            (
                symbol,
                interval,
                item["open_time"],
                item["open"],
                item["high"],
                item["low"],
                item["close"],
                item["vol"],
            ),
        )

    conn.commit()
    logger.info(f"{symbol} [{interval}] 成功入库 {len(new_data)} 条 K 线")
    return len(new_data)

定时任务入口

import sqlite3
from schedule import every, run_pending


def init_database(db_path: str = "tickdb_kline.db"):
    """初始化 SQLite 数据库表结构"""
    conn = sqlite3.connect(db_path)
    conn.execute(
        """
        CREATE TABLE IF NOT EXISTS kline_data (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            symbol TEXT NOT NULL,
            interval TEXT NOT NULL,
            open_time INTEGER NOT NULL,
            open REAL,
            high REAL,
            low REAL,
            close REAL,
            vol REAL,
            UNIQUE(symbol, interval, open_time)
        )
        """
    )
    conn.execute(
        "CREATE INDEX IF NOT EXISTS idx_symbol_interval_time ON kline_data(symbol, interval, open_time)"
    )
    conn.commit()
    return conn


def daily_sync_job():
    """每日定时同步任务"""
    logger.info("=" * 50)
    logger.info("开始每日行情同步")

    conn = init_database()
    total_synced = 0

    for symbol in SUBSCRIBED_SYMBOLS:
        try:
            synced = incremental_update(symbol, DEFAULT_INTERVAL, conn)
            total_synced += synced
        except Exception as e:
            logger.error(f"同步 {symbol} 失败: {e}")
            continue

    conn.close()
    logger.info(f"同步完成,本次共入库 {total_synced} 条记录")
    logger.info("=" * 50)


if __name__ == "__main__":
    # 配置每日 16:30 执行(美股收盘后,数据已闭合)
    every().day.at("16:30").do(daily_sync_job)

    logger.info("行情归档服务已启动,等待定时触发...")
    while True:
        run_pending()
        time.sleep(60)

存储方案选型

SQLite:单机场景的首选

指标 SQLite 说明
适用规模 <100 万条/品种 轻量级,无需额外服务
查询性能 3000+ QPS 索引优化后满足个人量化需求
部署复杂度 零部署 单文件,随代码分发
事务支持 ACID 完整 宕机不丢数据

对于个人开发者或小团队,SQLite 的存储方案已经足够。以下是表结构创建和查询的简化示例:

# SQLite 快速查询示例
conn = sqlite3.connect("tickdb_kline.db")
df = pd.read_sql_query(
    """
    SELECT * FROM kline_data 
    WHERE symbol = 'AAPL.US' 
      AND interval = '1m'
      AND open_time BETWEEN ? AND ?
    ORDER BY open_time
    """,
    conn,
    params=[start_ts, end_ts],
)

ClickHouse:团队级数据仓库

当单品种数据量超过 500 万条、或需要支持多进程并发查询时,建议迁移到 ClickHouse。

-- ClickHouse 表结构(参考)
CREATE TABLE tickdb_kline (
    symbol String,
    interval String,
    open_time UInt64,
    open Float64,
    high Float64,
    low Float64,
    close Float64,
    vol Float64,
    inserted_at DateTime DEFAULT now()
) ENGINE = MergeTree()
ORDER BY (symbol, interval, open_time);

ClickHouse 的列式存储在聚合查询(如计算波动率、均值回归因子)上比 SQLite 快 10-100 倍,适合团队内多个策略同时调用同一套数据库。


常见陷阱与工程预警

陷阱一:拉取未来 K 线。TickDB /v1/market/kline 接口仅返回已闭合的 K 线周期。当前时间 14:30 拉取 14:31 的数据会返回空,这是预期行为,不是 bug。建议在结束时间上减去 1 分钟冗余量。

陷阱二:时区混乱。TickDB 返回的时间戳是 UTC Unix 毫秒,入库和查询时需注意转换时区。推荐在数据库中统一存储 UTC 时间戳,在应用层做时区转换。

陷阱三:首次全量拉取超时。首次运行需拉取多年历史数据,单次请求 limit=5000,可能需要多次分页。建议首次运行时分品种、分时间段多次执行,避免单次脚本执行超时。

陷阱四:未处理交易日边界。美股交易时间为美东时间 09:30-16:00,对应的 UTC 时间随夏令时变化。建议在增量逻辑中加入"仅在交易日入库"的判断,避免在非交易时段产生冗余空数据。


结语

行情数据的自动化归档,是量化系统基础设施的第一块砖。它的工程复杂度不在于"拉数据存进去",而在于:

  • 增量边界的精准控制(避免重复、避免遗漏)
  • 存储抽象的分层设计(SQLite 换 ClickHouse 只需改 ORM 层)
  • 定时任务的幂等保证(任务失败后重跑不会产生脏数据)

当你有了一套可靠的本地行情数据库,"数据缺口导致回测作废"的噩梦才会真正结束。


下一步行动

如果你希望亲手实现本文的归档脚本

  1. 访问 tickdb.ai 注册(免费,无需信用卡)
  2. 在控制台生成 API Key
  3. 设置环境变量 TICKDB_API_KEY,复制本文代码即可运行
  4. 首次运行时建议先拉取单品种单日数据验证逻辑,再扩展到全量

如果你所在团队需要存储 10 年级别的历史数据并支持多策略并发查询,联系 [email protected] 了解 TickDB 机构版数据服务。

如果你习惯用 AI 辅助开发,在 AI 助手中搜索安装 tickdb-market-data SKILL,通过自然语言查询 TickDB 数据接口。


风险提示:本文不构成任何投资建议。历史行情数据的归档和存储仅用于量化研究目的,不保证数据完整性适用于任何特定策略。市场有风险,投资需谨慎。