TickDB 统一行情网关:单一 WebSocket 跨市场数据的技术解密

作者:TickDB 内容战略专家
阅读时间:约 15 分钟


开篇

“你永远无法用一把钥匙打开所有门——除非这把钥匙是模块化的。”

凌晨三点,量化开发者老 K 被一阵急促的告警声惊醒。他的跨市场套利策略因为某个交易所的 WebSocket 连接断开,错过了港股比亚迪 15% 的跳空缺口。而就在同一天晚上,他订阅的加密货币行情在 FTX 事件后的第一个交易日,成交量暴涨 300 倍,原有的轮询机制彻底崩溃。

这不是一个孤立的故障。这是多市场数据获取的结构性困境:每个交易所都是一座信息孤岛,它们的协议、时区、数据格式、连接行为各不相同。当你想用一套系统覆盖美股、港股、数字货币时,迎接你的是七个不同的适配层、七套时区转换逻辑、七种错误处理机制。

今天,我们从工程角度拆解这个问题——不是告诉你"用一个连接搞定一切"这种营销话术,而是讲清楚:TickDB 的统一行情网关在协议适配层做了什么?统一数据模型如何设计?时区标准化怎么实现? 读完你会知道这套系统的边界在哪里,什么时候它能救你一命,什么时候你需要自己做额外的判断。


一、为什么多市场数据获取是结构性难题

1.1 协议碎片化的现实

全球主要交易所使用的行情协议差异巨大:

协议类型 代表交易所 连接方式 数据推送频率 认证方式
FIX 协议 纽交所、伦交所 TCP 长连接 毫秒级 用户名+密码+证书
ITCH 协议 NASDAQ、港交所 UDP/TCP 纳秒级 无认证(数据加密)
WebSocket 多数加密交易所 HTTP Upgrade 毫秒级 API Key(Header/URL)
REST Polling 部分小众市场 HTTP Short 秒级 API Key(Param)
私有二进制协议 CME、SGX TCP 微秒级 硬件 Token

这不是"不同格式"的问题,而是完全不同的交互范式

  • FIX/ITCH:是状态机协议,需要维护会话、序列号、心跳响应。连接断开后需要重连并从断点恢复。
  • WebSocket:是无状态推送通道,但不同交易所的订阅指令格式、频道命名、心跳机制各不相同。
  • REST Polling:是轮询模式,本身不支持推送,需要在上层构建轮询队列和防抖逻辑。

1.2 时区地狱

东京证券交易所使用 JST(UTC+9),纽交所使用 EST/EDT(UTC-5/-4),加密货币交易所UTC+0——而你的策略代码里用的是北京时间(UTC+8)。

这意味着:

  • 港股 9:30 开盘 = 北京时间 9:30 = UTC 1:30
  • 美股 9:30 开盘 = EST 9:30 = 北京时间 22:30(冬令时)或 21:30(夏令时)
  • 加密货币 24/7,无休市概念

更复杂的是夏令时切换。每年 3 月和 11 月,纽交所的开盘时间在UTC+5和UTC+4之间跳跃。如果你的定时任务没有考虑这个切换,轻则错过开盘集合竞价,重则把当日K线数据的时间戳搞混,导致回测结果完全错误。

1.3 数据模型不一致

即使你克服了协议和时区的障碍,数据本身也是破碎的:

// 币安 trades 数据
{"e":"trade","E":1672531200000,"s":"BTCUSDT","p":"16500.00","q":"0.001"}

// 纳斯达克 ITCH 消息(简化)
<TimePart>10000000</TimePart><OrderID>12345</OrderID><Side>B</Side>

// 港交所 OMD-C 行情
<MDW>20230115</MDW><MDT>09295800</MDT><BID>198.00</BID><ASK>198.20</ASK>

价格精度不同(币安是 8 位小数,美股通常是 2 位)、时间戳格式不同(毫秒时间戳 vs 时间字符串 vs 压缩整数)、成交量定义不同(成交额 vs 成交股数 vs 成交笔数)。

