凌晨 3 点的告警:一位量化开发者的午休噩梦

凌晨 3 点,睡眼惺忪的你被手机震醒——监控系统发来告警:“数据流中断”。你条件反射地打开终端,手指悬在重启键上方。但定睛一看,时间戳显示 12:47。港股的午休时段。

这不是数据问题,是系统设计缺陷。

当监控系统不具备“市场时段感知”能力,正常的交易间隔会被误判为连接故障。随之而来的,可能是过度重连导致的 API 限频、凌晨无意义的告警通知,以及工程师对告警系统的信任崩塌。

本文拆解港股午休期的数据静默机制,给出生产级的时段感知调度方案,并提供完整的代码实现。


一、港股交易时段的微观结构

港股实行与 A 股、美股截然不同的分段交易机制,理解这一机制是构建健壮监控系统的前提。

1.1 交易时段划分

港股每个交易日的标准时段如下:

时段 时间 说明
开盘前竞价 09:00-09:30 盘前竞价,成交量有限
早市 09:30-12:00 连续竞价,正常数据流
午间休市 12:00-13:00 无数据推送,整整 1 小时
午市 13:00-16:00 连续竞价,正常数据流
收盘后竞价 16:00-16:10 盘后竞价(部分标的)

关键的生理现象:12:00-13:00 的整整 60 分钟内,交易所不推送任何行情数据。这不是 API 问题,不是网络问题,是市场机制的一部分。

1.2 数据流的时间分布特征

以 9988.HK(阿里巴巴)为例,一个典型交易日的 WebSocket 数据到达频率:

时段 预估消息频率 订单簿状态
09:30-12:00 50-200 msg/s 活跃,深度 10 档可用
12:00-13:00 0 msg/s 静止,无任何推送
13:00-16:00 50-200 msg/s 活跃恢复
16:00 后 <5 msg/s 收盘数据衰减

量化开发者的核心挑战:设计监控系统时,必须将这种“非连续性”纳入架构,不能假设数据流在交易日内始终存在。


二、误告警的根因分析

没有市场日历意识(Market Calendar Awareness)的监控系统,典型架构是:

WebSocket 连接 → 数据接收超时 → 触发重连 → 告警

这套逻辑在 12:00-13:00 会产生以下问题:

2.1 三种典型的误报场景

场景一:固定超时触发误报

监控系统设置 30 秒无数据则判定为“连接断开”。12:00 整,数据流停止,30 秒后告警,12:01 你被叫醒。

场景二:指数退避重连风暴

告警触发后,系统自动执行重连。如果使用标准的指数退避(如首次等待 1 秒,失败后 2 秒、4 秒...),在午休期间会连续发起重连请求。这不仅浪费资源,更可能触发 API 的限频机制(code: 3001),导致午休结束后真正需要恢复时反而被限流。

场景三:夜间无数据误判

港股收盘后(16:00)到次日开盘前(09:00),长达 17 小时无数据。缺乏市场日历的系统会在凌晨反复告警,工程师的睡眠和告警信噪比同步崩溃。

2.2 根因:系统假设“数据应该始终存在”

这是监控系统设计的第一原罪:用处理互联网服务中断的思维,处理金融市场的正常交易间隔

金融市场的数据流天然是非连续的——有开盘、午休、收盘、节假日。任何脱离市场日历的监控设计,都是在给自己挖坑。


三、时段感知调度:架构设计

正确的解决方案是在监控逻辑中嵌入市场日历感知(Market Calendar Awareness),使系统具备“时间上下文”(Temporal Context)。

3.1 核心架构

┌─────────────────────────────────────────────────────┐
│                    监控主循环                        │
│  ┌──────────┐    ┌──────────────┐    ┌──────────┐ │
│  │ 市场日历  │ → │ 时段判定器    │ → │ 差异化响应│ │
│  │ Calendar  │    │ TimeContext   │    │ Response │ │
│  └──────────┘    └──────────────┘    └──────────┘ │
│       ↑               ↑                   ↑       │
│       └───────────────┴───────────────────┘       │
│              状态上下文传递                         │
└─────────────────────────────────────────────────────┘

