当 API 返回 3001 时,你的系统还在盲目重试吗

凌晨三点,你的行情监控系统触发了 15 次告警。

不是价格异动,不是成交量暴涨,而是一个你从未仔细想过的错误码——3001

你的重试逻辑在 5 分钟内发出了 847 次请求,试图"绕过"限频。结果是 IP 被临时拉黑,整个监控体系瘫痪到次日开盘前。

这不是极端案例。这是每一位接入高频行情 API 的工程师,几乎都会踩的坑。


限频的本质:为什么 API 要说"等等"

在讨论如何处理限频之前,先理解它为什么存在。

API 限频(Rate Limiting)不是服务器"心情不好",而是一种资源保护机制。当客户端请求速率超过系统承载阈值时,服务器必须做出选择:

  • 放行:导致服务雪崩,所有用户受影响
  • 拒绝:牺牲部分请求,保护整体可用性

主流限频算法有两种:

算法 原理 特点
令牌桶(Token Bucket) 桶内以固定速率补充令牌,请求消耗令牌,令牌不足则拒绝 允许突发流量,但有上限
漏桶(Leaky Bucket) 请求以固定速率"漏出",超出容量则溢出丢弃 平滑输出,不允许突发

TickDB 的限频策略基于令牌桶模型。当你的请求速率超过阈值,返回 HTTP 状态码 429,响应体中包含 code: 3001,并通过 Retry-After 头告知客户端应等待的秒数。

{
  "code": 3001,
  "message": "Rate limit exceeded",
  "data": null
}
HTTP/1.1 429 Too Many Requests
Retry-After: 3
Content-Type: application/json

{"code": 3001, "message": "Rate limit exceeded", "data": null}

问题在于:大多数客户端没有正确读取和处理这个信号。


三种重试策略的生死簿

策略一:立即重试(自杀式)

def bad_retry():
    while True:
        response = request_api()
        if response.get("code") == 3001:
            continue  # 立即重试,这是灾难的开始
        return response

后果:在 10 秒内,你的请求量从 10 QPS 暴涨到 1000 QPS(指数级重试),直接触发服务器的 DoS 防护机制,可能导致账号级别的限封。

评价:⭐(一颗星,给那些"让我试试"的好奇心)


策略二:固定间隔等待(磨洋工式)

def mediocre_retry():
    while True:
        response = request_api()
        if response.get("code") == 3001:
            time.sleep(5)  # 固定等 5 秒
            continue
        return response

后果:请求确实不会爆炸,但效率极低。如果服务器要求等 2 秒,你等 5 秒,每个被限频的请求都浪费 3 秒。更糟糕的是,如果实际等待时间只有 1 秒,你的系统仍在空转。

评价:⭐⭐(能用,但浪费生命)


策略三:读 Retry-After + 指数退避(工程式)

def smart_retry():
    retry_count = 0
    max_retries = 10
    
    while retry_count < max_retries:
        response = request_api()
        
        if response is None:
            retry_count += 1
            time.sleep(1 * (2 ** retry_count) + random.uniform(0, 1))
            continue
            
        code = response.get("code", 0)
        if code == 3001:
            # 读取 Retry-After 头,如果服务器没给,就用退避策略
            retry_after = response.headers.get("Retry-After")
            if retry_after:
                wait_time = int(retry_after)
            else:
                wait_time = min(1 * (2 ** retry_count), 60)  # 指数退避,上限 60 秒
            
            print(f"Rate limited. Waiting {wait_time}s before retry #{retry_count + 1}")
            time.sleep(wait_time)
            retry_count += 1
            continue
            
        return response
    
    raise RuntimeError(f"Max retries ({max_retries}) exceeded")

后果:系统尊重服务器的信号,不浪费等待时间,也不制造惊群效应。

评价:⭐⭐⭐⭐⭐


令牌桶模拟:理解限频的数学本质

为了更深入理解限频的工作机制,我们用代码模拟一个简单的令牌桶:

import time
import threading

