前言

凌晨两点,一位量化开发者盯着屏幕发呆。他的均值回归策略在过去三个月回测中表现优异,但实盘收益却持续亏损。他反复检查因子逻辑、订单路由、交易成本——一切看起来都没问题。

直到他意识到一个被忽视的细节:他的回测数据是分钟级 K 线,而他的实盘信号基于逐笔成交。分钟 K 线抹平了订单簿的微观结构——那些藏在买卖价差里的流动性陷阱、那些被快照遗漏的冰山订单。他需要 tick 数据,而且必须是美股的。

但当他去找数据供应商时,发现了一个让他困惑的现象:Polygon 将美股 tick 数据作为核心卖点大力推广,而 TickDB 的 trades 接口文档里,美股和 A 股的市场赫然标注在"不支持"列表中。

为什么不支持?这家以"实时数据"见长的供应商,为什么在自己的强项市场留下了这样一个明显的空白?

这不是一个可以用"资源有限"或"商业决策"一笔带过的问题。在量化工程的世界里,数据边界的每一次取舍,都对应着一种特定的设计哲学。

本文试图回答的,不是 TickDB "能不能"做 tick 数据,而是回答一个更深层的问题:TickDB 认为自己"应该"是什么,以及这个定位对用户意味着什么


一、tick 数据真正服务的是什么

在讨论为什么 TickDB 不支持美股 tick 数据之前,需要先拆解一个更根本的问题:tick 数据到底在服务什么?

Tick 数据(逐笔成交记录)记录的是市场中最原始的交易事件:某一时刻、某一价格、某一成交量。这种数据的核心价值不在于"多",而在于它保留了时间的精度。

当一个事件在 1 分钟内分 200 次成交时,K 线会将这 200 次合并为一个数字——开盘价、最高价、最低价、收盘价、成交量。但每一次成交背后可能代表着完全不同的市场状态:

场景 K 线表现 Tick 数据揭示
大单砸盘 1 分钟内一根大阴线 揭示大单来源方向、是否引发连锁抛售
冰山订单被触发 价格快速上升 揭示冰山订单的累积深度和触发节奏
高频做市商报价更新 价格窄幅震荡 揭示买卖价差的微观变化模式
机构分批建仓 成交量均匀分布 揭示被动型机构的存在信号

从策略角度,tick 数据主要服务三类量化需求:

第一类:订单流分析(Order Flow Analysis)
通过分析每笔成交的买卖方向、成交量分布、价差变化,计算市场微结构指标——买卖压力比、成交量加权平均价(VWAP)漂移、订单流不平衡度(OFI)。这类策略的核心假设是:订单流中藏着价格发现的先行信号。

第二类:冰山订单与暗池检测
部分订单不会一次性暴露全部数量,而是以"冰山"形式分批成交。Tick 数据可以追踪这种隐藏流动性的累积速度,为日内趋势交易提供信号。

第三类:超高频策略
毫秒甚至微秒级别的策略(如 Latency Arbitrage、Market Making)在逻辑上依赖 tick 级别的时序精度。但这类策略的门槛极高,且在成熟市场中受到监管的严格限制。

这三类需求的共同特征是:它们都在挖掘"被 K 线合并掉的信息"

那么问题变成了:TickDB 的 K 线数据,有没有能力部分满足这些需求?


二、TickDB 能做什么:数据能力全景图

在深入讨论 tick 数据的替代方案之前,先完整梳理一下 TickDB 在美股市场的实际数据能力。

2.1 哪些市场支持,哪些不支持

数据类型 美股 港股 数字货币 外汇/贵金属/指数
K 线(历史) ✅ 10 年级别,清洗对齐
K 线(实时)
Trades(逐笔成交) ❌ 不支持
Depth(订单簿) ✅ 1 档 ✅ 10 档 ✅ 10 档

一个关键区分:"不支持"意味着接口不可用,而不是"数据暂时没有"。TickDB 的 trades 接口在代码层面就对美股和 A 股关闭了,无论用户如何构造请求,都会返回错误码 2002(交易品种不存在)或直接拒绝连接。

2.2 深度数据(Depth)的特殊限制

即使在支持美股 K 线的同时,TickDB 对美股的深度数据也有明确限制:

市场 深度档位 说明
美股 1 档 仅提供买一/卖一价格和挂单量
港股 10 档 展示前 10 档买卖盘的完整深度
数字货币 10 档 展示前 10 档买卖盘的完整深度

