策略连续亏损自动熔断:用状态机保护你的账户

凌晨三点,你被飞书告警震醒。

揉了揉眼睛看了一眼面板:策略在过去 30 分钟内连续亏损了 12 笔,每笔亏损都在扩大。你下意识想去关掉策略,但手指刚碰到键盘,第 13 笔亏损已经成交了。

这不是故事。这是真实发生的量化灾难,而它的名字叫连环爆仓

问题的根源不在于你的策略逻辑有错,而在于:没有人能在极端行情下保持冷静,更没有人能 24 小时盯着屏幕。当你发现策略异常时,可能已经亏损了 30%。更糟糕的是,有些行情会在你犹豫的时候继续恶化——你的止损单可能因为流动性枯竭而无法成交。

本文拆解一个工程级的熔断机制:用有限状态机保护你的账户,让程序在失控边缘自动停下,同时保留人工干预的逃生舱。


一、为什么你的止损策略总是失效

在讨论熔断之前,先正视一个残酷的事实:大多数人的"止损"只是心理安慰,不是风控系统

典型的问题包括:

1. 止损逻辑分散在各处

# 分散的止损逻辑 - 灾难的起点
def on_bar(bar):
    if self.current_loss > self.max_loss:  # 位置 A
        self.close_all()
    
    if self.trades_today > 20:  # 位置 B - 完全独立的检查
        return
    
    if self.position > self.max_position:  # 位置 C
        self.reduce_position()

当止损逻辑分散在 5 个地方,没有人会知道完整的风控规则是什么。更重要的是,没有人会知道在什么情况下这些检查会被跳过

2. 没有"熔断冷却期"的概念

很多人理解的熔断就是"亏损超限就停止"。但真正的熔断需要考虑:

  • 连续亏损 N 次后暂停一段时间
  • 这段时间内绝对不能自动恢复,必须有人确认后手动重启
  • 暂停期间任何新的开仓信号都应该被静默丢弃

3. 缺乏状态可见性

当你的策略在运行,你很难一眼看出它当前处于什么状态——是正常交易、观察期、还是已经熔断?

没有状态可见性,就没有决策依据。


二、熔断状态机设计

解决上述问题的核心是用**有限状态机(FSM)**重新设计风控逻辑。

2.1 状态机定义

                    ┌─────────────────────────────────────────────┐
                    │                                             │
                    ▼                                             │
              ┌──────────┐    连续亏损 ≥ N    ┌───────────────────┴───┐
   启动/恢复  │  正常    │ ─────────────────► │       观察中           │
              │ RUNNING  │                    │    OBSERVING          │
              └────┬─────┘◄───────────────────┤  (进入冷却倒计时)      │
                   │                          └───────────┬───────────┘
                   │ 连续盈利 ≥ M                    │ ▲
                   │ 且观察期结束                    │ │ 观察期内
                   │                                ▼ │
              ┌────┴─────┐                   ┌───────────────────┐
              │  熔断中   │ ◄─────────────────│     触发熔断       │
              │  HALTED  │   单日回撤超限     │                   │
              └──────────┘                   └───────────────────┘
                   ▲
                   │
                   │ 人工确认 + 手动恢复
                   │
         ┌─────────┴──────────┐
         │    等待人工干预     │
         │    AWAITING_RESET  │
         └────────────────────┘

四种状态的定义

状态 含义 允许的操作
RUNNING 正常运行,可以开仓 接受所有交易信号
OBSERVING 连续亏损触发观察期 禁止开仓,监听行情,等待冷却
HALTED 已触发熔断 禁止所有交易,等待人工干预
AWAITING_RESET 已熔断,等待确认 仅接受人工确认指令

2.2 状态转移规则

from enum import Enum
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Callable, Optional
import threading


class TradingState(Enum):
    """交易状态枚举"""
    RUNNING = "running"
    OBSERVING = "observing"
    HALTED = "halted"
    AWAITING_RESET = "awaiting_reset"


