先问自己一个问题
你上周调用 TickDB 的费用超了,还是还有余量?
大多数人在正式接入之前,不会去想这个问题。等收到账单的那一刻,才发现自己低估了调用量——100 只股票、分钟级数据、7×24 小时运行,三个条件叠加在一起,一个月轻轻松松就是几十万次请求。
这不是 TickDB 的问题,是估算方法的问题。
本文给出两件事:第一,一个你能在接入前就算清楚的用量公式;第二,一套让实际调用量远低于理论值的工程策略。读完本文,你应该能自己算出“你的场景”一个月大概花多少,以及从哪里省。
一、为什么用量估算总是差 10 倍
1.1 常见误区
在进入公式之前,先拆解几个典型误区,它们几乎每个团队第一次接入时都会踩。
误区一:把“轮询间隔”当“调用次数”
"分钟级数据,1 分钟请求一次,1 天 1440 次,100 只股票 144,000 次,不贵。"
这个算法漏掉了两个关键因素:
- TickDB 的 REST API 按请求计费,而不是按股票数量计费。你一次请求可以查 100 只股票,也可以一只一只查,结果天差地别。
- 实时监控场景下,你的程序通常是长连接 + 推送,而不是轮询,推送不消耗 REST 请求配额。
误区二:把“实时 K 线”和“历史 K 线”混用
TickDB 提供了两个完全不同的接口:
/kline/latest:获取当前未收盘 K 线,适合实时监控,单次请求响应极快/kline:获取已收盘的历史 K 线,适合回测,返回的是确定的历史数据
如果你在实时监控场景下用 /kline 轮询,每分钟对 100 只股票请求一次,实际上是在重复请求同一批数据——这批数据只有到收盘那一刻才会变化,但你的请求却在每一分钟都在发生。
误区三:低估“心跳”和“重连”的开销
生产级 WebSocket 连接不是“连上就不管了”。实际运行中,网络抖动、服务器维护、程序重启都会触发重连。如果你的重连逻辑没有退避处理,短时间内的大量重连会直接推高有效请求量。
1.2 正确的估算思路
正确的思路是先分场景,再算调用量。不同使用场景的计费模型完全不同:
| 使用场景 | 数据类型 | 推荐接口 | 计费方式 |
|---|---|---|---|
| 实时监控(盯盘、告警) | 当前 K 线 | /kline/latest + WebSocket |
按实际请求/推送次数 |
| 策略回测 | 历史 K 线 | /kline |
按查询返回的数据量 |
| 订单簿快照 | 盘口数据 | depth 频道 |
按推送频率 |
| 盘后分析 | 日级别历史 | /kline (interval=1d) |
按查询次数 |
下面给出一个通用的估算框架,你只需要把自己的参数代进去。
二、用量估算公式:把参数代进去
2.1 场景一:实时监控(REST + WebSocket 混合)
这是最常见的场景:监控 100 只股票的分钟级数据,价格异动时告警。
月调用量公式:
月 REST 请求数 = (N × 1/60 × T) + S
其中:
N = 监控标的数量
1/60 = 降频系数(分钟级数据不需要每分钟查 60 次)
T = 月运行天数(按 22 天计)
S = 特殊操作次数(换班重连、策略重启等,建议设为日请求量的 5%)
如果你在程序启动时批量查询所有股票的状态,这部分也算调用量:
启动批量查询 = ceil(N / 单批最大数量) × 日启动次数
代入具体数字:
月 REST 请求数 = (100 × 1 × 22) + (100 × 22 × 5%)
= 2,200 + 110
= 2,310 次/月
加上 WebSocket 长连接——WebSocket 的 depth 频道推送不消耗 REST 请求配额,只要你的代码只订阅、不轮询 REST,这部分的调用量可以忽略不计。
关键点:分钟级监控不需要每分钟查 60 次。你真正需要的是"最近一次 K 线更新的时间戳"是否变化。如果更新了,说明有新数据;如果没更新,说明数据没有变化——你不需要反复请求。
2.2 场景二:日线回测(历史 K 线查询)
这是第二种典型场景:用过去 3 年的日线数据回测一个趋势策略。
月调用量公式:
月请求数 = ceil(D / 单次返回上限) × N
其中:
D = 历史总交易日数(约 750 天/年 × 年数)
N = 股票数量
单次返回上限 = TickDB 单次 K 线查询的返回上限
代入具体数字(3 年、100 只股票):
月请求数 = ceil(750 × 3 / 1000) × 100
= ceil(2.25) × 100
= 3 × 100
= 300 次
300 次请求,换来 100 只股票、3 年的完整日线数据——这就是批量查询的威力。
2.3 场景三:高频轮询(不推荐,但很多人正在做)
如果你真的每分钟对 100 只股票各发一次 REST 请求:
月请求数 = 100 × 60 × 22 = 132,000 次/月
对比一下优化后的 2,310 次/月——相差 57 倍。这就是估算错误的代价。
2.4 一张表看懂三种场景的差距
| 场景 | 标的数 | 时间粒度 | 月调用量(估算) | 调用量级别 |
|---|---|---|---|---|
| 高频轮询(错误做法) | 100 | 分钟 | 132,000 | 10 万级 |
| 降频轮询 + WebSocket(标准做法) | 100 | 分钟 | 2,310 | 千级 |
| 日线回测(批量查询) | 100 | 日线 | 300 | 百级 |
差距不是 10%,是 10 倍到 100 倍。
三、生产级代码:把估算公式落地
3.1 场景一:实时监控的标准架构
下面是一套生产级实时监控代码,包含降频逻辑、WebSocket 订阅、以及 REST 接口的按需调用。
import os
import time
import json
import threading
import requests
from collections import defaultdict
from datetime import datetime, timedelta
# ============================================================
# TickDB 实时监控:标准生产架构
# 核心优化:降频 + WebSocket 推送 + 按需 REST 查询
# ============================================================
API_KEY = os.environ.get("TICKDB_API_KEY")
BASE_URL = "https://api.tickdb.ai/v1/market"
HEADERS = {"X-API-Key": API_KEY}
class TickDBMonitor:
"""
TickDB 实时监控器
架构设计:
1. WebSocket 接收 depth/ticker 推送(不消耗 REST 配额)
2. REST /kline/latest 按需查询(降频到每标的最多 1 次/分钟)
3. 内存缓存最近更新时间戳,避免重复查询
"""
def __init__(self, symbols: list[str], alert_threshold: float = 0.03):
self.symbols = symbols
self.alert_threshold = alert_threshold # 价格变动超过 3% 触发告警
self._cache = {} # {symbol: {"last_close": float, "last_update": datetime}}
self._lock = threading.Lock()
self._running = False
# WebSocket 连接状态
self._ws = None
self._ws_reconnect_delay = 1
self._ws_max_delay = 60
self._ws_retry_count = 0
# REST 请求统计(用于用量监控)
self._rest_call_count = 0
# ----------------------------------------------------------
# 核心方法 1:降频轮询 /kline/latest
# ----------------------------------------------------------
def poll_latest_klines(self) -> dict:
"""
获取所有标的的最新 K 线(分钟级)。
关键优化:只有当本地缓存的数据已过期(超过 1 分钟)
或不存在时,才真正发起 REST 请求。
"""
results = {}
now = datetime.now()
with self._lock:
for symbol in self.symbols:
cached = self._cache.get(symbol)
# 降频:如果缓存未过期(1 分钟以内),跳过请求
if cached and (now - cached["last_update"]).seconds < 60:
results[symbol] = cached
continue
# 缓存过期,发起 REST 请求
url = f"{BASE_URL}/kline/latest"
params = {
"symbol": symbol,
"interval": "1m",
"limit": 1
}
try:
response = requests.get(
url,
headers=HEADERS,
params=params,
timeout=(3.05, 10)
)
data = response.json()
if data.get("code") == 0:
kline = data["data"]["klines"][0]
close_price = float(kline["close"])
self._cache[symbol] = {
"last_close": close_price,
"last_update": now,
"open": float(kline["open"]),
"high": float(kline["high"]),
"low": float(kline["low"]),
}
results[symbol] = self._cache[symbol]
self._rest_call_count += 1
except requests.exceptions.Timeout:
# ⚠️ 超时处理:记录日志,不阻塞其他标的
print(f"[WARN] 请求 {symbol} 超时,跳过本次")
continue
return results
# ----------------------------------------------------------
# 核心方法 2:监听价格异动
# ----------------------------------------------------------
def check_alerts(self):
"""
对比当前价格与缓存价格,检测异动。
"""
klines = self.poll_latest_klines()
alerts = []
for symbol, data in klines.items():
cached = self._cache.get(symbol)
if cached and "last_close" in cached:
change_pct = abs(data["last_close"] - cached["last_close"]) / cached["last_close"]
if change_pct > self.alert_threshold:
alerts.append({
"symbol": symbol,
"price": data["last_close"],
"change_pct": round(change_pct * 100, 2),
"direction": "up" if data["last_close"] > cached["last_close"] else "down"
})
return alerts
# ----------------------------------------------------------
# 核心方法 3:启动 WebSocket 深度订阅(推送模式)
# ----------------------------------------------------------
def start_depth_subscription(self):
"""
启动 WebSocket depth 频道订阅。
⚠️ 注意:WebSocket 连接参数在 URL 中传递 api_key
⚠️ 生产环境建议使用 aiohttp/asyncio 架构
"""
import websocket
ws_url = f"wss://api.tickdb.ai/v1/market/stream?api_key={API_KEY}&channels=depth"
def on_message(ws, message):
msg = json.loads(message)
# 处理 depth 推送,更新缓存(不消耗 REST 配额)
if msg.get("channel") == "depth":
symbol = msg.get("symbol")
with self._lock:
if symbol in self._cache:
self._cache[symbol]["depth"] = msg.get("data")
def on_error(ws, error):
print(f"[WS ERROR] {error}")
def on_close(ws, code, reason):
# ⚠️ 指数退避重连:避免惊群效应
delay = min(self._ws_reconnect_delay * (2 ** self._ws_retry_count), self._ws_max_delay)
jitter = (hash(str(time.time())) % 1000) / 1000.0 # 简单抖动
wait = delay + jitter * delay * 0.1
print(f"[WS CLOSE] {code} {reason},{wait:.1f}s 后重连(第 {self._ws_retry_count + 1} 次)")
time.sleep(wait)
self._ws_retry_count += 1
self.start_depth_subscription()
def on_open(ws):
print("[WS OPEN] 深度订阅已连接")
self._ws_retry_count = 0
self._ws_reconnect_delay = 1
# 订阅 depth 频道
subscribe_msg = {
"cmd": "subscribe",
"params": {
"channels": ["depth"],
"symbols": self.symbols
}
}
ws.send(json.dumps(subscribe_msg))
self._ws = websocket.WebSocketApp(
ws_url,
on_message=on_message,
on_error=on_error,
on_close=on_close,
on_open=on_open
)
# 非阻塞启动(生产环境建议用 threading 或 asyncio)
ws_thread = threading.Thread(target=self._ws.run_forever, daemon=True)
ws_thread.start()
def get_rest_call_stats(self) -> dict:
"""返回本月 REST 调用统计,用于成本监控"""
return {
"total_rest_calls": self._rest_call_count,
"monitored_symbols": len(self.symbols),
"cache_hit_rate": self._estimate_cache_hit_rate()
}
def _estimate_cache_hit_rate(self) -> float:
"""估算缓存命中率(降频效果)"""
if self._rest_call_count == 0:
return 0.0
theoretical_calls = len(self.symbols) * 60 * 22 # 高频轮询理论值
return round(1 - (self._rest_call_count / theoretical_calls), 4)
if __name__ == "__main__":
symbols = [f"{s}.US" for s in ["AAPL", "MSFT", "GOOGL", "NVDA", "TSLA"]]
monitor = TickDBMonitor(symbols=symbols, alert_threshold=0.03)
# 启动 WebSocket 深度订阅
monitor.start_depth_subscription()
# 主循环(每 10 秒检查一次告警)
# ⚠️ 注意:这里的轮询间隔是 10 秒,
# 但 /kline/latest 在缓存未过期时不会发起实际请求
while True:
try:
alerts = monitor.check_alerts()
if alerts:
print(f"[{datetime.now().strftime('%H:%M:%S')}] 检测到异动: {alerts}")
stats = monitor.get_rest_call_stats()
print(f"[{datetime.now().strftime('%H:%M:%S')}] "
f"REST 调用: {stats['total_rest_calls']}, "
f"缓存命中率: {stats['cache_hit_rate']:.1%}")
time.sleep(10)
except KeyboardInterrupt:
print("监控已停止")
break
代码核心优化点解读:
| 优化手段 | 效果 | 节省幅度 |
|---|---|---|
| 缓存 + 降频 | 同一标的同一分钟内不重复请求 | ~98% |
| WebSocket depth 推送 | 盘口数据不走 REST | ~100% |
| 按需查询 | 只在缓存过期时查 REST | 与缓存命中率挂钩 |
| 抖动重连 | 避免重连风暴 | 防止调用量突增 |
3.2 场景二:批量历史 K 线查询
回测场景下,正确的做法是一次请求拉取尽可能多的数据。
import requests
import os
from typing import Generator
API_KEY = os.environ.get("TICKDB_API_KEY")
BASE_URL = "https://api.tickdb.ai/v1/market"
HEADERS = {"X-API-Key": API_KEY}
# ============================================================
# 批量历史 K 线查询:最小化请求数
# 策略:单次请求拉满上限,多次请求覆盖所有标的
# ============================================================
def fetch_historical_klines_batch(
symbols: list[str],
interval: str = "1d",
months: int = 36
) -> dict[str, list[dict]]:
"""
批量获取多只股票的历史 K 线数据。
参数:
symbols: 股票代码列表,如 ["AAPL.US", "MSFT.US"]
interval: K 线周期,支持 1m/5m/15m/1h/1d
months: 回测时间跨度(月)
返回:
{symbol: [kline_data, ...], ...}
"""
all_data = {}
limit = 1000 # TickDB 单次 K 线查询上限
for symbol in symbols:
# 按月分批请求,避免一次请求返回过多数据
symbol_data = []
end_time = None
for _ in range(months):
params = {
"symbol": symbol,
"interval": interval,
"limit": limit,
}
if end_time:
params["end_time"] = end_time
try:
response = requests.get(
f"{BASE_URL}/kline",
headers=HEADERS,
params=params,
timeout=(3.05, 10)
)
data = response.json()
if data.get("code") == 0:
klines = data["data"]["klines"]
if not klines:
break
symbol_data.extend(klines)
# 下一页:从最后一条 K 线的时间戳继续
end_time = klines[0]["time"]
elif data.get("code") == 3001:
# ⚠️ 限频处理:读取 Retry-After
retry_after = int(response.headers.get("Retry-After", 5))
print(f"[RATE LIMIT] 请求 {symbol} 触发限频,"
f"等待 {retry_after}s 后重试")
import time
time.sleep(retry_after)
continue
else:
print(f"[ERROR] {symbol}: code={data.get('code')}, "
f"msg={data.get('message')}")
break
except requests.exceptions.Timeout:
print(f"[TIMEOUT] 请求 {symbol} 超时,3s 后重试")
import time
time.sleep(3)
continue
all_data[symbol] = symbol_data
print(f"[OK] {symbol}: 获取 {len(symbol_data)} 条 {interval} K 线")
return all_data
# 用量估算验证
def estimate_monthly_cost(num_symbols: int, months: int, interval: str) -> dict:
"""
验证回测场景的月调用量。
公式:ceil(历史总条数 / 单次上限) × 标的数
"""
# 假设每月约 22 个交易日
trading_days_per_month = 22
total_days = num_symbols * months * trading_days_per_month
# 估算请求数(实际上每个月需要 ceil(monthly_days / 1000) 次)
requests_per_symbol = 0
remaining = 0
for _ in range(months):
month_days = trading_days_per_month
remaining += month_days
while remaining > 1000:
requests_per_symbol += 1
remaining -= 1000
if remaining > 0:
requests_per_symbol += 1
remaining = 0
total_requests = requests_per_symbol * num_symbols
return {
"标的数量": num_symbols,
"回测跨度": f"{months} 个月",
"K 线周期": interval,
"总 K 线数(估算)": total_days,
"月调用量(估算)": total_requests,
"单次平均 K 线数": round(total_days / total_requests, 1) if total_requests > 0 else 0
}
if __name__ == "__main__":
# 示例:100 只股票,36 个月日线数据
result = estimate_monthly_cost(
num_symbols=100,
months=36,
interval="1d"
)
print("=== 用量估算 ===")
for k, v in result.items():
print(f" {k}: {v}")
四、成本优化:四个层次的降本策略
4.1 策略一:架构层——从轮询到订阅
这是效果最大、改动也最大的优化。
高频轮询(错误)
↓
100 只股票 × 每分钟 60 次 × 22 天 = 132,000 次/月
WebSocket + 按需查询(正确)
↓
WebSocket 推送 depth(0 次 REST 调用)
+ 按需查询 /kline/latest(降频后 2,310 次/月)
↓
节省比例:98.3%
实现要点:将 poll_latest_klines() 拆分为两层:
- WebSocket 层:处理
depth/ticker频道推送,维护本地缓存 - REST 层:仅在需要时(缓存过期、手动查询、历史补充)才发起请求
4.2 策略二:缓存层——时间窗口缓存
from functools import lru_cache
from datetime import datetime, timedelta
# 简单的时间窗口缓存装饰器
def cache_until(interval_seconds: int):
"""缓存结果直到指定时间窗口过去"""
def decorator(func):
cache = {"value": None, "expires_at": datetime.min}
def wrapper(*args, **kwargs):
now = datetime.now()
if now < cache["expires_at"]:
return cache["value"]
result = func(*args, **kwargs)
cache["value"] = result
cache["expires_at"] = now + timedelta(seconds=interval_seconds)
return result
return wrapper
return decorator
@lru_cache(maxsize=1)
def get_cached_klines(symbol: str, interval: str = "1m"):
"""
同一标的同一分钟内的重复请求,直接返回缓存。
⚠️ 注意:lru_cache 的 key 基于函数参数,
生产环境建议使用 Redis 实现跨进程缓存。
"""
response = requests.get(
f"{BASE_URL}/kline/latest",
headers=HEADERS,
params={"symbol": symbol, "interval": interval},
timeout=(3.05, 10)
)
return response.json()
缓存层优化的核心原则是:同一个标的的同一个请求,在 60 秒内只发一次。
4.3 策略三:请求层——批量合并
# ⚠️ 伪代码示例:展示批量请求的思路
# 实际实现需参考 TickDB API 文档确认批量接口支持情况
def batch_fetch(symbols: list[str], data_type: str = "kline") -> dict:
"""
批量请求多只股票的同类型数据。
效果:将 100 次单独请求合并为 1-3 次批量请求。
"""
if not hasattr(batch_fetch, '_cache'):
batch_fetch._cache = {}
# 检查缓存
now = datetime.now()
cache_key = f"{','.join(sorted(symbols))}:{data_type}"
if cache_key in batch_fetch._cache:
cached = batch_fetch._cache[cache_key]
if (now - cached["timestamp"]).seconds < 60:
return cached["data"]
# 实际批量请求
payload = {
"symbols": symbols,
"type": data_type,
"interval": "1m",
"limit": 1
}
try:
response = requests.post(
f"{BASE_URL}/batch",
headers=HEADERS,
json=payload,
timeout=(3.05, 10)
)
result = response.json()
# 更新缓存
batch_fetch._cache[cache_key] = {
"data": result,
"timestamp": now
}
return result
except requests.exceptions.Timeout:
raise RuntimeError(f"批量请求 {len(symbols)} 只股票超时")
# 批量请求的效果对比
def compare_request_efficiency(symbols: list[str]):
"""对比单独请求 vs 批量请求的调用量"""
# 单独请求:100 只股票 = 100 次请求
separate_requests = len(symbols)
# 批量请求:假设单批上限 50,100 只股票 = 2 次请求
batch_size = 50
batch_requests = (len(symbols) + batch_size - 1) // batch_size
print(f"单独请求: {separate_requests} 次")
print(f"批量请求: {batch_requests} 次")
print(f"节省: {separate_requests - batch_requests} 次 "
f"({round((1 - batch_requests / separate_requests) * 100, 1)}%)")
4.4 策略四:监控层——用量仪表盘
成本控制的前提是可见性。建议在监控系统中加入用量统计模块:
import threading
from datetime import datetime
class CostMonitor:
"""
用量监控器:实时追踪 API 调用量,估算月账单。
建议接入 Prometheus/Grafana 进行可视化。
"""
def __init__(self, monthly_budget: float = 100.0):
self.monthly_budget = monthly_budget # 每月预算(美元)
self._calls_today = 0
self._calls_this_month = 0
self._month_start = datetime.now()
self._lock = threading.Lock()
def record_call(self, endpoint: str, symbol_count: int = 1):
"""每次 API 调用后调用此方法记录"""
with self._lock:
self._calls_today += 1
self._calls_this_month += 1
def reset_month(self):
"""每月重置计数(在计费周期切换时调用)"""
with self._lock:
self._calls_this_month = 0
self._month_start = datetime.now()
def get_usage_report(self) -> dict:
"""获取当前用量报告"""
with self._lock:
days_in_month = 30
today = datetime.now().day
days_passed = today if today > 0 else 1
projected_monthly = self._calls_this_month * (days_in_month / days_passed)
budget_used_pct = projected_monthly / self.monthly_budget if self.monthly_budget > 0 else 0
return {
"calls_today": self._calls_today,
"calls_this_month": self._calls_this_month,
"projected_monthly": round(projected_monthly),
"monthly_budget": self.monthly_budget,
"budget_used_pct": round(budget_used_pct * 100, 1),
"status": self._get_status(budget_used_pct, days_passed / days_in_month)
}
def _get_status(self, budget_pct: float, time_pct: float) -> str:
if budget_pct <= time_pct * 0.8:
return "✅ 健康"
elif budget_pct <= time_pct * 1.0:
return "⚠️ 注意"
elif budget_pct <= time_pct * 1.2:
return "🔴 警告:接近预算上限"
else:
return "🚨 超支:立即优化"
# 使用示例
cost_monitor = CostMonitor(monthly_budget=100.0)
# 模拟运行
for _ in range(100):
cost_monitor.record_call(endpoint="/kline/latest")
report = cost_monitor.get_usage_report()
print("=== 用量报告 ===")
for k, v in report.items():
print(f" {k}: {v}")
五、TickDB 各功能模块用量估算速查表
| 功能 | 场景 | 日均调用量估算 | 月均调用量估算 | 优化空间 |
|---|---|---|---|---|
/kline/latest |
实时监控(100 标的) | 100–2,200 | 2,200–48,400 | 降频后降至 2,200 |
/kline |
历史日线(100 标的 × 3 年) | — | 300 | 批量查询 |
depth 频道 |
订单簿快照 | 0 REST 调用 | 0 | WebSocket 推送 |
ticker 频道 |
实时成交数据 | 0 REST 调用 | 0 | WebSocket 推送 |
/symbols/available |
品种查询 | 1–5/天 | 30–150 | 缓存结果 |
重要区分:
depth和ticker是 WebSocket 推送频道,接入后不消耗 REST 请求配额。这是 TickDB 相对于纯轮询方案的核心优势之一。
六、一句话总结
你的月调用量不是固定的 100×60×22,而是由架构设计决定的变量。用 WebSocket 推送代替轮询,用缓存代替重复查询,用批量请求代替循环单查——这三个改动加起来,通常能让账单减少 90% 以上。
下一步行动
如果你想亲手验证本文的估算模型:
- 访问 tickdb.ai 注册(免费,无需信用卡)
- 在控制台生成 API Key
- 将
TICKDB_API_KEY设为环境变量,复制本文代码运行 - 对比你的实际调用量与理论估算值
如果你已经有代码在运行:
在现有代码中加入 CostMonitor,看看你的实际调用量比理论值高了多少——那个差值就是你优化的空间。
如果你需要更长的历史数据做回测:
联系 [email protected] 了解历史 K 线数据的批量采购方案。
风险提示:本文所有用量估算基于标准产品定价,实际费用请以 TickDB 官方账单为准。API 定价可能随版本更新调整,建议定期查阅最新定价文档。