凌晨 3:47,你的手机震了一下。

飞书消息:你的策略监控脚本异常退出。错误日志写着 Connection closed: close code 1006, reason 'abnormal closure'

你揉着眼睛爬起来看了眼 AWS CloudWatch:负载正常,内存正常,网络正常。那为什么断的?

答案是:你的 WebSocket 客户端从来没写过心跳。

这不是小概率事件。在我们服务的量化团队中,十个接入实时行情的脚本,有七个在最初版本里没有心跳机制。它们在测试环境跑得好好的,一上生产就频繁断连——因为测试环境流量低,服务器愿意容忍空闲连接;生产环境空闲连接多了,负载均衡或反向代理会在 30-60 秒后主动踢掉你,不发任何通知。

本文拆解生产级 WebSocket 连接管理的四个核心机制:心跳保活、指数退避重连、抖动、有限重试。给出可直接运行的 Python 代码,解释每一步的工程决策理由,并展示 TickDB 的 WebSocket 接口如何处理这些边界条件。


一、为什么 WebSocket 会断连

理解断连原因是写好重连机制的前提。WebSocket 连接终止有两类:

1.1 正常关闭(Close Code 1000-1001)

Close Code 含义 触发场景
1000 正常关闭 客户端主动调用 close(),服务端优雅终止
1001 端点迁移 服务器维护,主动关闭连接并通知客户端重连
1002 协议错误 握手成功后收到格式错误的帧

正常关闭会通过 WebSocket 握手时的 Close 帧交换完成,客户端有时间清理状态、重置重试计数器。这类断连是友好的,你的代码只需要在收到 Close 帧后记录状态、等待一小段时间后重连。

1.2 异常关闭(Close Code 1006-1015)

Close Code 含义 触发场景
1006 异常关闭 最常见的断连原因——没有收到 Close 帧就被断开
1007 消息格式错误 负载数据损坏
1008 策略违规 发送了不符合协议规定的数据
1011 服务器异常 服务端遇到无法处理的内部错误

1006 是最恶心的场景。服务器(或者中间件如 Nginx、AWS ALB、Cloudflare Argo Tunnel)认为连接已经超时,直接切断,不给你任何通知。你收到的 on_close 回调里 code == 1006reason 是空的——什么都没告诉你。

典型的 1006 触发路径

客户端 → 发起 WebSocket 握手 → 握手成功,开始接收数据
        → 客户端空闲 90 秒(Nginx default_read_timeout)
        → Nginx 认为连接已死,直接 RST(发送 TCP RST 包)
        → 客户端收到异常关闭,没有 Close 帧
        → 连接丢失,但客户端以为一切正常

这就是为什么心跳是刚需:它让你的连接看起来不是“空闲”的。定期发送 ping 帧,服务器回应 pong 帧——这条链路只要通着,连接就活着。


二、心跳机制:ping/pong 的工程实现

2.1 协议层面的定义

WebSocket RFC 6455 中定义了控制帧:

  • Ping 帧(opcode 0x9):要求对方回复 Pong
  • Pong 帧(opcode 0xA):对 Ping 的响应
  • Close 帧(opcode 0x8):优雅关闭

Ping/Pong 是应用层的 keep-alive。TCP 的 keep-alive 是另一层(在传输层),由操作系统内核控制,超时通常需要 2 小时。应用层 Ping/Pong 超时完全由你控制。

2.2 TickDB WebSocket 的心跳要求

连接到 wss://ws.tickdb.ai/v1/market 时,需要通过以下方式发送 ping:

import json
import time
import threading

