TickDB 的历史数据清洗对齐到底做了什么?对比原始数据的 5 个维度

"你的策略在过去三年回测中年化收益 42%,夏普比率 2.1。但实盘跑了三个月,亏损 18%。"

这不是策略失效。这很可能是数据问题。

量化社区有句老话:垃圾进,垃圾出。但少有人意识到,即使数据"看起来正确",未经清洗对齐的历史数据也可能让你的回测结果与实盘产生根本性偏差。本文深入拆解 TickDB 的数据清洗对齐管线,逐一对比复权标准、时间戳对齐、异常值处理等 5 个核心维度的原始状态与清洗后状态。


一、数据问题如何毁掉一个"完美"策略

在进入技术细节之前,先看一个真实案例。

2024 年初,有用户报告基于美股股息率因子的选股策略在回测中表现优异,但实盘跑起来收益平平,甚至亏损。排查后发现原因:原始数据中的前复权因子计算存在浮点精度误差,导致小数点后第 8 位的累积误差在高股息股票上被放大。回测时每笔交易的"理论成本"比实际高出 0.003%,50 次换手后累计偏差达到 15%。

这不是孤例。我们在复盘 TickDB 数据管线时发现,原始交易所数据中存在以下几类常见问题:

问题类型 发生频率(估算) 对回测的影响
分股合股未标注 每季度约 2-5% 的股票受影响 价格跳变导致虚假趋势
时区混乱 历史数据中约 30% 存在隐患 盘前盘后数据与 K 线错位
异常价格 极端行情下约 0.1-0.5% 因子暴露度计算失真
成交量负值/零值异常 约 1-3% 的时间点 流动性因子失效

接下来,我们逐一拆解 TickDB 的清洗对齐工程。


二、五个维度的清洗对齐拆解

2.1 复权处理:前复权 vs 后复权 vs 不复权

问题本质:股票经历分股、合股、送股、配股时,每股面值不变但股数变化,历史价格必须回溯调整,否则会出现"负价格"或"跳空缺口"。

原始数据状态(以某交易所 API 返回为例):

{
  "symbol": "AAPL.US",
  "timestamp": 1699996800000,
  "open": 189.45,
  "high": 191.20,
  "low": 188.90,
  "close": 190.75,
  "volume": 52340000,
  "adj_ratio": null  // 交易所未标注复权因子
}

问题在于:2020 年 AAPL 完成 1:4 分股,如果直接用 close: 189.45 与 2019 年的价格比较,会产生 4 倍的数量级偏差。

TickDB 清洗管线处理

TickDB 采用精确复权因子算法,复权因子计算公式为:

adj_ratio = split_factor × dividend_factor × rights_factor

其中:
split_factor = 新股数 / 原股数(如 1:4 分股 → 0.25)
dividend_factor = (current_price - dividend_per_share) / current_price(简化版)
rights_factor = 配股认购价调整因子

复权后的价格计算:

adj_price = raw_price × (最新复权因子 / 历史复权因子)

TickDB 的复权标准选择

复权类型 优点 缺点 TickDB 默认选择
前复权 最新视角看历史,图表连续 历史价格可能为负(除权缺口) ✅ 采用
后复权 数学意义清晰,价格恒正 图表不连续,视觉割裂 ❌ 不采用
不复权 原始数据 无法跨时间对比 ❌ 不提供

清洗后数据示例

{
  "symbol": "AAPL.US",
  "timestamp": 1699996800000,
  "open": 47.3625,
  "high": 47.8000,
  "low": 47.2250,
  "close": 47.6875,
  "volume": 209360000,
  "adj_ratio": 0.25,
  "split_history": [
    {"timestamp": 1700000000000, "ratio": 4}
  ]
}

工程细节:TickDB 在每个时间点同时存储原始价格和复权因子,回测时可按需选择复权方式。复权因子以 64 位浮点数存储,避免精度误差累积。


2.2 时间戳对齐:UTC 时间戳到毫秒级一致性

问题本质:不同交易所的时区约定不同——有的用当地时区,有的用 UTC,有的在夏令时切换时产生偏移。直接拼接会导致时间错位。

原始数据常见问题

