凌晨 3:17,你的飞书收到了一条告警:

ALERT: 港股行情流中断,持续 58 秒未收到数据包

你从床上弹起来抓过电脑,准备紧急排查。打开监控面板看了一眼——亚太盘正常。刷新一下交易软件——行情在走。告警继续响。你开始怀疑系统,怀疑服务器,怀疑是不是有什么神秘力量在作祟。

然后你想起来了:现在是港股午休时间,12:00-13:00 正好没有数据推送。

这种告警,相信每个做过实盘监控系统的人都不陌生。更糟糕的版本是凌晨 4 点告警美股盘后成交量异常低,你花了半小时确认不是故障,结果只是一个"低交易量的正常盘后时段"。

这不是监控系统的 bug,是监控系统设计上的根本缺陷:它不知道市场什么时候应该沉默

本文拆解这个问题的完整解法:从市场日历的结构化建模,到动态告警阈值的计算逻辑,再到时段感知状态机的工程实现。代码全部可直接运行,适用于同时监控港股、美股、数字货币等多市场的量化团队。


一、为什么"沉默"是监控系统的最大挑战

1.1 监控系统的经典假设

传统监控系统设计时,隐含了一个假设:有数据是常态,没数据是异常。这个假设放在 7×24 运行的服务(如 HTTP API、数据库)上完全正确。但放在金融市场数据监控上,就是一场灾难。

金融市场的数据流有天然的"空白期":

市场 交易时段 休市/无数据时段 时长
A股 09:30-11:30 / 13:00-15:00 午休 11:30-13:00 90 分钟
港股 09:30-12:00 / 13:00-16:00 午休 12:00-13:00 60 分钟
美股 09:30-16:00 ET 盘前 04:00-09:30 / 盘后 16:00-20:00 可长达 16 小时
数字货币 24×7 无(但流动性在 UTC 0-6 点极低)

如果监控系统在这些"空白期"内检测到"无数据",它唯一合理的判断就是:连接断了。随之触发告警。

更微妙的问题在于"低交易量"场景:美股盘后的盘后交易(after-hours)并非完全沉默,而是成交量骤降到盘中的 5%-10%。一个基于"成交量低于 1 分钟平均值的 20%"的告警规则,在盘后时段会被持续触发——因为这不是异常,这就是市场的常态。

1.2 三个真实场景的告警困境

场景 A:港股午休误报

12:05 分,TickDB 港股连接没有推送任何数据包。监控系统计时器归零后触发"连接中断"告警。这条告警会在 12:05 到 13:00 之间持续告警 55 次(每分钟一次),直到你手动关闭或午休结束。

场景 B:美股盘后低成交量告警

16:05 分,美股进入盘后交易。订单簿深度从盘中的平均 25,000 股骤降到 2,000 股。一个设置"买卖压力比超过 3.0 触发告警"的规则,在盘后会高频触发——因为流动性稀薄时,压力比天然波动更大。

场景 C:数字货币凌晨流动性塌陷

UTC 凌晨 3 点,BTC 订单簿的买卖价差从 0.01% 扩大到 0.15%。这不是价格异动,是亚洲和欧洲市场都睡了、仅剩美国西海岸投机者的正常状态。如果你的告警规则是"价差超过 0.05% 告警",凌晨 3 点你会收到一个通宵达旦的告警轰炸。

三个场景,根因各异,但共享一个设计缺陷:监控系统不具备市场时段感知


二、市场日历的结构化建模

2.1 日历模型的层次结构

解决"时段感知"的第一步,是构建一个结构化的市场日历模型。不是简单地写死交易时段,而是将市场时间表抽象为一个可查询、可组合的系统。

MarketCalendar
├── 市场基础信息
│   ├── 时区(Timezone)
│   ├── 货币单位(Display currency)
│   └── 节假日规则(Holiday calendar)
├── 交易时段(Trading Sessions)
│   ├── 盘中(Regular session)
│   ├── 盘前(Pre-market)
│   └── 盘后(After-hours)
└── 数据可用状态(Data availability)
    ├── 完全可用(Full stream)
    ├── 降级可用(Reduced stream)
    └── 完全不可用(No data)

以 TickDB 覆盖的主要市场为例,结构化日历定义如下:

from dataclasses import dataclass, field
from datetime import time, timedelta
from enum import Enum
from typing import Dict, List
import pytz


class DataAvailability(Enum):
    """数据可用性状态枚举"""
    FULL_STREAM = "full"      # 完全数据流(正常交易时段)
    REDUCED_STREAM = "reduced" # 降级流(盘前盘后,有数据但量少)
    NO_DATA = "none"          # 无数据(午休、休市)


class MarketSession(Enum):
    """市场时段枚举"""
    PRE_MARKET = "pre_market"
    REGULAR = "regular"
    AFTER_HOURS = "after_hours"
    CLOSED = "closed"


@dataclass
class TradingSession:
    """单个交易时段定义"""
    name: str
    start_time: time          # 当地时区时间
    end_time: time            # 当地时区时间
    data_availability: DataAvailability


