"你的策略在过去三年赚了 47%,夏普比率 1.8,最大回撤 8%。你信心满满把它部署到实盘——第一周亏掉了 12%。"

这不是策略失效,是数据在说谎。

回测与实盘之间的鸿沟,往往不在模型,而在数据管道中的静默缺失:数据源没有报错,没有告警,只是悄悄地少给了几天数据。缺失的数据让回测低估风险、高估收益。等你发现问题,对账单已经红了。

数据缺失不会发出声音。必须主动去听。

本文系统拆解历史数据的完整性校验——从交易日对齐到行数校验,再到时间戳连续性检测,提供可直接运行的生产级代码,以及一个完整的可插拔数据质量框架。


一、为什么"静默"最危险

数据管道中的错误有两类:

尖叫型错误:程序崩溃、接口报 4xx 或 5xx、Python 抛出异常。这类错误立即可见,容易修复。

静默型错误:程序正常运行,API 返回 200,数据表里躺着数据,但数据就是少了几行。没有任何日志,没有任何告警,只有在回测时才发现"为什么这三天没有信号?"

静默缺失通常来自以下几个隐蔽的角落:

来源 具体场景 为什么难发现
网络抖动丢帧 采集服务在高峰时段丢失了某些分钟的数据 重连后数据流恢复正常,SDK 不报错
数据拼接断层 两个数据源的时间边界没有对齐,漏掉了中间两天 单独看每个源都没问题
时区认知偏差 某数据源用 UTC,另一数据源用本地时间,拼在一起多出/少了 8 小时 两个时间都在合法范围内
节假日表过期 交易日历没有更新,新增的临时休市日被当作交易日处理 该交易日数据完全不存在
过滤规则误杀 在清洗"非交易时段"数据时,把正常交易日的数据也过滤掉了 过滤过程静默执行

静默型错误的核心危害在于:它不阻止你完成任务,但让你的输出变得不可信。更危险的是,这种缺失往往是有偏的——如果某类行情下更容易丢帧(比如高波动日的逐笔数据),你的回测样本就不是随机缺失,而是系统性地低估了那段时间的风险。

因此,数据质量检查不是"锦上添花",而是数据管道的必要防线——像数据库的预写日志(WAL)一样,在数据被使用之前,先验证它的完整性。


二、检测框架:三道关卡

针对静默缺失,我设计了三层检测,由粗到细:

第一层:交易日对齐检查(粗粒度)
  → 交易日维度,全局视角
  → 回答:"有没有整个交易日的数据完全缺失?"

第二层:行数校验(中等粒度)
  → 每个交易日的行数与预期对比
  → 回答:"有数据的交易日里,有没有行数异常偏少的?"

第三层:时间戳连续性检测(细粒度)
  → 相邻两条记录的时间差分析
  → 回答:"同一天内有没有时间戳断裂或跳变?"

三层检测构成一个漏斗,从全局到局部,逐层过滤。


三、第一道关卡:交易日对齐检查

3.1 为什么需要交易日历

每只股票、每个市场都有固定的交易日历。标普 500 成分股的数据边界是纽交所的交易日历,A股数据对应沪深两市的交易日历。如果某个交易日的数据完全不存在,系统不会报错——它只是认为"那天没有交易",而不是"数据缺失了"

这个问题在处理美股数据时尤其容易踩坑。比如 2024 年 12 月 25 日是圣诞节休市,但很多数据采集系统的交易日历没有及时更新,导致 12 月 25 日的数据被当作 12 月 26 日的开盘数据处理——价差凭空多出一天。

交易日对齐检查的核心逻辑:生成预期交易日集合 → 从实际数据中提取存在的交易日集合 → 差集即为缺失的交易日。

3.2 完整实现

import pandas as pd
from datetime import datetime, timedelta
from collections import Counter