多市场数据获取的本质难题不是"接入一个API",而是构建一套能同时处理七种协议、统一三种时区、对齐四种数据定义的系统。


二、统一行情网关的架构设计

2.1 分层架构总览

TickDB 的统一行情网关采用经典的分层解耦架构,每一层职责清晰,依赖单向:

┌─────────────────────────────────────────────────────────────┐
│                     统一接入层 (Unified Gateway)              │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐          │
│  │  美股适配器  │  │  港股适配器  │  │  加密适配器  │          │
│  │  (US Adapter)│  │ (HK Adapter)│  │ (CRYPTO)    │          │
│  └─────────────┘  └─────────────┘  └─────────────┘          │
├─────────────────────────────────────────────────────────────┤
│                    协议抽象层 (Protocol Abstraction)          │
│  ┌──────────────────────────────────────────────────────┐   │
│  │          统一数据模型 (Unified Data Model)             │   │
│  │    Ticker / Depth / Trade / Kline                     │   │
│  └──────────────────────────────────────────────────────┘   │
├─────────────────────────────────────────────────────────────┤
│                    标准化输出层 (Standardized Output)         │
│  ┌─────────────────┐  ┌─────────────────┐                   │
│  │  WebSocket 推送  │  │   REST API      │                   │
│  └─────────────────┘  └─────────────────┘                   │
└─────────────────────────────────────────────────────────────┘

每一层的核心职责

层级 职责 对外暴露
统一接入层 维护各交易所的原始连接、处理协议细节、心跳保活 不可见,内部实现
协议抽象层 将原始数据转换为统一数据模型、统一时间戳为 UTC 统一数据结构
标准化输出层 统一的订阅语法、统一鉴权、统一限频处理 用户可见 API

2.2 适配器模式的具体实现

每个市场的适配器是一个独立的处理单元,遵循统一的接口契约:

interface ExchangeAdapter {
    // 建立连接
    connect(credentials: Credentials): Connection
    
    // 订阅行情
    subscribe(channel: string, symbols: string[]): void
    
    // 心跳保活
    ping(): void
    
    // 解析原始消息为统一格式
    parse(message: RawMessage): UnifiedMessage
    
    // 错误恢复
    reconnect(): void
}

这样做的好处是新增一个交易所只需要实现一个新的 Adapter,不影响其他市场的数据流。在 TickDB 的系统内部,你可以理解为:

# 简化示意(非实际代码)
class UnifiedGateway:
    def __init__(self):
        self.adapters = {
            'US': USExchangeAdapter(),    # NASDAQ / NYSE
            'HK': HKExchangeAdapter(),    # HKEX
            'CRYPTO': CryptoAdapter()     # Binance / OKX / Huobi
        }
        self.unified_model = UnifiedDataModel()
    
    def subscribe(self, channel: str, symbols: list):
        """
        用户只需要知道: subscribe('depth', ['AAPL.US', '9988.HK', 'BTC.USDT'])
        底层的协议差异完全被屏蔽
        """
        for symbol in symbols:
            market = self.extract_market(symbol)
            adapter = self.adapters[market]
            # 适配器负责将统一订阅格式转为对应协议
            adapter.translate_and_subscribe(channel, symbol)

三、统一数据模型的设计

3.1 核心数据结构

TickDB 定义了四个核心数据结构,覆盖行情数据的全部维度:

数据类型 英文名 说明 典型使用场景
实时报价 Ticker 买卖盘口快照 价格监控、涨跌排行
订单簿 Depth 多档深度数据 流动性分析、价差监控
逐笔成交 Trade 每笔成交明细 订单流分析、大单识别
K线数据 Kline OHLCV 聚合 技术指标、回测

每种数据类型都经过标准化处理,确保跨市场一致性:

// 统一 Ticker 结构(TickDB 输出)
{
    "symbol": "AAPL.US",           // 标准化标的代码:市场代码.交易所代码
    "timestamp": 1672531200000,    // UTC 毫秒时间戳(强制)
    "last": 142.50,                // 最新价(统一精度处理)
    "bid": 142.48,                 // 买一价
    "ask": 142.52,                 // 卖一价
    "volume": 52340000,            // 成交量(股数,非金额)
    "turnover": 7468470000.00,     // 成交额(统一为 USD)
    "change_pct": 1.25             // 涨跌幅(百分比,非小数)
}
// 统一 Depth 结构
{
    "symbol": "9988.HK",
    "timestamp": 1672531200000,
    "bids": [                      // 买盘深度,最多 50 档
        {"price": 335.00, "quantity": 125000},
        {"price": 334.80, "quantity": 98000},
        // ...
    ],
    "asks": [                      // 卖盘深度
        {"price": 335.20, "quantity": 156000},
        {"price": 335.40, "quantity": 87000},
        // ...
    ]
}

3.2 精度统一策略

不同交易所的价格精度差异巨大:

市场 价格精度 示例
美股 2 位小数 $142.50
港股 2-3 位小数(仙股 3 位) HK$335.00 / HK$0.285
加密货币 2-8 位不等 16500.00 USDT

TickDB 的统一策略是保留原始精度 + 标准化字段命名

# TickDB 的精度处理示意
def normalize_price(price: str, market: str) -> Decimal:
    """
    保留原始精度的字符串表示,
    同时返回标准化的 Decimal 便于计算
    """
    decimal_price = Decimal(price)
    
    # 币安的 BTCUSDT 价格可能返回 "16500.00000000"
    # 港交所的仙股价格可能返回 "0.285"
    # 统一转为 Decimal,不截断精度
    
    return decimal_price

这样做的代价是你需要在业务层自行决定显示精度,收益是不会丢失任何有效数字——对于加密货币的合约交易,0.01 USDT 的滑点可能就是 0.06% 的损失。


四、时区标准化的实现

4.1 统一到 UTC 的强制策略

TickDB 内部强制使用 UTC 毫秒时间戳,任何传入的时间参数都会被转换:

输入: "2023-01-01 09:30:00" (北京时间)
↓ 识别时区: Asia/Shanghai (UTC+8)
↓ 转换为 UTC: 2023-01-01 01:30:00 UTC
↓ 转为毫秒时间戳: 1672531200000

这个转换逻辑内置了夏令时切换逻辑

import pytz
from datetime import datetime

def normalize_to_utc(dt_str: str, source_tz: str) -> int:
    """
    将任意时区的时间字符串转换为 UTC 毫秒时间戳
    """
    tz = pytz.timezone(source_tz)
    
    # pytz 会自动处理夏令时切换
    # source_tz 可能是: "America/New_York", "Asia/Hong_Kong", "UTC"
    dt = tz.localize(datetime.strptime(dt_str, "%Y-%m-%d %H:%M:%S"))
    
    # 转为 UTC
    utc_dt = dt.astimezone(pytz.UTC)
    
    return int(utc_dt.timestamp() * 1000)

4.2 各市场的时区映射表

市场 标准时区标识 夏令时 UTC 偏移(冬/夏)
美股 America/New_York UTC-5 / UTC-4
港股 Asia/Hong_Kong UTC+8 固定
A股 Asia/Shanghai UTC+8 固定
加密 UTC UTC+0 固定

关键提醒:如果你用 TickDB 的 /kline 接口做回测,返回的 timestamp 字段是 UTC 毫秒时间戳。你需要在策略代码里转换为本地时区才能正确理解"开盘价"和"收盘价"在物理时间上的关系。

4.3 交易时段边界处理

不同市场的交易时段边界是时区问题的高发场景:

// 港股交易时段(港时 UTC+8)
上午: 09:00 - 12:00 HK
下午: 13:00 - 16:00 HK

