凌晨三点,你的策略还在亏钱
凌晨三点,你被手机震动惊醒。策略已连续亏损 7 笔,账户回撤 12%,而你睡眼惺忪地点开监控面板时,它已经亏掉了过去三个月的利润。
这不是故事。这是每一个量化交易者都可能在某个深夜经历的噩梦——策略在无人值守时进入异常状态,滑点扩大、流动性枯竭、或者简单的数据源故障,却没有任何自动保护机制,只能眼睁睁看着亏损扩大。
手动监控是一个伪命题。人会疲劳、会分心、会恰好在关键时刻关掉手机通知。真正可靠的解决方案是:让程序自己知道什么时候该停下来。
本文拆解一套基于状态机的自动熔断系统,涵盖:连续亏损计数与回撤阈值双重触发、状态安全流转与人工干预接口、生产级 Python 实现,以及真实场景下的阈值配置建议。
一、为什么连续亏损是危险的
在讨论熔断机制之前,需要先理解连续亏损为什么不是简单的“运气不好”,而是系统性风险的信号。
连续亏损往往意味着市场状态发生了变化。你的策略可能基于这样的假设:高波动环境下动量会持续。但如果市场突然切换到均值回归模式,动量因子失效,策略会连续被止损。更危险的是,连续亏损期间的交易往往伴随着扩大的价差和恶化的滑点——你不仅在亏策略的钱,还在亏流动性萎缩的钱。
| 亏损模式 | 背后可能的原因 | 风险等级 |
|---|---|---|
| 随机散点亏损 | 正常波动,策略有效 | 低 |
| 连续 3-5 笔亏损 | 短期风格切换或数据噪声 | 中 |
| 连续 5+ 笔亏损 | 因子失效或市场机制变化 | 高 |
| 亏损 + 扩大回撤 | 系统性风险,正在累积 | 极高 |
另一个关键问题:账户权益的复利损耗是非对称的。 亏损 10% 需要盈利 11.1% 回本,亏损 30% 需要盈利 42.9%,而亏损 50% 需要翻倍才能回本。当回撤超过某个阈值(比如 15%),即使策略最终恢复有效,你也可能因为保证金不足被强制平仓。
因此,熔断的目标不是“预测策略会不会恢复”,而是在统计异常信号出现时强制暂停,保护账户生存到策略可能恢复的那一天。
二、状态机设计:让程序知道自己该做什么
熔断系统的核心不是一堆 if-else,而是一个状态机。状态机将策略的运行状态定义为有限的、明确的离散状态,任何状态转换都有明确的触发条件和边界行为。
2.1 四状态模型
┌─────────────────────────────────────────┐
│ │
▼ │
┌──────────┐ ┌──────────┐ ┌──────────────┐ ┌─────────┐
│ RUNNING │───▶│ COOLING │───▶│ CIRCUIT_OPEN │───▶│ MANUAL │
└──────────┘ └──────────┘ └──────────────┘ └─────────┘
▲ │ │
│ │ │
└────────────────────────────────────┘ │
(自然恢复冷却) (人工确认恢复)
| 状态 | 含义 | 允许交易 | 说明 |
|---|---|---|---|
RUNNING |
策略正常运行 | ✅ | 监控亏损计数和回撤 |
COOLING |
检测到异常,进入冷却 | ❌ | 等待 N 分钟,消除短期噪声 |
CIRCUIT_OPEN |
冷却后仍异常,熔断打开 | ❌ | 暂停到下一个恢复窗口 |
MANUAL |
人工干预暂停 | ❌ | 等待人工确认 |
2.2 状态转换规则
| 当前状态 | 触发条件 | 目标状态 | 动作 |
|---|---|---|---|
| RUNNING | 连续亏损 ≥ N 或回撤 ≥ M% | COOLING | 停止开仓,记录告警 |
| COOLING | 冷却时间到期 | RUNNING(观察)或 CIRCUIT_OPEN | 若期间无新亏损则恢复,否则熔断 |
| COOLING | 冷却时间到期 + 亏损未改善 | CIRCUIT_OPEN | 停止交易,进入长等待 |
| CIRCUIT_OPEN | 恢复时间窗口到达 | RUNNING 或 MANUAL | 尝试恢复,或强制人工介入 |
| 任意状态 | 人工介入 | MANUAL | 冻结所有操作 |
2.3 状态机代码骨架
from enum import Enum, auto
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Callable, Optional
import threading
import logging
logger = logging.getLogger(__name__)
class State(Enum):
RUNNING = auto()
COOLING = auto()
CIRCUIT_OPEN = auto()
MANUAL = auto()
@dataclass
class CircuitBreaker:
"""
基于状态机的策略熔断器
职责:
- 监控连续亏损次数和账户回撤
- 管理状态流转(RUNNING → COOLING → CIRCUIT_OPEN)
- 提供人工干预接口(暂停/恢复/强制平仓)
"""
# 熔断阈值配置
consecutive_loss_threshold: int = 5
max_drawdown_threshold: float = 0.10 # 10%
cooling_minutes: int = 15
circuit_open_hours: int = 4
# 内部状态
_state: State = field(default=State.RUNNING, repr=False)
_consecutive_losses: int = field(default=0, repr=False)
_peak_equity: float = field(default=0.0, repr=False)
_cooling_end_time: Optional[datetime] = field(default=None, repr=False)
_circuit_open_end_time: Optional[datetime] = field(default=None, repr=False)
_lock: threading.Lock = field(default_factory=threading.Lock, repr=False)
# 回调函数
on_state_change: Optional[Callable[[State, State], None]] = None
on_alert: Optional[Callable[[str], None]] = None
@property
def state(self) -> State:
"""线程安全的状态读取"""
with self._lock:
return self._state
@property
def can_trade(self) -> bool:
"""只有 RUNNING 状态允许开仓"""
return self.state == State.RUNNING
def update_equity(self, current_equity: float) -> None:
"""
更新账户权益,自动计算回撤
"""
with self._lock:
# 更新峰值
if current_equity > self._peak_equity:
self._peak_equity = current_equity
# 计算当前回撤
if self._peak_equity > 0:
current_drawdown = (self._peak_equity - current_equity) / self._peak_equity
# ⚠️ 回撤超过阈值,触发熔断
if current_drawdown >= self.max_drawdown_threshold:
self._trigger_cooling(
f"回撤触发: {current_drawdown:.2%} > {self.max_drawdown_threshold:.2%}"
)
def record_trade_result(self, profit_loss: float) -> None:
"""
记录每笔交易结果,亏损增加计数,盈利重置
"""
with self._lock:
if profit_loss < 0:
self._consecutive_losses += 1
logger.warning(f"亏损记录: 第 {self._consecutive_losses} 笔连续亏损")
# ⚠️ 连续亏损达到阈值,触发冷却
if (self._consecutive_losses >= self.consecutive_loss_threshold
and self._state == State.RUNNING):
self._trigger_cooling(
f"连续亏损触发: {self._consecutive_losses} >= {self.consecutive_loss_threshold}"
)
else:
# ⚠️ 盈利重置计数(在 RUNNING 状态下)
if self._consecutive_losses > 0:
logger.info(f"盈利 {profit_loss:.2f},重置连续亏损计数")
self._consecutive_losses = 0
2.4 冷却与恢复逻辑
def _trigger_cooling(self, reason: str) -> None:
"""进入冷却状态"""
if self._state != State.RUNNING:
return
old_state = self._state
self._state = State.COOLING
self._cooling_end_time = datetime.now() + timedelta(minutes=self.cooling_minutes)
self._emit_alert(f"🚨 进入冷却期: {reason},冷却 {self.cooling_minutes} 分钟")
self._notify_state_change(old_state, self._state)
def _try_recover_from_cooling(self) -> None:
"""冷却时间结束后,评估是否可以恢复"""
if self._state != State.COOLING:
return
if datetime.now() < self._cooling_end_time:
return
old_state = self._state
# 检查冷却期间是否有新亏损
if self._consecutive_losses >= self.consecutive_loss_threshold:
# ⚠️ 冷却后仍未改善,熔断打开
self._state = State.CIRCUIT_OPEN
self._circuit_open_end_time = datetime.now() + timedelta(hours=self.circuit_open_hours)
self._emit_alert(f"⚠️ 冷却后未恢复,熔断打开,暂停 {self.circuit_open_hours} 小时")
else:
# 冷却期间无新亏损,尝试恢复
self._state = State.RUNNING
self._emit_alert("✅ 冷却期结束,策略恢复正常运行")
self._notify_state_change(old_state, self._state)
def tick(self) -> None:
"""
⚠️ 必须在主循环中定期调用(建议每分钟)
处理时间驱动的状态转换
"""
with self._lock:
if self._state == State.COOLING:
self._try_recover_from_cooling()
elif self._state == State.CIRCUIT_OPEN:
self._check_circuit_open_recovery()
def _check_circuit_open_recovery(self) -> None:
"""熔断期结束后尝试恢复"""
if self._state != State.CIRCUIT_OPEN:
return
if datetime.now() >= self._circuit_open_end_time:
old_state = self._state
self._state = State.MANUAL # ⚠️ 强制人工介入
self._emit_alert("🔴 熔断期结束,请人工确认是否恢复交易")
self._notify_state_change(old_state, self._state)
三、人工干预接口:人永远是最终防线
状态机负责自动决策,但人工干预接口是不可省略的安全兜底。当系统出现无法自动处理的异常时,必须有人能强制介入。
3.1 核心干预操作
def pause(self, reason: str = "人工暂停") -> bool:
"""
人工暂停策略
使用场景:
- 发现数据源异常
- 收到市场突发新闻
- 策略参数需要临时调整
"""
with self._lock:
old_state = self._state
if old_state in (State.COOLING, State.CIRCUIT_OPEN):
self._state = State.MANUAL
self._emit_alert(f"⏸️ 策略已人工暂停: {reason}")
self._notify_state_change(old_state, self._state)
return True
elif old_state == State.RUNNING:
self._state = State.MANUAL
self._emit_alert(f"⏸️ 策略已人工暂停: {reason}")
self._notify_state_change(old_state, self._state)
return True
return False
def resume(self, acknowledged_by: str) -> bool:
"""
人工恢复策略(仅限 MANUAL 状态)
参数:
- acknowledged_by: 确认恢复的人员标识(审计用)
"""
with self._lock:
if self._state != State.MANUAL:
logger.warning(f"非 MANUAL 状态无法恢复,当前状态: {self._state}")
return False
old_state = self._state
self._state = State.RUNNING
self._consecutive_losses = 0 # 重置计数
self._peak_equity = 0.0 # 重置基准
logger.info(f"策略已由 {acknowledged_by} 人工恢复")
self._emit_alert(f"▶️ 策略已人工恢复,恢复人: {acknowledged_by}")
self._notify_state_change(old_state, self._state)
return True
def force_close_all(self) -> dict:
"""
强制平仓所有持仓
返回:
- 平仓操作的结果摘要
⚠️ 这是最激进的操作,仅在极端情况下使用
"""
self._emit_alert("🚨 强制平仓所有持仓告警已触发")
return {
"status": "alert_sent",
"action_required": "请在交易终端手动执行市价平仓",
"reason": "自动平仓功能需要对接券商 API,建议人工执行"
}
def _emit_alert(self, message: str) -> None:
"""发送告警通知"""
logger.warning(message)
if self.on_alert:
self.on_alert(message)
def _notify_state_change(self, old: State, new: State) -> None:
"""触发状态变更回调"""
if self.on_state_change:
self.on_state_change(old, new)
3.2 干预日志与审计
所有人工干预操作必须记录,以便事后审计和改进:
import json
from pathlib import Path
from datetime import datetime
class AuditLogger:
"""熔断操作审计日志"""
def __init__(self, log_dir: str = "./logs"):
self.log_dir = Path(log_dir)
self.log_dir.mkdir(parents=True, exist_ok=True)
self.audit_file = self.log_dir / f"circuit_breaker_audit_{datetime.now().strftime('%Y%m')}.jsonl"
def log(self, event_type: str, details: dict) -> None:
"""追加审计记录"""
record = {
"timestamp": datetime.now().isoformat(),
"event_type": event_type,
**details
}
with open(self.audit_file, "a") as f:
f.write(json.dumps(record) + "\n")
def get_monthly_report(self) -> list:
"""生成月度审计报告"""
records = []
with open(self.audit_file) as f:
for line in f:
records.append(json.loads(line))
return records
四、与交易执行层的集成
熔断器本身不执行交易,它是一个决策过滤器。真正的交易执行层需要检查熔断状态。
class StrategyExecutor:
"""策略执行器(集成熔断器)"""
def __init__(self, breaker: CircuitBreaker, broker_adapter):
self.breaker = breaker
self.broker = broker_adapter
def should_open_position(self, signal) -> tuple[bool, str]:
"""
判断是否应该开仓
返回:
- (can_trade, reason)
"""
if not self.breaker.can_trade:
return False, f"熔断器状态: {self.breaker.state.name}"
# ... 其他风控检查 ...
return True, "允许交易"
def execute_signal(self, signal) -> dict:
"""执行交易信号(带熔断检查)"""
can_trade, reason = self.should_open_position(signal)
if not can_trade:
return {
"status": "rejected",
"reason": reason,
"timestamp": datetime.now().isoformat()
}
# 执行交易 ...
result = self.broker.place_order(signal)
# ⚠️ 关键:将交易结果反馈给熔断器
self.breaker.record_trade_result(result["profit_loss"])
return result
五、阈值配置建议
熔断器的效果高度依赖阈值的合理设置。以下是基于实盘经验的参考值:
| 参数 | 保守配置 | 激进配置 | 说明 |
|---|---|---|---|
consecutive_loss_threshold |
3-5 | 5-8 | 建议 5,新策略从严 |
max_drawdown_threshold |
8%-10% | 12%-15% | 超过 15% 需特别审批 |
cooling_minutes |
15-30 | 30-60 | 覆盖一个交易时段 |
circuit_open_hours |
4-8 | 8-24 | 建议至少覆盖隔夜 |
配置原则:
- 新策略上线时使用保守配置,运行 1 个月数据后根据实际情况调整
- 高频策略可以缩短冷却时间(但缩短
consecutive_loss_threshold) - 趋势策略因为天然有连续小额亏损,用更大的亏损容忍度换趋势跟踪能力
六、进阶:与 TickDB 的集成
熔断器的状态变化和告警可以通过多种渠道通知。其中一个实用的方案是结合 TickDB 的 WebSocket 实时推送,将熔断状态变化推送到监控面板:
import os
import time
import json
import websocket
import random
import logging
logger = logging.getLogger(__name__)
class TickDBAlertPusher:
"""
使用 TickDB WebSocket 推送熔断告警
⚠️ 本代码用于演示告警推送架构
生产环境请替换为实际的消息队列(飞书/钉钉/Slack)
"""
def __init__(self, api_key: str = None):
self.api_key = api_key or os.environ.get("TICKDB_API_KEY")
self.ws_url = "wss://api.tickdb.ai/ws/v1/market"
self._ws = None
self._reconnect_delay = 1
self._max_reconnect_delay = 30
def _connect(self) -> bool:
"""建立 WebSocket 连接"""
try:
# ⚠️ 熔断告警推送使用 TickDB 的自定义消息通道
self._ws = websocket.WebSocketApp(
self.ws_url,
on_message=self._on_message,
on_error=self._on_error,
on_close=self._on_close,
on_open=self._on_open
)
return True
except Exception as e:
logger.error(f"WebSocket 连接失败: {e}")
return False
def _on_open(self, ws):
"""连接建立后发送认证"""
auth_msg = {
"cmd": "auth",
"api_key": self.api_key
}
ws.send(json.dumps(auth_msg))
logger.info("TickDB WebSocket 已连接")
def _on_message(self, ws, message):
"""处理接收消息"""
data = json.loads(message)
if data.get("cmd") == "pong":
logger.debug("心跳响应正常")
def _on_error(self, ws, error):
logger.warning(f"WebSocket 错误: {error}")
def _on_close(self, ws, close_status_code, close_msg):
logger.warning(f"WebSocket 关闭: {close_status_code} - {close_msg}")
self._schedule_reconnect()
def _schedule_reconnect(self):
"""指数退避重连(带抖动)"""
delay = min(self._reconnect_delay * 2, self._max_reconnect_delay)
jitter = random.uniform(0, delay * 0.1)
self._reconnect_delay = delay + jitter
logger.info(f"计划 {self._reconnect_delay:.1f} 秒后重连")
time.sleep(self._reconnect_delay)
self._connect()
def push_alert(self, alert_type: str, message: str, state_info: dict) -> None:
"""
推送熔断告警
⚠️ 演示代码:实际生产中建议使用专业的告警服务
"""
alert_payload = {
"cmd": "alert",
"type": alert_type,
"message": message,
"state": state_info,
"timestamp": time.time()
}
try:
if self._ws and self._ws.sock and self._ws.sock.connected:
self._ws.send(json.dumps(alert_payload))
logger.info(f"告警已推送: {alert_type} - {message}")
else:
logger.warning("WebSocket 未连接,告警暂存")
except Exception as e:
logger.error(f"告警推送失败: {e}")
def start_heartbeat(self):
"""启动心跳保活"""
def heartbeat_loop():
while True:
time.sleep(30)
try:
if self._ws and self._ws.sock and self._ws.sock.connected:
self._ws.send(json.dumps({"cmd": "ping"}))
except Exception:
pass
import threading
threading.Thread(target=heartbeat_loop, daemon=True).start()
七、总结
熔断不是放弃,而是有序地撤退。一个设计良好的熔断系统应该做到:
- 自动识别异常:通过连续亏损计数和回撤阈值双重信号,避免单一指标的误判
- 有序状态流转:冷却 → 熔断 → 人工介入,每一步都有清晰的等待期和恢复条件
- 保留人工控制权:机器负责常规情况,人负责极端情况
- 完整的审计追溯:每一次状态变化都有记录可查
市场永远在变化,策略的有效性也永远在波动。熔断器的本质不是“阻止亏损”,而是确保你在策略失效的时候还活着,活到它再次有效的那一天。
下一步行动
如果你是个人量化开发者:
- 访问 tickdb.ai 注册,获取免费 API Key
- 将本文的状态机代码集成到你的策略框架中
- 从保守阈值开始运行,观察 2-4 周后根据实盘数据调整
如果你在寻找更多风控相关的实战案例:
- 《仓位管理背后的数学:凯利公式的实战局限与改良》
- 《事件驱动策略的尾部风险:用期权结构对冲黑天鹅》
如果你需要机构级的风控方案,联系 [email protected] 获取专属咨询。
风险提示:本文介绍的风控框架旨在提供系统性思路,具体阈值和参数需根据策略特性、市场环境和个人风险承受能力独立评估。历史表现不代表未来收益,程序化交易存在滑点、流动性风险和技术故障可能导致实际执行与回测结果产生重大偏差。请在实盘使用前充分测试,并在必要时寻求专业风控顾问的意见。