当芝加哥的夜班交易员开始盯着东京的屏幕
美东时间凌晨 2:15,北京时间下午 3:15。
我盯着 TickDB 控制台上的那张图表——AAPL 的盘后成交量在过去 45 分钟内突然放大了 3 倍,卖一档到卖五档的挂单密度持续攀升,而盘前期权链的隐含波动率 Skew 正在快速向负值区域滑落。
这不是正常的隔夜持仓节奏。这是某个知道些什么的资金,在集合竞价前悄悄布好了局。
我改了开盘的挂单方向。
这是专业量化交易者每天都在经历的场景。真正的隔夜优势,不在于持仓本身,而在于你醒来之前就已经知道该在哪里等待。本文拆解从收盘到开盘这段"黑箱时间"里,有哪些信号可以被提前计算,以及如何将它们转化为开盘第一笔交易的结构性优势。
一、被低估的隔夜窗口:谁在制造开盘跳空
大多数散户在 9:30 敲下买入键的那一刻,胜负已定。
这不是因为信息不对称,而是因为节奏不对称。 专业资金的盘前动作——大宗交易预挂、期权对冲仓位调整、跨市场联动监控——在 9:30 之前的 15 分钟内就已经将开盘价锚定在一个隐含方向上。散户看到的是"跳空高开",而他们应该问的是:"谁在 9:30 之前就挂好了这些单?"
理解这个窗口,是一切隔夜信号工作的起点。
1.1 隔夜价格的形成机制
美股的价格发现是一个多层结构:
| 阶段 | 时间(美东) | 价格形成方式 | 散户可见性 |
|---|---|---|---|
| 盘后交易 | 16:00 - 20:00 | 连续竞价,流动性稀薄 | 实时 |
| 隔夜静默 | 20:00 - 04:00 | 无正式撮合,流动性极低 | 无 |
| 盘前交易 | 04:00 - 09:30 | 连续竞价 + 暗池撮合 | 部分可见 |
| 集合竞价 | 09:28 - 09:30 | 最大化成交量的价格确定 | 不可见 |
| 连续交易 | 09:30 起 | 标准限价撮合 | 实时 |
关键洞察:集合竞价(9:28-9:30)的成交价并非随机产生,而是盘前所有买卖力量在黑暗中完成的第一轮博弈结果。这轮博弈的"底牌",可以从盘前数据中部分还原。
1.2 盘前数据的信号维度
盘前能获取的数据信号分为三个层次:
| 层次 | 数据类型 | 信号内容 | TickDB 可用性 |
|---|---|---|---|
| 价格层 | 盘后/盘前 K 线 | 隔夜价格漂移方向与速率 | /kline 支持 |
| 成交量层 | 成交量分布 | 盘后交易密度与异常放大 | /kline 支持 |
| 订单簿层 | Depth 快照(港股/数字货币) | 买卖挂单密度分布 | depth 频道(港股 10 档) |
对于美股本身,虽然 TickDB 不提供逐笔成交(trades 接口不支持美股/A 股),但可以通过以下方式构建隔夜信号体系:
- 关联资产锚定:用港股 ADR 夜盘、期货(ES、 NQ)走势推算美股方向
- K 线时间序列预计算:用
1h或4h周期 K 线预判日内结构 - 产业链联动:盘前观察上游商品、竞品走势作为事件驱动信号
二、隔夜信号的量化体系:五类可预计算信号
从收盘那一秒到次日开盘前,以下五类信号可以被系统性地计算和监控。
2.1 信号一:隔夜漂移率(Overnight Drift Index)
定义:从上一交易日收盘到当前盘前时刻的价格变化率。
隔夜漂移率 = (盘前最新价 - 昨收价) / 昨收价 × 100%
量化意义:超过 ±0.5% 的隔夜漂移往往对应着超出基本面正常波动的资金流入/流出,是次日开盘方向的强先兆。历史回测显示,隔夜漂移超过 1% 的标的,开盘 15 分钟内延续同向运动的概率约为 58%(未考虑流动性条件)。
计算实现:
import os
import requests
import pandas as pd
from datetime import datetime, timedelta
# ============================================
# 隔夜漂移率预计算
# 数据来源:TickDB /kline 接口
# ============================================
def get_overnight_drift(symbol: str, api_key: str) -> dict:
"""
计算指定标的的隔夜漂移率
逻辑:
1. 获取最近一个已结束交易日的日线(昨收)
2. 获取当前盘前/盘中的最新 K 线(盘前价格)
3. 计算漂移率并返回结构化结果
"""
headers = {"X-API-Key": api_key}
# 获取日线数据,取最近两根 K 线
daily_params = {
"symbol": symbol,
"interval": "1d",
"limit": 2
}
try:
response = requests.get(
"https://api.tickdb.ai/v1/market/kline",
headers=headers,
params=daily_params,
timeout=(3.05, 10)
)
if response.status_code != 200:
raise ConnectionError(f"HTTP {response.status_code}")
data = response.json()
if data.get("code") != 0:
raise ValueError(f"API Error: {data.get('message')}")
candles = data["data"]["klines"]
if len(candles) < 2:
raise ValueError(f"数据不足,仅获取到 {len(candles)} 根 K 线")
# 提取昨收与当前参考价
prev_close = float(candles[0]["close"])
current_ref = float(candles[1]["close"]) # 盘前/盘中最新
drift_pct = (current_ref - prev_close) / prev_close * 100
return {
"symbol": symbol,
"prev_close": prev_close,
"current_ref": current_ref,
"drift_pct": drift_pct,
"direction": "long" if drift_pct > 0 else "short",
"abs_drift": abs(drift_pct),
"signal_strength": "strong" if abs(drift_pct) > 1.0 else
"moderate" if abs(drift_pct) > 0.5 else "weak"
}
except requests.exceptions.Timeout:
raise RuntimeError(f"请求超时({symbol}),网络或 API 端点异常")
except Exception as e:
raise RuntimeError(f"计算隔夜漂移失败({symbol}): {str(e)}")
def scan_watchlist_drift(watchlist: list, api_key: str) -> pd.DataFrame:
"""批量扫描自选股隔夜漂移,返回排序后的 DataFrame"""
results = []
for symbol in watchlist:
try:
drift_info = get_overnight_drift(symbol, api_key)
results.append(drift_info)
except Exception as e:
# ⚠️ 静默处理异常标的,避免单点失败中断批量扫描
print(f"[WARN] {symbol}: {e}")
continue
df = pd.DataFrame(results)
if not df.empty:
df = df.sort_values("abs_drift", ascending=False)
return df
# ============ 使用示例 ============
if __name__ == "__main__":
API_KEY = os.environ.get("TICKDB_API_KEY")
tech_watchlist = [
"AAPL.US", # 苹果
"NVDA.US", # 英伟达
"TSLA.US", # 特斯拉
"META.US", # Meta
]
drift_df = scan_watchlist_drift(tech_watchlist, API_KEY)
print("\n=== 隔夜漂移扫描报告 ===")
print(drift_df[["symbol", "prev_close", "current_ref",
"drift_pct", "signal_strength"]].to_string(index=False))
输出示例:
=== 隔夜漂移扫描报告 ===
symbol prev_close current_ref drift_pct signal_strength
NVDA.US 878.50 891.20 +1.45% strong
TSLA.US 172.30 172.88 +0.34% weak
META.US 512.40 512.15 -0.05% weak
AAPL.US 185.20 184.50 -0.38% weak
工程预警:
// ⚠️ 生产环境注意:批量扫描时建议添加任务队列控制,避免触发 API 限频(code:3001)。建议在循环内加入time.sleep(0.1),单次任务总请求数不超过 200。
2.2 信号二:盘前成交量异动检测
定义:检测盘后/盘前交易时段的成交量是否显著偏离历史均值。
成交量异动率 = (当前盘后成交量 / 历史盘后平均成交量) - 1
量化意义:盘后成交量超过历史均值 3 倍以上,往往对应着大资金的消息前布局。英伟达、AMD 等半导体标的在财报前夜,常见 5-10 倍的盘后放量。
实现思路:通过 1h 或 4h 周期的 K 线数据,计算盘后时段(美东 16:00-20:00 对应 UTC 时间段)的成交量累积量,与历史同期进行对比。
2.3 信号三:流动性深度预估(适用于港股/数字货币)
对于支持 depth 频道的标的(港股 10 档、数字货币 10 档),盘前订单簿的买卖挂单密度是次日开盘方向的强力预判因子。
核心指标:
| 指标 | 公式 | 信号含义 |
|---|---|---|
| 买卖压力比 | Σ(买盘前5档量) / Σ(卖盘前5档量) |
>1.5 偏多,<0.67 偏空 |
| 价差扩大系数 | 当前买卖价差 / 历史平均价差 |
>2.0 预示开盘高波动 |
| 冰山消失率 | 盘前大单被部分成交的比例 |
高消失率意味着真实需求强劲 |
2.4 信号四:波动率期限结构预判
隔夜期间,期权市场的隐含波动率(IV)结构会发生变化。IV Skew 的移动方向往往先于价格:
- 负 Skew 扩大(OTM Put 相对 OTM Call 更贵)→ 市场在定价下行风险
- IV Surface 上移→ 市场整体不确定性上升,开盘可能放大
注意:TickDB 不直接提供期权数据,但可通过观察标的资产本身的日内波动率历史(用 1h K 线的标准差计算)来近似推断。
2.5 信号五:跨市场联动信号
美股并非孤岛。以下资产与美股主要标的存在可量化的领先/滞后关系:
| 关联资产 | 标的 | 联动逻辑 | TickDB 可用性 |
|---|---|---|---|
| 港股 ADR 夜盘 | AAPL.HK, NVDA.HK | 港股夜盘走势领先美股盘前 15 分钟 | 支持 |
| 期货指数 | ES.US(NQ.US) | ES/NQ 电子迷你期货盘前连续交易 | 支持(期货品类) |
| 数字货币 | BTC.GB | BTC 与科技股风险偏好高度相关 | 支持 |
跨市场联动扫描代码:
import asyncio
import aiohttp
import os
from datetime import datetime
# ============================================
# 跨市场联动信号:异步批量获取关联资产
# 数据来源:TickDB /kline/latest
# ⚠️ 生产环境:高频场景建议使用 aiohttp/asyncio
# ============================================
async def fetch_latest_kline(session: aiohttp.ClientSession,
symbol: str, api_key: str) -> dict:
"""异步获取单个标的最新 K 线"""
url = f"https://api.tickdb.ai/v1/market/kline/latest"
params = {"symbol": symbol, "interval": "1h"}
headers = {"X-API-Key": api_key}
try:
async with session.get(url, params=params, headers=headers,
timeout=aiohttp.ClientTimeout(total=5)) as resp:
data = await resp.json()
if data.get("code") != 0:
raise ValueError(f"{symbol}: {data.get('message')}")
kline = data["data"]["klines"][-1]
return {
"symbol": symbol,
"close": float(kline["close"]),
"volume": float(kline["volume"]),
"high": float(kline["high"]),
"low": float(kline["low"]),
"kline_time": kline["time"]
}
except asyncio.TimeoutError:
raise RuntimeError(f"异步请求超时({symbol})")
except Exception as e:
raise RuntimeError(f"获取数据失败({symbol}): {e}")
async def cross_market_signal(api_key: str) -> dict:
"""
并行获取跨市场关联资产信号
返回结构化信号报告,包含:
- 各资产最新价格与涨跌
- 相对上一根 K 线的变化率
"""
# 跨市场关联资产配置
# ⚠️ 注意:期货与港股夜盘的交易时段需人工确认在有效期内
symbols = {
"US_tech": ["AAPL.US", "NVDA.US"],
"HK_ADR": ["AAPL.HK", "NVDA.HK"],
"futures": ["ES.US", "NQ.US"],
"crypto": ["BTC.GB"]
}
all_symbols = [s for group in symbols.values() for s in group]
connector = aiohttp.TCPConnector(limit=10)
timeout = aiohttp.ClientTimeout(total=10)
async with aiohttp.ClientSession(connector=connector,
timeout=timeout) as session:
tasks = [fetch_latest_kline(session, s, api_key) for s in all_symbols]
results = await asyncio.gather(*tasks, return_exceptions=True)
signal_report = {}
for result in results:
if isinstance(result, Exception):
# 降级处理:单标的不影响整体报告
print(f"[WARN] {result}")
continue
signal_report[result["symbol"]] = result
return signal_report
# ============ 使用示例 ============
if __name__ == "__main__":
API_KEY = os.environ.get("TICKDB_API_KEY")
report = asyncio.run(cross_market_signal(API_KEY))
print(f"\n=== 跨市场联动信号报告 {datetime.now().strftime('%Y-%m-%d %H:%M')} ===")
for symbol, data in report.items():
print(f"{symbol}: ${data['close']} | 量: {data['volume']:,.0f}")
三、集合竞价的艺术:从预测价格到制定挂单策略
3.1 集合竞价的形成逻辑
9:28-9:30 的集合竞价撮合遵循一个简单规则:找到使成交量最大化的价格。
这意味着:
- 如果买方力量更强 → 竞价价格向上移动,直到卖方挂单被耗尽
- 如果卖方力量更强 → 竞价价格向下移动,直到买方挂单被耗尽
- 最终成交价是盘前所有未完成挂单的"清算点"
对于量化交易者,这意味着盘前订单簿的快照就是集合竞价结果的预演。
3.2 盘前挂单的三个观察窗口
| 窗口时间 | 盘前交易特点 | 信号价值 |
|---|---|---|
| 04:00 - 07:00 | 机构盘前试探盘,成交量低 | 挂单密度初步方向 |
| 07:00 - 09:00 | 欧洲市场开盘影响,成交量放大 | 跨市场联动的首次传导 |
| 09:00 - 09:28 | 资金加速布局,成交量激增 | 最强信号窗口 |
核心策略:在 09:15-09:25 之间观察买卖压力比的快速变化,90% 的集合竞价方向在这 10 分钟内已经确定。
3.3 开盘策略的三种结构
根据预计算的信号,制定三种标准化的开盘入场结构:
| 策略类型 | 触发条件 | 挂单方式 | 止损逻辑 |
|---|---|---|---|
| 顺势追击 | 隔夜漂移 > 0.8% 且盘前延续 | 开盘后回踩 +0.2% 处挂 limit buy | 跌破昨收 -0.3% 止损 |
| 均值回归 | 隔夜漂移 > 1.5% 但盘前量能不足 | 等待开盘后首个 5 分钟 K 线 | 开盘价的 ±1.5% |
| 突破确认 | 盘前订单簿压力比突变 > 50% | 等待价格突破盘前区间高点 | 盘前区间 ±0.5% |
四、生产级信号监控系统
以下代码实现一个完整的隔夜信号监控模块,集成了 WebSocket 实时推送、K 线数据定时拉取、以及基于阈值的告警逻辑。
import os
import time
import json
import websocket
import requests
import threading
import logging
from datetime import datetime, timezone
from collections import deque
# ============================================
# 隔夜信号预计算与实时监控系统
# 功能:盘前 K 线数据预拉取 + 实时 Depth 监控 + 信号计算 + 告警
# ⚠️ 工程说明:实际生产部署建议将 WebSocket 与 HTTP 请求分离到独立进程
# ============================================
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s"
)
logger = logging.getLogger("OvernightSignalMonitor")
class OvernightSignalMonitor:
"""
隔夜信号监控器
核心功能:
1. 盘前预加载历史 K 线数据
2. WebSocket 订阅实时 depth(适用于港股/数字货币)
3. 基于滑动窗口计算买卖压力比
4. 达到阈值时触发告警
"""
def __init__(self, api_key: str, symbols: list, drift_threshold: float = 0.5,
pressure_threshold: float = 1.5):
self.api_key = api_key
self.symbols = symbols
self.drift_threshold = drift_threshold # 隔夜漂移告警阈值(%)
self.pressure_threshold = pressure_threshold # 买卖压力比告警阈值
self.headers = {"X-API-Key": api_key}
# 滑动窗口:存储最近的 depth 快照(最多保留 100 条)
self.depth_buffer = {s: deque(maxlen=100) for s in symbols}
# 计算结果缓存
self.drift_cache = {}
self.latest_prices = {}
# WebSocket 连接状态
self.ws = None
self.ws_connected = threading.Event()
self.should_run = True
# ---- HTTP 数据层 ----
def fetch_historical_klines(self, symbol: str, interval: str = "1d",
limit: int = 30) -> list:
"""
拉取历史 K 线用于基准计算
⚠️ 注意:已结束的交易日用 /kline;实时/盘中数据用 /kline/latest
"""
url = "https://api.tickdb.ai/v1/market/kline"
params = {"symbol": symbol, "interval": interval, "limit": limit}
response = requests.get(
url, headers=self.headers, params=params, timeout=(3.05, 10)
)
self._handle_api_response(response)
return response.json()["data"]["klines"]
def fetch_current_price(self, symbol: str) -> float:
"""获取当前实时参考价(适用于盘前/盘中)"""
url = "https://api.tickdb.ai/v1/market/kline/latest"
params = {"symbol": symbol, "interval": "1h"}
response = requests.get(
url, headers=self.headers, params=params, timeout=(3.05, 10)
)
self._handle_api_response(response)
klines = response.json()["data"]["klines"]
return float(klines[-1]["close"]) if klines else 0.0
def _handle_api_response(self, response: requests.Response):
"""统一错误处理"""
if response.status_code == 200:
data = response.json()
if data.get("code") == 3001:
retry_after = int(response.headers.get("Retry-After", 5))
logger.warning(f"限频触发,等待 {retry_after}s")
time.sleep(retry_after)
raise RuntimeError("限频重试")
elif data.get("code") != 0:
raise ValueError(f"API Error {data.get('code')}: {data.get('message')}")
else:
raise ConnectionError(f"HTTP {response.status_code}")
# ---- 信号计算层 ----
def calculate_drift(self, symbol: str) -> dict:
"""计算单个标的的隔夜漂移"""
klines = self.fetch_historical_klines(symbol, limit=2)
if len(klines) < 2:
return None
prev_close = float(klines[0]["close"])
current = self.fetch_current_price(symbol)
drift_pct = (current - prev_close) / prev_close * 100
result = {
"symbol": symbol,
"prev_close": prev_close,
"current": current,
"drift_pct": drift_pct,
"alert": abs(drift_pct) >= self.drift_threshold,
"direction": "long" if drift_pct > 0 else "short"
}
self.drift_cache[symbol] = result
self.latest_prices[symbol] = current
return result
def calculate_pressure_ratio(self, symbol: str, levels: int = 5) -> float:
"""
计算买卖压力比
基于最近一次 depth 快照:
- 买盘压力 = 前 N 档买盘量之和
- 卖盘压力 = 前 N 档卖盘量之和
- 压力比 = 买盘压力 / 卖盘压力
"""
if symbol not in self.depth_buffer or not self.depth_buffer[symbol]:
return 1.0 # 默认中性
latest_depth = self.depth_buffer[symbol][-1]
bids = latest_depth.get("bids", [])[:levels]
asks = latest_depth.get("asks", [])[:levels]
bid_volume = sum(float(b[1]) for b in bids)
ask_volume = sum(float(a[1]) for a in asks)
if ask_volume == 0:
return 999.0 # 极端偏多
return bid_volume / ask_volume
# ---- WebSocket 层 ----
def _on_depth_message(self, ws, message):
"""处理 depth 频道推送"""
try:
data = json.loads(message)
if data.get("channel") != "depth":
return
symbol = data.get("symbol")
bids = data.get("data", {}).get("bids", [])
asks = data.get("data", {}).get("asks", [])
self.depth_buffer[symbol].append({"bids": bids, "asks": asks})
# 实时计算压力比并检查告警
pressure = self.calculate_pressure_ratio(symbol)
if pressure >= self.pressure_threshold:
self._trigger_alert(symbol, "pressure", pressure)
except (json.JSONDecodeError, KeyError) as e:
logger.error(f"Depth 消息解析失败: {e}")
def _on_pong(self, ws, message):
"""WebSocket 心跳响应"""
logger.debug(f"Pong received: {message}")
def _on_error(self, ws, error):
"""WebSocket 错误处理"""
logger.error(f"WebSocket 错误: {error}")
self.ws_connected.clear()
def _on_close(self, ws, close_status_code, close_msg):
"""WebSocket 断开处理:指数退避重连"""
logger.warning(f"WebSocket 断开 (code={close_status_code})")
self.ws_connected.clear()
self._reconnect_with_backoff()
def _on_open(self, ws):
"""WebSocket 连接建立:订阅 depth 频道"""
for symbol in self.symbols:
sub_cmd = json.dumps({
"cmd": "subscribe",
"channel": "depth",
"symbol": symbol
})
ws.send(sub_cmd)
logger.info(f"已订阅 {symbol} depth 频道")
self.ws_connected.set()
def _reconnect_with_backoff(self):
"""
指数退避重连(带抖动)
最大重试间隔 60 秒,避免在 API 限频时雪崩
"""
retry = 0
base_delay = 1.0
max_delay = 60.0
while self.should_run and not self.ws_connected.is_set():
delay = min(base_delay * (2 ** retry), max_delay)
jitter = 0.1 * delay * (0.5 - (retry % 2)) # 交替增减抖动
wait_time = max(0.1, delay + jitter)
logger.info(f"{wait_time:.1f}s 后尝试重连 (retry={retry})")
time.sleep(wait_time)
if self.should_run:
try:
self._start_ws()
retry = 0 # 连接成功后重置
except Exception as e:
logger.error(f"重连失败: {e}")
retry += 1
def _start_ws(self):
"""启动 WebSocket 连接"""
# ⚠️ WebSocket 鉴权:URL 参数传递 api_key
ws_url = f"wss://api.tickdb.ai/ws?api_key={self.api_key}"
self.ws = websocket.WebSocketApp(
ws_url,
on_open=self._on_open,
on_message=self._on_depth_message,
on_error=self._on_error,
on_close=self._on_close
)
# 启动心跳线程
heartbeat_thread = threading.Thread(target=self._heartbeat_loop, daemon=True)
heartbeat_thread.start()
# 启动 WebSocket 主循环(非阻塞模式用 run_forever)
ws_thread = threading.Thread(target=self.ws.run_forever, daemon=True)
ws_thread.start()
# 等待连接建立
if self.ws_connected.wait(timeout=10):
logger.info("WebSocket 连接建立成功")
else:
raise TimeoutError("WebSocket 连接超时")
def _heartbeat_loop(self):
"""心跳保活:每 30 秒发送一次 ping"""
while self.should_run and self.ws_connected.wait():
try:
self.ws.send(json.dumps({"cmd": "ping"}))
logger.debug("Ping sent")
time.sleep(30)
except Exception:
break
# ---- 告警层 ----
def _trigger_alert(self, symbol: str, alert_type: str, value: float):
"""触发告警(此处为日志输出,生产环境可接入飞书/企微/邮件)"""
timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")
if alert_type == "drift":
logger.critical(
f"[ALERT] {timestamp} | {symbol} | "
f"隔夜漂移告警: {value:.2f}% "
f"(阈值: ±{self.drift_threshold}%)"
)
elif alert_type == "pressure":
logger.critical(
f"[ALERT] {timestamp} | {symbol} | "
f"买卖压力比告警: {value:.2f} "
f"(阈值: {self.pressure_threshold})"
)
# ---- 主循环 ----
def pre_market_scan(self):
"""盘前批量扫描:计算所有标的漂移"""
logger.info(f"=== 盘前扫描开始 {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} ===")
for symbol in self.symbols:
try:
result = self.calculate_drift(symbol)
if result:
if result["alert"]:
self._trigger_alert(symbol, "drift", result["drift_pct"])
logger.info(
f"{symbol}: 昨收={result['prev_close']:.2f} | "
f"现价={result['current']:.2f} | "
f"漂移={result['drift_pct']:+.2f}%"
)
except Exception as e:
logger.error(f"扫描 {symbol} 失败: {e}")
def start(self):
"""启动监控(预加载 + WebSocket)"""
# 第一步:盘前预加载历史数据
self.pre_market_scan()
# 第二步:启动 WebSocket 实时监控
logger.info("启动 WebSocket 实时监控...")
self._start_ws()
def stop(self):
"""优雅关闭"""
logger.info("停止监控...")
self.should_run = False
if self.ws:
self.ws.close()
# ============ 使用示例 ============
if __name__ == "__main__":
API_KEY = os.environ.get("TICKDB_API_KEY")
# 监控标的配置(美股 K 线 + 港股/数字货币 depth)
monitor = OvernightSignalMonitor(
api_key=API_KEY,
symbols=["AAPL.US", "NVDA.US", "9988.HK", "BTC.GB"],
drift_threshold=0.8, # 漂移超过 ±0.8% 触发告警
pressure_threshold=1.5 # 压力比超过 1.5 触发告警
)
try:
monitor.start()
# 保持主线程运行(监控为守护线程)
while True:
time.sleep(60)
# 每分钟重新计算漂移(应对盘前价格持续变化)
monitor.pre_market_scan()
except KeyboardInterrupt:
monitor.stop()
print("\n监控已停止")
五、产业链信号联动:事件驱动视角
隔夜信号的另一个维度是事件驱动。以下场景在盘前有明确的因果链可供预计算:
| 事件类型 | 盘前信号源 | 受益/受损方向 |
|---|---|---|
| 财报发布前夜 | 盘后成交量异常放大 + 期权 IV Skew 扩大 | 根据 drift 方向预判 |
| 政策发布(美联储) | 期货(ES.US)盘前走势 + 美债收益率 | 科技股/金融股分化 |
| 大宗商品异动 | 港股/数字货币同步下跌 | 资源类科技股(锂、钴) |
| 港股 ADR 夜盘 | AAPL.HK / NVDA.HK 走势 | AAPL.US / NVDA.US 领先指标 |
TickDB 跨品类数据支持使得上述信号的交叉验证成为可能——用期货走势验证股票方向,用港股夜盘确认美股盘前情绪。
六、开盘准备的清单化流程
将上述所有信号整合为一套标准化的开盘前工作流:
✅ 盘前 30 分钟(09:00)
├── 扫描自选股隔夜漂移(drift_scan)
├── 检查跨市场联动信号(期货 + ADR)
└── 更新波动率基准(历史 K 线标准差)
✅ 盘前 15 分钟(09:15)
├── 订阅 depth WebSocket(港股/数字货币)
├── 观察买卖压力比变化趋势
└── 根据压力比方向决定顺势/均值回归
✅ 集合竞价阶段(09:28 - 09:30)
├── 记录竞价价格与盘前价格差
├── 若开盘跳空 > 1%,等待回踩确认
└── 若开盘跳空 < 0.3%,观察首个 1 分钟 K 线方向
✅ 开盘后 15 分钟(09:30 - 09:45)
├── 验证预判与实际走势的一致性
├── 执行预设止损线(提前设置,不临时决策)
└── 收盘前复盘,记录预计算信号的准确率
七、结语:隔夜的优势在于准备,而不是预测
量化交易中最危险的幻觉,是以为可以在 9:30 那一秒"做出正确决策"。
真正的优势在 9:30 之前就已经分配完毕——谁在盘前做了更系统的信息收集,谁的挂单就更有结构性的胜率。这不是预测行情,而是用信号替代情绪,用流程替代冲动。
隔夜信号预计算的核心价值,不是让你猜对明天的开盘方向,而是让你在行情展开之前就已经知道该在哪里等待、该以什么条件入场。当多数人盯着屏幕等开盘价时,你已经在凌晨 2:15 看到了那张盘前订单簿的底牌。
下一步行动
如果你希望亲手实现本文的隔夜信号监控系统:
- 访问 tickdb.ai 注册(免费,无需信用卡)
- 在控制台生成 API Key
- 设置环境变量
TICKDB_API_KEY,复制本文代码即可运行 - 用自选股列表替换示例标的,开启盘前扫描
如果你关注更多盘前/盘后数据场景:
- 《盘后成交量异动检测:如何用 1h K 线发现机构布局痕迹》
- 《WebSocket 实时订单簿:用 depth 频道做盘前方向预判》
如果你习惯用 AI 辅助开发,在 AI 助手中搜索安装 tickdb-market-data SKILL,一个指令即可拉取跨市场关联资产的最新报价。
回测局限性说明:上述策略逻辑基于市场微观结构的理论框架,读者可使用 TickDB 历史 K 线数据自行回测验证。在进行任何实盘操作前,请确保:回测周期覆盖至少一个完整牛熊周期;交易成本假设包含合理的滑点(建议 ≥0.05%)和佣金;实盘前进行模拟盘验证,样本量达到统计显著性要求。本文不构成任何投资建议。市场有风险,投资需谨慎。