异常值检测与修正:价格跳空、成交量突增的自动化处理

"The market is always telling you something. The question is whether you're hearing the signal or the noise."

2010 年 5 月 6 日,道琼斯工业平均指数在几分钟内暴跌近 1000 点,随后迅速反弹。这次"闪电崩盘"让无数量化系统陷入混乱——大量止损单被触发,算法不计成本地抛售,而某些数据源记录的"错误"价格至今仍是金融史研究的对象。

十年后的今天,同样的场景在不同市场反复上演:MEME 股的狂热导致 Robinhood 数据异常、数字货币的流动性危机制造虚假价差、交易所系统升级期间的短暂数据真空。对于量化交易者,核心问题从来不是"数据有没有异常",而是"这个异常是真实市场的声音,还是系统制造的杂音"

本文构建一套生产级异常值检测与修正框架,涵盖统计检测、规则过滤、人工复核的三级体系,并提供可直接集成到 TickDB 数据管道中的 Python 实现。


一、问题建模:异常值的四种来源

在设计检测系统之前,必须理解"坏数据"从何而来。金融数据的异常值通常来自四个层面:

1.1 市场真实异动

价格或成交量的剧烈变化由真实的经济事件驱动:财报发布、央行决议、地缘冲突。这类异常是策略的信号源,必须保留

特征:变化有逻辑支撑、相关标的同步联动、成交量放大符合信息不对称理论。

1.2 交易所技术故障

交易所撮合系统、清算系统的瞬时故障会产生短暂的虚假价格或成交量。这类异常是系统噪声,需要修正或剔除

特征:单标的孤立的尖刺、持续时间极短(秒级)、后续价格快速回归、成交量与盘口不匹配。

1.3 数据传输丢失

网络抖动、API 超时、数据管道处理延迟导致的丢包。这类异常是传输噪声,需要补充或标记

特征:时间序列出现断层、相邻数据点时间戳不连续、特定时段系统性缺失。

1.4 人为操纵痕迹

幌骗交易、报价操纵、收盘价操纵产生的虚假深度。这类异常是市场失真,需要特别标记或剔除

特征:订单簿大单快速撤单、价格在该档位的虚假支撑、成交量集中在特定价格。


二、统计检测体系:三层过滤架构

基于以上问题建模,我们设计三层递进的检测架构:

┌─────────────────────────────────────────────────────┐
│                   第一层:统计过滤                   │
│         (Z-Score / MAD / 分位数 - 自动处理)          │
│                   ↓ 异常标记                         │
│                   ↓ 置信评估                         │
├─────────────────────────────────────────────────────┤
│                   第二层:规则过滤                   │
│         (时间窗口 / 上下文关联 / 交易所公告)           │
│                   ↓ 白名单/黑名单                     │
│                   ↓ 人工复核队列                      │
├─────────────────────────────────────────────────────┤
│                   第三层:人工复核                    │
│         (高风险标记 / 策略回溯 / 专家判断)             │
│                   ↓ 修正确认                         │
│                   ↓ 规则学习                         │
└─────────────────────────────────────────────────────┘

2.1 第一层:统计检测算法

Z-Score 方法

Z-Score 衡量当前值偏离均值的标准差倍数。适用于数据近似正态分布的场景。

$$
Z = \frac{x - \mu}{\sigma}
$$

其中 $\mu$ 为滑动均值,$\sigma$ 为滑动标准差。Z-Score > 3 的数据点通常被视为异常

import numpy as np
from collections import deque
from dataclasses import dataclass
from enum import Enum
from typing import Optional
import logging

logger = logging.getLogger(__name__)


@dataclass
class AnomalyRecord:
    """异常记录数据结构"""
    timestamp: str
    symbol: str
    field: str  # 'close', 'volume', 'bid', 'ask'
    raw_value: float
    z_score: float
    detection_method: str
    severity: str  # 'low', 'medium', 'high'


