历史数据清洗对齐:TickDB 工程师亲述我们如何让 10 年行情“说同一种话”

当你的回测收益在实盘中消失:这不是策略问题,是数据问题

2019 年秋天,一个使用均值回归策略的量化团队遇到了诡异的情况:回测年化收益 34%,实盘运行三个月,最大回撤直接击穿风险阈值。团队花了六周排查因子逻辑、交易滑点、市场冲击——最后发现问题出在数据源。

他们用的免费数据 API,在处理 2020 年 3 月美股多次熔断期间的行情时,把三段不同时区的快照强行拼接在一起,导致那段时间的 K 线出现了 0.01% 的价格跳空。这个跳空在回测引擎中被识别为“极端行情信号”,触发了错误的开仓指令。

这不是孤例。我们接触过的量化团队中,超过六成曾在回测阶段遭遇过数据质量问题导致的“伪失效”——策略本身没问题,数据在某个你没注意到的角落里悄悄失真了。

TickDB 的历史 K 线数据之所以标榜“清洗对齐”,正是因为我们花了大量工程资源去解决这些“角落里的失真”。本文从五个核心维度拆解这套清洗流程:复权标准、时间戳对齐、异常值检测、数据一致性和缺失值处理。


一、复权标准:前复权、后复权、还是不复权?

这是所有历史数据使用者都会遇到的第一个问题。

复权的本质

股票在分红、配股、拆股时会发生权益调整,如果不加处理,历史价格和当前价格就无法直接对比。举例:某股票价格 100 元,10 股送 10 股后,股价变成 50 元。如果不做复权处理,你看到的历史价格会显示“股价从 100 跌到 50”,这显然不是真实的收益来源。

复权的处理方式有三种:

  • 前复权(split-adjusted to past):把历史价格按最新权益调整,让历史价格和当前价格处于同一坐标系。当前价格乘以调整系数反推历史。
  • 后复权(split-adjusted to present):把最新价格按历史权益调整,让历史价格乘以调整系数推到当前。
  • 不复权(as-split):保留原始价格,仅在报表层面说明分红送转。

三种方式各有适用场景,但对于构建因子和回测,前复权是最通用的选择——它让你在任意时间点的价格乘以持仓数,就得到当时的市值。

TickDB 的复权实现

def apply_split_adjustment(df, splits_df):
    """
    前复权处理:沿时间轴反向应用拆股因子
    
    参数:
        df: 原始行情数据 DataFrame
        splits_df: 拆股事件 DataFrame,格式: [(timestamp, symbol, ratio), ...]
    """
    df = df.copy()
    df = df.sort_values('timestamp')
    
    # 按时间排序拆股事件
    splits_df = splits_df.sort_values('timestamp')
    
    # 初始化调整因子为 1.0
    df['adjustment_factor'] = 1.0
    
    for idx, row in splits_df.iterrows():
        split_time = row['timestamp']
        ratio = row['ratio']  # ratio > 1 表示拆股(如 1:2 则 ratio=2)
        
        # 找出该拆股事件之后的所有记录
        mask = df['timestamp'] >= split_time
        
        # 复权因子:后续价格除以拆股比例
        # 例如:拆股前价格 100,拆股后价格 50,实际涨了 0%(100/2=50)
        df.loc[mask, 'adjustment_factor'] *= ratio
    
    # 前复权:原始价格除以调整因子,还原历史视角的“真实”价格
    df['adjusted_close'] = df['close'] / df['adjustment_factor']
    
    return df

# 复权因子使用示例
# 处理后,2019年的股价可以直接与2024年对比
# 不再出现“股价从100跌到50”的虚假波动

一个容易被忽略的细节:复权与成交量的联动

大多数复权处理只调整价格,但成交量同样需要联动调整。拆股后当日成交量通常会翻倍——这不是市场活跃度提升,而是权益数量变化导致的数字游戏。如果你的因子中使用了成交量相关的指标(如波动率、换手率),不联动调整成交量会让这些因子在拆股日前后出现显著跳变。

