从逐笔成交反推做市商行为:美股高频做市策略入门

价格是结果,订单流才是原因

2024 年某交易日的上午 10:23 分,某大型科技期权的隐含波动率在 3 秒内飙升 15%,而标的资产的价格几乎纹丝不动。如果你只看价格,你会错过这个信号。但如果你有逐笔成交数据,你会看到:连续的 100-200 股小单以略高于卖一的价格成交,持续时间 2.7 秒,总成交量 4,800 股——这不是散户的随机交易,而是典型的期权套利商在对冲行为。

这就是高频做市策略研究的核心问题:订单流是价格变动的先行指标

本文拆解从逐笔成交数据(tick data)中识别做市商行为的技术路径。我们会讨论做市商的撮合逻辑、tick 方向推断方法、以及如何通过库存偏移度量化做市商的被动程度。代码示例以港股和数字货币市场为主,方法论同样适用于美股。

重要说明:TickDB 的 trades 接口当前不支持美股和 A 股市场。若你专注于美股逐笔数据,需对接其他数据供应商。对于港股、数字货币等市场,TickDB 提供完整的逐笔成交和订单簿深度数据,可直接用于本文所述策略的研究与回测。


一、做市商的本质:双边报价的库存博弈

1.1 做市商不是“庄家”

讨论做市商行为前,必须澄清一个常见误解:做市商不是拥有信息优势、资金控盘的主力资金,而是承担流动性风险并从中获取价差的被动报价者

做市商的核心义务是:持续提供买价(bid)和卖价(offer),承诺在任何时刻以报价成交至少一定数量的股票。这个义务换来的权利是:买卖价差(bid-ask spread)。

盈利公式:
做市商利润 = 价差收入 - 逆向选择损失 - 库存成本
  • 逆向选择损失:做市商刚报价,就遇到价格向不利方向移动,说明对手掌握了私人信息
  • 库存成本:持仓量过大时,需要承担股价下行风险,可能被迫在不利价位平仓

1.2 订单簿中的做市商痕迹

在 Level 2 行情中,你通常能看到这样的结构:

档位 买量 买价 卖价 卖量
1 12,500 150.02 150.03 15,800
2 8,200 150.01 150.04 6,400
3 5,600 150.00 150.05 9,200

在这个例子中:

  • 买卖价差 = 150.03 - 150.02 = 0.01 美元(1 美分)
  • 中间价 = (150.02 + 150.03) / 2 = 150.025
  • 深度不平衡:卖方深度(15,800 + 6,400 + 9,200 = 31,400)远大于买方深度(26,300)

这种不对称性通常反映两类参与者:

  1. 被动型做市商:愿意在买方挂单,承担库存多头风险
  2. 对冲型机构:在卖出大量股票后,通过买入 ETF 或期货对冲

二、逐笔数据的关键字段与解读

2.1 tick 数据的标准结构

一条完整的逐笔成交记录包含以下字段:

字段 说明 策略意义
timestamp 成交时间戳(毫秒级) 计算交易频率、检测加速/减速
price 成交价格 判断成交是否在买卖价差内
volume 成交量(股数/币数) 区分大单和小单
side 成交方向(主动买/主动卖) 核心字段,判断买入压力还是卖出压力
order_id 订单编号(若有) 用于追踪订单生命周期
tick_direction 价格变动方向(+0/-/N) 标注是否相对于上一笔成交涨价、跌价或不变

2.2 tick 方向的三种状态

ticker_direction(tick 方向)是识别短期供需的重要指标:

含义 微观解读
+0 价格相对于上一笔上涨 主动买盘推动,供需暂时偏多
0 价格与上一笔持平 买卖力量均衡,等待方向
-0 价格相对于上一笔下跌 主动卖盘推动,供需暂时偏空

一个关键观察:连续出现多个 +0 且成交量较小,说明买方在被动吸收卖压但未能推动价格上涨——这往往是做市商在高价位持续挂单承接的信号。

2.3 成交价格在价差内的判定

通过对比成交价与买卖价差的位置,可以判断这笔交易是否对做市商不利:

def classify_tick_side(tick_price, bid, ask):
    """
    判定成交是否对做市商有利
    
    Args:
        tick_price: 成交价
        bid: 当前买一价
        ask: 当前卖一价
    
    Returns:
        str: 'at_bid'(对做市商有利) / 'at_ask'(对做市商不利) / 'inside'(在价差内)
    """
    spread = ask - bid
    
    if tick_price == bid:
        return 'at_bid'  # 做市商的买方挂单被动成交,赚了价差
    elif tick_price == ask:
        return 'at_ask'  # 做市商的卖方挂单被动成交,赚了价差
    else:
        return 'inside'  # 成交在买卖价差之间,说明有逆向选择
    
def classify_tick_direction(tick_price, prev_price, tick_direction):
    """
    结合 tick 方向和成交位置判断行为模式
    
    Returns:
        str: 行为模式分类
    """
    if tick_direction == '+0' and tick_price == bid:
        return 'passive_buy_aggression'
    elif tick_direction == '-0' and tick_price == ask:
        return 'passive_sell_aggression'
    elif tick_direction == '0' and tick_price < ask and tick_price > bid:
        return 'spread_crossing'
    return 'normal'

