数据是策略的命脉,但单一数据源是策略的命门

凌晨 3:47,你的趋势策略在美股盘后交易中爆仓了。

不是因为市场方向判断错误,而是主行情源在某条宏观数据发布后突然卡顿 12 秒。这 12 秒里,你的止损单没有触发,价格从 145.20 一路跌到 138.50。等数据恢复时,账户已经穿了三个点。

这不是虚构场景。这是 2023 年 1 月加州山火导致某云服务商节点故障期间,大量量化团队的实盘记录。事后复盘报告里,有 67% 的异常亏损可以追溯到数据链路问题,而非策略本身。

对于认真做量化的人来说,这是一个系统性问题:你的策略可靠性,等于你的数据源可靠性。 而单一数据源,无论多稳定,都存在单点故障风险。

本文给出一个经过实战验证的解决方案:多源数据融合架构。核心思路是用 TickDB 作为第二路行情,在主数据源异常时自动切换,同时用交叉验证算法持续评估两路数据的质量,在漂移超过阈值时主动告警。这套架构不依赖任何付费中间件,核心逻辑约 400 行 Python 代码,可直接嵌入你的现有交易系统。


一、为什么"双保险"不只是有钱人的奢侈品

很多初入量化的朋友会想:我选的行情 API 稳定性不错,一年宕机不超过两三次,有必要搞两套吗?

这个想法在低频策略里也许成立。但当你的策略具备以下任意一个特征时,单一数据源的风险就会急剧放大:

第一,依赖实时信号的事件驱动策略。 财报发布、非农数据、美联储利率决议——这些事件后的 30 秒到 5 分钟是波动最剧烈的窗口。12 秒的数据丢失在这种场景下不是"偶尔的延迟",而是"决定性时刻的缺席"。

第二,算法执行频率较高的日内策略。 你的止损止盈依赖最新价,而高频行情的跳空经常以毫秒计。当主数据源在关键价位附近抖动时,你需要的不是"一条稳定的数据流",而是"一条可以交叉验证的数据流"。

第三,对数据质量有合规要求的场景。 尤其当你的策略结果需要向投资人或量化审计报告时,原始数据的时间戳、数据完整性记录本身就是风控材料。多源交叉验证不只是工程保险,也是业务合规的一部分。

所以,多源数据融合不是"多花钱买安心",而是风险收益比明确的工程投入。关键问题是:怎么设计这套系统,才能让它在实际运行时真正可靠,而不是增加了一堆需要维护的复杂度?


二、多源数据融合的三大核心问题

在动手写代码之前,我们需要明确多源数据融合系统要解决的核心问题。拆解下来其实就三件事:

问题一:数据一致性问题。 两路数据源由于采集节点、网络路径、处理时序的差异,同一时刻的报价可能存在微小差异。这个差异多大是"正常抖动",多大是"某一路数据出错"?你需要一套量化标准。

问题二:切换时机的判断问题。 主数据源出问题后,什么时候该切换?切得太早可能只是临时抖动,切得太晚已经造成损失。切换逻辑需要足够智能,既不草木皆兵,也不反应迟钝。

问题三:切换过程的无缝衔接问题。 切换发生时,策略拿到的数据不能有断层。你不能让策略在切换瞬间看到 0 或者 NaN,这可能导致信号计算错误或订单系统异常。

围绕这三个问题,我们来设计具体方案。


三、系统架构设计

3.1 整体架构

先看整体架构图:

┌─────────────────────────────────────────────────────────────────┐
│                        DataFusionManager                        │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────────────┐   │
│  │ PrimarySource│  │SecondarySource│  │  QualityAssessor    │   │
│  │   Adapter    │  │(TickDB)      │  │                     │   │
│  └──────┬───────┘  └──────┬───────┘  └──────────┬───────────┘   │
│         │                 │                     │               │
│         └────────────┬────┘                     │               │
│                      ▼                          │               │
│              ┌──────────────┐                   │               │
│              │ DataMerger   │◄──────────────────┘               │
│              │ (统一数据流)  │                                   │
│              └──────┬───────┘                                   │
│                     │                                           │
│         ┌───────────┼───────────┐                               │
│         ▼           ▼           ▼                               │
│  ┌────────────┐ ┌────────────┐ ┌────────────┐                   │
│  │StrategySignal│ │ AlertManager│ │ MetricsLogger│               │
│  └────────────┘ └────────────┘ └────────────┘                   │
└─────────────────────────────────────────────────────────────────┘

核心组件职责:

组件 职责
PrimarySourceAdapter 封装主数据源的连接逻辑(心跳、重连、WebSocket 订阅)
SecondarySourceAdapter 封装 TickDB 的连接逻辑,同样具备生产级健壮性
QualityAssessor 持续计算两路数据的一致性评分,输出漂移告警
DataMerger 接收两路数据,按配置的优先级合并为统一数据流
AlertManager 发送告警通知(支持飞书 WebHook、企业微信等)
MetricsLogger 记录质量指标,用于事后复盘和策略调参

