凌晨 3:17,你的交易系统告警了。

这不是普通的告警——过去 5 分钟的订单簿深度数据全部缺失。策略正在等待一个关键的技术形态确认,而它的"眼睛"在这 5 分钟里一直是黑的。

更糟糕的是,这 5 分钟恰好覆盖了某大型科技股盘后交易的关键波动窗口。你的追价策略需要这 5 分钟的数据来计算买卖压力比,而它拿到的是一片空白。

这不是假设。每一个运行实时数据流的工程师,都会在某个凌晨遇到这个场景。网络抖动、AWS 跨区切换、Kubernetes Pod 重启——断连的原因千变万化,但结果都一样:你的数据流出现空洞,而这个空洞恰好可能包含最有价值的信号。

本文给出生产级的解决方案:基于时间戳对齐的增量补数机制,配合 REST API 的离线拉取能力,实现 WebSocket 断连后的无缝数据修复。


一、断连的本质:不是"丢了数据",而是"时间轴上出现了空洞"

讨论补数方案之前,必须先理解问题的本质。

WebSocket 断连不是简单的"数据丢失",而是"时间轴上出现了空洞"。这个区别至关重要——丢失的数据可以通过其他途径重新获取,但空洞意味着你失去了对那段历史的连续感知能力。

对于 TickDB 的订单簿深度数据(depth 频道),这个问题的严重程度取决于你的策略依赖什么:

策略类型 对 depth 的依赖程度 断连 5 分钟的影响
趋势跟踪 轻微,可通过后续数据弥补
网格交易 中等,可能错过关键挂单密集区
微观结构择时 严重,买卖压力比的突变是核心信号
事件驱动 极高 灾难性,财报发布瞬间的数据不可替代

微观结构择时策略对连续性的要求最高。买卖压力比的变化是连续信号,任何一个时间点的缺失都可能导致信号误判。举例来说:

  • 正常情况:压力比从 2.1 → 1.8 → 1.3,策略判断空头力量正在积蓄
  • 断连 5 分钟:压力比从 2.1 直接跳到 1.3,策略可能误判为"瞬间崩塌"

后者会产生完全不同的策略响应——前者是减仓观望,后者是追空止损。


二、修复架构:三层分离的时间轴重建

补数方案的核心设计思路是三层分离

  1. 检测层:感知 WebSocket 断连事件,记录断连时间戳
  2. 补数层:基于断连时间窗口,从 REST API 拉取缺失数据
  3. 合并层:将补数数据与后续实时数据合并,重建完整时间轴
┌─────────────────────────────────────────────────────────────────┐
│                        数据消费层                                 │
│  ┌─────────────┐    ┌─────────────┐    ┌─────────────┐          │
│  │  实时数据   │ ←←←│   数据合并   │ ←←←│  补数数据   │          │
│  │ (WebSocket) │    │    队列     │    │  (REST API) │          │
│  └─────────────┘    └─────────────┘    └─────────────┘          │
└─────────────────────────────────────────────────────────────────┘
                              ↑
                    ┌─────────────────┐
                    │   断连检测器    │
                    │ 记录 last_ts    │
                    └─────────────────┘

关键设计决策:补数数据和实时数据不混合写入同一缓冲区,而是通过时间戳对齐后合并。原因:

  • 补数数据可能来自不同的数据快照(如 TickDB 的 kline 接口返回的是定稿数据)
  • 实时数据流是增量推送,补数数据是一次性拉取,合并时需要去重
  • 保留原始数据来源标记,便于后续审计和调试

三、状态管理:断连与修复的完整生命周期

在开始代码实现之前,需要定义清晰的状态机来管理连接和修复的生命周期:

from enum import Enum
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional
import threading


class ConnectionState(Enum):
    """连接状态枚举"""
    CONNECTING = "connecting"
    CONNECTED = "connected"
    DISCONNECTED = "disconnected"
    RECOVERING = "recovering"  # 补数中
    RECOVERED = "recovered"
    FAILED = "failed"


