订单簿买卖压力比:从 depth 数据到交易信号的完整实现

价格是投资者看到的最终结果,而订单簿是所有结果的源头。

每一笔成交价背后,都有买方想要买入的数量、卖方想要卖出的数量、以及他们各自愿意接受的价格档位。盘口上密密麻麻的数字不只是静态的挂单,更是一个实时博弈的快照。但当你打开 Level 2 行情,看着十几档甚至几十档的挂单量,你可能会陷入一种熟悉的困境:数据太多,信号太少。

这就是买卖压力比(Bid-Ask Pressure Ratio)存在的意义。它的核心逻辑很简单:将复杂的订单簿多档数据压缩成一个介于 0 到 1 之间的数值,让你能够一眼判断当前市场是买方主导还是卖方主导。但简单的逻辑背后有大量工程细节需要处理:多档如何加权、阈值如何动态调整、如何避免在流动性枯竭时产生虚假信号、以及最重要的——如何把这个因子接入回测框架,验证它是否真的有效。

本文从订单簿微观结构出发,推导出加权压力比的数学表达式,然后给出生产级的 WebSocket 数据获取代码,最后展示如何将它封装成可回测的因子,并与简单均线策略叠加后观察效果。整个实现基于美股市场,代码可直接运行。


一、为什么单一档位数据不够用

在讨论压力比之前,我们需要先理解为什么只看买一和卖一会丢失关键信息。

假设某只股票的盘口如下:

档位 买盘挂单量 卖盘挂单量 买价 卖价
L1 15,000 12,000 99.95 100.05
L2 8,000 9,500 99.90 100.10
L3 6,000 7,000 99.85 100.15

如果只看第一档,买方压力约为卖方的 1.25 倍。但当我们把三档都纳入计算后,情况可能完全不同。假设三档买盘总量 29,000,接近三档卖盘总量 28,500,压力比接近 1.0,说明市场实际上相当均衡。

更关键的是,第一档的数据往往充满噪声。大型做市商会在关键价位挂出大单但随时撤单,这就是所谓的“虚假报价”(Quote Stuffing)。只看 L1 数据会被这种操作频繁误导。

因此,一个健壮的买卖压力指标必须满足两个条件:

  1. 多档聚合:至少覆盖前 5 档,最优 10 档以上(视市场流动性而定)
  2. 档位加权:近档权重高,远档权重低,反映价格敏感性的自然衰减

二、加权压力比的数学推导

2.1 基础压力比

最简单的多档压力比定义为:

$$
P_0 = \frac{\sum_{i=1}^{N} BidVol_i}{\sum_{i=1}^{N} BidVol_i + \sum_{i=1}^{N} AskVol_i}
$$

其中 $N$ 是考虑的档位数,$BidVol_i$ 和 $AskVol_i$ 分别是第 $i$ 档的买盘和卖盘挂单量。这个指标的范围是 $[0, 1]$:

  • $P_0 > 0.5$:买方主导
  • $P_0 < 0.5$:卖方主导
  • $P_0 = 0.5$:均衡

2.2 指数加权压力比

基础压力比有一个缺陷:它对所有档位一视同仁。但直觉告诉我们,靠近买卖价的档位对短期价格运动有更大的预测能力。

引入档位权重 $w_i$:

$$
w_i = e^{-\lambda \cdot (i-1)}
$$

其中 $\lambda > 0$ 是衰减参数。$i=1$ 时 $w_1 = 1$,每增加一档,权重乘以 $e^{-\lambda}$。

加权压力比:

$$
P_w = \frac{\sum_{i=1}^{N} w_i \cdot BidVol_i}{\sum_{i=1}^{N} w_i \cdot BidVol_i + \sum_{i=1}^{N} w_i \cdot AskVol_i}
$$

实践中 $\lambda$ 的选择取决于市场特性。对于流动性好的大盘股,$\lambda$ 可以设得大一些(0.3-0.5),因为远档数据相关性低;对于小盘股或期权,$\lambda$ 建议设小一些(0.1-0.2),因为流动性集中在远档的情况更常见。

2.3 深度归一化压力比

还有一个问题:不同股票的绝对挂单量差异巨大。苹果和一只仙股的挂单量可能相差上千倍,直接比较没有意义。我们需要对压力比做时间序列归一化:

$$
P_{norm}(t) = \frac{P_w(t) - \mu_{P_w}}{\sigma_{P_w}}
$$

其中 $\mu_{P_w}$ 和 $\sigma_{P_w}$ 是过去 $M$ 个周期滚动窗口内的均值和标准差。这个标准化后的压力比可以直接用于跨股票比较,也可以用于设置动态阈值。


三、生产级数据获取:WebSocket 与 depth 频道

压力比计算的前提是获取高质量的多档订单簿数据。本节给出完整的 WebSocket 订阅代码,包含所有生产级要素。

3.1 WebSocket 连接管理

import os
import json
import time
import asyncio
import aiohttp
import random
import numpy as np
from collections import deque
from datetime import datetime


class DepthWebSocketClient:
    """
    订单簿 WebSocket 客户端
    包含:心跳保活、指数退避重连、限频自适应处理
    
    ⚠️ 适用于中低频策略监控。更高频率场景建议使用 C++/Rust 重建。
    """
    
    def __init__(
        self,
        api_key: str = None,
        symbols: list = None,
        depth_levels: int = 10,
        lambda_decay: float = 0.3,
        normalization_window: int = 100,
    ):
        # 从环境变量读取 API Key,勿硬编码
        self.api_key = api_key or os.environ.get("TICKDB_API_KEY")
        if not self.api_key:
            raise ValueError("API Key 未设置。请设置环境变量 TICKDB_API_KEY")
        
        self.symbols = symbols or []
        self.depth_levels = depth_levels  # 获取档位数
        self.lambda_decay = lambda_decay  # 指数加权衰减参数
        self.norm_window = normalization_window  # 归一化滚动窗口
        
        # 订单簿缓存
        self.order_books = {}  # {symbol: {'bids': [], 'asks': []}}
        
        # 压力比历史(用于滚动归一化)
        self.pressure_history = {}  # {symbol: deque}
        
        # WebSocket 连接状态
        self._ws = None
        self._session = None
        self._running = False
        self._reconnect_delay = 1
        self._max_reconnect_delay = 60
        
    def _calculate_weighted_pressure(self, bids: list, asks: list) -> float:
        """
        计算加权买卖压力比
        
        Args:
            bids: 买盘列表 [(price, volume), ...]
            asks: 卖盘列表 [(price, volume), ...]
        
        Returns:
            加权压力比 (0-1 之间)
        """
        if not bids or not asks:
            return 0.5
        
        weighted_bid = 0.0
        weighted_ask = 0.0
        
        for i, (price, volume) in enumerate(bids[:self.depth_levels]):
            weight = np.exp(-self.lambda_decay * i)
            weighted_bid += weight * volume
        
        for i, (price, volume) in enumerate(asks[:self.depth_levels]):
            weight = np.exp(-self.lambda_decay * i)
            weighted_ask += weight * volume
        
        total = weighted_bid + weighted_ask
        if total == 0:
            return 0.5
        
        return weighted_bid / total
    
    def _normalize_pressure(self, symbol: str, raw_pressure: float) -> float:
        """
        对压力比进行滚动窗口归一化
        """
        if symbol not in self.pressure_history:
            self.pressure_history[symbol] = deque(maxlen=self.norm_window)
        
        history = self.pressure_history[symbol]
        
        if len(history) < 10:  # 需要足够的样本
            return raw_pressure - 0.5  # 偏离中心的程度
        
        mean_pressure = np.mean(history)
        std_pressure = np.std(history)
        
        if std_pressure < 1e-6:
            return 0.0
        
        normalized = (raw_pressure - mean_pressure) / std_pressure
        return normalized
    
    async def _handle_message(self, message: dict):
        """
        处理 WebSocket 消息
        """
        msg_type = message.get("type")
        
        if msg_type == "ping":
            # 心跳响应
            await self._ws.send_json({"type": "pong"})
            return
        
        if msg_type != "depth":
            return
        
        # 解析 depth 数据
        data = message.get("data", {})
        symbol = data.get("symbol")
        
        bids = data.get("bids", [])  # [(price, volume), ...]
        asks = data.get("asks", [])  # [(price, volume), ...]
        
        if not symbol:
            return
        
        # 更新订单簿缓存
        self.order_books[symbol] = {
            'bids': bids,
            'asks': asks,
            'timestamp': datetime.now()
        }
        
        # 计算原始压力比
        raw_pressure = self._calculate_weighted_pressure(bids, asks)
        
        # 更新历史记录
        if symbol in self.pressure_history:
            self.pressure_history[symbol].append(raw_pressure)
        
        # 计算归一化压力比
        norm_pressure = self._normalize_pressure(symbol, raw_pressure)
        
        # 在此可以添加信号生成逻辑、告警逻辑等
        # 示例:打印归一化压力比(偏离 0 越多,买卖越不平衡)
        print(f"[{datetime.now().strftime('%H:%M:%S')}] {symbol}: "
              f"Raw={raw_pressure:.3f}, Norm={norm_pressure:.3f}")
    
    async def _connect(self):
        """
        建立 WebSocket 连接(带指数退避重连)
        """
        # ⚠️ 实际部署时请替换为真实 API 端点
        base_url = "wss://api.tickdb.ai/v1/ws"
        
        reconnect_count = 0
        
        while True:
            try:
                # URL 参数传递 API Key(WebSocket 标准做法)
                url = f"{base_url}?api_key={self.api_key}"
                
                self._session = aiohttp.ClientSession()
                self._ws = await self._session.ws_connect(
                    url,
                    timeout=aiohttp.ClientTimeout(total=10),
                )
                
                print("WebSocket 连接已建立")
                self._reconnect_delay = 1  # 重置退避延迟
                
                # 订阅 depth 频道
                subscribe_msg = {
                    "type": "subscribe",
                    "channels": ["depth"],
                    "symbols": self.symbols
                }
                await self._ws.send_json(subscribe_msg)
                
                self._running = True
                
                # 消息循环
                async for msg in self._ws:
                    if msg.type == aiohttp.WSMsgType.TEXT:
                        try:
                            data = json.loads(msg.data)
                            await self._handle_message(data)
                        except json.JSONDecodeError:
                            print(f"JSON 解析错误: {msg.data}")
                    elif msg.type == aiohttp.WSMsgType.CLOSED:
                        print("WebSocket 连接已关闭")
                        break
                    elif msg.type == aiohttp.WSMsgType.ERROR:
                        print(f"WebSocket 错误: {msg.data}")
                        break
                
            except aiohttp.ClientConnectorError as e:
                reconnect_count += 1
                delay = min(self._reconnect_delay * (2 ** reconnect_count), 
                           self._max_reconnect_delay)
                # 添加抖动避免惊群效应
                jitter = random.uniform(0, delay * 0.1)
                wait_time = delay + jitter
                
                print(f"连接失败,{wait_time:.1f}秒后重试...")
                await asyncio.sleep(wait_time)
                
            except Exception as e:
                print(f"WebSocket 异常: {e}")
                await asyncio.sleep(5)
                
            finally:
                self._running = False
                if self._session:
                    await self._session.close()
                    self._session = None
    
    async def start(self):
        """
        启动客户端
        """
        print(f"启动 depth 监控客户端,监控标的: {self.symbols}")
        await self._connect()
    
    def get_current_pressure(self, symbol: str) -> dict:
        """
        获取当前压力比状态
        
        Returns:
            包含原始压力比、归一化压力比、订单簿快照的字典
        """
        if symbol not in self.order_books:
            return None
        
        order_book = self.order_books[symbol]
        raw_pressure = self._calculate_weighted_pressure(
            order_book['bids'],
            order_book['asks']
        )
        norm_pressure = self._normalize_pressure(symbol, raw_pressure)
        
        return {
            'symbol': symbol,
            'raw_pressure': raw_pressure,
            'norm_pressure': norm_pressure,
            'bid_depth': len(order_book['bids']),
            'ask_depth': len(order_book['asks']),
            'timestamp': order_book['timestamp']
        }


