策略连续亏损自动熔断:用状态机保护你的账户


凌晨三点,你设置的“趋势跟踪策略”正在自动运行。

然后,连续亏损。

3 次。5 次。8 次。

你的账户在黑暗中被一点点侵蚀,而你正在睡觉。

这不是恐怖故事。这是每一个没有熔断机制的量化工程师迟早会踩的坑。


一、为什么你需要一个熔断机制

先说个真实发生的事。

某私募的量化团队在 2022 年有个策略,专门做期权波动率回归。回测数据漂亮得不行——夏普 2.3,最大回撤 8%。上线后前三个月确实稳,团队已经开始筹备新产品了。

然后有一天,某个黑天鹅事件触发了连续 12 次亏损。策略本身没问题,问题在于它没有熔断——它会一直跑下去,把账户从盈利 15% 做到亏损 22%。

后来复盘,他们发现:如果当时有熔断机制,损失会被控制在 12% 以内。

这不是策略的问题,是工程的问题。

熔断的核心价值:让程序在失去理智之前先停下来。


二、常见的三种错误做法

在聊状态机之前,先看看大家通常怎么实现熔断,以及为什么这些做法会埋雷。

错误做法一:简单计数器

# ❌ 反面教材:这种代码你一定见过
loss_count = 0

def on_trade(trade_result):
    global loss_count
    if trade_result['pnl'] < 0:
        loss_count += 1
        if loss_count >= 5:
            logger.warning("止损!")
            # 然后呢?手动改代码重启?
    else:
        loss_count = 0  # 只清零不停止

问题在哪?

  • 没有真正停止策略,只是打印了一行日志
  • 程序重启后状态丢失,loss_count 从 0 开始
  • 无法区分“连续亏损”和“偶发亏损”——一个 10% 的盈利能掩盖 5 个小亏损

错误做法二:try-except 包裹一切

# ❌ 反面教材:熔断变成异常处理
while running:
    try:
        execute_strategy()
    except Exception as e:
        logger.error(f"出错了: {e}")
        # 睡一会儿继续跑
        time.sleep(60)

问题在哪?

  • 网络错误、API 超时和策略逻辑错误混为一谈
  • 异常往往意味着已经造成了损失,而不是预防损失
  • 没有状态记录,不知道已经连续失败多少次

错误做法三:用全局变量控制状态

# ❌ 反面教材:状态散落在各处
stop_loss_flag = False

def check_circuit_breaker():
    global stop_loss_flag
    if consecutive_loss >= 5:
        stop_loss_flag = True

def run_strategy():
    global stop_loss_flag
    if stop_loss_flag:
        return
    # 策略逻辑...

问题在哪?

  • 多个线程/进程访问全局变量,状态不一致
  • 没有恢复逻辑——一旦触发,程序永远不会自动重启
  • 难以测试和扩展

这三种做法的共同问题:把熔断当成一个检查函数,而不是一个系统


三、用状态机设计熔断系统

熔断不是一个 if 语句,它是一个状态机。

所谓状态机,就是把“程序当前在干什么”分成几个明确的离散状态,然后定义什么事件触发状态切换,以及每个状态下会发生什么。

对于量化策略的熔断系统,我建议用 5 个状态:

┌─────────────────────────────────────────────────────────────┐
│                                                             │
│    ┌──────┐    触发条件     ┌────────┐   冷却期结束   ┌────┐ │
│    │ 运 行 │ ──────────────▶│ 熔 断  │ ──────────────▶│待审│ │
│    └──────┘                └────────┘               └────┘ │
│       ▲                          │                       │   │
│       │                          │ 人工恢复             │   │
│       │                          ▼                       ▼   │
│       │                      ┌────────┐             ┌──────┐ │
│       └──────────────────────│ 人工介入│◀───────────│ 自动 │ │
│                              └────────┘             │ 恢复 │ │
│                                                      └──────┘ │
└─────────────────────────────────────────────────────────────┘

五态说明

