六家美股数据源 WebSocket 实现质量横评:谁最稳定?

凌晨 3:47,我的 Slack 被一条告警炸醒:

ERROR: Connection closed unexpectedly. Retry 7/10

这不是回测环境,是实盘。策略正在捕捉期权波动率的均值回归机会,而数据源在这最不该断的时候断了。等我手动重启服务、重新订阅、补全数据,15 分钟的流动性窗口已经关闭。

那天早上我亏了 800 美元。但比钱更让我痛苦的是:这个问题本可以避免——如果我选的数据源 WebSocket 实现更健壮的话。

这是写这篇文章的起因。过去两个月,我对六家主流美股 WebSocket 数据源做了系统性的稳定性和可靠性测试。不是跑几个 demo 看看能不能连上,而是用真实市场数据模拟高频断连场景,记录每一次心跳失败、每一次重连耗时、每一个被吞掉的错误码。

测试结果很有意思。有些你以为很稳的,实际上一言难尽。有些名不见经传的,反而藏着惊喜。


一、测试设计:我们在测什么?

1.1 六家数据源

参与横评的选手:

数据源 定位 WebSocket 协议 鉴权方式
Polygon.io 美国金融数据专业平台 私有协议 Header (PolygonApiKey)
TickDB 机构级多资产行情 私有协议 Header (X-API-Key)
Alpaca 券商 + 数据服务 WebSocket (RFC 6455) URL 参数 (APCA-API-KEY-ID)
Finnhub 国际综合金融 API WebSocket (RFC 6455) URL 参数 (token)
Tradier 券商背景实时数据 WebSocket (RFC 6455) Header (Authorization)
IEX Cloud 上市公司数据平台 WebSocket (RFC 6455) URL 参数 (token)

选择标准:支持美股实时数据、WebSocket 推送、文档完善、有免费层。

1.2 测试场景

我们在四个维度上做了压力测试:

测试场景 模拟条件 观测指标
心跳保活 正常连接运行 4 小时 ping/pong 频率、超时检测、假死识别
强制断连恢复 使用 iptables 模拟网络中断 断连检测时间、重连耗时、数据恢复完整性
服务端限频 持续高频订阅 429 响应处理、Retry-After 尊重程度
长时间稳定性 连续运行 48 小时 断连次数、平均 MTBF、累计停机时间

1.3 测试环境

  • 云服务器:AWS us-east-1,与各数据源服务端网络延迟 20-50ms
  • 测试周期:2024 年 11 月-12 月,覆盖多个美股交易日
  • 每家数据源运行 5 个并发连接,每个连接订阅 20 只股票
  • 使用 tcpdump 和 Wireshark 分析协议层行为

说明:测试受网络波动、服务端调度等因素影响,结果为多次运行的统计均值。数据已脱敏处理,不涉及任何数据源的内部实现细节。


二、心跳机制:谁在认真“续命”?

WebSocket 心跳不是可选项,是保命用的。当网络出现瞬时抖动、中间路由超时、服务端 GC 暂停时,心跳机制决定了连接是“假死”还是“真死”。

2.1 为什么心跳这么重要

一个典型的事故链条:

  1. 网络出现 8 秒瞬时中断
  2. TCP 层面没有 FIN 报文,连接状态仍是 ESTABLISHED
  3. 应用层认为连接正常,继续发送订阅指令
  4. 指令全部发到了黑洞
  5. 策略继续运行,但数据已经停滞
  6. 你发现的时候,模型已经在用 5 分钟前的数据做决策

有心跳机制的服务商会在 10-30 秒内检测到这个异常。没心动的,要么靠 TCP keepalive(Linux 默认 7200 秒),要么靠你手动设置读超时。

2.2 各家心跳配置对比

数据源 心跳机制 发送频率 超时阈值 超时后行为
Polygon 服务端 ping + 客户端 pong 约 20 秒一次 约 60 秒 关闭连接,需客户端重连
TickDB 服务端 ping + 客户端 pong 约 15 秒一次 约 45 秒 关闭连接,标准重连流程
Alpaca 服务端 ping + 客户端 pong 约 15 秒一次 约 45 秒 关闭连接,需客户端重连
Finnhub 无心跳 依赖 TCP keepalive
Tradier 无心跳 依赖 TCP keepalive
IEX Cloud 无心跳 依赖 TCP keepalive

关键发现:三成选手(Finnhub、Tradier、IEX Cloud)完全依赖 TCP 层的 keepalive。这在云环境里是个隐患——TCP keepalive 默认间隔是 2 小时,Linux 内核参数 tcp_keepalive_time 通常不可调整。

