凌晨三点,你的策略还在亏钱

凌晨三点,你被手机震动惊醒。策略已连续亏损 7 笔,账户回撤 12%,而你睡眼惺忪地点开监控面板时,它已经亏掉了过去三个月的利润。

这不是故事。这是每一个量化交易者都可能在某个深夜经历的噩梦——策略在无人值守时进入异常状态,滑点扩大、流动性枯竭、或者简单的数据源故障,却没有任何自动保护机制,只能眼睁睁看着亏损扩大。

手动监控是一个伪命题。人会疲劳、会分心、会恰好在关键时刻关掉手机通知。真正可靠的解决方案是:让程序自己知道什么时候该停下来

本文拆解一套基于状态机的自动熔断系统,涵盖:连续亏损计数与回撤阈值双重触发、状态安全流转与人工干预接口、生产级 Python 实现,以及真实场景下的阈值配置建议。


一、为什么连续亏损是危险的

在讨论熔断机制之前,需要先理解连续亏损为什么不是简单的“运气不好”,而是系统性风险的信号。

连续亏损往往意味着市场状态发生了变化。你的策略可能基于这样的假设:高波动环境下动量会持续。但如果市场突然切换到均值回归模式,动量因子失效,策略会连续被止损。更危险的是,连续亏损期间的交易往往伴随着扩大的价差和恶化的滑点——你不仅在亏策略的钱,还在亏流动性萎缩的钱。

亏损模式 背后可能的原因 风险等级
随机散点亏损 正常波动,策略有效
连续 3-5 笔亏损 短期风格切换或数据噪声
连续 5+ 笔亏损 因子失效或市场机制变化
亏损 + 扩大回撤 系统性风险,正在累积 极高

另一个关键问题:账户权益的复利损耗是非对称的。 亏损 10% 需要盈利 11.1% 回本,亏损 30% 需要盈利 42.9%,而亏损 50% 需要翻倍才能回本。当回撤超过某个阈值(比如 15%),即使策略最终恢复有效,你也可能因为保证金不足被强制平仓。

因此,熔断的目标不是“预测策略会不会恢复”,而是在统计异常信号出现时强制暂停,保护账户生存到策略可能恢复的那一天


二、状态机设计:让程序知道自己该做什么

熔断系统的核心不是一堆 if-else,而是一个状态机。状态机将策略的运行状态定义为有限的、明确的离散状态,任何状态转换都有明确的触发条件和边界行为。

2.1 四状态模型

                    ┌─────────────────────────────────────────┐
                    │                                         │
                    ▼                                         │
 ┌──────────┐    ┌──────────┐    ┌──────────────┐    ┌─────────┐
 │  RUNNING │───▶│ COOLING  │───▶│  CIRCUIT_OPEN │───▶│ MANUAL  │
 └──────────┘    └──────────┘    └──────────────┘    └─────────┘
      ▲                                    │               │
      │                                    │               │
      └────────────────────────────────────┘               │
                    (自然恢复冷却)              (人工确认恢复)
状态 含义 允许交易 说明
RUNNING 策略正常运行 监控亏损计数和回撤
COOLING 检测到异常,进入冷却 等待 N 分钟,消除短期噪声
CIRCUIT_OPEN 冷却后仍异常,熔断打开 暂停到下一个恢复窗口
MANUAL 人工干预暂停 等待人工确认

2.2 状态转换规则

当前状态 触发条件 目标状态 动作
RUNNING 连续亏损 ≥ N 或回撤 ≥ M% COOLING 停止开仓,记录告警
COOLING 冷却时间到期 RUNNING(观察)或 CIRCUIT_OPEN 若期间无新亏损则恢复,否则熔断
COOLING 冷却时间到期 + 亏损未改善 CIRCUIT_OPEN 停止交易,进入长等待
CIRCUIT_OPEN 恢复时间窗口到达 RUNNING 或 MANUAL 尝试恢复,或强制人工介入
任意状态 人工介入 MANUAL 冻结所有操作

2.3 状态机代码骨架

