策略连续亏损自动熔断:用状态机保护你的账户

凌晨三点,Slack 发出刺耳的告警。

你从床上弹起来,手机屏幕上显示着策略组的监控面板——实盘账户的回撤曲线已经击穿了预设的警戒线。你揉了揉眼睛,确认自己没有看错:过去 72 小时,策略连续亏损 23 笔,累计回撤 8.7%,而你的熔断阈值设定在 5%。

问题在于,你的熔断逻辑是人工判断的——阈值在那里,但没有人半夜三点盯着屏幕。策略还在跑,还在开仓,还在亏损。

这不是虚构的场景。这是 2023 年 9 月某中型量化基金的真实经历,那次事故最终导致账户单周回撤超过 12%,被迫清盘重组。

事后复盘报告的结论很简单:不是阈值设错了,是没有人守着阈值。

对于量化交易者而言,熔断机制的设计从来不是技术难题,难的是如何让它真正“活着”——在凌晨三点、在你度假时、在你上厕所的那十五分钟里,忠实执行你的风控意志。

本文给出的是一套生产级的自动熔断系统,基于状态机设计,支持连续亏损次数和单日回撤双触发,并内置人工干预接口,确保你在任何时候都能重新掌控局面。


一、为什么你的熔断机制总是不起作用

在讨论技术实现之前,我们需要先正视一个事实:大多数“熔断机制”只是披着风控外衣的心理安慰。

常见的无效熔断有三种形态:

第一种,阈值存在,但从未触发。 你在策略代码里写了一行 if drawdown > 0.05: stop_trading(),但这段代码在回测引擎里跑得完美,实盘却因为网络延迟或订单队列堵塞永远无法执行。等你发现问题,账户已经穿仓。

第二种,阈值触发,但执行失败。 网络断开、API 限频、交易所接口报错——熔断条件满足了,但平仓指令根本发不出去。这种情况在行情剧烈波动时尤为常见,因为那时候也是系统压力最大的时候。

第三种,熔断后无法恢复。 策略触发了熔断,但你不在电脑前,不知道发生了什么。等你回来时,发现策略已经停了三天,错过了整个反弹行情,而你甚至不知道该手动恢复还是继续等待。

无效熔断的本质是:把风控逻辑当成孤立规则,而不是当成一个持续运行的状态机。

有效的熔断必须解决三个问题:

  1. 状态持久化——熔断触发后,系统崩溃重启也能记住自己处于熔断状态
  2. 执行可靠性——告警发出、平仓操作有重试机制,不因单次失败而彻底失效
  3. 人工干预通道——在极端情况下,运营者可以绕过自动逻辑手动干预

下一节,我们用状态机模型重新设计熔断机制。


二、熔断状态机:五状态模型

2.1 状态定义

熔断不是简单的“开/关”,而是一个有生命周期的事件序列。我们将整个熔断系统建模为五个互斥状态:

                    ┌─────────────────────────────────────────┐
                    │                                         │
                    ▼                                         │
 ┌──────┐    触发条件满足    ┌──────────┐   冷却期结束   ┌──────────┐
 │正常  │ ───────────────▶  │ 熔断中   │ ───────────▶   │ 冷却期   │
 └──────┘                   └──────────┘                └──────────┘
    ▲                           │                            │
    │                           │人工恢复                    │恢复条件满足
    │                           ▼                            ▼
    │                      ┌──────────┐                ┌──────────┐
    └───────────────────── │ 人工暂停 │                │ 异常锁定 │
         手动操作           └──────────┘                └──────────┘
状态 含义 策略行为 退出条件
正常 (NORMAL) 熔断条件未触发,策略正常运行 正常开仓和平仓 满足熔断触发条件
熔断中 (CIRCUIT_BROKEN) 自动熔断触发,暂停新仓位 只平不平开,等待回撤收敛 冷却期结束或人工恢复
冷却期 (COOLING) 熔断期结束,进入观察窗口 限制仓位规模,等待稳定信号 观察期通过或再次触发
人工暂停 (MANUAL_PAUSE) 运营者手动暂停 完全停止,等待人工指令 运营者手动恢复
异常锁定 (EXCEPTION_LOCK) 系统异常,强制锁定 所有操作暂停,需要诊断 异常排查后人工解除