时段判定器输出:
- TRADING: 正常交易,监控数据频率和质量
- BREAK: 午休/收盘,暂停告警、保持连接
- PRE_MARKET: 盘前,监控竞价异常
- HOLIDAY: 节假日,深度静默

3.2 时段判定器逻辑

from enum import Enum
from datetime import datetime, time
import pytz

class MarketSession(Enum):
    """港股市场时段枚举"""
    PRE_MARKET = "pre_market"          # 盘前竞价 09:00-09:30
    TRADING_MORNING = "trading_morning" # 早市 09:30-12:00
    BREAK = "break"                     # 午休 12:00-13:00
    TRADING_AFTERNOON = "trading_afternoon" # 午市 13:00-16:00
    CLOSED = "closed"                   # 收盘后或其他时间
    HOLIDAY = "holiday"                 # 节假日

class HKMarketCalendar:
    """
    港股市场日历:判定当前时段
    
    核心职责:
    1. 根据系统时间判断当前所属交易时段
    2. 提供是否为交易日的判定
    3. 计算距离下一个关键时间点(开盘、午休结束、收盘)的秒数
    """
    
    # 港股交易时段定义
    SESSIONS = [
        (time(9, 0), time(9, 30), MarketSession.PRE_MARKET),
        (time(9, 30), time(12, 0), MarketSession.TRADING_MORNING),
        (time(12, 0), time(13, 0), MarketSession.BREAK),
        (time(13, 0), time(16, 0), MarketSession.TRADING_AFTERNOON),
        (time(16, 0), time(23, 59), MarketSession.CLOSED),
        (time(0, 0), time(9, 0), MarketSession.CLOSED),
    ]
    
    def __init__(self, tz: str = "Asia/Hong_Kong"):
        self.tz = pytz.timezone(tz)
    
    def now(self) -> datetime:
        """获取当前港股时间"""
        return datetime.now(self.tz)
    
    def get_current_session(self) -> MarketSession:
        """获取当前市场时段"""
        current_time = self.now().time()
        
        for start, end, session in self.SESSIONS:
            if start <= current_time < end:
                return session
        
        return MarketSession.CLOSED
    
    def is_trading(self) -> bool:
        """当前是否在交易时段(早市或午市)"""
        session = self.get_current_session()
        return session in (
            MarketSession.PRE_MARKET,
            MarketSession.TRADING_MORNING,
            MarketSession.TRADING_AFTERNOON
        )
    
    def is_trading_active(self) -> bool:
        """当前是否在连续竞价时段(排除盘前竞价)"""
        session = self.get_current_session()
        return session in (
            MarketSession.TRADING_MORNING,
            MarketSession.TRADING_AFTERNOON
        )
    
    def is_break(self) -> bool:
        """当前是否在午休时段"""
        return self.get_current_session() == MarketSession.BREAK
    
    def get_seconds_to_next_session(self) -> int:
        """获取距离下一时段的秒数(用于精确等待)"""
        now = self.now()
        now_time = now.time()
        
        # 遍历所有时段,找到下一个起点
        for start, end, session in self.SESSIONS:
            if now_time < start:
                # 计算到下一个时段起点的时间
                next_start = now.replace(
                    hour=start.hour, 
                    minute=start.minute, 
                    second=0, 
                    microsecond=0
                )
                # 如果跨越了午夜,需要加一天
                if next_start <= now:
                    from datetime import timedelta
                    next_start += timedelta(days=1)
                return int((next_start - now).total_seconds())
        
        # 如果当前时间在最后一个时段之后,返回到次日第一个时段的时间
        from datetime import timedelta
        tomorrow = now + timedelta(days=1)
        first_start = self.SESSIONS[0][0]
        next_start = tomorrow.replace(
            hour=first_start.hour,
            minute=first_start.minute,
            second=0,
            microsecond=0
        )
        return int((next_start - now).total_seconds())
    
    def is_trading_day(self) -> bool:
        """
        判断是否为交易日
        
        注意:完整的交易日判断需要对接交易所日历或节假日列表。
        此处给出简化版:工作日(非周六周日)为假设的交易日。
        生产环境应接入标准节假日数据源。
        """
        weekday = self.now().weekday()
        return weekday < 5  # 0=Monday, 4=Friday
    
    def get_session_info(self) -> dict:
        """获取完整的时段信息,用于日志和监控"""
        session = self.get_current_session()
        now = self.now()
        
        info = {
            "current_time": now.isoformat(),
            "session": session.value,
            "is_trading_day": self.is_trading_day(),
            "seconds_to_next_session": self.get_seconds_to_next_session(),
            "next_action": self._get_next_action_description()
        }
        return info
    
    def _get_next_action_description(self) -> str:
        """获取下一动作描述"""
        session = self.get_current_session()
        seconds = self.get_seconds_to_next_session()
        minutes = seconds // 60
        
        descriptions = {
            MarketSession.PRE_MARKET: f"开盘在即,还有 {minutes} 分钟",
            MarketSession.TRADING_MORNING: f"午休即将开始,还有 {minutes} 分钟",
            MarketSession.BREAK: f"午休进行中,距离午市开盘还有 {minutes} 分钟",
            MarketSession.TRADING_AFTERNOON: f"午市进行中,收盘还有 {minutes} 分钟",
            MarketSession.CLOSED: f"已收盘,开盘还有 {minutes} 分钟",
            MarketSession.HOLIDAY: f"非交易日,请检查日历"
        }
        return descriptions.get(session, "状态未知")