三、识别做市商行为的核心指标

3.1 买卖压力比(Buy-Sell Pressure Ratio)

买卖压力比是衡量短期供需失衡的最直接指标:

$$
\text{买卖压力比} = \frac{\sum_{i=1}^{N} \text{主动买入量}i}{\sum{i=1}^{N} \text{主动卖出量}_i}
$$

当买卖压力比 > 1 时,买方力量占优;< 1 时,卖方力量占优;接近 1 时,市场相对均衡。

做市商视角:如果买卖压力比持续偏离 1,但价格未动,说明做市商正在双向挂单吸收流动性。

3.2 订单成交率(Fill Rate)

在高频数据中,订单成交率反映被动程度:

$$
\text{被动成交率} = \frac{\text{被动成交量(等待后成交)}}{\text{总成交量}}
$$

高被动成交率意味着大量订单在被动等待后被“吃掉”,通常是机构或做市商的大单拆小。

3.3 库存偏移度(Inventory Skew)

这是区分不同类型做市商的关键指标。做市商通过调整报价来管理库存:

$$
\text{库存偏移度} = \frac{Q_{bid} - Q_{ask}}{Q_{bid} + Q_{ask}}
$$

库存偏移度 解读
+0.3 ~ +0.5 库存偏多头,做市商预期价格下跌或主动持有空仓
-0.3 ~ -0.5 库存偏空头,做市商预期价格上涨或主动持有多仓
-0.1 ~ +0.1 库存中性,做市商在快速对冲

一个典型场景:当某股票出现利好消息但价格尚未启动时,做市商的库存偏移度可能已经转负(持有多头),这意味着他们在积极买入并相信价格会上涨。

3.4 实际计算示例

以下代码演示如何从 TickDB 的 depth 频道数据计算订单簿不平衡度和库存偏移度:

import os
import json
import time
import asyncio
import aiohttp
import numpy as np
from collections import deque

class OrderBookAnalyzer:
    """基于 TickDB depth 频道的订单簿分析"""
    
    def __init__(self, api_key: str, symbol: str):
        self.api_key = api_key
        self.symbol = symbol
        self.ws_url = f"wss://api.tickdb.ai/ws/v1/market/depth?symbol={symbol}&api_key={api_key}"
        self.order_book = {'bids': {}, 'asks': {}}
        self.history = deque(maxlen=100)  # 保留最近 100 个快照
        self.running = False
        
    async def connect(self):
        """WebSocket 连接,含心跳和重连"""
        session = aiohttp.ClientSession()
        retry_count = 0
        max_retries = 5
        
        while retry_count < max_retries:
            try:
                async with session.ws_connect(
                    self.ws_url,
                    timeout=aiohttp.WSMsgType.PING
                ) as ws:
                    print(f"[{self.symbol}] WebSocket 已连接")
                    self.running = True
                    retry_count = 0  # 重置重试计数
                    
                    # 启动心跳保活任务
                    ping_task = asyncio.create_task(self._send_ping(ws))
                    # 启动消息处理任务
                    receive_task = asyncio.create_task(self._receive(ws))
                    
                    # 等待任一任务完成
                    done, pending = await asyncio.wait(
                        [ping_task, receive_task],
                        return_when=asyncio.FIRST_COMPLETED
                    )
                    
                    # 取消未完成的任务
                    for task in pending:
                        task.cancel()
                    
            except aiohttp.ClientError as e:
                retry_count += 1
                delay = min(30, 2 ** retry_count)  # 指数退避,最大 30 秒
                jitter = np.random.uniform(0, delay * 0.1)  # 抖动避免惊群
                print(f"[{self.symbol}] 连接断开 ({e}),{delay + jitter:.1f} 秒后重试...")
                await asyncio.sleep(delay + jitter)
            finally:
                await session.close()
                
        print(f"[{self.symbol}] 达到最大重试次数,停止连接")
        
    async def _send_ping(self, ws):
        """每 30 秒发送一次 ping 保活"""
        while True:
            await asyncio.sleep(30)
            try:
                await ws.send_json({"cmd": "ping"})
            except Exception as e:
                print(f"[{self.symbol}] Ping 失败: {e}")
                break
                
    async def _receive(self, ws):
        """接收并处理消息"""
        async for msg in ws:
            if msg.type == aiohttp.WSMsgType.ERROR:
                print(f"[{self.symbol}] WebSocket 错误")
                break
            elif msg.type == aiohttp.WSMsgType.TEXT:
                data = json.loads(msg.data)
                if data.get('type') == 'pong':
                    continue  # 心跳响应,跳过
                self._process_depth_update(data)
                
    def _process_depth_update(self, data: dict):
        """处理深度更新,计算库存偏移度"""
        if 'data' not in data:
            return
            
        snapshot = data['data']
        self.order_book['bids'] = {
            float(p): float(v) for p, v in snapshot.get('bids', {}).items()
        }
        self.order_book['asks'] = {
            float(p): float(v) for p, v in snapshot.get('asks', {}).items()
        }
        
        # 计算前 5 档的库存偏移度
        imbalance = self.calculate_imbalance(depth=5)
        skew = self.calculate_inventory_skew(depth=5)
        
        # 记录历史
        self.history.append({
            'timestamp': time.time(),
            'imbalance': imbalance,
            'skew': skew,
            'bid_depth': sum(self.order_book['bids'].values()),
            'ask_depth': sum(self.order_book['asks'].values())
        })
        
    def calculate_imbalance(self, depth: int = 5) -> float:
        """
        计算订单簿不平衡度
        
        imbalance > 0: 买方深度占优
        imbalance < 0: 卖方深度占优
        """
        bid_levels = sorted(self.order_book['bids'].items(), reverse=True)[:depth]
        ask_levels = sorted(self.order_book['asks'].items())[:depth]
        
        bid_vol = sum(v for _, v in bid_levels)
        ask_vol = sum(v for _, v in ask_levels)
        
        if bid_vol + ask_vol == 0:
            return 0.0
            
        return (bid_vol - ask_vol) / (bid_vol + ask_vol)
    
    def calculate_inventory_skew(self, depth: int = 5) -> float:
        """
        计算库存偏移度(模拟做市商的库存偏好)
        
        正值表示偏向买方(预期下跌),负值表示偏向卖方(预期上涨)
        """
        imbalance = self.calculate_imbalance(depth)
        
        # 简单映射:库存偏移度 ≈ 不平衡度(简化模型)
        # 实际模型中需要考虑成交量历史、加权时间等
        return imbalance
    
    def get_market_regime(self, window: int = 20) -> str:
        """
        基于历史数据判断市场状态
        
        Returns:
            'volatile': 高波动,供需失衡严重
            'balanced': 平衡,做市商活跃
            'trend': 单边趋势,跟随方向
        """
        if len(self.history) < window:
            return 'unknown'
            
        recent = list(self.history)[-window:]
        imbalances = [h['imbalance'] for h in recent]
        
        avg_imbalance = np.mean(imbalances)
        std_imbalance = np.std(imbalances)
        
        if std_imbalance > 0.5:
            return 'volatile'
        elif abs(avg_imbalance) < 0.2:
            return 'balanced'
        else:
            return 'trend'