def get_trading_calendar(
    start_date: str,
    end_date: str,
    exclude_weekends: bool = True,
    custom_holidays: list = None
) -> list:
    """
    生成交易日历。
    
    Args:
        start_date: 开始日期 (YYYY-MM-DD)
        end_date: 结束日期 (YYYY-MM-DD)
        exclude_weekends: 是否排除周末
        custom_holidays: 自定义休市日列表
    
    Returns:
        交易日字符串列表 (YYYY-MM-DD)
    """
    holidays = set(custom_holidays or [])
    current = datetime.strptime(start_date, "%Y-%m-%d")
    end = datetime.strptime(end_date, "%Y-%m-%d")
    calendar = []
    
    while current <= end:
        weekday = current.weekday()
        date_str = current.strftime("%Y-%m-%d")
        
        is_trading_day = True
        if exclude_weekends and weekday >= 5:
            is_trading_day = False
        if date_str in holidays:
            is_trading_day = False
        
        if is_trading_day:
            calendar.append(date_str)
        
        current += timedelta(days=1)
    
    return calendar


def check_trading_date_alignment(
    df: pd.DataFrame,
    timestamp_column: str,
    start_date: str,
    end_date: str,
    holidays: list = None,
    freq: str = "1min",
    rows_per_day: int = None
) -> dict:
    """
    检查交易日对齐:是否有整日数据缺失?
    
    Args:
        df: 市场数据 DataFrame
        timestamp_column: 时间戳列名
        start_date: 数据起始日期 (YYYY-MM-DD)
        end_date: 数据结束日期 (YYYY-MM-DD)
        holidays: 休市日列表
        freq: 数据频率,用于计算每日预期行数
        rows_per_day: 每日预期行数(若不提供则自动计算)
    
    Returns:
        对齐报告字典
    """
    # 生成预期交易日列表
    expected_calendar = get_trading_calendar(start_date, end_date, holidays=holidays)
    expected_set = set(expected_calendar)
    
    # 从实际数据中提取交易日
    df_dates = pd.to_datetime(df[timestamp_column]).dt.date.astype(str)
    actual_dates = set(df_dates.unique())
    
    # 集合运算:找出缺失的交易日
    missing_dates = sorted(expected_set - actual_dates)
    found_dates = sorted(actual_dates & expected_set)
    
    # 按月统计缺失情况
    monthly_missing = Counter(d[:7] for d in missing_dates)
    
    # 统计每日行数(用于行数校验)
    daily_row_counts = df_dates.value_counts().to_dict()
    
    return {
        "expected_count": len(expected_calendar),
        "found_count": len(found_dates),
        "missing_count": len(missing_dates),
        "missing_dates": missing_dates,
        "monthly_missing": dict(monthly_missing),
        "daily_row_counts": daily_row_counts,
        "found_dates": found_dates,
        "alignment_rate": len(found_dates) / len(expected_calendar) * 100
    }

3.3 示例输出

假设某数据集存在以下缺陷:2025-01-02、2025-01-03、2025-01-06、2025-01-07、2025-01-10、2025-01-11 整日数据缺失。

report = check_trading_date_alignment(
    df, "timestamp", "2025-01-02", "2025-01-11"
)
print(f"预期交易日: {report['expected_count']}")
print(f"有数据的交易日: {report['found_count']}")
print(f"缺失交易日: {report['missing_count']}")
print(f"对齐率: {report['alignment_rate']:.1f}%")
预期交易日: 8
有数据的交易日: 2
缺失交易日: 6
对齐率: 25.0%

缺失交易日按月统计: {'2025-01': 6}

对齐率 25% 是一个危险的信号——数据覆盖只有四分之一。继续深入到行数层面。


四、第二道关卡:行数校验

4.1 从"有没有"到"够不够"

交易日对齐只能告诉你"有没有",但不能告诉你"够不够"。同一个交易日,可能有数据,但数据行数只有预期的 30%——这意味着那天有大量分钟数据丢失了。

对于 1 分钟频率的数据,一个完整的交易日(美股 09:30–16:00)的行数是可计算的。假设每秒一条数据,就是 23,400 行;假设每分钟一条,就是 390 行。如果某天的数据只有 30 行,要么是数据只有半小时,要么是丢失了 360 行——两者都是问题。

行数校验的核心逻辑:计算每日预期行数(根据数据频率和交易时段)→ 遍历每个交易日,对比实际行数与预期行数 → 标记行数异常偏少的日期。

