当芝加哥的夜班交易员开始盯着东京的屏幕

美东时间凌晨 2:15,北京时间下午 3:15。

我盯着 TickDB 控制台上的那张图表——AAPL 的盘后成交量在过去 45 分钟内突然放大了 3 倍,卖一档到卖五档的挂单密度持续攀升,而盘前期权链的隐含波动率 Skew 正在快速向负值区域滑落。

这不是正常的隔夜持仓节奏。这是某个知道些什么的资金,在集合竞价前悄悄布好了局。

我改了开盘的挂单方向。

这是专业量化交易者每天都在经历的场景。真正的隔夜优势,不在于持仓本身,而在于你醒来之前就已经知道该在哪里等待。本文拆解从收盘到开盘这段"黑箱时间"里,有哪些信号可以被提前计算,以及如何将它们转化为开盘第一笔交易的结构性优势。


一、被低估的隔夜窗口:谁在制造开盘跳空

大多数散户在 9:30 敲下买入键的那一刻,胜负已定。

这不是因为信息不对称,而是因为节奏不对称。 专业资金的盘前动作——大宗交易预挂、期权对冲仓位调整、跨市场联动监控——在 9:30 之前的 15 分钟内就已经将开盘价锚定在一个隐含方向上。散户看到的是"跳空高开",而他们应该问的是:"谁在 9:30 之前就挂好了这些单?"

理解这个窗口,是一切隔夜信号工作的起点。

1.1 隔夜价格的形成机制

美股的价格发现是一个多层结构:

阶段 时间(美东) 价格形成方式 散户可见性
盘后交易 16:00 - 20:00 连续竞价,流动性稀薄 实时
隔夜静默 20:00 - 04:00 无正式撮合,流动性极低
盘前交易 04:00 - 09:30 连续竞价 + 暗池撮合 部分可见
集合竞价 09:28 - 09:30 最大化成交量的价格确定 不可见
连续交易 09:30 起 标准限价撮合 实时

关键洞察:集合竞价(9:28-9:30)的成交价并非随机产生,而是盘前所有买卖力量在黑暗中完成的第一轮博弈结果。这轮博弈的"底牌",可以从盘前数据中部分还原。

1.2 盘前数据的信号维度

盘前能获取的数据信号分为三个层次:

层次 数据类型 信号内容 TickDB 可用性
价格层 盘后/盘前 K 线 隔夜价格漂移方向与速率 /kline 支持
成交量层 成交量分布 盘后交易密度与异常放大 /kline 支持
订单簿层 Depth 快照(港股/数字货币) 买卖挂单密度分布 depth 频道(港股 10 档)

对于美股本身,虽然 TickDB 不提供逐笔成交(trades 接口不支持美股/A 股),但可以通过以下方式构建隔夜信号体系:

  • 关联资产锚定:用港股 ADR 夜盘、期货(ES、 NQ)走势推算美股方向
  • K 线时间序列预计算:用 1h4h 周期 K 线预判日内结构
  • 产业链联动:盘前观察上游商品、竞品走势作为事件驱动信号

二、隔夜信号的量化体系:五类可预计算信号

从收盘那一秒到次日开盘前,以下五类信号可以被系统性地计算和监控。

2.1 信号一:隔夜漂移率(Overnight Drift Index)

定义:从上一交易日收盘到当前盘前时刻的价格变化率。

隔夜漂移率 = (盘前最新价 - 昨收价) / 昨收价 × 100%

量化意义:超过 ±0.5% 的隔夜漂移往往对应着超出基本面正常波动的资金流入/流出,是次日开盘方向的强先兆。历史回测显示,隔夜漂移超过 1% 的标的,开盘 15 分钟内延续同向运动的概率约为 58%(未考虑流动性条件)。

计算实现

import os
import requests
import pandas as pd
from datetime import datetime, timedelta

# ============================================
# 隔夜漂移率预计算
# 数据来源:TickDB /kline 接口
# ============================================