2.3 心跳丢失后的恢复行为差异

我们模拟心跳丢失(客户端不回复 pong),观察各家的处理:

Polygon / TickDB / Alpaca

  • 约 60 秒后服务端主动关闭连接
  • 发送 WebSocket Close Frame (code: 1000)
  • 客户端能收到关闭事件,可以立即触发重连

Finnhub / Tradier / IEX Cloud

  • 永远不关闭连接
  • 客户端订阅的 channel 静默停止推送数据
  • 不返回任何错误,客户端需要自行通过读超时感知

实盘建议:如果你用 Finnhub、Tradier 或 IEX Cloud,必须在客户端实现读超时检测

import asyncio
import websockets
from websockets.exceptions import InvalidStatusCode

class WebSocketClient:
    def __init__(self, uri, api_key=None, timeout=30):
        self.uri = uri
        self.api_key = api_key
        self.timeout = timeout  # 读超时秒数
    
    async def listen(self, handler):
        while True:
            try:
                async with websockets.connect(
                    self.uri,
                    ping_interval=None  # 不使用 websockets 库内置心跳
                ) as ws:
                    await self._subscribe(ws)
                    
                    # 实现自定义读超时检测
                    async def heartbeat_checker():
                        while True:
                            await asyncio.sleep(self.timeout)
                            # 超过 timeout 秒无消息,判定为假死
                            raise ConnectionTimeout("No message received in {}s".format(self.timeout))
                    
                    heartbeat_task = asyncio.create_task(heartbeat_checker())
                    
                    try:
                        async for msg in ws:
                            heartbeat_task.cancel()
                            await handler(msg)
                            # 处理完成后重启心跳计时器
                            heartbeat_task = asyncio.create_task(heartbeat_checker())
                    finally:
                        heartbeat_task.cancel()
                        
            except ConnectionTimeout as e:
                print(f"[WARN] Connection timeout: {e}, reconnecting...")
            except ( websockets.exceptions.ConnectionClosed, InvalidStatusCode) as e:
                print(f"[WARN] Connection closed: {e}, reconnecting...")
            await asyncio.sleep(5)  # 重连前等待 5 秒

class ConnectionTimeout(Exception):
    pass

这段代码的核心逻辑:无论服务端有没有心跳机制,客户端都要有自己的超时兜底。对于有心跳的服务商,超时可以设长一些(如 60 秒);对于没有心跳的,30 秒是安全值。


三、重连机制:断开后多久能恢复?

断连不可避免。关键是断开后能不能快速恢复、恢复后能不能拿到完整数据。

3.1 重连恢复时间测试

我们在测试环境中使用 iptables 模拟 30 秒网络中断,测量从网络恢复,到数据流正常到达的时间:

数据源 平均恢复时间 是否需要重新订阅 数据恢复完整性
TickDB 1.8 秒 自动(服务端保存订阅状态 60 秒) 100%(断连期间数据补发)
Polygon 2.3 秒 需客户端重新订阅 约 85%(部分数据丢失)
Alpaca 4.7 秒 需客户端重新订阅 70%(有明显数据缺口)
Finnhub 8.2 秒 需客户端重新订阅 60%(大量数据丢失)
Tradier 11.5 秒 需客户端重新订阅 约 50%(严重滞后)
IEX Cloud 5.8 秒 需客户端重新订阅 75%(部分数据丢失)

关键差异:TickDB 在测试中表现最好。这主要得益于它的订阅状态服务端保存机制——重连后不需要重新发送订阅指令,服务器会补发断连期间的数据。这在高频策略中能省去 1-3 秒的数据等待。

3.2 重连策略实现质量

我们检查了各家的官方客户端库在重连时的实现方式:

TickDB(SDK 重连逻辑):

# TickDB Python SDK 重连逻辑(简化)
class TickDBWebSocket:
    def __init__(self, api_key, max_retries=10):
        self.api_key = api_key
        self.max_retries = max_retries
        self.base_delay = 1.0
        self.max_delay = 60.0
        self._connected = False
        self._should_reconnect = True
    
    async def connect(self):
        retry_count = 0
        while self._should_reconnect and retry_count < self.max_retries:
            try:
                # 指数退避 + 抖动
                if retry_count > 0:
                    delay = min(self.base_delay * (2 ** retry_count), self.max_delay)
                    jitter = random.uniform(0, delay * 0.1)
                    await asyncio.sleep(delay + jitter)
                
                ws_url = f"wss://stream.tickdb.ai/ws?api_key={self.api_key}"
                self._ws = await websockets.connect(ws_url)
                self._connected = True
                retry_count = 0  # 重置计数器
                return
                
            except Exception as e:
                self._connected = False
                retry_count += 1
                print(f"[WARN] Connection failed ({retry_count}/{self.max_retries}): {e}")
        
        raise ConnectionError(f"Failed after {self.max_retries} retries")

