非农数据发布瞬间的外汇订单簿变化:EURUSD 流动性监控实战

场景适配说明:TickDB 当前支持港股 10 档深度数据、数字货币 10 档深度数据,暂不支持外汇 EURUSD。为保证技术演示的可操作性,本文以 BTCUSDT 永续合约作为深度数据监控的实战示例——其 24/7 交易、高频波动、机构参与的特性和外汇市场高度相似,流动性分析方法论可完全迁移。


模块一:开篇

凌晨 2:00,华尔街的量化交易员老张盯着三块屏幕。

左边是 CME 外汇期货报价,右边是彭博终端,中间那块屏幕上,他的 Python 脚本刚刚在非农数据发布前 3 秒完成了一次完整的订单簿快照。

然后——什么都没发生。

老张知道,这只是暴风雨前的宁静。15 分钟后,美国劳工部准时公布新增非农就业人数,数字远超市场预期。EURUSD 在 30 秒内下跌了 127 个 pip,价差从正常的 0.5 pip 瞬间扩大到 12 pip。

对于做市商,这是收割流动性的黄金窗口;对于量化策略,这是被止损单埋葬的屠杀现场;对于风险监控系统,这是必须捕捉的异常信号。

非农数据发布瞬间,外汇订单簿会发生什么?买卖挂单的分布如何重构?作为量化开发者,我们能否用代码实时监控这种流动性突变,并在流动性真空窗口中做出更理性的风控决策?

本文给出完整答案:用 TickDB 的 depth 频道构建一套生产级的流动性监控引擎,以 BTCUSDT 永续合约作为实战演示(非农数据发布时,外汇和加密市场的流动性塌陷模式高度相似)。


模块二:微观结构拆解:非农数据前后的订单簿快照

先看数据,再讲代码。

下面是一组模拟的非农数据发布前后,BTCUSDT 永续合约订单簿的快照对比(基于真实市场结构的合理推演):

时间节点 买一价 卖一价 价差(pip) 买盘深度(前5档) 卖盘深度(前5档) 压力比 备注
非农前 60s 67,450.00 67,450.50 0.5 4,200,000 3,980,000 0.95 正常交易区间
非农前 5s 67,448.00 67,455.00 7.0 1,800,000 2,100,000 1.17 做市商开始撤单
数据发布后 2s 67,320.00 67,480.00 160.0 420,000 890,000 2.12 流动性真空
数据发布后 15s 67,180.00 67,200.00 20.0 1,200,000 1,450,000 1.21 空头主导反弹
数据发布后 60s 67,250.00 67,251.00 1.0 3,500,000 3,200,000 0.91 流动性回归

关键微观结构特征

  1. 做市商提前撤退:数据发布前 5 秒,深度骤降 50%,这是聪明钱在管理风险暴露
  2. 价差瞬间扩大 320 倍:从 0.5 pip 到 160 pip,滑点成本足以吃掉一个趋势策略的全部利润
  3. 压力比骤变:从 0.95 跌至 0.47 再升至 2.12,方向切换在 17 秒内完成
  4. 流动性 V 型复苏:60 秒后深度回归正常,但方向已重构

这就是为什么裸追涨杀跌在宏观数据发布窗口是送钱行为——你的止损单正好撞上流动性真空。


模块三:事件驱动策略逻辑:三段式框架

针对非农这类宏观数据,我们采用事前-事中-事后三段式监控框架:

┌─────────────────────────────────────────────────────────────────┐
│                     宏观事件驱动流动性监控                        │
├─────────────────────────────────────────────────────────────────┤
│  事前 (T-60s ~ T-5s)                                            │
│  ┌─────────────────────────────────────────────────────────┐   │
│  │ • 订阅 depth 频道,建立基线快照                           │   │
│  │ • 计算正常流动性深度均值(baseline_depth)               │   │
│  │ • 监控买卖压力比是否进入预警区间(<0.8 或 >1.2)          │   │
│  │ • 设置心跳检测,识别做市商撤单信号                        │   │
│  └─────────────────────────────────────────────────────────┘   │
│                              ▼                                   │
│  事中 (T+0 ~ T+30s)                                              │
│  ┌─────────────────────────────────────────────────────────┐   │
│  │ • 检测价差突破阈值(>10 pip = 流动性真空)               │   │
│  │ • 暂停所有趋势跟踪类委托单(防止滑点屠杀)                 │   │
│  │ • 实时计算买卖压力比方向切换                             │   │
│  │ • 触发飞书/钉钉告警推送                                   │   │
│  └─────────────────────────────────────────────────────────┘   │
│                              ▼                                   │
│  事中 (T+30s ~ T+300s)                                           │
│  ┌─────────────────────────────────────────────────────────┐   │
│  │ • 监控流动性回归信号(深度恢复至基线 80%)               │   │
│  │ • 分析压力比稳定方向,辅助方向判断                       │   │
│  │ • 评估波动率回归路径                                     │   │
│  └─────────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────────┘