from enum import Enum, auto
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Callable, Optional
import threading
import logging

logger = logging.getLogger(__name__)


class State(Enum):
    RUNNING = auto()
    COOLING = auto()
    CIRCUIT_OPEN = auto()
    MANUAL = auto()


@dataclass
class CircuitBreaker:
    """
    基于状态机的策略熔断器
    
    职责:
    - 监控连续亏损次数和账户回撤
    - 管理状态流转(RUNNING → COOLING → CIRCUIT_OPEN)
    - 提供人工干预接口(暂停/恢复/强制平仓)
    """
    
    # 熔断阈值配置
    consecutive_loss_threshold: int = 5
    max_drawdown_threshold: float = 0.10  # 10%
    cooling_minutes: int = 15
    circuit_open_hours: int = 4
    
    # 内部状态
    _state: State = field(default=State.RUNNING, repr=False)
    _consecutive_losses: int = field(default=0, repr=False)
    _peak_equity: float = field(default=0.0, repr=False)
    _cooling_end_time: Optional[datetime] = field(default=None, repr=False)
    _circuit_open_end_time: Optional[datetime] = field(default=None, repr=False)
    _lock: threading.Lock = field(default_factory=threading.Lock, repr=False)
    
    # 回调函数
    on_state_change: Optional[Callable[[State, State], None]] = None
    on_alert: Optional[Callable[[str], None]] = None
    
    @property
    def state(self) -> State:
        """线程安全的状态读取"""
        with self._lock:
            return self._state
    
    @property
    def can_trade(self) -> bool:
        """只有 RUNNING 状态允许开仓"""
        return self.state == State.RUNNING
    
    def update_equity(self, current_equity: float) -> None:
        """
        更新账户权益,自动计算回撤
        """
        with self._lock:
            # 更新峰值
            if current_equity > self._peak_equity:
                self._peak_equity = current_equity
            
            # 计算当前回撤
            if self._peak_equity > 0:
                current_drawdown = (self._peak_equity - current_equity) / self._peak_equity
                
                # ⚠️ 回撤超过阈值,触发熔断
                if current_drawdown >= self.max_drawdown_threshold:
                    self._trigger_cooling(
                        f"回撤触发: {current_drawdown:.2%} > {self.max_drawdown_threshold:.2%}"
                    )
    
    def record_trade_result(self, profit_loss: float) -> None:
        """
        记录每笔交易结果,亏损增加计数,盈利重置
        """
        with self._lock:
            if profit_loss < 0:
                self._consecutive_losses += 1
                logger.warning(f"亏损记录: 第 {self._consecutive_losses} 笔连续亏损")
                
                # ⚠️ 连续亏损达到阈值,触发冷却
                if (self._consecutive_losses >= self.consecutive_loss_threshold 
                        and self._state == State.RUNNING):
                    self._trigger_cooling(
                        f"连续亏损触发: {self._consecutive_losses} >= {self.consecutive_loss_threshold}"
                    )
            else:
                # ⚠️ 盈利重置计数(在 RUNNING 状态下)
                if self._consecutive_losses > 0:
                    logger.info(f"盈利 {profit_loss:.2f},重置连续亏损计数")
                self._consecutive_losses = 0