async def main():
    """主函数:演示订单簿分析"""
    api_key = os.environ.get("TICKDB_API_KEY")
    if not api_key:
        raise ValueError("请设置 TICKDB_API_KEY 环境变量")
    
    # 以港股腾讯为例
    analyzer = OrderBookAnalyzer(api_key, "0700.HK")
    
    # 启动分析(后台运行 60 秒)
    analysis_task = asyncio.create_task(analyzer.connect())
    
    # 模拟监控逻辑
    await asyncio.sleep(60)
    analyzer.running = False
    
    # 输出分析结果
    if len(analyzer.history) > 0:
        print("\n=== 60 秒订单簿分析报告 ===")
        imbalances = [h['imbalance'] for h in analyzer.history]
        print(f"平均不平衡度: {np.mean(imbalances):.3f}")
        print(f"不平衡度标准差: {np.std(imbalances):.3f}")
        print(f"市场状态: {analyzer.get_market_regime()}")
    else:
        print("未获取到数据")


if __name__ == "__main__":
    asyncio.run(main())

⚠️ 生产环境提示:上述代码为演示版本。若用于实际策略:

  1. 建议使用 asyncio 异步架构处理多标的并行监控
  2. 深度数据建议存储到本地数据库进行历史回放分析
  3. 高频场景(<1 秒采样)需考虑网络延迟和本地时钟校准

四、从 tick 数据反推做市商策略

4.1 被动型 vs 激进型做市商

通过对成交数据的聚类分析,可以将做市商分为两类:

特征 被动型做市商 激进型做市商
挂单位置 紧贴买一/卖一 远离中间价,主动扫单
成交率 低(挂单常被跳过) 高(主动成交)
库存管理 严格,偏移度低 宽松,接受较大敞口
盈利来源 价差 + 逆向选择 趋势跟随

识别被动型做市商的信号

  • 大量小单(100-300 股)在买一/卖一位置反复成交
  • 价格不变但成交量累积
  • 订单簿深度在某一档位异常集中

识别激进型做市商的信号

  • 大单(>1000 股)以市价单快速成交
  • 成交价格经常穿越买卖价差(inside ticks)
  • 短时间内连续推动价格向同一方向移动

4.2 基于 tick 方向的行为推断

以下是一个简化的做市商行为识别算法:

from collections import defaultdict
from dataclasses import dataclass, field
from typing import List, Dict

@dataclass
class TickRecord:
    timestamp: float
    price: float
    volume: int
    direction: str  # '+0', '0', '-0'
    side: str       # 'buy', 'sell'
    is_inside: bool = False  # 是否在价差内成交

@dataclass
class MakerBehaviorProfile:
    """做市商行为画像"""
    symbol: str
    passive_buy_ratio: float = 0.0   # 被动买入占比
    passive_sell_ratio: float = 0.0  # 被动卖出占比
    aggression_index: float = 0.0    # 攻击指数
    inventory_skew: float = 0.0     # 库存偏移度
    avg_tick_size: float = 0.0       # 平均成交单量
    tick_dominance: str = 'neutral'   # 主导方向
    
    def classify_maker_type(self) -> str:
        """分类做市商类型"""
        if self.aggression_index < 0.3 and abs(self.inventory_skew) < 0.2:
            return 'passive'
        elif self.aggression_index > 0.7:
            return 'aggressive'
        else:
            return 'mixed'


