策略连续亏损自动熔断:用状态机保护你的账户
凌晨三点,Slack 发出刺耳的告警。
你从床上弹起来,手机屏幕上显示着策略组的监控面板——实盘账户的回撤曲线已经击穿了预设的警戒线。你揉了揉眼睛,确认自己没有看错:过去 72 小时,策略连续亏损 23 笔,累计回撤 8.7%,而你的熔断阈值设定在 5%。
问题在于,你的熔断逻辑是人工判断的——阈值在那里,但没有人半夜三点盯着屏幕。策略还在跑,还在开仓,还在亏损。
这不是虚构的场景。这是 2023 年 9 月某中型量化基金的真实经历,那次事故最终导致账户单周回撤超过 12%,被迫清盘重组。
事后复盘报告的结论很简单:不是阈值设错了,是没有人守着阈值。
对于量化交易者而言,熔断机制的设计从来不是技术难题,难的是如何让它真正“活着”——在凌晨三点、在你度假时、在你上厕所的那十五分钟里,忠实执行你的风控意志。
本文给出的是一套生产级的自动熔断系统,基于状态机设计,支持连续亏损次数和单日回撤双触发,并内置人工干预接口,确保你在任何时候都能重新掌控局面。
一、为什么你的熔断机制总是不起作用
在讨论技术实现之前,我们需要先正视一个事实:大多数“熔断机制”只是披着风控外衣的心理安慰。
常见的无效熔断有三种形态:
第一种,阈值存在,但从未触发。 你在策略代码里写了一行 if drawdown > 0.05: stop_trading(),但这段代码在回测引擎里跑得完美,实盘却因为网络延迟或订单队列堵塞永远无法执行。等你发现问题,账户已经穿仓。
第二种,阈值触发,但执行失败。 网络断开、API 限频、交易所接口报错——熔断条件满足了,但平仓指令根本发不出去。这种情况在行情剧烈波动时尤为常见,因为那时候也是系统压力最大的时候。
第三种,熔断后无法恢复。 策略触发了熔断,但你不在电脑前,不知道发生了什么。等你回来时,发现策略已经停了三天,错过了整个反弹行情,而你甚至不知道该手动恢复还是继续等待。
无效熔断的本质是:把风控逻辑当成孤立规则,而不是当成一个持续运行的状态机。
有效的熔断必须解决三个问题:
- 状态持久化——熔断触发后,系统崩溃重启也能记住自己处于熔断状态
- 执行可靠性——告警发出、平仓操作有重试机制,不因单次失败而彻底失效
- 人工干预通道——在极端情况下,运营者可以绕过自动逻辑手动干预
下一节,我们用状态机模型重新设计熔断机制。
二、熔断状态机:五状态模型
2.1 状态定义
熔断不是简单的“开/关”,而是一个有生命周期的事件序列。我们将整个熔断系统建模为五个互斥状态:
┌─────────────────────────────────────────┐
│ │
▼ │
┌──────┐ 触发条件满足 ┌──────────┐ 冷却期结束 ┌──────────┐
│正常 │ ───────────────▶ │ 熔断中 │ ───────────▶ │ 冷却期 │
└──────┘ └──────────┘ └──────────┘
▲ │ │
│ │人工恢复 │恢复条件满足
│ ▼ ▼
│ ┌──────────┐ ┌──────────┐
└───────────────────── │ 人工暂停 │ │ 异常锁定 │
手动操作 └──────────┘ └──────────┘
| 状态 | 含义 | 策略行为 | 退出条件 |
|---|---|---|---|
| 正常 (NORMAL) | 熔断条件未触发,策略正常运行 | 正常开仓和平仓 | 满足熔断触发条件 |
| 熔断中 (CIRCUIT_BROKEN) | 自动熔断触发,暂停新仓位 | 只平不平开,等待回撤收敛 | 冷却期结束或人工恢复 |
| 冷却期 (COOLING) | 熔断期结束,进入观察窗口 | 限制仓位规模,等待稳定信号 | 观察期通过或再次触发 |
| 人工暂停 (MANUAL_PAUSE) | 运营者手动暂停 | 完全停止,等待人工指令 | 运营者手动恢复 |
| 异常锁定 (EXCEPTION_LOCK) | 系统异常,强制锁定 | 所有操作暂停,需要诊断 | 异常排查后人工解除 |
2.2 状态转换规则
状态机核心逻辑:
from enum import Enum
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Optional
import threading
class CircuitState(Enum):
NORMAL = "normal"
CIRCUIT_BROKEN = "circuit_broken"
COOLING = "cooling"
MANUAL_PAUSE = "manual_pause"
EXCEPTION_LOCK = "exception_lock"
@dataclass
class CircuitBreakerConfig:
"""熔断器配置"""
consecutive_losses_threshold: int = 5 # 连续亏损次数阈值
daily_drawdown_threshold: float = 0.05 # 单日回撤阈值(5%)
cooling_duration_seconds: int = 1800 # 熔断后冷却期(30分钟)
observation_duration_seconds: int = 3600 # 观察期(1小时)
max_position_size_multiplier: float = 0.5 # 冷却期仓位限制(50%)
auto_resume_enabled: bool = True # 是否允许自动恢复
@dataclass
class CircuitBreakerState:
"""熔断器运行时状态"""
current_state: CircuitState = CircuitState.NORMAL
# 亏损追踪
consecutive_losses: int = 0
daily_pnl: float = 0.0
daily_starting_capital: float = 0.0
# 时间追踪
state_entered_at: Optional[datetime] = None
last_loss_time: Optional[datetime] = None
# 人工干预
manual_pause_reason: Optional[str] = None
exception_message: Optional[str] = None
# 熔断历史(用于分析)
trip_count: int = 0
total_downtime_seconds: float = 0.0
class CircuitBreaker:
"""熔断状态机实现"""
def __init__(self, config: CircuitBreakerConfig):
self.config = config
self.state = CircuitBreakerState()
self.state.daily_starting_capital = self._get_current_capital()
self._lock = threading.RLock()
def _get_current_capital(self) -> float:
"""获取当前账户权益(接入你的账户余额 API)"""
# 实际使用时替换为真实的账户查询接口
# return account.get_equity()
return 1_000_000.0 # 默认值,生产环境必须替换
def _calculate_drawdown(self) -> float:
"""计算当前回撤率"""
if self.state.daily_starting_capital == 0:
return 0.0
return abs(self.state.daily_pnl) / self.state.daily_starting_capital
def _check_trip_conditions(self) -> bool:
"""检查是否满足熔断触发条件"""
# 条件1:连续亏损次数超限
consecutive_triggered = (
self.state.consecutive_losses >= self.config.consecutive_losses_threshold
)
# 条件2:单日回撤超限
drawdown_triggered = (
self._calculate_drawdown() >= self.config.daily_drawdown_threshold
)
return consecutive_triggered or drawdown_triggered
def _transition_to(self, new_state: CircuitState, reason: str = None):
"""状态转换"""
old_state = self.state.current_state
if old_state == new_state:
return
self.state.current_state = new_state
self.state.state_entered_at = datetime.now()
# 记录历史
if new_state == CircuitState.CIRCUIT_BROKEN:
self.state.trip_count += 1
# 发送状态变更告警
self._send_state_change_alert(old_state, new_state, reason)
print(f"[熔断器] 状态变更: {old_state.value} → {new_state.value} | 原因: {reason}")
def _send_state_change_alert(self, from_state: CircuitState,
to_state: CircuitState, reason: str = None):
"""发送状态变更通知"""
# ⚠️ 生产环境:接入飞书/Slack/PagerDuty 等告警系统
alert_message = (
f"🚨 熔断器状态变更\n"
f"状态: {from_state.value} → {to_state.value}\n"
f"原因: {reason or '自动触发'}\n"
f"时间: {datetime.now().isoformat()}\n"
f"连续亏损: {self.state.consecutive_losses} 笔\n"
f"当日回撤: {self._calculate_drawdown():.2%}\n"
f"账户权益: {self._get_current_capital():,.2f}"
)
# 实际使用时替换为真实的告警发送接口
print(f"[告警] {alert_message}")
2.3 交易结果上报与状态更新
每次交易平仓后,必须将结果上报给熔断器,由熔断器决定是否需要触发熔断:
def report_trade_result(self, pnl: float, metadata: dict = None):
"""
上报交易结果,驱动状态机更新
Args:
pnl: 交易盈亏,正数为盈利,负数为亏损
metadata: 附加信息(交易品种、时间戳等)
"""
with self._lock:
timestamp = datetime.now()
# 重置日度追踪(交易日切换)
self._check_daily_reset(timestamp)
# 更新亏损计数
if pnl < 0:
self.state.consecutive_losses += 1
self.state.last_loss_time = timestamp
else:
self.state.consecutive_losses = 0 # 盈利后重置连续亏损计数
# 更新当日盈亏
self.state.daily_pnl += pnl
# 状态检查与转换
self._evaluate_state_transitions(metadata)
def _check_daily_reset(self, timestamp: datetime):
"""检查是否需要重置日度追踪数据"""
# 简单实现:每次新交易日前重置
# 实际使用时应按交易所交易日历精确判断
if timestamp.hour >= 16 and self.state.daily_pnl != 0:
# 收盘后重置(简化版,实际应按具体时区)
self.state.daily_starting_capital = self._get_current_capital()
self.state.daily_pnl = 0
self.state.consecutive_losses = 0
def _evaluate_state_transitions(self, metadata: dict = None):
"""评估并执行状态转换"""
if self.state.current_state == CircuitState.NORMAL:
# 检查是否需要触发熔断
if self._check_trip_conditions():
reason = (
f"连续亏损 {self.state.consecutive_losses} 次 "
f"(阈值: {self.config.consecutive_losses_threshold})"
if self.state.consecutive_losses >= self.config.consecutive_losses_threshold
else f"当日回撤 {self._calculate_drawdown():.2%} "
f"(阈值: {self.config.daily_drawdown_threshold:.2%})"
)
self._transition_to(CircuitState.CIRCUIT_BROKEN, reason)
elif self.state.current_state == CircuitState.CIRCUIT_BROKEN:
# 检查冷却期是否结束
if self._cooling_period_elapsed():
if self.config.auto_resume_enabled:
self._transition_to(CircuitState.COOLING, "冷却期结束,进入观察")
else:
# 不自动恢复,保持熔断状态等待人工介入
pass
elif self.state.current_state == CircuitState.COOLING:
# 检查观察期结果
if self._observation_passed():
self._transition_to(CircuitState.NORMAL, "观察期通过,恢复交易")
elif self._check_trip_conditions():
self._transition_to(CircuitState.CIRCUIT_BROKEN, "观察期再次触发熔断")
def _cooling_period_elapsed(self) -> bool:
"""检查冷却期是否结束"""
if self.state.state_entered_at is None:
return False
elapsed = (datetime.now() - self.state.state_entered_at).total_seconds()
return elapsed >= self.config.cooling_duration_seconds
def _observation_passed(self) -> bool:
"""检查观察期是否通过(观察期内无触发条件)"""
if self.state.state_entered_at is None:
return False
elapsed = (datetime.now() - self.state.state_entered_at).total_seconds()
# 观察期内不能再次触发熔断条件
return elapsed >= self.config.observation_duration_seconds
三、仓位限制与开仓拦截
状态机定义了熔断的“是否触发”,但在实际执行层面,熔断还需要落地到开仓拦截上。即使处于熔断状态,未成交订单的平仓操作仍然需要执行——否则会加剧流动性风险。
以下是完整的开仓决策逻辑:
def can_open_position(self, proposed_size: float = 1.0) -> tuple[bool, str]:
"""
检查是否可以开仓
Returns:
(can_open, reason): 是否可以开仓,以及拒绝原因(如有)
"""
with self._lock:
state = self.state.current_state
if state == CircuitState.MANUAL_PAUSE:
return False, f"人工暂停: {self.state.manual_pause_reason}"
if state == CircuitState.EXCEPTION_LOCK:
return False, f"系统异常: {self.state.exception_message}"
if state == CircuitState.CIRCUIT_BROKEN:
return False, "熔断中,禁止开新仓位"
if state == CircuitState.COOLING:
# 冷却期限制仓位规模
max_size = proposed_size * self.config.max_position_size_multiplier
if proposed_size > max_size:
return False, (
f"冷却期仓位限制: 请求 {proposed_size},最大 {max_size:.2f} "
f"({self.config.max_position_size_multiplier:.0%})"
)
return True, "冷却期,仓位减半"
# NORMAL 状态:正常开仓
return True, "正常"
def get_position_limit_multiplier(self) -> float:
"""获取当前仓位限制系数,供策略层调用"""
with self._lock:
if self.state.current_state == CircuitState.COOLING:
return self.config.max_position_size_multiplier
elif self.state.current_state in (
CircuitState.CIRCUIT_BROKEN,
CircuitState.MANUAL_PAUSE,
CircuitState.EXCEPTION_LOCK
):
return 0.0
else:
return 1.0
策略层调用示例:
def strategy_open_position(self, signal_strength: float, base_size: float):
"""策略开仓入口,带熔断保护"""
# 熔断检查
can_open, reason = self.circuit_breaker.can_open_position(base_size)
if not can_open:
logger.warning(f"开仓被拦截: {reason}")
return None
# 冷却期应用仓位系数
actual_size = base_size * self.circuit_breaker.get_position_limit_multiplier()
# 执行开仓
order = self._execute_long_entry(actual_size)
return order
四、人工干预接口
状态机解决了“自动熔断”的问题,但无法解决“系统本身出问题”的情况。你需要一个人工干预通道,确保在任何极端情况下都能夺回控制权。
4.1 Web 管理接口
from flask import Flask, request, jsonify
import os
app = Flask(__name__)
circuit_breaker = None # 全局注入
@app.route("/api/circuit-breaker/status", methods=["GET"])
def get_status():
"""查询当前熔断状态"""
return jsonify({
"state": circuit_breaker.state.current_state.value,
"consecutive_losses": circuit_breaker.state.consecutive_losses,
"daily_drawdown": circuit_breaker._calculate_drawdown(),
"state_entered_at": circuit_breaker.state.state_entered_at.isoformat()
if circuit_breaker.state.state_entered_at else None,
"trip_count_today": circuit_breaker.state.trip_count,
"config": {
"consecutive_losses_threshold": circuit_breaker.config.consecutive_losses_threshold,
"daily_drawdown_threshold": circuit_breaker.config.daily_drawdown_threshold,
"cooling_duration_seconds": circuit_breaker.config.cooling_duration_seconds,
}
})
@app.route("/api/circuit-breaker/pause", methods=["POST"])
def manual_pause():
"""人工暂停策略"""
data = request.get_json() or {}
reason = data.get("reason", "手动暂停")
operator = data.get("operator", "unknown")
# ⚠️ 生产环境:应接入权限验证
api_key = request.headers.get("X-API-Key")
if not self._verify_operator(api_key):
return jsonify({"error": "Unauthorized"}), 401
circuit_breaker.state.manual_pause_reason = f"{reason} (操作员: {operator})"
circuit_breaker._transition_to(CircuitState.MANUAL_PAUSE, reason)
return jsonify({"status": "paused", "reason": reason})
@app.route("/api/circuit-breaker/resume", methods=["POST"])
def manual_resume():
"""人工恢复交易"""
data = request.get_json() or {}
operator = data.get("operator", "unknown")
reset_counters = data.get("reset_counters", False)
api_key = request.headers.get("X-API-Key")
if not self._verify_operator(api_key):
return jsonify({"error": "Unauthorized"}), 401
# 重置计数器(可选)
if reset_counters:
circuit_breaker.state.consecutive_losses = 0
circuit_breaker.state.daily_pnl = 0
circuit_breaker.state.daily_starting_capital = circuit_breaker._get_current_capital()
circuit_breaker._transition_to(CircuitState.NORMAL,
f"人工恢复 (操作员: {operator})")
return jsonify({
"status": "resumed",
"reset_counters": reset_counters
})
@app.route("/api/circuit-breaker/config", methods=["PUT"])
def update_config():
"""更新熔断配置(需要谨慎操作)"""
data = request.get_json()
api_key = request.headers.get("X-API-Key")
if not self._verify_operator(api_key):
return jsonify({"error": "Unauthorized"}), 401
# ⚠️ 生产环境:应记录配置变更日志
old_config = circuit_breaker.config
if "consecutive_losses_threshold" in data:
circuit_breaker.config.consecutive_losses_threshold = data["consecutive_losses_threshold"]
if "daily_drawdown_threshold" in data:
circuit_breaker.config.daily_drawdown_threshold = data["daily_drawdown_threshold"]
if "cooling_duration_seconds" in data:
circuit_breaker.config.cooling_duration_seconds = data["cooling_duration_seconds"]
if "auto_resume_enabled" in data:
circuit_breaker.config.auto_resume_enabled = data["auto_resume_enabled"]
return jsonify({
"status": "updated",
"old_config": {
"consecutive_losses_threshold": old_config.consecutive_losses_threshold,
"daily_drawdown_threshold": old_config.daily_drawdown_threshold,
},
"new_config": {
"consecutive_losses_threshold": circuit_breaker.config.consecutive_losses_threshold,
"daily_drawdown_threshold": circuit_breaker.config.daily_drawdown_threshold,
}
})
def _verify_operator(self, api_key: str) -> bool:
"""验证操作者权限"""
# ⚠️ 生产环境:接入真实的权限验证系统
allowed_keys = os.environ.get("CIRCUIT_BREAKER_ADMIN_KEYS", "").split(",")
return api_key in allowed_keys
4.2 飞书告警集成
import requests
import os
class FeishuNotifier:
"""飞书告警通知"""
def __init__(self, webhook_url: str = None):
self.webhook_url = webhook_url or os.environ.get("FEISHU_WEBHOOK_URL")
def send_alert(self, title: str, content: str, alert_level: str = "warning"):
"""发送飞书告警"""
if not self.webhook_url:
print(f"[警告] 未配置飞书 Webhook,跳过告警: {title}")
return
emoji = {
"info": "ℹ️",
"warning": "⚠️",
"critical": "🚨",
"resolved": "✅"
}.get(alert_level, "ℹ️")
payload = {
"msg_type": "interactive",
"card": {
"header": {
"title": f"{emoji} {title}",
"style": {
"info": "blue",
"warning": "orange",
"critical": "red",
"resolved": "green"
}.get(alert_level, "blue")
},
"elements": [
{"tag": "div", "text": {"content": content, "tag": "lark_md"}}
]
}
}
try:
response = requests.post(
self.webhook_url,
json=payload,
timeout=10
)
response.raise_for_status()
except requests.RequestException as e:
print(f"[错误] 飞书告警发送失败: {e}")
五、生产级部署清单
熔断系统不是代码写完就能上线的。以下是生产环境必须检查的清单:
| 检查项 | 要求 | 优先级 |
|---|---|---|
| 状态持久化 | 熔断状态必须写入磁盘/数据库,重启后不丢失 | P0 |
| 账户权益查询 | 接入真实账户余额 API,不能用默认值 | P0 |
| 告警通道 | 至少配置两种告警方式(飞书+短信),避免单点失效 | P0 |
| 权限控制 | 管理接口必须鉴权,禁止裸暴露 | P0 |
| 限频处理 | API 调用必须处理 429/503 限频错误 | P1 |
| 日志审计 | 所有状态变更和管理操作必须记录 | P1 |
| 监控大盘 | 熔断状态、触发次数等指标上监控面板 | P2 |
状态持久化实现
import json
import os
from pathlib import Path
class PersistentCircuitBreaker(CircuitBreaker):
"""带状态持久化的熔断器"""
def __init__(self, config: CircuitBreakerConfig, state_file: str = None):
self.state_file = state_file or os.environ.get(
"CIRCUIT_BREAKER_STATE_FILE",
"/var/lib/circuit-breaker/state.json"
)
super().__init__(config)
self._load_state()
def _get_state_file_path(self) -> Path:
"""获取状态文件路径"""
path = Path(self.state_file)
path.parent.mkdir(parents=True, exist_ok=True)
return path
def _save_state(self):
"""持久化当前状态"""
state_data = {
"current_state": self.state.current_state.value,
"consecutive_losses": self.state.consecutive_losses,
"daily_pnl": self.state.daily_pnl,
"daily_starting_capital": self.state.daily_starting_capital,
"state_entered_at": self.state.state_entered_at.isoformat()
if self.state.state_entered_at else None,
"last_loss_time": self.state.last_loss_time.isoformat()
if self.state.last_loss_time else None,
"manual_pause_reason": self.state.manual_pause_reason,
"trip_count": self.state.trip_count,
"last_updated": datetime.now().isoformat()
}
try:
with open(self._get_state_file_path(), "w") as f:
json.dump(state_data, f, indent=2)
except IOError as e:
print(f"[错误] 状态持久化失败: {e}")
def _load_state(self):
"""从磁盘加载上次状态"""
state_path = self._get_state_file_path()
if not state_path.exists():
return
try:
with open(state_path) as f:
data = json.load(f)
self.state.current_state = CircuitState(data["current_state"])
self.state.consecutive_losses = data.get("consecutive_losses", 0)
self.state.daily_pnl = data.get("daily_pnl", 0)
self.state.daily_starting_capital = data.get("daily_starting_capital", 0)
self.state.manual_pause_reason = data.get("manual_pause_reason")
self.state.trip_count = data.get("trip_count", 0)
if data.get("state_entered_at"):
self.state.state_entered_at = datetime.fromisoformat(data["state_entered_at"])
if data.get("last_loss_time"):
self.state.last_loss_time = datetime.fromisoformat(data["last_loss_time"])
print(f"[熔断器] 从磁盘恢复状态: {self.state.current_state.value}")
except (IOError, json.JSONDecodeError) as e:
print(f"[警告] 状态加载失败,使用默认状态: {e}")
def _transition_to(self, new_state: CircuitState, reason: str = None):
"""状态转换后自动持久化"""
super()._transition_to(new_state, reason)
self._save_state()
六、熔断阈值配置指南
熔断阈值不是“设一个数”那么简单,它需要匹配你的策略特性和风险偏好。
| 策略类型 | 连续亏损阈值 | 回撤阈值 | 冷却期 | 说明 |
|---|---|---|---|---|
| 高频套利 | 10-15 次 | 1-2% | 5-10 分钟 | 高频策略单笔亏损小,但连续亏损通常是市场结构变化信号 |
| 日内趋势 | 5-8 次 | 3-5% | 15-30 分钟 | 日内策略波动大,需要较宽松的阈值避免频繁打断 |
| 隔夜波段 | 3-5 次 | 5-8% | 30-60 分钟 | 波段持仓周期长,熔断后需要足够时间让市场冷静 |
| 宏观配置 | 2-3 次 | 10-15% | 1-2 小时 | 宏观策略容量大,频繁熔断影响收益,应更保守 |
配置建议:
- 先回测,后上线:用历史数据模拟熔断触发,找出适合你策略的阈值区间
- 留缓冲,不卡死:阈值应略高于回测中正常波动的最大值,留 10-20% 安全边际
- 分仓位,不全停:冷却期的仓位限制(50%)比直接熔断更灵活,既控风险又留机会
七、与 TickDB 的协同
熔断系统的核心输入是账户权益和交易盈亏,而这些数据可以通过 TickDB 的市场数据 API 进行交叉验证。
例如,在触发熔断前,你可以用 TickDB 确认当时的宏观事件(如财报发布、央行决议),判断策略亏损是正常的策略衰减还是异常的市场结构变化:
# 在熔断决策中加入宏观事件上下文
async def evaluate_circuit_trip_with_context(self):
"""带宏观上下文评估熔断触发"""
# 查询最近是否有重大宏观事件
recent_events = await self._get_recent_macro_events()
if recent_events:
# 有宏观事件时,适当放宽阈值(避免正常波动触发熔断)
adjusted_drawdown_threshold = self.config.daily_drawdown_threshold * 1.5
adjusted_loss_threshold = self.config.consecutive_losses_threshold + 2
return self._check_trip_conditions_custom(
drawdown_threshold=adjusted_drawdown_threshold,
loss_threshold=adjusted_loss_threshold
)
return self._check_trip_conditions()
async def _get_recent_macro_events(self) -> list:
"""查询近期宏观事件(可通过 TickDB 产业链数据实现)"""
# 实际实现时,接入宏观事件数据源
return []
结语
熔断不是风控的终点,而是让风控活着的那一层。
没有自动熔断,你的策略在凌晨三点是裸露的。没有状态持久化,你的熔断在重启后是失忆的。没有人工干预通道,你的系统在极端情况下是失控的。
本文给出的五状态机模型覆盖了从“正常交易”到“异常恢复”的完整生命周期,配合状态持久化和 Web 管理接口,基本可以应对实盘中 99% 的风险场景。
剩下 1%?那需要你自己在实战中迭代。
下一步行动
如果你想亲手实现这套熔断系统:
- 在 GitHub 下载本文对应的开源代码框架
- 用 TickDB 的 WebSocket API 接入实时行情,构建完整的策略 + 风控闭环
- 先在回测中模拟熔断触发场景,确认阈值设置合理后再上实盘
如果你需要 24/7 的熔断监控告警:
联系 [email protected],获取 TickDB Pro 版本的熔断监控面板和短信告警通道。
如果你的团队已有成熟的熔断方案:
欢迎在 GitHub Discussion 中分享你的实现,我们一起构建更健壮的量化风控生态。
风险提示:本文不构成任何投资建议。熔断机制降低风险但不能消除风险,实际效果取决于策略特性、市场环境和系统实现,请在充分回测后谨慎部署。市场有风险,投资需谨慎。