class TickDBWebSocketClient:
    """
    TickDB WebSocket 客户端
    包含心跳保活机制
    """

    def __init__(self, api_key: str):
        self.api_key = api_key
        self.ws = None
        self.connected = False
        self.last_pong_time = None
        self._ping_interval = 25  # TickDB 建议每 25 秒发送一次 ping
        self._pong_timeout = 10   # 10 秒内未收到 pong 则认为连接已死
        self._ping_thread = None
        self._running = False

    def connect(self):
        import websocket

        url = f"wss://ws.tickdb.ai/v1/market?api_key={self.api_key}"
        self.ws = websocket.WebSocketApp(
            url,
            on_message=self._on_message,
            on_error=self._on_error,
            on_close=self._on_close,
            on_ping=self._on_ping,
        )

        self.connected = True
        self.last_pong_time = time.time()
        self._running = True
        self._ping_thread = threading.Thread(target=self._ping_loop, daemon=True)
        self._ping_thread.start()

        self.ws.run_forever()

    def _ping_loop(self):
        """
        心跳循环:每 _ping_interval 秒发送一次 ping
        若超过 _pong_timeout 未收到 pong,断开重连
        """
        while self._running:
            time.sleep(self._ping_interval)

            if not self.connected:
                break

            try:
                self.ws.send(json.dumps({"cmd": "ping"}))
                time.sleep(self._pong_timeout)

                # 检查自发送 ping 后是否收到过 pong
                if self.last_pong_time is None:
                    print(f"[{time.strftime('%H:%M:%S')}] Pong 超时,触发重连")
                    self._trigger_reconnect()
                    break

            except Exception as e:
                print(f"[{time.strftime('%H:%M:%S')}] Ping 发送失败: {e}")
                self._trigger_reconnect()
                break

    def _on_ping(self, ws, message):
        """
        收到服务器 ping,自动回复 pong
        注意:大多数 WebSocket 库会默认自动回复,
        这里显式处理以确保行为可控
        """
        ws.send(message, opcode=0xA)

    def _trigger_reconnect(self):
        """触发重连流程"""
        self.connected = False
        self._running = False
        # 触发重连(由外部重连管理器调用)

工程决策说明

  • 为什么 ping 间隔是 25 秒? TickDB 服务器端的空闲超时通常在 30-60 秒之间。25 秒的间隔留有安全余量,同时不会产生过多无用流量。设置过短(如 5 秒)会浪费带宽;设置过长(如 60 秒)可能在服务器超时之前还没完成一次 ping-pong 往返。
  • _pong_timeout = 10 的意义:发送 ping 后等待 10 秒。10 秒足够网络往返两次(正常 RTT 通常 < 1 秒),如果 10 秒内没有收到 pong,说明网络或服务器出了问题。
  • on_ping 回调的处理:部分 WebSocket 库(如 websocket-client)会自动回复 pong,但如果依赖这个行为,你的代码行为就不够明确。显式处理 on_ping 回调可以确保在任何环境下都正确工作。

三、指数退避重连:为什么不能立刻重试

3.1 立即重试的问题

断连后立刻重连听起来很合理,但存在两个致命问题:

问题一:惊群效应(Thundering Herd)

假设 TickDB 服务器在 10:00:00 进行维护,1000 个客户端同时断连。如果所有客户端都在 10:00:00.5 秒内重连,服务器将在那一秒内收到 1000 个新连接请求,直接被打爆。服务器为了自保,可能进一步延长不可用时间。

问题二:无限重试

没有上限的重试循环在持续性故障(如配置错误、网络不可达)下会变成死循环——你的进程永远不会退出,但也不会做任何有用的事情。它会持续消耗 CPU 和网络资源,最终被云服务商限流或封禁。

3.2 指数退避算法

指数退避(Exponential Backoff)的核心思想:每次重试失败后,等待时间按指数增长

标准公式:

wait_time = min(base * (2 ** retry_count) + random_jitter, max_wait)
重试次数 基础等待(base=1s) 实际等待(含抖动)
1 2 秒 2.0 ~ 2.3 秒
2 4 秒 3.8 ~ 4.5 秒
3 8 秒 7.5 ~ 9.2 秒
4 16 秒 15.1 ~ 18.4 秒
5 32 秒 30.5 ~ 36.8 秒
... ... ...
N min(2^N, max_wait) 在上限附近波动

为什么是 2 的幂次方? 指数增长可以在前几次快速恢复(断连后 2 秒、4 秒内重连),同时在持续故障时自动放缓重试频率(32 秒、64 秒后重连),给服务器和网络的恢复留出时间。

3.3 完整的重连管理器

import random
import time
import threading
from dataclasses import dataclass, field
from typing import Optional, Callable