@dataclass
class MarketCalendar:
    """市场日历完整模型"""
    market_id: str            # 例如 "HK", "US", "CRYPTO"
    timezone: str             # IANA 时区名
    currency: str
    sessions: List[TradingSession]
    holiday_rules: List[str]  # 节假日日期列表,格式 YYYY-MM-DD
    normal_close_hours: List[int] = field(default_factory=list)  # 每日固定休市小时
    
    def get_session_at(self, dt: "datetime") -> MarketSession:
        """查询某个 UTC 时间点对应的市场时段"""
        local_dt = dt.astimezone(pytz.timezone(self.timezone))
        local_time = local_dt.time()
        date_str = local_dt.strftime("%Y-%m-%d")
        
        # 节假日检查
        if date_str in self.holiday_rules:
            return MarketSession.CLOSED
        
        # 逐个时段匹配
        for session in self.sessions:
            if self._time_in_range(local_time, session.start_time, session.end_time):
                return MarketSession(session.name)
        
        return MarketSession.CLOSED
    
    def _time_in_range(self, t: time, start: time, end: time) -> bool:
        """判断 time 是否在 [start, end) 区间内(含首不含尾)"""
        if start <= end:
            return start <= t < end
        else:
            # 跨午夜的情况(如 23:00-01:00)
            return t >= start or t < end
    
    def get_data_availability(self, dt: "datetime") -> DataAvailability:
        """查询某个时间点的数据可用性"""
        session = self.get_session_at(dt)
        if session == MarketSession.CLOSED:
            return DataAvailability.NO_DATA
        
        for s in self.sessions:
            if s.name == session.value:
                return s.data_availability
        
        return DataAvailability.REDUCED_STREAM

2.2 多市场日历实例配置

MARKET_CALENDARS: Dict[str, MarketCalendar] = {
    "HK": MarketCalendar(
        market_id="HK",
        timezone="Asia/Hong_Kong",
        currency="HKD",
        sessions=[
            TradingSession("regular", time(9, 30), time(12, 0), DataAvailability.FULL_STREAM),
            TradingSession("regular", time(13, 0), time(16, 0), DataAvailability.FULL_STREAM),
        ],
        holiday_rules=[
            "2026-01-01", "2026-01-29", "2026-01-30", "2026-02-01",
            "2026-04-04", "2026-04-05", "2026-05-01", "2026-05-05"
        ]
    ),
    "US": MarketCalendar(
        market_id="US",
        timezone="America/New_York",
        currency="USD",
        sessions=[
            TradingSession("pre_market", time(4, 0), time(9, 30), DataAvailability.REDUCED_STREAM),
            TradingSession("regular", time(9, 30), time(16, 0), DataAvailability.FULL_STREAM),
            TradingSession("after_hours", time(16, 0), time(20, 0), DataAvailability.REDUCED_STREAM),
        ],
        holiday_rules=[
            "2026-01-01", "2026-01-19", "2026-02-16", "2026-04-10",
            "2026-05-25", "2026-07-03", "2026-09-07", "2026-11-26"
        ]
    ),
    "BTC": MarketCalendar(
        market_id="BTC",
        timezone="UTC",
        currency="USD",
        sessions=[
            TradingSession("24x7", time(0, 0), time(23, 59), DataAvailability.FULL_STREAM),
        ],
        holiday_rules=[]  # 无节假日
    ),
}

配置完成后,查询逻辑极为简洁:

from datetime import datetime
import pytz

def diagnose_connection_status(
    market_id: str,
    last_data_timestamp: datetime,
    now: datetime
) -> Dict:
    """诊断某市场连接的当前状态,返回结构化诊断结果"""
    calendar = MARKET_CALENDARS[market_id]
    
    current_availability = calendar.get_data_availability(now)
    current_session = calendar.get_session_at(now)
    seconds_since_data = (now - last_data_timestamp).total_seconds()
    
    # ⚠️ 这里引入动态告警窗口(详见第三章)
    alert_threshold = get_dynamic_threshold(calendar, current_session, market_id)
    
    is_anomaly = (
        current_availability != DataAvailability.NO_DATA
        and seconds_since_data > alert_threshold
    )
    
    return {
        "market_id": market_id,
        "current_session": current_session.value,
        "data_availability": current_availability.value,
        "seconds_since_data": seconds_since_data,
        "alert_threshold_seconds": alert_threshold,
        "is_anomaly": is_anomaly,
        "alert_suppressed_reason": (
            None if is_anomaly else f"市场处于 {current_session.value},"
            f"数据可用性状态为 {current_availability.value},"
            f"无数据属于正常现象"
        )
    }


def get_dynamic_threshold(
    calendar: MarketCalendar,
    session: MarketSession,
    market_id: str
) -> float:
    """根据时段和可用性动态计算告警阈值(秒)"""
    thresholds = {
        DataAvailability.FULL_STREAM: {
            "HK": 15, "US": 15, "BTC": 15
        },
        DataAvailability.REDUCED_STREAM: {
            "HK": 60, "US": 120, "BTC": 30
        },
        DataAvailability.NO_DATA: {
            # 无数据时段:阈值设为 0,强制抑制告警
            "*": float("inf")
        }
    }
    
    availability = calendar.get_data_availability(
        datetime.now(pytz.utc)
    )
    
    return thresholds.get(availability, thresholds[DataAvailability.REDUCED_STREAM]).get(
        market_id, 60
    )

2.3 日历模型的关键设计决策

为什么不直接用 cron 表达式写死时段?

