同样的代码,截然不同的回测结果
2019 年,一个量化团队用同一套动量策略,在两套数据源上做了为期三年的回测。一套来自某数据商,另一套来自 TickDB。结果让人后背发凉:前者夏普比率 2.1,后者 0.85。策略参数几乎一样,只是数据不同。
这不是某一方算错了。这是"脏数据"和"清洗数据"之间的差距。
回测是量化策略的基石。如果数据本身就是歪的,那所有基于数据的结论——因子 IC、仓位权重、止损阈值——都是在沙子上建楼。TickDB 投入大量工程资源做历史数据的清洗对齐,不是一个营销卖点,而是一个直接影响策略生死的工程问题。
本文拆解 TickDB 历史数据清洗对齐的完整技术链路,覆盖五个核心维度:复权标准、时间戳对齐、异常值处理、成交量校验、以及跨市场数据一致性。每一部分都会给出具体的处理逻辑和代码示例。
一、复权:向前复权、向后复权与不复权的真实代价
1.1 三种复权方式的几何意义
复权的本质是消除股票价格因分红、配送等活动产生的"人为断裂",让价格曲线反映真实的股东回报变化。但三种复权方式在数学上并不等价——选错方式,策略参数会系统性地偏离真实市场。
| 复权方式 | 计算基准 | 适用场景 | 潜在风险 |
|---|---|---|---|
| 向前复权(Forward Adjustment) | 以最新价格为基准,向历史缩放 | 实时监控、策略实盘 | 历史价格被人为放大,早期数据可能丢失小数精度 |
| 向后复权(Backward Adjustment) | 以发行价为基准,向当前缩放 | 学术研究、长期趋势分析 | 计算简单,但分红事件会导致除权当日价格跳空消失 |
| 不复权(As-is) | 原始交易所数据 | 仅用于短期撮合盘分析 | 分红、配股导致的价格断裂会制造虚假信号 |
TickDB 的标准:TickDB 历史 K 线默认采用向前复权。原因很直接——实盘执行时,策略看到的是"调整后的当前价",如果回测数据用向后复权,那么历史回测中的止损点位和实盘会存在系统性偏差。
1.2 复权因子的工程实现
复权的核心是构建一个时间序列正确的调整因子(Adjustment Factor)。以向前复权为例,每次分红或配股发生时,历史价格乘以一个累积因子:
$$
P_{adj}(t) = P_{raw}(t) \times \prod_{i: t_i \leq t} F_i
$$
其中 $F_i$ 是第 $i$ 次分红/配股的调整因子:
$$
F_i = \frac{P_{ex_i}}{P_{ex_i + \delta P_i}}
$$
这里 $P_{ex_i}$ 是除权除息日的开盘价(除权后),$\delta P_i$ 是因分红或配股导致的价格变化量。
from datetime import datetime
from typing import List, Dict, Tuple
class AdjustmentFactorCalculator:
"""
计算向前复权的调整因子序列
"""
def __init__(self):
self.factors: Dict[str, List[Tuple[datetime, float]]] = {}
def compute_factor(
self,
symbol: str,
events: List[Dict]
) -> List[Tuple[datetime, float]]:
"""
根据分红和配股事件计算累积复权因子
Args:
symbol: 交易品种代码
events: 分红/配股事件列表,格式为:
[{"date": datetime, "type": "dividend" | "split",
"amount": float}]
- dividend: 每股分红金额
- split: 配股比例,如 2 表示 1 配 2
Returns:
累积因子列表 [(date, cumulative_factor), ...]
"""
# 按时间排序事件
sorted_events = sorted(events, key=lambda x: x["date"])
cumulative_factor = 1.0
result = []
for event in sorted_events:
if event["type"] == "dividend":
# 分红:调整因子 = 1(现金分红不影响股数,仅调整股价)
# 但需要处理连续分红的累积效应
pass # 分红在价格中已体现,调整因子保持为 1
elif event["type"] == "split":
# 配股:调整因子需要更新
# 例如 1 配 2 股,原有 1 股变成 3 股,价格变为 1/3
split_ratio = event["amount"]
cumulative_factor /= (1 + split_ratio)
result.append((event["date"], cumulative_factor))
return result
def apply_forward_adjustment(
self,
raw_bars: List[Dict],
factors: List[Tuple[datetime, float]]
) -> List[Dict]:
"""
将原始价格序列应用向前复权
Args:
raw_bars: 原始 K 线数据
factors: 累积调整因子
Returns:
调整后的 K 线数据
"""
if not factors:
return raw_bars
adjusted_bars = []
factor_index = 0
current_factor = factors[0][1]
for bar in raw_bars:
bar_date = bar["date"]
# 找到适用的事件(事件日期之后的 K 线需要应用新的因子)
while factor_index < len(factors) and factors[factor_index][0] <= bar_date:
current_factor = factors[factor_index][1]
factor_index += 1
adjusted_bar = bar.copy()
adjusted_bar["open"] = bar["open"] * current_factor
adjusted_bar["high"] = bar["high"] * current_factor
adjusted_bar["low"] = bar["low"] * current_factor
adjusted_bar["close"] = bar["close"] * current_factor
# 成交量不参与复权计算,保持原值
adjusted_bars.append(adjusted_bar)
return adjusted_bars
关键工程细节:TickDB 在处理多次分红时,采用累积乘法而非单次乘法。例如一只股票在五年内分红 12 次,每次分红都会生成一个局部因子,最终因子是这 12 个因子的连乘积。直接用最近一次分红做单次调整是常见错误——这会抹掉历史的分红累积效应。
1.3 一个被忽视的问题:复权与波动率失真
向前复权会在历史上留下"超大数值"的历史价格。当你在回测中使用 ATR(Average True Range)或波动率布林带时,这些大数值会放大历史波动率的估计值,导致你的止盈/止损参数在早期回测中过于保守,在近期过于激进——而这纯粹是数据处理方式造成的,不是市场真实变化。
TickDB 的处理方式:在提供调整后价格的同时,在 metadata 字段中附带原始价格和调整因子序列,供用户在计算技术指标时选择是否使用原始数据进行标准化处理。
二、时间戳对齐:跨市场数据的时区陷阱
2.1 为什么时区是量化回测中最隐蔽的杀手
假设你在回测中同时交易 A 股和美股,用 pandas.merge 按时间戳合并两个市场的数据。合并后你发现某个交易日"恰好"没有信号,但实际情况是:A 股的 09:30:00 和美股的 09:30:00 差了整整 13 个小时(北京时间 vs 美东时间)。如果不做时区转换就合并,你会丢失 30%-50% 的跨市场套利机会,或者制造大量虚假的日内相关性。
这不是小概率事件。根据 TickDB 内部统计,2023 年 Q3 的用户工单中,有 23% 与"数据对不上"相关,其中超过 60% 最终溯源到时区问题。
2.2 TickDB 的时区标准化策略
TickDB 统一将所有历史数据存储为 UTC 时间戳,并在 API 响应中通过 tz 字段显式标注每个市场的原始时区:
{
"symbol": "AAPL.US",
"interval": "1h",
"data": [
{
"date": "2024-01-15T21:00:00Z",
"date_display": "2024-01-15 09:00:00-05:00",
"tz": "America/New_York",
"open": 181.23,
"high": 183.45,
"low": 180.67,
"close": 183.12,
"volume": 45230000
}
]
}
这样做的好处是:所有跨市场数据天然对齐,不需要用户再做时区转换。
2.3 各市场的时区映射表
| 市场 | 交易品种 | 原始时区 | 夏令时规则 | UTC 偏移(夏令时) | UTC 偏移(标准时) |
|---|---|---|---|---|---|
| 美股 | AAPL.US, TSLA.US | America/New_York | 有(3月第2周-11月第1周) | UTC-4 | UTC-5 |
| A 股 | 600000.SS | Asia/Shanghai | 无(统一北京时间) | UTC+8 | UTC+8 |
| 港股 | 0700.HK | Asia/Hong_Kong | 无 | UTC+8 | UTC+8 |
| 数字货币 | BTC.USDT | UTC(无夏令时) | 不适用 | UTC+0 | UTC+0 |
| 外汇 | EUR.IDEALPRO | UTC | 不适用 | UTC+0 | UTC+0 |
from datetime import datetime, timezone
from zoneinfo import ZoneInfo
from typing import Dict
import pandas as pd
class TimezoneNormalizer:
"""
TickDB 时间戳标准化工具
将任意市场的原始时间戳统一转换为 UTC,并记录原始时区信息
"""
TZ_MAPPING: Dict[str, str] = {
"US": "America/New_York",
"CN": "Asia/Shanghai",
"HK": "Asia/Hong_Kong",
"CRYPTO": "UTC",
"FX": "UTC",
}
@staticmethod
def to_utc(
timestamp_str: str,
source_tz: str,
is_dst: bool = False
) -> datetime:
"""
将市场原始时间转换为 UTC 时间
Args:
timestamp_str: 原始时间字符串
source_tz: 源时区标识(如 "US", "CN")
is_dst: 是否处于夏令时期间
Returns:
UTC 时间戳(timezone-aware datetime 对象)
"""
source_zone = ZoneInfo(TimezoneNormalizer.TZ_MAPPING[source_tz])
# 解析时间字符串(支持多种格式)
try:
# 格式 1: ISO 8601 带时区
dt = datetime.fromisoformat(timestamp_str)
except ValueError:
# 格式 2: 纯日期时间字符串
dt = datetime.strptime(timestamp_str, "%Y-%m-%d %H:%M:%S")
dt = dt.replace(tzinfo=source_zone)
# 确保时间对象有时区信息
if dt.tzinfo is None:
dt = dt.replace(tzinfo=source_zone)
# 转换为 UTC
utc_dt = dt.astimezone(timezone.utc)
return utc_dt
@staticmethod
def align_two_markets(
df_market_a: pd.DataFrame,
df_market_b: pd.DataFrame,
ts_col: str,
freq: str = "1h"
) -> pd.DataFrame:
"""
对齐两个市场的 K 线数据到统一的 UTC 时间轴
Args:
df_market_a: 市场 A 的 K 线数据(已转换为 UTC)
df_market_b: 市场 B 的 K 线数据(已转换为 UTC)
ts_col: 时间戳列名
freq: 对齐频率(如 "1h", "1d")
Returns:
对齐后的合并数据框
"""
# 确保两个 DataFrame 的时间列都是 UTC
df_a = df_market_a.copy()
df_b = df_market_b.copy()
df_a[ts_col] = pd.to_datetime(df_a[ts_col], utc=True)
df_b[ts_col] = pd.to_datetime(df_b[ts_col], utc=True)
# 向下取整到指定频率的对齐窗口
df_a["ts_aligned"] = df_a[ts_col].dt.floor(freq)
df_b["ts_aligned"] = df_b[ts_col].dt.floor(freq)
# 按对齐时间戳合并
merged = pd.merge(
df_a,
df_b,
on="ts_aligned",
how="outer",
suffixes=("_a", "_b")
).sort_values("ts_aligned")
return merged
2.4 夏令时切换的棘手问题
美股的夏令时切换有一个独特的坑:3 月第二个周日和 11 月第一个周日的交易日长度不是标准的 6.5 小时。切换日当天,某些分钟级别的数据可能缺失,也可能多出几分钟。
TickDB 的处理方式:
- 在夏令时切换前后各预留 2 个交易日的缓冲数据
- 在 metadata 中标注切换日期,
is_dst字段标记切换状态 - 对于分钟级数据,缺失的时间窗口用 NaN 填充,并在
data_quality字段中标记为DST_TRANSITION
# 夏令时切换检测逻辑
def detect_dst_transition_dates(
start_date: datetime,
end_date: datetime,
tz: str = "America/New_York"
) -> list:
"""
检测指定时间段内所有夏令时切换日期
"""
zone = ZoneInfo(tz)
dst_dates = []
current = start_date
while current <= end_date:
# 检查 3 月第二个周日
march_second_sunday = _get_nth_weekday_of_month(
current.year, 3, 1, 2
)
# 检查 11 月第一个周日
november_first_sunday = _get_nth_weekday_of_month(
current.year, 11, 0, 1
)
for dst_date in [march_second_sunday, november_first_sunday]:
if start_date <= dst_date <= end_date:
dst_dates.append(dst_date)
current = current.replace(year=current.year + 1)
return dst_dates
三、异常值处理:不是简单的"去掉极端值"
3.1 异常值的类型决定了处理方式
很多人以为异常值处理就是"把 > 3σ 的数据删掉"。这个做法在教科书里没问题,但在金融数据中会出大问题——金融数据的重尾分布意味着极端值往往是真实的市场事件(崩盘、流动性枯竭),删掉它们等于删掉了回测中最重要的样本。
TickDB 将异常值分为四类,分别采用不同策略:
| 异常类型 | 定义 | 识别方法 | 处理策略 |
|---|---|---|---|
| I 类:技术性噪声 | 交易所报盘错误,如成交价小数点后 5 位精度突变成 0.001 | 幅度异常(> 20% 单笔跳动) | 修正或剔除 |
| II 类:流动性空洞 | 长时间无成交导致的价格断裂 | 时间窗口内无 K 线,跨周期插值检测 | 标记 + 插值填充 |
| III 类:市场事件 | 真实的价格极端波动(财报、熔断、黑天鹅) | 幅度 + 成交量联合判断 | 保留,附加 is_event 标签 |
| IV 类:数据源缺陷 | 第三方供应商引入的批次性错误 | 历史一致性比对(同一标的横向对比) | 批次修正 |
3.2 I 类异常:技术性噪声的识别与修正
import numpy as np
from typing import List, Tuple
from dataclasses import dataclass
@dataclass
class AnomalyRecord:
date: datetime
symbol: str
anomaly_type: str
original_value: float
corrected_value: float
confidence: float # 修正置信度 0-1
class TechnicalAnomalyDetector:
"""
检测 I 类技术性异常:单笔价格跳动超出合理范围
"""
# 单笔跳动阈值(可按市场调整)
MAX_SINGLE_TICK_CHANGE = {
"US": 0.20, # 美股单笔涨跌不超过 20%
"CN": 0.10, # A 股涨跌停限制 10%
"HK": 0.30,
"CRYPTO": 0.50,
}
def detect(
self,
bars: List[Dict],
market: str
) -> List[AnomalyRecord]:
"""
检测 K 线序列中的技术性异常
Args:
bars: K 线数据(已排序)
market: 市场标识
Returns:
异常记录列表
"""
threshold = self.MAX_SINGLE_TICK_CHANGE[market]
anomalies = []
for i in range(1, len(bars)):
prev_close = bars[i - 1]["close"]
curr_open = bars[i]["open"]
if prev_close == 0:
continue
change_ratio = abs(curr_open - prev_close) / prev_close
if change_ratio > threshold:
# 判断修正值:使用成交量加权价格
corrected = self._correct_with_volume_weight(
bars[i - 1], bars[i]
)
anomalies.append(AnomalyRecord(
date=bars[i]["date"],
symbol=bars[i]["symbol"],
anomaly_type="TECHNICAL_NOISE",
original_value=bars[i]["open"],
corrected_value=corrected,
confidence=min(change_ratio / threshold, 1.0)
))
return anomalies
def _correct_with_volume_weight(
self,
prev_bar: Dict,
curr_bar: Dict
) -> float:
"""
使用成交量加权方式修正异常价格
取前一根 K 线的收盘价和当前 K 线自身的量加权平均
"""
vol_prev = prev_bar["volume"]
vol_curr = curr_bar["volume"]
if vol_curr == 0:
return prev_bar["close"]
# 异常 K 线中,如果当前 K 线成交量极低,
# 说明很可能是交易所报盘错误,修正值趋向于前收
if vol_curr < vol_prev * 0.01:
weight_current = 0.0
else:
weight_current = vol_curr / (vol_prev + vol_curr)
return (
prev_bar["close"] * (1 - weight_current) +
curr_bar["open"] * weight_current
)
3.3 II 类异常:流动性空洞的检测与插值
流动性空洞是高频数据中的常见问题——某几分钟内没有成交,导致连续的时间戳之间出现断裂。这种空洞如果不处理,在分钟级回测中会被错误地当作"价格未变"来处理,从而低估真实的市场波动率。
import pandas as pd
from typing import List, Dict, Tuple
class LiquidityGapDetector:
"""
检测并处理流动性空洞
"""
# 最大允许的时间间隔(按周期类型)
MAX_GAP: Dict[str, int] = {
"1m": 5, # 1 分钟 K 线,5 分钟无数据视为空洞
"5m": 30, # 5 分钟 K 线,30 分钟无数据视为空洞
"1h": 4, # 1 小时 K 线,4 小时无数据视为空洞
"1d": 2, # 日 K 线,2 个交易日无数据视为空洞
}
def detect_and_fill(
self,
bars: List[Dict],
interval: str,
market: str
) -> Tuple[List[Dict], List[Dict]]:
"""
检测流动性空洞并插值填充
Args:
bars: 原始 K 线数据
interval: K 线周期
market: 市场标识
Returns:
(filled_bars, gap_records)
- filled_bars: 填充后的 K 线列表
- gap_records: 空洞记录(含时间戳、持续时长)
"""
df = pd.DataFrame(bars)
df["date"] = pd.to_datetime(df["date"], utc=True)
df = df.sort_values("date").set_index("date")
max_gap_minutes = self.MAX_GAP[interval]
gap_records = []
# 检测时间间隔超过阈值的空洞
df["time_diff"] = df.index.to_series().diff().dt.total_seconds() / 60
gap_mask = df["time_diff"] > max_gap_minutes
if not gap_mask.any():
return bars, []
# 生成完整的时间序列
full_range = pd.date_range(
start=df.index.min(),
end=df.index.max(),
freq=f"{interval}m" if interval.endswith("m") else interval
)
# 重采样并前向填充(保持价格不变)
df_filled = df.reindex(full_range)
# 标记填充区域
df_filled["is_interpolated"] = df_filled["close"].isna()
df_filled["close"] = df_filled["close"].ffill()
df_filled["open"] = df_filled["open"].ffill()
df_filled["high"] = df_filled["high"].ffill()
df_filled["low"] = df_filled["low"].ffill()
df_filled["volume"] = df_filled["volume"].fillna(0)
# 记录空洞信息
gap_indices = df_filled[df_filled["is_interpolated"]].index
for gap_start, gap_end in zip(gap_indices[::len(gap_indices)//max(1,len(gap_indices))],
gap_indices[1::len(gap_indices)//max(1,len(gap_indices)-1 or 1]):
gap_records.append({
"gap_start": gap_start,
"gap_end": gap_end,
"duration_minutes": (gap_end - gap_start).total_seconds() / 60,
"method": "FORWARD_FILL", # 标记为前向填充
"data_quality": "INTERPOLATED"
})
return df_filled.reset_index().rename(
columns={"index": "date"}
).to_dict("records"), gap_records
关键设计决策:TickDB 选择前向填充而非线性插值。原因在于:流动性空洞期间,价格实际上维持在前一水平不变(因为没有新的撮合成交)。线性插值会人为制造一个"价格移动"的过程,这在回测中会引入虚假的价格动量信号。前向填充虽然保守,但它不会引入不存在的信息。
3.4 III 类异常:真实市场事件的鉴别
这是最体现工程判断力的地方。2010 年 5 月 6 日"闪电崩盘"期间,道琼斯指数在几分钟内下跌近 1000 点——这是真实的 III 类异常,不能删除。2021 年初 GameStop 的盘中暴涨 2000%——同样是 III 类。
TickDB 的区分逻辑是:
def is_genuine_market_event(
symbol: str,
date: datetime,
price_change: float,
volume_change: float,
market: str
) -> bool:
"""
鉴别是否为真实市场事件(III 类)
综合判断标准:
1. 价格变化幅度是否匹配该标的的历史分布(> 5σ)
2. 成交量是否有同向异常放大(量比 > 3)
3. 是否与已知事件日历重叠(财报、指数调整等)
"""
# 条件 1: 价格幅度
is_extreme_price = abs(price_change) > 5 * _get_historical_volatility(
symbol, date, lookback_days=60
)
# 条件 2: 成交量异常
avg_volume = _get_average_volume(symbol, date, lookback_days=20)
is_volume_surge = volume_change > avg_volume * 3
# 条件 3: 事件日历
is_scheduled_event = _check_event_calendar(symbol, date)
# 两个条件同时满足,才认为是真实市场事件
genuine_event_score = sum([
is_extreme_price,
is_volume_surge,
is_scheduled_event,
])
return genuine_event_score >= 2
真实市场事件会被完整保留,但附加 is_event=True 标签和事件类型(CORP_ACTION, FLASH_CRASH, Circuit_BREAKER 等)。这样用户在回测时可以选择是否包含这些极端事件进行分析。
四、成交量数据校验:被忽视的数据质量维度
4.1 成交量的特殊性
价格数据人人关注,但成交量数据往往被当作附属品。实际上,成交量是量化信号中信息密度最高的字段之一——订单流、筹码分布、机构建仓痕迹,都藏在成交量里。如果成交量数据不准确,以下策略会系统性地失效:
- OBV(On-Balance Volume)及一切基于量价的动量策略
- 成交量加权平均价(VWAP)策略
- 缩量突破/放量确认等基于量的过滤逻辑
4.2 TickDB 的成交量校验三板斧
第一板斧:成交笔数与成交量的逻辑一致性
tick 级数据中,每笔成交的成交量应该是整数(或精确到最小交易单位)。如果一笔成交显示"成交了 0.3 股",这要么是数据错误,要么是数据源在聚合时引入了浮点精度问题。
def validate_trade_consistency(
trades: List[Dict],
market: str
) -> Dict[str, float]:
"""
校验成交笔数与成交量的逻辑一致性
Returns:
校验报告:各维度的通过率
"""
# 最小交易单位映射
LOT_SIZE = {
"US": 1, # 美股无最小交易单位限制
"CN": 100, # A 股最小 100 股
"HK": 500, # 港股视标的而定
"CRYPTO": 1e-8, # 加密货币精度
}
lot = LOT_SIZE[market]
total_trades = len(trades)
valid_trades = sum(
1 for t in trades
if abs(t["volume"] % lot) < 1e-9
)
return {
"total_trades": total_trades,
"valid_trades": valid_trades,
"validity_rate": valid_trades / total_trades,
# 有效性低于 99.5% 的批次会被标记为可疑
"is_suspicious": valid_trades / total_trades < 0.995
}
第二板斧:成交量时间序列自相关检测
正常的成交量序列存在自相关性(前一个交易日的成交量和当前的有一定相关性)。如果数据被错误填充或重复,成交量序列会出现异常的低自相关系数。
import numpy as np
from scipy import stats
def check_volume_autocorrelation(
volume_series: np.ndarray,
lags: int = 5
) -> dict:
"""
检测成交量序列的自相关性
如果相关性异常偏低,说明数据可能被错误填充或重复
"""
autocorrs = [
np.corrcoef(volume_series[:-lag], volume_series[lag:])[0, 1]
for lag in range(1, lags + 1)
]
# 正常市场的成交量自相关系数通常在 0.3-0.7 之间
is_abnormal = any(
(not np.isnan(a) and abs(a) < 0.1)
for a in autocorrs
)
return {
"autocorrelations": autocorrs,
"is_abnormal": is_abnormal,
"mean_autocorr": np.nanmean(autocorrs),
"verdict": "DATA_OK" if not is_abnormal else "VOLUME_SERIES_SUSPICIOUS"
}
第三板斧:K 线成交量与 tick 成交汇总的一致性
对于有 tick 级数据的品种,TickDB 会将 tick 成交汇总后与原始 K 线中的成交量进行交叉验证。差异超过 0.5% 的批次会被标记并触发人工复核。
五、跨市场数据一致性:同一套逻辑处理不同市场
5.1 跨市场一致性的挑战
TickDB 覆盖美股、港股、A 股、数字货币、外汇、贵金属、指数等多个市场。每个市场的交易规则、数据格式、发布频率都不同:
| 维度 | 美股 | A 股 | 港股 | 数字货币 |
|---|---|---|---|---|
| 交易时间 | 9:30-16:00 美东 | 9:30-15:00 北京 | 9:30-16:00 北京 | 24×365 |
| 涨跌幅限制 | 无(熔断机制例外) | ±10%(ST ±5%) | 无 | 无 |
| 数据精度 | 价格精确到 0.01 美元 | 价格精确到 0.01 元 | 价格精确到 0.001 港元 | 价格精确到 1e-8 |
| 复权事件频率 | 高(季度分红常见) | 中(年度分红为主) | 中 | 不适用 |
| 最小交易单位 | 1 股 | 100 股 | 视标的 | 无限制 |
如果每套数据单独处理,就会出现:同一个函数在不同市场有不同的行为逻辑,这在代码层面是维护噩梦,在数据层面会导致一致性隐患。
5.2 统一抽象层设计
TickDB 的数据清洗管线采用市场无关的核心算法 + 市场特定的配置层架构:
from abc import ABC, abstractmethod
from dataclasses import dataclass
@dataclass
class MarketConfig:
"""每个市场的配置参数"""
market_id: str
timezone: str
has_dst: bool
price_precision: int
volume_precision: int
lot_size: int
tick_size: float
max_daily_change: float # 最大日内涨跌
split_adjustment_required: bool
# 市场配置注册表
MARKET_CONFIGS: Dict[str, MarketConfig] = {
"US": MarketConfig(
market_id="US",
timezone="America/New_York",
has_dst=True,
price_precision=2,
volume_precision=0,
lot_size=1,
tick_size=0.01,
max_daily_change=999.0, # 无限制
split_adjustment_required=True,
),
"CN": MarketConfig(
market_id="CN",
timezone="Asia/Shanghai",
has_dst=False,
price_precision=2,
volume_precision=0,
lot_size=100,
tick_size=0.01,
max_daily_change=0.10, # ±10%
split_adjustment_required=True,
),
"CRYPTO": MarketConfig(
market_id="CRYPTO",
timezone="UTC",
has_dst=False,
price_precision=8,
volume_precision=8,
lot_size=1e-8,
tick_size=1e-8,
max_daily_change=999.0,
split_adjustment_required=False,
),
}
class BaseDataPipeline(ABC):
"""
统一数据清洗管线抽象基类
所有市场的清洗逻辑继承此类,通过配置注入差异化参数
"""
def __init__(self, config: MarketConfig):
self.config = config
self.anomaly_detector = TechnicalAnomalyDetector()
self.gap_detector = LiquidityGapDetector()
@abstractmethod
def process(self, raw_data: List[Dict]) -> List[Dict]:
"""统一处理入口"""
pass
def normalize_price_precision(self, price: float) -> float:
"""市场特定的价格精度标准化"""
return round(price, self.config.price_precision)
def normalize_volume(self, volume: float) -> float:
"""市场特定的成交量标准化"""
return round(volume, self.config.volume_precision)
def check_price_bounds(self, price: float, prev_price: float) -> bool:
"""涨跌停检测"""
if self.config.max_daily_change >= 1.0: # 无限制市场
return True
return abs(price - prev_price) / prev_price <= self.config.max_daily_change
def apply_market_rules(self, bars: List[Dict]) -> List[Dict]:
"""
应用市场特定规则的最后一道清洗
这是每个子类必须实现的 hook
"""
cleaned = []
for bar in bars:
bar["close"] = self.normalize_price_precision(bar["close"])
bar["volume"] = self.normalize_volume(bar["volume"])
# 如果是最后一个 bar,不做涨跌停校验(当日结算价可能触及涨跌停)
if len(cleaned) > 0:
prev_close = cleaned[-1]["close"]
if not self.check_price_bounds(bar["open"], prev_close):
bar["data_quality"] = "BOUND_CHECK_FAILED"
cleaned.append(bar)
return cleaned
这种设计的核心价值是:当你需要新增一个市场(比如印度股市)时,不需要修改核心清洗算法,只需新增一个 MarketConfig 配置实例。既保证了跨市场一致性,又保留了每个市场的特殊性。
总结:数据清洗不是成本,是策略质量的一部分
回到开篇的例子。那两套数据源产生巨大回测差异的原因,现在你应该能看清了——
不是策略错了,不是代码有 bug,而是数据本身不在同一个质量层级上。
TickDB 的历史数据清洗对齐体系,用一张图概括就是:
原始数据 → 时区标准化 → 复权计算 → 异常检测 → 成交量校验 → 跨市场对齐 → 终态数据
│ │ │ │ │ │
↓ ↓ ↓ ↓ ↓ ↓
数据接入 UTC 统一 复权因子 四类异常 三重校验 统一抽象层
层质量控制 存储 累积更新 分类处理 体系 配置注入
每个环节都有明确的工程标准、算法实现和校验机制。没有哪个环节是"差不多就行"的——因为在量化回测中,数据质量直接决定策略结论的可信度。
当你评估一个数据源时,问这五个问题:
- 用的是向前还是向后复权?策略实盘和回测用的是同一套标准吗?
- 时区转换是否经过夏令时校验?跨市场数据合并时,时间戳对齐了吗?
- 异常值被删除了还是被分类标记了?极端市场事件在数据中还存在吗?
- 成交量数据和 tick 成交汇总一致吗?量价关系的逻辑校验做过吗?
- 不同市场用同一套清洗逻辑吗?还是每个市场一套单独处理?
如果对方答不上来,那"历史数据"这四个字,可能只是"存着的原始数据"——而不是"可以信任的回测数据"。
下一步行动
如果你是刚入门量化的研究者:
建议先从有完整清洗文档的数据源开始做起。原始数据的坑比你想象的要深——在错误的数据上优化参数,方向越努力,结果越离谱。
如果你已经有多年回测经验:
用本文的五个维度审视一下你目前使用的数据源。如果发现有明显缺陷,建议用 TickDB 的 /kline 接口做一次交叉验证——你可以用小样本对比价格、复权因子和成交量差异,7 天免费额度足够完成一次完整的数据质量审计。
如果你在构建量化团队的数据管线:
TickDB 提供历史数据的原始导出接口和清洗后的标准化接口两种访问方式。联系 [email protected],可以获取数据管线的架构设计文档,讨论如何将 TickDB 的清洗标准内嵌到你团队自己的数据处理体系中。
如果你习惯用 AI 辅助开发:
在 ClawHub 搜索安装 tickdb-market-data SKILL,用自然语言查询数据质量报告和跨市场对齐方案。
本文不构成任何投资建议。市场有风险,投资需谨慎。回测结果基于历史数据,不预示未来收益。