2.4 冷却与恢复逻辑

    def _trigger_cooling(self, reason: str) -> None:
        """进入冷却状态"""
        if self._state != State.RUNNING:
            return
        
        old_state = self._state
        self._state = State.COOLING
        self._cooling_end_time = datetime.now() + timedelta(minutes=self.cooling_minutes)
        
        self._emit_alert(f"🚨 进入冷却期: {reason},冷却 {self.cooling_minutes} 分钟")
        self._notify_state_change(old_state, self._state)
    
    def _try_recover_from_cooling(self) -> None:
        """冷却时间结束后,评估是否可以恢复"""
        if self._state != State.COOLING:
            return
        
        if datetime.now() < self._cooling_end_time:
            return
        
        old_state = self._state
        
        # 检查冷却期间是否有新亏损
        if self._consecutive_losses >= self.consecutive_loss_threshold:
            # ⚠️ 冷却后仍未改善,熔断打开
            self._state = State.CIRCUIT_OPEN
            self._circuit_open_end_time = datetime.now() + timedelta(hours=self.circuit_open_hours)
            self._emit_alert(f"⚠️ 冷却后未恢复,熔断打开,暂停 {self.circuit_open_hours} 小时")
        else:
            # 冷却期间无新亏损,尝试恢复
            self._state = State.RUNNING
            self._emit_alert("✅ 冷却期结束,策略恢复正常运行")
        
        self._notify_state_change(old_state, self._state)
    
    def tick(self) -> None:
        """
        ⚠️ 必须在主循环中定期调用(建议每分钟)
        处理时间驱动的状态转换
        """
        with self._lock:
            if self._state == State.COOLING:
                self._try_recover_from_cooling()
            elif self._state == State.CIRCUIT_OPEN:
                self._check_circuit_open_recovery()
    
    def _check_circuit_open_recovery(self) -> None:
        """熔断期结束后尝试恢复"""
        if self._state != State.CIRCUIT_OPEN:
            return
        
        if datetime.now() >= self._circuit_open_end_time:
            old_state = self._state
            self._state = State.MANUAL  # ⚠️ 强制人工介入
            self._emit_alert("🔴 熔断期结束,请人工确认是否恢复交易")
            self._notify_state_change(old_state, self._state)

三、人工干预接口:人永远是最终防线

状态机负责自动决策,但人工干预接口是不可省略的安全兜底。当系统出现无法自动处理的异常时,必须有人能强制介入。

3.1 核心干预操作

    def pause(self, reason: str = "人工暂停") -> bool:
        """
        人工暂停策略
        
        使用场景:
        - 发现数据源异常
        - 收到市场突发新闻
        - 策略参数需要临时调整
        """
        with self._lock:
            old_state = self._state
            if old_state in (State.COOLING, State.CIRCUIT_OPEN):
                self._state = State.MANUAL
                self._emit_alert(f"⏸️ 策略已人工暂停: {reason}")
                self._notify_state_change(old_state, self._state)
                return True
            elif old_state == State.RUNNING:
                self._state = State.MANUAL
                self._emit_alert(f"⏸️ 策略已人工暂停: {reason}")
                self._notify_state_change(old_state, self._state)
                return True
            return False
    
    def resume(self, acknowledged_by: str) -> bool:
        """
        人工恢复策略(仅限 MANUAL 状态)
        
        参数:
        - acknowledged_by: 确认恢复的人员标识(审计用)
        """
        with self._lock:
            if self._state != State.MANUAL:
                logger.warning(f"非 MANUAL 状态无法恢复,当前状态: {self._state}")
                return False
            
            old_state = self._state
            self._state = State.RUNNING
            self._consecutive_losses = 0  # 重置计数
            self._peak_equity = 0.0      # 重置基准
            
            logger.info(f"策略已由 {acknowledged_by} 人工恢复")
            self._emit_alert(f"▶️ 策略已人工恢复,恢复人: {acknowledged_by}")
            self._notify_state_change(old_state, self._state)
            return True
    
    def force_close_all(self) -> dict:
        """
        强制平仓所有持仓
        
        返回:
        - 平仓操作的结果摘要
        
        ⚠️ 这是最激进的操作,仅在极端情况下使用
        """
        self._emit_alert("🚨 强制平仓所有持仓告警已触发")
        return {
            "status": "alert_sent",
            "action_required": "请在交易终端手动执行市价平仓",
            "reason": "自动平仓功能需要对接券商 API,建议人工执行"
        }
    
    def _emit_alert(self, message: str) -> None:
        """发送告警通知"""
        logger.warning(message)
        if self.on_alert:
            self.on_alert(message)
    
    def _notify_state_change(self, old: State, new: State) -> None:
        """触发状态变更回调"""
        if self.on_state_change:
            self.on_state_change(old, new)

3.2 干预日志与审计