关键设计决策:时段判定器不依赖外部 API 调用,而是基于本地时钟计算。这确保了在网络不稳定或 API 限流期间,时段判断依然准确。


四、生产级 WebSocket 监控:时段感知版

现在将市场日历感知嵌入 WebSocket 监控客户端,实现真正的“智能静默”。

4.1 整体架构

import json
import time
import threading
import random
import logging
from typing import Callable, Optional
from collections import deque
import os

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(message)s'
)
logger = logging.getLogger(__name__)

class HKMarketMonitor:
    """
    港股市场时段感知 WebSocket 监控客户端
    
    核心增强:
    1. 内置市场日历感知,午休期不告警
    2. 动态超时策略:交易时段严格,休市时段宽松
    3. 智能重连:在正确的时机恢复连接
    4. 数据流质量监控:区分“数据缺失”和“连接断开”
    """
    
    # 交易时段的超时阈值(秒)
    TRADING_TIMEOUT = 10          # 交易时段:10秒无数据则告警
    BREAK_TIMEOUT = 3600         # 午休时段:容忍1小时静默
    PRE_MARKET_TIMEOUT = 60       # 盘前时段:60秒无数据则告警
    
    # 重连参数
    MAX_RETRY = 5
    BASE_DELAY = 1
    MAX_DELAY = 60
    
    def __init__(
        self,
        api_key: str,
        symbols: list,
        on_data: Callable,
        on_alert: Callable,
        ws_url: str = "wss://api.tickdb.ai/ws/v1/market"
    ):
        self.api_key = api_key or os.environ.get("TICKDB_API_KEY")
        if not self.api_key:
            raise ValueError("API Key 未配置,请设置 TICKDB_API_KEY 环境变量")
        
        self.symbols = symbols
        self.on_data = on_data
        self.on_alert = on_alert
        
        self.ws_url = ws_url
        self.ws = None
        self.running = False
        self.last_data_time = None
        self.last_confirmed_session = None  # 上一次确认的市场时段
        
        # 市场日历实例
        self.calendar = HKMarketCalendar()
        
        # 数据流质量监控
        self.message_buffer = deque(maxlen=100)  # 保留最近100条消息的时间戳
        self.missing_heartbeats = 0
        
        # 线程安全
        self._lock = threading.Lock()
        self._monitor_thread = None
        
        # 状态统计
        self.stats = {
            "total_messages": 0,
            "alerts_triggered": 0,
            "false_alerts_prevented": 0,
            "reconnections": 0,
            "break_time_received": 0
        }
    
    def start(self):
        """启动监控"""
        if self.running:
            logger.warning("监控已在运行中")
            return
        
        self.running = True
        self._monitor_thread = threading.Thread(target=self._monitor_loop, daemon=True)
        self._monitor_thread.start()
        logger.info(f"监控启动,监控标的:{self.symbols}")
    
    def stop(self):
        """停止监控"""
        self.running = False
        if self.ws:
            try:
                self.ws.close()
            except Exception:
                pass
        logger.info("监控已停止")
    
    def _monitor_loop(self):
        """
        主监控循环:整合市场日历感知与数据流监控
        
        核心逻辑:
        1. 判断当前市场时段
        2. 根据时段选择差异化的超时策略
        3. 在正确时机执行重连
        """
        retry_count = 0
        
        while self.running:
            current_session = self.calendar.get_current_session()
            
            # ==================== 时段感知决策 ====================
            if current_session == MarketSession.BREAK:
                # 午休时段:保持连接、监控重连、零告警
                logger.info(f"【{current_session.value}】进入午休时段,数据静默为正常")
                self._handle_break_session()
                continue
                
            elif current_session == MarketSession.CLOSED:
                # 收盘或盘前:深度静默,不重连
                logger.info(f"【{current_session.value}】非交易时段,保持连接但不监控")
                self._handle_closed_session()
                continue
                
            # ==================== 交易时段:正常监控 ====================
            if not self.ws or not self._is_ws_connected():
                # 需要连接或重连
                if retry_count >= self.MAX_RETRY:
                    logger.error(f"已达到最大重试次数({self.MAX_RETRY}),请人工介入")
                    self.on_alert({
                        "type": "connection_failure",
                        "message": "连续重连失败,请检查网络和 API Key",
                        "retry_count": retry_count
                    })
                    break
                
                success = self._connect()
                if success:
                    retry_count = 0
                    self.stats["reconnections"] += 1
                    logger.info("WebSocket 连接成功")
                else:
                    retry_count += 1
                    delay = self._calculate_backoff(retry_count)
                    logger.warning(f"连接失败,{delay:.1f}秒后重试(第{retry_count}次)")
                    time.sleep(delay)
                    continue
            
            # 交易时段的数据流监控
            self._monitor_data_flow(current_session)
            
            # 短暂休眠,避免空转
            time.sleep(1)
    
    def _handle_break_session(self):
        """
        处理午休时段:智能静默
        
        午休时段的处理策略:
        1. 如果已连接,保持连接(避免复市后重新建立连接的开销)
        2. 不执行重连逻辑
        3. 不触发告警
        4. 定期输出状态日志(每 10 分钟),便于人工观察
        """
        if self.ws and self._is_ws_connected():
            # 连接正常,仅维持心跳
            logger.debug(f"【午休】连接正常,静默期已持续 {self._get_silence_duration():.0f} 秒")
        
        # 每 10 分钟输出一次状态(便于运维观察)
        silence_duration = self._get_silence_duration()
        if silence_duration > 0 and silence_duration % 600 < 1:
            session_info = self.calendar.get_session_info()
            logger.info(f"【午休状态报告】{session_info['next_action']},"
                       f"已静默 {silence_duration:.0f} 秒,无告警(正常)")
        
        # 午休结束时,等待 5 秒后再开始数据流监控(给交易所数据推送留出启动时间)
        if self.last_confirmed_session == MarketSession.BREAK:
            pass  # 刚进入午休,不做额外处理
        else:
            # 刚从午休恢复,准备接收数据
            logger.info("午休结束,5 秒后开始监控午市数据流")
            time.sleep(5)
        
        self.last_confirmed_session = MarketSession.BREAK
    
    def _handle_closed_session(self):
        """处理收盘/非交易时段:深度静默"""
        # 收盘时段不进行主动重连
        if self.ws:
            try:
                self.ws.close()
                self.ws = None
                logger.debug("收盘时段,关闭 WebSocket 连接")
            except Exception:
                pass
        
        # 非交易时段的长等待:等待到下一交易时段
        seconds_to_wait = self.calendar.get_seconds_to_next_session()
        logger.info(f"非交易时段,休眠 {seconds_to_wait} 秒至下一时段")
        time.sleep(min(seconds_to_wait, 300))  # 最多休眠5分钟,避免死锁
    
    def _monitor_data_flow(self, session: MarketSession):
        """
        监控数据流:区分正常静默与异常断开
        
        差异化超时策略:
        - 盘前竞价:60秒无数据告警(可能存在竞价异常)
        - 早市/午市:10秒无数据告警(真正的连接问题)
        """
        silence_duration = self._get_silence_duration()
        
        # 选择对应的超时阈值
        if session == MarketSession.PRE_MARKET:
            timeout = self.PRE_MARKET_TIMEOUT
        elif session in (MarketSession.TRADING_MORNING, MarketSession.TRADING_AFTERNOON):
            timeout = self.TRADING_TIMEOUT
        else:
            timeout = self.BREAK_TIMEOUT
        
        if silence_duration > timeout:
            # 可能存在连接问题,但先检查是否在合理范围内
            self.stats["alerts_triggered"] += 1
            
            alert_payload = {
                "type": "data_flow_issue",
                "session": session.value,
                "silence_duration": silence_duration,
                "timeout_threshold": timeout,
                "message": f"{session.value} 时段已静默 {silence_duration:.0f} 秒"
            }
            
            logger.warning(f"触发告警:{alert_payload['message']}")
            self.on_alert(alert_payload)
    
    def _connect(self) -> bool:
        """
        建立 WebSocket 连接
        
        鉴权方式:URL 参数传递 api_key
        """
        try:
            import websocket
            
            # 构建带鉴权的 URL
            full_url = f"{self.ws_url}?api_key={self.api_key}"
            
            self.ws = websocket.WebSocketApp(
                full_url,
                on_message=self._on_message,
                on_error=self._on_error,
                on_close=self._on_close,
                on_open=self._on_open
            )
            
            # 设置心跳
            self.ws.sock = None  # 初始化
            
            # 在独立线程中运行
            ws_thread = threading.Thread(
                target=self.ws.run_forever,
                kwargs={"ping_interval": 30, "ping_timeout": 10},
                daemon=True
            )
            ws_thread.start()
            
            return True
            
        except Exception as e:
            logger.error(f"WebSocket 连接失败:{e}")
            return False
    
    def _on_open(self, ws):
        """WebSocket 连接打开后,订阅标的"""
        logger.info("WebSocket 连接已打开,订阅行情数据")
        
        # 订阅 tick 数据(港股可用)
        subscribe_msg = {
            "cmd": "subscribe",
            "args": {
                "symbols": self.symbols,
                "channels": ["tick"]
            }
        }
        
        try:
            ws.send(json.dumps(subscribe_msg))
            logger.info(f"已订阅:{self.symbols}")
        except Exception as e:
            logger.error(f"订阅失败:{e}")
    
    def _on_message(self, ws, message):
        """处理收到的消息"""
        self.last_data_time = time.time()
        self.message_buffer.append(self.last_data_time)
        self.stats["total_messages"] += 1
        
        try:
            data = json.loads(message)
            
            # 处理 tick 数据
            if data.get("type") == "tick":
                self.on_data(data)
            
            # 处理限频响应
            elif data.get("code") == 3001:
                retry_after = int(data.get("retry_after", 5))
                logger.warning(f"限频触发,等待 {retry_after} 秒")
                time.sleep(retry_after)
            
        except json.JSONDecodeError:
            logger.warning(f"JSON 解析失败:{message[:100]}")
    
    def _on_error(self, ws, error):
        """错误处理"""
        logger.error(f"WebSocket 错误:{error}")
    
    def _on_close(self, ws, close_status_code, close_msg):
        """连接关闭处理"""
        logger.info(f"WebSocket 连接关闭:{close_status_code} - {close_msg}")
        
        # 关闭后不立即重连,等待主循环决策
        # 时段感知逻辑决定是否需要重连
        self.ws = None
    
    def _is_ws_connected(self) -> bool:
        """检查 WebSocket 连接状态"""
        if not self.ws or not self.ws.sock:
            return False
        return self.ws.sock.connected
    
    def _get_silence_duration(self) -> float:
        """获取当前静默时长(秒)"""
        if not self.last_data_time:
            return 0.0
        
        # 如果 WebSocket 未连接,从上次确认的数据时间计算
        if not self._is_ws_connected():
            return time.time() - self.last_data_time
        
        return time.time() - self.last_data_time
    
    def _calculate_backoff(self, retry_count: int) -> float:
        """
        计算指数退避延迟(含抖动)
        
        公式:min(BASE_DELAY * 2^retry_count + random(0, 10%), MAX_DELAY)
        """
        delay = self.BASE_DELAY * (2 ** retry_count)
        jitter = random.uniform(0, delay * 0.1)  # 10% 抖动
        return min(delay + jitter, self.MAX_DELAY)
    
    def get_stats(self) -> dict:
        """获取监控统计信息"""
        return {
            **self.stats,
            "current_session": self.calendar.get_current_session().value,
            "silence_duration": self._get_silence_duration(),
            "is_connected": self._is_ws_connected()
        }

