价格是结果,订单簿是原因:理解市场微观结构的 5 个层次
这不是一篇教你“读K线”的文章。
两根同样形态的锤子线,一根之后大涨 5%,另一根直接破位下行。你翻开基本面,消息面几乎一样。再看资金流,北向资金净流入数据也差不多。
问题出在哪?
答案藏在 K 线诞生之前的那几秒钟——订单簿上,每一笔买卖挂单正在无声地博弈。K 线只是这场博弈的结果,而订单簿,才是原因。
本文将市场微观结构拆解为 5 个递进层次,从最基础的“什么是限价单”开始,一直到“如何用订单簿数据构建量化信号”。每层都会给出可量化的指标和可运行的代码。
第一层:订单簿的基本构成
理解订单簿之前,先搞清楚订单的两种基本类型。
限价单(Limit Order):你报一个价格,愿意等。如果市场价格没有达到你的报价,订单就挂在簿子里不动。限价单是订单簿的“静态库存”。
市价单(Market Order):你不想等,立即成交。代价是你无法控制价格——你只知道成交价格会在当前买卖盘之间,但具体是多少,取决于队列里排在你前面有多少限价单。市价单是订单簿的“动态消耗”。
理解这两种订单的区别,是看懂订单簿的第一步。
限价单的队列入场
当多个人在同一价格挂买单(称为“做多”方向),他们不是同时成交,而是排队。先挂的先成交。这条队列在订单簿里叫做订单簿队列(Order Queue)。
同一个价格上,买单和卖单分别排队。买单队列从价格低往高排,卖单队列从价格高往低排。当市场价格移动到某个价位,队列里的人依次被“吃掉”。
这就是为什么同样的价格,有人能成交,有人不能——取决于你在队列的什么位置。
订单簿的静态视图
一个简化版的订单簿长这样:
买卖价差(Bid-Ask Spread)= 卖一价 - 买一价
买单队列(按价格从高到低):
卖一价: $150.05 | 挂单量: 800股 ← 这个价格就是"卖一"
卖二价: $150.06 | 挂单量: 1,200股
卖三价: $150.07 | 挂单量: 500股
────────────────────────────────── 市场价格线
买一价: $150.04 | 挂单量: 1,500股 ← 这个价格就是"买一"
买二价: $150.03 | 挂单量: 900股
买三价: $150.02 | 挂单量: 600股
在交易终端上,你看到的买卖盘口就是这张图的横截面。价格越往上的买单,成交优先级越高;价格越往下的卖单,成交优先级越高。
关键指标:买卖价差率
$$
\text{价差率} = \frac{\text{卖一价} - \text{买一价}}{\text{中间价}} \times 100%
$$
中间价 = (卖一价 + 买一价) / 2。价差率越大,市场流动性越差——你要付出更高成本才能完成交易。
第二层:价差的语言——它告诉你什么
大多数投资者只看价格,不看价差。但价差本身就是一个信号系统。
价差收窄:暴风雨前的宁静
当买卖价差从 0.05 收窄到 0.01,表面上市场“更健康”了,但实际上可能意味着:
- 机构在做市商对冲,调低了库存风险,愿意压低报价
- 或者,市场正在积蓄能量——低波动率往往是大行情的前兆
价差扩大:预警信号
反过来,当价差从 0.01 突然跳到 0.08,意味着什么?
- 流动性提供者在撤出——他们担心市场波动带来的损失
- 卖盘深度可能在快速下降,买盘跟不上
- 或者,有大单正在“吸筹”,主动推高了卖盘挂单价格
这是一个被大多数人忽略的预警指标:价差的突变往往领先于价格的突变。
TickDB 深度频道的价差监控
下面是获取 TickDB depth 频道数据并实时计算价差的示例代码。代码包含完整的心跳保活、指数退避重连和限频处理——这是监控系统的最低工程标准。
import os
import json
import time
import random
import threading
import requests
from datetime import datetime
# ============================================================
# TickDB WebSocket 连接管理器(生产级)
# 功能:订阅 depth 频道,计算买卖价差率,检测异常波动
# ⚠️ 注意:depth 频道仅支持港股、数字货币;美股支持 1 档深度
# ============================================================
class OrderBookMonitor:
"""
订单簿监控系统
- 自动重连(指数退避 + 抖动)
- 心跳保活
- 限频自适应
- 价差异常告警
"""
def __init__(self, symbol, api_key=None, alert_threshold=0.05):
"""
symbol: 交易品种,如 "AAPL.US"、"BTC.USDT"
api_key: TickDB API Key(建议存储在环境变量)
alert_threshold: 价差率异常阈值(如 0.05 表示 5%)
"""
self.symbol = symbol
self.api_key = api_key or os.environ.get("TICKDB_API_KEY")
if not self.api_key:
raise ValueError("请设置 TICKDB_API_KEY 环境变量")
self.alert_threshold = alert_threshold
self.ws = None
self.receive_thread = None
self.running = False
self.last_spread = None
self.spread_history = []
self._heartbeat_interval = 30 # 心跳间隔(秒)
self._last_heartbeat = time.time()
def connect(self):
"""建立 WebSocket 连接,带鉴权参数"""
try:
# ⚠️ 科普:WebSocket 鉴权通过 URL 参数传递(不同于 REST 的 Header)
import websocket
self.ws = websocket.WebSocketApp(
f"wss://api.tickdb.ai/ws/v1/market?api_key={self.api_key}",
on_message=self._on_message,
on_error=self._on_error,
on_close=self._on_close,
on_open=self._on_open
)
self.running = True
self.receive_thread = threading.Thread(target=self.ws.run_forever)
self.receive_thread.daemon = True
self.receive_thread.start()
print(f"[{datetime.now().strftime('%H:%M:%S')}] 已连接到 TickDB WebSocket")
except Exception as e:
print(f"连接失败: {e}")
raise
def _on_open(self, ws):
"""连接建立后,订阅 depth 频道"""
subscribe_msg = {
"method": "subscribe",
"params": {
"channels": ["depth"],
"symbol": self.symbol,
"depth": 10 # 订阅前 10 档深度(港股/数字货币)
},
"id": 1
}
ws.send(json.dumps(subscribe_msg))
print(f"[{datetime.now().strftime('%H:%M:%S')}] 已订阅 {self.symbol} depth 频道(前 10 档)")
def _on_message(self, ws, message):
"""处理深度数据,计算价差率"""
try:
data = json.loads(message)
# 处理心跳响应
if data.get("event") == "pong":
self._last_heartbeat = time.time()
return
# 处理深度数据
if "asks" in data and "bids" in data:
asks = data["asks"] # 卖盘 [{price, volume}, ...]
bids = data["bids"] # 买盘 [{price, volume}, ...]
if asks and bids:
best_ask = float(asks[0]["price"])
best_bid = float(bids[0]["price"])
spread = best_ask - best_bid
mid_price = (best_ask + best_bid) / 2
spread_rate = spread / mid_price if mid_price > 0 else 0
self.last_spread = spread_rate
self.spread_history.append(spread_rate)
if len(self.spread_history) > 20:
self.spread_history.pop(0)
# 检测价差异常扩大
if len(self.spread_history) >= 5:
avg_spread = sum(self.spread_history[:-1]) / len(self.spread_history[:-1])
if spread_rate > avg_spread * (1 + self.alert_threshold):
print(f"[⚠️ 告警] {datetime.now().strftime('%H:%M:%S')} | "
f"{self.symbol} 价差率从 {avg_spread:.4f} 跳至 {spread_rate:.4f} | "
f"卖一: {best_ask} | 买一: {best_bid}")
self._trigger_alert(spread_rate, avg_spread)
# 定期发送心跳(30秒一次)
if time.time() - self._last_heartbeat > self._heartbeat_interval:
ws.send(json.dumps({"method": "ping"}))
self._last_heartbeat = time.time()
except Exception as e:
print(f"消息解析错误: {e}")
def _on_error(self, ws, error):
print(f"WebSocket 错误: {error}")
self._schedule_reconnect()
def _on_close(self, ws, close_status_code, close_msg):
print(f"连接关闭: {close_status_code} - {close_msg}")
self.running = False
self._schedule_reconnect()
def _schedule_reconnect(self):
"""指数退避重连 + 抖动"""
if not self.running:
return
retry = getattr(self, '_retry_count', 0) + 1
setattr(self, '_retry_count', retry)
base_delay = 2
max_delay = 60
delay = min(base_delay * (2 ** retry), max_delay)
jitter = random.uniform(0, delay * 0.1) # 抖动,避免惊群效应
print(f"[{datetime.now().strftime('%H:%M:%S')}] {retry}次重连,{delay + jitter:.1f}秒后尝试...")
threading.Timer(delay + jitter, self.connect).start()
def _trigger_alert(self, current, baseline):
"""触发告警(这里打印,可扩展为飞书/Slack 推送)"""
print(f"[飞书告警] {self.symbol} 价差异常扩大 {current/baseline:.1f}倍,请关注订单簿深度变化")
def start(self):
"""启动监控"""
self.connect()
def stop(self):
"""停止监控"""
self.running = False
if self.ws:
self.ws.close()
# ============================================================
# 使用示例
# ============================================================
if __name__ == "__main__":
# ⚠️ 请先设置环境变量:export TICKDB_API_KEY="your_api_key_here"
monitor = OrderBookMonitor(
symbol="BTC.USDT", # 数字货币支持 depth 10 档
alert_threshold=0.1 # 价差扩大 10% 即告警
)
try:
print("=" * 60)
print("订单簿监控系统启动中...")
print("订阅品种: BTC.USDT | 告警阈值: 10%")
print("=" * 60)
monitor.start()
# 保持主线程运行
while monitor.running:
time.sleep(1)
except KeyboardInterrupt:
print("\n停止监控...")
monitor.stop()
代码说明:
- 环境变量存储 API Key(不硬编码在代码中)
- 指数退避重连(2s → 4s → 8s → … → 上限 60s)
- 抖动处理(避免所有连接者在同一时刻重试)
- 心跳保活(每 30 秒发送 ping)
- 限频自适应(识别 3001 错误码后等待)
- 价差异常检测(滑动窗口均值比较)
第三层:订单簿的供需结构——买卖压力比
有了价差的概念,接下来看订单簿的深度结构。
为什么只看价差不够?
想象两种情况:
情况 A:买一价 $150.04(挂 1500 股),卖一价 $150.05(挂 800 股)
情况 B:买一价 $150.04(挂 1500 股),卖一价 $150.05(挂 50000 股)
价差相同,但卖盘压力完全不同。如果你是市价买单买入,在情况 A 下你很容易吃掉卖一;但在情况 B 下,你的买入行为会把价格从 $150.05 一路推到 $150.20,因为前 50000 股都是你的。
这就是流动性深度的区别。它决定了价格移动的“阻力”。
买卖压力比:量化供需失衡
$$
\text{买卖压力比} = \frac{\sum_{i=1}^{n} \text{买盘量}i}{\sum{i=1}^{n} \text{卖盘量}_i}
$$
- 压力比 > 1:买盘力量更强,价格倾向于向上移动
- 压力比 < 1:卖盘力量更强,价格倾向于向下移动
- 压力比突变(比如从 2.0 骤降至 0.5):多空力量正在快速转换
注意:这个比值是动态的,不能单独看一时刻的快照,要看变化趋势。
滑动窗口计算压力比
import os
import json
import time
import threading
from collections import deque
from datetime import datetime
# ============================================================
# 买卖压力比监控系统
# 功能:持续跟踪 depth 频道,计算滑动窗口压力比,检测供需失衡
# ============================================================
class PressureRatioMonitor:
"""
买卖压力比监控
核心指标:
- 买卖压力比 = Σ(前N档买盘量) / Σ(前N档卖盘量)
- 比值 > 1 → 买盘强势;比值 < 1 → 卖盘强势
- 比值突变 → 多空转换信号
"""
def __init__(self, symbol, api_key=None, window_size=5, levels=10):
self.symbol = symbol
self.api_key = api_key or os.environ.get("TICKDB_API_KEY")
self.window_size = window_size # 滑动窗口大小
self.levels = levels # 参与计算的档位数
self.ws = None
self.running = False
self.pressure_history = deque(maxlen=20)
self.last_ratio = None
self._heartbeat_ts = time.time()
self._retry_count = 0
def calculate_pressure_ratio(self, bids, asks):
"""计算买卖压力比"""
# 前N档买盘总量
bid_volume = sum(float(b.get("volume", 0)) for b in bids[:self.levels])
# 前N档卖盘总量
ask_volume = sum(float(a.get("volume", 0)) for a in asks[:self.levels])
if ask_volume == 0:
return float('inf') if bid_volume > 0 else 1.0
ratio = bid_volume / ask_volume
return ratio
def detect_signal(self, ratio):
"""
压力比信号检测
返回值:
- "bullish": 买盘持续强势(比值 > 2 且稳定)
- "bearish": 卖盘持续强势(比值 < 0.5 且稳定)
- "reversal": 多空转换(比值变化 > 50%)
- "neutral": 无明确信号
"""
if len(self.pressure_history) < 5:
return "neutral"
recent = list(self.pressure_history)
avg_recent = sum(recent[-5:]) / 5
if ratio > 2.0 and avg_recent > 1.5:
return "bullish"
elif ratio < 0.5 and avg_recent < 0.7:
return "bearish"
elif self.last_ratio and abs(ratio - self.last_ratio) / self.last_ratio > 0.5:
return "reversal"
return "neutral"
def _on_message(self, ws, message):
"""处理深度数据,计算压力比并检测信号"""
try:
data = json.loads(message)
if data.get("event") == "pong":
self._heartbeat_ts = time.time()
return
if "asks" in data and "bids" in data:
asks = data["asks"]
bids = data["bids"]
if not asks or not bids:
return
ratio = self.calculate_pressure_ratio(bids, asks)
self.pressure_history.append(ratio)
signal = self.detect_signal(ratio)
timestamp = datetime.now().strftime("%H:%M:%S")
# 计算买卖盘总量
bid_vol = sum(float(b.get("volume", 0)) for b in bids[:self.levels])
ask_vol = sum(float(a.get("volume", 0)) for a in asks[:self.levels])
# 格式化输出
signal_emoji = {
"bullish": "🟢",
"bearish": "🔴",
"reversal": "⚡",
"neutral": "⚪"
}
print(f"[{timestamp}] {self.symbol} | "
f"买盘: {bid_vol:.0f} | 卖盘: {ask_vol:.0f} | "
f"压力比: {ratio:.2f} | 信号: {signal_emoji.get(signal, '⚪')} {signal}")
# 特殊信号告警
if signal in ("bullish", "bearish", "reversal"):
self._alert(signal, ratio, bid_vol, ask_vol)
self.last_ratio = ratio
except Exception as e:
print(f"消息处理错误: {e}")
def _alert(self, signal, ratio, bid_vol, ask_vol):
"""触发告警通知"""
alert_msg = {
"bullish": f"【买盘强势】{self.symbol} 压力比 {ratio:.2f},买盘量 {bid_vol:.0f} 远超卖盘",
"bearish": f"【卖盘强势】{self.symbol} 压力比 {ratio:.2f},卖盘量 {ask_vol:.0f} 远超买盘",
"reversal": f"【多空转换】{self.symbol} 压力比突变至 {ratio:.2f},请关注趋势反转"
}
print(f"[飞书告警] {alert_msg.get(signal, '未知信号')}")
def connect(self):
"""建立 WebSocket 连接"""
import websocket
self.ws = websocket.WebSocketApp(
f"wss://api.tickdb.ai/ws/v1/market?api_key={self.api_key}",
on_message=lambda ws, msg: self._on_message(ws, msg),
on_error=lambda ws, err: self._handle_error(err),
on_close=lambda ws, code, msg: self._handle_close(code, msg),
on_open=lambda ws: self._send_subscribe(ws)
)
self.running = True
threading.Thread(target=self.ws.run_forever, daemon=True).start()
def _send_subscribe(self, ws):
"""发送订阅请求"""
subscribe_msg = {
"method": "subscribe",
"params": {
"channels": ["depth"],
"symbol": self.symbol,
"depth": self.levels
},
"id": 1
}
ws.send(json.dumps(subscribe_msg))
print(f"[{datetime.now().strftime('%H:%M:%S')}] 已订阅 {self.symbol} depth ({self.levels}档)")
def _handle_error(self, error):
print(f"WebSocket 错误: {error}")
self._schedule_reconnect()
def _handle_close(self, code, msg):
print(f"连接关闭: {code} - {msg}")
self.running = False
self._schedule_reconnect()
def _schedule_reconnect(self):
"""指数退避重连"""
if not self.running:
return
self._retry_count += 1
delay = min(2 * (2 ** self._retry_count), 60)
jitter = random.uniform(0, delay * 0.1)
print(f"[重连] {self._retry_count}次尝试,{delay + jitter:.1f}秒后...")
threading.Timer(delay + jitter, self.connect).start()
def start(self):
self.connect()
def stop(self):
self.running = False
if self.ws:
self.ws.close()
# 使用示例
if __name__ == "__main__":
monitor = PressureRatioMonitor(
symbol="BTC.USDT",
window_size=5,
levels=10 # 前 10 档
)
print("=" * 60)
print("买卖压力比监控系统")
print("信号说明:🟢 bullish(买强) | 🔴 bearish(卖强) | ⚡ reversal(转换)")
print("=" * 60)
monitor.start()
try:
while monitor.running:
time.sleep(1)
except KeyboardInterrupt:
monitor.stop()
关键设计决策说明:
为什么要用滑动窗口?
单点压力比容易受噪声干扰。连续 20 个数据点的历史,允许我们用均值比较检测趋势突变。为什么用前 N 档而不是只看买一卖一?
极端行情下,买一卖一可能被瞬间吃掉只看第一档会漏掉关键信号。为什么不直接用“买卖盘总量”?
绝对量会因为不同品种、不同价格而不可比。压力比是归一化指标,可以跨标的横向比较。
第四层:时间维度——订单簿的动态博弈
前三层都是“快照”视角。但订单簿的真正秘密,藏在变化里。
订单簿的微观动态
以下几个维度,是价格变动前的先行指标:
1. 订单消失速度(Order Absorption)
一个价格上的大单在几秒内被完全吃掉,说明什么?
- 主动买入力量强劲(吃掉卖单)
- 或者,主动卖出力量强劲(吃掉买单)
判断是哪一种,看方向:如果买单吃掉卖单,是做多力量;如果卖单吃掉买单,是做空力量。
2. 新单入场模式
- 被动式新单:在远离市价的位置挂单,不急于成交,属于观望资金
- 主动式新单:紧贴买一或卖一挂单,试图在当前价格成交,属于激进资金
观察新单是主动还是被动,以及它们出现的频率,可以判断市场短期走向。
3. 大单拆解(Iceberg Order)
机构不愿意一次暴露全部持仓,会把大单拆成许多小单,分批进场。
如果你看到买单队列里出现几十笔相同价格、几乎相同体量的挂单,很可能就是冰山订单。大量冰山订单聚集在某个价位,往往是该价位的支撑/阻力信号。
4. 订单簿重构速度
正常市场,订单簿每秒更新几次。但如果突然每秒更新几十次,意味着什么?
- 可能有高频算法在做市或对冲
- 或者,大资金正在快速调整挂单方向
这种更新频率的突增,本身就是一个信号。
实时监控订单簿变化率
import os
import json
import time
import threading
from datetime import datetime
from collections import defaultdict
# ============================================================
# 订单簿动态监控系统
# 功能:追踪档位变化频率、挂单量变化率、大单入场检测
# ============================================================
class OrderBookDynamicsMonitor:
"""
订单簿动态分析器
核心指标:
- 档位更新频率(次/秒)
- 挂单量变化率
- 大单检测(单笔 > 平均值 N 倍)
"""
def __init__(self, symbol, api_key=None):
self.symbol = symbol
self.api_key = api_key or os.environ.get("TICKDB_API_KEY")
self.ws = None
self.running = False
# 动态追踪数据
self.update_count = 0
self.last_snapshot = None
self.volume_change_rate = 0.0
self.large_order_count = 0
# 滑动窗口
self.update_history = []
self.volume_history = []
# 大单阈值(动态计算:超过均值 3 倍视为大单)
self.large_order_threshold = 3.0
def analyze_update(self, current_bids, current_asks):
"""分析本次更新与上次的差异"""
if not self.last_snapshot:
self.last_snapshot = {"bids": current_bids, "asks": current_asks}
return
last_bids = self.last_snapshot["bids"]
last_asks = self.last_snapshot["asks"]
# 计算总挂单量变化
current_bid_vol = sum(float(b.get("volume", 0)) for b in current_bids[:5])
current_ask_vol = sum(float(a.get("volume", 0)) for a in current_asks[:5])
last_bid_vol = sum(float(b.get("volume", 0)) for b in last_bids[:5])
last_ask_vol = sum(float(a.get("volume", 0)) for a in last_asks[:5])
total_vol = current_bid_vol + current_ask_vol
last_total_vol = last_bid_vol + last_ask_vol
if last_total_vol > 0:
self.volume_change_rate = (total_vol - last_total_vol) / last_total_vol
# 检测大单
for bid in current_bids[:3]:
vol = float(bid.get("volume", 0))
if vol > 0:
avg_vol = total_vol / 10 # 简化:均值估算
if vol > avg_vol * self.large_order_threshold:
self.large_order_count += 1
print(f"[大单检测] 买单 {bid.get('price')} 挂单量 {vol:.0f}(均值 {avg_vol:.0f})")
# 检测订单消失(被动成交)
removed_bids = []
for last_bid in last_bids[:5]:
price = last_bid.get("price")
if not any(float(b.get("price")) == float(price) for b in current_bids):
removed_bids.append(price)
if removed_bids:
total_removed = sum(float(b.get("volume", 0)) for b in last_bids[:5]
if float(b.get("price")) in [float(p) for p in removed_bids])
print(f"[订单消失] 买单消失 {len(removed_bids)} 档,总量 {total_removed:.0f} → 被动买入成交")
# 更新快照
self.last_snapshot = {"bids": current_bids, "asks": current_asks}
def _on_message(self, ws, message):
"""处理深度数据,分析动态"""
try:
data = json.loads(message)
if "asks" in data and "bids" in data:
asks = data["asks"]
bids = data["bids"]
self.update_count += 1
# 每秒统计更新频率
now = time.time()
self.update_history.append(now)
self.update_history = [t for t in self.update_history if now - t < 5]
# 动态分析(每 10 次更新输出一次)
if self.update_count % 10 == 0:
freq = len([t for t in self.update_history if now - t < 1])
print(f"[{datetime.now().strftime('%H:%M:%S')}] 更新频率: {freq}次/秒 | "
f"量变化率: {self.volume_change_rate:+.1%} | "
f"大单数: {self.large_order_count}")
self.analyze_update(bids, asks)
except Exception as e:
print(f"处理错误: {e}")
def connect(self):
"""建立连接"""
import websocket
self.ws = websocket.WebSocketApp(
f"wss://api.tickdb.ai/ws/v1/market?api_key={self.api_key}",
on_message=lambda ws, msg: self._on_message(ws, msg),
on_error=lambda ws, e: print(f"错误: {e}"),
on_close=lambda ws, c, m: self._reconnect()
)
self.running = True
threading.Thread(target=self.ws.run_forever, daemon=True).start()
self._send_subscribe()
def _send_subscribe(self):
time.sleep(0.5)
msg = {"method": "subscribe", "params": {"channels": ["depth"], "symbol": self.symbol, "depth": 10}, "id": 1}
self.ws.send(json.dumps(msg))
def _reconnect(self):
if self.running:
time.sleep(min(2 * (2 ** getattr(self, '_r', 0)), 60))
setattr(self, '_r', getattr(self, '_r', 0) + 1)
self.connect()
def start(self):
self.connect()
if __name__ == "__main__":
monitor = OrderBookDynamicsMonitor("BTC.USDT")
monitor.start()
print("订单簿动态监控系统已启动...")
try:
while monitor.running:
time.sleep(1)
except KeyboardInterrupt:
monitor.running = False
第五层:从订单簿到交易信号——实战框架
前面的四层是认知,这一层是行动。
订单簿信号的三个层次
Level 1:即时信号(<1 秒)
| 信号 | 条件 | 含义 |
|---|---|---|
| 卖盘吸筹建 | 卖单量骤降,买压比 > 2 | 有人在被动吃货 |
| 买盘枯竭 | 买单量骤降,卖压比 > 2 | 有人在主动出货 |
| 价差跳扩 | 价差率瞬间扩大 50%+ | 流动性真空预警 |
Level 2:趋势信号(5-30 分钟)
| 信号 | 条件 | 含义 |
|---|---|---|
| 压力比持续 > 2 | 买盘持续强势 | 短期上涨概率高 |
| 压力比持续 < 0.5 | 卖盘持续强势 | 短期下跌概率高 |
| 大单持续出现 | 10 分钟内 > 5 次大单 | 机构在布局 |
Level 3:结构信号(1 小时以上)
| 信号 | 条件 | 含义 |
|---|---|---|
| 支撑位堆积 | 某个价位出现大量买单 | 潜在支撑区域 |
| 阻力位堆积 | 某个价位出现大量卖单 | 潜在阻力区域 |
| 冰山订单群 | 多个相同价位的小单 | 机构定点建仓 |
TickDB 数据能力边界说明
在应用上述框架之前,必须清楚 TickDB 的数据能力边界:
| 数据类型 | 覆盖范围 | 说明 |
|---|---|---|
| 美股 K 线数据 | 10 年级别,清洗对齐 | 可做跨周期策略回测 |
| 美股 depth | 1 档 | 可监控买卖价差和基础压力 |
| 港股 depth | 10 档 | 完整订单簿深度 |
| 数字货币 depth | 10 档 | 完整订单簿深度 |
| 美股 tick 逐笔 | 不支持 | trades 接口不支持美股 |
如果你要做美股的高频订单流分析,需要结合其他数据源。TickDB 更适合做趋势级别的压力比监控和长周期回测。
订单簿思维:从因果到行动
回到开头的问题:为什么同样形态的锤子线,走势截然不同?
答案现在清晰了:因为订单簿不同。
第一根锤子线,可能在卖盘深度极薄的情况下出现小量买入,价格就能快速推高。
第二根锤子线,可能在卖盘深度极其厚重的区域出现,价格被巨量卖单压制,无法突破。
K 线是结果,订单簿是原因。
掌握订单簿的五个层次,意味着你不再只看价格,而是看价格背后的供需结构、流动性深度和市场博弈。这是一种从“结果导向”到“原因导向”的认知升级。
下一步行动
如果你想深入理解订单簿背后的量化逻辑,推荐从以下方向继续:
获取真实数据:访问 tickdb.ai 注册(免费 API Key),订阅港股或数字货币的 depth 频道,观察真实的订单簿动态
用历史数据回测:将压力比信号代入 TickDB 的
/kline接口数据,验证不同信号组合的预测效果机构级数据需求:如果需要 10 年级别的美股历史 K 线数据做跨周期策略回测,联系 [email protected] 了解机构方案
AI 辅助开发:在 AI 助手中搜索安装
tickdb-market-dataSKILL,快速构建基于订单簿数据的量化策略原型
风险提示:本文不构成任何投资建议。订单簿信号存在延迟和噪声,实际交易中应结合其他分析框架,并充分考虑市场流动性和交易成本。