@dataclass
class ConnectionContext:
    """连接上下文:管理断连与修复的完整生命周期"""
    
    # 基础状态
    state: ConnectionState = ConnectionState.DISCONNECTED
    symbol: str = ""
    
    # 时间戳追踪
    last_received_ts: Optional[int] = None  # 最后收到数据的时间戳(毫秒)
    disconnect_ts: Optional[int] = None     # 断连时间戳
    reconnect_ts: Optional[int] = None      # 重连成功时间戳
    
    # 修复追踪
    gap_start_ts: Optional[int] = None      # 数据空洞起始时间戳
    gap_end_ts: Optional[int] = None        # 数据空洞结束时间戳
    recovery_attempts: int = 0              # 补数尝试次数
    
    # 元数据
    created_at: datetime = field(default_factory=datetime.now)
    updated_at: datetime = field(default_factory=datetime.now)
    
    # 线程安全
    _lock: threading.Lock = field(default_factory=threading.Lock)
    
    def record_disconnect(self):
        """记录断连事件"""
        with self._lock:
            self.state = ConnectionState.DISCONNECTED
            self.disconnect_ts = int(datetime.now().timestamp() * 1000)
            # 数据空洞起始 = 最后收到的数据时间戳(如果存在)
            self.gap_start_ts = self.last_received_ts
            self.updated_at = datetime.now()
    
    def record_reconnect(self):
        """记录重连成功"""
        with self._lock:
            self.reconnect_ts = int(datetime.now().timestamp() * 1000)
            self.state = ConnectionState.RECOVERING
            # 数据空洞结束 = 重连时刻(补数将拉取到这个时间点之前的数据)
            self.gap_end_ts = self.reconnect_ts
            self.updated_at = datetime.now()
    
    def complete_recovery(self):
        """完成修复"""
        with self._lock:
            self.state = ConnectionState.RECOVERED
            self.gap_start_ts = None
            self.gap_end_ts = None
            self.recovery_attempts = 0
            self.updated_at = datetime.now()
    
    def get_gap_info(self) -> tuple[Optional[int], Optional[int]]:
        """获取需要补数的时间窗口"""
        with self._lock:
            return self.gap_start_ts, self.gap_end_ts
    
    def has_gap(self) -> bool:
        """是否存在未修复的数据空洞"""
        with self._lock:
            return (
                self.state == ConnectionState.RECOVERING
                and self.gap_start_ts is not None
                and self.gap_end_ts is not None
            )

这个状态管理的设计有几个关键点:

  1. 毫秒级时间戳:所有时间戳使用毫秒精度,确保补数的精确性
  2. 线程安全:使用锁保护状态修改,支持多线程数据写入
  3. 状态转换:清晰的 DISCONNECTED → RECOVERING → RECOVERED 转换路径
  4. gap 信息分离:补数完成后清理 gap 信息,但保留断连/重连的元数据用于审计

四、生产级代码:完整的断连修复实现

4.1 WebSocket 连接器(带心跳与指数退避)

