“我的策略回测夏普 2.8,最大回撤 3%。实盘第一周,回撤 12%。”
这不是策略失灵的故事。这是一个关于认知差距的故事。
2021 年,一个有着 8 年量化经验的团队,用机器学习训练了一个美股统计套利模型。回测 5 年,夏普 3.1,最大回撤 2.4%。他们信心满满地上线实盘。三个月后,策略被关闭。事后复盘,他们发现:回测中从未出现的滑点,在高波动时段高达 8 个最小报价单位;API 延迟在流动性紧张时超过 500ms;某个回调逻辑在网络抖动时死锁。
这个团队犯的错误,每个量化新手都会犯:把回测环境当成真实市场的镜像,而忽略了那 5 道鸿沟。
本文系统拆解从回测到实盘的五个核心差距,并给出带生产级健壮性的监控代码框架,帮助你在上线前填平这些坑。
一、滑点与市场冲击:回测里没有的交易成本
1.1 回测假设 vs 现实
回测引擎默认你的订单是价格接受者:你下单,价格就是报价盘上的那个数字。现实是,你的订单本身在影响市场价格——尤其是在流动性较差的标的或波动剧烈的时段。
三个典型场景:
| 场景 | 回测假设 | 实盘真相 |
|---|---|---|
| 美股小盘股限价单 | 立即成交,无滑点 | 卖一价挂单量不足,滑点 2-5 个 tick |
| 财报发布瞬间 | 价格连续,无缺口 | 流动性真空,价格跳空 10%+ |
| 流动性枯竭期 | bid/ask 价差正常 | 价差扩大 10 倍,部分档位消失 |
量化数据:根据 SEC 和多家量化机构的研究披露,标普 500 成分股的实盘滑点约为 0.5-2 个最小报价单位,但在财报发布前后,滑点可达 10-30 个 tick。
1.2 滑点的量化建模
回测时,滑点应该作为显式成本纳入:
# 回测中的滑点模型(简化版)
def apply_slippage(execution_price, expected_price, slippage_bps=5):
"""
模拟滑点成本
参数:
execution_price: 预期成交价
expected_price: 报价盘价格
slippage_bps: 滑点成本(基点),默认 5bps
流动性好的大盘股: 1-3bps
流动性差的标的: 10-50bps
返回:
实际成交价(对对手方有利)
"""
# 正数表示你付了更高价格(买入)或拿到更低价格(卖出)
slippage = expected_price * (slippage_bps / 10000)
return execution_price + slippage
# 波动率自适应滑点模型
def adaptive_slippage(price, volatility_percentile, base_bps=5):
"""
基于当前波动率的动态滑点估算
volatility_percentile: 波动率在历史分布中的分位
0.5 = 中位数波动率
0.9 = 高波动时段
"""
volatility_multiplier = 1 + (volatility_percentile - 0.5) * 4
return base_bps * volatility_multiplier
1.3 实战校准方法
历史分位数法:用你的策略执行时段的历史数据,计算实际滑点的 95% 分位数,用作保守估计:
# 滑点历史校准伪代码
def calibrate_slippage_from_history(trades_df, benchmark_col="mid_price"):
"""
trades_df: 历史成交记录,需包含实际成交价和基准价
"""
trades_df["slippage_bps"] = (
(trades_df["execution_price"] - trades_df[benchmark_col])
/ trades_df[benchmark_col] * 10000
)
# 保守估计:使用 95% 分位数
conservative_slippage = trades_df["slippage_bps"].quantile(0.95)
return {
"mean": trades_df["slippage_bps"].mean(),
"median": trades_df["slippage_bps"].median(),
"p95": conservative_slippage,
"p99": trades_df["slippage_bps"].quantile(0.99)
}
实践建议:如果你的回测滑点假设是 2bps,实盘很可能在 8-15bps。先用保守假设跑通逻辑,再逐步放宽测试。
二、延迟与执行时窗:策略信号与执行的时差
2.1 延迟的来源拆解
延迟不是单点问题,而是链式叠加:
总延迟 = 数据传输延迟 + 策略计算延迟 + 订单路由延迟 + 交易所处理延迟 + 市场反馈延迟
| 环节 | 典型延迟 | 影响因素 |
|---|---|---|
| 数据传输 | 5-50ms | 网络距离、CDN、协议(HTTP vs WebSocket) |
| 策略计算 | 1-10ms | 因子数量、Python GIL(若用多线程) |
| 订单路由 | 10-100ms | broker 机构、订单类型、手工确认 |
| 交易所处理 | 1-5ms | 交易所技术架构、订单队列深度 |
| 市场反馈 | 取决于流动性 | 订单簿厚度、你的订单量占比 |
总计:高频统计套利延迟需 <50ms;日内波段策略 <500ms 可接受;日线级别策略 <5s 足够。
2.2 延迟对策略的影响
均值回归策略:信号有效期极短,延迟 200ms 可能让信号从“低估”变成“高估”。回测假设信号触发即成交,实盘则需要:
# 考虑延迟的信号有效性模型
class SignalWithLatency:
def __init__(self, signal_validity_ms=500):
self.signal_validity_ms = signal_validity_ms
self.pending_signals = []
def evaluate_signal(self, z_score, latency_ms, threshold=2.0):
"""
z_score: 当前 z 分数
latency_ms: 预估延迟
threshold: 入场阈值
"""
# 延迟导致的信号衰减因子
latency_decay = max(0, 1 - latency_ms / self.signal_validity_ms)
effective_signal = z_score * latency_decay
# 回测不考虑延迟,总是直接用 z_score 判断
# 实盘必须用 effective_signal
return {
"raw_signal": z_score,
"effective_signal": effective_signal,
"latency_penalty_pct": (1 - latency_decay) * 100,
"action": "EXECUTE" if effective_signal > threshold else "SKIP"
}
趋势跟踪策略:对延迟容忍度更高,但会在震荡行情中产生更多虚假信号(因为趋势确认滞后,进场点位差)。
2.3 实盘延迟监控
以下代码演示如何在实盘中持续监控延迟,并记录异常:
import time
import logging
from collections import deque
from datetime import datetime
class LatencyMonitor:
"""
延迟监控器:记录策略信号到实际成交的全链路延迟
"""
def __init__(self, window_size=100):
self.signal_times = {} # signal_id -> timestamp
self.execution_times = {} # signal_id -> timestamp
self.latencies = deque(maxlen=window_size)
self.logger = logging.getLogger("latency_monitor")
def mark_signal(self, signal_id):
self.signal_times[signal_id] = time.perf_counter()
def mark_execution(self, signal_id, execution_price, fill_price=None):
if signal_id not in self.signal_times:
self.logger.warning(f"Signal {signal_id} not found")
return
exec_time = time.perf_counter()
signal_time = self.signal_times[signal_id]
latency_ms = (exec_time - signal_time) * 1000
self.execution_times[signal_id] = {
"execution_time": exec_time,
"execution_price": execution_price,
"fill_price": fill_price,
"latency_ms": latency_ms
}
self.latencies.append(latency_ms)
# 异常延迟告警
if latency_ms > self._get_threshold():
self.logger.warning(
f"High latency detected: {latency_ms:.2f}ms "
f"(threshold: {self._get_threshold():.2f}ms)"
)
def _get_threshold(self):
"""基于滑动窗口动态计算告警阈值"""
if len(self.latencies) < 10:
return 500 # 初始默认值
import statistics
return statistics.median(self.latencies) * 3
def get_stats(self):
import statistics
if not self.latencies:
return {"count": 0}
latencies_list = list(self.latencies)
return {
"count": len(latencies_list),
"mean_ms": statistics.mean(latencies_list),
"median_ms": statistics.median(latencies_list),
"p95_ms": sorted(latencies_list)[int(len(latencies_list) * 0.95)],
"p99_ms": sorted(latencies_list)[int(len(latencies_list) * 0.99)],
"max_ms": max(latencies_list)
}
三、连接稳定性:断线、限频与容错机制
3.1 回测环境的隐含假设
回测假设数据永远可用:每个时间点,你都能拿到干净的、连续的市场数据。实盘环境不是:
- 网络抖动导致数据丢包
- API 服务端重启导致连接断开
- 请求频率超限被限流
- 交易所端故障导致数据延迟或缺失
3.2 断连处理的工程实践
这是实盘和回测差距最大的地方,也是新手最容易忽视的地方。以下是 TickDB WebSocket 接入的生产级代码,包含心跳、重连、限频等完整容错机制:
import os
import json
import time
import random
import logging
import threading
from datetime import datetime
try:
import websocket
except ImportError:
raise ImportError("请先安装: pip install websocket-client")
# ============== 配置区 ==============
API_KEY = os.environ.get("TICKDB_API_KEY")
if not API_KEY:
raise ValueError("请设置环境变量 TICKDB_API_KEY")
WS_URL = "wss://api.tickdb.ai/ws/market"
SYMBOL = "AAPL.US" # 示例标的
# 重连参数
INITIAL_RECONNECT_DELAY = 1 # 初始重连延迟(秒)
MAX_RECONNECT_DELAY = 60 # 最大重连延迟(秒)
RECONNECT_BACKOFF_MULTIPLIER = 2 # 退避倍数
MAX_RECONNECT_ATTEMPTS = 20 # 最大重连次数
# 限频参数(3001 错误处理)
RATE_LIMIT_BACKOFF = 5 # 限频默认等待时间
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s"
)
logger = logging.getLogger("TickDB_WS")
class TickDBWebSocketClient:
"""
TickDB WebSocket 客户端 - 生产级健壮实现
特性:
- 心跳保活(ping/pong)
- 指数退避 + 抖动重连
- 限频自适应处理(code 3001)
- 线程安全的回调机制
- 断连期间数据缓冲
⚠️ 注意:高频场景建议使用 aiohttp/asyncio 重写
"""
def __init__(self, api_key, symbol, url=WS_URL):
self.api_key = api_key
self.symbol = symbol
self.url = f"{url}?api_key={api_key}"
self.ws = None
self.running = False
self.reconnect_attempts = 0
# 心跳参数
self.last_ping_time = None
self.ping_interval = 20 # 秒
self.pong_timeout = 10 # 秒
# 限频状态
self.rate_limited = False
self.retry_after = RATE_LIMIT_BACKOFF
# 数据回调
self.on_depth_update = None
self.on_trade_update = None
self.on_error = None
# 断连缓冲(用于恢复)
self.data_buffer = []
self.buffer_lock = threading.Lock()
def connect(self):
"""建立 WebSocket 连接"""
logger.info(f"正在连接到 {self.symbol}...")
try:
self.ws = websocket.WebSocketApp(
self.url,
on_message=self._on_message,
on_error=self._on_error,
on_close=self._on_close,
on_open=self._on_open
)
self.running = True
self.ws.run_forever(
ping_interval=self.ping_interval,
ping_timeout=self.pong_timeout
)
except Exception as e:
logger.error(f"连接失败: {e}")
self._schedule_reconnect()
def _on_open(self, ws):
"""连接建立时的回调"""
logger.info(f"连接已建立,开始订阅 {self.symbol}")
self.reconnect_attempts = 0
# 订阅 depth 频道(订单簿深度)
subscribe_msg = {
"cmd": "subscribe",
"args": {
"channel": "depth",
"symbol": self.symbol
}
}
ws.send(json.dumps(subscribe_msg))
logger.info(f"已订阅 depth 频道: {self.symbol}")
# 如果 API 支持,订阅 trades 频道(逐笔成交)
trades_msg = {
"cmd": "subscribe",
"args": {
"channel": "trades",
"symbol": self.symbol
}
}
ws.send(json.dumps(trades_msg))
logger.info(f"已订阅 trades 频道: {self.symbol}")
def _on_message(self, ws, message):
"""处理接收到的消息"""
try:
data = json.loads(message)
# 处理限频响应
if code := data.get("code"):
if code == 3001:
retry_after = int(data.get("headers", {}).get(
"Retry-After", RATE_LIMIT_BACKOFF
))
logger.warning(f"触发限频,等待 {retry_after} 秒")
time.sleep(retry_after)
return
# 处理心跳响应
if data.get("type") == "pong":
latency = (time.time() - self.last_ping_time) * 1000
logger.debug(f"心跳响应延迟: {latency:.2f}ms")
return
# 处理深度数据
if data.get("channel") == "depth":
with self.buffer_lock:
self.data_buffer.append({
"timestamp": datetime.now().isoformat(),
"data": data.get("data", {})
})
# 保持缓冲区大小
if len(self.data_buffer) > 1000:
self.data_buffer = self.data_buffer[-500:]
if self.on_depth_update:
self.on_depth_update(data.get("data", {}))
# 处理成交数据
elif data.get("channel") == "trades":
if self.on_trade_update:
self.on_trade_update(data.get("data", {}))
except json.JSONDecodeError as e:
logger.error(f"JSON 解析错误: {e}")
except Exception as e:
logger.error(f"消息处理异常: {e}")
def _on_error(self, ws, error):
"""错误处理"""
logger.error(f"WebSocket 错误: {error}")
if self.on_error:
self.on_error(error)
def _on_close(self, ws, close_status_code, close_msg):
"""连接关闭时的回调"""
logger.warning(
f"连接已关闭 (状态码: {close_status_code}, 消息: {close_msg})"
)
self.running = False
self._schedule_reconnect()
def _schedule_reconnect(self):
"""调度重连(指数退避 + 抖动)"""
if self.reconnect_attempts >= MAX_RECONNECT_ATTEMPTS:
logger.error("达到最大重连次数,停止重试")
return
# 计算延迟:指数退避
delay = min(
INITIAL_RECONNECT_DELAY * (RECONNECT_BACKOFF_MULTIPLIER ** self.reconnect_attempts),
MAX_RECONNECT_DELAY
)
# 添加抖动(避免惊群效应)
jitter = random.uniform(0, delay * 0.1)
total_delay = delay + jitter
self.reconnect_attempts += 1
logger.info(
f"将在 {total_delay:.2f} 秒后进行第 {self.reconnect_attempts} 次重连 "
f"(最大 {MAX_RECONNECT_ATTEMPTS} 次)"
)
time.sleep(total_delay)
if not self.running:
self.connect()
def send_ping(self):
"""手动发送 ping(某些服务端需要)"""
if self.ws and self.running:
self.last_ping_time = time.time()
self.ws.send(json.dumps({"cmd": "ping"}))
def disconnect(self):
"""主动断开连接"""
logger.info("正在关闭连接...")
self.running = False
if self.ws:
self.ws.close()
def get_buffer_snapshot(self):
"""获取断连期间的数据缓冲(用于事后分析)"""
with self.buffer_lock:
return list(self.data_buffer)
# ============== 使用示例 ==============
if __name__ == "__main__":
client = TickDBWebSocketClient(api_key=API_KEY, symbol=SYMBOL)
# 定义数据回调
def handle_depth(depth_data):
"""处理订单簿深度更新"""
bids = depth_data.get("bids", [])
asks = depth_data.get("asks", [])
if bids and asks:
best_bid = float(bids[0][0])
best_ask = float(asks[0][0])
spread = (best_ask - best_bid) / best_bid * 10000
logger.info(
f"[订单簿] 买一: {best_bid} | 卖一: {best_ask} | "
f"价差: {spread:.1f} bps | 档位: {len(bids)}x{len(asks)}"
)
# 示例:检测流动性异常
if spread > 20: # 价差超过 20bps
logger.warning(f"⚠️ 流动性紧张: 价差 {spread:.1f} bps")
def handle_trade(trade_data):
"""处理逐笔成交"""
price = trade_data.get("price")
volume = trade_data.get("volume")
side = trade_data.get("side", "UNKNOWN")
logger.debug(f"[成交] {side} {volume} @ {price}")
def handle_error(error):
"""处理连接错误"""
logger.error(f"连接异常: {error}")
client.on_depth_update = handle_depth
client.on_trade_update = handle_trade
client.on_error = handle_error
try:
logger.info("启动 TickDB WebSocket 客户端...")
client.connect()
except KeyboardInterrupt:
logger.info("收到中断信号,正在关闭...")
client.disconnect()
3.3 断连日志分析模板
断连后,以下信息对诊断问题至关重要:
def analyze_connection_issues(log_entries):
"""
分析断连日志,识别模式
"""
issues = {
"timeout_count": 0,
"rate_limit_count": 0,
"reconnect_count": 0,
"avg_reconnect_delay": [],
"data_gaps": [] # 缺失的时间段
}
for entry in log_entries:
if "timeout" in entry.get("message", "").lower():
issues["timeout_count"] += 1
if "rate limit" in entry.get("message", "").lower() or "3001" in str(entry):
issues["rate_limit_count"] += 1
if "reconnect" in entry.get("message", "").lower():
issues["reconnect_count"] += 1
if "delay" in entry:
issues["avg_reconnect_delay"].append(entry["delay"])
return {
**issues,
"avg_reconnect_delay": (
sum(issues["avg_reconnect_delay"]) / len(issues["avg_reconnect_delay"])
if issues["avg_reconnect_delay"] else 0
)
}
四、过拟合陷阱:回测在“作弊”你
4.1 过拟合的三种形态
形态一:参数过拟合
# 回测代码中的典型陷阱
import numpy as np
def optimize_parameters(prices, param_range):
"""
演示:如果用同一组数据优化参数,会严重过拟合
"""
results = []
for window in param_range:
for threshold in np.arange(0.5, 3.0, 0.1):
# 在训练数据上测试(这就是过拟合的根源)
pnl = backtest(prices, window, threshold)
results.append({"window": window, "threshold": threshold, "pnl": pnl})
# 找最优参数
best = max(results, key=lambda x: x["pnl"])
# ⚠️ 这里的夏普比率是假的!它在同一批数据上“作弊”
return best
形态二:未来函数(Look-ahead Bias)
# 错误示例:使用了未来信息
def wrong_indicator(prices):
"""
计算均线时错误地用了未来数据
"""
# 这是错的:假设我们用当天收盘价计算信号再交易当天
# 实际应该用前一天的信息
return {
"signal": "BUY" if prices[-1] > prices[-1].mean() else "SELL",
# 正确做法:
# "signal": "BUY" if prices[-1] > prices[:-1].mean() else "SELL"
}
# 更隐蔽的例子:特征工程中的未来函数
def extract_features(df):
df["future_return"] = df["close"].shift(-1) # ⚠️ 直接暴露了答案
return df
形态三:幸存者偏差
只选择当前存在的标的进行回测,忽略了历史上已经退市、破产、被收购的标的。这些“消失”的标的在回测期内的收益是 -100%,但你看不见。
4.2 过拟合的量化检验
Walk-Forward Analysis(前进分析)
def walk_forward_validation(prices, train_window, test_window, step):
"""
前进分析:用滚动窗口防止过拟合
原理:
1. 用 [t, t+train_window] 的数据优化参数
2. 用 [t+train_window, t+train_window+test_window] 的数据验证
3. 滚动前进
"""
results = []
t = 0
while t + train_window + test_window < len(prices):
train_data = prices[t:t+train_window]
test_data = prices[t+train_window:t+train_window+test_window]
# 在训练集上优化
optimal_params = optimize_parameters(train_data)
# 在测试集上验证(不使用优化数据)
test_sharpe = backtest(test_data, **optimal_params)
results.append({
"train_sharpe": backtest(train_data, **optimal_params),
"test_sharpe": test_sharpe,
"params": optimal_params,
"period": f"{t} to {t+train_window+test_window}"
})
t += step
return pd.DataFrame(results)
键指标:如果训练集夏普 2.5,测试集夏普 0.8——差距超过 50%,严重过拟合。
4.3 降低过拟合的工程实践
| 方法 | 说明 | 实施难度 |
|---|---|---|
| 样本外验证 | 留出 20-30% 数据不做参数优化 | 低 |
| 前进分析 | 滚动窗口,多次验证 | 中 |
| 交叉验证 | K-Fold 时序交叉验证 | 中 |
| 简化模型 | 减少参数数量(Occam 剃刀) | 低 |
| 约束条件 | 给参数加物理意义约束 | 中 |
| Monte Carlo | 多次随机抽样验证稳健性 | 中 |
五、心态与纪律:被量化的“人性弱点”
5.1 量化策略中的人性陷阱
很多人以为,量化交易可以完全消除情绪影响。实际上,执行层面的人性弱点仍然存在:
陷阱一:手工干预(Manual Override)
回测时,策略 100% 执行。实盘中,你会忍不住在连续亏损时手动平仓,或者在错过几次信号后觉得“策略失效了”。一个经验法则:如果你的策略在 20 个独立事件中胜率 55%,你可能在连续 5 次亏损后认为它坏了——但这 5 次亏损完全在统计范围内。
陷阱二:容量错配
小资金回测的策略,用大资金实盘时会失效:你的订单量超过了标的的日均成交量 1%,就会显著影响价格。
def estimate_position_impact(position_size, adv_20):
"""
估算持仓对市场的冲击成本
adv_20: 过去 20 日平均成交量
"""
participation_rate = position_size / adv_20
# 经验模型(非精确,供参考)
# participation_rate > 5% 时,冲击成本显著增加
impact_cost_bps = participation_rate * 3 # 简化模型
return {
"participation_rate": participation_rate,
"estimated_impact_bps": impact_cost_bps,
"warning": participation_rate > 0.05
}
陷阱三:低估非交易日
回测只在交易日运行,但实盘需要处理隔夜风险、周末风险、假期风险。你需要考虑:
- 隔夜仓位的宏观事件暴露
- 期货到期日的移仓成本
- 期权到期日前后的 gamma 风险
5.2 系统化纪律执行
将策略的入场、出场、风控规则硬编码,避免手工决策:
class ExecutionGuard:
"""
执行纪律守卫器:强制执行预定义的交易规则
"""
def __init__(self, max_position_pct=0.1, max_drawdown_pct=0.05):
self.max_position_pct = max_position_pct
self.max_drawdown_pct = max_drawdown_pct
self.daily_loss = 0
self.trade_count = 0
self.is_emergency_stop = False
def pre_trade_check(self, signal, current_positions, portfolio_value):
"""
交易前检查
返回: (can_trade: bool, reason: str)
"""
# 检查紧急停止
if self.is_emergency_stop:
return False, "emergency_stop_active"
# 检查日亏损限制
if self.daily_loss > self.max_drawdown_pct * portfolio_value:
self.is_emergency_stop = True
return False, "daily_loss_limit_reached"
# 检查仓位限制
new_position_value = signal.get("position_value", 0)
if new_position_value > self.max_position_pct * portfolio_value:
return False, "position_size_exceeded"
# 检查连续亏损(防止情绪化干预)
if self.trade_count > 0 and self.trade_count % 10 == 0:
if self._recent_win_rate() < 0.4:
return False, "win_rate_below_threshold"
return True, "approved"
def post_trade_update(self, pnl):
"""交易后更新统计"""
self.daily_loss += pnl if pnl < 0 else 0
self.trade_count += 1
def _recent_win_rate(self, window=10):
"""计算最近 N 笔交易的胜率"""
# 简化实现
return 0.5 # 实际应从交易记录中计算
def reset_daily(self):
"""每日重置(开盘前调用)"""
self.daily_loss = 0
self.is_emergency_stop = False
self.trade_count = 0
5.3 心理预期的量化校准
用统计语言替代直觉判断:
| 直觉判断 | 统计校准 |
|---|---|
| “连续亏了 5 笔,策略坏了” | 胜率 55% 时,连续 5 次亏损概率约 18% |
| “这笔止损亏太多了” | 单笔最大亏损是预期的 1.5 倍,在 2 倍标准差内 |
| “这周行情太差了” | 周亏损 -3%,年化波动率 15% 时,单周 -3σ 的概率约 0.3% |
| “再等等就会反弹” | 这是均值回归的陷阱,趋势策略不应逆势加仓 |
六、系统性检验清单:上线前的自检
在将策略从回测迁移到实盘之前,用以下清单逐项检验:
6.1 数据层面
| 检查项 | 通过标准 | 工具/方法 |
|---|---|---|
| 数据完整性 | 无缺失时间点,或已记录并处理 | 频率统计、缺口检测 |
| 数据对齐 | K 线开高低收时间戳一致 | 对比 OHLC 时间 |
| 滑点假设 | 使用 95% 分位数保守估计 | 历史成交 vs 报价对比 |
| 前视偏差 | 无任何使用未来数据的地方 | 代码审计、因果分析 |
6.2 执行层面
| 检查项 | 通过标准 | 工具/方法 |
|---|---|---|
| 延迟监控 | 记录信号到成交的全链路延迟 | LatencyMonitor 类 |
| 断连恢复 | 支持指数退避重连,有缓冲 | 模拟断网测试 |
| 限频处理 | 识别 3001 响应,自动等待 | 代码审查 |
| 仓位校验 | 实盘仓位与策略意图一致 | 每日对账 |
6.3 风控层面
| 检查项 | 通过标准 | 工具/方法 |
|---|---|---|
| 单笔亏损限制 | 硬编码,不依赖人工判断 | ExecutionGuard |
| 日亏损限制 | 触发后自动停止交易 | 风控模块 |
| 仓位上限 | 硬编码,不允许超仓 | 风控模块 |
| 净值回撤 | 接近阈值时降频或停止 | 实时监控 |
6.4 过拟合检验
| 检查项 | 通过标准 | 工具/方法 |
|---|---|---|
| 前进分析 | 测试集夏普 / 训练集夏普 > 0.7 | Walk-Forward |
| 参数敏感度 | 参数 ±10% 时,收益变化 < 30% | 参数扫描 |
| 样本外验证 | 留出数据测试,差距可接受 | 时间切片 |
结语
回测和实盘之间的 5 道鸿沟,不是“模型不够好”,而是认知不够完整。
滑点让你知道交易是有成本的;延迟让你知道信号是有时效的;断连让你知道系统是需要容错的;过拟合让你知道历史是有局限的;心态让你知道纪律是有价值的。
量化交易的本质,不是找到一个“圣杯”,而是系统性地管理不确定性。这 5 道鸿沟填平了,你的策略才真正从“回测里的好策略”变成“实盘中的好系统”。
下一步行动
如果你正在做策略回测:
- 检查你的滑点假设是否使用了 95% 分位数的保守估计
- 运行 Walk-Forward 分析,检验参数稳健性
- 排查代码中是否存在前视偏差
如果你准备上线实盘:
- 部署连接监控代码,记录每一次断连和延迟
- 用生产级 WebSocket 客户端替代轮询(示例代码可直接运行)
- 将风控规则硬编码,避免手工干预
如果你习惯用 AI 辅助开发:
在 ClawHub 搜索安装 tickdb-market-data SKILL,用自然语言查询市场数据。
免责声明:本文不构成任何投资建议。回测结果基于历史数据,不能保证未来收益。实盘交易涉及真实资金损失风险,请谨慎评估自身风险承受能力后再做决策。市场有风险,投资需谨慎。