class SlidingWindowStats:
    """滑动窗口统计量计算器"""
    
    def __init__(self, window_size: int = 100, z_threshold: float = 3.0):
        self.window_size = window_size
        self.z_threshold = z_threshold
        self.values = deque(maxlen=window_size)
        self._median = None
    
    def update(self, value: float) -> Optional[AnomalyRecord]:
        """更新统计量并返回异常记录(如果有)"""
        self.values.append(value)
        
        if len(self.values) < 10:  # 数据不足,跳过检测
            return None
        
        # ⚠️ 数据不足时使用 Bessel 校正
        arr = np.array(self.values)
        mean = np.mean(arr)
        std = np.std(arr, ddof=1)  # Bessel 校正
        
        if std < 1e-8:  # 标准差过小,视为常量
            return None
        
        z_score = abs(value - mean) / std
        
        if z_score > self.z_threshold:
            severity = 'high' if z_score > 5 else 'medium' if z_score > 4 else 'low'
            return AnomalyRecord(
                timestamp="",
                symbol="",
                field="",
                raw_value=value,
                z_score=z_score,
                detection_method="z_score",
                severity=severity
            )
        
        return None
    
    def get_stats(self) -> dict:
        """获取当前统计量快照"""
        if len(self.values) < 10:
            return {}
        
        arr = np.array(self.values)
        return {
            'mean': float(np.mean(arr)),
            'std': float(np.std(arr, ddof=1)),
            'median': float(np.median(arr)),
            'p25': float(np.percentile(arr, 25)),
            'p75': float(np.percentile(arr, 75)),
            'count': len(self.values)
        }

Z-Score 的局限:对极端值敏感。当历史数据中存在旧异常值时,新的真实异动可能因为均值偏移而被低估。解决方案是使用指数加权移动平均(EWMA)赋予近期数据更高权重。

MAD 方法(Median Absolute Deviation)

MAD 是对 Z-Score 的稳健替代,使用中位数而非均值,对极端值不敏感:

$$
MAD = median(|X_i - median(X)|)
$$

修正 Z-Score:

$$
Z_{MAD} = \frac{0.6745(x - median)}{MAD}
$$

当 $Z_{MAD} > 3.5$ 时判定为异常。

class MADDetector:
    """基于 MAD 的稳健异常检测器"""
    
    def __init__(self, window_size: int = 100, mad_threshold: float = 3.5):
        self.window_size = window_size
        self.mad_threshold = mad_threshold
        self.values = deque(maxlen=window_size)
        self._k = 1.4826  # 正态分布修正系数
    
    def update(self, value: float) -> Optional[AnomalyRecord]:
        self.values.append(value)
        
        if len(self.values) < 10:
            return None
        
        arr = np.array(self.values)
        median = np.median(arr)
        mad = np.median(np.abs(arr - median))
        
        if mad < 1e-8:
            return None
        
        # 修正后的 Z-Score
        z_mad = 0.6745 * abs(value - median) / mad
        
        if z_mad > self.mad_threshold:
            return AnomalyRecord(
                timestamp="",
                symbol="",
                field="",
                raw_value=value,
                z_score=z_mad,
                detection_method="mad",
                severity='high' if z_mad > 6 else 'medium'
            )
        
        return None

分位数过滤

对于成交量等非负数据,分位数方法更直观:

class QuantileFilter:
    """分位数边界检测器"""
    
    def __init__(
        self, 
        window_size: int = 200,
        lower_quantile: float = 0.001,  # 下界千分位
        upper_quantile: float = 0.999   # 上界千分位
    ):
        self.window_size = window_size
        self.lower_quantile = lower_quantile
        self.upper_quantile = upper_quantile
        self.values = deque(maxlen=window_size)
    
    def update(self, value: float) -> Optional[AnomalyRecord]:
        self.values.append(value)
        
        if len(self.values) < 50:
            return None
        
        arr = np.array(self.values)
        lower_bound = np.percentile(arr, self.lower_quantile * 100)
        upper_bound = np.percentile(arr, self.upper_quantile * 100)
        
        if value < lower_bound or value > upper_bound:
            direction = "below" if value < lower_bound else "above"
            return AnomalyRecord(
                timestamp="",
                symbol="",
                field="",
                raw_value=value,
                z_score=float(value / lower_bound if direction == "below" else value / upper_bound),
                detection_method="quantile",
                severity='high'
            )
        
        return None

