凌晨 3 点 17 分,你被一条告警震醒。

定时任务显示“同步完成,提交 847 条新记录”。你揉着眼睛打开数据库,发现不对——本地记录从 12 万条变成了 19 万条。这意味着上游新增的记录数远少于同步脚本告诉你的数量。或者是重复插入了,或者是某条历史记录被上游悄悄改了。

你打开上游的变更日志,发现他们上周做了一次数据清洗,把 2019 年的历史记录重新编排了 ID。你过去三年的本地数据,现在和上游完全对不上了。

这不是脚本写得烂。这是增量同步的系统性盲区:只靠“新增”判断同步,永远不知道什么时候漏了修改,什么时候多了重复。

今天我们拆解三个核心机制:基于时间戳的增量拉取、UPSERT 去重、以及版本号+哈希校验的回溯检测。这套方案不是“最佳实践”,而是在生产环境跑了三年没有数据事故的经验总结


一、问题建模:你的同步系统缺了什么?

在动手写代码之前,先把增量同步的完整生命周期拆解清楚。大多数同步脚本的问题,不是实现细节有缺陷,而是漏掉了某些状态的检测

1.1 同步状态的四种可能

一条记录从数据源到本地数据库,可能处于以下四种状态:

状态 定义 检测方式
新增(New) 本地不存在,上游存在 基于时间戳/ID 范围查询
更新(Modified) 本地存在,上游版本更新 版本号对比 或 哈希校验
删除(Deleted) 本地存在,上游已删除 软删除标记 或 增量删除同步
未变(Unchanged) 两边完全一致 无需操作

大多数脚本只覆盖了第一种状态。进阶一点会检测第二种。但很少有人处理第三种,直到某天发现本地数据库里躺着上游已经删除了三年的记录。

1.2 三个核心障碍

增量同步在生产环境面临三个核心障碍:

障碍一:时间戳不可靠。 上游数据源的 updated_at 字段并不总是准确更新。部分老系统只在插入时写时间戳,修改时不更新。单纯基于时间范围的增量拉取会漏掉大量修改。

障碍二:主键漂移。 上游有时会重新生成 ID(比如数据迁移后),导致本地记录的 foreign_id 与上游新的记录无法关联。这种情况下,UPSERT 语义会插入新记录而不是更新旧记录,产生数据冗余。

障碍三:回溯修改。 这是最隐蔽的问题:上游在下周修改了上周已经同步过的数据。本地数据库永远停留在上周的状态,不知道这些修改发生了。三个月后你做因子回测,用的是过期的数据。

对应这三个障碍,我们有三条防线:

防线 解决的问题 核心技术
第一条:时间戳 + 版本号 基础增量识别 基于 updated_atversion 字段过滤
第二条:UPSERT 去重 主键冲突和重复插入 ON CONFLICT DO UPDATEREPLACE INTO
第三条:哈希校验 回溯修改检测 SHA-256 哈希值与本地存储版本比对

二、防线一:增量拉取——不只是时间戳

2.1 基础方案:基于时间戳的增量

最简单的增量拉取逻辑是:每次同步时记录上一次同步的时间点,下次只拉取该时间点之后更新的数据。

import os
import requests
from datetime import datetime, timezone

# 从配置文件读取上次同步时间(实际应持久化到数据库或文件)
LAST_SYNC_TIME = os.environ.get("LAST_SYNC_TIME", "2024-01-01T00:00:00Z")
API_KEY = os.environ.get("TICKDB_API_KEY")

def fetch_incremental_data(since: str, limit: int = 1000):
    """
    基于时间戳的增量拉取
    适用于数据源提供了 accurate updated_at 字段的情况
    """
    headers = {"X-API-Key": API_KEY}
    params = {
        "since": since,  # ISO 8601 时间格式
        "limit": limit,
        "order": "asc"  # 按时间升序,确保增量顺序
    }
    
    response = requests.get(
        "https://api.tickdb.ai/v1/market/kline",
        headers=headers,
        params=params,
        timeout=(3.05, 10)
    )
    
    if response.status_code == 429:
        retry_after = int(response.headers.get("Retry-After", 60))
        time.sleep(retry_after)
        return fetch_incremental_data(since, limit)
    
    data = response.json()
    if data.get("code") != 0:
        raise RuntimeError(f"API Error: {data}")
    
    return data.get("data", [])

