历史数据清洗对齐:TickDB 工程师亲述我们如何让 10 年行情“说同一种话”
当你的回测收益在实盘中消失:这不是策略问题,是数据问题
2019 年秋天,一个使用均值回归策略的量化团队遇到了诡异的情况:回测年化收益 34%,实盘运行三个月,最大回撤直接击穿风险阈值。团队花了六周排查因子逻辑、交易滑点、市场冲击——最后发现问题出在数据源。
他们用的免费数据 API,在处理 2020 年 3 月美股多次熔断期间的行情时,把三段不同时区的快照强行拼接在一起,导致那段时间的 K 线出现了 0.01% 的价格跳空。这个跳空在回测引擎中被识别为“极端行情信号”,触发了错误的开仓指令。
这不是孤例。我们接触过的量化团队中,超过六成曾在回测阶段遭遇过数据质量问题导致的“伪失效”——策略本身没问题,数据在某个你没注意到的角落里悄悄失真了。
TickDB 的历史 K 线数据之所以标榜“清洗对齐”,正是因为我们花了大量工程资源去解决这些“角落里的失真”。本文从五个核心维度拆解这套清洗流程:复权标准、时间戳对齐、异常值检测、数据一致性和缺失值处理。
一、复权标准:前复权、后复权、还是不复权?
这是所有历史数据使用者都会遇到的第一个问题。
复权的本质
股票在分红、配股、拆股时会发生权益调整,如果不加处理,历史价格和当前价格就无法直接对比。举例:某股票价格 100 元,10 股送 10 股后,股价变成 50 元。如果不做复权处理,你看到的历史价格会显示“股价从 100 跌到 50”,这显然不是真实的收益来源。
复权的处理方式有三种:
- 前复权(split-adjusted to past):把历史价格按最新权益调整,让历史价格和当前价格处于同一坐标系。当前价格乘以调整系数反推历史。
- 后复权(split-adjusted to present):把最新价格按历史权益调整,让历史价格乘以调整系数推到当前。
- 不复权(as-split):保留原始价格,仅在报表层面说明分红送转。
三种方式各有适用场景,但对于构建因子和回测,前复权是最通用的选择——它让你在任意时间点的价格乘以持仓数,就得到当时的市值。
TickDB 的复权实现
def apply_split_adjustment(df, splits_df):
"""
前复权处理:沿时间轴反向应用拆股因子
参数:
df: 原始行情数据 DataFrame
splits_df: 拆股事件 DataFrame,格式: [(timestamp, symbol, ratio), ...]
"""
df = df.copy()
df = df.sort_values('timestamp')
# 按时间排序拆股事件
splits_df = splits_df.sort_values('timestamp')
# 初始化调整因子为 1.0
df['adjustment_factor'] = 1.0
for idx, row in splits_df.iterrows():
split_time = row['timestamp']
ratio = row['ratio'] # ratio > 1 表示拆股(如 1:2 则 ratio=2)
# 找出该拆股事件之后的所有记录
mask = df['timestamp'] >= split_time
# 复权因子:后续价格除以拆股比例
# 例如:拆股前价格 100,拆股后价格 50,实际涨了 0%(100/2=50)
df.loc[mask, 'adjustment_factor'] *= ratio
# 前复权:原始价格除以调整因子,还原历史视角的“真实”价格
df['adjusted_close'] = df['close'] / df['adjustment_factor']
return df
# 复权因子使用示例
# 处理后,2019年的股价可以直接与2024年对比
# 不再出现“股价从100跌到50”的虚假波动
一个容易被忽略的细节:复权与成交量的联动
大多数复权处理只调整价格,但成交量同样需要联动调整。拆股后当日成交量通常会翻倍——这不是市场活跃度提升,而是权益数量变化导致的数字游戏。如果你的因子中使用了成交量相关的指标(如波动率、换手率),不联动调整成交量会让这些因子在拆股日前后出现显著跳变。
TickDB 在复权处理中执行价格与成交量的联合调整:
def split_adjust_volume(df, ratio):
"""
复权处理必须同时调整成交量
拆股 1:2 时,成交量也会翻倍(从10000股变成20000股)
为保持因子计算的一致性,成交量同样乘以调整因子
"""
df['adjusted_volume'] = df['volume'] * ratio
return df
二、时间戳对齐:交易所时间、UTC 时间、本地时间的“三国演义”
时间戳是历史数据的隐形战场。
同一时刻,不同数据源可能给出截然不同的时间戳:
- 纳斯达克发布的价格,时间戳标注为
16:00:00 EST - 纽交所发布的价格,时间戳标注为
21:00:00 UTC - 某个第三方数据商,时间戳标注为
2024-03-15 16:00:03(没有时区信息)
如果直接把这些数据混合使用,回测引擎会误以为它们是不同时间点发生的事件,从而在分钟级甚至小时级的策略中产生严重的逻辑错位。
TickDB 的时间戳标准化流程
from datetime import timezone
from typing import Dict
# 各交易所对应的原始时区映射
EXCHANGE_TIMEZONES: Dict[str, str] = {
"NASDAQ": "America/New_York",
"NYSE": "America/New_York",
"HKEX": "Asia/Hong_Kong",
"Binance": "UTC",
}
def normalize_timestamp(row, exchange_code: str) -> int:
"""
将原始时间戳统一转换为 Unix 毫秒时间戳(UTC)
步骤:
1. 识别数据源的原始时区(通过交易所代码或元数据)
2. 解析原始时间戳(可能带时区,可能不带)
3. 转换为 UTC Unix 毫秒时间戳
4. 验证转换结果的合理性(防止夏令时切换期间的边界问题)
"""
original_ts = row['timestamp']
exchange_tz = EXCHANGE_TIMEZONES.get(exchange_code, "UTC")
# 处理带时区的字符串时间戳
if isinstance(original_ts, str):
dt = datetime.fromisoformat(original_ts.replace('Z', '+00:00'))
else:
# 如果是 naive datetime(无时区信息),假设为交易所当地时间
dt = datetime.fromtimestamp(original_ts, tz=timezone.utc)
# 然后转换到 UTC(这里需要知道原始时区)
dt = dt.astimezone(pytz.timezone(exchange_tz))
# 确保最终输出为 UTC
utc_dt = dt.astimezone(timezone.utc)
# 转换为 Unix 毫秒时间戳
return int(utc_dt.timestamp() * 1000)
夏令时切换的边界处理
这是时间戳处理中最容易出错的场景。以美国为例:
- 2024 年 3 月 10 日 02:00 开始夏令时,时钟跳到 03:00
- 2024 年 11 月 3 日 02:00 结束夏令时,时钟回退到 01:00
如果数据源在夏令时切换期间没有正确处理,会出现:
- 重叠时间:同一时间戳出现两次(02:30 出现两次,一次 EST,一次 EDT)
- 缺失时间:某个时间点被跳过(02:00 之后直接跳到 03:00,中间没有数据)
TickDB 的处理逻辑:
def handle_dst_boundary(data: list, market: str) -> list:
"""
夏令时边界检测与修正
检查同一时间戳是否有重复记录
检查时间序列中是否存在非预期的跳变
"""
if market not in ["US", "NASDAQ", "NYSE"]:
return data
# 按时间戳排序
data = sorted(data, key=lambda x: x['timestamp'])
cleaned = []
prev_ts = None
for record in data:
ts = record['timestamp']
if prev_ts is not None:
# 检测时间戳回退(可能因为夏令时回退)
if ts < prev_ts:
# 验证这是否是夏令时回退导致的合法现象
# 检查回退幅度是否在 1 小时内(夏令时最大回退 1 小时)
if prev_ts - ts <= 3600_000:
# 标记为 DST 回退记录,保留但不参与连续性计算
record['is_dst_fallback'] = True
else:
# 非预期的时间回退,标记为异常
record['is_anomaly'] = True
cleaned.append(record)
prev_ts = ts
return cleaned
三、异常值检测:价格跳变、断点、重复数据的三重过滤
原始行情数据中常见的异常类型:
| 异常类型 | 典型特征 | 可能成因 |
|---|---|---|
| 价格跳变 | 单根 K 线涨跌幅超过 20%,前后 K 线正常 | 数据传输错误、交易所系统故障 |
| 断点缺失 | 正常交易日突然缺少数据 | 网络丢包、数据源维护 |
| 重复数据 | 同一时间戳出现两条相同记录 | 数据源去重逻辑失效 |
| 虚假极值 | 最高价远高于收盘价,或最低价远低于收盘价 | K 线生成逻辑错误 |
TickDB 的异常检测算法
我们采用三层过滤机制:
import numpy as np
from typing import Tuple
class AnomalyDetector:
"""多层级异常检测器"""
def __init__(self, lookback_periods: int = 20, volatility_multiplier: float = 5.0):
"""
参数:
lookback_periods: 用于计算统计量的历史 K 线数量
volatility_multiplier: 判定异常的价格波动倍数(超过 mean ± multiplier × std 视为异常)
"""
self.lookback = lookback_periods
self.multiplier = volatility_multiplier
def detect_price_jump(self, candles: list) -> list:
"""
第一层:价格跳变检测
使用滚动统计量(均值 + 标准差)判定单根 K 线是否异常
"""
closes = np.array([c['close'] for c in candles])
anomalies = []
for i in range(self.lookback, len(candles)):
# 计算前 N 根 K 线的均值和标准差
historical = closes[i - self.lookback:i]
mean_price = np.mean(historical)
std_price = np.std(historical)
current_price = closes[i]
# 判定:超过均值的 N 个标准差视为异常
if std_price > 0: # 避免除零
z_score = abs(current_price - mean_price) / std_price
if z_score > self.multiplier:
anomalies.append({
'timestamp': candles[i]['timestamp'],
'type': 'price_jump',
'z_score': z_score,
'reason': f"价格偏离 {z_score:.2f} 个标准差"
})
return anomalies
def detect_ohlc_inconsistency(self, candle: dict) -> bool:
"""
第二层:OHLC 逻辑一致性检测
检测 K 线内部矛盾:
- 最高价 < 开盘价 且 最高价 < 收盘价
- 最低价 > 开盘价 且 最低价 > 收盘价
"""
high = candle['high']
low = candle['low']
open_price = candle['open']
close = candle['close']
# 最高价必须 >= 开仓收三者
if high < max(open_price, close, low):
return True
# 最低价必须 <= 开仓收三者
if low > min(open_price, close, high):
return True
return False
def detect_duplicates(self, candles: list) -> list:
"""
第三层:重复数据检测
检查同一时间戳是否存在多条记录
"""
seen = {}
duplicates = []
for candle in candles:
ts = candle['timestamp']
if ts in seen:
duplicates.append({
'timestamp': ts,
'count': seen[ts] + 1,
'reason': '时间戳重复'
})
seen[ts] += 1
else:
seen[ts] = 1
return duplicates
# 异常处理策略
def handle_anomalies(candles: list, anomalies: list) -> list:
"""
异常处理三步走:
1. 标记但不删除(保留追溯能力)
2. 用前后 K 线均值插值填充
3. 记录异常日志用于后续分析
"""
cleaned = candles.copy()
for anomaly in anomalies:
ts = anomaly['timestamp']
# 找到异常记录的位置
for i, candle in enumerate(cleaned):
if candle['timestamp'] == ts:
# 标记为异常
cleaned[i]['is_anomaly'] = True
# 计算插值(使用前后各 3 根 K 线的均值)
# 注意:这里为了简洁省略了具体插值逻辑
# 生产环境中需要考虑窗口边界和趋势方向
break
return cleaned
一个真实案例:2023 年某小型科技股的闪崩事件
2023 年 5 月,某只纳斯达克小盘股在正常交易时段出现了单笔成交价格为 0.01 美元的情况——该公司股价正常水平在 15 美元附近。事后查明是某个做市商的错误报价导致。
这种异常如果直接进入回测数据,会导致策略计算出 99.9% 的潜在收益,从而产生错误的因子权重。TickDB 在检测到这类极端异常后,会:
- 标记该条数据为
is_anomaly - 使用前后 5 根 K 线的加权均值替代异常值
- 在元数据中记录异常事件,供后续审计
四、数据一致性:从采集到存储的全链路校验
清洗对齐不是一次性操作,而是贯穿整个数据生命周期的持续过程。
TickDB 的数据一致性架构
原始采集 → 标准化处理 → 异常检测 → 时间戳对齐 → 复权计算 → 入库 → 出库校验
↓ ↓ ↓ ↓ ↓ ↓ ↓
源校验 格式校验 逻辑校验 时区校验 因子校验 存储校验 出库校验
每个环节都有对应的校验机制:
def validate_data_integrity(df: pd.DataFrame) -> dict:
"""
出库前数据完整性校验
校验维度:
1. 时间连续性:无缺失交易日(美股每年约 252 个交易日)
2. 价格连续性:相邻 K 线涨跌幅在合理范围内
3. 成交量非负:所有记录 volume >= 0
4. OHLC 逻辑:High >= max(Open, Close, Low), Low <= min(Open, Close, High)
"""
report = {
'total_records': len(df),
'missing_dates': [],
'price_gaps': [],
'invalid_ohlc': 0,
'negative_volume': 0,
'passed': True
}
# 检查时间连续性
df_sorted = df.sort_values('timestamp')
expected_interval = 24 * 60 * 60 * 1000 # 日线间隔 1 天
for i in range(1, len(df_sorted)):
gap = df_sorted.iloc[i]['timestamp'] - df_sorted.iloc[i-1]['timestamp']
# 允许周末间隔(3天)和节假日间隔
if gap > 4 * expected_interval: # 超过 4 天视为异常缺失
report['missing_dates'].append({
'from': df_sorted.iloc[i-1]['timestamp'],
'to': df_sorted.iloc[i]['timestamp'],
'gap_days': gap / expected_interval
})
# 检查 OHLC 逻辑
for idx, row in df_sorted.iterrows():
if row['high'] < max(row['open'], row['close'], row['low']):
report['invalid_ohlc'] += 1
if row['low'] > min(row['open'], row['close'], row['high']):
report['invalid_ohlc'] += 1
# 检查成交量
if (df_sorted['volume'] < 0).any():
report['negative_volume'] = (df_sorted['volume'] < 0).sum()
# 综合判定
report['passed'] = (
len(report['missing_dates']) == 0 and
report['invalid_ohlc'] == 0 and
report['negative_volume'] == 0
)
return report
# 校验不通过时的处理
def quarantine_invalid_data(df: pd.DataFrame, report: dict):
"""
校验失败的数据进入隔离区,等待人工复核
不直接删除,保留原始数据用于问题追溯
"""
pass # 实际实现需要对接隔离存储系统
五、缺失值处理:休市期间、停牌、数据源故障的区别对待
不同的缺失原因,对应不同的处理策略。
| 缺失场景 | 特征 | TickDB 处理方式 |
|---|---|---|
| 正常休市 | 周末、节假日,有规律 | 标记 market_closed=True,不填充数据 |
| 盘中停牌 | 交易时间内突然停止 | 保留停牌前最后一条 K 线,标记 suspended=True |
| 数据源故障 | 无规律、连续缺失 | 根据业务需求选择:零填充 / 前向填充 / 线性插值 |
| 熔断触发 | 非标准休市但停止交易 | 标记 circuit_breaker=True |
def handle_missing_periods(df: pd.DataFrame, symbol: str, exchange: str) -> pd.DataFrame:
"""
缺失值处理:根据缺失类型选择不同策略
核心原则:不要假设数据不存在就是“应该连续”
休市和停牌是有意义的市场状态,不是需要填补的空白
"""
df = df.copy()
# 检测缺失模式
df['timestamp_diff'] = df['timestamp'].diff()
# 按缺失类型标记
if exchange in ["NASDAQ", "NYSE"]:
expected_interval = 24 * 60 * 60 * 1000 # 日线
# 超过 1 天且是 3 的倍数 = 周末/节假日
df['is_weekend'] = (df['timestamp_diff'] > expected_interval) & \
((df['timestamp_diff'] / expected_interval) % 3 == 0)
# 盘中突然断开 = 停牌
df['is_suspended'] = (df['timestamp_diff'] > 60_000) & \
(df['timestamp_diff'] < expected_interval) & \
(~df['is_weekend'])
# 数据源故障导致的缺失:使用前向填充 + 标记
# 注意:前向填充会引入“虚假连续性”,需要在使用时注意
df['forward_filled'] = df['close'].isna()
df['close'] = df['close'].fillna(method='ffill')
return df
六、清洗效果对比:TickDB vs 原始数据
以下是 TickDB 清洗后的数据与原始数据的关键指标对比:
| 维度 | 原始数据常见问题 | TickDB 处理后 |
|---|---|---|
| 复权一致性 | 前复权/后复权/不复权混用 | 统一前复权,价格可直接跨周期对比 |
| 时间戳精度 | UTC/EST/HKT 混用,精度秒级/毫秒级混用 | 统一 UTC 毫秒时间戳 |
| 价格跳变 | 单日涨跌幅 50%+ 的虚假波动 | 超过 5σ 标记为异常并插值 |
| 时间连续性 | 熔断期间数据丢失不知 | 完整标记休市/停牌/熔断 |
| 数据重复 | 同一时间戳多条记录 | 自动去重,保留第一条 |
| OHLC 逻辑 | 高价 < 收盘价等矛盾 | 100% 校验,矛盾数据隔离 |
一个具体的对比案例
以苹果公司(AAPL.US)2020 年 3 月的历史数据为例:
| 指标 | 原始数据 | TickDB 清洗后 |
|---|---|---|
| 2020-03-16 收盘价 | $74.25(不复权) | $73.28(前复权) |
| 时间戳格式 | 2020-03-16 16:00:00 EST |
1584392400000 (UTC ms) |
| 最高价验证 | 存在 High < Close 的异常记录 | 100% 逻辑校验通过 |
| 休市标记 | 周末数据缺失,未标记 | 标记 is_weekend=True |
七、如果你在使用未清洗的数据做回测
我们见过太多这样的场景:策略逻辑完美,代码实现无误,回测曲线亮眼——实盘一跑就亏。
问题往往不在策略本身,而在于回测所使用的数据质量。
如果你当前的数据存在以下任一问题:
- 历史价格没有统一复权标准
- 时间戳存在时区混乱或精度不一致
- 异常值没有检测和标记
- 休市/停牌期间数据被错误填充
你的回测结果可能存在系统性的偏差。这种偏差在短期策略中可能不明显,但在需要长周期验证的因子(如价值因子、动量因子)上,会被显著放大。
下一步行动:
- 如果你正在选择数据源,可以用 TickDB 的免费层进行对比测试(注册地址:tickdb.ai)
- 如果你已经在用其他数据源,可以用本文的检测方法自行校验数据质量
- 如果你需要 10 年级别的历史 K 线数据用于因子回测,联系 [email protected] 获取机构级数据方案
回测局限性说明:本文涉及的数据清洗逻辑基于公开市场信息和行业标准实践,具体处理规则可能因市场、标的和时间段而异。历史数据清洗不能完全消除市场结构变化带来的影响,建议在实盘前进行样本外测试。
本文为 TickDB 内容战略专家出品,如需技术对接或数据质量验证,可通过官方渠道联系团队。