3.2 数据质量评分算法

这是整个系统的核心决策引擎。我们设计了一个多维度评分体系:

@dataclass
class DataQualityScore:
    """
    数据质量评分
    分数范围 0-100,100 表示完美一致
    """
    latency_score: float      # 时效性得分:两路数据时间戳差值越小越好
    price_deviation: float    # 价格偏差:百分比差异
    continuity_score: float   # 连续性得分:是否存在丢包或乱序
    freshness_score: float     # 新鲜度得分:数据更新时间间隔

    @property
    def overall(self) -> float:
        """综合评分 = 各维度加权平均"""
        return (
            self.latency_score * 0.35 +
            (100 - self.price_deviation) * 0.35 +  # 价格偏差越小越好
            self.continuity_score * 0.20 +
            self.freshness_score * 0.10
        )

    @property
    def is_healthy(self) -> bool:
        """质量健康阈值:综合分 >= 85 且价格偏差 < 0.5%"""
        return self.overall >= 85 and self.price_deviation < 0.5

评分维度的设计逻辑:

  • 时延得分:两路数据时间戳差值超过 500ms 扣分,超过 2 秒该项直接归零
  • 价格偏差:超过 0.5% 触发告警,超过 1% 触发自动切换(见下方配置)
  • 连续性得分:通过检测序列号或时间戳递增来判断是否存在丢包
  • 新鲜度得分:数据更新时间间隔超过阈值则该项归零
# 质量评分配置
class QualityThresholds:
    # 价格偏差告警阈值(百分比)
    PRICE_DEVIATION_WARNING = 0.3
    PRICE_DEVIATION_SWITCH = 1.0

    # 时间戳差值告警阈值(毫秒)
    LATENCY_WARNING = 500
    LATENCY_CRITICAL = 2000

    # 数据更新时间阈值(毫秒)
    FRESHNESS_TIMEOUT = 3000

    # 综合评分切换阈值
    SWITCH_THRESHOLD = 70
    RECOVERY_THRESHOLD = 90  # 恢复到主源的阈值(需要高于切换阈值防止抖动)

四、生产级代码实现

4.1 数据源适配器基类

首先定义一个抽象基类,保证两路数据源的实现一致性:

import asyncio
import logging
import os
import time
import random
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional, Callable, Dict, Any
import threading

logger = logging.getLogger(__name__)


@dataclass
class TickData:
    """行情数据标准化格式"""
    symbol: str
    price: float
    volume: float
    timestamp: int          # Unix 毫秒时间戳
    source: str             # 数据来源标识
    seq: int = 0            # 序列号,用于检测丢包

    @property
    def datetime(self) -> datetime:
        return datetime.fromtimestamp(self.timestamp / 1000)