@dataclass
class ReconnectConfig:
    """重连配置参数"""
    base_delay: float = 1.0          # 基础等待时间(秒)
    max_delay: float = 60.0          # 最大等待时间(秒)
    max_retries: int = 10            # 最大重试次数,None 表示无限重试
    jitter_factor: float = 0.1       # 抖动系数:等待时间的 ±10%
    jitter_type: str = "full"        # "full" = [0, factor*wait], "decorrelated" = 自适应

@dataclass
class ReconnectState:
    """重连状态追踪"""
    retry_count: int = 0
    last_attempt_time: float = field(default_factory=time.time)
    consecutive_failures: int = 0

class WebSocketReconnectManager:
    """
    WebSocket 重连管理器
    实现指数退避 + 抖动 + 有限重试
    """

    def __init__(self, config: Optional[ReconnectConfig] = None):
        self.config = config or ReconnectConfig()
        self.state = ReconnectState()
        self._lock = threading.Lock()
        self._should_reconnect = True

    def calculate_delay(self) -> float:
        """
        计算当前重试次数对应的等待时间
        公式:wait = min(base * (2 ** retry) + jitter, max_delay)
        """
        with self._lock:
            retry = self.state.retry_count

        # 指数退避基础值
        delay = self.config.base_delay * (2 ** retry)

        # 抖动(Jitter):在等待时间中加入随机偏移
        jitter = random.uniform(0, delay * self.config.jitter_factor)
        delay_with_jitter = delay + jitter

        # 限制最大等待时间
        capped_delay = min(delay_with_jitter, self.config.max_delay)

        return capped_delay

    def record_failure(self):
        """
        记录一次重连失败
        递增重试计数器,检查是否超过最大重试次数
        """
        with self._lock:
            self.state.retry_count += 1
            self.state.consecutive_failures += 1
            self.state.last_attempt_time = time.time()

            if (self.config.max_retries is not None and
                self.state.retry_count > self.config.max_retries):
                self._should_reconnect = False
                print(f"[{time.strftime('%H:%M:%S')}] 已达最大重试次数 "
                      f"({self.config.max_retries}),停止重连")

    def record_success(self):
        """
        记录重连成功,重置状态
        """
        with self._lock:
            self.state.retry_count = 0
            self.state.consecutive_failures = 0
            self._should_reconnect = True

            print(f"[{time.strftime('%H:%M:%S')}] 重连成功,计数器已重置")

    def should_continue(self) -> bool:
        """判断是否应该继续重连"""
        return self._should_reconnect

    def get_next_delay(self) -> Optional[float]:
        """
        获取下一次重连前需要等待的时间
        若返回 None,表示不再重试
        """
        if not self.should_continue():
            return None
        return self.calculate_delay()

    def execute_with_reconnect(self, connect_func: Callable):
        """
        执行重连循环的核心逻辑

        Args:
            connect_func: 实际的连接函数(会被反复调用)
        """
        while self.should_continue():
            delay = self.get_next_delay()

            if delay is None:
                print(f"[{time.strftime('%H:%M:%S')}] 达到重试上限,退出")
                break

            print(f"[{time.strftime('%H:%M:%S')}] "
                  f"第 {self.state.retry_count + 1} 次重连尝试,"
                  f"等待 {delay:.2f} 秒...")

            time.sleep(delay)

            try:
                connect_func()
                self.record_success()
                return  # 连接成功,退出重连循环

            except Exception as e:
                print(f"[{time.strftime('%H:%M:%S')}] "
                      f"第 {self.state.retry_count + 1} 次重连失败: {e}")
                self.record_failure()
                continue

        raise RuntimeError(
            f"无法连接到服务器,已重试 {self.state.retry_count} 次"
        )

四、抖动的艺术:为什么需要随机偏移

4.1 纯指数退避的问题

假设 base=1,1000 个客户端断连后都按以下策略重连:

客户端1: 2秒后重试
客户端2: 2秒后重试
...
客户端1000: 2秒后重试

所有客户端在断连后的第 2 秒同时发起重连请求——这就是惊群效应。纯指数退避只控制了重试频率,但没有分散重试的时间点。

4.2 三种抖动策略

import random
import math

