策略连续亏损自动熔断:用状态机保护你的账户
凌晨三点,你设置的“趋势跟踪策略”正在自动运行。
然后,连续亏损。
3 次。5 次。8 次。
你的账户在黑暗中被一点点侵蚀,而你正在睡觉。
这不是恐怖故事。这是每一个没有熔断机制的量化工程师迟早会踩的坑。
一、为什么你需要一个熔断机制
先说个真实发生的事。
某私募的量化团队在 2022 年有个策略,专门做期权波动率回归。回测数据漂亮得不行——夏普 2.3,最大回撤 8%。上线后前三个月确实稳,团队已经开始筹备新产品了。
然后有一天,某个黑天鹅事件触发了连续 12 次亏损。策略本身没问题,问题在于它没有熔断——它会一直跑下去,把账户从盈利 15% 做到亏损 22%。
后来复盘,他们发现:如果当时有熔断机制,损失会被控制在 12% 以内。
这不是策略的问题,是工程的问题。
熔断的核心价值:让程序在失去理智之前先停下来。
二、常见的三种错误做法
在聊状态机之前,先看看大家通常怎么实现熔断,以及为什么这些做法会埋雷。
错误做法一:简单计数器
# ❌ 反面教材:这种代码你一定见过
loss_count = 0
def on_trade(trade_result):
global loss_count
if trade_result['pnl'] < 0:
loss_count += 1
if loss_count >= 5:
logger.warning("止损!")
# 然后呢?手动改代码重启?
else:
loss_count = 0 # 只清零不停止
问题在哪?
- 没有真正停止策略,只是打印了一行日志
- 程序重启后状态丢失,loss_count 从 0 开始
- 无法区分“连续亏损”和“偶发亏损”——一个 10% 的盈利能掩盖 5 个小亏损
错误做法二:try-except 包裹一切
# ❌ 反面教材:熔断变成异常处理
while running:
try:
execute_strategy()
except Exception as e:
logger.error(f"出错了: {e}")
# 睡一会儿继续跑
time.sleep(60)
问题在哪?
- 网络错误、API 超时和策略逻辑错误混为一谈
- 异常往往意味着已经造成了损失,而不是预防损失
- 没有状态记录,不知道已经连续失败多少次
错误做法三:用全局变量控制状态
# ❌ 反面教材:状态散落在各处
stop_loss_flag = False
def check_circuit_breaker():
global stop_loss_flag
if consecutive_loss >= 5:
stop_loss_flag = True
def run_strategy():
global stop_loss_flag
if stop_loss_flag:
return
# 策略逻辑...
问题在哪?
- 多个线程/进程访问全局变量,状态不一致
- 没有恢复逻辑——一旦触发,程序永远不会自动重启
- 难以测试和扩展
这三种做法的共同问题:把熔断当成一个检查函数,而不是一个系统。
三、用状态机设计熔断系统
熔断不是一个 if 语句,它是一个状态机。
所谓状态机,就是把“程序当前在干什么”分成几个明确的离散状态,然后定义什么事件触发状态切换,以及每个状态下会发生什么。
对于量化策略的熔断系统,我建议用 5 个状态:
┌─────────────────────────────────────────────────────────────┐
│ │
│ ┌──────┐ 触发条件 ┌────────┐ 冷却期结束 ┌────┐ │
│ │ 运 行 │ ──────────────▶│ 熔 断 │ ──────────────▶│待审│ │
│ └──────┘ └────────┘ └────┘ │
│ ▲ │ │ │
│ │ │ 人工恢复 │ │
│ │ ▼ ▼ │
│ │ ┌────────┐ ┌──────┐ │
│ └──────────────────────│ 人工介入│◀───────────│ 自动 │ │
│ └────────┘ │ 恢复 │ │
│ └──────┘ │
└─────────────────────────────────────────────────────────────┘
五态说明:
| 状态 | 含义 | 策略行为 |
|---|---|---|
RUNNING |
正常运行 | 执行交易逻辑 |
TRIPPED |
熔断触发 | 暂停所有交易,保留头寸 |
MANUAL_HOLD |
人工干预中 | 完全冻结,等待人工确认 |
AUTO_RECOVERY |
自动恢复等待 | 冷却期结束,可选自动重连 |
RECOVERY_CONFIRMED |
恢复已确认 | 重启交易逻辑 |
四、生产级熔断实现
下面给出完整的熔断状态机实现。代码包含所有工程要素:持久化存储、状态一致性、恢复逻辑、告警通知。
import enum
import time
import json
import logging
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Optional, Callable
from threading import Lock
import os
# ============ 1. 状态枚举 ============
class CircuitState(enum.Enum):
RUNNING = "running"
TRIPPED = "tripped"
MANUAL_HOLD = "manual_hold"
AUTO_RECOVERY = "auto_recovery"
RECOVERY_CONFIRMED = "recovery_confirmed"
# ============ 2. 触发事件枚举 ============
class TriggerEvent(enum.Enum):
CONSECUTIVE_LOSS_THRESHOLD = "consecutive_loss_threshold"
DAILY_DRAWDOWN_THRESHOLD = "daily_drawdown_threshold"
SINGLE_TRADE_LOSS_THRESHOLD = "single_trade_loss_threshold"
MANUAL_TRIGGER = "manual_trigger"
COOLING_PERIOD_COMPLETE = "cooling_period_complete"
MANUAL_RECOVERY = "manual_recovery"
AUTO_RECOVERY_TIMEOUT = "auto_recovery_timeout"
# ============ 3. 熔断配置 ============
@dataclass
class CircuitBreakerConfig:
"""熔断阈值配置"""
consecutive_loss_threshold: int = 5 # 连续亏损次数阈值
daily_drawdown_threshold: float = 0.05 # 单日回撤阈值(5%)
single_trade_loss_threshold: float = 0.02 # 单笔亏损阈值(2%)
cooling_period_seconds: int = 3600 # 冷却期(1小时)
auto_recovery_enabled: bool = True # 是否允许自动恢复
auto_recovery_max_attempts: int = 3 # 自动恢复最大尝试次数
state_persistence_path: str = "./circuit_state.json"
# ============ 4. 熔断状态机核心类 ============
class CircuitBreaker:
"""
量化策略熔断状态机
功能:
- 监控连续亏损次数、单日回撤、单笔亏损
- 状态持久化,程序重启后可恢复
- 支持手动干预和自动恢复
- 触发熔断时发送告警
"""
def __init__(
self,
config: CircuitBreakerConfig,
on_tripped: Optional[Callable] = None,
on_recovery: Optional[Callable] = None
):
self.config = config
self.on_tripped_callback = on_tripped
self.on_recovery_callback = on_recovery
self._lock = Lock()
self._state = CircuitState.RUNNING
self._consecutive_loss_count = 0
self._daily_pnl = 0.0
self._daily_start_time = datetime.now().date()
self._cooling_start_time: Optional[datetime] = None
self._auto_recovery_attempts = 0
self._trigger_history: list = []
self._manual_recovery_token: Optional[str] = None
# 加载持久化状态
self._load_state()
# 检查是否需要重置每日统计
self._check_daily_reset()
self.logger = logging.getLogger(__name__)
# ============ 状态管理核心方法 ============
def record_trade_result(self, pnl: float, trade_id: str) -> bool:
"""
记录交易结果,返回是否允许继续交易
Args:
pnl: 盈亏金额(正数盈利,负数亏损)
trade_id: 交易ID,用于追踪
Returns:
True: 可以继续交易
False: 熔断已触发,禁止交易
"""
with self._lock:
self._check_daily_reset()
# 检查当前状态
if self._state in (CircuitState.MANUAL_HOLD, CircuitState.TRIPPED):
self.logger.warning(
f"交易 {trade_id} 被拒绝:状态机处于 {self._state.value},"
f"需人工干预"
)
return False
# 更新统计数据
self._daily_pnl += pnl
entry = {
"trade_id": trade_id,
"pnl": pnl,
"timestamp": datetime.now().isoformat()
}
if pnl < 0:
self._consecutive_loss_count += 1
entry["loss_count"] = self._consecutive_loss_count
self.logger.info(
f"亏损记录:第 {self._consecutive_loss_count} 次连续亏损,"
f"交易ID={trade_id}"
)
else:
self._consecutive_loss_count = 0
entry["loss_count"] = 0
self._trigger_history.append(entry)
# 触发检查
should_trip = self._evaluate_trip_conditions(pnl)
if should_trip:
self._trip(should_trip["reason"])
return False
return True
def _evaluate_trip_conditions(self, pnl: float) -> Optional[dict]:
"""评估是否触发熔断"""
reasons = []
# 检查1:连续亏损次数
if self._consecutive_loss_count >= self.config.consecutive_loss_threshold:
reasons.append({
"event": TriggerEvent.CONSECUTIVE_LOSS_THRESHOLD,
"detail": f"连续亏损 {self._consecutive_loss_count} 次超过阈值 "
f"{self.config.consecutive_loss_threshold}"
})
# 检查2:单日回撤
if self._daily_pnl <= -abs(self.config.daily_drawdown_threshold):
reasons.append({
"event": TriggerEvent.DAILY_DRAWDOWN_THRESHOLD,
"detail": f"单日回撤 {abs(self._daily_pnl):.2%} 超过阈值 "
f"{self.config.daily_drawdown_threshold:.2%}"
})
# 检查3:单笔亏损
if pnl < 0 and abs(pnl) >= self.config.single_trade_loss_threshold:
reasons.append({
"event": TriggerEvent.SINGLE_TRADE_LOSS_THRESHOLD,
"detail": f"单笔亏损 {abs(pnl):.2%} 超过阈值 "
f"{self.config.single_trade_loss_threshold:.2%}"
})
return reasons[0] if reasons else None
def _trip(self, reason: dict):
"""触发熔断"""
self.logger.critical(
f"🚨 熔断触发!原因:{reason['detail']}"
)
self._state = CircuitState.TRIPPED
self._cooling_start_time = datetime.now()
# 调用熔断回调
if self.on_tripped_callback:
try:
self.on_tripped_callback(reason)
except Exception as e:
self.logger.error(f"熔断回调执行失败: {e}")
# 发送告警(可集成飞书/钉钉/Slack)
self._send_alert(reason)
# 持久化状态
self._save_state()
def request_manual_recovery(self, recovery_token: str) -> bool:
"""
请求人工恢复熔断
Args:
recovery_token: 恢复令牌,需与触发时生成的令牌匹配
Returns:
是否成功恢复
"""
with self._lock:
if self._state not in (CircuitState.TRIPPED, CircuitState.MANUAL_HOLD):
self.logger.warning("当前状态不允许人工恢复")
return False
# 生成新的恢复令牌
self._manual_recovery_token = self._generate_token()
self._state = CircuitState.MANUAL_HOLD
self.logger.info(
f"人工恢复请求已接收,等待确认。当前状态:{self._state.value}"
)
self._save_state()
return True
def confirm_recovery(self, confirmation_token: str) -> bool:
"""
确认恢复交易
Args:
confirmation_token: 确认令牌
Returns:
是否成功确认恢复
"""
with self._lock:
if self._state != CircuitState.MANUAL_HOLD:
self.logger.warning("当前不在人工干预状态")
return False
self._state = CircuitState.RECOVERY_CONFIRMED
self._consecutive_loss_count = 0
self._auto_recovery_attempts = 0
self.logger.info("✅ 人工恢复已确认,策略重新开始运行")
if self.on_recovery_callback:
try:
self.on_recovery_callback()
except Exception as e:
self.logger.error(f"恢复回调执行失败: {e}")
# 重置为运行状态
self._state = CircuitState.RUNNING
self._save_state()
return True
def check_auto_recovery(self) -> bool:
"""检查是否满足自动恢复条件"""
with self._lock:
if not self.config.auto_recovery_enabled:
return False
if self._state != CircuitState.TRIPPED:
return False
if self._auto_recovery_attempts >= self.config.auto_recovery_max_attempts:
self.logger.warning(
"自动恢复尝试次数已达上限,需人工干预"
)
return False
# 检查冷却期是否结束
elapsed = (datetime.now() - self._cooling_start_time).total_seconds()
if elapsed < self.config.cooling_period_seconds:
remaining = self.config.cooling_period_seconds - elapsed
self.logger.debug(f"冷却期中,还剩 {remaining:.0f} 秒")
return False
# 通过冷却期,进入自动恢复
self._state = CircuitState.AUTO_RECOVERY
self._save_state()
return True
def attempt_auto_recovery(self) -> bool:
"""执行自动恢复尝试"""
with self._lock:
if self._state != CircuitState.AUTO_RECOVERY:
return False
self._auto_recovery_attempts += 1
self.logger.info(
f"执行第 {self._auto_recovery_attempts} 次自动恢复尝试"
)
# 自动恢复默认重置连续亏损计数
self._consecutive_loss_count = 0
self._state = CircuitState.RUNNING
if self.on_recovery_callback:
try:
self.on_recovery_callback()
except Exception as e:
self.logger.error(f"自动恢复回调执行失败: {e}")
self._save_state()
return True
# ============ 状态持久化 ============
def _save_state(self):
"""持久化状态到文件"""
state_data = {
"state": self._state.value,
"consecutive_loss_count": self._consecutive_loss_count,
"daily_pnl": self._daily_pnl,
"daily_start_date": self._daily_start_time.isoformat(),
"cooling_start_time": (
self._cooling_start_time.isoformat()
if self._cooling_start_time else None
),
"auto_recovery_attempts": self._auto_recovery_attempts,
"trigger_history": self._trigger_history[-50:], # 只保留最近50条
"saved_at": datetime.now().isoformat()
}
try:
with open(self.config.state_persistence_path, 'w') as f:
json.dump(state_data, f, indent=2)
except Exception as e:
self.logger.error(f"状态保存失败: {e}")
def _load_state(self):
"""从文件加载状态"""
if not os.path.exists(self.config.state_persistence_path):
return
try:
with open(self.config.state_persistence_path, 'r') as f:
data = json.load(f)
self._state = CircuitState(data["state"])
self._consecutive_loss_count = data.get("consecutive_loss_count", 0)
self._daily_pnl = data.get("daily_pnl", 0.0)
self._daily_start_time = datetime.fromisoformat(
data.get("daily_start_date", datetime.now().date().isoformat())
)
self._cooling_start_time = (
datetime.fromisoformat(data["cooling_start_time"])
if data.get("cooling_start_time") else None
)
self._auto_recovery_attempts = data.get("auto_recovery_attempts", 0)
self._trigger_history = data.get("trigger_history", [])
self.logger.info(f"从持久化文件恢复状态:{self._state.value}")
except Exception as e:
self.logger.warning(f"状态加载失败,使用默认状态: {e}")
def _check_daily_reset(self):
"""检查是否需要重置每日统计"""
today = datetime.now().date()
if today > self._daily_start_time:
self.logger.info(
f"新交易日开始,重置每日统计(上日盈亏:{self._daily_pnl:.2f})"
)
self._daily_pnl = 0.0
self._daily_start_time = today
# 注意:不重置连续亏损计数——连续亏损是跨日的
# ============ 工具方法 ============
def _generate_token(self) -> str:
"""生成恢复令牌"""
import secrets
return secrets.token_urlsafe(32)
def _send_alert(self, reason: dict):
"""
发送告警通知
⚠️ 生产环境应替换为飞书 Webhook / 钉钉机器人 / SMTP 等
此处仅作日志记录
"""
alert_message = (
f"🚨 【熔断告警】策略熔断触发\n"
f"时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n"
f"原因:{reason['detail']}\n"
f"当前状态:{self._state.value}\n"
f"连续亏损:{self._consecutive_loss_count} 次\n"
f"单日盈亏:{self._daily_pnl:.2f}\n"
f"请登录系统进行人工确认"
)
# ⚠️ 实际部署时替换为真实告警渠道
self.logger.critical(alert_message)
def get_status(self) -> dict:
"""获取当前熔断状态快照"""
with self._lock:
return {
"state": self._state.value,
"consecutive_loss_count": self._consecutive_loss_count,
"daily_pnl": self._daily_pnl,
"auto_recovery_attempts": self._auto_recovery_attempts,
"cooling_remaining_seconds": (
max(0, self.config.cooling_period_seconds -
(datetime.now() - self._cooling_start_time).total_seconds())
if self._cooling_start_time and self._state == CircuitState.TRIPPED
else 0
)
}
五、集成到策略执行引擎
光有熔断器不够,还得把它集成到你的策略执行引擎里。下面是一个示例框架:
import time
import logging
from datetime import datetime
logging.basicConfig(level=logging.INFO)
# ============ 策略执行引擎 ============
class StrategyEngine:
"""
带熔断保护的策略执行引擎
使用示例:
config = CircuitBreakerConfig(
consecutive_loss_threshold=5,
daily_drawdown_threshold=0.05,
cooling_period_seconds=3600
)
engine = StrategyEngine(circuit_breaker=CircuitBreaker(config))
engine.run()
"""
def __init__(
self,
circuit_breaker: CircuitBreaker,
strategy_fn: Callable,
check_interval: float = 60.0
):
self.cb = circuit_breaker
self.strategy_fn = strategy_fn
self.check_interval = check_interval
self.running = False
self.logger = logging.getLogger(__name__)
def run(self):
"""主循环"""
self.running = True
self.logger.info("策略执行引擎启动")
while self.running:
try:
# 1. 检查熔断状态
status = self.cb.get_status()
self.logger.debug(f"当前状态:{status}")
# 2. 处理自动恢复
if self.cb.check_auto_recovery():
self.cb.attempt_auto_recovery()
# 3. 检查是否允许交易
status = self.cb.get_status()
if status["state"] in ("tripped", "manual_hold"):
self.logger.warning(
f"熔断保护中,状态={status['state']},"
f"冷却剩余 {status['cooling_remaining_seconds']:.0f}s"
)
time.sleep(self.check_interval)
continue
# 4. 执行策略逻辑
trade_result = self.strategy_fn()
if trade_result is not None:
# 5. 记录交易结果
allowed = self.cb.record_trade_result(
pnl=trade_result.get("pnl", 0),
trade_id=trade_result.get("id", "unknown")
)
if not allowed:
self.logger.critical("交易被熔断器拒绝")
except KeyboardInterrupt:
self.logger.info("收到停止信号")
self.running = False
except Exception as e:
self.logger.error(f"执行异常: {e}", exc_info=True)
time.sleep(5)
self.logger.info("策略执行引擎已停止")
def stop(self):
"""停止引擎"""
self.running = False
六、配置指南:你的熔断阈值应该设多少
阈值设置没有标准答案,取决于你的策略特性和风险偏好。
基准参考
| 参数 | 保守建议 | 中性建议 | 激进建议 |
|---|---|---|---|
| 连续亏损次数 | 3 次 | 5 次 | 8 次 |
| 单日回撤 | 3% | 5% | 8% |
| 单笔亏损 | 1% | 2% | 3% |
| 冷却期 | 2 小时 | 1 小时 | 30 分钟 |
因子选择建议
趋势跟踪策略:
- 适合较宽松阈值(趋势策略本身就容易连亏)
- 建议连续亏损 8-10 次再触发
均值回归策略:
- 适合较严格阈值(均值回归本应快速回归)
- 建议连续亏损 3-5 次触发
高频做市策略:
- 阈值极严格,因为单笔亏损可能累积极快
- 建议单笔亏损 0.5% 触发
七、人工干预接口设计
熔断不能只靠程序自动处理。真实场景中,你需要:
- 立即查看当前状态(飞书/钉钉机器人)
- 手动触发熔断(当你发现市场异常时)
- 确认恢复交易(审查后决定是否重启)
下面是一个基于 REST 的管理接口示例:
from flask import Flask, jsonify, request
app = Flask(__name__)
circuit_breaker = CircuitBreaker(
CircuitBreakerConfig(),
on_tripped=lambda r: print(f"熔断告警: {r}")
)
@app.route("/api/circuit/status")
def get_status():
"""获取熔断状态"""
return jsonify(circuit_breaker.get_status())
@app.route("/api/circuit/trip", methods=["POST"])
def manual_trip():
"""手动触发熔断"""
reason = request.json.get("reason", "手动触发")
circuit_breaker._trip({
"event": TriggerEvent.MANUAL_TRIGGER,
"detail": reason
})
return jsonify({"success": True, "message": "熔断已触发"})
@app.route("/api/circuit/recover", methods=["POST"])
def manual_recover():
"""请求恢复熔断"""
# ⚠️ 生产环境需要严格的身份验证
token = request.json.get("token")
if not token:
return jsonify({"error": "缺少恢复令牌"}), 400
success = circuit_breaker.request_manual_recovery(token)
return jsonify({
"success": success,
"confirmation_token": circuit_breaker._manual_recovery_token
})
if __name__ == "__main__":
app.run(host="0.0.0.0", port=5000)
⚠️ 重要提醒:
- 人工恢复需要两级确认:请求 → 确认,防止误操作
- 生产环境必须添加身份验证(API Key / JWT)
- 建议记录所有人工干预操作,便于审计
八、常见的五个工程坑
坑一:状态持久化文件被误删
程序重启后丢失熔断状态,连续亏损计数归零。
解决方案:状态文件要作为关键数据保护,重启前检查文件是否存在。
坑二:多进程共享状态不一致
开了多个进程跑同一个策略,每个进程都有自己的熔断器,独立计数。
解决方案:使用 Redis 存储状态,或使用文件锁确保原子读写。
坑三:冷却期内市场反转,错失机会
熔断了 1 小时,结果那 1 小时里行情 V 型反转。
解决方案:冷却期不要设太长(建议≤2 小时),同时提供手动快速恢复通道。
坑四:熔断触发后没有真正停止交易
状态变了,但订单队列里还有未成交的订单。
解决方案:熔断触发时,同时清空订单队列(cancel all orders)。
坑五:自动恢复后立即爆仓
自动恢复太激进,冷却期刚结束就全仓干进去。
解决方案:自动恢复后,第一笔交易仓位减半,观察 3-5 分钟再恢复正常仓位。
九、下一步行动
如果你想直接测试这个熔断系统:
- 访问 tickdb.ai 注册(免费,无需信用卡)
- 在控制台生成 API Key
- 设置环境变量
TICKDB_API_KEY - 复制上面的代码即可运行
export TICKDB_API_KEY="your_api_key_here"
如果你想了解更多风控实践:
- 《仓位管理进阶:用凯利公式计算最优下注比例》
- 《订单簿预警:提前识别流动性枯竭的 7 个信号》
风险提示:本文不构成任何投资建议。熔断机制是风险管理工具,不能消除风险本身。实际使用时,请根据策略特性和自身风险承受能力调整阈值参数,并在模拟盘充分验证后再投入实盘。