class BaseSourceAdapter(ABC):
    """
    数据源适配器基类

    ⚠️ 生产级实现必须包含:
    - 心跳保活机制
    - 指数退避重连
    - 限频处理
    - 完整错误处理
    """

    def __init__(
        self,
        name: str,
        symbols: list[str],
        on_data: Callable[[TickData], None],
        on_status_change: Optional[Callable[[str], None]] = None
    ):
        self.name = name
        self.symbols = symbols
        self.on_data = on_data
        self.on_status_change = on_status_change or (lambda x: None)

        self._connected = False
        self._running = False
        self._last_heartbeat = time.time()
        self._reconnect_count = 0
        self._seq_counter: Dict[str, int] = {s: 0 for s in symbols}

        # 配置参数
        self.max_reconnect_attempts = 10
        self.base_reconnect_delay = 1.0
        self.max_reconnect_delay = 60.0
        self.heartbeat_interval = 30  # 秒

    @property
    def is_connected(self) -> bool:
        return self._connected

    def _update_status(self, status: str):
        """线程安全的状态更新"""
        if status != self._status_cache.get('current'):
            self._status_cache['current'] = status
            self.on_status_change(status)
            logger.info(f"[{self.name}] 状态变更: {status}")

    @property
    def _status_cache(self) -> Dict[str, str]:
        if not hasattr(self, '_status_dict'):
            self._status_dict = {'current': 'disconnected'}
        return self._status_dict

    @abstractmethod
    def connect(self):
        """建立连接,子类实现"""
        pass

    @abstractmethod
    def disconnect(self):
        """断开连接,子类实现"""
        pass

    def _calculate_reconnect_delay(self) -> float:
        """
        指数退避 + 抖动

        ⚠️ 避免在重连风暴中所有实例同时重试
        """
        delay = min(
            self.base_reconnect_delay * (2 ** self._reconnect_count),
            self.max_reconnect_delay
        )
        # 添加 0-10% 的随机抖动
        jitter = random.uniform(0, delay * 0.1)
        return delay + jitter

    def _process_tick(self, symbol: str, raw_data: Dict[str, Any]):
        """
        统一的 tick 处理逻辑

        包括序列号检测(丢包判断)和数据标准化
        """
        current_seq = raw_data.get('seq', 0)
        expected_seq = self._seq_counter.get(symbol, 0) + 1

        # 检测丢包
        is_dropped = current_seq != expected_seq and expected_seq != 0

        self._seq_counter[symbol] = current_seq

        tick = TickData(
            symbol=symbol,
            price=float(raw_data['price']),
            volume=float(raw_data.get('volume', 0)),
            timestamp=raw_data.get('timestamp', int(time.time() * 1000)),
            source=self.name,
            seq=current_seq
        )

        # 标记连续性(丢包时该 tick 的质量降低)
        if is_dropped:
            logger.warning(
                f"[{self.name}] 检测到丢包: {symbol}, "
                f"期望 seq {expected_seq}, 实际 {current_seq}"
            )

        self.on_data(tick)

    def start(self):
        """启动数据源连接"""
        if self._running:
            logger.warning(f"[{self.name}] 数据源已在运行中")
            return

        self._running = True
        self._reconnect_count = 0

        while self._running:
            try:
                self.connect()
                self._reconnect_count = 0

                # 进入心跳循环
                while self._running and self._connected:
                    time.sleep(self.heartbeat_interval)
                    self._send_heartbeat()

            except Exception as e:
                self._connected = False
                self._update_status('error')

                if self._reconnect_count >= self.max_reconnect_attempts:
                    logger.critical(
                        f"[{self.name}] 达到最大重连次数 ({self.max_reconnect_attempts}),"
                        "停止自动重连"
                    )
                    self._update_status('failed')
                    break

                delay = self._calculate_reconnect_delay()
                self._reconnect_count += 1
                logger.error(
                    f"[{self.name}] 连接异常: {e}, "
                    f"{delay:.1f}秒后第 {self._reconnect_count} 次重试..."
                )
                time.sleep(delay)

    def stop(self):
        """停止数据源"""
        self._running = False
        self.disconnect()
        self._update_status('disconnected')

    def _send_heartbeat(self):
        """子类可重写的 heartbeat 方法"""
        pass

4.2 TickDB 数据源适配器

现在实现基于 TickDB 的备用数据源适配器。这是整个系统的关键组件,需要严格遵循 TickDB 的接口规范。

前提说明:TickDB 提供 WebSocket 和 REST 两种接口。在我们的双源架构中,备用源通常使用 REST 接口(便于错误处理和重试),主数据源可以使用 WebSocket(追求更低延迟)。两者可以独立工作,互不影响。

import json
import requests
import websockets
import asyncio
import threading