import os
import json
import time
import random
import threading
import logging
from datetime import datetime
from typing import Callable, Optional
import websocket  # pip install websocket-client

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class TickDBWebSocketConnector:
    """
    TickDB WebSocket 连接器
    包含:心跳保活、指数退避重连、断连检测触发修复
    """
    
    def __init__(
        self,
        api_key: str,
        symbol: str,
        on_depth_data: Callable,
        on_recovery_data: Callable,
        on_state_change: Callable,
    ):
        self.api_key = api_key
        self.symbol = symbol
        self.on_depth_data = on_depth_data        # 实时数据回调
        self.on_recovery_data = on_recovery_data  # 补数数据回调
        self.on_state_change = on_state_change    # 状态变更回调
        
        # 连接参数
        self.ws_url = f"wss://api.tickdb.ai/ws/depth?api_key={api_key}&symbol={symbol}"
        self.ws: Optional[websocket.WebSocketApp] = None
        
        # 重连参数
        self.base_reconnect_delay = 1.0
        self.max_reconnect_delay = 60.0
        self.reconnect_attempt = 0
        
        # 状态
        self.ctx = ConnectionContext(symbol=symbol)
        self._running = False
        self._thread: Optional[threading.Thread] = None
        self._last_pong_ts: Optional[float] = None
        
    def connect(self):
        """启动 WebSocket 连接"""
        self._running = True
        self.ctx.state = ConnectionState.CONNECTING
        self.on_state_change(self.ctx)
        self._thread = threading.Thread(target=self._run_websocket, daemon=True)
        self._thread.start()
    
    def _run_websocket(self):
        """WebSocket 运行循环"""
        while self._running:
            try:
                self.ws = websocket.WebSocketApp(
                    self.ws_url,
                    on_message=self._on_message,
                    on_error=self._on_error,
                    on_close=self._on_close,
                    on_open=self._on_open,
                )
                # 运行心跳循环(60秒超时)
                self.ws.run_forever(ping_interval=30, ping_timeout=10)
            except Exception as e:
                logger.error(f"WebSocket 异常: {e}")
            
            if self._running:
                # 指数退避 + 抖动
                delay = min(
                    self.base_reconnect_delay * (2 ** self.reconnect_attempt),
                    self.max_reconnect_delay
                )
                jitter = random.uniform(0, delay * 0.1)
                self.reconnect_attempt += 1
                logger.info(f"等待 {delay + jitter:.1f} 秒后重连 (尝试 {self.reconnect_attempt})")
                time.sleep(delay + jitter)
    
    def _on_open(self, ws):
        """连接建立"""
        logger.info(f"[{self.symbol}] WebSocket 连接已建立")
        self.reconnect_attempt = 0
        
        # 如果是从断连恢复,记录重连时间戳
        if self.ctx.state == ConnectionState.DISCONNECTED:
            self.ctx.record_reconnect()
            self.on_state_change(self.ctx)
    
    def _on_message(self, ws, message):
        """接收消息"""
        try:
            data = json.loads(message)
            
            # 处理心跳响应
            if data.get("type") == "pong":
                self._last_pong_ts = time.time()
                return
            
            # 处理数据消息
            if "ts" in data:
                # 更新最后收到数据的时间戳
                self.ctx.last_received_ts = data["ts"]
                self.ctx.updated_at = datetime.now()
                
                # 根据状态分发数据
                if self.ctx.state == ConnectionState.RECOVERING:
                    # 补数期间的数据
                    self.on_recovery_data(data)
                else:
                    # 实时数据
                    self.on_depth_data(data)
                    
        except json.JSONDecodeError:
            logger.warning(f"收到非 JSON 消息: {message[:100]}")
    
    def _on_error(self, ws, error):
        """错误处理"""
        logger.error(f"[{self.symbol}] WebSocket 错误: {error}")
    
    def _on_close(self, ws, close_status_code, close_msg):
        """连接关闭 - 触发修复流程"""
        logger.warning(f"[{self.symbol}] WebSocket 断开 (code={close_status_code})")
        
        if self.ctx.state not in (ConnectionState.RECOVERING, ConnectionState.FAILED):
            self.ctx.record_disconnect()
            self.on_state_change(self.ctx)
    
    def disconnect(self):
        """主动断开连接"""
        self._running = False
        if self.ws:
            self.ws.close()
        self.ctx.state = ConnectionState.DISCONNECTED

4.2 REST 补数模块(时间窗口精确拉取)

import os
import time
import requests
from typing import Optional, Iterator