核心监控指标

指标 正常值 预警阈值 危险阈值 计算方式
买卖价差 0.5~2 pip >10 pip >50 pip ask - bid
流动性深度 >1,000,000 <500,000 <200,000 Σ(前5档数量)
压力比 0.9~1.1 <0.7 或 >1.4 <0.5 或 >2.0 buy_depth / sell_depth
深度衰减率 <5%/min >20%/min >50%/min Δdepth / baseline

模块四:生产级代码

4.1 WebSocket 连接与深度数据订阅

"""
宏观事件驱动流动性监控引擎
TickDB depth 频道实战:BTCUSDT 永续合约

⚠️  生产环境高频场景建议使用 aiohttp/asyncio
⚠️  本文代码演示完整流程逻辑,可直接运行验证
"""

import os
import json
import time
import hmac
import hashlib
import random
import logging
from datetime import datetime
from collections import deque
from dataclasses import dataclass, field
from typing import Optional

import websocket
import requests

# ============================================================================
# 配置区域
# ============================================================================

@dataclass
class MonitorConfig:
    """监控引擎配置"""
    # TickDB API 配置
    api_key: str = os.environ.get("TICKDB_API_KEY", "")
    api_secret: str = os.environ.get("TICKDB_API_SECRET", "")
    
    # WebSocket 配置
    ws_url: str = "wss://api.tickdb.ai/ws/v1/market"
    ping_interval: int = 20  # 心跳间隔(秒)
    
    # 监控标的
    symbol: str = "BTC.USDT"  # BTCUSDT 永续合约
    
    # 预警阈值
    spread_warning: float = 10.0      # 价差预警阈值(USD)
    spread_danger: float = 50.0       # 价差危险阈值
    depth_baseline: float = 1000000.0  # 深度基线(正常值)
    depth_warning_ratio: float = 0.5  # 深度低于基线50%触发预警
    pressure_warning: float = 1.4     # 压力比预警
    pressure_danger: float = 2.0      # 压力比危险
    
    # 滑动窗口参数
    snapshot_window: int = 60  # 快照窗口(秒)
    baseline_window: int = 300  # 基线计算窗口(秒)
    
    # 重连配置
    max_retries: int = 10
    base_delay: float = 1.0
    max_delay: float = 60.0

# ============================================================================
# 工具函数
# ============================================================================

def setup_logging() -> logging.Logger:
    """配置日志"""
    logger = logging.getLogger("LiquidityMonitor")
    logger.setLevel(logging.DEBUG)
    
    if not logger.handlers:
        handler = logging.StreamHandler()
        handler.setFormatter(
            logging.Formatter(
                "%(asctime)s | %(levelname)-8s | %(message)s",
                datefmt="%Y-%m-%d %H:%M:%S"
            )
        )
        logger.addHandler(handler)
    
    return logger


def generate_auth_signature(api_secret: str, timestamp: str) -> str:
    """生成 TickDB 认证签名"""
    message = timestamp.encode("utf-8")
    secret = api_secret.encode("utf-8")
    signature = hmac.new(secret, message, hashlib.sha256).hexdigest()
    return signature


# ============================================================================
# 核心监控引擎
# ============================================================================

@dataclass
class DepthSnapshot:
    """订单簿快照"""
    timestamp: float
    bid_price: float
    ask_price: float
    spread: float
    bid_depth: float  # 前N档买盘总量
    ask_depth: float  # 前N档卖盘总量
    pressure_ratio: float  # 买卖压力比
    raw_data: dict  # 原始数据备份


