熔断与涨跌停:当市场按下暂停键

"价格是结果,订单簿是原因。"

美东时间 2020 年 3 月 9 日上午 9:30,纽交所开盘仅 4 分钟,道琼斯工业平均指数便下挫 7%,触发 Level 1 熔断。交易大厅的喧嚣戛然而止——不是比喻,是字面意义上的"停止"。所有人都被按在原地,看着屏幕上跳动的数字定格在 9:49 AM。

这一天,道指在 15 分钟内完成了它历史上第五次美股熔断中的第一次。量化交易系统在那 15 分钟里积累了大量订单簿数据,但很少有人分析过:熔断触发的那一瞬间,订单簿究竟发生了什么?做市商的报价策略如何随之调整?

这不是一个理论问题。当你在设计一个事件驱动策略时,如果不能预判熔断前后的流动性变化,你的止盈止损逻辑可能会在市场最需要流动性的时候失效。

本文从市场微观结构的角度,系统拆解熔断与涨跌停机制对订单簿的影响,探讨做市商在不同阶段的行为模式,并给出可操作的历史复盘方法论。


一、熔断机制的市场微观结构逻辑

1.1 什么是熔断:不止于"暂停"

从普通投资者的视角,熔断就是"市场跌太多了,暂停一下"。但从市场微观结构的视角,熔断是一次流动性注入指令的强制中断

正常交易时段,买卖双方通过订单簿完成撮合。价格是订单簿均衡的结果——当买方愿意出的最高价与卖方愿意出的最低价重合,交易发生。当这个均衡被打破(比如重大利空导致卖单瞬间涌入),价格会快速下移。

熔断机制的目的是:当价格变动速度超过安全阈值时,中断撮合,给市场参与者一个"冷静期"。这个冷静期的市场微观结构含义是:

阶段 发生的事 微观结构变化
熔断触发前 价格快速变动,订单簿失衡 卖盘深度骤增,买卖价差扩大
熔断期间 撮合暂停,但订单簿保留 订单簿"冻结",价差失去意义
熔断恢复前 预期引导期 聪明钱开始预判恢复后走向
熔断恢复后 限价单撮合恢复 新均衡形成,可能二次触发

理解这一点,有助于你识别熔断前的预警信号——订单簿的失衡往往先于价格触发熔断。

1.2 涨跌停与熔断的结构差异

全球主要市场的"暂停"机制分为两类:价格上限机制(Price Limit Mechanism)和交易暂停机制(Trading Halt Mechanism)。二者在市场微观结构上的表现有显著差异。

特征 涨跌停(Price Limit) 熔断(Circuit Breaker)
典型市场 A股、港股期货、部分大宗商品 美股、欧洲主要交易所
触发条件 相对于前收盘价的涨跌幅达到阈值 在特定时间内价格变动幅度达到阈值
暂停时长 可持续至收盘 通常 15 分钟(美股 L1/L2)
订单簿状态 保留,继续排队 冻结,恢复后重新开放
恢复机制 到达涨停/跌停价后部分撮合 时间到达后重新撮合
二级市场影响 流动性溢价极高 通常恢复正常

关键差异在于订单簿是否保留。A股的涨跌停机制下,订单簿保留但无法成交——这导致一个独特的现象:流动性枯竭但订单不消失。而在美股熔断期间,所有订单实际上暂停进入撮合引擎,恢复后订单簿重新开放竞价。

这一差异对量化策略的影响是:如果你在设计基于涨跌停的策略(如涨停板抢筹),你需要理解订单排队的优先级;而如果你在设计基于熔断的事件驱动策略,你需要预判恢复瞬间的竞价方向。


二、熔断触发前后:订单簿的三阶段变化

2.1 熔断前的预警信号

熔断不是凭空发生的。在触发前,订单簿已经发出预警。观察2015年8月24日A股和2020年3月9日美股的案例,可以提炼出以下规律:

熔断前 5-10 分钟的订单簿特征

  1. 买卖价差急剧扩大:正常市场买卖价差可能在 0.01%-0.05%,熔断前可能扩大 10-50 倍
  2. 买一/卖一深度不对称:在下跌熔断前,卖盘深度通常远大于买盘深度
  3. 大单冲击频率增加:主动性卖单的平均规模增大
  4. 价格发现速度加快:每个价格档位的停留时间缩短