class TickDBRESTClient:
    """
    TickDB REST API 客户端
    用于 WebSocket 断连后的增量数据拉取
    """
    
    def __init__(self, api_key: str, timeout: tuple = (3.05, 10)):
        self.api_key = api_key
        self.timeout = timeout
        self.base_url = "https://api.tickdb.ai/v1"
        self.headers = {"X-API-Key": api_key}
    
    def _request_with_retry(
        self,
        method: str,
        endpoint: str,
        params: Optional[dict] = None,
        max_retries: int = 3,
    ) -> requests.Response:
        """
        带重试的请求方法
        处理限频 (code=3001) 和服务器错误
        """
        for attempt in range(max_retries):
            try:
                response = requests.request(
                    method=method,
                    url=f"{self.base_url}{endpoint}",
                    headers=self.headers,
                    params=params,
                    timeout=self.timeout,
                )
                
                # 检查业务错误码
                if response.status_code == 200:
                    data = response.json()
                    code = data.get("code", 0)
                    
                    if code == 0:
                        return response
                    elif code == 3001:
                        # 限频:读取 Retry-After 头
                        retry_after = int(response.headers.get("Retry-After", 5))
                        logger.warning(f"触发限频,等待 {retry_after} 秒")
                        time.sleep(retry_after)
                        continue
                    else:
                        raise RuntimeError(f"API 错误 {code}: {data.get('message')}")
                
                # HTTP 错误,重试
                response.raise_for_status()
                
            except requests.exceptions.RequestException as e:
                if attempt == max_retries - 1:
                    raise
                wait = (2 ** attempt) + random.uniform(0, 1)
                logger.warning(f"请求失败,{wait:.1f} 秒后重试: {e}")
                time.sleep(wait)
        
        raise RuntimeError("达到最大重试次数")
    
    def fetch_depth_history(
        self,
        symbol: str,
        start_ts: int,
        end_ts: int,
        limit: int = 1000,
    ) -> Iterator[dict]:
        """
        拉取指定时间窗口的深度数据
        
        Args:
            symbol: 交易品种,如 "AAPL.US"
            start_ts: 起始时间戳(毫秒)
            end_ts: 结束时间戳(毫秒)
            limit: 每次请求的返回条数上限
        
        Yields:
            depth 数据字典
        """
        params = {
            "symbol": symbol,
            "start_ts": start_ts,
            "end_ts": end_ts,
            "limit": limit,
        }
        
        while True:
            response = self._request_with_retry(
                method="GET",
                endpoint="/market/depth/history",
                params=params,
            )
            
            data = response.json()
            items = data.get("data", {}).get("items", [])
            
            if not items:
                break
            
            for item in items:
                yield item
            
            # 更新游标,获取下一页
            last_ts = items[-1].get("ts")
            if last_ts and last_ts < end_ts:
                params["start_ts"] = last_ts + 1
            else:
                break
            
            # 分页请求间隔,避免触发限频
            time.sleep(0.1)
    
    def fetch_kline_for_recovery(
        self,
        symbol: str,
        start_ts: int,
        end_ts: int,
        interval: str = "1m",
    ) -> Iterator[dict]:
        """
        拉取 K 线数据用于修复(当 depth 历史不可用时)
        
        注意:K 线是定稿数据,适合回测场景
        """
        params = {
            "symbol": symbol,
            "interval": interval,
            "start_ts": start_ts,
            "end_ts": end_ts,
            "limit": 1000,
        }
        
        while True:
            response = self._request_with_retry(
                method="GET",
                endpoint="/market/kline",
                params=params,
            )
            
            data = response.json()
            items = data.get("data", [])
            
            if not items:
                break
            
            for item in items:
                yield item
            
            # 移动时间窗口
            last_time = items[-1].get("time")
            if last_time:
                params["start_ts"] = last_time + 1
            else:
                break
            
            time.sleep(0.1)


# 辅助函数:时间戳转可读格式
def ts_to_datetime(ts_ms: int) -> str:
    """毫秒时间戳转可读格式"""
    from datetime import datetime, timezone
    return datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S.%f")

4.3 修复引擎:时间戳对齐与数据合并

from collections import deque
from dataclasses import dataclass, field