class LiquidityMonitor:
    """
    流动性监控引擎
    
    功能:
    1. WebSocket 实时订阅 depth 频道
    2. 滑动窗口计算流动性指标
    3. 多阈值告警检测
    4. 事件驱动信号生成
    """
    
    def __init__(self, config: MonitorConfig):
        self.config = config
        self.logger = setup_logging()
        
        # 连接状态
        self.ws: Optional[websocket.WebSocket] = None
        self.is_connected = False
        self.retry_count = 0
        
        # 数据缓冲
        self.snapshots: deque = deque(maxlen=config.baseline_window)
        self.latest_snapshot: Optional[DepthSnapshot] = None
        
        # 统计指标
        self.baseline_depth: float = config.depth_baseline
        self.snapshot_count: int = 0
        
        # 告警状态
        self.spread_alerted: bool = False
        self.depth_alerted: bool = False
        self.pressure_alerted: bool = False
    
    # -------------------------------------------------------------------------
    # WebSocket 生命周期管理
    # -------------------------------------------------------------------------
    
    def connect(self) -> bool:
        """
        建立 WebSocket 连接
        
        ⚠️  TickDB WebSocket 鉴权通过 URL 参数传递
        """
        try:
            # 构建认证参数
            timestamp = str(int(time.time() * 1000))
            signature = generate_auth_signature(
                self.config.api_secret, 
                timestamp
            )
            
            # URL 参数鉴权
            ws_url = (
                f"{self.config.ws_url}"
                f"?api_key={self.config.api_key}"
                f"&timestamp={timestamp}"
                f"&signature={signature}"
            )
            
            self.ws = websocket.WebSocketApp(
                ws_url,
                on_message=self._on_message,
                on_error=self._on_error,
                on_close=self._on_close,
                on_open=self._on_open,
            )
            
            self.logger.info(f"正在连接 TickDB: {self.config.symbol}")
            
            # 启动连接(blocking)
            # 生产环境建议使用 threading 或 asyncio
            self.ws.run_forever(
                ping_interval=self.config.ping_interval,
                ping_timeout=10
            )
            
            return True
            
        except Exception as e:
            self.logger.error(f"连接失败: {e}")
            return False
    
    def _on_open(self, ws: websocket.WebSocket):
        """连接建立后的初始化"""
        self.logger.info("✅ WebSocket 连接已建立")
        self.is_connected = True
        self.retry_count = 0
        
        # 订阅 depth 频道
        # ⚠️  depth 频道:港股10档/数字货币10档,不支持外汇
        subscribe_msg = {
            "cmd": "subscribe",
            "args": {
                "channel": "depth",
                "symbol": self.config.symbol,
                "depth": 10  # 最大10档
            }
        }
        ws.send(json.dumps(subscribe_msg))
        self.logger.info(f"已订阅 depth 频道: {self.config.symbol}")
    
    def _on_message(self, ws: websocket.WebSocket, message: str):
        """处理接收到的消息"""
        try:
            data = json.loads(message)
            
            # 处理 ping 消息
            if data.get("type") == "ping":
                ws.send(json.dumps({"type": "pong", "timestamp": data.get("timestamp")}))
                return
            
            # 处理 depth 数据
            if data.get("channel") == "depth":
                self._process_depth_update(data)
                
        except json.JSONDecodeError as e:
            self.logger.warning(f"消息解析失败: {e}")
        except Exception as e:
            self.logger.error(f"消息处理异常: {e}")
    
    def _on_error(self, ws: websocket.WebSocket, error):
        """错误处理"""
        self.logger.error(f"WebSocket 错误: {error}")
        self.is_connected = False
    
    def _on_close(self, ws: websocket.WebSocket, close_status_code, close_msg):
        """连接关闭处理:指数退避重连"""
        self.logger.warning(f"连接关闭: {close_status_code} - {close_msg}")
        self.is_connected = False
        
        if self.retry_count < self.config.max_retries:
            # 指数退避 + 抖动
            delay = min(
                self.config.base_delay * (2 ** self.retry_count),
                self.config.max_delay
            )
            jitter = random.uniform(0, delay * 0.1)
            wait_time = delay + jitter
            
            self.retry_count += 1
            self.logger.info(f"🔄 {wait_time:.1f}秒后第 {self.retry_count} 次重连...")
            
            time.sleep(wait_time)
            self.connect()
        else:
            self.logger.error("重连次数耗尽,请检查网络或 API 配置")
    
    # -------------------------------------------------------------------------
    # 深度数据处理
    # -------------------------------------------------------------------------
    
    def _process_depth_update(self, data: dict):
        """
        处理 depth 频道推送
        
        TickDB depth 数据结构:
        {
            "channel": "depth",
            "symbol": "BTC.USDT",
            "timestamp": 1703123456789,
            "bids": [[price, qty], ...],  # 买盘前10档
            "asks": [[price, qty], ...]   # 卖盘前10档
        }
        """
        try:
            timestamp = data.get("timestamp", time.time() * 1000) / 1000
            bids = data.get("bids", [])
            asks = data.get("asks", [])
            
            if not bids or not asks:
                return
            
            # 提取最优档位
            bid_price = float(bids[0][0])
            ask_price = float(asks[0][0])
            spread = ask_price - bid_price
            
            # 计算深度(累加前10档)
            bid_depth = sum(float(b[1]) for b in bids[:10])
            ask_depth = sum(float(a[1]) for a in asks[:10])
            
            # 计算买卖压力比
            pressure_ratio = bid_depth / ask_depth if ask_depth > 0 else float("inf")
            
            # 构建快照
            snapshot = DepthSnapshot(
                timestamp=timestamp,
                bid_price=bid_price,
                ask_price=ask_price,
                spread=spread,
                bid_depth=bid_depth,
                ask_depth=ask_depth,
                pressure_ratio=pressure_ratio,
                raw_data=data
            )
            
            # 更新缓冲
            self.snapshots.append(snapshot)
            self.latest_snapshot = snapshot
            self.snapshot_count += 1
            
            # 更新基线(每60个快照重新计算)
            if self.snapshot_count % 60 == 0:
                self._update_baseline()
            
            # 执行告警检测
            self._check_alerts(snapshot)
            
        except Exception as e:
            self.logger.error(f"深度数据处理异常: {e}")
    
    def _update_baseline(self):
        """更新流动性基线(滑动窗口均值)"""
        if len(self.snapshots) < 10:
            return
        
        recent = list(self.snapshots)[-self.config.baseline_window:]
        self.baseline_depth = sum(
            s.bid_depth + s.ask_depth for s in recent
        ) / len(recent) / 2
        
        self.logger.debug(f"基线更新: {self.baseline_depth:,.0f}")
    
    # -------------------------------------------------------------------------
    # 告警检测
    # -------------------------------------------------------------------------
    
    def _check_alerts(self, snapshot: DepthSnapshot):
        """多阈值告警检测"""
        alerts = []
        
        # 1. 价差告警
        if snapshot.spread > self.config.spread_danger and not self.spread_alerted:
            alerts.append(f"🚨 【危险】价差突破: {snapshot.spread:.1f} USD (阈值: {self.config.spread_danger})")
            self.spread_alerted = True
        elif snapshot.spread > self.config.spread_warning and not self.spread_alerted:
            alerts.append(f"⚠️ 【预警】价差扩大: {snapshot.spread:.1f} USD (阈值: {self.config.spread_warning})")
            self.spread_alerted = True
        
        # 2. 深度告警
        current_depth = (snapshot.bid_depth + snapshot.ask_depth) / 2
        depth_ratio = current_depth / self.baseline_depth
        
        if depth_ratio < 0.2 and not self.depth_alerted:
            alerts.append(f"🚨 【危险】流动性枯竭: 当前 {depth_ratio:.1%} 基线 (阈值: 20%)")
            self.depth_alerted = True
        elif depth_ratio < self.config.depth_warning_ratio and not self.depth_alerted:
            alerts.append(f"⚠️ 【预警】深度下降: {depth_ratio:.1%} 基线")
            self.depth_alerted = True
        
        # 3. 压力比告警
        if snapshot.pressure_ratio > self.config.pressure_danger and not self.pressure_alerted:
            alerts.append(f"🚨 【危险】买压极度异常: {snapshot.pressure_ratio:.2f}")
            self.pressure_alerted = True
        elif snapshot.pressure_ratio > self.config.pressure_warning and not self.pressure_alerted:
            alerts.append(f"⚠️ 【预警】压力比偏离: {snapshot.pressure_ratio:.2f}")
            self.pressure_alerted = True
        
        # 输出告警
        if alerts:
            self._send_alerts(alerts, snapshot)
    
    def _send_alerts(self, alerts: list, snapshot: DepthSnapshot):
        """发送告警通知"""
        timestamp_str = datetime.fromtimestamp(snapshot.timestamp).strftime("%H:%M:%S.%f")[:-3]
        
        for alert in alerts:
            self.logger.warning(alert)
        
        # 格式化输出当前快照
        status = f"""
╔══════════════════════════════════════════════════════════════╗
║  流动性快照 [{timestamp_str}]                                ║
╠══════════════════════════════════════════════════════════════╣
║  标的价格: {snapshot.bid_price:.2f} / {snapshot.ask_price:.2f}                    ║
║  买卖价差: {snapshot.spread:.2f} USD                                 ║
║  买盘深度: {snapshot.bid_depth:>12,.0f}  ({snapshot.bid_depth/self.baseline_depth:.1%} 基线)   ║
║  卖盘深度: {snapshot.ask_depth:>12,.0f}  ({snapshot.ask_depth/self.baseline_depth:.1%} 基线)   ║
║  压力比:   {snapshot.pressure_ratio:>12.2f}                                     ║
╚══════════════════════════════════════════════════════════════╝
"""
        self.logger.info(status)
    
    # -------------------------------------------------------------------------
    # 重置告警状态(新事件周期开始时调用)
    # -------------------------------------------------------------------------
    
    def reset_alerts(self):
        """重置告警状态(新非农周期开始前调用)"""
        self.spread_alerted = False
        self.depth_alerted = False
        self.pressure_alerted = False
        self.logger.info("告警状态已重置,等待新事件周期...")


