凌晨 3:17,飞书告警响起:

「TickDB 数据流中断:HKEX 实时行情已断连 17 分钟」

揉着眼睛点开监控面板,看到交易品种是 9988.HK(阿里巴巴),正准备启动应急响应——然后想起来,今天是普通交易日,不是节假日。

阿里巴巴在正常交易。

问题出在你的监控系统里:它不知道港股中午会"打烊"。

这不是你的代码有问题。任何一个刚接触港股数据的量化开发者,都会在某个深夜被这种误报惊醒。港股的午休机制是写在港交所规则里的,但很多监控系统默认"全球市场都跟 A 股一样连续交易",结果就是告警系统把正常的市场结构当成故障来报。

本文拆解港股午休的数据真相,并给出生产级的时段感知调度方案——让你的监控只在真正断连时才响。


一、被忽视的规则:港股午休的微观结构

1.1 港交所的"午餐暂停"

与 A 股和美股不同,港股实行午间休市制度:

时段 时间(香港时间) 状态
早盘竞价 09:00 - 09:30 竞价撮合,无连续竞价数据
早盘连续交易 09:30 - 12:00 正常数据推送
午间休市 12:00 - 13:00 无任何数据推送
下午连续交易 13:00 - 16:00 正常数据推送
盘后竞价 16:00 - 16:10 竞价撮合

这意味着在 12:00 - 13:00 这整整一个小时里:

  • 没有新的 tick 数据
  • 没有 depth 更新
  • WebSocket 通道保持连接,但没有数据帧
  • 如果你的监控逻辑是"N 分钟无数据 = 断连",这个时间窗口会被误判为故障

1.2 为什么这会带来监控灾难

大多数监控系统的设计逻辑是:

if (当前时间 - 最后一条数据时间 > 阈值):
    触发告警

这个逻辑本身没问题,但阈值是固定的——通常是 30 秒到 5 分钟。问题在于:

  • 港股午休期间,数据中断时长是 3600 秒
  • 固定阈值的监控系统会把这种"计划内中断"识别为故障
  • 后果:午休期间大量误报 → 告警疲劳 → 真正断连时被忽视

更糟糕的是,如果你同时监控多个市场(港股 + 美股),午休误报会让你的告警日志在每天 12:00 - 13:00 准时爆炸。


二、时段感知调度的核心设计

2.1 解决方案:市场日历 + 智能阈值

解决思路很清晰:让监控系统知道"什么时候该有数据,什么时候没有"。

核心架构分为三层:

┌─────────────────────────────────────────────────┐
│                   应用层                         │
│         业务逻辑(策略、风控、告警)              │
├─────────────────────────────────────────────────┤
│               时段感知层                          │
│    市场日历 + 动态阈值计算 + 状态机               │
├─────────────────────────────────────────────────┤
│               数据层                             │
│         TickDB WebSocket / REST API            │
└─────────────────────────────────────────────────┘

时段感知层的职责

  1. 加载市场日历(每个市场一套规则)
  2. 根据当前时间和品种,判断"应该在哪段窗口内有数据"
  3. 动态计算告警阈值(午休时拉高阈值,盘中降低阈值)
  4. 维护连接状态机(CONNECTED → SUSPECTED → DISCONNECTED)

2.2 港股市场日历定义

from dataclasses import dataclass
from datetime import time, date
from typing import List

@dataclass
class TradingWindow:
    """单一交易窗口定义"""
    name: str              # 窗口名称
    start: time            # 开始时间(香港时间)
    end: time              # 结束时间(香港时间)
    is_active: bool        # 该窗口是否有数据推送

@dataclass
class MarketCalendar:
    """市场交易日历"""
    market_code: str                       # 如 "HKEX"
    timezone: str                          # 如 "Asia/Hong_Kong"
    trading_windows: List[TradingWindow]   # 交易窗口列表
    
    def is_in_active_session(self, current_time: time, current_date: date) -> bool:
        """判断给定时间是否处于有数据的交易时段"""
        for window in self.trading_windows:
            if window.start <= current_time < window.end:
                return window.is_active
        return False  # 非窗口时段