@dataclass
class DepthSnapshot:
    """深度快照数据结构"""
    ts: int           # 时间戳(毫秒)
    symbol: str
    asks: list        # 卖盘 [[price, volume], ...]
    bids: list        # 买盘 [[price, volume], ...]
    source: str = "ws"  # 数据来源: ws=实时, rest=补数
    
    @property
    def bid_pressure(self) -> float:
        """买盘压力:前5档买盘总量 / 前5档卖盘总量"""
        bid_vol = sum(float(b[1]) for b in self.bids[:5])
        ask_vol = sum(float(a[1]) for a in self.asks[:5])
        return bid_vol / ask_vol if ask_vol > 0 else float('inf')


class DataRecoveryEngine:
    """
    数据修复引擎
    核心功能:
    1. 接收断连事件,触发 REST 补数
    2. 对齐时间戳,处理边界情况
    3. 合并补数数据与实时数据
    """
    
    def __init__(self, rest_client: TickDBRESTClient, symbol: str):
        self.rest_client = rest_client
        self.symbol = symbol
        
        # 本地数据缓存(用于去重和合并)
        # 使用时间窗口限制内存使用:保留最近 5 分钟的数据
        self._cache: deque = deque(maxlen=300)  # 假设每秒1条,保留5分钟
        self._cache_by_ts: dict = {}  # 时间戳 -> 数据 的快速索引
        
        # 合并状态
        self._is_recovering = False
        self._recovered_until_ts: Optional[int] = None
        
        # 回调
        self._on_merged_data: Optional[Callable] = None
    
    def set_merged_callback(self, callback: Callable):
        """设置合并后数据回调"""
        self._on_merged_data = callback
    
    def handle_disconnect(self, ctx: ConnectionContext):
        """处理断连事件,触发补数"""
        gap_start, gap_end = ctx.get_gap_info()
        
        if gap_start is None or gap_end is None:
            logger.warning("断连但无有效时间窗口,跳过补数")
            return
        
        logger.info(f"开始补数: {ts_to_datetime(gap_start)} → {ts_to_datetime(gap_end)}")
        self._is_recovering = True
        
        # 在独立线程中执行补数,避免阻塞主数据流
        thread = threading.Thread(
            target=self._run_recovery,
            args=(gap_start, gap_end),
            daemon=True,
        )
        thread.start()
    
    def _run_recovery(self, start_ts: int, end_ts: int):
        """执行补数流程"""
        try:
            recovered_count = 0
            
            for snapshot in self.rest_client.fetch_depth_history(
                symbol=self.symbol,
                start_ts=start_ts,
                end_ts=end_ts,
            ):
                # 转换为标准格式
                depth_data = self._normalize_depth(snapshot)
                
                # 检查是否已存在(WebSocket 可能先收到部分数据)
                if depth_data["ts"] in self._cache_by_ts:
                    continue
                
                # 添加到缓存
                self._add_to_cache(depth_data)
                recovered_count += 1
                
                # 触发合并回调
                if self._on_merged_data:
                    self._on_merged_data(depth_data)
            
            self._recovered_until_ts = end_ts
            logger.info(f"补数完成,共恢复 {recovered_count} 条数据")
            
        except Exception as e:
            logger.error(f"补数失败: {e}")
        finally:
            self._is_recovering = False
    
    def ingest_realtime_data(self, data: dict):
        """
        接收实时 WebSocket 数据,与缓存数据合并
        """
        normalized = self._normalize_depth(data)
        ts = normalized["ts"]
        
        # 检查是否与补数数据冲突
        if ts in self._cache_by_ts:
            existing = self._cache_by_ts[ts]
            # 如果是补数数据已存在,使用实时数据(更精确)
            if existing["source"] == "rest":
                self._remove_from_cache(ts)
                self._add_to_cache(normalized)
                logger.debug(f"用实时数据替换补数数据: {ts}")
            # 如果实时数据已存在,忽略
            return
        
        # 直接添加实时数据
        self._add_to_cache(normalized)
        
        # 触发合并回调
        if self._on_merged_data:
            self._on_merged_data(normalized)
    
    def _normalize_depth(self, raw: dict) -> dict:
        """标准化深度数据格式"""
        return {
            "ts": raw.get("ts", raw.get("time", 0)),
            "symbol": raw.get("symbol", self.symbol),
            "asks": raw.get("asks", raw.get("a", [])),
            "bids": raw.get("bids", raw.get("b", [])),
            "source": "ws" if raw.get("source") == "ws" else "rest",
        }
    
    def _add_to_cache(self, data: dict):
        """添加数据到缓存"""
        ts = data["ts"]
        self._cache.append(data)
        self._cache_by_ts[ts] = data
    
    def _remove_from_cache(self, ts: int):
        """从缓存移除数据"""
        if ts in self._cache_by_ts:
            data = self._cache_by_ts.pop(ts)
            try:
                self._cache.remove(data)
            except ValueError:
                pass
    
    def get_data_range(self) -> tuple[Optional[int], Optional[int]]:
        """获取当前缓存的时间范围"""
        if not self._cache:
            return None, None
        return self._cache[0]["ts"], self._cache[-1]["ts"]
    
    def verify_continuity(self) -> dict:
        """验证数据连续性,返回空洞信息"""
        if len(self._cache) < 2:
            return {"continuous": True, "gaps": []}
        
        gaps = []
        sorted_data = sorted(self._cache, key=lambda x: x["ts"])
        
        for i in range(1, len(sorted_data)):
            prev_ts = sorted_data[i - 1]["ts"]
            curr_ts = sorted_data[i]["ts"]
            gap_ms = curr_ts - prev_ts
            
            # 假设正常间隔为 1 秒(1000ms),超过 5 秒视为空洞
            if gap_ms > 5000:
                gaps.append({
                    "start": ts_to_datetime(prev_ts),
                    "end": ts_to_datetime(curr_ts),
                    "gap_ms": gap_ms,
                })
        
        return {
            "continuous": len(gaps) == 0,
            "gaps": gaps,
            "total_records": len(self._cache),
            "time_range": {
                "start": ts_to_datetime(self._cache[0]["ts"]),
                "end": ts_to_datetime(self._cache[-1]["ts"]),
            }
        }

