凌晨 3 点 17 分,你被一条告警震醒。
定时任务显示“同步完成,提交 847 条新记录”。你揉着眼睛打开数据库,发现不对——本地记录从 12 万条变成了 19 万条。这意味着上游新增的记录数远少于同步脚本告诉你的数量。或者是重复插入了,或者是某条历史记录被上游悄悄改了。
你打开上游的变更日志,发现他们上周做了一次数据清洗,把 2019 年的历史记录重新编排了 ID。你过去三年的本地数据,现在和上游完全对不上了。
这不是脚本写得烂。这是增量同步的系统性盲区:只靠“新增”判断同步,永远不知道什么时候漏了修改,什么时候多了重复。
今天我们拆解三个核心机制:基于时间戳的增量拉取、UPSERT 去重、以及版本号+哈希校验的回溯检测。这套方案不是“最佳实践”,而是在生产环境跑了三年没有数据事故的经验总结。
一、问题建模:你的同步系统缺了什么?
在动手写代码之前,先把增量同步的完整生命周期拆解清楚。大多数同步脚本的问题,不是实现细节有缺陷,而是漏掉了某些状态的检测。
1.1 同步状态的四种可能
一条记录从数据源到本地数据库,可能处于以下四种状态:
| 状态 | 定义 | 检测方式 |
|---|---|---|
| 新增(New) | 本地不存在,上游存在 | 基于时间戳/ID 范围查询 |
| 更新(Modified) | 本地存在,上游版本更新 | 版本号对比 或 哈希校验 |
| 删除(Deleted) | 本地存在,上游已删除 | 软删除标记 或 增量删除同步 |
| 未变(Unchanged) | 两边完全一致 | 无需操作 |
大多数脚本只覆盖了第一种状态。进阶一点会检测第二种。但很少有人处理第三种,直到某天发现本地数据库里躺着上游已经删除了三年的记录。
1.2 三个核心障碍
增量同步在生产环境面临三个核心障碍:
障碍一:时间戳不可靠。 上游数据源的 updated_at 字段并不总是准确更新。部分老系统只在插入时写时间戳,修改时不更新。单纯基于时间范围的增量拉取会漏掉大量修改。
障碍二:主键漂移。 上游有时会重新生成 ID(比如数据迁移后),导致本地记录的 foreign_id 与上游新的记录无法关联。这种情况下,UPSERT 语义会插入新记录而不是更新旧记录,产生数据冗余。
障碍三:回溯修改。 这是最隐蔽的问题:上游在下周修改了上周已经同步过的数据。本地数据库永远停留在上周的状态,不知道这些修改发生了。三个月后你做因子回测,用的是过期的数据。
对应这三个障碍,我们有三条防线:
| 防线 | 解决的问题 | 核心技术 |
|---|---|---|
| 第一条:时间戳 + 版本号 | 基础增量识别 | 基于 updated_at 或 version 字段过滤 |
| 第二条:UPSERT 去重 | 主键冲突和重复插入 | ON CONFLICT DO UPDATE 或 REPLACE INTO |
| 第三条:哈希校验 | 回溯修改检测 | SHA-256 哈希值与本地存储版本比对 |
二、防线一:增量拉取——不只是时间戳
2.1 基础方案:基于时间戳的增量
最简单的增量拉取逻辑是:每次同步时记录上一次同步的时间点,下次只拉取该时间点之后更新的数据。
import os
import requests
from datetime import datetime, timezone
# 从配置文件读取上次同步时间(实际应持久化到数据库或文件)
LAST_SYNC_TIME = os.environ.get("LAST_SYNC_TIME", "2024-01-01T00:00:00Z")
API_KEY = os.environ.get("TICKDB_API_KEY")
def fetch_incremental_data(since: str, limit: int = 1000):
"""
基于时间戳的增量拉取
适用于数据源提供了 accurate updated_at 字段的情况
"""
headers = {"X-API-Key": API_KEY}
params = {
"since": since, # ISO 8601 时间格式
"limit": limit,
"order": "asc" # 按时间升序,确保增量顺序
}
response = requests.get(
"https://api.tickdb.ai/v1/market/kline",
headers=headers,
params=params,
timeout=(3.05, 10)
)
if response.status_code == 429:
retry_after = int(response.headers.get("Retry-After", 60))
time.sleep(retry_after)
return fetch_incremental_data(since, limit)
data = response.json()
if data.get("code") != 0:
raise RuntimeError(f"API Error: {data}")
return data.get("data", [])
# ⚠️ 注意:这个方案在 updated_at 不准确时会产生漏同步
时间戳方案的局限:上游系统有时不更新 updated_at,或者时区处理不一致。我见过一个案例,数据源用的是服务器本地时间而非 UTC,每次同步都有几个小时的漂移。
2.2 进阶方案:版本号字段
如果数据源提供了显式的版本号字段(如 version、revision 或 seq),基于版本号的增量拉取比时间戳更可靠。
from typing import Optional
class VersionBasedSync:
"""
基于版本号的增量同步
版本号必须是单调递增的整数,每次更新自增
"""
def __init__(self, api_key: str):
self.api_key = api_key
self.headers = {"X-API-Key": api_key}
def fetch_since_version(self, last_version: Optional[int] = None):
"""
last_version: 上次同步的最大版本号,None 表示全量拉取
"""
params = {}
if last_version is not None:
params["from_version"] = last_version + 1 # 从下一个版本开始
# 实际 TickDB 不支持 version 参数,此处仅为示例逻辑
# 如需版本号支持,需联系 [email protected] 确认
response = requests.get(
"https://api.tickdb.ai/v1/market/kline",
headers=self.headers,
params=params,
timeout=(3.05, 10)
)
return response.json()
def persist_last_version(self, version: int):
"""
将本次同步的最大版本号持久化
生产环境建议存储到 Redis 或数据库,而非文件
"""
# 这里简化处理,实际应使用更可靠的持久化方案
with open(".sync_checkpoint", "w") as f:
f.write(str(version))
版本号方案的优势:版本号是单调递增的整数,不受服务器时间漂移影响,不存在时区歧义。但问题是,大多数公开 API 并不提供版本号字段。
2.3 生产级方案:游标分页 + 双保险
实际生产环境中,我推荐时间戳 + 分页游标的组合方案:
import time
import logging
from dataclasses import dataclass
from typing import Generator, List, Dict, Any
@dataclass
class SyncResult:
"""同步结果容器"""
fetched: int
upserted: int
skipped: int
errors: int
next_cursor: str
class CursorBasedSync:
"""
基于游标分页的增量同步
双保险:时间戳过滤 + 分页游标 + 限频自适应
"""
BASE_URL = "https://api.tickdb.ai/v1/market/kline"
MAX_RETRIES = 3
RATE_LIMIT_DELAY = 1 # 默认限频间隔
def __init__(self, api_key: str, symbol: str):
self.api_key = api_key
self.symbol = symbol
self.headers = {"X-API-Key": api_key}
self.logger = logging.getLogger(__name__)
def incremental_sync(
self,
start_time: str,
end_time: str = None,
limit: int = 1000
) -> SyncResult:
"""
增量同步主流程
Args:
start_time: ISO 8601 格式,开始时间
end_time: 结束时间,None 表示当前时间
limit: 每页大小
Returns:
SyncResult: 同步统计和游标信息
"""
result = SyncResult(0, 0, 0, 0, "")
cursor = start_time
consecutive_empty = 0
while True:
# 分页请求
data, next_cursor = self._fetch_page(cursor, end_time, limit)
if not data:
consecutive_empty += 1
if consecutive_empty >= 3:
self.logger.info("连续3页无数据,退出同步")
break
time.sleep(self.RATE_LIMIT_DELAY)
continue
consecutive_empty = 0
# UPSERT 入库
upserted, skipped = self._upsert_batch(data)
result.fetched += len(data)
result.upserted += upserted
result.skipped += skipped
result.next_cursor = next_cursor
# 检查游标
if not next_cursor or next_cursor == cursor:
break
cursor = next_cursor
# 限频保护
time.sleep(self.RATE_LIMIT_DELAY)
self.logger.info(
f"同步完成:获取 {result.fetched} 条,"
f"写入 {result.upserted} 条,跳过 {result.skipped} 条重复"
)
return result
def _fetch_page(self, cursor: str, end_time: str, limit: int):
"""单页拉取,含重试和限频处理"""
params = {
"symbol": self.symbol,
"start_time": cursor,
"limit": min(limit, 1000), # API 限制最大1000
"interval": "1h"
}
if end_time:
params["end_time"] = end_time
for attempt in range(self.MAX_RETRIES):
try:
response = requests.get(
self.BASE_URL,
headers=self.headers,
params=params,
timeout=(3.05, 10)
)
if response.status_code == 429:
retry_after = int(response.headers.get("Retry-After", 60))
self.logger.warning(f"触发限频,等待 {retry_after} 秒")
time.sleep(retry_after)
continue
result = response.json()
# 处理限频错误码
if result.get("code") == 3001:
retry_after = int(response.headers.get("Retry-After", 5))
self.logger.warning(f"API 限频(code:3001),等待 {retry_after}s")
time.sleep(retry_after)
continue
if result.get("code") != 0:
raise RuntimeError(f"API error: {result}")
data = result.get("data", [])
next_cursor = result.get("next_cursor", "")
return data, next_cursor
except requests.RequestException as e:
self.logger.error(f"请求失败 (尝试 {attempt + 1}/{self.MAX_RETRIES}): {e}")
if attempt < self.MAX_RETRIES - 1:
# 指数退避 + 抖动
delay = min(2 ** attempt * 0.5, 30)
jitter = random.uniform(0, delay * 0.1)
time.sleep(delay + jitter)
else:
result.errors += 1
raise
return [], ""
def _upsert_batch(self, data: List[Dict[str, Any]]):
"""
批量 UPSERT
具体实现取决于使用的数据库(PostgreSQL/MySQL/SQLite)
"""
# PostgreSQL 示例
upserted = 0
skipped = 0
for record in data:
try:
self._upsert_single(record)
upserted += 1
except Exception as e:
self.logger.error(f"UPSERT 失败: {e}")
skipped += 1
return upserted, skipped
def _upsert_single(self, record: Dict[str, Any]):
"""单条记录 UPSERT"""
raise NotImplementedError("子类实现数据库-specific UPSERT")
三、防线二:UPSERT 去重——让重复写入变成零操作
3.1 为什么普通 INSERT 会产生冗余
普通 INSERT INTO ... VALUES (...) 在主键冲突时会直接报错。这意味着:
- 首次同步成功
- 第二次同步遇到相同主键 → 报错 → 脚本卡住或跳过他
- 或者脚本在同步前先
DELETE WHERE id = ?,但这在高并发下可能丢失两次同步之间的新写入
UPSERT(Update on Conflict)解决了这个问题:冲突时更新,不冲突时插入,语义上等价于“先查后写”的合并操作。
3.2 PostgreSQL 方案:ON CONFLICT DO UPDATE
-- PostgreSQL UPSERT 语法
INSERT INTO market_kline (symbol, timestamp, open, high, low, close, volume, updated_at)
VALUES ('AAPL.US', '2024-01-15 09:30:00', 185.50, 186.20, 185.30, 186.00, 1523400, NOW())
ON CONFLICT (symbol, timestamp) -- 唯一约束列
DO UPDATE SET
open = EXCLUDED.open,
high = EXCLUDED.high,
low = EXCLUDED.low,
close = EXCLUDED.close,
volume = EXCLUDED.volume,
updated_at = EXCLUDED.updated_at, -- 更新本地记录时间
hash = md5(row_to_json(EXCLUDED.*)::text) -- 同时更新哈希
WHERE market_kline.updated_at < EXCLUDED.updated_at; -- 条件更新:只更新更新的记录
关键语法解释:
ON CONFLICT (symbol, timestamp):指定唯一约束列,当这些列冲突时执行 UPDATEEXCLUDED:引用尝试插入的值(即新数据)WHERE ...:条件更新,避免因时间戳漂移导致旧数据覆盖新数据
3.3 Python 实现:带事务的批量 UPSERT
import psycopg2
from psycopg2.extras import execute_values
from typing import List, Dict, Any
from datetime import datetime
class PostgresUpsert:
"""
PostgreSQL 生产级 UPSERT 实现
支持批量操作、错误回滚、哈希校验
"""
def __init__(self, connection_string: str):
self.conn = psycopg2.connect(connection_string)
self.conn.autocommit = False # 手动控制事务
def batch_upsert_kline(self, records: List[Dict[str, Any]]):
"""
批量 UPSERT K线数据
Args:
records: [{"symbol": "AAPL.US", "timestamp": "...", "open": 185.5, ...}, ...]
"""
if not records:
return 0
# 构造批量插入的值列表
values = []
for r in records:
# 计算哈希(用于后续回溯检测)
hash_value = self._compute_hash(r)
values.append((
r["symbol"],
r["timestamp"],
r.get("interval", "1h"),
r.get("open", 0),
r.get("high", 0),
r.get("low", 0),
r.get("close", 0),
r.get("volume", 0),
hash_value,
datetime.now(timezone.utc) # updated_at
))
# 唯一约束列
conflict_cols = ["symbol", "timestamp"]
update_cols = ["interval", "open", "high", "low", "close", "volume", "hash", "updated_at"]
query = f"""
INSERT INTO market_kline
(symbol, timestamp, interval, open, high, low, close, volume, hash, updated_at)
VALUES %s
ON CONFLICT ({', '.join(conflict_cols)})
DO UPDATE SET
interval = EXCLUDED.interval,
open = EXCLUDED.open,
high = EXCLUDED.high,
low = EXCLUDED.low,
close = EXCLUDED.close,
volume = EXCLUDED.volume,
hash = EXCLUDED.hash,
updated_at = EXCLUDED.updated_at
WHERE market_kline.hash != EXCLUDED.hash -- 只更新哈希不同的记录
"""
try:
with self.conn.cursor() as cur:
execute_values(cur, query, values, page_size=500)
self.conn.commit()
return len(records)
except Exception as e:
self.conn.rollback()
raise RuntimeError(f"UPSERT 批量写入失败: {e}")
@staticmethod
def _compute_hash(record: Dict[str, Any]) -> str:
"""
计算记录哈希
用于回溯修改检测
只对核心字段计算,排除 updated_at 等元数据字段
"""
import hashlib
import json
# 核心字段(排除元数据)
core_fields = ["symbol", "timestamp", "interval", "open", "high", "low", "close", "volume"]
core_data = {k: record.get(k) for k in core_fields if k in record}
canonical = json.dumps(core_data, sort_keys=True, separators=(',', ':'))
return hashlib.sha256(canonical.encode()).hexdigest()
3.4 防重设计:唯一约束是地基
UPSERT 的前提是数据库层有唯一约束。如果没有唯一约束,UPSERT 语法会报错,你只能退回到“先 DELETE 再 INSERT”的方案。
-- 确保唯一约束存在(如果不存在则创建)
ALTER TABLE market_kline
ADD CONSTRAINT market_kline_unique_placeholder
UNIQUE (symbol, timestamp, interval);
-- 如果表已存在重复数据,先清理
DELETE FROM market_kline a
WHERE EXISTS (
SELECT 1 FROM market_kline b
WHERE b.symbol = a.symbol
AND b.timestamp = a.timestamp
AND b.interval = a.interval
AND b.ctid > a.ctid -- 保留 rowid 更小的即更早插入的
);
四、防线三:回溯修改检测——哈希校验的工程实践
这是本文最关键的部分。前面两节讲的是“增量拉取”和“去重”,但即使做到了这两点,仍然无法检测上游的历史修改。
4.1 为什么需要哈希校验
场景还原:
- 2024 年 1 月 1 日,TickDB 上有一根 K 线:
AAPL.US, 09:30, close=185.50 - 你的同步脚本拉取了这条数据,存入本地,哈希值 =
abc123 - 2024 年 1 月 15 日,TickDB 因为某种原因(数据清洗、算法调整)把这条数据的 close 改成了
185.52 - 你的同步脚本再次运行时,基于时间戳只拉取 1 月 15 日之后的数据,不会重新拉取 1 月 1 日的修改
- 本地数据库永远停留在
close=185.50,而上游已是185.52
三个月后你做回测,误差已经累积到不可接受的程度。
哈希校验的核心逻辑:每次 UPSERT 时同时更新该记录的哈希值。下次同步时,如果上游数据修改了,哈希值会变化,触发更新。
4.2 哈希计算的四个原则
| 原则 | 说明 | 示例 |
|---|---|---|
| 只计算核心字段 | 排除 updated_at、id 等元数据字段 |
不用 `md5(timestamp |
| Canonical 序列化 | 键的顺序要一致 | {"a": 1, "b": 2} 和 {"b": 2, "a": 1} 哈希相同 |
| 使用加密哈希 | SHA-256 或 MD5(数据量大时 MD5 更快) | 不使用自定义字符串拼接 |
| 存储格式 | 以字符串存储在数据库列中 | VARCHAR(64) for SHA-256 |
4.3 检测脚本:找出被修改的历史数据
import hashlib
import json
import logging
from typing import Dict, Any, List, Tuple
from datetime import datetime, timezone
class RetroactiveChangeDetector:
"""
回溯修改检测器
定期比对本地哈希与上游数据哈希,发现历史修改
"""
def __init__(self, sync_client, db_connection):
self.sync = sync_client
self.db = db_connection
self.logger = logging.getLogger(__name__)
def detect_changes(
self,
symbol: str,
check_since: str = None,
sample_rate: float = 1.0
) -> List[Dict[str, Any]]:
"""
检测回溯修改
Args:
symbol: 交易品种
check_since: 检查起始时间,None 表示最近30天
sample_rate: 采样率,1.0 = 全量检查
Returns:
存在修改的记录列表
"""
if check_since is None:
# 默认检查最近30天
check_since = (datetime.now(timezone.utc) - timedelta(days=30)).isoformat()
# 获取本地记录的哈希
local_hashes = self._get_local_hashes(symbol, check_since, sample_rate)
# 逐条拉取上游数据进行比对
changed_records = []
for record in local_hashes:
upstream = self._fetch_upstream_record(record["symbol"], record["timestamp"])
if upstream is None:
self.logger.warning(f"上游不存在本地记录: {record['symbol']} @ {record['timestamp']}")
continue
local_hash = record["hash"]
upstream_hash = self._compute_hash(upstream)
if local_hash != upstream_hash:
changed_records.append({
"symbol": record["symbol"],
"timestamp": record["timestamp"],
"local_value": record,
"upstream_value": upstream,
"local_hash": local_hash,
"upstream_hash": upstream_hash
})
self.logger.info(
f"检测到回溯修改: {symbol} @ {record['timestamp']}, "
f"本地={local_hash[:8]}, 上游={upstream_hash[:8]}"
)
return changed_records
def _get_local_hashes(
self,
symbol: str,
since: str,
sample_rate: float
) -> List[Dict[str, Any]]:
"""从本地数据库读取哈希值"""
query = f"""
SELECT symbol, timestamp, hash, open, high, low, close, volume
FROM market_kline
WHERE symbol = %s AND timestamp >= %s
ORDER BY timestamp
"""
with self.db.cursor() as cur:
if sample_rate < 1.0:
query += f" TABLESAMPLE SYSTEM ({sample_rate * 100})"
cur.execute(query, (symbol, since))
columns = [desc[0] for desc in cur.description]
return [dict(zip(columns, row)) for row in cur.fetchall()]
def _fetch_upstream_record(self, symbol: str, timestamp: str) -> Dict[str, Any]:
"""
从上游拉取单条记录
限频处理同前文
"""
params = {
"symbol": symbol,
"start_time": timestamp,
"end_time": timestamp,
"limit": 1
}
response = requests.get(
"https://api.tickdb.ai/v1/market/kline",
headers={"X-API-Key": os.environ.get("TICKDB_API_KEY")},
params=params,
timeout=(3.05, 10)
)
data = response.json()
if data.get("code") != 0 or not data.get("data"):
return None
return data["data"][0]
@staticmethod
def _compute_hash(record: Dict[str, Any]) -> str:
"""SHA-256 哈希"""
core_fields = ["symbol", "timestamp", "interval", "open", "high", "low", "close", "volume"]
core_data = {k: record.get(k) for k in core_fields if k in record}
canonical = json.dumps(core_data, sort_keys=True, separators=(',', ':'))
return hashlib.sha256(canonical.encode()).hexdigest()
def repair_changes(self, changes: List[Dict[str, Any]], dry_run: bool = True):
"""
修复检测到的回溯修改
Args:
changes: detect_changes() 返回的修改列表
dry_run: True = 只记录不实际更新
"""
if not changes:
self.logger.info("无回溯修改需要修复")
return
action = "【模拟】修复" if dry_run else "修复"
self.logger.info(f"{action} {len(changes)} 条回溯修改")
for change in changes:
self.logger.info(
f"{action}: {change['symbol']} @ {change['timestamp']}\n"
f" 本地 close={change['local_value'].get('close')}, "
f"上游 close={change['upstream_value'].get('close')}"
)
if not dry_run:
self._apply_fix(change)
def _apply_fix(self, change: Dict[str, Any]):
"""执行修复更新"""
query = """
UPDATE market_kline
SET open = %s, high = %s, low = %s, close = %s, volume = %s,
hash = %s, updated_at = %s, is_backfilled = TRUE
WHERE symbol = %s AND timestamp = %s
"""
upstream = change["upstream_value"]
hash_val = self._compute_hash(upstream)
with self.db.cursor() as cur:
cur.execute(query, (
upstream.get("open"), upstream.get("high"), upstream.get("low"),
upstream.get("close"), upstream.get("volume"),
hash_val, datetime.now(timezone.utc),
change["symbol"], change["timestamp"]
))
self.db.commit()
4.4 调度策略:什么时候跑检测?
回溯修改检测的成本比增量同步高得多(需要全量拉取历史数据),建议按以下策略调度:
| 场景 | 调度频率 | 说明 |
|---|---|---|
| 数据清洗期(上游通知) | 立即执行 | 上游告知大规模数据修正时 |
| 每周例行 | 周末低峰期 | 检查最近7天的回溯修改 |
| 每月审计 | 月末 | 全量抽样检查,采样率 10% |
| 回测前 | 按需 | 重要策略上线前回溯验证 |
五、部署方案:三种规模的实现差异
| 维度 | 个人开发者 | 团队(小规模) | 机构级 |
|---|---|---|---|
| 调度方式 | 手动触发 / cron | Airflow / Prefect | 自研调度系统 |
| 数据库 | SQLite | PostgreSQL | PostgreSQL + 分区表 |
| 哈希存储 | 列存储 | 列存储 + 索引 | 列存储 + 分区索引 + 列式存储 |
| 检测频率 | 按需 | 每日一次 | 实时监控 |
| 告警集成 | 日志打印 | 飞书 / Slack | PagerDuty + 邮件 |
| 成本估算 | $0 | $50-200/月 | $500+/月 |
六、完整流程图
┌─────────────────────────────────────────────────────────────┐
│ 增量同步完整流程 │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────┐
│ 读取游标/时间戳 │
└────────┬────────┘
│
▼
┌─────────────────────────────┐
│ 分页拉取上游数据 │
│ (限频处理 / 重连 / 抖动) │
└─────────────┬───────────────┘
│
▼
┌─────────────────────────────┐
│ 遍历每条记录 │
│ │
│ ┌───────────────────────┐ │
│ │ 计算 SHA-256 哈希 │ │
│ └───────────────────────┘ │
│ │ │
│ ▼ │
│ ┌───────────────────────┐ │
│ │ UPSERT (PostgreSQL) │ │
│ │ ON CONFLICT │ │
│ │ WHERE hash != NEW │ │
│ └───────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────┴─────────┐ │
│ │ 哈希变化? │ │
│ └─────────┬─────────┘ │
│ 是/否 │ │
│ ┌───────┴───────┐ │
│ ▼ ▼ │
│ 写入+更新 跳过(无变化) │
│ │ │ │
│ └───────┬───────┘ │
└──────────────┼──────────────┘
│
▼
┌─────────────────┐
│ 持久化游标 │
│ (Redis/DB/File) │
└────────┬────────┘
│
▼
┌─────────────────────────────┐
│ 定期回溯检测(可选) │
│ (采样拉取 / 哈希比对) │
└─────────────────────────────┘
结语
增量同步不是“拉数据存进去”这么简单。它是一个需要同时处理新增识别、去重写入、回溯检测的完整系统。三个防线的设计逻辑:
- 时间戳/版本号过滤:降低网络和存储成本,只拉必要的增量
- UPSERT 去重:确保幂等性,重复运行不会产生脏数据
- 哈希校验:检测上游的历史修改,保证本地数据的最终一致性
这三层防线组合使用,即便上游发生数据清洗或算法调整,本地数据库也能在下一个同步周期内收敛到最新状态。
如果你已经有了数据同步方案,不妨问自己三个问题:
- 上游修改了三个月前的历史数据,你的本地会感知到吗?
- 同步脚本连续跑两次,数据库里会多出一倍重复数据吗?
- 如果游标丢失,你能从哪个时间点恢复?
如果任何一个问题你回答不了,这套方案值得你花一个周末搭起来。
下一步行动
如果你是个人开发者,想快速验证这套方案:
- 访问 tickdb.ai 注册获取免费 API Key
- 用上文提供的 Python 脚本对接
/kline接口 - PostgreSQL 的 UPSERT 语法可以本地用 Docker 起一个
如果你需要 TickDB 提供美股 10 年历史 K 线数据做策略回测,联系 [email protected] 了解机构级数据方案,包含回溯修改日志和版本号接口支持。
如果你习惯用 AI 辅助开发,在 ClawHub 搜索安装 tickdb-market-data SKILL,可以直接用自然语言查询 TickDB 的数据接口能力。
本文涉及的技术实现均基于 PostgreSQL 数据库。如使用 MySQL 或 SQLite,UPSERT 语法需做相应调整。TickDB 的 /kline 接口支持历史 K 线查询,但不支持美股和 A 股的 tick 级逐笔成交数据。