2.2 第二层:上下文关联规则

统计方法只能检测"数值异常",无法判断"上下文异常"。第二层引入领域知识:

规则类型 检测逻辑 置信权重
时间关联 盘前/盘后数据异常概率高于盘中 +30% 置信
消息关联 异常时间点附近有财经新闻 +40% 置信(可能是真异动)
相关标的对称性 标普 500 ETF 与成分股联动异常 +50% 置信
订单簿一致性 成交量 > 盘口深度 × 比例 +60% 置信(系统错误)
@dataclass
class ContextRule:
    """上下文规则基类"""
    name: str
    confidence_weight: float
    evaluate: callable  # (anomaly_record, context) -> bool


class TimeWindowRule(ContextRule):
    """盘前盘后规则"""
    
    def __init__(self):
        super().__init__(
            name="time_window",
            confidence_weight=0.3,
            evaluate=self._evaluate
        )
    
    @staticmethod
    def _evaluate(record: AnomalyRecord, context: dict) -> bool:
        hour = context.get('hour', 12)
        # 美股盘前 4:00-9:30 ET,盘后 16:00-20:00 ET
        is_pre_market = 4 <= hour < 9.5
        is_after_hours = 16 <= hour < 20
        return is_pre_market or is_after_hours


class NewsCorrelationRule(ContextRule):
    """新闻关联规则"""
    
    def __init__(self, news_client):
        super().__init__(
            name="news_correlation",
            confidence_weight=0.4,
            evaluate=self._evaluate
        )
        self.news_client = news_client
    
    def _evaluate(self, record: AnomalyRecord, context: dict) -> bool:
        symbol = record.symbol
        timestamp = record.timestamp
        # 检查异常前后 5 分钟内是否有相关新闻
        recent_news = self.news_client.query(
            symbol=symbol,
            start_time=timestamp - 300,
            end_time=timestamp + 300
        )
        return len(recent_news) > 0

2.3 第三层:人工复核队列

高风险异常自动进入人工复核队列,由专家判断并确认修正规则:

from enum import Enum
from queue import Queue
from threading import Lock
import json
import os


class ReviewPriority(Enum):
    """复核优先级"""
    AUTO_CORRECT = 0   # 自动修正
    LOW_REVIEW = 1     # 低优先级复核
    HIGH_REVIEW = 2    # 高优先级复核
    EXPERT_REVIEW = 3  # 专家复核