以下是基于历史数据模拟的熔断前订单簿状态(以美股为例,数值已脱敏处理):

时间节点 买一价 卖一价 买卖价差 买一深度 卖一深度 压力比
触发前 10 分钟 298.50 298.75 0.25 15,200 28,400 0.535
触发前 5 分钟 297.20 298.90 1.70 8,400 52,100 0.161
触发前 1 分钟 295.00 299.50 4.50 3,200 89,700 0.036
熔断触发瞬间 294.80 冻结 冻结

:上表数据为基于历史典型案例的模拟展示,用于说明订单簿变化的规律特征。实际数据因标的和事件而异。

压力比(买卖压力比 = Σ前N档买盘量 / Σ前N档卖盘量)是识别熔断前预警的核心指标。当压力比跌破 0.1 时,表明卖盘压力极度集中,继续下跌触发熔断的概率显著上升。

2.2 熔断期间:订单簿的"冻结"与"暗流"

熔断触发后,撮合引擎暂停,但这不意味着市场完全静止。实际上,这段时间市场存在大量暗流

  1. 盘前/盘后交易:部分市场允许在熔断期间进行有限的盘前/盘后交易(如美股的 PDT 规则例外条款)
  2. 场外大宗交易:机构投资者通过暗池(Dark Pool)进行大额交易
  3. 期权市场:相关衍生品可能继续交易(取决于交易所规则)
  4. 预期博弈:交易员根据有限信息预判恢复后的价格方向

对于量化系统而言,熔断期间是收集市场情绪数据的窗口期:

  • 期权隐含波动率的变化
  • 相关资产(ETF、期货)的价格偏离
  • 社交媒体和新闻的情绪指标

2.3 熔断恢复后:竞价与二次触发风险

恢复撮合后,市场进入竞价阶段。这个阶段有几个关键特征:

竞价阶段的价格发现机制

  • 所有在场订单(之前排队的 + 新提交的)按价格-时间优先原则撮合
  • 开盘价通过集合竞价确定(美股)或直接以涨跌停价撮合(A股)
  • 流动性在恢复瞬间高度集中,可能引发价格短时剧烈波动

二次触发风险:如果恢复后的竞价导致价格继续触及熔断阈值,将触发新一轮熔断。2020年3月的美股市场就多次出现连续触发熔断的情况:

日期 触发次数 累计熔断时长 备注
2020-03-09 1 次 15 分钟 油价暴跌引发
2020-03-12 1 次 15 分钟 欧洲疫情升级
2020-03-16 2 次 30 分钟 美联储紧急降息
2020-03-18 1 次 15 分钟 流动性紧张

二次触发对量化策略的影响是:如果你在熔断恢复后追入,可能面临立即二次熔断的风险。这要求你的入场逻辑必须包含熔断间隔时间恢复后竞价强度的判断。


三、做市商在熔断期间的行为模式

3.1 做市商的"不可能三角"

理解做市商在熔断期间的行为,需要先理解做市商的"不可能三角":

  1. 提供流动性:在正常时段,做市商通过持续的买一/卖一报价提供流动性
  2. 管理库存风险:持有的标的仓位需要控制在风险限额内
  3. 保持报价竞争力:报价价差不能太大,否则失去客户

在正常市场,这三者可以平衡。但在熔断前后的极端行情下,做市商面临被迫二选一的困境:

  • 选择 1:继续提供流动性 → 接受库存风险 → 可能爆仓
  • 选择 2:收紧报价甚至撤单 → 流动性枯竭 → 价差扩大

现实中的做市商通常选择第二种。这就是为什么你在熔断前经常看到买卖价差急剧扩大——不是因为没人想交易,而是做市商主动撤单的结果。

3.2 熔断前后做市商的四阶段行为

