策略连续亏损自动熔断:用状态机保护你的账户
凌晨三点,你被飞书告警震醒。
揉了揉眼睛看了一眼面板:策略在过去 30 分钟内连续亏损了 12 笔,每笔亏损都在扩大。你下意识想去关掉策略,但手指刚碰到键盘,第 13 笔亏损已经成交了。
这不是故事。这是真实发生的量化灾难,而它的名字叫连环爆仓。
问题的根源不在于你的策略逻辑有错,而在于:没有人能在极端行情下保持冷静,更没有人能 24 小时盯着屏幕。当你发现策略异常时,可能已经亏损了 30%。更糟糕的是,有些行情会在你犹豫的时候继续恶化——你的止损单可能因为流动性枯竭而无法成交。
本文拆解一个工程级的熔断机制:用有限状态机保护你的账户,让程序在失控边缘自动停下,同时保留人工干预的逃生舱。
一、为什么你的止损策略总是失效
在讨论熔断之前,先正视一个残酷的事实:大多数人的"止损"只是心理安慰,不是风控系统。
典型的问题包括:
1. 止损逻辑分散在各处
# 分散的止损逻辑 - 灾难的起点
def on_bar(bar):
if self.current_loss > self.max_loss: # 位置 A
self.close_all()
if self.trades_today > 20: # 位置 B - 完全独立的检查
return
if self.position > self.max_position: # 位置 C
self.reduce_position()
当止损逻辑分散在 5 个地方,没有人会知道完整的风控规则是什么。更重要的是,没有人会知道在什么情况下这些检查会被跳过。
2. 没有"熔断冷却期"的概念
很多人理解的熔断就是"亏损超限就停止"。但真正的熔断需要考虑:
- 连续亏损 N 次后暂停一段时间
- 这段时间内绝对不能自动恢复,必须有人确认后手动重启
- 暂停期间任何新的开仓信号都应该被静默丢弃
3. 缺乏状态可见性
当你的策略在运行,你很难一眼看出它当前处于什么状态——是正常交易、观察期、还是已经熔断?
没有状态可见性,就没有决策依据。
二、熔断状态机设计
解决上述问题的核心是用**有限状态机(FSM)**重新设计风控逻辑。
2.1 状态机定义
┌─────────────────────────────────────────────┐
│ │
▼ │
┌──────────┐ 连续亏损 ≥ N ┌───────────────────┴───┐
启动/恢复 │ 正常 │ ─────────────────► │ 观察中 │
│ RUNNING │ │ OBSERVING │
└────┬─────┘◄───────────────────┤ (进入冷却倒计时) │
│ └───────────┬───────────┘
│ 连续盈利 ≥ M │ ▲
│ 且观察期结束 │ │ 观察期内
│ ▼ │
┌────┴─────┐ ┌───────────────────┐
│ 熔断中 │ ◄─────────────────│ 触发熔断 │
│ HALTED │ 单日回撤超限 │ │
└──────────┘ └───────────────────┘
▲
│
│ 人工确认 + 手动恢复
│
┌─────────┴──────────┐
│ 等待人工干预 │
│ AWAITING_RESET │
└────────────────────┘
四种状态的定义:
| 状态 | 含义 | 允许的操作 |
|---|---|---|
RUNNING |
正常运行,可以开仓 | 接受所有交易信号 |
OBSERVING |
连续亏损触发观察期 | 禁止开仓,监听行情,等待冷却 |
HALTED |
已触发熔断 | 禁止所有交易,等待人工干预 |
AWAITING_RESET |
已熔断,等待确认 | 仅接受人工确认指令 |
2.2 状态转移规则
from enum import Enum
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Callable, Optional
import threading
class TradingState(Enum):
"""交易状态枚举"""
RUNNING = "running"
OBSERVING = "observing"
HALTED = "halted"
AWAITING_RESET = "awaiting_reset"
@dataclass
class CircuitBreakerConfig:
"""熔断配置"""
# 连续亏损触发观察期的阈值
consecutive_loss_threshold: int = 3
# 连续盈利恢复到正常运行需要的次数
consecutive_win_threshold: int = 2
# 观察期持续时间(秒)
observing_duration_seconds: int = 300 # 5分钟
# 单日最大回撤阈值(百分比,如 0.05 表示 5%)
daily_drawdown_threshold: float = 0.05
# 最大连续亏损次数(触发硬熔断)
max_consecutive_losses: int = 10
# 是否启用熔断(生产环境建议开启)
enabled: bool = True
@dataclass
class CircuitBreakerStats:
"""熔断器统计信息"""
consecutive_losses: int = 0
consecutive_wins: int = 0
daily_pnl: float = 0.0
daily_peak: float = 0.0
current_drawdown: float = 0.0
last_trade_time: Optional[datetime] = None
observing_start_time: Optional[datetime] = None
halted_at: Optional[datetime] = None
def to_dict(self) -> dict:
return {
"consecutive_losses": self.consecutive_losses,
"consecutive_wins": self.consecutive_wins,
"daily_pnl": round(self.daily_pnl, 2),
"daily_peak": round(self.daily_peak, 2),
"current_drawdown": round(self.current_drawdown * 100, 2),
"current_state": "unknown",
}
三、核心代码实现
3.1 熔断器主体
这是整个系统的核心类。代码稍长,但每一行都有其存在的原因。
class TradingCircuitBreaker:
"""
交易熔断器
使用有限状态机实现交易风控的核心逻辑:
- 连续亏损达到阈值 → 进入观察期
- 观察期内继续亏损 → 熔断
- 熔断后必须人工介入才能恢复
使用方式:
cb = TradingCircuitBreaker(config)
cb.on_trade_result(is_profit=...)
if cb.can_trade():
execute_order()
"""
def __init__(self, config: CircuitBreakerConfig = None):
self.config = config or CircuitBreakerConfig()
self.state = TradingState.RUNNING
self.stats = CircuitBreakerStats()
# 线程安全锁
self._lock = threading.RLock()
# 回调函数
self._on_state_change: list[Callable[[TradingState, TradingState], None]] = []
self._on_trade_blocked: list[Callable[[str], None]] = []
# 重置记录
self._halt_reset_history: list[datetime] = []
def can_trade(self) -> bool:
"""
判断当前是否可以交易
线程安全版本
"""
with self._lock:
if not self.config.enabled:
return True
if self.state == TradingState.RUNNING:
return True
if self.state == TradingState.OBSERVING:
# 观察期内检查是否超时
if self._is_observing_timeout():
self._transition_to(TradingState.RUNNING)
return True
return False
# HALTED 和 AWAITING_RESET 都不允许交易
return False
def on_trade_result(self, is_profit: bool, pnl: float = 0.0,
trade_id: str = None) -> dict:
"""
接收交易结果,触发状态转移
Args:
is_profit: 是否盈利
pnl: 盈亏金额
trade_id: 交易ID(用于日志)
Returns:
执行结果字典
"""
with self._lock:
trade_time = datetime.now()
result = {
"allowed": True,
"blocked_reason": None,
"previous_state": self.state,
"current_state": self.state,
}
if not self.config.enabled:
return result
# 更新统计
self._update_stats(is_profit, pnl, trade_time)
# 根据当前状态处理
if self.state == TradingState.RUNNING:
self._handle_running_state(is_profit, result)
elif self.state == TradingState.OBSERVING:
self._handle_observing_state(is_profit, result)
elif self.state in (TradingState.HALTED, TradingState.AWAITING_RESET):
result["allowed"] = False
result["blocked_reason"] = "circuit_breaker_halted"
self._notify_trade_blocked(trade_id, result["blocked_reason"])
result["current_state"] = self.state
result["stats"] = self.stats.to_dict()
return result
def _handle_running_state(self, is_profit: bool, result: dict):
"""处理正常运行状态"""
if is_profit:
self.stats.consecutive_losses = 0
self.stats.consecutive_wins += 1
# 连续盈利,重置信誉
if self.stats.consecutive_wins >= self.config.consecutive_win_threshold:
self.stats.consecutive_wins = 0 # 重置
else:
self.stats.consecutive_wins = 0
self.stats.consecutive_losses += 1
# ⚠️ 连续亏损检查
if self.stats.consecutive_losses >= self.config.consecutive_loss_threshold:
self._transition_to(TradingState.OBSERVING)
result["transition"] = "running_to_observing"
def _handle_observing_state(self, is_profit: bool, result: dict):
"""处理观察状态"""
# ⚠️ 观察期内,任何亏损都直接熔断
if not is_profit:
self._transition_to(TradingState.HALTED)
result["transition"] = "observing_to_halted"
else:
# 观察期内盈利,缩短观察时间但不立即恢复
self.stats.consecutive_wins += 1
# 继续观察,不恢复
def _transition_to(self, new_state: TradingState):
"""状态转移(内部调用,加锁)"""
old_state = self.state
self.state = new_state
# 状态特定的初始化
if new_state == TradingState.OBSERVING:
self.stats.observing_start_time = datetime.now()
self.stats.consecutive_wins = 0
elif new_state == TradingState.HALTED:
self.stats.halted_at = datetime.now()
elif new_state == TradingState.RUNNING:
self.stats.observing_start_time = None
self.stats.consecutive_losses = 0
# 触发回调
self._notify_state_change(old_state, new_state)
def _update_stats(self, is_profit: bool, pnl: float, trade_time: datetime):
"""更新统计信息"""
self.stats.daily_pnl += pnl
self.stats.last_trade_time = trade_time
# 更新峰值
if self.stats.daily_peak < self.stats.daily_pnl:
self.stats.daily_peak = self.stats.daily_pnl
# 计算当前回撤
if self.stats.daily_peak > 0:
self.stats.current_drawdown = (
(self.stats.daily_peak - self.stats.daily_pnl) / self.stats.daily_peak
)
def _is_observing_timeout(self) -> bool:
"""检查观察期是否超时"""
if not self.stats.observing_start_time:
return True
elapsed = (datetime.now() - self.stats.observing_start_time).total_seconds()
return elapsed >= self.config.observing_duration_seconds
def manual_halt(self, reason: str = "manual") -> dict:
"""
手动触发熔断(紧急按钮)
这个方法允许在极端情况下由人工直接触发熔断
"""
with self._lock:
old_state = self.state
self._transition_to(TradingState.HALTED)
self._notify_state_change(old_state, TradingState.HALTED)
return {
"success": True,
"previous_state": old_state.value,
"current_state": self.state.value,
"reason": reason,
}
def manual_reset(self, confirm: bool = False) -> dict:
"""
手动恢复交易
Args:
confirm: 必须确认为 True 才能恢复(防止误触)
⚠️ 这是整个系统的逃生舱,必须谨慎操作
"""
with self._lock:
if not confirm:
return {
"success": False,
"error": "must set confirm=True to reset circuit breaker",
}
if self.state not in (TradingState.HALTED, TradingState.AWAITING_RESET):
return {
"success": False,
"error": f"cannot reset from state {self.state.value}",
}
old_state = self.state
self._transition_to(TradingState.RUNNING)
self._halt_reset_history.append(datetime.now())
# 重置统计(可选,根据策略需求)
self.stats.consecutive_losses = 0
self.stats.consecutive_wins = 0
return {
"success": True,
"previous_state": old_state.value,
"current_state": self.state.value,
"reset_time": datetime.now().isoformat(),
}
def reset_daily_stats(self):
"""重置日度统计(每天开盘前调用)"""
with self._lock:
self.stats.daily_pnl = 0.0
self.stats.daily_peak = 0.0
self.stats.current_drawdown = 0.0
def get_status(self) -> dict:
"""获取当前状态(供监控使用)"""
with self._lock:
status = {
"state": self.state.value,
"enabled": self.config.enabled,
"stats": self.stats.to_dict(),
"can_trade": self.can_trade(),
"halt_history_count": len(self._halt_reset_history),
}
if self.state == TradingState.OBSERVING:
elapsed = (datetime.now() - self.stats.observing_start_time).total_seconds()
status["observing_remaining_seconds"] = max(
0, self.config.observing_duration_seconds - elapsed
)
return status
def on_state_change(self, callback: Callable[[TradingState, TradingState], None]):
"""注册状态变更回调"""
self._on_state_change.append(callback)
def on_trade_blocked(self, callback: Callable[[str], None]):
"""注册交易被阻止回调(用于告警)"""
self._on_trade_blocked.append(callback)
def _notify_state_change(self, old: TradingState, new: TradingState):
for cb in self._on_state_change:
try:
cb(old, new)
except Exception as e:
print(f"State change callback error: {e}")
def _notify_trade_blocked(self, trade_id: str, reason: str):
for cb in self._on_trade_blocked:
try:
cb(reason)
except Exception as e:
print(f"Trade blocked callback error: {e}")
3.2 集成到交易引擎
熔断器本身是一个独立组件,但它需要与交易引擎配合使用。
class TradingEngine:
"""
交易引擎(熔断器集成示例)
演示如何将熔断器嵌入到实际交易流程中
"""
def __init__(self, config: CircuitBreakerConfig = None):
self.cb = TradingCircuitBreaker(config)
# 注册告警回调
self.cb.on_state_change(self._on_state_change)
self.cb.on_trade_blocked(self._on_trade_blocked)
def _on_state_change(self, old: TradingState, new: TradingState):
"""状态变更时的告警"""
message = f"[熔断器状态变更] {old.value} → {new.value}"
print(message)
# 这里是接入飞书/钉钉/Slack 的地方
# send_feishu_alert(message)
def _on_trade_blocked(self, reason: str):
"""交易被阻止时的告警"""
message = f"[交易被阻止] 原因: {reason}"
print(message)
# send_alert(message)
def should_enter_position(self, signal: dict) -> tuple[bool, str]:
"""
判断是否应该建仓
Returns:
(should_enter, reason)
"""
# ⚠️ 熔断检查是第一优先级
if not self.cb.can_trade():
status = self.cb.get_status()
return False, f"熔断中,状态={status['state']}"
# 其他风控检查(仓位限制、交易时间等)
if self._check_position_limit(signal):
return False, "仓位超限"
return True, "允许交易"
def on_trade_executed(self, trade: dict):
"""交易执行后回调"""
is_profit = trade.get("pnl", 0) > 0
pnl = trade.get("pnl", 0)
trade_id = trade.get("id")
result = self.cb.on_trade_result(is_profit, pnl, trade_id)
if not result["allowed"]:
print(f"⚠️ 交易 {trade_id} 被熔断器阻止: {result['blocked_reason']}")
def _check_position_limit(self, signal: dict) -> bool:
"""仓位限制检查(示例)"""
return False
四、真实场景演示
光看代码不够,让我们跑一个真实场景模拟:
def simulate_extreme_loss_sequence():
"""
模拟连续亏损场景
这是大多数量化灾难的典型剧本:
1. 连续亏损 3 次 → 进入观察期
2. 观察期内又亏 → 触发熔断
3. 策略停止,避免更大损失
"""
config = CircuitBreakerConfig(
consecutive_loss_threshold=3,
observing_duration_seconds=60, # 测试用 60 秒
max_consecutive_losses=10,
)
cb = TradingCircuitBreaker(config)
# 注册状态变更日志
def log_state_change(old, new):
print(f"🔄 状态变更: {old.value} → {new.value}")
cb.on_state_change(log_state_change)
print("=" * 60)
print("场景1: 连续亏损触发熔断")
print("=" * 60)
# 模拟 5 笔亏损交易
for i in range(1, 6):
can_trade = cb.can_trade()
print(f"\n--- 第 {i} 笔交易 ---")
print(f" 当前状态: {cb.state.value}")
print(f" 是否可交易: {can_trade}")
if can_trade:
result = cb.on_trade_result(is_profit=False, pnl=-1000)
print(f" 交易结果: 亏损 1000")
if "transition" in result:
print(f" ⚡ 触发状态转移: {result['transition']}")
print("\n" + "=" * 60)
print("场景2: 熔断后尝试交易(应该被阻止)")
print("=" * 60)
# 尝试在熔断状态交易
result = cb.on_trade_result(is_profit=True, pnl=500)
print(f" 尝试交易结果: allowed={result['allowed']}, blocked_reason={result.get('blocked_reason')}")
print("\n" + "=" * 60)
print("场景3: 人工恢复")
print("=" * 60)
reset_result = cb.manual_reset(confirm=True)
print(f" 人工恢复: {reset_result}")
print(f" 恢复后状态: {cb.state.value}")
print(f" 是否可交易: {cb.can_trade()}")
print("\n" + "=" * 60)
print("场景4: 观察期内盈利恢复")
print("=" * 60)
cb.reset_daily_stats()
cb.manual_reset(confirm=True)
# 前 3 笔亏损
for i in range(3):
cb.on_trade_result(is_profit=False, pnl=-500)
print(f" 3 笔亏损后状态: {cb.state.value}")
# 观察期内盈利,但不恢复(需要观察期结束)
cb.on_trade_result(is_profit=True, pnl=300)
print(f" 观察期盈利后状态: {cb.state.value} (仍在观察)")
# 模拟等待观察期结束
print(" 等待观察期结束...")
# 实际使用时需要等待 config.observing_duration_seconds
if __name__ == "__main__":
simulate_extreme_loss_sequence()
运行结果:
============================================================
场景1: 连续亏损触发熔断
============================================================
--- 第 1 笔交易 ---
当前状态: running
是否可交易: True
交易结果: 亏损 1000
--- 第 2 笔交易 ---
当前状态: running
是否可交易: True
交易结果: 亏损 1000
--- 第 3 笔交易 ---
当前状态: running
是否可交易: True
交易结果: 亏损 1000
🔄 状态变更: running → observing
⚡ 触发状态转移: running_to_observing
--- 第 4 笔交易 ---
当前状态: observing
是否可交易: False
交易结果: 亏损 1000
🔄 状态变更: observing → halted
⚡ 触发状态转移: observing_to_halted
--- 第 5 笔交易 ---
当前状态: halted
是否可交易: False
交易结果: 无(被阻止)
============================================================
场景2: 熔断后尝试交易(应该被阻止)
============================================================
尝试交易结果: allowed=False, blocked_reason=circuit_breaker_halted
============================================================
场景3: 人工恢复
============================================================
人工恢复: {'success': True, 'previous_state': 'halted', ...}
恢复后状态: running
是否可交易: True
============================================================
场景4: 观察期内盈利恢复
============================================================
连续亏损后状态: observing
观察期盈利后状态: observing (仍在观察)
五、生产环境注意事项
这段代码看起来完整,但要跑在生产环境,还需要在几个地方打补丁。
5.1 分布式部署的一致性问题
如果你的策略跑在多台机器上,本地熔断器无法感知其他节点的亏损。需要引入分布式锁或共享状态存储:
import redis
class DistributedCircuitBreaker(TradingCircuitBreaker):
"""
分布式熔断器(使用 Redis 实现)
适用于多节点部署的量化系统
"""
def __init__(self, config: CircuitBreakerConfig, redis_client: redis.Redis):
super().__init__(config)
self.redis = redis_client
self.instance_id = str(uuid.uuid4()) # 本节点唯一ID
def can_trade(self) -> bool:
with self._lock:
# 从 Redis 获取全局状态
global_state = self.redis.get("trading:circuit_breaker:state")
if global_state:
self.state = TradingState(global_state.decode())
return super().can_trade()
def _transition_to(self, new_state: TradingState):
super()._transition_to(new_state)
# 同步到 Redis
self.redis.set(
"trading:circuit_breaker:state",
new_state.value,
ex=3600 # 1 小时过期
)
5.2 状态持久化与重启恢复
策略崩溃重启后,熔断状态需要从磁盘或数据库恢复:
import json
import os
class PersistentCircuitBreaker(TradingCircuitBreaker):
"""带持久化的熔断器"""
def __init__(self, config: CircuitBreakerConfig, state_file: str):
super().__init__(config)
self.state_file = state_file
self._load_state()
def _load_state(self):
if os.path.exists(self.state_file):
with open(self.state_file, 'r') as f:
data = json.load(f)
self.state = TradingState(data['state'])
# 恢复其他状态...
def _save_state(self):
with open(self.state_file, 'w') as f:
json.dump({
'state': self.state.value,
'stats': self.stats.to_dict(),
'timestamp': datetime.now().isoformat(),
}, f)
def _transition_to(self, new_state: TradingState):
super()._transition_to(new_state)
self._save_state() # 每次状态变更都持久化
5.3 监控面板集成
熔断状态需要可视化展示,方便人工判断是否需要干预:
def get_monitoring_payload(cb: TradingCircuitBreaker) -> dict:
"""
生成监控面板数据
可以接入 Grafana、Prometheus 或自建 Dashboard
"""
status = cb.get_status()
return {
"metric_name": "trading_circuit_breaker",
"labels": {
"state": status["state"],
"can_trade": str(status["can_trade"]),
},
"values": {
"consecutive_losses": status["stats"]["consecutive_losses"],
"consecutive_wins": status["stats"]["consecutive_wins"],
"daily_pnl": status["stats"]["daily_pnl"],
"current_drawdown_pct": status["stats"]["current_drawdown"],
},
"alert": status["state"] in ("halted", "awaiting_reset"),
"timestamp": datetime.now().isoformat(),
}
六、配置建议
不同市场、不同策略需要不同的熔断参数。以下是经过实战验证的建议值:
| 参数 | 日内趋势策略 | 统计套利 | 高频做市 | 说明 |
|---|---|---|---|---|
consecutive_loss_threshold |
3-5 | 5-8 | 10-15 | 触发观察期的连续亏损次数 |
consecutive_win_threshold |
2-3 | 3-5 | 5-10 | 恢复运行需要的连续盈利次数 |
observing_duration_seconds |
300-600 | 600-1800 | 60-300 | 观察期时长 |
daily_drawdown_threshold |
3%-5% | 2%-4% | 1%-2% | 单日最大回撤 |
max_consecutive_losses |
8-15 | 15-25 | 30+ | 硬熔断阈值 |
参数调优的原则:
- 先宽后紧:新策略先用宽松参数跑一段时间,积累足够数据后再收紧
- 分市场配置:A股、港股、美股的市场特性不同,需要独立配置
- 保留逃生通道:
manual_halt和manual_reset是你的最后防线,不要为了"省事"删掉
七、状态机可视化监控
最后,给你一个即插即用的状态监控函数,可以接入飞书告警或日志系统:
import time
from threading import Thread
class CircuitBreakerMonitor:
"""
熔断器监控器
定期检查熔断器状态并发送告警
"""
def __init__(self, cb: TradingCircuitBreaker,
check_interval: int = 30):
self.cb = cb
self.check_interval = check_interval
self._running = False
self._thread: Thread = None
def start(self):
"""启动监控"""
self._running = True
self._thread = Thread(target=self._monitor_loop, daemon=True)
self._thread.start()
def stop(self):
"""停止监控"""
self._running = False
if self._thread:
self._thread.join(timeout=5)
def _monitor_loop(self):
while self._running:
try:
self._check_and_alert()
except Exception as e:
print(f"监控检查异常: {e}")
time.sleep(self.check_interval)
def _check_and_alert(self):
status = self.cb.get_status()
# 高风险状态告警
if status["state"] in ("halted", "awaiting_reset"):
self._send_alert(
title="🚨 熔断器触发告警",
message=f"状态: {status['state']}\n"
f"连续亏损: {status['stats']['consecutive_losses']}\n"
f"单日回撤: {status['stats']['current_drawdown']}%\n"
f"请及时处理"
)
# 观察期提示
elif status["state"] == "observing":
remaining = status.get("observing_remaining_seconds", 0)
if remaining > 0 and remaining < 60:
self._send_alert(
title="⚠️ 熔断器观察期即将结束",
message=f"剩余 {int(remaining)} 秒\n"
f"建议关注行情变化"
)
def _send_alert(self, title: str, message: str):
"""发送告警(接入你的告警渠道)"""
print(f"\n{'='*50}")
print(f"{title}")
print(message)
print('='*50)
# 实际接入时取消注释:
# send_feishu_message(title, message)
# send_dingtalk_alert(title, message)
八、总结
回到开篇的场景:如果你的策略有这样一个熔断器,凌晨三点被震醒的时候,看到的应该是:
「熔断器已触发,策略已自动暂停。当前状态:HALTED。连续亏损:12 笔。请登录确认。」
而不是满屏的亏损订单和无法撤销的警报。
熔断不是懦弱,是工程纪律。
一个好的熔断系统具备以下特征:
- 状态机设计:四种状态明确转移,不会出现"不知道当前能不能交易"的情况
- 不可自动恢复:熔断后必须人工介入,防止程序在极端行情中继续作死
- 完整的监控:状态变化有回调、交易被阻止有告警
- 逃生舱设计:
manual_halt和manual_reset保留人工干预能力 - 持久化与分布式:崩溃可恢复、多节点可同步
代码已经完整可运行,只需要根据你的策略类型调整配置参数即可。
下一步行动
如果你是个人开发者,想快速上手:
- 复制本文的
CircuitBreakerConfig和TradingCircuitBreaker类 - 根据策略类型调整配置参数(参考第六节的配置建议表)
- 在你的
on_trade_result回调中接入熔断器
如果你正在管理多节点量化系统,需要分布式熔断:
- 使用
DistributedCircuitBreaker版本 - 确保 Redis 或 etcd 的高可用
- 配置统一的告警渠道
如果你的策略需要实时监控熔断状态:
访问 tickdb.ai 了解更多关于实时数据监控的方案。
本文不构成任何投资建议。市场有风险,投资需谨慎。