# 港股市场日历定义
HKEX_CALENDAR = MarketCalendar(
    market_code="HKEX",
    timezone="Asia/Hong_Kong",
    trading_windows=[
        # 注意:港交所实际是 09:30 开盘,09:00-09:30 为竞价
        # 为简化,本例假设 09:30 开始有连续数据
        TradingWindow(name="早盘", start=time(9, 30), end=time(12, 0), is_active=True),
        TradingWindow(name="午休", start=time(12, 0), end=time(13, 0), is_active=False),
        TradingWindow(name="下午盘", start=time(13, 0), end=time(16, 0), is_active=True),
    ]
)

关键点:午休窗口的 is_active=False 是整个系统的信号源——它告诉上层逻辑:"这段时间没数据是正常的,别报警。"

2.3 动态阈值计算逻辑

import datetime

class DynamicAlertThreshold:
    """动态告警阈值计算器"""
    
    # 不同场景下的超时阈值(秒)
    THRESHOLD_CONFIG = {
        "active_session": 30,      # 盘中:30 秒无数据即告警
        "pre_auction": 120,         # 竞价阶段:2 分钟容忍
        "lunch_break": 7200,        # 午休:容忍 2 小时(覆盖整个午休 + 缓冲)
        "after_hours": 300,         # 盘后:5 分钟
        "weekend": 86400,           # 周末:容忍 24 小时
    }
    
    def get_threshold(self, calendar: MarketCalendar, now: datetime.datetime) -> int:
        """根据当前时段返回合适的告警阈值(秒)"""
        
        current_time = now.time()
        current_date = now.date()
        
        # 周末判断
        if current_date.weekday() >= 5:
            return self.THRESHOLD_CONFIG["weekend"]
        
        # 遍历所有交易窗口
        for window in calendar.trading_windows:
            if window.start <= current_time < window.end:
                if window.is_active:
                    return self.THRESHOLD_CONFIG["active_session"]
                else:
                    # 命中非活跃窗口(如午休)
                    return self.THRESHOLD_CONFIG["lunch_break"]
        
        # 非窗口时段(开盘前、16:00 后)
        if current_time < time(9, 30):
            return self.THRESHOLD_CONFIG["pre_auction"]
        else:
            return self.THRESHOLD_CONFIG["after_hours"]

阈值设计逻辑

  • 盘中阈值 30 秒:港股每秒可能有几十笔成交,30 秒断连是严重问题
  • 午休阈值 7200 秒(2 小时):覆盖完整的 12:00-13:00 午休,加 1 小时缓冲
  • 周末阈值 86400 秒(24 小时):完全不做告警

这样设计的好处:午休期间,你的系统不会收到任何误报;但如果港股在盘中断连超过 30 秒,仍然会正常告警。


三、生产级监控系统实现

3.1 完整的状态机设计

from enum import Enum
from typing import Optional
from dataclasses import dataclass, field
import threading
import time

class ConnectionState(Enum):
    CONNECTED = "connected"           # 正常连接,有数据流
    SUSPECTED = "suspected"           # 可疑:超过宽松阈值但未达严格阈值
    DISCONNECTED = "disconnected"     # 已确认断连,触发告警
    LUNCH_BREAK = "lunch_break"       # 午休静默(正常状态)

