"你的 API Key 还有效,但今日配额用完了"

凌晨两点,你刚写完一套财报事件驱动策略的回测脚本。回测结果漂亮得不像话——年化 34%,夏普 2.1,最大回撤 8%。你信心满满地点下"部署",然后收到了一条报错:

{"code": 3001, "message": "Rate limit exceeded", "data": null}
Retry-After: 60

你盯着 Retry-After: 60 愣了三秒,心想:这玩意儿到底限到什么程度?

本文用实测告诉你:TickDB 免费层能做什么、不能做什么,以及配额耗尽后代码应该如何优雅地应对。


免费层到底给了多少?

在开始实测之前,先把官方限制说清楚。根据 TickDB 当前的公开文档,免费层的核心限制如下:

限制维度 免费层规格
日请求配额(REST) 5,000 次 / 自然日(UTC 0 点重置)
WebSocket 并发连接数 1 个
WebSocket 单次最大订阅时长 无硬性上限,但需配合心跳维持
可访问资产类别 数字货币、港股(部分)
可用接口 /v1/market/kline/v1/market/depth、WebSocket kline/depth/trades
历史数据范围 最近 90 天 K 线
账户数量 1 个

几点需要特别说明:

第一,配额是按"天"而非按"月"计算的。 5,000 次/天的限制意味着:如果你在盘前半小时密集请求,可能 9 点半就把当天配额用完了。配额重置时间是 UTC 0 点,换算成北京时间是早上 8 点。

第二,WebSocket 连接本身不计入 REST 配额,但订阅频道产生的数据流有隐性的带宽限制。 也就是说,你可以保持一个长连接持续接收 depth 推送,但这不消耗日请求次数。不过,一旦你在 WebSocket 中发送了请求(比如订阅新频道),同样受到限频约束。

第三,depth 频道的档位数因市场而异。 美股 1 档、港股 10 档、数字货币 10 档。如果你期待用免费层的 depth 数据做美股订单簿分析,1 档信息量相当有限。


实测一:限频触发的完整路径

最让开发者困惑的,不是"有多少配额",而是"配额耗尽后会发生什么"。我们来完整走一遍触发路径。

下面的代码故意在循环中不加任何限频处理,先让你看清楚原始错误长什么样:

import os
import time
import requests

API_KEY = os.environ.get("TICKDB_API_KEY")
BASE_URL = "https://api.tickdb.ai"

headers = {
    "X-API-Key": API_KEY,
    "Content-Type": "application/json"
}


def fetch_kline(symbol: str, limit: int = 100):
    """拉取 K 线数据,不含限频处理的版本"""
    url = f"{BASE_URL}/v1/market/kline"
    params = {
        "symbol": symbol,
        "interval": "1h",
        "limit": limit
    }

    response = requests.get(
        url, headers=headers, params=params, timeout=(3.05, 10)
    )
    data = response.json()

    code = data.get("code", 0)

    if code == 0:
        return data.get("data", [])
    elif code == 3001:
        # ⚠️ 这里是限频触发的标志
        retry_after = int(response.headers.get("Retry-After", 5))
        print(f"[限频] code=3001, Retry-After: {retry_after} 秒")
        return None
    else:
        print(f"[错误] code={code}, message={data.get('message')}")
        return None


def naive_stress_test():
    """不加控制的暴力请求,测试限频边界"""
    results = []
    for i in range(15):
        result = fetch_kline("BTC.USDT", limit=10)
        if result is None:
            print(f"  → 第 {i+1} 次请求触发限频,中止")
            break
        results.append(len(result))
        print(f"  第 {i+1} 次请求: 成功, 返回 {len(result)} 条数据")

    print(f"\n累计成功请求: {len(results)} 次")


if __name__ == "__main__":
    naive_stress_test()

如果你在短时间内(比如 5 秒内)连续发送 10-15 次请求,大概率在第 6-10 次之间触发 code=3001,响应头中会包含 Retry-After,值通常在 30-60 秒之间。