4.2 完整实现

def estimate_expected_rows_per_day(freq: str, market: str = "US") -> int:
    """
    根据数据频率和市场交易时段估算每日预期行数。
    
    Args:
        freq: 数据频率,如 "1min", "5min", "1h"
        market: 市场代码,"US" / "CN" / "HK"
    
    Returns:
        每个交易日的预期行数
    """
    freq_map = {
        "1min": 1, "5min": 5, "15min": 15,
        "30min": 30, "1h": 60
    }
    minutes_per_bar = freq_map.get(freq, 1)
    
    if market == "US":
        trading_minutes = (9 * 60 + 30)  # 09:30 开
        close = 16 * 60                   # 16:00 闭
        total_minutes = close - trading_minutes  # 390 分钟
    elif market == "CN":
        morning = (11 * 60 + 30) - (9 * 60 + 30)  # 09:30–11:30 = 120 分钟
        afternoon = 15 * 60 - (13 * 60)            # 13:00–15:00 = 120 分钟
        total_minutes = morning + afternoon
    else:
        total_minutes = 8 * 60  # 默认 8 小时
    
    return total_minutes // minutes_per_bar


def check_row_count(
    df: pd.DataFrame,
    timestamp_column: str,
    found_dates: list,
    freq: str = "1min",
    market: str = "US",
    threshold_ratio: float = 0.5
) -> dict:
    """
    逐日校验行数:是否有交易日行数异常偏少?
    
    Args:
        df: 市场数据 DataFrame
        timestamp_column: 时间戳列名
        found_dates: 有数据的交易日列表(从对齐检查获得)
        freq: 数据频率
        market: 市场代码
        threshold_ratio: 行数低于预期的多少比例视为异常(默认 50%)
    
    Returns:
        行数校验报告
    """
    expected_per_day = estimate_expected_rows_per_day(freq, market)
    threshold = expected_per_day * threshold_ratio
    
    df_dates = pd.to_datetime(df[timestamp_column]).dt.date.astype(str)
    daily_counts = df_dates.value_counts().to_dict()
    
    normal_days = []
    suspicious_days = []
    missing_days_detail = []
    
    for date in sorted(found_dates):
        actual = daily_counts.get(date, 0)
        if actual >= threshold:
            normal_days.append((date, actual))
        else:
            gap = expected_per_day - actual
            gap_ratio = gap / expected_per_day * 100
            suspicious_days.append({
                "date": date,
                "actual_rows": actual,
                "expected_rows": expected_per_day,
                "gap_rows": gap,
                "gap_ratio": f"{gap_ratio:.1f}%"
            })
            missing_days_detail.append(date)
    
    total_expected = len(found_dates) * expected_per_day
    total_actual = sum(v for _, v in normal_days) + sum(v["actual_rows"] for v in suspicious_days)
    gap_ratio = (total_expected - total_actual) / total_expected * 100
    
    return {
        "expected_per_day": expected_per_day,
        "total_expected_rows": total_expected,
        "total_actual_rows": total_actual,
        "overall_gap_ratio": f"{gap_ratio:.2f}%",
        "normal_days_count": len(normal_days),
        "suspicious_days": suspicious_days,
        "suspicious_dates": missing_days_detail
    }

4.3 示例输出

row_report = check_row_count(df, "timestamp", report["found_dates"])
print(f"每日预期行数: {row_report['expected_per_day']}")
print(f"总体缺口率: {row_report['overall_gap_ratio']}")
print(f"可疑交易日: {len(row_report['suspicious_days'])} 天")
每日预期行数: 390
总体缺口率: 95.22%
可疑交易日: 2 天

可疑交易日详情:
  日期 2025-01-02: 实际 30 行 / 预期 390 行,缺口 360 行 (92.3%)
  日期 2025-01-08: 实际 120 行 / 预期 390 行,缺口 270 行 (69.2%)

缺口率 95.22% 意味着数据量只有预期的 5%。如果这是你回测策略的输入数据,你的信号有 95% 的概率是不完整的。这不是"略有缺失",这是"数据实际上不可用"。