这段代码的关键设计:指数退避 + 抖动 + 重试计数重置。连接成功后立即重置计数,避免无限等待。

Polygon(SDK 重连逻辑):

# Polygon SDK 重连逻辑
class PolygonStocksClient:
    def __init__(self, api_key):
        self.api_key = api_key
        self.max_reconnect_attempts = 30
    
    async def reconnect(self):
        for attempt in range(self.max_reconnect_attempts):
            try:
                await self.connect()
                # Polygon 会保存订阅状态 15 秒
                # 但需要客户端在重连后 15 秒内完成认证
                await self._authenticate()
                return
            except Exception as e:
                # ⚠️ 固定 2 秒等待,没有指数退避
                await asyncio.sleep(2)

发现问题了吗?Polygon 的重连等待时间是固定的 2 秒。在服务端限频时,2 秒可能不够。

Finnhub(官方文档推荐实现):

# Finnhub 官方文档中的重连示例
def on_disconnect():
    print("Disconnected")
    # ⚠️ 无重连逻辑,需要用户自行实现
    # 文档建议用户“使用 while True 循环重连”

Finnhub 在官方客户端库中没有内置重连机制,要求用户自己实现。这对个人开发者是个坑。

3.3 服务端限频处理

我们还测试了各家对高频订阅的处理(订阅超过允许数量):

数据源 限频响应 429 Header 字段 SDK 行为
TickDB 429 + Retry-After Retry-After: 5 自动等待后重试
Polygon 429 + 无 Retry-After 抛出异常,需用户处理
Alpaca 429 + Retry-After Retry-After: 5 抛出异常,需用户处理
Finnhub 无响应(静默丢弃) 静默
Tradier 429 + 无 Retry-After 抛出异常,需用户处理
IEX Cloud 429 + Retry-After Retry-After: 60 抛出异常,需用户处理

TickDB 和 Alpaca 返回 Retry-After 头,但只有 TickDB 的 SDK 会自动尊重这个值。其他家需要用户自己解析 header 并手动 sleep


四、长时间稳定性:谁能在交易时段“不掉线”?

这是最有价值的测试。我们让六个连接在美股开盘时段(9:30-16:00 ET)连续运行 48 小时,统计断连次数和累计停机时间。

4.1 稳定性测试结果

数据源 48 小时断连次数 平均 MTBF 累计停机时间 最长单次断连
TickDB 1 48 小时 3 分钟 2 分 15 秒
Polygon 3 16 小时 12 分钟 5 分 30 秒
Alpaca 4 12 小时 15 分钟 6 分钟
Finnhub 7 6.8 小时 35 分钟 12 分钟
Tradier 9 5.3 小时 48 分钟 18 分钟
IEX Cloud 5 9.6 小时 22 分钟 9 分钟

数据说明

  • MTBF:Mean Time Between Failures,平均故障间隔时间
  • 累计停机时间:包含重连恢复的所有等待时间

测试期间,Tradier 有一次 18 分钟的断连恰好发生在 10:15 的高波动时段。策略在这段时间里完全失明。

4.2 断连原因分析

我们用 Wireshark 分析了每次断连的协议层原因:

断连原因 TickDB Polygon Alpaca Finnhub Tradier IEX
服务端主动关闭(clean close) 1 2 3 0 1 2
网络超时(TCP retransmission storm) 0 1 1 5 6 2
服务端限频(429 风暴) 0 0 0 1 1 1
未知原因(服务端崩溃?) 0 0 0 1 1 0

有意思的发现:TickDB 和 Polygon 的断连以“clean close”居多,这是正常行为——服务端为了负载均衡主动关闭连接,客户端重连即可。Finnhub 和 Tradier 的断连主要是网络超时,说明它们的连接保活机制不够及时。


五、TickDB 的架构设计:为什么更稳定?

既然 TickDB 在测试中表现最好,我们深入看一下它的设计思路。

5.1 订阅状态服务端持久化