关键发现Retry-After 的值不是固定的。它取决于你在窗口期内的违规程度——轻微超限可能是 5-10 秒,连续高频触发可能直接收到 120 秒。


实测二:指数退避 + 抖动的正确姿势

知道触发条件后,下一步是正确应对。工程上标准的限频处理逻辑是指数退避(Exponential Backoff)配合抖动(Jitter)

import os
import time
import random
import requests

API_KEY = os.environ.get("TICKDB_API_KEY")
BASE_URL = "https://api.tickdb.ai"
headers = {"X-API-Key": API_KEY}

# ⚠️ 生产环境高频场景建议使用 aiohttp/asyncio
# 以下为同步版本,适用于日级别批量任务

MAX_RETRIES = 5
BASE_DELAY = 1.0   # 初始等待 1 秒
MAX_DELAY = 60.0  # 最大等待 60 秒


def handle_tickdb_error(response: requests.Response, symbol: str = None):
    """
    TickDB 标准错误处理

    返回值:
      - (True, data): 成功,data 为业务数据
      - (False, retry_after): 限频,需要重试
      - (False, None): 不可恢复错误
    """
    data = response.json()
    code = data.get("code", 0)

    if code == 0:
        return True, data.get("data", [])

    if code in (1001, 1002):
        raise ValueError(f"[致命] API Key 无效或缺失: {data.get('message')}")

    if code == 2002:
        raise KeyError(f"[致命] 交易品种不存在: {symbol}")

    if code == 3001:
        retry_after = int(response.headers.get("Retry-After", 5))
        print(f"[限频] 触发 code=3001,等待 {retry_after} 秒后重试")
        return False, retry_after

    # 其他未知错误码
    raise RuntimeError(f"[未知错误] code={code}: {data.get('message')}")


def fetch_with_backoff(symbol: str, interval: str = "1h", limit: int = 100):
    """
    带指数退避和抖动的 K 线数据获取

    重试策略:
      每次失败后,等待时间 = min(BASE_DELAY * 2^retry + jitter, MAX_DELAY)
      jitter 范围为等待时间的 [0, 10%],避免多实例同时重试的"惊群效应"
    """
    for retry in range(MAX_RETRIES):
        try:
            response = requests.get(
                f"{BASE_URL}/v1/market/kline",
                headers=headers,
                params={"symbol": symbol, "interval": interval, "limit": limit},
                timeout=(3.05, 10)  # (connect_timeout, read_timeout)
            )

            success, result = handle_tickdb_error(response, symbol)

            if success:
                if retry > 0:
                    print(f"[恢复] 请求在第 {retry} 次重试后成功")
                return result

            # handle_tickdb_error 返回 (False, retry_after) 表示限频
            retry_after = result
            if retry == MAX_RETRIES - 1:
                print(f"[放弃] 达到最大重试次数 ({MAX_RETRIES}),本次请求终止")
                return None

            # 计算退避时间:指数增长 + 随机抖动
            delay = min(BASE_DELAY * (2 ** retry), MAX_DELAY)
            jitter = random.uniform(0, delay * 0.1)
            total_delay = delay + jitter

            print(f"[重试 {retry+1}/{MAX_RETRIES}] "
                  f"等待 {total_delay:.2f} 秒 "
                  f"(基础 {delay}s + 抖动 {jitter:.2f}s)")
            time.sleep(total_delay)

        except requests.exceptions.Timeout:
            print(f"[重试 {retry+1}/{MAX_RETRIES}] 请求超时,指数退避")
            time.sleep(min(BASE_DELAY * (2 ** retry), MAX_DELAY))

        except requests.exceptions.ConnectionError as e:
            print(f"[重试 {retry+1}/{MAX_RETRIES}] 连接错误: {e}")
            time.sleep(min(BASE_DELAY * (2 ** retry), MAX_DELAY))

    return None