# ============================================================================
# 主程序入口
# ============================================================================

def main():
    """主程序"""
    
    # 检查 API Key
    api_key = os.environ.get("TICKDB_API_KEY")
    api_secret = os.environ.get("TICKDB_API_SECRET")
    
    if not api_key or not api_secret:
        print("""
❌ 错误: 未设置 API 凭证

请设置环境变量:
export TICKDB_API_KEY="your_api_key"
export TICKDB_API_SECRET="your_api_secret"

然后重新运行。
        """)
        return
    
    # 创建配置
    config = MonitorConfig(
        api_key=api_key,
        api_secret=api_secret,
        symbol="BTC.USDT"  # BTCUSDT 永续合约
    )
    
    # 启动监控
    monitor = LiquidityMonitor(config)
    
    print(f"""
╔══════════════════════════════════════════════════════════════╗
║       流动性监控引擎启动                                    ║
║       标的: {config.symbol:<50}║
║       WebSocket: {config.ws_url:<43}║
╚══════════════════════════════════════════════════════════════╝
    """)
    
    try:
        monitor.connect()
    except KeyboardInterrupt:
        print("\n🛑 用户中断,退出监控...")


if __name__ == "__main__":
    main()

代码关键设计说明

设计要素 实现方式 原因
心跳保活 ping_interval=20 + on_message 处理 ping/pong 防止长连接被中间设备断开
指数退避重连 delay = base * 2^retry,上限 60s 避免高频重连压垮服务器
抖动 jitter = random.uniform(0, delay * 0.1) 防止多实例惊群效应
URL 鉴权 ?api_key=&timestamp=&signature= TickDB WebSocket 规范
滑动窗口 deque(maxlen=300) + 基线更新 适应流动性动态基线变化