class ReviewQueue:
    """人工复核队列"""
    
    def __init__(self, persist_path: str = "./review_queue.json"):
        self.persist_path = persist_path
        self.queue = []
        self.lock = Lock()
        self._load_from_disk()
    
    def _load_from_disk(self):
        """从磁盘恢复队列状态"""
        if os.path.exists(self.persist_path):
            try:
                with open(self.persist_path, 'r') as f:
                    self.queue = json.load(f)
                logger.info(f"从磁盘恢复复核队列,共 {len(self.queue)} 条记录")
            except Exception as e:
                logger.warning(f"恢复复核队列失败: {e},使用空队列")
                self.queue = []
    
    def _save_to_disk(self):
        """持久化队列状态"""
        with self.lock:
            try:
                with open(self.persist_path, 'w') as f:
                    json.dump(self.queue, f, indent=2)
            except Exception as e:
                logger.error(f"保存复核队列失败: {e}")
    
    def enqueue(self, record: AnomalyRecord, priority: ReviewPriority, context: dict):
        """加入复核队列"""
        item = {
            'record': {
                'timestamp': record.timestamp,
                'symbol': record.symbol,
                'field': record.field,
                'raw_value': record.raw_value,
                'z_score': record.z_score,
                'detection_method': record.detection_method,
                'severity': record.severity
            },
            'context': context,
            'priority': priority.name,
            'status': 'pending'
        }
        
        # 高优先级插入队列头部
        if priority in (ReviewPriority.HIGH_REVIEW, ReviewPriority.EXPERT_REVIEW):
            self.queue.insert(0, item)
        else:
            self.queue.append(item)
        
        self._save_to_disk()
        logger.info(f"异常记录入队: {record.symbol} {record.field}={record.raw_value}, "
                   f"优先级={priority.name}, 当前队列长度={len(self.queue)}")
    
    def dequeue(self) -> Optional[dict]:
        """取出下一条待复核记录"""
        with self.lock:
            for i, item in enumerate(self.queue):
                if item['status'] == 'pending':
                    item['status'] = 'in_review'
                    self._save_to_disk()
                    return item
        return None
    
    def resolve(self, record_id: str, action: str, correction_value: Optional[float] = None):
        """处理复核结果
        
        Args:
            record_id: 记录索引
            action: 'correct' | 'discard' | 'whitelist'
            correction_value: 修正后的值(如果 action='correct')
        """
        with self.lock:
            if 0 <= record_id < len(self.queue):
                item = self.queue[record_id]
                item['status'] = action
                item['correction_value'] = correction_value
                
                # 如果是白名单,添加到白名单缓存
                if action == 'whitelist':
                    self._add_to_whitelist(item)
                
                self._save_to_disk()
                logger.info(f"复核完成: record_id={record_id}, action={action}")
    
    def _add_to_whitelist(self, item: dict):
        """添加白名单规则"""
        # 简化实现:实际应存储到持久化规则库
        logger.info(f"规则学习: {item['record']['symbol']} {item['record']['field']} "
                   f"被加入白名单")

三、生产级异常检测系统

将上述组件整合为完整的异常检测管道:

import requests
import time
import asyncio
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Callable
import random


@dataclass
class AnomalyConfig:
    """异常检测配置"""
    # 统计参数
    window_size: int = 100
    z_threshold: float = 3.0
    mad_threshold: float = 3.5
    quantile_lower: float = 0.001
    quantile_upper: float = 0.999
    
    # 行为参数
    auto_correct_threshold: float = 5.0  # Z-Score > 5 自动修正
    batch_size: int = 100
    max_retry: int = 3