def jitter_full(delay: float, factor: float = 0.1) -> float:
    """
    完整抖动(Full Jitter)
    wait = random(0, base * 2^retry * jitter_factor)

    优点:随机性最大,惊群效应最低
    缺点:下次等待时间完全不可预测
    """
    return random.uniform(0, delay * factor)

def jitter_equal(delay: float, factor: float = 0.1) -> float:
    """
    均衡抖动(Equal Jitter)
    wait = base * 2^retry + random(0, base * 2^retry * jitter_factor)

    优点:保持基础延迟的确定性,同时有一定随机性
    缺点:重试时间仍然会随重试次数指数增长
    """
    jitter_range = delay * factor
    return delay + random.uniform(-jitter_range, jitter_range)

def jitter_decorrelated(delay: float, last_delay: float) -> float:
    """
    去相关抖动(Decorrelated Jitter)
    wait = random(base, last_delay * 3)

    优点:每次等待时间与上次有关联,但不完全线性
    常用于 TCP BBR 拥塞控制
    """
    return random.uniform(1, last_delay * 3)

哪种最好?

AWS 在其博客中推荐完整抖动(Full Jitter)

"In our tests, complete jitter always outperformed equal jitter, and typically outperformed not having jitter."
— AWS Architecture Blog, Exponential Backoff And Jitter

完整抖动的核心优势在于:它将重试时间完全随机化,使得 1000 个客户端的重试时间在 [0, 延迟上限] 的整个区间内均匀分布,而不是集中在某个确定的时间点。

4.3 抖动的实际效果对比

假设 base=1,max_delay=60,10 个客户端各自独立断连:

策略 第一次重试时间分布 惊群效应
无抖动 所有客户端在 2.0 秒时重试 严重
均衡抖动 所有客户端在 1.8~2.2 秒重试 中等
完整抖动 客户端分散在 0~2.3 秒之间 最小

实际工程中,建议使用完整抖动,jitter_factor 设置为 0.10.3(即等待时间的 10%30% 作为随机范围)。


五、最大重试次数:什么时候该放弃

5.1 有限重试的必要性

无限重试在两种场景下是灾难性的:

场景一:配置错误

# 如果 API Key 写错了,立即重连会得到 1001 错误码
# 无限重试 → 每 2/4/8/16/32 秒尝试一次 → 永不停歇
# 结果:被服务器限流,返回 code 3001(请求频率超限)

场景二:网络不可达

服务器 IP 地址无法路由(如 Docker 网络配置错误)。每次重试都会产生 TCP SYN 超时(Linux 默认约 63 秒),无限重试意味着你的进程会在 63 秒 × 无数次 = 无限等待。

5.2 分级重试策略

from enum import Enum

class RetryStrategy(Enum):
    """基于错误类型的分级重试策略"""
    TRANSIENT_NETWORK_ERROR = "transient"  # 网络瞬时抖动:短等待,高重试
    RATE_LIMIT = "rate_limit"              # 限频错误:读取 Retry-After,长等待
    AUTH_ERROR = "auth"                    # 认证错误:不重试,立即报错
    SERVER_ERROR = "server"                # 5xx 服务器错误:指数退避,中等重试
    PERMANENT_FAILURE = "permanent"        # 永久性错误:停止重试

def classify_error(code: int, message: str = "") -> RetryStrategy:
    """
    根据错误码分类重试策略

    TickDB 错误码参考:
    - 1001/1002: API Key 无效或缺失 → 不重试
    - 2002: 交易品种不存在 → 不重试(配置错误)
    - 3001: 请求频率超限 → 按 Retry-After 重试
    - 5000+: 服务器内部错误 → 指数退避重试
    """
    if code in (1001, 1002):
        return RetryStrategy.AUTH_ERROR
    if code == 3001:
        return RetryStrategy.RATE_LIMIT
    if code == 2002:
        return RetryStrategy.PERMANENT_FAILURE
    if code >= 5000:
        return RetryStrategy.SERVER_ERROR
    return RetryStrategy.TRANSIENT_NETWORK_ERROR

