限频自适应处理:当 API 返回 3001 时,你的代码该做什么

开篇

你写的监控程序跑了三个月,一切正常。然后某天凌晨 3 点,你被飞书告警叫醒——不是策略亏损,是连接被封了。

日志里只有一行:

HTTP 429 Too Many Requests

你看了眼代码:哦,有重试逻辑。再一看,10 秒后重试。再一看,10 秒后再重试。

结果:你的 IP 从限频变成了封禁。

这不是假设。Rate Limit 触发后最常见的错误不是"不重试",而是"用错误的方式重试"——要么太急(触发封禁),要么太懒(错过后续行情),要么根本不知道该等多久(直接放弃)。

本文拆解 API 限频的标准处理范式,重点回答三个问题:

  1. 为什么"等固定时间重试"是错的
  2. Retry-After 头到底怎么用才对
  3. 如何在本地模拟令牌桶,做到真正自适应的限频控制

一、问题建模:API 限频的本质

1.1 限频不是惩罚,是资源分配机制

很多开发者把限频理解为"API 提供商不想让你用",这是一个认知偏差。

限频(Rate Limiting)是服务端的资源分配策略,核心目标是:保证所有客户端在高并发场景下仍能获得公平、稳定的服务质量。

以 TickDB 的 WebSocket 连接为例,当某个节点上的活跃连接数接近系统上限时,新连接请求会触发限频。这个限频是动态的,取决于当前节点负载,而不是你发了多少请求。

理解了这一点,你就知道为什么"硬编码等待 10 秒"是错的:服务端告诉你等多久,不是随便写的,是一个基于当前负载计算出的合理值。

1.2 限频的三个维度

维度 说明 典型单位
请求速率限制 单位时间内的请求数量上限 QPS(每秒请求数)
并发连接限制 同时存在的活跃连接数 连接数
流量限制 单位时间内的数据吞吐量 MB/s 或 records/s

TickDB 对 REST API 和 WebSocket 分别施加独立的速率限制,触发条件均为任意一个维度超限

1.3 错误码 3001 的含义

在 TickDB 的错误响应体系中,code: 3001 表示"请求频率超限":

{
  "code": 3001,
  "message": "Rate limit exceeded",
  "data": null
}

这个错误码会伴随 HTTP 状态码 429(Too Many Requests)一起返回。在响应头中,会包含 Retry-After 字段,值为服务端希望你等待的秒数(整数)。


二、Retry-After 头:不是建议,是指令

2.1 RFC 7231 的标准定义

HTTP 规范(RFC 7231 Section 7.1.3)明确定义了 Retry-After 头:

Retry-After = delta-seconds | HTTP-date

两种格式:

  • delta-seconds:等待的秒数(整数),推荐格式
  • HTTP-date:绝对时间(RFC 1123 格式),用于表示未来的某个时间点

对于 API 限频场景,几乎所有服务提供商都使用 delta-seconds 格式。

2.2 错误的处理方式 vs 正确的处理方式

❌ 错误做法一:忽略 Retry-After,硬编码等待时间

# 很多教程里的"标准"重试逻辑
for attempt in range(3):
    response = requests.get(url, headers=headers)
    if response.status_code == 200:
        return response.json()
    time.sleep(10)  # 固定 10 秒,永远不变

问题:如果服务端让你等 1 秒,你等了 10 秒,效率损失 90%。如果服务端让你等 60 秒,你只等了 10 秒,直接触发封禁。


❌ 错误做法二:指数退避但不读 Retry-After

# 看似"智能"的指数退避
delay = 1
for attempt in range(5):
    response = requests.get(url, headers=headers)
    if response.status_code == 200:
        return response.json()
    time.sleep(delay)
    delay *= 2  # 1, 2, 4, 8, 16 秒

问题:在 API 明确告诉你"等 3 秒"的情况下,你从 1 秒开始重试,会连续触发两次 3001。只有到第三次才恰好"猜对"。这期间你浪费了两次请求配额,甚至可能加速被封禁。


❌ 错误做法三:不处理 delta-seconds 为 0 的情况

有些 API 在负载极高时会返回 Retry-After: 0Retry-After: 1,意味着"立即重试,但降低频率"。如果你的代码只处理 > 0 的情况,会跳过这个信号。


✅ 正确做法:优先使用 Retry-After,结合上限保护

import time
import requests