2.2 状态转换规则

状态机核心逻辑:

from enum import Enum
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Optional
import threading


class CircuitState(Enum):
    NORMAL = "normal"
    CIRCUIT_BROKEN = "circuit_broken"
    COOLING = "cooling"
    MANUAL_PAUSE = "manual_pause"
    EXCEPTION_LOCK = "exception_lock"


@dataclass
class CircuitBreakerConfig:
    """熔断器配置"""
    consecutive_losses_threshold: int = 5      # 连续亏损次数阈值
    daily_drawdown_threshold: float = 0.05     # 单日回撤阈值(5%)
    cooling_duration_seconds: int = 1800      # 熔断后冷却期(30分钟)
    observation_duration_seconds: int = 3600    # 观察期(1小时)
    max_position_size_multiplier: float = 0.5  # 冷却期仓位限制(50%)
    auto_resume_enabled: bool = True           # 是否允许自动恢复


@dataclass
class CircuitBreakerState:
    """熔断器运行时状态"""
    current_state: CircuitState = CircuitState.NORMAL
    
    # 亏损追踪
    consecutive_losses: int = 0
    daily_pnl: float = 0.0
    daily_starting_capital: float = 0.0
    
    # 时间追踪
    state_entered_at: Optional[datetime] = None
    last_loss_time: Optional[datetime] = None
    
    # 人工干预
    manual_pause_reason: Optional[str] = None
    exception_message: Optional[str] = None
    
    # 熔断历史(用于分析)
    trip_count: int = 0
    total_downtime_seconds: float = 0.0


class CircuitBreaker:
    """熔断状态机实现"""
    
    def __init__(self, config: CircuitBreakerConfig):
        self.config = config
        self.state = CircuitBreakerState()
        self.state.daily_starting_capital = self._get_current_capital()
        self._lock = threading.RLock()
    
    def _get_current_capital(self) -> float:
        """获取当前账户权益(接入你的账户余额 API)"""
        # 实际使用时替换为真实的账户查询接口
        # return account.get_equity()
        return 1_000_000.0  # 默认值,生产环境必须替换
    
    def _calculate_drawdown(self) -> float:
        """计算当前回撤率"""
        if self.state.daily_starting_capital == 0:
            return 0.0
        return abs(self.state.daily_pnl) / self.state.daily_starting_capital
    
    def _check_trip_conditions(self) -> bool:
        """检查是否满足熔断触发条件"""
        # 条件1:连续亏损次数超限
        consecutive_triggered = (
            self.state.consecutive_losses >= self.config.consecutive_losses_threshold
        )
        
        # 条件2:单日回撤超限
        drawdown_triggered = (
            self._calculate_drawdown() >= self.config.daily_drawdown_threshold
        )
        
        return consecutive_triggered or drawdown_triggered
    
    def _transition_to(self, new_state: CircuitState, reason: str = None):
        """状态转换"""
        old_state = self.state.current_state
        
        if old_state == new_state:
            return
        
        self.state.current_state = new_state
        self.state.state_entered_at = datetime.now()
        
        # 记录历史
        if new_state == CircuitState.CIRCUIT_BROKEN:
            self.state.trip_count += 1
        
        # 发送状态变更告警
        self._send_state_change_alert(old_state, new_state, reason)
        
        print(f"[熔断器] 状态变更: {old_state.value} → {new_state.value} | 原因: {reason}")
    
    def _send_state_change_alert(self, from_state: CircuitState, 
                                  to_state: CircuitState, reason: str = None):
        """发送状态变更通知"""
        # ⚠️ 生产环境:接入飞书/Slack/PagerDuty 等告警系统
        alert_message = (
            f"🚨 熔断器状态变更\n"
            f"状态: {from_state.value} → {to_state.value}\n"
            f"原因: {reason or '自动触发'}\n"
            f"时间: {datetime.now().isoformat()}\n"
            f"连续亏损: {self.state.consecutive_losses} 笔\n"
            f"当日回撤: {self._calculate_drawdown():.2%}\n"
            f"账户权益: {self._get_current_capital():,.2f}"
        )
        # 实际使用时替换为真实的告警发送接口
        print(f"[告警] {alert_message}")