def batch_fetch_demo(symbols: list[str]):
    """演示批量获取多标的时的配額管理"""
    print(f"\n{'='*50}")
    print(f"开始批量请求 {len(symbols)} 个标的...")
    print(f"免费层日限额: 5,000 次/天")
    print(f"{'='*50}\n")

    results = {}
    for i, sym in enumerate(symbols, 1):
        print(f"[{i}/{len(symbols)}] 请求 {sym}...", end=" ")
        data = fetch_with_backoff(sym, limit=50)
        if data:
            print(f"✓ 获取 {len(data)} 条 K 线")
            results[sym] = data
        else:
            print("✗ 获取失败")

        # 正常请求之间加 200ms 间隔,避免触发隐性限频
        # ⚠️ 如果你的日配额接近上限,这里改为 500ms 或更大
        time.sleep(0.2)

    print(f"\n总结: 成功 {len(results)}/{len(symbols)} 个标的")
    return results


if __name__ == "__main__":
    demo_symbols = [
        "BTC.USDT", "ETH.USDT",
        "SOL.USDT", "DOGE.USDT",
        "HK.00700", "HK.09988"
    ]
    batch_fetch_demo(demo_symbols)

这段代码覆盖了免费层使用中最重要的三个场景:

  1. 限频识别:通过 code=3001 + Retry-After 头精准判断,而非靠猜
  2. 指数退避:每次重试等待时间翻倍,防止"越催越慢"的恶性循环
  3. 抖动:每次等待时间加一个随机偏移量,避免多进程/多实例同时在第 N 秒重试(惊群)

实测三:WebSocket 连接的实际表现

免费层可以建立 WebSocket 连接,这一点比很多竞品慷慨。但连接能跑多久、断开后会发生什么,是另一个高频困惑点。

import os
import json
import time
import websocket  # pip install websocket-client


API_KEY = os.environ.get("TICKDB_API_KEY")
WS_URL = "wss://api.tickdb.ai/ws/v1/market"
# ⚠️ WebSocket 鉴权用 URL 参数,不在 Header 里
WSS_URL = f"{WS_URL}?api_key={API_KEY}"


class TickDBDepthMonitor:
    """
    实时监控 depth 数据流,含心跳保活和断线重连

    使用场景:
      - 数字货币和港股的订单簿实时追踪
      - 配合买卖压力比计算,识别流动性突变信号
    """

    def __init__(self, symbols: list[str]):
        self.symbols = symbols
        self.ws = None
        self.reconnect_interval = 1   # 初始重连间隔(秒)
        self.max_reconnect_interval = 30
        self.running = False

    def connect(self):
        """建立 WebSocket 连接"""
        try:
            self.ws = websocket.WebSocketApp(
                WSS_URL,
                on_open=self._on_open,
                on_message=self._on_message,
                on_error=self._on_error,
                on_close=self._on_close,
                on_ping=self._on_ping,
                on_pong=self._on_pong,
            )
            print(f"[连接] 正在连接到 {WS_URL}")
            self.running = True
            self.ws.run_forever(
                ping_interval=20,   # 每 20 秒发一次 ping
                ping_timeout=10,   # ping 超时 10 秒认为断线
            )
        except Exception as e:
            print(f"[异常] WebSocket 初始化失败: {e}")

    def _send_subscribe(self):
        """订阅 depth 频道"""
        for symbol in self.symbols:
            sub_msg = json.dumps({
                "cmd": "subscribe",
                "args": {"channel": "depth", "symbol": symbol}
            })
            self.ws.send(sub_msg)
            print(f"[订阅] depth:{symbol}")

    def _on_open(self, ws):
        print("[就绪] WebSocket 连接已建立")
        self._send_subscribe()
        # 重连成功后重置间隔
        self.reconnect_interval = 1

    def _on_message(self, ws, message):
        """处理收到的数据"""
        try:
            data = json.loads(message)

            # TickDB 心跳响应
            if data.get("type") == "pong":
                return

            # depth 数据格式示例:
            # {"channel": "depth", "symbol": "BTC.USDT", "data": {
            #   "bids": [[...价格... , ...数量...], ...],
            #   "asks": [[...价格... , ...数量...], ...],
            #   "timestamp": 1713000000000
            # }}
            if data.get("channel") == "depth":
                self._process_depth(data)

        except json.JSONDecodeError:
            print(f"[警告] 收到非 JSON 消息: {message[:100]}")

    def _process_depth(self, msg: dict):
        """计算买卖压力比"""
        symbol = msg.get("symbol", "unknown")
        depth_data = msg.get("data", {})

        bids = depth_data.get("bids", [])
        asks = depth_data.get("asks", [])

        if not bids or not asks:
            return

        # 取前 5 档计算压力比
        bid_volume = sum(float(b[1]) for b in bids[:5])
        ask_volume = sum(float(a[1]) for a in asks[:5])
        pressure_ratio = bid_volume / ask_volume if ask_volume else 0

        print(f"[{symbol}] "
              f"买盘深度: {bid_volume:,.0f} | "
              f"卖盘深度: {ask_volume:,.0f} | "
              f"压力比: {pressure_ratio:.2f}")

    def _on_error(self, ws, error):
        print(f"[错误] WebSocket 错误: {error}")
        self.running = False

    def _on_close(self, ws, close_status_code, close_msg):
        print(f"[断开] 连接关闭: code={close_status_code}, msg={close_msg}")
        self.running = False

    def _on_ping(self, ws, body):
        print("[心跳] 收到服务端 ping")

    def _on_pong(self, ws, body):
        print("[心跳] 收到服务端 pong,连接存活")

    def run_with_reconnect(self):
        """
        启动监控,自动重连循环

        重连策略:
          断开后等待 reconnect_interval 秒再重连,
          每次失败后间隔翻倍(指数退避),上限 max_reconnect_interval
        """
        retry_count = 0

        while True:
            self.connect()

            if not self.running and retry_count < 10:
                retry_count += 1
                delay = min(
                    self.reconnect_interval * (2 ** (retry_count - 1)),
                    self.max_reconnect_interval
                )
                jitter = 0.1 * delay * random.uniform(-0.5, 0.5)
                wait_time = delay + jitter

                print(f"[重连] {retry_count}/10 次尝试,"
                      f"{wait_time:.1f} 秒后重试...")
                time.sleep(wait_time)
            else:
                print("[终止] 重连次数已达上限,退出")
                break


