异常值检测与修正:价格跳空、成交量突增的自动化处理
"The market is always telling you something. The question is whether you're hearing the signal or the noise."
2010 年 5 月 6 日,道琼斯工业平均指数在几分钟内暴跌近 1000 点,随后迅速反弹。这次"闪电崩盘"让无数量化系统陷入混乱——大量止损单被触发,算法不计成本地抛售,而某些数据源记录的"错误"价格至今仍是金融史研究的对象。
十年后的今天,同样的场景在不同市场反复上演:MEME 股的狂热导致 Robinhood 数据异常、数字货币的流动性危机制造虚假价差、交易所系统升级期间的短暂数据真空。对于量化交易者,核心问题从来不是"数据有没有异常",而是"这个异常是真实市场的声音,还是系统制造的杂音"。
本文构建一套生产级异常值检测与修正框架,涵盖统计检测、规则过滤、人工复核的三级体系,并提供可直接集成到 TickDB 数据管道中的 Python 实现。
一、问题建模:异常值的四种来源
在设计检测系统之前,必须理解"坏数据"从何而来。金融数据的异常值通常来自四个层面:
1.1 市场真实异动
价格或成交量的剧烈变化由真实的经济事件驱动:财报发布、央行决议、地缘冲突。这类异常是策略的信号源,必须保留。
特征:变化有逻辑支撑、相关标的同步联动、成交量放大符合信息不对称理论。
1.2 交易所技术故障
交易所撮合系统、清算系统的瞬时故障会产生短暂的虚假价格或成交量。这类异常是系统噪声,需要修正或剔除。
特征:单标的孤立的尖刺、持续时间极短(秒级)、后续价格快速回归、成交量与盘口不匹配。
1.3 数据传输丢失
网络抖动、API 超时、数据管道处理延迟导致的丢包。这类异常是传输噪声,需要补充或标记。
特征:时间序列出现断层、相邻数据点时间戳不连续、特定时段系统性缺失。
1.4 人为操纵痕迹
幌骗交易、报价操纵、收盘价操纵产生的虚假深度。这类异常是市场失真,需要特别标记或剔除。
特征:订单簿大单快速撤单、价格在该档位的虚假支撑、成交量集中在特定价格。
二、统计检测体系:三层过滤架构
基于以上问题建模,我们设计三层递进的检测架构:
┌─────────────────────────────────────────────────────┐
│ 第一层:统计过滤 │
│ (Z-Score / MAD / 分位数 - 自动处理) │
│ ↓ 异常标记 │
│ ↓ 置信评估 │
├─────────────────────────────────────────────────────┤
│ 第二层:规则过滤 │
│ (时间窗口 / 上下文关联 / 交易所公告) │
│ ↓ 白名单/黑名单 │
│ ↓ 人工复核队列 │
├─────────────────────────────────────────────────────┤
│ 第三层:人工复核 │
│ (高风险标记 / 策略回溯 / 专家判断) │
│ ↓ 修正确认 │
│ ↓ 规则学习 │
└─────────────────────────────────────────────────────┘
2.1 第一层:统计检测算法
Z-Score 方法
Z-Score 衡量当前值偏离均值的标准差倍数。适用于数据近似正态分布的场景。
$$
Z = \frac{x - \mu}{\sigma}
$$
其中 $\mu$ 为滑动均值,$\sigma$ 为滑动标准差。Z-Score > 3 的数据点通常被视为异常。
import numpy as np
from collections import deque
from dataclasses import dataclass
from enum import Enum
from typing import Optional
import logging
logger = logging.getLogger(__name__)
@dataclass
class AnomalyRecord:
"""异常记录数据结构"""
timestamp: str
symbol: str
field: str # 'close', 'volume', 'bid', 'ask'
raw_value: float
z_score: float
detection_method: str
severity: str # 'low', 'medium', 'high'
class SlidingWindowStats:
"""滑动窗口统计量计算器"""
def __init__(self, window_size: int = 100, z_threshold: float = 3.0):
self.window_size = window_size
self.z_threshold = z_threshold
self.values = deque(maxlen=window_size)
self._median = None
def update(self, value: float) -> Optional[AnomalyRecord]:
"""更新统计量并返回异常记录(如果有)"""
self.values.append(value)
if len(self.values) < 10: # 数据不足,跳过检测
return None
# ⚠️ 数据不足时使用 Bessel 校正
arr = np.array(self.values)
mean = np.mean(arr)
std = np.std(arr, ddof=1) # Bessel 校正
if std < 1e-8: # 标准差过小,视为常量
return None
z_score = abs(value - mean) / std
if z_score > self.z_threshold:
severity = 'high' if z_score > 5 else 'medium' if z_score > 4 else 'low'
return AnomalyRecord(
timestamp="",
symbol="",
field="",
raw_value=value,
z_score=z_score,
detection_method="z_score",
severity=severity
)
return None
def get_stats(self) -> dict:
"""获取当前统计量快照"""
if len(self.values) < 10:
return {}
arr = np.array(self.values)
return {
'mean': float(np.mean(arr)),
'std': float(np.std(arr, ddof=1)),
'median': float(np.median(arr)),
'p25': float(np.percentile(arr, 25)),
'p75': float(np.percentile(arr, 75)),
'count': len(self.values)
}
Z-Score 的局限:对极端值敏感。当历史数据中存在旧异常值时,新的真实异动可能因为均值偏移而被低估。解决方案是使用指数加权移动平均(EWMA)赋予近期数据更高权重。
MAD 方法(Median Absolute Deviation)
MAD 是对 Z-Score 的稳健替代,使用中位数而非均值,对极端值不敏感:
$$
MAD = median(|X_i - median(X)|)
$$
修正 Z-Score:
$$
Z_{MAD} = \frac{0.6745(x - median)}{MAD}
$$
当 $Z_{MAD} > 3.5$ 时判定为异常。
class MADDetector:
"""基于 MAD 的稳健异常检测器"""
def __init__(self, window_size: int = 100, mad_threshold: float = 3.5):
self.window_size = window_size
self.mad_threshold = mad_threshold
self.values = deque(maxlen=window_size)
self._k = 1.4826 # 正态分布修正系数
def update(self, value: float) -> Optional[AnomalyRecord]:
self.values.append(value)
if len(self.values) < 10:
return None
arr = np.array(self.values)
median = np.median(arr)
mad = np.median(np.abs(arr - median))
if mad < 1e-8:
return None
# 修正后的 Z-Score
z_mad = 0.6745 * abs(value - median) / mad
if z_mad > self.mad_threshold:
return AnomalyRecord(
timestamp="",
symbol="",
field="",
raw_value=value,
z_score=z_mad,
detection_method="mad",
severity='high' if z_mad > 6 else 'medium'
)
return None
分位数过滤
对于成交量等非负数据,分位数方法更直观:
class QuantileFilter:
"""分位数边界检测器"""
def __init__(
self,
window_size: int = 200,
lower_quantile: float = 0.001, # 下界千分位
upper_quantile: float = 0.999 # 上界千分位
):
self.window_size = window_size
self.lower_quantile = lower_quantile
self.upper_quantile = upper_quantile
self.values = deque(maxlen=window_size)
def update(self, value: float) -> Optional[AnomalyRecord]:
self.values.append(value)
if len(self.values) < 50:
return None
arr = np.array(self.values)
lower_bound = np.percentile(arr, self.lower_quantile * 100)
upper_bound = np.percentile(arr, self.upper_quantile * 100)
if value < lower_bound or value > upper_bound:
direction = "below" if value < lower_bound else "above"
return AnomalyRecord(
timestamp="",
symbol="",
field="",
raw_value=value,
z_score=float(value / lower_bound if direction == "below" else value / upper_bound),
detection_method="quantile",
severity='high'
)
return None
2.2 第二层:上下文关联规则
统计方法只能检测"数值异常",无法判断"上下文异常"。第二层引入领域知识:
| 规则类型 | 检测逻辑 | 置信权重 |
|---|---|---|
| 时间关联 | 盘前/盘后数据异常概率高于盘中 | +30% 置信 |
| 消息关联 | 异常时间点附近有财经新闻 | +40% 置信(可能是真异动) |
| 相关标的对称性 | 标普 500 ETF 与成分股联动异常 | +50% 置信 |
| 订单簿一致性 | 成交量 > 盘口深度 × 比例 | +60% 置信(系统错误) |
@dataclass
class ContextRule:
"""上下文规则基类"""
name: str
confidence_weight: float
evaluate: callable # (anomaly_record, context) -> bool
class TimeWindowRule(ContextRule):
"""盘前盘后规则"""
def __init__(self):
super().__init__(
name="time_window",
confidence_weight=0.3,
evaluate=self._evaluate
)
@staticmethod
def _evaluate(record: AnomalyRecord, context: dict) -> bool:
hour = context.get('hour', 12)
# 美股盘前 4:00-9:30 ET,盘后 16:00-20:00 ET
is_pre_market = 4 <= hour < 9.5
is_after_hours = 16 <= hour < 20
return is_pre_market or is_after_hours
class NewsCorrelationRule(ContextRule):
"""新闻关联规则"""
def __init__(self, news_client):
super().__init__(
name="news_correlation",
confidence_weight=0.4,
evaluate=self._evaluate
)
self.news_client = news_client
def _evaluate(self, record: AnomalyRecord, context: dict) -> bool:
symbol = record.symbol
timestamp = record.timestamp
# 检查异常前后 5 分钟内是否有相关新闻
recent_news = self.news_client.query(
symbol=symbol,
start_time=timestamp - 300,
end_time=timestamp + 300
)
return len(recent_news) > 0
2.3 第三层:人工复核队列
高风险异常自动进入人工复核队列,由专家判断并确认修正规则:
from enum import Enum
from queue import Queue
from threading import Lock
import json
import os
class ReviewPriority(Enum):
"""复核优先级"""
AUTO_CORRECT = 0 # 自动修正
LOW_REVIEW = 1 # 低优先级复核
HIGH_REVIEW = 2 # 高优先级复核
EXPERT_REVIEW = 3 # 专家复核
class ReviewQueue:
"""人工复核队列"""
def __init__(self, persist_path: str = "./review_queue.json"):
self.persist_path = persist_path
self.queue = []
self.lock = Lock()
self._load_from_disk()
def _load_from_disk(self):
"""从磁盘恢复队列状态"""
if os.path.exists(self.persist_path):
try:
with open(self.persist_path, 'r') as f:
self.queue = json.load(f)
logger.info(f"从磁盘恢复复核队列,共 {len(self.queue)} 条记录")
except Exception as e:
logger.warning(f"恢复复核队列失败: {e},使用空队列")
self.queue = []
def _save_to_disk(self):
"""持久化队列状态"""
with self.lock:
try:
with open(self.persist_path, 'w') as f:
json.dump(self.queue, f, indent=2)
except Exception as e:
logger.error(f"保存复核队列失败: {e}")
def enqueue(self, record: AnomalyRecord, priority: ReviewPriority, context: dict):
"""加入复核队列"""
item = {
'record': {
'timestamp': record.timestamp,
'symbol': record.symbol,
'field': record.field,
'raw_value': record.raw_value,
'z_score': record.z_score,
'detection_method': record.detection_method,
'severity': record.severity
},
'context': context,
'priority': priority.name,
'status': 'pending'
}
# 高优先级插入队列头部
if priority in (ReviewPriority.HIGH_REVIEW, ReviewPriority.EXPERT_REVIEW):
self.queue.insert(0, item)
else:
self.queue.append(item)
self._save_to_disk()
logger.info(f"异常记录入队: {record.symbol} {record.field}={record.raw_value}, "
f"优先级={priority.name}, 当前队列长度={len(self.queue)}")
def dequeue(self) -> Optional[dict]:
"""取出下一条待复核记录"""
with self.lock:
for i, item in enumerate(self.queue):
if item['status'] == 'pending':
item['status'] = 'in_review'
self._save_to_disk()
return item
return None
def resolve(self, record_id: str, action: str, correction_value: Optional[float] = None):
"""处理复核结果
Args:
record_id: 记录索引
action: 'correct' | 'discard' | 'whitelist'
correction_value: 修正后的值(如果 action='correct')
"""
with self.lock:
if 0 <= record_id < len(self.queue):
item = self.queue[record_id]
item['status'] = action
item['correction_value'] = correction_value
# 如果是白名单,添加到白名单缓存
if action == 'whitelist':
self._add_to_whitelist(item)
self._save_to_disk()
logger.info(f"复核完成: record_id={record_id}, action={action}")
def _add_to_whitelist(self, item: dict):
"""添加白名单规则"""
# 简化实现:实际应存储到持久化规则库
logger.info(f"规则学习: {item['record']['symbol']} {item['record']['field']} "
f"被加入白名单")
三、生产级异常检测系统
将上述组件整合为完整的异常检测管道:
import requests
import time
import asyncio
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Callable
import random
@dataclass
class AnomalyConfig:
"""异常检测配置"""
# 统计参数
window_size: int = 100
z_threshold: float = 3.0
mad_threshold: float = 3.5
quantile_lower: float = 0.001
quantile_upper: float = 0.999
# 行为参数
auto_correct_threshold: float = 5.0 # Z-Score > 5 自动修正
batch_size: int = 100
max_retry: int = 3
class AnomalyDetector:
"""生产级异常检测系统"""
def __init__(self, config: AnomalyConfig = None):
self.config = config or AnomalyConfig()
# 初始化各层检测器
self.z_detectors: Dict[str, SlidingWindowStats] = {}
self.mad_detectors: Dict[str, MADDetector] = {}
self.volume_detectors: Dict[str, QuantileFilter] = {}
# 上下文规则
self.context_rules: List[ContextRule] = []
# 复核队列
self.review_queue = ReviewQueue()
# 修正回调
self._correction_callback: Optional[Callable] = None
# 白名单缓存
self._whitelist: set = set()
def register_correction_callback(self, callback: Callable):
"""注册修正回调"""
self._correction_callback = callback
def process_tick(self, tick_data: dict, context: dict = None) -> Optional[dict]:
"""处理单条 tick 数据
Args:
tick_data: 包含 timestamp, symbol, price, volume 等字段
context: 上下文信息
Returns:
如果检测到异常,返回修正后的数据和异常记录
"""
symbol = tick_data['symbol']
timestamp = tick_data.get('timestamp', '')
price = tick_data.get('price')
volume = tick_data.get('volume')
# 检查白名单
key = f"{symbol}:{timestamp}"
if key in self._whitelist:
return tick_data
context = context or {}
context['hour'] = self._parse_hour(timestamp)
results = []
# 第一层:统计检测
if price is not None:
if symbol not in self.z_detectors:
self.z_detectors[symbol] = SlidingWindowStats(
window_size=self.config.window_size,
z_threshold=self.config.z_threshold
)
z_result = self.z_detectors[symbol].update(price)
if z_result:
z_result.timestamp = timestamp
z_result.symbol = symbol
z_result.field = 'price'
results.append(z_result)
if volume is not None:
if symbol not in self.volume_detectors:
self.volume_detectors[symbol] = QuantileFilter(
window_size=self.config.window_size,
lower_quantile=self.config.quantile_lower,
upper_quantile=self.config.quantile_upper
)
vol_result = self.volume_detectors[symbol].update(volume)
if vol_result:
vol_result.timestamp = timestamp
vol_result.symbol = symbol
vol_result.field = 'volume'
results.append(vol_result)
# 第二层:规则过滤
for anomaly_record in results:
for rule in self.context_rules:
if rule.evaluate(anomaly_record, context):
anomaly_record.confidence_weight = rule.confidence_weight
# 决策处理
for anomaly_record in results:
# 高置信度异常进入复核队列
if anomaly_record.z_score > self.config.auto_correct_threshold:
self.review_queue.enqueue(
anomaly_record,
ReviewPriority.AUTO_CORRECT,
context
)
# 自动修正:使用前一有效值
corrected_tick = tick_data.copy()
corrected_tick['price'] = self._get_last_valid_price(symbol)
corrected_tick['is_corrected'] = True
corrected_tick['correction_reason'] = 'auto_zscore'
if self._correction_callback:
self._correction_callback(corrected_tick, anomaly_record)
return corrected_tick
return tick_data
def _parse_hour(self, timestamp: str) -> int:
"""从时间戳解析小时"""
# 简化实现:实际应解析完整时间
try:
return int(timestamp.split('T')[1].split(':')[0])
except:
return 12 # 默认中午
def _get_last_valid_price(self, symbol: str) -> Optional[float]:
"""获取前一有效价格"""
if symbol in self.z_detectors:
stats = self.z_detectors[symbol].get_stats()
if stats:
return stats.get('mean')
return None
四、实时数据集成:TickDB 实战
将异常检测系统与 TickDB 的 WebSocket 实时数据流集成:
import websocket
import json
import threading
import os
class TickDBAnomalyPipeline:
"""TickDB 异常检测数据管道"""
def __init__(
self,
api_key: str,
symbols: List[str],
anomaly_config: AnomalyConfig = None,
ws_url: str = "wss://api.tickdb.ai/v1/market/ws"
):
self.api_key = api_key
self.symbols = symbols
self.ws_url = f"{ws_url}?api_key={api_key}"
self.anomaly_detector = AnomalyDetector(anomaly_config)
self.ws = None
self._running = False
self._reconnect_delay = 1
self._max_reconnect_delay = 60
self._reconnect_jitter = 0.1
# 注册修正回调
self.anomaly_detector.register_correction_callback(
self._on_correction
)
def _on_correction(self, corrected_data: dict, anomaly_record):
"""修正回调:记录修正日志"""
print(f"[修正] {anomaly_record.symbol} {anomaly_record.field}: "
f"{anomaly_record.raw_value:.4f} → {corrected_data.get('price')}")
def start(self):
"""启动数据管道"""
self._running = True
self._connect_and_subscribe()
def stop(self):
"""停止数据管道"""
self._running = False
if self.ws:
self.ws.close()
def _connect_and_subscribe(self):
"""连接并订阅"""
retry_count = 0
while self._running:
try:
headers = {
"X-API-Key": self.api_key,
"Content-Type": "application/json"
}
self.ws = websocket.WebSocketApp(
self.ws_url,
header=headers,
on_message=self._on_message,
on_error=self._on_error,
on_close=self._on_close,
on_open=self._on_open
)
# ⚠️ 非阻塞运行,超时自动重连
self.ws.run_forever(ping_interval=30, ping_timeout=10)
except Exception as e:
retry_count += 1
delay = min(
self._reconnect_delay * (2 ** retry_count),
self._max_reconnect_delay
)
# 添加抖动避免惊群
delay *= (1 + random.uniform(-self._reconnect_jitter, self._reconnect_jitter))
print(f"[重连] 第 {retry_count} 次重连,{delay:.1f} 秒后重试: {e}")
time.sleep(delay)
def _on_open(self, ws):
"""WebSocket 连接建立"""
print("[连接] TickDB WebSocket 已连接")
# 发送订阅消息
subscribe_msg = {
"cmd": "subscribe",
"args": {
"symbols": self.symbols,
"channels": ["ticker", "depth"]
}
}
ws.send(json.dumps(subscribe_msg))
print(f"[订阅] 已订阅标的: {', '.join(self.symbols)}")
# 重置重连计数
self._reconnect_delay = 1
def _on_message(self, ws, message):
"""处理接收到的消息"""
try:
data = json.loads(message)
# 解析错误码
code = data.get("code", 0)
if code != 0:
self._handle_error(code, data, ws)
return
# 提取数据
payload = data.get("data", {})
symbol = payload.get("s")
tick_type = payload.get("type")
# 构建 tick 数据
tick_data = {
"symbol": symbol,
"timestamp": payload.get("ts"),
"price": payload.get("last"),
"volume": payload.get("vol")
}
# ⚠️ 上下文信息(实际从消息中提取)
context = {"hour": 10}
# 异常检测
result = self.anomaly_detector.process_tick(tick_data, context)
# 处理修正后的数据
if result and result.get('is_corrected'):
# 输出修正记录供下游系统使用
self._output_corrected_data(result)
except json.JSONDecodeError as e:
print(f"[错误] 消息解析失败: {e}")
except Exception as e:
print(f"[错误] 消息处理异常: {e}")
def _handle_error(self, code: int, data: dict, ws):
"""处理 TickDB 错误响应"""
if code == 3001:
# 限频错误
retry_after = int(ws.sock.opt.get("headers", {}).get(
"Retry-After", data.get("retry_after", 5)
))
print(f"[限频] 请求超限,{retry_after} 秒后重试")
time.sleep(retry_after)
elif code in (1001, 1002):
raise ValueError(f"API Key 无效: {data.get('message')}")
else:
print(f"[错误] TickDB 返回错误 {code}: {data.get('message')}")
def _on_error(self, ws, error):
"""WebSocket 错误处理"""
print(f"[错误] WebSocket 错误: {error}")
def _on_close(self, ws, close_status_code, close_msg):
"""WebSocket 关闭处理"""
print(f"[连接] WebSocket 已关闭: {close_status_code} - {close_msg}")
def _output_corrected_data(self, data: dict):
"""输出修正后的数据"""
# 简化实现:打印到标准输出
# 生产环境应发送到消息队列或写入数据库
print(f"[输出] 修正数据: {data['symbol']} @ {data['timestamp']}")
# 使用示例
if __name__ == "__main__":
# 从环境变量读取 API Key
api_key = os.environ.get("TICKDB_API_KEY")
if not api_key:
print("请设置 TICKDB_API_KEY 环境变量")
exit(1)
# 配置异常检测参数
config = AnomalyConfig(
window_size=100,
z_threshold=3.0,
auto_correct_threshold=5.0
)
# 初始化管道
pipeline = TickDBAnomalyPipeline(
api_key=api_key,
symbols=["AAPL.US", "TSLA.US", "NVDA.US"],
anomaly_config=config
)
# 启动
print("启动 TickDB 异常检测管道...")
try:
pipeline.start()
except KeyboardInterrupt:
print("\n停止管道...")
pipeline.stop()
五、进阶:自适应阈值与规则学习
静态阈值无法适应市场结构变化。引入自适应机制:
5.1 市场状态识别
不同市场状态下,价格波动的"正常范围"差异显著:
class MarketRegimeDetector:
"""市场状态检测器"""
REGIMES = {
'normal': {'volatility_mult': 1.0, 'volume_mult': 1.0},
'earnings': {'volatility_mult': 2.5, 'volume_mult': 3.0},
'high_volatility': {'volatility_mult': 3.0, 'volume_mult': 2.0},
'crisis': {'volatility_mult': 5.0, 'volume_mult': 5.0}
}
def __init__(self, symbol: str):
self.symbol = symbol
self.current_regime = 'normal'
self._regime_change_callbacks = []
def detect(self, vix_level: float = None, recent_gap: float = None) -> str:
"""检测当前市场状态"""
# 简化实现:基于 VIX 和历史波动率
if vix_level is not None:
if vix_level > 40:
return 'crisis'
elif vix_level > 30:
return 'high_volatility'
if recent_gap is not None and recent_gap > 0.05:
return 'earnings'
return 'normal'
def get_adjusted_threshold(self, base_threshold: float, metric: str = 'price') -> float:
"""获取调整后的阈值"""
regime = self.REGIMES[self.current_regime]
if metric == 'price':
return base_threshold * regime['volatility_mult']
elif metric == 'volume':
return base_threshold * regime['volume_mult']
return base_threshold
5.2 规则自动学习
基于人工复核结果,生成新的检测规则:
class RuleLearner:
"""从复核结果中学习新规则"""
def __init__(self, detector: AnomalyDetector):
self.detector = detector
self._learned_patterns = []
def learn_from_review(self, review_result: dict):
"""从复核结果学习
Args:
review_result: {
'symbol': 'AAPL.US',
'pattern': 'earnings_gap_up',
'action': 'whitelist',
'context': {...}
}
"""
pattern = review_result.get('pattern')
action = review_result.get('action')
if action == 'whitelist' and pattern:
# 添加到白名单
key = f"{review_result['symbol']}:{pattern}"
self.detector._whitelist.add(key)
# 生成新规则(示例:特定事件下的价格区间)
if pattern.startswith('earnings_'):
self._learned_patterns.append({
'type': 'earnings_whitelist',
'symbol': review_result['symbol'],
'confidence': 1
})
print(f"[学习] 新规则已添加: {key}")
六、完整回测验证
将异常检测集成到回测框架,验证修正效果:
class BacktestWithAnomalyDetection:
"""带异常检测的回测框架"""
def __init__(self, detector: AnomalyDetector):
self.detector = detector
self.stats = {
'total_ticks': 0,
'anomalies_detected': 0,
'auto_corrected': 0,
'manual_reviewed': 0
}
def run(
self,
historical_data: List[dict],
strategy_func: callable
) -> dict:
"""运行回测
Args:
historical_data: 历史 tick 数据
strategy_func: 策略函数 (data -> signal)
"""
corrected_data = []
signals = []
for tick in historical_data:
self.stats['total_ticks'] += 1
# 异常检测
result = self.detector.process_tick(tick)
if result is not tick:
self.stats['anomalies_detected'] += 1
if result.get('is_corrected'):
self.stats['auto_corrected'] += 1
corrected_data.append(result)
# 生成信号
signal = strategy_func(result)
signals.append(signal)
# 计算策略绩效
performance = self._calculate_performance(signals, corrected_data)
return {
'performance': performance,
'stats': self.stats,
'correction_log': self._get_correction_log()
}
def _calculate_performance(self, signals: list, data: list) -> dict:
"""计算绩效指标"""
# 简化实现
return {
'total_return': 0.0,
'sharpe_ratio': 0.0,
'max_drawdown': 0.0
}
def _get_correction_log(self) -> List[dict]:
"""获取修正日志"""
# 从复核队列提取已处理的记录
return []
# 回测示例
def momentum_strategy(data: dict) -> str:
"""简单动量策略示例"""
# 实际策略逻辑
return 'hold'
七、部署方案与配置建议
根据使用场景,推荐以下配置:
| 场景 | 窗口大小 | Z-Score 阈值 | 自动修正阈值 | 复核队列 |
|---|---|---|---|---|
| 个人量化 | 50-100 | 3.0 | 5.0 | 仅高优先级 |
| 团队/小型机构 | 100-200 | 3.5 | 4.5 | 全部记录 |
| 机构级部署 | 200-500 | 3.0 | 4.0 | 全量 + 规则学习 |
7.1 数据源配置
对于 TickDB 的不同数据类型,建议采用差异化检测策略:
| 数据类型 | 推荐检测方法 | 特殊规则 |
|---|---|---|
ticker(价格) |
Z-Score + MAD 双保险 | 盘前盘后降低阈值 |
depth(订单簿) |
分位数 + 深度一致性 | 大单撤单检测 |
trades(逐笔) |
成交量分位数 | 时间间隔异常检测 |
kline(K 线) |
收盘价 Z-Score + 跳空检测 | 缺口超过 10% 强制复核 |
八、总结
异常值检测的核心挑战不是技术实现,而是判断力——区分真实市场信号与系统噪声的判断力。
本文提供的框架将这种判断力系统化:
- 统计层提供客观的数值判断,消除主观偏差
- 规则层注入领域知识,捕捉上下文信息
- 人工层处理边缘案例,持续优化自动化边界
对于 TickDB 用户,建议从以下步骤开始:
- 先用轻量配置跑通数据管道,观察 2-3 周的数据分布
- 建立白名单,将已确认的真实异动模式加入白名单,减少误报
- 启用规则学习,从人工复核中提取新规则,逐渐扩大自动化覆盖
- 定期复盘,每季度评估检测效果,调整阈值
下一步行动
如果你希望亲手实现本文框架:
- 访问 tickdb.ai 注册(免费,无需信用卡)
- 在控制台生成 API Key
- 设置环境变量
TICKDB_API_KEY,复制本文代码即可运行 - 从单标的、单策略开始,逐步扩展到全市场覆盖
如果你需要企业级数据治理方案,包括全量历史数据清洗、定制化异常规则库、机构级 SLA 保障,联系 [email protected]。
如果你习惯用 AI 辅助开发,在 AI 助手中搜索安装 tickdb-market-data SKILL,用自然语言描述你的异常检测需求,让 AI 生成适配的检测规则。
风险提示:本文不构成任何投资建议。异常值检测算法本身不产生交易信号,仅作为数据质量保障工具。任何策略在实盘部署前,请进行充分的回测验证和纸上交易测试。市场有风险,投资需谨慎。