4.2 使用示例

import json

def handle_data(data):
    """数据处理回调"""
    symbol = data.get("symbol", "UNKNOWN")
    price = data.get("price", 0)
    volume = data.get("volume", 0)
    print(f"[数据] {symbol}: ${price} | 成交量: {volume}")

def handle_alert(alert):
    """告警处理回调"""
    alert_type = alert.get("type")
    
    if alert_type == "data_flow_issue":
        # 检查是否因午休触发(误报过滤)
        session = alert.get("session")
        if session == "break":
            print(f"【信息】午休期间,静默 {alert.get('silence_duration')} 秒(正常)")
            return  # 不执行实际告警
        
        # 非午休时段的告警才真正触发通知
        print(f"🚨 【告警】{alert.get('message')}")
        
    elif alert_type == "connection_failure":
        print(f"🔴 【严重】{alert.get('message')}")

# 初始化监控
monitor = HKMarketMonitor(
    api_key=os.environ.get("TICKDB_API_KEY"),
    symbols=["9988.HK", "0700.HK", "3690.HK"],
    on_data=handle_data,
    on_alert=handle_alert
)

# 启动
monitor.start()

# 主线程保持运行
try:
    while True:
        time.sleep(60)
        stats = monitor.get_stats()
        print(f"[状态] {stats['current_session']} | 静默 {stats['silence_duration']:.0f}s | "
              f"消息数 {stats['total_messages']} | 告警 {stats['alerts_triggered']}")