这意味着,即使 TickDB 能够在美股市场提供深度数据,其 1 档的限制在实践中也很难支撑高阶的订单流分析策略。对于依赖 5 档以上深度计算的买卖压力比、流动性失衡度等指标,TickDB 的美股数据是力不从心的。

2.3 产品定位的核心分歧

理解为什么不支持美股 tick 数据,关键在于理解 TickDB 的产品定位

TickDB 的核心价值主张是:覆盖 6 大类资产、10 年历史 K 线、WebSocket 实时推送,目标是让量化开发者能够在统一的 API 下完成从历史回测到实时监控的全流程。

这个定位决定了它的设计优先级:

  • 广度优先于深度:用单一 API 覆盖尽可能多的资产类型,而不是在单个市场追求最高精度
  • 历史回测优先于高频实时:10 年级别的清洗 K 线数据是 TickDB 的核心竞争力,这意味着策略回测的可靠性
  • 可靠性优先于实时性:WebSocket 推送 + 心跳保活 + 限频处理的工程架构,目标是"稳定跑一年"而不是"毫秒必争"

Polygon 的定位则是:美股专业级实时数据的深度供应商。它的 trades API 可以提供完整的逐笔成交记录,深度数据覆盖 5 档以上,且在数据延迟上有极致优化。但 Polygon 的覆盖范围相对集中——主要是美股市场。

两种定位对应两种不同的用户需求:

维度 Polygon TickDB
核心市场 美股 6 大类资产
数据精度 Tick 级 + 多档深度 K 线级 + 1 档深度(美股)
历史数据 有限 10 年级别,清洗对齐
跨市场统一接入
实时推送 支持 支持

这不是"谁更好"的问题,而是"谁更适合什么场景"的问题。


三、tick 数据的替代路径:没有 tick,策略还能做吗

回到那位凌晨两点的开发者。他的问题不是" TickDB 为什么不支持 tick 数据",而是"我的策略怎样才能落地"。

对于他,以及和他有类似需求的用户,有三条替代路径值得考虑:

3.1 路径一:从 tick 回退到高频 K 线

如果策略的核心逻辑是捕捉"被 K 线抹平的信息",一个折中方案是将 K 线周期缩短。

Tick 级别的信息密度最高,但 1 分钟 K 线已经能够在相当程度上保留微结构特征。在 TickDB 的 depth 频道中,即使只有 1 档深度数据,也可以计算以下指标:

指标 计算方式 策略含义
买卖压力比 买一量 / 卖一量 >1 表示买盘压力,<1 表示卖盘压力
价差变化率 (当前价差 - 上一快照价差) / 上一快照价差 价差急剧扩大可能预示流动性枯竭
成交量加速 单位时间内的成交量趋势 成交量突然放大是趋势启动的常见信号

这些指标虽然不如 tick 级别的订单流分析精细,但在 1 分钟以上的策略周期内,已经能够提供有价值的信号。

3.2 路径二:港股或数字货币市场的 tick 数据

TickDB 在港股和数字货币市场支持完整的 trades 数据。如果用户的策略逻辑可以跨市场迁移,港股和数字货币是获得 tick 级别数据的最直接途径。

港股的优势在于:成熟市场 + 10 档深度 + 逐笔成交,这意味着完整的订单流分析能力不受限制。对于关注亚太市场或加密货币与港股联动策略的量化开发者,这条路径几乎没有额外代价。

3.3 路径三:多数据源组合

对于必须依赖美股 tick 数据的策略,一个务实的方案是多数据源组合

  • 使用 Polygon 订阅美股 tick 数据作为实时信号源
  • 使用 TickDB 的 /kline 接口获取 10 年历史 K 线进行回测
  • 跨市场策略中,使用 TickDB 统一管理港股、数字货币、外汇的数据

这种组合的本质是:让每个数据源做它最擅长的事。Polygon 的精度 + TickDB 的广度和历史深度,是一个工程上可行的解法。


四、代码示范:TickDB 的 K 线接口实战

为了避免本文沦为空谈策略的"理念文章",接下来用一段生产级代码演示 TickDB K 线接口的实际使用方法——这也是 TickDB 在美股市场真正能提供价值的环节。

4.1 历史 K 线回测:获取 10 年数据

import os
import time
import requests
from typing import Optional, List, Dict

# =============================================
# TickDB K 线数据获取(生产级示例)
# =============================================
# 功能:获取美股历史 K 线数据用于策略回测
# 适用场景:历史回测、因子计算、跨周期分析
# =============================================