模块五:订单流与流动性深度算法

5.1 核心衍生指标计算

基于 TickDB depth 频道推送的原始数据,我们可以计算以下流动性指标:

from dataclasses import dataclass
from typing import List


@dataclass
class LiquidityMetrics:
    """流动性指标集"""
    # 基础指标
    spread: float                    # 买卖价差
    spread_pct: float               # 价差百分比
    
    # 深度指标
    total_depth: float              # 总深度(买+卖)
    net_depth_imbalance: float      # 净深度失衡度
    depth_ratio: float              # 深度比(相对于基线)
    
    # 压力指标
    pressure_ratio: float           # 买卖压力比
    pressure_ imbalance: float     # 压力失衡度(标准化到 -1~1)
    
    # 加权指标
    vwap_imbalance: float          # 加权价差失衡
    depth_concentration: float      # 深度集中度(前3档占比)


def calculate_liquidity_metrics(
    bids: List[List[float]],      # [[price, qty], ...]
    asks: List[List[float]],
    baseline_depth: float = 1000000.0,
    depth_levels: int = 10
) -> LiquidityMetrics:
    """
    计算流动性综合指标
    
    参数:
        bids: 买盘深度数据(已排序,bids[0] = 买一)
        asks: 卖盘深度数据(已排序,asks[0] = 卖一)
        baseline_depth: 正常流动性基线
        depth_levels: 参与计算的档位数
    
    返回:
        LiquidityMetrics: 综合流动性指标
    """
    
    # 基础数据提取
    bid_price = float(bids[0][0])
    ask_price = float(asks[0][0])
    mid_price = (bid_price + ask_price) / 2
    
    # 1. 价差指标
    spread = ask_price - bid_price
    spread_pct = (spread / mid_price) * 100
    
    # 2. 深度计算(前 N 档)
    def calc_side_depth(side_data: List[List[float]]) -> float:
        return sum(float(d[1]) for d in side_data[:depth_levels])
    
    bid_depth = calc_side_depth(bids)
    ask_depth = calc_side_depth(asks)
    total_depth = bid_depth + ask_depth
    
    # 3. 深度失衡度
    net_depth_imbalance = (bid_depth - ask_depth) / total_depth if total_depth > 0 else 0
    depth_ratio = total_depth / (2 * baseline_depth)  # 相对于基线
    
    # 4. 买卖压力比
    pressure_ratio = bid_depth / ask_depth if ask_depth > 0 else float("inf")
    pressure_imbalance = (pressure_ratio - 1) / (pressure_ratio + 1)  # 标准化到 -1~1
    
    # 5. VWAP 失衡(加权平均价偏离中点)
    def calc_vwap(side_data: List[List[float]]) -> float:
        total_value = sum(float(d[0]) * float(d[1]) for d in side_data[:depth_levels])
        total_qty = sum(float(d[1]) for d in side_data[:depth_levels])
        return total_value / total_qty if total_qty > 0 else mid_price
    
    bid_vwap = calc_vwap(bids)
    ask_vwap = calc_vwap(asks)
    vwap_imbalance = (ask_vwap - bid_vwap) / (2 * mid_price) if mid_price > 0 else 0
    
    # 6. 深度集中度(前3档占总深度的比例)
    top3_bid = sum(float(d[1]) for d in bids[:3])
    top3_ask = sum(float(d[1]) for d in asks[:3])
    depth_concentration = (top3_bid + top3_ask) / total_depth if total_depth > 0 else 1.0
    
    return LiquidityMetrics(
        spread=spread,
        spread_pct=spread_pct,
        total_depth=total_depth,
        net_depth_imbalance=net_depth_imbalance,
        depth_ratio=depth_ratio,
        pressure_ratio=pressure_ratio,
        pressure_imbalance=pressure_imbalance,
        vwap_imbalance=vwap_imbalance,
        depth_concentration=depth_concentration
    )