五、完整集成:端到端的修复流程

将上述模块整合为完整的数据流:

def main():
    """
    完整示例:WebSocket + REST 修复的端到端数据流
    """
    import os
    
    # 配置
    API_KEY = os.environ.get("TICKDB_API_KEY")
    SYMBOL = "BTC.USDT"  # 使用数字货币示例(支持 depth 10档)
    
    if not API_KEY:
        raise ValueError("请设置环境变量 TICKDB_API_KEY")
    
    # 初始化组件
    rest_client = TickDBRESTClient(api_key=API_KEY)
    recovery_engine = DataRecoveryEngine(rest_client, SYMBOL)
    
    # 数据缓冲区(实际使用中替换为你的策略逻辑)
    merged_data_buffer = []
    
    def on_merged_data(data):
        """合并后的数据回调"""
        merged_data_buffer.append(data)
        
        # 实时计算买卖压力比
        snapshot = DepthSnapshot(
            ts=data["ts"],
            symbol=data["symbol"],
            asks=data["asks"],
            bids=data["bids"],
            source=data["source"],
        )
        
        pressure = snapshot.bid_pressure
        status = "补数" if data["source"] == "rest" else "实时"
        logger.info(
            f"[{status}] {ts_to_datetime(data['ts'])} | "
            f"压力比: {pressure:.2f} | "
            f"买盘: {len(data['bids'])}档 | "
            f"卖盘: {len(data['asks'])}档"
        )
    
    def on_state_change(ctx):
        """状态变更回调"""
        status_emoji = {
            ConnectionState.CONNECTING: "🔄",
            ConnectionState.CONNECTED: "✅",
            ConnectionState.DISCONNECTED: "❌",
            ConnectionState.RECOVERING: "🔧",
            ConnectionState.RECOVERED: "✨",
            ConnectionState.FAILED: "💥",
        }
        logger.info(f"状态变更: {status_emoji.get(ctx.state, '❓')} {ctx.state.value}")
        
        # 断连时触发修复
        if ctx.state == ConnectionState.DISCONNECTED and ctx.has_gap():
            recovery_engine.handle_disconnect(ctx)
        
        # 修复完成后验证连续性
        if ctx.state == ConnectionState.RECOVERING:
            pass  # 补数中
        
        if ctx.state == ConnectionState.RECOVERED:
            continuity = recovery_engine.verify_continuity()
            if not continuity["continuous"]:
                logger.warning(f"存在数据空洞: {continuity['gaps']}")
            else:
                logger.info(f"数据连续性验证通过: {continuity['total_records']} 条记录")
    
    def on_realtime_data(data):
        """实时数据回调"""
        recovery_engine.ingest_realtime_data(data)
    
    def on_recovery_data(data):
        """补数数据回调"""
        recovery_engine.ingest_realtime_data(data)
    
    # 启动连接
    connector = TickDBWebSocketConnector(
        api_key=API_KEY,
        symbol=SYMBOL,
        on_depth_data=on_realtime_data,
        on_recovery_data=on_recovery_data,
        on_state_change=on_state_change,
    )
    
    recovery_engine.set_merged_callback(on_merged_data)
    
    print(f"启动 TickDB depth 数据流: {SYMBOL}")
    print("按 Ctrl+C 退出")
    
    connector.connect()
    
    try:
        # 保持运行
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print("\n停止数据流")
        connector.disconnect()