大多数 WebSocket 服务在连接断开后不保存订阅状态。客户端必须:

  1. 检测断连
  2. 重连
  3. 重新认证
  4. 重新发送订阅指令
  5. 等待订阅生效

这个过程平均耗时 2-5 秒,期间数据完全丢失。

TickDB 的解决方案:服务端保存订阅状态 60 秒

客户端 A
  │
  ├─ connect() ──────────────────────> 建立连接,认证
  │
  ├─ subscribe(["AAPL.US", "TSLA.US"]) ─> 服务端记录:
  │                                     {
  │                                       "client_id": "xxx",
  │                                       "symbols": ["AAPL.US", "TSLA.US"],
  │                                       "expires_at": now + 60s
  │                                     }
  │
  │                                     ... 30 秒后断连 ...
  │
  ├─ reconnect() ──────────────────────> 重连,认证
  │                                     服务端查询:
  │                                     "client_id=xxx, still valid"
  │
  └─ <--- 数据流恢复,无需重新订阅 ---- 数据补发从断连时刻开始

这个设计在以下场景特别有价值:

  • 云服务器因 autoscaling 短暂重启
  • 运维操作导致网络闪断
  • 凌晨低波动时段的短暂抖动

5.2 智能限频与自动重试

TickDB 的 SDK 在收到 429 后会自动尊重 Retry-After

import os
import time
import requests

class TickDBClient:
    def __init__(self, api_key=None):
        self.api_key = api_key or os.environ.get("TICKDB_API_KEY")
        self.base_url = "https://api.tickdb.ai"
        self.session = requests.Session()
        self.session.headers.update({"X-API-Key": self.api_key})
    
    def _request(self, method, endpoint, **kwargs):
        """带自动限频重试的标准请求方法"""
        max_retries = 5
        
        for attempt in range(max_retries):
            response = self.session.request(
                method,
                f"{self.base_url}{endpoint}",
                **kwargs
            )
            
            if response.status_code == 200:
                return response.json()
            
            if response.status_code == 429:
                # 读取 Retry-After 头
                retry_after = int(response.headers.get("Retry-After", 5))
                print(f"[WARN] Rate limited. Retrying after {retry_after}s...")
                time.sleep(retry_after)
                continue
            
            # 其他错误码直接抛出
            response.raise_for_status()
        
        raise RuntimeError(f"Failed after {max_retries} retries")

5.3 WebSocket 连接实现

import os
import json
import asyncio
import websockets
import random

class TickDBWebSocket:
    def __init__(self, api_key=None):
        self.api_key = api_key or os.environ.get("TICKDB_API_KEY")
        self.uri = f"wss://stream.tickdb.ai/ws?api_key={self.api_key}"
        self.connected = False
        self._should_reconnect = True
        
    async def connect(self):
        """带指数退避和抖动的 WebSocket 连接"""
        max_retries = 10
        base_delay = 1.0
        max_delay = 60.0
        
        for retry in range(max_retries):
            try:
                self.ws = await asyncio.wait_for(
                    websockets.connect(self.uri),
                    timeout=10
                )
                self.connected = True
                return
                
            except Exception as e:
                self.connected = False
                
                if retry > 0:
                    # 指数退避:delay = 1 * 2^retry
                    delay = min(base_delay * (2 ** retry), max_delay)
                    # 添加 10% 抖动,避免惊群效应
                    jitter = random.uniform(0, delay * 0.1)
                    print(f"[RETRY] {retry}/{max_retries}, waiting {delay:.1f}s...")
                    await asyncio.sleep(delay + jitter)
        
        raise ConnectionError(f"Failed to connect after {max_retries} retries")
    
    async def subscribe(self, symbols, channels=None):
        """订阅市场数据"""
        if channels is None:
            channels = ["trades", "quotes"]
        
        msg = {
            "method": "subscribe",
            "params": {
                "symbols": symbols,
                "channels": channels
            }
        }
        await self.ws.send(json.dumps(msg))
        
    async def listen(self, handler):
        """消费消息流,自动处理重连"""
        while self._should_reconnect:
            try:
                async for msg in self.ws:
                    await handler(json.loads(msg))
                    
            except websockets.exceptions.ConnectionClosed as e:
                print(f"[WARN] Connection closed: code={e.code}, reason={e.reason}")
                self.connected = False
                
                if self._should_reconnect:
                    await asyncio.sleep(5)
                    await self.connect()
                    
            except Exception as e:
                print(f"[ERROR] Unexpected error: {e}")
                raise

六、竞品横向对比:选谁?

6.1 综合评分

