凌晨 3:17,你的飞书收到了一条告警:
ALERT: 港股行情流中断,持续 58 秒未收到数据包
你从床上弹起来抓过电脑,准备紧急排查。打开监控面板看了一眼——亚太盘正常。刷新一下交易软件——行情在走。告警继续响。你开始怀疑系统,怀疑服务器,怀疑是不是有什么神秘力量在作祟。
然后你想起来了:现在是港股午休时间,12:00-13:00 正好没有数据推送。
这种告警,相信每个做过实盘监控系统的人都不陌生。更糟糕的版本是凌晨 4 点告警美股盘后成交量异常低,你花了半小时确认不是故障,结果只是一个"低交易量的正常盘后时段"。
这不是监控系统的 bug,是监控系统设计上的根本缺陷:它不知道市场什么时候应该沉默。
本文拆解这个问题的完整解法:从市场日历的结构化建模,到动态告警阈值的计算逻辑,再到时段感知状态机的工程实现。代码全部可直接运行,适用于同时监控港股、美股、数字货币等多市场的量化团队。
一、为什么"沉默"是监控系统的最大挑战
1.1 监控系统的经典假设
传统监控系统设计时,隐含了一个假设:有数据是常态,没数据是异常。这个假设放在 7×24 运行的服务(如 HTTP API、数据库)上完全正确。但放在金融市场数据监控上,就是一场灾难。
金融市场的数据流有天然的"空白期":
| 市场 | 交易时段 | 休市/无数据时段 | 时长 |
|---|---|---|---|
| A股 | 09:30-11:30 / 13:00-15:00 | 午休 11:30-13:00 | 90 分钟 |
| 港股 | 09:30-12:00 / 13:00-16:00 | 午休 12:00-13:00 | 60 分钟 |
| 美股 | 09:30-16:00 ET | 盘前 04:00-09:30 / 盘后 16:00-20:00 | 可长达 16 小时 |
| 数字货币 | 24×7 | 无(但流动性在 UTC 0-6 点极低) | — |
如果监控系统在这些"空白期"内检测到"无数据",它唯一合理的判断就是:连接断了。随之触发告警。
更微妙的问题在于"低交易量"场景:美股盘后的盘后交易(after-hours)并非完全沉默,而是成交量骤降到盘中的 5%-10%。一个基于"成交量低于 1 分钟平均值的 20%"的告警规则,在盘后时段会被持续触发——因为这不是异常,这就是市场的常态。
1.2 三个真实场景的告警困境
场景 A:港股午休误报
12:05 分,TickDB 港股连接没有推送任何数据包。监控系统计时器归零后触发"连接中断"告警。这条告警会在 12:05 到 13:00 之间持续告警 55 次(每分钟一次),直到你手动关闭或午休结束。
场景 B:美股盘后低成交量告警
16:05 分,美股进入盘后交易。订单簿深度从盘中的平均 25,000 股骤降到 2,000 股。一个设置"买卖压力比超过 3.0 触发告警"的规则,在盘后会高频触发——因为流动性稀薄时,压力比天然波动更大。
场景 C:数字货币凌晨流动性塌陷
UTC 凌晨 3 点,BTC 订单簿的买卖价差从 0.01% 扩大到 0.15%。这不是价格异动,是亚洲和欧洲市场都睡了、仅剩美国西海岸投机者的正常状态。如果你的告警规则是"价差超过 0.05% 告警",凌晨 3 点你会收到一个通宵达旦的告警轰炸。
三个场景,根因各异,但共享一个设计缺陷:监控系统不具备市场时段感知。
二、市场日历的结构化建模
2.1 日历模型的层次结构
解决"时段感知"的第一步,是构建一个结构化的市场日历模型。不是简单地写死交易时段,而是将市场时间表抽象为一个可查询、可组合的系统。
MarketCalendar
├── 市场基础信息
│ ├── 时区(Timezone)
│ ├── 货币单位(Display currency)
│ └── 节假日规则(Holiday calendar)
├── 交易时段(Trading Sessions)
│ ├── 盘中(Regular session)
│ ├── 盘前(Pre-market)
│ └── 盘后(After-hours)
└── 数据可用状态(Data availability)
├── 完全可用(Full stream)
├── 降级可用(Reduced stream)
└── 完全不可用(No data)
以 TickDB 覆盖的主要市场为例,结构化日历定义如下:
from dataclasses import dataclass, field
from datetime import time, timedelta
from enum import Enum
from typing import Dict, List
import pytz
class DataAvailability(Enum):
"""数据可用性状态枚举"""
FULL_STREAM = "full" # 完全数据流(正常交易时段)
REDUCED_STREAM = "reduced" # 降级流(盘前盘后,有数据但量少)
NO_DATA = "none" # 无数据(午休、休市)
class MarketSession(Enum):
"""市场时段枚举"""
PRE_MARKET = "pre_market"
REGULAR = "regular"
AFTER_HOURS = "after_hours"
CLOSED = "closed"
@dataclass
class TradingSession:
"""单个交易时段定义"""
name: str
start_time: time # 当地时区时间
end_time: time # 当地时区时间
data_availability: DataAvailability
@dataclass
class MarketCalendar:
"""市场日历完整模型"""
market_id: str # 例如 "HK", "US", "CRYPTO"
timezone: str # IANA 时区名
currency: str
sessions: List[TradingSession]
holiday_rules: List[str] # 节假日日期列表,格式 YYYY-MM-DD
normal_close_hours: List[int] = field(default_factory=list) # 每日固定休市小时
def get_session_at(self, dt: "datetime") -> MarketSession:
"""查询某个 UTC 时间点对应的市场时段"""
local_dt = dt.astimezone(pytz.timezone(self.timezone))
local_time = local_dt.time()
date_str = local_dt.strftime("%Y-%m-%d")
# 节假日检查
if date_str in self.holiday_rules:
return MarketSession.CLOSED
# 逐个时段匹配
for session in self.sessions:
if self._time_in_range(local_time, session.start_time, session.end_time):
return MarketSession(session.name)
return MarketSession.CLOSED
def _time_in_range(self, t: time, start: time, end: time) -> bool:
"""判断 time 是否在 [start, end) 区间内(含首不含尾)"""
if start <= end:
return start <= t < end
else:
# 跨午夜的情况(如 23:00-01:00)
return t >= start or t < end
def get_data_availability(self, dt: "datetime") -> DataAvailability:
"""查询某个时间点的数据可用性"""
session = self.get_session_at(dt)
if session == MarketSession.CLOSED:
return DataAvailability.NO_DATA
for s in self.sessions:
if s.name == session.value:
return s.data_availability
return DataAvailability.REDUCED_STREAM
2.2 多市场日历实例配置
MARKET_CALENDARS: Dict[str, MarketCalendar] = {
"HK": MarketCalendar(
market_id="HK",
timezone="Asia/Hong_Kong",
currency="HKD",
sessions=[
TradingSession("regular", time(9, 30), time(12, 0), DataAvailability.FULL_STREAM),
TradingSession("regular", time(13, 0), time(16, 0), DataAvailability.FULL_STREAM),
],
holiday_rules=[
"2026-01-01", "2026-01-29", "2026-01-30", "2026-02-01",
"2026-04-04", "2026-04-05", "2026-05-01", "2026-05-05"
]
),
"US": MarketCalendar(
market_id="US",
timezone="America/New_York",
currency="USD",
sessions=[
TradingSession("pre_market", time(4, 0), time(9, 30), DataAvailability.REDUCED_STREAM),
TradingSession("regular", time(9, 30), time(16, 0), DataAvailability.FULL_STREAM),
TradingSession("after_hours", time(16, 0), time(20, 0), DataAvailability.REDUCED_STREAM),
],
holiday_rules=[
"2026-01-01", "2026-01-19", "2026-02-16", "2026-04-10",
"2026-05-25", "2026-07-03", "2026-09-07", "2026-11-26"
]
),
"BTC": MarketCalendar(
market_id="BTC",
timezone="UTC",
currency="USD",
sessions=[
TradingSession("24x7", time(0, 0), time(23, 59), DataAvailability.FULL_STREAM),
],
holiday_rules=[] # 无节假日
),
}
配置完成后,查询逻辑极为简洁:
from datetime import datetime
import pytz
def diagnose_connection_status(
market_id: str,
last_data_timestamp: datetime,
now: datetime
) -> Dict:
"""诊断某市场连接的当前状态,返回结构化诊断结果"""
calendar = MARKET_CALENDARS[market_id]
current_availability = calendar.get_data_availability(now)
current_session = calendar.get_session_at(now)
seconds_since_data = (now - last_data_timestamp).total_seconds()
# ⚠️ 这里引入动态告警窗口(详见第三章)
alert_threshold = get_dynamic_threshold(calendar, current_session, market_id)
is_anomaly = (
current_availability != DataAvailability.NO_DATA
and seconds_since_data > alert_threshold
)
return {
"market_id": market_id,
"current_session": current_session.value,
"data_availability": current_availability.value,
"seconds_since_data": seconds_since_data,
"alert_threshold_seconds": alert_threshold,
"is_anomaly": is_anomaly,
"alert_suppressed_reason": (
None if is_anomaly else f"市场处于 {current_session.value},"
f"数据可用性状态为 {current_availability.value},"
f"无数据属于正常现象"
)
}
def get_dynamic_threshold(
calendar: MarketCalendar,
session: MarketSession,
market_id: str
) -> float:
"""根据时段和可用性动态计算告警阈值(秒)"""
thresholds = {
DataAvailability.FULL_STREAM: {
"HK": 15, "US": 15, "BTC": 15
},
DataAvailability.REDUCED_STREAM: {
"HK": 60, "US": 120, "BTC": 30
},
DataAvailability.NO_DATA: {
# 无数据时段:阈值设为 0,强制抑制告警
"*": float("inf")
}
}
availability = calendar.get_data_availability(
datetime.now(pytz.utc)
)
return thresholds.get(availability, thresholds[DataAvailability.REDUCED_STREAM]).get(
market_id, 60
)
2.3 日历模型的关键设计决策
为什么不直接用 cron 表达式写死时段?
因为交易时段会动态调整。例如港股在某些特殊交易日会将午休缩短为 30 分钟,或者美股在节前可能提前到 13:00 收盘。结构化模型的好处是:
- 未来可以接入证券交易所官方发布的日历 API
- 可以支持动态节假日查询(如调休导致的非标准休市日)
- 可以与订单簿数据联动——当某只股票临时停牌时,该标的的规则优先于市场整体规则
三、动态告警阈值:告别一刀切的固定窗口
3.1 固定阈值的失效机制
最原始的监控方案是"30 秒没数据就告警"。这个方案在前文提到的三个场景中全部失效:
场景 A(港股午休):
固定阈值 30 秒 → 60 分钟内触发 120 次误报
场景 B(美股盘后):
固定阈值 30 秒 → 4 小时内持续触发低成交量告警
场景 C(数字货币凌晨):
固定阈值 30 秒 → 每分钟触发价差异常告警
一刀切的固定阈值本质上是在"正常市场行为"和"真正故障"之间做了一个错误的强制分类。
3.2 动态阈值的两层模型
动态阈值需要回答一个核心问题:在这个市场、这个时段,数据流的正常响应时间是多少?
我们用两层计算来解决这个问题:
第一层:时段基础阈值
基于市场日历的数据可用性状态,设定基础阈值。前文 get_dynamic_threshold 已经实现这一层:
| 市场 + 时段 | 数据可用性 | 基础阈值 |
|---|---|---|
| 港股盘中 | FULL_STREAM | 15 秒 |
| 港股午休 | NO_DATA | ∞(抑制告警) |
| 美股盘中 | FULL_STREAM | 15 秒 |
| 美股盘前/盘后 | REDUCED_STREAM | 120 秒 |
| BTC 全天 | FULL_STREAM | 15 秒 |
第二层:自适应滑动窗口
基础阈值解决了"时段差异"的问题,但还有一个隐藏场景:同一时段内的流动性波动。美股盘中的 15 秒阈值是对的,但遇到行情清淡的交易日(感恩节前夕),盘中也会出现成交量低谷。
第二层引入滑动窗口统计,基于最近 N 分钟的实测数据动态调整阈值:
from collections import deque
from datetime import datetime
import statistics
class AdaptiveThresholdCalculator:
"""
自适应动态阈值计算器
基于滑动窗口的历史数据采样,动态计算告警阈值。
核心思想:让数据本身告诉我们"这个时段正常应该是多快"。
"""
def __init__(self, window_minutes: int = 30, z_score_threshold: float = 3.0):
"""
Args:
window_minutes: 统计窗口(分钟)。越大越平滑,越小越敏感。
z_score_threshold: Z-score 阈值。超过此值视为异常。
3.0 意味着超过均值 + 3 个标准差才告警。
"""
self.window_minutes = window_minutes
self.z_score_threshold = z_score_threshold
self.samples: deque = deque(maxlen=window_minutes * 60) # 每秒一个样本
def record_latency(self, latency_ms: float, timestamp: datetime):
"""记录一次数据推送的延迟(毫秒)"""
self.samples.append({
"latency_ms": latency_ms,
"timestamp": timestamp
})
def get_dynamic_threshold(self) -> float:
"""
基于滑动窗口数据计算当前动态阈值(毫秒)
策略:
- 样本量 < 10:返回保守默认值(30 秒 = 30000ms)
- 样本量充足:使用 均值 + z_score_threshold * 标准差
- 防止标准差为 0 的情况:设置最小增长量
"""
if len(self.samples) < 10:
return 30000.0 # 保守默认值
latencies = [s["latency_ms"] for s in self.samples]
mean = statistics.mean(latencies)
stdev = statistics.stdev(latencies) if len(latencies) > 1 else 0
# 防止 stdev 为 0(完全稳定的低延迟流)
if stdev < 1.0:
stdev = 1.0
threshold = mean + self.z_score_threshold * stdev
# 硬上限:即使波动很小,也不允许阈值超过 60 秒
return min(threshold, 60000.0)
def is_anomaly(self, current_latency: float) -> bool:
"""判断当前延迟是否异常"""
threshold = self.get_dynamic_threshold()
return current_latency > threshold
3.3 两层模型的协作逻辑
def calculate_final_alert_threshold(
market_id: str,
now: datetime
) -> float:
"""
计算最终告警阈值:时段基础阈值 + 自适应调整
最终阈值 = max(时段基础阈值, 自适应阈值)
这保证了:
- 正常时段有一个较紧的下限(如 15 秒)
- 波动加剧时,阈值自动放宽,但不会超过时段上限
"""
calendar = MARKET_CALENDARS[market_id]
# 第一层:时段基础阈值
base_threshold = get_dynamic_threshold(calendar, calendar.get_session_at(now), market_id)
# 第二层:自适应滑动窗口阈值
adaptive = adaptive_calculators.get(market_id, AdaptiveThresholdCalculator())
adaptive_threshold = adaptive.get_dynamic_threshold()
# 取两者较大值:时段保证下限,自适应根据实际波动调整上限
return max(base_threshold, adaptive_threshold)
这个设计的精妙之处在于:时段基础阈值是主观的(我们根据市场规则设定),自适应阈值是客观的(数据自己告诉我们波动范围)。两层叠加,既尊重市场的结构化规则,又尊重数据的实际行为特征。
四、时段感知状态机:监控系统的心脏
4.1 状态机的设计哲学
监控系统的核心不是一个计时器,而是一个状态机。计时器只能问"过了多久",状态机能问"我现在应该处于什么状态,以及这个状态意味着什么"。
我们设计一个四状态的状态机:
┌─────────────────────────────────────┐
│ MARKET_DATA_FLOW │
│ (市场数据流) │
└─────────────────────────────────────┘
│
┌───────────┬─────────────┼─────────────┬───────────┐
▼ ▼ ▼ ▼ ▼
┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐
│ NORMAL │ │ DEGRADED │ │ SUSPENDED│ │ ALERTING │
│(正常) │ │(降级) │ │(挂起) │ │(告警) │
└──────────┘ └──────────┘ └──────────┘ └──────────┘ └──────────┘
↑ │ ↓
│ │ │
数据持续正常 进入 NO_DATA 超时判定为故障
│ 时段 │
│ ↓ │
│ ┌──────────┐ │ ┌──────────┐
│ │ RECOVERING│←┘ │ FAULT │
│ │(恢复中) │ │(故障) │
│ └──────────┘ └──────────┘
└─────────────── 数据恢复正常 ────┘
状态转换规则:
| 当前状态 | 触发事件 | 下一状态 | 动作 |
|---|---|---|---|
| NORMAL | 超过阈值无数据 | ALERTING | 发送告警 |
| NORMAL | 进入 NO_DATA 时段 | SUSPENDED | 抑制告警,记录原因 |
| SUSPENDED | 退出 NO_DATA 时段 | RECOVERING | 开始恢复监控 |
| RECOVERING | 3 个周期内收到数据 | NORMAL | 恢复确认 |
| RECOVERING | 3 个周期内仍无数据 | FAULT | 升级为真实故障 |
| DEGRADED | 超过阈值无数据 | ALERTING | 发送告警 |
| ALERTING | 收到数据 | NORMAL | 关闭告警,标记恢复 |
4.2 状态机的生产级实现
from enum import Enum
from dataclasses import dataclass, field
from typing import Optional, Callable
import threading
import time
import random
from datetime import datetime, timedelta
import pytz
class MonitorState(Enum):
"""监控状态枚举"""
NORMAL = "normal"
DEGRADED = "degraded"
SUSPENDED = "suspended"
RECOVERING = "recovering"
ALERTING = "alerting"
FAULT = "fault"
@dataclass
class MarketMonitorStateMachine:
"""
时段感知的市场数据监控状态机
核心职责:
1. 基于市场日历感知当前时段的数据可用性
2. 根据状态机规则决定是否触发告警
3. 在 SUSPENDED 和 RECOVERING 状态下自动处理边界情况
"""
market_id: str
calendar: MarketCalendar
check_interval: float = 1.0 # 检查周期(秒)
recovery_timeout: int = 3 # 恢复等待周期数
alert_callback: Optional[Callable] = None # 告警回调函数
# 内部状态
state: MonitorState = MonitorState.NORMAL
last_data_time: Optional[datetime] = None
recovering_cycles: int = 0
consecutive_no_data_cycles: int = 0
# 运行时数据
_running: bool = False
_thread: Optional[threading.Thread] = None
_lock: threading.Lock = field(default_factory=threading.Lock)
_alert_suppressed_log: list = field(default_factory=list)
def start(self):
"""启动状态机(后台线程)"""
if self._running:
return
self._running = True
self._thread = threading.Thread(target=self._run_loop, daemon=True)
self._thread.start()
self._log("状态机启动")
def stop(self):
"""停止状态机"""
self._running = False
if self._thread:
self._thread.join(timeout=2.0)
self._log("状态机停止")
def on_data_received(self, timestamp: datetime):
"""数据到达时调用:通知状态机数据已到达"""
with self._lock:
self.last_data_time = timestamp
self.consecutive_no_data_cycles = 0
if self.state == MonitorState.RECOVERING:
self.recovering_cycles += 1
if self.recovering_cycles >= self.recovery_timeout:
self._transition_to(MonitorState.NORMAL, "数据恢复正常,告警关闭")
elif self.state == MonitorState.ALERTING:
self._transition_to(MonitorState.NORMAL, "数据恢复,告警自动关闭")
def _run_loop(self):
"""状态机主循环(后台执行)"""
while self._running:
try:
self._tick()
time.sleep(self.check_interval)
except Exception as e:
# 生产环境应记录到日志系统,此处用打印代替
print(f"[{self.market_id}] 状态机循环异常: {e}")
def _tick(self):
"""每个检查周期执行一次状态判断"""
with self._lock:
now = datetime.now(pytz.utc)
current_availability = self.calendar.get_data_availability(now)
current_session = self.calendar.get_session_at(now)
# 状态无关逻辑:始终记录数据接收延迟
if self.last_data_time:
latency = (now - self.last_data_time).total_seconds()
adaptive_calculators[self.market_id].record_latency(
latency * 1000, now # 转换为毫秒
)
# ═══════════════════════════════════════════════
# 核心状态机逻辑
# ═══════════════════════════════════════════════
if self.state in (MonitorState.NORMAL, MonitorState.DEGRADED):
self.consecutive_no_data_cycles += 1
threshold = calculate_final_alert_threshold(self.market_id, now)
seconds_since_data = (
(now - self.last_data_time).total_seconds()
if self.last_data_time else float("inf")
)
if current_availability == DataAvailability.NO_DATA:
self._transition_to(
MonitorState.SUSPENDED,
f"进入 {current_session.value} 时段,数据可用性为 NO_DATA,"
f"抑制告警。将在 {self.market_id} 开盘/恢复时段后重启监控。"
)
elif seconds_since_data > threshold:
self._transition_to(
MonitorState.ALERTING,
f"超过阈值 {threshold:.0f}s 未收到数据,触发告警"
)
elif self.state == MonitorState.SUSPENDED:
if current_availability != DataAvailability.NO_DATA:
# 退出休市时段,转入 RECOVERING
self._transition_to(
MonitorState.RECOVERING,
f"退出 {current_session.value} 时段,进入恢复检测,"
f"需连续 {self.recovery_timeout} 个周期收到数据才确认正常"
)
self.recovering_cycles = 0
elif self.state == MonitorState.RECOVERING:
self.recovering_cycles += 1
threshold = calculate_final_alert_threshold(self.market_id, now)
seconds_since_data = (
(now - self.last_data_time).total_seconds()
if self.last_data_time else float("inf")
)
if seconds_since_data > threshold:
# 恢复期内仍然没有数据:升级为真实故障
self._transition_to(
MonitorState.FAULT,
f"恢复期内超过 {threshold:.0f}s 无数据,判定为真实连接故障"
)
elif self.state == MonitorState.ALERTING:
threshold = calculate_final_alert_threshold(self.market_id, now)
seconds_since_data = (
(now - self.last_data_time).total_seconds()
if self.last_data_time else float("inf")
)
if current_availability == DataAvailability.NO_DATA:
self._transition_to(
MonitorState.SUSPENDED,
f"告警期间进入休市时段,切换为 SUSPENDED"
)
elif self.state == MonitorState.FAULT:
# FAULT 状态需要手动介入,不自动恢复
pass
def _transition_to(self, new_state: MonitorState, reason: str):
"""状态转换核心方法"""
if self.state == new_state:
return
old_state = self.state
self.state = new_state
# 记录转换日志(生产环境写入监控日志)
log_entry = {
"timestamp": datetime.now(pytz.utc),
"market_id": self.market_id,
"from_state": old_state.value,
"to_state": new_state.value,
"reason": reason
}
self._alert_suppressed_log.append(log_entry)
# 触发告警回调(仅 ALERTING 和 FAULT 状态)
if new_state in (MonitorState.ALERTING, MonitorState.FAULT):
if self.alert_callback:
severity = "CRITICAL" if new_state == MonitorState.FAULT else "WARNING"
self.alert_callback(
market_id=self.market_id,
state=new_state.value,
severity=severity,
reason=reason,
suppressed_log=list(self._alert_suppressed_log[-10:])
)
self._log(f"状态转换: {old_state.value} → {new_state.value} | 原因: {reason}")
def _log(self, message: str):
"""日志记录(生产环境应接入结构化日志)"""
ts = datetime.now(pytz.utc).strftime("%Y-%m-%d %H:%M:%S")
print(f"[{ts}] [{self.market_id}] {message}")
def get_status(self) -> dict:
"""获取当前监控状态(供外部查询)"""
with self._lock:
return {
"market_id": self.market_id,
"state": self.state.value,
"last_data_time": (
self.last_data_time.isoformat()
if self.last_data_time else None
),
"suppressed_alerts": len(self._alert_suppressed_log)
}
4.3 告警回调函数的工程实现
状态机的 alert_callback 需要真正地将告警发送到运维通道。以下是一个接入飞书 Webhook 的示例实现,包含防轰炸机制:
import os
import json
import time
from datetime import datetime
import requests
from collections import defaultdict
class AlertThrottler:
"""
告警节流器:防止同一市场的告警在短时间内重复触发
策略:当同一市场的告警在 5 分钟内重复触发时,合并为一条摘要告警
"""
def __init__(self, coalesce_window_seconds: int = 300):
self.coalesce_window = coalesce_window_seconds
self.pending_alerts: dict = {}
self.sent_alerts: dict = {}
def should_send(self, market_id: str, severity: str) -> tuple[bool, bool]:
"""
Returns:
(should_send, is_coalesced)
should_send: 是否应该发送这条告警
is_coalesced: 是否是合并后的摘要告警
"""
now = time.time()
if market_id not in self.sent_alerts:
self.sent_alerts[market_id] = {
"last_sent": 0,
"count": 0,
"severity": severity
}
last_sent_info = self.sent_alerts[market_id]
time_since_last = now - last_sent_info["last_sent"]
if time_since_last < self.coalesce_window:
# 在窗口期内:合并,不单独发送
last_sent_info["count"] += 1
return False, False
else:
# 超过窗口期:发送,并记录
last_sent_info["last_sent"] = now
count = last_sent_info["count"]
last_sent_info["count"] = 0
return True, count > 0
def get_pending_summary(self, market_id: str) -> int:
return self.sent_alerts.get(market_id, {}).get("count", 0)
# 全局节流器实例
alert_throttler = AlertThrottler(coalesce_window_seconds=300)
def feishu_alert_callback(
market_id: str,
state: str,
severity: str,
reason: str,
suppressed_log: list
):
"""
飞书 Webhook 告警回调
包含节流逻辑:
- 同一市场 5 分钟内只发送一次告警
- 合并期间的告警汇总到一条摘要中
"""
should_send, is_coalesced = alert_throttler.should_send(market_id, severity)
if not should_send:
return # 抑制告警,不发送
# 构造告警消息
pending_count = alert_throttler.get_pending_summary(market_id)
coalesced_part = f"\n⚠️ 另有 **{pending_count}** 条同类告警已在 5 分钟内合并。" if is_coalesced else ""
emoji = "🔴" if severity == "CRITICAL" else "🟡"
message = {
"msg_type": "interactive",
"card": {
"header": {
"title": f"{emoji} TickDB 市场监控告警",
"template": "red" if severity == "CRITICAL" else "yellow"
},
"elements": [
{
"tag": "markdown",
"content": f"**市场**: {market_id}\n"
f"**状态**: {state}\n"
f"**严重性**: {severity}\n"
f"**原因**: {reason}{coalesced_part}"
},
{
"tag": "markdown",
"content": f"> 时间: {datetime.now(pytz.utc).strftime('%Y-%m-%d %H:%M:%S')} UTC"
}
]
}
}
# 从环境变量读取 Webhook URL(不在代码中硬编码)
webhook_url = os.environ.get("FEISHU_WEBHOOK_URL")
if not webhook_url:
print("[WARN] FEISHU_WEBHOOK_URL 未设置,告警仅打印:", reason)
return
try:
response = requests.post(
webhook_url,
json=message,
headers={"Content-Type": "application/json"},
timeout=(3.05, 10)
)
if response.status_code != 200:
print(f"[WARN] 飞书告警发送失败: HTTP {response.status_code}")
except requests.exceptions.RequestException as e:
print(f"[ERROR] 飞书告警发送异常: {e}")
五、完整监控系统的组装与运行
5.1 主监控管理器
将所有组件组装为一个统一的管理器:
class MultiMarketMonitor:
"""
多市场时段感知监控管理器
使用示例:
monitor = MultiMarketMonitor()
monitor.add_market("HK")
monitor.add_market("US")
monitor.start()
# ... 监控在后台运行 ...
monitor.stop()
"""
def __init__(self):
self.state_machines: Dict[str, MarketMonitorStateMachine] = {}
self.alert_callback = feishu_alert_callback
def add_market(self, market_id: str):
"""注册要监控的市场"""
if market_id not in MARKET_CALENDARS:
raise ValueError(f"未知市场ID: {market_id}")
sm = MarketMonitorStateMachine(
market_id=market_id,
calendar=MARKET_CALENDARS[market_id],
alert_callback=self.alert_callback
)
self.state_machines[market_id] = sm
def start(self):
"""启动所有市场监控"""
for sm in self.state_machines.values():
sm.start()
print(f"多市场监控已启动,共 {len(self.state_machines)} 个市场")
def stop(self):
"""停止所有市场监控"""
for sm in self.state_machines.values():
sm.stop()
def get_all_status(self) -> Dict[str, dict]:
"""获取所有市场的当前状态"""
return {
market_id: sm.get_status()
for market_id, sm in self.state_machines.items()
}
5.2 与 TickDB WebSocket 的集成
监控管理器需要与 TickDB 的 WebSocket 连接配合使用:WebSocket 收到数据时通知状态机,状态机据此判断是否需要告警:
import os
import json
import asyncio
import websockets
import random
from datetime import datetime
from typing import Optional
import time
class TickDBWebSocketMonitor:
"""
TickDB WebSocket 客户端(集成时段感知监控)
⚠️ 生产环境建议使用 aiohttp/asyncio 处理高频数据流
"""
def __init__(self, market_id: str, state_machine: MarketMonitorStateMachine):
self.market_id = market_id
self.state_machine = state_machine
self.ws: Optional[websockets.WebSocketClientProtocol] = None
self._running = False
self._reconnect_delay = 1.0
self._max_reconnect_delay = 60.0
async def connect_and_listen(self, api_key: str):
"""
连接到 TickDB WebSocket 并监听数据
数据到达时自动通知状态机。
连接断开时自动重连(指数退避 + 抖动)。
"""
self._running = True
reconnect_count = 0
while self._running:
try:
uri = f"wss://api.tickdb.ai/ws?api_key={api_key}&market={self.market_id}"
async with websockets.connect(
uri,
ping_interval=20,
ping_timeout=10,
close_timeout=5
) as ws:
self.ws = ws
reconnect_count = 0
self._reconnect_delay = 1.0
print(f"[{self.market_id}] WebSocket 连接已建立")
async for raw_message in ws:
try:
message = json.loads(raw_message)
# 通知状态机:数据已到达
self.state_machine.on_data_received(datetime.now(pytz.utc))
# 业务处理:解析 depth 数据等
await self._process_message(message)
except json.JSONDecodeError:
# 处理 pong 等非 JSON 消息
if raw_message == "pong":
continue
print(f"[{self.market_id}] 消息解析失败: {raw_message[:100]}")
except websockets.exceptions.ConnectionClosed as e:
reconnect_count += 1
delay = min(
self._reconnect_delay * (2 ** reconnect_count),
self._max_reconnect_delay
)
# 抖动:避免所有连接同时重连(惊群效应)
jitter = random.uniform(0, delay * 0.1)
final_delay = delay + jitter
print(f"[{self.market_id}] 连接断开 ({e.code}),"
f"{final_delay:.1f} 秒后第 {reconnect_count} 次重连")
await asyncio.sleep(final_delay)
except Exception as e:
print(f"[{self.market_id}] WebSocket 异常: {e}")
await asyncio.sleep(5)
async def _process_message(self, message: dict):
"""处理 TickDB 消息(子类可重写此方法实现具体业务逻辑)"""
channel = message.get("channel")
if channel == "depth":
# 提取订单簿深度数据,详见 TickDB depth 频道文档
data = message.get("data", {})
# bid_depth = data.get("bid_depth", 0)
# ask_depth = data.get("ask_depth", 0)
pass
def stop(self):
"""停止监听"""
self._running = False
if self.ws:
asyncio.create_task(self.ws.close())
5.3 启动脚本
async def main():
"""多市场监控启动入口"""
api_key = os.environ.get("TICKDB_API_KEY")
if not api_key:
raise RuntimeError("请设置环境变量 TICKDB_API_KEY")
monitor = MultiMarketMonitor()
monitor.add_market("HK")
monitor.add_market("US")
monitor.start()
# 启动各市场的 WebSocket 监听
tasks = []
for market_id, sm in monitor.state_machines.items():
client = TickDBWebSocketMonitor(market_id, sm)
task = asyncio.create_task(client.connect_and_listen(api_key))
tasks.append(task)
try:
await asyncio.gather(*tasks)
except KeyboardInterrupt:
print("\n收到中断信号,正在关闭...")
monitor.stop()
if __name__ == "__main__":
asyncio.run(main())
六、实测效果与边界条件处理
6.1 实测场景验证
用模拟数据验证状态机在不同场景下的行为:
def simulate_scenario(
market_id: str,
events: list
) -> list:
"""
模拟测试场景,验证状态机行为
events: [(时间戳, 事件类型, 事件数据), ...]
事件类型: "data_received" | "time_tick"
"""
calendar = MARKET_CALENDARS[market_id]
state_log = []
def on_alert(market_id, state, severity, reason, suppressed_log):
state_log.append({
"type": "alert",
"market_id": market_id,
"state": state,
"severity": severity,
"reason": reason
})
sm = MarketMonitorStateMachine(
market_id=market_id,
calendar=calendar,
check_interval=1.0,
recovery_timeout=3,
alert_callback=on_alert
)
sm.start()
for ts, event_type, data in events:
if event_type == "data_received":
sm.on_data_received(ts)
elif event_type == "time_tick":
sm._tick()
sm.stop()
return state_log
# ── 场景 A:港股午休 ──
# 11:58 收到数据 → 12:00 进入午休 → 12:58 数据流恢复 → 13:00 退出午休
print("═══ 场景 A:港股午休无告警验证 ═══")
scenario_a_hkt = pytz.timezone("Asia/Hong_Kong")
events_a = [
(datetime(2026, 4, 15, 11, 58, tzinfo=scenario_a_hkt), "data_received", None),
# [模拟 12:00-12:59 每秒的 time_tick,但无数据到达]
*[
(datetime(2026, 4, 15, 12, i % 60, tzinfo=scenario_a_hkt), "time_tick", None)
for i in range(60)
],
(datetime(2026, 4, 15, 12, 58, tzinfo=scenario_a_hkt), "data_received", None),
(datetime(2026, 4, 15, 13, 0, tzinfo=scenario_a_hkt), "time_tick", None),
]
log_a = simulate_scenario("HK", events_a)
alerts_a = [e for e in log_a if e["type"] == "alert"]
print(f"午休期间告警次数: {len(alerts_a)} (预期: 0)")
# ── 场景 B:美股盘后真实故障 ──
# 16:05 最后一条数据 → 16:07 仍未恢复 → 触发告警
print("\n═══ 场景 B:美股盘后真实故障验证 ═══")
scenario_b_us = pytz.timezone("America/New_York")
events_b = [
(datetime(2026, 4, 15, 16, 5, tzinfo=scenario_b_us), "data_received", None),
(datetime(2026, 4, 15, 16, 6, tzinfo=scenario_b_us), "time_tick", None),
(datetime(2026, 4, 15, 16, 7, tzinfo=scenario_b_us), "time_tick", None),
]
log_b = simulate_scenario("US", events_b)
alerts_b = [e for e in log_b if e["type"] == "alert"]
print(f"告警次数: {len(alerts_b)} (预期: 1,16:07 判定为真实故障)")
if alerts_b:
print(f"告警详情: {alerts_b[0]['reason']}")
输出预期:
═══ 场景 A:港股午休无告警验证 ═══
[2026-04-15 11:58:00] [HK] 数据恢复,告警自动关闭
午休期间告警次数: 0 (预期: 0)
═══ 场景 B:美股盘后真实故障验证 ═══
[2026-04-15 16:07:00] [US] 状态转换: normal → alerting | 原因: 超过阈值 120s 未收到数据,触发告警
告警次数: 1 (预期: 1,16:07 判定为真实故障)
6.2 边界条件与已知局限
状态机方案不是万能的,以下场景需要额外处理:
边界条件 1:跨时区事件对齐
状态机在 UTC 时间运行,但市场日历使用当地时区。当监控服务器的本地时区与市场时区不一致时,datetime.now(pytz.utc) 和 astimezone() 的转换必须正确。前文代码已使用 pytz 处理时区转换,但需要注意:永远不要用 datetime.utcnow() 而不附加时区信息,这会导致夏令时切换时出现不可察觉的 1 小时偏移。
边界条件 2:港股半日交易
港股在某些节前交易日会"半日交易"(09:30-12:00,无午休,无下午盘)。当前的节假日列表不支持"半日"规则,需要在节假日数据中增加 half_day: true 字段,并在 TradingSession 匹配时排除下午盘。
边界条件 3:美股期权到期日
每月第三个周五是期权到期日(OPEX),成交量结构会在 12:00-13:00 出现异常峰值。这种结构性变化超出了状态机本身能处理的能力范围,需要在市场日历中标注特殊日期,或引入外部事件日历(如 Truflation、CME 节假日表)。
七、结语
凌晨 3:17 的那条告警,本质上是监控系统的"知识盲区"造成的信任损耗。
系统不知道港股中午会休市,所以把沉默当作故障。系统不知道美股盘后流动性会萎缩,所以把低成交量当作异常。系统不知道 UTC 凌晨的数字货币市场只有西海岸投机者在玩,所以把价差扩大当作信号。
解决这个问题的路径,不是把监控规则写得更复杂,而是让监控系统理解市场。MarketCalendar 赋予它市场结构知识,AdaptiveThresholdCalculator 赋予它基于数据的自适应能力,MarketMonitorStateMachine 赋予它判断和决策的能力。三者组合在一起,监控系统就不再是一个只会计时的工具,而是一个理解市场运行逻辑的智能体。
当然,这条路没有终点。市场规则会变化,新的流动性结构会出现,历史数据会积累新的模式。监控系统的知识库需要持续更新,状态机的规则需要持续调参,自适应计算器的滑动窗口需要持续评估。
但至少,从今天开始,你的飞书不会在凌晨 3:17 响起来了。
下一步行动
如果你在管理多市场量化监控系统:
- 将本文的
MarketCalendar模型替换你当前的硬编码时段配置 - 审查现有的告警规则,将所有固定时间阈值替换为
calculate_final_alert_threshold - 将监控系统改造为状态机架构,参考
MarketMonitorStateMachine的状态转换逻辑
如果你刚开始搭建量化监控基础设施:
- 访问 tickdb.ai 注册(免费,无需信用卡),获取 API Key
- 在控制台查看 TickDB 行情频道(depth / kline / trades)的完整接口文档
- 使用本文的 WebSocket 客户端代码接入实时行情,集成状态机监控
如果你关心 TickDB 产品能力:
- TickDB 的美股、港股、数字货币实时行情均支持 WebSocket 推送,可与本文的监控方案直接集成
- 多市场行情的盘前、盘中、盘后时段规则可通过 TickDB 的
/market-info接口获取(部分市场支持) - 联系 [email protected] 获取机构级的行情+监控一体化方案
如果你习惯用 AI 辅助开发:
- 在 ClawHub 搜索安装
tickdb-market-dataSKILL,快速获取 TickDB API 的使用指南和代码模板
本文不构成任何投资建议。监控系统设计方案适用于一般性技术参考,具体实现时请根据你的业务场景和市场特点进行调整。市场有风险,投资需谨慎。