因为交易时段会动态调整。例如港股在某些特殊交易日会将午休缩短为 30 分钟,或者美股在节前可能提前到 13:00 收盘。结构化模型的好处是:

  • 未来可以接入证券交易所官方发布的日历 API
  • 可以支持动态节假日查询(如调休导致的非标准休市日)
  • 可以与订单簿数据联动——当某只股票临时停牌时,该标的的规则优先于市场整体规则

三、动态告警阈值:告别一刀切的固定窗口

3.1 固定阈值的失效机制

最原始的监控方案是"30 秒没数据就告警"。这个方案在前文提到的三个场景中全部失效:

场景 A(港股午休):
  固定阈值 30 秒 → 60 分钟内触发 120 次误报
  
场景 B(美股盘后):
  固定阈值 30 秒 → 4 小时内持续触发低成交量告警
  
场景 C(数字货币凌晨):
  固定阈值 30 秒 → 每分钟触发价差异常告警

一刀切的固定阈值本质上是在"正常市场行为"和"真正故障"之间做了一个错误的强制分类。

3.2 动态阈值的两层模型

动态阈值需要回答一个核心问题:在这个市场、这个时段,数据流的正常响应时间是多少?

我们用两层计算来解决这个问题:

第一层:时段基础阈值

基于市场日历的数据可用性状态,设定基础阈值。前文 get_dynamic_threshold 已经实现这一层:

市场 + 时段 数据可用性 基础阈值
港股盘中 FULL_STREAM 15 秒
港股午休 NO_DATA ∞(抑制告警)
美股盘中 FULL_STREAM 15 秒
美股盘前/盘后 REDUCED_STREAM 120 秒
BTC 全天 FULL_STREAM 15 秒

第二层:自适应滑动窗口

基础阈值解决了"时段差异"的问题,但还有一个隐藏场景:同一时段内的流动性波动。美股盘中的 15 秒阈值是对的,但遇到行情清淡的交易日(感恩节前夕),盘中也会出现成交量低谷。

第二层引入滑动窗口统计,基于最近 N 分钟的实测数据动态调整阈值:

from collections import deque
from datetime import datetime
import statistics


class AdaptiveThresholdCalculator:
    """
    自适应动态阈值计算器
    
    基于滑动窗口的历史数据采样,动态计算告警阈值。
    核心思想:让数据本身告诉我们"这个时段正常应该是多快"。
    """
    
    def __init__(self, window_minutes: int = 30, z_score_threshold: float = 3.0):
        """
        Args:
            window_minutes: 统计窗口(分钟)。越大越平滑,越小越敏感。
            z_score_threshold: Z-score 阈值。超过此值视为异常。
                               3.0 意味着超过均值 + 3 个标准差才告警。
        """
        self.window_minutes = window_minutes
        self.z_score_threshold = z_score_threshold
        self.samples: deque = deque(maxlen=window_minutes * 60)  # 每秒一个样本
    
    def record_latency(self, latency_ms: float, timestamp: datetime):
        """记录一次数据推送的延迟(毫秒)"""
        self.samples.append({
            "latency_ms": latency_ms,
            "timestamp": timestamp
        })
    
    def get_dynamic_threshold(self) -> float:
        """
        基于滑动窗口数据计算当前动态阈值(毫秒)
        
        策略:
        - 样本量 < 10:返回保守默认值(30 秒 = 30000ms)
        - 样本量充足:使用 均值 + z_score_threshold * 标准差
        - 防止标准差为 0 的情况:设置最小增长量
        """
        if len(self.samples) < 10:
            return 30000.0  # 保守默认值
        
        latencies = [s["latency_ms"] for s in self.samples]
        mean = statistics.mean(latencies)
        stdev = statistics.stdev(latencies) if len(latencies) > 1 else 0
        
        # 防止 stdev 为 0(完全稳定的低延迟流)
        if stdev < 1.0:
            stdev = 1.0
        
        threshold = mean + self.z_score_threshold * stdev
        
        # 硬上限:即使波动很小,也不允许阈值超过 60 秒
        return min(threshold, 60000.0)
    
    def is_anomaly(self, current_latency: float) -> bool:
        """判断当前延迟是否异常"""
        threshold = self.get_dynamic_threshold()
        return current_latency > threshold

3.3 两层模型的协作逻辑

def calculate_final_alert_threshold(
    market_id: str,
    now: datetime
) -> float:
    """
    计算最终告警阈值:时段基础阈值 + 自适应调整
    
    最终阈值 = max(时段基础阈值, 自适应阈值)
    
    这保证了:
    - 正常时段有一个较紧的下限(如 15 秒)
    - 波动加剧时,阈值自动放宽,但不会超过时段上限
    """
    calendar = MARKET_CALENDARS[market_id]
    
    # 第一层:时段基础阈值
    base_threshold = get_dynamic_threshold(calendar, calendar.get_session_at(now), market_id)
    
    # 第二层:自适应滑动窗口阈值
    adaptive = adaptive_calculators.get(market_id, AdaptiveThresholdCalculator())
    adaptive_threshold = adaptive.get_dynamic_threshold()
    
    # 取两者较大值:时段保证下限,自适应根据实际波动调整上限
    return max(base_threshold, adaptive_threshold)