所有人工干预操作必须记录,以便事后审计和改进:

import json
from pathlib import Path
from datetime import datetime

class AuditLogger:
    """熔断操作审计日志"""
    
    def __init__(self, log_dir: str = "./logs"):
        self.log_dir = Path(log_dir)
        self.log_dir.mkdir(parents=True, exist_ok=True)
        self.audit_file = self.log_dir / f"circuit_breaker_audit_{datetime.now().strftime('%Y%m')}.jsonl"
    
    def log(self, event_type: str, details: dict) -> None:
        """追加审计记录"""
        record = {
            "timestamp": datetime.now().isoformat(),
            "event_type": event_type,
            **details
        }
        with open(self.audit_file, "a") as f:
            f.write(json.dumps(record) + "\n")
    
    def get_monthly_report(self) -> list:
        """生成月度审计报告"""
        records = []
        with open(self.audit_file) as f:
            for line in f:
                records.append(json.loads(line))
        return records

四、与交易执行层的集成

熔断器本身不执行交易,它是一个决策过滤器。真正的交易执行层需要检查熔断状态。

class StrategyExecutor:
    """策略执行器(集成熔断器)"""
    
    def __init__(self, breaker: CircuitBreaker, broker_adapter):
        self.breaker = breaker
        self.broker = broker_adapter
    
    def should_open_position(self, signal) -> tuple[bool, str]:
        """
        判断是否应该开仓
        
        返回:
        - (can_trade, reason)
        """
        if not self.breaker.can_trade:
            return False, f"熔断器状态: {self.breaker.state.name}"
        
        # ... 其他风控检查 ...
        return True, "允许交易"
    
    def execute_signal(self, signal) -> dict:
        """执行交易信号(带熔断检查)"""
        can_trade, reason = self.should_open_position(signal)
        
        if not can_trade:
            return {
                "status": "rejected",
                "reason": reason,
                "timestamp": datetime.now().isoformat()
            }
        
        # 执行交易 ...
        result = self.broker.place_order(signal)
        
        # ⚠️ 关键:将交易结果反馈给熔断器
        self.breaker.record_trade_result(result["profit_loss"])
        
        return result

五、阈值配置建议

熔断器的效果高度依赖阈值的合理设置。以下是基于实盘经验的参考值:

参数 保守配置 激进配置 说明
consecutive_loss_threshold 3-5 5-8 建议 5,新策略从严
max_drawdown_threshold 8%-10% 12%-15% 超过 15% 需特别审批
cooling_minutes 15-30 30-60 覆盖一个交易时段
circuit_open_hours 4-8 8-24 建议至少覆盖隔夜