class MakerBehaviorDetector:
    """基于成交数据的做市商行为检测器"""
    
    def __init__(self, symbol: str, inside_threshold: float = 0.0001):
        self.symbol = symbol
        self.inside_threshold = inside_threshold  # 价差内成交的容差
        self.ticks: List[TickRecord] = []
        self.stats = {
            'passive_buy': 0,
            'passive_sell': 0,
            'aggressive_buy': 0,
            'aggressive_sell': 0,
            'inside_buy': 0,
            'inside_sell': 0,
            'plus_ticks': 0,
            'minus_ticks': 0
        }
        
    def process_tick(self, tick: TickRecord, bid: float, ask: float):
        """处理单条 tick,分类行为"""
        self.ticks.append(tick)
        
        mid_price = (bid + ask) / 2
        tick_price = tick.price
        
        # 判断是否在价差内成交
        if bid < tick_price < ask:
            tick.is_inside = True
        elif tick_price <= bid + (ask - bid) * self.inside_threshold:
            tick.is_inside = False
            
        # 基于 tick 方向和成交位置分类
        if tick.direction == '+0':
            self.stats['plus_ticks'] += 1
            if tick.side == 'buy' and not tick.is_inside:
                self.stats['passive_buy'] += 1
            elif tick.side == 'buy' and tick.is_inside:
                self.stats['aggressive_buy'] += 1
                
        elif tick.direction == '-0':
            self.stats['minus_ticks'] += 1
            if tick.side == 'sell' and not tick.is_inside:
                self.stats['passive_sell'] += 1
            elif tick.side == 'sell' and tick.is_inside:
                self.stats['aggressive_sell'] += 1
                
        elif tick.direction == '0' and tick.is_inside:
            # 穿越价差的交易,最可能是信息驱动
            if tick.side == 'buy':
                self.stats['aggressive_buy'] += 1
            else:
                self.stats['aggressive_sell'] += 1
                
    def generate_profile(self) -> MakerBehaviorProfile:
        """生成行为画像"""
        total_buy = self.stats['passive_buy'] + self.stats['aggressive_buy']
        total_sell = self.stats['passive_sell'] + self.stats['aggressive_sell']
        total = total_buy + total_sell
        
        if total == 0:
            return MakerBehaviorProfile(symbol=self.symbol)
            
        # 计算各指标
        passive_buy_ratio = self.stats['passive_buy'] / total if total > 0 else 0
        passive_sell_ratio = self.stats['passive_sell'] / total if total > 0 else 0
        
        # 攻击指数:主动成交 / 总成交量
        aggressive_total = self.stats['aggressive_buy'] + self.stats['aggressive_sell']
        aggression_index = aggressive_total / total
        
        # 库存偏移度
        buy_skew = (total_buy - total_sell) / total
        
        # tick 主导演绎
        total_ticks = self.stats['plus_ticks'] + self.stats['minus_ticks']
        if total_ticks > 0:
            tick_ratio = (self.stats['plus_ticks'] - self.stats['minus_ticks']) / total_ticks
            if tick_ratio > 0.3:
                tick_dominance = 'buy'
            elif tick_ratio < -0.3:
                tick_dominance = 'sell'
            else:
                tick_dominance = 'neutral'
        else:
            tick_dominance = 'neutral'
            
        # 平均成交单量
        volumes = [t.volume for t in self.ticks]
        avg_tick_size = sum(volumes) / len(volumes) if volumes else 0
        
        return MakerBehaviorProfile(
            symbol=self.symbol,
            passive_buy_ratio=passive_buy_ratio,
            passive_sell_ratio=passive_sell_ratio,
            aggression_index=aggression_index,
            inventory_skew=buy_skew,
            avg_tick_size=avg_tick_size,
            tick_dominance=tick_dominance
        )
    
    def interpret_profile(self, profile: MakerBehaviorProfile) -> Dict:
        """解读行为画像,输出策略建议"""
        maker_type = profile.classify_maker_type()
        
        interpretation = {
            'maker_type': maker_type,
            'market_signal': None,
            'trading_implication': None
        }
        
        if maker_type == 'passive':
            if profile.tick_dominance == 'buy':
                interpretation['market_signal'] = '被动买盘主导,可能存在支撑位'
                interpretation['trading_implication'] = '考虑区间下沿买入,止损设于买一下方'
            elif profile.tick_dominance == 'sell':
                interpretation['market_signal'] = '被动卖盘主导,可能存在阻力位'
                interpretation['trading_implication'] = '考虑区间上沿卖出,止损设于卖一上方'
            else:
                interpretation['market_signal'] = '多空被动力量均衡'
                
        elif maker_type == 'aggressive':
            if profile.inventory_skew > 0.3:
                interpretation['market_signal'] = '激进买盘推动,短期看多'
                interpretation['trading_implication'] = '趋势跟踪为主,不逆势操作'
            else:
                interpretation['market_signal'] = '激进卖盘推动,短期看空'
                interpretation['trading_implication'] = '关注流动性枯竭信号'
                
        return interpretation