2.3 交易结果上报与状态更新

每次交易平仓后,必须将结果上报给熔断器,由熔断器决定是否需要触发熔断:

    def report_trade_result(self, pnl: float, metadata: dict = None):
        """
        上报交易结果,驱动状态机更新
        
        Args:
            pnl: 交易盈亏,正数为盈利,负数为亏损
            metadata: 附加信息(交易品种、时间戳等)
        """
        with self._lock:
            timestamp = datetime.now()
            
            # 重置日度追踪(交易日切换)
            self._check_daily_reset(timestamp)
            
            # 更新亏损计数
            if pnl < 0:
                self.state.consecutive_losses += 1
                self.state.last_loss_time = timestamp
            else:
                self.state.consecutive_losses = 0  # 盈利后重置连续亏损计数
            
            # 更新当日盈亏
            self.state.daily_pnl += pnl
            
            # 状态检查与转换
            self._evaluate_state_transitions(metadata)
    
    def _check_daily_reset(self, timestamp: datetime):
        """检查是否需要重置日度追踪数据"""
        # 简单实现:每次新交易日前重置
        # 实际使用时应按交易所交易日历精确判断
        if timestamp.hour >= 16 and self.state.daily_pnl != 0:
            # 收盘后重置(简化版,实际应按具体时区)
            self.state.daily_starting_capital = self._get_current_capital()
            self.state.daily_pnl = 0
            self.state.consecutive_losses = 0
    
    def _evaluate_state_transitions(self, metadata: dict = None):
        """评估并执行状态转换"""
        
        if self.state.current_state == CircuitState.NORMAL:
            # 检查是否需要触发熔断
            if self._check_trip_conditions():
                reason = (
                    f"连续亏损 {self.state.consecutive_losses} 次 "
                    f"(阈值: {self.config.consecutive_losses_threshold})"
                    if self.state.consecutive_losses >= self.config.consecutive_losses_threshold
                    else f"当日回撤 {self._calculate_drawdown():.2%} "
                         f"(阈值: {self.config.daily_drawdown_threshold:.2%})"
                )
                self._transition_to(CircuitState.CIRCUIT_BROKEN, reason)
        
        elif self.state.current_state == CircuitState.CIRCUIT_BROKEN:
            # 检查冷却期是否结束
            if self._cooling_period_elapsed():
                if self.config.auto_resume_enabled:
                    self._transition_to(CircuitState.COOLING, "冷却期结束,进入观察")
                else:
                    # 不自动恢复,保持熔断状态等待人工介入
                    pass
        
        elif self.state.current_state == CircuitState.COOLING:
            # 检查观察期结果
            if self._observation_passed():
                self._transition_to(CircuitState.NORMAL, "观察期通过,恢复交易")
            elif self._check_trip_conditions():
                self._transition_to(CircuitState.CIRCUIT_BROKEN, "观察期再次触发熔断")
    
    def _cooling_period_elapsed(self) -> bool:
        """检查冷却期是否结束"""
        if self.state.state_entered_at is None:
            return False
        elapsed = (datetime.now() - self.state.state_entered_at).total_seconds()
        return elapsed >= self.config.cooling_duration_seconds
    
    def _observation_passed(self) -> bool:
        """检查观察期是否通过(观察期内无触发条件)"""
        if self.state.state_entered_at is None:
            return False
        elapsed = (datetime.now() - self.state.state_entered_at).total_seconds()
        
        # 观察期内不能再次触发熔断条件
        return elapsed >= self.config.observation_duration_seconds