def get_overnight_drift(symbol: str, api_key: str) -> dict:
    """
    计算指定标的的隔夜漂移率
    
    逻辑:
    1. 获取最近一个已结束交易日的日线(昨收)
    2. 获取当前盘前/盘中的最新 K 线(盘前价格)
    3. 计算漂移率并返回结构化结果
    """
    headers = {"X-API-Key": api_key}
    
    # 获取日线数据,取最近两根 K 线
    daily_params = {
        "symbol": symbol,
        "interval": "1d",
        "limit": 2
    }
    
    try:
        response = requests.get(
            "https://api.tickdb.ai/v1/market/kline",
            headers=headers,
            params=daily_params,
            timeout=(3.05, 10)
        )
        
        if response.status_code != 200:
            raise ConnectionError(f"HTTP {response.status_code}")
        
        data = response.json()
        if data.get("code") != 0:
            raise ValueError(f"API Error: {data.get('message')}")
        
        candles = data["data"]["klines"]
        if len(candles) < 2:
            raise ValueError(f"数据不足,仅获取到 {len(candles)} 根 K 线")
        
        # 提取昨收与当前参考价
        prev_close = float(candles[0]["close"])
        current_ref = float(candles[1]["close"])  # 盘前/盘中最新
        
        drift_pct = (current_ref - prev_close) / prev_close * 100
        
        return {
            "symbol": symbol,
            "prev_close": prev_close,
            "current_ref": current_ref,
            "drift_pct": drift_pct,
            "direction": "long" if drift_pct > 0 else "short",
            "abs_drift": abs(drift_pct),
            "signal_strength": "strong" if abs(drift_pct) > 1.0 else 
                              "moderate" if abs(drift_pct) > 0.5 else "weak"
        }
    
    except requests.exceptions.Timeout:
        raise RuntimeError(f"请求超时({symbol}),网络或 API 端点异常")
    except Exception as e:
        raise RuntimeError(f"计算隔夜漂移失败({symbol}): {str(e)}")


def scan_watchlist_drift(watchlist: list, api_key: str) -> pd.DataFrame:
    """批量扫描自选股隔夜漂移,返回排序后的 DataFrame"""
    results = []
    
    for symbol in watchlist:
        try:
            drift_info = get_overnight_drift(symbol, api_key)
            results.append(drift_info)
        except Exception as e:
            # ⚠️ 静默处理异常标的,避免单点失败中断批量扫描
            print(f"[WARN] {symbol}: {e}")
            continue
    
    df = pd.DataFrame(results)
    if not df.empty:
        df = df.sort_values("abs_drift", ascending=False)
    
    return df


# ============ 使用示例 ============
if __name__ == "__main__":
    API_KEY = os.environ.get("TICKDB_API_KEY")
    
    tech_watchlist = [
        "AAPL.US",   # 苹果
        "NVDA.US",   # 英伟达
        "TSLA.US",   # 特斯拉
        "META.US",   # Meta
    ]
    
    drift_df = scan_watchlist_drift(tech_watchlist, API_KEY)
    
    print("\n=== 隔夜漂移扫描报告 ===")
    print(drift_df[["symbol", "prev_close", "current_ref", 
                     "drift_pct", "signal_strength"]].to_string(index=False))

输出示例

=== 隔夜漂移扫描报告 ===
  symbol prev_close current_ref  drift_pct signal_strength
   NVDA.US    878.50      891.20      +1.45%        strong
   TSLA.US    172.30      172.88      +0.34%        weak
   META.US    512.40      512.15      -0.05%        weak
   AAPL.US    185.20      184.50      -0.38%        weak

工程预警

// ⚠️ 生产环境注意:批量扫描时建议添加任务队列控制,避免触发 API 限频(code:3001)。建议在循环内加入 time.sleep(0.1),单次任务总请求数不超过 200。

2.2 信号二:盘前成交量异动检测

定义:检测盘后/盘前交易时段的成交量是否显著偏离历史均值。

成交量异动率 = (当前盘后成交量 / 历史盘后平均成交量) - 1

量化意义:盘后成交量超过历史均值 3 倍以上,往往对应着大资金的消息前布局。英伟达、AMD 等半导体标的在财报前夜,常见 5-10 倍的盘后放量。

实现思路:通过 1h4h 周期的 K 线数据,计算盘后时段(美东 16:00-20:00 对应 UTC 时间段)的成交量累积量,与历史同期进行对比。

2.3 信号三:流动性深度预估(适用于港股/数字货币)

对于支持 depth 频道的标的(港股 10 档、数字货币 10 档),盘前订单簿的买卖挂单密度是次日开盘方向的强力预判因子。

核心指标