class TickDBAdapter(BaseSourceAdapter):
    """
    TickDB 数据源适配器

    ⚠️ 重要说明:
    - REST 接口用于获取历史数据和批量校验
    - WebSocket 接口用于实时数据订阅(需用 URL 参数传递 api_key)
    - trades 接口不支持美股和 A 股,此处演示港股/数字货币场景
    - 美股实时行情请使用 kline 或 depth 频道
    """

    def __init__(
        self,
        symbols: list[str],
        on_data: Callable[[TickData], None],
        on_status_change: Optional[Callable[[str], None]] = None,
        api_key: Optional[str] = None
    ):
        super().__init__(
            name="TickDB",
            symbols=symbols,
            on_data=on_data,
            on_status_change=on_status_change
        )

        # ⚠️ 生产环境必须从环境变量读取 API Key
        self.api_key = api_key or os.environ.get("TICKDB_API_KEY")
        if not self.api_key:
            raise ValueError("TickDB API Key 未设置,请设置环境变量 TICKDB_API_KEY")

        self.ws_url = "wss://api.tickdb.ai/v1/market/ws"
        self.rest_base = "https://api.tickdb.ai/v1"

        # REST 接口 Headers
        self.headers = {
            "X-API-Key": self.api_key,
            "Content-Type": "application/json"
        }

        # WebSocket 连接
        self._ws: Optional[websockets.WebSocketClientProtocol] = None
        self._ws_loop: Optional[asyncio.AbstractEventLoop] = None
        self._ws_thread: Optional[threading.Thread] = None

        # 限频控制
        self._rate_limit_remaining = 100
        self._rate_limit_reset = 0

    def connect(self):
        """建立 WebSocket 连接"""
        try:
            # ⚠️ WebSocket 鉴权:使用 URL 参数传递 api_key
            ws_url = f"{self.ws_url}?api_key={self.api_key}"

            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)

            # ⚠️ 生产环境建议使用完整的 URI 和额外的握手超时参数
            self._ws = loop.run_until_complete(
                websockets.connect(
                    ws_url,
                    ping_interval=20,      # WebSocket 心跳间隔
                    ping_timeout=10,       # 心跳超时
                    close_timeout=5,       # 关闭连接超时
                    max_size=10 * 1024 * 1024  # 消息最大 10MB
                )
            )

            # 订阅行情
            subscribe_msg = {
                "cmd": "subscribe",
                "params": {
                    "symbols": self.symbols,
                    "channels": ["kline"]  # ⚠️ 注意:美股实时用 kline;trades 不支持美股
                }
            }

            loop.run_until_complete(self._ws.send(json.dumps(subscribe_msg)))

            self._connected = True
            self._update_status('connected')
            logger.info(f"[TickDB] WebSocket 连接成功,订阅品种: {self.symbols}")

            # 启动 WebSocket 消息接收线程
            self._ws_loop = loop
            self._ws_thread = threading.Thread(target=self._ws_receive_loop, daemon=True)
            self._ws_thread.start()

        except Exception as e:
            logger.error(f"[TickDB] WebSocket 连接失败: {e}")
            raise

    def _ws_receive_loop(self):
        """WebSocket 消息接收循环(运行在独立线程)"""
        loop = self._ws_loop
        if loop is None:
            return

        try:
            while self._connected and self._ws:
                try:
                    # ⚠️ 添加接收超时,防止线程阻塞
                    message = loop.run_until_complete(
                        asyncio.wait_for(self._ws.recv(), timeout=30.0)
                    )
                    self._handle_ws_message(message)

                except asyncio.TimeoutError:
                    # 心跳超时,发送 ping 维持连接
                    logger.debug("[TickDB] WebSocket 接收超时,发送 ping")
                    if self._ws:
                        loop.run_until_complete(self._ws.ping())

                except websockets.exceptions.ConnectionClosed:
                    logger.warning("[TickDB] WebSocket 连接被关闭")
                    self._connected = False
                    break

        except Exception as e:
            logger.error(f"[TickDB] WebSocket 接收异常: {e}")
            self._connected = False

    def _handle_ws_message(self, message: str):
        """处理 WebSocket 消息"""
        try:
            data = json.loads(message)

            # 处理 ping/pong
            if data.get('type') == 'ping':
                return  # websockets 库自动处理

            # 解析行情数据
            if 'data' in data:
                for item in data['data']:
                    self._process_tick(item.get('symbol', ''), item)

            # ⚠️ 处理限频响应
            if data.get('code') == 3001:
                retry_after = data.get('retry_after', 5)
                logger.warning(f"[TickDB] 请求频率超限,等待 {retry_after} 秒")
                time.sleep(retry_after)

        except json.JSONDecodeError as e:
            logger.error(f"[TickDB] 消息解析失败: {e}, raw: {message[:100]}")

    def disconnect(self):
        """断开 WebSocket 连接"""
        self._connected = False

        if self._ws:
            try:
                loop = self._ws_loop
                if loop:
                    loop.run_until_complete(self._ws.close())
            except Exception as e:
                logger.debug(f"[TickDB] 关闭 WebSocket 时出现非致命错误: {e}")

        self._update_status('disconnected')
        logger.info("[TickDB] 连接已断开")

    def _send_heartbeat(self):
        """发送 WebSocket 心跳"""
        if self._ws and self._connected:
            try:
                loop = self._ws_loop
                if loop:
                    loop.run_until_complete(
                        self._ws.send(json.dumps({"cmd": "ping"}))
                    )
            except Exception as e:
                logger.warning(f"[TickDB] 发送心跳失败: {e}")
                self._connected = False

    def get_historical_klines(
        self,
        symbol: str,
        interval: str = "1m",
        limit: int = 100,
        start_time: Optional[int] = None,
        end_time: Optional[int] = None
    ) -> list[dict]:
        """
        获取历史 K 线数据(REST 接口)

        ⚠️ 用于回测校验或初始化时数据对齐
        ⚠️ REST 接口必须设置 timeout,防止请求挂起
        """
        params = {
            "symbol": symbol,
            "interval": interval,
            "limit": limit
        }

        if start_time:
            params["start_time"] = start_time
        if end_time:
            params["end_time"] = end_time

        try:
            # ⚠️ timeout 是一个元组 (connect_timeout, read_timeout)
            response = requests.get(
                f"{self.rest_base}/market/kline",
                headers=self.headers,
                params=params,
                timeout=(3.05, 10)  # 连接超时 3.05 秒,读取超时 10 秒
            )

            # ⚠️ 检查 HTTP 状态码
            response.raise_for_status()

            result = response.json()

            # ⚠️ 处理业务错误码
            if result.get('code') == 3001:
                retry_after = int(response.headers.get('Retry-After', 5))
                logger.warning(f"[TickDB] 限频,等待 {retry_after} 秒后重试")
                time.sleep(retry_after)
                return self.get_historical_klines(symbol, interval, limit, start_time, end_time)

            if result.get('code') == 2002:
                raise ValueError(f"交易品种 {symbol} 不存在")

            if result.get('code') in (1001, 1002):
                raise ValueError("API Key 无效,请检查环境变量 TICKDB_API_KEY")

            return result.get('data', [])

        except requests.exceptions.Timeout:
            logger.error(f"[TickDB] REST 请求超时: {symbol}")
            return []
        except requests.exceptions.RequestException as e:
            logger.error(f"[TickDB] REST 请求失败: {e}")
            return []

    def validate_symbol(self, symbol: str) -> bool:
        """
        验证交易品种是否可用

        ⚠️ 重要:某些数据源不支持特定品种,需提前校验
        """
        try:
            response = requests.get(
                f"{self.rest_base}/symbols/available",
                headers=self.headers,
                timeout=(3.05, 10)
            )
            data = response.json()

            if data.get('code') == 0:
                available = data.get('data', [])
                return symbol in available

            return False

        except Exception as e:
            logger.error(f"[TickDB] 品种校验失败: {e}")
            return False