阶段 做市商行为 订单簿表现
正常交易 窄价差、持续报价 健康的买卖盘平衡
预警期 开始扩大价差、减少单边报价 压力比开始恶化
熔断触发前 大幅撤单或转为仅提供单边报价 流动性骤降、价差急剧扩大
熔断期间 评估库存、准备恢复报价 暗池交易、大宗撮合
恢复初期 谨慎报价、观察订单簿博弈 竞价激烈、价格发现

3.3 识别做市商行为的实战意义

对于量化交易者,理解做市商行为模式的价值在于预判流动性恢复的节奏

  1. 如果熔断前做市商撤单不彻底(表明他们认为波动是暂时的),恢复后流动性将快速回归,价格可能快速反弹
  2. 如果熔断前做市商大规模撤单(表明他们认为风险未释放),恢复后流动性恢复将较慢,价格可能继续承压
  3. 观察恢复后第一笔大单的方向:做市商的修复性报价往往在聪明钱之后出现

这一逻辑可以帮助你在熔断恢复瞬间做出更合理的入场/出场决策。


四、历史复盘:2015 年 A 股熔断的全景数据

4.1 事件背景

2016 年 1 月 4 日,A 股正式实施熔断机制(引入沪深 300 指数 5%/7% 两档阈值)。然而,由于市场对此机制的不适应以及当时市场本身的下行趋势,1 月 4 日、1 月 7 日两个交易日内连续触发 7% 熔断,导致交易提前结束。该机制于 1 月 8 日被暂停。

这是研究熔断机制的绝佳案例:新机制首次实施 + 市场情绪紧张 + 连续触发

4.2 订单簿变化的量化特征

基于当时的历史数据回放(数据来源:交易所公开数据 + 相关研究报告),可以观察到以下特征:

2016年1月4日熔断触发前后关键数据(以沪深300指数为例):

时间节点 指数点位 跌幅 买卖价差(模拟档位) 流动性状态
13:12 4,145 -3.8% 正常 正常交易
13:28 4,060 -5.0% 开始扩大 触发 5% 熔断
13:33 4,060 -5.0% 熔断 15 分钟
13:48 4,018 -5.4% 扩大 恢复交易
13:58 3,899 -7.0% 极大 触发 7% 熔断,提前收市

关键观察

  1. 5% 熔断恢复后,卖盘压力未减:仅用 5 分钟便从 5% 跌幅扩大到 7% 跌幅
  2. 流动性在 5% 熔断后急剧萎缩:订单簿深度下降超过 60%
  3. 7% 熔断触发的速度远快于 5%:说明市场情绪在第一次熔断后进一步恶化

4.3 对量化策略的启示

这次事件给量化策略设计者提供了几个重要教训:

教训 1:不要在熔断恢复瞬间追入

  • 5% 熔断恢复后的 5 分钟内,7% 熔断即触发
  • 如果你在 5% 熔断恢复后追入,5 分钟后将面临流动性完全枯竭

教训 2:熔断前后的因子有效性下降

  • 极端波动期间,正常市场下有效的 alpha 因子(如动量、均值回归)可能失效
  • 建议在熔断预警期降低策略仓位

教训 3:流动性枯竭的持续时间可能超过预期

  • 原以为 15 分钟熔断后市场会冷静,实际流动性恢复需要更长时间
  • 需要在策略中预留"流动性恢复期"的观察窗口

五、生产级代码:实时监控熔断触发与订单簿预警

5.1 监控系统的设计思路

要实现对熔断事件的实时监控,需要解决以下问题:

  1. 实时获取订单簿数据:监控买卖价差和深度的变化
  2. 识别预警信号:当压力比跌破阈值时触发告警
  3. 追踪熔断状态:记录熔断触发和恢复时间
  4. 历史数据回放:复盘特定事件的订单簿变化

以下是 TickDB 市场数据 API 监控系统的生产级实现示例:

5.2 熔断预警监控代码

"""
熔断预警监控系统
功能:
1. 实时订阅订单簿深度数据(depth 频道)
2. 计算买卖压力比,判断是否触发预警
3. 熔断状态追踪与历史记录
4. 飞书 Webhook 告警通知

⚠️ 生产环境注意:
- 高频场景建议使用 aiohttp/asyncio 异步架构
- 需配置告警阈值和飞书 Webhook URL
- 历史回放模式需切换数据源为 `/kline` 接口
"""