except KeyboardInterrupt:
    monitor.stop()
    print("监控已停止")

4.3 运行输出示例

2026-04-25 11:59:58 [INFO] 监控启动,监控标的:['9988.HK', '0700.HK']
2026-04-25 12:00:00 [INFO] 【trading_morning】进入午休时段,数据静默为正常
2026-04-25 12:10:00 [INFO] 【午休状态报告】午休进行中,距离午市开盘还有 50 分钟,已静默 600 秒,无告警(正常)
2026-04-25 12:15:01 [INFO] 【午休状态报告】午休进行中,距离午市开盘还有 45 分钟,已静默 900 秒,无告警(正常)
2026-04-25 13:00:00 [INFO] 午休结束,5 秒后开始监控午市数据流
2026-04-25 13:00:06 [INFO] WebSocket 连接成功
2026-04-25 13:00:06 [INFO] WebSocket 连接已打开,订阅行情数据
2026-04-25 13:00:06 [INFO] 已订阅:['9988.HK', '0700.HK']
[数据] 9988.HK: $82.50 | 成交量: 1000
[数据] 0700.HK: $378.20 | 成交量: 500

五、午休期数据流的深度分析

5.1 为什么午休期间没有数据?

港股午休的设计有历史原因和现实考量:

历史渊源:香港联交所成立初期,交易大厅内需要人工传递纸质委托单,中午休市让经纪人有时间处理午间堆积的订单。随着电子化交易普及,这一制度保留至今,更多是出于市场参与者(尤其是机构)的习惯需求。