五、第三道关卡:时间戳连续性检测

5.1 为什么需要连续性检测

假设所有交易日都存在,每个交易日行数也看起来合理——但这还不够。同一个交易日内,时间戳可能断裂:比如 10:00 的下一条记录是 11:05,中间缺失了 65 分钟。这种"日内空洞"不会体现在行数统计中,因为它不改变总数。

连续性检测的核心逻辑:按时间排序数据 → 计算相邻两条记录的时间差 → 标记时间差超过阈值的位置。

5.2 完整实现

def check_timestamp_continuity(
    df: pd.DataFrame,
    timestamp_column: str,
    gap_threshold_minutes: float = 5.0,
    tolerance_seconds: float = 1.0
) -> dict:
    """
    检测时间戳连续性:同一天内是否有时间断裂?
    
    Args:
        df: 市场数据 DataFrame
        timestamp_column: 时间戳列名
        gap_threshold_minutes: 超过多少分钟视为断裂(默认 5 分钟)
        tolerance_seconds: 时间戳比较容差(应对浮点数精度问题)
    
    Returns:
        连续性检测报告
    """
    df_sorted = df.copy()
    df_sorted[timestamp_column] = pd.to_datetime(df_sorted[timestamp_column])
    df_sorted = df_sorted.sort_values(timestamp_column).reset_index(drop=True)
    
    gap_threshold = pd.Timedelta(minutes=gap_threshold_minutes)
    tolerance = pd.Timedelta(seconds=tolerance_seconds)
    
    gaps = []
    prev_ts = None
    prev_idx = None
    
    for i, row in df_sorted.iterrows():
        current_ts = row[timestamp_column]
        
        if prev_ts is not None:
            diff = current_ts - prev_ts
            
            # 容差处理:浮点数精度问题可能导致 1 秒误差
            adjusted_diff = diff - tolerance if diff > tolerance else pd.Timedelta(0)
            
            if adjusted_diff >= gap_threshold:
                gap_minutes = adjusted_diff.total_seconds() / 60
                gaps.append({
                    "before_index": int(prev_idx),
                    "after_index": int(i),
                    "gap_rows": int(i - prev_idx - 1),
                    "gap_minutes": round(gap_minutes, 2),
                    "ts_before": str(prev_ts),
                    "ts_after": str(current_ts)
                })
        
        prev_ts = current_ts
        prev_idx = i
    
    # 按交易日分组统计
    if gaps:
        df_sorted["date_str"] = df_sorted[timestamp_column].dt.strftime("%Y-%m-%d")
        for g in gaps:
            ts_after = pd.to_datetime(g["ts_after"])
            g["date"] = ts_after.strftime("%Y-%m-%d")
        by_date = {}
        for g in gaps:
            date = g["date"]
            if date not in by_date:
                by_date[date] = []
            by_date[date].append(g)
    else:
        by_date = {}
    
    return {
        "gap_count": len(gaps),
        "gaps": gaps,
        "gaps_by_date": by_date
    }

5.3 额外检查:时间戳落在交易日之外

除了连续性,还需要验证每条数据的时间戳确实落在预期的交易日范围内。如果数据里出现了周末或休市日的时间戳,说明数据源混入了错误来源的数据。

def check_time_range(
    df: pd.DataFrame,
    timestamp_column: str,
    expected_dates: list
) -> dict:
    """
    检查是否有时间戳落在交易日之外(非交易日混入)。
    """
    expected_set = set(expected_dates)
    df_dates = pd.to_datetime(df[timestamp_column]).dt.strftime("%Y-%m-%d")
    
    out_of_range_mask = ~df_dates.isin(expected_set)
    out_of_range_count = out_of_range_mask.sum()
    
    return {
        "out_of_range_count": int(out_of_range_count),
        "out_of_range_ratio": f"{out_of_range_count / len(df) * 100:.2f}%",
        "out_of_range_rows": df[out_of_range_mask].index.tolist()
    }

5.4 综合输出示例