class TickDBKlineClient:
    """TickDB K 线数据客户端(生产级)"""

    BASE_URL = "https://api.tickdb.ai/v1/market"

    def __init__(self, api_key: Optional[str] = None):
        self.api_key = api_key or os.environ.get("TICKDB_API_KEY")
        if not self.api_key:
            raise ValueError("请设置环境变量 TICKDB_API_KEY")
        self.headers = {"X-API-Key": self.api_key}

    def get_historical_klines(
        self,
        symbol: str,
        interval: str = "1h",
        limit: int = 1000,
        start_time: Optional[int] = None,
        end_time: Optional[int] = None
    ) -> List[Dict]:
        """
        获取历史 K 线数据

        参数说明:
        - symbol: 交易品种代码,如 'AAPL.US'
        - interval: K 线周期,支持 1m/5m/15m/30m/1h/4h/1d/1w
        - limit: 每次请求的最大条数(最大 1000)
        - start_time/end_time: Unix 毫秒时间戳

        ⚠️ 注意:limit=1000 是 TickDB 的单次请求上限
                 若需获取更长时间跨度,需分页请求
        """
        endpoint = f"{self.BASE_URL}/kline"
        params = {
            "symbol": symbol,
            "interval": interval,
            "limit": limit
        }
        if start_time:
            params["start"] = start_time
        if end_time:
            params["end"] = end_time

        response = requests.get(
            endpoint,
            headers=self.headers,
            params=params,
            timeout=(3.05, 10)  # 连接超时 3.05s,读取超时 10s
        )

        data = response.json()
        if data.get("code") == 0:
            return data.get("data", [])
        else:
            raise RuntimeError(
                f"K线获取失败 [code={data.get('code')}]: {data.get('message')}"
            )

    def paginate_long_range(
        self,
        symbol: str,
        interval: str,
        start_time: int,
        end_time: int
    ) -> List[Dict]:
        """
        分页获取长周期数据

        ⚠️ 生产级注意事项:
        - TickDB 单次请求限制 1000 条
        - 10 年 1h K 线约 87,000 条,需分约 87 次请求
        - 建议添加请求间隔(rate limit),避免触发限频错误
        """
        all_klines = []
        current_start = start_time

        while current_start < end_time:
            klines = self.get_historical_klines(
                symbol=symbol,
                interval=interval,
                limit=1000,
                start_time=current_start,
                end_time=end_time
            )

            if not klines:
                break

            all_klines.extend(klines)
            # 更新起始时间:为避免重叠,取最后一条 K 线的时间戳
            # ⚠️ 注意:不同品种的 K 线周期边界可能不完全对齐
            current_start = klines[-1]["time"] + 1

            print(f"已获取 {len(all_klines)} 条,当前时间区间: {current_start}")
            time.sleep(0.5)  # ⚠️ 生产环境建议使用 aiohttp 异步并发请求

        return all_klines


# =============================================
# 使用示例:获取苹果公司 10 年小时级 K 线
# =============================================

if __name__ == "__main__":
    client = TickDBKlineClient()

    # 10 年时间范围(Unix 毫秒时间戳)
    ten_years_ago = int((time.time() - 10 * 365 * 24 * 3600) * 1000)
    now = int(time.time() * 1000)

    try:
        # ⚠️ 为避免请求次数过多,这里仅演示获取最近 1000 条
        # 生产环境中使用 paginate_long_range 分页获取
        aapl_klines = client.get_historical_klines(
            symbol="AAPL.US",
            interval="1h",
            limit=1000
        )

        print(f"\n获取到 AAPL.US K 线 {len(aapl_klines)} 条")
        print(f"时间范围: {aapl_klines[0]['time']} ~ {aapl_klines[-1]['time']}")

        # 数据格式说明
        sample = aapl_klines[0]
        print(f"\n数据字段: {list(sample.keys())}")
        print(f"示例数据: {sample}")

    except Exception as e:
        print(f"请求异常: {e}")

4.2 实时 K 线推送:WebSocket 订阅

import json
import time
import random
import threading
from websocket import create_connection

# =============================================
# TickDB WebSocket 实时 K 线订阅(生产级)
# =============================================
# 功能:通过 WebSocket 订阅美股实时 K 线数据
# 适用场景:实时监控、日内策略信号
# ⚠️ 生产环境建议使用 asyncio + aiohttp 异步架构
# =============================================