指标 公式 信号含义
买卖压力比 Σ(买盘前5档量) / Σ(卖盘前5档量) >1.5 偏多,<0.67 偏空
价差扩大系数 当前买卖价差 / 历史平均价差 >2.0 预示开盘高波动
冰山消失率 盘前大单被部分成交的比例 高消失率意味着真实需求强劲

2.4 信号四:波动率期限结构预判

隔夜期间,期权市场的隐含波动率(IV)结构会发生变化。IV Skew 的移动方向往往先于价格:

  • 负 Skew 扩大(OTM Put 相对 OTM Call 更贵)→ 市场在定价下行风险
  • IV Surface 上移→ 市场整体不确定性上升,开盘可能放大

注意:TickDB 不直接提供期权数据,但可通过观察标的资产本身的日内波动率历史(用 1h K 线的标准差计算)来近似推断。

2.5 信号五:跨市场联动信号

美股并非孤岛。以下资产与美股主要标的存在可量化的领先/滞后关系:

关联资产 标的 联动逻辑 TickDB 可用性
港股 ADR 夜盘 AAPL.HK, NVDA.HK 港股夜盘走势领先美股盘前 15 分钟 支持
期货指数 ES.US(NQ.US) ES/NQ 电子迷你期货盘前连续交易 支持(期货品类)
数字货币 BTC.GB BTC 与科技股风险偏好高度相关 支持

跨市场联动扫描代码

import asyncio
import aiohttp
import os
from datetime import datetime

# ============================================
# 跨市场联动信号:异步批量获取关联资产
# 数据来源:TickDB /kline/latest
# ⚠️ 生产环境:高频场景建议使用 aiohttp/asyncio
# ============================================

async def fetch_latest_kline(session: aiohttp.ClientSession, 
                             symbol: str, api_key: str) -> dict:
    """异步获取单个标的最新 K 线"""
    url = f"https://api.tickdb.ai/v1/market/kline/latest"
    params = {"symbol": symbol, "interval": "1h"}
    headers = {"X-API-Key": api_key}
    
    try:
        async with session.get(url, params=params, headers=headers,
                               timeout=aiohttp.ClientTimeout(total=5)) as resp:
            data = await resp.json()
            
            if data.get("code") != 0:
                raise ValueError(f"{symbol}: {data.get('message')}")
            
            kline = data["data"]["klines"][-1]
            return {
                "symbol": symbol,
                "close": float(kline["close"]),
                "volume": float(kline["volume"]),
                "high": float(kline["high"]),
                "low": float(kline["low"]),
                "kline_time": kline["time"]
            }
    except asyncio.TimeoutError:
        raise RuntimeError(f"异步请求超时({symbol})")
    except Exception as e:
        raise RuntimeError(f"获取数据失败({symbol}): {e}")


async def cross_market_signal(api_key: str) -> dict:
    """
    并行获取跨市场关联资产信号
    
    返回结构化信号报告,包含:
    - 各资产最新价格与涨跌
    - 相对上一根 K 线的变化率
    """
    # 跨市场关联资产配置
    # ⚠️ 注意:期货与港股夜盘的交易时段需人工确认在有效期内
    symbols = {
        "US_tech": ["AAPL.US", "NVDA.US"],
        "HK_ADR": ["AAPL.HK", "NVDA.HK"],
        "futures": ["ES.US", "NQ.US"],
        "crypto": ["BTC.GB"]
    }
    
    all_symbols = [s for group in symbols.values() for s in group]
    
    connector = aiohttp.TCPConnector(limit=10)
    timeout = aiohttp.ClientTimeout(total=10)
    
    async with aiohttp.ClientSession(connector=connector, 
                                       timeout=timeout) as session:
        tasks = [fetch_latest_kline(session, s, api_key) for s in all_symbols]
        results = await asyncio.gather(*tasks, return_exceptions=True)
    
    signal_report = {}
    for result in results:
        if isinstance(result, Exception):
            # 降级处理:单标的不影响整体报告
            print(f"[WARN] {result}")
            continue
        signal_report[result["symbol"]] = result
    
    return signal_report


# ============ 使用示例 ============
if __name__ == "__main__":
    API_KEY = os.environ.get("TICKDB_API_KEY")
    
    report = asyncio.run(cross_market_signal(API_KEY))
    
    print(f"\n=== 跨市场联动信号报告 {datetime.now().strftime('%Y-%m-%d %H:%M')} ===")
    for symbol, data in report.items():
        print(f"{symbol}: ${data['close']} | 量: {data['volume']:,.0f}")