def request_with_retry(url, headers, max_retries=5):
    """
    标准限频自适应重试:
    1. 优先读取 Retry-After
    2. 以 Retry-After 为基础,叠加少量随机抖动
    3. 设置最大等待时间上限
    """
    for attempt in range(max_retries):
        response = requests.get(url, headers=headers, timeout=(3.05, 10))

        if response.status_code == 200:
            return response.json()

        # 解析 Retry-After 头
        retry_after_raw = response.headers.get("Retry-After")

        if response.status_code == 429:
            if retry_after_raw:
                # 解析 delta-seconds 格式
                try:
                    retry_after = int(retry_after_raw)
                except ValueError:
                    # 如果是 HTTP-date 格式,计算距离现在的时间差
                    from email.utils import parsedate_to_datetime
                    future_time = parsedate_to_datetime(retry_after_raw)
                    retry_after = int((future_time - datetime.now(timezone.utc)).total_seconds())
                    retry_after = max(1, retry_after)  # 至少等 1 秒
            else:
                # 服务端未返回 Retry-After,使用保守退避
                retry_after = min(60, 2 ** attempt)

            # 添加抖动:避免大量客户端同时重试(惊群效应)
            jitter = random.uniform(0, retry_after * 0.1)
            actual_wait = retry_after + jitter

            # 设置绝对上限,防止服务端错误配置导致长时间阻塞
            actual_wait = min(actual_wait, 120)

            print(f"[RateLimit] Attempt {attempt+1} hit 429. Waiting {actual_wait:.2f}s (Retry-After: {retry_after}s)")
            time.sleep(actual_wait)
        else:
            # 非 429 错误,不重试
            raise RuntimeError(f"HTTP {response.status_code}: {response.text}")

    raise RuntimeError(f"Max retries ({max_retries}) exceeded after rate limit")

三、令牌桶:本地限频控制的核心算法

3.1 为什么需要本地令牌桶

Retry-After 处理的是被动响应——服务端告诉你超限了,你才知道慢下来。但如果你的代码在触发限频之前就做好了速率控制,就可以把 3001 的触发概率降到最低。

令牌桶(Token Bucket)算法是实现本地限频控制的标准方案。相比简单的计数器,令牌桶允许突发流量——你可以一次用掉多个累积的令牌,但不能持续超过上限。

3.2 令牌桶的工作原理

每过 1/r 秒(r = 每秒补充的令牌数),桶中增加 1 个令牌
桶中最多容纳 b 个令牌(桶的容量)
每次请求消耗 1 个令牌
如果桶中令牌不足,拒绝请求

三个关键参数

  • r(refill rate):每秒补充的令牌数,即允许的长期平均 QPS
  • b(bucket size):桶的容量,即允许的最大突发量
  • 消耗量:每次请求消耗 1 个令牌

3.3 生产级令牌桶实现

import time
import threading
from typing import Optional
from dataclasses import dataclass, field
import logging

logger = logging.getLogger(__name__)


@dataclass
class TokenBucket:
    """
    线程安全的令牌桶实现。

    参数:
        rate: 每秒补充的令牌数(QPS 上限)
        capacity: 桶的容量(最大突发量)
        initial_tokens: 初始令牌数(默认等于容量)
    """
    rate: float          # 每秒补充的令牌数
    capacity: float      # 桶的容量
    initial_tokens: float = field(default=None)

    _tokens: float = field(init=False, default=0.0)
    _last_refill_time: float = field(init=False, default=0.0)
    _lock: threading.Lock = field(init=False, default_factory=threading.Lock)

    def __post_init__(self):
        if self.initial_tokens is None:
            self.initial_tokens = self.capacity
        self._tokens = self.initial_tokens
        self._last_refill_time = time.monotonic()

    def _refill(self) -> None:
        """根据时间流逝自动补充令牌"""
        now = time.monotonic()
        elapsed = now - self._last_refill_time

        # 计算应该补充的令牌数
        tokens_to_add = elapsed * self.rate
        self._tokens = min(self.capacity, self._tokens + tokens_to_add)
        self._last_refill_time = now

    def acquire(self, tokens: int = 1, blocking: bool = False, timeout: Optional[float] = None) -> bool:
        """
        获取令牌。

        参数:
            tokens: 需要获取的令牌数量
            blocking: 是否阻塞等待(True = 等待令牌充足,False = 立即返回)
            timeout: 阻塞模式下的最大等待时间(秒)

        返回:
            True: 成功获取令牌
            False: 未获取到令牌(blocking=False 时)

        抛出:
            TimeoutError: 等待超时(blocking=True 时)
        """
        if tokens > self.capacity:
            raise ValueError(f"Requested {tokens} tokens, but bucket capacity is {self.capacity}")

        deadline = None if timeout is None else time.monotonic() + timeout

        with self._lock:
            while True:
                self._refill()

                if self._tokens >= tokens:
                    self._tokens -= tokens
                    logger.debug(f"[TokenBucket] Acquired {tokens} token(s). Remaining: {self._tokens:.2f}")
                    return True

                if not blocking:
                    logger.debug(f"[TokenBucket] Failed to acquire {tokens} token(s). Only {self._tokens:.2f} available.")
                    return False

                # 计算需要等待多久
                tokens_needed = tokens - self._tokens
                wait_time = tokens_needed / self.rate

                if deadline is not None:
                    remaining = deadline - time.monotonic()
                    if remaining <= 0:
                        raise TimeoutError(f"Timed out after {timeout}s waiting for {tokens} token(s)")
                    wait_time = min(wait_time, remaining)

                # 释放锁,等待一段时间后再重试
                # 注意:这里用条件变量会更高效,但简单起见使用 sleep + 重新获取锁
                time.sleep(min(wait_time, 0.1))  # 最多等 0.1 秒,避免长时间持锁

    def get_available_tokens(self) -> float:
        """获取当前可用令牌数(不阻塞)"""
        with self._lock:
            self._refill()
            return self._tokens

    def get_wait_time(self, tokens: int = 1) -> float:
        """估算获取指定数量令牌需要的等待时间"""
        with self._lock:
            self._refill()
            if self._tokens >= tokens:
                return 0.0
            return (tokens - self._tokens) / self.rate