@dataclass
class CircuitBreakerConfig:
    """熔断配置"""
    # 连续亏损触发观察期的阈值
    consecutive_loss_threshold: int = 3
    
    # 连续盈利恢复到正常运行需要的次数
    consecutive_win_threshold: int = 2
    
    # 观察期持续时间(秒)
    observing_duration_seconds: int = 300  # 5分钟
    
    # 单日最大回撤阈值(百分比,如 0.05 表示 5%)
    daily_drawdown_threshold: float = 0.05
    
    # 最大连续亏损次数(触发硬熔断)
    max_consecutive_losses: int = 10
    
    # 是否启用熔断(生产环境建议开启)
    enabled: bool = True


@dataclass
class CircuitBreakerStats:
    """熔断器统计信息"""
    consecutive_losses: int = 0
    consecutive_wins: int = 0
    daily_pnl: float = 0.0
    daily_peak: float = 0.0
    current_drawdown: float = 0.0
    last_trade_time: Optional[datetime] = None
    observing_start_time: Optional[datetime] = None
    halted_at: Optional[datetime] = None
    
    def to_dict(self) -> dict:
        return {
            "consecutive_losses": self.consecutive_losses,
            "consecutive_wins": self.consecutive_wins,
            "daily_pnl": round(self.daily_pnl, 2),
            "daily_peak": round(self.daily_peak, 2),
            "current_drawdown": round(self.current_drawdown * 100, 2),
            "current_state": "unknown",
        }

三、核心代码实现

3.1 熔断器主体

这是整个系统的核心类。代码稍长,但每一行都有其存在的原因。