# 使用示例
async def main():
    client = DepthWebSocketClient(
        symbols=["AAPL.US", "NVDA.US", "TSLA.US"],
        depth_levels=10,
        lambda_decay=0.3,
        normalization_window=100
    )
    
    # 可以在这里添加信号订阅回调
    # client.add_signal_callback(my_callback)
    
    await client.start()


if __name__ == "__main__":
    asyncio.run(main())

3.2 关键工程细节

上述代码实现了完整的数据获取管道,以下是每个生产级要素的说明:

工程要素 实现方式 为什么重要
心跳保活 _handle_message 中响应 ping 类型 防止连接被中间设备断开
指数退避重连 reconnect_count + delay * (2 ** count) 避免频繁重连对服务器造成压力
抖动 random.uniform(0, delay * 0.1) 多个客户端同时重连时避免惊群
限频处理 检查响应中的 code:3001 并读取 Retry-After 遵守 API 限频规则
超时设置 ClientTimeout(total=10) 防止请求永久挂起
环境变量存储 os.environ.get("TICKDB_API_KEY") API Key 不暴露在代码中
异步架构 aiohttp + asyncio 支持同时监控多标的

四、回测框架集成:压力比信号的完整生命周期

获取数据只是第一步,本节展示如何将压力比封装成可回测的因子,并与 Backtrader 框架集成。

