凌晨 3:17,你的交易系统告警了。
这不是普通的告警——过去 5 分钟的订单簿深度数据全部缺失。策略正在等待一个关键的技术形态确认,而它的"眼睛"在这 5 分钟里一直是黑的。
更糟糕的是,这 5 分钟恰好覆盖了某大型科技股盘后交易的关键波动窗口。你的追价策略需要这 5 分钟的数据来计算买卖压力比,而它拿到的是一片空白。
这不是假设。每一个运行实时数据流的工程师,都会在某个凌晨遇到这个场景。网络抖动、AWS 跨区切换、Kubernetes Pod 重启——断连的原因千变万化,但结果都一样:你的数据流出现空洞,而这个空洞恰好可能包含最有价值的信号。
本文给出生产级的解决方案:基于时间戳对齐的增量补数机制,配合 REST API 的离线拉取能力,实现 WebSocket 断连后的无缝数据修复。
一、断连的本质:不是"丢了数据",而是"时间轴上出现了空洞"
讨论补数方案之前,必须先理解问题的本质。
WebSocket 断连不是简单的"数据丢失",而是"时间轴上出现了空洞"。这个区别至关重要——丢失的数据可以通过其他途径重新获取,但空洞意味着你失去了对那段历史的连续感知能力。
对于 TickDB 的订单簿深度数据(depth 频道),这个问题的严重程度取决于你的策略依赖什么:
| 策略类型 | 对 depth 的依赖程度 | 断连 5 分钟的影响 |
|---|---|---|
| 趋势跟踪 | 低 | 轻微,可通过后续数据弥补 |
| 网格交易 | 中 | 中等,可能错过关键挂单密集区 |
| 微观结构择时 | 高 | 严重,买卖压力比的突变是核心信号 |
| 事件驱动 | 极高 | 灾难性,财报发布瞬间的数据不可替代 |
微观结构择时策略对连续性的要求最高。买卖压力比的变化是连续信号,任何一个时间点的缺失都可能导致信号误判。举例来说:
- 正常情况:压力比从 2.1 → 1.8 → 1.3,策略判断空头力量正在积蓄
- 断连 5 分钟:压力比从 2.1 直接跳到 1.3,策略可能误判为"瞬间崩塌"
后者会产生完全不同的策略响应——前者是减仓观望,后者是追空止损。
二、修复架构:三层分离的时间轴重建
补数方案的核心设计思路是三层分离:
- 检测层:感知 WebSocket 断连事件,记录断连时间戳
- 补数层:基于断连时间窗口,从 REST API 拉取缺失数据
- 合并层:将补数数据与后续实时数据合并,重建完整时间轴
┌─────────────────────────────────────────────────────────────────┐
│ 数据消费层 │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ 实时数据 │ ←←←│ 数据合并 │ ←←←│ 补数数据 │ │
│ │ (WebSocket) │ │ 队列 │ │ (REST API) │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
└─────────────────────────────────────────────────────────────────┘
↑
┌─────────────────┐
│ 断连检测器 │
│ 记录 last_ts │
└─────────────────┘
关键设计决策:补数数据和实时数据不混合写入同一缓冲区,而是通过时间戳对齐后合并。原因:
- 补数数据可能来自不同的数据快照(如 TickDB 的 kline 接口返回的是定稿数据)
- 实时数据流是增量推送,补数数据是一次性拉取,合并时需要去重
- 保留原始数据来源标记,便于后续审计和调试
三、状态管理:断连与修复的完整生命周期
在开始代码实现之前,需要定义清晰的状态机来管理连接和修复的生命周期:
from enum import Enum
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional
import threading
class ConnectionState(Enum):
"""连接状态枚举"""
CONNECTING = "connecting"
CONNECTED = "connected"
DISCONNECTED = "disconnected"
RECOVERING = "recovering" # 补数中
RECOVERED = "recovered"
FAILED = "failed"
@dataclass
class ConnectionContext:
"""连接上下文:管理断连与修复的完整生命周期"""
# 基础状态
state: ConnectionState = ConnectionState.DISCONNECTED
symbol: str = ""
# 时间戳追踪
last_received_ts: Optional[int] = None # 最后收到数据的时间戳(毫秒)
disconnect_ts: Optional[int] = None # 断连时间戳
reconnect_ts: Optional[int] = None # 重连成功时间戳
# 修复追踪
gap_start_ts: Optional[int] = None # 数据空洞起始时间戳
gap_end_ts: Optional[int] = None # 数据空洞结束时间戳
recovery_attempts: int = 0 # 补数尝试次数
# 元数据
created_at: datetime = field(default_factory=datetime.now)
updated_at: datetime = field(default_factory=datetime.now)
# 线程安全
_lock: threading.Lock = field(default_factory=threading.Lock)
def record_disconnect(self):
"""记录断连事件"""
with self._lock:
self.state = ConnectionState.DISCONNECTED
self.disconnect_ts = int(datetime.now().timestamp() * 1000)
# 数据空洞起始 = 最后收到的数据时间戳(如果存在)
self.gap_start_ts = self.last_received_ts
self.updated_at = datetime.now()
def record_reconnect(self):
"""记录重连成功"""
with self._lock:
self.reconnect_ts = int(datetime.now().timestamp() * 1000)
self.state = ConnectionState.RECOVERING
# 数据空洞结束 = 重连时刻(补数将拉取到这个时间点之前的数据)
self.gap_end_ts = self.reconnect_ts
self.updated_at = datetime.now()
def complete_recovery(self):
"""完成修复"""
with self._lock:
self.state = ConnectionState.RECOVERED
self.gap_start_ts = None
self.gap_end_ts = None
self.recovery_attempts = 0
self.updated_at = datetime.now()
def get_gap_info(self) -> tuple[Optional[int], Optional[int]]:
"""获取需要补数的时间窗口"""
with self._lock:
return self.gap_start_ts, self.gap_end_ts
def has_gap(self) -> bool:
"""是否存在未修复的数据空洞"""
with self._lock:
return (
self.state == ConnectionState.RECOVERING
and self.gap_start_ts is not None
and self.gap_end_ts is not None
)
这个状态管理的设计有几个关键点:
- 毫秒级时间戳:所有时间戳使用毫秒精度,确保补数的精确性
- 线程安全:使用锁保护状态修改,支持多线程数据写入
- 状态转换:清晰的
DISCONNECTED → RECOVERING → RECOVERED转换路径 - gap 信息分离:补数完成后清理 gap 信息,但保留断连/重连的元数据用于审计
四、生产级代码:完整的断连修复实现
4.1 WebSocket 连接器(带心跳与指数退避)
import os
import json
import time
import random
import threading
import logging
from datetime import datetime
from typing import Callable, Optional
import websocket # pip install websocket-client
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class TickDBWebSocketConnector:
"""
TickDB WebSocket 连接器
包含:心跳保活、指数退避重连、断连检测触发修复
"""
def __init__(
self,
api_key: str,
symbol: str,
on_depth_data: Callable,
on_recovery_data: Callable,
on_state_change: Callable,
):
self.api_key = api_key
self.symbol = symbol
self.on_depth_data = on_depth_data # 实时数据回调
self.on_recovery_data = on_recovery_data # 补数数据回调
self.on_state_change = on_state_change # 状态变更回调
# 连接参数
self.ws_url = f"wss://api.tickdb.ai/ws/depth?api_key={api_key}&symbol={symbol}"
self.ws: Optional[websocket.WebSocketApp] = None
# 重连参数
self.base_reconnect_delay = 1.0
self.max_reconnect_delay = 60.0
self.reconnect_attempt = 0
# 状态
self.ctx = ConnectionContext(symbol=symbol)
self._running = False
self._thread: Optional[threading.Thread] = None
self._last_pong_ts: Optional[float] = None
def connect(self):
"""启动 WebSocket 连接"""
self._running = True
self.ctx.state = ConnectionState.CONNECTING
self.on_state_change(self.ctx)
self._thread = threading.Thread(target=self._run_websocket, daemon=True)
self._thread.start()
def _run_websocket(self):
"""WebSocket 运行循环"""
while self._running:
try:
self.ws = websocket.WebSocketApp(
self.ws_url,
on_message=self._on_message,
on_error=self._on_error,
on_close=self._on_close,
on_open=self._on_open,
)
# 运行心跳循环(60秒超时)
self.ws.run_forever(ping_interval=30, ping_timeout=10)
except Exception as e:
logger.error(f"WebSocket 异常: {e}")
if self._running:
# 指数退避 + 抖动
delay = min(
self.base_reconnect_delay * (2 ** self.reconnect_attempt),
self.max_reconnect_delay
)
jitter = random.uniform(0, delay * 0.1)
self.reconnect_attempt += 1
logger.info(f"等待 {delay + jitter:.1f} 秒后重连 (尝试 {self.reconnect_attempt})")
time.sleep(delay + jitter)
def _on_open(self, ws):
"""连接建立"""
logger.info(f"[{self.symbol}] WebSocket 连接已建立")
self.reconnect_attempt = 0
# 如果是从断连恢复,记录重连时间戳
if self.ctx.state == ConnectionState.DISCONNECTED:
self.ctx.record_reconnect()
self.on_state_change(self.ctx)
def _on_message(self, ws, message):
"""接收消息"""
try:
data = json.loads(message)
# 处理心跳响应
if data.get("type") == "pong":
self._last_pong_ts = time.time()
return
# 处理数据消息
if "ts" in data:
# 更新最后收到数据的时间戳
self.ctx.last_received_ts = data["ts"]
self.ctx.updated_at = datetime.now()
# 根据状态分发数据
if self.ctx.state == ConnectionState.RECOVERING:
# 补数期间的数据
self.on_recovery_data(data)
else:
# 实时数据
self.on_depth_data(data)
except json.JSONDecodeError:
logger.warning(f"收到非 JSON 消息: {message[:100]}")
def _on_error(self, ws, error):
"""错误处理"""
logger.error(f"[{self.symbol}] WebSocket 错误: {error}")
def _on_close(self, ws, close_status_code, close_msg):
"""连接关闭 - 触发修复流程"""
logger.warning(f"[{self.symbol}] WebSocket 断开 (code={close_status_code})")
if self.ctx.state not in (ConnectionState.RECOVERING, ConnectionState.FAILED):
self.ctx.record_disconnect()
self.on_state_change(self.ctx)
def disconnect(self):
"""主动断开连接"""
self._running = False
if self.ws:
self.ws.close()
self.ctx.state = ConnectionState.DISCONNECTED
4.2 REST 补数模块(时间窗口精确拉取)
import os
import time
import requests
from typing import Optional, Iterator
class TickDBRESTClient:
"""
TickDB REST API 客户端
用于 WebSocket 断连后的增量数据拉取
"""
def __init__(self, api_key: str, timeout: tuple = (3.05, 10)):
self.api_key = api_key
self.timeout = timeout
self.base_url = "https://api.tickdb.ai/v1"
self.headers = {"X-API-Key": api_key}
def _request_with_retry(
self,
method: str,
endpoint: str,
params: Optional[dict] = None,
max_retries: int = 3,
) -> requests.Response:
"""
带重试的请求方法
处理限频 (code=3001) 和服务器错误
"""
for attempt in range(max_retries):
try:
response = requests.request(
method=method,
url=f"{self.base_url}{endpoint}",
headers=self.headers,
params=params,
timeout=self.timeout,
)
# 检查业务错误码
if response.status_code == 200:
data = response.json()
code = data.get("code", 0)
if code == 0:
return response
elif code == 3001:
# 限频:读取 Retry-After 头
retry_after = int(response.headers.get("Retry-After", 5))
logger.warning(f"触发限频,等待 {retry_after} 秒")
time.sleep(retry_after)
continue
else:
raise RuntimeError(f"API 错误 {code}: {data.get('message')}")
# HTTP 错误,重试
response.raise_for_status()
except requests.exceptions.RequestException as e:
if attempt == max_retries - 1:
raise
wait = (2 ** attempt) + random.uniform(0, 1)
logger.warning(f"请求失败,{wait:.1f} 秒后重试: {e}")
time.sleep(wait)
raise RuntimeError("达到最大重试次数")
def fetch_depth_history(
self,
symbol: str,
start_ts: int,
end_ts: int,
limit: int = 1000,
) -> Iterator[dict]:
"""
拉取指定时间窗口的深度数据
Args:
symbol: 交易品种,如 "AAPL.US"
start_ts: 起始时间戳(毫秒)
end_ts: 结束时间戳(毫秒)
limit: 每次请求的返回条数上限
Yields:
depth 数据字典
"""
params = {
"symbol": symbol,
"start_ts": start_ts,
"end_ts": end_ts,
"limit": limit,
}
while True:
response = self._request_with_retry(
method="GET",
endpoint="/market/depth/history",
params=params,
)
data = response.json()
items = data.get("data", {}).get("items", [])
if not items:
break
for item in items:
yield item
# 更新游标,获取下一页
last_ts = items[-1].get("ts")
if last_ts and last_ts < end_ts:
params["start_ts"] = last_ts + 1
else:
break
# 分页请求间隔,避免触发限频
time.sleep(0.1)
def fetch_kline_for_recovery(
self,
symbol: str,
start_ts: int,
end_ts: int,
interval: str = "1m",
) -> Iterator[dict]:
"""
拉取 K 线数据用于修复(当 depth 历史不可用时)
注意:K 线是定稿数据,适合回测场景
"""
params = {
"symbol": symbol,
"interval": interval,
"start_ts": start_ts,
"end_ts": end_ts,
"limit": 1000,
}
while True:
response = self._request_with_retry(
method="GET",
endpoint="/market/kline",
params=params,
)
data = response.json()
items = data.get("data", [])
if not items:
break
for item in items:
yield item
# 移动时间窗口
last_time = items[-1].get("time")
if last_time:
params["start_ts"] = last_time + 1
else:
break
time.sleep(0.1)
# 辅助函数:时间戳转可读格式
def ts_to_datetime(ts_ms: int) -> str:
"""毫秒时间戳转可读格式"""
from datetime import datetime, timezone
return datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S.%f")
4.3 修复引擎:时间戳对齐与数据合并
from collections import deque
from dataclasses import dataclass, field
@dataclass
class DepthSnapshot:
"""深度快照数据结构"""
ts: int # 时间戳(毫秒)
symbol: str
asks: list # 卖盘 [[price, volume], ...]
bids: list # 买盘 [[price, volume], ...]
source: str = "ws" # 数据来源: ws=实时, rest=补数
@property
def bid_pressure(self) -> float:
"""买盘压力:前5档买盘总量 / 前5档卖盘总量"""
bid_vol = sum(float(b[1]) for b in self.bids[:5])
ask_vol = sum(float(a[1]) for a in self.asks[:5])
return bid_vol / ask_vol if ask_vol > 0 else float('inf')
class DataRecoveryEngine:
"""
数据修复引擎
核心功能:
1. 接收断连事件,触发 REST 补数
2. 对齐时间戳,处理边界情况
3. 合并补数数据与实时数据
"""
def __init__(self, rest_client: TickDBRESTClient, symbol: str):
self.rest_client = rest_client
self.symbol = symbol
# 本地数据缓存(用于去重和合并)
# 使用时间窗口限制内存使用:保留最近 5 分钟的数据
self._cache: deque = deque(maxlen=300) # 假设每秒1条,保留5分钟
self._cache_by_ts: dict = {} # 时间戳 -> 数据 的快速索引
# 合并状态
self._is_recovering = False
self._recovered_until_ts: Optional[int] = None
# 回调
self._on_merged_data: Optional[Callable] = None
def set_merged_callback(self, callback: Callable):
"""设置合并后数据回调"""
self._on_merged_data = callback
def handle_disconnect(self, ctx: ConnectionContext):
"""处理断连事件,触发补数"""
gap_start, gap_end = ctx.get_gap_info()
if gap_start is None or gap_end is None:
logger.warning("断连但无有效时间窗口,跳过补数")
return
logger.info(f"开始补数: {ts_to_datetime(gap_start)} → {ts_to_datetime(gap_end)}")
self._is_recovering = True
# 在独立线程中执行补数,避免阻塞主数据流
thread = threading.Thread(
target=self._run_recovery,
args=(gap_start, gap_end),
daemon=True,
)
thread.start()
def _run_recovery(self, start_ts: int, end_ts: int):
"""执行补数流程"""
try:
recovered_count = 0
for snapshot in self.rest_client.fetch_depth_history(
symbol=self.symbol,
start_ts=start_ts,
end_ts=end_ts,
):
# 转换为标准格式
depth_data = self._normalize_depth(snapshot)
# 检查是否已存在(WebSocket 可能先收到部分数据)
if depth_data["ts"] in self._cache_by_ts:
continue
# 添加到缓存
self._add_to_cache(depth_data)
recovered_count += 1
# 触发合并回调
if self._on_merged_data:
self._on_merged_data(depth_data)
self._recovered_until_ts = end_ts
logger.info(f"补数完成,共恢复 {recovered_count} 条数据")
except Exception as e:
logger.error(f"补数失败: {e}")
finally:
self._is_recovering = False
def ingest_realtime_data(self, data: dict):
"""
接收实时 WebSocket 数据,与缓存数据合并
"""
normalized = self._normalize_depth(data)
ts = normalized["ts"]
# 检查是否与补数数据冲突
if ts in self._cache_by_ts:
existing = self._cache_by_ts[ts]
# 如果是补数数据已存在,使用实时数据(更精确)
if existing["source"] == "rest":
self._remove_from_cache(ts)
self._add_to_cache(normalized)
logger.debug(f"用实时数据替换补数数据: {ts}")
# 如果实时数据已存在,忽略
return
# 直接添加实时数据
self._add_to_cache(normalized)
# 触发合并回调
if self._on_merged_data:
self._on_merged_data(normalized)
def _normalize_depth(self, raw: dict) -> dict:
"""标准化深度数据格式"""
return {
"ts": raw.get("ts", raw.get("time", 0)),
"symbol": raw.get("symbol", self.symbol),
"asks": raw.get("asks", raw.get("a", [])),
"bids": raw.get("bids", raw.get("b", [])),
"source": "ws" if raw.get("source") == "ws" else "rest",
}
def _add_to_cache(self, data: dict):
"""添加数据到缓存"""
ts = data["ts"]
self._cache.append(data)
self._cache_by_ts[ts] = data
def _remove_from_cache(self, ts: int):
"""从缓存移除数据"""
if ts in self._cache_by_ts:
data = self._cache_by_ts.pop(ts)
try:
self._cache.remove(data)
except ValueError:
pass
def get_data_range(self) -> tuple[Optional[int], Optional[int]]:
"""获取当前缓存的时间范围"""
if not self._cache:
return None, None
return self._cache[0]["ts"], self._cache[-1]["ts"]
def verify_continuity(self) -> dict:
"""验证数据连续性,返回空洞信息"""
if len(self._cache) < 2:
return {"continuous": True, "gaps": []}
gaps = []
sorted_data = sorted(self._cache, key=lambda x: x["ts"])
for i in range(1, len(sorted_data)):
prev_ts = sorted_data[i - 1]["ts"]
curr_ts = sorted_data[i]["ts"]
gap_ms = curr_ts - prev_ts
# 假设正常间隔为 1 秒(1000ms),超过 5 秒视为空洞
if gap_ms > 5000:
gaps.append({
"start": ts_to_datetime(prev_ts),
"end": ts_to_datetime(curr_ts),
"gap_ms": gap_ms,
})
return {
"continuous": len(gaps) == 0,
"gaps": gaps,
"total_records": len(self._cache),
"time_range": {
"start": ts_to_datetime(self._cache[0]["ts"]),
"end": ts_to_datetime(self._cache[-1]["ts"]),
}
}
五、完整集成:端到端的修复流程
将上述模块整合为完整的数据流:
def main():
"""
完整示例:WebSocket + REST 修复的端到端数据流
"""
import os
# 配置
API_KEY = os.environ.get("TICKDB_API_KEY")
SYMBOL = "BTC.USDT" # 使用数字货币示例(支持 depth 10档)
if not API_KEY:
raise ValueError("请设置环境变量 TICKDB_API_KEY")
# 初始化组件
rest_client = TickDBRESTClient(api_key=API_KEY)
recovery_engine = DataRecoveryEngine(rest_client, SYMBOL)
# 数据缓冲区(实际使用中替换为你的策略逻辑)
merged_data_buffer = []
def on_merged_data(data):
"""合并后的数据回调"""
merged_data_buffer.append(data)
# 实时计算买卖压力比
snapshot = DepthSnapshot(
ts=data["ts"],
symbol=data["symbol"],
asks=data["asks"],
bids=data["bids"],
source=data["source"],
)
pressure = snapshot.bid_pressure
status = "补数" if data["source"] == "rest" else "实时"
logger.info(
f"[{status}] {ts_to_datetime(data['ts'])} | "
f"压力比: {pressure:.2f} | "
f"买盘: {len(data['bids'])}档 | "
f"卖盘: {len(data['asks'])}档"
)
def on_state_change(ctx):
"""状态变更回调"""
status_emoji = {
ConnectionState.CONNECTING: "🔄",
ConnectionState.CONNECTED: "✅",
ConnectionState.DISCONNECTED: "❌",
ConnectionState.RECOVERING: "🔧",
ConnectionState.RECOVERED: "✨",
ConnectionState.FAILED: "💥",
}
logger.info(f"状态变更: {status_emoji.get(ctx.state, '❓')} {ctx.state.value}")
# 断连时触发修复
if ctx.state == ConnectionState.DISCONNECTED and ctx.has_gap():
recovery_engine.handle_disconnect(ctx)
# 修复完成后验证连续性
if ctx.state == ConnectionState.RECOVERING:
pass # 补数中
if ctx.state == ConnectionState.RECOVERED:
continuity = recovery_engine.verify_continuity()
if not continuity["continuous"]:
logger.warning(f"存在数据空洞: {continuity['gaps']}")
else:
logger.info(f"数据连续性验证通过: {continuity['total_records']} 条记录")
def on_realtime_data(data):
"""实时数据回调"""
recovery_engine.ingest_realtime_data(data)
def on_recovery_data(data):
"""补数数据回调"""
recovery_engine.ingest_realtime_data(data)
# 启动连接
connector = TickDBWebSocketConnector(
api_key=API_KEY,
symbol=SYMBOL,
on_depth_data=on_realtime_data,
on_recovery_data=on_recovery_data,
on_state_change=on_state_change,
)
recovery_engine.set_merged_callback(on_merged_data)
print(f"启动 TickDB depth 数据流: {SYMBOL}")
print("按 Ctrl+C 退出")
connector.connect()
try:
# 保持运行
while True:
time.sleep(1)
except KeyboardInterrupt:
print("\n停止数据流")
connector.disconnect()
if __name__ == "__main__":
main()
运行效果示例:
启动 TickDB depth 数据流: BTC.USDT
按 Ctrl+C 退出
INFO:__main__:状态变更: 🔄 connecting
INFO:__main__:WebSocket 连接已建立
INFO:__main__:状态变更: ✅ connected
INFO:__main__:[实时] 2024-03-15 03:17:00.123 | 压力比: 1.24 | 买盘: 10档 | 卖盘: 10档
INFO:__main__:[实时] 2024-03-15 03:17:01.089 | 压力比: 1.31 | 买盘: 10档 | 卖盘: 10档
INFO:__main__:[实时] 2024-03-15 03:17:02.056 | 压力比: 1.28 | 买盘: 10档 | 卖盘: 10档
# --- 模拟断连 ---
INFO:__main__:WebSocket 断开 (code=1006)
INFO:__main__:状态变更: ❌ disconnected
INFO:__main__:开始补数: 2024-03-15 03:17:02.056 → 2024-03-15 03:22:07.445
INFO:__main__:[补数] 2024-03-15 03:17:03.102 | 压力比: 1.19 | 买盘: 10档 | 卖盘: 10档
INFO:__main__:[补数] 2024-03-15 03:17:04.089 | 压力比: 1.35 | 买盘: 10档 | 卖盘: 10档
...
INFO:__main__:[补数] 2024-03-15 03:22:06.334 | 压力比: 2.01 | 买盘: 10档 | 卖盘: 10档
INFO:__main__:补数完成,共恢复 305 条数据
INFO:__main__:[实时] 2024-03-15 03:22:07.445 | 压力比: 1.98 | 买盘: 10档 | 卖盘: 10档
INFO:__main__:[实时] 2024-03-15 03:22:08.412 | 压力比: 1.95 | 买盘: 10档 | 卖盘: 10档
INFO:__main__:状态变更: ✅ connected
六、关键边界情况与处理策略
实际生产环境中,补数方案会遇到各种边界情况。以下是常见问题和处理策略:
6.1 补数窗口过大
如果断连时间超过 1 小时,一次性拉取可能产生大量数据,导致:
- 内存压力:本地缓存膨胀
- API 限频:触发 rate limit
- 数据延迟:补数完成前策略无法获得最新数据
解决方案:分批补数 + 优先级策略
def _run_recovery_batched(self, start_ts: int, end_ts: int, batch_size: int = 10000):
"""分批补数,每次处理 batch_size 条"""
current_start = start_ts
while current_start < end_ts:
batch_end = min(current_start + batch_size * 1000, end_ts) # 估算
for snapshot in self.rest_client.fetch_depth_history(
symbol=self.symbol,
start_ts=current_start,
end_ts=batch_end,
):
# 处理单条数据
self._process_single(snapshot)
logger.info(f"批次完成: {ts_to_datetime(current_start)} → {ts_to_datetime(batch_end)}")
current_start = batch_end + 1
# 批次间暂停,让实时数据有机会处理
time.sleep(0.5)
6.2 REST 与 WebSocket 数据格式不一致
TickDB 的 REST API 和 WebSocket 可能返回不同格式的数据。代码中已做标准化处理,但需要注意:
| 字段 | REST API 字段名 | WebSocket 字段名 |
|---|---|---|
| 时间戳 | ts / time |
ts |
| 卖盘 | asks / a |
asks |
| 买盘 | bids / b |
bids |
_normalize_depth 方法已处理这种差异。
6.3 补数数据与实时数据的时间戳重叠
理想情况下,断连期间不会有新的实时数据推送。但实际可能遇到:
- 网络分区导致消息延迟到达
- 重连时 WebSocket 服务端有缓冲区推送
解决方案:时间戳去重(代码中已实现)+ 数据源优先级
# 如果实时数据的时间戳 < 补数已完成的时间点,说明是延迟到达的旧数据
if ts < self._recovered_until_ts and ts >= self._gap_start_ts:
# 使用补数数据(更完整),丢弃延迟的实时数据
logger.debug(f"丢弃延迟实时数据 (ts={ts})")
return
七、方案对比:为什么不直接用轮询?
有读者可能问:为什么不直接用 REST API 轮询,而要费这么大劲维护 WebSocket 连接?
| 维度 | WebSocket + 补数方案 | 纯轮询方案 |
|---|---|---|
| 实时性 | <100ms 推送 | 轮询间隔决定(通常 1-5 秒) |
| 数据完整性 | 断连后可修复 | 轮询间隔内必然丢失数据 |
| 服务器压力 | 低(单一连接) | 高(频繁请求) |
| 实现复杂度 | 中(需处理断连) | 低(简单轮询) |
| 适合场景 | 微观结构策略(高实时性要求) | 低频策略(秒级可接受) |
核心结论:对于买卖压力比监控、订单簿变化捕捉等微观结构分析,必须用 WebSocket + 补数方案。纯轮询无法满足毫秒级信号需求。
八、结语:数据连续性是策略置信度的基石
数据连续性不是锦上添花,而是策略置信度的基石。
当你的策略基于买卖压力比做决策时,每一个时间点的数据缺失都可能将一个"空头力量正在积蓄"的渐进信号,扭曲成"空头瞬间崩塌"的突变信号。前者让你从容减仓,后者让你恐慌止损。
WebSocket 的实时性与 REST API 的可追溯性并不矛盾。关键在于:将它们视为同一时间轴的不同视图,而不是两个独立的数据源。
通过本文的修复方案,你的交易系统获得了:
- 实时数据的低延迟推送(WebSocket)
- 断连后的无损数据恢复(REST 补数)
- 时间轴级别的数据连续性保障(合并引擎)
凌晨 3:17 的告警依然会发生,但这一次,你的系统知道如何修复自己。
下一步行动
如果你正在构建微观结构分析系统:
- 访问 tickdb.ai 注册(免费,无需信用卡)
- 在控制台申请 WebSocket + REST API 访问权限
- 设置环境变量
TICKDB_API_KEY,复制本文代码即可运行
如果你需要完整的历史数据做策略回测:
TickDB 提供 10 年级别的美股历史 K 线数据,适用于跨周期策略验证。联系 [email protected] 了解机构方案。
如果你习惯用 AI 辅助开发:
在 ClawHub 搜索并安装 tickdb-market-data SKILL,可通过自然语言查询 TickDB 数据接口和示例代码。
风险提示:本文不构成任何投资建议。WebSocket 数据流和历史数据补全仅为技术实现讨论,不涉及任何交易策略的推荐。市场有风险,投资需谨慎。