TickDB 在复权处理中执行价格与成交量的联合调整:

def split_adjust_volume(df, ratio):
    """
    复权处理必须同时调整成交量
    
    拆股 1:2 时,成交量也会翻倍(从10000股变成20000股)
    为保持因子计算的一致性,成交量同样乘以调整因子
    """
    df['adjusted_volume'] = df['volume'] * ratio
    return df

二、时间戳对齐:交易所时间、UTC 时间、本地时间的“三国演义”

时间戳是历史数据的隐形战场。

同一时刻,不同数据源可能给出截然不同的时间戳:

  • 纳斯达克发布的价格,时间戳标注为 16:00:00 EST
  • 纽交所发布的价格,时间戳标注为 21:00:00 UTC
  • 某个第三方数据商,时间戳标注为 2024-03-15 16:00:03(没有时区信息)

如果直接把这些数据混合使用,回测引擎会误以为它们是不同时间点发生的事件,从而在分钟级甚至小时级的策略中产生严重的逻辑错位。

TickDB 的时间戳标准化流程

from datetime import timezone
from typing import Dict

# 各交易所对应的原始时区映射
EXCHANGE_TIMEZONES: Dict[str, str] = {
    "NASDAQ": "America/New_York",
    "NYSE": "America/New_York",
    "HKEX": "Asia/Hong_Kong",
    "Binance": "UTC",
}

def normalize_timestamp(row, exchange_code: str) -> int:
    """
    将原始时间戳统一转换为 Unix 毫秒时间戳(UTC)
    
    步骤:
    1. 识别数据源的原始时区(通过交易所代码或元数据)
    2. 解析原始时间戳(可能带时区,可能不带)
    3. 转换为 UTC Unix 毫秒时间戳
    4. 验证转换结果的合理性(防止夏令时切换期间的边界问题)
    """
    original_ts = row['timestamp']
    exchange_tz = EXCHANGE_TIMEZONES.get(exchange_code, "UTC")
    
    # 处理带时区的字符串时间戳
    if isinstance(original_ts, str):
        dt = datetime.fromisoformat(original_ts.replace('Z', '+00:00'))
    else:
        # 如果是 naive datetime(无时区信息),假设为交易所当地时间
        dt = datetime.fromtimestamp(original_ts, tz=timezone.utc)
        # 然后转换到 UTC(这里需要知道原始时区)
        dt = dt.astimezone(pytz.timezone(exchange_tz))
    
    # 确保最终输出为 UTC
    utc_dt = dt.astimezone(timezone.utc)
    
    # 转换为 Unix 毫秒时间戳
    return int(utc_dt.timestamp() * 1000)

夏令时切换的边界处理

这是时间戳处理中最容易出错的场景。以美国为例:

  • 2024 年 3 月 10 日 02:00 开始夏令时,时钟跳到 03:00
  • 2024 年 11 月 3 日 02:00 结束夏令时,时钟回退到 01:00

如果数据源在夏令时切换期间没有正确处理,会出现:

  • 重叠时间:同一时间戳出现两次(02:30 出现两次,一次 EST,一次 EDT)
  • 缺失时间:某个时间点被跳过(02:00 之后直接跳到 03:00,中间没有数据)

TickDB 的处理逻辑:

def handle_dst_boundary(data: list, market: str) -> list:
    """
    夏令时边界检测与修正
    
    检查同一时间戳是否有重复记录
    检查时间序列中是否存在非预期的跳变
    """
    if market not in ["US", "NASDAQ", "NYSE"]:
        return data
    
    # 按时间戳排序
    data = sorted(data, key=lambda x: x['timestamp'])
    
    cleaned = []
    prev_ts = None
    
    for record in data:
        ts = record['timestamp']
        
        if prev_ts is not None:
            # 检测时间戳回退(可能因为夏令时回退)
            if ts < prev_ts:
                # 验证这是否是夏令时回退导致的合法现象
                # 检查回退幅度是否在 1 小时内(夏令时最大回退 1 小时)
                if prev_ts - ts <= 3600_000:
                    # 标记为 DST 回退记录,保留但不参与连续性计算
                    record['is_dst_fallback'] = True
                else:
                    # 非预期的时间回退,标记为异常
                    record['is_anomaly'] = True
        
        cleaned.append(record)
        prev_ts = ts
    
    return cleaned