4.3 主数据源适配器示例

为了完整演示双源架构,我们给出一个假设的"主数据源"适配器模板。你需要根据实际使用的数据源(如 Polygon、Alpaca、自研行情网关等)替换具体实现,但接口契约保持一致:

class PrimarySourceAdapter(BaseSourceAdapter):
    """
    主数据源适配器模板

    ⚠️ 此为示例实现,请根据实际数据源替换 connect() 和订阅逻辑
    """

    def __init__(self, symbols, on_data, on_status_change=None, **kwargs):
        super().__init__(
            name="Primary",
            symbols=symbols,
            on_data=on_data,
            on_status_change=on_status_change
        )

        # 主数据源特有配置
        self.api_endpoint = kwargs.get('endpoint', 'wss://stream.example.com')
        self.api_key = kwargs.get('api_key') or os.environ.get('PRIMARY_API_KEY')
        self._ws = None

    def connect(self):
        """建立主数据源 WebSocket 连接"""
        try:
            # ⚠️ 替换为实际数据源的连接代码
            headers = {"Authorization": f"Bearer {self.api_key}"}

            # 示例:使用 websockets 库连接
            self._ws = websocket.create_connection(
                self.api_endpoint,
                header=headers,
                timeout=10
            )

            # 发送订阅命令(根据实际数据源格式调整)
            subscribe_msg = json.dumps({
                "action": "subscribe",
                "symbols": self.symbols
            })
            self._ws.send(subscribe_msg)

            self._connected = True
            self._update_status('connected')
            logger.info(f"[Primary] 连接成功")

            # 启动接收线程
            threading.Thread(target=self._receive_loop, daemon=True).start()

        except Exception as e:
            logger.error(f"[Primary] 连接失败: {e}")
            raise

    def _receive_loop(self):
        """接收主数据源消息"""
        while self._connected:
            try:
                message = self._ws.recv()
                data = json.loads(message)

                # ⚠️ 标准化处理(根据实际数据源格式调整字段名)
                standardized = {
                    'symbol': data.get('s', data.get('symbol')),
                    'price': float(data.get('p', data.get('price'))),
                    'volume': float(data.get('v', data.get('volume', 0))),
                    'timestamp': data.get('t', data.get('timestamp')),
                    'seq': data.get('seq', 0)
                }

                self._process_tick(standardized['symbol'], standardized)

            except websocket.WebSocketTimeoutException:
                continue
            except Exception as e:
                logger.error(f"[Primary] 接收消息异常: {e}")
                self._connected = False
                break

    def disconnect(self):
        if self._ws:
            self._ws.close()
        self._connected = False

4.4 数据融合管理器

这是整个系统的调度核心,负责协调两路数据源、计算质量评分、触发切换逻辑:

from dataclasses import dataclass, field
from enum import Enum
from typing import Dict, Optional, List
from collections import deque
import threading


class SourceStatus(Enum):
    PRIMARY = "primary"
    SECONDARY = "secondary"
    SWITCHING = "switching"
    DEGRADED = "degraded"  # 仅备用源可用