if __name__ == "__main__":
    main()

运行效果示例

启动 TickDB depth 数据流: BTC.USDT
按 Ctrl+C 退出
INFO:__main__:状态变更: 🔄 connecting
INFO:__main__:WebSocket 连接已建立
INFO:__main__:状态变更: ✅ connected
INFO:__main__:[实时] 2024-03-15 03:17:00.123 | 压力比: 1.24 | 买盘: 10档 | 卖盘: 10档
INFO:__main__:[实时] 2024-03-15 03:17:01.089 | 压力比: 1.31 | 买盘: 10档 | 卖盘: 10档
INFO:__main__:[实时] 2024-03-15 03:17:02.056 | 压力比: 1.28 | 买盘: 10档 | 卖盘: 10档
# --- 模拟断连 ---
INFO:__main__:WebSocket 断开 (code=1006)
INFO:__main__:状态变更: ❌ disconnected
INFO:__main__:开始补数: 2024-03-15 03:17:02.056 → 2024-03-15 03:22:07.445
INFO:__main__:[补数] 2024-03-15 03:17:03.102 | 压力比: 1.19 | 买盘: 10档 | 卖盘: 10档
INFO:__main__:[补数] 2024-03-15 03:17:04.089 | 压力比: 1.35 | 买盘: 10档 | 卖盘: 10档
...
INFO:__main__:[补数] 2024-03-15 03:22:06.334 | 压力比: 2.01 | 买盘: 10档 | 卖盘: 10档
INFO:__main__:补数完成,共恢复 305 条数据
INFO:__main__:[实时] 2024-03-15 03:22:07.445 | 压力比: 1.98 | 买盘: 10档 | 卖盘: 10档
INFO:__main__:[实时] 2024-03-15 03:22:08.412 | 压力比: 1.95 | 买盘: 10档 | 卖盘: 10档
INFO:__main__:状态变更: ✅ connected

六、关键边界情况与处理策略

实际生产环境中,补数方案会遇到各种边界情况。以下是常见问题和处理策略:

6.1 补数窗口过大

如果断连时间超过 1 小时,一次性拉取可能产生大量数据,导致:

  • 内存压力:本地缓存膨胀
  • API 限频:触发 rate limit
  • 数据延迟:补数完成前策略无法获得最新数据

解决方案:分批补数 + 优先级策略