五、实战:TickDB 数据获取与实时监控

5.1 获取港股逐笔成交数据

对于港股和数字货币市场,TickDB 提供完整的逐笔成交数据。以下是获取历史成交记录的代码示例:

import os
import requests
import time
from datetime import datetime, timedelta

# ⚠️ TickDB trades 接口支持:港股、数字货币
# ⚠️ 不支持:美股、A 股

TICKDB_BASE_URL = "https://api.tickdb.ai/v1"
API_KEY = os.environ.get("TICKDB_API_KEY")

def get_historical_trades(symbol: str, start_time: int, end_time: int, limit: int = 1000):
    """
    获取历史逐笔成交数据
    
    Args:
        symbol: 交易品种,如 '700.HK'(腾讯)或 'BTC.USDT'
        start_time: 开始时间戳(毫秒)
        end_time: 结束时间戳(毫秒)
        limit: 每页返回数量,最大 1000
    
    Returns:
        list: 成交记录列表
    """
    url = f"{TICKDB_BASE_URL}/market/trades"
    headers = {"X-API-Key": API_KEY}
    
    all_trades = []
    params = {
        "symbol": symbol,
        "start": start_time,
        "end": end_time,
        "limit": limit
    }
    
    while True:
        try:
            response = requests.get(
                url, 
                headers=headers, 
                params=params,
                timeout=(3.05, 10)  # HTTP timeout
            )
            
            if response.status_code == 429:
                retry_after = int(response.headers.get("Retry-After", 5))
                print(f"触发限频,等待 {retry_after} 秒...")
                time.sleep(retry_after)
                continue
                
            response.raise_for_status()
            data = response.json()
            
            if data.get("code") == 0:
                trades = data.get("data", {}).get("list", [])
                all_trades.extend(trades)
                
                # 检查是否还有下一页
                if len(trades) < limit:
                    break
                # 更新游标获取下一页
                if trades:
                    params["start"] = trades[-1].get("ts", params["start"]) + 1
                else:
                    break
            else:
                print(f"API 错误: {data}")
                break
                
        except requests.exceptions.Timeout:
            print("请求超时,重试...")
            time.sleep(1)
        except requests.exceptions.RequestException as e:
            print(f"请求失败: {e}")
            break
    
    return all_trades


def calculate_tick_direction(trades: list) -> list:
    """
    计算逐笔成交的 tick 方向
    
    Returns:
        list: 附加 tick_direction 字段的成交记录
    """
    if not trades:
        return []
    
    processed = []
    prev_price = None
    
    for trade in trades:
        price = float(trade.get("p", 0))
        
        if prev_price is None:
            direction = "N"  # 第一笔,无方向
        elif price > prev_price:
            direction = "+0"
        elif price < prev_price:
            direction = "-0"
        else:
            direction = "0"
            
        trade["tick_direction"] = direction
        processed.append(trade)
        prev_price = price
    
    return processed