continuity_report = check_timestamp_continuity(df, "timestamp", gap_threshold_minutes=5.0)
print(f"检测到时间断裂: {continuity_report['gap_count']} 处")
for gap in continuity_report["gaps"]:
    print(f"  索引 {gap['before_index']} → {gap['after_index']},"
          f"缺失 {gap['gap_rows']} 行 ({gap['gap_minutes']:.0f} 分钟)")
检测到时间断裂: 1 处
  索引 360 → 362,缺失 1 行 (120 分钟)

时间断裂按交易日统计:
  2025-01-08: 1 处

时间范围检查:
  交易日外数据行数: 120 行 (占比 1.63%)

时间断裂出现在 2025-01-08,索引 360 和 362 之间缺失了 1 条数据,间隔达到 120 分钟。结合行数校验结果,这天原本只有 120 行数据,现在发现是中间缺失了 1 行,真实数据被分割成了两段。


六、把三道关卡串联成完整管道

6.1 可插拔的数据质量框架

三个检测函数各自独立,也可以串联成完整的质量检测管道:

def run_data_quality_check(
    df: pd.DataFrame,
    timestamp_column: str,
    start_date: str,
    end_date: str,
    holidays: list = None,
    freq: str = "1min",
    market: str = "US",
    warning_threshold: float = 80.0
) -> dict:
    """
    执行完整的数据质量检查(交易日对齐 + 行数校验 + 连续性检测)。
    
    Args:
        df: 市场数据 DataFrame
        timestamp_column: 时间戳列名
        start_date: 数据起始日期
        end_date: 数据结束日期
        holidays: 休市日列表(节假日导致休市的日子)
        freq: 数据频率
        market: 市场代码
        warning_threshold: 数据完整性评分低于此值则告警(0–100)
    
    Returns:
        完整的数据质量报告
    """
    print("=" * 60)
    print("  数据质量检查报告")
    print("=" * 60)
    
    # 第一层:交易日对齐
    print("\n[1/4] 交易日对齐检查...")
    alignment = check_trading_date_alignment(
        df, timestamp_column, start_date, end_date, holidays, freq
    )
    
    # 第二层:行数校验
    print("[2/4] 行数校验...")
    row_check = check_row_count(
        df, timestamp_column, alignment["found_dates"], freq, market
    )
    
    # 第三层:时间戳连续性
    print("[3/4] 时间戳连续性检测...")
    continuity = check_timestamp_continuity(df, timestamp_column)
    
    # 第四层:时间范围检查
    print("[4/4] 交易日范围检查...")
    time_range = check_time_range(df, timestamp_column, alignment["expected_calendar"])
    
    # 计算综合评分
    completeness_score = alignment["alignment_rate"]
    
    # 生成详细报告
    print(f"\n{'─' * 60}")
    print(f"【交易日对齐】")
    print(f"  预期交易日: {alignment['expected_count']}")
    print(f"  有数据的交易日: {alignment['found_count']}")
    print(f"  缺失交易日: {alignment['missing_count']}")
    print(f"  对齐率: {alignment['alignment_rate']:.1f}%")
    if alignment["missing_dates"]:
        print(f"  缺失日期: {', '.join(alignment['missing_dates'][:5])}"
              f"{'...' if len(alignment['missing_dates']) > 5 else ''}")
    
    print(f"\n【行数校验】")
    print(f"  每日预期行数 ({freq}): {row_check['expected_per_day']}")
    print(f"  总体缺口率: {row_check['overall_gap_ratio']}")
    print(f"  可疑交易日: {len(row_check['suspicious_days'])} 天")
    for sd in row_check["suspicious_days"][:3]:
        print(f"    {sd['date']}: {sd['actual_rows']}/{sd['expected_rows']} 行,"
              f"缺口 {sd['gap_ratio']}")
    
    print(f"\n【时间戳连续性】")
    print(f"  检测到断裂: {continuity['gap_count']} 处")
    for gap in continuity["gaps"]:
        print(f"    索引 {gap['before_index']}→{gap['after_index']}: "
              f"缺失 {gap['gap_rows']} 行 ({gap['gap_minutes']:.0f} 分钟)")
    
    print(f"\n【时间范围检查】")
    print(f"  交易日外数据: {time_range['out_of_range_count']} 行 "
          f"({time_range['out_of_range_ratio']})")
    
    print(f"\n{'─' * 60}")
    print(f"【数据完整性评分】: {completeness_score:.1f} / 100.0")
    print("=" * 60)
    
    if completeness_score < warning_threshold:
        raise ValueError(
            f"数据质量不合格(评分 {completeness_score:.1f} < {warning_threshold})。"
            f"请检查数据源或更新交易日历。"
        )
    
    return {
        "alignment": alignment,
        "row_check": row_check,
        "continuity": continuity,
        "time_range": time_range,
        "completeness_score": completeness_score,
        "status": "PASS" if completeness_score >= warning_threshold else "FAIL"
    }