class AnomalyDetector:
    """生产级异常检测系统"""
    
    def __init__(self, config: AnomalyConfig = None):
        self.config = config or AnomalyConfig()
        
        # 初始化各层检测器
        self.z_detectors: Dict[str, SlidingWindowStats] = {}
        self.mad_detectors: Dict[str, MADDetector] = {}
        self.volume_detectors: Dict[str, QuantileFilter] = {}
        
        # 上下文规则
        self.context_rules: List[ContextRule] = []
        
        # 复核队列
        self.review_queue = ReviewQueue()
        
        # 修正回调
        self._correction_callback: Optional[Callable] = None
        
        # 白名单缓存
        self._whitelist: set = set()
    
    def register_correction_callback(self, callback: Callable):
        """注册修正回调"""
        self._correction_callback = callback
    
    def process_tick(self, tick_data: dict, context: dict = None) -> Optional[dict]:
        """处理单条 tick 数据
        
        Args:
            tick_data: 包含 timestamp, symbol, price, volume 等字段
            context: 上下文信息
            
        Returns:
            如果检测到异常,返回修正后的数据和异常记录
        """
        symbol = tick_data['symbol']
        timestamp = tick_data.get('timestamp', '')
        price = tick_data.get('price')
        volume = tick_data.get('volume')
        
        # 检查白名单
        key = f"{symbol}:{timestamp}"
        if key in self._whitelist:
            return tick_data
        
        context = context or {}
        context['hour'] = self._parse_hour(timestamp)
        
        results = []
        
        # 第一层:统计检测
        if price is not None:
            if symbol not in self.z_detectors:
                self.z_detectors[symbol] = SlidingWindowStats(
                    window_size=self.config.window_size,
                    z_threshold=self.config.z_threshold
                )
            
            z_result = self.z_detectors[symbol].update(price)
            if z_result:
                z_result.timestamp = timestamp
                z_result.symbol = symbol
                z_result.field = 'price'
                results.append(z_result)
        
        if volume is not None:
            if symbol not in self.volume_detectors:
                self.volume_detectors[symbol] = QuantileFilter(
                    window_size=self.config.window_size,
                    lower_quantile=self.config.quantile_lower,
                    upper_quantile=self.config.quantile_upper
                )
            
            vol_result = self.volume_detectors[symbol].update(volume)
            if vol_result:
                vol_result.timestamp = timestamp
                vol_result.symbol = symbol
                vol_result.field = 'volume'
                results.append(vol_result)
        
        # 第二层:规则过滤
        for anomaly_record in results:
            for rule in self.context_rules:
                if rule.evaluate(anomaly_record, context):
                    anomaly_record.confidence_weight = rule.confidence_weight
        
        # 决策处理
        for anomaly_record in results:
            # 高置信度异常进入复核队列
            if anomaly_record.z_score > self.config.auto_correct_threshold:
                self.review_queue.enqueue(
                    anomaly_record, 
                    ReviewPriority.AUTO_CORRECT,
                    context
                )
                # 自动修正:使用前一有效值
                corrected_tick = tick_data.copy()
                corrected_tick['price'] = self._get_last_valid_price(symbol)
                corrected_tick['is_corrected'] = True
                corrected_tick['correction_reason'] = 'auto_zscore'
                
                if self._correction_callback:
                    self._correction_callback(corrected_tick, anomaly_record)
                
                return corrected_tick
        
        return tick_data
    
    def _parse_hour(self, timestamp: str) -> int:
        """从时间戳解析小时"""
        # 简化实现:实际应解析完整时间
        try:
            return int(timestamp.split('T')[1].split(':')[0])
        except:
            return 12  # 默认中午
    
    def _get_last_valid_price(self, symbol: str) -> Optional[float]:
        """获取前一有效价格"""
        if symbol in self.z_detectors:
            stats = self.z_detectors[symbol].get_stats()
            if stats:
                return stats.get('mean')
        return None

四、实时数据集成:TickDB 实战

将异常检测系统与 TickDB 的 WebSocket 实时数据流集成:

import websocket
import json
import threading
import os


