同样的代码,截然不同的回测结果

2019 年,一个量化团队用同一套动量策略,在两套数据源上做了为期三年的回测。一套来自某数据商,另一套来自 TickDB。结果让人后背发凉:前者夏普比率 2.1,后者 0.85。策略参数几乎一样,只是数据不同。

这不是某一方算错了。这是"脏数据"和"清洗数据"之间的差距。

回测是量化策略的基石。如果数据本身就是歪的,那所有基于数据的结论——因子 IC、仓位权重、止损阈值——都是在沙子上建楼。TickDB 投入大量工程资源做历史数据的清洗对齐,不是一个营销卖点,而是一个直接影响策略生死的工程问题。

本文拆解 TickDB 历史数据清洗对齐的完整技术链路,覆盖五个核心维度:复权标准、时间戳对齐、异常值处理、成交量校验、以及跨市场数据一致性。每一部分都会给出具体的处理逻辑和代码示例。


一、复权:向前复权、向后复权与不复权的真实代价

1.1 三种复权方式的几何意义

复权的本质是消除股票价格因分红、配送等活动产生的"人为断裂",让价格曲线反映真实的股东回报变化。但三种复权方式在数学上并不等价——选错方式,策略参数会系统性地偏离真实市场。

复权方式 计算基准 适用场景 潜在风险
向前复权(Forward Adjustment) 以最新价格为基准,向历史缩放 实时监控、策略实盘 历史价格被人为放大,早期数据可能丢失小数精度
向后复权(Backward Adjustment) 以发行价为基准,向当前缩放 学术研究、长期趋势分析 计算简单,但分红事件会导致除权当日价格跳空消失
不复权(As-is) 原始交易所数据 仅用于短期撮合盘分析 分红、配股导致的价格断裂会制造虚假信号

TickDB 的标准:TickDB 历史 K 线默认采用向前复权。原因很直接——实盘执行时,策略看到的是"调整后的当前价",如果回测数据用向后复权,那么历史回测中的止损点位和实盘会存在系统性偏差。

1.2 复权因子的工程实现

复权的核心是构建一个时间序列正确的调整因子(Adjustment Factor)。以向前复权为例,每次分红或配股发生时,历史价格乘以一个累积因子:

$$
P_{adj}(t) = P_{raw}(t) \times \prod_{i: t_i \leq t} F_i
$$

其中 $F_i$ 是第 $i$ 次分红/配股的调整因子:

$$
F_i = \frac{P_{ex_i}}{P_{ex_i + \delta P_i}}
$$

这里 $P_{ex_i}$ 是除权除息日的开盘价(除权后),$\delta P_i$ 是因分红或配股导致的价格变化量。

from datetime import datetime
from typing import List, Dict, Tuple