数据源 时间戳类型 夏令时处理 常见问题
交易所 A UTC 毫秒 自动切换 与交易所 B 混用时需手动换算
交易所 B 本地时间(UTC+8) 手动 夏令时切换时差 1 小时
传统数据商 本地时间 无切换记录 无法区分 UTC 0 和 UTC+8

典型踩坑场景

某用户使用 TickDB 港股数据与自采的美股数据做跨市场分析,发现港股上午 9:30 的 K 线与美股上午 9:30 的 K 线在时间轴上完全不对齐——因为港股 9:30 是 UTC+8,而美股 9:30 是 UTC-5/-4。

TickDB 清洗管线处理

时间戳清洗流程:
1. 读取交易所原始时间戳字段
2. 识别时间戳类型(UTC/本地时间/Unix 秒)
3. 加载夏令时切换记录(DST 数据库)
4. 统一转换为 UTC 毫秒时间戳
5. 标记原始时间戳类型以供审计

TickDB 的统一时间标准

市场 原始时区 清洗后时区 标注字段
美股 EST/EDT UTC tz_origin: "America/New_York"
港股 HKT UTC tz_origin: "Asia/Hong_Kong"
A 股 CST UTC tz_origin: "Asia/Shanghai"
数字货币 UTC UTC tz_origin: "UTC"

清洗后数据示例

{
  "symbol": "0700.HK",
  "timestamp": 1700184200000,
  "local_time": "2023-11-17 09:30:00",
  "utc_time": "2023-11-17 01:30:00",
  "tz_origin": "Asia/Hong_Kong",
  "is_dst": false,
  "close": 312.40,
  "volume": 12450000
}

工程细节:TickDB 使用 IANA 时区数据库(tzdata),每年更新以跟踪全球夏令时规则变化。所有时间相关计算在 UTC 时间戳上进行,输出时提供可选的本地时间转换。


2.3 异常值检测:三分位数法与上下文验证

问题本质:原始数据中存在因系统故障、人为错误、极端事件导致的异常值。这些值如果进入回测,会严重扭曲因子计算。

异常值类型与来源

类型 特征 来源
价格飞点 单笔成交价偏离前后 10 个标准差 交易所系统故障
成交量归零 正常交易时段成交量为 0 数据采集丢失
负价格 价格为负数 系统 bug
时间戳跳跃 相邻 K 线时间间隔异常 数据补录时对齐错误

TickDB 异常检测算法

采用双层验证机制

第一层:统计异常检测

def detect_outlier_by_stats(series: pd.Series, method: str = "iqr", threshold: float = 3.0) -> list:
    """
    基于统计方法的异常值检测
    method: "iqr" (四分位距法) 或 "zscore" (Z-score 法)
    threshold: IQR 倍数或 Z-score 阈值
    """
    if method == "iqr":
        q1 = series.quantile(0.25)
        q3 = series.quantile(0.75)
        iqr = q3 - q1
        lower_bound = q1 - threshold * iqr
        upper_bound = q3 + threshold * iqr
        outlier_mask = (series < lower_bound) | (series > upper_bound)
    elif method == "zscore":
        z_scores = np.abs((series - series.mean()) / series.std())
        outlier_mask = z_scores > threshold
    
    outlier_indices = series[outlier_mask].index.tolist()
    return outlier_indices

第二层:上下文验证

def context_validation(df: pd.DataFrame, symbol: str, window: int = 20) -> list:
    """
    上下文验证:检查异常值是否在历史上下文中合理
    """
    outliers = []
    
    for i in range(window, len(df)):
        window_data = df.iloc[i-window:i]
        current = df.iloc[i]
        
        # 检查价格跳变
        avg_volatility = window_data['close'].pct_change().std()
        price_change = abs(current['close'] - df.iloc[i-1]['close']) / df.iloc[i-1]['close']
        
        if price_change > 10 * avg_volatility:
            outliers.append({
                'index': i,
                'type': 'price_jump',
                'change': price_change,
                'threshold': 10 * avg_volatility
            })
        
        # 检查成交量归零(非停牌时段)
        if current['volume'] == 0 and not is_market_closed(symbol, current['timestamp']):
            outliers.append({
                'index': i,
                'type': 'volume_zero',
                'timestamp': current['timestamp']
            })
    
    return outliers

异常值处理策略