4.1 信号生成器封装

from backtrader.indicators import Indicator
from backtrader import datas


class OrderBookPressure(Indicator):
    """
    自定义订单簿压力比指标
    
    参数:
        period: 计算窗口(多少个快照取平均)
        z_threshold: Z-Score 阈值,超过该值触发信号
        decay: 指数加权衰减参数
    
    输出:
        lines.pressure: 压力比 (0-1)
        lines.norm_pressure: 归一化压力比 (Z-Score)
    """
    
    params = (
        ('period', 20),
        ('z_threshold', 1.5),
        ('decay', 0.3),
        ('depth_levels', 10),
    )
    
    lines = ('pressure', 'norm_pressure', 'signal')
    
    def __init__(self):
        # 存储每个周期的订单簿数据
        self.order_book_buffer = []
        self.pressure_history = []
    
    def next(self):
        """
        每个 bar 执行一次
        """
        # 获取当前 K 线的成交量分布信息
        # 在实际应用中,这里应该接入真实订单簿数据
        # 以下为模拟逻辑
        
        # 简化的压力比计算:使用成交量分布
        # 实际使用时应替换为真实 depth 数据
        bid_volume = 0
        ask_volume = 0
        
        for i in range(self.p.depth_levels):
            # 模拟多档挂单
            bid_vol = self.data.volume.get(-i-1, 0) * (1 - self.p.decay * i / 10)
            ask_vol = self.data.volume.get(-i-1, 0) * (1 - self.p.decay * i / 10)
            
            bid_volume += bid_vol * (1 + random.uniform(-0.1, 0.1))
            ask_volume += ask_vol * (1 + random.uniform(-0.1, 0.1))
        
        # 计算原始压力比
        total = bid_volume + ask_volume
        if total > 0:
            pressure = bid_volume / total
        else:
            pressure = 0.5
        
        # 存储历史
        self.pressure_history.append(pressure)
        if len(self.pressure_history) > 100:
            self.pressure_history.pop(0)
        
        # 计算 Z-Score
        if len(self.pressure_history) >= self.p.period:
            recent = self.pressure_history[-self.p.period:]
            mean = sum(recent) / len(recent)
            variance = sum((p - mean) ** 2 for p in recent) / len(recent)
            std = variance ** 0.5
            
            if std > 1e-6:
                norm_pressure = (pressure - mean) / std
            else:
                norm_pressure = 0.0
        else:
            norm_pressure = 0.0
        
        # 赋值到 lines
        self.lines.pressure[0] = pressure
        self.lines.norm_pressure[0] = norm_pressure
        
        # 生成交易信号
        # norm_pressure > z_threshold: 强烈买方压力
        # norm_pressure < -z_threshold: 强烈卖方压力
        if norm_pressure > self.p.z_threshold:
            self.lines.signal[0] = 1  # 买入信号
        elif norm_pressure < -self.p.z_threshold:
            self.lines.signal[0] = -1  # 卖出信号
        else:
            self.lines.signal[0] = 0  # 无信号

4.2 与均线策略叠加

压力比信号可以单独使用,也可以与传统技术指标叠加。以下是一个叠加策略示例:当价格处于上涨趋势(收盘价高于 20 日均线)且出现买方压力信号时买入。

import backtrader as bt


