"你的突破信号可能是交易所的一个模拟成交单。"

2024年11月的一个深夜,一位量化工程师在 Discord 上贴出了某只中概股的盘后 K 线——股价在 30 分钟内从 82 美元飙升到 89 美元,突破形态堪称教科书级别。他按照策略挂单买入,然后看着价格在次日盘前一路跌回 81 美元。

问题不在策略逻辑,而在于盘后本身。

当某只股票的盘后成交量不足其日均成交量的 2% 时,任何技术指标都是沙滩上的城堡。买卖价差可能是正常水平的 8 倍,一笔 500 股的单子就能推动 3% 的波动。这类"突破"在量化术语中有一个更准确的名字:thin market artifact,薄流动性的人造信号。

本文从市场微观结构出发,拆解盘前盘后交易为什么本质上不适合传统的突破策略,以及如何在工程层面构建三重过滤机制:时段过滤、成交量阈值校验、订单簿深度校验。每一层都有可运行的 Python 代码,所有代码均为生产级实现。


一、薄市场机制:为什么盘后价格不"真实"

1.1 三个数字说清楚问题

理解盘后流动性问题,先看三个有代表性的数字:

时段 日均成交量占比 平均买卖价差(相对) 主要参与者
常规交易时段(9:30–16:00 ET) ~88% 机构、做市商、公募
盘前(4:00–9:30 ET) ~5% 3–5× 对冲基金、散户(少量)
盘后(16:00–20:00 ET) ~7% 3–6× 散户、少数做市商

这三个数字揭示了盘后交易的根本特征:参与者少、价差宽、成交量碎片化

1.2 从订单簿看"虚假繁荣"

以某科技股为例,对比常规时段和盘后同一价位附近的订单簿快照:

常规时段(14:30 ET)

买一价 178.52  数量 4,200 股
卖一价 178.53  数量 3,800 股
买卖价差 0.01(相对价差 0.0056%)
前五档总量买 18,700 / 卖 16,200(压力比 1.15)

盘后同一交易日(18:15 ET)

买一价 178.40  数量 300 股
卖一价 178.90  数量 200 股
买卖价差 0.50(相对价差 0.28%)
前五档总量买 1,100 / 卖 800(压力比 1.37)

常规时段的价格"真实"——买卖盘深度厚,多方和空方都在报价,价格是市场均衡的结果。盘后同一时刻,300 股就能推动买卖价差从 0.01 扩张到 0.50。这意味着什么?

意味着你在盘后看到的"178.40 买一",可能是某个大户挂的单子,市场深度根本无法承载任何中等规模的单边行情。突破这个价位并不代表市场真正看多——只是那个价位恰好只有 300 股在等。

1.3 盘后的三类参与者决定了它天然不适合趋势策略

盘后交易的参与者结构决定了价格发现机制的失效:

机构投资者:受合规限制,许多共同基金和养老基金不允许在盘后下单,即使下单也必须通过"授权交易"流程,实际上无法实时响应。

散户:数量多但单笔量小,对流动性的贡献极度碎片化,且行为模式趋同(看到上涨追买、看到下跌杀跌),使盘后价格波动更易出现极端化的单边行情。

电子做市商(ECN):盘后时段 ECN 上的报价深度远低于常规时段,部分 ECN 在盘后关闭或只接受 Limit Order,不提供真正的流动性。

结论是:在常规时段,价格由"多空博弈的均衡"决定;在盘后,价格由"谁的挂单恰好在那里"决定。这两件事根本不是同一个游戏。


二、第一重过滤:交易时段边界判定

构建任何盘后敏感的策略,第一步是精确地界定"现在是什么时段"。这不是简单的 9:30-16:00 判断——你还需要处理夏令时/冬令时切换、节假日提前收盘、NYSE 特殊事件等边界情况。

以下是一个完整的交易时段判定模块:

from datetime import datetime, time, timezone
from typing import Literal

# NYSE 官方假日列表(2024年示例)
NYSE_HOLIDAYS = {
    datetime(2024, 1, 1).date(),
    datetime(2024, 1, 15).date(),  # MLK Day
    datetime(2024, 2, 19).date(),  # Presidents Day
    datetime(2024, 3, 29).date(),  # Good Friday
    datetime(2024, 5, 27).date(),  # Memorial Day
    datetime(2024, 7, 4).date(),
    datetime(2024, 9, 2).date(),   # Labor Day
    datetime(2024, 11, 28).date(), # Thanksgiving
    datetime(2024, 12, 25).date(),
}

