先问自己一个问题

你上周调用 TickDB 的费用超了,还是还有余量?

大多数人在正式接入之前,不会去想这个问题。等收到账单的那一刻,才发现自己低估了调用量——100 只股票、分钟级数据、7×24 小时运行,三个条件叠加在一起,一个月轻轻松松就是几十万次请求。

这不是 TickDB 的问题,是估算方法的问题。

本文给出两件事:第一,一个你能在接入前就算清楚的用量公式;第二,一套让实际调用量远低于理论值的工程策略。读完本文,你应该能自己算出“你的场景”一个月大概花多少,以及从哪里省。


一、为什么用量估算总是差 10 倍

1.1 常见误区

在进入公式之前,先拆解几个典型误区,它们几乎每个团队第一次接入时都会踩。

误区一:把“轮询间隔”当“调用次数”

"分钟级数据,1 分钟请求一次,1 天 1440 次,100 只股票 144,000 次,不贵。"

这个算法漏掉了两个关键因素:

  • TickDB 的 REST API 按请求计费,而不是按股票数量计费。你一次请求可以查 100 只股票,也可以一只一只查,结果天差地别。
  • 实时监控场景下,你的程序通常是长连接 + 推送,而不是轮询,推送不消耗 REST 请求配额。

误区二:把“实时 K 线”和“历史 K 线”混用

TickDB 提供了两个完全不同的接口:

  • /kline/latest:获取当前未收盘 K 线,适合实时监控,单次请求响应极快
  • /kline:获取已收盘的历史 K 线,适合回测,返回的是确定的历史数据

如果你在实时监控场景下用 /kline 轮询,每分钟对 100 只股票请求一次,实际上是在重复请求同一批数据——这批数据只有到收盘那一刻才会变化,但你的请求却在每一分钟都在发生。

误区三:低估“心跳”和“重连”的开销

生产级 WebSocket 连接不是“连上就不管了”。实际运行中,网络抖动、服务器维护、程序重启都会触发重连。如果你的重连逻辑没有退避处理,短时间内的大量重连会直接推高有效请求量。

1.2 正确的估算思路

正确的思路是先分场景,再算调用量。不同使用场景的计费模型完全不同:

使用场景 数据类型 推荐接口 计费方式
实时监控(盯盘、告警) 当前 K 线 /kline/latest + WebSocket 按实际请求/推送次数
策略回测 历史 K 线 /kline 按查询返回的数据量
订单簿快照 盘口数据 depth 频道 按推送频率
盘后分析 日级别历史 /kline (interval=1d) 按查询次数

下面给出一个通用的估算框架,你只需要把自己的参数代进去。


二、用量估算公式:把参数代进去

2.1 场景一:实时监控(REST + WebSocket 混合)

这是最常见的场景:监控 100 只股票的分钟级数据,价格异动时告警

月调用量公式:

月 REST 请求数 = (N × 1/60 × T) + S

其中:
  N   = 监控标的数量
  1/60 = 降频系数(分钟级数据不需要每分钟查 60 次)
  T   = 月运行天数(按 22 天计)
  S   = 特殊操作次数(换班重连、策略重启等,建议设为日请求量的 5%)

如果你在程序启动时批量查询所有股票的状态,这部分也算调用量:

启动批量查询 = ceil(N / 单批最大数量) × 日启动次数

代入具体数字:

月 REST 请求数 = (100 × 1 × 22) + (100 × 22 × 5%)
              = 2,200 + 110
              = 2,310 次/月

加上 WebSocket 长连接——WebSocket 的 depth 频道推送不消耗 REST 请求配额,只要你的代码只订阅、不轮询 REST,这部分的调用量可以忽略不计。

关键点:分钟级监控不需要每分钟查 60 次。你真正需要的是"最近一次 K 线更新的时间戳"是否变化。如果更新了,说明有新数据;如果没更新,说明数据没有变化——你不需要反复请求。

2.2 场景二:日线回测(历史 K 线查询)

这是第二种典型场景:用过去 3 年的日线数据回测一个趋势策略

月调用量公式:

月请求数 = ceil(D / 单次返回上限) × N

其中:
  D   = 历史总交易日数(约 750 天/年 × 年数)
  N   = 股票数量
  单次返回上限 = TickDB 单次 K 线查询的返回上限

