凌晨 3 点的告警:一位量化开发者的午休噩梦
凌晨 3 点,睡眼惺忪的你被手机震醒——监控系统发来告警:“数据流中断”。你条件反射地打开终端,手指悬在重启键上方。但定睛一看,时间戳显示 12:47。港股的午休时段。
这不是数据问题,是系统设计缺陷。
当监控系统不具备“市场时段感知”能力,正常的交易间隔会被误判为连接故障。随之而来的,可能是过度重连导致的 API 限频、凌晨无意义的告警通知,以及工程师对告警系统的信任崩塌。
本文拆解港股午休期的数据静默机制,给出生产级的时段感知调度方案,并提供完整的代码实现。
一、港股交易时段的微观结构
港股实行与 A 股、美股截然不同的分段交易机制,理解这一机制是构建健壮监控系统的前提。
1.1 交易时段划分
港股每个交易日的标准时段如下:
| 时段 | 时间 | 说明 |
|---|---|---|
| 开盘前竞价 | 09:00-09:30 | 盘前竞价,成交量有限 |
| 早市 | 09:30-12:00 | 连续竞价,正常数据流 |
| 午间休市 | 12:00-13:00 | 无数据推送,整整 1 小时 |
| 午市 | 13:00-16:00 | 连续竞价,正常数据流 |
| 收盘后竞价 | 16:00-16:10 | 盘后竞价(部分标的) |
关键的生理现象:12:00-13:00 的整整 60 分钟内,交易所不推送任何行情数据。这不是 API 问题,不是网络问题,是市场机制的一部分。
1.2 数据流的时间分布特征
以 9988.HK(阿里巴巴)为例,一个典型交易日的 WebSocket 数据到达频率:
| 时段 | 预估消息频率 | 订单簿状态 |
|---|---|---|
| 09:30-12:00 | 50-200 msg/s | 活跃,深度 10 档可用 |
| 12:00-13:00 | 0 msg/s | 静止,无任何推送 |
| 13:00-16:00 | 50-200 msg/s | 活跃恢复 |
| 16:00 后 | <5 msg/s | 收盘数据衰减 |
量化开发者的核心挑战:设计监控系统时,必须将这种“非连续性”纳入架构,不能假设数据流在交易日内始终存在。
二、误告警的根因分析
没有市场日历意识(Market Calendar Awareness)的监控系统,典型架构是:
WebSocket 连接 → 数据接收超时 → 触发重连 → 告警
这套逻辑在 12:00-13:00 会产生以下问题:
2.1 三种典型的误报场景
场景一:固定超时触发误报
监控系统设置 30 秒无数据则判定为“连接断开”。12:00 整,数据流停止,30 秒后告警,12:01 你被叫醒。
场景二:指数退避重连风暴
告警触发后,系统自动执行重连。如果使用标准的指数退避(如首次等待 1 秒,失败后 2 秒、4 秒...),在午休期间会连续发起重连请求。这不仅浪费资源,更可能触发 API 的限频机制(code: 3001),导致午休结束后真正需要恢复时反而被限流。
场景三:夜间无数据误判
港股收盘后(16:00)到次日开盘前(09:00),长达 17 小时无数据。缺乏市场日历的系统会在凌晨反复告警,工程师的睡眠和告警信噪比同步崩溃。
2.2 根因:系统假设“数据应该始终存在”
这是监控系统设计的第一原罪:用处理互联网服务中断的思维,处理金融市场的正常交易间隔。
金融市场的数据流天然是非连续的——有开盘、午休、收盘、节假日。任何脱离市场日历的监控设计,都是在给自己挖坑。
三、时段感知调度:架构设计
正确的解决方案是在监控逻辑中嵌入市场日历感知(Market Calendar Awareness),使系统具备“时间上下文”(Temporal Context)。
3.1 核心架构
┌─────────────────────────────────────────────────────┐
│ 监控主循环 │
│ ┌──────────┐ ┌──────────────┐ ┌──────────┐ │
│ │ 市场日历 │ → │ 时段判定器 │ → │ 差异化响应│ │
│ │ Calendar │ │ TimeContext │ │ Response │ │
│ └──────────┘ └──────────────┘ └──────────┘ │
│ ↑ ↑ ↑ │
│ └───────────────┴───────────────────┘ │
│ 状态上下文传递 │
└─────────────────────────────────────────────────────┘
时段判定器输出:
- TRADING: 正常交易,监控数据频率和质量
- BREAK: 午休/收盘,暂停告警、保持连接
- PRE_MARKET: 盘前,监控竞价异常
- HOLIDAY: 节假日,深度静默
3.2 时段判定器逻辑
from enum import Enum
from datetime import datetime, time
import pytz
class MarketSession(Enum):
"""港股市场时段枚举"""
PRE_MARKET = "pre_market" # 盘前竞价 09:00-09:30
TRADING_MORNING = "trading_morning" # 早市 09:30-12:00
BREAK = "break" # 午休 12:00-13:00
TRADING_AFTERNOON = "trading_afternoon" # 午市 13:00-16:00
CLOSED = "closed" # 收盘后或其他时间
HOLIDAY = "holiday" # 节假日
class HKMarketCalendar:
"""
港股市场日历:判定当前时段
核心职责:
1. 根据系统时间判断当前所属交易时段
2. 提供是否为交易日的判定
3. 计算距离下一个关键时间点(开盘、午休结束、收盘)的秒数
"""
# 港股交易时段定义
SESSIONS = [
(time(9, 0), time(9, 30), MarketSession.PRE_MARKET),
(time(9, 30), time(12, 0), MarketSession.TRADING_MORNING),
(time(12, 0), time(13, 0), MarketSession.BREAK),
(time(13, 0), time(16, 0), MarketSession.TRADING_AFTERNOON),
(time(16, 0), time(23, 59), MarketSession.CLOSED),
(time(0, 0), time(9, 0), MarketSession.CLOSED),
]
def __init__(self, tz: str = "Asia/Hong_Kong"):
self.tz = pytz.timezone(tz)
def now(self) -> datetime:
"""获取当前港股时间"""
return datetime.now(self.tz)
def get_current_session(self) -> MarketSession:
"""获取当前市场时段"""
current_time = self.now().time()
for start, end, session in self.SESSIONS:
if start <= current_time < end:
return session
return MarketSession.CLOSED
def is_trading(self) -> bool:
"""当前是否在交易时段(早市或午市)"""
session = self.get_current_session()
return session in (
MarketSession.PRE_MARKET,
MarketSession.TRADING_MORNING,
MarketSession.TRADING_AFTERNOON
)
def is_trading_active(self) -> bool:
"""当前是否在连续竞价时段(排除盘前竞价)"""
session = self.get_current_session()
return session in (
MarketSession.TRADING_MORNING,
MarketSession.TRADING_AFTERNOON
)
def is_break(self) -> bool:
"""当前是否在午休时段"""
return self.get_current_session() == MarketSession.BREAK
def get_seconds_to_next_session(self) -> int:
"""获取距离下一时段的秒数(用于精确等待)"""
now = self.now()
now_time = now.time()
# 遍历所有时段,找到下一个起点
for start, end, session in self.SESSIONS:
if now_time < start:
# 计算到下一个时段起点的时间
next_start = now.replace(
hour=start.hour,
minute=start.minute,
second=0,
microsecond=0
)
# 如果跨越了午夜,需要加一天
if next_start <= now:
from datetime import timedelta
next_start += timedelta(days=1)
return int((next_start - now).total_seconds())
# 如果当前时间在最后一个时段之后,返回到次日第一个时段的时间
from datetime import timedelta
tomorrow = now + timedelta(days=1)
first_start = self.SESSIONS[0][0]
next_start = tomorrow.replace(
hour=first_start.hour,
minute=first_start.minute,
second=0,
microsecond=0
)
return int((next_start - now).total_seconds())
def is_trading_day(self) -> bool:
"""
判断是否为交易日
注意:完整的交易日判断需要对接交易所日历或节假日列表。
此处给出简化版:工作日(非周六周日)为假设的交易日。
生产环境应接入标准节假日数据源。
"""
weekday = self.now().weekday()
return weekday < 5 # 0=Monday, 4=Friday
def get_session_info(self) -> dict:
"""获取完整的时段信息,用于日志和监控"""
session = self.get_current_session()
now = self.now()
info = {
"current_time": now.isoformat(),
"session": session.value,
"is_trading_day": self.is_trading_day(),
"seconds_to_next_session": self.get_seconds_to_next_session(),
"next_action": self._get_next_action_description()
}
return info
def _get_next_action_description(self) -> str:
"""获取下一动作描述"""
session = self.get_current_session()
seconds = self.get_seconds_to_next_session()
minutes = seconds // 60
descriptions = {
MarketSession.PRE_MARKET: f"开盘在即,还有 {minutes} 分钟",
MarketSession.TRADING_MORNING: f"午休即将开始,还有 {minutes} 分钟",
MarketSession.BREAK: f"午休进行中,距离午市开盘还有 {minutes} 分钟",
MarketSession.TRADING_AFTERNOON: f"午市进行中,收盘还有 {minutes} 分钟",
MarketSession.CLOSED: f"已收盘,开盘还有 {minutes} 分钟",
MarketSession.HOLIDAY: f"非交易日,请检查日历"
}
return descriptions.get(session, "状态未知")
关键设计决策:时段判定器不依赖外部 API 调用,而是基于本地时钟计算。这确保了在网络不稳定或 API 限流期间,时段判断依然准确。
四、生产级 WebSocket 监控:时段感知版
现在将市场日历感知嵌入 WebSocket 监控客户端,实现真正的“智能静默”。
4.1 整体架构
import json
import time
import threading
import random
import logging
from typing import Callable, Optional
from collections import deque
import os
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s [%(levelname)s] %(message)s'
)
logger = logging.getLogger(__name__)
class HKMarketMonitor:
"""
港股市场时段感知 WebSocket 监控客户端
核心增强:
1. 内置市场日历感知,午休期不告警
2. 动态超时策略:交易时段严格,休市时段宽松
3. 智能重连:在正确的时机恢复连接
4. 数据流质量监控:区分“数据缺失”和“连接断开”
"""
# 交易时段的超时阈值(秒)
TRADING_TIMEOUT = 10 # 交易时段:10秒无数据则告警
BREAK_TIMEOUT = 3600 # 午休时段:容忍1小时静默
PRE_MARKET_TIMEOUT = 60 # 盘前时段:60秒无数据则告警
# 重连参数
MAX_RETRY = 5
BASE_DELAY = 1
MAX_DELAY = 60
def __init__(
self,
api_key: str,
symbols: list,
on_data: Callable,
on_alert: Callable,
ws_url: str = "wss://api.tickdb.ai/ws/v1/market"
):
self.api_key = api_key or os.environ.get("TICKDB_API_KEY")
if not self.api_key:
raise ValueError("API Key 未配置,请设置 TICKDB_API_KEY 环境变量")
self.symbols = symbols
self.on_data = on_data
self.on_alert = on_alert
self.ws_url = ws_url
self.ws = None
self.running = False
self.last_data_time = None
self.last_confirmed_session = None # 上一次确认的市场时段
# 市场日历实例
self.calendar = HKMarketCalendar()
# 数据流质量监控
self.message_buffer = deque(maxlen=100) # 保留最近100条消息的时间戳
self.missing_heartbeats = 0
# 线程安全
self._lock = threading.Lock()
self._monitor_thread = None
# 状态统计
self.stats = {
"total_messages": 0,
"alerts_triggered": 0,
"false_alerts_prevented": 0,
"reconnections": 0,
"break_time_received": 0
}
def start(self):
"""启动监控"""
if self.running:
logger.warning("监控已在运行中")
return
self.running = True
self._monitor_thread = threading.Thread(target=self._monitor_loop, daemon=True)
self._monitor_thread.start()
logger.info(f"监控启动,监控标的:{self.symbols}")
def stop(self):
"""停止监控"""
self.running = False
if self.ws:
try:
self.ws.close()
except Exception:
pass
logger.info("监控已停止")
def _monitor_loop(self):
"""
主监控循环:整合市场日历感知与数据流监控
核心逻辑:
1. 判断当前市场时段
2. 根据时段选择差异化的超时策略
3. 在正确时机执行重连
"""
retry_count = 0
while self.running:
current_session = self.calendar.get_current_session()
# ==================== 时段感知决策 ====================
if current_session == MarketSession.BREAK:
# 午休时段:保持连接、监控重连、零告警
logger.info(f"【{current_session.value}】进入午休时段,数据静默为正常")
self._handle_break_session()
continue
elif current_session == MarketSession.CLOSED:
# 收盘或盘前:深度静默,不重连
logger.info(f"【{current_session.value}】非交易时段,保持连接但不监控")
self._handle_closed_session()
continue
# ==================== 交易时段:正常监控 ====================
if not self.ws or not self._is_ws_connected():
# 需要连接或重连
if retry_count >= self.MAX_RETRY:
logger.error(f"已达到最大重试次数({self.MAX_RETRY}),请人工介入")
self.on_alert({
"type": "connection_failure",
"message": "连续重连失败,请检查网络和 API Key",
"retry_count": retry_count
})
break
success = self._connect()
if success:
retry_count = 0
self.stats["reconnections"] += 1
logger.info("WebSocket 连接成功")
else:
retry_count += 1
delay = self._calculate_backoff(retry_count)
logger.warning(f"连接失败,{delay:.1f}秒后重试(第{retry_count}次)")
time.sleep(delay)
continue
# 交易时段的数据流监控
self._monitor_data_flow(current_session)
# 短暂休眠,避免空转
time.sleep(1)
def _handle_break_session(self):
"""
处理午休时段:智能静默
午休时段的处理策略:
1. 如果已连接,保持连接(避免复市后重新建立连接的开销)
2. 不执行重连逻辑
3. 不触发告警
4. 定期输出状态日志(每 10 分钟),便于人工观察
"""
if self.ws and self._is_ws_connected():
# 连接正常,仅维持心跳
logger.debug(f"【午休】连接正常,静默期已持续 {self._get_silence_duration():.0f} 秒")
# 每 10 分钟输出一次状态(便于运维观察)
silence_duration = self._get_silence_duration()
if silence_duration > 0 and silence_duration % 600 < 1:
session_info = self.calendar.get_session_info()
logger.info(f"【午休状态报告】{session_info['next_action']},"
f"已静默 {silence_duration:.0f} 秒,无告警(正常)")
# 午休结束时,等待 5 秒后再开始数据流监控(给交易所数据推送留出启动时间)
if self.last_confirmed_session == MarketSession.BREAK:
pass # 刚进入午休,不做额外处理
else:
# 刚从午休恢复,准备接收数据
logger.info("午休结束,5 秒后开始监控午市数据流")
time.sleep(5)
self.last_confirmed_session = MarketSession.BREAK
def _handle_closed_session(self):
"""处理收盘/非交易时段:深度静默"""
# 收盘时段不进行主动重连
if self.ws:
try:
self.ws.close()
self.ws = None
logger.debug("收盘时段,关闭 WebSocket 连接")
except Exception:
pass
# 非交易时段的长等待:等待到下一交易时段
seconds_to_wait = self.calendar.get_seconds_to_next_session()
logger.info(f"非交易时段,休眠 {seconds_to_wait} 秒至下一时段")
time.sleep(min(seconds_to_wait, 300)) # 最多休眠5分钟,避免死锁
def _monitor_data_flow(self, session: MarketSession):
"""
监控数据流:区分正常静默与异常断开
差异化超时策略:
- 盘前竞价:60秒无数据告警(可能存在竞价异常)
- 早市/午市:10秒无数据告警(真正的连接问题)
"""
silence_duration = self._get_silence_duration()
# 选择对应的超时阈值
if session == MarketSession.PRE_MARKET:
timeout = self.PRE_MARKET_TIMEOUT
elif session in (MarketSession.TRADING_MORNING, MarketSession.TRADING_AFTERNOON):
timeout = self.TRADING_TIMEOUT
else:
timeout = self.BREAK_TIMEOUT
if silence_duration > timeout:
# 可能存在连接问题,但先检查是否在合理范围内
self.stats["alerts_triggered"] += 1
alert_payload = {
"type": "data_flow_issue",
"session": session.value,
"silence_duration": silence_duration,
"timeout_threshold": timeout,
"message": f"{session.value} 时段已静默 {silence_duration:.0f} 秒"
}
logger.warning(f"触发告警:{alert_payload['message']}")
self.on_alert(alert_payload)
def _connect(self) -> bool:
"""
建立 WebSocket 连接
鉴权方式:URL 参数传递 api_key
"""
try:
import websocket
# 构建带鉴权的 URL
full_url = f"{self.ws_url}?api_key={self.api_key}"
self.ws = websocket.WebSocketApp(
full_url,
on_message=self._on_message,
on_error=self._on_error,
on_close=self._on_close,
on_open=self._on_open
)
# 设置心跳
self.ws.sock = None # 初始化
# 在独立线程中运行
ws_thread = threading.Thread(
target=self.ws.run_forever,
kwargs={"ping_interval": 30, "ping_timeout": 10},
daemon=True
)
ws_thread.start()
return True
except Exception as e:
logger.error(f"WebSocket 连接失败:{e}")
return False
def _on_open(self, ws):
"""WebSocket 连接打开后,订阅标的"""
logger.info("WebSocket 连接已打开,订阅行情数据")
# 订阅 tick 数据(港股可用)
subscribe_msg = {
"cmd": "subscribe",
"args": {
"symbols": self.symbols,
"channels": ["tick"]
}
}
try:
ws.send(json.dumps(subscribe_msg))
logger.info(f"已订阅:{self.symbols}")
except Exception as e:
logger.error(f"订阅失败:{e}")
def _on_message(self, ws, message):
"""处理收到的消息"""
self.last_data_time = time.time()
self.message_buffer.append(self.last_data_time)
self.stats["total_messages"] += 1
try:
data = json.loads(message)
# 处理 tick 数据
if data.get("type") == "tick":
self.on_data(data)
# 处理限频响应
elif data.get("code") == 3001:
retry_after = int(data.get("retry_after", 5))
logger.warning(f"限频触发,等待 {retry_after} 秒")
time.sleep(retry_after)
except json.JSONDecodeError:
logger.warning(f"JSON 解析失败:{message[:100]}")
def _on_error(self, ws, error):
"""错误处理"""
logger.error(f"WebSocket 错误:{error}")
def _on_close(self, ws, close_status_code, close_msg):
"""连接关闭处理"""
logger.info(f"WebSocket 连接关闭:{close_status_code} - {close_msg}")
# 关闭后不立即重连,等待主循环决策
# 时段感知逻辑决定是否需要重连
self.ws = None
def _is_ws_connected(self) -> bool:
"""检查 WebSocket 连接状态"""
if not self.ws or not self.ws.sock:
return False
return self.ws.sock.connected
def _get_silence_duration(self) -> float:
"""获取当前静默时长(秒)"""
if not self.last_data_time:
return 0.0
# 如果 WebSocket 未连接,从上次确认的数据时间计算
if not self._is_ws_connected():
return time.time() - self.last_data_time
return time.time() - self.last_data_time
def _calculate_backoff(self, retry_count: int) -> float:
"""
计算指数退避延迟(含抖动)
公式:min(BASE_DELAY * 2^retry_count + random(0, 10%), MAX_DELAY)
"""
delay = self.BASE_DELAY * (2 ** retry_count)
jitter = random.uniform(0, delay * 0.1) # 10% 抖动
return min(delay + jitter, self.MAX_DELAY)
def get_stats(self) -> dict:
"""获取监控统计信息"""
return {
**self.stats,
"current_session": self.calendar.get_current_session().value,
"silence_duration": self._get_silence_duration(),
"is_connected": self._is_ws_connected()
}
4.2 使用示例
import json
def handle_data(data):
"""数据处理回调"""
symbol = data.get("symbol", "UNKNOWN")
price = data.get("price", 0)
volume = data.get("volume", 0)
print(f"[数据] {symbol}: ${price} | 成交量: {volume}")
def handle_alert(alert):
"""告警处理回调"""
alert_type = alert.get("type")
if alert_type == "data_flow_issue":
# 检查是否因午休触发(误报过滤)
session = alert.get("session")
if session == "break":
print(f"【信息】午休期间,静默 {alert.get('silence_duration')} 秒(正常)")
return # 不执行实际告警
# 非午休时段的告警才真正触发通知
print(f"🚨 【告警】{alert.get('message')}")
elif alert_type == "connection_failure":
print(f"🔴 【严重】{alert.get('message')}")
# 初始化监控
monitor = HKMarketMonitor(
api_key=os.environ.get("TICKDB_API_KEY"),
symbols=["9988.HK", "0700.HK", "3690.HK"],
on_data=handle_data,
on_alert=handle_alert
)
# 启动
monitor.start()
# 主线程保持运行
try:
while True:
time.sleep(60)
stats = monitor.get_stats()
print(f"[状态] {stats['current_session']} | 静默 {stats['silence_duration']:.0f}s | "
f"消息数 {stats['total_messages']} | 告警 {stats['alerts_triggered']}")
except KeyboardInterrupt:
monitor.stop()
print("监控已停止")
4.3 运行输出示例
2026-04-25 11:59:58 [INFO] 监控启动,监控标的:['9988.HK', '0700.HK']
2026-04-25 12:00:00 [INFO] 【trading_morning】进入午休时段,数据静默为正常
2026-04-25 12:10:00 [INFO] 【午休状态报告】午休进行中,距离午市开盘还有 50 分钟,已静默 600 秒,无告警(正常)
2026-04-25 12:15:01 [INFO] 【午休状态报告】午休进行中,距离午市开盘还有 45 分钟,已静默 900 秒,无告警(正常)
2026-04-25 13:00:00 [INFO] 午休结束,5 秒后开始监控午市数据流
2026-04-25 13:00:06 [INFO] WebSocket 连接成功
2026-04-25 13:00:06 [INFO] WebSocket 连接已打开,订阅行情数据
2026-04-25 13:00:06 [INFO] 已订阅:['9988.HK', '0700.HK']
[数据] 9988.HK: $82.50 | 成交量: 1000
[数据] 0700.HK: $378.20 | 成交量: 500
五、午休期数据流的深度分析
5.1 为什么午休期间没有数据?
港股午休的设计有历史原因和现实考量:
历史渊源:香港联交所成立初期,交易大厅内需要人工传递纸质委托单,中午休市让经纪人有时间处理午间堆积的订单。随着电子化交易普及,这一制度保留至今,更多是出于市场参与者(尤其是机构)的习惯需求。
现实考量:
- 机构投资者需要午间复盘上午交易、调整下午策略
- 亚洲市场跨时区联动,午休是缓冲期(A股 11:30-13:00 也是同理)
- 部分对冲基金在午休期间调整持仓,下午开盘可能出现大幅跳空
5.2 午休期间你能做什么?
午休的 60 分钟静默期,不是“系统故障的隐患期”,而是“策略准备的黄金期”。
| 可执行任务 | 说明 |
|---|---|
| 上午策略复盘 | 分析订单簿变化、买卖压力比、流动性分布 |
| 下午仓位调整 | 基于午间新闻和隔夜美股期货调整 |
| 数据校准 | 检查 TickDB 返回数据的完整性(是否漏掉 tick) |
| 系统自检 | 执行健康检查,确认下午开盘时连接正常 |
以下是午休期间的数据完整性检查代码:
def verify_data_integrity(monitor: HKMarketMonitor, symbol: str, lookback_minutes: int = 180):
"""
验证上午数据的完整性
检查项:
1. 是否存在数据稀疏区间(可能存在连接问题)
2. 成交量的合理性(是否存在异常大单或异常静默)
3. 价格跳空的幅度(是否存在行情延迟)
"""
# ⚠️ 注意:此处需要存储上午的数据流
# 可使用 TickDB 的历史数据接口进行交叉验证
# 但 TickDB trades 接口不支持美股和港股,仅支持港股和数字货币
print(f"数据完整性验证:{symbol}")
print(" - 建议使用 TickDB /v1/market/kline 接口获取 1 分钟 K 线数据")
print(" - 对比实时数据与历史数据的成交量/价格一致性")
print(" - 如发现显著差异,记录为潜在数据问题供后续排查")
六、部署方案
6.1 分场景配置建议
| 场景 | 特点 | 推荐配置 |
|---|---|---|
| 个人量化 | 低频监控,轻量级 | 使用上述单进程方案,午休期完全静默 |
| 团队协作 | 多标的,中等频率 | 部署定时任务 + 飞书/钉钉告警,午休期降频 |
| 机构级 | 高频全市场,需要 99.9% 可用性 | 多活部署 + 独立市场日历服务 + 分级告警 |
6.2 高可用架构
# 机构级部署:多活 + 市场日历服务
class HAMarketMonitor:
"""
高可用版本:多活部署 + 市场日历服务
架构:
1. 主备双活 WebSocket 连接
2. 独立的市场日历服务(避免时钟漂移)
3. 三级告警:数据异常→连接断开→服务不可用
"""
def __init__(self, api_key: str, symbols: list):
self.primary = HKMarketMonitor(api_key, symbols, on_data, on_alert)
self.standby = HKMarketMonitor(api_key, symbols, on_data, on_alert)
self.calendar_service = "独立部署的市场日历服务" # 可对接 NTP + 港交所日历
七、结语:让监控系统“理解”市场
回到开篇的场景。如果那位量化开发者在构建监控系统时,预留了市场时段感知能力,凌晨 3 点的告警就不会发生。
这不是一个边缘案例。这是用处理互联网服务的思维,处理金融市场时的必然陷阱。
金融市场的数据流是非连续、非均匀的——有开盘竞价、盘中连续交易、午休静默、收盘竞价、节假日停牌。任何脱离市场日历的监控系统,都会在这套时间机制面前产生误报。
解决路径:
- 内置市场日历感知:系统需要“知道”当前是交易时段还是休市时段
- 差异化超时策略:交易时段严格(10-60秒),休市时段宽松(容忍小时级静默)
- 智能重连时机:不在午休期重连,不在收盘后重连
- 数据流质量监控:区分“数据缺失”和“连接断开”
TickDB 提供港股全档 depth 频道(10档)和 tick 逐笔数据,配合市场日历感知,你的监控系统将不再被午休绑架。
下一步行动
如果你希望修复现有监控系统的午休误报问题:
- 访问 tickdb.ai 注册获取免费 API Key(无需信用卡)
- 在控制台查看港股 9988.HK、0700.HK 的实时 depth 和 tick 数据
- 将本文的
HKMarketCalendar类集成到你的监控系统中
如果你需要完整的午休期策略工具链(复盘 + 下午预判 + 数据校准):
- 使用 TickDB
/v1/market/kline接口获取午休前后的 K 线数据 - 订阅港股 depth 频道,实时监控订单簿结构变化
- 联系 [email protected] 获取机构级数据方案(含 10 年历史 K 线)
如果你习惯用 AI 辅助开发:
在 AI 助手中搜索安装 tickdb-market-data SKILL,让 AI 帮你生成符合本文规范的监控代码。
风险提示:本文不构成任何投资建议。港股午休期间的价格跳空可能剧烈,实际交易中请设置合理的止损规则,并考虑盘前/盘后流动性不足带来的冲击成本。市场有风险,投资需谨慎。