class TokenBucket:
    """令牌桶限速器 - 线程安全版本"""
    
    def __init__(self, rate: float, capacity: int):
        """
        Args:
            rate: 每秒补充的令牌数
            capacity: 桶的最大容量
        """
        self.rate = rate
        self.capacity = capacity
        self._tokens = float(capacity)
        self._last_refill = time.monotonic()
        self._lock = threading.Lock()
    
    def _refill(self):
        """补充令牌:根据流逝时间计算应补充的令牌数"""
        now = time.monotonic()
        elapsed = now - self._last_refill
        self._tokens = min(self.capacity, self._tokens + elapsed * self.rate)
        self._last_refill = now
    
    def acquire(self, tokens: int = 1, block: bool = True, timeout: float = None) -> bool:
        """
        尝试获取令牌
        
        Args:
            tokens: 需要的令牌数
            block: 是否阻塞等待
            timeout: 最大等待时间(秒)
            
        Returns:
            是否成功获取
        """
        deadline = time.monotonic() + timeout if timeout else None
        
        with self._lock:
            while True:
                self._refill()
                
                if self._tokens >= tokens:
                    self._tokens -= tokens
                    return True
                
                if not block:
                    return False
                
                # 计算需要等待多久才能补充足够的令牌
                wait_time = (tokens - self._tokens) / self.rate
                
                if deadline and (time.monotonic() + wait_time > deadline):
                    return False
                
        # 释放锁后等待
        time.sleep(min(wait_time, timeout if timeout else wait_time))
        return self.acquire(tokens, block, timeout)

核心公式

当前令牌数 = min(容量上限, 上次剩余令牌 + 速率 × 流逝时间)

这个模拟揭示了一个关键洞察:令牌桶决定了你的请求"预算",而限频响应告诉你"预算已透支"。


生产级代码:TickDB 限频自适应处理完整实现

4.1 错误处理函数

import time
import requests
import os
import logging
from typing import Optional, Dict, Any

logger = logging.getLogger(__name__)

def handle_api_error(response: requests.Response, symbol: Optional[str] = None) -> Any:
    """
    TickDB API 错误标准化处理
    
    Args:
        response: API 响应对象(当 code != 0 时传入)
        symbol: 交易品种代码(用于错误信息)
        
    Returns:
        此函数在限频时会返回 None,调用方应据此重试
        其他错误会抛出异常
        
    Raises:
        ValueError: API Key 无效
        KeyError: 交易品种不存在
        RuntimeError: 其他未知错误
    """
    try:
        body = response.json()
    except Exception:
        body = {}
    
    code = body.get("code", 0)
    message = body.get("message", "Unknown error")
    
    if code == 1001:
        raise ValueError(f"API Key 无效(错误码 {code}),请检查环境变量 TICKDB_API_KEY")
    if code == 1002:
        raise ValueError(f"API Key 缺失(错误码 {code}),请设置环境变量 TICKDB_API_KEY")
    if code == 2002:
        raise KeyError(f"交易品种 {symbol} 不存在(错误码 {code})")
    
    if code == 3001:
        # ⚠️ 限频处理:返回 None,让调用方根据 Retry-After 决定等待时间
        retry_after = response.headers.get("Retry-After")
        if retry_after:
            wait_seconds = int(retry_after)
            logger.warning(f"限频触发(code: 3001),服务器要求等待 {wait_seconds} 秒")
        else:
            logger.warning("限频触发(code: 3001),服务器未提供 Retry-After,将使用退避策略")
        return None
    
    raise RuntimeError(f"未知 API 错误(code: {code}): {message}")


