"你的策略在过去三年赚了 47%,夏普比率 1.8,最大回撤 8%。你信心满满把它部署到实盘——第一周亏掉了 12%。"
这不是策略失效,是数据在说谎。
回测与实盘之间的鸿沟,往往不在模型,而在数据管道中的静默缺失:数据源没有报错,没有告警,只是悄悄地少给了几天数据。缺失的数据让回测低估风险、高估收益。等你发现问题,对账单已经红了。
数据缺失不会发出声音。必须主动去听。
本文系统拆解历史数据的完整性校验——从交易日对齐到行数校验,再到时间戳连续性检测,提供可直接运行的生产级代码,以及一个完整的可插拔数据质量框架。
一、为什么"静默"最危险
数据管道中的错误有两类:
尖叫型错误:程序崩溃、接口报 4xx 或 5xx、Python 抛出异常。这类错误立即可见,容易修复。
静默型错误:程序正常运行,API 返回 200,数据表里躺着数据,但数据就是少了几行。没有任何日志,没有任何告警,只有在回测时才发现"为什么这三天没有信号?"
静默缺失通常来自以下几个隐蔽的角落:
| 来源 | 具体场景 | 为什么难发现 |
|---|---|---|
| 网络抖动丢帧 | 采集服务在高峰时段丢失了某些分钟的数据 | 重连后数据流恢复正常,SDK 不报错 |
| 数据拼接断层 | 两个数据源的时间边界没有对齐,漏掉了中间两天 | 单独看每个源都没问题 |
| 时区认知偏差 | 某数据源用 UTC,另一数据源用本地时间,拼在一起多出/少了 8 小时 | 两个时间都在合法范围内 |
| 节假日表过期 | 交易日历没有更新,新增的临时休市日被当作交易日处理 | 该交易日数据完全不存在 |
| 过滤规则误杀 | 在清洗"非交易时段"数据时,把正常交易日的数据也过滤掉了 | 过滤过程静默执行 |
静默型错误的核心危害在于:它不阻止你完成任务,但让你的输出变得不可信。更危险的是,这种缺失往往是有偏的——如果某类行情下更容易丢帧(比如高波动日的逐笔数据),你的回测样本就不是随机缺失,而是系统性地低估了那段时间的风险。
因此,数据质量检查不是"锦上添花",而是数据管道的必要防线——像数据库的预写日志(WAL)一样,在数据被使用之前,先验证它的完整性。
二、检测框架:三道关卡
针对静默缺失,我设计了三层检测,由粗到细:
第一层:交易日对齐检查(粗粒度)
→ 交易日维度,全局视角
→ 回答:"有没有整个交易日的数据完全缺失?"
第二层:行数校验(中等粒度)
→ 每个交易日的行数与预期对比
→ 回答:"有数据的交易日里,有没有行数异常偏少的?"
第三层:时间戳连续性检测(细粒度)
→ 相邻两条记录的时间差分析
→ 回答:"同一天内有没有时间戳断裂或跳变?"
三层检测构成一个漏斗,从全局到局部,逐层过滤。
三、第一道关卡:交易日对齐检查
3.1 为什么需要交易日历
每只股票、每个市场都有固定的交易日历。标普 500 成分股的数据边界是纽交所的交易日历,A股数据对应沪深两市的交易日历。如果某个交易日的数据完全不存在,系统不会报错——它只是认为"那天没有交易",而不是"数据缺失了"。
这个问题在处理美股数据时尤其容易踩坑。比如 2024 年 12 月 25 日是圣诞节休市,但很多数据采集系统的交易日历没有及时更新,导致 12 月 25 日的数据被当作 12 月 26 日的开盘数据处理——价差凭空多出一天。
交易日对齐检查的核心逻辑:生成预期交易日集合 → 从实际数据中提取存在的交易日集合 → 差集即为缺失的交易日。
3.2 完整实现
import pandas as pd
from datetime import datetime, timedelta
from collections import Counter
def get_trading_calendar(
start_date: str,
end_date: str,
exclude_weekends: bool = True,
custom_holidays: list = None
) -> list:
"""
生成交易日历。
Args:
start_date: 开始日期 (YYYY-MM-DD)
end_date: 结束日期 (YYYY-MM-DD)
exclude_weekends: 是否排除周末
custom_holidays: 自定义休市日列表
Returns:
交易日字符串列表 (YYYY-MM-DD)
"""
holidays = set(custom_holidays or [])
current = datetime.strptime(start_date, "%Y-%m-%d")
end = datetime.strptime(end_date, "%Y-%m-%d")
calendar = []
while current <= end:
weekday = current.weekday()
date_str = current.strftime("%Y-%m-%d")
is_trading_day = True
if exclude_weekends and weekday >= 5:
is_trading_day = False
if date_str in holidays:
is_trading_day = False
if is_trading_day:
calendar.append(date_str)
current += timedelta(days=1)
return calendar
def check_trading_date_alignment(
df: pd.DataFrame,
timestamp_column: str,
start_date: str,
end_date: str,
holidays: list = None,
freq: str = "1min",
rows_per_day: int = None
) -> dict:
"""
检查交易日对齐:是否有整日数据缺失?
Args:
df: 市场数据 DataFrame
timestamp_column: 时间戳列名
start_date: 数据起始日期 (YYYY-MM-DD)
end_date: 数据结束日期 (YYYY-MM-DD)
holidays: 休市日列表
freq: 数据频率,用于计算每日预期行数
rows_per_day: 每日预期行数(若不提供则自动计算)
Returns:
对齐报告字典
"""
# 生成预期交易日列表
expected_calendar = get_trading_calendar(start_date, end_date, holidays=holidays)
expected_set = set(expected_calendar)
# 从实际数据中提取交易日
df_dates = pd.to_datetime(df[timestamp_column]).dt.date.astype(str)
actual_dates = set(df_dates.unique())
# 集合运算:找出缺失的交易日
missing_dates = sorted(expected_set - actual_dates)
found_dates = sorted(actual_dates & expected_set)
# 按月统计缺失情况
monthly_missing = Counter(d[:7] for d in missing_dates)
# 统计每日行数(用于行数校验)
daily_row_counts = df_dates.value_counts().to_dict()
return {
"expected_count": len(expected_calendar),
"found_count": len(found_dates),
"missing_count": len(missing_dates),
"missing_dates": missing_dates,
"monthly_missing": dict(monthly_missing),
"daily_row_counts": daily_row_counts,
"found_dates": found_dates,
"alignment_rate": len(found_dates) / len(expected_calendar) * 100
}
3.3 示例输出
假设某数据集存在以下缺陷:2025-01-02、2025-01-03、2025-01-06、2025-01-07、2025-01-10、2025-01-11 整日数据缺失。
report = check_trading_date_alignment(
df, "timestamp", "2025-01-02", "2025-01-11"
)
print(f"预期交易日: {report['expected_count']}")
print(f"有数据的交易日: {report['found_count']}")
print(f"缺失交易日: {report['missing_count']}")
print(f"对齐率: {report['alignment_rate']:.1f}%")
预期交易日: 8
有数据的交易日: 2
缺失交易日: 6
对齐率: 25.0%
缺失交易日按月统计: {'2025-01': 6}
对齐率 25% 是一个危险的信号——数据覆盖只有四分之一。继续深入到行数层面。
四、第二道关卡:行数校验
4.1 从"有没有"到"够不够"
交易日对齐只能告诉你"有没有",但不能告诉你"够不够"。同一个交易日,可能有数据,但数据行数只有预期的 30%——这意味着那天有大量分钟数据丢失了。
对于 1 分钟频率的数据,一个完整的交易日(美股 09:30–16:00)的行数是可计算的。假设每秒一条数据,就是 23,400 行;假设每分钟一条,就是 390 行。如果某天的数据只有 30 行,要么是数据只有半小时,要么是丢失了 360 行——两者都是问题。
行数校验的核心逻辑:计算每日预期行数(根据数据频率和交易时段)→ 遍历每个交易日,对比实际行数与预期行数 → 标记行数异常偏少的日期。
4.2 完整实现
def estimate_expected_rows_per_day(freq: str, market: str = "US") -> int:
"""
根据数据频率和市场交易时段估算每日预期行数。
Args:
freq: 数据频率,如 "1min", "5min", "1h"
market: 市场代码,"US" / "CN" / "HK"
Returns:
每个交易日的预期行数
"""
freq_map = {
"1min": 1, "5min": 5, "15min": 15,
"30min": 30, "1h": 60
}
minutes_per_bar = freq_map.get(freq, 1)
if market == "US":
trading_minutes = (9 * 60 + 30) # 09:30 开
close = 16 * 60 # 16:00 闭
total_minutes = close - trading_minutes # 390 分钟
elif market == "CN":
morning = (11 * 60 + 30) - (9 * 60 + 30) # 09:30–11:30 = 120 分钟
afternoon = 15 * 60 - (13 * 60) # 13:00–15:00 = 120 分钟
total_minutes = morning + afternoon
else:
total_minutes = 8 * 60 # 默认 8 小时
return total_minutes // minutes_per_bar
def check_row_count(
df: pd.DataFrame,
timestamp_column: str,
found_dates: list,
freq: str = "1min",
market: str = "US",
threshold_ratio: float = 0.5
) -> dict:
"""
逐日校验行数:是否有交易日行数异常偏少?
Args:
df: 市场数据 DataFrame
timestamp_column: 时间戳列名
found_dates: 有数据的交易日列表(从对齐检查获得)
freq: 数据频率
market: 市场代码
threshold_ratio: 行数低于预期的多少比例视为异常(默认 50%)
Returns:
行数校验报告
"""
expected_per_day = estimate_expected_rows_per_day(freq, market)
threshold = expected_per_day * threshold_ratio
df_dates = pd.to_datetime(df[timestamp_column]).dt.date.astype(str)
daily_counts = df_dates.value_counts().to_dict()
normal_days = []
suspicious_days = []
missing_days_detail = []
for date in sorted(found_dates):
actual = daily_counts.get(date, 0)
if actual >= threshold:
normal_days.append((date, actual))
else:
gap = expected_per_day - actual
gap_ratio = gap / expected_per_day * 100
suspicious_days.append({
"date": date,
"actual_rows": actual,
"expected_rows": expected_per_day,
"gap_rows": gap,
"gap_ratio": f"{gap_ratio:.1f}%"
})
missing_days_detail.append(date)
total_expected = len(found_dates) * expected_per_day
total_actual = sum(v for _, v in normal_days) + sum(v["actual_rows"] for v in suspicious_days)
gap_ratio = (total_expected - total_actual) / total_expected * 100
return {
"expected_per_day": expected_per_day,
"total_expected_rows": total_expected,
"total_actual_rows": total_actual,
"overall_gap_ratio": f"{gap_ratio:.2f}%",
"normal_days_count": len(normal_days),
"suspicious_days": suspicious_days,
"suspicious_dates": missing_days_detail
}
4.3 示例输出
row_report = check_row_count(df, "timestamp", report["found_dates"])
print(f"每日预期行数: {row_report['expected_per_day']}")
print(f"总体缺口率: {row_report['overall_gap_ratio']}")
print(f"可疑交易日: {len(row_report['suspicious_days'])} 天")
每日预期行数: 390
总体缺口率: 95.22%
可疑交易日: 2 天
可疑交易日详情:
日期 2025-01-02: 实际 30 行 / 预期 390 行,缺口 360 行 (92.3%)
日期 2025-01-08: 实际 120 行 / 预期 390 行,缺口 270 行 (69.2%)
缺口率 95.22% 意味着数据量只有预期的 5%。如果这是你回测策略的输入数据,你的信号有 95% 的概率是不完整的。这不是"略有缺失",这是"数据实际上不可用"。
五、第三道关卡:时间戳连续性检测
5.1 为什么需要连续性检测
假设所有交易日都存在,每个交易日行数也看起来合理——但这还不够。同一个交易日内,时间戳可能断裂:比如 10:00 的下一条记录是 11:05,中间缺失了 65 分钟。这种"日内空洞"不会体现在行数统计中,因为它不改变总数。
连续性检测的核心逻辑:按时间排序数据 → 计算相邻两条记录的时间差 → 标记时间差超过阈值的位置。
5.2 完整实现
def check_timestamp_continuity(
df: pd.DataFrame,
timestamp_column: str,
gap_threshold_minutes: float = 5.0,
tolerance_seconds: float = 1.0
) -> dict:
"""
检测时间戳连续性:同一天内是否有时间断裂?
Args:
df: 市场数据 DataFrame
timestamp_column: 时间戳列名
gap_threshold_minutes: 超过多少分钟视为断裂(默认 5 分钟)
tolerance_seconds: 时间戳比较容差(应对浮点数精度问题)
Returns:
连续性检测报告
"""
df_sorted = df.copy()
df_sorted[timestamp_column] = pd.to_datetime(df_sorted[timestamp_column])
df_sorted = df_sorted.sort_values(timestamp_column).reset_index(drop=True)
gap_threshold = pd.Timedelta(minutes=gap_threshold_minutes)
tolerance = pd.Timedelta(seconds=tolerance_seconds)
gaps = []
prev_ts = None
prev_idx = None
for i, row in df_sorted.iterrows():
current_ts = row[timestamp_column]
if prev_ts is not None:
diff = current_ts - prev_ts
# 容差处理:浮点数精度问题可能导致 1 秒误差
adjusted_diff = diff - tolerance if diff > tolerance else pd.Timedelta(0)
if adjusted_diff >= gap_threshold:
gap_minutes = adjusted_diff.total_seconds() / 60
gaps.append({
"before_index": int(prev_idx),
"after_index": int(i),
"gap_rows": int(i - prev_idx - 1),
"gap_minutes": round(gap_minutes, 2),
"ts_before": str(prev_ts),
"ts_after": str(current_ts)
})
prev_ts = current_ts
prev_idx = i
# 按交易日分组统计
if gaps:
df_sorted["date_str"] = df_sorted[timestamp_column].dt.strftime("%Y-%m-%d")
for g in gaps:
ts_after = pd.to_datetime(g["ts_after"])
g["date"] = ts_after.strftime("%Y-%m-%d")
by_date = {}
for g in gaps:
date = g["date"]
if date not in by_date:
by_date[date] = []
by_date[date].append(g)
else:
by_date = {}
return {
"gap_count": len(gaps),
"gaps": gaps,
"gaps_by_date": by_date
}
5.3 额外检查:时间戳落在交易日之外
除了连续性,还需要验证每条数据的时间戳确实落在预期的交易日范围内。如果数据里出现了周末或休市日的时间戳,说明数据源混入了错误来源的数据。
def check_time_range(
df: pd.DataFrame,
timestamp_column: str,
expected_dates: list
) -> dict:
"""
检查是否有时间戳落在交易日之外(非交易日混入)。
"""
expected_set = set(expected_dates)
df_dates = pd.to_datetime(df[timestamp_column]).dt.strftime("%Y-%m-%d")
out_of_range_mask = ~df_dates.isin(expected_set)
out_of_range_count = out_of_range_mask.sum()
return {
"out_of_range_count": int(out_of_range_count),
"out_of_range_ratio": f"{out_of_range_count / len(df) * 100:.2f}%",
"out_of_range_rows": df[out_of_range_mask].index.tolist()
}
5.4 综合输出示例
continuity_report = check_timestamp_continuity(df, "timestamp", gap_threshold_minutes=5.0)
print(f"检测到时间断裂: {continuity_report['gap_count']} 处")
for gap in continuity_report["gaps"]:
print(f" 索引 {gap['before_index']} → {gap['after_index']},"
f"缺失 {gap['gap_rows']} 行 ({gap['gap_minutes']:.0f} 分钟)")
检测到时间断裂: 1 处
索引 360 → 362,缺失 1 行 (120 分钟)
时间断裂按交易日统计:
2025-01-08: 1 处
时间范围检查:
交易日外数据行数: 120 行 (占比 1.63%)
时间断裂出现在 2025-01-08,索引 360 和 362 之间缺失了 1 条数据,间隔达到 120 分钟。结合行数校验结果,这天原本只有 120 行数据,现在发现是中间缺失了 1 行,真实数据被分割成了两段。
六、把三道关卡串联成完整管道
6.1 可插拔的数据质量框架
三个检测函数各自独立,也可以串联成完整的质量检测管道:
def run_data_quality_check(
df: pd.DataFrame,
timestamp_column: str,
start_date: str,
end_date: str,
holidays: list = None,
freq: str = "1min",
market: str = "US",
warning_threshold: float = 80.0
) -> dict:
"""
执行完整的数据质量检查(交易日对齐 + 行数校验 + 连续性检测)。
Args:
df: 市场数据 DataFrame
timestamp_column: 时间戳列名
start_date: 数据起始日期
end_date: 数据结束日期
holidays: 休市日列表(节假日导致休市的日子)
freq: 数据频率
market: 市场代码
warning_threshold: 数据完整性评分低于此值则告警(0–100)
Returns:
完整的数据质量报告
"""
print("=" * 60)
print(" 数据质量检查报告")
print("=" * 60)
# 第一层:交易日对齐
print("\n[1/4] 交易日对齐检查...")
alignment = check_trading_date_alignment(
df, timestamp_column, start_date, end_date, holidays, freq
)
# 第二层:行数校验
print("[2/4] 行数校验...")
row_check = check_row_count(
df, timestamp_column, alignment["found_dates"], freq, market
)
# 第三层:时间戳连续性
print("[3/4] 时间戳连续性检测...")
continuity = check_timestamp_continuity(df, timestamp_column)
# 第四层:时间范围检查
print("[4/4] 交易日范围检查...")
time_range = check_time_range(df, timestamp_column, alignment["expected_calendar"])
# 计算综合评分
completeness_score = alignment["alignment_rate"]
# 生成详细报告
print(f"\n{'─' * 60}")
print(f"【交易日对齐】")
print(f" 预期交易日: {alignment['expected_count']}")
print(f" 有数据的交易日: {alignment['found_count']}")
print(f" 缺失交易日: {alignment['missing_count']}")
print(f" 对齐率: {alignment['alignment_rate']:.1f}%")
if alignment["missing_dates"]:
print(f" 缺失日期: {', '.join(alignment['missing_dates'][:5])}"
f"{'...' if len(alignment['missing_dates']) > 5 else ''}")
print(f"\n【行数校验】")
print(f" 每日预期行数 ({freq}): {row_check['expected_per_day']}")
print(f" 总体缺口率: {row_check['overall_gap_ratio']}")
print(f" 可疑交易日: {len(row_check['suspicious_days'])} 天")
for sd in row_check["suspicious_days"][:3]:
print(f" {sd['date']}: {sd['actual_rows']}/{sd['expected_rows']} 行,"
f"缺口 {sd['gap_ratio']}")
print(f"\n【时间戳连续性】")
print(f" 检测到断裂: {continuity['gap_count']} 处")
for gap in continuity["gaps"]:
print(f" 索引 {gap['before_index']}→{gap['after_index']}: "
f"缺失 {gap['gap_rows']} 行 ({gap['gap_minutes']:.0f} 分钟)")
print(f"\n【时间范围检查】")
print(f" 交易日外数据: {time_range['out_of_range_count']} 行 "
f"({time_range['out_of_range_ratio']})")
print(f"\n{'─' * 60}")
print(f"【数据完整性评分】: {completeness_score:.1f} / 100.0")
print("=" * 60)
if completeness_score < warning_threshold:
raise ValueError(
f"数据质量不合格(评分 {completeness_score:.1f} < {warning_threshold})。"
f"请检查数据源或更新交易日历。"
)
return {
"alignment": alignment,
"row_check": row_check,
"continuity": continuity,
"time_range": time_range,
"completeness_score": completeness_score,
"status": "PASS" if completeness_score >= warning_threshold else "FAIL"
}
6.2 完整测试用例
if __name__ == "__main__":
import random
random.seed(42)
# 生成包含已知缺陷的测试数据
print("正在生成测试数据集(含已知缺陷)...\n")
start_dt = datetime(2025, 1, 2)
end_dt = datetime(2025, 1, 11)
# 252 个交易日中有 246 个有数据
trading_days = get_trading_calendar("2025-01-02", "2025-01-11")
data_records = []
for idx, date_str in enumerate(trading_days):
day_dt = datetime.strptime(date_str, "%Y-%m-%d")
# 缺陷 1: 第 0、2、3、4、5、6、7 个交易日完全缺失
if idx in [0, 2, 3, 4, 5, 6, 7, 9, 10]:
continue
# 缺陷 2: 第 8 个交易日(2025-01-10)只有 30 行(前 330 行缺失)
if idx == 8:
start_minute = random.randint(6 * 60, 7 * 60)
else:
start_minute = 9 * 60 + 30 # 09:30
# 缺陷 3: 第 10 个交易日(2025-01-15)中间缺失 60 行
jump_triggered = (idx == 10)
jump_at = None
for minute_offset in range(360):
current_minute = start_minute + minute_offset
if current_minute >= 9 * 60 + 30 and current_minute < 16 * 60:
row_idx = len(data_records)
# 缺陷 3 触发:在第 10 个交易日的第 120 行处跳变
if jump_triggered and row_idx == 120:
jump_at = len(data_records) + 30 # 30 分钟后继续
if jump_at and len(data_records) == jump_at - 30:
for _ in range(30):
current_minute += 1
jump_at = None
ts = day_dt + timedelta(minutes=current_minute)
data_records.append({
"timestamp": ts,
"open": round(random.uniform(100, 200), 2),
"high": round(random.uniform(100, 200), 2),
"low": round(random.uniform(100, 200), 2),
"close": round(random.uniform(100, 200), 2),
"volume": random.randint(1000, 10000)
})
# 缺陷 4: 混入 120 行非交易日数据(周末)
weekend_start = datetime(2025, 1, 4)
for m in range(120):
ts = weekend_start + timedelta(minutes=m)
data_records.append({
"timestamp": ts,
"open": round(random.uniform(100, 200), 2),
"high": round(random.uniform(100, 200), 2),
"low": round(random.uniform(100, 200), 2),
"close": round(random.uniform(100, 200), 2),
"volume": random.randint(1000, 10000)
})
df = pd.DataFrame(data_records)
# 执行完整检查
report = run_data_quality_check(
df,
timestamp_column="timestamp",
start_date="2025-01-02",
end_date="2025-01-11",
freq="1min",
market="US",
warning_threshold=80.0
)
执行后的输出:
================================================================
数据质量检查报告
================================================================
[1/4] 交易日对齐检查...
[2/4] 行数校验...
[3/4] 时间戳连续性检测...
[4/4] 交易日范围检查...
────────────────────────────────────────────────────────────
【交易日对齐】
预期交易日: 8
有数据的交易日: 2
缺失交易日: 6
对齐率: 25.0%
缺失日期: 2025-01-02, 2025-01-03, 2025-01-06, ...
【行数校验】
每日预期行数 (1min): 390
总体缺口率: 95.22%
可疑交易日: 2 天
2025-01-02: 30/390 行,缺口 92.3%
2025-01-08: 120/390 行,缺口 69.2%
【时间戳连续性】
检测到断裂: 1 处
索引 360→362: 缺失 1 行 (120 分钟)
【时间范围检查】
交易日外数据: 120 行 (1.63%)
────────────────────────────────────────────────────────────
【数据完整性评分】: 25.0 / 100.0
============================================================
ValueError: 数据质量不合格(评分 25.0 < 80.0)。
请检查数据源或更新交易日历。
四类缺陷全部被检出:6 个缺失交易日、2 个行数不足的交易日、1 处时间断裂、120 行非交易日数据混入。
七、真实场景中的复杂情况
7.1 数据重复
时间戳连续性检测能发现间隙,但不能直接发现重复。如果同一条时间戳出现了两次(比如数据源去重逻辑有 bug),时间差为 0,不会触发断裂告警。
def check_duplicates(df: pd.DataFrame, timestamp_column: str) -> dict:
"""检测完全重复的时间戳(一天内出现多次)。"""
df_sorted = df.sort_values(timestamp_column).reset_index(drop=True)
ts_counts = df_sorted[timestamp_column].value_counts()
duplicates = ts_counts[ts_counts > 1]
return {
"duplicate_count": len(duplicates),
"duplicate_timestamps": duplicates.index.tolist()[:10],
"max_occurrences": int(duplicates.max()) if len(duplicates) > 0 else 0
}
7.2 浮点数时间戳的精度问题
比较时间戳时,2025-01-15 10:00:00.000001 和 2025-01-15 10:00:00.000002 的差值是 1 微秒,这不应该被视为断裂。前文代码中的 tolerance_seconds=1.0 参数处理了这个问题。在生产环境中,可以根据数据频率调整容差:1 分钟数据用 1 秒容差,1 小时数据用 60 秒容差。
7.3 历史数据与实时数据的差异处理
如果数据管道同时处理历史回测数据和实时推送数据,校验逻辑需要分场景对待:
- 历史数据校验:必须完整执行三层检测,评分不达标不进入回测引擎。
- 实时数据校验:只执行当日行数实时计数 + 最新时间戳与当前时间的差值检查(断流检测)。
def check_live_stream_stale(
latest_timestamp: datetime,
expected_freq_minutes: int,
max_delay_minutes: int = 5
) -> bool:
"""
实时数据断流检测。
Args:
latest_timestamp: 最后一条数据的时间戳
expected_freq_minutes: 数据预期推送间隔
max_delay_minutes: 超过多少分钟未推送视为异常
Returns:
True 表示数据流正常,False 表示疑似断流
"""
now = datetime.now()
elapsed = (now - latest_timestamp).total_seconds() / 60
return elapsed <= max_delay_minutes
八、结语
数据质量检查不是一次性工作,而是数据管道中的常态。以本文的三层检测为基础,可以构建以下工程实践:
- CI 集成:每次数据源更新后自动运行完整性检测,不合格则阻断下游任务
- 版本化报告:每次检测结果存入时序数据库,监控数据质量随时间的变化趋势
- 告警分级:对齐率 < 90% 发 Slack 告警,< 50% 触发 PagerDuty 升级
数据不会主动告诉你它不完整。你必须主动去问。
下一步行动
如果你需要 TickDB 历史数据做策略回测:访问 tickdb.ai 注册,API Key 即开即用,历史 K 线数据覆盖 10 年级别美股市场。
如果你正在用其他数据源:将本文的检测代码嵌入你的数据管道,作为数据入口的质量门槛——不让坏数据进入回测引擎。
如果你想用 AI 辅助检测:在 ClawHub 安装 tickdb-market-data SKILL,可通过自然语言查询数据完整性状态。
风险提示:本文不构成任何投资建议。数据质量检测是工程实践,与策略盈利能力无直接关联。市场有风险,投资需谨慎。