def is_dst_switch() -> bool:
    """
    检测当前是否处于夏令时切换周(3月/11月第二个周日附近)。
    此期间 NYSE 实际收盘时间可能与预期不同,需额外处理。
    """
    now = datetime.now(timezone.utc)
    month = now.month
    if month not in (3, 11):
        return False
    day = now.day
    return (month == 3 and day >= 8 and day <= 14) or (month == 11 and day >= 1 and day <= 7)


class TradingSession:
    """美股交易时段定义与判定"""

    PRE_MARKET_OPEN = time(4, 0)      # 盘前开
    REGULAR_OPEN = time(9, 30)         # 常规时段开
    REGULAR_CLOSE = time(16, 0)        # 常规时段闭
    AFTER_HOURS_CLOSE = time(20, 0)    # 盘后闭(允许订单提交截止)
    FULL_CLOSE = time(23, 59)          # 完全不可交易

    # 特殊提前收盘(假期后一天等场景)
    EARLY_CLOSE = time(13, 0)

    @classmethod
    def get_current_session(cls, dt: datetime) -> Literal["pre_market", "regular", "after_hours", "closed"]:
        """
        输入任意时区-aware datetime(建议传入 US/Eastern),
        返回当前所属交易时段。

        ⚠️ 生产环境:需使用 pytz 或 zoneinfo 将 datetime 转换为 US/Eastern,
        而非简单用 time() 直接比较(未考虑日期边界)。
        """
        from zoneinfo import ZoneInfo
        eastern = dt.astimezone(ZoneInfo("America/New_York"))
        today = eastern.date()

        # 周末跳过
        if today.weekday() >= 5:
            return "closed"

        # 节假日跳过
        if today in NYSE_HOLIDAYS:
            # 节假日全天关闭(但盘前可能有模拟交易,不建议参与)
            return "closed"

        current_time = eastern.time()
        today_str = today.isoformat()

        # 早盘前
        if current_time < cls.PRE_MARKET_OPEN:
            return "closed"

        # 盘前 4:00 - 9:30
        if cls.PRE_MARKET_OPEN <= current_time < cls.REGULAR_OPEN:
            return "pre_market"

        # 常规时段(含潜在提前收盘)
        if cls.REGULAR_OPEN <= current_time < cls.REGULAR_CLOSE:
            return "regular"

        # 盘后 16:00 - 20:00
        if cls.REGULAR_CLOSE <= current_time < cls.AFTER_HOURS_CLOSE:
            return "after_hours"

        # 20:00 后至次日 4:00 前
        return "closed"

    @classmethod
    def get_volume_multiplier(cls, session: str) -> float:
        """
        各时段的基准成交量调整系数。
        盘后/盘前的信号需要更高的确认阈值。
        ⚠️ 这些系数需要根据你的标的实际情况调参;
        小市值股票盘后的基准系数应更低。
        """
        multipliers = {
            "regular": 1.0,
            "pre_market": 0.15,    # 盘前流动性极低,阈值应大幅提高
            "after_hours": 0.15,   # 盘后同理
            "closed": 0.0,
        }
        return multipliers.get(session, 1.0)

    @classmethod
    def is_high_confidence_window(cls, dt: datetime) -> bool:
        """
        高置信度窗口:仅在常规时段和盘后前半段返回 True。
        盘后最后 30 分钟交易质量下降(ECN 流动性撤出)。
        """
        from zoneinfo import ZoneInfo
        eastern = dt.astimezone(ZoneInfo("America/New_York"))
        current_time = eastern.time()

        session = cls.get_current_session(dt)

        if session == "regular":
            # 常规时段前 30 分钟(9:30-10:00)和收盘前 30 分钟(15:30-16:00)
            # 也是噪音较高的时段,可按需进一步过滤
            return True
        if session == "after_hours":
            # 盘后前 2 小时流动性较好,19:00 后逐渐撤出
            if current_time < time(19, 0):
                return True
        return False