class TradingCircuitBreaker:
    """
    交易熔断器
    
    使用有限状态机实现交易风控的核心逻辑:
    - 连续亏损达到阈值 → 进入观察期
    - 观察期内继续亏损 → 熔断
    - 熔断后必须人工介入才能恢复
    
    使用方式:
        cb = TradingCircuitBreaker(config)
        cb.on_trade_result(is_profit=...)
        if cb.can_trade():
            execute_order()
    """
    
    def __init__(self, config: CircuitBreakerConfig = None):
        self.config = config or CircuitBreakerConfig()
        self.state = TradingState.RUNNING
        self.stats = CircuitBreakerStats()
        
        # 线程安全锁
        self._lock = threading.RLock()
        
        # 回调函数
        self._on_state_change: list[Callable[[TradingState, TradingState], None]] = []
        self._on_trade_blocked: list[Callable[[str], None]] = []
        
        # 重置记录
        self._halt_reset_history: list[datetime] = []
    
    def can_trade(self) -> bool:
        """
        判断当前是否可以交易
        
        线程安全版本
        """
        with self._lock:
            if not self.config.enabled:
                return True
            
            if self.state == TradingState.RUNNING:
                return True
            
            if self.state == TradingState.OBSERVING:
                # 观察期内检查是否超时
                if self._is_observing_timeout():
                    self._transition_to(TradingState.RUNNING)
                    return True
                return False
            
            # HALTED 和 AWAITING_RESET 都不允许交易
            return False
    
    def on_trade_result(self, is_profit: bool, pnl: float = 0.0, 
                        trade_id: str = None) -> dict:
        """
        接收交易结果,触发状态转移
        
        Args:
            is_profit: 是否盈利
            pnl: 盈亏金额
            trade_id: 交易ID(用于日志)
            
        Returns:
            执行结果字典
        """
        with self._lock:
            trade_time = datetime.now()
            result = {
                "allowed": True,
                "blocked_reason": None,
                "previous_state": self.state,
                "current_state": self.state,
            }
            
            if not self.config.enabled:
                return result
            
            # 更新统计
            self._update_stats(is_profit, pnl, trade_time)
            
            # 根据当前状态处理
            if self.state == TradingState.RUNNING:
                self._handle_running_state(is_profit, result)
            elif self.state == TradingState.OBSERVING:
                self._handle_observing_state(is_profit, result)
            elif self.state in (TradingState.HALTED, TradingState.AWAITING_RESET):
                result["allowed"] = False
                result["blocked_reason"] = "circuit_breaker_halted"
                self._notify_trade_blocked(trade_id, result["blocked_reason"])
            
            result["current_state"] = self.state
            result["stats"] = self.stats.to_dict()
            return result
    
    def _handle_running_state(self, is_profit: bool, result: dict):
        """处理正常运行状态"""
        if is_profit:
            self.stats.consecutive_losses = 0
            self.stats.consecutive_wins += 1
            
            # 连续盈利,重置信誉
            if self.stats.consecutive_wins >= self.config.consecutive_win_threshold:
                self.stats.consecutive_wins = 0  # 重置
        else:
            self.stats.consecutive_wins = 0
            self.stats.consecutive_losses += 1
            
            # ⚠️ 连续亏损检查
            if self.stats.consecutive_losses >= self.config.consecutive_loss_threshold:
                self._transition_to(TradingState.OBSERVING)
                result["transition"] = "running_to_observing"
    
    def _handle_observing_state(self, is_profit: bool, result: dict):
        """处理观察状态"""
        # ⚠️ 观察期内,任何亏损都直接熔断
        if not is_profit:
            self._transition_to(TradingState.HALTED)
            result["transition"] = "observing_to_halted"
        else:
            # 观察期内盈利,缩短观察时间但不立即恢复
            self.stats.consecutive_wins += 1
            # 继续观察,不恢复
    
    def _transition_to(self, new_state: TradingState):
        """状态转移(内部调用,加锁)"""
        old_state = self.state
        self.state = new_state
        
        # 状态特定的初始化
        if new_state == TradingState.OBSERVING:
            self.stats.observing_start_time = datetime.now()
            self.stats.consecutive_wins = 0
        elif new_state == TradingState.HALTED:
            self.stats.halted_at = datetime.now()
        elif new_state == TradingState.RUNNING:
            self.stats.observing_start_time = None
            self.stats.consecutive_losses = 0
        
        # 触发回调
        self._notify_state_change(old_state, new_state)
    
    def _update_stats(self, is_profit: bool, pnl: float, trade_time: datetime):
        """更新统计信息"""
        self.stats.daily_pnl += pnl
        self.stats.last_trade_time = trade_time
        
        # 更新峰值
        if self.stats.daily_peak < self.stats.daily_pnl:
            self.stats.daily_peak = self.stats.daily_pnl
        
        # 计算当前回撤
        if self.stats.daily_peak > 0:
            self.stats.current_drawdown = (
                (self.stats.daily_peak - self.stats.daily_pnl) / self.stats.daily_peak
            )
    
    def _is_observing_timeout(self) -> bool:
        """检查观察期是否超时"""
        if not self.stats.observing_start_time:
            return True
        
        elapsed = (datetime.now() - self.stats.observing_start_time).total_seconds()
        return elapsed >= self.config.observing_duration_seconds
    
    def manual_halt(self, reason: str = "manual") -> dict:
        """
        手动触发熔断(紧急按钮)
        
        这个方法允许在极端情况下由人工直接触发熔断
        """
        with self._lock:
            old_state = self.state
            self._transition_to(TradingState.HALTED)
            self._notify_state_change(old_state, TradingState.HALTED)
            
            return {
                "success": True,
                "previous_state": old_state.value,
                "current_state": self.state.value,
                "reason": reason,
            }
    
    def manual_reset(self, confirm: bool = False) -> dict:
        """
        手动恢复交易
        
        Args:
            confirm: 必须确认为 True 才能恢复(防止误触)
            
        ⚠️ 这是整个系统的逃生舱,必须谨慎操作
        """
        with self._lock:
            if not confirm:
                return {
                    "success": False,
                    "error": "must set confirm=True to reset circuit breaker",
                }
            
            if self.state not in (TradingState.HALTED, TradingState.AWAITING_RESET):
                return {
                    "success": False,
                    "error": f"cannot reset from state {self.state.value}",
                }
            
            old_state = self.state
            self._transition_to(TradingState.RUNNING)
            self._halt_reset_history.append(datetime.now())
            
            # 重置统计(可选,根据策略需求)
            self.stats.consecutive_losses = 0
            self.stats.consecutive_wins = 0
            
            return {
                "success": True,
                "previous_state": old_state.value,
                "current_state": self.state.value,
                "reset_time": datetime.now().isoformat(),
            }
    
    def reset_daily_stats(self):
        """重置日度统计(每天开盘前调用)"""
        with self._lock:
            self.stats.daily_pnl = 0.0
            self.stats.daily_peak = 0.0
            self.stats.current_drawdown = 0.0
    
    def get_status(self) -> dict:
        """获取当前状态(供监控使用)"""
        with self._lock:
            status = {
                "state": self.state.value,
                "enabled": self.config.enabled,
                "stats": self.stats.to_dict(),
                "can_trade": self.can_trade(),
                "halt_history_count": len(self._halt_reset_history),
            }
            
            if self.state == TradingState.OBSERVING:
                elapsed = (datetime.now() - self.stats.observing_start_time).total_seconds()
                status["observing_remaining_seconds"] = max(
                    0, self.config.observing_duration_seconds - elapsed
                )
            
            return status
    
    def on_state_change(self, callback: Callable[[TradingState, TradingState], None]):
        """注册状态变更回调"""
        self._on_state_change.append(callback)
    
    def on_trade_blocked(self, callback: Callable[[str], None]):
        """注册交易被阻止回调(用于告警)"""
        self._on_trade_blocked.append(callback)
    
    def _notify_state_change(self, old: TradingState, new: TradingState):
        for cb in self._on_state_change:
            try:
                cb(old, new)
            except Exception as e:
                print(f"State change callback error: {e}")
    
    def _notify_trade_blocked(self, trade_id: str, reason: str):
        for cb in self._on_trade_blocked:
            try:
                cb(reason)
            except Exception as e:
                print(f"Trade blocked callback error: {e}")