class TickDBWebSocketClient:
    """TickDB WebSocket 客户端(生产级,含心跳与重连)"""

    WS_URL = "wss://api.tickdb.ai/v1/ws/market"

    def __init__(self, api_key: str):
        self.api_key = api_key
        self.ws = None
        self.running = False
        self.retry_count = 0
        self.max_retries = 5
        self.base_delay = 1  # 初始重连延迟(秒)
        self.max_delay = 60   # 最大重连延迟(秒)

    def connect(self):
        """建立 WebSocket 连接"""
        url = f"{self.WS_URL}?api_key={self.api_key}"
        self.ws = create_connection(url, timeout=10)
        self.retry_count = 0
        print("WebSocket 连接已建立")

    def subscribe(self, symbol: str, interval: str = "1m"):
        """订阅 K 线实时数据"""
        subscribe_msg = {
            "cmd": "sub",
            "params": {
                "type": "kline",
                "symbol": symbol,
                "interval": interval
            }
        }
        self.ws.send(json.dumps(subscribe_msg))
        print(f"已订阅 {symbol} {interval} K 线数据")

    def heartbeat(self):
        """心跳保活"""
        while self.running:
            try:
                self.ws.send(json.dumps({"cmd": "ping"}))
                print("心跳发送成功")
                time.sleep(30)  # ⚠️ 建议根据实际 API 调整心跳间隔
            except Exception as e:
                print(f"心跳异常: {e}")
                self.reconnect()
                break

    def reconnect(self):
        """指数退避重连(含抖动)"""
        self.retry_count += 1
        if self.retry_count > self.max_retries:
            print("重连次数超限,终止连接")
            self.running = False
            return

        delay = min(self.base_delay * (2 ** self.retry_count), self.max_delay)
        # 添加抖动:避免惊群效应(所有客户端同时重连)
        jitter = random.uniform(0, delay * 0.1)
        wait_time = delay + jitter

        print(f"将在 {wait_time:.2f} 秒后尝试重连(第 {self.retry_count} 次)")
        time.sleep(wait_time)

        try:
            self.connect()
            self.running = True
            threading.Thread(target=self.receive_loop, daemon=True).start()
            threading.Thread(target=self.heartbeat, daemon=True).start()
        except Exception as e:
            print(f"重连失败: {e}")
            self.reconnect()

    def receive_loop(self):
        """接收消息循环"""
        while self.running:
            try:
                msg = self.ws.recv()
                data = json.loads(msg)

                # 处理 K 线消息
                if data.get("type") == "kline":
                    kline = data.get("data", {})
                    print(
                        f"K线更新 | {kline.get('symbol')} | "
                        f"O:{kline.get('open')} H:{kline.get('high')} "
                        f"L:{kline.get('low')} C:{kline.get('close')}"
                    )

                # 处理限频响应(code:3001)
                if data.get("code") == 3001:
                    retry_after = int(data.get("retry_after", 5))
                    print(f"触发限频,等待 {retry_after} 秒")
                    time.sleep(retry_after)

            except Exception as e:
                print(f"接收异常: {e}")
                self.running = False
                self.reconnect()
                break

    def start(self, symbol: str, interval: str = "1m"):
        """启动客户端"""
        self.running = True
        self.connect()
        self.subscribe(symbol, interval)

        threading.Thread(target=self.receive_loop, daemon=True).start()
        threading.Thread(target=self.heartbeat, daemon=True).start()

        try:
            while self.running:
                time.sleep(1)
        except KeyboardInterrupt:
            print("\n接收到中断信号,正在关闭连接...")
            self.stop()

    def stop(self):
        """优雅关闭"""
        self.running = False
        if self.ws:
            self.ws.close()
            print("WebSocket 连接已关闭")


if __name__ == "__main__":
    api_key = os.environ.get("TICKDB_API_KEY")
    if not api_key:
        raise ValueError("请设置环境变量 TICKDB_API_KEY")

    client = TickDBWebSocketClient(api_key)
    # 订阅苹果公司 1 分钟 K 线实时推送
    client.start(symbol="AAPL.US", interval="1m")

4.3 数据对比表:TickDB 能做什么,不能做什么

能力维度 Polygon TickDB
美股逐笔成交(tick) ✅ 完整支持 ❌ 不支持
美股订单簿深度 ✅ 多档深度 ⚠️ 仅 1 档
美股历史 K 线 有限年限 ✅ 10 年级别,清洗对齐
港股 tick 数据 ❌ 不支持 ✅ 完整支持
数字货币 tick 数据 有限支持 ✅ 完整支持
外汇/贵金属 ✅ 主流品种 ✅ 主流品种
WebSocket 实时推送
统一 API 多市场 ⚠️ 侧重美股 ✅ 6 大类资产
心跳重连机制 需自行实现 原生支持