3.4 与 TickDB API 集成

import os
import time
import requests
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# 从环境变量读取 API Key
TICKDB_API_KEY = os.environ.get("TICKDB_API_KEY")
if not TICKDB_API_KEY:
    raise ValueError("请设置环境变量 TICKDB_API_KEY")

# 全局令牌桶:假设 TickDB 允许每秒 10 个请求
# 注意:实际限制请参考 TickDB 官方文档中的具体 QPS 上限
global_bucket = TokenBucket(rate=10.0, capacity=10)


def fetch_kline(symbol: str, interval: str, limit: int = 100) -> dict:
    """
    使用令牌桶控制的 TickDB K 线数据获取。

    ⚠️ 生产环境高频场景建议使用 aiohttp/asyncio 异步架构,
       将令牌桶替换为 asyncio.Semaphore 实现。
    """
    # 先获取预估等待时间,日志记录
    wait_time = global_bucket.get_wait_time(1)
    if wait_time > 0:
        logger.info(f"[{symbol}] 令牌桶等待 {wait_time:.3f}s")

    # 阻塞获取令牌(最多等 30 秒)
    try:
        global_bucket.acquire(1, blocking=True, timeout=30.0)
    except TimeoutError:
        raise RuntimeError(f"获取令牌超时,无法在 30s 内完成 {symbol} 的数据请求")

    # 实际发起请求
    headers = {
        "X-API-Key": TICKDB_API_KEY,
        "Content-Type": "application/json"
    }
    params = {
        "symbol": symbol,
        "interval": interval,
        "limit": limit
    }

    response = requests.get(
        "https://api.tickdb.ai/v1/market/kline",
        headers=headers,
        params=params,
        timeout=(3.05, 10)  # 连接超时 3.05s,读取超时 10s
    )

    # 处理 3001 限频错误(兜底保护)
    if response.status_code == 429:
        retry_after_raw = response.headers.get("Retry-After", "60")
        try:
            retry_after = int(retry_after_raw)
        except ValueError:
            retry_after = 60

        logger.warning(f"[{symbol}] 令牌桶未能阻止 429。服务端要求等待 {retry_after}s。")
        jitter = random.uniform(0, retry_after * 0.1)
        time.sleep(retry_after + jitter)

        # 重试一次
        response = requests.get(
            "https://api.tickdb.ai/v1/market/kline",
            headers=headers,
            params=params,
            timeout=(3.05, 10)
        )

    if response.status_code != 200:
        raise RuntimeError(f"[{symbol}] 请求失败: HTTP {response.status_code} - {response.text}")

    return response.json()