# ⚠️ 注意:这个方案在 updated_at 不准确时会产生漏同步

时间戳方案的局限:上游系统有时不更新 updated_at,或者时区处理不一致。我见过一个案例,数据源用的是服务器本地时间而非 UTC,每次同步都有几个小时的漂移。

2.2 进阶方案:版本号字段

如果数据源提供了显式的版本号字段(如 versionrevisionseq),基于版本号的增量拉取比时间戳更可靠。

from typing import Optional

class VersionBasedSync:
    """
    基于版本号的增量同步
    版本号必须是单调递增的整数,每次更新自增
    """
    
    def __init__(self, api_key: str):
        self.api_key = api_key
        self.headers = {"X-API-Key": api_key}
    
    def fetch_since_version(self, last_version: Optional[int] = None):
        """
        last_version: 上次同步的最大版本号,None 表示全量拉取
        """
        params = {}
        if last_version is not None:
            params["from_version"] = last_version + 1  # 从下一个版本开始
        
        # 实际 TickDB 不支持 version 参数,此处仅为示例逻辑
        # 如需版本号支持,需联系 [email protected] 确认
        response = requests.get(
            "https://api.tickdb.ai/v1/market/kline",
            headers=self.headers,
            params=params,
            timeout=(3.05, 10)
        )
        
        return response.json()
    
    def persist_last_version(self, version: int):
        """
        将本次同步的最大版本号持久化
        生产环境建议存储到 Redis 或数据库,而非文件
        """
        # 这里简化处理,实际应使用更可靠的持久化方案
        with open(".sync_checkpoint", "w") as f:
            f.write(str(version))

版本号方案的优势:版本号是单调递增的整数,不受服务器时间漂移影响,不存在时区歧义。但问题是,大多数公开 API 并不提供版本号字段。

2.3 生产级方案:游标分页 + 双保险

实际生产环境中,我推荐时间戳 + 分页游标的组合方案:

import time
import logging
from dataclasses import dataclass
from typing import Generator, List, Dict, Any

@dataclass
class SyncResult:
    """同步结果容器"""
    fetched: int
    upserted: int
    skipped: int
    errors: int
    next_cursor: str