五、策略视角:没有 tick 数据,什么策略依然成立

回到那位凌晨两点的开发者。如果他最终选择使用 TickDB 的 K 线数据而非 Polygon 的 tick 数据,他的策略池会发生什么变化?

以下策略在 K 线数据下依然成立,且在 TickDB 的数据能力范围内:

5.1 趋势跟踪策略

趋势跟踪的核心逻辑是"追涨杀跌",依赖的是价格的方向性变化,而非微观结构的瞬时波动。日线或小时线级别的趋势信号,在 K 线数据下完全可以工作。

入场逻辑(伪代码):
if 当前 EMA(20) > EMA(60) and 当前 K 线收盘 > 当前 EMA(20):
    趋势确认,多头入场

这类策略在 TickDB 的 10 年 K 线数据下可以进行充分的历史回测,覆盖多个牛熊周期,样本量充足。

5.2 事件驱动策略

财报发布、美联储利率决议、宏观数据公布——这些事件驱动的核心逻辑是"价格对信息的反应",而不是"逐笔成交中的隐藏信号"。

事件类型 核心数据需求 TickDB 能否支持
财报发布 历史 K 线 + 实时价格
美联储利率决议 K 线 + 宏观日历
地缘政治事件 新闻 + 资产价格 ✅(价格层面)
订单流微观结构 Tick + 多档深度

事件驱动策略的关键是"事件前后价格变化的方向和幅度",这在 K 线层面完全可以量化。

5.3 均值回归策略(降级版)

如果原本的均值回归策略依赖 tick 级别的价差异常,可以降级为基于 1 分钟 K 线价差的均值回归——将分析周期从毫秒拉长到分钟,信号频率降低但依然有效。

降级版逻辑:
if (当前 1min 收盘价 - 5min 均线) > 2 * 5min 标准差:
    价格回归概率较高,做空信号

这不是完美的替代,但它是在约束条件下的最优解——而量化交易的本质,本来就是在约束中寻找概率优势。


六、坦诚的价值:为什么敢于说不

在商业世界里,明确声明"我们不做什么",是一件需要勇气的事。

TickDB 选择不支持美股 tick 数据,不是因为技术上不可行——逐笔成交数据的采集和存储并非不可逾越的障碍。选择"不做",背后是一种产品哲学的坚持:

TickDB 相信:量化交易的核心战场在中低频,而不是高频。

在这个认知下,TickDB 将资源集中在:更长的历史数据、更广的市场覆盖、更稳定的工程架构。这些取舍让 TickDB 成为回测和低频策略开发者的最优选择,而不是成为另一个 Polygon。

这不是贬低 Polygon。Polygon 的美股 tick 数据在高频做市、订单流分析等领域是无可替代的。但 TickDB 的用户,有相当一部分是从历史回测起步、逐步构建多市场策略的量化开发者——他们的战场,在 10 年级别的数据里,在跨资产的相关性里,而不是在毫秒级的价差里。

了解自己的能力边界,是做出正确工具选择的前提。

如果你需要美股 tick 数据,Polygon 是一个值得考虑的选择。
如果你需要跨 6 大类资产、10 年历史、稳定的回测基础设施,TickDB 是为你设计的。

如果你两者都需要——那就不是二选一,而是组合使用。


七、下一步行动

如果你正在评估 TickDB 是否适合你的策略需求
访问 tickdb.ai,注册账户后在控制台生成 API Key。TickDB 提供免费层级,无需信用卡即可体验 10 年美股 K 线数据的回测能力。

如果你需要美股 tick 数据作为补充
建议同时注册 Polygon 账户,将其作为美股微观结构数据的实时信号源,TickDB 作为跨市场历史回测的统一底座。

如果你想快速验证 TickDB 的 API 能力
在控制台中尝试以下请求(以苹果公司 AAPL.US 为例):

GET /v1/market/kline?symbol=AAPL.US&interval=1h&limit=10

查看返回的 K 线数据结构,感受 10 年清洗数据的质量。


风险提示:本文不构成任何投资建议。市场有风险,投资需谨慎。TickDB 的数据能力和产品边界可能随版本更新而调整,建议在实际使用前查阅最新文档。