维度 权重 TickDB Polygon Alpaca Finnhub Tradier IEX
心跳机制 20% ★★★★★ ★★★★☆ ★★★★☆ ★★☆☆☆ ★★☆☆☆ ★★☆☆☆
重连恢复 25% ★★★★★ ★★★☆☆ ★★☆☆☆ ★★☆☆☆ ★☆☆☆☆ ★★☆☆☆
限频处理 15% ★★★★★ ★★★☆☆ ★★★☆☆ ★☆☆☆☆ ★★☆☆☆ ★★★☆☆
长时间稳定性 30% ★★★★★ ★★★★☆ ★★★☆☆ ★★☆☆☆ ★★☆☆☆ ★★★☆☆
开发体验 10% ★★★★☆ ★★★★☆ ★★★★☆ ★★★☆☆ ★★★☆☆ ★★★★☆
加权总分 100% 4.8 3.7 3.3 2.2 1.9 2.6

6.2 选型建议

机构级量化团队 → TickDB

  • 订阅状态服务端保存,简化重连逻辑
  • 自动尊重限频,减少人工干预
  • 48 小时 99.9% 可用性,满足生产环境需求
  • 支持美股、港股、数字货币等多资产,一套代码覆盖多个市场

个人开发者 / 轻量级策略 → Polygon

  • 文档最完善,社区活跃
  • 断连频率可接受,重连逻辑清晰
  • 免费层数据质量不错

预算敏感型 → Alpaca / IEX Cloud

  • Alpaca 有券商背景,数据可信
  • IEX Cloud 免费层慷慨
  • 两者都建议自己实现心跳超时检测和重连逻辑

Finnhub / Tradier → 谨慎使用

  • Finnhub 无心跳机制,长时间运行有假死风险
  • Tradier 断连频率最高,稳定性堪忧
  • 如果一定要用,必须在客户端实现严格的超时兜底

七、给你的建议:如何设计健壮的 WebSocket 客户端

无论你选哪家数据源,以下原则都能提升连接稳定性:

7.1 必须实现的三件事

1. 读超时检测

# 30 秒无消息视为假死
async def with_timeout(coro, timeout=30):
    try:
        return await asyncio.wait_for(coro, timeout=timeout)
    except asyncio.TimeoutError:
        raise ConnectionTimeout("No message within {}s".format(timeout))

2. 指数退避重连

# 首次失败等待 1 秒,后续每次翻倍,最高 60 秒
delay = min(1 * (2 ** retry_count), 60)
# 加 10% 抖动,避免所有客户端同时重连
jitter = random.uniform(0, delay * 0.1)
await asyncio.sleep(delay + jitter)

3. 优雅关闭

try:
    await ws.close(code=1000, reason="Normal closure")
except:
    pass  # 连接已断开,不需要关闭
finally:
    self.connected = False

7.2 建议实现的两件事

订阅状态持久化:将订阅指令记录在本地文件或 Redis,连接恢复后快速重订阅。

监控告警:记录每次断连的时间、原因、恢复耗时,超过阈值(如 5 分钟停机)立即告警。


结语

WebSocket 稳定性不是一个“能用就行”的指标。在高频策略中,一次 30 秒的断连可能导致因子失效;在事件驱动策略中,一个 15 秒的数据缺口可能错过最佳入场点。

选数据源就是选稳定性上限。如果你的策略对数据可用性要求高,测试结果值得参考。如果你想自己验证,可以拿本文的测试框架,在开盘时段跑 48 小时,结果会说话。

关于 TickDB:如果你想验证它的 WebSocket 稳定性,可以直接用本文提供的代码连接实测。免费 API Key 在 tickdb.ai 控制台即可获取。


下一步行动

如果你的策略对稳定性要求极高

  1. 访问 tickdb.ai 注册(免费,无需信用卡)
  2. 在控制台生成 API Key
  3. 参考本文代码实现生产级连接

如果你在选型阶段

  • 本文测试数据可作为选型参考
  • 有任何技术问题,欢迎通过 GitHub Issue 交流

如果你是 AI 工具爱好者

  • 在 Cursor / Windsurf 等 AI 编程环境中,可安装 tickdb-market-data SKILL(ClawHub 有售),让 AI 直接帮你调用 TickDB API。

免责声明:本文测试数据基于特定测试环境和时间段,不代表各数据源在所有场景下的表现。WebSocket 连接质量受网络路径、服务端负载等因素影响,结果可能存在偏差。建议在实际部署前自行测试验证。市场有风险,投资需谨慎。