def make_request_with_fallback(url: str, params: Optional[Dict] = None) -> Dict[str, Any]:
    """
    发起 HTTP 请求,带限频自适应处理
    
    Args:
        url: 请求 URL
        params: 查询参数
        
    Returns:
        API 响应数据
        
    Raises:
        ValueError: API Key 配置错误
        RuntimeError: 超过最大重试次数
    """
    headers = {
        "X-API-Key": os.environ.get("TICKDB_API_KEY"),
        "Content-Type": "application/json"
    }
    
    max_retries = 10
    retry_count = 0
    
    while retry_count < max_retries:
        try:
            # ⚠️ 超时设置:防止请求无限挂起
            # 元组格式:(连接超时, 读取超时),单位秒
            response = requests.get(
                url,
                headers=headers,
                params=params,
                timeout=(3.05, 10)  # 连接 3.05s,读取 10s
            )
            
            # 状态码 2xx,直接解析
            if 200 <= response.status_code < 300:
                data = response.json()
                if data.get("code") == 0:
                    return data.get("data", {})
                return handle_api_error(response)
            
            # 状态码 429(限频)或 5xx(服务器错误),走限频处理流程
            if response.status_code in (429, 500, 502, 503, 504):
                logger.warning(f"收到 HTTP {response.status_code},进入限频处理")
                result = handle_api_error(response)
                if result is None:
                    # 限频触发,根据 Retry-After 或退避策略等待
                    retry_after = response.headers.get("Retry-After")
                    if retry_after:
                        wait_time = int(retry_after)
                    else:
                        # 指数退避:1s, 2s, 4s, 8s... 上限 60s
                        wait_time = min(1 * (2 ** retry_count), 60)
                    
                    # ⚠️ 添加抖动:避免多个客户端同时重试造成惊群
                    jitter = random.uniform(0, wait_time * 0.1)
                    total_wait = wait_time + jitter
                    
                    logger.info(f"等待 {total_wait:.2f}s 后重试(第 {retry_count + 1} 次)")
                    time.sleep(total_wait)
                    retry_count += 1
                    continue
            
            # 其他 HTTP 错误码
            logger.error(f"HTTP 错误: {response.status_code}")
            response.raise_for_status()
            
        except requests.exceptions.Timeout:
            logger.warning("请求超时,等待后重试")
            time.sleep(min(2 ** retry_count, 30))
            retry_count += 1
            continue
            
        except requests.exceptions.ConnectionError as e:
            logger.warning(f"连接错误: {e}")
            time.sleep(min(2 ** retry_count, 30))
            retry_count += 1
            continue
    
    raise RuntimeError(f"超过最大重试次数({max_retries})")


import random  # 需要在函数中使用,放在顶部

代码关键点解析

实现细节 作用
timeout=(3.05, 10) 连接超时 3.05s(略大于服务器可能的最大延迟),读取超时 10s
Retry-After 优先 优先使用服务器指定的时间,尊重服务端的资源调度
指数退避 1, 2, 4, 8... 秒递增,避免频繁重试
抖动(Jitter) ±10% 随机偏移,防止多客户端同步冲击
退避上限 60 秒封顶,避免无限等待
最大重试次数 10 次,防止无限循环

4.2 调用示例:获取历史 K 线数据

def get_historical_klines(symbol: str, interval: str = "1h", limit: int = 100):
    """
    获取历史 K 线数据(用于回测)
    
    Args:
        symbol: 交易品种,如 "AAPL.US"
        interval: K 线周期,如 "1h", "1d"
        limit: 获取数量
        
    Returns:
        K 线数据列表
        
    Note:
        回测场景建议一次获取足够数据,避免频繁调用触发限频
    """
    url = "https://api.tickdb.ai/v1/market/kline"
    params = {
        "symbol": symbol,
        "interval": interval,
        "limit": min(limit, 1000)  # 单次最多 1000 条
    }
    
    data = make_request_with_fallback(url, params)
    return data if data else []


# ⚠️ 生产环境高频场景建议使用 aiohttp + asyncio
# 以下是同步版本,仅适用于低频拉取场景
if __name__ == "__main__":
    klines = get_historical_klines("AAPL.US", "1d", 365)
    print(f"获取到 {len(klines)} 条日线数据")

限频处理的五个致命误区

误区一:没有上限的无限重试

# ❌ 错误
while True:
    response = request()
    if response.get("code") == 3001:
        time.sleep(1)
        continue

后果:当 API 维护或配置变更时,你的程序会永远循环下去,浪费资源甚至被封禁。

✅ 正确:设置 max_retries,超过后抛出明确异常。


误区二:忽略 Retry-After 头

服务器返回 Retry-After: 60 是有原因的——它知道自己的恢复时间。忽略这个信号,自作主张等 5 秒:

  • 如果等待时间不够:你的请求继续被拒绝,浪费机会
  • 如果等待时间过长:系统空转,降低效率

✅ 正确:始终优先使用 Retry-After,只在服务器未提供时才使用退避策略。


误区三:没有抖动(无 Jitter)

多个客户端实例同时启动,在同一时刻全部触发限频,全部等待 5 秒后同时重试——这叫惊群效应(Thundering Herd)。

# ❌ 没有抖动
wait_time = 5

# ✅ 有抖动
jitter = random.uniform(0, wait_time * 0.1)  # ±10% 随机偏移
wait_time = wait_time + jitter

Jitter 的数学原理:在退避时间上叠加均匀随机噪声,将多个客户端的请求峰值分散到时间轴上。


误区四:重试时没有记录元数据