def interpret_metrics(metrics: LiquidityMetrics) -> dict:
    """
    解读流动性指标,返回可读的状态描述
    
    返回:
        {
            "market_state": "正常/偏多/偏空/紧张/真空",
            "signals": ["预警信号列表"],
            "action": "建议操作"
        }
    """
    
    signals = []
    market_state = "正常"
    
    # 状态判断逻辑
    if metrics.depth_ratio < 0.2:
        market_state = "流动性真空"
        signals.append("深度不足基线20%,极端滑点风险")
    elif metrics.depth_ratio < 0.5:
        market_state = "流动性紧张"
        signals.append("深度偏低,滑点成本上升")
    
    if metrics.spread_pct > 0.01:  # 价差 > 0.01%
        signals.append(f"价差扩大: {metrics.spread:.2f}")
    
    if abs(metrics.pressure_imbalance) > 0.3:
        direction = "多头" if metrics.pressure_imbalance > 0 else "空头"
        signals.append(f"压力比严重失衡,向{direction}倾斜")
    
    if metrics.depth_concentration > 0.7:
        signals.append("深度集中于浅档,大单冲击风险高")
    
    # 操作建议
    if market_state in ("流动性紧张", "流动性真空"):
        action = "建议暂停委托,等待流动性回归"
    elif abs(metrics.pressure_imbalance) > 0.5:
        action = "注意方向性风险,控制仓位"
    else:
        action = "市场正常,可执行常规策略"
    
    return {
        "market_state": market_state,
        "signals": signals,
        "action": action
    }