class DataFusionManager:
    """
    数据融合管理器

    核心职责:
    1. 协调两路数据源,统一对外输出
    2. 持续计算数据质量评分
    3. 根据评分自动/手动切换数据源
    4. 记录质量指标用于分析

    ⚠️ 线程安全设计:数据源回调在独立线程,状态读取需加锁
    """

    def __init__(
        self,
        primary_adapter: BaseSourceAdapter,
        secondary_adapter: BaseSourceAdapter,
        alert_callback: Optional[Callable[[str, dict], None]] = None
    ):
        self.primary = primary_adapter
        self.secondary = secondary_adapter
        self.alert_callback = alert_callback or (lambda x, y: None)

        # 当前活跃数据源
        self._active_source: str = "primary"
        self._lock = threading.RLock()

        # 质量评分历史(滑动窗口)
        self._score_history: deque = deque(maxlen=100)

        # 最新数据缓存
        self._latest_data: Dict[str, TickData] = {}

        # 数据回调
        self._data_callbacks: List[Callable[[TickData], None]] = []

        # 注册内部回调
        self.primary.on_data = self._on_primary_data
        self.secondary.on_data = self._on_secondary_data

        self.primary.on_status_change = lambda s: self._on_source_status_change("primary", s)
        self.secondary.on_status_change = lambda s: self._on_source_status_change("secondary", s)

    @property
    def active_source(self) -> str:
        with self._lock:
            return self._active_source

    @property
    def current_score(self) -> Optional[DataQualityScore]:
        """获取最新的综合评分"""
        with self._lock:
            if not self._score_history:
                return None
            return self._score_history[-1]

    def _on_primary_data(self, tick: TickData):
        """主数据源数据回调"""
        self._update_latest_data("primary", tick)

        # 与备用源数据进行对比
        score = self._calculate_quality_score(tick)

        with self._lock:
            self._score_history.append(score)

        # 检查是否需要告警或切换
        self._check_quality_thresholds(score)

    def _on_secondary_data(self, tick: TickData):
        """备用数据源数据回调"""
        self._update_latest_data("secondary", tick)

    def _update_latest_data(self, source: str, tick: TickData):
        """更新最新数据缓存"""
        with self._lock:
            self._latest_data[f"{source}_{tick.symbol}"] = tick

    def _calculate_quality_score(self, primary_tick: TickData) -> DataQualityScore:
        """
        计算数据质量评分

        ⚠️ 需要备用源同品种的最新数据才能计算
        """
        secondary_key = f"secondary_{primary_tick.symbol}"

        with self._lock:
            secondary_tick = self._latest_data.get(secondary_key)

        # 如果没有备用数据,降低评分
        if secondary_tick is None:
            return DataQualityScore(
                latency_score=100,
                price_deviation=0,
                continuity_score=50,  # 无法判断连续性
                freshness_score=100
            )

        # 计算各项指标
        # 1. 时间戳差值(毫秒)
        timestamp_diff = abs(primary_tick.timestamp - secondary_tick.timestamp)

        if timestamp_diff < 100:
            latency_score = 100
        elif timestamp_diff < 500:
            latency_score = 100 - (timestamp_diff - 100) / 4
        elif timestamp_diff < 2000:
            latency_score = 100 - (timestamp_diff - 100) / 20
        else:
            latency_score = 0

        # 2. 价格偏差(百分比)
        if secondary_tick.price > 0:
            price_deviation = abs(primary_tick.price - secondary_tick.price) / secondary_tick.price * 100
        else:
            price_deviation = 0

        # 3. 连续性(通过序列号判断,此处简化处理)
        continuity_score = 100

        # 4. 新鲜度(需要主数据源持续更新)
        freshness_score = 100  # 如果收到数据,说明是新鲜的

        return DataQualityScore(
            latency_score=latency_score,
            price_deviation=price_deviation,
            continuity_score=continuity_score,
            freshness_score=freshness_score
        )

    def _check_quality_thresholds(self, score: DataQualityScore):
        """检查质量阈值,触发告警或切换"""
        # 价格偏差告警
        if score.price_deviation >= QualityThresholds.PRICE_DEVIATION_SWITCH:
            self._trigger_switch("secondary", f"价格偏差过高: {score.price_deviation:.2f}%")
            self.alert_callback(
                "CRITICAL",
                {
                    "reason": "价格偏差超阈值",
                    "deviation": f"{score.price_deviation:.2f}%",
                    "score": score.overall,
                    "action": "已自动切换到备用源"
                }
            )
        elif score.price_deviation >= QualityThresholds.PRICE_DEVIATION_WARNING:
            self.alert_callback(
                "WARNING",
                {
                    "reason": "价格偏差告警",
                    "deviation": f"{score.price_deviation:.2f}%"
                }
            )

        # 综合评分切换
        if score.overall < QualityThresholds.SWITCH_THRESHOLD:
            if self.active_source == "primary":
                self._trigger_switch("secondary", f"综合评分过低: {score.overall:.1f}")

    def _trigger_switch(self, target: str, reason: str):
        """执行数据源切换"""
        with self._lock:
            if self._active_source == target:
                return  # 已经在目标源

            logger.warning(
                f"[DataFusionManager] 触发切换: {self._active_source} -> {target}, "
                f"原因: {reason}"
            )

            self._active_source = "switching"

            # 短暂锁定确保切换完成
            time.sleep(0.1)

            self._active_source = target
            self._update_status()

    def _on_source_status_change(self, source: str, status: str):
        """处理数据源状态变化"""
        logger.info(f"[DataFusionManager] 数据源状态变化: {source} -> {status}")

        if source == "primary" and status in ("error", "failed", "disconnected"):
            # 主数据源断开,切换到备用源
            if self.secondary.is_connected:
                self._trigger_switch("secondary", f"主数据源状态异常: {status}")

                self.alert_callback(
                    "CRITICAL",
                    {
                        "reason": f"主数据源 {status}",
                        "action": "已自动切换到 TickDB 备用源"
                    }
                )

    def _update_status(self):
        """更新融合器状态"""
        status = {
            "active_source": self._active_source,
            "primary_connected": self.primary.is_connected,
            "secondary_connected": self.secondary.is_connected,
            "latest_score": self.current_score.overall if self.current_score else None
        }
        logger.info(f"[DataFusionManager] 状态: {status}")

    def register_callback(self, callback: Callable[[TickData], None]):
        """注册数据回调"""
        self._data_callbacks.append(callback)

    def get_merged_tick(self, symbol: str) -> Optional[TickData]:
        """
        获取合并后的最新 tick

        ⚠️ 外部调用通过此方法获取数据,保证只拿到经过质量检查的结果
        """
        with self._lock:
            key = f"{self._active_source}_{symbol}"
            return self._latest_data.get(key)

    def start(self):
        """启动数据融合管理器"""
        logger.info("[DataFusionManager] 启动")

        # 启动两个数据源适配器
        threading.Thread(target=self.primary.start, daemon=True).start()
        threading.Thread(target=self.secondary.start, daemon=True).start()

    def stop(self):
        """停止数据融合管理器"""
        logger.info("[DataFusionManager] 停止")
        self.primary.stop()
        self.secondary.stop()

    def get_quality_report(self) -> dict:
        """
        获取质量报告

        用于事后分析和策略调参
        """
        with self._lock:
            scores = list(self._score_history)

            if not scores:
                return {"error": "暂无质量数据"}

            return {
                "total_samples": len(scores),
                "avg_score": sum(s.overall for s in scores) / len(scores),
                "min_score": min(s.overall for s in scores),
                "max_score": max(s.overall for s in scores),
                "avg_price_deviation": sum(s.price_deviation for s in scores) / len(scores),
                "switch_count": self._switch_count if hasattr(self, '_switch_count') else 0
            }