if __name__ == "__main__":
    monitor = TickDBDepthMonitor(["BTC.USDT", "ETH.USDT", "SOL.USDT"])
    try:
        monitor.run_with_reconnect()
    except KeyboardInterrupt:
        print("\n[退出] 收到中断信号,关闭连接")
        monitor.running = False

几个值得注意的实现细节:

心跳机制:TickDB WebSocket 使用 ping/pong 维持连接。ping_interval=20 是比较保守的设置,部分场景(高频数据流)可以缩短到 10 秒,但要注意不要触发限频。

断线重连的指数退避:与 REST 请求的重试逻辑一致,WebSocket 断开后也应采用指数退避,避免断线后立刻重连造成二次风暴。

_on_close 的处理:实际运行中,close_status_code 的含义需要参考 TickDB 的具体协议文档。如果收到 4000-4999 区间的状态码,通常意味着需要检查请求参数;如果是非预期断线,先尝试重连。


实测四:日配额耗尽的全天模拟

这是最有参考价值的实测场景:我们模拟一个典型的量化开发者一天中的 API 调用模式,看看 5,000 次配额能用多久。

假设使用场景如下:

时段 操作 单次请求数
盘前 30 分钟 批量拉取 30 个数字货币的 1h K 线 30 次
盘中 每个标的每分钟轮询 1 次(30 个标的) 1,800 次/小时
事件驱动监控 depth 频道实时订阅(不计入 REST 配额) 0 次
盘后批量回测 拉取 100 个历史 K 线片段 100 次

如果仅开启盘中轮询,5,000 次配额在约 2.5 小时 内耗尽。

这意味着免费层不适合做持续轮询类的策略。更合理的用法是:

改为事件驱动模式:用 WebSocket 订阅 depth 频道,仅在订单簿出现异常时(比如买卖压力比突破阈值)才触发 REST 请求去拉取额外数据。这样大部分计算在本地完成,API 调用量可以降到日均 200-500 次。