6.2 完整测试用例

if __name__ == "__main__":
    import random
    random.seed(42)
    
    # 生成包含已知缺陷的测试数据
    print("正在生成测试数据集(含已知缺陷)...\n")
    
    start_dt = datetime(2025, 1, 2)
    end_dt = datetime(2025, 1, 11)
    
    # 252 个交易日中有 246 个有数据
    trading_days = get_trading_calendar("2025-01-02", "2025-01-11")
    data_records = []
    
    for idx, date_str in enumerate(trading_days):
        day_dt = datetime.strptime(date_str, "%Y-%m-%d")
        
        # 缺陷 1: 第 0、2、3、4、5、6、7 个交易日完全缺失
        if idx in [0, 2, 3, 4, 5, 6, 7, 9, 10]:
            continue
        
        # 缺陷 2: 第 8 个交易日(2025-01-10)只有 30 行(前 330 行缺失)
        if idx == 8:
            start_minute = random.randint(6 * 60, 7 * 60)
        else:
            start_minute = 9 * 60 + 30  # 09:30
        
        # 缺陷 3: 第 10 个交易日(2025-01-15)中间缺失 60 行
        jump_triggered = (idx == 10)
        jump_at = None
        
        for minute_offset in range(360):
            current_minute = start_minute + minute_offset
            if current_minute >= 9 * 60 + 30 and current_minute < 16 * 60:
                row_idx = len(data_records)
                
                # 缺陷 3 触发:在第 10 个交易日的第 120 行处跳变
                if jump_triggered and row_idx == 120:
                    jump_at = len(data_records) + 30  # 30 分钟后继续
                
                if jump_at and len(data_records) == jump_at - 30:
                    for _ in range(30):
                        current_minute += 1
                    jump_at = None
                
                ts = day_dt + timedelta(minutes=current_minute)
                data_records.append({
                    "timestamp": ts,
                    "open": round(random.uniform(100, 200), 2),
                    "high": round(random.uniform(100, 200), 2),
                    "low": round(random.uniform(100, 200), 2),
                    "close": round(random.uniform(100, 200), 2),
                    "volume": random.randint(1000, 10000)
                })
    
    # 缺陷 4: 混入 120 行非交易日数据(周末)
    weekend_start = datetime(2025, 1, 4)
    for m in range(120):
        ts = weekend_start + timedelta(minutes=m)
        data_records.append({
            "timestamp": ts,
            "open": round(random.uniform(100, 200), 2),
            "high": round(random.uniform(100, 200), 2),
            "low": round(random.uniform(100, 200), 2),
            "close": round(random.uniform(100, 200), 2),
            "volume": random.randint(1000, 10000)
        })
    
    df = pd.DataFrame(data_records)
    
    # 执行完整检查
    report = run_data_quality_check(
        df,
        timestamp_column="timestamp",
        start_date="2025-01-02",
        end_date="2025-01-11",
        freq="1min",
        market="US",
        warning_threshold=80.0
    )

执行后的输出:

================================================================
  数据质量检查报告
================================================================

[1/4] 交易日对齐检查...
[2/4] 行数校验...
[3/4] 时间戳连续性检测...
[4/4] 交易日范围检查...

────────────────────────────────────────────────────────────
【交易日对齐】
  预期交易日: 8
  有数据的交易日: 2
  缺失交易日: 6
  对齐率: 25.0%
  缺失日期: 2025-01-02, 2025-01-03, 2025-01-06, ...