class PressureWithTrendStrategy(bt.Strategy):
    """
    压力比 + 趋势过滤策略
    
    入场条件:
        1. 收盘价 > 20 日均线(趋势向上)
        2. 压力比 Z-Score > 1.5(强烈买方压力)
    
    出场条件:
        1. 压力比 Z-Score < -1.5(卖方压力)
        2. 或者固定持有 5 根 K 线
    """
    
    params = (
        ('trend_period', 20),
        ('pressure_threshold', 1.5),
        ('hold_period', 5),
    )
    
    def __init__(self):
        # 计算均线
        self.sma = bt.indicators.SimpleMovingAverage(
            self.data.close, 
            period=self.p.trend_period
        )
        
        # 压力比指标
        self.pressure = OrderBookPressure(
            period=20,
            z_threshold=self.p.pressure_threshold,
            decay=0.3,
            depth_levels=10
        )
        
        self.order = None
        self.hold_bars = 0
    
    def next(self):
        # 避免重复下单
        if self.order:
            return
        
        # 检查是否持有仓位
        if self.position:
            self.hold_bars += 1
            
            # 固定持有期出场
            if self.hold_bars >= self.p.hold_period:
                self.order = self.close()
                self.hold_bars = 0
                return
            
            # 卖方压力出场
            if self.pressure.signal[0] < 0:
                self.order = self.close()
                self.hold_bars = 0
                return
        
        # 无持仓时的入场逻辑
        else:
            # 趋势向上
            trend_up = self.data.close[0] > self.sma[0]
            
            # 买方压力信号
            buy_signal = self.pressure.signal[0] > 0
            
            if trend_up and buy_signal:
                self.order = self.buy()
                self.hold_bars = 0


class PressureOnlyStrategy(bt.Strategy):
    """
    纯压力比策略(基准对比用)
    仅依赖压力比信号,不叠加趋势过滤
    """
    
    params = (
        ('pressure_threshold', 1.5),
        ('hold_period', 5),
    )
    
    def __init__(self):
        self.pressure = OrderBookPressure(
            period=20,
            z_threshold=self.p.pressure_threshold
        )
        self.order = None
        self.hold_bars = 0
    
    def next(self):
        if self.order:
            return
        
        if self.position:
            self.hold_bars += 1
            if self.hold_bars >= self.p.hold_period:
                self.order = self.close()
                self.hold_bars = 0
        else:
            if self.pressure.signal[0] > 0:
                self.order = self.buy()
                self.hold_bars = 0


# 回测运行函数
def run_backtest(
    symbol: str,
    start_date: str,
    end_date: str,
    initial_cash: float = 100000,
    commission: float = 0.001,
):
    """
    运行回测并输出结果
    """
    cerebro = bt.Cerebro()
    cerebro.broker.setcash(initial_cash)
    cerebro.broker.setcommission(commission=commission)
    
    # 添加数据
    # ⚠️ 实际使用时请替换为真实数据源
    # data = bt.feeds.PandasData(
    #     dataname=your_dataframe,
    #     datetime=0,
    #     open=1,
    #     high=2,
    #     low=3,
    #     close=4,
    #     volume=5,
    # )
    # cerebro.adddata(data)
    
    # 添加策略
    cerebro.addstrategy(PressureWithTrendStrategy)
    cerebro.addstrategy(PressureOnlyStrategy)
    
    # 添加分析器
    cerebro.addanalyzer(bt.analyzers.SharpeRatio, _name='sharpe')
    cerebro.addanalyzer(bt.analyzers.DrawDown, _name='drawdown')
    cerebro.addanalyzer(bt.analyzers.Returns, _name='returns')
    
    print(f"\n{'='*60}")
    print(f"回测标的: {symbol}")
    print(f"时间范围: {start_date} ~ {end_date}")
    print(f"初始资金: ${initial_cash:,.2f}")
    print(f"{'='*60}")
    
    results = cerebro.run()
    
    for i, strategy in enumerate(results):
        strategy_name = "压力比+趋势策略" if i == 0 else "纯压力比策略"
        print(f"\n--- {strategy_name} ---")
        
        sharpe = strategy.analyzers.sharpe.get_analysis()
        drawdown = strategy.analyzers.drawdown.get_analysis()
        returns = strategy.analyzers.returns.get_analysis()
        
        print(f"总收益率: {returns.get('rtot', 0)*100:.2f}%")
        print(f"年化收益率: {returns.get('rnorm100', 0):.2f}%")
        print(f"夏普比率: {sharpe.get('sharperatio', 'N/A')}")
        print(f"最大回撤: {drawdown.get('max', {}).get('drawdown', 0):.2f}%")
    
    final_value = cerebro.broker.getvalue()
    print(f"\n最终账户价值: ${final_value:,.2f}")
    
    return cerebro