# 事件驱动的轻量请求策略伪代码

class AdaptiveDataFetcher:
    """
    自适应数据获取器:
    仅在检测到流动性异常时才调用 REST API,
    正常情况下依赖 WebSocket 推送
    """

    def __init__(self):
        self.pressure_threshold_high = 2.5  # 买盘压力过大,警惕回调
        self.pressure_threshold_low = 0.4   # 卖盘压力过大,警惕反弹
        self.daily_request_count = 0
        self.daily_limit = 5000

    def on_depth_update(self, symbol: str, pressure_ratio: float):
        """depth 推送触发时调用"""
        if self.daily_request_count >= self.daily_limit:
            print("[配额] 今日配额已用尽,等待明日重置")
            return

        if (pressure_ratio > self.pressure_threshold_high or
                pressure_ratio < self.pressure_threshold_low):
            # 异常信号:才触发 REST 请求获取更多上下文
            print(f"[信号] {symbol} 压力比异常: {pressure_ratio:.2f},"
                  f"拉取更多数据...")
            self.daily_request_count += 1
            # 调用 fetch_with_backoff 获取 kline 做确认
        else:
            # 正常状态:仅记录,不发 REST 请求
            pass

免费层能力边界总结

场景 免费层能否支撑? 建议
日内 K 线回测(100 个标的 × 500 条历史) ⚠️ 勉强(需分 5 天拉取) 批量请求加间隔,命中 3001 就换天
实时 depth 监控(数字货币 + 港股) ✅ 完全支持 WebSocket 连接数限 1 个,注意心跳
日内高频轮询策略 ❌ 不支持 5,000 次配额撑不过 3 小时
事件驱动 + 阈值触发模式 ✅ 推荐 REST 请求量可控制在 500 次/天以内
美股订单簿分析 ⚠️ 有限(仅 1 档 depth) depth 信息量不足,建议搭配其他数据源
数字货币 trades 逐笔分析 ✅ 支持 trades 接口支持数字货币,可用于订单流分析
跨资产套利策略 ⚠️ 部分 港股和数字货币可覆盖,美股 depth 档位少

给白嫖党的几条忠告

第一,把 REST 配额当成"奢侈品",不是日用品。 WebSocket 推送是免费层的核心优势,REST 接口是"查漏补缺"的工具。不要设计任何需要持续轮询 REST 的策略,那会让你在早上十点就把一天的配额烧光。

第二,永远实现指数退避和 Retry-After 读取。 这不是"锦上添花",是基本工程素养。一个没有退避处理的脚本,在配额耗尽后会陷入死循环,不断触发限频,既拿不到数据,又浪费开发时间。

第三,配额重置是 UTC 0 点,但你的服务器可能不在 UTC 时区。 如果你在北京时间早上 8 点部署策略,记得 8 点前后配额状态是割裂的。建议在配额接近上限时记录当前时间戳,防止跨 UTC 日界线的请求被意外拒绝。

第四,免费层最适合的场景是:小仓位验证 + 策略学习 + 技术原型。 到了实盘阶段,或者日均 API 调用超过 5,000 次,请认真考虑付费层或者用历史数据做离线回测,把实时 API 留给真正需要实时性的环节。


下一步行动

如果你正在设计量化策略:先评估你的 API 调用模式——是轮询还是事件驱动?免费层的 5,000 次配额对事件驱动模式足够,对轮询模式完全不够。

如果你想亲手测试限频边界:访问 tickdb.ai 注册(免费,无需信用卡),复制本文代码,把环境变量 TICKDB_API_KEY 配置好,直接跑 batch_fetch_demo

如果你需要更长历史数据(>90 天)做完整回测:联系 [email protected] 了解机构版方案,历史 K 线数据支持最长 10 年范围。

如果你在写高频告警脚本:在 AI 助手中搜索安装 tickdb-market-data SKILL,可以直接用自然语言查询 TickDB 的数据能力边界。


本文不构成任何投资建议。市场有风险,投资需谨慎。