现实考量

  • 机构投资者需要午间复盘上午交易、调整下午策略
  • 亚洲市场跨时区联动,午休是缓冲期(A股 11:30-13:00 也是同理)
  • 部分对冲基金在午休期间调整持仓,下午开盘可能出现大幅跳空

5.2 午休期间你能做什么?

午休的 60 分钟静默期,不是“系统故障的隐患期”,而是“策略准备的黄金期”。

可执行任务 说明
上午策略复盘 分析订单簿变化、买卖压力比、流动性分布
下午仓位调整 基于午间新闻和隔夜美股期货调整
数据校准 检查 TickDB 返回数据的完整性(是否漏掉 tick)
系统自检 执行健康检查,确认下午开盘时连接正常

以下是午休期间的数据完整性检查代码:

def verify_data_integrity(monitor: HKMarketMonitor, symbol: str, lookback_minutes: int = 180):
    """
    验证上午数据的完整性
    
    检查项:
    1. 是否存在数据稀疏区间(可能存在连接问题)
    2. 成交量的合理性(是否存在异常大单或异常静默)
    3. 价格跳空的幅度(是否存在行情延迟)
    """
    # ⚠️ 注意:此处需要存储上午的数据流
    # 可使用 TickDB 的历史数据接口进行交叉验证
    # 但 TickDB trades 接口不支持美股和港股,仅支持港股和数字货币
    
    print(f"数据完整性验证:{symbol}")
    print("  - 建议使用 TickDB /v1/market/kline 接口获取 1 分钟 K 线数据")
    print("  - 对比实时数据与历史数据的成交量/价格一致性")
    print("  - 如发现显著差异,记录为潜在数据问题供后续排查")