class CursorBasedSync:
    """
    基于游标分页的增量同步
    双保险:时间戳过滤 + 分页游标 + 限频自适应
    """
    
    BASE_URL = "https://api.tickdb.ai/v1/market/kline"
    MAX_RETRIES = 3
    RATE_LIMIT_DELAY = 1  # 默认限频间隔
    
    def __init__(self, api_key: str, symbol: str):
        self.api_key = api_key
        self.symbol = symbol
        self.headers = {"X-API-Key": api_key}
        self.logger = logging.getLogger(__name__)
    
    def incremental_sync(
        self, 
        start_time: str, 
        end_time: str = None,
        limit: int = 1000
    ) -> SyncResult:
        """
        增量同步主流程
        
        Args:
            start_time: ISO 8601 格式,开始时间
            end_time: 结束时间,None 表示当前时间
            limit: 每页大小
        
        Returns:
            SyncResult: 同步统计和游标信息
        """
        result = SyncResult(0, 0, 0, 0, "")
        cursor = start_time
        consecutive_empty = 0
        
        while True:
            # 分页请求
            data, next_cursor = self._fetch_page(cursor, end_time, limit)
            
            if not data:
                consecutive_empty += 1
                if consecutive_empty >= 3:
                    self.logger.info("连续3页无数据,退出同步")
                    break
                time.sleep(self.RATE_LIMIT_DELAY)
                continue
            
            consecutive_empty = 0
            
            # UPSERT 入库
            upserted, skipped = self._upsert_batch(data)
            
            result.fetched += len(data)
            result.upserted += upserted
            result.skipped += skipped
            result.next_cursor = next_cursor
            
            # 检查游标
            if not next_cursor or next_cursor == cursor:
                break
            cursor = next_cursor
            
            # 限频保护
            time.sleep(self.RATE_LIMIT_DELAY)
        
        self.logger.info(
            f"同步完成:获取 {result.fetched} 条,"
            f"写入 {result.upserted} 条,跳过 {result.skipped} 条重复"
        )
        return result
    
    def _fetch_page(self, cursor: str, end_time: str, limit: int):
        """单页拉取,含重试和限频处理"""
        params = {
            "symbol": self.symbol,
            "start_time": cursor,
            "limit": min(limit, 1000),  # API 限制最大1000
            "interval": "1h"
        }
        if end_time:
            params["end_time"] = end_time
        
        for attempt in range(self.MAX_RETRIES):
            try:
                response = requests.get(
                    self.BASE_URL,
                    headers=self.headers,
                    params=params,
                    timeout=(3.05, 10)
                )
                
                if response.status_code == 429:
                    retry_after = int(response.headers.get("Retry-After", 60))
                    self.logger.warning(f"触发限频,等待 {retry_after} 秒")
                    time.sleep(retry_after)
                    continue
                
                result = response.json()
                
                # 处理限频错误码
                if result.get("code") == 3001:
                    retry_after = int(response.headers.get("Retry-After", 5))
                    self.logger.warning(f"API 限频(code:3001),等待 {retry_after}s")
                    time.sleep(retry_after)
                    continue
                
                if result.get("code") != 0:
                    raise RuntimeError(f"API error: {result}")
                
                data = result.get("data", [])
                next_cursor = result.get("next_cursor", "")
                return data, next_cursor
                
            except requests.RequestException as e:
                self.logger.error(f"请求失败 (尝试 {attempt + 1}/{self.MAX_RETRIES}): {e}")
                if attempt < self.MAX_RETRIES - 1:
                    # 指数退避 + 抖动
                    delay = min(2 ** attempt * 0.5, 30)
                    jitter = random.uniform(0, delay * 0.1)
                    time.sleep(delay + jitter)
                else:
                    result.errors += 1
                    raise
        
        return [], ""
    
    def _upsert_batch(self, data: List[Dict[str, Any]]):
        """
        批量 UPSERT
        具体实现取决于使用的数据库(PostgreSQL/MySQL/SQLite)
        """
        # PostgreSQL 示例
        upserted = 0
        skipped = 0
        
        for record in data:
            try:
                self._upsert_single(record)
                upserted += 1
            except Exception as e:
                self.logger.error(f"UPSERT 失败: {e}")
                skipped += 1
        
        return upserted, skipped
    
    def _upsert_single(self, record: Dict[str, Any]):
        """单条记录 UPSERT"""
        raise NotImplementedError("子类实现数据库-specific UPSERT")

三、防线二:UPSERT 去重——让重复写入变成零操作

3.1 为什么普通 INSERT 会产生冗余

普通 INSERT INTO ... VALUES (...) 在主键冲突时会直接报错。这意味着:

  1. 首次同步成功
  2. 第二次同步遇到相同主键 → 报错 → 脚本卡住或跳过他
  3. 或者脚本在同步前先 DELETE WHERE id = ?,但这在高并发下可能丢失两次同步之间的新写入

UPSERT(Update on Conflict)解决了这个问题:冲突时更新,不冲突时插入,语义上等价于“先查后写”的合并操作。

3.2 PostgreSQL 方案:ON CONFLICT DO UPDATE

-- PostgreSQL UPSERT 语法
INSERT INTO market_kline (symbol, timestamp, open, high, low, close, volume, updated_at)
VALUES ('AAPL.US', '2024-01-15 09:30:00', 185.50, 186.20, 185.30, 186.00, 1523400, NOW())
ON CONFLICT (symbol, timestamp)  -- 唯一约束列
DO UPDATE SET
    open = EXCLUDED.open,
    high = EXCLUDED.high,
    low = EXCLUDED.low,
    close = EXCLUDED.close,
    volume = EXCLUDED.volume,
    updated_at = EXCLUDED.updated_at,  -- 更新本地记录时间
    hash = md5(row_to_json(EXCLUDED.*)::text)  -- 同时更新哈希