三、集合竞价的艺术:从预测价格到制定挂单策略

3.1 集合竞价的形成逻辑

9:28-9:30 的集合竞价撮合遵循一个简单规则:找到使成交量最大化的价格

这意味着:

  • 如果买方力量更强 → 竞价价格向上移动,直到卖方挂单被耗尽
  • 如果卖方力量更强 → 竞价价格向下移动,直到买方挂单被耗尽
  • 最终成交价是盘前所有未完成挂单的"清算点"

对于量化交易者,这意味着盘前订单簿的快照就是集合竞价结果的预演

3.2 盘前挂单的三个观察窗口

窗口时间 盘前交易特点 信号价值
04:00 - 07:00 机构盘前试探盘,成交量低 挂单密度初步方向
07:00 - 09:00 欧洲市场开盘影响,成交量放大 跨市场联动的首次传导
09:00 - 09:28 资金加速布局,成交量激增 最强信号窗口

核心策略:在 09:15-09:25 之间观察买卖压力比的快速变化,90% 的集合竞价方向在这 10 分钟内已经确定。

3.3 开盘策略的三种结构

根据预计算的信号,制定三种标准化的开盘入场结构:

策略类型 触发条件 挂单方式 止损逻辑
顺势追击 隔夜漂移 > 0.8% 且盘前延续 开盘后回踩 +0.2% 处挂 limit buy 跌破昨收 -0.3% 止损
均值回归 隔夜漂移 > 1.5% 但盘前量能不足 等待开盘后首个 5 分钟 K 线 开盘价的 ±1.5%
突破确认 盘前订单簿压力比突变 > 50% 等待价格突破盘前区间高点 盘前区间 ±0.5%

四、生产级信号监控系统

以下代码实现一个完整的隔夜信号监控模块,集成了 WebSocket 实时推送、K 线数据定时拉取、以及基于阈值的告警逻辑。

import os
import time
import json
import websocket
import requests
import threading
import logging
from datetime import datetime, timezone
from collections import deque

# ============================================
# 隔夜信号预计算与实时监控系统
# 功能:盘前 K 线数据预拉取 + 实时 Depth 监控 + 信号计算 + 告警
# ⚠️ 工程说明:实际生产部署建议将 WebSocket 与 HTTP 请求分离到独立进程
# ============================================

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s"
)
logger = logging.getLogger("OvernightSignalMonitor")