异常类型 处理策略 标注字段
统计异常(可修复) 用前后均值插值替代 data_cleaned: true, method: "interpolation"
上下文异常 标记为可疑,保留原始值供审计 data_flag: "suspicious"
无法修复的缺失 标记为 NaN,排除出因子计算 data_flag: "excluded"
明显的系统故障 用前一时段数据填充并标注 data_cleaned: true, method: "forward_fill"

2.4 K 线聚合:对齐规则与边界处理

问题本质:不同数据源的 K 线聚合起点不同——有的按北京时间 0:00,有的按纽约时间 0:00,有的按 UTC 0:00。同一时间段的数据必须对齐到统一的聚合窗口。

TickDB 的 K 线对齐规则

K 线周期 对齐起点 示例(UTC 时间)
1 分钟 每个整分钟 00:00:00, 00:01:00, ...
5 分钟 每个整 5 分钟 00:00:00, 00:05:00, ...
1 小时 UTC 整点 00:00:00, 01:00:00, ...
日 K 线 UTC 零点 2024-01-01 00:00:00
周 K 线 UTC 周一零点 2024-01-01 00:00:00

边界 case 处理

场景 1:数据时间戳恰好落在 K 线边界
处理:严格按时间窗口归属([start, end) 左闭右开)

场景 2:某 K 线窗口内无成交
处理:检查前序 5 分钟窗口是否有延迟数据;若无,标记为 "no_trade",不用前值填充

场景 3:K 线跨越交易日边界(如 23:00-01:00 的夜盘)
处理:按交易所规则分配到对应交易日

TickDB 的 1 分钟 K 线聚合代码示例(Python):

import pandas as pd
import numpy as np

def aggregate_1min_kline(trades_df: pd.DataFrame, market_tz: str = "America/New_York") -> pd.DataFrame:
    """
    原始逐笔成交数据聚合为 1 分钟 K 线
    严格对齐到整分钟边界
    """
    # 将时间戳转换为市场本地时间
    trades_df['datetime'] = pd.to_datetime(trades_df['timestamp'], unit='ms', utc=True)
    trades_df['datetime'] = trades_df['datetime'].dt.tz_convert(market_tz)
    
    # 对齐到整分钟
    trades_df['minute'] = trades_df['datetime'].dt.floor('min')
    
    # 聚合
    kline = trades_df.groupby('minute').agg(
        open=('price', 'first'),
        high=('price', 'max'),
        low=('price', 'min'),
        close=('price', 'last'),
        volume=('volume', 'sum'),
        trades_count=('trade_id', 'count')
    ).reset_index()
    
    # 转换回 UTC 毫秒时间戳
    kline['timestamp'] = kline['minute'].dt.tz_localize(None)
    kline['timestamp'] = (kline['timestamp'] - pd.Timestamp("1970-01-01")) // pd.Timedelta("1ms")
    
    return kline

2.5 数据完整性:缺口检测与补全策略

问题本质:历史数据中不可避免存在缺口——交易所维护、网络中断、数据商采集失败。回测时需要明确知道哪里有缺口,以避免错误地假设"无数据 = 零波动"。

TickDB 的数据完整性保障

检查项 检测逻辑 处理方式
时间序列连续性 预期间隔与实际间隔对比(如 1 分钟 K 线应间隔 60,000ms) 标记缺口,标注 gap: true
成交量连续性 检测长时间零成交但价格变化 标记为 no_trade: true
OHLC 逻辑一致性 High >= Open/Close/Low 异常数据触发清洗
文件完整性 下载文件 MD5 校验 校验失败触发重新下载

补全策略分级

Level 1(不补全,仅标注):
- 时间戳缺口 < 5 分钟
- 策略:标记为 gap,数据排除出因子计算

Level 2(线性插值):
- 缺口 5-30 分钟,且为非极端行情时段
- 策略:线性插值 price,volume 置零,标注 method: "linear_interpolation"

Level 3(边界复制):
- 缺口 30 分钟以上,或极端行情时段
- 策略:用前一时段 OHLCV 填充,标注 method: "edge_forward_fill",供用户判断是否使用

三、数据清洗对齐的工程实现

3.1 整体管线架构