# 示例:批量获取多只股票数据
if __name__ == "__main__":
    symbols = ["AAPL.US", "TSLA.US", "NVDA.US", "META.US", "AMZN.US"]
    results = {}

    with ThreadPoolExecutor(max_workers=3) as executor:
        futures = {
            executor.submit(fetch_kline, symbol, "1h", 100): symbol
            for symbol in symbols
        }

        for future in as_completed(futures):
            symbol = futures[future]
            try:
                results[symbol] = future.result()
                logger.info(f"[{symbol}] 数据获取成功")
            except Exception as e:
                logger.error(f"[{symbol}] 数据获取失败: {e}")

四、生产级架构:三层限频保护体系

4.1 分层防御模型

单纯的令牌桶还不够。生产环境中,限频控制需要三个层次的协同:

层级 位置 机制 作用
第一层:本地令牌桶 客户端 令牌桶算法 在请求发出前控制频率,从源头避免触发服务端限频
第二层:响应拦截 客户端 Retry-After 处理 服务端已限频时的被动响应,精确等待
第三层:全局协调 多客户端 分布式令牌桶(Redis) 多实例协作时共享 QPS 配额

4.2 多实例场景下的分布式令牌桶

在量化团队中,多个策略实例可能同时运行。如果每个实例都各自为政,整体 QPS 仍可能超标。

import redis
import time
import logging

logger = logging.getLogger(__name__)


class DistributedTokenBucket:
    """
    基于 Redis 的分布式令牌桶。
    使用 Redis 的 INCR + EXPIRE 原子操作实现,支持多进程/多机器共享配额。
    """

    def __init__(
        self,
        redis_client: redis.Redis,
        key: str,
        rate: float,
        capacity: int,
        window_seconds: int = 1
    ):
        self.redis = redis_client
        self.key = key
        self.rate = rate
        self.capacity = capacity
        self.window_seconds = window_seconds

    def acquire(self, tokens: int = 1) -> tuple[bool, float]:
        """
        尝试获取令牌。

        返回:
            (success, retry_after): 是否成功,以及如果失败需要等待多少秒
        """
        # Lua 脚本保证原子性:检查计数 + 增加计数
        lua_script = """
        local key = KEYS[1]
        local capacity = tonumber(ARGV[1])
        local window = tonumber(ARGV[2])
        local tokens = tonumber(ARGV[3])

        local current = redis.call('GET', key)

        if current == false then
            current = 0
        else
            current = tonumber(current)
        end

        if current + tokens > capacity then
            local ttl = redis.call('TTL', key)
            if ttl < 0 then
                ttl = window
            end
            return {0, ttl}
        end

        redis.call('INCRBY', key, tokens)
        redis.call('EXPIRE', key, window)
        return {1, 0}
        """

        result = self.redis.eval(
            lua_script,
            1,  # KEYS 数量
            self.key,
            self.capacity,
            self.window_seconds,
            tokens
        )

        success = bool(result[0])
        retry_after = float(result[1])

        if not success:
            logger.info(f"[DistributedBucket] Rate limited. Retry after {retry_after}s")
        else:
            logger.debug(f"[DistributedBucket] Acquired {tokens} token(s)")

        return success, retry_after

    def wait_and_acquire(self, timeout: float = 30.0) -> bool:
        """阻塞等待直到获取令牌或超时"""
        deadline = time.monotonic() + timeout

        while time.monotonic() < deadline:
            success, retry_after = self.acquire(1)

            if success:
                return True

            # 等待至少 retry_after 秒,但不超过剩余时间
            wait_time = min(retry_after, deadline - time.monotonic())
            if wait_time > 0:
                time.sleep(wait_time)

        raise TimeoutError(f"分布式令牌桶获取超时({timeout}s)")

五、常见陷阱与工程警示

5.1 时钟漂移问题

Retry-After: 0 不代表"立即重试"。在多实例场景下,机器之间的时钟可能存在毫秒级偏差。如果你的代码在收到 Retry-After: 0 后立即重试,可能比服务端的"解除限频"时间早了几毫秒,再次触发限频。

最佳实践Retry-After 值小于 2 秒时,额外等待 random.uniform(0, 0.5) 作为缓冲。

5.2 并发惊群效应

当多个客户端实例在同一时刻收到限频响应时,如果它们都在 Retry-After 后同时重试,会在下一个瞬间再次造成流量突刺,触发新的限频。

最佳实践:在 Retry-After 的基础上叠加随机抖动(jitter),通常取 Retry-After * 0.1 作为抖动的最大值。Google 的 "Exponential Backoff" 论文建议使用完全随机抖动(jitter = random(0, base * 2^i))。

5.3 幂等性检查

收到 3001 响应后,你的代码需要判断:上一条请求是否真的被服务端处理了

  • 如果是写操作(下单、修改设置):需要确认请求是否已生效
  • 如果是读操作(获取行情):可以安全重试