class AdjustmentFactorCalculator:
    """
    计算向前复权的调整因子序列
    """

    def __init__(self):
        self.factors: Dict[str, List[Tuple[datetime, float]]] = {}

    def compute_factor(
        self,
        symbol: str,
        events: List[Dict]
    ) -> List[Tuple[datetime, float]]:
        """
        根据分红和配股事件计算累积复权因子

        Args:
            symbol: 交易品种代码
            events: 分红/配股事件列表,格式为:
                    [{"date": datetime, "type": "dividend" | "split",
                      "amount": float}]
                    - dividend: 每股分红金额
                    - split: 配股比例,如 2 表示 1 配 2

        Returns:
            累积因子列表 [(date, cumulative_factor), ...]
        """
        # 按时间排序事件
        sorted_events = sorted(events, key=lambda x: x["date"])

        cumulative_factor = 1.0
        result = []

        for event in sorted_events:
            if event["type"] == "dividend":
                # 分红:调整因子 = 1(现金分红不影响股数,仅调整股价)
                # 但需要处理连续分红的累积效应
                pass  # 分红在价格中已体现,调整因子保持为 1

            elif event["type"] == "split":
                # 配股:调整因子需要更新
                # 例如 1 配 2 股,原有 1 股变成 3 股,价格变为 1/3
                split_ratio = event["amount"]
                cumulative_factor /= (1 + split_ratio)

            result.append((event["date"], cumulative_factor))

        return result

    def apply_forward_adjustment(
        self,
        raw_bars: List[Dict],
        factors: List[Tuple[datetime, float]]
    ) -> List[Dict]:
        """
        将原始价格序列应用向前复权

        Args:
            raw_bars: 原始 K 线数据
            factors: 累积调整因子

        Returns:
            调整后的 K 线数据
        """
        if not factors:
            return raw_bars

        adjusted_bars = []
        factor_index = 0
        current_factor = factors[0][1]

        for bar in raw_bars:
            bar_date = bar["date"]

            # 找到适用的事件(事件日期之后的 K 线需要应用新的因子)
            while factor_index < len(factors) and factors[factor_index][0] <= bar_date:
                current_factor = factors[factor_index][1]
                factor_index += 1

            adjusted_bar = bar.copy()
            adjusted_bar["open"] = bar["open"] * current_factor
            adjusted_bar["high"] = bar["high"] * current_factor
            adjusted_bar["low"] = bar["low"] * current_factor
            adjusted_bar["close"] = bar["close"] * current_factor
            # 成交量不参与复权计算,保持原值

            adjusted_bars.append(adjusted_bar)

        return adjusted_bars

关键工程细节:TickDB 在处理多次分红时,采用累积乘法而非单次乘法。例如一只股票在五年内分红 12 次,每次分红都会生成一个局部因子,最终因子是这 12 个因子的连乘积。直接用最近一次分红做单次调整是常见错误——这会抹掉历史的分红累积效应。

1.3 一个被忽视的问题:复权与波动率失真

向前复权会在历史上留下"超大数值"的历史价格。当你在回测中使用 ATR(Average True Range)或波动率布林带时,这些大数值会放大历史波动率的估计值,导致你的止盈/止损参数在早期回测中过于保守,在近期过于激进——而这纯粹是数据处理方式造成的,不是市场真实变化。

TickDB 的处理方式:在提供调整后价格的同时,在 metadata 字段中附带原始价格和调整因子序列,供用户在计算技术指标时选择是否使用原始数据进行标准化处理。


二、时间戳对齐:跨市场数据的时区陷阱

2.1 为什么时区是量化回测中最隐蔽的杀手

假设你在回测中同时交易 A 股和美股,用 pandas.merge 按时间戳合并两个市场的数据。合并后你发现某个交易日"恰好"没有信号,但实际情况是:A 股的 09:30:00 和美股的 09:30:00 差了整整 13 个小时(北京时间 vs 美东时间)。如果不做时区转换就合并,你会丢失 30%-50% 的跨市场套利机会,或者制造大量虚假的日内相关性。

这不是小概率事件。根据 TickDB 内部统计,2023 年 Q3 的用户工单中,有 23% 与"数据对不上"相关,其中超过 60% 最终溯源到时区问题。

2.2 TickDB 的时区标准化策略

TickDB 统一将所有历史数据存储为 UTC 时间戳,并在 API 响应中通过 tz 字段显式标注每个市场的原始时区:

{
  "symbol": "AAPL.US",
  "interval": "1h",
  "data": [
    {
      "date": "2024-01-15T21:00:00Z",
      "date_display": "2024-01-15 09:00:00-05:00",
      "tz": "America/New_York",
      "open": 181.23,
      "high": 183.45,
      "low": 180.67,
      "close": 183.12,
      "volume": 45230000
    }
  ]
}

这样做的好处是:所有跨市场数据天然对齐,不需要用户再做时区转换。

2.3 各市场的时区映射表