5.2 非农事件窗口的监控策略

from datetime import datetime, timedelta
import threading


class EconomicEventMonitor:
    """
    宏观事件驱动监控器
    
    核心功能:
    1. 管理事件日历(美国非农、CPI、利率决议等)
    2. 在事件窗口自动调整监控参数
    3. 事后记录流动性数据供回测
    """
    
    # 常见宏观事件(时间UTC)
    EVENT_CALENDAR = {
        # 美国非农就业(每月第一个周五 13:30 UTC = 21:30 北京时间冬令时)
        "US_NFP": {"time_utc": "13:30", "impact": "high", "dof_edition": True},
        
        # 美国CPI(通常月中 12:30 UTC)
        "US_CPI": {"time_utc": "12:30", "impact": "high", "dof_edition": True},
        
        # FOMC利率决议(每年8次,具体时间不固定,需手动配置)
        "FOMC": {"time_utc": "18:00", "impact": "critical", "dof_edition": False},
        
        # 欧元区CPI(通常月中)
        "EU_CPI": {"time_utc": "09:00", "impact": "high", "dof_edition": True},
    }
    
    def __init__(self, liquidity_monitor: LiquidityMonitor):
        self.monitor = liquidity_monitor
        self.is_event_window = False
        self.event_start_time: Optional[datetime] = None
        
        # 监控窗口配置(秒)
        self.pre_event_seconds = 60      # 事件前监控时长
        self.post_event_seconds = 300   # 事件后监控时长
        self.lock = threading.Lock()
    
    def check_event_windows(self) -> bool:
        """
        检查当前是否处于事件窗口
        
        返回:
            True: 处于事件窗口
            False: 正常交易时间
        """
        now = datetime.utcnow()
        current_time_str = now.strftime("%H:%M")
        
        # 简化判断:检查当前时间是否接近已知事件时间
        for event_name, event_info in self.EVENT_CALENDAR.items():
            target_time = event_info["time_utc"]
            
            if current_time_str == target_time:
                self._enter_event_window(event_name)
                return True
        
        # 检查是否仍在事件窗口内
        if self.is_event_window and self.event_start_time:
            elapsed = (now - self.event_start_time).total_seconds()
            if elapsed > self.post_event_seconds:
                self._exit_event_window()
        
        return self.is_event_window
    
    def _enter_event_window(self, event_name: str):
        """进入事件窗口"""
        with self.lock:
            if not self.is_event_window:
                self.is_event_window = True
                self.event_start_time = datetime.utcnow()
                
                # 重置告警状态
                self.monitor.reset_alerts()
                
                # 调整阈值(事件窗口更敏感)
                self.monitor.config.spread_warning = 5.0   # 降低预警阈值
                self.monitor.config.spread_danger = 20.0
                self.monitor.config.depth_warning_ratio = 0.7  # 更早预警
                
                print(f"⚡ 进入事件窗口: {event_name}")
    
    def _exit_event_window(self):
        """退出事件窗口"""
        with self.lock:
            self.is_event_window = False
            self.event_start_time = None
            
            # 恢复默认阈值
            self.monitor.config.spread_warning = 10.0
            self.monitor.config.spread_danger = 50.0
            self.monitor.config.depth_warning_ratio = 0.5
            
            print("📊 事件窗口结束,恢复正常监控")

模块六:价值对比表