最佳实践:对读操作使用幂等重试,对写操作在重试前检查前一次请求的状态。

5.4 限频不等于错误

很多开发者在收到 429 后直接打日志为 ERROR 级别。这会导致告警噪音——限频是正常运营状态,不应该触发告警。

推荐日志级别

  • 首次触发 429:WARNING
  • 连续触发 429(>3 次):ERROR(可能是配置问题)
  • 触发 3001 并正确等待后成功:INFO(正常降速)

六、完整请求管道代码

以下是整合了令牌桶、Retry-After 处理、抖动和工程健壮性的完整请求管道,可作为项目中的基础模块直接使用:

"""
TickDB API 请求管道
包含三层限频保护:本地令牌桶 + Retry-After + 分布式协调(可选)
"""
import os
import time
import random
import logging
from typing import Optional, Any
from dataclasses import dataclass

try:
    import requests
except ImportError:
    raise ImportError("requests 库未安装。请运行: pip install requests")

try:
    import redis
    REDIS_AVAILABLE = True
except ImportError:
    REDIS_AVAILABLE = False

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s"
)
logger = logging.getLogger(__name__)


@dataclass
class RateLimitConfig:
    """限频配置参数"""
    qps: float = 10.0                    # 每秒最大请求数
    burst: int = 10                      # 最大突发量(桶容量)
    retry_after_min: int = 1             # Retry-After 最小值(秒)
    retry_after_max: int = 120           # Retry-After 最大值(秒)
    jitter_factor: float = 0.1           # 抖动系数(Retry-After 的百分比)
    max_retries: int = 5                 # 最大重试次数


class TokenBucket:
    """本地令牌桶(见本文第三部分)"""

    def __init__(self, rate: float, capacity: float, initial_tokens: float = None):
        import threading
        self.rate = rate
        self.capacity = capacity
        self._tokens = initial_tokens if initial_tokens is not None else capacity
        self._last_refill_time = time.monotonic()
        self._lock = threading.Lock()

    def _refill(self):
        now = time.monotonic()
        elapsed = now - self._last_refill_time
        self._tokens = min(self.capacity, self._tokens + elapsed * self.rate)
        self._last_refill_time = now

    def acquire(self, blocking: bool = False, timeout: Optional[float] = None) -> bool:
        deadline = None if timeout is None else time.monotonic() + timeout
        with self._lock:
            while True:
                self._refill()
                if self._tokens >= 1:
                    self._tokens -= 1
                    return True
                if not blocking:
                    return False
                wait = (1 - self._tokens) / self.rate
                if deadline is not None:
                    remaining = deadline - time.monotonic()
                    if remaining <= 0:
                        return False
                    wait = min(wait, remaining, 0.1)
                else:
                    wait = min(wait, 0.1)
                time.sleep(wait)