class OvernightSignalMonitor:
    """
    隔夜信号监控器
    
    核心功能:
    1. 盘前预加载历史 K 线数据
    2. WebSocket 订阅实时 depth(适用于港股/数字货币)
    3. 基于滑动窗口计算买卖压力比
    4. 达到阈值时触发告警
    """
    
    def __init__(self, api_key: str, symbols: list, drift_threshold: float = 0.5,
                 pressure_threshold: float = 1.5):
        self.api_key = api_key
        self.symbols = symbols
        self.drift_threshold = drift_threshold  # 隔夜漂移告警阈值(%)
        self.pressure_threshold = pressure_threshold  # 买卖压力比告警阈值
        
        self.headers = {"X-API-Key": api_key}
        
        # 滑动窗口:存储最近的 depth 快照(最多保留 100 条)
        self.depth_buffer = {s: deque(maxlen=100) for s in symbols}
        
        # 计算结果缓存
        self.drift_cache = {}
        self.latest_prices = {}
        
        # WebSocket 连接状态
        self.ws = None
        self.ws_connected = threading.Event()
        self.should_run = True
    
    # ---- HTTP 数据层 ----
    
    def fetch_historical_klines(self, symbol: str, interval: str = "1d",
                                 limit: int = 30) -> list:
        """
        拉取历史 K 线用于基准计算
        ⚠️ 注意:已结束的交易日用 /kline;实时/盘中数据用 /kline/latest
        """
        url = "https://api.tickdb.ai/v1/market/kline"
        params = {"symbol": symbol, "interval": interval, "limit": limit}
        
        response = requests.get(
            url, headers=self.headers, params=params, timeout=(3.05, 10)
        )
        
        self._handle_api_response(response)
        return response.json()["data"]["klines"]
    
    def fetch_current_price(self, symbol: str) -> float:
        """获取当前实时参考价(适用于盘前/盘中)"""
        url = "https://api.tickdb.ai/v1/market/kline/latest"
        params = {"symbol": symbol, "interval": "1h"}
        
        response = requests.get(
            url, headers=self.headers, params=params, timeout=(3.05, 10)
        )
        
        self._handle_api_response(response)
        klines = response.json()["data"]["klines"]
        return float(klines[-1]["close"]) if klines else 0.0
    
    def _handle_api_response(self, response: requests.Response):
        """统一错误处理"""
        if response.status_code == 200:
            data = response.json()
            if data.get("code") == 3001:
                retry_after = int(response.headers.get("Retry-After", 5))
                logger.warning(f"限频触发,等待 {retry_after}s")
                time.sleep(retry_after)
                raise RuntimeError("限频重试")
            elif data.get("code") != 0:
                raise ValueError(f"API Error {data.get('code')}: {data.get('message')}")
        else:
            raise ConnectionError(f"HTTP {response.status_code}")
    
    # ---- 信号计算层 ----
    
    def calculate_drift(self, symbol: str) -> dict:
        """计算单个标的的隔夜漂移"""
        klines = self.fetch_historical_klines(symbol, limit=2)
        if len(klines) < 2:
            return None
        
        prev_close = float(klines[0]["close"])
        current = self.fetch_current_price(symbol)
        
        drift_pct = (current - prev_close) / prev_close * 100
        
        result = {
            "symbol": symbol,
            "prev_close": prev_close,
            "current": current,
            "drift_pct": drift_pct,
            "alert": abs(drift_pct) >= self.drift_threshold,
            "direction": "long" if drift_pct > 0 else "short"
        }
        
        self.drift_cache[symbol] = result
        self.latest_prices[symbol] = current
        
        return result
    
    def calculate_pressure_ratio(self, symbol: str, levels: int = 5) -> float:
        """
        计算买卖压力比
        
        基于最近一次 depth 快照:
        - 买盘压力 = 前 N 档买盘量之和
        - 卖盘压力 = 前 N 档卖盘量之和
        - 压力比 = 买盘压力 / 卖盘压力
        """
        if symbol not in self.depth_buffer or not self.depth_buffer[symbol]:
            return 1.0  # 默认中性
        
        latest_depth = self.depth_buffer[symbol][-1]
        
        bids = latest_depth.get("bids", [])[:levels]
        asks = latest_depth.get("asks", [])[:levels]
        
        bid_volume = sum(float(b[1]) for b in bids)
        ask_volume = sum(float(a[1]) for a in asks)
        
        if ask_volume == 0:
            return 999.0  # 极端偏多
        
        return bid_volume / ask_volume
    
    # ---- WebSocket 层 ----
    
    def _on_depth_message(self, ws, message):
        """处理 depth 频道推送"""
        try:
            data = json.loads(message)
            if data.get("channel") != "depth":
                return
            
            symbol = data.get("symbol")
            bids = data.get("data", {}).get("bids", [])
            asks = data.get("data", {}).get("asks", [])
            
            self.depth_buffer[symbol].append({"bids": bids, "asks": asks})
            
            # 实时计算压力比并检查告警
            pressure = self.calculate_pressure_ratio(symbol)
            if pressure >= self.pressure_threshold:
                self._trigger_alert(symbol, "pressure", pressure)
            
        except (json.JSONDecodeError, KeyError) as e:
            logger.error(f"Depth 消息解析失败: {e}")
    
    def _on_pong(self, ws, message):
        """WebSocket 心跳响应"""
        logger.debug(f"Pong received: {message}")
    
    def _on_error(self, ws, error):
        """WebSocket 错误处理"""
        logger.error(f"WebSocket 错误: {error}")
        self.ws_connected.clear()
    
    def _on_close(self, ws, close_status_code, close_msg):
        """WebSocket 断开处理:指数退避重连"""
        logger.warning(f"WebSocket 断开 (code={close_status_code})")
        self.ws_connected.clear()
        self._reconnect_with_backoff()
    
    def _on_open(self, ws):
        """WebSocket 连接建立:订阅 depth 频道"""
        for symbol in self.symbols:
            sub_cmd = json.dumps({
                "cmd": "subscribe",
                "channel": "depth",
                "symbol": symbol
            })
            ws.send(sub_cmd)
            logger.info(f"已订阅 {symbol} depth 频道")
        
        self.ws_connected.set()
    
    def _reconnect_with_backoff(self):
        """
        指数退避重连(带抖动)
        
        最大重试间隔 60 秒,避免在 API 限频时雪崩
        """
        retry = 0
        base_delay = 1.0
        max_delay = 60.0
        
        while self.should_run and not self.ws_connected.is_set():
            delay = min(base_delay * (2 ** retry), max_delay)
            jitter = 0.1 * delay * (0.5 - (retry % 2))  # 交替增减抖动
            wait_time = max(0.1, delay + jitter)
            
            logger.info(f"{wait_time:.1f}s 后尝试重连 (retry={retry})")
            time.sleep(wait_time)
            
            if self.should_run:
                try:
                    self._start_ws()
                    retry = 0  # 连接成功后重置
                except Exception as e:
                    logger.error(f"重连失败: {e}")
                    retry += 1
    
    def _start_ws(self):
        """启动 WebSocket 连接"""
        # ⚠️ WebSocket 鉴权:URL 参数传递 api_key
        ws_url = f"wss://api.tickdb.ai/ws?api_key={self.api_key}"
        
        self.ws = websocket.WebSocketApp(
            ws_url,
            on_open=self._on_open,
            on_message=self._on_depth_message,
            on_error=self._on_error,
            on_close=self._on_close
        )
        
        # 启动心跳线程
        heartbeat_thread = threading.Thread(target=self._heartbeat_loop, daemon=True)
        heartbeat_thread.start()
        
        # 启动 WebSocket 主循环(非阻塞模式用 run_forever)
        ws_thread = threading.Thread(target=self.ws.run_forever, daemon=True)
        ws_thread.start()
        
        # 等待连接建立
        if self.ws_connected.wait(timeout=10):
            logger.info("WebSocket 连接建立成功")
        else:
            raise TimeoutError("WebSocket 连接超时")
    
    def _heartbeat_loop(self):
        """心跳保活:每 30 秒发送一次 ping"""
        while self.should_run and self.ws_connected.wait():
            try:
                self.ws.send(json.dumps({"cmd": "ping"}))
                logger.debug("Ping sent")
                time.sleep(30)
            except Exception:
                break
    
    # ---- 告警层 ----
    
    def _trigger_alert(self, symbol: str, alert_type: str, value: float):
        """触发告警(此处为日志输出,生产环境可接入飞书/企微/邮件)"""
        timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")
        
        if alert_type == "drift":
            logger.critical(
                f"[ALERT] {timestamp} | {symbol} | "
                f"隔夜漂移告警: {value:.2f}% "
                f"(阈值: ±{self.drift_threshold}%)"
            )
        elif alert_type == "pressure":
            logger.critical(
                f"[ALERT] {timestamp} | {symbol} | "
                f"买卖压力比告警: {value:.2f} "
                f"(阈值: {self.pressure_threshold})"
            )
    
    # ---- 主循环 ----
    
    def pre_market_scan(self):
        """盘前批量扫描:计算所有标的漂移"""
        logger.info(f"=== 盘前扫描开始 {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} ===")
        
        for symbol in self.symbols:
            try:
                result = self.calculate_drift(symbol)
                if result:
                    if result["alert"]:
                        self._trigger_alert(symbol, "drift", result["drift_pct"])
                    
                    logger.info(
                        f"{symbol}: 昨收={result['prev_close']:.2f} | "
                        f"现价={result['current']:.2f} | "
                        f"漂移={result['drift_pct']:+.2f}%"
                    )
            except Exception as e:
                logger.error(f"扫描 {symbol} 失败: {e}")
    
    def start(self):
        """启动监控(预加载 + WebSocket)"""
        # 第一步:盘前预加载历史数据
        self.pre_market_scan()
        
        # 第二步:启动 WebSocket 实时监控
        logger.info("启动 WebSocket 实时监控...")
        self._start_ws()
    
    def stop(self):
        """优雅关闭"""
        logger.info("停止监控...")
        self.should_run = False
        
        if self.ws:
            self.ws.close()