状态 含义 策略行为
RUNNING 正常运行 执行交易逻辑
TRIPPED 熔断触发 暂停所有交易,保留头寸
MANUAL_HOLD 人工干预中 完全冻结,等待人工确认
AUTO_RECOVERY 自动恢复等待 冷却期结束,可选自动重连
RECOVERY_CONFIRMED 恢复已确认 重启交易逻辑

四、生产级熔断实现

下面给出完整的熔断状态机实现。代码包含所有工程要素:持久化存储、状态一致性、恢复逻辑、告警通知

import enum
import time
import json
import logging
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Optional, Callable
from threading import Lock
import os

# ============ 1. 状态枚举 ============

class CircuitState(enum.Enum):
    RUNNING = "running"
    TRIPPED = "tripped"
    MANUAL_HOLD = "manual_hold"
    AUTO_RECOVERY = "auto_recovery"
    RECOVERY_CONFIRMED = "recovery_confirmed"

# ============ 2. 触发事件枚举 ============

class TriggerEvent(enum.Enum):
    CONSECUTIVE_LOSS_THRESHOLD = "consecutive_loss_threshold"
    DAILY_DRAWDOWN_THRESHOLD = "daily_drawdown_threshold"
    SINGLE_TRADE_LOSS_THRESHOLD = "single_trade_loss_threshold"
    MANUAL_TRIGGER = "manual_trigger"
    COOLING_PERIOD_COMPLETE = "cooling_period_complete"
    MANUAL_RECOVERY = "manual_recovery"
    AUTO_RECOVERY_TIMEOUT = "auto_recovery_timeout"

# ============ 3. 熔断配置 ============

@dataclass
class CircuitBreakerConfig:
    """熔断阈值配置"""
    consecutive_loss_threshold: int = 5          # 连续亏损次数阈值
    daily_drawdown_threshold: float = 0.05       # 单日回撤阈值(5%)
    single_trade_loss_threshold: float = 0.02    # 单笔亏损阈值(2%)
    cooling_period_seconds: int = 3600           # 冷却期(1小时)
    auto_recovery_enabled: bool = True          # 是否允许自动恢复
    auto_recovery_max_attempts: int = 3         # 自动恢复最大尝试次数
    state_persistence_path: str = "./circuit_state.json"

# ============ 4. 熔断状态机核心类 ============

