从逐笔成交反推做市商行为:美股高频做市策略入门
价格是结果,订单流才是原因
2024 年某交易日的上午 10:23 分,某大型科技期权的隐含波动率在 3 秒内飙升 15%,而标的资产的价格几乎纹丝不动。如果你只看价格,你会错过这个信号。但如果你有逐笔成交数据,你会看到:连续的 100-200 股小单以略高于卖一的价格成交,持续时间 2.7 秒,总成交量 4,800 股——这不是散户的随机交易,而是典型的期权套利商在对冲行为。
这就是高频做市策略研究的核心问题:订单流是价格变动的先行指标。
本文拆解从逐笔成交数据(tick data)中识别做市商行为的技术路径。我们会讨论做市商的撮合逻辑、tick 方向推断方法、以及如何通过库存偏移度量化做市商的被动程度。代码示例以港股和数字货币市场为主,方法论同样适用于美股。
重要说明:TickDB 的 trades 接口当前不支持美股和 A 股市场。若你专注于美股逐笔数据,需对接其他数据供应商。对于港股、数字货币等市场,TickDB 提供完整的逐笔成交和订单簿深度数据,可直接用于本文所述策略的研究与回测。
一、做市商的本质:双边报价的库存博弈
1.1 做市商不是“庄家”
讨论做市商行为前,必须澄清一个常见误解:做市商不是拥有信息优势、资金控盘的主力资金,而是承担流动性风险并从中获取价差的被动报价者。
做市商的核心义务是:持续提供买价(bid)和卖价(offer),承诺在任何时刻以报价成交至少一定数量的股票。这个义务换来的权利是:买卖价差(bid-ask spread)。
盈利公式:
做市商利润 = 价差收入 - 逆向选择损失 - 库存成本
- 逆向选择损失:做市商刚报价,就遇到价格向不利方向移动,说明对手掌握了私人信息
- 库存成本:持仓量过大时,需要承担股价下行风险,可能被迫在不利价位平仓
1.2 订单簿中的做市商痕迹
在 Level 2 行情中,你通常能看到这样的结构:
| 档位 | 买量 | 买价 | 卖价 | 卖量 |
|---|---|---|---|---|
| 1 | 12,500 | 150.02 | 150.03 | 15,800 |
| 2 | 8,200 | 150.01 | 150.04 | 6,400 |
| 3 | 5,600 | 150.00 | 150.05 | 9,200 |
在这个例子中:
- 买卖价差 = 150.03 - 150.02 = 0.01 美元(1 美分)
- 中间价 = (150.02 + 150.03) / 2 = 150.025
- 深度不平衡:卖方深度(15,800 + 6,400 + 9,200 = 31,400)远大于买方深度(26,300)
这种不对称性通常反映两类参与者:
- 被动型做市商:愿意在买方挂单,承担库存多头风险
- 对冲型机构:在卖出大量股票后,通过买入 ETF 或期货对冲
二、逐笔数据的关键字段与解读
2.1 tick 数据的标准结构
一条完整的逐笔成交记录包含以下字段:
| 字段 | 说明 | 策略意义 |
|---|---|---|
timestamp |
成交时间戳(毫秒级) | 计算交易频率、检测加速/减速 |
price |
成交价格 | 判断成交是否在买卖价差内 |
volume |
成交量(股数/币数) | 区分大单和小单 |
side |
成交方向(主动买/主动卖) | 核心字段,判断买入压力还是卖出压力 |
order_id |
订单编号(若有) | 用于追踪订单生命周期 |
tick_direction |
价格变动方向(+0/-/N) | 标注是否相对于上一笔成交涨价、跌价或不变 |
2.2 tick 方向的三种状态
ticker_direction(tick 方向)是识别短期供需的重要指标:
| 值 | 含义 | 微观解读 |
|---|---|---|
+0 |
价格相对于上一笔上涨 | 主动买盘推动,供需暂时偏多 |
0 |
价格与上一笔持平 | 买卖力量均衡,等待方向 |
-0 |
价格相对于上一笔下跌 | 主动卖盘推动,供需暂时偏空 |
一个关键观察:连续出现多个 +0 且成交量较小,说明买方在被动吸收卖压但未能推动价格上涨——这往往是做市商在高价位持续挂单承接的信号。
2.3 成交价格在价差内的判定
通过对比成交价与买卖价差的位置,可以判断这笔交易是否对做市商不利:
def classify_tick_side(tick_price, bid, ask):
"""
判定成交是否对做市商有利
Args:
tick_price: 成交价
bid: 当前买一价
ask: 当前卖一价
Returns:
str: 'at_bid'(对做市商有利) / 'at_ask'(对做市商不利) / 'inside'(在价差内)
"""
spread = ask - bid
if tick_price == bid:
return 'at_bid' # 做市商的买方挂单被动成交,赚了价差
elif tick_price == ask:
return 'at_ask' # 做市商的卖方挂单被动成交,赚了价差
else:
return 'inside' # 成交在买卖价差之间,说明有逆向选择
def classify_tick_direction(tick_price, prev_price, tick_direction):
"""
结合 tick 方向和成交位置判断行为模式
Returns:
str: 行为模式分类
"""
if tick_direction == '+0' and tick_price == bid:
return 'passive_buy_aggression'
elif tick_direction == '-0' and tick_price == ask:
return 'passive_sell_aggression'
elif tick_direction == '0' and tick_price < ask and tick_price > bid:
return 'spread_crossing'
return 'normal'
三、识别做市商行为的核心指标
3.1 买卖压力比(Buy-Sell Pressure Ratio)
买卖压力比是衡量短期供需失衡的最直接指标:
$$
\text{买卖压力比} = \frac{\sum_{i=1}^{N} \text{主动买入量}i}{\sum{i=1}^{N} \text{主动卖出量}_i}
$$
当买卖压力比 > 1 时,买方力量占优;< 1 时,卖方力量占优;接近 1 时,市场相对均衡。
做市商视角:如果买卖压力比持续偏离 1,但价格未动,说明做市商正在双向挂单吸收流动性。
3.2 订单成交率(Fill Rate)
在高频数据中,订单成交率反映被动程度:
$$
\text{被动成交率} = \frac{\text{被动成交量(等待后成交)}}{\text{总成交量}}
$$
高被动成交率意味着大量订单在被动等待后被“吃掉”,通常是机构或做市商的大单拆小。
3.3 库存偏移度(Inventory Skew)
这是区分不同类型做市商的关键指标。做市商通过调整报价来管理库存:
$$
\text{库存偏移度} = \frac{Q_{bid} - Q_{ask}}{Q_{bid} + Q_{ask}}
$$
| 库存偏移度 | 解读 |
|---|---|
| +0.3 ~ +0.5 | 库存偏多头,做市商预期价格下跌或主动持有空仓 |
| -0.3 ~ -0.5 | 库存偏空头,做市商预期价格上涨或主动持有多仓 |
| -0.1 ~ +0.1 | 库存中性,做市商在快速对冲 |
一个典型场景:当某股票出现利好消息但价格尚未启动时,做市商的库存偏移度可能已经转负(持有多头),这意味着他们在积极买入并相信价格会上涨。
3.4 实际计算示例
以下代码演示如何从 TickDB 的 depth 频道数据计算订单簿不平衡度和库存偏移度:
import os
import json
import time
import asyncio
import aiohttp
import numpy as np
from collections import deque
class OrderBookAnalyzer:
"""基于 TickDB depth 频道的订单簿分析"""
def __init__(self, api_key: str, symbol: str):
self.api_key = api_key
self.symbol = symbol
self.ws_url = f"wss://api.tickdb.ai/ws/v1/market/depth?symbol={symbol}&api_key={api_key}"
self.order_book = {'bids': {}, 'asks': {}}
self.history = deque(maxlen=100) # 保留最近 100 个快照
self.running = False
async def connect(self):
"""WebSocket 连接,含心跳和重连"""
session = aiohttp.ClientSession()
retry_count = 0
max_retries = 5
while retry_count < max_retries:
try:
async with session.ws_connect(
self.ws_url,
timeout=aiohttp.WSMsgType.PING
) as ws:
print(f"[{self.symbol}] WebSocket 已连接")
self.running = True
retry_count = 0 # 重置重试计数
# 启动心跳保活任务
ping_task = asyncio.create_task(self._send_ping(ws))
# 启动消息处理任务
receive_task = asyncio.create_task(self._receive(ws))
# 等待任一任务完成
done, pending = await asyncio.wait(
[ping_task, receive_task],
return_when=asyncio.FIRST_COMPLETED
)
# 取消未完成的任务
for task in pending:
task.cancel()
except aiohttp.ClientError as e:
retry_count += 1
delay = min(30, 2 ** retry_count) # 指数退避,最大 30 秒
jitter = np.random.uniform(0, delay * 0.1) # 抖动避免惊群
print(f"[{self.symbol}] 连接断开 ({e}),{delay + jitter:.1f} 秒后重试...")
await asyncio.sleep(delay + jitter)
finally:
await session.close()
print(f"[{self.symbol}] 达到最大重试次数,停止连接")
async def _send_ping(self, ws):
"""每 30 秒发送一次 ping 保活"""
while True:
await asyncio.sleep(30)
try:
await ws.send_json({"cmd": "ping"})
except Exception as e:
print(f"[{self.symbol}] Ping 失败: {e}")
break
async def _receive(self, ws):
"""接收并处理消息"""
async for msg in ws:
if msg.type == aiohttp.WSMsgType.ERROR:
print(f"[{self.symbol}] WebSocket 错误")
break
elif msg.type == aiohttp.WSMsgType.TEXT:
data = json.loads(msg.data)
if data.get('type') == 'pong':
continue # 心跳响应,跳过
self._process_depth_update(data)
def _process_depth_update(self, data: dict):
"""处理深度更新,计算库存偏移度"""
if 'data' not in data:
return
snapshot = data['data']
self.order_book['bids'] = {
float(p): float(v) for p, v in snapshot.get('bids', {}).items()
}
self.order_book['asks'] = {
float(p): float(v) for p, v in snapshot.get('asks', {}).items()
}
# 计算前 5 档的库存偏移度
imbalance = self.calculate_imbalance(depth=5)
skew = self.calculate_inventory_skew(depth=5)
# 记录历史
self.history.append({
'timestamp': time.time(),
'imbalance': imbalance,
'skew': skew,
'bid_depth': sum(self.order_book['bids'].values()),
'ask_depth': sum(self.order_book['asks'].values())
})
def calculate_imbalance(self, depth: int = 5) -> float:
"""
计算订单簿不平衡度
imbalance > 0: 买方深度占优
imbalance < 0: 卖方深度占优
"""
bid_levels = sorted(self.order_book['bids'].items(), reverse=True)[:depth]
ask_levels = sorted(self.order_book['asks'].items())[:depth]
bid_vol = sum(v for _, v in bid_levels)
ask_vol = sum(v for _, v in ask_levels)
if bid_vol + ask_vol == 0:
return 0.0
return (bid_vol - ask_vol) / (bid_vol + ask_vol)
def calculate_inventory_skew(self, depth: int = 5) -> float:
"""
计算库存偏移度(模拟做市商的库存偏好)
正值表示偏向买方(预期下跌),负值表示偏向卖方(预期上涨)
"""
imbalance = self.calculate_imbalance(depth)
# 简单映射:库存偏移度 ≈ 不平衡度(简化模型)
# 实际模型中需要考虑成交量历史、加权时间等
return imbalance
def get_market_regime(self, window: int = 20) -> str:
"""
基于历史数据判断市场状态
Returns:
'volatile': 高波动,供需失衡严重
'balanced': 平衡,做市商活跃
'trend': 单边趋势,跟随方向
"""
if len(self.history) < window:
return 'unknown'
recent = list(self.history)[-window:]
imbalances = [h['imbalance'] for h in recent]
avg_imbalance = np.mean(imbalances)
std_imbalance = np.std(imbalances)
if std_imbalance > 0.5:
return 'volatile'
elif abs(avg_imbalance) < 0.2:
return 'balanced'
else:
return 'trend'
async def main():
"""主函数:演示订单簿分析"""
api_key = os.environ.get("TICKDB_API_KEY")
if not api_key:
raise ValueError("请设置 TICKDB_API_KEY 环境变量")
# 以港股腾讯为例
analyzer = OrderBookAnalyzer(api_key, "0700.HK")
# 启动分析(后台运行 60 秒)
analysis_task = asyncio.create_task(analyzer.connect())
# 模拟监控逻辑
await asyncio.sleep(60)
analyzer.running = False
# 输出分析结果
if len(analyzer.history) > 0:
print("\n=== 60 秒订单簿分析报告 ===")
imbalances = [h['imbalance'] for h in analyzer.history]
print(f"平均不平衡度: {np.mean(imbalances):.3f}")
print(f"不平衡度标准差: {np.std(imbalances):.3f}")
print(f"市场状态: {analyzer.get_market_regime()}")
else:
print("未获取到数据")
if __name__ == "__main__":
asyncio.run(main())
⚠️ 生产环境提示:上述代码为演示版本。若用于实际策略:
- 建议使用
asyncio异步架构处理多标的并行监控- 深度数据建议存储到本地数据库进行历史回放分析
- 高频场景(<1 秒采样)需考虑网络延迟和本地时钟校准
四、从 tick 数据反推做市商策略
4.1 被动型 vs 激进型做市商
通过对成交数据的聚类分析,可以将做市商分为两类:
| 特征 | 被动型做市商 | 激进型做市商 |
|---|---|---|
| 挂单位置 | 紧贴买一/卖一 | 远离中间价,主动扫单 |
| 成交率 | 低(挂单常被跳过) | 高(主动成交) |
| 库存管理 | 严格,偏移度低 | 宽松,接受较大敞口 |
| 盈利来源 | 价差 + 逆向选择 | 趋势跟随 |
识别被动型做市商的信号:
- 大量小单(100-300 股)在买一/卖一位置反复成交
- 价格不变但成交量累积
- 订单簿深度在某一档位异常集中
识别激进型做市商的信号:
- 大单(>1000 股)以市价单快速成交
- 成交价格经常穿越买卖价差(inside ticks)
- 短时间内连续推动价格向同一方向移动
4.2 基于 tick 方向的行为推断
以下是一个简化的做市商行为识别算法:
from collections import defaultdict
from dataclasses import dataclass, field
from typing import List, Dict
@dataclass
class TickRecord:
timestamp: float
price: float
volume: int
direction: str # '+0', '0', '-0'
side: str # 'buy', 'sell'
is_inside: bool = False # 是否在价差内成交
@dataclass
class MakerBehaviorProfile:
"""做市商行为画像"""
symbol: str
passive_buy_ratio: float = 0.0 # 被动买入占比
passive_sell_ratio: float = 0.0 # 被动卖出占比
aggression_index: float = 0.0 # 攻击指数
inventory_skew: float = 0.0 # 库存偏移度
avg_tick_size: float = 0.0 # 平均成交单量
tick_dominance: str = 'neutral' # 主导方向
def classify_maker_type(self) -> str:
"""分类做市商类型"""
if self.aggression_index < 0.3 and abs(self.inventory_skew) < 0.2:
return 'passive'
elif self.aggression_index > 0.7:
return 'aggressive'
else:
return 'mixed'
class MakerBehaviorDetector:
"""基于成交数据的做市商行为检测器"""
def __init__(self, symbol: str, inside_threshold: float = 0.0001):
self.symbol = symbol
self.inside_threshold = inside_threshold # 价差内成交的容差
self.ticks: List[TickRecord] = []
self.stats = {
'passive_buy': 0,
'passive_sell': 0,
'aggressive_buy': 0,
'aggressive_sell': 0,
'inside_buy': 0,
'inside_sell': 0,
'plus_ticks': 0,
'minus_ticks': 0
}
def process_tick(self, tick: TickRecord, bid: float, ask: float):
"""处理单条 tick,分类行为"""
self.ticks.append(tick)
mid_price = (bid + ask) / 2
tick_price = tick.price
# 判断是否在价差内成交
if bid < tick_price < ask:
tick.is_inside = True
elif tick_price <= bid + (ask - bid) * self.inside_threshold:
tick.is_inside = False
# 基于 tick 方向和成交位置分类
if tick.direction == '+0':
self.stats['plus_ticks'] += 1
if tick.side == 'buy' and not tick.is_inside:
self.stats['passive_buy'] += 1
elif tick.side == 'buy' and tick.is_inside:
self.stats['aggressive_buy'] += 1
elif tick.direction == '-0':
self.stats['minus_ticks'] += 1
if tick.side == 'sell' and not tick.is_inside:
self.stats['passive_sell'] += 1
elif tick.side == 'sell' and tick.is_inside:
self.stats['aggressive_sell'] += 1
elif tick.direction == '0' and tick.is_inside:
# 穿越价差的交易,最可能是信息驱动
if tick.side == 'buy':
self.stats['aggressive_buy'] += 1
else:
self.stats['aggressive_sell'] += 1
def generate_profile(self) -> MakerBehaviorProfile:
"""生成行为画像"""
total_buy = self.stats['passive_buy'] + self.stats['aggressive_buy']
total_sell = self.stats['passive_sell'] + self.stats['aggressive_sell']
total = total_buy + total_sell
if total == 0:
return MakerBehaviorProfile(symbol=self.symbol)
# 计算各指标
passive_buy_ratio = self.stats['passive_buy'] / total if total > 0 else 0
passive_sell_ratio = self.stats['passive_sell'] / total if total > 0 else 0
# 攻击指数:主动成交 / 总成交量
aggressive_total = self.stats['aggressive_buy'] + self.stats['aggressive_sell']
aggression_index = aggressive_total / total
# 库存偏移度
buy_skew = (total_buy - total_sell) / total
# tick 主导演绎
total_ticks = self.stats['plus_ticks'] + self.stats['minus_ticks']
if total_ticks > 0:
tick_ratio = (self.stats['plus_ticks'] - self.stats['minus_ticks']) / total_ticks
if tick_ratio > 0.3:
tick_dominance = 'buy'
elif tick_ratio < -0.3:
tick_dominance = 'sell'
else:
tick_dominance = 'neutral'
else:
tick_dominance = 'neutral'
# 平均成交单量
volumes = [t.volume for t in self.ticks]
avg_tick_size = sum(volumes) / len(volumes) if volumes else 0
return MakerBehaviorProfile(
symbol=self.symbol,
passive_buy_ratio=passive_buy_ratio,
passive_sell_ratio=passive_sell_ratio,
aggression_index=aggression_index,
inventory_skew=buy_skew,
avg_tick_size=avg_tick_size,
tick_dominance=tick_dominance
)
def interpret_profile(self, profile: MakerBehaviorProfile) -> Dict:
"""解读行为画像,输出策略建议"""
maker_type = profile.classify_maker_type()
interpretation = {
'maker_type': maker_type,
'market_signal': None,
'trading_implication': None
}
if maker_type == 'passive':
if profile.tick_dominance == 'buy':
interpretation['market_signal'] = '被动买盘主导,可能存在支撑位'
interpretation['trading_implication'] = '考虑区间下沿买入,止损设于买一下方'
elif profile.tick_dominance == 'sell':
interpretation['market_signal'] = '被动卖盘主导,可能存在阻力位'
interpretation['trading_implication'] = '考虑区间上沿卖出,止损设于卖一上方'
else:
interpretation['market_signal'] = '多空被动力量均衡'
elif maker_type == 'aggressive':
if profile.inventory_skew > 0.3:
interpretation['market_signal'] = '激进买盘推动,短期看多'
interpretation['trading_implication'] = '趋势跟踪为主,不逆势操作'
else:
interpretation['market_signal'] = '激进卖盘推动,短期看空'
interpretation['trading_implication'] = '关注流动性枯竭信号'
return interpretation
五、实战:TickDB 数据获取与实时监控
5.1 获取港股逐笔成交数据
对于港股和数字货币市场,TickDB 提供完整的逐笔成交数据。以下是获取历史成交记录的代码示例:
import os
import requests
import time
from datetime import datetime, timedelta
# ⚠️ TickDB trades 接口支持:港股、数字货币
# ⚠️ 不支持:美股、A 股
TICKDB_BASE_URL = "https://api.tickdb.ai/v1"
API_KEY = os.environ.get("TICKDB_API_KEY")
def get_historical_trades(symbol: str, start_time: int, end_time: int, limit: int = 1000):
"""
获取历史逐笔成交数据
Args:
symbol: 交易品种,如 '700.HK'(腾讯)或 'BTC.USDT'
start_time: 开始时间戳(毫秒)
end_time: 结束时间戳(毫秒)
limit: 每页返回数量,最大 1000
Returns:
list: 成交记录列表
"""
url = f"{TICKDB_BASE_URL}/market/trades"
headers = {"X-API-Key": API_KEY}
all_trades = []
params = {
"symbol": symbol,
"start": start_time,
"end": end_time,
"limit": limit
}
while True:
try:
response = requests.get(
url,
headers=headers,
params=params,
timeout=(3.05, 10) # HTTP timeout
)
if response.status_code == 429:
retry_after = int(response.headers.get("Retry-After", 5))
print(f"触发限频,等待 {retry_after} 秒...")
time.sleep(retry_after)
continue
response.raise_for_status()
data = response.json()
if data.get("code") == 0:
trades = data.get("data", {}).get("list", [])
all_trades.extend(trades)
# 检查是否还有下一页
if len(trades) < limit:
break
# 更新游标获取下一页
if trades:
params["start"] = trades[-1].get("ts", params["start"]) + 1
else:
break
else:
print(f"API 错误: {data}")
break
except requests.exceptions.Timeout:
print("请求超时,重试...")
time.sleep(1)
except requests.exceptions.RequestException as e:
print(f"请求失败: {e}")
break
return all_trades
def calculate_tick_direction(trades: list) -> list:
"""
计算逐笔成交的 tick 方向
Returns:
list: 附加 tick_direction 字段的成交记录
"""
if not trades:
return []
processed = []
prev_price = None
for trade in trades:
price = float(trade.get("p", 0))
if prev_price is None:
direction = "N" # 第一笔,无方向
elif price > prev_price:
direction = "+0"
elif price < prev_price:
direction = "-0"
else:
direction = "0"
trade["tick_direction"] = direction
processed.append(trade)
prev_price = price
return processed
def analyze_maker_activity(trades: list, window_seconds: int = 60) -> dict:
"""
分析指定时间窗口内的做市商活动
Args:
trades: 成交记录列表
window_seconds: 分析窗口(秒)
Returns:
dict: 分析结果
"""
if not trades:
return {}
# 按时间窗口分组
window_ms = window_seconds * 1000
windows = defaultdict(list)
for trade in trades:
ts = trade.get("ts", 0)
window_key = (ts // window_ms) * window_ms
windows[window_key].append(trade)
results = []
for window_start, window_trades in sorted(windows.items()):
# 统计买卖方向
buy_volume = sum(int(t.get("v", 0)) for t in window_trades
if t.get("S") == "buy")
sell_volume = sum(int(t.get("v", 0)) for t in window_trades
if t.get("S") == "sell")
# 计算 tick 方向分布
plus_ticks = sum(1 for t in window_trades if t.get("tick_direction") == "+0")
minus_ticks = sum(1 for t in window_trades if t.get("tick_direction") == "-0")
# 计算平均成交单量
volumes = [int(t.get("v", 0)) for t in window_trades]
avg_size = sum(volumes) / len(volumes) if volumes else 0
# 判断主导行为
if buy_volume > sell_volume * 1.5 and avg_size < 500:
behavior = "passive_buy"
elif sell_volume > buy_volume * 1.5 and avg_size < 500:
behavior = "passive_sell"
elif avg_size > 2000:
behavior = "aggressive_hedge"
else:
behavior = "mixed"
results.append({
"window_start": datetime.fromtimestamp(window_start / 1000).isoformat(),
"trade_count": len(window_trades),
"buy_volume": buy_volume,
"sell_volume": sell_volume,
"pressure_ratio": buy_volume / sell_volume if sell_volume > 0 else float('inf'),
"plus_ticks": plus_ticks,
"minus_ticks": minus_ticks,
"avg_tick_size": avg_size,
"behavior": behavior
})
return {
"symbol": trades[0].get("symbol", "unknown"),
"total_trades": len(trades),
"window_results": results
}
# 使用示例
if __name__ == "__main__":
api_key = os.environ.get("TICKDB_API_KEY")
if not api_key:
raise ValueError("请设置 TICKDB_API_KEY 环境变量")
# 获取最近 1 小时的腾讯港股成交数据
end_time = int(time.time() * 1000)
start_time = end_time - 3600 * 1000 # 1 小时前
print(f"正在获取 700.HK 最近 1 小时成交数据...")
trades = get_historical_trades("700.HK", start_time, end_time)
if trades:
print(f"获取到 {len(trades)} 条成交记录")
# 计算 tick 方向
processed_trades = calculate_tick_direction(trades)
# 分析做市商活动
analysis = analyze_maker_activity(processed_trades, window_seconds=300)
print(f"\n=== 5 分钟窗口做市商行为分析 ===")
for window in analysis.get("window_results", [])[-5:]: # 最近 5 个窗口
print(f"[{window['window_start']}] "
f"成交量 {window['buy_volume']}/{window['sell_volume']} | "
f"买卖比 {window['pressure_ratio']:.2f} | "
f"行为 {window['behavior']}")
else:
print("未获取到数据,检查 API Key 或 symbol 是否正确")
5.2 实时监控 WebSocket 实现
以下代码演示如何通过 WebSocket 实时接收逐笔成交数据并进行做市商行为监控:
import os
import json
import time
import asyncio
import aiohttp
import numpy as np
from collections import deque
class MakerActivityMonitor:
"""
实时监控做市商行为的 WebSocket 客户端
功能:
1. 接收 TickDB trades WebSocket 推送
2. 实时计算买卖压力比
3. 检测被动/激进成交模式
4. 触发阈值告警
"""
def __init__(self, api_key: str, symbol: str):
self.api_key = api_key
self.symbol = symbol
self.ws_url = f"wss://api.tickdb.ai/ws/v1/market/trades?symbol={symbol}&api_key={api_key}"
# 滑动窗口参数
self.window_size = 100 # 最近 100 笔成交
self.recent_trades = deque(maxlen=self.window_size)
self.last_prices = deque(maxlen=2)
# 告警阈值
self.alert_thresholds = {
'pressure_ratio': 2.5, # 买卖压力比超过此值告警
'aggression_ratio': 0.7, # 激进成交占比超过此值告警
'tick_unbalance': 0.6 # tick 方向失衡超过此值告警
}
self.monitoring = False
self.stats = {
'total_trades': 0,
'passive_buy': 0,
'passive_sell': 0,
'aggressive_buy': 0,
'aggressive_sell': 0
}
async def connect(self):
"""建立 WebSocket 连接,含心跳保活"""
session = aiohttp.ClientSession()
retry_count = 0
max_retries = 5
base_delay = 1
while retry_count < max_retries:
try:
async with session.ws_connect(
self.ws_url,
timeout=aiohttp.WSMsgType.PING
) as ws:
print(f"[{self.symbol}] WebSocket 已连接,开始监控做市商行为")
self.monitoring = True
retry_count = 0
# 启动心跳和接收任务
await asyncio.gather(
self._heartbeat(ws),
self._receive(ws)
)
except aiohttp.ClientError as e:
retry_count += 1
delay = min(60, base_delay * (2 ** retry_count))
jitter = np.random.uniform(0, delay * 0.1)
print(f"[{self.symbol}] 连接异常 ({e}),{delay + jitter:.1f} 秒后重试...")
await asyncio.sleep(delay + jitter)
finally:
await session.close()
async def _heartbeat(self, ws):
"""心跳保活,每 25 秒发送一次 ping"""
while self.monitoring:
await asyncio.sleep(25)
try:
await ws.send_json({"cmd": "ping"})
except Exception:
break
async def _receive(self, ws):
"""接收并处理成交推送"""
async for msg in ws:
if not self.monitoring:
break
if msg.type == aiohttp.WSMsgType.TEXT:
data = json.loads(msg.data)
if data.get('type') == 'pong':
continue
self._process_trade(data)
elif msg.type == aiohttp.WSMsgType.ERROR:
print(f"[{self.symbol}] WebSocket 错误")
self.monitoring = False
break
def _process_trade(self, data: dict):
"""处理单条成交推送"""
try:
trade_data = data.get('data', {})
price = float(trade_data.get('p', 0))
volume = int(trade_data.get('v', 0))
side = trade_data.get('S', '') # 'buy' or 'sell'
ts = trade_data.get('ts', 0)
# 计算 tick 方向
if len(self.last_prices) >= 2:
prev_price = self.last_prices[-2]
if price > prev_price:
tick_dir = '+0'
elif price < prev_price:
tick_dir = '-0'
else:
tick_dir = '0'
else:
tick_dir = 'N'
self.last_prices.append(price)
# 分类成交类型(简化判断)
if tick_dir == 'N':
# 第一笔,无法判断
is_aggressive = False
elif side == 'buy':
is_aggressive = tick_dir == '-0' # 主动买但价格下跌
else:
is_aggressive = tick_dir == '+0' # 主动卖但价格上涨
# 更新统计
self.stats['total_trades'] += 1
if side == 'buy':
if is_aggressive:
self.stats['aggressive_buy'] += 1
else:
self.stats['passive_buy'] += 1
else:
if is_aggressive:
self.stats['aggressive_sell'] += 1
else:
self.stats['passive_sell'] += 1
# 添加到滑动窗口
self.recent_trades.append({
'ts': ts,
'price': price,
'volume': volume,
'side': side,
'tick_dir': tick_dir,
'is_aggressive': is_aggressive
})
# 定期输出状态(每 50 笔)
if self.stats['total_trades'] % 50 == 0:
self._emit_status()
self._check_alerts()
except Exception as e:
print(f"处理成交数据异常: {e}")
def _emit_status(self):
"""输出当前监控状态"""
total = self.stats['total_trades']
if total == 0:
return
buy_vol = self.stats['passive_buy'] + self.stats['aggressive_buy']
sell_vol = self.stats['passive_sell'] + self.stats['aggressive_sell']
pressure_ratio = buy_vol / sell_vol if sell_vol > 0 else float('inf')
aggression_ratio = (self.stats['aggressive_buy'] + self.stats['aggressive_sell']) / total
print(f"[{self.symbol}] {total} 笔 | "
f"买卖比 {pressure_ratio:.2f} | "
f"激进占比 {aggression_ratio:.1%} | "
f"被动买 {self.stats['passive_buy']} | "
f"被动卖 {self.stats['passive_sell']}")
def _check_alerts(self):
"""检查是否触发告警条件"""
total = self.stats['total_trades']
if total < 20:
return
buy_vol = self.stats['passive_buy'] + self.stats['aggressive_buy']
sell_vol = self.stats['passive_sell'] + self.stats['aggressive_sell']
pressure_ratio = buy_vol / sell_vol if sell_vol > 0 else float('inf')
# 检查买卖压力比异常
if pressure_ratio > self.alert_thresholds['pressure_ratio']:
print(f"🚨 告警:买卖压力比 {pressure_ratio:.2f} 超过阈值 "
f"({self.alert_thresholds['pressure_ratio']}),买方力量显著")
elif pressure_ratio < 1 / self.alert_thresholds['pressure_ratio']:
print(f"🚨 告警:买卖压力比 {pressure_ratio:.2f} 低于阈值 "
f"(反向),卖方力量显著")
def get_current_metrics(self) -> dict:
"""获取当前监控指标"""
total = self.stats['total_trades']
if total == 0:
return {}
buy_vol = self.stats['passive_buy'] + self.stats['aggressive_buy']
sell_vol = self.stats['passive_sell'] + self.stats['aggressive_sell']
return {
'total_trades': total,
'buy_volume': buy_vol,
'sell_volume': sell_vol,
'pressure_ratio': buy_vol / sell_vol if sell_vol > 0 else float('inf'),
'aggression_ratio': (
self.stats['aggressive_buy'] + self.stats['aggressive_sell']
) / total,
'passive_buy_ratio': self.stats['passive_buy'] / total,
'passive_sell_ratio': self.stats['passive_sell'] / total
}
async def main():
"""主函数:启动做市商行为监控"""
api_key = os.environ.get("TICKDB_API_KEY")
if not api_key:
raise ValueError("请设置 TICKDB_API_KEY 环境变量")
# 监控港股腾讯和数字货币 BTC(示例多标的监控)
symbols = ["700.HK", "BTC.USDT"]
monitors = [
MakerActivityMonitor(api_key, symbol)
for symbol in symbols
]
print(f"启动 {len(symbols)} 个监控实例:{symbols}")
# 并行运行所有监控
tasks = [monitor.connect() for monitor in monitors]
# 运行 5 分钟后停止
try:
await asyncio.wait_for(
asyncio.gather(*tasks, return_exceptions=True),
timeout=300
)
except asyncio.TimeoutError:
print("\n监控达到 5 分钟,停止...")
for monitor in monitors:
monitor.monitoring = False
if __name__ == "__main__":
asyncio.run(main())
⚠️ 部署提示:实际生产环境中,建议:
- 将告警推送集成到飞书/Slack/钉钉等 IM 工具
- 历史数据存入 TimescaleDB/InfluxDB 进行长期分析
- 多标的监控时使用信号量控制并发连接数
六、美股做市商策略的特殊考量
6.1 美股 vs 其他市场的关键差异
| 维度 | 美股 | 港股 | 数字货币 |
|---|---|---|---|
| 主要参与者 | 高频做市商(HFT)、共同基金 | 本地机构、做市商 | 交易所、量化基金 |
| 订单簿深度 | 典型 1-5 档(TickDB depth 支持) | 10 档深度 | 10-50 档深度 |
| 成交确认速度 | ~10ms | ~50ms | ~5ms |
| 价差特性 | Penny increments(0.01 美元) | 0.05 港币起 | 交易所自定义 |
| 特殊事件 | 财报、指数再平衡 | 涡轮牛熊证影响 | 减半、交易所维护 |
6.2 美股 tick 数据供应商
由于 TickDB 当前不支持美股逐笔数据,以下是常见的美股数据源:
| 供应商 | 数据类型 | 延迟 | 备注 |
|---|---|---|---|
| Polygon.io | Level 1 / Trades | 实时 | 有免费层,支持 WebSocket |
| NYSE TAQ | 历史 tick | T+1 | 学术研究首选 |
| Nasdaq TotalView | Level 2 + ITCH | 实时 | 机构级别 |
| IEX Cloud | Trades + Quotes | 实时 | 相对便宜 |
建议路径:若你的策略需要美股 + 港股/数字货币的混合研究,可以使用 Polygon 获取美股数据,用 TickDB 获取港股/数字货币数据,方法论保持一致。
七、策略框架:基于做市商行为的交易思路
7.1 策略一:被动单衰竭检测
逻辑:当被动型做市商在某一价位积累了大量订单后,如果这些订单开始被连续"吃掉"但价格未动,说明存在更强的对手方在吸收流动性。
入场信号:
- 连续 5+ 笔成交在同一价位
- 买卖压力比 > 2.0
- tick 方向从
+0转为-0(价格从上涨转为下跌)
出场:价格反向突破被动单累积价位 ± 0.5 档
7.2 策略二:激进单突破确认
逻辑:激进型机构推动价格突破关键位后,被动型做市商会调整报价,形成新的支撑/阻力。
入场信号:
- 大单(>1000 股)快速推动价格突破 20 日高点
- 突破后 5 分钟内出现被动买盘(
+0tick) - 订单簿深度在突破位上方快速增加
仓位管理:激进单突破 → 顺势追入 → 被动单衰竭 → 减仓/止盈
7.3 策略三:库存偏移度均值回归
逻辑:做市商不会长期持有单向库存,会通过报价调整回归中性。当库存偏移度达到极端值时,价格存在均值回归压力。
入场信号:
- 库存偏移度持续 > 0.4 或 < -0.4 超过 10 分钟
- 偏移方向与当前趋势相反(做市商在"逆势"积累库存)
假设:做市商的信息优势使其能准确判断短期价格方向,其库存行为是反向指标。
八、数据能力边界说明
作为 TickDB 内容战略专家,必须如实说明产品的能力边界:
8.1 TickDB 当前支持的数据类型
| 市场 | trades(逐笔成交) | depth(订单簿深度) | kline(历史 K 线) |
|---|---|---|---|
| 美股 | ❌ 不支持 | 1 档 | ✅ 10 年级别 |
| 港股 | ✅ 支持 | 10 档 | ✅ 5 年级别 |
| 数字货币 | ✅ 支持 | 10-50 档 | ✅ 全量历史 |
| A股 | ❌ 不支持 | ❌ 不支持 | ✅ 支持 |
8.2 如何选择数据源
| 你的研究目标 | 推荐数据源组合 |
|---|---|
| 港股/数字货币高频策略 | TickDB 全套(trades + depth) |
| 美股长周期回测 | TickDB kline + 第三方 tick 数据 |
| 跨市场比较研究 | TickDB(港股/数字货币)+ Polygon(美股) |
| 期权做市商行为 | 需对接期权数据供应商(OVAL、CBOE) |
下一步行动
如果你希望深入研究港股/数字货币的做市商行为:
- 访问 tickdb.ai 注册(免费层包含 10,000 次 API 调用)
- 在控制台获取 API Key,设置环境变量
TICKDB_API_KEY - 复制本文代码,在本地运行观察实际数据模式
- 关注 TickDB 公众号,获取市场微观结构系列更新
如果你需要美股逐笔数据:
- 建议对接 Polygon.io(适合个人研究者)或 NYSE TAQ(适合学术研究)
- 本文的方法论框架可完全迁移,仅需调整数据接口
如果你习惯用 AI 辅助开发:
- 在 AI 助手中搜索安装
tickdb-market-dataSKILL - 可直接用自然语言描述需求,AI 会调用 TickDB API 获取数据
结语
订单流是市场的心电图。价格是已经发生的博弈结果,而逐笔成交记录了博弈的过程。做市商是这场博弈中最重要的被动参与者——他们的报价行为、库存管理、对冲节奏,都隐藏在高频数据中。
识别做市商不是为了"跟随庄家",而是为了理解流动性的供给结构。当你知道被动买盘正在某个价位积累时,你可以判断突破的概率;当你发现激进卖盘正在扫货时,你可以预估回调的深度。
这不是预测未来的水晶球,而是解读当下市场状态的放大镜。
风险提示:本文不构成任何投资建议。做市商行为分析属于高频策略范畴,存在以下风险:数据延迟导致信号失效、极端行情下流动性枯竭、交易成本侵蚀利润、市场结构变化影响策略有效性。历史回测结果不代表未来表现,请在充分测试后谨慎使用。