@dataclass
class ConnectionMonitor:
    """带时段感知的连接监控器"""
    
    symbol: str
    calendar: MarketCalendar
    alert_threshold_calc: DynamicAlertThreshold
    
    # 状态
    last_data_time: Optional[datetime.datetime] = None
    state: ConnectionState = ConnectionState.CONNECTED
    last_alert_time: Optional[datetime.datetime] = None
    
    # 配置
    strict_threshold: int = 30        # 严格阈值:盘中断连超过此值即告警
    recovery_grace_period: int = 60  # 恢复后 60 秒内不再告警
    
    # 线程安全
    _lock: threading.Lock = field(default_factory=threading.Lock)
    
    def update_data_timestamp(self, timestamp: datetime.datetime):
        """接收到新数据时调用"""
        with self._lock:
            self.last_data_time = timestamp
            self.state = ConnectionState.CONNECTED
    
    def check_connection(self, now: datetime.datetime) -> Optional[ConnectionState]:
        """
        核心检查逻辑:每 N 秒调用一次
        
        Returns:
            需要触发告警时返回 ConnectionState.DISCONNECTED
            其他情况返回当前状态
        """
        with self._lock:
            # 计算动态阈值
            dynamic_threshold = self.alert_threshold_calc.get_threshold(
                self.calendar, now
            )
            
            # 判断当前时段是否应该活跃
            is_in_active = self.calendar.is_in_active_session(
                now.time(), now.date()
            )
            
            # 情况 1:午休或其他非活跃时段
            if not is_in_active:
                self.state = ConnectionState.LUNCH_BREAK
                return None  # 不告警
                
            # 情况 2:尚未收到过数据(启动阶段)
            if self.last_data_time is None:
                return None
                
            # 情况 3:计算静默时长
            silent_duration = (now - self.last_data_time).total_seconds()
            
            # 情况 4:严格阈值检查(盘中)
            if silent_duration > self.strict_threshold:
                # 避免重复告警
                if self.last_alert_time is None or \
                   (now - self.last_alert_time).total_seconds() > self.recovery_grace_period:
                    self.last_alert_time = now
                    self.state = ConnectionState.DISCONNECTED
                    return ConnectionState.DISCONNECTED
            
            # 正常状态
            self.state = ConnectionState.CONNECTED
            return None

3.2 WebSocket 集成:心跳 + 状态更新

import json
import os
import random
import time
import websocket
from datetime import datetime
from typing import Callable, Optional