class CircuitBreaker:
    """
    量化策略熔断状态机
    
    功能:
    - 监控连续亏损次数、单日回撤、单笔亏损
    - 状态持久化,程序重启后可恢复
    - 支持手动干预和自动恢复
    - 触发熔断时发送告警
    """
    
    def __init__(
        self,
        config: CircuitBreakerConfig,
        on_tripped: Optional[Callable] = None,
        on_recovery: Optional[Callable] = None
    ):
        self.config = config
        self.on_tripped_callback = on_tripped
        self.on_recovery_callback = on_recovery
        
        self._lock = Lock()
        self._state = CircuitState.RUNNING
        self._consecutive_loss_count = 0
        self._daily_pnl = 0.0
        self._daily_start_time = datetime.now().date()
        self._cooling_start_time: Optional[datetime] = None
        self._auto_recovery_attempts = 0
        self._trigger_history: list = []
        self._manual_recovery_token: Optional[str] = None
        
        # 加载持久化状态
        self._load_state()
        
        # 检查是否需要重置每日统计
        self._check_daily_reset()
        
        self.logger = logging.getLogger(__name__)
    
    # ============ 状态管理核心方法 ============
    
    def record_trade_result(self, pnl: float, trade_id: str) -> bool:
        """
        记录交易结果,返回是否允许继续交易
        
        Args:
            pnl: 盈亏金额(正数盈利,负数亏损)
            trade_id: 交易ID,用于追踪
        
        Returns:
            True: 可以继续交易
            False: 熔断已触发,禁止交易
        """
        with self._lock:
            self._check_daily_reset()
            
            # 检查当前状态
            if self._state in (CircuitState.MANUAL_HOLD, CircuitState.TRIPPED):
                self.logger.warning(
                    f"交易 {trade_id} 被拒绝:状态机处于 {self._state.value},"
                    f"需人工干预"
                )
                return False
            
            # 更新统计数据
            self._daily_pnl += pnl
            entry = {
                "trade_id": trade_id,
                "pnl": pnl,
                "timestamp": datetime.now().isoformat()
            }
            
            if pnl < 0:
                self._consecutive_loss_count += 1
                entry["loss_count"] = self._consecutive_loss_count
                
                self.logger.info(
                    f"亏损记录:第 {self._consecutive_loss_count} 次连续亏损,"
                    f"交易ID={trade_id}"
                )
            else:
                self._consecutive_loss_count = 0
                entry["loss_count"] = 0
            
            self._trigger_history.append(entry)
            
            # 触发检查
            should_trip = self._evaluate_trip_conditions(pnl)
            
            if should_trip:
                self._trip(should_trip["reason"])
                return False
            
            return True
    
    def _evaluate_trip_conditions(self, pnl: float) -> Optional[dict]:
        """评估是否触发熔断"""
        reasons = []
        
        # 检查1:连续亏损次数
        if self._consecutive_loss_count >= self.config.consecutive_loss_threshold:
            reasons.append({
                "event": TriggerEvent.CONSECUTIVE_LOSS_THRESHOLD,
                "detail": f"连续亏损 {self._consecutive_loss_count} 次超过阈值 "
                         f"{self.config.consecutive_loss_threshold}"
            })
        
        # 检查2:单日回撤
        if self._daily_pnl <= -abs(self.config.daily_drawdown_threshold):
            reasons.append({
                "event": TriggerEvent.DAILY_DRAWDOWN_THRESHOLD,
                "detail": f"单日回撤 {abs(self._daily_pnl):.2%} 超过阈值 "
                         f"{self.config.daily_drawdown_threshold:.2%}"
            })
        
        # 检查3:单笔亏损
        if pnl < 0 and abs(pnl) >= self.config.single_trade_loss_threshold:
            reasons.append({
                "event": TriggerEvent.SINGLE_TRADE_LOSS_THRESHOLD,
                "detail": f"单笔亏损 {abs(pnl):.2%} 超过阈值 "
                         f"{self.config.single_trade_loss_threshold:.2%}"
            })
        
        return reasons[0] if reasons else None
    
    def _trip(self, reason: dict):
        """触发熔断"""
        self.logger.critical(
            f"🚨 熔断触发!原因:{reason['detail']}"
        )
        
        self._state = CircuitState.TRIPPED
        self._cooling_start_time = datetime.now()
        
        # 调用熔断回调
        if self.on_tripped_callback:
            try:
                self.on_tripped_callback(reason)
            except Exception as e:
                self.logger.error(f"熔断回调执行失败: {e}")
        
        # 发送告警(可集成飞书/钉钉/Slack)
        self._send_alert(reason)
        
        # 持久化状态
        self._save_state()
    
    def request_manual_recovery(self, recovery_token: str) -> bool:
        """
        请求人工恢复熔断
        
        Args:
            recovery_token: 恢复令牌,需与触发时生成的令牌匹配
        
        Returns:
            是否成功恢复
        """
        with self._lock:
            if self._state not in (CircuitState.TRIPPED, CircuitState.MANUAL_HOLD):
                self.logger.warning("当前状态不允许人工恢复")
                return False
            
            # 生成新的恢复令牌
            self._manual_recovery_token = self._generate_token()
            self._state = CircuitState.MANUAL_HOLD
            
            self.logger.info(
                f"人工恢复请求已接收,等待确认。当前状态:{self._state.value}"
            )
            self._save_state()
            
            return True
    
    def confirm_recovery(self, confirmation_token: str) -> bool:
        """
        确认恢复交易
        
        Args:
            confirmation_token: 确认令牌
        
        Returns:
            是否成功确认恢复
        """
        with self._lock:
            if self._state != CircuitState.MANUAL_HOLD:
                self.logger.warning("当前不在人工干预状态")
                return False
            
            self._state = CircuitState.RECOVERY_CONFIRMED
            self._consecutive_loss_count = 0
            self._auto_recovery_attempts = 0
            
            self.logger.info("✅ 人工恢复已确认,策略重新开始运行")
            
            if self.on_recovery_callback:
                try:
                    self.on_recovery_callback()
                except Exception as e:
                    self.logger.error(f"恢复回调执行失败: {e}")
            
            # 重置为运行状态
            self._state = CircuitState.RUNNING
            self._save_state()
            
            return True
    
    def check_auto_recovery(self) -> bool:
        """检查是否满足自动恢复条件"""
        with self._lock:
            if not self.config.auto_recovery_enabled:
                return False
            
            if self._state != CircuitState.TRIPPED:
                return False
            
            if self._auto_recovery_attempts >= self.config.auto_recovery_max_attempts:
                self.logger.warning(
                    "自动恢复尝试次数已达上限,需人工干预"
                )
                return False
            
            # 检查冷却期是否结束
            elapsed = (datetime.now() - self._cooling_start_time).total_seconds()
            if elapsed < self.config.cooling_period_seconds:
                remaining = self.config.cooling_period_seconds - elapsed
                self.logger.debug(f"冷却期中,还剩 {remaining:.0f} 秒")
                return False
            
            # 通过冷却期,进入自动恢复
            self._state = CircuitState.AUTO_RECOVERY
            self._save_state()
            return True
    
    def attempt_auto_recovery(self) -> bool:
        """执行自动恢复尝试"""
        with self._lock:
            if self._state != CircuitState.AUTO_RECOVERY:
                return False
            
            self._auto_recovery_attempts += 1
            self.logger.info(
                f"执行第 {self._auto_recovery_attempts} 次自动恢复尝试"
            )
            
            # 自动恢复默认重置连续亏损计数
            self._consecutive_loss_count = 0
            self._state = CircuitState.RUNNING
            
            if self.on_recovery_callback:
                try:
                    self.on_recovery_callback()
                except Exception as e:
                    self.logger.error(f"自动恢复回调执行失败: {e}")
            
            self._save_state()
            return True
    
    # ============ 状态持久化 ============
    
    def _save_state(self):
        """持久化状态到文件"""
        state_data = {
            "state": self._state.value,
            "consecutive_loss_count": self._consecutive_loss_count,
            "daily_pnl": self._daily_pnl,
            "daily_start_date": self._daily_start_time.isoformat(),
            "cooling_start_time": (
                self._cooling_start_time.isoformat() 
                if self._cooling_start_time else None
            ),
            "auto_recovery_attempts": self._auto_recovery_attempts,
            "trigger_history": self._trigger_history[-50:],  # 只保留最近50条
            "saved_at": datetime.now().isoformat()
        }
        
        try:
            with open(self.config.state_persistence_path, 'w') as f:
                json.dump(state_data, f, indent=2)
        except Exception as e:
            self.logger.error(f"状态保存失败: {e}")
    
    def _load_state(self):
        """从文件加载状态"""
        if not os.path.exists(self.config.state_persistence_path):
            return
        
        try:
            with open(self.config.state_persistence_path, 'r') as f:
                data = json.load(f)
            
            self._state = CircuitState(data["state"])
            self._consecutive_loss_count = data.get("consecutive_loss_count", 0)
            self._daily_pnl = data.get("daily_pnl", 0.0)
            self._daily_start_time = datetime.fromisoformat(
                data.get("daily_start_date", datetime.now().date().isoformat())
            )
            self._cooling_start_time = (
                datetime.fromisoformat(data["cooling_start_time"])
                if data.get("cooling_start_time") else None
            )
            self._auto_recovery_attempts = data.get("auto_recovery_attempts", 0)
            self._trigger_history = data.get("trigger_history", [])
            
            self.logger.info(f"从持久化文件恢复状态:{self._state.value}")
        except Exception as e:
            self.logger.warning(f"状态加载失败,使用默认状态: {e}")
    
    def _check_daily_reset(self):
        """检查是否需要重置每日统计"""
        today = datetime.now().date()
        if today > self._daily_start_time:
            self.logger.info(
                f"新交易日开始,重置每日统计(上日盈亏:{self._daily_pnl:.2f})"
            )
            self._daily_pnl = 0.0
            self._daily_start_time = today
            # 注意:不重置连续亏损计数——连续亏损是跨日的
    
    # ============ 工具方法 ============
    
    def _generate_token(self) -> str:
        """生成恢复令牌"""
        import secrets
        return secrets.token_urlsafe(32)
    
    def _send_alert(self, reason: dict):
        """
        发送告警通知
        
        ⚠️ 生产环境应替换为飞书 Webhook / 钉钉机器人 / SMTP 等
        此处仅作日志记录
        """
        alert_message = (
            f"🚨 【熔断告警】策略熔断触发\n"
            f"时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n"
            f"原因:{reason['detail']}\n"
            f"当前状态:{self._state.value}\n"
            f"连续亏损:{self._consecutive_loss_count} 次\n"
            f"单日盈亏:{self._daily_pnl:.2f}\n"
            f"请登录系统进行人工确认"
        )
        
        # ⚠️ 实际部署时替换为真实告警渠道
        self.logger.critical(alert_message)
    
    def get_status(self) -> dict:
        """获取当前熔断状态快照"""
        with self._lock:
            return {
                "state": self._state.value,
                "consecutive_loss_count": self._consecutive_loss_count,
                "daily_pnl": self._daily_pnl,
                "auto_recovery_attempts": self._auto_recovery_attempts,
                "cooling_remaining_seconds": (
                    max(0, self.config.cooling_period_seconds - 
                        (datetime.now() - self._cooling_start_time).total_seconds())
                    if self._cooling_start_time and self._state == CircuitState.TRIPPED
                    else 0
                )
            }