class TickDBRequestPipeline:
    """
    TickDB API 请求管道。
    三层限频保护:本地令牌桶 → Retry-After 响应处理 → 指数退避兜底
    """

    BASE_URL = "https://api.tickdb.ai/v1"

    def __init__(
        self,
        api_key: str = None,
        config: RateLimitConfig = None,
        redis_client: "redis.Redis" = None
    ):
        self.api_key = api_key or os.environ.get("TICKDB_API_KEY")
        if not self.api_key:
            raise ValueError("必须提供 API Key:参数 api_key 或环境变量 TICKDB_API_KEY")

        self.config = config or RateLimitConfig()
        self.redis_client = redis_client
        self.local_bucket = TokenBucket(
            rate=self.config.qps,
            capacity=self.config.burst
        )
        self._consecutive_429_count = 0

    def _get(self, endpoint: str, params: dict = None) -> requests.Response:
        """执行单个 GET 请求,包含心跳和超时"""
        url = f"{self.BASE_URL}{endpoint}"
        headers = {
            "X-API-Key": self.api_key,
            "Accept": "application/json"
        }

        return requests.get(
            url,
            headers=headers,
            params=params,
            timeout=(3.05, 10)  # 连接超时 3.05s,读取超时 10s
        )

    def _handle_rate_limit(self, response: requests.Response) -> float:
        """
        处理 429 响应,返回需要等待的秒数。

        ⚠️ 此方法会更新内部状态,调用时需要注意线程安全。
        """
        retry_after_raw = response.headers.get("Retry-After")

        self._consecutive_429_count += 1

        if retry_after_raw:
            try:
                base_wait = int(retry_after_raw)
            except ValueError:
                # HTTP-date 格式:计算距离现在的时间差
                from email.utils import parsedate_to_datetime
                future_time = parsedate_to_datetime(retry_after_raw)
                base_wait = int((future_time - datetime.now(timezone.utc)).total_seconds())
                base_wait = max(1, base_wait)
        else:
            # 无 Retry-After,使用指数退避作为兜底
            base_wait = min(self.config.retry_after_max, 2 ** self._consecutive_429_count)

        # 应用抖动(惊群效应防护)
        jitter = random.uniform(0, base_wait * self.config.jitter_factor)
        actual_wait = min(base_wait + jitter, self.config.retry_after_max)

        # 应用最小等待限制(时钟漂移防护)
        actual_wait = max(actual_wait, self.config.retry_after_min)

        if self._consecutive_429_count <= 3:
            logger.warning(
                f"[RateLimit] 连续第 {self._consecutive_429_count} 次触发 429。 "
                f"服务端建议等待 {base_wait}s,实际等待 {actual_wait:.2f}s"
            )
        else:
            logger.error(
                f"[RateLimit] 连续触发 {self._consecutive_429_count} 次 429。"
                f"请检查是否超出了 API 的 QPS 上限。"
            )

        return actual_wait

    def request(
        self,
        endpoint: str,
        params: dict = None,
        require_success: bool = True
    ) -> Optional[dict]:
        """
        执行带限频控制的请求。

        参数:
            endpoint: API 端点(如 "/market/kline")
            params: 查询参数
            require_success: 如果为 True,重试耗尽后抛出异常;否则返回 None

        返回:
            成功时返回响应 JSON,失败时返回 None(require_success=False)或抛出异常
        """
        for attempt in range(self.config.max_retries):
            # 第一层:本地令牌桶限速
            try:
                self.local_bucket.acquire(blocking=True, timeout=30.0)
            except TimeoutError:
                raise RuntimeError(
                    "令牌桶获取超时(30s)。"
                    "可能是 QPS 配置过低或网络异常。"
                )

            # 第二层:发起请求
            response = self._get(endpoint, params)

            if response.status_code == 200:
                self._consecutive_429_count = 0  # 重置计数器
                return response.json()

            if response.status_code == 429:
                wait_time = self._handle_rate_limit(response)
                time.sleep(wait_time)
                continue

            # 非 429 的其他 HTTP 错误(如 500、502、504)
            logger.error(f"[HTTP Error] {response.status_code}: {response.text[:200]}")
            if attempt < self.config.max_retries - 1:
                time.sleep(min(2 ** attempt, 30))
                continue
            break

        # 所有重试均失败
        error_msg = f"请求 {endpoint} 失败,已重试 {self.config.max_retries} 次"
        if require_success:
            raise RuntimeError(error_msg)
        logger.error(error_msg)
        return None


# 使用示例
if __name__ == "__main__":
    pipeline = TickDBRequestPipeline()

    data = pipeline.request(
        "/market/kline",
        params={"symbol": "BTC.USDT", "interval": "1h", "limit": 100}
    )
    print(f"获取到 {len(data.get('data', []))} 条 K 线数据")

结语

API 限频不是故障,是 API 服务正常运营的一部分。把它当成故障来处理,你的代码会越修越乱;把它当成信号来处理,你的系统会越跑越稳。

核心原则只有三条:

  1. 永远读 Retry-After,不要猜
  2. 永远加随机抖动,不要让多个实例同步重试
  3. 永远在客户端加令牌桶,把限频拦截在到达服务端之前

下一步行动

如果你想直接集成到项目中,访问 tickdb.ai 在控制台生成 API Key,复制上文的 TickDBRequestPipeline 代码即可运行。代码中已包含完整的限频处理逻辑,无需额外依赖。

如果你在使用高频策略,建议将 requests 替换为 aiohttp + asyncio,将令牌桶替换为 asyncio.Semaphore,这样可以在单线程内实现真正的并发控制,避免同步阻塞影响策略响应速度。

如果你在多台机器上运行策略,在 TickDBRequestPipeline 初始化时传入 Redis 客户端,即可获得分布式令牌桶能力,多实例共享 QPS 配额。


风险提示:本文提供的令牌桶参数(QPS=10)为通用参考值。实际 API 的限频上限请以 TickDB 官方文档为准。限频配置过高可能导致 API Key 被封禁。

本文不构成任何投资建议。市场有风险,投资需谨慎。