代入具体数字(3 年、100 只股票):

月请求数 = ceil(750 × 3 / 1000) × 100
         = ceil(2.25) × 100
         = 3 × 100
         = 300 次

300 次请求,换来 100 只股票、3 年的完整日线数据——这就是批量查询的威力。

2.3 场景三:高频轮询(不推荐,但很多人正在做)

如果你真的每分钟对 100 只股票各发一次 REST 请求:

月请求数 = 100 × 60 × 22 = 132,000 次/月

对比一下优化后的 2,310 次/月——相差 57 倍。这就是估算错误的代价。

2.4 一张表看懂三种场景的差距

场景 标的数 时间粒度 月调用量(估算) 调用量级别
高频轮询(错误做法) 100 分钟 132,000 10 万级
降频轮询 + WebSocket(标准做法) 100 分钟 2,310 千级
日线回测(批量查询) 100 日线 300 百级

差距不是 10%,是 10 倍到 100 倍


三、生产级代码:把估算公式落地

3.1 场景一:实时监控的标准架构

下面是一套生产级实时监控代码,包含降频逻辑、WebSocket 订阅、以及 REST 接口的按需调用。

import os
import time
import json
import threading
import requests
from collections import defaultdict
from datetime import datetime, timedelta

# ============================================================
# TickDB 实时监控:标准生产架构
# 核心优化:降频 + WebSocket 推送 + 按需 REST 查询
# ============================================================

API_KEY = os.environ.get("TICKDB_API_KEY")
BASE_URL = "https://api.tickdb.ai/v1/market"
HEADERS = {"X-API-Key": API_KEY}