WHERE market_kline.updated_at < EXCLUDED.updated_at;  -- 条件更新:只更新更新的记录

关键语法解释

  • ON CONFLICT (symbol, timestamp):指定唯一约束列,当这些列冲突时执行 UPDATE
  • EXCLUDED:引用尝试插入的值(即新数据)
  • WHERE ...:条件更新,避免因时间戳漂移导致旧数据覆盖新数据

3.3 Python 实现:带事务的批量 UPSERT

import psycopg2
from psycopg2.extras import execute_values
from typing import List, Dict, Any
from datetime import datetime

class PostgresUpsert:
    """
    PostgreSQL 生产级 UPSERT 实现
    支持批量操作、错误回滚、哈希校验
    """
    
    def __init__(self, connection_string: str):
        self.conn = psycopg2.connect(connection_string)
        self.conn.autocommit = False  # 手动控制事务
    
    def batch_upsert_kline(self, records: List[Dict[str, Any]]):
        """
        批量 UPSERT K线数据
        
        Args:
            records: [{"symbol": "AAPL.US", "timestamp": "...", "open": 185.5, ...}, ...]
        """
        if not records:
            return 0
        
        # 构造批量插入的值列表
        values = []
        for r in records:
            # 计算哈希(用于后续回溯检测)
            hash_value = self._compute_hash(r)
            values.append((
                r["symbol"],
                r["timestamp"],
                r.get("interval", "1h"),
                r.get("open", 0),
                r.get("high", 0),
                r.get("low", 0),
                r.get("close", 0),
                r.get("volume", 0),
                hash_value,
                datetime.now(timezone.utc)  # updated_at
            ))
        
        # 唯一约束列
        conflict_cols = ["symbol", "timestamp"]
        update_cols = ["interval", "open", "high", "low", "close", "volume", "hash", "updated_at"]
        
        query = f"""
            INSERT INTO market_kline 
                (symbol, timestamp, interval, open, high, low, close, volume, hash, updated_at)
            VALUES %s
            ON CONFLICT ({', '.join(conflict_cols)})
            DO UPDATE SET
                interval = EXCLUDED.interval,
                open = EXCLUDED.open,
                high = EXCLUDED.high,
                low = EXCLUDED.low,
                close = EXCLUDED.close,
                volume = EXCLUDED.volume,
                hash = EXCLUDED.hash,
                updated_at = EXCLUDED.updated_at
            WHERE market_kline.hash != EXCLUDED.hash  -- 只更新哈希不同的记录
        """
        
        try:
            with self.conn.cursor() as cur:
                execute_values(cur, query, values, page_size=500)
            self.conn.commit()
            return len(records)
        except Exception as e:
            self.conn.rollback()
            raise RuntimeError(f"UPSERT 批量写入失败: {e}")
    
    @staticmethod
    def _compute_hash(record: Dict[str, Any]) -> str:
        """
        计算记录哈希
        用于回溯修改检测
        只对核心字段计算,排除 updated_at 等元数据字段
        """
        import hashlib
        import json
        
        # 核心字段(排除元数据)
        core_fields = ["symbol", "timestamp", "interval", "open", "high", "low", "close", "volume"]
        core_data = {k: record.get(k) for k in core_fields if k in record}
        
        canonical = json.dumps(core_data, sort_keys=True, separators=(',', ':'))
        return hashlib.sha256(canonical.encode()).hexdigest()

3.4 防重设计:唯一约束是地基

UPSERT 的前提是数据库层有唯一约束。如果没有唯一约束,UPSERT 语法会报错,你只能退回到“先 DELETE 再 INSERT”的方案。