class TickDBAnomalyPipeline:
    """TickDB 异常检测数据管道"""
    
    def __init__(
        self,
        api_key: str,
        symbols: List[str],
        anomaly_config: AnomalyConfig = None,
        ws_url: str = "wss://api.tickdb.ai/v1/market/ws"
    ):
        self.api_key = api_key
        self.symbols = symbols
        self.ws_url = f"{ws_url}?api_key={api_key}"
        self.anomaly_detector = AnomalyDetector(anomaly_config)
        self.ws = None
        self._running = False
        self._reconnect_delay = 1
        self._max_reconnect_delay = 60
        self._reconnect_jitter = 0.1
        
        # 注册修正回调
        self.anomaly_detector.register_correction_callback(
            self._on_correction
        )
    
    def _on_correction(self, corrected_data: dict, anomaly_record):
        """修正回调:记录修正日志"""
        print(f"[修正] {anomaly_record.symbol} {anomaly_record.field}: "
              f"{anomaly_record.raw_value:.4f} → {corrected_data.get('price')}")
    
    def start(self):
        """启动数据管道"""
        self._running = True
        self._connect_and_subscribe()
    
    def stop(self):
        """停止数据管道"""
        self._running = False
        if self.ws:
            self.ws.close()
    
    def _connect_and_subscribe(self):
        """连接并订阅"""
        retry_count = 0
        
        while self._running:
            try:
                headers = {
                    "X-API-Key": self.api_key,
                    "Content-Type": "application/json"
                }
                
                self.ws = websocket.WebSocketApp(
                    self.ws_url,
                    header=headers,
                    on_message=self._on_message,
                    on_error=self._on_error,
                    on_close=self._on_close,
                    on_open=self._on_open
                )
                
                # ⚠️ 非阻塞运行,超时自动重连
                self.ws.run_forever(ping_interval=30, ping_timeout=10)
                
            except Exception as e:
                retry_count += 1
                delay = min(
                    self._reconnect_delay * (2 ** retry_count),
                    self._max_reconnect_delay
                )
                # 添加抖动避免惊群
                delay *= (1 + random.uniform(-self._reconnect_jitter, self._reconnect_jitter))
                
                print(f"[重连] 第 {retry_count} 次重连,{delay:.1f} 秒后重试: {e}")
                time.sleep(delay)
    
    def _on_open(self, ws):
        """WebSocket 连接建立"""
        print("[连接] TickDB WebSocket 已连接")
        
        # 发送订阅消息
        subscribe_msg = {
            "cmd": "subscribe",
            "args": {
                "symbols": self.symbols,
                "channels": ["ticker", "depth"]
            }
        }
        ws.send(json.dumps(subscribe_msg))
        print(f"[订阅] 已订阅标的: {', '.join(self.symbols)}")
        
        # 重置重连计数
        self._reconnect_delay = 1
    
    def _on_message(self, ws, message):
        """处理接收到的消息"""
        try:
            data = json.loads(message)
            
            # 解析错误码
            code = data.get("code", 0)
            if code != 0:
                self._handle_error(code, data, ws)
                return
            
            # 提取数据
            payload = data.get("data", {})
            symbol = payload.get("s")
            tick_type = payload.get("type")
            
            # 构建 tick 数据
            tick_data = {
                "symbol": symbol,
                "timestamp": payload.get("ts"),
                "price": payload.get("last"),
                "volume": payload.get("vol")
            }
            
            # ⚠️ 上下文信息(实际从消息中提取)
            context = {"hour": 10}
            
            # 异常检测
            result = self.anomaly_detector.process_tick(tick_data, context)
            
            # 处理修正后的数据
            if result and result.get('is_corrected'):
                # 输出修正记录供下游系统使用
                self._output_corrected_data(result)
            
        except json.JSONDecodeError as e:
            print(f"[错误] 消息解析失败: {e}")
        except Exception as e:
            print(f"[错误] 消息处理异常: {e}")
    
    def _handle_error(self, code: int, data: dict, ws):
        """处理 TickDB 错误响应"""
        if code == 3001:
            # 限频错误
            retry_after = int(ws.sock.opt.get("headers", {}).get(
                "Retry-After", data.get("retry_after", 5)
            ))
            print(f"[限频] 请求超限,{retry_after} 秒后重试")
            time.sleep(retry_after)
        elif code in (1001, 1002):
            raise ValueError(f"API Key 无效: {data.get('message')}")
        else:
            print(f"[错误] TickDB 返回错误 {code}: {data.get('message')}")
    
    def _on_error(self, ws, error):
        """WebSocket 错误处理"""
        print(f"[错误] WebSocket 错误: {error}")
    
    def _on_close(self, ws, close_status_code, close_msg):
        """WebSocket 关闭处理"""
        print(f"[连接] WebSocket 已关闭: {close_status_code} - {close_msg}")
    
    def _output_corrected_data(self, data: dict):
        """输出修正后的数据"""
        # 简化实现:打印到标准输出
        # 生产环境应发送到消息队列或写入数据库
        print(f"[输出] 修正数据: {data['symbol']} @ {data['timestamp']}")