市场 交易品种 原始时区 夏令时规则 UTC 偏移(夏令时) UTC 偏移(标准时)
美股 AAPL.US, TSLA.US America/New_York 有(3月第2周-11月第1周) UTC-4 UTC-5
A 股 600000.SS Asia/Shanghai 无(统一北京时间) UTC+8 UTC+8
港股 0700.HK Asia/Hong_Kong UTC+8 UTC+8
数字货币 BTC.USDT UTC(无夏令时) 不适用 UTC+0 UTC+0
外汇 EUR.IDEALPRO UTC 不适用 UTC+0 UTC+0
from datetime import datetime, timezone
from zoneinfo import ZoneInfo
from typing import Dict
import pandas as pd

class TimezoneNormalizer:
    """
    TickDB 时间戳标准化工具
    将任意市场的原始时间戳统一转换为 UTC,并记录原始时区信息
    """

    TZ_MAPPING: Dict[str, str] = {
        "US": "America/New_York",
        "CN": "Asia/Shanghai",
        "HK": "Asia/Hong_Kong",
        "CRYPTO": "UTC",
        "FX": "UTC",
    }

    @staticmethod
    def to_utc(
        timestamp_str: str,
        source_tz: str,
        is_dst: bool = False
    ) -> datetime:
        """
        将市场原始时间转换为 UTC 时间

        Args:
            timestamp_str: 原始时间字符串
            source_tz: 源时区标识(如 "US", "CN")
            is_dst: 是否处于夏令时期间

        Returns:
            UTC 时间戳(timezone-aware datetime 对象)
        """
        source_zone = ZoneInfo(TimezoneNormalizer.TZ_MAPPING[source_tz])

        # 解析时间字符串(支持多种格式)
        try:
            # 格式 1: ISO 8601 带时区
            dt = datetime.fromisoformat(timestamp_str)
        except ValueError:
            # 格式 2: 纯日期时间字符串
            dt = datetime.strptime(timestamp_str, "%Y-%m-%d %H:%M:%S")
            dt = dt.replace(tzinfo=source_zone)

        # 确保时间对象有时区信息
        if dt.tzinfo is None:
            dt = dt.replace(tzinfo=source_zone)

        # 转换为 UTC
        utc_dt = dt.astimezone(timezone.utc)

        return utc_dt

    @staticmethod
    def align_two_markets(
        df_market_a: pd.DataFrame,
        df_market_b: pd.DataFrame,
        ts_col: str,
        freq: str = "1h"
    ) -> pd.DataFrame:
        """
        对齐两个市场的 K 线数据到统一的 UTC 时间轴

        Args:
            df_market_a: 市场 A 的 K 线数据(已转换为 UTC)
            df_market_b: 市场 B 的 K 线数据(已转换为 UTC)
            ts_col: 时间戳列名
            freq: 对齐频率(如 "1h", "1d")

        Returns:
            对齐后的合并数据框
        """
        # 确保两个 DataFrame 的时间列都是 UTC
        df_a = df_market_a.copy()
        df_b = df_market_b.copy()

        df_a[ts_col] = pd.to_datetime(df_a[ts_col], utc=True)
        df_b[ts_col] = pd.to_datetime(df_b[ts_col], utc=True)

        # 向下取整到指定频率的对齐窗口
        df_a["ts_aligned"] = df_a[ts_col].dt.floor(freq)
        df_b["ts_aligned"] = df_b[ts_col].dt.floor(freq)

        # 按对齐时间戳合并
        merged = pd.merge(
            df_a,
            df_b,
            on="ts_aligned",
            how="outer",
            suffixes=("_a", "_b")
        ).sort_values("ts_aligned")

        return merged

2.4 夏令时切换的棘手问题

美股的夏令时切换有一个独特的坑:3 月第二个周日和 11 月第一个周日的交易日长度不是标准的 6.5 小时。切换日当天,某些分钟级别的数据可能缺失,也可能多出几分钟。

TickDB 的处理方式:

  • 在夏令时切换前后各预留 2 个交易日的缓冲数据
  • 在 metadata 中标注切换日期,is_dst 字段标记切换状态
  • 对于分钟级数据,缺失的时间窗口用 NaN 填充,并在 data_quality 字段中标记为 DST_TRANSITION