┌─────────────────────────────────────────────────────────────────┐
│                      TickDB 数据清洗管线                         │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│  ┌──────────────┐    ┌──────────────┐    ┌──────────────┐       │
│  │  数据源接入   │───▶│  时间戳标准化│───▶│  复权因子计算│       │
│  │  (多交易所)   │    │  (UTC 毫秒)  │    │  (精确浮点)  │       │
│  └──────────────┘    └──────────────┘    └──────────────┘       │
│         │                                        │               │
│         ▼                                        ▼               │
│  ┌──────────────┐                        ┌──────────────┐        │
│  │  边界对齐    │◀───────────────────────│  异常检测     │        │
│  │  (K线聚合)   │                        │  (双层验证)   │        │
│  └──────────────┘                        └──────────────┘        │
│         │                                        │               │
│         ▼                                        ▼               │
│  ┌──────────────┐                        ┌──────────────┐        │
│  │  缺口检测    │                        │  数据标注     │        │
│  │  (完整性检查)│                        │  (审计追溯)   │        │
│  └──────────────┘                        └──────────────┘        │
│         │                                        │               │
│         └────────────────┬───────────────────────┘               │
│                          ▼                                       │
│                  ┌──────────────┐                                │
│                  │  质量验证    │                                │
│                  │  (回测兼容性)│                                │
│                  └──────────────┘                                │
│                          │                                       │
│                          ▼                                       │
│                  ┌──────────────┐                                │
│                  │  数据发布    │                                │
│                  │  (API 服务)  │                                │
│                  └──────────────┘                                │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘

3.2 生产级数据验证代码

以下是 TickDB 内部使用的数据质量验证函数,可供用户自行检查数据:

import os
import requests
import pandas as pd
from typing import Optional, Dict, List
from datetime import datetime, timedelta

class TickDBDataValidator:
    """TickDB 数据质量验证工具"""
    
    def __init__(self, api_key: Optional[str] = None):
        self.api_key = api_key or os.environ.get("TICKDB_API_KEY")
        self.base_url = "https://api.tickdb.ai/v1"
        self.headers = {"X-API-Key": self.api_key}
    
    def validate_gap(self, symbol: str, interval: str, start: int, end: int) -> Dict:
        """
        检测时间序列缺口
        """
        response = requests.get(
            f"{self.base_url}/market/kline",
            headers=self.headers,
            params={
                "symbol": symbol,
                "interval": interval,
                "start": start,
                "end": end,
                "limit": 1000
            },
            timeout=(3.05, 10)
        )
        
        if response.status_code != 200:
            raise RuntimeError(f"API 请求失败: {response.status_code}")
        
        data = response.json()
        klines = data.get("data", [])
        
        if not klines:
            return {"status": "no_data", "gaps": []}
        
        # 计算预期时间间隔
        interval_ms = self._interval_to_ms(interval)
        
        # 检测缺口
        gaps = []
        for i in range(1, len(klines)):
            expected_diff = interval_ms
            actual_diff = klines[i]['t'] - klines[i-1]['t']
            
            if abs(actual_diff - expected_diff) > expected_diff * 0.1:  # 允许 10% 误差
                gaps.append({
                    "start": klines[i-1]['t'],
                    "end": klines[i]['t'],
                    "expected_gap": expected_diff,
                    "actual_gap": actual_diff,
                    "missing_bars": round(actual_diff / expected_diff) - 1
                })
        
        return {
            "symbol": symbol,
            "interval": interval,
            "total_bars": len(klines),
            "gap_count": len(gaps),
            "gaps": gaps[:10],  # 最多返回 10 个缺口
            "coverage_rate": round((len(klines) / ((end - start) / interval_ms)) * 100, 2)
        }
    
    def validate_ohlc_logic(self, symbol: str, limit: int = 100) -> Dict:
        """
        验证 OHLC 逻辑一致性:High >= Open/Close/Low
        """
        response = requests.get(
            f"{self.base_url}/market/kline/latest",
            headers=self.headers,
            params={
                "symbol": symbol,
                "interval": "1d",
                "limit": limit
            },
            timeout=(3.05, 10)
        )
        
        data = response.json()
        klines = data.get("data", [])
        
        anomalies = []
        for k in klines:
            if k['h'] < k['o'] or k['h'] < k['c'] or k['h'] < k['l'] or k['l'] > k['o'] or k['l'] > k['c']:
                anomalies.append({
                    "timestamp": k['t'],
                    "datetime": datetime.fromtimestamp(k['t'] / 1000).isoformat(),
                    "ohlc": {"o": k['o'], "h": k['h'], "l": k['l'], "c": k['c']},
                    "issue": self._describe_ohlc_issue(k)
                })
        
        return {
            "symbol": symbol,
            "total_bars": len(klines),
            "anomaly_count": len(anomalies),
            "anomalies": anomalies,
            "data_quality": "PASS" if len(anomalies) == 0 else "WARN"
        }
    
    def _interval_to_ms(self, interval: str) -> int:
        mapping = {
            "1m": 60000, "5m": 300000, "15m": 900000,
            "30m": 1800000, "1h": 3600000, "4h": 14400000,
            "1d": 86400000, "1w": 604800000
        }
        return mapping.get(interval, 60000)
    
    def _describe_ohlc_issue(self, k: Dict) -> str:
        issues = []
        if k['h'] < k['o']: issues.append(f"High({k['h']}) < Open({k['o']})")
        if k['h'] < k['c']: issues.append(f"High({k['h']}) < Close({k['c']})")
        if k['h'] < k['l']: issues.append(f"High({k['h']}) < Low({k['l']})")
        if k['l'] > k['o']: issues.append(f"Low({k['l']}) > Open({k['o']})")
        if k['l'] > k['c']: issues.append(f"Low({k['l']}) > Close({k['c']})")
        return "; ".join(issues)