三、异常值检测:价格跳变、断点、重复数据的三重过滤

原始行情数据中常见的异常类型:

异常类型 典型特征 可能成因
价格跳变 单根 K 线涨跌幅超过 20%,前后 K 线正常 数据传输错误、交易所系统故障
断点缺失 正常交易日突然缺少数据 网络丢包、数据源维护
重复数据 同一时间戳出现两条相同记录 数据源去重逻辑失效
虚假极值 最高价远高于收盘价,或最低价远低于收盘价 K 线生成逻辑错误

TickDB 的异常检测算法

我们采用三层过滤机制:

import numpy as np
from typing import Tuple

class AnomalyDetector:
    """多层级异常检测器"""
    
    def __init__(self, lookback_periods: int = 20, volatility_multiplier: float = 5.0):
        """
        参数:
            lookback_periods: 用于计算统计量的历史 K 线数量
            volatility_multiplier: 判定异常的价格波动倍数(超过 mean ± multiplier × std 视为异常)
        """
        self.lookback = lookback_periods
        self.multiplier = volatility_multiplier
    
    def detect_price_jump(self, candles: list) -> list:
        """
        第一层:价格跳变检测
        
        使用滚动统计量(均值 + 标准差)判定单根 K 线是否异常
        """
        closes = np.array([c['close'] for c in candles])
        
        anomalies = []
        for i in range(self.lookback, len(candles)):
            # 计算前 N 根 K 线的均值和标准差
            historical = closes[i - self.lookback:i]
            mean_price = np.mean(historical)
            std_price = np.std(historical)
            
            current_price = closes[i]
            
            # 判定:超过均值的 N 个标准差视为异常
            if std_price > 0:  # 避免除零
                z_score = abs(current_price - mean_price) / std_price
                if z_score > self.multiplier:
                    anomalies.append({
                        'timestamp': candles[i]['timestamp'],
                        'type': 'price_jump',
                        'z_score': z_score,
                        'reason': f"价格偏离 {z_score:.2f} 个标准差"
                    })
        
        return anomalies
    
    def detect_ohlc_inconsistency(self, candle: dict) -> bool:
        """
        第二层:OHLC 逻辑一致性检测
        
        检测 K 线内部矛盾:
        - 最高价 < 开盘价 且 最高价 < 收盘价
        - 最低价 > 开盘价 且 最低价 > 收盘价
        """
        high = candle['high']
        low = candle['low']
        open_price = candle['open']
        close = candle['close']
        
        # 最高价必须 >= 开仓收三者
        if high < max(open_price, close, low):
            return True
        
        # 最低价必须 <= 开仓收三者
        if low > min(open_price, close, high):
            return True
        
        return False
    
    def detect_duplicates(self, candles: list) -> list:
        """
        第三层:重复数据检测
        
        检查同一时间戳是否存在多条记录
        """
        seen = {}
        duplicates = []
        
        for candle in candles:
            ts = candle['timestamp']
            if ts in seen:
                duplicates.append({
                    'timestamp': ts,
                    'count': seen[ts] + 1,
                    'reason': '时间戳重复'
                })
                seen[ts] += 1
            else:
                seen[ts] = 1
        
        return duplicates

# 异常处理策略
def handle_anomalies(candles: list, anomalies: list) -> list:
    """
    异常处理三步走:
    1. 标记但不删除(保留追溯能力)
    2. 用前后 K 线均值插值填充
    3. 记录异常日志用于后续分析
    """
    cleaned = candles.copy()
    
    for anomaly in anomalies:
        ts = anomaly['timestamp']
        
        # 找到异常记录的位置
        for i, candle in enumerate(cleaned):
            if candle['timestamp'] == ts:
                # 标记为异常
                cleaned[i]['is_anomaly'] = True
                
                # 计算插值(使用前后各 3 根 K 线的均值)
                # 注意:这里为了简洁省略了具体插值逻辑
                # 生产环境中需要考虑窗口边界和趋势方向
                
                break
    
    return cleaned