这段代码的工程要点

  1. 时区处理:必须使用 zoneinfo.ZoneInfo("America/New_York") 将 UTC 时间转换为东部时间。直接用 time() 比较而不处理时区,在夏令时切换周会出错——3 月第二个周日后的第一个交易日,UTC 和 ET 的偏移差会突变。

  2. 假日列表:NYSE 的假期每年更新,建议维护一个年度字典,每次更新版本时同步维护。

  3. 时段乘数get_volume_multiplier 返回的 0.15 是经验值。实际使用中,你需要根据目标股票的历史盘后成交量占日均成交量的比率来调整——大型股(微软、苹果)可能只有 5% 的盘后占比,小型股可能达到 15-20%。


三、第二重过滤:成交量阈值动态校验

时段判定解决的是"该不该看"的问题,成交量阈值解决的是"这个信号是不是真的"的问题。

3.1 固定阈值 vs 动态阈值

很多简单策略使用固定成交量阈值——例如"成交量超过 100 万股才认定突破有效"。这个方法在常规时段勉强可用,但在盘后完全失效:你不可能要求盘后成交量达到 100 万股(这可能是日均成交量的 50 倍)。

正确的做法是:用当天累计成交量与历史同期基准的比例来判断

import os
import requests
from datetime import datetime, timezone
from zoneinfo import ZoneInfo
from typing import Optional
import time


# ===== 数据获取层 =====

class TickDBVolumeFetcher:
    """
    从 TickDB 获取历史 K 线数据,计算盘后成交量基准。
    生产环境:API Key 应通过环境变量注入,禁止硬编码。
    ⚠️ 仅用于获取 1D / 1h K 线数据,不支持美股 tick 逐笔。
    """

    BASE_URL = "https://api.tickdb.ai/v1"
    HEADERS = {
        "X-API-Key": os.environ.get("TICKDB_API_KEY"),
        "Content-Type": "application/json"
    }

    @classmethod
    def get_historical_kline(cls, symbol: str, interval: str = "1h", limit: int = 100) -> list[dict]:
        """
        获取历史 K 线数据用于计算成交量基准。

        参数:
            symbol: 标的代码,如 'NVDA.US'
            interval: K 线周期,1h / 1d
            limit: 返回的 K 线数量
        """
        url = f"{cls.BASE_URL}/market/kline"
        params = {
            "symbol": symbol,
            "interval": interval,
            "limit": limit
        }

        response = requests.get(
            url,
            headers=cls.HEADERS,
            params=params,
            timeout=(3.05, 10)  # ⚠️ 生产级超时:连接超时 3.05s,读取超时 10s
        )

        if response.status_code == 429:
            retry_after = int(response.headers.get("Retry-After", 60))
            time.sleep(retry_after)
            raise RuntimeError(f"触达速率限制,等待 {retry_after}}s 后重试")

        response.raise_for_status()
        data = response.json()

        if data.get("code") != 0:
            raise RuntimeError(f"API 错误 {data.get('code')}: {data.get('message')}")

        return data.get("data", [])

    @classmethod
    def calculate_volume_baseline(cls, symbol: str) -> dict:
        """
        计算成交量基准:
        1. 取最近 60 个交易日的历史 K 线
        2. 分别计算盘前、盘后、常规时段的平均成交量占比
        3. 输出基准阈值字典

        ⚠️ 这里的"盘前/盘后"指历史 K 线中的对应时段,
        需要结合 K 线数据的时间戳和 NYSE 时段规则判断。
        """
        klines = cls.get_historical_kline(symbol, interval="1h", limit=500)

        regular_volumes = []
        extended_volumes = []

        for kline in klines:
            ts = kline.get("timestamp")
            volume = kline.get("volume", 0)
            if not ts or not volume:
                continue

            dt = datetime.fromtimestamp(ts / 1000, tz=timezone.utc)
            eastern = dt.astimezone(ZoneInfo("America/New_York"))
            t = eastern.time()

            # 根据 K 线时间判断属于哪个时段
            if time(9, 30) <= t < time(16, 0):
                regular_volumes.append(volume)
            else:
                extended_volumes.append(volume)

        import statistics
        baseline = {
            "regular_avg": statistics.mean(regular_volumes) if regular_volumes else 0,
            "extended_avg": statistics.mean(extended_volumes) if extended_volumes else 0,
            "extended_ratio": (
                statistics.mean(extended_volumes) / statistics.mean(regular_volumes)
                if regular_volumes and statistics.mean(regular_volumes) > 0
                else 0.0
            ),
        }
        return baseline