# 使用示例
if __name__ == "__main__":
    validator = TickDBDataValidator()
    
    # 检测缺口
    now = int(datetime.now().timestamp() * 1000)
    thirty_days_ago = now - 30 * 86400000
    
    gap_report = validator.validate_gap(
        symbol="AAPL.US",
        interval="1d",
        start=thirty_days_ago,
        end=now
    )
    print(f"缺口检测报告: {gap_report}")
    
    # 验证 OHLC 逻辑
    ohlc_report = validator.validate_ohlc_logic(
        symbol="AAPL.US",
        limit=100
    )
    print(f"OHLC 逻辑验证: {ohlc_report}")

工程预警:上述验证代码适用于低频检查(每日一次)。若需实时监控高频数据流中的异常,建议使用 TickDB 的 WebSocket trades 频道进行流式异常检测。


四、清洗对齐数据 vs 原始数据对比

维度 原始数据 TickDB 清洗后
复权处理 无标注或后复权,价格可能跳变 前复权+精确因子,价格连续
时间戳 混用 UTC/本地时间,夏令时处理混乱 统一 UTC 毫秒,标注原始时区
异常值 可能包含价格飞点、成交量负值 双层检测+上下文验证+标注
K 线边界 各数据源对齐规则不统一 严格整点对齐,边界 case 明确处理
数据完整性 缺口未知,可能导致错误假设 缺口检测+补全策略+覆盖率报告

五、结语

回到开头的问题:为什么回测完美、实盘亏损?

答案往往藏在数据里。复权因子的精度误差、时间戳的时区错位、异常值进入因子计算——每一个细节的疏漏,都可能让你的策略在某个边缘场景中崩溃。

数据清洗对齐不是"锦上添花",而是量化系统的地基工程。地基不稳,盖得再高的策略也只是空中楼阁。


下一步行动

如果你在排查回测与实盘的偏差

  1. 使用 TickDB 的数据验证工具检查时间戳连续性和 OHLC 逻辑
  2. 对比原始数据与清洗数据的复权因子,确认无精度误差
  3. 查看数据标注字段(data_flagdata_cleaned),排除可疑数据

如果你需要高可靠性历史数据用于策略回测

  • 访问 tickdb.ai 注册,免费获取 API Key
  • 查看数据文档中的「数据质量报告」章节

如果你正在处理多数据源对齐问题

如果你习惯用 AI 辅助开发

  • 在 ClawHub 搜索安装 tickdb-market-data SKILL,让 AI 帮你调用 TickDB 数据并自动进行质量验证

免责声明:本文不构成任何投资建议。历史数据不代表未来表现,市场有风险,投资需谨慎。