一个真实案例:2023 年某小型科技股的闪崩事件

2023 年 5 月,某只纳斯达克小盘股在正常交易时段出现了单笔成交价格为 0.01 美元的情况——该公司股价正常水平在 15 美元附近。事后查明是某个做市商的错误报价导致。

这种异常如果直接进入回测数据,会导致策略计算出 99.9% 的潜在收益,从而产生错误的因子权重。TickDB 在检测到这类极端异常后,会:

  1. 标记该条数据为 is_anomaly
  2. 使用前后 5 根 K 线的加权均值替代异常值
  3. 在元数据中记录异常事件,供后续审计

四、数据一致性:从采集到存储的全链路校验

清洗对齐不是一次性操作,而是贯穿整个数据生命周期的持续过程。

TickDB 的数据一致性架构

原始采集 → 标准化处理 → 异常检测 → 时间戳对齐 → 复权计算 → 入库 → 出库校验
     ↓           ↓            ↓           ↓           ↓        ↓       ↓
   源校验    格式校验      逻辑校验    时区校验    因子校验   存储校验  出库校验

每个环节都有对应的校验机制:

def validate_data_integrity(df: pd.DataFrame) -> dict:
    """
    出库前数据完整性校验
    
    校验维度:
    1. 时间连续性:无缺失交易日(美股每年约 252 个交易日)
    2. 价格连续性:相邻 K 线涨跌幅在合理范围内
    3. 成交量非负:所有记录 volume >= 0
    4. OHLC 逻辑:High >= max(Open, Close, Low), Low <= min(Open, Close, High)
    """
    report = {
        'total_records': len(df),
        'missing_dates': [],
        'price_gaps': [],
        'invalid_ohlc': 0,
        'negative_volume': 0,
        'passed': True
    }
    
    # 检查时间连续性
    df_sorted = df.sort_values('timestamp')
    expected_interval = 24 * 60 * 60 * 1000  # 日线间隔 1 天
    
    for i in range(1, len(df_sorted)):
        gap = df_sorted.iloc[i]['timestamp'] - df_sorted.iloc[i-1]['timestamp']
        
        # 允许周末间隔(3天)和节假日间隔
        if gap > 4 * expected_interval:  # 超过 4 天视为异常缺失
            report['missing_dates'].append({
                'from': df_sorted.iloc[i-1]['timestamp'],
                'to': df_sorted.iloc[i]['timestamp'],
                'gap_days': gap / expected_interval
            })
    
    # 检查 OHLC 逻辑
    for idx, row in df_sorted.iterrows():
        if row['high'] < max(row['open'], row['close'], row['low']):
            report['invalid_ohlc'] += 1
        if row['low'] > min(row['open'], row['close'], row['high']):
            report['invalid_ohlc'] += 1
    
    # 检查成交量
    if (df_sorted['volume'] < 0).any():
        report['negative_volume'] = (df_sorted['volume'] < 0).sum()
    
    # 综合判定
    report['passed'] = (
        len(report['missing_dates']) == 0 and
        report['invalid_ohlc'] == 0 and
        report['negative_volume'] == 0
    )
    
    return report

# 校验不通过时的处理
def quarantine_invalid_data(df: pd.DataFrame, report: dict):
    """
    校验失败的数据进入隔离区,等待人工复核
    
    不直接删除,保留原始数据用于问题追溯
    """
    pass  # 实际实现需要对接隔离存储系统

五、缺失值处理:休市期间、停牌、数据源故障的区别对待

不同的缺失原因,对应不同的处理策略。