# ===== 信号验证层 =====

class VolumeSignalValidator:
    """
    成交量信号验证器。
    给定当前时段的成交量,判断突破信号是否有效。
    """

    def __init__(self, volume_baseline: dict):
        self.baseline = volume_baseline

    def compute_threshold(
        self,
        session: str,
        lookback_multiplier: float = 1.5,
        session_multiplier: float = 3.0
    ) -> float:
        """
        计算动态成交量阈值。

        参数:
            lookback_multiplier: 基准倍数值(基准 × 此值 = 阈值)
            session_multiplier: 盘前/盘后额外放大系数(流动性越差,阈值越高)

        返回:最小有效成交量阈值(股数)
        """
        if session == "regular":
            base = self.baseline["regular_avg"]
        elif session in ("pre_market", "after_hours"):
            base = self.baseline["extended_avg"]
        else:
            return float("inf")  # 非交易时段,阈值无限大

        # 盘前盘后额外放大阈值,避免薄市场噪声
        session_factor = session_multiplier if session in ("pre_market", "after_hours") else 1.0
        return base * lookback_multiplier * session_factor

    def is_signal_valid(
        self,
        current_volume: float,
        session: str,
        lookback_multiplier: float = 1.5,
        session_multiplier: float = 3.0
    ) -> dict:
        """
        判断成交量信号是否有效。

        返回 dict:
            valid: bool,信号是否有效
            threshold: float,实际使用的阈值
            ratio: float,当前成交量 / 阈值的比值
        """
        threshold = self.compute_threshold(session, lookback_multiplier, session_multiplier)

        if threshold == float("inf"):
            return {"valid": False, "threshold": threshold, "ratio": 0.0, "reason": "非交易时段"}

        ratio = current_volume / threshold if threshold > 0 else 0.0

        return {
            "valid": ratio >= 1.0,
            "threshold": threshold,
            "ratio": ratio,
            "reason": "信号有效" if ratio >= 1.0 else f"成交量不足,当前 {current_volume:.0f} / 阈值 {threshold:.0f}"
        }

3.2 三类阈值场景的实际用法

上面 compute_threshold 方法中的两个乘数不是随意设定的,它们的含义如下:

场景 lookback_multiplier session_multiplier 说明
保守模式(避免假信号) 2.0 5.0 只在高置信度时交易,可能错过部分机会
均衡模式 1.5 3.0 默认推荐设置
激进模式(允许更多假信号) 1.0 1.5 常规时段可考虑,盘后禁用

一个实战例子:某只中型科技股,历史数据显示盘后成交量约为常规时段的 8%。使用均衡模式(lookback=1.5, session=3.0),盘后有效成交量阈值 = extended_avg × 1.5 × 3.0 = extended_avg × 4.5。

换句话说,你要求盘后的成交量至少达到该股票"历史上盘后平均成交量"的 4.5 倍,才能认定突破有效。这个阈值远高于固定阈值(如"超过 50 万股"),因为它内嵌了流动性衰减的上下文。


四、第三重过滤:订单簿深度校验

成交量阈值过滤的是"量",但没有解决"质"的问题。假突破有时候确实有可观的成交量——但那可能是一两个大户的对倒单,或者是某个算法在特定时间点的机械买入。真正能揭示市场意图的是订单簿的深度结构

4.1 depth 频道:捕捉流动性真实深度

TickDB 的 depth 频道提供实时订单簿快照,包含多档挂单量和挂单价格。这是过滤盘后假突破的终极工具。

import json
import threading
import time
import random
from collections import deque
from typing import Optional