class TickDBMonitor:
    """
    TickDB 实时监控器
    架构设计:
    1. WebSocket 接收 depth/ticker 推送(不消耗 REST 配额)
    2. REST /kline/latest 按需查询(降频到每标的最多 1 次/分钟)
    3. 内存缓存最近更新时间戳,避免重复查询
    """

    def __init__(self, symbols: list[str], alert_threshold: float = 0.03):
        self.symbols = symbols
        self.alert_threshold = alert_threshold  # 价格变动超过 3% 触发告警
        self._cache = {}        # {symbol: {"last_close": float, "last_update": datetime}}
        self._lock = threading.Lock()
        self._running = False

        # WebSocket 连接状态
        self._ws = None
        self._ws_reconnect_delay = 1
        self._ws_max_delay = 60
        self._ws_retry_count = 0

        # REST 请求统计(用于用量监控)
        self._rest_call_count = 0

    # ----------------------------------------------------------
    # 核心方法 1:降频轮询 /kline/latest
    # ----------------------------------------------------------
    def poll_latest_klines(self) -> dict:
        """
        获取所有标的的最新 K 线(分钟级)。
        关键优化:只有当本地缓存的数据已过期(超过 1 分钟)
                 或不存在时,才真正发起 REST 请求。
        """
        results = {}
        now = datetime.now()

        with self._lock:
            for symbol in self.symbols:
                cached = self._cache.get(symbol)
                # 降频:如果缓存未过期(1 分钟以内),跳过请求
                if cached and (now - cached["last_update"]).seconds < 60:
                    results[symbol] = cached
                    continue

                # 缓存过期,发起 REST 请求
                url = f"{BASE_URL}/kline/latest"
                params = {
                    "symbol": symbol,
                    "interval": "1m",
                    "limit": 1
                }

                try:
                    response = requests.get(
                        url,
                        headers=HEADERS,
                        params=params,
                        timeout=(3.05, 10)
                    )
                    data = response.json()

                    if data.get("code") == 0:
                        kline = data["data"]["klines"][0]
                        close_price = float(kline["close"])
                        self._cache[symbol] = {
                            "last_close": close_price,
                            "last_update": now,
                            "open": float(kline["open"]),
                            "high": float(kline["high"]),
                            "low": float(kline["low"]),
                        }
                        results[symbol] = self._cache[symbol]
                        self._rest_call_count += 1

                except requests.exceptions.Timeout:
                    # ⚠️ 超时处理:记录日志,不阻塞其他标的
                    print(f"[WARN] 请求 {symbol} 超时,跳过本次")
                    continue

        return results

    # ----------------------------------------------------------
    # 核心方法 2:监听价格异动
    # ----------------------------------------------------------
    def check_alerts(self):
        """
        对比当前价格与缓存价格,检测异动。
        """
        klines = self.poll_latest_klines()
        alerts = []

        for symbol, data in klines.items():
            cached = self._cache.get(symbol)
            if cached and "last_close" in cached:
                change_pct = abs(data["last_close"] - cached["last_close"]) / cached["last_close"]
                if change_pct > self.alert_threshold:
                    alerts.append({
                        "symbol": symbol,
                        "price": data["last_close"],
                        "change_pct": round(change_pct * 100, 2),
                        "direction": "up" if data["last_close"] > cached["last_close"] else "down"
                    })

        return alerts

    # ----------------------------------------------------------
    # 核心方法 3:启动 WebSocket 深度订阅(推送模式)
    # ----------------------------------------------------------
    def start_depth_subscription(self):
        """
        启动 WebSocket depth 频道订阅。
        ⚠️ 注意:WebSocket 连接参数在 URL 中传递 api_key
        ⚠️ 生产环境建议使用 aiohttp/asyncio 架构
        """
        import websocket
        ws_url = f"wss://api.tickdb.ai/v1/market/stream?api_key={API_KEY}&channels=depth"

        def on_message(ws, message):
            msg = json.loads(message)
            # 处理 depth 推送,更新缓存(不消耗 REST 配额)
            if msg.get("channel") == "depth":
                symbol = msg.get("symbol")
                with self._lock:
                    if symbol in self._cache:
                        self._cache[symbol]["depth"] = msg.get("data")

        def on_error(ws, error):
            print(f"[WS ERROR] {error}")

        def on_close(ws, code, reason):
            # ⚠️ 指数退避重连:避免惊群效应
            delay = min(self._ws_reconnect_delay * (2 ** self._ws_retry_count), self._ws_max_delay)
            jitter = (hash(str(time.time())) % 1000) / 1000.0  # 简单抖动
            wait = delay + jitter * delay * 0.1
            print(f"[WS CLOSE] {code} {reason},{wait:.1f}s 后重连(第 {self._ws_retry_count + 1} 次)")
            time.sleep(wait)
            self._ws_retry_count += 1
            self.start_depth_subscription()

        def on_open(ws):
            print("[WS OPEN] 深度订阅已连接")
            self._ws_retry_count = 0
            self._ws_reconnect_delay = 1
            # 订阅 depth 频道
            subscribe_msg = {
                "cmd": "subscribe",
                "params": {
                    "channels": ["depth"],
                    "symbols": self.symbols
                }
            }
            ws.send(json.dumps(subscribe_msg))

        self._ws = websocket.WebSocketApp(
            ws_url,
            on_message=on_message,
            on_error=on_error,
            on_close=on_close,
            on_open=on_open
        )

        # 非阻塞启动(生产环境建议用 threading 或 asyncio)
        ws_thread = threading.Thread(target=self._ws.run_forever, daemon=True)
        ws_thread.start()

    def get_rest_call_stats(self) -> dict:
        """返回本月 REST 调用统计,用于成本监控"""
        return {
            "total_rest_calls": self._rest_call_count,
            "monitored_symbols": len(self.symbols),
            "cache_hit_rate": self._estimate_cache_hit_rate()
        }

    def _estimate_cache_hit_rate(self) -> float:
        """估算缓存命中率(降频效果)"""
        if self._rest_call_count == 0:
            return 0.0
        theoretical_calls = len(self.symbols) * 60 * 22  # 高频轮询理论值
        return round(1 - (self._rest_call_count / theoretical_calls), 4)


if __name__ == "__main__":
    symbols = [f"{s}.US" for s in ["AAPL", "MSFT", "GOOGL", "NVDA", "TSLA"]]

    monitor = TickDBMonitor(symbols=symbols, alert_threshold=0.03)

    # 启动 WebSocket 深度订阅
    monitor.start_depth_subscription()

    # 主循环(每 10 秒检查一次告警)
    # ⚠️ 注意:这里的轮询间隔是 10 秒,
    #         但 /kline/latest 在缓存未过期时不会发起实际请求
    while True:
        try:
            alerts = monitor.check_alerts()
            if alerts:
                print(f"[{datetime.now().strftime('%H:%M:%S')}] 检测到异动: {alerts}")

            stats = monitor.get_rest_call_stats()
            print(f"[{datetime.now().strftime('%H:%M:%S')}] "
                  f"REST 调用: {stats['total_rest_calls']}, "
                  f"缓存命中率: {stats['cache_hit_rate']:.1%}")

            time.sleep(10)

        except KeyboardInterrupt:
            print("监控已停止")
            break