# 使用示例
if __name__ == "__main__":
    # 从环境变量读取 API Key
    api_key = os.environ.get("TICKDB_API_KEY")
    
    if not api_key:
        print("请设置 TICKDB_API_KEY 环境变量")
        exit(1)
    
    # 配置异常检测参数
    config = AnomalyConfig(
        window_size=100,
        z_threshold=3.0,
        auto_correct_threshold=5.0
    )
    
    # 初始化管道
    pipeline = TickDBAnomalyPipeline(
        api_key=api_key,
        symbols=["AAPL.US", "TSLA.US", "NVDA.US"],
        anomaly_config=config
    )
    
    # 启动
    print("启动 TickDB 异常检测管道...")
    try:
        pipeline.start()
    except KeyboardInterrupt:
        print("\n停止管道...")
        pipeline.stop()

五、进阶:自适应阈值与规则学习

静态阈值无法适应市场结构变化。引入自适应机制:

5.1 市场状态识别

不同市场状态下,价格波动的"正常范围"差异显著:

class MarketRegimeDetector:
    """市场状态检测器"""
    
    REGIMES = {
        'normal': {'volatility_mult': 1.0, 'volume_mult': 1.0},
        'earnings': {'volatility_mult': 2.5, 'volume_mult': 3.0},
        'high_volatility': {'volatility_mult': 3.0, 'volume_mult': 2.0},
        'crisis': {'volatility_mult': 5.0, 'volume_mult': 5.0}
    }
    
    def __init__(self, symbol: str):
        self.symbol = symbol
        self.current_regime = 'normal'
        self._regime_change_callbacks = []
    
    def detect(self, vix_level: float = None, recent_gap: float = None) -> str:
        """检测当前市场状态"""
        # 简化实现:基于 VIX 和历史波动率
        
        if vix_level is not None:
            if vix_level > 40:
                return 'crisis'
            elif vix_level > 30:
                return 'high_volatility'
        
        if recent_gap is not None and recent_gap > 0.05:
            return 'earnings'
        
        return 'normal'
    
    def get_adjusted_threshold(self, base_threshold: float, metric: str = 'price') -> float:
        """获取调整后的阈值"""
        regime = self.REGIMES[self.current_regime]
        
        if metric == 'price':
            return base_threshold * regime['volatility_mult']
        elif metric == 'volume':
            return base_threshold * regime['volume_mult']
        
        return base_threshold

5.2 规则自动学习

基于人工复核结果,生成新的检测规则:

class RuleLearner:
    """从复核结果中学习新规则"""
    
    def __init__(self, detector: AnomalyDetector):
        self.detector = detector
        self._learned_patterns = []
    
    def learn_from_review(self, review_result: dict):
        """从复核结果学习
        
        Args:
            review_result: {
                'symbol': 'AAPL.US',
                'pattern': 'earnings_gap_up',
                'action': 'whitelist',
                'context': {...}
            }
        """
        pattern = review_result.get('pattern')
        action = review_result.get('action')
        
        if action == 'whitelist' and pattern:
            # 添加到白名单
            key = f"{review_result['symbol']}:{pattern}"
            self.detector._whitelist.add(key)
            
            # 生成新规则(示例:特定事件下的价格区间)
            if pattern.startswith('earnings_'):
                self._learned_patterns.append({
                    'type': 'earnings_whitelist',
                    'symbol': review_result['symbol'],
                    'confidence': 1
                })
            
            print(f"[学习] 新规则已添加: {key}")

六、完整回测验证

将异常检测集成到回测框架,验证修正效果:

class BacktestWithAnomalyDetection:
    """带异常检测的回测框架"""
    
    def __init__(self, detector: AnomalyDetector):
        self.detector = detector
        self.stats = {
            'total_ticks': 0,
            'anomalies_detected': 0,
            'auto_corrected': 0,
            'manual_reviewed': 0
        }
    
    def run(
        self, 
        historical_data: List[dict],
        strategy_func: callable
    ) -> dict:
        """运行回测
        
        Args:
            historical_data: 历史 tick 数据
            strategy_func: 策略函数 (data -> signal)
        """
        corrected_data = []
        signals = []
        
        for tick in historical_data:
            self.stats['total_ticks'] += 1
            
            # 异常检测
            result = self.detector.process_tick(tick)
            
            if result is not tick:
                self.stats['anomalies_detected'] += 1
                if result.get('is_corrected'):
                    self.stats['auto_corrected'] += 1
            
            corrected_data.append(result)
            
            # 生成信号
            signal = strategy_func(result)
            signals.append(signal)
        
        # 计算策略绩效
        performance = self._calculate_performance(signals, corrected_data)
        
        return {
            'performance': performance,
            'stats': self.stats,
            'correction_log': self._get_correction_log()
        }
    
    def _calculate_performance(self, signals: list, data: list) -> dict:
        """计算绩效指标"""
        # 简化实现
        return {
            'total_return': 0.0,
            'sharpe_ratio': 0.0,
            'max_drawdown': 0.0
        }
    
    def _get_correction_log(self) -> List[dict]:
        """获取修正日志"""
        # 从复核队列提取已处理的记录
        return []


# 回测示例
def momentum_strategy(data: dict) -> str:
    """简单动量策略示例"""
    # 实际策略逻辑
    return 'hold'

七、部署方案与配置建议

根据使用场景,推荐以下配置:

场景 窗口大小 Z-Score 阈值 自动修正阈值 复核队列
个人量化 50-100 3.0 5.0 仅高优先级
团队/小型机构 100-200 3.5 4.5 全部记录
机构级部署 200-500 3.0 4.0 全量 + 规则学习

7.1 数据源配置

对于 TickDB 的不同数据类型,建议采用差异化检测策略:

数据类型 推荐检测方法 特殊规则
ticker(价格) Z-Score + MAD 双保险 盘前盘后降低阈值
depth(订单簿) 分位数 + 深度一致性 大单撤单检测
trades(逐笔) 成交量分位数 时间间隔异常检测
kline(K 线) 收盘价 Z-Score + 跳空检测 缺口超过 10% 强制复核

八、总结

异常值检测的核心挑战不是技术实现,而是判断力——区分真实市场信号与系统噪声的判断力。

本文提供的框架将这种判断力系统化:

  • 统计层提供客观的数值判断,消除主观偏差
  • 规则层注入领域知识,捕捉上下文信息
  • 人工层处理边缘案例,持续优化自动化边界

对于 TickDB 用户,建议从以下步骤开始:

  1. 先用轻量配置跑通数据管道,观察 2-3 周的数据分布
  2. 建立白名单,将已确认的真实异动模式加入白名单,减少误报
  3. 启用规则学习,从人工复核中提取新规则,逐渐扩大自动化覆盖
  4. 定期复盘,每季度评估检测效果,调整阈值

下一步行动

如果你希望亲手实现本文框架

  1. 访问 tickdb.ai 注册(免费,无需信用卡)
  2. 在控制台生成 API Key
  3. 设置环境变量 TICKDB_API_KEY,复制本文代码即可运行
  4. 从单标的、单策略开始,逐步扩展到全市场覆盖

如果你需要企业级数据治理方案,包括全量历史数据清洗、定制化异常规则库、机构级 SLA 保障,联系 [email protected]

如果你习惯用 AI 辅助开发,在 AI 助手中搜索安装 tickdb-market-data SKILL,用自然语言描述你的异常检测需求,让 AI 生成适配的检测规则。


风险提示:本文不构成任何投资建议。异常值检测算法本身不产生交易信号,仅作为数据质量保障工具。任何策略在实盘部署前,请进行充分的回测验证和纸上交易测试。市场有风险,投资需谨慎。