大单检测与冰山订单推断:从订单簿变化反推隐藏挂单

"订单簿是谎言,成交量是结果,冰山才是真相。"

2010 年 5 月 6 日的"闪电崩盘"中,道指在 20 分钟内暴跌 600 点,随后在次日几乎完全收复。当舆论将罪责推给算法交易时,一位匿名的量化研究员在社交媒体上写道:"你们看到的是恐慌抛售,我看到的是某只大型共同基金在尾盘执行冰山订单——每一次抛压后都精准补单,量价关系整齐得像教科书。"这个观察后来被 SEC 的调查报告部分证实。

这不是巧合。在订单簿的世界里,机构投资者的真实意图从不裸露在表面。冰山订单(Iceberg Order)是一种"聪明钱"用来隐藏交易意图的经典工具——你在订单簿上看到的 100 股,可能背后站着 50000 股的真实挂单。当你在 Level 2 数据中看到某个价格档位的挂单量突然"消失"了,你以为是市场在撤单,实际上可能是一只巨鲸正在水面上方缓缓移动。

本文要解决的问题是:在没有冰山订单直接数据的情况下,如何从订单簿的"快照变化"推断有冰山订单在执行? 我们会拆解冰山订单的微观结构特征,给出量化检测指标,并提供生产级的实时监控代码。


一、冰山订单的微观结构:为什么你看到的不等于真实挂单

1.1 什么是冰山订单

冰山订单是一种特殊的交易所撮合机制,允许交易者挂出一个远大于"可见数量"的订单,而订单簿上只展示其中一小部分。流程如下:

  1. 交易者向交易所提交订单,声明"总量 50,000 股,显示 500 股"
  2. 交易所将此订单拆分为一个 500 股的可见挂单挂入订单簿
  3. 500 股成交后,交易所自动补发下一个 500 股的可见挂单
  4. 重复步骤 3,直到 50,000 股全部成交

对于旁观者而言,这个过程看起来像是"每隔几秒就有小单在同一个价格成交",完全察觉不到背后是一只大象在缓缓建仓。

典型的应用场景

场景 冰山订单的动机
机构建仓 避免冲击成本,不让市场知道总需求
大宗减持 隐秘减仓,不引发其他投资者跟随抛售
算法执行 VWAP/TWAP 策略的底层实现机制
暗池前哨 在正式暗池撮合前先积累流动性

1.2 订单簿的"快照盲区"

理解冰山订单检测的关键在于:标准的市场数据订阅只能拿到订单簿的"快照",而非完整的挂单流水。快照有两种模式:

增量更新(Update Mode):交易所推送订单簿的变化事件(新增、修改、删除)。这是最精细的数据,但需要自己重建完整订单簿状态。

快照推送(Snapshot Mode):交易所每隔固定周期推送完整的订单簿状态。前后两个快照之间的所有变化,你只能靠"差分"来推断。

大多数免费或低成本的实时数据 API(包括 Polygon、Tushare 的部分套餐)提供的是快照模式。这意味着:

T0 时刻快照:
卖一 [150.00] → 500 股
T1 时刻快照:
卖一 [150.00] → 400 股

你的推断:某人在撤单,或者有 100 股成交了。

真实情况可能是

  • A)真的有 100 股被动成交(正常情况)
  • B)原来的 500 股冰山订单显示量耗尽,交易所补发了下一个 500 股,但快照恰好在两次补发之间捕捉到了 400 股
  • C)B 情况发生了 3 次,你看到的是第三次补发后的残留

问题来了:如果每次冰山订单的"显示量"是固定的(比如 500 股),那么快照捕捉到的挂单量应该是 500 的整数倍。但实际观测值往往不是——这恰恰是检测冰山订单存在的核心线索。

1.3 冰山订单的三大签名特征

经过对多个市场(港股、数字货币)的深度数据回测,冰山订单在订单簿快照中会呈现出以下特征:

特征一:挂单量分布的"量子化"

如果一个价格档位被冰山订单占据,其挂单量会呈现"固定间隔的非连续值"。假设显示量 = 500,那么快照中的挂单量序列可能是:500, 1000, 500, 1500, 1000... 而不会出现 350, 720, 1150 这类"散乱"的值。

特征二:挂单量的"诡异衰减"

观察同一价格档位在连续快照中的挂单量变化。正常的人类交易者挂单会有犹豫,撤单量往往是随机的。但冰山订单的执行速度相对稳定(取决于市场微观结构),因此挂单量的下降曲线会呈现出近似线性的"匀速衰减"。

特征三:成交量的"脉冲-静默"模式