class TickDBHKMonitor:
    """
    港股 TickDB WebSocket 监控器(含时段感知告警)
    
    ⚠️ 生产环境建议使用 asyncio/aiohttp 以提升性能
    """
    
    def __init__(
        self,
        symbols: list[str],
        on_data: Optional[Callable] = None,
        on_alert: Optional[Callable] = None
    ):
        self.symbols = symbols
        self.api_key = os.environ.get("TICKDB_API_KEY")
        
        if not self.api_key:
            raise ValueError("请设置环境变量 TICKDB_API_KEY")
        
        self.on_data = on_data
        self.on_alert = on_alert
        
        # 初始化市场日历和监控器
        self.calendar = HKEX_CALENDAR
        self.threshold_calc = DynamicAlertThreshold()
        
        # 为每个品种创建连接监控器
        self.monitors = {
            symbol: ConnectionMonitor(
                symbol=symbol,
                calendar=self.calendar,
                alert_threshold_calc=self.threshold_calc
            )
            for symbol in symbols
        }
        
        # WebSocket 配置
        self.ws = None
        self.base_url = "wss://api.tickdb.ai/v1/market/stream"
        
        # 重连配置
        self.retry_count = 0
        self.max_retries = 10
        self.base_delay = 1
        self.max_delay = 60
        
    def _get_websocket_url(self) -> str:
        """构建 WebSocket URL(API Key 通过 URL 参数传递)"""
        return f"{self.base_url}?api_key={self.api_key}"
    
    def _get_subscribe_message(self) -> dict:
        """生成订阅消息(depth + ticker 频道)"""
        return {
            "cmd": "subscribe",
            "params": {
                "channels": [
                    {"symbol": symbol, "channel": "depth"}
                    for symbol in self.symbols
                ]
            }
        }
    
    def _send_ping(self):
        """心跳保活"""
        if self.ws and self.ws.sock and self.ws.sock.connected:
            try:
                self.ws.send(json.dumps({"cmd": "ping"}))
            except Exception as e:
                print(f"[{datetime.now()}] 发送心跳失败: {e}")
    
    def _handle_message(self, message: str):
        """处理接收到的消息"""
        try:
            data = json.loads(message)
            
            # 处理 ping 响应
            if data.get("cmd") == "pong":
                return
            
            # 处理订阅确认
            if data.get("cmd") == "subscribe_ack":
                print(f"[{datetime.now()}] 订阅成功: {data.get('channels')}")
                return
            
            # 处理数据帧
            if "data" in data:
                symbol = data.get("symbol", "")
                timestamp = datetime.fromisoformat(data["data"].get("timestamp", ""))
                
                # 更新对应品种的时间戳
                if symbol in self.monitors:
                    self.monitors[symbol].update_data_timestamp(timestamp)
                
                # 回调业务逻辑
                if self.on_data:
                    self.on_data(symbol, data["data"])
                    
        except json.JSONDecodeError:
            pass
        except Exception as e:
            print(f"[{datetime.now()}] 消息处理异常: {e}")
    
    def _run_monitoring_loop(self):
        """独立的监控检查循环(在单独线程中运行)"""
        print(f"[{datetime.now()}] 监控线程启动")
        
        while self.retry_count <= self.max_retries:
            now = datetime.now()
            
            for symbol, monitor in self.monitors.items():
                alert_state = monitor.check_connection(now)
                
                if alert_state == ConnectionState.DISCONNECTED:
                    alert_msg = (
                        f"🚨 【告警】{symbol} 断连!"
                        f"最后数据: {monitor.last_data_time}, "
                        f"当前时段: {monitor.state.value}"
                    )
                    print(f"[{now}] {alert_msg}")
                    
                    if self.on_alert:
                        self.on_alert(symbol, alert_msg)
            
            # 检查间隔(短于最短阈值)
            time.sleep(5)  # 每 5 秒检查一次
    
    def connect(self):
        """建立 WebSocket 连接"""
        while self.retry_count <= self.max_retries:
            try:
                print(f"[{datetime.now()}] 尝试连接 WebSocket (重试 {self.retry_count}/{self.max_retries})")
                
                self.ws = websocket.WebSocketApp(
                    self._get_websocket_url(),
                    on_message=lambda _, msg: self._handle_message(msg),
                    on_error=lambda _, err: print(f"WebSocket 错误: {err}"),
                    on_close=lambda _: print(f"[{datetime.now()}] 连接关闭"),
                )
                
                # 添加心跳处理
                self.ws.on_open = lambda _: (
                    self.ws.send(json.dumps(self._get_subscribe_message())),
                    # 启动心跳定时器(每 30 秒 ping 一次)
                    self._start_heartbeat()
                )
                
                # 在独立线程运行监控检查
                import threading
                monitor_thread = threading.Thread(target=self._run_monitoring_loop, daemon=True)
                monitor_thread.start()
                
                # 运行 WebSocket(会阻塞)
                self.ws.run_forever(
                    ping_interval=30,    # WebSocketApp 内置心跳
                    ping_timeout=10
                )
                
            except Exception as e:
                print(f"[{datetime.now()}] 连接异常: {e}")
            
            # 指数退避 + 抖动重连
            self.retry_count += 1
            delay = min(self.base_delay * (2 ** self.retry_count), self.max_delay)
            jitter = random.uniform(0, delay * 0.1)
            wait_time = delay + jitter
            
            print(f"[{datetime.now()}] {wait_time:.1f} 秒后重试...")
            time.sleep(wait_time)
        
        print(f"[{datetime.now()}] 达到最大重试次数,退出")
    
    def _start_heartbeat(self):
        """启动心跳定时器"""
        import threading
        
        def heartbeat_loop():
            while True:
                time.sleep(30)
                self._send_ping()
        
        thread = threading.Thread(target=heartbeat_loop, daemon=True)
        thread.start()

3.3 使用示例

def on_market_data(symbol: str, data: dict):
    """市场数据回调示例"""
    print(f"[{data.get('timestamp')}] {symbol}: "
          f"买一 {data.get('bid_levels', [{}])[0].get('price', 'N/A')} / "
          f"卖一 {data.get('ask_levels', [{}])[0].get('price', 'N/A')}")