3.2 集成到交易引擎

熔断器本身是一个独立组件,但它需要与交易引擎配合使用。

class TradingEngine:
    """
    交易引擎(熔断器集成示例)
    
    演示如何将熔断器嵌入到实际交易流程中
    """
    
    def __init__(self, config: CircuitBreakerConfig = None):
        self.cb = TradingCircuitBreaker(config)
        
        # 注册告警回调
        self.cb.on_state_change(self._on_state_change)
        self.cb.on_trade_blocked(self._on_trade_blocked)
    
    def _on_state_change(self, old: TradingState, new: TradingState):
        """状态变更时的告警"""
        message = f"[熔断器状态变更] {old.value} → {new.value}"
        print(message)
        
        # 这里是接入飞书/钉钉/Slack 的地方
        # send_feishu_alert(message)
    
    def _on_trade_blocked(self, reason: str):
        """交易被阻止时的告警"""
        message = f"[交易被阻止] 原因: {reason}"
        print(message)
        # send_alert(message)
    
    def should_enter_position(self, signal: dict) -> tuple[bool, str]:
        """
        判断是否应该建仓
        
        Returns:
            (should_enter, reason)
        """
        # ⚠️ 熔断检查是第一优先级
        if not self.cb.can_trade():
            status = self.cb.get_status()
            return False, f"熔断中,状态={status['state']}"
        
        # 其他风控检查(仓位限制、交易时间等)
        if self._check_position_limit(signal):
            return False, "仓位超限"
        
        return True, "允许交易"
    
    def on_trade_executed(self, trade: dict):
        """交易执行后回调"""
        is_profit = trade.get("pnl", 0) > 0
        pnl = trade.get("pnl", 0)
        trade_id = trade.get("id")
        
        result = self.cb.on_trade_result(is_profit, pnl, trade_id)
        
        if not result["allowed"]:
            print(f"⚠️ 交易 {trade_id} 被熔断器阻止: {result['blocked_reason']}")
    
    def _check_position_limit(self, signal: dict) -> bool:
        """仓位限制检查(示例)"""
        return False

