限频自适应处理:当 API 返回 3001 时,你的代码该做什么
开篇
你写的监控程序跑了三个月,一切正常。然后某天凌晨 3 点,你被飞书告警叫醒——不是策略亏损,是连接被封了。
日志里只有一行:
HTTP 429 Too Many Requests
你看了眼代码:哦,有重试逻辑。再一看,10 秒后重试。再一看,10 秒后再重试。
结果:你的 IP 从限频变成了封禁。
这不是假设。Rate Limit 触发后最常见的错误不是"不重试",而是"用错误的方式重试"——要么太急(触发封禁),要么太懒(错过后续行情),要么根本不知道该等多久(直接放弃)。
本文拆解 API 限频的标准处理范式,重点回答三个问题:
- 为什么"等固定时间重试"是错的
Retry-After头到底怎么用才对- 如何在本地模拟令牌桶,做到真正自适应的限频控制
一、问题建模:API 限频的本质
1.1 限频不是惩罚,是资源分配机制
很多开发者把限频理解为"API 提供商不想让你用",这是一个认知偏差。
限频(Rate Limiting)是服务端的资源分配策略,核心目标是:保证所有客户端在高并发场景下仍能获得公平、稳定的服务质量。
以 TickDB 的 WebSocket 连接为例,当某个节点上的活跃连接数接近系统上限时,新连接请求会触发限频。这个限频是动态的,取决于当前节点负载,而不是你发了多少请求。
理解了这一点,你就知道为什么"硬编码等待 10 秒"是错的:服务端告诉你等多久,不是随便写的,是一个基于当前负载计算出的合理值。
1.2 限频的三个维度
| 维度 | 说明 | 典型单位 |
|---|---|---|
| 请求速率限制 | 单位时间内的请求数量上限 | QPS(每秒请求数) |
| 并发连接限制 | 同时存在的活跃连接数 | 连接数 |
| 流量限制 | 单位时间内的数据吞吐量 | MB/s 或 records/s |
TickDB 对 REST API 和 WebSocket 分别施加独立的速率限制,触发条件均为任意一个维度超限。
1.3 错误码 3001 的含义
在 TickDB 的错误响应体系中,code: 3001 表示"请求频率超限":
{
"code": 3001,
"message": "Rate limit exceeded",
"data": null
}
这个错误码会伴随 HTTP 状态码 429(Too Many Requests)一起返回。在响应头中,会包含 Retry-After 字段,值为服务端希望你等待的秒数(整数)。
二、Retry-After 头:不是建议,是指令
2.1 RFC 7231 的标准定义
HTTP 规范(RFC 7231 Section 7.1.3)明确定义了 Retry-After 头:
Retry-After = delta-seconds | HTTP-date
两种格式:
- delta-seconds:等待的秒数(整数),推荐格式
- HTTP-date:绝对时间(RFC 1123 格式),用于表示未来的某个时间点
对于 API 限频场景,几乎所有服务提供商都使用 delta-seconds 格式。
2.2 错误的处理方式 vs 正确的处理方式
❌ 错误做法一:忽略 Retry-After,硬编码等待时间
# 很多教程里的"标准"重试逻辑
for attempt in range(3):
response = requests.get(url, headers=headers)
if response.status_code == 200:
return response.json()
time.sleep(10) # 固定 10 秒,永远不变
问题:如果服务端让你等 1 秒,你等了 10 秒,效率损失 90%。如果服务端让你等 60 秒,你只等了 10 秒,直接触发封禁。
❌ 错误做法二:指数退避但不读 Retry-After
# 看似"智能"的指数退避
delay = 1
for attempt in range(5):
response = requests.get(url, headers=headers)
if response.status_code == 200:
return response.json()
time.sleep(delay)
delay *= 2 # 1, 2, 4, 8, 16 秒
问题:在 API 明确告诉你"等 3 秒"的情况下,你从 1 秒开始重试,会连续触发两次 3001。只有到第三次才恰好"猜对"。这期间你浪费了两次请求配额,甚至可能加速被封禁。
❌ 错误做法三:不处理 delta-seconds 为 0 的情况
有些 API 在负载极高时会返回 Retry-After: 0 或 Retry-After: 1,意味着"立即重试,但降低频率"。如果你的代码只处理 > 0 的情况,会跳过这个信号。
✅ 正确做法:优先使用 Retry-After,结合上限保护
import time
import requests
def request_with_retry(url, headers, max_retries=5):
"""
标准限频自适应重试:
1. 优先读取 Retry-After
2. 以 Retry-After 为基础,叠加少量随机抖动
3. 设置最大等待时间上限
"""
for attempt in range(max_retries):
response = requests.get(url, headers=headers, timeout=(3.05, 10))
if response.status_code == 200:
return response.json()
# 解析 Retry-After 头
retry_after_raw = response.headers.get("Retry-After")
if response.status_code == 429:
if retry_after_raw:
# 解析 delta-seconds 格式
try:
retry_after = int(retry_after_raw)
except ValueError:
# 如果是 HTTP-date 格式,计算距离现在的时间差
from email.utils import parsedate_to_datetime
future_time = parsedate_to_datetime(retry_after_raw)
retry_after = int((future_time - datetime.now(timezone.utc)).total_seconds())
retry_after = max(1, retry_after) # 至少等 1 秒
else:
# 服务端未返回 Retry-After,使用保守退避
retry_after = min(60, 2 ** attempt)
# 添加抖动:避免大量客户端同时重试(惊群效应)
jitter = random.uniform(0, retry_after * 0.1)
actual_wait = retry_after + jitter
# 设置绝对上限,防止服务端错误配置导致长时间阻塞
actual_wait = min(actual_wait, 120)
print(f"[RateLimit] Attempt {attempt+1} hit 429. Waiting {actual_wait:.2f}s (Retry-After: {retry_after}s)")
time.sleep(actual_wait)
else:
# 非 429 错误,不重试
raise RuntimeError(f"HTTP {response.status_code}: {response.text}")
raise RuntimeError(f"Max retries ({max_retries}) exceeded after rate limit")
三、令牌桶:本地限频控制的核心算法
3.1 为什么需要本地令牌桶
Retry-After 处理的是被动响应——服务端告诉你超限了,你才知道慢下来。但如果你的代码在触发限频之前就做好了速率控制,就可以把 3001 的触发概率降到最低。
令牌桶(Token Bucket)算法是实现本地限频控制的标准方案。相比简单的计数器,令牌桶允许突发流量——你可以一次用掉多个累积的令牌,但不能持续超过上限。
3.2 令牌桶的工作原理
每过 1/r 秒(r = 每秒补充的令牌数),桶中增加 1 个令牌
桶中最多容纳 b 个令牌(桶的容量)
每次请求消耗 1 个令牌
如果桶中令牌不足,拒绝请求
三个关键参数:
- r(refill rate):每秒补充的令牌数,即允许的长期平均 QPS
- b(bucket size):桶的容量,即允许的最大突发量
- 消耗量:每次请求消耗 1 个令牌
3.3 生产级令牌桶实现
import time
import threading
from typing import Optional
from dataclasses import dataclass, field
import logging
logger = logging.getLogger(__name__)
@dataclass
class TokenBucket:
"""
线程安全的令牌桶实现。
参数:
rate: 每秒补充的令牌数(QPS 上限)
capacity: 桶的容量(最大突发量)
initial_tokens: 初始令牌数(默认等于容量)
"""
rate: float # 每秒补充的令牌数
capacity: float # 桶的容量
initial_tokens: float = field(default=None)
_tokens: float = field(init=False, default=0.0)
_last_refill_time: float = field(init=False, default=0.0)
_lock: threading.Lock = field(init=False, default_factory=threading.Lock)
def __post_init__(self):
if self.initial_tokens is None:
self.initial_tokens = self.capacity
self._tokens = self.initial_tokens
self._last_refill_time = time.monotonic()
def _refill(self) -> None:
"""根据时间流逝自动补充令牌"""
now = time.monotonic()
elapsed = now - self._last_refill_time
# 计算应该补充的令牌数
tokens_to_add = elapsed * self.rate
self._tokens = min(self.capacity, self._tokens + tokens_to_add)
self._last_refill_time = now
def acquire(self, tokens: int = 1, blocking: bool = False, timeout: Optional[float] = None) -> bool:
"""
获取令牌。
参数:
tokens: 需要获取的令牌数量
blocking: 是否阻塞等待(True = 等待令牌充足,False = 立即返回)
timeout: 阻塞模式下的最大等待时间(秒)
返回:
True: 成功获取令牌
False: 未获取到令牌(blocking=False 时)
抛出:
TimeoutError: 等待超时(blocking=True 时)
"""
if tokens > self.capacity:
raise ValueError(f"Requested {tokens} tokens, but bucket capacity is {self.capacity}")
deadline = None if timeout is None else time.monotonic() + timeout
with self._lock:
while True:
self._refill()
if self._tokens >= tokens:
self._tokens -= tokens
logger.debug(f"[TokenBucket] Acquired {tokens} token(s). Remaining: {self._tokens:.2f}")
return True
if not blocking:
logger.debug(f"[TokenBucket] Failed to acquire {tokens} token(s). Only {self._tokens:.2f} available.")
return False
# 计算需要等待多久
tokens_needed = tokens - self._tokens
wait_time = tokens_needed / self.rate
if deadline is not None:
remaining = deadline - time.monotonic()
if remaining <= 0:
raise TimeoutError(f"Timed out after {timeout}s waiting for {tokens} token(s)")
wait_time = min(wait_time, remaining)
# 释放锁,等待一段时间后再重试
# 注意:这里用条件变量会更高效,但简单起见使用 sleep + 重新获取锁
time.sleep(min(wait_time, 0.1)) # 最多等 0.1 秒,避免长时间持锁
def get_available_tokens(self) -> float:
"""获取当前可用令牌数(不阻塞)"""
with self._lock:
self._refill()
return self._tokens
def get_wait_time(self, tokens: int = 1) -> float:
"""估算获取指定数量令牌需要的等待时间"""
with self._lock:
self._refill()
if self._tokens >= tokens:
return 0.0
return (tokens - self._tokens) / self.rate
3.4 与 TickDB API 集成
import os
import time
import requests
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# 从环境变量读取 API Key
TICKDB_API_KEY = os.environ.get("TICKDB_API_KEY")
if not TICKDB_API_KEY:
raise ValueError("请设置环境变量 TICKDB_API_KEY")
# 全局令牌桶:假设 TickDB 允许每秒 10 个请求
# 注意:实际限制请参考 TickDB 官方文档中的具体 QPS 上限
global_bucket = TokenBucket(rate=10.0, capacity=10)
def fetch_kline(symbol: str, interval: str, limit: int = 100) -> dict:
"""
使用令牌桶控制的 TickDB K 线数据获取。
⚠️ 生产环境高频场景建议使用 aiohttp/asyncio 异步架构,
将令牌桶替换为 asyncio.Semaphore 实现。
"""
# 先获取预估等待时间,日志记录
wait_time = global_bucket.get_wait_time(1)
if wait_time > 0:
logger.info(f"[{symbol}] 令牌桶等待 {wait_time:.3f}s")
# 阻塞获取令牌(最多等 30 秒)
try:
global_bucket.acquire(1, blocking=True, timeout=30.0)
except TimeoutError:
raise RuntimeError(f"获取令牌超时,无法在 30s 内完成 {symbol} 的数据请求")
# 实际发起请求
headers = {
"X-API-Key": TICKDB_API_KEY,
"Content-Type": "application/json"
}
params = {
"symbol": symbol,
"interval": interval,
"limit": limit
}
response = requests.get(
"https://api.tickdb.ai/v1/market/kline",
headers=headers,
params=params,
timeout=(3.05, 10) # 连接超时 3.05s,读取超时 10s
)
# 处理 3001 限频错误(兜底保护)
if response.status_code == 429:
retry_after_raw = response.headers.get("Retry-After", "60")
try:
retry_after = int(retry_after_raw)
except ValueError:
retry_after = 60
logger.warning(f"[{symbol}] 令牌桶未能阻止 429。服务端要求等待 {retry_after}s。")
jitter = random.uniform(0, retry_after * 0.1)
time.sleep(retry_after + jitter)
# 重试一次
response = requests.get(
"https://api.tickdb.ai/v1/market/kline",
headers=headers,
params=params,
timeout=(3.05, 10)
)
if response.status_code != 200:
raise RuntimeError(f"[{symbol}] 请求失败: HTTP {response.status_code} - {response.text}")
return response.json()
# 示例:批量获取多只股票数据
if __name__ == "__main__":
symbols = ["AAPL.US", "TSLA.US", "NVDA.US", "META.US", "AMZN.US"]
results = {}
with ThreadPoolExecutor(max_workers=3) as executor:
futures = {
executor.submit(fetch_kline, symbol, "1h", 100): symbol
for symbol in symbols
}
for future in as_completed(futures):
symbol = futures[future]
try:
results[symbol] = future.result()
logger.info(f"[{symbol}] 数据获取成功")
except Exception as e:
logger.error(f"[{symbol}] 数据获取失败: {e}")
四、生产级架构:三层限频保护体系
4.1 分层防御模型
单纯的令牌桶还不够。生产环境中,限频控制需要三个层次的协同:
| 层级 | 位置 | 机制 | 作用 |
|---|---|---|---|
| 第一层:本地令牌桶 | 客户端 | 令牌桶算法 | 在请求发出前控制频率,从源头避免触发服务端限频 |
| 第二层:响应拦截 | 客户端 | Retry-After 处理 | 服务端已限频时的被动响应,精确等待 |
| 第三层:全局协调 | 多客户端 | 分布式令牌桶(Redis) | 多实例协作时共享 QPS 配额 |
4.2 多实例场景下的分布式令牌桶
在量化团队中,多个策略实例可能同时运行。如果每个实例都各自为政,整体 QPS 仍可能超标。
import redis
import time
import logging
logger = logging.getLogger(__name__)
class DistributedTokenBucket:
"""
基于 Redis 的分布式令牌桶。
使用 Redis 的 INCR + EXPIRE 原子操作实现,支持多进程/多机器共享配额。
"""
def __init__(
self,
redis_client: redis.Redis,
key: str,
rate: float,
capacity: int,
window_seconds: int = 1
):
self.redis = redis_client
self.key = key
self.rate = rate
self.capacity = capacity
self.window_seconds = window_seconds
def acquire(self, tokens: int = 1) -> tuple[bool, float]:
"""
尝试获取令牌。
返回:
(success, retry_after): 是否成功,以及如果失败需要等待多少秒
"""
# Lua 脚本保证原子性:检查计数 + 增加计数
lua_script = """
local key = KEYS[1]
local capacity = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local tokens = tonumber(ARGV[3])
local current = redis.call('GET', key)
if current == false then
current = 0
else
current = tonumber(current)
end
if current + tokens > capacity then
local ttl = redis.call('TTL', key)
if ttl < 0 then
ttl = window
end
return {0, ttl}
end
redis.call('INCRBY', key, tokens)
redis.call('EXPIRE', key, window)
return {1, 0}
"""
result = self.redis.eval(
lua_script,
1, # KEYS 数量
self.key,
self.capacity,
self.window_seconds,
tokens
)
success = bool(result[0])
retry_after = float(result[1])
if not success:
logger.info(f"[DistributedBucket] Rate limited. Retry after {retry_after}s")
else:
logger.debug(f"[DistributedBucket] Acquired {tokens} token(s)")
return success, retry_after
def wait_and_acquire(self, timeout: float = 30.0) -> bool:
"""阻塞等待直到获取令牌或超时"""
deadline = time.monotonic() + timeout
while time.monotonic() < deadline:
success, retry_after = self.acquire(1)
if success:
return True
# 等待至少 retry_after 秒,但不超过剩余时间
wait_time = min(retry_after, deadline - time.monotonic())
if wait_time > 0:
time.sleep(wait_time)
raise TimeoutError(f"分布式令牌桶获取超时({timeout}s)")
五、常见陷阱与工程警示
5.1 时钟漂移问题
Retry-After: 0 不代表"立即重试"。在多实例场景下,机器之间的时钟可能存在毫秒级偏差。如果你的代码在收到 Retry-After: 0 后立即重试,可能比服务端的"解除限频"时间早了几毫秒,再次触发限频。
最佳实践:Retry-After 值小于 2 秒时,额外等待 random.uniform(0, 0.5) 作为缓冲。
5.2 并发惊群效应
当多个客户端实例在同一时刻收到限频响应时,如果它们都在 Retry-After 后同时重试,会在下一个瞬间再次造成流量突刺,触发新的限频。
最佳实践:在 Retry-After 的基础上叠加随机抖动(jitter),通常取 Retry-After * 0.1 作为抖动的最大值。Google 的 "Exponential Backoff" 论文建议使用完全随机抖动(jitter = random(0, base * 2^i))。
5.3 幂等性检查
收到 3001 响应后,你的代码需要判断:上一条请求是否真的被服务端处理了?
- 如果是写操作(下单、修改设置):需要确认请求是否已生效
- 如果是读操作(获取行情):可以安全重试
最佳实践:对读操作使用幂等重试,对写操作在重试前检查前一次请求的状态。
5.4 限频不等于错误
很多开发者在收到 429 后直接打日志为 ERROR 级别。这会导致告警噪音——限频是正常运营状态,不应该触发告警。
推荐日志级别:
- 首次触发 429:
WARNING - 连续触发 429(>3 次):
ERROR(可能是配置问题) - 触发 3001 并正确等待后成功:
INFO(正常降速)
六、完整请求管道代码
以下是整合了令牌桶、Retry-After 处理、抖动和工程健壮性的完整请求管道,可作为项目中的基础模块直接使用:
"""
TickDB API 请求管道
包含三层限频保护:本地令牌桶 + Retry-After + 分布式协调(可选)
"""
import os
import time
import random
import logging
from typing import Optional, Any
from dataclasses import dataclass
try:
import requests
except ImportError:
raise ImportError("requests 库未安装。请运行: pip install requests")
try:
import redis
REDIS_AVAILABLE = True
except ImportError:
REDIS_AVAILABLE = False
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s"
)
logger = logging.getLogger(__name__)
@dataclass
class RateLimitConfig:
"""限频配置参数"""
qps: float = 10.0 # 每秒最大请求数
burst: int = 10 # 最大突发量(桶容量)
retry_after_min: int = 1 # Retry-After 最小值(秒)
retry_after_max: int = 120 # Retry-After 最大值(秒)
jitter_factor: float = 0.1 # 抖动系数(Retry-After 的百分比)
max_retries: int = 5 # 最大重试次数
class TokenBucket:
"""本地令牌桶(见本文第三部分)"""
def __init__(self, rate: float, capacity: float, initial_tokens: float = None):
import threading
self.rate = rate
self.capacity = capacity
self._tokens = initial_tokens if initial_tokens is not None else capacity
self._last_refill_time = time.monotonic()
self._lock = threading.Lock()
def _refill(self):
now = time.monotonic()
elapsed = now - self._last_refill_time
self._tokens = min(self.capacity, self._tokens + elapsed * self.rate)
self._last_refill_time = now
def acquire(self, blocking: bool = False, timeout: Optional[float] = None) -> bool:
deadline = None if timeout is None else time.monotonic() + timeout
with self._lock:
while True:
self._refill()
if self._tokens >= 1:
self._tokens -= 1
return True
if not blocking:
return False
wait = (1 - self._tokens) / self.rate
if deadline is not None:
remaining = deadline - time.monotonic()
if remaining <= 0:
return False
wait = min(wait, remaining, 0.1)
else:
wait = min(wait, 0.1)
time.sleep(wait)
class TickDBRequestPipeline:
"""
TickDB API 请求管道。
三层限频保护:本地令牌桶 → Retry-After 响应处理 → 指数退避兜底
"""
BASE_URL = "https://api.tickdb.ai/v1"
def __init__(
self,
api_key: str = None,
config: RateLimitConfig = None,
redis_client: "redis.Redis" = None
):
self.api_key = api_key or os.environ.get("TICKDB_API_KEY")
if not self.api_key:
raise ValueError("必须提供 API Key:参数 api_key 或环境变量 TICKDB_API_KEY")
self.config = config or RateLimitConfig()
self.redis_client = redis_client
self.local_bucket = TokenBucket(
rate=self.config.qps,
capacity=self.config.burst
)
self._consecutive_429_count = 0
def _get(self, endpoint: str, params: dict = None) -> requests.Response:
"""执行单个 GET 请求,包含心跳和超时"""
url = f"{self.BASE_URL}{endpoint}"
headers = {
"X-API-Key": self.api_key,
"Accept": "application/json"
}
return requests.get(
url,
headers=headers,
params=params,
timeout=(3.05, 10) # 连接超时 3.05s,读取超时 10s
)
def _handle_rate_limit(self, response: requests.Response) -> float:
"""
处理 429 响应,返回需要等待的秒数。
⚠️ 此方法会更新内部状态,调用时需要注意线程安全。
"""
retry_after_raw = response.headers.get("Retry-After")
self._consecutive_429_count += 1
if retry_after_raw:
try:
base_wait = int(retry_after_raw)
except ValueError:
# HTTP-date 格式:计算距离现在的时间差
from email.utils import parsedate_to_datetime
future_time = parsedate_to_datetime(retry_after_raw)
base_wait = int((future_time - datetime.now(timezone.utc)).total_seconds())
base_wait = max(1, base_wait)
else:
# 无 Retry-After,使用指数退避作为兜底
base_wait = min(self.config.retry_after_max, 2 ** self._consecutive_429_count)
# 应用抖动(惊群效应防护)
jitter = random.uniform(0, base_wait * self.config.jitter_factor)
actual_wait = min(base_wait + jitter, self.config.retry_after_max)
# 应用最小等待限制(时钟漂移防护)
actual_wait = max(actual_wait, self.config.retry_after_min)
if self._consecutive_429_count <= 3:
logger.warning(
f"[RateLimit] 连续第 {self._consecutive_429_count} 次触发 429。 "
f"服务端建议等待 {base_wait}s,实际等待 {actual_wait:.2f}s"
)
else:
logger.error(
f"[RateLimit] 连续触发 {self._consecutive_429_count} 次 429。"
f"请检查是否超出了 API 的 QPS 上限。"
)
return actual_wait
def request(
self,
endpoint: str,
params: dict = None,
require_success: bool = True
) -> Optional[dict]:
"""
执行带限频控制的请求。
参数:
endpoint: API 端点(如 "/market/kline")
params: 查询参数
require_success: 如果为 True,重试耗尽后抛出异常;否则返回 None
返回:
成功时返回响应 JSON,失败时返回 None(require_success=False)或抛出异常
"""
for attempt in range(self.config.max_retries):
# 第一层:本地令牌桶限速
try:
self.local_bucket.acquire(blocking=True, timeout=30.0)
except TimeoutError:
raise RuntimeError(
"令牌桶获取超时(30s)。"
"可能是 QPS 配置过低或网络异常。"
)
# 第二层:发起请求
response = self._get(endpoint, params)
if response.status_code == 200:
self._consecutive_429_count = 0 # 重置计数器
return response.json()
if response.status_code == 429:
wait_time = self._handle_rate_limit(response)
time.sleep(wait_time)
continue
# 非 429 的其他 HTTP 错误(如 500、502、504)
logger.error(f"[HTTP Error] {response.status_code}: {response.text[:200]}")
if attempt < self.config.max_retries - 1:
time.sleep(min(2 ** attempt, 30))
continue
break
# 所有重试均失败
error_msg = f"请求 {endpoint} 失败,已重试 {self.config.max_retries} 次"
if require_success:
raise RuntimeError(error_msg)
logger.error(error_msg)
return None
# 使用示例
if __name__ == "__main__":
pipeline = TickDBRequestPipeline()
data = pipeline.request(
"/market/kline",
params={"symbol": "BTC.USDT", "interval": "1h", "limit": 100}
)
print(f"获取到 {len(data.get('data', []))} 条 K 线数据")
结语
API 限频不是故障,是 API 服务正常运营的一部分。把它当成故障来处理,你的代码会越修越乱;把它当成信号来处理,你的系统会越跑越稳。
核心原则只有三条:
- 永远读
Retry-After,不要猜 - 永远加随机抖动,不要让多个实例同步重试
- 永远在客户端加令牌桶,把限频拦截在到达服务端之前
下一步行动
如果你想直接集成到项目中,访问 tickdb.ai 在控制台生成 API Key,复制上文的 TickDBRequestPipeline 代码即可运行。代码中已包含完整的限频处理逻辑,无需额外依赖。
如果你在使用高频策略,建议将 requests 替换为 aiohttp + asyncio,将令牌桶替换为 asyncio.Semaphore,这样可以在单线程内实现真正的并发控制,避免同步阻塞影响策略响应速度。
如果你在多台机器上运行策略,在 TickDBRequestPipeline 初始化时传入 Redis 客户端,即可获得分布式令牌桶能力,多实例共享 QPS 配额。
风险提示:本文提供的令牌桶参数(QPS=10)为通用参考值。实际 API 的限频上限请以 TickDB 官方文档为准。限频配置过高可能导致 API Key 被封禁。
本文不构成任何投资建议。市场有风险,投资需谨慎。