三、仓位限制与开仓拦截

状态机定义了熔断的“是否触发”,但在实际执行层面,熔断还需要落地到开仓拦截上。即使处于熔断状态,未成交订单的平仓操作仍然需要执行——否则会加剧流动性风险。

以下是完整的开仓决策逻辑:

    def can_open_position(self, proposed_size: float = 1.0) -> tuple[bool, str]:
        """
        检查是否可以开仓
        
        Returns:
            (can_open, reason): 是否可以开仓,以及拒绝原因(如有)
        """
        with self._lock:
            state = self.state.current_state
            
            if state == CircuitState.MANUAL_PAUSE:
                return False, f"人工暂停: {self.state.manual_pause_reason}"
            
            if state == CircuitState.EXCEPTION_LOCK:
                return False, f"系统异常: {self.state.exception_message}"
            
            if state == CircuitState.CIRCUIT_BROKEN:
                return False, "熔断中,禁止开新仓位"
            
            if state == CircuitState.COOLING:
                # 冷却期限制仓位规模
                max_size = proposed_size * self.config.max_position_size_multiplier
                
                if proposed_size > max_size:
                    return False, (
                        f"冷却期仓位限制: 请求 {proposed_size},最大 {max_size:.2f} "
                        f"({self.config.max_position_size_multiplier:.0%})"
                    )
                
                return True, "冷却期,仓位减半"
            
            # NORMAL 状态:正常开仓
            return True, "正常"
    
    def get_position_limit_multiplier(self) -> float:
        """获取当前仓位限制系数,供策略层调用"""
        with self._lock:
            if self.state.current_state == CircuitState.COOLING:
                return self.config.max_position_size_multiplier
            elif self.state.current_state in (
                CircuitState.CIRCUIT_BROKEN, 
                CircuitState.MANUAL_PAUSE,
                CircuitState.EXCEPTION_LOCK
            ):
                return 0.0
            else:
                return 1.0

策略层调用示例:

def strategy_open_position(self, signal_strength: float, base_size: float):
    """策略开仓入口,带熔断保护"""
    
    # 熔断检查
    can_open, reason = self.circuit_breaker.can_open_position(base_size)
    
    if not can_open:
        logger.warning(f"开仓被拦截: {reason}")
        return None
    
    # 冷却期应用仓位系数
    actual_size = base_size * self.circuit_breaker.get_position_limit_multiplier()
    
    # 执行开仓
    order = self._execute_long_entry(actual_size)
    
    return order

四、人工干预接口

状态机解决了“自动熔断”的问题,但无法解决“系统本身出问题”的情况。你需要一个人工干预通道,确保在任何极端情况下都能夺回控制权。

4.1 Web 管理接口

from flask import Flask, request, jsonify
import os

app = Flask(__name__)
circuit_breaker = None  # 全局注入


@app.route("/api/circuit-breaker/status", methods=["GET"])
def get_status():
    """查询当前熔断状态"""
    return jsonify({
        "state": circuit_breaker.state.current_state.value,
        "consecutive_losses": circuit_breaker.state.consecutive_losses,
        "daily_drawdown": circuit_breaker._calculate_drawdown(),
        "state_entered_at": circuit_breaker.state.state_entered_at.isoformat() 
                           if circuit_breaker.state.state_entered_at else None,
        "trip_count_today": circuit_breaker.state.trip_count,
        "config": {
            "consecutive_losses_threshold": circuit_breaker.config.consecutive_losses_threshold,
            "daily_drawdown_threshold": circuit_breaker.config.daily_drawdown_threshold,
            "cooling_duration_seconds": circuit_breaker.config.cooling_duration_seconds,
        }
    })