-- 确保唯一约束存在(如果不存在则创建)
ALTER TABLE market_kline 
ADD CONSTRAINT market_kline_unique_placeholder
UNIQUE (symbol, timestamp, interval);

-- 如果表已存在重复数据,先清理
DELETE FROM market_kline a
WHERE EXISTS (
    SELECT 1 FROM market_kline b
    WHERE b.symbol = a.symbol 
      AND b.timestamp = a.timestamp 
      AND b.interval = a.interval
      AND b.ctid > a.ctid  -- 保留 rowid 更小的即更早插入的
);

四、防线三:回溯修改检测——哈希校验的工程实践

这是本文最关键的部分。前面两节讲的是“增量拉取”和“去重”,但即使做到了这两点,仍然无法检测上游的历史修改。

4.1 为什么需要哈希校验

场景还原:

  1. 2024 年 1 月 1 日,TickDB 上有一根 K 线:AAPL.US, 09:30, close=185.50
  2. 你的同步脚本拉取了这条数据,存入本地,哈希值 = abc123
  3. 2024 年 1 月 15 日,TickDB 因为某种原因(数据清洗、算法调整)把这条数据的 close 改成了 185.52
  4. 你的同步脚本再次运行时,基于时间戳只拉取 1 月 15 日之后的数据,不会重新拉取 1 月 1 日的修改
  5. 本地数据库永远停留在 close=185.50,而上游已是 185.52

三个月后你做回测,误差已经累积到不可接受的程度。

哈希校验的核心逻辑:每次 UPSERT 时同时更新该记录的哈希值。下次同步时,如果上游数据修改了,哈希值会变化,触发更新。

4.2 哈希计算的四个原则