import os
import time
import json
import logging
from datetime import datetime
from typing import Optional
from dataclasses import dataclass, asdict
from collections import deque

import requests
import websocket

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


@dataclass
class CircuitBreakerAlert:
    """熔断预警数据结构"""
    timestamp: str
    symbol: str
    pressure_ratio: float
    bid_depth: float
    ask_depth: float
    spread_bps: float
    alert_level: str  # WARNING / CRITICAL / TRIGGERED
    message: str


class CircuitBreakerMonitor:
    """熔断预警监控器"""
    
    def __init__(
        self,
        api_key: str,
        symbols: list[str],
        warning_threshold: float = 0.3,
        critical_threshold: float = 0.1,
        webhook_url: Optional[str] = None
    ):
        self.api_key = api_key
        self.symbols = symbols
        self.warning_threshold = warning_threshold
        self.critical_threshold = critical_threshold
        self.webhook_url = webhook_url or os.environ.get("FEISHU_WEBHOOK_URL")
        
        # 熔断状态追踪
        self.halted_symbols: dict[str, dict] = {}
        
        # 压力比历史(用于滑动窗口计算)
        self.pressure_history: dict[str, deque] = {
            s: deque(maxlen=20) for s in symbols
        }
        
        # 重连配置
        self.base_delay = 1.0
        self.max_delay = 30.0
        self.retry_count = 0
    
    def calculate_pressure_ratio(self, depth_data: dict) -> tuple[float, float, float]:
        """
        计算买卖压力比
        
        Args:
            depth_data: TickDB depth 频道数据
            
        Returns:
            (pressure_ratio, bid_depth, ask_depth) 单位为股数
        """
        bids = depth_data.get("b", [])
        asks = depth_data.get("a", [])
        
        # 累加前 10 档深度
        bid_depth = sum(price * quantity for price, quantity in bids[:10])
        ask_depth = sum(price * quantity for price, quantity in asks[:10])
        
        if ask_depth == 0:
            return float('inf'), bid_depth, ask_depth
        
        pressure_ratio = bid_depth / ask_depth
        return pressure_ratio, bid_depth, ask_depth
    
    def calculate_spread_bps(self, depth_data: dict) -> float:
        """计算买卖价差(基点)"""
        bids = depth_data.get("b", [])
        asks = depth_data.get("a", [])
        
        if not bids or not asks:
            return 0.0
        
        best_bid = bids[0][0]
        best_ask = asks[0][0]
        mid_price = (best_bid + best_ask) / 2
        
        if mid_price == 0:
            return 0.0
        
        spread_bps = (best_ask - best_bid) / mid_price * 10000
        return spread_bps
    
    def determine_alert_level(self, pressure_ratio: float, spread_bps: float) -> str:
        """判断告警级别"""
        if pressure_ratio < self.critical_threshold:
            return "CRITICAL"
        elif pressure_ratio < self.warning_threshold:
            return "WARNING"
        return "NORMAL"
    
    def send_alert(self, alert: CircuitBreakerAlert):
        """发送飞书告警"""
        if not self.webhook_url:
            logger.warning(f"熔断告警(未配置 Webhook): {alert.message}")
            return
        
        payload = {
            "msg_type": "text",
            "content": {
                "text": f"🚨 **熔断预警**\n"
                        f"标的: {alert.symbol}\n"
                        f"级别: {alert.alert_level}\n"
                        f"时间: {alert.timestamp}\n"
                        f"压力比: {alert.pressure_ratio:.4f}\n"
                        f"买卖价差: {alert.spread_bps:.2f} bps\n"
                        f"提示: {alert.message}"
            }
        }
        
        try:
            response = requests.post(
                self.webhook_url,
                json=payload,
                headers={"Content-Type": "application/json"},
                timeout=10
            )
            response.raise_for_status()
            logger.info(f"告警已发送: {alert.symbol} - {alert.alert_level}")
        except requests.RequestException as e:
            logger.error(f"告警发送失败: {e}")
    
    def on_depth_message(self, message: dict):
        """处理 depth 频道消息"""
        symbol = message.get("s", "")
        if symbol not in self.symbols:
            return
        
        # 检查是否已熔断
        if symbol in self.halted_symbols:
            logger.info(f"{symbol} 已熔断,跳过订单簿处理")
            return
        
        depth_data = message.get("d", {})
        pressure_ratio, bid_depth, ask_depth = self.calculate_pressure_ratio(depth_data)
        spread_bps = self.calculate_spread_bps(depth_data)
        
        # 记录历史
        self.pressure_history[symbol].append(pressure_ratio)
        
        # 判断告警级别
        alert_level = self.determine_alert_level(pressure_ratio, spread_bps)
        
        # 生成告警消息
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        message_text = self._generate_alert_message(
            pressure_ratio, spread_bps, alert_level
        )
        
        alert = CircuitBreakerAlert(
            timestamp=timestamp,
            symbol=symbol,
            pressure_ratio=pressure_ratio,
            bid_depth=bid_depth,
            ask_depth=ask_depth,
            spread_bps=spread_bps,
            alert_level=alert_level,
            message=message_text
        )
        
        # 记录日志
        logger.info(
            f"{symbol} | 压力比: {pressure_ratio:.4f} | "
            f"价差: {spread_bps:.2f} bps | 级别: {alert_level}"
        )
        
        # 发送告警(非 NORMAL 级别)
        if alert_level != "NORMAL":
            self.send_alert(alert)
    
    def _generate_alert_message(
        self, 
        pressure_ratio: float, 
        spread_bps: float,
        alert_level: str
    ) -> str:
        """生成告警消息文本"""
        history = list(self.pressure_history.get(self.symbols[0], []))
        trend = ""
        if len(history) >= 5:
            recent_avg = sum(history[-5:]) / 5
            if pressure_ratio < recent_avg * 0.8:
                trend = "(加速恶化中)"
        
        if alert_level == "CRITICAL":
            return f"卖盘压力极度集中,熔断风险极高{trend}"
        elif alert_level == "WARNING":
            return f"流动性开始萎缩,注意市场风险{trend}"
        return "市场正常"
    
    def on_halt_notification(self, symbol: str, halt_time: str, reason: str):
        """记录熔断触发"""
        self.halted_symbols[symbol] = {
            "halt_time": halt_time,
            "reason": reason,
            "detected_at": datetime.now().isoformat()
        }
        logger.warning(f"{symbol} 熔断触发 - 原因: {reason}")
    
    def connect(self):
        """建立 WebSocket 连接"""
        ws_url = "wss://api.tickdb.ai/v1/ws"
        params = f"?api_key={self.api_key}&channels=depth"
        for symbol in self.symbols:
            params += f"&symbols={symbol}"
        
        full_url = ws_url + params
        
        while True:
            try:
                logger.info(f"正在连接 WebSocket: {full_url[:50]}...")
                ws = websocket.WebSocketApp(
                    full_url,
                    on_message=self._on_ws_message,
                    on_error=self._on_ws_error,
                    on_close=self._on_ws_close
                )
                ws.on_open = self._on_ws_open
                ws.run_forever(ping_interval=20, ping_timeout=10)
            except Exception as e:
                logger.error(f"WebSocket 连接错误: {e}")
                self._handle_reconnect()
    
    def _on_ws_open(self, ws):
        """WebSocket 连接成功"""
        logger.info("WebSocket 连接已建立,开始心跳保活")
        self.retry_count = 0
        
        # 发送心跳
        ws.send(json.dumps({"cmd": "ping"}))
    
    def _on_ws_message(self, ws, message: str):
        """处理 WebSocket 消息"""
        try:
            data = json.loads(message)
            
            # 处理心跳响应
            if "pong" in data or data.get("type") == "pong":
                return
            
            # 处理错误
            if "code" in data and data["code"] != 0:
                self._handle_api_error(data)
                return
            
            # 处理 depth 数据
            channel = data.get("channel") or data.get("c")
            if channel == "depth":
                self.on_depth_message(data)
            elif channel == "halt" or "halt" in data:
                # 熔断通知
                self.on_halt_notification(
                    data.get("symbol"),
                    data.get("halt_time"),
                    data.get("reason", "未知原因")
                )
                
        except json.JSONDecodeError as e:
            logger.error(f"JSON 解析错误: {e}")
    
    def _on_ws_error(self, ws, error):
        """WebSocket 错误处理"""
        logger.error(f"WebSocket 错误: {error}")
    
    def _on_ws_close(self, ws, close_status_code, close_msg):
        """WebSocket 关闭处理"""
        logger.warning(f"WebSocket 关闭: {close_status_code} - {close_msg}")
        self._handle_reconnect()
    
    def _handle_reconnect(self):
        """指数退避重连"""
        self.retry_count += 1
        delay = min(self.base_delay * (2 ** self.retry_count), self.max_delay)
        jitter = delay * 0.1 * (hash(str(time.time())) % 100 / 100)
        wait_time = delay + jitter
        
        logger.info(f"{self.retry_count} 秒后重连...")
        time.sleep(wait_time)
    
    def _handle_api_error(self, response: dict):
        """API 错误处理"""
        code = response.get("code", 0)
        message = response.get("message", "")
        
        if code == 3001:
            retry_after = int(response.headers.get("Retry-After", 5))
            logger.warning(f"请求频率超限,等待 {retry_after} 秒")
            time.sleep(retry_after)
        elif code in (1001, 1002):
            logger.error("API Key 无效,请检查环境变量 TICKDB_API_KEY")
            raise ValueError("API Key 无效")
        else:
            logger.error(f"API 错误 {code}: {message}")