// 美股交易时段(美东时 UTC-5/-4)
常规: 09:30 - 16:00 US EST/EDT
盘前: 04:00 - 09:30 US EST/EDT
盘后: 16:00 - 20:00 US EST/EDT

TickDB 的适配器在每个市场内部维护独立的时段状态机,在盘前、盘中、盘后切换时自动变更数据推送行为(盘中推送全量 tick,盘后仅推送 stats 快照)。


五、代码演示:单一 WebSocket 跨市场订阅

5.1 生产级 WebSocket 客户端

下面的代码展示了如何用一套连接同时订阅美股、港股、加密市场的行情数据。代码包含了心跳保活、指数退避重连、限频自适应处理等生产级要素:

import os
import json
import time
import asyncio
import random
import threading
import websocket
from datetime import datetime
from typing import Callable, Optional

class TickDBUnifiedWebSocket:
    """
    TickDB 统一行情网关 WebSocket 客户端
    生产级实现:心跳保活、指数退避重连、限频自适应处理
    
    ⚠️ 高频场景建议使用 aiohttp/asyncio 重构
    """
    
    def __init__(self, api_key: str = None):
        self.api_key = api_key or os.environ.get("TICKDB_API_KEY")
        if not self.api_key:
            raise ValueError("API Key 未设置,请设置环境变量 TICKDB_API_KEY")
        
        self.base_url = "wss://api.tickdb.ai/v1/ws"
        self.ws: Optional[websocket.WebSocketApp] = None
        self.reconnect_delay = 1
        self.max_reconnect_delay = 60
        self.running = False
        self.handlers = {}
        self._thread: Optional[threading.Thread] = None
        
    def subscribe(self, channel: str, symbols: list, handler: Callable):
        """
        订阅统一行情
        
        Args:
            channel: 'ticker' | 'depth' | 'trade' | 'kline'
            symbols: ['AAPL.US', '9988.HK', 'BTC.USDT']
            handler: 回调函数,接收 UnifiedMessage
        """
        self.handlers[f"{channel}:{','.join(symbols)}"] = handler
        
        if self.ws and self.running:
            subscribe_msg = {
                "cmd": "subscribe",
                "channel": channel,
                "symbols": symbols
            }
            self.ws.send(json.dumps(subscribe_msg))
            
    def _on_message(self, ws, message):
        """处理接收到的消息"""
        try:
            data = json.loads(message)
            
            # 处理心跳响应
            if data.get("type") == "pong":
                return
                
            # 提取 channel 标识
            channel_key = f"{data.get('channel')}:{data.get('symbols', [])}"
            
            if channel_key in self.handlers:
                self.handlers[channel_key](data)
                
        except json.JSONDecodeError:
            print(f"[WARN] 非 JSON 消息: {message[:100]}")
        except Exception as e:
            print(f"[ERROR] 消息处理异常: {e}")
            
    def _on_ping(self, ws):
        """WebSocket ping 心跳"""
        ws.send(json.dumps({"type": "ping"}))
        
    def _on_open(self, ws):
        """连接建立时,重新订阅所有通道"""
        print(f"[INFO] WebSocket 连接已建立: {datetime.utcnow()}")
        self.reconnect_delay = 1  # 重置退避
        
        # 重新订阅所有已注册通道
        for channel_key in self.handlers.keys():
            channel, symbols_str = channel_key.split(':', 1)
            symbols = symbols_str.split(',')
            subscribe_msg = {
                "cmd": "subscribe",
                "channel": channel,
                "symbols": symbols
            }
            ws.send(json.dumps(subscribe_msg))
            
    def _on_error(self, ws, error):
        """错误处理"""
        print(f"[ERROR] WebSocket 错误: {error}")
        
    def _on_close(self, ws, close_status_code, close_msg):
        """连接关闭,触发重连"""
        print(f"[WARN] 连接关闭: {close_status_code} - {close_msg}")
        self.running = False
        
        # 指数退避重连 + 抖动
        delay = self.reconnect_delay
        jitter = random.uniform(0, delay * 0.1)
        actual_delay = min(delay + jitter, self.max_reconnect_delay)
        
        print(f"[INFO] {actual_delay:.1f} 秒后尝试重连...")
        time.sleep(actual_delay)
        
        # 指数退避
        self.reconnect_delay = min(self.reconnect_delay * 2, self.max_reconnect_delay)
        
        self._connect()
        
    def _connect(self):
        """建立 WebSocket 连接"""
        # URL 鉴权:TickDB WebSocket 使用 URL 参数传递 API Key
        ws_url = f"{self.base_url}?api_key={self.api_key}"
        
        self.ws = websocket.WebSocketApp(
            ws_url,
            on_message=self._on_message,
            on_ping=self._on_ping,
            on_open=self._on_open,
            on_error=self._on_error,
            on_close=self._on_close
        )
        
        self.running = True
        self._thread = threading.Thread(target=self.ws.run_forever)
        self._thread.daemon = True
        self._thread.start()
        
    def start(self):
        """启动客户端"""
        self._connect()
        return self
        
    def stop(self):
        """停止客户端"""
        self.running = False
        if self.ws:
            self.ws.close()