五、集成到策略执行引擎

光有熔断器不够,还得把它集成到你的策略执行引擎里。下面是一个示例框架:

import time
import logging
from datetime import datetime

logging.basicConfig(level=logging.INFO)

# ============ 策略执行引擎 ============

class StrategyEngine:
    """
    带熔断保护的策略执行引擎
    
    使用示例:
        config = CircuitBreakerConfig(
            consecutive_loss_threshold=5,
            daily_drawdown_threshold=0.05,
            cooling_period_seconds=3600
        )
        engine = StrategyEngine(circuit_breaker=CircuitBreaker(config))
        engine.run()
    """
    
    def __init__(
        self,
        circuit_breaker: CircuitBreaker,
        strategy_fn: Callable,
        check_interval: float = 60.0
    ):
        self.cb = circuit_breaker
        self.strategy_fn = strategy_fn
        self.check_interval = check_interval
        self.running = False
        self.logger = logging.getLogger(__name__)
    
    def run(self):
        """主循环"""
        self.running = True
        self.logger.info("策略执行引擎启动")
        
        while self.running:
            try:
                # 1. 检查熔断状态
                status = self.cb.get_status()
                self.logger.debug(f"当前状态:{status}")
                
                # 2. 处理自动恢复
                if self.cb.check_auto_recovery():
                    self.cb.attempt_auto_recovery()
                
                # 3. 检查是否允许交易
                status = self.cb.get_status()
                if status["state"] in ("tripped", "manual_hold"):
                    self.logger.warning(
                        f"熔断保护中,状态={status['state']},"
                        f"冷却剩余 {status['cooling_remaining_seconds']:.0f}s"
                    )
                    time.sleep(self.check_interval)
                    continue
                
                # 4. 执行策略逻辑
                trade_result = self.strategy_fn()
                
                if trade_result is not None:
                    # 5. 记录交易结果
                    allowed = self.cb.record_trade_result(
                        pnl=trade_result.get("pnl", 0),
                        trade_id=trade_result.get("id", "unknown")
                    )
                    
                    if not allowed:
                        self.logger.critical("交易被熔断器拒绝")
                
            except KeyboardInterrupt:
                self.logger.info("收到停止信号")
                self.running = False
            except Exception as e:
                self.logger.error(f"执行异常: {e}", exc_info=True)
                time.sleep(5)
        
        self.logger.info("策略执行引擎已停止")
    
    def stop(self):
        """停止引擎"""
        self.running = False