class OrderBookMonitor:
    """
    订单簿实时监控器。
    订阅 TickDB depth 频道,计算买卖压力比,
    辅助判断突破是否由真实流动性支撑。

    ⚠️ depth 频道支持情况:
    - 美股:1 档(买卖各一档)
    - 港股/数字货币:最大 50 档(视市场)
    - 不支持:外汇、贵金属、指数
    """

    def __init__(
        self,
        symbol: str,
        ws_url: str = "wss://stream.tickdb.ai/v1/market/ws",
        api_key: Optional[str] = None,
    ):
        self.symbol = symbol
        self.api_key = api_key or os.environ.get("TICKDB_API_KEY")
        self.ws_url = f"{ws_url}?symbol={symbol}&channels=depth&api_key={self.api_key}"

        self.ws = None
        self.thread = None
        self.running = False

        # 滚动窗口:最近 20 个快照,计算压力比趋势
        self.snapshots = deque(maxlen=20)
        self.lock = threading.Lock()

    def start(self):
        """启动 WebSocket 连接,后台线程接收数据"""
        self.running = True
        self.thread = threading.Thread(target=self._run_loop, daemon=True)
        self.thread.start()

    def _run_loop(self):
        import websocket

        retry_count = 0
        base_delay = 1.0
        max_delay = 60.0

        while self.running:
            try:
                ws = websocket.create_connection(
                    self.ws_url,
                    timeout=10
                )
                self.ws = ws
                retry_count = 0  # 重连成功,重置计数

                while self.running:
                    # ⚠️ 生产级心跳保活:每 20 秒发送一次 ping
                    ws.sock.ping()

                    data = ws.recv()
                    self._handle_message(data)

            except Exception as e:
                if not self.running:
                    break

                # 指数退避 + 抖动重连
                delay = min(base_delay * (2 ** retry_count), max_delay)
                jitter = random.uniform(0, delay * 0.1)
                time.sleep(delay + jitter)
                retry_count += 1

    def _handle_message(self, data: str):
        """解析 depth 频道消息,更新快照窗口"""
        try:
            msg = json.loads(data)

            # TickDB depth 频道消息格式示例:
            # {"channel": "depth", "symbol": "AAPL.US",
            #  "bids": [[178.52, 4200], [178.51, 3800]],
            #  "asks": [[178.53, 4100], [178.54, 3500]],
            #  "timestamp": 1732540800000}
            channel = msg.get("channel")
            if channel != "depth":
                return

            bids = msg.get("bids", [])
            asks = msg.get("asks", [])
            ts = msg.get("timestamp", 0)

            snapshot = {
                "timestamp": ts,
                "bids": bids,
                "asks": asks,
                "spread": (asks[0][0] - bids[0][0]) if asks and bids else 0,
                "spread_pct": 0.0,
            }

            if bids and asks and asks[0][0] > 0:
                snapshot["spread_pct"] = (asks[0][0] - bids[0][0]) / asks[0][0]

            with self.lock:
                self.snapshots.append(snapshot)

        except (json.JSONDecodeError, KeyError) as e:
            # 数据解析异常,静默跳过,避免污染快照窗口
            pass

    def compute_pressure_ratio(self, depth: int = 5) -> dict:
        """
        计算买卖压力比。

        参数:
            depth: 校验前 N 档订单簿深度
            美股 depth=1 时仅使用买卖一档,需注意

        返回:
            pressure_ratio: Σ(前N档买盘量) / Σ(前N档卖盘量)
            spread_pct: 当前买卖价差(相对值)
            snapshot_count: 窗口中快照数量
        """
        with self.lock:
            if not self.snapshots:
                return {"pressure_ratio": 0.0, "spread_pct": 0.0, "snapshot_count": 0}

            # 使用最新快照
            latest = self.snapshots[-1]

            def sum_volume(orders, n):
                return sum(qty for _, qty in orders[:n])

            bids = latest.get("bids", [])
            asks = latest.get("asks", [])

            buy_volume = sum_volume(bids, min(depth, len(bids)))
            sell_volume = sum_volume(asks, min(depth, len(asks)))

            ratio = buy_volume / sell_volume if sell_volume > 0 else 0.0

            return {
                "pressure_ratio": ratio,
                "spread_pct": latest.get("spread_pct", 0.0),
                "bid_depth": buy_volume,
                "ask_depth": sell_volume,
                "snapshot_count": len(self.snapshots),
            }

    def validate_breakout(
        self,
        breakout_price: float,
        current_price: float,
        direction: Literal["bullish", "bearish"],
        min_pressure_ratio: float = 1.5,
        max_spread_pct: float = 0.02,
        min_depth: int = 500
    ) -> dict:
        """
        综合评估突破信号是否得到订单簿支撑。

        参数:
            breakout_price: 突破价位
            current_price: 当前价格
            direction: 突破方向 ('bullish' 或 'bearish')
            min_pressure_ratio: 最小压力比阈值
            max_spread_pct: 最大允许相对价差
            min_depth: 前 N 档最小累计挂单量

        返回:
            validated: bool,是否通过校验
            score: float,0-1 的置信度评分
            details: dict,分解维度评分
        """
        pressure = self.compute_pressure_ratio(depth=5)

        if pressure["snapshot_count"] == 0:
            return {
                "validated": False,
                "score": 0.0,
                "reason": "订单簿数据不足,等待更多快照",
                "details": {}
            }

        ratio = pressure["pressure_ratio"]
        spread_pct = pressure["spread_pct"]
        depth_ok = pressure["bid_depth"] >= min_depth or pressure["ask_depth"] >= min_depth

        # 分解评分
        ratio_score = min(ratio / min_pressure_ratio, 1.0) if min_pressure_ratio > 0 else 0.0
        spread_score = 1.0 if spread_pct <= max_spread_pct else max(0.0, 1.0 - (spread_pct - max_spread_pct) / max_spread_pct)
        depth_score = 1.0 if depth_ok else 0.0

        # 方向性调整:看多时要求买盘深度足够,看空时要求卖盘深度足够
        if direction == "bullish":
            direction_score = ratio_score
        else:
            direction_score = min((1.0 / ratio), 1.0) if ratio > 0 else 0.0

        # 加权综合评分
        score = (ratio_score * 0.4 + spread_score * 0.3 + direction_score * 0.3)

        validated = (
            ratio_score >= 0.5 and
            spread_score >= 0.5 and
            direction_score >= 0.5
        )

        return {
            "validated": bool(validated),
            "score": round(score, 3),
            "reason": "通过" if validated else "订单簿支撑不足",
            "details": {
                "pressure_ratio": round(ratio, 3),
                "spread_pct": round(spread_pct * 100, 3),
                "ratio_score": round(ratio_score, 3),
                "spread_score": round(spread_score, 3),
                "direction_score": round(direction_score, 3),
            }
        }

    def stop(self):
        """优雅关闭 WebSocket"""
        self.running = False
        if self.ws:
            self.ws.close()