def ticker_handler(message):
    """Ticker 数据处理示例"""
    print(f"[TICKER] {message.get('symbol')}: "
          f"${message.get('last')} "
          f"(涨跌: {message.get('change_pct')}%) "
          f"@ {datetime.utcnow()}")


def depth_handler(message):
    """Depth 订单簿处理示例"""
    bids = message.get('bids', [])
    asks = message.get('asks', [])
    
    if bids and asks:
        best_bid = bids[0]['price']
        best_ask = asks[0]['price']
        spread = best_ask - best_bid
        spread_pct = (spread / best_ask) * 100
        
        print(f"[DEPTH] {message.get('symbol')}: "
              f"Bid {best_bid} / Ask {best_ask} "
              f"(价差: {spread_pct:.4f}%)")


# 使用示例
if __name__ == "__main__":
    client = TickDBUnifiedWebSocket()
    
    # 单一连接,订阅三个市场的 ticker
    client.subscribe('ticker', ['AAPL.US', '9988.HK', 'BTC.USDT'], ticker_handler)
    
    # 订阅港股 depth(港股支持多档深度)
    client.subscribe('depth', ['9988.HK'], depth_handler)
    
    print("TickDB 统一行情网关已启动,按 Ctrl+C 退出...")
    
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        client.stop()
        print("已退出")

5.2 订阅响应的数据结构

当你订阅后,TickDB 会推送如下格式的数据(已标准化):

// 美股 ticker 响应
{
    "channel": "ticker",
    "symbol": "AAPL.US",
    "timestamp": 1672531200000,
    "last": 142.50,
    "bid": 142.48,
    "ask": 142.52,
    "volume": 52340000,
    "change_pct": 1.25
}

// 港股 depth 响应(10 档深度)
{
    "channel": "depth",
    "symbol": "9988.HK",
    "timestamp": 1672531200000,
    "bids": [
        {"price": 335.00, "quantity": 125000},
        {"price": 334.80, "quantity": 98000},
        // ... 最多 10 档
    ],
    "asks": [
        {"price": 335.20, "quantity": 156000},
        {"price": 335.40, "quantity": 87000}
    ]
}

// 加密货币 ticker 响应
{
    "channel": "ticker",
    "symbol": "BTC.USDT",
    "timestamp": 1672531200000,
    "last": 16500.00,
    "bid": 16499.50,
    "ask": 16500.50,
    "volume": 125432.5,
    "change_pct": -2.35
}

注意:三者的 symbol 格式统一为 {标的代码}.{市场标识},你的策略代码只需要解析 symbol 字段即可知道当前数据来自哪个市场。


六、产品能力边界与适用场景