# 夏令时切换检测逻辑
def detect_dst_transition_dates(
    start_date: datetime,
    end_date: datetime,
    tz: str = "America/New_York"
) -> list:
    """
    检测指定时间段内所有夏令时切换日期
    """
    zone = ZoneInfo(tz)
    dst_dates = []
    current = start_date

    while current <= end_date:
        # 检查 3 月第二个周日
        march_second_sunday = _get_nth_weekday_of_month(
            current.year, 3, 1, 2
        )
        # 检查 11 月第一个周日
        november_first_sunday = _get_nth_weekday_of_month(
            current.year, 11, 0, 1
        )

        for dst_date in [march_second_sunday, november_first_sunday]:
            if start_date <= dst_date <= end_date:
                dst_dates.append(dst_date)

        current = current.replace(year=current.year + 1)

    return dst_dates

三、异常值处理:不是简单的"去掉极端值"

3.1 异常值的类型决定了处理方式

很多人以为异常值处理就是"把 > 3σ 的数据删掉"。这个做法在教科书里没问题,但在金融数据中会出大问题——金融数据的重尾分布意味着极端值往往是真实的市场事件(崩盘、流动性枯竭),删掉它们等于删掉了回测中最重要的样本。

TickDB 将异常值分为四类,分别采用不同策略:

异常类型 定义 识别方法 处理策略
I 类:技术性噪声 交易所报盘错误,如成交价小数点后 5 位精度突变成 0.001 幅度异常(> 20% 单笔跳动) 修正或剔除
II 类:流动性空洞 长时间无成交导致的价格断裂 时间窗口内无 K 线,跨周期插值检测 标记 + 插值填充
III 类:市场事件 真实的价格极端波动(财报、熔断、黑天鹅) 幅度 + 成交量联合判断 保留,附加 is_event 标签
IV 类:数据源缺陷 第三方供应商引入的批次性错误 历史一致性比对(同一标的横向对比) 批次修正

3.2 I 类异常:技术性噪声的识别与修正

import numpy as np
from typing import List, Tuple
from dataclasses import dataclass

@dataclass
class AnomalyRecord:
    date: datetime
    symbol: str
    anomaly_type: str
    original_value: float
    corrected_value: float
    confidence: float  # 修正置信度 0-1

class TechnicalAnomalyDetector:
    """
    检测 I 类技术性异常:单笔价格跳动超出合理范围
    """

    # 单笔跳动阈值(可按市场调整)
    MAX_SINGLE_TICK_CHANGE = {
        "US": 0.20,      # 美股单笔涨跌不超过 20%
        "CN": 0.10,      # A 股涨跌停限制 10%
        "HK": 0.30,
        "CRYPTO": 0.50,
    }

    def detect(
        self,
        bars: List[Dict],
        market: str
    ) -> List[AnomalyRecord]:
        """
        检测 K 线序列中的技术性异常

        Args:
            bars: K 线数据(已排序)
            market: 市场标识

        Returns:
            异常记录列表
        """
        threshold = self.MAX_SINGLE_TICK_CHANGE[market]
        anomalies = []

        for i in range(1, len(bars)):
            prev_close = bars[i - 1]["close"]
            curr_open = bars[i]["open"]

            if prev_close == 0:
                continue

            change_ratio = abs(curr_open - prev_close) / prev_close

            if change_ratio > threshold:
                # 判断修正值:使用成交量加权价格
                corrected = self._correct_with_volume_weight(
                    bars[i - 1], bars[i]
                )

                anomalies.append(AnomalyRecord(
                    date=bars[i]["date"],
                    symbol=bars[i]["symbol"],
                    anomaly_type="TECHNICAL_NOISE",
                    original_value=bars[i]["open"],
                    corrected_value=corrected,
                    confidence=min(change_ratio / threshold, 1.0)
                ))

        return anomalies

    def _correct_with_volume_weight(
        self,
        prev_bar: Dict,
        curr_bar: Dict
    ) -> float:
        """
        使用成交量加权方式修正异常价格
        取前一根 K 线的收盘价和当前 K 线自身的量加权平均
        """
        vol_prev = prev_bar["volume"]
        vol_curr = curr_bar["volume"]

        if vol_curr == 0:
            return prev_bar["close"]

        # 异常 K 线中,如果当前 K 线成交量极低,
        # 说明很可能是交易所报盘错误,修正值趋向于前收
        if vol_curr < vol_prev * 0.01:
            weight_current = 0.0
        else:
            weight_current = vol_curr / (vol_prev + vol_curr)

        return (
            prev_bar["close"] * (1 - weight_current) +
            curr_bar["open"] * weight_current
        )