@app.route("/api/circuit-breaker/pause", methods=["POST"])
def manual_pause():
    """人工暂停策略"""
    data = request.get_json() or {}
    reason = data.get("reason", "手动暂停")
    operator = data.get("operator", "unknown")
    
    # ⚠️ 生产环境:应接入权限验证
    api_key = request.headers.get("X-API-Key")
    if not self._verify_operator(api_key):
        return jsonify({"error": "Unauthorized"}), 401
    
    circuit_breaker.state.manual_pause_reason = f"{reason} (操作员: {operator})"
    circuit_breaker._transition_to(CircuitState.MANUAL_PAUSE, reason)
    
    return jsonify({"status": "paused", "reason": reason})


@app.route("/api/circuit-breaker/resume", methods=["POST"])
def manual_resume():
    """人工恢复交易"""
    data = request.get_json() or {}
    operator = data.get("operator", "unknown")
    reset_counters = data.get("reset_counters", False)
    
    api_key = request.headers.get("X-API-Key")
    if not self._verify_operator(api_key):
        return jsonify({"error": "Unauthorized"}), 401
    
    # 重置计数器(可选)
    if reset_counters:
        circuit_breaker.state.consecutive_losses = 0
        circuit_breaker.state.daily_pnl = 0
        circuit_breaker.state.daily_starting_capital = circuit_breaker._get_current_capital()
    
    circuit_breaker._transition_to(CircuitState.NORMAL, 
                                   f"人工恢复 (操作员: {operator})")
    
    return jsonify({
        "status": "resumed", 
        "reset_counters": reset_counters
    })


@app.route("/api/circuit-breaker/config", methods=["PUT"])
def update_config():
    """更新熔断配置(需要谨慎操作)"""
    data = request.get_json()
    
    api_key = request.headers.get("X-API-Key")
    if not self._verify_operator(api_key):
        return jsonify({"error": "Unauthorized"}), 401
    
    # ⚠️ 生产环境:应记录配置变更日志
    old_config = circuit_breaker.config
    
    if "consecutive_losses_threshold" in data:
        circuit_breaker.config.consecutive_losses_threshold = data["consecutive_losses_threshold"]
    if "daily_drawdown_threshold" in data:
        circuit_breaker.config.daily_drawdown_threshold = data["daily_drawdown_threshold"]
    if "cooling_duration_seconds" in data:
        circuit_breaker.config.cooling_duration_seconds = data["cooling_duration_seconds"]
    if "auto_resume_enabled" in data:
        circuit_breaker.config.auto_resume_enabled = data["auto_resume_enabled"]
    
    return jsonify({
        "status": "updated",
        "old_config": {
            "consecutive_losses_threshold": old_config.consecutive_losses_threshold,
            "daily_drawdown_threshold": old_config.daily_drawdown_threshold,
        },
        "new_config": {
            "consecutive_losses_threshold": circuit_breaker.config.consecutive_losses_threshold,
            "daily_drawdown_threshold": circuit_breaker.config.daily_drawdown_threshold,
        }
    })


def _verify_operator(self, api_key: str) -> bool:
    """验证操作者权限"""
    # ⚠️ 生产环境:接入真实的权限验证系统
    allowed_keys = os.environ.get("CIRCUIT_BREAKER_ADMIN_KEYS", "").split(",")
    return api_key in allowed_keys

4.2 飞书告警集成

import requests
import os