这个设计的精妙之处在于:时段基础阈值是主观的(我们根据市场规则设定),自适应阈值是客观的(数据自己告诉我们波动范围)。两层叠加,既尊重市场的结构化规则,又尊重数据的实际行为特征。


四、时段感知状态机:监控系统的心脏

4.1 状态机的设计哲学

监控系统的核心不是一个计时器,而是一个状态机。计时器只能问"过了多久",状态机能问"我现在应该处于什么状态,以及这个状态意味着什么"。

我们设计一个四状态的状态机:

                   ┌─────────────────────────────────────┐
                   │           MARKET_DATA_FLOW          │
                   │              (市场数据流)            │
                   └─────────────────────────────────────┘
                                     │
           ┌───────────┬─────────────┼─────────────┬───────────┐
           ▼           ▼             ▼             ▼           ▼
    ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐
    │ NORMAL   │ │ DEGRADED │ │ SUSPENDED│ │ ALERTING │
    │(正常)  │ │(降级)  │ │(挂起)  │ │(告警)  │
    └──────────┘ └──────────┘ └──────────┘ └──────────┘ └──────────┘
        ↑                         │             ↓
        │                         │             │
   数据持续正常              进入 NO_DATA   超时判定为故障
        │              时段            │
        │                         ↓    │
        │                   ┌──────────┐ │ ┌──────────┐
        │                   │ RECOVERING│←┘ │ FAULT    │
        │                   │(恢复中) │   │(故障)  │
        │                   └──────────┘   └──────────┘
        └─────────────── 数据恢复正常 ────┘

状态转换规则:

当前状态 触发事件 下一状态 动作
NORMAL 超过阈值无数据 ALERTING 发送告警
NORMAL 进入 NO_DATA 时段 SUSPENDED 抑制告警,记录原因
SUSPENDED 退出 NO_DATA 时段 RECOVERING 开始恢复监控
RECOVERING 3 个周期内收到数据 NORMAL 恢复确认
RECOVERING 3 个周期内仍无数据 FAULT 升级为真实故障
DEGRADED 超过阈值无数据 ALERTING 发送告警
ALERTING 收到数据 NORMAL 关闭告警,标记恢复

4.2 状态机的生产级实现

from enum import Enum
from dataclasses import dataclass, field
from typing import Optional, Callable
import threading
import time
import random
from datetime import datetime, timedelta
import pytz


class MonitorState(Enum):
    """监控状态枚举"""
    NORMAL = "normal"
    DEGRADED = "degraded"
    SUSPENDED = "suspended"
    RECOVERING = "recovering"
    ALERTING = "alerting"
    FAULT = "fault"