代码核心优化点解读:

优化手段 效果 节省幅度
缓存 + 降频 同一标的同一分钟内不重复请求 ~98%
WebSocket depth 推送 盘口数据不走 REST ~100%
按需查询 只在缓存过期时查 REST 与缓存命中率挂钩
抖动重连 避免重连风暴 防止调用量突增

3.2 场景二:批量历史 K 线查询

回测场景下,正确的做法是一次请求拉取尽可能多的数据。

import requests
import os
from typing import Generator

API_KEY = os.environ.get("TICKDB_API_KEY")
BASE_URL = "https://api.tickdb.ai/v1/market"
HEADERS = {"X-API-Key": API_KEY}

# ============================================================
# 批量历史 K 线查询:最小化请求数
# 策略:单次请求拉满上限,多次请求覆盖所有标的
# ============================================================


def fetch_historical_klines_batch(
    symbols: list[str],
    interval: str = "1d",
    months: int = 36
) -> dict[str, list[dict]]:
    """
    批量获取多只股票的历史 K 线数据。

    参数:
        symbols: 股票代码列表,如 ["AAPL.US", "MSFT.US"]
        interval: K 线周期,支持 1m/5m/15m/1h/1d
        months: 回测时间跨度(月)

    返回:
        {symbol: [kline_data, ...], ...}
    """
    all_data = {}
    limit = 1000  # TickDB 单次 K 线查询上限

    for symbol in symbols:
        # 按月分批请求,避免一次请求返回过多数据
        symbol_data = []
        end_time = None

        for _ in range(months):
            params = {
                "symbol": symbol,
                "interval": interval,
                "limit": limit,
            }
            if end_time:
                params["end_time"] = end_time

            try:
                response = requests.get(
                    f"{BASE_URL}/kline",
                    headers=HEADERS,
                    params=params,
                    timeout=(3.05, 10)
                )
                data = response.json()

                if data.get("code") == 0:
                    klines = data["data"]["klines"]
                    if not klines:
                        break
                    symbol_data.extend(klines)
                    # 下一页:从最后一条 K 线的时间戳继续
                    end_time = klines[0]["time"]

                elif data.get("code") == 3001:
                    # ⚠️ 限频处理:读取 Retry-After
                    retry_after = int(response.headers.get("Retry-After", 5))
                    print(f"[RATE LIMIT] 请求 {symbol} 触发限频,"
                          f"等待 {retry_after}s 后重试")
                    import time
                    time.sleep(retry_after)
                    continue

                else:
                    print(f"[ERROR] {symbol}: code={data.get('code')}, "
                          f"msg={data.get('message')}")
                    break

            except requests.exceptions.Timeout:
                print(f"[TIMEOUT] 请求 {symbol} 超时,3s 后重试")
                import time
                time.sleep(3)
                continue

        all_data[symbol] = symbol_data
        print(f"[OK] {symbol}: 获取 {len(symbol_data)} 条 {interval} K 线")

    return all_data


# 用量估算验证
def estimate_monthly_cost(num_symbols: int, months: int, interval: str) -> dict:
    """
    验证回测场景的月调用量。
    公式:ceil(历史总条数 / 单次上限) × 标的数
    """
    # 假设每月约 22 个交易日
    trading_days_per_month = 22
    total_days = num_symbols * months * trading_days_per_month

    # 估算请求数(实际上每个月需要 ceil(monthly_days / 1000) 次)
    requests_per_symbol = 0
    remaining = 0
    for _ in range(months):
        month_days = trading_days_per_month
        remaining += month_days
        while remaining > 1000:
            requests_per_symbol += 1
            remaining -= 1000
        if remaining > 0:
            requests_per_symbol += 1
            remaining = 0

    total_requests = requests_per_symbol * num_symbols

    return {
        "标的数量": num_symbols,
        "回测跨度": f"{months} 个月",
        "K 线周期": interval,
        "总 K 线数(估算)": total_days,
        "月调用量(估算)": total_requests,
        "单次平均 K 线数": round(total_days / total_requests, 1) if total_requests > 0 else 0
    }