class FeishuNotifier:
    """飞书告警通知"""
    
    def __init__(self, webhook_url: str = None):
        self.webhook_url = webhook_url or os.environ.get("FEISHU_WEBHOOK_URL")
    
    def send_alert(self, title: str, content: str, alert_level: str = "warning"):
        """发送飞书告警"""
        if not self.webhook_url:
            print(f"[警告] 未配置飞书 Webhook,跳过告警: {title}")
            return
        
        emoji = {
            "info": "ℹ️",
            "warning": "⚠️",
            "critical": "🚨",
            "resolved": "✅"
        }.get(alert_level, "ℹ️")
        
        payload = {
            "msg_type": "interactive",
            "card": {
                "header": {
                    "title": f"{emoji} {title}",
                    "style": {
                        "info": "blue",
                        "warning": "orange", 
                        "critical": "red",
                        "resolved": "green"
                    }.get(alert_level, "blue")
                },
                "elements": [
                    {"tag": "div", "text": {"content": content, "tag": "lark_md"}}
                ]
            }
        }
        
        try:
            response = requests.post(
                self.webhook_url,
                json=payload,
                timeout=10
            )
            response.raise_for_status()
        except requests.RequestException as e:
            print(f"[错误] 飞书告警发送失败: {e}")

五、生产级部署清单

熔断系统不是代码写完就能上线的。以下是生产环境必须检查的清单:

检查项 要求 优先级
状态持久化 熔断状态必须写入磁盘/数据库,重启后不丢失 P0
账户权益查询 接入真实账户余额 API,不能用默认值 P0
告警通道 至少配置两种告警方式(飞书+短信),避免单点失效 P0
权限控制 管理接口必须鉴权,禁止裸暴露 P0
限频处理 API 调用必须处理 429/503 限频错误 P1
日志审计 所有状态变更和管理操作必须记录 P1
监控大盘 熔断状态、触发次数等指标上监控面板 P2

状态持久化实现

import json
import os
from pathlib import Path


class PersistentCircuitBreaker(CircuitBreaker):
    """带状态持久化的熔断器"""
    
    def __init__(self, config: CircuitBreakerConfig, state_file: str = None):
        self.state_file = state_file or os.environ.get(
            "CIRCUIT_BREAKER_STATE_FILE", 
            "/var/lib/circuit-breaker/state.json"
        )
        super().__init__(config)
        self._load_state()
    
    def _get_state_file_path(self) -> Path:
        """获取状态文件路径"""
        path = Path(self.state_file)
        path.parent.mkdir(parents=True, exist_ok=True)
        return path
    
    def _save_state(self):
        """持久化当前状态"""
        state_data = {
            "current_state": self.state.current_state.value,
            "consecutive_losses": self.state.consecutive_losses,
            "daily_pnl": self.state.daily_pnl,
            "daily_starting_capital": self.state.daily_starting_capital,
            "state_entered_at": self.state.state_entered_at.isoformat() 
                               if self.state.state_entered_at else None,
            "last_loss_time": self.state.last_loss_time.isoformat() 
                              if self.state.last_loss_time else None,
            "manual_pause_reason": self.state.manual_pause_reason,
            "trip_count": self.state.trip_count,
            "last_updated": datetime.now().isoformat()
        }
        
        try:
            with open(self._get_state_file_path(), "w") as f:
                json.dump(state_data, f, indent=2)
        except IOError as e:
            print(f"[错误] 状态持久化失败: {e}")
    
    def _load_state(self):
        """从磁盘加载上次状态"""
        state_path = self._get_state_file_path()
        
        if not state_path.exists():
            return
        
        try:
            with open(state_path) as f:
                data = json.load(f)
            
            self.state.current_state = CircuitState(data["current_state"])
            self.state.consecutive_losses = data.get("consecutive_losses", 0)
            self.state.daily_pnl = data.get("daily_pnl", 0)
            self.state.daily_starting_capital = data.get("daily_starting_capital", 0)
            self.state.manual_pause_reason = data.get("manual_pause_reason")
            self.state.trip_count = data.get("trip_count", 0)
            
            if data.get("state_entered_at"):
                self.state.state_entered_at = datetime.fromisoformat(data["state_entered_at"])
            if data.get("last_loss_time"):
                self.state.last_loss_time = datetime.fromisoformat(data["last_loss_time"])
            
            print(f"[熔断器] 从磁盘恢复状态: {self.state.current_state.value}")
            
        except (IOError, json.JSONDecodeError) as e:
            print(f"[警告] 状态加载失败,使用默认状态: {e}")
    
    def _transition_to(self, new_state: CircuitState, reason: str = None):
        """状态转换后自动持久化"""
        super()._transition_to(new_state, reason)
        self._save_state()

