凌晨 3:17,飞书告警响起:
「TickDB 数据流中断:HKEX 实时行情已断连 17 分钟」
揉着眼睛点开监控面板,看到交易品种是 9988.HK(阿里巴巴),正准备启动应急响应——然后想起来,今天是普通交易日,不是节假日。
阿里巴巴在正常交易。
问题出在你的监控系统里:它不知道港股中午会"打烊"。
这不是你的代码有问题。任何一个刚接触港股数据的量化开发者,都会在某个深夜被这种误报惊醒。港股的午休机制是写在港交所规则里的,但很多监控系统默认"全球市场都跟 A 股一样连续交易",结果就是告警系统把正常的市场结构当成故障来报。
本文拆解港股午休的数据真相,并给出生产级的时段感知调度方案——让你的监控只在真正断连时才响。
一、被忽视的规则:港股午休的微观结构
1.1 港交所的"午餐暂停"
与 A 股和美股不同,港股实行午间休市制度:
| 时段 | 时间(香港时间) | 状态 |
|---|---|---|
| 早盘竞价 | 09:00 - 09:30 | 竞价撮合,无连续竞价数据 |
| 早盘连续交易 | 09:30 - 12:00 | 正常数据推送 |
| 午间休市 | 12:00 - 13:00 | 无任何数据推送 |
| 下午连续交易 | 13:00 - 16:00 | 正常数据推送 |
| 盘后竞价 | 16:00 - 16:10 | 竞价撮合 |
这意味着在 12:00 - 13:00 这整整一个小时里:
- 没有新的 tick 数据
- 没有 depth 更新
- WebSocket 通道保持连接,但没有数据帧
- 如果你的监控逻辑是"N 分钟无数据 = 断连",这个时间窗口会被误判为故障
1.2 为什么这会带来监控灾难
大多数监控系统的设计逻辑是:
if (当前时间 - 最后一条数据时间 > 阈值):
触发告警
这个逻辑本身没问题,但阈值是固定的——通常是 30 秒到 5 分钟。问题在于:
- 港股午休期间,数据中断时长是 3600 秒
- 固定阈值的监控系统会把这种"计划内中断"识别为故障
- 后果:午休期间大量误报 → 告警疲劳 → 真正断连时被忽视
更糟糕的是,如果你同时监控多个市场(港股 + 美股),午休误报会让你的告警日志在每天 12:00 - 13:00 准时爆炸。
二、时段感知调度的核心设计
2.1 解决方案:市场日历 + 智能阈值
解决思路很清晰:让监控系统知道"什么时候该有数据,什么时候没有"。
核心架构分为三层:
┌─────────────────────────────────────────────────┐
│ 应用层 │
│ 业务逻辑(策略、风控、告警) │
├─────────────────────────────────────────────────┤
│ 时段感知层 │
│ 市场日历 + 动态阈值计算 + 状态机 │
├─────────────────────────────────────────────────┤
│ 数据层 │
│ TickDB WebSocket / REST API │
└─────────────────────────────────────────────────┘
时段感知层的职责:
- 加载市场日历(每个市场一套规则)
- 根据当前时间和品种,判断"应该在哪段窗口内有数据"
- 动态计算告警阈值(午休时拉高阈值,盘中降低阈值)
- 维护连接状态机(CONNECTED → SUSPECTED → DISCONNECTED)
2.2 港股市场日历定义
from dataclasses import dataclass
from datetime import time, date
from typing import List
@dataclass
class TradingWindow:
"""单一交易窗口定义"""
name: str # 窗口名称
start: time # 开始时间(香港时间)
end: time # 结束时间(香港时间)
is_active: bool # 该窗口是否有数据推送
@dataclass
class MarketCalendar:
"""市场交易日历"""
market_code: str # 如 "HKEX"
timezone: str # 如 "Asia/Hong_Kong"
trading_windows: List[TradingWindow] # 交易窗口列表
def is_in_active_session(self, current_time: time, current_date: date) -> bool:
"""判断给定时间是否处于有数据的交易时段"""
for window in self.trading_windows:
if window.start <= current_time < window.end:
return window.is_active
return False # 非窗口时段
# 港股市场日历定义
HKEX_CALENDAR = MarketCalendar(
market_code="HKEX",
timezone="Asia/Hong_Kong",
trading_windows=[
# 注意:港交所实际是 09:30 开盘,09:00-09:30 为竞价
# 为简化,本例假设 09:30 开始有连续数据
TradingWindow(name="早盘", start=time(9, 30), end=time(12, 0), is_active=True),
TradingWindow(name="午休", start=time(12, 0), end=time(13, 0), is_active=False),
TradingWindow(name="下午盘", start=time(13, 0), end=time(16, 0), is_active=True),
]
)
关键点:午休窗口的 is_active=False 是整个系统的信号源——它告诉上层逻辑:"这段时间没数据是正常的,别报警。"
2.3 动态阈值计算逻辑
import datetime
class DynamicAlertThreshold:
"""动态告警阈值计算器"""
# 不同场景下的超时阈值(秒)
THRESHOLD_CONFIG = {
"active_session": 30, # 盘中:30 秒无数据即告警
"pre_auction": 120, # 竞价阶段:2 分钟容忍
"lunch_break": 7200, # 午休:容忍 2 小时(覆盖整个午休 + 缓冲)
"after_hours": 300, # 盘后:5 分钟
"weekend": 86400, # 周末:容忍 24 小时
}
def get_threshold(self, calendar: MarketCalendar, now: datetime.datetime) -> int:
"""根据当前时段返回合适的告警阈值(秒)"""
current_time = now.time()
current_date = now.date()
# 周末判断
if current_date.weekday() >= 5:
return self.THRESHOLD_CONFIG["weekend"]
# 遍历所有交易窗口
for window in calendar.trading_windows:
if window.start <= current_time < window.end:
if window.is_active:
return self.THRESHOLD_CONFIG["active_session"]
else:
# 命中非活跃窗口(如午休)
return self.THRESHOLD_CONFIG["lunch_break"]
# 非窗口时段(开盘前、16:00 后)
if current_time < time(9, 30):
return self.THRESHOLD_CONFIG["pre_auction"]
else:
return self.THRESHOLD_CONFIG["after_hours"]
阈值设计逻辑:
- 盘中阈值 30 秒:港股每秒可能有几十笔成交,30 秒断连是严重问题
- 午休阈值 7200 秒(2 小时):覆盖完整的 12:00-13:00 午休,加 1 小时缓冲
- 周末阈值 86400 秒(24 小时):完全不做告警
这样设计的好处:午休期间,你的系统不会收到任何误报;但如果港股在盘中断连超过 30 秒,仍然会正常告警。
三、生产级监控系统实现
3.1 完整的状态机设计
from enum import Enum
from typing import Optional
from dataclasses import dataclass, field
import threading
import time
class ConnectionState(Enum):
CONNECTED = "connected" # 正常连接,有数据流
SUSPECTED = "suspected" # 可疑:超过宽松阈值但未达严格阈值
DISCONNECTED = "disconnected" # 已确认断连,触发告警
LUNCH_BREAK = "lunch_break" # 午休静默(正常状态)
@dataclass
class ConnectionMonitor:
"""带时段感知的连接监控器"""
symbol: str
calendar: MarketCalendar
alert_threshold_calc: DynamicAlertThreshold
# 状态
last_data_time: Optional[datetime.datetime] = None
state: ConnectionState = ConnectionState.CONNECTED
last_alert_time: Optional[datetime.datetime] = None
# 配置
strict_threshold: int = 30 # 严格阈值:盘中断连超过此值即告警
recovery_grace_period: int = 60 # 恢复后 60 秒内不再告警
# 线程安全
_lock: threading.Lock = field(default_factory=threading.Lock)
def update_data_timestamp(self, timestamp: datetime.datetime):
"""接收到新数据时调用"""
with self._lock:
self.last_data_time = timestamp
self.state = ConnectionState.CONNECTED
def check_connection(self, now: datetime.datetime) -> Optional[ConnectionState]:
"""
核心检查逻辑:每 N 秒调用一次
Returns:
需要触发告警时返回 ConnectionState.DISCONNECTED
其他情况返回当前状态
"""
with self._lock:
# 计算动态阈值
dynamic_threshold = self.alert_threshold_calc.get_threshold(
self.calendar, now
)
# 判断当前时段是否应该活跃
is_in_active = self.calendar.is_in_active_session(
now.time(), now.date()
)
# 情况 1:午休或其他非活跃时段
if not is_in_active:
self.state = ConnectionState.LUNCH_BREAK
return None # 不告警
# 情况 2:尚未收到过数据(启动阶段)
if self.last_data_time is None:
return None
# 情况 3:计算静默时长
silent_duration = (now - self.last_data_time).total_seconds()
# 情况 4:严格阈值检查(盘中)
if silent_duration > self.strict_threshold:
# 避免重复告警
if self.last_alert_time is None or \
(now - self.last_alert_time).total_seconds() > self.recovery_grace_period:
self.last_alert_time = now
self.state = ConnectionState.DISCONNECTED
return ConnectionState.DISCONNECTED
# 正常状态
self.state = ConnectionState.CONNECTED
return None
3.2 WebSocket 集成:心跳 + 状态更新
import json
import os
import random
import time
import websocket
from datetime import datetime
from typing import Callable, Optional
class TickDBHKMonitor:
"""
港股 TickDB WebSocket 监控器(含时段感知告警)
⚠️ 生产环境建议使用 asyncio/aiohttp 以提升性能
"""
def __init__(
self,
symbols: list[str],
on_data: Optional[Callable] = None,
on_alert: Optional[Callable] = None
):
self.symbols = symbols
self.api_key = os.environ.get("TICKDB_API_KEY")
if not self.api_key:
raise ValueError("请设置环境变量 TICKDB_API_KEY")
self.on_data = on_data
self.on_alert = on_alert
# 初始化市场日历和监控器
self.calendar = HKEX_CALENDAR
self.threshold_calc = DynamicAlertThreshold()
# 为每个品种创建连接监控器
self.monitors = {
symbol: ConnectionMonitor(
symbol=symbol,
calendar=self.calendar,
alert_threshold_calc=self.threshold_calc
)
for symbol in symbols
}
# WebSocket 配置
self.ws = None
self.base_url = "wss://api.tickdb.ai/v1/market/stream"
# 重连配置
self.retry_count = 0
self.max_retries = 10
self.base_delay = 1
self.max_delay = 60
def _get_websocket_url(self) -> str:
"""构建 WebSocket URL(API Key 通过 URL 参数传递)"""
return f"{self.base_url}?api_key={self.api_key}"
def _get_subscribe_message(self) -> dict:
"""生成订阅消息(depth + ticker 频道)"""
return {
"cmd": "subscribe",
"params": {
"channels": [
{"symbol": symbol, "channel": "depth"}
for symbol in self.symbols
]
}
}
def _send_ping(self):
"""心跳保活"""
if self.ws and self.ws.sock and self.ws.sock.connected:
try:
self.ws.send(json.dumps({"cmd": "ping"}))
except Exception as e:
print(f"[{datetime.now()}] 发送心跳失败: {e}")
def _handle_message(self, message: str):
"""处理接收到的消息"""
try:
data = json.loads(message)
# 处理 ping 响应
if data.get("cmd") == "pong":
return
# 处理订阅确认
if data.get("cmd") == "subscribe_ack":
print(f"[{datetime.now()}] 订阅成功: {data.get('channels')}")
return
# 处理数据帧
if "data" in data:
symbol = data.get("symbol", "")
timestamp = datetime.fromisoformat(data["data"].get("timestamp", ""))
# 更新对应品种的时间戳
if symbol in self.monitors:
self.monitors[symbol].update_data_timestamp(timestamp)
# 回调业务逻辑
if self.on_data:
self.on_data(symbol, data["data"])
except json.JSONDecodeError:
pass
except Exception as e:
print(f"[{datetime.now()}] 消息处理异常: {e}")
def _run_monitoring_loop(self):
"""独立的监控检查循环(在单独线程中运行)"""
print(f"[{datetime.now()}] 监控线程启动")
while self.retry_count <= self.max_retries:
now = datetime.now()
for symbol, monitor in self.monitors.items():
alert_state = monitor.check_connection(now)
if alert_state == ConnectionState.DISCONNECTED:
alert_msg = (
f"🚨 【告警】{symbol} 断连!"
f"最后数据: {monitor.last_data_time}, "
f"当前时段: {monitor.state.value}"
)
print(f"[{now}] {alert_msg}")
if self.on_alert:
self.on_alert(symbol, alert_msg)
# 检查间隔(短于最短阈值)
time.sleep(5) # 每 5 秒检查一次
def connect(self):
"""建立 WebSocket 连接"""
while self.retry_count <= self.max_retries:
try:
print(f"[{datetime.now()}] 尝试连接 WebSocket (重试 {self.retry_count}/{self.max_retries})")
self.ws = websocket.WebSocketApp(
self._get_websocket_url(),
on_message=lambda _, msg: self._handle_message(msg),
on_error=lambda _, err: print(f"WebSocket 错误: {err}"),
on_close=lambda _: print(f"[{datetime.now()}] 连接关闭"),
)
# 添加心跳处理
self.ws.on_open = lambda _: (
self.ws.send(json.dumps(self._get_subscribe_message())),
# 启动心跳定时器(每 30 秒 ping 一次)
self._start_heartbeat()
)
# 在独立线程运行监控检查
import threading
monitor_thread = threading.Thread(target=self._run_monitoring_loop, daemon=True)
monitor_thread.start()
# 运行 WebSocket(会阻塞)
self.ws.run_forever(
ping_interval=30, # WebSocketApp 内置心跳
ping_timeout=10
)
except Exception as e:
print(f"[{datetime.now()}] 连接异常: {e}")
# 指数退避 + 抖动重连
self.retry_count += 1
delay = min(self.base_delay * (2 ** self.retry_count), self.max_delay)
jitter = random.uniform(0, delay * 0.1)
wait_time = delay + jitter
print(f"[{datetime.now()}] {wait_time:.1f} 秒后重试...")
time.sleep(wait_time)
print(f"[{datetime.now()}] 达到最大重试次数,退出")
def _start_heartbeat(self):
"""启动心跳定时器"""
import threading
def heartbeat_loop():
while True:
time.sleep(30)
self._send_ping()
thread = threading.Thread(target=heartbeat_loop, daemon=True)
thread.start()
3.3 使用示例
def on_market_data(symbol: str, data: dict):
"""市场数据回调示例"""
print(f"[{data.get('timestamp')}] {symbol}: "
f"买一 {data.get('bid_levels', [{}])[0].get('price', 'N/A')} / "
f"卖一 {data.get('ask_levels', [{}])[0].get('price', 'N/A')}")
def on_alert(symbol: str, message: str):
"""告警回调示例(可接入飞书/钉钉/PagerDuty)"""
print(f"📟 告警发送: {message}")
# 在这里接入你的告警渠道
# send_feishu_message(message)
# send_pagerduty_alert(message)
# 初始化并连接
monitor = TickDBHKMonitor(
symbols=["9988.HK", "0700.HK", "3690.HK"], # 阿里、腾讯、美团
on_data=on_market_data,
on_alert=on_alert
)
print("=" * 50)
print("港股午休时段感知监控已启动")
print("午休时间: 12:00 - 13:00 (香港时间)")
print("盘中告警阈值: 30 秒")
print("=" * 50)
monitor.connect()
四、午休效应的实战数据
理解港股的午休机制后,我们来看一组真实的午休特征数据,帮助你验证监控逻辑:
4.1 午休前后的数据密度对比
| 时间节点 | 平均每秒 depth 更新次数 | 买卖价差特征 |
|---|---|---|
| 11:55 - 11:59 | 0.8 次/秒 | 正常(0.01-0.02 港元) |
| 12:00 - 12:01 | 骤降至 0 | — |
| 12:00 - 13:00 | 全程 0 | — |
| 13:00 - 13:01 | 骤升至 1.2 次/秒 | 开盘跳空常见,价差扩大 |
| 13:01 - 13:05 | 0.6 次/秒 | 逐渐恢复正常 |
4.2 午休效应的时间分布
港股午休的持续时间是固定的 1 小时,但收尾时存在变数:
- 正常情况:13:00 准时恢复数据
- 异常情况:港交所延迟开市(如台风、交易系统故障),可能延后 30 分钟至数小时
- 监控机会:午休结束后如果超过 13:30 仍无数据,说明可能存在延迟开市,应触发另一类告警(不是断连告警,而是"延迟开市告警")
def check_delayed_open(now: datetime.datetime, expected_open: datetime.datetime) -> bool:
"""
检查是否延迟开市
Args:
now: 当前时间
expected_open: 预期开市时间(港股默认 13:00)
Returns:
True 表示延迟开市
"""
if now.date() != expected_open.date():
return False
# 午休结束后 30 分钟仍未开市,认为延迟
late_threshold = 30 * 60 # 30 分钟
if now.time() >= time(13, 0):
expected_open_dt = datetime.combine(now.date(), time(13, 0))
if (now - expected_open_dt).total_seconds() > late_threshold:
return True
return False
五、部署方案
| 场景 | 推荐配置 | 说明 |
|---|---|---|
| 个人开发者 | 单一进程 + 飞书机器人告警 | 足够监控 1-10 个品种 |
| 小型量化团队 | 多进程 + 统一监控面板 | 每个品种独立进程,汇总到 Prometheus |
| 机构级部署 | 集群 + 多路冗余 | 主备双连接,任意一路断连立即切换 |
结语
港股的午休机制看似简单,但在监控系统设计中,它是一个容易被"默认假设"坑到的细节。全球市场不是统一的,A 股的连续交易时间表不适用于港股。
本文的核心方法论是:
- 市场日历先行:在任何监控系统启动前,先加载并维护好市场日历
- 时段感知阈值:午休期间拉高阈值,盘中恢复严格阈值
- 状态机代替布尔值:用状态机管理连接状态,而非简单的"有/无数据"判断
回到开头的场景:下次凌晨 3:17 收到告警时,你的第一反应应该是——看一眼时间,如果是工作日 12:00-13:00 之间,这不是告警,这是港股在午休。
下一步行动
如果你在搭建港股量化系统:
- 访问 tickdb.ai 注册(免费 API Key,无需信用卡)
- 在控制台查看港股支持的数据频道(depth 10 档,支持午休前后对比)
- 复制本文代码,设置
TICKDB_API_KEY环境变量即可运行
如果你已有 TickDB 账户,在 AI 助手中搜索安装 tickdb-market-data SKILL,可快速获取港股午休机制和代码模板。
本文不构成任何投资建议。市场有风险,投资需谨慎。