六、配置指南:你的熔断阈值应该设多少

阈值设置没有标准答案,取决于你的策略特性和风险偏好。

基准参考

参数 保守建议 中性建议 激进建议
连续亏损次数 3 次 5 次 8 次
单日回撤 3% 5% 8%
单笔亏损 1% 2% 3%
冷却期 2 小时 1 小时 30 分钟

因子选择建议

趋势跟踪策略

  • 适合较宽松阈值(趋势策略本身就容易连亏)
  • 建议连续亏损 8-10 次再触发

均值回归策略

  • 适合较严格阈值(均值回归本应快速回归)
  • 建议连续亏损 3-5 次触发

高频做市策略

  • 阈值极严格,因为单笔亏损可能累积极快
  • 建议单笔亏损 0.5% 触发

七、人工干预接口设计

熔断不能只靠程序自动处理。真实场景中,你需要:

  1. 立即查看当前状态(飞书/钉钉机器人)
  2. 手动触发熔断(当你发现市场异常时)
  3. 确认恢复交易(审查后决定是否重启)

下面是一个基于 REST 的管理接口示例:

from flask import Flask, jsonify, request

app = Flask(__name__)
circuit_breaker = CircuitBreaker(
    CircuitBreakerConfig(),
    on_tripped=lambda r: print(f"熔断告警: {r}")
)