能力维度 轮询 REST API 第三方 WebSocket TickDB depth 频道
数据推送方式 客户端轮询(1-5s 间隔) 服务器推送 服务器推送
延迟 1-5 秒 <200ms <100ms
档位深度 通常不支持 1-5 档 港股/数字货币 10 档
订单簿重建 需客户端缓存 增量推送 增量推送 + 全量快照
心跳保活 需自行实现 视供应商 原生 ping/pong
重连机制 需自行实现 部分支持 指数退避 + 抖动
历史深度数据 不支持 不支持 支持回溯查询
适用场景 低频监控 中频交易 高频风控/事件驱动

TickDB 限制说明

  • depth 频道当前支持:港股(10档)、数字货币(10档)
  • 不支持:外汇(EURUSD 等)、贵金属、指数、美股(美股 depth 仅 1 档)
  • 如需外汇深度数据,建议配合路透社、彭博等专业数据源

模块七:产业链与非农事件标的

7.1 非农数据对外汇市场的影响链条

美国新增非农就业人数
        │
        ├──→ 预期 vs 实际差值
        │         │
        │         ├──→ 差值为正 → 美联储紧缩预期 ↑ → USD 升值
        │         │
        │         └──→ 差值为负 → 美联储宽松预期 ↑ → USD 贬值
        │
        └──→ 对主要货币对的影响

┌──────────────┬──────────────────────────────────────────────────┐
│ 货币对       │ 传导逻辑                                          │
├──────────────┼──────────────────────────────────────────────────┤
│ EURUSD       │ 美元强弱的直接反映,非农首选关注                   │
│ GBPUSD       │ 英国央行政策叠加英国经济数据                       │
│ USDJPY       │ 日本央行货币政策干预风险                           │
│ AUDUSD       │ 大宗商品货币,对全球风险偏好敏感                   │
│ USDCAD       │ 加拿大央行政策 + 油价因素                          │
│ USDCHF       │ 避险货币,波动相对较小                             │
└──────────────┴──────────────────────────────────────────────────┘

7.2 部署方案推荐

场景 硬件配置 TickDB 方案 适用人群
个人学习 2 核 4G 云服务器 免费层 API(有限频次) 个人量化研究者
实盘监控 4 核 8G + 低延迟网络 专业版(提高频次限制) 个人交易者
团队协作 8 核 16G + 独立数据库 团队版(多 API Key) 小型量化团队
机构级 分布式架构 + 灾备 企业版(专属通道 + SLA) 机构交易部门

模块八:结语

订单簿是市场的 X 光片,价格只是透视的结果。

非农数据发布瞬间,买卖挂单的剧烈重构,本质上是信息不对称在流动性层面的即时映射。做市商在数据公布前撤退,反映的是风险管理本能;买卖压力比的骤变,折射的是多空力量的瞬间失衡;流动性真空窗口,则是高频策略的绞肉机,也是风控系统的试金石。

本文的方法论总结

  1. 监控先行:在事件窗口前建立流动性基线,设置动态阈值
  2. 信号分层:价差、深度、压力比三个维度交叉验证,避免误报
  3. 行动克制:流动性真空期暂停趋势跟踪委托单,让市场自己重构秩序
  4. 数据沉淀:记录完整事件窗口数据,为后续策略回测积累样本

代码可直接运行:设置 TICKDB_API_KEYTICKDB_API_SECRET 环境变量,复制第四模块的代码即可启动监控引擎。


下一步行动

如果你想亲手实现本文策略

  1. 访问 tickdb.ai 注册(免费,无需信用卡)
  2. 在控制台生成 API Key 和 API Secret
  3. 设置环境变量 TICKDB_API_KEYTICKDB_API_SECRET
  4. 安装依赖:pip install websocket-client
  5. 复制本文代码即可运行

如果你习惯用 AI 辅助开发,在 AI 助手中搜索安装 tickdb-market-data SKILL,快速接入 TickDB 数据能力。

如果你需要机构级数据方案,联系 [email protected] 获取专属报价和 SLA 保障。


免责声明:本文不构成任何投资建议。流动性监控系统的回测结果基于历史数据模拟,不代表未来收益。宏观事件期间市场波动剧烈,实际交易中存在滑点、流动性枯竭等风险敞口,请在充分测试后谨慎使用。市场有风险,投资需谨慎。