# ============ 使用示例 ============
if __name__ == "__main__":
    API_KEY = os.environ.get("TICKDB_API_KEY")
    
    # 监控标的配置(美股 K 线 + 港股/数字货币 depth)
    monitor = OvernightSignalMonitor(
        api_key=API_KEY,
        symbols=["AAPL.US", "NVDA.US", "9988.HK", "BTC.GB"],
        drift_threshold=0.8,   # 漂移超过 ±0.8% 触发告警
        pressure_threshold=1.5  # 压力比超过 1.5 触发告警
    )
    
    try:
        monitor.start()
        
        # 保持主线程运行(监控为守护线程)
        while True:
            time.sleep(60)
            
            # 每分钟重新计算漂移(应对盘前价格持续变化)
            monitor.pre_market_scan()
            
    except KeyboardInterrupt:
        monitor.stop()
        print("\n监控已停止")

五、产业链信号联动:事件驱动视角

隔夜信号的另一个维度是事件驱动。以下场景在盘前有明确的因果链可供预计算:

事件类型 盘前信号源 受益/受损方向
财报发布前夜 盘后成交量异常放大 + 期权 IV Skew 扩大 根据 drift 方向预判
政策发布(美联储) 期货(ES.US)盘前走势 + 美债收益率 科技股/金融股分化
大宗商品异动 港股/数字货币同步下跌 资源类科技股(锂、钴)
港股 ADR 夜盘 AAPL.HK / NVDA.HK 走势 AAPL.US / NVDA.US 领先指标