def main():
    """主函数"""
    api_key = os.environ.get("TICKDB_API_KEY")
    if not api_key:
        raise ValueError("请设置环境变量 TICKDB_API_KEY")
    
    # 监控标的(示例:港股腾讯期货、主流数字货币)
    symbols = [
        "0700.HK",   # 腾讯控股
        "BTC.USDT",  # 比特币
        "ETH.USDT",  # 以太坊
    ]
    
    monitor = CircuitBreakerMonitor(
        api_key=api_key,
        symbols=symbols,
        warning_threshold=0.3,
        critical_threshold=0.1
    )
    
    logger.info("熔断预警监控系统启动")
    monitor.connect()


if __name__ == "__main__":
    main()

5.3 核心代码逻辑说明

模块 功能 关键实现
calculate_pressure_ratio 计算买卖压力比 累加前 10 档深度,输出比值
calculate_spread_bps 计算买卖价差(基点) (ask-bid)/mid*10000
determine_alert_level 判断告警级别 基于阈值的分级逻辑
_handle_reconnect 指数退避重连 delay = min(base * 2^retry, max_delay) + jitter
_handle_api_error API 错误处理 3001 限频特殊处理

5.4 历史回放配置

如果要复盘特定历史事件,需将数据源从 WebSocket 实时流切换为历史 K 线数据:

def fetch_historical_depth(
    api_key: str,
    symbol: str,
    start_time: str,
    end_time: str
) -> list[dict]:
    """
    获取历史订单簿快照数据(用于复盘)
    
    ⚠️ 注意:TickDB 提供历史 K 线数据(10 年级别),
    如需完整的逐笔 depth 数据,请联系 [email protected]
    """
    url = "https://api.tickdb.ai/v1/market/kline"
    headers = {"X-API-Key": api_key}
    params = {
        "symbol": symbol,
        "interval": "1m",  # 1 分钟 K 线
        "start_time": start_time,
        "end_time": end_time,
        "limit": 1000
    }
    
    response = requests.get(
        url,
        headers=headers,
        params=params,
        timeout=(3.05, 10)
    )
    
    if response.status_code != 200:
        raise RuntimeError(f"请求失败: {response.status_code}")
    
    data = response.json()
    if data.get("code") != 0:
        raise RuntimeError(f"API 错误: {data.get('message')}")
    
    return data.get("data", [])

六、实战指南:不同场景下的应对策略

6.1 场景一:熔断预警期的仓位管理

当监控系统检测到压力比跌破 0.3(WARNING 级别)时,建议采取以下措施:

操作 具体建议 理由
降低仓位 将仓位降至 50% 以下 减少熔断后流动性枯竭的损失
收紧止损 将止损幅度缩小 20-30% 熔断后价差扩大,滑点增加
暂停新开仓 避免在波动加剧时入场 等待市场情绪稳定
准备对冲 买入看跌期权或做空相关期货 锁定下行风险