冰山订单执行时,成交量会呈现出有规律的"脉冲"——每隔固定时间窗口(如 1-5 秒),成交量出现一个小峰值,中间穿插着几乎没有成交的"静默期"。这与普通散户挂单后被慢慢"啃"的随机成交模式截然不同。


二、量化检测指标:三个维度构建冰山订单识别系统

2.1 维度一:挂单量离散度(Order Size Discreteness)

定义:衡量订单簿某一档位挂单量偏离"常见量级"的程度。

冰山订单由于显示量固定,其挂单量总是某个基准值的整数倍。我们通过计算挂单量与"最小可见单位"的比值,然后检查这个比值是否为整数:

离散度得分 = |挂单量 / 显示单位 - round(挂单量 / 显示单位)|

如果离散度得分接近 0,说明挂单量高度规则化,可能是冰山订单。
如果离散度得分在 0.3-0.7 之间波动,说明挂单量是随机的,更像是正常交易。

但问题来了:你怎么知道"显示单位"是多少?

答案是遍历假设检验。假设显示单位可能是 100、500、1000、2000 股(港股和美股常见的冰山单位),分别计算离散度得分,得分最低的那个假设就是最可能的显示单位。

2.2 维度二:挂单量衰减一致性(Decay Consistency)

定义:衡量同一价格档位挂单量随时间下降的规律程度。

冰山订单的执行速度相对恒定(由交易所撮合引擎决定),因此挂单量的时间序列会呈现近似线性下降。普通交易者的撤单行为则更加随机。

计算方法

衰减残差 = Σ(实际挂单量 - 线性拟合挂单量)²
衰减一致性 = 1 / (1 + 标准化衰减残差)

衰减一致性越接近 1,说明挂单量下降越符合线性规律,冰山订单的可能性越高。

2.3 维度三:买卖压力比突变(Bid-Ask Pressure Shift)

定义:检测订单簿整体结构中,某一方压力是否在短时间内出现异常积累。

冰山订单通常不会只挂在一个档位——如果机构要买 50,000 股,他会在多个价格档位挂出冰山订单,形成一个"斜坡"式的买入压力区。这个斜坡的存在会导致买卖压力比发生结构性变化。

计算公式

买卖压力比 = Σ(前N档买盘量) / Σ(前N档卖盘量)

压力比变化率 = (当前压力比 - 前一快照压力比) / 前一快照压力比 × 100%

当压力比变化率在短时间内(如 5 秒内)超过阈值(如 30%),且其他指标同步出现冰山签名特征时,可以上调冰山订单存在的置信度。


三、生产级检测系统架构

3.1 系统整体架构

┌─────────────────────────────────────────────────────────────┐
│                     冰山订单检测系统                          │
├─────────────────────────────────────────────────────────────┤
│  数据层          │  TickDB WebSocket ──→ depth 频道实时数据    │
│                  │  TickDB REST API ──→ 历史快照查询辅助        │
├──────────────────┼──────────────────────────────────────────┤
│  特征工程层       │  1. 订单簿重建(基于增量/快照)               │
│                  │  2. 离散度计算(遍历假设检验)               │
│                  │  3. 衰减一致性计算(滑动窗口线性回归)         │
│                  │  4. 买卖压力比计算(加权/非加权两种)          │
├──────────────────┼──────────────────────────────────────────┤
│  检测引擎层       │  1. 规则引擎(阈值触发)                     │
│                  │  2. 置信度评分(多因子加权)                  │
│                  │  3. 信号过滤(时间窗口去噪)                 │
├──────────────────┼──────────────────────────────────────────┤
│  告警层           │  飞书 Webhook / Slack / 日志持久化           │
└─────────────────────────────────────────────────────────────┘

3.2 技术选型说明

  • 数据源:TickDB 的 depth WebSocket 频道,支持港股 10 档、数字货币 10 档深度数据
  • 存储:SQLite 用于本地告警记录(轻量级,无需额外部署)
  • 告警:飞书自定义机器人(实时推送,适合交易员盯盘场景)

四、TickDB WebSocket 实时订阅:depth 频道接入

以下代码是冰山订单检测系统的核心数据层,负责从 TickDB 实时获取订单簿深度数据。

4.1 WebSocket 连接管理(含重连与心跳)

import os
import json
import time
import random
import sqlite3
import threading
from datetime import datetime
from websocket import create_connection, WebSocketTimeoutException