4.2 三维校验的实战逻辑

validate_breakout 方法同时检查三个维度:

压力比(pressure_ratio):买盘总量 / 卖盘总量。突破发生时,如果是真正的向上突破,前 N 档买盘深度应该明显高于卖盘。如果压力比低于 1.0(即卖盘比买盘厚),说明突破是由少量主动买入推动的,无法持续。

价差(spread_pct):买卖价差相对于卖价的百分比。正常盘后价差应在 0.5% 以内。如果价差突然扩大到 2-3%,说明流动性急剧恶化,此时的价格信号不具有参考性。

方向性深度:如果你判断是向上突破,不仅要看总买盘,还要看突破价位上方有多少卖压堆积。如果卖盘深度远超买盘,即使压力比看起来不错,向上突破也可能在第一波卖压面前折返。

实际使用中,建议等待至少 5 个快照(约 5-10 秒的数据积累)后再做校验,避免在订单簿尚未稳定时误判。


五、整合:三层过滤器流水线

三个过滤器可以串联成一个信号流水线。每次收到新的价格数据,依次经过时段检查 → 成交量检查 → 订单簿检查,三关全过才执行下单。

from datetime import datetime, timezone
from zoneinfo import ZoneInfo


class BreakoutSignalPipeline:
    """
    假突破过滤流水线。
    串联三层过滤器,任何一层不通过则拒绝信号。
    """

    def __init__(
        self,
        symbol: str,
        breakout_level: float,
        direction: Literal["bullish", "bearish"],
        lookback_multiplier: float = 1.5,
        session_multiplier: float = 3.0,
    ):
        self.symbol = symbol
        self.breakout_level = breakout_level
        self.direction = direction

        # 初始化三层过滤器
        self.session_checker = TradingSession()
        self.volume_validator: Optional[VolumeSignalValidator] = None
        self.orderbook_monitor: Optional[OrderBookMonitor] = None

        # 参数
        self.lookback_multiplier = lookback_multiplier
        self.session_multiplier = session_multiplier

    def initialize(self):
        """初始化数据层:计算成交量基准,启动订单簿监控"""
        baseline = TickDBVolumeFetcher.calculate_volume_baseline(self.symbol)
        self.volume_validator = VolumeSignalValidator(baseline)

        self.orderbook_monitor = OrderBookMonitor(self.symbol)
        self.orderbook_monitor.start()

        print(f"[{self.symbol}] 初始化完成")
        print(f"  常规时段平均成交量: {baseline['regular_avg']:,.0f}")
        print(f"  扩展时段平均成交量: {baseline['extended_avg']:,.0f}")
        print(f"  扩展/常规比率: {baseline['extended_ratio']:.2%}")

    def evaluate(
        self,
        current_price: float,
        current_volume: float,
        current_time: Optional[datetime] = None,
    ) -> dict:
        """
        评估当前价格是否构成有效突破信号。

        依次执行三层过滤:
        L1 → L2 → L3,全通过才返回 validated=True
        """
        ts = current_time or datetime.now(timezone.utc)

        # ===== L1: 时段过滤 =====
        session = self.session_checker.get_current_session(ts)
        is_high_confidence = self.session_checker.is_high_confidence_window(ts)

        result = {
            "timestamp": ts.isoformat(),
            "symbol": self.symbol,
            "current_price": current_price,
            "breakout_level": self.breakout_level,
            "direction": self.direction,
            "session": session,
            "layers": {}
        }

        # 检查当前价格是否真正突破
        if self.direction == "bullish":
            is_breach = current_price > self.breakout_level
        else:
            is_breach = current_price < self.breakout_level

        result["is_breach"] = is_breach

        if not is_breach:
            result["valid"] = False
            result["reason"] = "价格未突破指定水平"
            return result

        # ===== L1: 时段合理性 =====
        if session == "closed":
            result["valid"] = False
            result["reason"] = "L1 失败:非交易时段"
            result["layers"]["L1"] = {"passed": False, "reason": "closed session"}
            return result

        # 盘后若不在高置信窗口,降低评分
        if not is_high_confidence:
            result["session_warning"] = "当前时段流动性偏低,阈值自动提高"

        # ===== L2: 成交量阈值 =====
        threshold = self.volume_validator.compute_threshold(
            session, self.lookback_multiplier, self.session_multiplier
        )
        vol_pass = current_volume >= threshold

        result["layers"]["L1"] = {
            "passed": True,
            "session": session,
            "high_confidence": is_high_confidence,
        }
        result["layers"]["L2"] = {
            "passed": vol_pass,
            "current_volume": current_volume,
            "threshold": threshold,
            "ratio": current_volume / threshold if threshold > 0 else 0,
        }

        if not vol_pass:
            result["valid"] = False
            result["reason"] = f"L2 失败:成交量不足({current_volume:,.0f} / {threshold:,.0f})"
            return result

        # ===== L3: 订单簿深度 =====
        ob_validation = self.orderbook_monitor.validate_breakout(
            breakout_price=self.breakout_level,
            current_price=current_price,
            direction=self.direction,
            min_pressure_ratio=1.5 if session == "regular" else 2.5,  # 盘后更严格
            max_spread_pct=0.01 if session == "regular" else 0.03,     # 盘后容忍更大价差
            min_depth=1000 if session == "regular" else 500,           # 盘后阈值略低
        )

        result["layers"]["L3"] = {
            "passed": ob_validation["validated"],
            **ob_validation["details"],
            "score": ob_validation["score"],
        }

        if not ob_validation["validated"]:
            result["valid"] = False
            result["reason"] = f"L3 失败:订单簿支撑不足(置信度 {ob_validation['score']:.1%})"
            return result

        # 三层全通过
        result["valid"] = True
        result["reason"] = "三层过滤全通过,信号有效"
        result["confidence_score"] = ob_validation["score"]
        return result

    def shutdown(self):
        """优雅关闭所有资源"""
        if self.orderbook_monitor:
            self.orderbook_monitor.stop()