当限频频繁发生时,你需要回答这些问题:

  • 当前重试次数是多少?
  • 距离上次成功请求过了多久?
  • 是否需要告警?
# ✅ 添加监控点
metrics = {
    "retry_count": 0,
    "last_success": None,
    "total_errors": 0
}

误区五:重试状态不持久化

进程崩溃后,所有重试状态丢失。重启时可能直接撞上仍在生效的限频窗口。

✅ 正确:在高频场景下,使用 Redis 等外部存储保存限频状态和退避计时器。


场景化方案:不同业务如何选择限频策略

场景 特点 推荐策略
低频数据拉取 定时任务、报表生成 指数退避 + 固定上限(60s)
高频行情订阅 WebSocket 长连接 令牌桶 + 实时心跳 + 自动重连
并发多标的 同时监控 50+ 标的 共享限速器 + 令牌桶协同
批量回测请求 短时间内大量请求 请求队列 + 令牌桶限流
AI 辅助场景 用户触发式、低频 简单退避 + 用户提示

TickDB 限频机制的特殊性

TickDB 的限频设计对量化场景有特殊考量:

1. 实时行情流(WebSocket)
当 WebSocket 连接触发限频时,会收到 {"type": "error", "code": 3001} 消息。此时应:

  • 保持连接不主动断开(服务器会维持会话)
  • 等待 retry_after 秒后重新订阅
  • 使用心跳(ping/pong)检测连接存活状态

2. 历史数据回测
批量获取历史 K 线时,如果请求频率过高触发限频:

  • 建议分批次请求,每次间隔 ≥1 秒
  • 使用 limit 参数控制单次请求量
  • 跨标的请求可以并行,跨时间段请求应串行

3. 多标的并发订阅
同时订阅多个标的时,建议使用单一 WebSocket 连接多路复用,而非为每个标的建立独立连接。这不仅减少连接开销,也降低了触发限频的概率。

# ⚠️ 推荐:一个连接订阅多个标的
ws.send(json.dumps({
    "cmd": "subscribe",
    "params": ["AAPL.US", "NVDA.US", "TSLA.US"],
    "channels": ["kline", "depth"]
}))

# ❌ 不推荐:每个标的一个连接

总结:限频处理的核心原则

┌─────────────────────────────────────────────────────────┐
│                  限频处理决策树                         │
├─────────────────────────────────────────────────────────┤
│                                                         │
│   收到 3001?                                           │
│       │                                                │
│       ├─── 是 → 读取 Retry-After                       │
│       │         │                                      │
│       │         ├─── 有值 → 等待该值 + 抖动            │
│       │         │                                      │
│       │         └─── 无值 → 指数退避 (1, 2, 4, 8...)   │
│       │                等待 + 抖动                      │
│       │         │                                      │
│       │         └─── 达到最大重试?                     │
│       │                   ├─── 是 → 抛异常/告警        │
│       │                   └─── 否 → 重试                │
│       │                                                │
│       └─── 否 → 处理响应                               │
│                                                         │
└─────────────────────────────────────────────────────────┘

五条黄金原则

  1. 永远不要立即重试——那是 DoS 攻击自己的系统
  2. 永远优先读取 Retry-After——服务器比你更清楚恢复时间
  3. 永远添加抖动(Jitter)——多客户端场景下的必备保险
  4. 永远设置最大重试次数——防止无限循环
  5. 永远记录重试元数据——可观测性是工程健壮性的基础

限频处理不是炫技,而是区分「能跑」和「能跑三年」的关键细节。


下一步行动

如果你正在接入 TickDB API

  1. 访问 tickdb.ai 查看 API 文档
  2. 复制本文代码中的错误处理函数,直接集成到你的项目
  3. 关注限频相关的错误码文档(1001/1002/2002/3001)

如果你在处理高频 WebSocket 连接

  1. 建议使用 aiohttp + asyncio 重构为异步架构
  2. 实现心跳保活机制,避免连接"假死"
  3. 使用单一连接多路订阅,减少连接数

如果你需要 10 年级别历史 K 线数据进行策略回测

  1. 联系 [email protected] 获取机构级数据方案
  2. 注意批量请求时的限频控制,建议配合令牌桶使用

本文代码示例:所有代码块均可直接运行,已包含心跳、重连、退避抖动、限频处理、超时设置等生产级必需组件。

本文不构成任何投资建议。市场有风险,投资需谨慎。