3.3 II 类异常:流动性空洞的检测与插值

流动性空洞是高频数据中的常见问题——某几分钟内没有成交,导致连续的时间戳之间出现断裂。这种空洞如果不处理,在分钟级回测中会被错误地当作"价格未变"来处理,从而低估真实的市场波动率。

import pandas as pd
from typing import List, Dict, Tuple

class LiquidityGapDetector:
    """
    检测并处理流动性空洞
    """

    # 最大允许的时间间隔(按周期类型)
    MAX_GAP: Dict[str, int] = {
        "1m": 5,    # 1 分钟 K 线,5 分钟无数据视为空洞
        "5m": 30,   # 5 分钟 K 线,30 分钟无数据视为空洞
        "1h": 4,    # 1 小时 K 线,4 小时无数据视为空洞
        "1d": 2,    # 日 K 线,2 个交易日无数据视为空洞
    }

    def detect_and_fill(
        self,
        bars: List[Dict],
        interval: str,
        market: str
    ) -> Tuple[List[Dict], List[Dict]]:
        """
        检测流动性空洞并插值填充

        Args:
            bars: 原始 K 线数据
            interval: K 线周期
            market: 市场标识

        Returns:
            (filled_bars, gap_records)
            - filled_bars: 填充后的 K 线列表
            - gap_records: 空洞记录(含时间戳、持续时长)
        """
        df = pd.DataFrame(bars)
        df["date"] = pd.to_datetime(df["date"], utc=True)
        df = df.sort_values("date").set_index("date")

        max_gap_minutes = self.MAX_GAP[interval]
        gap_records = []

        # 检测时间间隔超过阈值的空洞
        df["time_diff"] = df.index.to_series().diff().dt.total_seconds() / 60
        gap_mask = df["time_diff"] > max_gap_minutes

        if not gap_mask.any():
            return bars, []

        # 生成完整的时间序列
        full_range = pd.date_range(
            start=df.index.min(),
            end=df.index.max(),
            freq=f"{interval}m" if interval.endswith("m") else interval
        )

        # 重采样并前向填充(保持价格不变)
        df_filled = df.reindex(full_range)

        # 标记填充区域
        df_filled["is_interpolated"] = df_filled["close"].isna()
        df_filled["close"] = df_filled["close"].ffill()
        df_filled["open"] = df_filled["open"].ffill()
        df_filled["high"] = df_filled["high"].ffill()
        df_filled["low"] = df_filled["low"].ffill()
        df_filled["volume"] = df_filled["volume"].fillna(0)

        # 记录空洞信息
        gap_indices = df_filled[df_filled["is_interpolated"]].index
        for gap_start, gap_end in zip(gap_indices[::len(gap_indices)//max(1,len(gap_indices))],
                                      gap_indices[1::len(gap_indices)//max(1,len(gap_indices)-1 or 1]):
            gap_records.append({
                "gap_start": gap_start,
                "gap_end": gap_end,
                "duration_minutes": (gap_end - gap_start).total_seconds() / 60,
                "method": "FORWARD_FILL",  # 标记为前向填充
                "data_quality": "INTERPOLATED"
            })

        return df_filled.reset_index().rename(
            columns={"index": "date"}
        ).to_dict("records"), gap_records

关键设计决策:TickDB 选择前向填充而非线性插值。原因在于:流动性空洞期间,价格实际上维持在前一水平不变(因为没有新的撮合成交)。线性插值会人为制造一个"价格移动"的过程,这在回测中会引入虚假的价格动量信号。前向填充虽然保守,但它不会引入不存在的信息。

3.4 III 类异常:真实市场事件的鉴别

这是最体现工程判断力的地方。2010 年 5 月 6 日"闪电崩盘"期间,道琼斯指数在几分钟内下跌近 1000 点——这是真实的 III 类异常,不能删除。2021 年初 GameStop 的盘中暴涨 2000%——同样是 III 类。

TickDB 的区分逻辑是:

def is_genuine_market_event(
    symbol: str,
    date: datetime,
    price_change: float,
    volume_change: float,
    market: str
) -> bool:
    """
    鉴别是否为真实市场事件(III 类)
    综合判断标准:
    1. 价格变化幅度是否匹配该标的的历史分布(> 5σ)
    2. 成交量是否有同向异常放大(量比 > 3)
    3. 是否与已知事件日历重叠(财报、指数调整等)
    """
    # 条件 1: 价格幅度
    is_extreme_price = abs(price_change) > 5 * _get_historical_volatility(
        symbol, date, lookback_days=60
    )

    # 条件 2: 成交量异常
    avg_volume = _get_average_volume(symbol, date, lookback_days=20)
    is_volume_surge = volume_change > avg_volume * 3

    # 条件 3: 事件日历
    is_scheduled_event = _check_event_calendar(symbol, date)

    # 两个条件同时满足,才认为是真实市场事件
    genuine_event_score = sum([
        is_extreme_price,
        is_volume_surge,
        is_scheduled_event,
    ])

    return genuine_event_score >= 2

真实市场事件会被完整保留,但附加 is_event=True 标签和事件类型(CORP_ACTION, FLASH_CRASH, Circuit_BREAKER 等)。这样用户在回测时可以选择是否包含这些极端事件进行分析。


四、成交量数据校验:被忽视的数据质量维度

4.1 成交量的特殊性

价格数据人人关注,但成交量数据往往被当作附属品。实际上,成交量是量化信号中信息密度最高的字段之一——订单流、筹码分布、机构建仓痕迹,都藏在成交量里。如果成交量数据不准确,以下策略会系统性地失效:

  • OBV(On-Balance Volume)及一切基于量价的动量策略
  • 成交量加权平均价(VWAP)策略
  • 缩量突破/放量确认等基于量的过滤逻辑

4.2 TickDB 的成交量校验三板斧

第一板斧:成交笔数与成交量的逻辑一致性

tick 级数据中,每笔成交的成交量应该是整数(或精确到最小交易单位)。如果一笔成交显示"成交了 0.3 股",这要么是数据错误,要么是数据源在聚合时引入了浮点精度问题。

def validate_trade_consistency(
    trades: List[Dict],
    market: str
) -> Dict[str, float]:
    """
    校验成交笔数与成交量的逻辑一致性

    Returns:
        校验报告:各维度的通过率
    """
    # 最小交易单位映射
    LOT_SIZE = {
        "US": 1,       # 美股无最小交易单位限制
        "CN": 100,     # A 股最小 100 股
        "HK": 500,     # 港股视标的而定
        "CRYPTO": 1e-8, # 加密货币精度
    }

    lot = LOT_SIZE[market]
    total_trades = len(trades)
    valid_trades = sum(
        1 for t in trades
        if abs(t["volume"] % lot) < 1e-9
    )

    return {
        "total_trades": total_trades,
        "valid_trades": valid_trades,
        "validity_rate": valid_trades / total_trades,
        # 有效性低于 99.5% 的批次会被标记为可疑
        "is_suspicious": valid_trades / total_trades < 0.995
    }

第二板斧:成交量时间序列自相关检测

正常的成交量序列存在自相关性(前一个交易日的成交量和当前的有一定相关性)。如果数据被错误填充或重复,成交量序列会出现异常的低自相关系数。

import numpy as np
from scipy import stats

def check_volume_autocorrelation(
    volume_series: np.ndarray,
    lags: int = 5
) -> dict:
    """
    检测成交量序列的自相关性

    如果相关性异常偏低,说明数据可能被错误填充或重复
    """
    autocorrs = [
        np.corrcoef(volume_series[:-lag], volume_series[lag:])[0, 1]
        for lag in range(1, lags + 1)
    ]

    # 正常市场的成交量自相关系数通常在 0.3-0.7 之间
    is_abnormal = any(
        (not np.isnan(a) and abs(a) < 0.1)
        for a in autocorrs
    )

    return {
        "autocorrelations": autocorrs,
        "is_abnormal": is_abnormal,
        "mean_autocorr": np.nanmean(autocorrs),
        "verdict": "DATA_OK" if not is_abnormal else "VOLUME_SERIES_SUSPICIOUS"
    }

第三板斧:K 线成交量与 tick 成交汇总的一致性

对于有 tick 级数据的品种,TickDB 会将 tick 成交汇总后与原始 K 线中的成交量进行交叉验证。差异超过 0.5% 的批次会被标记并触发人工复核。


五、跨市场数据一致性:同一套逻辑处理不同市场

5.1 跨市场一致性的挑战

TickDB 覆盖美股、港股、A 股、数字货币、外汇、贵金属、指数等多个市场。每个市场的交易规则、数据格式、发布频率都不同:

维度 美股 A 股 港股 数字货币
交易时间 9:30-16:00 美东 9:30-15:00 北京 9:30-16:00 北京 24×365
涨跌幅限制 无(熔断机制例外) ±10%(ST ±5%)
数据精度 价格精确到 0.01 美元 价格精确到 0.01 元 价格精确到 0.001 港元 价格精确到 1e-8
复权事件频率 高(季度分红常见) 中(年度分红为主) 不适用
最小交易单位 1 股 100 股 视标的 无限制

如果每套数据单独处理,就会出现:同一个函数在不同市场有不同的行为逻辑,这在代码层面是维护噩梦,在数据层面会导致一致性隐患。

5.2 统一抽象层设计

TickDB 的数据清洗管线采用市场无关的核心算法 + 市场特定的配置层架构:

from abc import ABC, abstractmethod
from dataclasses import dataclass

@dataclass
class MarketConfig:
    """每个市场的配置参数"""
    market_id: str
    timezone: str
    has_dst: bool
    price_precision: int
    volume_precision: int
    lot_size: int
    tick_size: float
    max_daily_change: float  # 最大日内涨跌
    split_adjustment_required: bool

# 市场配置注册表
MARKET_CONFIGS: Dict[str, MarketConfig] = {
    "US": MarketConfig(
        market_id="US",
        timezone="America/New_York",
        has_dst=True,
        price_precision=2,
        volume_precision=0,
        lot_size=1,
        tick_size=0.01,
        max_daily_change=999.0,  # 无限制
        split_adjustment_required=True,
    ),
    "CN": MarketConfig(
        market_id="CN",
        timezone="Asia/Shanghai",
        has_dst=False,
        price_precision=2,
        volume_precision=0,
        lot_size=100,
        tick_size=0.01,
        max_daily_change=0.10,  # ±10%
        split_adjustment_required=True,
    ),
    "CRYPTO": MarketConfig(
        market_id="CRYPTO",
        timezone="UTC",
        has_dst=False,
        price_precision=8,
        volume_precision=8,
        lot_size=1e-8,
        tick_size=1e-8,
        max_daily_change=999.0,
        split_adjustment_required=False,
    ),
}

class BaseDataPipeline(ABC):
    """
    统一数据清洗管线抽象基类
    所有市场的清洗逻辑继承此类,通过配置注入差异化参数
    """

    def __init__(self, config: MarketConfig):
        self.config = config
        self.anomaly_detector = TechnicalAnomalyDetector()
        self.gap_detector = LiquidityGapDetector()

    @abstractmethod
    def process(self, raw_data: List[Dict]) -> List[Dict]:
        """统一处理入口"""
        pass

    def normalize_price_precision(self, price: float) -> float:
        """市场特定的价格精度标准化"""
        return round(price, self.config.price_precision)

    def normalize_volume(self, volume: float) -> float:
        """市场特定的成交量标准化"""
        return round(volume, self.config.volume_precision)

    def check_price_bounds(self, price: float, prev_price: float) -> bool:
        """涨跌停检测"""
        if self.config.max_daily_change >= 1.0:  # 无限制市场
            return True
        return abs(price - prev_price) / prev_price <= self.config.max_daily_change

    def apply_market_rules(self, bars: List[Dict]) -> List[Dict]:
        """
        应用市场特定规则的最后一道清洗
        这是每个子类必须实现的 hook
        """
        cleaned = []
        for bar in bars:
            bar["close"] = self.normalize_price_precision(bar["close"])
            bar["volume"] = self.normalize_volume(bar["volume"])

            # 如果是最后一个 bar,不做涨跌停校验(当日结算价可能触及涨跌停)
            if len(cleaned) > 0:
                prev_close = cleaned[-1]["close"]
                if not self.check_price_bounds(bar["open"], prev_close):
                    bar["data_quality"] = "BOUND_CHECK_FAILED"

            cleaned.append(bar)

        return cleaned

这种设计的核心价值是:当你需要新增一个市场(比如印度股市)时,不需要修改核心清洗算法,只需新增一个 MarketConfig 配置实例。既保证了跨市场一致性,又保留了每个市场的特殊性。


总结:数据清洗不是成本,是策略质量的一部分

回到开篇的例子。那两套数据源产生巨大回测差异的原因,现在你应该能看清了——

不是策略错了,不是代码有 bug,而是数据本身不在同一个质量层级上。

TickDB 的历史数据清洗对齐体系,用一张图概括就是:

原始数据 → 时区标准化 → 复权计算 → 异常检测 → 成交量校验 → 跨市场对齐 → 终态数据
     │            │          │          │           │              │
     ↓            ↓          ↓          ↓           ↓              ↓
  数据接入    UTC 统一    复权因子   四类异常    三重校验      统一抽象层
  层质量控制   存储       累积更新    分类处理    体系         配置注入

每个环节都有明确的工程标准、算法实现和校验机制。没有哪个环节是"差不多就行"的——因为在量化回测中,数据质量直接决定策略结论的可信度

当你评估一个数据源时,问这五个问题:

  1. 用的是向前还是向后复权?策略实盘和回测用的是同一套标准吗?
  2. 时区转换是否经过夏令时校验?跨市场数据合并时,时间戳对齐了吗?
  3. 异常值被删除了还是被分类标记了?极端市场事件在数据中还存在吗?
  4. 成交量数据和 tick 成交汇总一致吗?量价关系的逻辑校验做过吗?
  5. 不同市场用同一套清洗逻辑吗?还是每个市场一套单独处理?

如果对方答不上来,那"历史数据"这四个字,可能只是"存着的原始数据"——而不是"可以信任的回测数据"。


下一步行动

如果你是刚入门量化的研究者
建议先从有完整清洗文档的数据源开始做起。原始数据的坑比你想象的要深——在错误的数据上优化参数,方向越努力,结果越离谱。

如果你已经有多年回测经验
用本文的五个维度审视一下你目前使用的数据源。如果发现有明显缺陷,建议用 TickDB 的 /kline 接口做一次交叉验证——你可以用小样本对比价格、复权因子和成交量差异,7 天免费额度足够完成一次完整的数据质量审计。

如果你在构建量化团队的数据管线
TickDB 提供历史数据的原始导出接口和清洗后的标准化接口两种访问方式。联系 [email protected],可以获取数据管线的架构设计文档,讨论如何将 TickDB 的清洗标准内嵌到你团队自己的数据处理体系中。

如果你习惯用 AI 辅助开发
在 ClawHub 搜索安装 tickdb-market-data SKILL,用自然语言查询数据质量报告和跨市场对齐方案。


本文不构成任何投资建议。市场有风险,投资需谨慎。回测结果基于历史数据,不预示未来收益。