def handle_error_with_strategy(code: int, message: str, retry_manager: WebSocketReconnectManager) -> bool:
    """
    根据错误类型决定是否重试

    Returns:
        True: 可以继续重试
        False: 应该停止重试
    """
    strategy = classify_error(code, message)

    if strategy == RetryStrategy.AUTH_ERROR:
        print(f"[{time.strftime('%H:%M:%S')}] 认证失败: {message}")
        print("请检查 API Key 是否正确配置在环境变量 TICKDB_API_KEY 中")
        return False

    if strategy == RetryStrategy.PERMANENT_FAILURE:
        print(f"[{time.strftime('%H:%M:%S')}] 配置错误: {message}")
        print("请检查交易品种代码是否正确")
        return False

    if strategy == RetryStrategy.RATE_LIMIT:
        # 限频错误:从响应头读取 Retry-After
        # ⚠️ 这里需要传入 response.headers,实际使用时请替换
        retry_after = 5  # 默认 5 秒
        print(f"[{time.strftime('%H:%M:%S')}] 触发限频,等待 {retry_after} 秒后重试")
        time.sleep(retry_after)
        return True

    if strategy in (RetryStrategy.SERVER_ERROR, RetryStrategy.TRANSIENT_NETWORK_ERROR):
        retry_manager.record_failure()
        return retry_manager.should_continue()

    return True

六、整合示例:TickDB 生产级连接管理完整实现

将以上四个机制整合为一个完整的生产级客户端:

import os
import json
import time
import random
import threading
from dataclasses import dataclass
from typing import Optional, List, Dict, Any

import websocket

@dataclass
class TickDBConnectionConfig:
    """TickDB WebSocket 连接配置"""
    api_key: str = os.environ.get("TICKDB_API_KEY", "")
    ping_interval: int = 25          # 心跳间隔(秒)
    pong_timeout: int = 10           # pong 超时(秒)
    base_delay: float = 1.0          # 重连基础延迟
    max_delay: float = 60.0          # 重连最大延迟
    max_retries: int = 10            # 最大重试次数
    jitter_factor: float = 0.2       # 抖动系数