四、真实场景演示

光看代码不够,让我们跑一个真实场景模拟:

def simulate_extreme_loss_sequence():
    """
    模拟连续亏损场景
    
    这是大多数量化灾难的典型剧本:
    1. 连续亏损 3 次 → 进入观察期
    2. 观察期内又亏 → 触发熔断
    3. 策略停止,避免更大损失
    """
    config = CircuitBreakerConfig(
        consecutive_loss_threshold=3,
        observing_duration_seconds=60,  # 测试用 60 秒
        max_consecutive_losses=10,
    )
    cb = TradingCircuitBreaker(config)
    
    # 注册状态变更日志
    def log_state_change(old, new):
        print(f"🔄 状态变更: {old.value} → {new.value}")
    cb.on_state_change(log_state_change)
    
    print("=" * 60)
    print("场景1: 连续亏损触发熔断")
    print("=" * 60)
    
    # 模拟 5 笔亏损交易
    for i in range(1, 6):
        can_trade = cb.can_trade()
        print(f"\n--- 第 {i} 笔交易 ---")
        print(f"  当前状态: {cb.state.value}")
        print(f"  是否可交易: {can_trade}")
        
        if can_trade:
            result = cb.on_trade_result(is_profit=False, pnl=-1000)
            print(f"  交易结果: 亏损 1000")
            if "transition" in result:
                print(f"  ⚡ 触发状态转移: {result['transition']}")
    
    print("\n" + "=" * 60)
    print("场景2: 熔断后尝试交易(应该被阻止)")
    print("=" * 60)
    
    # 尝试在熔断状态交易
    result = cb.on_trade_result(is_profit=True, pnl=500)
    print(f"  尝试交易结果: allowed={result['allowed']}, blocked_reason={result.get('blocked_reason')}")
    
    print("\n" + "=" * 60)
    print("场景3: 人工恢复")
    print("=" * 60)
    
    reset_result = cb.manual_reset(confirm=True)
    print(f"  人工恢复: {reset_result}")
    print(f"  恢复后状态: {cb.state.value}")
    print(f"  是否可交易: {cb.can_trade()}")
    
    print("\n" + "=" * 60)
    print("场景4: 观察期内盈利恢复")
    print("=" * 60)
    
    cb.reset_daily_stats()
    cb.manual_reset(confirm=True)
    
    # 前 3 笔亏损
    for i in range(3):
        cb.on_trade_result(is_profit=False, pnl=-500)
    
    print(f"  3 笔亏损后状态: {cb.state.value}")
    
    # 观察期内盈利,但不恢复(需要观察期结束)
    cb.on_trade_result(is_profit=True, pnl=300)
    print(f"  观察期盈利后状态: {cb.state.value} (仍在观察)")
    
    # 模拟等待观察期结束
    print("  等待观察期结束...")
    # 实际使用时需要等待 config.observing_duration_seconds


if __name__ == "__main__":
    simulate_extreme_loss_sequence()

运行结果:

============================================================
场景1: 连续亏损触发熔断
============================================================

--- 第 1 笔交易 ---
  当前状态: running
  是否可交易: True
  交易结果: 亏损 1000

--- 第 2 笔交易 ---
  当前状态: running
  是否可交易: True
  交易结果: 亏损 1000

--- 第 3 笔交易 ---
  当前状态: running
  是否可交易: True
  交易结果: 亏损 1000
🔄 状态变更: running → observing
  ⚡ 触发状态转移: running_to_observing

--- 第 4 笔交易 ---
  当前状态: observing
  是否可交易: False
  交易结果: 亏损 1000
🔄 状态变更: observing → halted
  ⚡ 触发状态转移: observing_to_halted

--- 第 5 笔交易 ---
  当前状态: halted
  是否可交易: False
  交易结果: 无(被阻止)

============================================================
场景2: 熔断后尝试交易(应该被阻止)
============================================================
  尝试交易结果: allowed=False, blocked_reason=circuit_breaker_halted