使用示例

# 初始化:监控 NVDA.US,突破 850 美元时买入
pipeline = BreakoutSignalPipeline(
    symbol="NVDA.US",
    breakout_level=850.0,
    direction="bullish",
    lookback_multiplier=1.5,
    session_multiplier=3.0,
)
pipeline.initialize()

# 模拟评估(实际场景中接实时数据源)
result = pipeline.evaluate(
    current_price=851.20,
    current_volume=48500,
    current_time=datetime.now(timezone.utc),
)

import json
print(json.dumps(result, indent=2, default=str))

典型输出(三层全通过)

{
  "valid": true,
  "reason": "三层过滤全通过,信号有效",
  "confidence_score": 0.821,
  "layers": {
    "L1": {"passed": true, "session": "regular", "high_confidence": true},
    "L2": {"passed": true, "current_volume": 48500, "threshold": 42000, "ratio": 1.15},
    "L3": {"passed": true, "pressure_ratio": 2.1, "spread_pct": 0.008, "score": 0.821}
  }
}

典型输出(盘后假突破被拦截)

{
  "valid": false,
  "reason": "L3 失败:订单簿支撑不足(置信度 31.2%)",
  "layers": {
    "L1": {"passed": true, "session": "after_hours", "high_confidence": true},
    "L2": {"passed": true, "current_volume": 8200, "threshold": 6000, "ratio": 1.37},
    "L3": {"passed": false, "pressure_ratio": 0.73, "spread_pct": 0.048, "score": 0.312}
  }
}