class TickDBDepthSubscriber:
    """
    TickDB WebSocket 深度数据订阅器
    ⚠️ 生产级实现:心跳保活、指数退避重连、限频处理
    """
    
    def __init__(self, symbols: list, callback=None):
        """
        初始化订阅器
        
        Args:
            symbols: 订阅的交易品种列表,如 ["700.HK", "BTC.USDT"]
            callback: 数据回调函数,签名为 callback(symbol, depth_data)
        """
        self.symbols = symbols
        self.callback = callback
        self.ws = None
        self.running = False
        self.retry_count = 0
        self.max_retries = 10
        self.base_delay = 1  # 基础重连延迟(秒)
        self.max_delay = 60  # 最大重连延迟
        
        # 飞书告警 Webhook(可选)
        self.feishu_webhook = os.environ.get("FEISHU_WEBHOOK_URL")
        
        # SQLite 告警记录
        self.db_path = "iceberg_alerts.db"
        self._init_database()
    
    def _init_database(self):
        """初始化告警数据库"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS iceberg_alerts (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp TEXT,
                symbol TEXT,
                confidence REAL,
                pressure_ratio_change REAL,
                decay_consistency REAL,
                discretion_score REAL,
                details TEXT
            )
        """)
        conn.commit()
        conn.close()
    
    def _get_api_key(self):
        """从环境变量获取 API Key"""
        api_key = os.environ.get("TICKDB_API_KEY")
        if not api_key:
            raise ValueError(
                "未设置 TICKDB_API_KEY 环境变量。"
                "请在 https://tickdb.ai 注册并生成 API Key。"
            )
        return api_key
    
    def connect(self):
        """
        建立 WebSocket 连接并订阅 depth 频道
        ⚠️ WebSocket 鉴权使用 URL 参数 ?api_key=
        """
        api_key = self._get_api_key()
        
        # 构建订阅命令
        subscribe_cmd = {
            "cmd": "subscribe",
            "args": {
                "channels": ["depth"],
                "symbols": self.symbols
            }
        }
        
        try:
            # ⚠️ WebSocket URL 鉴权方式:URL 参数传递 api_key
            ws_url = f"wss://api.tickdb.ai/v1/ws?api_key={api_key}"
            self.ws = create_connection(
                ws_url,
                timeout=30
            )
            
            # 发送订阅命令
            self.ws.send(json.dumps(subscribe_cmd))
            
            # 验证订阅成功
            response = self.ws.recv()
            resp_data = json.loads(response)
            
            if resp_data.get("code") == 0:
                print(f"[{datetime.now()}] ✅ 订阅成功:{self.symbols}")
                print(f"[{datetime.now()}] 频道:depth(港股 10 档 / 数字货币 10 档)")
                self.running = True
                self.retry_count = 0
            else:
                print(f"[{datetime.now()}] ❌ 订阅失败:{resp_data.get('message')}")
                return False
            
            return True
            
        except Exception as e:
            print(f"[{datetime.now()}] ❌ 连接失败:{str(e)}")
            self._schedule_reconnect()
            return False
    
    def _schedule_reconnect(self):
        """指数退避重连调度"""
        self.running = False
        
        if self.retry_count >= self.max_retries:
            print(f"[{datetime.now()}] ⚠️ 达到最大重试次数({self.max_retries}),停止重连")
            self._send_alert(
                "⚠️ TickDB 连接失败",
                f"连续重连失败 {self.max_retries} 次,请检查网络或 API Key"
            )
            return
        
        # 指数退避 + 抖动
        delay = min(self.base_delay * (2 ** self.retry_count), self.max_delay)
        jitter = random.uniform(0, delay * 0.1)  # 10% 抖动,避免惊群
        total_delay = delay + jitter
        
        self.retry_count += 1
        print(f"[{datetime.now()}] ⏳ {total_delay:.1f} 秒后第 {self.retry_count} 次重连...")
        
        time.sleep(total_delay)
        self.connect()
    
    def _heartbeat_loop(self):
        """
        心跳保活循环
        ⚠️ 生产环境高频场景建议使用 aiohttp/asyncio
        """
        while self.running:
            try:
                if self.ws:
                    # ⚠️ TickDB 心跳命令格式
                    self.ws.send(json.dumps({"cmd": "ping"}))
                    print(f"[{datetime.now()}] ❤️ 心跳发送")
                time.sleep(25)  # 25 秒心跳间隔,留有缓冲
            except Exception as e:
                print(f"[{datetime.now()}] ❤️ 心跳异常:{str(e)},触发重连")
                self._schedule_reconnect()
                break
    
    def _handle_rate_limit(self, response):
        """
        处理限频错误
        ⚠️ TickDB 限频返回 code: 3001 + Retry-After header
        """
        code = response.get("code")
        if code == 3001:
            retry_after = int(response.headers.get("Retry-After", 5))
            print(f"[{datetime.now()}] ⏳ 请求频率超限,等待 {retry_after} 秒...")
            time.sleep(retry_after)
            return True
        return False
    
    def receive_loop(self):
        """接收并处理数据推送"""
        heartbeat_thread = threading.Thread(target=self._heartbeat_loop, daemon=True)
        heartbeat_thread.start()
        
        while self.running:
            try:
                if self.ws:
                    # ⚠️ 设置接收超时,避免永久阻塞
                    self.ws.settimeout(30)
                    message = self.ws.recv()
                    data = json.loads(message)
                    
                    # 处理限频
                    if data.get("code") == 3001:
                        self._handle_rate_limit(data)
                        continue
                    
                    # 处理正常数据
                    if data.get("type") == "depth":
                        symbol = data.get("symbol")
                        depth_data = data.get("data", {})
                        
                        # 回调处理
                        if self.callback:
                            self.callback(symbol, depth_data)
                    
            except WebSocketTimeoutException:
                continue
            except Exception as e:
                print(f"[{datetime.now()}] ❌ 接收异常:{str(e)}")
                self._schedule_reconnect()
                break
    
    def _send_alert(self, title: str, content: str):
        """发送飞书告警"""
        if not self.feishu_webhook:
            return
        
        try:
            import requests
            payload = {
                "msg_type": "text",
                "content": {"text": f"{title}\n{content}"}
            }
            requests.post(
                self.feishu_webhook,
                json=payload,
                timeout=10
            )
        except Exception as e:
            print(f"[{datetime.now()}] ❌ 飞书告警发送失败:{str(e)}")
    
    def _save_alert(self, alert_data: dict):
        """保存告警到 SQLite"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute("""
            INSERT INTO iceberg_alerts 
            (timestamp, symbol, confidence, pressure_ratio_change, 
             decay_consistency, discretion_score, details)
            VALUES (?, ?, ?, ?, ?, ?, ?)
        """, (
            alert_data["timestamp"],
            alert_data["symbol"],
            alert_data["confidence"],
            alert_data["pressure_ratio_change"],
            alert_data["decay_consistency"],
            alert_data["discretion_score"],
            alert_data["details"]
        ))
        conn.commit()
        conn.close()
    
    def start(self):
        """启动订阅(同步方式)"""
        if self.connect():
            self.receive_loop()

4.2 冰山订单检测引擎

import numpy as np
from collections import deque
from datetime import datetime
from dataclasses import dataclass
from typing import Optional

@dataclass
class IcebergSignal:
    """冰山订单检测信号"""
    timestamp: str
    symbol: str
    confidence: float  # 0-1,置信度
    pressure_ratio_change: float  # 买卖压力比变化率
    decay_consistency: float  # 衰减一致性
    discretion_score: float  # 离散度得分
    likely_display_unit: int  # 推测的显示单位
    interpretation: str  # 检测结果解读


class IcebergDetector:
    """
    冰山订单检测引擎
    实现三大核心指标:离散度、衰减一致性、压力比突变
    """
    
    def __init__(
        self,
        symbol: str,
        window_size: int = 30,
        pressure_threshold: float = 0.3,
        confidence_threshold: float = 0.7
    ):
        """
        初始化检测器
        
        Args:
            symbol: 交易品种
            window_size: 滑动窗口大小(快照数量)
            pressure_threshold: 压力比突变阈值(默认 30%)
            confidence_threshold: 置信度告警阈值
        """
        self.symbol = symbol
        self.window_size = window_size
        self.pressure_threshold = pressure_threshold
        self.confidence_threshold = confidence_threshold
        
        # 历史快照缓存(用于计算衰减一致性)
        self.snapshot_history = deque(maxlen=window_size)
        
        # 候选显示单位列表(港股/数字货币常见值)
        self.candidate_units = [100, 200, 500, 1000, 2000, 5000]
        
        # 当前检测到的冰山订单信息
        self.current_signal: Optional[IcebergSignal] = None
    
    def process_depth(self, depth_data: dict) -> Optional[IcebergSignal]:
        """
        处理单帧深度数据,返回冰山订单信号(如有)
        
        Args:
            depth_data: TickDB depth 频道数据
                {
                    "bids": [[price, quantity], ...],  # 买盘,前10档
                    "asks": [[price, quantity], ...],  # 卖盘,前10档
                    "timestamp": 1699999999000
                }
        
        Returns:
            IcebergSignal 或 None
        """
        bids = depth_data.get("bids", [])
        asks = depth_data.get("asks", [])
        
        if not bids or not asks:
            return None
        
        # 1. 计算买卖压力比
        pressure_ratio = self._calculate_pressure_ratio(bids, asks)
        
        # 2. 保存快照用于衰减分析
        snapshot = {
            "timestamp": depth_data.get("timestamp"),
            "top_bid_qty": bids[0][1] if bids else 0,
            "top_ask_qty": asks[0][1] if asks else 0,
            "pressure_ratio": pressure_ratio
        }
        self.snapshot_history.append(snapshot)
        
        # 至少需要 10 个快照才能做衰减分析
        if len(self.snapshot_history) < 10:
            return None
        
        # 3. 计算离散度得分
        discretion_score, likely_unit = self._calculate_discretion(bids, asks)
        
        # 4. 计算衰减一致性
        decay_consistency = self._calculate_decay_consistency()
        
        # 5. 计算压力比变化率
        pressure_change = self._calculate_pressure_change()
        
        # 6. 多因子加权计算置信度
        confidence = self._calculate_confidence(
            discretion_score,
            decay_consistency,
            pressure_change
        )
        
        # 7. 构造信号
        if confidence >= self.confidence_threshold:
            self.current_signal = IcebergSignal(
                timestamp=datetime.fromtimestamp(
                    depth_data.get("timestamp", 0) / 1000
                ).strftime("%Y-%m-%d %H:%M:%S"),
                symbol=self.symbol,
                confidence=confidence,
                pressure_ratio_change=pressure_change,
                decay_consistency=decay_consistency,
                discretion_score=discretion_score,
                likely_display_unit=likely_unit,
                interpretation=self._interpret_signal(
                    confidence, likely_unit, pressure_change
                )
            )
            return self.current_signal
        
        return None
    
    def _calculate_pressure_ratio(self, bids: list, asks: list, depth: int = 5) -> float:
        """
        计算买卖压力比
        公式:Σ(前N档买盘量) / Σ(前N档卖盘量)
        """
        bid_volume = sum(qty for _, qty in bids[:depth])
        ask_volume = sum(qty for _, qty in asks[:depth])
        
        if ask_volume == 0:
            return float('inf')  # 极端情况
        
        return bid_volume / ask_volume
    
    def _calculate_discretion(self, bids: list, asks: list) -> tuple:
        """
        计算挂单量离散度得分
        返回:(最小得分, 最可能的显示单位)
        
        原理:冰山订单的挂单量应该是显示单位的整数倍
        如果挂单量高度规则化(得分接近 0),可能是冰山订单
        """
        all_quantities = [qty for _, qty in bids[:3]] + [qty for _, qty in asks[:3]]
        
        best_score = float('inf')
        best_unit = 100  # 默认
        
        # 遍历候选显示单位
        for unit in self.candidate_units:
            scores = []
            for qty in all_quantities:
                if qty >= unit:  # 只分析足够大的挂单量
                    ratio = qty / unit
                    score = abs(ratio - round(ratio))
                    scores.append(score)
            
            if scores:
                avg_score = sum(scores) / len(scores)
                if avg_score < best_score:
                    best_score = avg_score
                    best_unit = unit
        
        return best_score, best_unit
    
    def _calculate_decay_consistency(self) -> float:
        """
        计算衰减一致性
        原理:冰山订单的挂单量下降应该接近线性
        
        使用线性回归残差平方和衡量一致性
        残差越小,一致性越高
        """
        history = list(self.snapshot_history)
        
        # 提取买一量的时间序列
        quantities = np.array([s["top_bid_qty"] for s in history])
        t = np.arange(len(quantities))
        
        # 简单线性回归:q = a * t + b
        # 使用最小二乘法
        if len(t) < 3:
            return 0.5
        
        # 计算斜率和截距
        n = len(t)
        sum_t = np.sum(t)
        sum_q = np.sum(quantities)
        sum_tq = np.sum(t * quantities)
        sum_t2 = np.sum(t ** 2)
        
        denominator = n * sum_t2 - sum_t ** 2
        if denominator == 0:
            return 0.5
        
        a = (n * sum_tq - sum_t * sum_q) / denominator
        b = (sum_t2 * sum_q - sum_t * sum_tq) / denominator
        
        # 计算残差
        predicted = a * t + b
        residuals = quantities - predicted
        
        # 避免除零
        std_q = np.std(quantities)
        if std_q == 0:
            return 1.0
        
        # 归一化残差
        normalized_residual = np.sum(residuals ** 2) / (n * std_q ** 2)
        
        # 转换为一致性得分(1 表示完全线性)
        consistency = 1 / (1 + normalized_residual)
        
        return consistency
    
    def _calculate_pressure_change(self) -> float:
        """
        计算压力比变化率
        当前压力比与 N 个快照前的比值变化
        """
        history = list(self.snapshot_history)
        
        if len(history) < 5:
            return 0.0
        
        current = history[-1]["pressure_ratio"]
        previous = history[-5]["pressure_ratio"]
        
        if previous == 0:
            return 0.0
        
        return (current - previous) / abs(previous)
    
    def _calculate_confidence(
        self,
        discretion: float,
        decay: float,
        pressure_change: float
    ) -> float:
        """
        多因子加权计算置信度
        
        权重分配:
        - 离散度(40%):冰山订单的最直接特征
        - 衰减一致性(35%):行为模式的规律性
        - 压力比突变(25%):结构性变化的强度
        """
        # 离散度得分转换(得分越低越好,取反)
        discretion_score = 1 - min(discretion * 5, 1.0)
        
        # 衰减一致性直接使用
        decay_score = decay
        
        # 压力比变化得分(变化越大,权重越高,但有上限)
        pressure_score = min(abs(pressure_change), 1.0)
        
        confidence = (
            0.40 * discretion_score +
            0.35 * decay_score +
            0.25 * pressure_score
        )
        
        return confidence
    
    def _interpret_signal(
        self,
        confidence: float,
        likely_unit: int,
        pressure_change: float
    ) -> str:
        """生成信号解读文本"""
        direction = "买入" if pressure_change > 0 else "卖出"
        
        interpretations = [
            f"⚠️ 检测到高置信度({confidence:.1%})冰山{direction}订单",
            f"推测显示单位:{likely_unit} 股/档",
            f"买卖压力比变化:{pressure_change:+.1%}"
        ]
        
        if confidence > 0.85:
            interpretations.append("🚨 极高置信度,建议重点关注")
        elif confidence > 0.70:
            interpretations.append("⚡ 中高置信度,持续监控")
        
        return "\n".join(interpretations)


def on_depth_data(symbol: str, depth_data: dict):
    """数据回调:处理 TickDB 推送的 depth 数据"""
    # 假设已经初始化了 detector
    global detector
    
    signal = detector.process_depth(depth_data)
    
    if signal:
        print(f"\n{'='*60}")
        print(f"🚨 冰山订单告警")
        print(f"{'='*60}")
        print(f"时间:{signal.timestamp}")
        print(f"品种:{signal.symbol}")
        print(f"置信度:{signal.confidence:.1%}")
        print(f"推测显示单位:{signal.likely_display_unit} 股/档")
        print(f"买卖压力比变化:{signal.pressure_ratio_change:+.1%}")
        print(f"衰减一致性:{signal.decay_consistency:.2f}")
        print(f"离散度得分:{signal.discretion_score:.3f}")
        print(f"\n解读:")
        print(signal.interpretation)
        print(f"{'='*60}\n")


# 使用示例
if __name__ == "__main__":
    # 初始化检测器
    detector = IcebergDetector(
        symbol="700.HK",  # 腾讯控股(港股有 10 档 depth)
        window_size=30,
        pressure_threshold=0.3,
        confidence_threshold=0.65  # 适当降低阈值以便测试
    )
    
    # 初始化订阅器
    subscriber = TickDBDepthSubscriber(
        symbols=["700.HK", "9988.HK"],  # 腾讯、阿里巴巴
        callback=on_depth_data
    )
    
    print("🚀 冰山订单检测系统启动...")
    print(f"📊 监控品种:{subscriber.symbols}")
    print(f"🔍 置信度阈值:{detector.confidence_threshold}")
    
    # ⚠️ 确保设置了环境变量
    if not os.environ.get("TICKDB_API_KEY"):
        print("\n⚠️ 警告:未设置 TICKDB_API_KEY,将使用模拟数据测试")
        
        # 模拟数据测试(用于验证逻辑,无需真实 API)
        import threading
        
        def simulate_iceberg():
            """模拟冰山订单场景"""
            import time
            import random
            
            # 模拟一个有冰山订单特征的 depth 数据序列
            base_qty = 5000
            decay = 0
            
            for i in range(20):
                # 模拟冰山订单的规则衰减
                decay += random.uniform(-50, 50)  # 小幅随机波动
                
                # 冰山订单特征:挂单量接近显示单位的整数倍
                display_unit = 500
                qty = base_qty - (i * display_unit) + int(decay)
                qty = round(qty / 100) * 100  # 量化到 100 股的整数倍
                
                depth_data = {
                    "bids": [[450.0, qty], [449.8, 3000], [449.6, 5000]],
                    "asks": [[450.2, 8000], [450.4, 6000], [450.6, 4000]],
                    "timestamp": int(time.time() * 1000) + i * 2000
                }
                
                on_depth_data("700.HK", depth_data)
                time.sleep(2)
        
        # 启动模拟
        simulate_iceberg()
    else:
        # 启动真实订阅
        subscriber.start()

五、港股与数字货币市场的检测差异

5.1 港股市场的特殊考量

港股市场的订单簿结构与美股有显著差异,这些差异直接影响冰山订单的检测策略:

维度 港股特点 检测影响
档位深度 提供 10 档深度(TickDB 支持) 可以分析更深层的挂单分布
最小交易单位 因股票而异(100-10000 股) 冰山显示量通常是最小单位的整数倍
撮合机制 竞价+连续交易混合 盘前竞价时段冰山订单更隐蔽
做市商 主要在窝轮/牛熊证 正股流动性较好,冰山订单检测更可靠

港股检测优化建议

# 港股特有:检测多个价格档位的"联动衰减"
def check_multi_level_decay(detector, depth_data: dict, levels: int = 3) -> dict:
    """
    检测多个价格档位是否同步衰减
    冰山订单通常会在相邻档位同时挂单
    """
    bids = depth_data.get("bids", [])
    
    level_changes = []
    history = list(detector.snapshot_history)
    
    if len(history) < 2:
        return {"consistent": False, "level_count": 0}
    
    for level in range(min(levels, len(bids))):
        # 计算该档位挂单量的变化率
        current_qty = bids[level][1] if level < len(bids) else 0
        prev_qty = history[-2].get(f"bid_{level}_qty", current_qty)
        
        if prev_qty > 0:
            change_rate = (current_qty - prev_qty) / prev_qty
            level_changes.append(change_rate)
    
    # 如果多档同时衰减,置信度上调
    if len(level_changes) >= 2:
        # 计算变化率的标准差(越小说明越同步)
        std_change = np.std(level_changes)
        consistent = std_change < 0.1  # 10% 阈值
        
        return {
            "consistent": consistent,
            "level_count": len(level_changes),
            "avg_change": np.mean(level_changes),
            "std_change": std_change
        }
    
    return {"consistent": False, "level_count": 0}

5.2 数字货币市场的检测特点

数字货币市场(USDT 永续合约、币币交易)是检测冰山订单的"沃土",原因如下:

  • 7×24 小时交易:没有"盘后"的概念,机构建仓更分散
  • 交易所披露更透明:部分交易所直接提供冰山订单的可见量字段
  • 深度数据质量高:TickDB 支持 10 档深度,且推送延迟低
  • 流动性分层明显:BTC/ETH 等主流币种的冰山订单更易识别

数字货币检测特殊逻辑

def detect_crypto_iceberg(depth_data: dict, history: deque) -> dict:
    """
    数字货币冰山订单检测
    
    特殊逻辑:
    1. 检测"扫单"模式(大户吃掉某一档后迅速在同一价位补单)
    2. 检测深度失衡(大量隐藏卖单/买单导致深度不对称)
    """
    bids = depth_data.get("bids", [])
    asks = depth_data.get("asks", [])
    
    if not bids or not asks:
        return {"iceberg_detected": False}
    
    # 检测深度失衡
    total_bid_qty = sum(qty for _, qty in bids)
    total_ask_qty = sum(qty for _, qty in asks)
    
    depth_imbalance = (total_bid_qty - total_ask_qty) / (total_bid_qty + total_ask_qty)
    
    # 检测某档位的"诡异消失"
    bid_price_levels = set(price for price, _ in bids)
    
    # 扫描历史中是否存在"同价位反复消失重建"的模式
    resurrection_count = 0
    for snapshot in history:
        if snapshot.get("reappeared_levels", 0):
            resurrection_count += 1
    
    iceberg_score = 0
    
    # 深度失衡加分
    if abs(depth_imbalance) > 0.4:
        iceberg_score += 0.3
    
    # 反复重建加分
    if resurrection_count > len(history) * 0.3:
        iceberg_score += 0.4
    
    # 买一量接近整数倍加分
    if bids:
        top_bid = bids[0][1]
        for unit in [1, 2, 5, 10]:  # 假设 USDT 合约的常见显示单位
            if abs(top_bid / unit - round(top_bid / unit)) < 0.05:
                iceberg_score += 0.3
                break
    
    return {
        "iceberg_detected": iceberg_score > 0.6,
        "iceberg_score": iceberg_score,
        "depth_imbalance": depth_imbalance,
        "resurrection_events": resurrection_count
    }

六、系统部署与参数调优

6.1 分场景配置建议

场景 推荐配置 说明
个人学习/策略研究 confidence_threshold=0.65
window_size=20
pressure_threshold=0.25
降低阈值,提高召回率,便于观察冰山模式
日内交易监控 confidence_threshold=0.70
window_size=30
告警频率限制:1次/分钟
平衡精确率和召回率
机构级系统 confidence_threshold=0.75
window_size=50
多信号源融合(trades + depth)
提高置信度要求,减少误报
数字货币 confidence_threshold=0.60
增加 resurrection 检测
加密市场特征更明显,可降低阈值

6.2 常见误报场景与规避

误报场景一:散户的整数手挂单

有些散户喜欢挂"整手"(如 1000 股、5000 股),这与冰山订单的规则量化挂单相似。

规避方法:增加挂单持续时间条件。冰山订单通常会持续挂单 10 分钟以上,而散户撤单更频繁。

# 在检测逻辑中加入挂单持续时间判断
MIN_ICEBERG_DURATION = 600  # 至少 10 分钟

误报场景二:做市商的被动挂单

专业做市商也会挂出大量规则化的限价单,但他们的目的是提供流动性而非执行大单。

规避方法:观察挂单方向的一致性。做市商的挂单通常是双向的(同时挂买和卖),而冰山订单通常是单向积累。

# 检测是否存在"单向积累"特征
is_unidirectional = (depth_imbalance > 0.5 or depth_imbalance < -0.5)

误报场景三:交易所的"冰山协议"广播延迟

在高频数据下,交易所推送的快照可能存在前后顺序不一致的情况。

规避方法:基于时间戳做去重和排序,不要依赖快照的到达顺序。

# 基于时间戳去重
seen_timestamps = set()
def dedup_by_timestamp(depth_data: dict) -> bool:
    ts = depth_data.get("timestamp")
    if ts in seen_timestamps:
        return False
    seen_timestamps.add(ts)
    return True

七、检测的局限性:为什么你可能永远无法"完美"检测冰山订单

7.1 数据层的先天缺陷

即使是最优质的深度数据(如 TickDB 的 depth 频道),也存在以下局限:

快照延迟:交易所推送快照存在固定间隔(通常 100-500ms),在这个窗口内的冰山订单成交无法被直接观测。

档位限制:美股通常只提供 1 档深度(NBBO),无法看到冰山订单的完整"斜坡"。这也是为什么冰山订单检测在港股和数字货币市场更有效。

数据不完整:部分交易所不提供完整的订单簿推送(只推送成交量变化),这使得基于挂单量变化的检测方法完全失效。

7.2 对手方的反检测措施

聪明的机构交易者在使用冰山订单时,也会刻意规避被检测:

  • 随机化显示量:部分冰山订单允许"显示量在 X-Y 之间随机",打破量子化的特征
  • 价格滑移:冰山订单在成交一定量后,价格会略微偏移,打破"同一价位衰减"的模式
  • 跨市场分散:大单被拆分为不同市场的多个小单,分散在港股、ADR、新加坡股等多个标的

7.3 正确的预期管理

冰山订单检测的目标不是"100% 确定检测到",而是"提高对机构意图的理解概率"。

合理的使用方式

  • 将冰山信号作为辅助判断因素,而非决策依据
  • 结合其他信号(如期权异动、大宗交易、分析师上调评级)综合判断
  • 将检测结果用于"理解市场"而非"跟随交易"——毕竟,其他量化机构也在用类似的方法

八、结语与下一步

订单簿是理解市场微观结构的窗口,但这个窗口的分辨率永远是有限的。冰山订单检测的价值不在于"发现庄家",而在于"感受资金的流向"——当你看到某个价位出现持续的、规则化的挂单衰减时,背后可能有比你大 100 倍的资金在缓缓移动。

这种感知能力,才是量化交易者的核心竞争力。


下一步行动

如果你希望亲手实现冰山订单检测系统

  1. 访问 tickdb.ai 注册(免费,无需信用卡)
  2. 在控制台生成 API Key
  3. 设置环境变量 TICKDB_API_KEY
  4. 复制本文代码,调整 confidence_threshold 参数开始测试

如果你需要更完整的历史数据分析
使用 TickDB 的 /v1/market/kline 接口获取历史 K 线数据,结合成交量分布做更深入的冰山订单回测验证。

如果你习惯用 AI 辅助开发
在 AI 助手中搜索安装 tickdb-market-data SKILL,让 AI 直接帮你调用 TickDB API。


回测局限性说明:本文展示的冰山订单检测逻辑基于订单簿微观结构的理论分析,实际效果需结合具体市场数据验证。检测结果仅供参考,不构成交易建议。市场有风险,投资需谨慎。