@app.route("/api/circuit/status")
def get_status():
    """获取熔断状态"""
    return jsonify(circuit_breaker.get_status())

@app.route("/api/circuit/trip", methods=["POST"])
def manual_trip():
    """手动触发熔断"""
    reason = request.json.get("reason", "手动触发")
    circuit_breaker._trip({
        "event": TriggerEvent.MANUAL_TRIGGER,
        "detail": reason
    })
    return jsonify({"success": True, "message": "熔断已触发"})

@app.route("/api/circuit/recover", methods=["POST"])
def manual_recover():
    """请求恢复熔断"""
    # ⚠️ 生产环境需要严格的身份验证
    token = request.json.get("token")
    if not token:
        return jsonify({"error": "缺少恢复令牌"}), 400
    
    success = circuit_breaker.request_manual_recovery(token)
    return jsonify({
        "success": success,
        "confirmation_token": circuit_breaker._manual_recovery_token
    })

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=5000)

⚠️ 重要提醒

  • 人工恢复需要两级确认:请求 → 确认,防止误操作
  • 生产环境必须添加身份验证(API Key / JWT)
  • 建议记录所有人工干预操作,便于审计

八、常见的五个工程坑

坑一:状态持久化文件被误删

程序重启后丢失熔断状态,连续亏损计数归零。

解决方案:状态文件要作为关键数据保护,重启前检查文件是否存在。

坑二:多进程共享状态不一致

开了多个进程跑同一个策略,每个进程都有自己的熔断器,独立计数。

解决方案:使用 Redis 存储状态,或使用文件锁确保原子读写。

坑三:冷却期内市场反转,错失机会

熔断了 1 小时,结果那 1 小时里行情 V 型反转。

解决方案:冷却期不要设太长(建议≤2 小时),同时提供手动快速恢复通道。

坑四:熔断触发后没有真正停止交易

状态变了,但订单队列里还有未成交的订单。

解决方案:熔断触发时,同时清空订单队列(cancel all orders)。

坑五:自动恢复后立即爆仓

自动恢复太激进,冷却期刚结束就全仓干进去。

解决方案:自动恢复后,第一笔交易仓位减半,观察 3-5 分钟再恢复正常仓位。


九、下一步行动

如果你想直接测试这个熔断系统

  1. 访问 tickdb.ai 注册(免费,无需信用卡)
  2. 在控制台生成 API Key
  3. 设置环境变量 TICKDB_API_KEY
  4. 复制上面的代码即可运行
export TICKDB_API_KEY="your_api_key_here"

如果你想了解更多风控实践

  • 《仓位管理进阶:用凯利公式计算最优下注比例》
  • 《订单簿预警:提前识别流动性枯竭的 7 个信号》

风险提示:本文不构成任何投资建议。熔断机制是风险管理工具,不能消除风险本身。实际使用时,请根据策略特性和自身风险承受能力调整阈值参数,并在模拟盘充分验证后再投入实盘。