def analyze_maker_activity(trades: list, window_seconds: int = 60) -> dict:
    """
    分析指定时间窗口内的做市商活动
    
    Args:
        trades: 成交记录列表
        window_seconds: 分析窗口(秒)
    
    Returns:
        dict: 分析结果
    """
    if not trades:
        return {}
    
    # 按时间窗口分组
    window_ms = window_seconds * 1000
    windows = defaultdict(list)
    
    for trade in trades:
        ts = trade.get("ts", 0)
        window_key = (ts // window_ms) * window_ms
        windows[window_key].append(trade)
    
    results = []
    
    for window_start, window_trades in sorted(windows.items()):
        # 统计买卖方向
        buy_volume = sum(int(t.get("v", 0)) for t in window_trades 
                        if t.get("S") == "buy")
        sell_volume = sum(int(t.get("v", 0)) for t in window_trades 
                         if t.get("S") == "sell")
        
        # 计算 tick 方向分布
        plus_ticks = sum(1 for t in window_trades if t.get("tick_direction") == "+0")
        minus_ticks = sum(1 for t in window_trades if t.get("tick_direction") == "-0")
        
        # 计算平均成交单量
        volumes = [int(t.get("v", 0)) for t in window_trades]
        avg_size = sum(volumes) / len(volumes) if volumes else 0
        
        # 判断主导行为
        if buy_volume > sell_volume * 1.5 and avg_size < 500:
            behavior = "passive_buy"
        elif sell_volume > buy_volume * 1.5 and avg_size < 500:
            behavior = "passive_sell"
        elif avg_size > 2000:
            behavior = "aggressive_hedge"
        else:
            behavior = "mixed"
        
        results.append({
            "window_start": datetime.fromtimestamp(window_start / 1000).isoformat(),
            "trade_count": len(window_trades),
            "buy_volume": buy_volume,
            "sell_volume": sell_volume,
            "pressure_ratio": buy_volume / sell_volume if sell_volume > 0 else float('inf'),
            "plus_ticks": plus_ticks,
            "minus_ticks": minus_ticks,
            "avg_tick_size": avg_size,
            "behavior": behavior
        })
    
    return {
        "symbol": trades[0].get("symbol", "unknown"),
        "total_trades": len(trades),
        "window_results": results
    }


# 使用示例
if __name__ == "__main__":
    api_key = os.environ.get("TICKDB_API_KEY")
    if not api_key:
        raise ValueError("请设置 TICKDB_API_KEY 环境变量")
    
    # 获取最近 1 小时的腾讯港股成交数据
    end_time = int(time.time() * 1000)
    start_time = end_time - 3600 * 1000  # 1 小时前
    
    print(f"正在获取 700.HK 最近 1 小时成交数据...")
    trades = get_historical_trades("700.HK", start_time, end_time)
    
    if trades:
        print(f"获取到 {len(trades)} 条成交记录")
        
        # 计算 tick 方向
        processed_trades = calculate_tick_direction(trades)
        
        # 分析做市商活动
        analysis = analyze_maker_activity(processed_trades, window_seconds=300)
        
        print(f"\n=== 5 分钟窗口做市商行为分析 ===")
        for window in analysis.get("window_results", [])[-5:]:  # 最近 5 个窗口
            print(f"[{window['window_start']}] "
                  f"成交量 {window['buy_volume']}/{window['sell_volume']} | "
                  f"买卖比 {window['pressure_ratio']:.2f} | "
                  f"行为 {window['behavior']}")
    else:
        print("未获取到数据,检查 API Key 或 symbol 是否正确")

5.2 实时监控 WebSocket 实现

以下代码演示如何通过 WebSocket 实时接收逐笔成交数据并进行做市商行为监控:

import os
import json
import time
import asyncio
import aiohttp
import numpy as np
from collections import deque

class MakerActivityMonitor:
    """
    实时监控做市商行为的 WebSocket 客户端
    
    功能:
    1. 接收 TickDB trades WebSocket 推送
    2. 实时计算买卖压力比
    3. 检测被动/激进成交模式
    4. 触发阈值告警
    """
    
    def __init__(self, api_key: str, symbol: str):
        self.api_key = api_key
        self.symbol = symbol
        self.ws_url = f"wss://api.tickdb.ai/ws/v1/market/trades?symbol={symbol}&api_key={api_key}"
        
        # 滑动窗口参数
        self.window_size = 100  # 最近 100 笔成交
        self.recent_trades = deque(maxlen=self.window_size)
        self.last_prices = deque(maxlen=2)
        
        # 告警阈值
        self.alert_thresholds = {
            'pressure_ratio': 2.5,      # 买卖压力比超过此值告警
            'aggression_ratio': 0.7,   # 激进成交占比超过此值告警
            'tick_unbalance': 0.6      # tick 方向失衡超过此值告警
        }
        
        self.monitoring = False
        self.stats = {
            'total_trades': 0,
            'passive_buy': 0,
            'passive_sell': 0,
            'aggressive_buy': 0,
            'aggressive_sell': 0
        }
        
    async def connect(self):
        """建立 WebSocket 连接,含心跳保活"""
        session = aiohttp.ClientSession()
        retry_count = 0
        max_retries = 5
        base_delay = 1
        
        while retry_count < max_retries:
            try:
                async with session.ws_connect(
                    self.ws_url,
                    timeout=aiohttp.WSMsgType.PING
                ) as ws:
                    print(f"[{self.symbol}] WebSocket 已连接,开始监控做市商行为")
                    self.monitoring = True
                    retry_count = 0
                    
                    # 启动心跳和接收任务
                    await asyncio.gather(
                        self._heartbeat(ws),
                        self._receive(ws)
                    )
                    
            except aiohttp.ClientError as e:
                retry_count += 1
                delay = min(60, base_delay * (2 ** retry_count))
                jitter = np.random.uniform(0, delay * 0.1)
                print(f"[{self.symbol}] 连接异常 ({e}),{delay + jitter:.1f} 秒后重试...")
                await asyncio.sleep(delay + jitter)
            finally:
                await session.close()
                
    async def _heartbeat(self, ws):
        """心跳保活,每 25 秒发送一次 ping"""
        while self.monitoring:
            await asyncio.sleep(25)
            try:
                await ws.send_json({"cmd": "ping"})
            except Exception:
                break
                
    async def _receive(self, ws):
        """接收并处理成交推送"""
        async for msg in ws:
            if not self.monitoring:
                break
            if msg.type == aiohttp.WSMsgType.TEXT:
                data = json.loads(msg.data)
                if data.get('type') == 'pong':
                    continue
                self._process_trade(data)
            elif msg.type == aiohttp.WSMsgType.ERROR:
                print(f"[{self.symbol}] WebSocket 错误")
                self.monitoring = False
                break
                
    def _process_trade(self, data: dict):
        """处理单条成交推送"""
        try:
            trade_data = data.get('data', {})
            price = float(trade_data.get('p', 0))
            volume = int(trade_data.get('v', 0))
            side = trade_data.get('S', '')  # 'buy' or 'sell'
            ts = trade_data.get('ts', 0)
            
            # 计算 tick 方向
            if len(self.last_prices) >= 2:
                prev_price = self.last_prices[-2]
                if price > prev_price:
                    tick_dir = '+0'
                elif price < prev_price:
                    tick_dir = '-0'
                else:
                    tick_dir = '0'
            else:
                tick_dir = 'N'
                
            self.last_prices.append(price)
            
            # 分类成交类型(简化判断)
            if tick_dir == 'N':
                # 第一笔,无法判断
                is_aggressive = False
            elif side == 'buy':
                is_aggressive = tick_dir == '-0'  # 主动买但价格下跌
            else:
                is_aggressive = tick_dir == '+0'  # 主动卖但价格上涨
            
            # 更新统计
            self.stats['total_trades'] += 1
            if side == 'buy':
                if is_aggressive:
                    self.stats['aggressive_buy'] += 1
                else:
                    self.stats['passive_buy'] += 1
            else:
                if is_aggressive:
                    self.stats['aggressive_sell'] += 1
                else:
                    self.stats['passive_sell'] += 1
                    
            # 添加到滑动窗口
            self.recent_trades.append({
                'ts': ts,
                'price': price,
                'volume': volume,
                'side': side,
                'tick_dir': tick_dir,
                'is_aggressive': is_aggressive
            })
            
            # 定期输出状态(每 50 笔)
            if self.stats['total_trades'] % 50 == 0:
                self._emit_status()
                self._check_alerts()
                
        except Exception as e:
            print(f"处理成交数据异常: {e}")
            
    def _emit_status(self):
        """输出当前监控状态"""
        total = self.stats['total_trades']
        if total == 0:
            return
            
        buy_vol = self.stats['passive_buy'] + self.stats['aggressive_buy']
        sell_vol = self.stats['passive_sell'] + self.stats['aggressive_sell']
        pressure_ratio = buy_vol / sell_vol if sell_vol > 0 else float('inf')
        
        aggression_ratio = (self.stats['aggressive_buy'] + self.stats['aggressive_sell']) / total
        
        print(f"[{self.symbol}] {total} 笔 | "
              f"买卖比 {pressure_ratio:.2f} | "
              f"激进占比 {aggression_ratio:.1%} | "
              f"被动买 {self.stats['passive_buy']} | "
              f"被动卖 {self.stats['passive_sell']}")
              
    def _check_alerts(self):
        """检查是否触发告警条件"""
        total = self.stats['total_trades']
        if total < 20:
            return
            
        buy_vol = self.stats['passive_buy'] + self.stats['aggressive_buy']
        sell_vol = self.stats['passive_sell'] + self.stats['aggressive_sell']
        pressure_ratio = buy_vol / sell_vol if sell_vol > 0 else float('inf')
        
        # 检查买卖压力比异常
        if pressure_ratio > self.alert_thresholds['pressure_ratio']:
            print(f"🚨 告警:买卖压力比 {pressure_ratio:.2f} 超过阈值 "
                  f"({self.alert_thresholds['pressure_ratio']}),买方力量显著")
                  
        elif pressure_ratio < 1 / self.alert_thresholds['pressure_ratio']:
            print(f"🚨 告警:买卖压力比 {pressure_ratio:.2f} 低于阈值 "
                  f"(反向),卖方力量显著")
                  
    def get_current_metrics(self) -> dict:
        """获取当前监控指标"""
        total = self.stats['total_trades']
        if total == 0:
            return {}
            
        buy_vol = self.stats['passive_buy'] + self.stats['aggressive_buy']
        sell_vol = self.stats['passive_sell'] + self.stats['aggressive_sell']
        
        return {
            'total_trades': total,
            'buy_volume': buy_vol,
            'sell_volume': sell_vol,
            'pressure_ratio': buy_vol / sell_vol if sell_vol > 0 else float('inf'),
            'aggression_ratio': (
                self.stats['aggressive_buy'] + self.stats['aggressive_sell']
            ) / total,
            'passive_buy_ratio': self.stats['passive_buy'] / total,
            'passive_sell_ratio': self.stats['passive_sell'] / total
        }


async def main():
    """主函数:启动做市商行为监控"""
    api_key = os.environ.get("TICKDB_API_KEY")
    if not api_key:
        raise ValueError("请设置 TICKDB_API_KEY 环境变量")
    
    # 监控港股腾讯和数字货币 BTC(示例多标的监控)
    symbols = ["700.HK", "BTC.USDT"]
    monitors = [
        MakerActivityMonitor(api_key, symbol) 
        for symbol in symbols
    ]
    
    print(f"启动 {len(symbols)} 个监控实例:{symbols}")
    
    # 并行运行所有监控
    tasks = [monitor.connect() for monitor in monitors]
    
    # 运行 5 分钟后停止
    try:
        await asyncio.wait_for(
            asyncio.gather(*tasks, return_exceptions=True),
            timeout=300
        )
    except asyncio.TimeoutError:
        print("\n监控达到 5 分钟,停止...")
        for monitor in monitors:
            monitor.monitoring = False


if __name__ == "__main__":
    asyncio.run(main())

⚠️ 部署提示:实际生产环境中,建议:

  1. 将告警推送集成到飞书/Slack/钉钉等 IM 工具
  2. 历史数据存入 TimescaleDB/InfluxDB 进行长期分析
  3. 多标的监控时使用信号量控制并发连接数

六、美股做市商策略的特殊考量

6.1 美股 vs 其他市场的关键差异

维度 美股 港股 数字货币
主要参与者 高频做市商(HFT)、共同基金 本地机构、做市商 交易所、量化基金
订单簿深度 典型 1-5 档(TickDB depth 支持) 10 档深度 10-50 档深度
成交确认速度 ~10ms ~50ms ~5ms
价差特性 Penny increments(0.01 美元) 0.05 港币起 交易所自定义
特殊事件 财报、指数再平衡 涡轮牛熊证影响 减半、交易所维护

6.2 美股 tick 数据供应商

由于 TickDB 当前不支持美股逐笔数据,以下是常见的美股数据源:

供应商 数据类型 延迟 备注
Polygon.io Level 1 / Trades 实时 有免费层,支持 WebSocket
NYSE TAQ 历史 tick T+1 学术研究首选
Nasdaq TotalView Level 2 + ITCH 实时 机构级别
IEX Cloud Trades + Quotes 实时 相对便宜

建议路径:若你的策略需要美股 + 港股/数字货币的混合研究,可以使用 Polygon 获取美股数据,用 TickDB 获取港股/数字货币数据,方法论保持一致。


七、策略框架:基于做市商行为的交易思路

7.1 策略一:被动单衰竭检测

逻辑:当被动型做市商在某一价位积累了大量订单后,如果这些订单开始被连续"吃掉"但价格未动,说明存在更强的对手方在吸收流动性。

入场信号

  • 连续 5+ 笔成交在同一价位
  • 买卖压力比 > 2.0
  • tick 方向从 +0 转为 -0(价格从上涨转为下跌)

出场:价格反向突破被动单累积价位 ± 0.5 档

7.2 策略二:激进单突破确认

逻辑:激进型机构推动价格突破关键位后,被动型做市商会调整报价,形成新的支撑/阻力。

入场信号

  • 大单(>1000 股)快速推动价格突破 20 日高点
  • 突破后 5 分钟内出现被动买盘(+0 tick)
  • 订单簿深度在突破位上方快速增加

仓位管理:激进单突破 → 顺势追入 → 被动单衰竭 → 减仓/止盈

7.3 策略三:库存偏移度均值回归

逻辑:做市商不会长期持有单向库存,会通过报价调整回归中性。当库存偏移度达到极端值时,价格存在均值回归压力。

入场信号

  • 库存偏移度持续 > 0.4 或 < -0.4 超过 10 分钟
  • 偏移方向与当前趋势相反(做市商在"逆势"积累库存)

假设:做市商的信息优势使其能准确判断短期价格方向,其库存行为是反向指标。


八、数据能力边界说明

作为 TickDB 内容战略专家,必须如实说明产品的能力边界:

8.1 TickDB 当前支持的数据类型

市场 trades(逐笔成交) depth(订单簿深度) kline(历史 K 线)
美股 ❌ 不支持 1 档 ✅ 10 年级别
港股 ✅ 支持 10 档 ✅ 5 年级别
数字货币 ✅ 支持 10-50 档 ✅ 全量历史
A股 ❌ 不支持 ❌ 不支持 ✅ 支持

8.2 如何选择数据源

你的研究目标 推荐数据源组合
港股/数字货币高频策略 TickDB 全套(trades + depth)
美股长周期回测 TickDB kline + 第三方 tick 数据
跨市场比较研究 TickDB(港股/数字货币)+ Polygon(美股)
期权做市商行为 需对接期权数据供应商(OVAL、CBOE)

下一步行动

如果你希望深入研究港股/数字货币的做市商行为

  1. 访问 tickdb.ai 注册(免费层包含 10,000 次 API 调用)
  2. 在控制台获取 API Key,设置环境变量 TICKDB_API_KEY
  3. 复制本文代码,在本地运行观察实际数据模式
  4. 关注 TickDB 公众号,获取市场微观结构系列更新

如果你需要美股逐笔数据

  • 建议对接 Polygon.io(适合个人研究者)或 NYSE TAQ(适合学术研究)
  • 本文的方法论框架可完全迁移,仅需调整数据接口

如果你习惯用 AI 辅助开发

  • 在 AI 助手中搜索安装 tickdb-market-data SKILL
  • 可直接用自然语言描述需求,AI 会调用 TickDB API 获取数据

结语

订单流是市场的心电图。价格是已经发生的博弈结果,而逐笔成交记录了博弈的过程。做市商是这场博弈中最重要的被动参与者——他们的报价行为、库存管理、对冲节奏,都隐藏在高频数据中。

识别做市商不是为了"跟随庄家",而是为了理解流动性的供给结构。当你知道被动买盘正在某个价位积累时,你可以判断突破的概率;当你发现激进卖盘正在扫货时,你可以预估回调的深度。

这不是预测未来的水晶球,而是解读当下市场状态的放大镜。


风险提示:本文不构成任何投资建议。做市商行为分析属于高频策略范畴,存在以下风险:数据延迟导致信号失效、极端行情下流动性枯竭、交易成本侵蚀利润、市场结构变化影响策略有效性。历史回测结果不代表未来表现,请在充分测试后谨慎使用。