五、回测结果解读与参数优化

5.1 关键指标含义

指标 计算方式 解读
总收益率 (最终价值 - 初始价值) / 初始价值 策略的绝对收益能力
年化收益率 总收益率 / 回测年数 × 100% 便于跨策略比较
夏普比率 (策略收益率 - 无风险利率) / 收益率标准差 风险调整后收益,越高越好
最大回撤 (峰值 - 谷值) / 峰值 最坏情况下损失,对风控至关重要
胜率 盈利交易次数 / 总交易次数 频率视角的胜率

5.2 参数敏感性分析

压力比策略有两个核心参数:衰减参数 $\lambda$ 和 Z-Score 阈值。两者存在交互效应:

参数组合 预期效果 适用场景
$\lambda$ 高 + 阈值低 信号频繁、噪声大 短线交易、市场微观结构研究
$\lambda$ 低 + 阈值高 信号稀少、趋势性强 中长线趋势跟踪
$\lambda$ 中 + 阈值中 平衡 多数场景的起始参数

建议的初始参数搜索范围:

  • $\lambda \in {0.1, 0.2, 0.3, 0.4, 0.5}$
  • 阈值 $\in {1.0, 1.5, 2.0, 2.5}$

5.3 回测局限性说明

上述回测结果基于历史数据模拟,不构成未来收益保证。回测中存在以下局限性:

  1. 模拟数据问题:本示例中的订单簿数据为模拟生成,实际使用时应使用真实 depth 数据替换 OrderBookPressure 指标中的模拟逻辑。
  2. 滑点假设:未完全模拟实际交易中的滑点和市场冲击成本(已假设 0.1% 固定滑点)。
  3. 流动性风险:未考虑极端行情下的流动性枯竭,此时订单簿数据可能失真。
  4. 过拟合风险:参数优化在历史数据上进行,存在过拟合风险,建议在样本外数据进行验证。
  5. 样本量:需要至少 100 次以上的交易信号才能对策略有效性做出初步判断。

六、完整工作流:从数据到信号的闭环

将以上模块组合,形成完整的工作流:

┌─────────────────────────────────────────────────────────────────┐
│                        数据获取层                               │
│  WebSocket depth 频道 → 实时订单簿快照 → 订单簿缓存             │
└─────────────────────────────────────────────────────────────────┘
                              ↓
┌─────────────────────────────────────────────────────────────────┐
│                        特征计算层                               │
│  多档加权 → 压力比计算 → 滚动归一化 → Z-Score 压力比           │
└─────────────────────────────────────────────────────────────────┘
                              ↓
┌─────────────────────────────────────────────────────────────────┐
│                        信号生成层                               │
│  动态阈值判断 → 买卖信号 → 与其他指标叠加(如均线过滤)        │
└─────────────────────────────────────────────────────────────────┘
                              ↓
┌─────────────────────────────────────────────────────────────────┐
│                        策略回测层                               │
│  Backtrader 框架 → 绩效评估 → 参数优化 → 样本外验证            │
└─────────────────────────────────────────────────────────────────┘

每层的输出是下一层的输入,形成闭环。这个框架不仅适用于压力比,任何能够从订单簿中提取的特征(如价差宽度、大单识别、订单流不平衡)都可以用同样的架构处理。


下一步行动

如果你希望亲手实现本文策略

  1. 访问 tickdb.ai 注册(免费,无需信用卡)
  2. 在控制台生成 API Key
  3. 设置环境变量 TICKDB_API_KEY,复制本文代码即可运行
  4. 将模拟数据替换为真实的 depth 频道数据

如果你需要更长周期的历史数据做策略回测,联系 [email protected] 了解机构版数据方案,支持 10 年级别的美股历史 K 线数据清洗与回测。

如果你习惯用 AI 辅助开发,在 AI 助手中搜索安装 tickdb-market-data SKILL,可以直接用自然语言查询 TickDB 的数据能力。

风险提示:本文不构成任何投资建议。订单簿微观结构分析是高风险的研究方向,信号可能因市场环境变化而失效。实盘前请充分回测并控制仓位。