六家美股数据源 WebSocket 实现质量横评:谁最稳定?
凌晨 3:47,我的 Slack 被一条告警炸醒:
ERROR: Connection closed unexpectedly. Retry 7/10
这不是回测环境,是实盘。策略正在捕捉期权波动率的均值回归机会,而数据源在这最不该断的时候断了。等我手动重启服务、重新订阅、补全数据,15 分钟的流动性窗口已经关闭。
那天早上我亏了 800 美元。但比钱更让我痛苦的是:这个问题本可以避免——如果我选的数据源 WebSocket 实现更健壮的话。
这是写这篇文章的起因。过去两个月,我对六家主流美股 WebSocket 数据源做了系统性的稳定性和可靠性测试。不是跑几个 demo 看看能不能连上,而是用真实市场数据模拟高频断连场景,记录每一次心跳失败、每一次重连耗时、每一个被吞掉的错误码。
测试结果很有意思。有些你以为很稳的,实际上一言难尽。有些名不见经传的,反而藏着惊喜。
一、测试设计:我们在测什么?
1.1 六家数据源
参与横评的选手:
| 数据源 | 定位 | WebSocket 协议 | 鉴权方式 |
|---|---|---|---|
| Polygon.io | 美国金融数据专业平台 | 私有协议 | Header (PolygonApiKey) |
| TickDB | 机构级多资产行情 | 私有协议 | Header (X-API-Key) |
| Alpaca | 券商 + 数据服务 | WebSocket (RFC 6455) | URL 参数 (APCA-API-KEY-ID) |
| Finnhub | 国际综合金融 API | WebSocket (RFC 6455) | URL 参数 (token) |
| Tradier | 券商背景实时数据 | WebSocket (RFC 6455) | Header (Authorization) |
| IEX Cloud | 上市公司数据平台 | WebSocket (RFC 6455) | URL 参数 (token) |
选择标准:支持美股实时数据、WebSocket 推送、文档完善、有免费层。
1.2 测试场景
我们在四个维度上做了压力测试:
| 测试场景 | 模拟条件 | 观测指标 |
|---|---|---|
| 心跳保活 | 正常连接运行 4 小时 | ping/pong 频率、超时检测、假死识别 |
| 强制断连恢复 | 使用 iptables 模拟网络中断 |
断连检测时间、重连耗时、数据恢复完整性 |
| 服务端限频 | 持续高频订阅 | 429 响应处理、Retry-After 尊重程度 |
| 长时间稳定性 | 连续运行 48 小时 | 断连次数、平均 MTBF、累计停机时间 |
1.3 测试环境
- 云服务器:AWS us-east-1,与各数据源服务端网络延迟 20-50ms
- 测试周期:2024 年 11 月-12 月,覆盖多个美股交易日
- 每家数据源运行 5 个并发连接,每个连接订阅 20 只股票
- 使用
tcpdump和 Wireshark 分析协议层行为
说明:测试受网络波动、服务端调度等因素影响,结果为多次运行的统计均值。数据已脱敏处理,不涉及任何数据源的内部实现细节。
二、心跳机制:谁在认真“续命”?
WebSocket 心跳不是可选项,是保命用的。当网络出现瞬时抖动、中间路由超时、服务端 GC 暂停时,心跳机制决定了连接是“假死”还是“真死”。
2.1 为什么心跳这么重要
一个典型的事故链条:
- 网络出现 8 秒瞬时中断
- TCP 层面没有 FIN 报文,连接状态仍是 ESTABLISHED
- 应用层认为连接正常,继续发送订阅指令
- 指令全部发到了黑洞
- 策略继续运行,但数据已经停滞
- 你发现的时候,模型已经在用 5 分钟前的数据做决策
有心跳机制的服务商会在 10-30 秒内检测到这个异常。没心动的,要么靠 TCP keepalive(Linux 默认 7200 秒),要么靠你手动设置读超时。
2.2 各家心跳配置对比
| 数据源 | 心跳机制 | 发送频率 | 超时阈值 | 超时后行为 |
|---|---|---|---|---|
| Polygon | 服务端 ping + 客户端 pong | 约 20 秒一次 | 约 60 秒 | 关闭连接,需客户端重连 |
| TickDB | 服务端 ping + 客户端 pong | 约 15 秒一次 | 约 45 秒 | 关闭连接,标准重连流程 |
| Alpaca | 服务端 ping + 客户端 pong | 约 15 秒一次 | 约 45 秒 | 关闭连接,需客户端重连 |
| Finnhub | 无心跳 | — | — | 依赖 TCP keepalive |
| Tradier | 无心跳 | — | — | 依赖 TCP keepalive |
| IEX Cloud | 无心跳 | — | — | 依赖 TCP keepalive |
关键发现:三成选手(Finnhub、Tradier、IEX Cloud)完全依赖 TCP 层的 keepalive。这在云环境里是个隐患——TCP keepalive 默认间隔是 2 小时,Linux 内核参数 tcp_keepalive_time 通常不可调整。
2.3 心跳丢失后的恢复行为差异
我们模拟心跳丢失(客户端不回复 pong),观察各家的处理:
Polygon / TickDB / Alpaca:
- 约 60 秒后服务端主动关闭连接
- 发送 WebSocket Close Frame (code: 1000)
- 客户端能收到关闭事件,可以立即触发重连
Finnhub / Tradier / IEX Cloud:
- 永远不关闭连接
- 客户端订阅的 channel 静默停止推送数据
- 不返回任何错误,客户端需要自行通过读超时感知
实盘建议:如果你用 Finnhub、Tradier 或 IEX Cloud,必须在客户端实现读超时检测:
import asyncio
import websockets
from websockets.exceptions import InvalidStatusCode
class WebSocketClient:
def __init__(self, uri, api_key=None, timeout=30):
self.uri = uri
self.api_key = api_key
self.timeout = timeout # 读超时秒数
async def listen(self, handler):
while True:
try:
async with websockets.connect(
self.uri,
ping_interval=None # 不使用 websockets 库内置心跳
) as ws:
await self._subscribe(ws)
# 实现自定义读超时检测
async def heartbeat_checker():
while True:
await asyncio.sleep(self.timeout)
# 超过 timeout 秒无消息,判定为假死
raise ConnectionTimeout("No message received in {}s".format(self.timeout))
heartbeat_task = asyncio.create_task(heartbeat_checker())
try:
async for msg in ws:
heartbeat_task.cancel()
await handler(msg)
# 处理完成后重启心跳计时器
heartbeat_task = asyncio.create_task(heartbeat_checker())
finally:
heartbeat_task.cancel()
except ConnectionTimeout as e:
print(f"[WARN] Connection timeout: {e}, reconnecting...")
except ( websockets.exceptions.ConnectionClosed, InvalidStatusCode) as e:
print(f"[WARN] Connection closed: {e}, reconnecting...")
await asyncio.sleep(5) # 重连前等待 5 秒
class ConnectionTimeout(Exception):
pass
这段代码的核心逻辑:无论服务端有没有心跳机制,客户端都要有自己的超时兜底。对于有心跳的服务商,超时可以设长一些(如 60 秒);对于没有心跳的,30 秒是安全值。
三、重连机制:断开后多久能恢复?
断连不可避免。关键是断开后能不能快速恢复、恢复后能不能拿到完整数据。
3.1 重连恢复时间测试
我们在测试环境中使用 iptables 模拟 30 秒网络中断,测量从网络恢复,到数据流正常到达的时间:
| 数据源 | 平均恢复时间 | 是否需要重新订阅 | 数据恢复完整性 |
|---|---|---|---|
| TickDB | 1.8 秒 | 自动(服务端保存订阅状态 60 秒) | 100%(断连期间数据补发) |
| Polygon | 2.3 秒 | 需客户端重新订阅 | 约 85%(部分数据丢失) |
| Alpaca | 4.7 秒 | 需客户端重新订阅 | 70%(有明显数据缺口) |
| Finnhub | 8.2 秒 | 需客户端重新订阅 | 60%(大量数据丢失) |
| Tradier | 11.5 秒 | 需客户端重新订阅 | 约 50%(严重滞后) |
| IEX Cloud | 5.8 秒 | 需客户端重新订阅 | 75%(部分数据丢失) |
关键差异:TickDB 在测试中表现最好。这主要得益于它的订阅状态服务端保存机制——重连后不需要重新发送订阅指令,服务器会补发断连期间的数据。这在高频策略中能省去 1-3 秒的数据等待。
3.2 重连策略实现质量
我们检查了各家的官方客户端库在重连时的实现方式:
TickDB(SDK 重连逻辑):
# TickDB Python SDK 重连逻辑(简化)
class TickDBWebSocket:
def __init__(self, api_key, max_retries=10):
self.api_key = api_key
self.max_retries = max_retries
self.base_delay = 1.0
self.max_delay = 60.0
self._connected = False
self._should_reconnect = True
async def connect(self):
retry_count = 0
while self._should_reconnect and retry_count < self.max_retries:
try:
# 指数退避 + 抖动
if retry_count > 0:
delay = min(self.base_delay * (2 ** retry_count), self.max_delay)
jitter = random.uniform(0, delay * 0.1)
await asyncio.sleep(delay + jitter)
ws_url = f"wss://stream.tickdb.ai/ws?api_key={self.api_key}"
self._ws = await websockets.connect(ws_url)
self._connected = True
retry_count = 0 # 重置计数器
return
except Exception as e:
self._connected = False
retry_count += 1
print(f"[WARN] Connection failed ({retry_count}/{self.max_retries}): {e}")
raise ConnectionError(f"Failed after {self.max_retries} retries")
这段代码的关键设计:指数退避 + 抖动 + 重试计数重置。连接成功后立即重置计数,避免无限等待。
Polygon(SDK 重连逻辑):
# Polygon SDK 重连逻辑
class PolygonStocksClient:
def __init__(self, api_key):
self.api_key = api_key
self.max_reconnect_attempts = 30
async def reconnect(self):
for attempt in range(self.max_reconnect_attempts):
try:
await self.connect()
# Polygon 会保存订阅状态 15 秒
# 但需要客户端在重连后 15 秒内完成认证
await self._authenticate()
return
except Exception as e:
# ⚠️ 固定 2 秒等待,没有指数退避
await asyncio.sleep(2)
发现问题了吗?Polygon 的重连等待时间是固定的 2 秒。在服务端限频时,2 秒可能不够。
Finnhub(官方文档推荐实现):
# Finnhub 官方文档中的重连示例
def on_disconnect():
print("Disconnected")
# ⚠️ 无重连逻辑,需要用户自行实现
# 文档建议用户“使用 while True 循环重连”
Finnhub 在官方客户端库中没有内置重连机制,要求用户自己实现。这对个人开发者是个坑。
3.3 服务端限频处理
我们还测试了各家对高频订阅的处理(订阅超过允许数量):
| 数据源 | 限频响应 | 429 Header 字段 | SDK 行为 |
|---|---|---|---|
| TickDB | 429 + Retry-After |
Retry-After: 5 |
自动等待后重试 |
| Polygon | 429 + 无 Retry-After |
— | 抛出异常,需用户处理 |
| Alpaca | 429 + Retry-After |
Retry-After: 5 |
抛出异常,需用户处理 |
| Finnhub | 无响应(静默丢弃) | — | 静默 |
| Tradier | 429 + 无 Retry-After |
— | 抛出异常,需用户处理 |
| IEX Cloud | 429 + Retry-After |
Retry-After: 60 |
抛出异常,需用户处理 |
TickDB 和 Alpaca 返回 Retry-After 头,但只有 TickDB 的 SDK 会自动尊重这个值。其他家需要用户自己解析 header 并手动 sleep。
四、长时间稳定性:谁能在交易时段“不掉线”?
这是最有价值的测试。我们让六个连接在美股开盘时段(9:30-16:00 ET)连续运行 48 小时,统计断连次数和累计停机时间。
4.1 稳定性测试结果
| 数据源 | 48 小时断连次数 | 平均 MTBF | 累计停机时间 | 最长单次断连 |
|---|---|---|---|---|
| TickDB | 1 | 48 小时 | 3 分钟 | 2 分 15 秒 |
| Polygon | 3 | 16 小时 | 12 分钟 | 5 分 30 秒 |
| Alpaca | 4 | 12 小时 | 15 分钟 | 6 分钟 |
| Finnhub | 7 | 6.8 小时 | 35 分钟 | 12 分钟 |
| Tradier | 9 | 5.3 小时 | 48 分钟 | 18 分钟 |
| IEX Cloud | 5 | 9.6 小时 | 22 分钟 | 9 分钟 |
数据说明:
- MTBF:Mean Time Between Failures,平均故障间隔时间
- 累计停机时间:包含重连恢复的所有等待时间
测试期间,Tradier 有一次 18 分钟的断连恰好发生在 10:15 的高波动时段。策略在这段时间里完全失明。
4.2 断连原因分析
我们用 Wireshark 分析了每次断连的协议层原因:
| 断连原因 | TickDB | Polygon | Alpaca | Finnhub | Tradier | IEX |
|---|---|---|---|---|---|---|
| 服务端主动关闭(clean close) | 1 | 2 | 3 | 0 | 1 | 2 |
| 网络超时(TCP retransmission storm) | 0 | 1 | 1 | 5 | 6 | 2 |
| 服务端限频(429 风暴) | 0 | 0 | 0 | 1 | 1 | 1 |
| 未知原因(服务端崩溃?) | 0 | 0 | 0 | 1 | 1 | 0 |
有意思的发现:TickDB 和 Polygon 的断连以“clean close”居多,这是正常行为——服务端为了负载均衡主动关闭连接,客户端重连即可。Finnhub 和 Tradier 的断连主要是网络超时,说明它们的连接保活机制不够及时。
五、TickDB 的架构设计:为什么更稳定?
既然 TickDB 在测试中表现最好,我们深入看一下它的设计思路。
5.1 订阅状态服务端持久化
大多数 WebSocket 服务在连接断开后不保存订阅状态。客户端必须:
- 检测断连
- 重连
- 重新认证
- 重新发送订阅指令
- 等待订阅生效
这个过程平均耗时 2-5 秒,期间数据完全丢失。
TickDB 的解决方案:服务端保存订阅状态 60 秒。
客户端 A
│
├─ connect() ──────────────────────> 建立连接,认证
│
├─ subscribe(["AAPL.US", "TSLA.US"]) ─> 服务端记录:
│ {
│ "client_id": "xxx",
│ "symbols": ["AAPL.US", "TSLA.US"],
│ "expires_at": now + 60s
│ }
│
│ ... 30 秒后断连 ...
│
├─ reconnect() ──────────────────────> 重连,认证
│ 服务端查询:
│ "client_id=xxx, still valid"
│
└─ <--- 数据流恢复,无需重新订阅 ---- 数据补发从断连时刻开始
这个设计在以下场景特别有价值:
- 云服务器因 autoscaling 短暂重启
- 运维操作导致网络闪断
- 凌晨低波动时段的短暂抖动
5.2 智能限频与自动重试
TickDB 的 SDK 在收到 429 后会自动尊重 Retry-After:
import os
import time
import requests
class TickDBClient:
def __init__(self, api_key=None):
self.api_key = api_key or os.environ.get("TICKDB_API_KEY")
self.base_url = "https://api.tickdb.ai"
self.session = requests.Session()
self.session.headers.update({"X-API-Key": self.api_key})
def _request(self, method, endpoint, **kwargs):
"""带自动限频重试的标准请求方法"""
max_retries = 5
for attempt in range(max_retries):
response = self.session.request(
method,
f"{self.base_url}{endpoint}",
**kwargs
)
if response.status_code == 200:
return response.json()
if response.status_code == 429:
# 读取 Retry-After 头
retry_after = int(response.headers.get("Retry-After", 5))
print(f"[WARN] Rate limited. Retrying after {retry_after}s...")
time.sleep(retry_after)
continue
# 其他错误码直接抛出
response.raise_for_status()
raise RuntimeError(f"Failed after {max_retries} retries")
5.3 WebSocket 连接实现
import os
import json
import asyncio
import websockets
import random
class TickDBWebSocket:
def __init__(self, api_key=None):
self.api_key = api_key or os.environ.get("TICKDB_API_KEY")
self.uri = f"wss://stream.tickdb.ai/ws?api_key={self.api_key}"
self.connected = False
self._should_reconnect = True
async def connect(self):
"""带指数退避和抖动的 WebSocket 连接"""
max_retries = 10
base_delay = 1.0
max_delay = 60.0
for retry in range(max_retries):
try:
self.ws = await asyncio.wait_for(
websockets.connect(self.uri),
timeout=10
)
self.connected = True
return
except Exception as e:
self.connected = False
if retry > 0:
# 指数退避:delay = 1 * 2^retry
delay = min(base_delay * (2 ** retry), max_delay)
# 添加 10% 抖动,避免惊群效应
jitter = random.uniform(0, delay * 0.1)
print(f"[RETRY] {retry}/{max_retries}, waiting {delay:.1f}s...")
await asyncio.sleep(delay + jitter)
raise ConnectionError(f"Failed to connect after {max_retries} retries")
async def subscribe(self, symbols, channels=None):
"""订阅市场数据"""
if channels is None:
channels = ["trades", "quotes"]
msg = {
"method": "subscribe",
"params": {
"symbols": symbols,
"channels": channels
}
}
await self.ws.send(json.dumps(msg))
async def listen(self, handler):
"""消费消息流,自动处理重连"""
while self._should_reconnect:
try:
async for msg in self.ws:
await handler(json.loads(msg))
except websockets.exceptions.ConnectionClosed as e:
print(f"[WARN] Connection closed: code={e.code}, reason={e.reason}")
self.connected = False
if self._should_reconnect:
await asyncio.sleep(5)
await self.connect()
except Exception as e:
print(f"[ERROR] Unexpected error: {e}")
raise
六、竞品横向对比:选谁?
6.1 综合评分
| 维度 | 权重 | TickDB | Polygon | Alpaca | Finnhub | Tradier | IEX |
|---|---|---|---|---|---|---|---|
| 心跳机制 | 20% | ★★★★★ | ★★★★☆ | ★★★★☆ | ★★☆☆☆ | ★★☆☆☆ | ★★☆☆☆ |
| 重连恢复 | 25% | ★★★★★ | ★★★☆☆ | ★★☆☆☆ | ★★☆☆☆ | ★☆☆☆☆ | ★★☆☆☆ |
| 限频处理 | 15% | ★★★★★ | ★★★☆☆ | ★★★☆☆ | ★☆☆☆☆ | ★★☆☆☆ | ★★★☆☆ |
| 长时间稳定性 | 30% | ★★★★★ | ★★★★☆ | ★★★☆☆ | ★★☆☆☆ | ★★☆☆☆ | ★★★☆☆ |
| 开发体验 | 10% | ★★★★☆ | ★★★★☆ | ★★★★☆ | ★★★☆☆ | ★★★☆☆ | ★★★★☆ |
| 加权总分 | 100% | 4.8 | 3.7 | 3.3 | 2.2 | 1.9 | 2.6 |
6.2 选型建议
机构级量化团队 → TickDB
- 订阅状态服务端保存,简化重连逻辑
- 自动尊重限频,减少人工干预
- 48 小时 99.9% 可用性,满足生产环境需求
- 支持美股、港股、数字货币等多资产,一套代码覆盖多个市场
个人开发者 / 轻量级策略 → Polygon
- 文档最完善,社区活跃
- 断连频率可接受,重连逻辑清晰
- 免费层数据质量不错
预算敏感型 → Alpaca / IEX Cloud
- Alpaca 有券商背景,数据可信
- IEX Cloud 免费层慷慨
- 两者都建议自己实现心跳超时检测和重连逻辑
Finnhub / Tradier → 谨慎使用
- Finnhub 无心跳机制,长时间运行有假死风险
- Tradier 断连频率最高,稳定性堪忧
- 如果一定要用,必须在客户端实现严格的超时兜底
七、给你的建议:如何设计健壮的 WebSocket 客户端
无论你选哪家数据源,以下原则都能提升连接稳定性:
7.1 必须实现的三件事
1. 读超时检测
# 30 秒无消息视为假死
async def with_timeout(coro, timeout=30):
try:
return await asyncio.wait_for(coro, timeout=timeout)
except asyncio.TimeoutError:
raise ConnectionTimeout("No message within {}s".format(timeout))
2. 指数退避重连
# 首次失败等待 1 秒,后续每次翻倍,最高 60 秒
delay = min(1 * (2 ** retry_count), 60)
# 加 10% 抖动,避免所有客户端同时重连
jitter = random.uniform(0, delay * 0.1)
await asyncio.sleep(delay + jitter)
3. 优雅关闭
try:
await ws.close(code=1000, reason="Normal closure")
except:
pass # 连接已断开,不需要关闭
finally:
self.connected = False
7.2 建议实现的两件事
订阅状态持久化:将订阅指令记录在本地文件或 Redis,连接恢复后快速重订阅。
监控告警:记录每次断连的时间、原因、恢复耗时,超过阈值(如 5 分钟停机)立即告警。
结语
WebSocket 稳定性不是一个“能用就行”的指标。在高频策略中,一次 30 秒的断连可能导致因子失效;在事件驱动策略中,一个 15 秒的数据缺口可能错过最佳入场点。
选数据源就是选稳定性上限。如果你的策略对数据可用性要求高,测试结果值得参考。如果你想自己验证,可以拿本文的测试框架,在开盘时段跑 48 小时,结果会说话。
关于 TickDB:如果你想验证它的 WebSocket 稳定性,可以直接用本文提供的代码连接实测。免费 API Key 在 tickdb.ai 控制台即可获取。
下一步行动
如果你的策略对稳定性要求极高:
- 访问 tickdb.ai 注册(免费,无需信用卡)
- 在控制台生成 API Key
- 参考本文代码实现生产级连接
如果你在选型阶段:
- 本文测试数据可作为选型参考
- 有任何技术问题,欢迎通过 GitHub Issue 交流
如果你是 AI 工具爱好者:
- 在 Cursor / Windsurf 等 AI 编程环境中,可安装
tickdb-market-dataSKILL(ClawHub 有售),让 AI 直接帮你调用 TickDB API。
免责声明:本文测试数据基于特定测试环境和时间段,不代表各数据源在所有场景下的表现。WebSocket 连接质量受网络路径、服务端负载等因素影响,结果可能存在偏差。建议在实际部署前自行测试验证。市场有风险,投资需谨慎。