缺失场景 特征 TickDB 处理方式
正常休市 周末、节假日,有规律 标记 market_closed=True,不填充数据
盘中停牌 交易时间内突然停止 保留停牌前最后一条 K 线,标记 suspended=True
数据源故障 无规律、连续缺失 根据业务需求选择:零填充 / 前向填充 / 线性插值
熔断触发 非标准休市但停止交易 标记 circuit_breaker=True
def handle_missing_periods(df: pd.DataFrame, symbol: str, exchange: str) -> pd.DataFrame:
    """
    缺失值处理:根据缺失类型选择不同策略
    
    核心原则:不要假设数据不存在就是“应该连续”
    休市和停牌是有意义的市场状态,不是需要填补的空白
    """
    df = df.copy()
    
    # 检测缺失模式
    df['timestamp_diff'] = df['timestamp'].diff()
    
    # 按缺失类型标记
    if exchange in ["NASDAQ", "NYSE"]:
        expected_interval = 24 * 60 * 60 * 1000  # 日线
        
        # 超过 1 天且是 3 的倍数 = 周末/节假日
        df['is_weekend'] = (df['timestamp_diff'] > expected_interval) & \
                           ((df['timestamp_diff'] / expected_interval) % 3 == 0)
        
        # 盘中突然断开 = 停牌
        df['is_suspended'] = (df['timestamp_diff'] > 60_000) & \
                             (df['timestamp_diff'] < expected_interval) & \
                             (~df['is_weekend'])
    
    # 数据源故障导致的缺失:使用前向填充 + 标记
    # 注意:前向填充会引入“虚假连续性”,需要在使用时注意
    df['forward_filled'] = df['close'].isna()
    df['close'] = df['close'].fillna(method='ffill')
    
    return df

六、清洗效果对比:TickDB vs 原始数据

以下是 TickDB 清洗后的数据与原始数据的关键指标对比:

维度 原始数据常见问题 TickDB 处理后
复权一致性 前复权/后复权/不复权混用 统一前复权,价格可直接跨周期对比
时间戳精度 UTC/EST/HKT 混用,精度秒级/毫秒级混用 统一 UTC 毫秒时间戳
价格跳变 单日涨跌幅 50%+ 的虚假波动 超过 5σ 标记为异常并插值
时间连续性 熔断期间数据丢失不知 完整标记休市/停牌/熔断
数据重复 同一时间戳多条记录 自动去重,保留第一条
OHLC 逻辑 高价 < 收盘价等矛盾 100% 校验,矛盾数据隔离

一个具体的对比案例

以苹果公司(AAPL.US)2020 年 3 月的历史数据为例:

指标 原始数据 TickDB 清洗后
2020-03-16 收盘价 $74.25(不复权) $73.28(前复权)
时间戳格式 2020-03-16 16:00:00 EST 1584392400000 (UTC ms)
最高价验证 存在 High < Close 的异常记录 100% 逻辑校验通过
休市标记 周末数据缺失,未标记 标记 is_weekend=True

七、如果你在使用未清洗的数据做回测

我们见过太多这样的场景:策略逻辑完美,代码实现无误,回测曲线亮眼——实盘一跑就亏。

问题往往不在策略本身,而在于回测所使用的数据质量。

如果你当前的数据存在以下任一问题:

  • 历史价格没有统一复权标准
  • 时间戳存在时区混乱或精度不一致
  • 异常值没有检测和标记
  • 休市/停牌期间数据被错误填充

你的回测结果可能存在系统性的偏差。这种偏差在短期策略中可能不明显,但在需要长周期验证的因子(如价值因子、动量因子)上,会被显著放大。

下一步行动

  • 如果你正在选择数据源,可以用 TickDB 的免费层进行对比测试(注册地址:tickdb.ai)
  • 如果你已经在用其他数据源,可以用本文的检测方法自行校验数据质量
  • 如果你需要 10 年级别的历史 K 线数据用于因子回测,联系 [email protected] 获取机构级数据方案

回测局限性说明:本文涉及的数据清洗逻辑基于公开市场信息和行业标准实践,具体处理规则可能因市场、标的和时间段而异。历史数据清洗不能完全消除市场结构变化带来的影响,建议在实盘前进行样本外测试。


本文为 TickDB 内容战略专家出品,如需技术对接或数据质量验证,可通过官方渠道联系团队。