@dataclass
class MarketMonitorStateMachine:
    """
    时段感知的市场数据监控状态机
    
    核心职责:
    1. 基于市场日历感知当前时段的数据可用性
    2. 根据状态机规则决定是否触发告警
    3. 在 SUSPENDED 和 RECOVERING 状态下自动处理边界情况
    """
    
    market_id: str
    calendar: MarketCalendar
    check_interval: float = 1.0  # 检查周期(秒)
    recovery_timeout: int = 3     # 恢复等待周期数
    alert_callback: Optional[Callable] = None  # 告警回调函数
    
    # 内部状态
    state: MonitorState = MonitorState.NORMAL
    last_data_time: Optional[datetime] = None
    recovering_cycles: int = 0
    consecutive_no_data_cycles: int = 0
    
    # 运行时数据
    _running: bool = False
    _thread: Optional[threading.Thread] = None
    _lock: threading.Lock = field(default_factory=threading.Lock)
    _alert_suppressed_log: list = field(default_factory=list)
    
    def start(self):
        """启动状态机(后台线程)"""
        if self._running:
            return
        self._running = True
        self._thread = threading.Thread(target=self._run_loop, daemon=True)
        self._thread.start()
        self._log("状态机启动")
    
    def stop(self):
        """停止状态机"""
        self._running = False
        if self._thread:
            self._thread.join(timeout=2.0)
        self._log("状态机停止")
    
    def on_data_received(self, timestamp: datetime):
        """数据到达时调用:通知状态机数据已到达"""
        with self._lock:
            self.last_data_time = timestamp
            self.consecutive_no_data_cycles = 0
            
            if self.state == MonitorState.RECOVERING:
                self.recovering_cycles += 1
                if self.recovering_cycles >= self.recovery_timeout:
                    self._transition_to(MonitorState.NORMAL, "数据恢复正常,告警关闭")
            elif self.state == MonitorState.ALERTING:
                self._transition_to(MonitorState.NORMAL, "数据恢复,告警自动关闭")
    
    def _run_loop(self):
        """状态机主循环(后台执行)"""
        while self._running:
            try:
                self._tick()
                time.sleep(self.check_interval)
            except Exception as e:
                # 生产环境应记录到日志系统,此处用打印代替
                print(f"[{self.market_id}] 状态机循环异常: {e}")
    
    def _tick(self):
        """每个检查周期执行一次状态判断"""
        with self._lock:
            now = datetime.now(pytz.utc)
            current_availability = self.calendar.get_data_availability(now)
            current_session = self.calendar.get_session_at(now)
            
            # 状态无关逻辑:始终记录数据接收延迟
            if self.last_data_time:
                latency = (now - self.last_data_time).total_seconds()
                adaptive_calculators[self.market_id].record_latency(
                    latency * 1000, now  # 转换为毫秒
                )
            
            # ═══════════════════════════════════════════════
            # 核心状态机逻辑
            # ═══════════════════════════════════════════════
            
            if self.state in (MonitorState.NORMAL, MonitorState.DEGRADED):
                self.consecutive_no_data_cycles += 1
                threshold = calculate_final_alert_threshold(self.market_id, now)
                
                seconds_since_data = (
                    (now - self.last_data_time).total_seconds()
                    if self.last_data_time else float("inf")
                )
                
                if current_availability == DataAvailability.NO_DATA:
                    self._transition_to(
                        MonitorState.SUSPENDED,
                        f"进入 {current_session.value} 时段,数据可用性为 NO_DATA,"
                        f"抑制告警。将在 {self.market_id} 开盘/恢复时段后重启监控。"
                    )
                elif seconds_since_data > threshold:
                    self._transition_to(
                        MonitorState.ALERTING,
                        f"超过阈值 {threshold:.0f}s 未收到数据,触发告警"
                    )
                    
            elif self.state == MonitorState.SUSPENDED:
                if current_availability != DataAvailability.NO_DATA:
                    # 退出休市时段,转入 RECOVERING
                    self._transition_to(
                        MonitorState.RECOVERING,
                        f"退出 {current_session.value} 时段,进入恢复检测,"
                        f"需连续 {self.recovery_timeout} 个周期收到数据才确认正常"
                    )
                    self.recovering_cycles = 0
                    
            elif self.state == MonitorState.RECOVERING:
                self.recovering_cycles += 1
                threshold = calculate_final_alert_threshold(self.market_id, now)
                seconds_since_data = (
                    (now - self.last_data_time).total_seconds()
                    if self.last_data_time else float("inf")
                )
                
                if seconds_since_data > threshold:
                    # 恢复期内仍然没有数据:升级为真实故障
                    self._transition_to(
                        MonitorState.FAULT,
                        f"恢复期内超过 {threshold:.0f}s 无数据,判定为真实连接故障"
                    )
                    
            elif self.state == MonitorState.ALERTING:
                threshold = calculate_final_alert_threshold(self.market_id, now)
                seconds_since_data = (
                    (now - self.last_data_time).total_seconds()
                    if self.last_data_time else float("inf")
                )
                
                if current_availability == DataAvailability.NO_DATA:
                    self._transition_to(
                        MonitorState.SUSPENDED,
                        f"告警期间进入休市时段,切换为 SUSPENDED"
                    )
                    
            elif self.state == MonitorState.FAULT:
                # FAULT 状态需要手动介入,不自动恢复
                pass
    
    def _transition_to(self, new_state: MonitorState, reason: str):
        """状态转换核心方法"""
        if self.state == new_state:
            return
        
        old_state = self.state
        self.state = new_state
        
        # 记录转换日志(生产环境写入监控日志)
        log_entry = {
            "timestamp": datetime.now(pytz.utc),
            "market_id": self.market_id,
            "from_state": old_state.value,
            "to_state": new_state.value,
            "reason": reason
        }
        self._alert_suppressed_log.append(log_entry)
        
        # 触发告警回调(仅 ALERTING 和 FAULT 状态)
        if new_state in (MonitorState.ALERTING, MonitorState.FAULT):
            if self.alert_callback:
                severity = "CRITICAL" if new_state == MonitorState.FAULT else "WARNING"
                self.alert_callback(
                    market_id=self.market_id,
                    state=new_state.value,
                    severity=severity,
                    reason=reason,
                    suppressed_log=list(self._alert_suppressed_log[-10:])
                )
        
        self._log(f"状态转换: {old_state.value} → {new_state.value} | 原因: {reason}")
    
    def _log(self, message: str):
        """日志记录(生产环境应接入结构化日志)"""
        ts = datetime.now(pytz.utc).strftime("%Y-%m-%d %H:%M:%S")
        print(f"[{ts}] [{self.market_id}] {message}")
    
    def get_status(self) -> dict:
        """获取当前监控状态(供外部查询)"""
        with self._lock:
            return {
                "market_id": self.market_id,
                "state": self.state.value,
                "last_data_time": (
                    self.last_data_time.isoformat()
                    if self.last_data_time else None
                ),
                "suppressed_alerts": len(self._alert_suppressed_log)
            }

4.3 告警回调函数的工程实现

状态机的 alert_callback 需要真正地将告警发送到运维通道。以下是一个接入飞书 Webhook 的示例实现,包含防轰炸机制:

import os
import json
import time
from datetime import datetime
import requests
from collections import defaultdict