if __name__ == "__main__":
    # 示例:100 只股票,36 个月日线数据
    result = estimate_monthly_cost(
        num_symbols=100,
        months=36,
        interval="1d"
    )
    print("=== 用量估算 ===")
    for k, v in result.items():
        print(f"  {k}: {v}")

四、成本优化:四个层次的降本策略

4.1 策略一:架构层——从轮询到订阅

这是效果最大、改动也最大的优化。

高频轮询(错误)
  ↓
  100 只股票 × 每分钟 60 次 × 22 天 = 132,000 次/月

WebSocket + 按需查询(正确)
  ↓
  WebSocket 推送 depth(0 次 REST 调用)
  + 按需查询 /kline/latest(降频后 2,310 次/月)
  ↓
节省比例:98.3%

实现要点:将 poll_latest_klines() 拆分为两层:

  • WebSocket 层:处理 depth/ticker 频道推送,维护本地缓存
  • REST 层:仅在需要时(缓存过期、手动查询、历史补充)才发起请求

4.2 策略二:缓存层——时间窗口缓存

from functools import lru_cache
from datetime import datetime, timedelta

# 简单的时间窗口缓存装饰器
def cache_until(interval_seconds: int):
    """缓存结果直到指定时间窗口过去"""
    def decorator(func):
        cache = {"value": None, "expires_at": datetime.min}

        def wrapper(*args, **kwargs):
            now = datetime.now()
            if now < cache["expires_at"]:
                return cache["value"]
            result = func(*args, **kwargs)
            cache["value"] = result
            cache["expires_at"] = now + timedelta(seconds=interval_seconds)
            return result
        return wrapper
    return decorator


@lru_cache(maxsize=1)
def get_cached_klines(symbol: str, interval: str = "1m"):
    """
    同一标的同一分钟内的重复请求,直接返回缓存。
    ⚠️ 注意:lru_cache 的 key 基于函数参数,
             生产环境建议使用 Redis 实现跨进程缓存。
    """
    response = requests.get(
        f"{BASE_URL}/kline/latest",
        headers=HEADERS,
        params={"symbol": symbol, "interval": interval},
        timeout=(3.05, 10)
    )
    return response.json()

缓存层优化的核心原则是:同一个标的的同一个请求,在 60 秒内只发一次

4.3 策略三:请求层——批量合并

# ⚠️ 伪代码示例:展示批量请求的思路
# 实际实现需参考 TickDB API 文档确认批量接口支持情况

def batch_fetch(symbols: list[str], data_type: str = "kline") -> dict:
    """
    批量请求多只股票的同类型数据。
    效果:将 100 次单独请求合并为 1-3 次批量请求。
    """
    if not hasattr(batch_fetch, '_cache'):
        batch_fetch._cache = {}

    # 检查缓存
    now = datetime.now()
    cache_key = f"{','.join(sorted(symbols))}:{data_type}"
    if cache_key in batch_fetch._cache:
        cached = batch_fetch._cache[cache_key]
        if (now - cached["timestamp"]).seconds < 60:
            return cached["data"]

    # 实际批量请求
    payload = {
        "symbols": symbols,
        "type": data_type,
        "interval": "1m",
        "limit": 1
    }

    try:
        response = requests.post(
            f"{BASE_URL}/batch",
            headers=HEADERS,
            json=payload,
            timeout=(3.05, 10)
        )
        result = response.json()

        # 更新缓存
        batch_fetch._cache[cache_key] = {
            "data": result,
            "timestamp": now
        }
        return result

    except requests.exceptions.Timeout:
        raise RuntimeError(f"批量请求 {len(symbols)} 只股票超时")


# 批量请求的效果对比
def compare_request_efficiency(symbols: list[str]):
    """对比单独请求 vs 批量请求的调用量"""
    # 单独请求:100 只股票 = 100 次请求
    separate_requests = len(symbols)

    # 批量请求:假设单批上限 50,100 只股票 = 2 次请求
    batch_size = 50
    batch_requests = (len(symbols) + batch_size - 1) // batch_size

    print(f"单独请求: {separate_requests} 次")
    print(f"批量请求: {batch_requests} 次")
    print(f"节省: {separate_requests - batch_requests} 次 "
          f"({round((1 - batch_requests / separate_requests) * 100, 1)}%)")