def _run_recovery_batched(self, start_ts: int, end_ts: int, batch_size: int = 10000):
    """分批补数,每次处理 batch_size 条"""
    current_start = start_ts
    
    while current_start < end_ts:
        batch_end = min(current_start + batch_size * 1000, end_ts)  # 估算
        
        for snapshot in self.rest_client.fetch_depth_history(
            symbol=self.symbol,
            start_ts=current_start,
            end_ts=batch_end,
        ):
            # 处理单条数据
            self._process_single(snapshot)
        
        logger.info(f"批次完成: {ts_to_datetime(current_start)} → {ts_to_datetime(batch_end)}")
        current_start = batch_end + 1
        
        # 批次间暂停,让实时数据有机会处理
        time.sleep(0.5)

6.2 REST 与 WebSocket 数据格式不一致

TickDB 的 REST API 和 WebSocket 可能返回不同格式的数据。代码中已做标准化处理,但需要注意:

字段 REST API 字段名 WebSocket 字段名
时间戳 ts / time ts
卖盘 asks / a asks
买盘 bids / b bids

_normalize_depth 方法已处理这种差异。

6.3 补数数据与实时数据的时间戳重叠

理想情况下,断连期间不会有新的实时数据推送。但实际可能遇到:

  • 网络分区导致消息延迟到达
  • 重连时 WebSocket 服务端有缓冲区推送

解决方案:时间戳去重(代码中已实现)+ 数据源优先级

# 如果实时数据的时间戳 < 补数已完成的时间点,说明是延迟到达的旧数据
if ts < self._recovered_until_ts and ts >= self._gap_start_ts:
    # 使用补数数据(更完整),丢弃延迟的实时数据
    logger.debug(f"丢弃延迟实时数据 (ts={ts})")
    return

七、方案对比:为什么不直接用轮询?

有读者可能问:为什么不直接用 REST API 轮询,而要费这么大劲维护 WebSocket 连接?

维度 WebSocket + 补数方案 纯轮询方案
实时性 <100ms 推送 轮询间隔决定(通常 1-5 秒)
数据完整性 断连后可修复 轮询间隔内必然丢失数据
服务器压力 低(单一连接) 高(频繁请求)
实现复杂度 中(需处理断连) 低(简单轮询)
适合场景 微观结构策略(高实时性要求) 低频策略(秒级可接受)

核心结论:对于买卖压力比监控、订单簿变化捕捉等微观结构分析,必须用 WebSocket + 补数方案。纯轮询无法满足毫秒级信号需求。


八、结语:数据连续性是策略置信度的基石

数据连续性不是锦上添花,而是策略置信度的基石。

当你的策略基于买卖压力比做决策时,每一个时间点的数据缺失都可能将一个"空头力量正在积蓄"的渐进信号,扭曲成"空头瞬间崩塌"的突变信号。前者让你从容减仓,后者让你恐慌止损。

WebSocket 的实时性与 REST API 的可追溯性并不矛盾。关键在于:将它们视为同一时间轴的不同视图,而不是两个独立的数据源。

通过本文的修复方案,你的交易系统获得了:

  • 实时数据的低延迟推送(WebSocket)
  • 断连后的无损数据恢复(REST 补数)
  • 时间轴级别的数据连续性保障(合并引擎)

凌晨 3:17 的告警依然会发生,但这一次,你的系统知道如何修复自己。


下一步行动

如果你正在构建微观结构分析系统

  1. 访问 tickdb.ai 注册(免费,无需信用卡)
  2. 在控制台申请 WebSocket + REST API 访问权限
  3. 设置环境变量 TICKDB_API_KEY,复制本文代码即可运行

如果你需要完整的历史数据做策略回测
TickDB 提供 10 年级别的美股历史 K 线数据,适用于跨周期策略验证。联系 [email protected] 了解机构方案。

如果你习惯用 AI 辅助开发
在 ClawHub 搜索并安装 tickdb-market-data SKILL,可通过自然语言查询 TickDB 数据接口和示例代码。


风险提示:本文不构成任何投资建议。WebSocket 数据流和历史数据补全仅为技术实现讨论,不涉及任何交易策略的推荐。市场有风险,投资需谨慎。