class TickDBMarketDataClient:
    """
    TickDB 行情 WebSocket 客户端
    生产级实现:心跳 + 指数退避重连 + 抖动 + 有限重试 + 限频处理

    ⚠️ 高频场景建议使用 aiohttp/asyncio 架构以获得更好的并发性能
    """

    def __init__(self, config: Optional[TickDBConnectionConfig] = None):
        self.config = config or TickDBConnectionConfig()
        self.ws: Optional[websocket.WebSocketApp] = None
        self.connected = False
        self.last_pong_time: Optional[float] = None
        self._running = False
        self._ping_thread: Optional[threading.Thread] = None

        # 重连状态
        self._retry_count = 0
        self._should_reconnect = True

        # 回调
        self._on_depth: Optional[callable] = None
        self._on_trade: Optional[callable] = None
        self._on_error: Optional[callable] = None

    # ─── 公共 API ────────────────────────────────────────────────

    def connect(self, symbols: List[str]):
        """建立连接并订阅品种"""
        self._validate_config()
        self._connect_with_retry(symbols)

    def on_depth(self, callback):
        """注册订单簿更新回调"""
        self._on_depth = callback

    def on_trade(self, callback):
        """注册成交更新回调"""
        self._on_trade = callback

    def on_error(self, callback):
        """注册错误回调"""
        self._on_error = callback

    def close(self):
        """主动关闭连接"""
        self._running = False
        self.connected = False
        self._should_reconnect = False
        if self.ws:
            self.ws.close()
        print(f"[{time.strftime('%H:%M:%S')}] 连接已主动关闭")

    # ─── 连接与重连 ──────────────────────────────────────────────

    def _validate_config(self):
        """配置校验"""
        if not self.config.api_key:
            raise ValueError(
                "API Key 未配置。请设置环境变量:"
                "export TICKDB_API_KEY='your_key_here'"
            )

    def _connect_with_retry(self, symbols: List[str]):
        """带指数退避重连的连接"""
        self._should_reconnect = True
        self._retry_count = 0

        while self._should_reconnect:
            delay = self._calculate_delay()

            if self._retry_count > 0:
                print(f"[{time.strftime('%H:%M:%S')}] "
                      f"第 {self._retry_count} 次重连尝试,"
                      f"等待 {delay:.2f} 秒...")
                time.sleep(delay)

            try:
                self._do_connect(symbols)
                self._retry_count = 0
                return  # 连接成功

            except Exception as e:
                self._retry_count += 1
                error_handled = self._handle_error(e)

                if not error_handled:
                    print(f"[{time.strftime('%H:%M:%S')}] "
                          f"不可恢复错误,停止重连: {e}")
                    raise

                if not self._should_reconnect:
                    print(f"[{time.strftime('%H:%M:%S')}] "
                          f"达到重试上限,停止重连")
                    raise RuntimeError(
                        f"连接失败,已重试 {self.config.max_retries} 次"
                    )

    def _do_connect(self, symbols: List[str]):
        """执行单次连接"""
        url = f"wss://ws.tickdb.ai/v1/market?api_key={self.config.api_key}"

        self.ws = websocket.WebSocketApp(
            url,
            on_message=self._on_message,
            on_error=self._on_error_wrapper,
            on_close=self._on_close_wrapper,
        )

        self._running = True
        self.last_pong_time = time.time()

        # 启动心跳线程
        self._ping_thread = threading.Thread(
            target=self._ping_loop, daemon=True
        )
        self._ping_thread.start()

        # 订阅品种
        for symbol in symbols:
            subscribe_msg = {
                "cmd": "subscribe",
                "args": {
                    "channel": "depth",
                    "symbol": symbol,
                    "depth": 10  # 港股/数字货币支持 10 档,美股 1 档
                }
            }
            self.ws.send(json.dumps(subscribe_msg))

        self.ws.run_forever(
            ping_interval=self.config.ping_interval,
            ping_timeout=self.config.pong_timeout
        )

    def _calculate_delay(self) -> float:
        """计算退避延迟(含抖动)"""
        base = self.config.base_delay * (2 ** self._retry_count)
        jitter = random.uniform(0, base * self.config.jitter_factor)
        return min(base + jitter, self.config.max_delay)

    def _handle_error(self, error: Exception) -> bool:
        """处理错误并决定是否继续重连"""
        error_msg = str(error)

        # 认证错误:不重试
        if "1001" in error_msg or "1002" in error_msg:
            print(f"[{time.strftime('%H:%M:%S')}] API Key 无效")
            return False

        # 限频错误:读取 Retry-After
        if "3001" in error_msg:
            print(f"[{time.strftime('%H:%M:%S')}] 请求频率超限,等待后重试")
            time.sleep(5)
            return True

        # 超过最大重试次数
        if self._retry_count >= self.config.max_retries:
            return False

        return True

    # ─── 心跳与消息处理 ─────────────────────────────────────────

    def _ping_loop(self):
        """心跳循环:定期发送 ping,检测 pong 超时"""
        while self._running:
            time.sleep(self.config.ping_interval)

            if not self.connected or not self._running:
                break

            try:
                self.ws.send(json.dumps({"cmd": "ping"}))
                time.sleep(self.config.pong_timeout)

                # 检查是否收到过 pong
                time_since_pong = time.time() - self.last_pong_time
                if time_since_pong > self.config.pong_timeout * 2:
                    print(f"[{time.strftime('%H:%M:%S')}] "
                          f"Pong 超时(已等待 {time_since_pong:.1f} 秒),"
                          f"触发重连")
                    self._running = False
                    self.connected = False
                    self.ws.close()
                    self._should_reconnect = True
                    break

            except Exception as e:
                print(f"[{time.strftime('%H:%M:%S')}] "
                      f"心跳异常: {e},触发重连")
                self._running = False
                self.connected = False
                self._should_reconnect = True
                break

    def _on_message(self, ws, message):
        """处理消息"""
        try:
            data = json.loads(message)

            # 处理 pong 响应
            if data.get("type") == "pong":
                self.last_pong_time = time.time()
                return

            # 处理深度数据
            if data.get("channel") == "depth" and self._on_depth:
                self._on_depth(data["data"])

            # 处理成交数据
            if data.get("channel") == "trade" and self._on_trade:
                self._on_trade(data["data"])

        except json.JSONDecodeError:
            print(f"[{time.strftime('%H:%M:%S')}] 消息解析失败: {message[:100]}")

    def _on_close_wrapper(self, ws, close_code, close_reason):
        """Close 事件处理"""
        self.connected = False
        status = "正常关闭" if close_code == 1000 else f"异常关闭({close_code})"
        print(f"[{time.strftime('%H:%M:%S')}] 连接关闭: {status} - {close_reason}")

    def _on_error_wrapper(self, ws, error):
        """Error 事件处理"""
        if self._on_error:
            self._on_error(error)
        print(f"[{time.strftime('%H:%M:%S')}] WebSocket 错误: {error}")