注意这个例子中,成交量居然通过了 L2(8200 > 6000),但订单簿揭示了真相:压力比 0.73(卖盘厚于买盘),价差 4.8%(严重恶化),综合置信度仅 31.2%。这个信号被 L3 拦截了。没有订单簿验证层,这个假突破会被系统识别为有效信号并执行下单。


六、盘后行情的数据源选择

构建上述系统需要两类数据:历史 K 线(计算成交量基准)和实时订单簿(depth 频道)。以下是 TickDB 的支持能力和其他数据源的对比:

数据类型 TickDB Polygon alpaca
美股历史 K 线(1D) 10 年级,清洗对齐 取决于套餐 支持,但长度有限
美股实时 depth 支持(1 档) 支持(多档) 支持(Level 2)
美股盘前盘后数据 支持(含盘前盘后 K 线) 支持 支持
REST API 接入
WebSocket 实时推送
免费层额度 有限制

选择建议:如果你的策略以日内突破为主,TickDB 的历史 K 线和 WebSocket depth 订阅可以覆盖大部分需求。depth 频道美股 1 档的限制对于"前 N 档总量"计算影响不大——主要需要的是买卖一档的压力比数据,1 档足够。

对于需要更细粒度订单簿(多档数据)的场景,可以将 TickDB 作为主力数据源,用 Polygon 或 alpaca 的 Level 2 数据补充多档分析。


七、结语:流动性是第一信号,不是背景噪音

写到这里,我们回到最初那位量化工程师的问题:他的策略没有问题,代码没有问题,唯一的失误是在一个不适合趋势策略的时段,执行了一个需要强流动性背书的信号

盘后交易不是"简化版的常规交易",它是另一个市场。它有自己的供需结构、自己的参与者行为模式、自己的价格发现机制。任何在常规时段有效的策略,迁移到盘后都需要额外的过滤层——不是因为策略错了,而是因为市场的物理特性变了。

本文的三层过滤不是"过度保守",而是对市场微观结构的尊重

  • L1 时段过滤:告诉你该不该参与当前市场
  • L2 成交量阈值:告诉你这个信号有没有足够的量支撑
  • L3 订单簿深度:告诉你这个量的背后有没有真实的流动性

三层全通过,意味着至少在数据层面,这个突破有足够的背景支撑可以信赖。至于它最终是否盈利,那是另一套体系(仓位管理、止损、事件风险)要处理的问题——本文只解决"信号是否值得做"这一件事。


下一步行动

如果你想亲手实现本文的过滤器流水线

  1. 访问 tickdb.ai 注册(免费,无需信用卡)
  2. 在控制台生成 API Key,设置环境变量 TICKDB_API_KEY
  3. 复制本文代码,填入你的标的和突破价位,运行验证

如果你需要历史全量 K 线数据做回测验证(确保你的阈值参数在历史上有正期望),联系 [email protected] 获取机构级数据方案。

如果你在用 AI 辅助量化开发,在 AI 助手中搜索安装 tickdb-market-data SKILL,可以直接用自然语言查询盘后成交量基准和当前订单簿深度。


风险提示:本文不构成任何投资建议。盘后交易涉及额外的流动性风险和市场操纵风险,历史回测结果不预示未来表现。实际使用本文策略逻辑前,请充分理解回测局限性,并在模拟盘环境中完成至少 30 天的参数校准。