六、熔断阈值配置指南

熔断阈值不是“设一个数”那么简单,它需要匹配你的策略特性和风险偏好。

策略类型 连续亏损阈值 回撤阈值 冷却期 说明
高频套利 10-15 次 1-2% 5-10 分钟 高频策略单笔亏损小,但连续亏损通常是市场结构变化信号
日内趋势 5-8 次 3-5% 15-30 分钟 日内策略波动大,需要较宽松的阈值避免频繁打断
隔夜波段 3-5 次 5-8% 30-60 分钟 波段持仓周期长,熔断后需要足够时间让市场冷静
宏观配置 2-3 次 10-15% 1-2 小时 宏观策略容量大,频繁熔断影响收益,应更保守

配置建议

  1. 先回测,后上线:用历史数据模拟熔断触发,找出适合你策略的阈值区间
  2. 留缓冲,不卡死:阈值应略高于回测中正常波动的最大值,留 10-20% 安全边际
  3. 分仓位,不全停:冷却期的仓位限制(50%)比直接熔断更灵活,既控风险又留机会

七、与 TickDB 的协同

熔断系统的核心输入是账户权益交易盈亏,而这些数据可以通过 TickDB 的市场数据 API 进行交叉验证。

例如,在触发熔断前,你可以用 TickDB 确认当时的宏观事件(如财报发布、央行决议),判断策略亏损是正常的策略衰减还是异常的市场结构变化:

# 在熔断决策中加入宏观事件上下文
async def evaluate_circuit_trip_with_context(self):
    """带宏观上下文评估熔断触发"""
    
    # 查询最近是否有重大宏观事件
    recent_events = await self._get_recent_macro_events()
    
    if recent_events:
        # 有宏观事件时,适当放宽阈值(避免正常波动触发熔断)
        adjusted_drawdown_threshold = self.config.daily_drawdown_threshold * 1.5
        adjusted_loss_threshold = self.config.consecutive_losses_threshold + 2
        
        return self._check_trip_conditions_custom(
            drawdown_threshold=adjusted_drawdown_threshold,
            loss_threshold=adjusted_loss_threshold
        )
    
    return self._check_trip_conditions()


async def _get_recent_macro_events(self) -> list:
    """查询近期宏观事件(可通过 TickDB 产业链数据实现)"""
    # 实际实现时,接入宏观事件数据源
    return []

结语

熔断不是风控的终点,而是让风控活着的那一层

没有自动熔断,你的策略在凌晨三点是裸露的。没有状态持久化,你的熔断在重启后是失忆的。没有人工干预通道,你的系统在极端情况下是失控的。

本文给出的五状态机模型覆盖了从“正常交易”到“异常恢复”的完整生命周期,配合状态持久化和 Web 管理接口,基本可以应对实盘中 99% 的风险场景。

剩下 1%?那需要你自己在实战中迭代。


下一步行动

如果你想亲手实现这套熔断系统

  1. 在 GitHub 下载本文对应的开源代码框架
  2. 用 TickDB 的 WebSocket API 接入实时行情,构建完整的策略 + 风控闭环
  3. 先在回测中模拟熔断触发场景,确认阈值设置合理后再上实盘

如果你需要 24/7 的熔断监控告警
联系 [email protected],获取 TickDB Pro 版本的熔断监控面板和短信告警通道。

如果你的团队已有成熟的熔断方案
欢迎在 GitHub Discussion 中分享你的实现,我们一起构建更健壮的量化风控生态。


风险提示:本文不构成任何投资建议。熔断机制降低风险但不能消除风险,实际效果取决于策略特性、市场环境和系统实现,请在充分回测后谨慎部署。市场有风险,投资需谨慎。