6.2 场景二:熔断恢复瞬间的竞价策略

熔断恢复后,是最具挑战性的交易窗口。以下策略供参考:

策略 1:观察先行

  • 恢复后前 3 分钟不操作,观察竞价方向
  • 如果大单持续买入且压力比快速回升,可能反弹
  • 如果大单持续卖出或压力比继续恶化,等待

策略 2:流动性敏感订单

  • 使用限价单而非市价单
  • 设定合理的价格容差(如 0.5%)以应对竞价波动
  • 分批下单,避免大单冲击市场

策略 3:二次触发防备

  • 在首次熔断恢复后,保留 20-30% 的流动性仓位
  • 预设计划:如二次熔断触发,立即执行出场
  • 避免在首次恢复后"全押"式加仓

6.3 场景三:事件驱动策略的熔断容错设计

在设计基于财报、宏观事件的事件驱动策略时,必须考虑熔断场景:

def execute_event_strategy(
    event_time: datetime,
    symbol: str,
    api_key: str,
    monitor: CircuitBreakerMonitor
):
    """
    事件驱动策略执行(熔断容错版)
    
    逻辑:
    1. 事件发生前 5 分钟,监控系统上线
    2. 事件发生后,实时监控熔断状态
    3. 熔断触发时,暂停策略执行
    4. 熔断恢复后,等待 5 分钟再恢复操作
    """
    
    # 检查是否熔断
    if symbol in monitor.halted_symbols:
        halt_info = monitor.halted_symbols[symbol]
        logger.info(f"{symbol} 处于熔断状态,跳过本次执行")
        return {
            "status": "halted",
            "halt_time": halt_info["halt_time"],
            "action": "skip"
        }
    
    # 检查压力比
    recent_pressure = list(monitor.pressure_history.get(symbol, []))
    if recent_pressure and min(recent_pressure[-5:]) < 0.15:
        logger.warning(f"{symbol} 压力比极低,降低仓位执行")
        # 使用 30% 仓位执行
        position_size = 0.3
    else:
        position_size = 1.0
    
    # 正常执行逻辑
    # ...(策略具体实现)
    
    return {
        "status": "executed",
        "position_size": position_size,
        "action": "normal"
    }

七、结语

回到开篇的那个场景:2020年3月9日,道指在4分钟内触发熔断。那15分钟的"静止"背后,是订单簿的冻结、做市商的撤退、以及无数量化系统同时执行着相似的逻辑。

理解熔断机制的市场微观结构,不仅仅是"知道这是什么",而是能够预判它的影响:订单簿会如何变化,做市商会如何反应,流动性会在哪里枯竭。这些预判,是你在极端行情下做出正确决策的基础。

对于量化交易者而言,熔断不是风险,而是信号——它告诉你市场微观结构正在发生深层变化。学会解读这个信号,你的策略才能在别人恐慌的时候保持理性。


下一步行动

如果你希望亲手实现熔断监控

  1. 访问 tickdb.ai 注册(免费,无需信用卡)
  2. 在控制台生成 API Key
  3. 设置环境变量 TICKDB_API_KEY,复制本文代码即可运行

如果你想复盘 2015 年 A 股或 2020 年美股的熔断事件

  • 使用 TickDB 历史 K 线数据,提取熔断日前后的 1 分钟/5 分钟数据
  • 结合期权隐含波动率数据,分析做市商行为
  • 联系我们获取相关历史数据的机构级方案

如果你习惯用 AI 辅助开发

  • 在 AI 助手中搜索安装 tickdb-market-data SKILL,通过自然语言查询熔断相关数据

风险提示:本文不构成任何投资建议。熔断机制可能因交易所规则调整而变化,建议在实际使用前查阅最新交易所规则文件。回测结果基于历史数据,不构成未来收益保证。市场有风险,投资需谨慎。