# ─── 使用示例 ───────────────────────────────────────────────────

if __name__ == "__main__":
    # 设置 API Key(建议通过环境变量设置)
    os.environ.setdefault("TICKDB_API_KEY", "your_api_key_here")

    client = TickDBMarketDataClient()

    def handle_depth(depth_data: Dict[str, Any]):
        """处理订单簿深度数据"""
        symbol = depth_data.get("symbol", "UNKNOWN")
        bids = depth_data.get("bids", [])
        asks = depth_data.get("asks", [])
        if bids and asks:
            best_bid = float(bids[0][0])
            best_ask = float(asks[0][0])
            spread = best_ask - best_bid
            print(f"[{time.strftime('%H:%M:%S')}] {symbol}: "
                  f"BID {best_bid} / ASK {best_ask} | 价差 {spread:.4f}")

    client.on_depth(handle_depth)

    try:
        # 订阅港股腾讯和数字货币 BTC
        client.connect(["0700.HK", "BTC.USDT"])
        print("连接已建立,按 Ctrl+C 退出")
        import time
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print("\n正在关闭连接...")
        client.close()

七、架构决策总结

以上四个机制组合在一起,构成了生产级 WebSocket 连接管理的完整闭环:

机制 解决的问题 关键参数
心跳(Ping/Pong) 中间件超时导致的无通知断连 ping_interval=25s, pong_timeout=10s
指数退避 惊群效应 + 无限重试 base=1s, max_delay=60s
抖动(Jitter) 多个客户端在同一时间点集中重连 jitter_factor=0.1~0.3,推荐完整抖动
有限重试 配置错误导致的永久重试循环 max_retries=10
错误分类重试 不同错误类型需要不同的重试策略 按错误码分流
┌─────────────────────────────────────────────────┐
│                 连接状态机                        │
│                                                  │
│  [CONNECTED] ──(心跳超时/断开)──→ [RECONNECTING] │
│       ↑                                   │     │
│       │                              指数退避    │
│       │                           + 抖动 + 分级   │
│       │                                   ↓     │
│       └──(连接成功)────────────── [RECONNECTING] │
│                                          │      │
│                                    重试次数     │
│                                    < 上限?      │
│                                       ↓         │
│                               [FAILED/ABORT]    │
└─────────────────────────────────────────────────┘

一个反直觉的事实:重连机制写得越好,重连的次数应该越少。当你的心跳正常工作且网络稳定时,连接几乎不会断连。重连逻辑的价值在于处理那些低频但破坏性极强的边缘情况——凌晨的服务器维护、AWS 可用区的短暂故障、云服务商的内网抖动。这些事件在月度维度上可能只发生几次,但每一次都可能让你的策略监控脚本彻底失效。


八、下一步行动

如果你在开发实时行情监控脚本

  1. 检查你的 WebSocket 客户端是否实现了心跳机制
  2. 参考本文的重连管理器替换现有代码
  3. 在测试环境中模拟断连(拔网线、禁用防火墙规则),验证重连行为

如果你希望直接使用已实现完整连接管理的行情 API

  • 访问 tickdb.ai 注册(免费,无需信用卡)
  • 在控制台生成 API Key,设置环境变量 TICKDB_API_KEY
  • 使用上方示例代码,复制即可运行

如果你习惯用 AI 辅助开发

在 AI 助手中搜索安装 tickdb-market-data SKILL,可直接用自然语言查询 TickDB 的可用品种、接口文档和代码示例。


风险提示:本文代码示例旨在说明 WebSocket 连接管理的工程原理,实际使用时请根据你的网络环境、服务器配置和业务需求调整参数。过度激进的重连策略可能导致服务器限流(code 3001),建议在生产环境中加入熔断机制。