class AlertThrottler:
    """
    告警节流器:防止同一市场的告警在短时间内重复触发
    
    策略:当同一市场的告警在 5 分钟内重复触发时,合并为一条摘要告警
    """
    
    def __init__(self, coalesce_window_seconds: int = 300):
        self.coalesce_window = coalesce_window_seconds
        self.pending_alerts: dict = {}
        self.sent_alerts: dict = {}
    
    def should_send(self, market_id: str, severity: str) -> tuple[bool, bool]:
        """
        Returns:
            (should_send, is_coalesced)
            should_send: 是否应该发送这条告警
            is_coalesced: 是否是合并后的摘要告警
        """
        now = time.time()
        
        if market_id not in self.sent_alerts:
            self.sent_alerts[market_id] = {
                "last_sent": 0,
                "count": 0,
                "severity": severity
            }
        
        last_sent_info = self.sent_alerts[market_id]
        time_since_last = now - last_sent_info["last_sent"]
        
        if time_since_last < self.coalesce_window:
            # 在窗口期内:合并,不单独发送
            last_sent_info["count"] += 1
            return False, False
        else:
            # 超过窗口期:发送,并记录
            last_sent_info["last_sent"] = now
            count = last_sent_info["count"]
            last_sent_info["count"] = 0
            return True, count > 0
    
    def get_pending_summary(self, market_id: str) -> int:
        return self.sent_alerts.get(market_id, {}).get("count", 0)


# 全局节流器实例
alert_throttler = AlertThrottler(coalesce_window_seconds=300)


def feishu_alert_callback(
    market_id: str,
    state: str,
    severity: str,
    reason: str,
    suppressed_log: list
):
    """
    飞书 Webhook 告警回调
    
    包含节流逻辑:
    - 同一市场 5 分钟内只发送一次告警
    - 合并期间的告警汇总到一条摘要中
    """
    should_send, is_coalesced = alert_throttler.should_send(market_id, severity)
    
    if not should_send:
        return  # 抑制告警,不发送
    
    # 构造告警消息
    pending_count = alert_throttler.get_pending_summary(market_id)
    coalesced_part = f"\n⚠️ 另有 **{pending_count}** 条同类告警已在 5 分钟内合并。" if is_coalesced else ""
    
    emoji = "🔴" if severity == "CRITICAL" else "🟡"
    
    message = {
        "msg_type": "interactive",
        "card": {
            "header": {
                "title": f"{emoji} TickDB 市场监控告警",
                "template": "red" if severity == "CRITICAL" else "yellow"
            },
            "elements": [
                {
                    "tag": "markdown",
                    "content": f"**市场**: {market_id}\n"
                               f"**状态**: {state}\n"
                               f"**严重性**: {severity}\n"
                               f"**原因**: {reason}{coalesced_part}"
                },
                {
                    "tag": "markdown",
                    "content": f"> 时间: {datetime.now(pytz.utc).strftime('%Y-%m-%d %H:%M:%S')} UTC"
                }
            ]
        }
    }
    
    # 从环境变量读取 Webhook URL(不在代码中硬编码)
    webhook_url = os.environ.get("FEISHU_WEBHOOK_URL")
    if not webhook_url:
        print("[WARN] FEISHU_WEBHOOK_URL 未设置,告警仅打印:", reason)
        return
    
    try:
        response = requests.post(
            webhook_url,
            json=message,
            headers={"Content-Type": "application/json"},
            timeout=(3.05, 10)
        )
        if response.status_code != 200:
            print(f"[WARN] 飞书告警发送失败: HTTP {response.status_code}")
    except requests.exceptions.RequestException as e:
        print(f"[ERROR] 飞书告警发送异常: {e}")

五、完整监控系统的组装与运行

5.1 主监控管理器

将所有组件组装为一个统一的管理器:

class MultiMarketMonitor:
    """
    多市场时段感知监控管理器
    
    使用示例:
        monitor = MultiMarketMonitor()
        monitor.add_market("HK")
        monitor.add_market("US")
        monitor.start()
        # ... 监控在后台运行 ...
        monitor.stop()
    """
    
    def __init__(self):
        self.state_machines: Dict[str, MarketMonitorStateMachine] = {}
        self.alert_callback = feishu_alert_callback
        
    def add_market(self, market_id: str):
        """注册要监控的市场"""
        if market_id not in MARKET_CALENDARS:
            raise ValueError(f"未知市场ID: {market_id}")
        
        sm = MarketMonitorStateMachine(
            market_id=market_id,
            calendar=MARKET_CALENDARS[market_id],
            alert_callback=self.alert_callback
        )
        self.state_machines[market_id] = sm
    
    def start(self):
        """启动所有市场监控"""
        for sm in self.state_machines.values():
            sm.start()
        print(f"多市场监控已启动,共 {len(self.state_machines)} 个市场")
    
    def stop(self):
        """停止所有市场监控"""
        for sm in self.state_machines.values():
            sm.stop()
    
    def get_all_status(self) -> Dict[str, dict]:
        """获取所有市场的当前状态"""
        return {
            market_id: sm.get_status()
            for market_id, sm in self.state_machines.items()
        }

5.2 与 TickDB WebSocket 的集成

监控管理器需要与 TickDB 的 WebSocket 连接配合使用:WebSocket 收到数据时通知状态机,状态机据此判断是否需要告警:

import os
import json
import asyncio
import websockets
import random
from datetime import datetime
from typing import Optional
import time