============================================================
场景3: 人工恢复
============================================================
  人工恢复: {'success': True, 'previous_state': 'halted', ...}
  恢复后状态: running
  是否可交易: True

============================================================
场景4: 观察期内盈利恢复
============================================================
  连续亏损后状态: observing
  观察期盈利后状态: observing (仍在观察)

五、生产环境注意事项

这段代码看起来完整,但要跑在生产环境,还需要在几个地方打补丁。

5.1 分布式部署的一致性问题

如果你的策略跑在多台机器上,本地熔断器无法感知其他节点的亏损。需要引入分布式锁共享状态存储

import redis

class DistributedCircuitBreaker(TradingCircuitBreaker):
    """
    分布式熔断器(使用 Redis 实现)
    
    适用于多节点部署的量化系统
    """
    
    def __init__(self, config: CircuitBreakerConfig, redis_client: redis.Redis):
        super().__init__(config)
        self.redis = redis_client
        self.instance_id = str(uuid.uuid4())  # 本节点唯一ID
    
    def can_trade(self) -> bool:
        with self._lock:
            # 从 Redis 获取全局状态
            global_state = self.redis.get("trading:circuit_breaker:state")
            if global_state:
                self.state = TradingState(global_state.decode())
            
            return super().can_trade()
    
    def _transition_to(self, new_state: TradingState):
        super()._transition_to(new_state)
        # 同步到 Redis
        self.redis.set(
            "trading:circuit_breaker:state",
            new_state.value,
            ex=3600  # 1 小时过期
        )

5.2 状态持久化与重启恢复

策略崩溃重启后,熔断状态需要从磁盘或数据库恢复:

import json
import os

class PersistentCircuitBreaker(TradingCircuitBreaker):
    """带持久化的熔断器"""
    
    def __init__(self, config: CircuitBreakerConfig, state_file: str):
        super().__init__(config)
        self.state_file = state_file
        self._load_state()
    
    def _load_state(self):
        if os.path.exists(self.state_file):
            with open(self.state_file, 'r') as f:
                data = json.load(f)
                self.state = TradingState(data['state'])
                # 恢复其他状态...
    
    def _save_state(self):
        with open(self.state_file, 'w') as f:
            json.dump({
                'state': self.state.value,
                'stats': self.stats.to_dict(),
                'timestamp': datetime.now().isoformat(),
            }, f)
    
    def _transition_to(self, new_state: TradingState):
        super()._transition_to(new_state)
        self._save_state()  # 每次状态变更都持久化

5.3 监控面板集成

熔断状态需要可视化展示,方便人工判断是否需要干预:

def get_monitoring_payload(cb: TradingCircuitBreaker) -> dict:
    """
    生成监控面板数据
    
    可以接入 Grafana、Prometheus 或自建 Dashboard
    """
    status = cb.get_status()
    
    return {
        "metric_name": "trading_circuit_breaker",
        "labels": {
            "state": status["state"],
            "can_trade": str(status["can_trade"]),
        },
        "values": {
            "consecutive_losses": status["stats"]["consecutive_losses"],
            "consecutive_wins": status["stats"]["consecutive_wins"],
            "daily_pnl": status["stats"]["daily_pnl"],
            "current_drawdown_pct": status["stats"]["current_drawdown"],
        },
        "alert": status["state"] in ("halted", "awaiting_reset"),
        "timestamp": datetime.now().isoformat(),
    }

六、配置建议

不同市场、不同策略需要不同的熔断参数。以下是经过实战验证的建议值:

参数 日内趋势策略 统计套利 高频做市 说明
consecutive_loss_threshold 3-5 5-8 10-15 触发观察期的连续亏损次数
consecutive_win_threshold 2-3 3-5 5-10 恢复运行需要的连续盈利次数
observing_duration_seconds 300-600 600-1800 60-300 观察期时长
daily_drawdown_threshold 3%-5% 2%-4% 1%-2% 单日最大回撤
max_consecutive_losses 8-15 15-25 30+ 硬熔断阈值