配置原则

  • 新策略上线时使用保守配置,运行 1 个月数据后根据实际情况调整
  • 高频策略可以缩短冷却时间(但缩短 consecutive_loss_threshold
  • 趋势策略因为天然有连续小额亏损,用更大的亏损容忍度换趋势跟踪能力

六、进阶:与 TickDB 的集成

熔断器的状态变化和告警可以通过多种渠道通知。其中一个实用的方案是结合 TickDB 的 WebSocket 实时推送,将熔断状态变化推送到监控面板:

import os
import time
import json
import websocket
import random
import logging

logger = logging.getLogger(__name__)


class TickDBAlertPusher:
    """
    使用 TickDB WebSocket 推送熔断告警
    
    ⚠️ 本代码用于演示告警推送架构
    生产环境请替换为实际的消息队列(飞书/钉钉/Slack)
    """
    
    def __init__(self, api_key: str = None):
        self.api_key = api_key or os.environ.get("TICKDB_API_KEY")
        self.ws_url = "wss://api.tickdb.ai/ws/v1/market"
        self._ws = None
        self._reconnect_delay = 1
        self._max_reconnect_delay = 30
    
    def _connect(self) -> bool:
        """建立 WebSocket 连接"""
        try:
            # ⚠️ 熔断告警推送使用 TickDB 的自定义消息通道
            self._ws = websocket.WebSocketApp(
                self.ws_url,
                on_message=self._on_message,
                on_error=self._on_error,
                on_close=self._on_close,
                on_open=self._on_open
            )
            return True
        except Exception as e:
            logger.error(f"WebSocket 连接失败: {e}")
            return False
    
    def _on_open(self, ws):
        """连接建立后发送认证"""
        auth_msg = {
            "cmd": "auth",
            "api_key": self.api_key
        }
        ws.send(json.dumps(auth_msg))
        logger.info("TickDB WebSocket 已连接")
    
    def _on_message(self, ws, message):
        """处理接收消息"""
        data = json.loads(message)
        if data.get("cmd") == "pong":
            logger.debug("心跳响应正常")
    
    def _on_error(self, ws, error):
        logger.warning(f"WebSocket 错误: {error}")
    
    def _on_close(self, ws, close_status_code, close_msg):
        logger.warning(f"WebSocket 关闭: {close_status_code} - {close_msg}")
        self._schedule_reconnect()
    
    def _schedule_reconnect(self):
        """指数退避重连(带抖动)"""
        delay = min(self._reconnect_delay * 2, self._max_reconnect_delay)
        jitter = random.uniform(0, delay * 0.1)
        self._reconnect_delay = delay + jitter
        
        logger.info(f"计划 {self._reconnect_delay:.1f} 秒后重连")
        time.sleep(self._reconnect_delay)
        
        self._connect()
    
    def push_alert(self, alert_type: str, message: str, state_info: dict) -> None:
        """
        推送熔断告警
        
        ⚠️ 演示代码:实际生产中建议使用专业的告警服务
        """
        alert_payload = {
            "cmd": "alert",
            "type": alert_type,
            "message": message,
            "state": state_info,
            "timestamp": time.time()
        }
        
        try:
            if self._ws and self._ws.sock and self._ws.sock.connected:
                self._ws.send(json.dumps(alert_payload))
                logger.info(f"告警已推送: {alert_type} - {message}")
            else:
                logger.warning("WebSocket 未连接,告警暂存")
        except Exception as e:
            logger.error(f"告警推送失败: {e}")
    
    def start_heartbeat(self):
        """启动心跳保活"""
        def heartbeat_loop():
            while True:
                time.sleep(30)
                try:
                    if self._ws and self._ws.sock and self._ws.sock.connected:
                        self._ws.send(json.dumps({"cmd": "ping"}))
                except Exception:
                    pass
        
        import threading
        threading.Thread(target=heartbeat_loop, daemon=True).start()

七、总结

熔断不是放弃,而是有序地撤退。一个设计良好的熔断系统应该做到:

  1. 自动识别异常:通过连续亏损计数和回撤阈值双重信号,避免单一指标的误判
  2. 有序状态流转:冷却 → 熔断 → 人工介入,每一步都有清晰的等待期和恢复条件
  3. 保留人工控制权:机器负责常规情况,人负责极端情况
  4. 完整的审计追溯:每一次状态变化都有记录可查

市场永远在变化,策略的有效性也永远在波动。熔断器的本质不是“阻止亏损”,而是确保你在策略失效的时候还活着,活到它再次有效的那一天。


下一步行动

如果你是个人量化开发者

  1. 访问 tickdb.ai 注册,获取免费 API Key
  2. 将本文的状态机代码集成到你的策略框架中
  3. 从保守阈值开始运行,观察 2-4 周后根据实盘数据调整

如果你在寻找更多风控相关的实战案例

  • 《仓位管理背后的数学:凯利公式的实战局限与改良》
  • 《事件驱动策略的尾部风险:用期权结构对冲黑天鹅》

如果你需要机构级的风控方案,联系 [email protected] 获取专属咨询。


风险提示:本文介绍的风控框架旨在提供系统性思路,具体阈值和参数需根据策略特性、市场环境和个人风险承受能力独立评估。历史表现不代表未来收益,程序化交易存在滑点、流动性风险和技术故障可能导致实际执行与回测结果产生重大偏差。请在实盘使用前充分测试,并在必要时寻求专业风控顾问的意见。