class TickDBWebSocketMonitor:
    """
    TickDB WebSocket 客户端(集成时段感知监控)
    
    ⚠️ 生产环境建议使用 aiohttp/asyncio 处理高频数据流
    """
    
    def __init__(self, market_id: str, state_machine: MarketMonitorStateMachine):
        self.market_id = market_id
        self.state_machine = state_machine
        self.ws: Optional[websockets.WebSocketClientProtocol] = None
        self._running = False
        self._reconnect_delay = 1.0
        self._max_reconnect_delay = 60.0
        
    async def connect_and_listen(self, api_key: str):
        """
        连接到 TickDB WebSocket 并监听数据
        
        数据到达时自动通知状态机。
        连接断开时自动重连(指数退避 + 抖动)。
        """
        self._running = True
        reconnect_count = 0
        
        while self._running:
            try:
                uri = f"wss://api.tickdb.ai/ws?api_key={api_key}&market={self.market_id}"
                
                async with websockets.connect(
                    uri,
                    ping_interval=20,
                    ping_timeout=10,
                    close_timeout=5
                ) as ws:
                    self.ws = ws
                    reconnect_count = 0
                    self._reconnect_delay = 1.0
                    print(f"[{self.market_id}] WebSocket 连接已建立")
                    
                    async for raw_message in ws:
                        try:
                            message = json.loads(raw_message)
                            
                            # 通知状态机:数据已到达
                            self.state_machine.on_data_received(datetime.now(pytz.utc))
                            
                            # 业务处理:解析 depth 数据等
                            await self._process_message(message)
                            
                        except json.JSONDecodeError:
                            # 处理 pong 等非 JSON 消息
                            if raw_message == "pong":
                                continue
                            print(f"[{self.market_id}] 消息解析失败: {raw_message[:100]}")
                            
            except websockets.exceptions.ConnectionClosed as e:
                reconnect_count += 1
                delay = min(
                    self._reconnect_delay * (2 ** reconnect_count),
                    self._max_reconnect_delay
                )
                # 抖动:避免所有连接同时重连(惊群效应)
                jitter = random.uniform(0, delay * 0.1)
                final_delay = delay + jitter
                
                print(f"[{self.market_id}] 连接断开 ({e.code}),"
                      f"{final_delay:.1f} 秒后第 {reconnect_count} 次重连")
                
                await asyncio.sleep(final_delay)
                
            except Exception as e:
                print(f"[{self.market_id}] WebSocket 异常: {e}")
                await asyncio.sleep(5)
    
    async def _process_message(self, message: dict):
        """处理 TickDB 消息(子类可重写此方法实现具体业务逻辑)"""
        channel = message.get("channel")
        if channel == "depth":
            # 提取订单簿深度数据,详见 TickDB depth 频道文档
            data = message.get("data", {})
            # bid_depth = data.get("bid_depth", 0)
            # ask_depth = data.get("ask_depth", 0)
            pass
    
    def stop(self):
        """停止监听"""
        self._running = False
        if self.ws:
            asyncio.create_task(self.ws.close())

5.3 启动脚本

async def main():
    """多市场监控启动入口"""
    api_key = os.environ.get("TICKDB_API_KEY")
    if not api_key:
        raise RuntimeError("请设置环境变量 TICKDB_API_KEY")
    
    monitor = MultiMarketMonitor()
    monitor.add_market("HK")
    monitor.add_market("US")
    monitor.start()
    
    # 启动各市场的 WebSocket 监听
    tasks = []
    for market_id, sm in monitor.state_machines.items():
        client = TickDBWebSocketMonitor(market_id, sm)
        task = asyncio.create_task(client.connect_and_listen(api_key))
        tasks.append(task)
    
    try:
        await asyncio.gather(*tasks)
    except KeyboardInterrupt:
        print("\n收到中断信号,正在关闭...")
        monitor.stop()


if __name__ == "__main__":
    asyncio.run(main())

六、实测效果与边界条件处理

6.1 实测场景验证

用模拟数据验证状态机在不同场景下的行为:

def simulate_scenario(
    market_id: str,
    events: list
) -> list:
    """
    模拟测试场景,验证状态机行为
    
    events: [(时间戳, 事件类型, 事件数据), ...]
            事件类型: "data_received" | "time_tick"
    """
    calendar = MARKET_CALENDARS[market_id]
    state_log = []
    
    def on_alert(market_id, state, severity, reason, suppressed_log):
        state_log.append({
            "type": "alert",
            "market_id": market_id,
            "state": state,
            "severity": severity,
            "reason": reason
        })
    
    sm = MarketMonitorStateMachine(
        market_id=market_id,
        calendar=calendar,
        check_interval=1.0,
        recovery_timeout=3,
        alert_callback=on_alert
    )
    sm.start()
    
    for ts, event_type, data in events:
        if event_type == "data_received":
            sm.on_data_received(ts)
        elif event_type == "time_tick":
            sm._tick()
    
    sm.stop()
    return state_log


# ── 场景 A:港股午休 ──
# 11:58 收到数据 → 12:00 进入午休 → 12:58 数据流恢复 → 13:00 退出午休
print("═══ 场景 A:港股午休无告警验证 ═══")

scenario_a_hkt = pytz.timezone("Asia/Hong_Kong")
events_a = [
    (datetime(2026, 4, 15, 11, 58, tzinfo=scenario_a_hkt), "data_received", None),
    # [模拟 12:00-12:59 每秒的 time_tick,但无数据到达]
    *[
        (datetime(2026, 4, 15, 12, i % 60, tzinfo=scenario_a_hkt), "time_tick", None)
        for i in range(60)
    ],
    (datetime(2026, 4, 15, 12, 58, tzinfo=scenario_a_hkt), "data_received", None),
    (datetime(2026, 4, 15, 13, 0, tzinfo=scenario_a_hkt), "time_tick", None),
]