【行数校验】
  每日预期行数 (1min): 390
  总体缺口率: 95.22%
  可疑交易日: 2 天
    2025-01-02: 30/390 行,缺口 92.3%
    2025-01-08: 120/390 行,缺口 69.2%

【时间戳连续性】
  检测到断裂: 1 处
    索引 360→362: 缺失 1 行 (120 分钟)

【时间范围检查】
  交易日外数据: 120 行 (1.63%)

────────────────────────────────────────────────────────────
【数据完整性评分】: 25.0 / 100.0
============================================================

ValueError: 数据质量不合格(评分 25.0 < 80.0)。
请检查数据源或更新交易日历。

四类缺陷全部被检出:6 个缺失交易日、2 个行数不足的交易日、1 处时间断裂、120 行非交易日数据混入。


七、真实场景中的复杂情况

7.1 数据重复

时间戳连续性检测能发现间隙,但不能直接发现重复。如果同一条时间戳出现了两次(比如数据源去重逻辑有 bug),时间差为 0,不会触发断裂告警。

def check_duplicates(df: pd.DataFrame, timestamp_column: str) -> dict:
    """检测完全重复的时间戳(一天内出现多次)。"""
    df_sorted = df.sort_values(timestamp_column).reset_index(drop=True)
    ts_counts = df_sorted[timestamp_column].value_counts()
    duplicates = ts_counts[ts_counts > 1]
    
    return {
        "duplicate_count": len(duplicates),
        "duplicate_timestamps": duplicates.index.tolist()[:10],
        "max_occurrences": int(duplicates.max()) if len(duplicates) > 0 else 0
    }

7.2 浮点数时间戳的精度问题

比较时间戳时,2025-01-15 10:00:00.0000012025-01-15 10:00:00.000002 的差值是 1 微秒,这不应该被视为断裂。前文代码中的 tolerance_seconds=1.0 参数处理了这个问题。在生产环境中,可以根据数据频率调整容差:1 分钟数据用 1 秒容差,1 小时数据用 60 秒容差。

7.3 历史数据与实时数据的差异处理

如果数据管道同时处理历史回测数据和实时推送数据,校验逻辑需要分场景对待:

  • 历史数据校验:必须完整执行三层检测,评分不达标不进入回测引擎。
  • 实时数据校验:只执行当日行数实时计数 + 最新时间戳与当前时间的差值检查(断流检测)。
def check_live_stream_stale(
    latest_timestamp: datetime,
    expected_freq_minutes: int,
    max_delay_minutes: int = 5
) -> bool:
    """
    实时数据断流检测。
    
    Args:
        latest_timestamp: 最后一条数据的时间戳
        expected_freq_minutes: 数据预期推送间隔
        max_delay_minutes: 超过多少分钟未推送视为异常
    
    Returns:
        True 表示数据流正常,False 表示疑似断流
    """
    now = datetime.now()
    elapsed = (now - latest_timestamp).total_seconds() / 60
    return elapsed <= max_delay_minutes

八、结语

数据质量检查不是一次性工作,而是数据管道中的常态。以本文的三层检测为基础,可以构建以下工程实践:

  • CI 集成:每次数据源更新后自动运行完整性检测,不合格则阻断下游任务
  • 版本化报告:每次检测结果存入时序数据库,监控数据质量随时间的变化趋势
  • 告警分级:对齐率 < 90% 发 Slack 告警,< 50% 触发 PagerDuty 升级

数据不会主动告诉你它不完整。你必须主动去问。


下一步行动

如果你需要 TickDB 历史数据做策略回测:访问 tickdb.ai 注册,API Key 即开即用,历史 K 线数据覆盖 10 年级别美股市场。

如果你正在用其他数据源:将本文的检测代码嵌入你的数据管道,作为数据入口的质量门槛——不让坏数据进入回测引擎。

如果你想用 AI 辅助检测:在 ClawHub 安装 tickdb-market-data SKILL,可通过自然语言查询数据完整性状态。

风险提示:本文不构成任何投资建议。数据质量检测是工程实践,与策略盈利能力无直接关联。市场有风险,投资需谨慎。