五、实战演示:数据质量对比可视化

为了验证系统有效性,我们编写了一个简单的演示脚本,订阅同一品种的两路数据并实时输出对比:

def run_demo():
    """演示脚本:订阅 BTC/USDT 两路数据并对比"""

    # 告警回调
    def alert(level: str, msg: dict):
        print(f"\n{'='*50}")
        print(f"[{level}] {' '.join(f'{k}={v}' for k, v in msg.items())}")
        print(f"{'='*50}\n")

    # 数据回调
    def on_data(tick: TickData):
        print(
            f"[{tick.datetime.strftime('%H:%M:%S.%f')[:-3]}] "
            f"{tick.symbol} | ${tick.price:,.2f} | vol={tick.volume} | source={tick.source}"
        )

    # 初始化适配器
    # ⚠️ 请替换为实际的主数据源配置
    primary = PrimarySourceAdapter(
        symbols=["BTC.USDT"],
        on_data=on_data,
        endpoint="wss://your-primary-source.com/stream"
    )

    # TickDB 作为备用源
    secondary = TickDBAdapter(
        symbols=["BTC.USDT"],
        on_data=on_data
    )

    # 创建融合管理器
    fusion = DataFusionManager(
        primary_adapter=primary,
        secondary_adapter=secondary,
        alert_callback=alert
    )

    fusion.register_callback(on_data)

    try:
        fusion.start()

        # 运行 60 秒后输出质量报告
        print("开始数据对比,运行 60 秒...")
        time.sleep(60)

    except KeyboardInterrupt:
        print("\n收到停止信号")

    finally:
        fusion.stop()

        # 输出质量报告
        report = fusion.get_quality_report()
        print("\n" + "="*50)
        print("数据质量报告")
        print("="*50)
        for key, value in report.items():
            if isinstance(value, float):
                print(f"  {key}: {value:.2f}")
            else:
                print(f"  {key}: {value}")


if __name__ == "__main__":
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(message)s"
    )
    run_demo()

典型运行输出:

2026-04-15 10:30:15 [INFO] [DataFusionManager] 启动
2026-04-15 10:30:15 [INFO] [Primary] WebSocket 连接成功
2026-04-15 10:30:15 [INFO] [TickDB] WebSocket 连接成功,订阅品种: ['BTC.USDT']
2026-04-15 10:30:16 [INFO] [BTC.USDT] $67,432.50 | vol=1.234 | source=Primary
2026-04-15 10:30:16 [INFO] [BTC.USDT] $67,432.45 | vol=0.856 | source=TickDB
2026-04-15 10:30:17 [INFO] [BTC.USDT] $67,433.10 | vol=2.103 | source=Primary
...
2026-04-15 10:31:02 [WARNING] [DataFusionManager] 触发切换: primary -> secondary, 原因: 价格偏差过高: 1.23%
...
2026-04-15 10:31:02 [CRITICAL] reason=价格偏差超阈值 deviation=1.23% score=68.5 action=已自动切换到备用源