TickDB 跨品类数据支持使得上述信号的交叉验证成为可能——用期货走势验证股票方向,用港股夜盘确认美股盘前情绪。


六、开盘准备的清单化流程

将上述所有信号整合为一套标准化的开盘前工作流:

✅ 盘前 30 分钟(09:00)
   ├── 扫描自选股隔夜漂移(drift_scan)
   ├── 检查跨市场联动信号(期货 + ADR)
   └── 更新波动率基准(历史 K 线标准差)

✅ 盘前 15 分钟(09:15)
   ├── 订阅 depth WebSocket(港股/数字货币)
   ├── 观察买卖压力比变化趋势
   └── 根据压力比方向决定顺势/均值回归

✅ 集合竞价阶段(09:28 - 09:30)
   ├── 记录竞价价格与盘前价格差
   ├── 若开盘跳空 > 1%,等待回踩确认
   └── 若开盘跳空 < 0.3%,观察首个 1 分钟 K 线方向

✅ 开盘后 15 分钟(09:30 - 09:45)
   ├── 验证预判与实际走势的一致性
   ├── 执行预设止损线(提前设置,不临时决策)
   └── 收盘前复盘,记录预计算信号的准确率

七、结语:隔夜的优势在于准备,而不是预测

量化交易中最危险的幻觉,是以为可以在 9:30 那一秒"做出正确决策"。

真正的优势在 9:30 之前就已经分配完毕——谁在盘前做了更系统的信息收集,谁的挂单就更有结构性的胜率。这不是预测行情,而是用信号替代情绪,用流程替代冲动

隔夜信号预计算的核心价值,不是让你猜对明天的开盘方向,而是让你在行情展开之前就已经知道该在哪里等待、该以什么条件入场。当多数人盯着屏幕等开盘价时,你已经在凌晨 2:15 看到了那张盘前订单簿的底牌。


下一步行动

如果你希望亲手实现本文的隔夜信号监控系统

  1. 访问 tickdb.ai 注册(免费,无需信用卡)
  2. 在控制台生成 API Key
  3. 设置环境变量 TICKDB_API_KEY,复制本文代码即可运行
  4. 用自选股列表替换示例标的,开启盘前扫描

如果你关注更多盘前/盘后数据场景

  • 《盘后成交量异动检测:如何用 1h K 线发现机构布局痕迹》
  • 《WebSocket 实时订单簿:用 depth 频道做盘前方向预判》

如果你习惯用 AI 辅助开发,在 AI 助手中搜索安装 tickdb-market-data SKILL,一个指令即可拉取跨市场关联资产的最新报价。


回测局限性说明:上述策略逻辑基于市场微观结构的理论框架,读者可使用 TickDB 历史 K 线数据自行回测验证。在进行任何实盘操作前,请确保:回测周期覆盖至少一个完整牛熊周期;交易成本假设包含合理的滑点(建议 ≥0.05%)和佣金;实盘前进行模拟盘验证,样本量达到统计显著性要求。本文不构成任何投资建议。市场有风险,投资需谨慎。