4.4 策略四:监控层——用量仪表盘

成本控制的前提是可见性。建议在监控系统中加入用量统计模块:

import threading
from datetime import datetime


class CostMonitor:
    """
    用量监控器:实时追踪 API 调用量,估算月账单。
    建议接入 Prometheus/Grafana 进行可视化。
    """

    def __init__(self, monthly_budget: float = 100.0):
        self.monthly_budget = monthly_budget  # 每月预算(美元)
        self._calls_today = 0
        self._calls_this_month = 0
        self._month_start = datetime.now()
        self._lock = threading.Lock()

    def record_call(self, endpoint: str, symbol_count: int = 1):
        """每次 API 调用后调用此方法记录"""
        with self._lock:
            self._calls_today += 1
            self._calls_this_month += 1

    def reset_month(self):
        """每月重置计数(在计费周期切换时调用)"""
        with self._lock:
            self._calls_this_month = 0
            self._month_start = datetime.now()

    def get_usage_report(self) -> dict:
        """获取当前用量报告"""
        with self._lock:
            days_in_month = 30
            today = datetime.now().day
            days_passed = today if today > 0 else 1

            projected_monthly = self._calls_this_month * (days_in_month / days_passed)
            budget_used_pct = projected_monthly / self.monthly_budget if self.monthly_budget > 0 else 0

            return {
                "calls_today": self._calls_today,
                "calls_this_month": self._calls_this_month,
                "projected_monthly": round(projected_monthly),
                "monthly_budget": self.monthly_budget,
                "budget_used_pct": round(budget_used_pct * 100, 1),
                "status": self._get_status(budget_used_pct, days_passed / days_in_month)
            }

    def _get_status(self, budget_pct: float, time_pct: float) -> str:
        if budget_pct <= time_pct * 0.8:
            return "✅ 健康"
        elif budget_pct <= time_pct * 1.0:
            return "⚠️ 注意"
        elif budget_pct <= time_pct * 1.2:
            return "🔴 警告:接近预算上限"
        else:
            return "🚨 超支:立即优化"


# 使用示例
cost_monitor = CostMonitor(monthly_budget=100.0)

# 模拟运行
for _ in range(100):
    cost_monitor.record_call(endpoint="/kline/latest")

report = cost_monitor.get_usage_report()
print("=== 用量报告 ===")
for k, v in report.items():
    print(f"  {k}: {v}")

五、TickDB 各功能模块用量估算速查表

功能 场景 日均调用量估算 月均调用量估算 优化空间
/kline/latest 实时监控(100 标的) 100–2,200 2,200–48,400 降频后降至 2,200
/kline 历史日线(100 标的 × 3 年) 300 批量查询
depth 频道 订单簿快照 0 REST 调用 0 WebSocket 推送
ticker 频道 实时成交数据 0 REST 调用 0 WebSocket 推送
/symbols/available 品种查询 1–5/天 30–150 缓存结果

重要区分depthticker 是 WebSocket 推送频道,接入后不消耗 REST 请求配额。这是 TickDB 相对于纯轮询方案的核心优势之一。


六、一句话总结

你的月调用量不是固定的 100×60×22,而是由架构设计决定的变量。用 WebSocket 推送代替轮询,用缓存代替重复查询,用批量请求代替循环单查——这三个改动加起来,通常能让账单减少 90% 以上。


下一步行动

如果你想亲手验证本文的估算模型:

  1. 访问 tickdb.ai 注册(免费,无需信用卡)
  2. 在控制台生成 API Key
  3. TICKDB_API_KEY 设为环境变量,复制本文代码运行
  4. 对比你的实际调用量与理论估算值

如果你已经有代码在运行:
在现有代码中加入 CostMonitor,看看你的实际调用量比理论值高了多少——那个差值就是你优化的空间。

如果你需要更长的历史数据做回测:
联系 [email protected] 了解历史 K 线数据的批量采购方案。


风险提示:本文所有用量估算基于标准产品定价,实际费用请以 TickDB 官方账单为准。API 定价可能随版本更新调整,建议定期查阅最新定价文档。