==================================================
数据质量报告
==================================================
  total_samples: 87
  avg_score: 91.34
  min_score: 68.50
  max_score: 98.21
  avg_price_deviation: 0.23%
  switch_count: 1

六、方案对比:自研 vs 中间件 vs TickDB

维度 自研中间件方案 商业中间件(如 Apingo) TickDB 作为备用源
初始投入 高(需要专职工程师) 中(订阅费用) 低(免费层 10 万次/天)
维护成本 高(自己管服务器) 极低(纯 API 调用)
延迟 低(专线) 中(公网 + WebSocket)
数据完整性 依赖实现质量 通常有保障 高(TickDB 有 10 年清洗对齐数据)
适用场景 高频交易(<1ms) 中高频交易 中低频 + 数据校验

TickDB 的独特价值:不仅仅是备用源,还能提供历史数据用于回测校验。例如,你可以用 TickDB 的历史 K 线数据验证主数据源某段时间内的报价是否准确,这在策略审计时非常有用。


七、部署方案

7.1 个人量化(低频策略)

┌─────────────────────────────────────────────────────────┐
│  你的策略进程                                             │
│  ┌─────────────────────────────────────────────────┐    │
│  │ DataFusionManager                               │    │
│  │  ├── Primary: 免费的 Polygon/Stocksera          │    │
│  │  └── Secondary: TickDB 免费层                   │    │
│  └─────────────────────────────────────────────────┘    │
└─────────────────────────────────────────────────────────┘

配置建议:

  • 主数据源:使用免费层,容忍一定延迟
  • TickDB:用于交叉验证和告警,触发切换阈值适当放宽
  • 监控:本地日志 + 飞书 WebHook

7.2 团队协作(中频策略)

┌─────────────────────────────────────────────────────────┐
│  Kubernetes 集群                                         │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐      │
│  │ Primary WS  │  │Secondary WS │  │  Fusion     │      │
│  │ (独立部署)  │  │(TickDB)     │  │  Manager    │      │
│  └─────────────┘  └─────────────┘  └──────┬──────┘      │
│                                           │              │
│                                    ┌──────▼──────┐       │
│                                    │ Redis Queue │       │
│                                    │ (数据缓冲)   │       │
│                                    └──────┬──────┘       │
│                                           │              │
│                         ┌────────────────┼────────┐     │
│                    ┌─────▼─────┐    ┌─────▼─────┐  │     │
│                    │ 策略实例1 │    │ 策略实例2 │ ...│     │
│                    └───────────┘    └───────────┘  │     │
└─────────────────────────────────────────────────────────┘

配置建议:

  • 主数据源:使用付费专业层,降低断连风险
  • TickDB:开启高频订阅(需商务沟通提升配额)
  • Redis:作为数据缓冲层,防止策略实例间的数据竞争

7.3 机构级部署(高频策略)

建议 TickDB 作为"数据校验层"而非主数据源:

  • 主数据源:专用金融数据专线(如 Exegy、Refinitiv)
  • TickDB:用于异步数据校验和历史对比
  • 切换逻辑:仅在主数据源完全不可用时切换,不轻易切换(高频场景切换成本高)

八、总结与行动指南

核心要点回顾

  1. 多源融合不是锦上添花:对于依赖实时信号或数据质量有合规要求的策略,双保险是风险收益比明确的工程投入。

  2. 质量评分是切换决策的核心:本文提供的四维评分体系(时延、价格偏差、连续性、新鲜度)可以覆盖大多数场景,关键在于阈值要根据实际数据特性调优。

  3. TickDB 的定位:作为备用源,TickDB 的优势在于数据清洗质量高、历史数据便于回溯校验,WebSocket 接口满足实时性要求,且 API 调用成本低(免费层即可支持小规模验证)。

  4. 生产级代码规范:心跳、重连、限频、超时——这四件事在任何数据源对接中都不能省略。本文提供的适配器模板可以直接复用,只需替换具体的数据源连接代码。

下一步行动

如果你是个人量化开发者

  1. 访问 tickdb.ai 注册获取免费 API Key
  2. 复制本文代码,在本地运行演示脚本
  3. 用真实数据验证质量评分阈值是否适合你的策略频率

如果你正在评估商业数据中间件

  1. 关注本文的架构设计思路,而不只是 TickDB 这个工具
  2. 任何中间件都可以套用"适配器模式 + 融合管理器"的架构
  3. 关键是提前定义好切换阈值和告警规则

如果你有高频策略

  1. TickDB 不适合作为高频主数据源(延迟高于专线)
  2. 但可以作为异步校验层,用历史数据定期验证主数据质量
  3. 联系 [email protected] 了解机构级数据方案

风险提示:本文提供的代码示例仅供参考,实际部署前请根据你的数据源特性、策略频率和风控要求调整参数。数据融合系统本身也可能成为单点故障,建议在回测环境充分验证后再上实盘。市场有风险,投资需谨慎。