原则 说明 示例
只计算核心字段 排除 updated_atid 等元数据字段 不用 `md5(timestamp
Canonical 序列化 键的顺序要一致 {"a": 1, "b": 2}{"b": 2, "a": 1} 哈希相同
使用加密哈希 SHA-256 或 MD5(数据量大时 MD5 更快) 不使用自定义字符串拼接
存储格式 以字符串存储在数据库列中 VARCHAR(64) for SHA-256

4.3 检测脚本:找出被修改的历史数据

import hashlib
import json
import logging
from typing import Dict, Any, List, Tuple
from datetime import datetime, timezone

class RetroactiveChangeDetector:
    """
    回溯修改检测器
    定期比对本地哈希与上游数据哈希,发现历史修改
    """
    
    def __init__(self, sync_client, db_connection):
        self.sync = sync_client
        self.db = db_connection
        self.logger = logging.getLogger(__name__)
    
    def detect_changes(
        self, 
        symbol: str, 
        check_since: str = None,
        sample_rate: float = 1.0
    ) -> List[Dict[str, Any]]:
        """
        检测回溯修改
        
        Args:
            symbol: 交易品种
            check_since: 检查起始时间,None 表示最近30天
            sample_rate: 采样率,1.0 = 全量检查
        
        Returns:
            存在修改的记录列表
        """
        if check_since is None:
            # 默认检查最近30天
            check_since = (datetime.now(timezone.utc) - timedelta(days=30)).isoformat()
        
        # 获取本地记录的哈希
        local_hashes = self._get_local_hashes(symbol, check_since, sample_rate)
        
        # 逐条拉取上游数据进行比对
        changed_records = []
        for record in local_hashes:
            upstream = self._fetch_upstream_record(record["symbol"], record["timestamp"])
            
            if upstream is None:
                self.logger.warning(f"上游不存在本地记录: {record['symbol']} @ {record['timestamp']}")
                continue
            
            local_hash = record["hash"]
            upstream_hash = self._compute_hash(upstream)
            
            if local_hash != upstream_hash:
                changed_records.append({
                    "symbol": record["symbol"],
                    "timestamp": record["timestamp"],
                    "local_value": record,
                    "upstream_value": upstream,
                    "local_hash": local_hash,
                    "upstream_hash": upstream_hash
                })
                self.logger.info(
                    f"检测到回溯修改: {symbol} @ {record['timestamp']}, "
                    f"本地={local_hash[:8]}, 上游={upstream_hash[:8]}"
                )
        
        return changed_records
    
    def _get_local_hashes(
        self, 
        symbol: str, 
        since: str, 
        sample_rate: float
    ) -> List[Dict[str, Any]]:
        """从本地数据库读取哈希值"""
        query = f"""
            SELECT symbol, timestamp, hash, open, high, low, close, volume
            FROM market_kline
            WHERE symbol = %s AND timestamp >= %s
            ORDER BY timestamp
        """
        
        with self.db.cursor() as cur:
            if sample_rate < 1.0:
                query += f" TABLESAMPLE SYSTEM ({sample_rate * 100})"
            
            cur.execute(query, (symbol, since))
            columns = [desc[0] for desc in cur.description]
            return [dict(zip(columns, row)) for row in cur.fetchall()]
    
    def _fetch_upstream_record(self, symbol: str, timestamp: str) -> Dict[str, Any]:
        """
        从上游拉取单条记录
        限频处理同前文
        """
        params = {
            "symbol": symbol,
            "start_time": timestamp,
            "end_time": timestamp,
            "limit": 1
        }
        
        response = requests.get(
            "https://api.tickdb.ai/v1/market/kline",
            headers={"X-API-Key": os.environ.get("TICKDB_API_KEY")},
            params=params,
            timeout=(3.05, 10)
        )
        
        data = response.json()
        if data.get("code") != 0 or not data.get("data"):
            return None
        
        return data["data"][0]
    
    @staticmethod
    def _compute_hash(record: Dict[str, Any]) -> str:
        """SHA-256 哈希"""
        core_fields = ["symbol", "timestamp", "interval", "open", "high", "low", "close", "volume"]
        core_data = {k: record.get(k) for k in core_fields if k in record}
        canonical = json.dumps(core_data, sort_keys=True, separators=(',', ':'))
        return hashlib.sha256(canonical.encode()).hexdigest()
    
    def repair_changes(self, changes: List[Dict[str, Any]], dry_run: bool = True):
        """
        修复检测到的回溯修改
        
        Args:
            changes: detect_changes() 返回的修改列表
            dry_run: True = 只记录不实际更新
        """
        if not changes:
            self.logger.info("无回溯修改需要修复")
            return
        
        action = "【模拟】修复" if dry_run else "修复"
        self.logger.info(f"{action} {len(changes)} 条回溯修改")
        
        for change in changes:
            self.logger.info(
                f"{action}: {change['symbol']} @ {change['timestamp']}\n"
                f"  本地 close={change['local_value'].get('close')}, "
                f"上游 close={change['upstream_value'].get('close')}"
            )
            
            if not dry_run:
                self._apply_fix(change)
    
    def _apply_fix(self, change: Dict[str, Any]):
        """执行修复更新"""
        query = """
            UPDATE market_kline
            SET open = %s, high = %s, low = %s, close = %s, volume = %s,
                hash = %s, updated_at = %s, is_backfilled = TRUE
            WHERE symbol = %s AND timestamp = %s
        """
        
        upstream = change["upstream_value"]
        hash_val = self._compute_hash(upstream)
        
        with self.db.cursor() as cur:
            cur.execute(query, (
                upstream.get("open"), upstream.get("high"), upstream.get("low"),
                upstream.get("close"), upstream.get("volume"),
                hash_val, datetime.now(timezone.utc),
                change["symbol"], change["timestamp"]
            ))
        
        self.db.commit()

4.4 调度策略:什么时候跑检测?

回溯修改检测的成本比增量同步高得多(需要全量拉取历史数据),建议按以下策略调度:

场景 调度频率 说明
数据清洗期(上游通知) 立即执行 上游告知大规模数据修正时
每周例行 周末低峰期 检查最近7天的回溯修改
每月审计 月末 全量抽样检查,采样率 10%
回测前 按需 重要策略上线前回溯验证

五、部署方案:三种规模的实现差异

维度 个人开发者 团队(小规模) 机构级
调度方式 手动触发 / cron Airflow / Prefect 自研调度系统
数据库 SQLite PostgreSQL PostgreSQL + 分区表
哈希存储 列存储 列存储 + 索引 列存储 + 分区索引 + 列式存储
检测频率 按需 每日一次 实时监控
告警集成 日志打印 飞书 / Slack PagerDuty + 邮件
成本估算 $0 $50-200/月 $500+/月

六、完整流程图

┌─────────────────────────────────────────────────────────────┐
│                    增量同步完整流程                            │
└─────────────────────────────────────────────────────────────┘
                              │
                              ▼
                    ┌─────────────────┐
                    │   读取游标/时间戳 │
                    └────────┬────────┘
                              │
                              ▼
              ┌─────────────────────────────┐
              │      分页拉取上游数据        │
              │  (限频处理 / 重连 / 抖动)   │
              └─────────────┬───────────────┘
                            │
                            ▼
              ┌─────────────────────────────┐
              │      遍历每条记录           │
              │                             │
              │  ┌───────────────────────┐  │
              │  │  计算 SHA-256 哈希    │  │
              │  └───────────────────────┘  │
              │             │               │
              │             ▼               │
              │  ┌───────────────────────┐  │
              │  │  UPSERT (PostgreSQL) │  │
              │  │  ON CONFLICT        │  │
              │  │  WHERE hash != NEW   │  │
              │  └───────────────────────┘  │
              │             │               │
              │             ▼               │
              │    ┌─────────┴─────────┐    │
              │    │   哈希变化?      │    │
              │    └─────────┬─────────┘    │
              │         是/否 │             │
              │      ┌───────┴───────┐     │
              │      ▼               ▼     │
              │   写入+更新   跳过(无变化) │
              │      │               │     │
              │      └───────┬───────┘     │
              └──────────────┼──────────────┘
                             │
                             ▼
                    ┌─────────────────┐
                    │   持久化游标     │
                    │ (Redis/DB/File) │
                    └────────┬────────┘
                             │
                             ▼
              ┌─────────────────────────────┐
              │    定期回溯检测(可选)       │
              │  (采样拉取 / 哈希比对)        │
              └─────────────────────────────┘

结语

增量同步不是“拉数据存进去”这么简单。它是一个需要同时处理新增识别、去重写入、回溯检测的完整系统。三个防线的设计逻辑:

  • 时间戳/版本号过滤:降低网络和存储成本,只拉必要的增量
  • UPSERT 去重:确保幂等性,重复运行不会产生脏数据
  • 哈希校验:检测上游的历史修改,保证本地数据的最终一致性

这三层防线组合使用,即便上游发生数据清洗或算法调整,本地数据库也能在下一个同步周期内收敛到最新状态。

如果你已经有了数据同步方案,不妨问自己三个问题

  1. 上游修改了三个月前的历史数据,你的本地会感知到吗?
  2. 同步脚本连续跑两次,数据库里会多出一倍重复数据吗?
  3. 如果游标丢失,你能从哪个时间点恢复?

如果任何一个问题你回答不了,这套方案值得你花一个周末搭起来。


下一步行动

如果你是个人开发者,想快速验证这套方案:

  1. 访问 tickdb.ai 注册获取免费 API Key
  2. 用上文提供的 Python 脚本对接 /kline 接口
  3. PostgreSQL 的 UPSERT 语法可以本地用 Docker 起一个

如果你需要 TickDB 提供美股 10 年历史 K 线数据做策略回测,联系 [email protected] 了解机构级数据方案,包含回溯修改日志和版本号接口支持。

如果你习惯用 AI 辅助开发,在 ClawHub 搜索安装 tickdb-market-data SKILL,可以直接用自然语言查询 TickDB 的数据接口能力。


本文涉及的技术实现均基于 PostgreSQL 数据库。如使用 MySQL 或 SQLite,UPSERT 语法需做相应调整。TickDB 的 /kline 接口支持历史 K 线查询,但不支持美股和 A 股的 tick 级逐笔成交数据。