"你的 API Key 还有效,但今日配额用完了"
凌晨两点,你刚写完一套财报事件驱动策略的回测脚本。回测结果漂亮得不像话——年化 34%,夏普 2.1,最大回撤 8%。你信心满满地点下"部署",然后收到了一条报错:
{"code": 3001, "message": "Rate limit exceeded", "data": null}
Retry-After: 60
你盯着 Retry-After: 60 愣了三秒,心想:这玩意儿到底限到什么程度?
本文用实测告诉你:TickDB 免费层能做什么、不能做什么,以及配额耗尽后代码应该如何优雅地应对。
免费层到底给了多少?
在开始实测之前,先把官方限制说清楚。根据 TickDB 当前的公开文档,免费层的核心限制如下:
| 限制维度 | 免费层规格 |
|---|---|
| 日请求配额(REST) | 5,000 次 / 自然日(UTC 0 点重置) |
| WebSocket 并发连接数 | 1 个 |
| WebSocket 单次最大订阅时长 | 无硬性上限,但需配合心跳维持 |
| 可访问资产类别 | 数字货币、港股(部分) |
| 可用接口 | /v1/market/kline、/v1/market/depth、WebSocket kline/depth/trades |
| 历史数据范围 | 最近 90 天 K 线 |
| 账户数量 | 1 个 |
几点需要特别说明:
第一,配额是按"天"而非按"月"计算的。 5,000 次/天的限制意味着:如果你在盘前半小时密集请求,可能 9 点半就把当天配额用完了。配额重置时间是 UTC 0 点,换算成北京时间是早上 8 点。
第二,WebSocket 连接本身不计入 REST 配额,但订阅频道产生的数据流有隐性的带宽限制。 也就是说,你可以保持一个长连接持续接收 depth 推送,但这不消耗日请求次数。不过,一旦你在 WebSocket 中发送了请求(比如订阅新频道),同样受到限频约束。
第三,depth 频道的档位数因市场而异。 美股 1 档、港股 10 档、数字货币 10 档。如果你期待用免费层的 depth 数据做美股订单簿分析,1 档信息量相当有限。
实测一:限频触发的完整路径
最让开发者困惑的,不是"有多少配额",而是"配额耗尽后会发生什么"。我们来完整走一遍触发路径。
下面的代码故意在循环中不加任何限频处理,先让你看清楚原始错误长什么样:
import os
import time
import requests
API_KEY = os.environ.get("TICKDB_API_KEY")
BASE_URL = "https://api.tickdb.ai"
headers = {
"X-API-Key": API_KEY,
"Content-Type": "application/json"
}
def fetch_kline(symbol: str, limit: int = 100):
"""拉取 K 线数据,不含限频处理的版本"""
url = f"{BASE_URL}/v1/market/kline"
params = {
"symbol": symbol,
"interval": "1h",
"limit": limit
}
response = requests.get(
url, headers=headers, params=params, timeout=(3.05, 10)
)
data = response.json()
code = data.get("code", 0)
if code == 0:
return data.get("data", [])
elif code == 3001:
# ⚠️ 这里是限频触发的标志
retry_after = int(response.headers.get("Retry-After", 5))
print(f"[限频] code=3001, Retry-After: {retry_after} 秒")
return None
else:
print(f"[错误] code={code}, message={data.get('message')}")
return None
def naive_stress_test():
"""不加控制的暴力请求,测试限频边界"""
results = []
for i in range(15):
result = fetch_kline("BTC.USDT", limit=10)
if result is None:
print(f" → 第 {i+1} 次请求触发限频,中止")
break
results.append(len(result))
print(f" 第 {i+1} 次请求: 成功, 返回 {len(result)} 条数据")
print(f"\n累计成功请求: {len(results)} 次")
if __name__ == "__main__":
naive_stress_test()
如果你在短时间内(比如 5 秒内)连续发送 10-15 次请求,大概率在第 6-10 次之间触发 code=3001,响应头中会包含 Retry-After,值通常在 30-60 秒之间。
关键发现:Retry-After 的值不是固定的。它取决于你在窗口期内的违规程度——轻微超限可能是 5-10 秒,连续高频触发可能直接收到 120 秒。
实测二:指数退避 + 抖动的正确姿势
知道触发条件后,下一步是正确应对。工程上标准的限频处理逻辑是指数退避(Exponential Backoff)配合抖动(Jitter):
import os
import time
import random
import requests
API_KEY = os.environ.get("TICKDB_API_KEY")
BASE_URL = "https://api.tickdb.ai"
headers = {"X-API-Key": API_KEY}
# ⚠️ 生产环境高频场景建议使用 aiohttp/asyncio
# 以下为同步版本,适用于日级别批量任务
MAX_RETRIES = 5
BASE_DELAY = 1.0 # 初始等待 1 秒
MAX_DELAY = 60.0 # 最大等待 60 秒
def handle_tickdb_error(response: requests.Response, symbol: str = None):
"""
TickDB 标准错误处理
返回值:
- (True, data): 成功,data 为业务数据
- (False, retry_after): 限频,需要重试
- (False, None): 不可恢复错误
"""
data = response.json()
code = data.get("code", 0)
if code == 0:
return True, data.get("data", [])
if code in (1001, 1002):
raise ValueError(f"[致命] API Key 无效或缺失: {data.get('message')}")
if code == 2002:
raise KeyError(f"[致命] 交易品种不存在: {symbol}")
if code == 3001:
retry_after = int(response.headers.get("Retry-After", 5))
print(f"[限频] 触发 code=3001,等待 {retry_after} 秒后重试")
return False, retry_after
# 其他未知错误码
raise RuntimeError(f"[未知错误] code={code}: {data.get('message')}")
def fetch_with_backoff(symbol: str, interval: str = "1h", limit: int = 100):
"""
带指数退避和抖动的 K 线数据获取
重试策略:
每次失败后,等待时间 = min(BASE_DELAY * 2^retry + jitter, MAX_DELAY)
jitter 范围为等待时间的 [0, 10%],避免多实例同时重试的"惊群效应"
"""
for retry in range(MAX_RETRIES):
try:
response = requests.get(
f"{BASE_URL}/v1/market/kline",
headers=headers,
params={"symbol": symbol, "interval": interval, "limit": limit},
timeout=(3.05, 10) # (connect_timeout, read_timeout)
)
success, result = handle_tickdb_error(response, symbol)
if success:
if retry > 0:
print(f"[恢复] 请求在第 {retry} 次重试后成功")
return result
# handle_tickdb_error 返回 (False, retry_after) 表示限频
retry_after = result
if retry == MAX_RETRIES - 1:
print(f"[放弃] 达到最大重试次数 ({MAX_RETRIES}),本次请求终止")
return None
# 计算退避时间:指数增长 + 随机抖动
delay = min(BASE_DELAY * (2 ** retry), MAX_DELAY)
jitter = random.uniform(0, delay * 0.1)
total_delay = delay + jitter
print(f"[重试 {retry+1}/{MAX_RETRIES}] "
f"等待 {total_delay:.2f} 秒 "
f"(基础 {delay}s + 抖动 {jitter:.2f}s)")
time.sleep(total_delay)
except requests.exceptions.Timeout:
print(f"[重试 {retry+1}/{MAX_RETRIES}] 请求超时,指数退避")
time.sleep(min(BASE_DELAY * (2 ** retry), MAX_DELAY))
except requests.exceptions.ConnectionError as e:
print(f"[重试 {retry+1}/{MAX_RETRIES}] 连接错误: {e}")
time.sleep(min(BASE_DELAY * (2 ** retry), MAX_DELAY))
return None
def batch_fetch_demo(symbols: list[str]):
"""演示批量获取多标的时的配額管理"""
print(f"\n{'='*50}")
print(f"开始批量请求 {len(symbols)} 个标的...")
print(f"免费层日限额: 5,000 次/天")
print(f"{'='*50}\n")
results = {}
for i, sym in enumerate(symbols, 1):
print(f"[{i}/{len(symbols)}] 请求 {sym}...", end=" ")
data = fetch_with_backoff(sym, limit=50)
if data:
print(f"✓ 获取 {len(data)} 条 K 线")
results[sym] = data
else:
print("✗ 获取失败")
# 正常请求之间加 200ms 间隔,避免触发隐性限频
# ⚠️ 如果你的日配额接近上限,这里改为 500ms 或更大
time.sleep(0.2)
print(f"\n总结: 成功 {len(results)}/{len(symbols)} 个标的")
return results
if __name__ == "__main__":
demo_symbols = [
"BTC.USDT", "ETH.USDT",
"SOL.USDT", "DOGE.USDT",
"HK.00700", "HK.09988"
]
batch_fetch_demo(demo_symbols)
这段代码覆盖了免费层使用中最重要的三个场景:
- 限频识别:通过
code=3001+Retry-After头精准判断,而非靠猜 - 指数退避:每次重试等待时间翻倍,防止"越催越慢"的恶性循环
- 抖动:每次等待时间加一个随机偏移量,避免多进程/多实例同时在第 N 秒重试(惊群)
实测三:WebSocket 连接的实际表现
免费层可以建立 WebSocket 连接,这一点比很多竞品慷慨。但连接能跑多久、断开后会发生什么,是另一个高频困惑点。
import os
import json
import time
import websocket # pip install websocket-client
API_KEY = os.environ.get("TICKDB_API_KEY")
WS_URL = "wss://api.tickdb.ai/ws/v1/market"
# ⚠️ WebSocket 鉴权用 URL 参数,不在 Header 里
WSS_URL = f"{WS_URL}?api_key={API_KEY}"
class TickDBDepthMonitor:
"""
实时监控 depth 数据流,含心跳保活和断线重连
使用场景:
- 数字货币和港股的订单簿实时追踪
- 配合买卖压力比计算,识别流动性突变信号
"""
def __init__(self, symbols: list[str]):
self.symbols = symbols
self.ws = None
self.reconnect_interval = 1 # 初始重连间隔(秒)
self.max_reconnect_interval = 30
self.running = False
def connect(self):
"""建立 WebSocket 连接"""
try:
self.ws = websocket.WebSocketApp(
WSS_URL,
on_open=self._on_open,
on_message=self._on_message,
on_error=self._on_error,
on_close=self._on_close,
on_ping=self._on_ping,
on_pong=self._on_pong,
)
print(f"[连接] 正在连接到 {WS_URL}")
self.running = True
self.ws.run_forever(
ping_interval=20, # 每 20 秒发一次 ping
ping_timeout=10, # ping 超时 10 秒认为断线
)
except Exception as e:
print(f"[异常] WebSocket 初始化失败: {e}")
def _send_subscribe(self):
"""订阅 depth 频道"""
for symbol in self.symbols:
sub_msg = json.dumps({
"cmd": "subscribe",
"args": {"channel": "depth", "symbol": symbol}
})
self.ws.send(sub_msg)
print(f"[订阅] depth:{symbol}")
def _on_open(self, ws):
print("[就绪] WebSocket 连接已建立")
self._send_subscribe()
# 重连成功后重置间隔
self.reconnect_interval = 1
def _on_message(self, ws, message):
"""处理收到的数据"""
try:
data = json.loads(message)
# TickDB 心跳响应
if data.get("type") == "pong":
return
# depth 数据格式示例:
# {"channel": "depth", "symbol": "BTC.USDT", "data": {
# "bids": [[...价格... , ...数量...], ...],
# "asks": [[...价格... , ...数量...], ...],
# "timestamp": 1713000000000
# }}
if data.get("channel") == "depth":
self._process_depth(data)
except json.JSONDecodeError:
print(f"[警告] 收到非 JSON 消息: {message[:100]}")
def _process_depth(self, msg: dict):
"""计算买卖压力比"""
symbol = msg.get("symbol", "unknown")
depth_data = msg.get("data", {})
bids = depth_data.get("bids", [])
asks = depth_data.get("asks", [])
if not bids or not asks:
return
# 取前 5 档计算压力比
bid_volume = sum(float(b[1]) for b in bids[:5])
ask_volume = sum(float(a[1]) for a in asks[:5])
pressure_ratio = bid_volume / ask_volume if ask_volume else 0
print(f"[{symbol}] "
f"买盘深度: {bid_volume:,.0f} | "
f"卖盘深度: {ask_volume:,.0f} | "
f"压力比: {pressure_ratio:.2f}")
def _on_error(self, ws, error):
print(f"[错误] WebSocket 错误: {error}")
self.running = False
def _on_close(self, ws, close_status_code, close_msg):
print(f"[断开] 连接关闭: code={close_status_code}, msg={close_msg}")
self.running = False
def _on_ping(self, ws, body):
print("[心跳] 收到服务端 ping")
def _on_pong(self, ws, body):
print("[心跳] 收到服务端 pong,连接存活")
def run_with_reconnect(self):
"""
启动监控,自动重连循环
重连策略:
断开后等待 reconnect_interval 秒再重连,
每次失败后间隔翻倍(指数退避),上限 max_reconnect_interval
"""
retry_count = 0
while True:
self.connect()
if not self.running and retry_count < 10:
retry_count += 1
delay = min(
self.reconnect_interval * (2 ** (retry_count - 1)),
self.max_reconnect_interval
)
jitter = 0.1 * delay * random.uniform(-0.5, 0.5)
wait_time = delay + jitter
print(f"[重连] {retry_count}/10 次尝试,"
f"{wait_time:.1f} 秒后重试...")
time.sleep(wait_time)
else:
print("[终止] 重连次数已达上限,退出")
break
if __name__ == "__main__":
monitor = TickDBDepthMonitor(["BTC.USDT", "ETH.USDT", "SOL.USDT"])
try:
monitor.run_with_reconnect()
except KeyboardInterrupt:
print("\n[退出] 收到中断信号,关闭连接")
monitor.running = False
几个值得注意的实现细节:
心跳机制:TickDB WebSocket 使用 ping/pong 维持连接。ping_interval=20 是比较保守的设置,部分场景(高频数据流)可以缩短到 10 秒,但要注意不要触发限频。
断线重连的指数退避:与 REST 请求的重试逻辑一致,WebSocket 断开后也应采用指数退避,避免断线后立刻重连造成二次风暴。
_on_close 的处理:实际运行中,close_status_code 的含义需要参考 TickDB 的具体协议文档。如果收到 4000-4999 区间的状态码,通常意味着需要检查请求参数;如果是非预期断线,先尝试重连。
实测四:日配额耗尽的全天模拟
这是最有参考价值的实测场景:我们模拟一个典型的量化开发者一天中的 API 调用模式,看看 5,000 次配额能用多久。
假设使用场景如下:
| 时段 | 操作 | 单次请求数 |
|---|---|---|
| 盘前 30 分钟 | 批量拉取 30 个数字货币的 1h K 线 | 30 次 |
| 盘中 | 每个标的每分钟轮询 1 次(30 个标的) | 1,800 次/小时 |
| 事件驱动监控 | depth 频道实时订阅(不计入 REST 配额) | 0 次 |
| 盘后批量回测 | 拉取 100 个历史 K 线片段 | 100 次 |
如果仅开启盘中轮询,5,000 次配额在约 2.5 小时 内耗尽。
这意味着免费层不适合做持续轮询类的策略。更合理的用法是:
改为事件驱动模式:用 WebSocket 订阅 depth 频道,仅在订单簿出现异常时(比如买卖压力比突破阈值)才触发 REST 请求去拉取额外数据。这样大部分计算在本地完成,API 调用量可以降到日均 200-500 次。
# 事件驱动的轻量请求策略伪代码
class AdaptiveDataFetcher:
"""
自适应数据获取器:
仅在检测到流动性异常时才调用 REST API,
正常情况下依赖 WebSocket 推送
"""
def __init__(self):
self.pressure_threshold_high = 2.5 # 买盘压力过大,警惕回调
self.pressure_threshold_low = 0.4 # 卖盘压力过大,警惕反弹
self.daily_request_count = 0
self.daily_limit = 5000
def on_depth_update(self, symbol: str, pressure_ratio: float):
"""depth 推送触发时调用"""
if self.daily_request_count >= self.daily_limit:
print("[配额] 今日配额已用尽,等待明日重置")
return
if (pressure_ratio > self.pressure_threshold_high or
pressure_ratio < self.pressure_threshold_low):
# 异常信号:才触发 REST 请求获取更多上下文
print(f"[信号] {symbol} 压力比异常: {pressure_ratio:.2f},"
f"拉取更多数据...")
self.daily_request_count += 1
# 调用 fetch_with_backoff 获取 kline 做确认
else:
# 正常状态:仅记录,不发 REST 请求
pass
免费层能力边界总结
| 场景 | 免费层能否支撑? | 建议 |
|---|---|---|
| 日内 K 线回测(100 个标的 × 500 条历史) | ⚠️ 勉强(需分 5 天拉取) | 批量请求加间隔,命中 3001 就换天 |
| 实时 depth 监控(数字货币 + 港股) | ✅ 完全支持 | WebSocket 连接数限 1 个,注意心跳 |
| 日内高频轮询策略 | ❌ 不支持 | 5,000 次配额撑不过 3 小时 |
| 事件驱动 + 阈值触发模式 | ✅ 推荐 | REST 请求量可控制在 500 次/天以内 |
| 美股订单簿分析 | ⚠️ 有限(仅 1 档 depth) | depth 信息量不足,建议搭配其他数据源 |
| 数字货币 trades 逐笔分析 | ✅ 支持 | trades 接口支持数字货币,可用于订单流分析 |
| 跨资产套利策略 | ⚠️ 部分 | 港股和数字货币可覆盖,美股 depth 档位少 |
给白嫖党的几条忠告
第一,把 REST 配额当成"奢侈品",不是日用品。 WebSocket 推送是免费层的核心优势,REST 接口是"查漏补缺"的工具。不要设计任何需要持续轮询 REST 的策略,那会让你在早上十点就把一天的配额烧光。
第二,永远实现指数退避和 Retry-After 读取。 这不是"锦上添花",是基本工程素养。一个没有退避处理的脚本,在配额耗尽后会陷入死循环,不断触发限频,既拿不到数据,又浪费开发时间。
第三,配额重置是 UTC 0 点,但你的服务器可能不在 UTC 时区。 如果你在北京时间早上 8 点部署策略,记得 8 点前后配额状态是割裂的。建议在配额接近上限时记录当前时间戳,防止跨 UTC 日界线的请求被意外拒绝。
第四,免费层最适合的场景是:小仓位验证 + 策略学习 + 技术原型。 到了实盘阶段,或者日均 API 调用超过 5,000 次,请认真考虑付费层或者用历史数据做离线回测,把实时 API 留给真正需要实时性的环节。
下一步行动
如果你正在设计量化策略:先评估你的 API 调用模式——是轮询还是事件驱动?免费层的 5,000 次配额对事件驱动模式足够,对轮询模式完全不够。
如果你想亲手测试限频边界:访问 tickdb.ai 注册(免费,无需信用卡),复制本文代码,把环境变量 TICKDB_API_KEY 配置好,直接跑 batch_fetch_demo。
如果你需要更长历史数据(>90 天)做完整回测:联系 [email protected] 了解机构版方案,历史 K 线数据支持最长 10 年范围。
如果你在写高频告警脚本:在 AI 助手中搜索安装 tickdb-market-data SKILL,可以直接用自然语言查询 TickDB 的数据能力边界。
本文不构成任何投资建议。市场有风险,投资需谨慎。