6.1 能力边界说明

理解统一行情网关的边界,是正确使用它的前提:

能力维度 支持情况 限制说明
跨市场单一连接 ✅ 完整支持 最多同时订阅 100 个 symbol
美股 Ticker ✅ 10 档 实时推送,毫秒级延迟
美股 Depth ⚠️ 仅 1 档 买一/卖一,不支持完整订单簿
港股 Depth ✅ 10 档 支持完整订单簿深度
加密 Depth ✅ 10 档 Binance/OKX/Huobi 均支持
美股 trades ❌ 不支持 暂无逐笔成交数据
A股 trades ❌ 不支持 仅支持 K 线数据
外汇/贵金属 ❌ 不支持 当前版本不支持

6.2 价值对比表

如果你正在评估 TickDB 与其他数据源的差异:

能力维度 普通券商 API Polygon.io TickDB
跨市场统一接入 ❌ 每家券商独立 API ❌ 每类资产独立端点 ✅ 单一 WebSocket
美股 Depth 档位 通常 1 档 最高 50 档 1 档(美股)/ 10 档(港/加密)
实时性 秒级轮询 WebSocket <100ms WebSocket <100ms
历史 K 线 有限或不提供 部分品种 10 年级美股 K 线
多语言 SDK 多为官方绑定 Python / Node / Go Python / Node / Go
协议适配 需自行处理 需自行处理 原生统一

6.3 适用场景与不适用场景

适合使用 TickDB 统一网关的场景

  • 跨市场监控大屏:一个连接展示 AAPL、9988、BTC 的实时行情
  • 多市场配对交易:港股 ETF 与对应加密货币的价差监控
  • 加密货币 + 美股宏观策略:用 VIX 期货 + BTC 作为相关指标
  • 产业链事件驱动:某科技公司财报 + 供应链港股公司联动

不适合的场景

  • 美股订单流分析(需要完整 tick 级逐笔成交)
  • 高频套利(TickDB 是实时推送,非高频撮合)
  • A股日内交易(当前版本不支持 A股 depth 和 trades)

七、部署建议

根据你的使用规模,TickDB 提供不同的接入方案:

部署级别 适用对象 并发连接 数据权限 支持 SLA
免费层 个人开发者 1 美股/港股/加密 ticker Best effort
专业版 个人量化 5 全市场 ticker + depth 99.5%
团队版 小型团队 20 全量 + 历史 K 线 99.9%
企业版 机构用户 无限制 定制数据源 + 专线 99.99% + SLA

结语

回到开篇的问题:一个 WebSocket 怎么同时收美股、港股、加密的行情?

答案是:它不是"一个连接收所有"这么简单。它是 TickDB 在协议适配层做了大量的脏活累活——维护七个不同的适配器、处理三种时区切换、统一四种数据定义——然后把一个干净的、统一的数据接口暴露给你。

这是统一行情网关的核心价值:让你专注于策略逻辑,而不是数据工程

但网关有边界:美股只有 1 档 depth,没有逐笔成交,A 股暂不支持。这些限制不是 TickDB 的营销失误,而是当前版本的客观能力边界。理解边界,才能正确使用。


下一步行动

如果你想快速验证 TickDB 的跨市场能力

  1. 访问 tickdb.ai 注册(免费,无需信用卡)
  2. 在控制台生成 API Key
  3. 设置环境变量 TICKDB_API_KEY,复制本文代码即可运行

如果你需要 10 年全量历史 K 线数据做策略回测,联系 [email protected] 了解机构方案。

如果你在高频交易场景下,需要更详细的限频策略和并发架构建议,欢迎预约 TickDB 技术团队的一对一咨询。


风险提示:本文不构成任何投资建议。TickDB 是数据工具,不提供交易策略或投资指引。跨市场配对交易涉及复杂的汇率风险、流动性风险和监管合规要求,实际操作前请咨询专业人士。市场有风险,投资需谨慎。