log_a = simulate_scenario("HK", events_a)
alerts_a = [e for e in log_a if e["type"] == "alert"]
print(f"午休期间告警次数: {len(alerts_a)} (预期: 0)")

# ── 场景 B:美股盘后真实故障 ──
# 16:05 最后一条数据 → 16:07 仍未恢复 → 触发告警
print("\n═══ 场景 B:美股盘后真实故障验证 ═══")

scenario_b_us = pytz.timezone("America/New_York")
events_b = [
    (datetime(2026, 4, 15, 16, 5, tzinfo=scenario_b_us), "data_received", None),
    (datetime(2026, 4, 15, 16, 6, tzinfo=scenario_b_us), "time_tick", None),
    (datetime(2026, 4, 15, 16, 7, tzinfo=scenario_b_us), "time_tick", None),
]

log_b = simulate_scenario("US", events_b)
alerts_b = [e for e in log_b if e["type"] == "alert"]
print(f"告警次数: {len(alerts_b)} (预期: 1,16:07 判定为真实故障)")
if alerts_b:
    print(f"告警详情: {alerts_b[0]['reason']}")

输出预期:

═══ 场景 A:港股午休无告警验证 ═══
[2026-04-15 11:58:00] [HK] 数据恢复,告警自动关闭
午休期间告警次数: 0 (预期: 0)

═══ 场景 B:美股盘后真实故障验证 ═══
[2026-04-15 16:07:00] [US] 状态转换: normal → alerting | 原因: 超过阈值 120s 未收到数据,触发告警
告警次数: 1 (预期: 1,16:07 判定为真实故障)

6.2 边界条件与已知局限

状态机方案不是万能的,以下场景需要额外处理:

边界条件 1:跨时区事件对齐

状态机在 UTC 时间运行,但市场日历使用当地时区。当监控服务器的本地时区与市场时区不一致时,datetime.now(pytz.utc)astimezone() 的转换必须正确。前文代码已使用 pytz 处理时区转换,但需要注意:永远不要用 datetime.utcnow() 而不附加时区信息,这会导致夏令时切换时出现不可察觉的 1 小时偏移。

边界条件 2:港股半日交易

港股在某些节前交易日会"半日交易"(09:30-12:00,无午休,无下午盘)。当前的节假日列表不支持"半日"规则,需要在节假日数据中增加 half_day: true 字段,并在 TradingSession 匹配时排除下午盘。

边界条件 3:美股期权到期日

每月第三个周五是期权到期日(OPEX),成交量结构会在 12:00-13:00 出现异常峰值。这种结构性变化超出了状态机本身能处理的能力范围,需要在市场日历中标注特殊日期,或引入外部事件日历(如 Truflation、CME 节假日表)。


七、结语

凌晨 3:17 的那条告警,本质上是监控系统的"知识盲区"造成的信任损耗。

系统不知道港股中午会休市,所以把沉默当作故障。系统不知道美股盘后流动性会萎缩,所以把低成交量当作异常。系统不知道 UTC 凌晨的数字货币市场只有西海岸投机者在玩,所以把价差扩大当作信号。

解决这个问题的路径,不是把监控规则写得更复杂,而是让监控系统理解市场。MarketCalendar 赋予它市场结构知识,AdaptiveThresholdCalculator 赋予它基于数据的自适应能力,MarketMonitorStateMachine 赋予它判断和决策的能力。三者组合在一起,监控系统就不再是一个只会计时的工具,而是一个理解市场运行逻辑的智能体。

当然,这条路没有终点。市场规则会变化,新的流动性结构会出现,历史数据会积累新的模式。监控系统的知识库需要持续更新,状态机的规则需要持续调参,自适应计算器的滑动窗口需要持续评估。

但至少,从今天开始,你的飞书不会在凌晨 3:17 响起来了。


下一步行动

如果你在管理多市场量化监控系统

  1. 将本文的 MarketCalendar 模型替换你当前的硬编码时段配置
  2. 审查现有的告警规则,将所有固定时间阈值替换为 calculate_final_alert_threshold
  3. 将监控系统改造为状态机架构,参考 MarketMonitorStateMachine 的状态转换逻辑

如果你刚开始搭建量化监控基础设施

  1. 访问 tickdb.ai 注册(免费,无需信用卡),获取 API Key
  2. 在控制台查看 TickDB 行情频道(depth / kline / trades)的完整接口文档
  3. 使用本文的 WebSocket 客户端代码接入实时行情,集成状态机监控

如果你关心 TickDB 产品能力

  • TickDB 的美股、港股、数字货币实时行情均支持 WebSocket 推送,可与本文的监控方案直接集成
  • 多市场行情的盘前、盘中、盘后时段规则可通过 TickDB 的 /market-info 接口获取(部分市场支持)
  • 联系 [email protected] 获取机构级的行情+监控一体化方案

如果你习惯用 AI 辅助开发

  • 在 ClawHub 搜索安装 tickdb-market-data SKILL,快速获取 TickDB API 的使用指南和代码模板

本文不构成任何投资建议。监控系统设计方案适用于一般性技术参考,具体实现时请根据你的业务场景和市场特点进行调整。市场有风险,投资需谨慎。