六、部署方案

6.1 分场景配置建议

场景 特点 推荐配置
个人量化 低频监控,轻量级 使用上述单进程方案,午休期完全静默
团队协作 多标的,中等频率 部署定时任务 + 飞书/钉钉告警,午休期降频
机构级 高频全市场,需要 99.9% 可用性 多活部署 + 独立市场日历服务 + 分级告警

6.2 高可用架构

# 机构级部署:多活 + 市场日历服务
class HAMarketMonitor:
    """
    高可用版本:多活部署 + 市场日历服务
    
    架构:
    1. 主备双活 WebSocket 连接
    2. 独立的市场日历服务(避免时钟漂移)
    3. 三级告警:数据异常→连接断开→服务不可用
    """
    
    def __init__(self, api_key: str, symbols: list):
        self.primary = HKMarketMonitor(api_key, symbols, on_data, on_alert)
        self.standby = HKMarketMonitor(api_key, symbols, on_data, on_alert)
        self.calendar_service = "独立部署的市场日历服务"  # 可对接 NTP + 港交所日历

七、结语:让监控系统“理解”市场

回到开篇的场景。如果那位量化开发者在构建监控系统时,预留了市场时段感知能力,凌晨 3 点的告警就不会发生。

这不是一个边缘案例。这是用处理互联网服务的思维,处理金融市场时的必然陷阱。

金融市场的数据流是非连续、非均匀的——有开盘竞价、盘中连续交易、午休静默、收盘竞价、节假日停牌。任何脱离市场日历的监控系统,都会在这套时间机制面前产生误报。

解决路径

  1. 内置市场日历感知:系统需要“知道”当前是交易时段还是休市时段
  2. 差异化超时策略:交易时段严格(10-60秒),休市时段宽松(容忍小时级静默)
  3. 智能重连时机:不在午休期重连,不在收盘后重连
  4. 数据流质量监控:区分“数据缺失”和“连接断开”

TickDB 提供港股全档 depth 频道(10档)和 tick 逐笔数据,配合市场日历感知,你的监控系统将不再被午休绑架。


下一步行动

如果你希望修复现有监控系统的午休误报问题

  1. 访问 tickdb.ai 注册获取免费 API Key(无需信用卡)
  2. 在控制台查看港股 9988.HK、0700.HK 的实时 depth 和 tick 数据
  3. 将本文的 HKMarketCalendar 类集成到你的监控系统中

如果你需要完整的午休期策略工具链(复盘 + 下午预判 + 数据校准):

  • 使用 TickDB /v1/market/kline 接口获取午休前后的 K 线数据
  • 订阅港股 depth 频道,实时监控订单簿结构变化
  • 联系 [email protected] 获取机构级数据方案(含 10 年历史 K 线)

如果你习惯用 AI 辅助开发

在 AI 助手中搜索安装 tickdb-market-data SKILL,让 AI 帮你生成符合本文规范的监控代码。


风险提示:本文不构成任何投资建议。港股午休期间的价格跳空可能剧烈,实际交易中请设置合理的止损规则,并考虑盘前/盘后流动性不足带来的冲击成本。市场有风险,投资需谨慎。