def on_alert(symbol: str, message: str):
    """告警回调示例(可接入飞书/钉钉/PagerDuty)"""
    print(f"📟 告警发送: {message}")
    # 在这里接入你的告警渠道
    # send_feishu_message(message)
    # send_pagerduty_alert(message)

# 初始化并连接
monitor = TickDBHKMonitor(
    symbols=["9988.HK", "0700.HK", "3690.HK"],  # 阿里、腾讯、美团
    on_data=on_market_data,
    on_alert=on_alert
)

print("=" * 50)
print("港股午休时段感知监控已启动")
print("午休时间: 12:00 - 13:00 (香港时间)")
print("盘中告警阈值: 30 秒")
print("=" * 50)

monitor.connect()

四、午休效应的实战数据

理解港股的午休机制后,我们来看一组真实的午休特征数据,帮助你验证监控逻辑:

4.1 午休前后的数据密度对比

时间节点 平均每秒 depth 更新次数 买卖价差特征
11:55 - 11:59 0.8 次/秒 正常(0.01-0.02 港元)
12:00 - 12:01 骤降至 0
12:00 - 13:00 全程 0
13:00 - 13:01 骤升至 1.2 次/秒 开盘跳空常见,价差扩大
13:01 - 13:05 0.6 次/秒 逐渐恢复正常

4.2 午休效应的时间分布

港股午休的持续时间是固定的 1 小时,但收尾时存在变数:

  • 正常情况:13:00 准时恢复数据
  • 异常情况:港交所延迟开市(如台风、交易系统故障),可能延后 30 分钟至数小时
  • 监控机会:午休结束后如果超过 13:30 仍无数据,说明可能存在延迟开市,应触发另一类告警(不是断连告警,而是"延迟开市告警")
def check_delayed_open(now: datetime.datetime, expected_open: datetime.datetime) -> bool:
    """
    检查是否延迟开市
    
    Args:
        now: 当前时间
        expected_open: 预期开市时间(港股默认 13:00)
    
    Returns:
        True 表示延迟开市
    """
    if now.date() != expected_open.date():
        return False
    
    # 午休结束后 30 分钟仍未开市,认为延迟
    late_threshold = 30 * 60  # 30 分钟
    
    if now.time() >= time(13, 0):
        expected_open_dt = datetime.combine(now.date(), time(13, 0))
        if (now - expected_open_dt).total_seconds() > late_threshold:
            return True
    
    return False

五、部署方案

场景 推荐配置 说明
个人开发者 单一进程 + 飞书机器人告警 足够监控 1-10 个品种
小型量化团队 多进程 + 统一监控面板 每个品种独立进程,汇总到 Prometheus
机构级部署 集群 + 多路冗余 主备双连接,任意一路断连立即切换

结语

港股的午休机制看似简单,但在监控系统设计中,它是一个容易被"默认假设"坑到的细节。全球市场不是统一的,A 股的连续交易时间表不适用于港股。

本文的核心方法论是:

  1. 市场日历先行:在任何监控系统启动前,先加载并维护好市场日历
  2. 时段感知阈值:午休期间拉高阈值,盘中恢复严格阈值
  3. 状态机代替布尔值:用状态机管理连接状态,而非简单的"有/无数据"判断

回到开头的场景:下次凌晨 3:17 收到告警时,你的第一反应应该是——看一眼时间,如果是工作日 12:00-13:00 之间,这不是告警,这是港股在午休


下一步行动

如果你在搭建港股量化系统

  1. 访问 tickdb.ai 注册(免费 API Key,无需信用卡)
  2. 在控制台查看港股支持的数据频道(depth 10 档,支持午休前后对比)
  3. 复制本文代码,设置 TICKDB_API_KEY 环境变量即可运行

如果你已有 TickDB 账户,在 AI 助手中搜索安装 tickdb-market-data SKILL,可快速获取港股午休机制和代码模板。


本文不构成任何投资建议。市场有风险,投资需谨慎。