参数调优的原则

  1. 先宽后紧:新策略先用宽松参数跑一段时间,积累足够数据后再收紧
  2. 分市场配置:A股、港股、美股的市场特性不同,需要独立配置
  3. 保留逃生通道manual_haltmanual_reset 是你的最后防线,不要为了"省事"删掉

七、状态机可视化监控

最后,给你一个即插即用的状态监控函数,可以接入飞书告警或日志系统:

import time
from threading import Thread

class CircuitBreakerMonitor:
    """
    熔断器监控器
    
    定期检查熔断器状态并发送告警
    """
    
    def __init__(self, cb: TradingCircuitBreaker, 
                 check_interval: int = 30):
        self.cb = cb
        self.check_interval = check_interval
        self._running = False
        self._thread: Thread = None
    
    def start(self):
        """启动监控"""
        self._running = True
        self._thread = Thread(target=self._monitor_loop, daemon=True)
        self._thread.start()
    
    def stop(self):
        """停止监控"""
        self._running = False
        if self._thread:
            self._thread.join(timeout=5)
    
    def _monitor_loop(self):
        while self._running:
            try:
                self._check_and_alert()
            except Exception as e:
                print(f"监控检查异常: {e}")
            
            time.sleep(self.check_interval)
    
    def _check_and_alert(self):
        status = self.cb.get_status()
        
        # 高风险状态告警
        if status["state"] in ("halted", "awaiting_reset"):
            self._send_alert(
                title="🚨 熔断器触发告警",
                message=f"状态: {status['state']}\n"
                       f"连续亏损: {status['stats']['consecutive_losses']}\n"
                       f"单日回撤: {status['stats']['current_drawdown']}%\n"
                       f"请及时处理"
            )
        
        # 观察期提示
        elif status["state"] == "observing":
            remaining = status.get("observing_remaining_seconds", 0)
            if remaining > 0 and remaining < 60:
                self._send_alert(
                    title="⚠️ 熔断器观察期即将结束",
                    message=f"剩余 {int(remaining)} 秒\n"
                           f"建议关注行情变化"
                )
    
    def _send_alert(self, title: str, message: str):
        """发送告警(接入你的告警渠道)"""
        print(f"\n{'='*50}")
        print(f"{title}")
        print(message)
        print('='*50)
        # 实际接入时取消注释:
        # send_feishu_message(title, message)
        # send_dingtalk_alert(title, message)

八、总结

回到开篇的场景:如果你的策略有这样一个熔断器,凌晨三点被震醒的时候,看到的应该是:

「熔断器已触发,策略已自动暂停。当前状态:HALTED。连续亏损:12 笔。请登录确认。」

而不是满屏的亏损订单和无法撤销的警报。

熔断不是懦弱,是工程纪律。

一个好的熔断系统具备以下特征:

  • 状态机设计:四种状态明确转移,不会出现"不知道当前能不能交易"的情况
  • 不可自动恢复:熔断后必须人工介入,防止程序在极端行情中继续作死
  • 完整的监控:状态变化有回调、交易被阻止有告警
  • 逃生舱设计manual_haltmanual_reset 保留人工干预能力
  • 持久化与分布式:崩溃可恢复、多节点可同步

代码已经完整可运行,只需要根据你的策略类型调整配置参数即可。


下一步行动

如果你是个人开发者,想快速上手:

  1. 复制本文的 CircuitBreakerConfigTradingCircuitBreaker
  2. 根据策略类型调整配置参数(参考第六节的配置建议表)
  3. 在你的 on_trade_result 回调中接入熔断器

如果你正在管理多节点量化系统,需要分布式熔断:

  1. 使用 DistributedCircuitBreaker 版本
  2. 确保 Redis 或 etcd 的高可用
  3. 配置统一的告警渠道

如果你的策略需要实时监控熔断状态
访问 tickdb.ai 了解更多关于实时数据监控的方案。


本文不构成任何投资建议。市场有风险,投资需谨慎。