回测赚到手软,实盘亏到失眠:量化策略迁移失败的 5 个真相
你花了三个月优化一个策略,夏普比率 2.8,最大回撤 12%,年化收益 41%。历史 K 线数据回测结果漂亮得像艺术品。
然后你把它挂到实盘。
三个月后,账户亏了 18%。
你打开日志,发现:成交价格比回测预期差了 0.3%,滑点吞噬了所有利润;延迟导致信号失效,订单还没成交行情已经反转;最要命的是,策略在回测里从没遇到过的极端行情,在实盘里连续出现了两次。
这不是你的策略错了。这是回测和实盘之间,存在五道你从未正视过的鸿沟。
本文逐一拆解这五道鸿沟的本质、成因,以及在 TickDB 架构下如何用工程手段填平它们。
一、滑点鸿沟:回测里不存在的摩擦力
1.1 滑点的本质
回测假设订单以下一个 bar 的开盘价或当前 bar 的收盘价成交。但实盘中:
- 市价单在流动性不足时滑向不利方向
- 限价单可能永远无法成交
- 大单拆单时市场冲击成本非线性叠加
滑点的本质是订单簿状态的瞬时变化,而回测引擎无法还原这种微观结构。
1.2 量化滑点的两种思路
事后校准法:在回测引擎中加入固定的滑点系数(如 0.1%、0.2%),作为摩擦成本的悲观估计。优点是简单,缺点是粗糙——它假设所有行情下滑点恒定,而事实恰恰相反。
事前模拟法:使用 TickDB depth 频道实时还原订单簿深度,结合订单规模估算市场冲击。这才是真正有意义的做法。
import os
import time
import json
import random
import requests
import websocket
# ============================================================
# TickDB WebSocket 实时订单簿监控 + 滑点估算
# ============================================================
class OrderBookMonitor:
"""
监听 TickDB depth 频道,计算买卖压力比,
在下单前评估当前流动性深度,辅助判断滑点风险
"""
def __init__(self, symbol: str):
self.symbol = symbol
self.api_key = os.environ.get("TICKDB_API_KEY")
if not self.api_key:
raise ValueError("请设置环境变量 TICKDB_API_KEY")
self.ws_url = "wss://api.tickdb.ai/v1/market/depth"
self.ws = None
self.order_book = {"bids": [], "asks": []}
self.reconnect_delay = 1.0
self.max_reconnect_delay = 32.0
def connect(self):
"""建立 WebSocket 连接,带指数退避重连"""
try:
# ⚠️ 生产环境高频场景建议使用 aiohttp/asyncio
self.ws = websocket.create_connection(
f"{self.ws_url}?symbol={self.symbol}&api_key={self.api_key}",
timeout=10
)
self.reconnect_delay = 1.0 # 重置退避
print(f"[{self.symbol}] WebSocket 已连接,开始监听 depth 频道")
except Exception as e:
self._handle_connection_error(e)
def _handle_connection_error(self, error: Exception):
"""指数退避 + 抖动重连"""
print(f"[{self.symbol}] 连接错误: {error}")
if self.ws:
self.ws.close()
# 指数退避
delay = self.reconnect_delay
# 抖动:避免惊群效应
jitter = random.uniform(0, delay * 0.1)
print(f"[{self.symbol}] {delay + jitter:.2f}s 后重连...")
time.sleep(delay + jitter)
self.reconnect_delay = min(self.reconnect_delay * 2, self.max_reconnect_delay)
self.connect()
def _send_heartbeat(self):
"""心跳保活"""
if self.ws and self.ws.connected:
try:
self.ws.send(json.dumps({"cmd": "ping"}))
except Exception as e:
print(f"心跳发送失败: {e}")
self._handle_connection_error(e)
def _handle_rate_limit(self, response_data: dict):
"""限频处理(code: 3001)"""
code = response_data.get("code", 0)
if code == 3001:
retry_after = int(response_data.get("retry_after", 5))
print(f"⚠️ 请求频率超限,等待 {retry_after}s")
time.sleep(retry_after)
return True
return False
def estimate_slippage(self, order_size: float, side: str = "buy") -> dict:
"""
基于当前订单簿深度估算滑点风险
Args:
order_size: 目标成交股数
side: "buy" 或 "sell"
Returns:
dict: 包含平均滑点、影响价格、风险评级
"""
if side == "buy":
levels = self.order_book.get("asks", [])
else:
levels = self.order_book.get("bids", [])
if not levels:
return {"error": "订单簿暂无数据"}
cumulative_qty = 0.0
cumulative_cost = 0.0
best_price = float(levels[0].get("price", 0)) if levels else 0
for level in levels:
price = float(level.get("price", 0))
qty = float(level.get("qty", 0))
if cumulative_qty + qty >= order_size:
# 最后一批,部分成交
remaining = order_size - cumulative_qty
cumulative_cost += remaining * price
cumulative_qty = order_size
break
else:
cumulative_cost += qty * price
cumulative_qty += qty
if cumulative_qty == 0:
return {"error": "流动性不足"}
avg_fill_price = cumulative_cost / cumulative_qty
# 计算相对于最优价格的滑点
slippage_bps = abs(avg_fill_price - best_price) / best_price * 10000
# 风险评级
if slippage_bps < 5:
risk_level = "LOW"
elif slippage_bps < 20:
risk_level = "MEDIUM"
else:
risk_level = "HIGH"
return {
"symbol": self.symbol,
"order_size": order_size,
"side": side,
"best_price": best_price,
"avg_fill_price": avg_fill_price,
"slippage_bps": round(slippage_bps, 2),
"risk_level": risk_level,
"cumulative_qty": cumulative_qty
}
def run(self, check_interval: int = 30):
"""持续监控订单簿,定时估算滑点风险"""
self.connect()
last_heartbeat = time.time()
try:
while True:
try:
msg = self.ws.recv()
data = json.loads(msg)
if "code" in data:
if self._handle_rate_limit(data):
continue
if data.get("type") == "depth":
self.order_book = {
"bids": data.get("bids", []),
"asks": data.get("asks", [])
}
# 每 30 秒输出滑点风险评估(针对 10000 股大单)
if time.time() - last_heartbeat >= check_interval:
result = self.estimate_slippage(10000, side="buy")
if "error" not in result:
print(
f"[{self.symbol}] 买单滑点估算 | "
f"最优价: {result['best_price']:.2f} | "
f"平均成交价: {result['avg_fill_price']:.2f} | "
f"滑点: {result['slippage_bps']:.1f}bps | "
f"风险: {result['risk_level']}"
)
last_heartbeat = time.time()
# 发送心跳
self._send_heartbeat()
except websocket.WebSocketTimeoutException:
print(f"[{self.symbol}] 接收超时,发送心跳...")
self._send_heartbeat()
except Exception as e:
print(f"处理消息异常: {e}")
self._handle_connection_error(e)
except KeyboardInterrupt:
print("监控已停止")
self.ws.close()
if __name__ == "__main__":
monitor = OrderBookMonitor("BTC.USDT")
monitor.run(check_interval=30)
运行说明:设置
TICKDB_API_KEY环境变量后,运行脚本即可监控 BTC 订单簿深度,每 30 秒输出一份针对 10000 股大单的滑点风险评估。滑点超过 20bps 时建议暂缓下单。
1.3 滑点回测的正确姿势
回测时不要只加一个固定滑点系数。用历史 depth 数据(TickDB 支持)重现订单簿结构,分行情场景(低波动 / 高波动 / 极端行情)分别校准滑点参数。然后在回测引擎中根据当时的波动率和流动性深度动态施加滑点。
这不是锦上添花。这是基本功。
二、延迟鸿沟:信号到成交之间的幽灵
2.1 延迟的来源分层
从策略发出信号到订单成交,至少经历以下延迟层:
策略信号生成
→ 订单路由(网络传输)
→ 券商 / 交易所接口处理
→ 市场订单簿匹配
→ 成交确认返回
每一层的延迟叠加起来,轻则几十毫秒,重则数百毫秒。在高频策略中,这几百毫秒就是天堂和地狱的差距。
2.2 延迟对策略的影响因策略类型而异
| 策略类型 | 延迟敏感度 | 典型延迟容忍 |
|---|---|---|
| 高频做市 | 极高(<1ms) | 不适合个人量化 |
| 趋势跟踪(日内) | 高(<100ms) | 需要专线 / 共置 |
| 事件驱动(财报) | 中(<1s) | 事件窗口 5-30 分钟,延迟可接受 |
| 均值回归(多日) | 低(<10s) | 几乎无影响 |
对于大多数个人量化策略而言,延迟的主要矛盾不是网络层,而是你获取数据的速度和精度。你用的是轮询(polling)还是 WebSocket 推送?你的数据源实时性如何?
2.3 轮询 vs 推送:延迟的本质差异
# ============================================================
# 轮询方式:每 1 秒请求一次,延迟 = 0~1 秒随机
# ============================================================
def polling_fetch(symbol: str, api_key: str):
"""轮询获取最新行情(错误示范)"""
response = requests.get(
"https://api.tickdb.ai/v1/market/kline/latest",
headers={"X-API-Key": api_key},
params={"symbol": symbol},
timeout=(3.05, 10) # 超时设置
)
data = response.json()
return data.get("data", {})
# ============================================================
# WebSocket 方式:行情推送,延迟 <100ms
# ============================================================
def websocket_fetch(symbol: str, api_key: str):
"""WebSocket 实时行情(正确示范)"""
ws = websocket.create_connection(
f"wss://api.tickdb.ai/v1/market/kline?symbol={symbol}&api_key={api_key}",
timeout=10
)
while True:
msg = ws.recv()
data = json.loads(msg)
# data 包含实时 kline,更新延迟 <100ms
yield data
结论:选择 WebSocket 而非轮询,是填平延迟鸿沟的第一步。TickDB 所有实时数据通道均支持 WebSocket 订阅。
三、断连鸿沟:实盘运行的隐性杀手
3.1 断连的代价
实盘跑策略,最怕的不是行情判断错误,而是半夜断线了你不知道。
2019 年某量化团队的实盘事故:服务器断网 4 小时,期间策略在账户里开满了反向头寸,损失超过 200 万。事后复盘,发现问题很简单——网络抖动导致 WebSocket 连接断开,策略没有任何重连机制,继续按照内存中的旧数据下单。
3.2 生产级断连处理框架
断连处理不是加一个 try/except 那么简单。它需要:
- 心跳检测:定期发送 ping,检测连接存活
- 断连感知:心跳超时后立即触发重连
- 指数退避:避免断连后疯狂重试造成雪崩
- 数据恢复:重连后从断点恢复数据流,不丢行情
- 告警通知:断连超过阈值时立即通知运维
下面是一个完整的断连处理模块:
import os
import time
import json
import random
import requests
import threading
import websocket
from datetime import datetime
# ============================================================
# 生产级 WebSocket 连接管理器(含断连处理 + 告警)
# ============================================================
class RobustWebSocketClient:
"""
带心跳、断连重连、抖动退避、告警通知的 WebSocket 客户端
适用于实盘环境下的行情订阅
"""
def __init__(self, symbol: str, on_data_callback=None, on_disconnect_callback=None):
self.symbol = symbol
self.api_key = os.environ.get("TICKDB_API_KEY")
if not self.api_key:
raise ValueError("请设置环境变量 TICKDB_API_KEY")
self.ws_url = f"wss://api.tickdb.ai/v1/market/kline?symbol={symbol}&api_key={self.api_key}"
self.ws = None
self.running = False
self.on_data_callback = on_data_callback
self.on_disconnect_callback = on_disconnect_callback
# 重连参数
self.base_delay = 1.0
self.max_delay = 60.0
self.reconnect_attempts = 0
# 心跳参数
self.heartbeat_interval = 20 # 秒
self.last_heartbeat_time = time.time()
self.heartbeat_timeout = 30 # 秒
# 告警参数
self.disconnect_start_time = None
self.alert_threshold = 60 # 断连超过 60 秒告警
self._lock = threading.Lock()
def connect(self):
"""建立连接,带超时保护"""
try:
self.ws = websocket.create_connection(
self.ws_url,
timeout=10
)
self.ws.settimeout(5)
self.reconnect_attempts = 0
print(f"[{self.symbol}] ✓ 连接成功 | {datetime.now().strftime('%H:%M:%S')}")
except Exception as e:
print(f"[{self.symbol}] ✗ 连接失败: {e}")
self._schedule_reconnect()
def _send_heartbeat(self):
"""发送心跳,保持连接活跃"""
if self.ws and self.ws.connected:
try:
self.ws.send(json.dumps({"cmd": "ping"}))
self.last_heartbeat_time = time.time()
except Exception as e:
print(f"[{self.symbol}] 心跳发送失败: {e}")
self._handle_disconnect()
def _check_heartbeat(self):
"""检测心跳超时"""
elapsed = time.time() - self.last_heartbeat_time
if elapsed > self.heartbeat_timeout:
print(f"[{self.symbol}] ⚠️ 心跳超时({elapsed:.0f}s),疑似断连")
self._handle_disconnect()
def _handle_disconnect(self):
"""处理断连:记录时间戳、触发回调、调度重连"""
with self._lock:
if self.disconnect_start_time is None:
self.disconnect_start_time = time.time()
print(f"[{self.symbol}] ⚠️ 断连检测 | 时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
if self.on_disconnect_callback:
self.on_disconnect_callback(self.symbol)
self._schedule_reconnect()
def _schedule_reconnect(self):
"""指数退避 + 抖动调度重连"""
if not self.running:
return
# 计算延迟
delay = min(self.base_delay * (2 ** self.reconnect_attempts), self.max_delay)
jitter = random.uniform(0, delay * 0.1)
total_delay = delay + jitter
print(f"[{self.symbol}] ⏳ {total_delay:.1f}s 后重连(第 {self.reconnect_attempts + 1} 次)")
time.sleep(total_delay)
self.reconnect_attempts += 1
self.connect()
def run(self):
"""主循环:接收消息 + 心跳检测"""
self.running = True
self.connect()
try:
while self.running:
# 心跳检测
self._check_heartbeat()
try:
msg = self.ws.recv()
data = json.loads(msg)
# 处理限频
if data.get("code") == 3001:
retry_after = int(data.get("retry_after", 5))
print(f"[{self.symbol}] ⚠️ 限频,等待 {retry_after}s")
time.sleep(retry_after)
continue
# 处理数据
if self.on_data_callback:
self.on_data_callback(data)
# 更新心跳时间戳
self.last_heartbeat_time = time.time()
except websocket.WebSocketTimeoutException:
# 超时不一定是断连,可能是当前没有数据,继续等待
pass
except Exception as e:
print(f"[{self.symbol}] ✗ 接收异常: {e}")
self._handle_disconnect()
except KeyboardInterrupt:
print(f"[{self.symbol}] 收到停止信号,正在关闭连接...")
self.running = False
if self.ws:
self.ws.close()
# ============================================================
# 告警回调示例(可接入飞书 / 钉钉 / 邮件)
# ============================================================
def on_disconnect(symbol: str):
"""断连告警回调"""
elapsed = time.time()
alert_msg = (
f"🚨 TickDB WebSocket 断连告警\n"
f"品种: {symbol}\n"
f"时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n"
f"请立即检查服务器网络和策略状态"
)
print(alert_msg)
# TODO: 接入飞书 Webhook / 钉钉机器人 / 邮件通知
def on_data(data: dict):
"""数据回调"""
print(f"[数据更新] {data.get('symbol')} | K线时间: {data.get('kline', {}).get('time')}")
if __name__ == "__main__":
client = RobustWebSocketClient(
symbol="AAPL.US",
on_data_callback=on_data,
on_disconnect_callback=on_disconnect
)
client.run()
工程预警:上述代码适用于低频策略(每秒几条消息)。高频场景下(>100 条/秒),请改用
asyncio+aiohttp重构,避免 GIL 阻塞导致的消息积压。
3.3 断连日志的必要性
每次断连、重连、告警都必须记录日志,包含:
- 断连时间戳
- 重连次数
- 漏掉的行情数量(估算)
- 告警发送时间
这些日志是事后复盘的核心依据,也是判断策略是否在断连期间产生异常头寸的关键证据。
四、过拟合鸿沟:回测优化的是历史,不是未来
4.1 过拟合的三种形态
回测过拟合不只是“参数调太多了”。它有三种更隐蔽的形态:
参数过拟合:用夏普比率最大化作为目标函数,在历史数据上搜索最优参数。典型表现是回测曲线漂亮,实盘曲线惨不忍睹。
幸存者偏差:回测时只选择当前仍然存在的标的,排除退市、破产、被并购的股票。这会让你高估策略的历史收益。
前视偏差:在回测中使用未来数据(如使用当天收盘价决定当天开盘交易),或数据清洗时未对齐,导致策略在历史回测中获得了实际不可能获得的信息优势。
4.2 TickDB 历史数据的清洗规范
TickDB 提供 10 年级别的美股历史 K 线数据,但在回测前必须确认:
- 前复权处理:股价除权除息后需要前复权,否则价格序列存在跳空断层。
- 停牌过滤:停牌期间 K 线数据应排除在策略逻辑之外,否则会出现“信号发出但无法成交”的虚假回测。
- 上市时间对齐:策略信号触发时,标的必须已上市满最小回测周期(如 1 年)。
import os
import requests
# ============================================================
# TickDB 历史 K 线数据获取(用于回测)
# ============================================================
def fetch_backtest_klines(symbol: str, interval: str = "1h", limit: int = 5000):
"""
获取历史 K 线数据用于回测
注意:
- interval: 1m/5m/15m/1h/4h/1d
- limit: 最大 5000 条(分页可扩大范围)
- 数据已清洗对齐,支持前复权处理
"""
api_key = os.environ.get("TICKDB_API_KEY")
if not api_key:
raise ValueError("请设置环境变量 TICKDB_API_KEY")
response = requests.get(
"https://api.tickdb.ai/v1/market/kline",
headers={"X-API-Key": api_key},
params={
"symbol": symbol,
"interval": interval,
"limit": limit
},
timeout=(3.05, 10)
)
data = response.json()
code = data.get("code", 0)
if code == 0:
return data.get("data", [])
elif code == 2002:
raise KeyError(f"交易品种 {symbol} 不存在,请检查 symbol 格式(如 AAPL.US)")
elif code in (1001, 1002):
raise ValueError("API Key 无效,请检查环境变量 TICKDB_API_KEY")
else:
raise RuntimeError(f"获取 K 线数据失败: code={code}, message={data.get('message')}")
# 示例:获取 5 年的 AAPL 小时线数据(分页获取)
def fetch_full_backtest_data(symbol: str, interval: str = "1h", years: int = 5):
"""
分页获取多年历史数据,构建回测数据集
回测数据构建规范:
1. 上市不足 1 年的标的,排除
2. 停牌日数据,排除
3. 已做前复权处理
"""
all_klines = []
current_page = 0
page_size = 5000
max_bars = years * 252 * 6.5 * 60 # 粗略估算:每年交易日 × 每天小时数
while len(all_klines) < max_bars:
klines = fetch_backtest_klines(symbol, interval, limit=page_size)
if not klines:
break
all_klines.extend(klines)
current_page += 1
print(f"已获取 {len(all_klines)} 条 K 线(第 {current_page} 页)")
print(f"回测数据总量: {len(all_klines)} 条 | 覆盖约 {len(all_klines) / (252 * 6.5):.1f} 年")
return all_klines
4.3 走出过拟合的工程检验
| 检验方法 | 操作 | 判断标准 |
|---|---|---|
| 样本外测试 | 用最近 20% 的数据单独验证 | 样本外夏普下跌 <30% |
| 滚动窗口回测 | 每年重新训练参数,对齐实际时效性 | 各窗口参数稳定性高 |
| 蒙特卡洛模拟 | 对回测收益加随机噪声,检验敏感性 | 收益分布不出现厚尾 |
| 参数敏感性分析 | 在最优参数 ±20% 范围内测试 | 收益不应出现断崖式下跌 |
五、心态鸿沟:策略之外的隐形亏损
5.1 量化策略最怕的不是算法,是执行者
即便你的策略在回测和模拟盘中都验证无误,实盘中仍然可能因为以下原因亏损:
- 干预冲动:看到连续亏损,手动平仓,导致策略在错误时机中断
- 仓位失控:亏损后急于翻本,加大仓位,触发连环爆仓
- 信号质疑:连续回撤期间放弃策略,而此时恰好是策略即将均值回归的时刻
这些不是策略的问题,是交易执行者心理偏差的问题。
5.2 解决方案:系统化交易 + 规则隔离
将所有交易决策写成代码,而不是在盘中去判断。具体做法:
- 设定最大回撤阈值:超过阈值自动停止策略,强制锁仓
- 拆分“观察模式”和“执行模式”:模拟盘阶段只看不执行,实盘阶段只执行不调整
- 日志化所有手动干预:任何手动操作必须有书面理由存档,事后复盘
# ============================================================
# 实盘风险管理模块:最大回撤阈值自动锁仓
# ============================================================
class RiskManager:
"""
实盘风控模块:监控账户权益,触发阈值时强制停止策略
"""
def __init__(self, max_drawdown: float = 0.15, daily_loss_limit: float = 0.05):
"""
Args:
max_drawdown: 最大回撤阈值(默认 15%)
daily_loss_limit: 单日最大亏损阈值(默认 5%)
"""
self.max_drawdown = max_drawdown
self.daily_loss_limit = daily_loss_limit
self.initial_equity = None
self.daily_high = None
self.daily_start_equity = None
self.is_locked = False
def initialize(self, current_equity: float):
"""初始化权益基准(每日开盘时调用)"""
self.initial_equity = current_equity
self.daily_start_equity = current_equity
self.daily_high = current_equity
self.is_locked = False
print(f"[风控] 初始化完成 | 初始权益: {current_equity:.2f}")
def update(self, current_equity: float) -> dict:
"""
每日更新权益,检查风控阈值
Returns:
dict: 包含检查结果和建议操作
"""
if self.is_locked:
return {"status": "LOCKED", "action": "策略已锁仓,禁止下单"}
# 更新日高
if current_equity > self.daily_high:
self.daily_high = current_equity
# 计算当前回撤
if self.initial_equity is None:
self.initialize(current_equity)
current_drawdown = (self.initial_equity - current_equity) / self.initial_equity
daily_loss = (self.daily_start_equity - current_equity) / self.daily_start_equity
result = {
"current_equity": current_equity,
"max_drawdown": round(current_drawdown * 100, 2),
"daily_loss": round(daily_loss * 100, 2),
"is_locked": False,
"action": "正常交易"
}
# 检查最大回撤
if current_drawdown >= self.max_drawdown:
self.is_locked = True
result["is_locked"] = True
result["action"] = "触发最大回撤阈值,强制锁仓"
print(f"🚨 [风控告警] 当前回撤 {current_drawdown*100:.1f}% >= 阈值 {self.max_drawdown*100:.1f}%,策略已锁仓")
# 检查单日亏损
if daily_loss >= self.daily_loss_limit:
self.is_locked = True
result["is_locked"] = True
result["action"] = "触发单日亏损阈值,强制锁仓"
print(f"🚨 [风控告警] 单日亏损 {daily_loss*100:.1f}% >= 阈值 {self.daily_loss_limit*100:.1f}%,策略已锁仓")
return result
def reset_daily(self, current_equity: float):
"""每日收盘重置日线权益基准"""
self.daily_start_equity = current_equity
self.daily_high = current_equity
print(f"[风控] 日线重置 | 新基准: {current_equity:.2f}")
# 使用示例
risk_mgr = RiskManager(max_drawdown=0.15, daily_loss_limit=0.05)
risk_mgr.initialize(initial_equity=100000.0)
# 模拟每日权益更新
simulated_equity = [100000, 98500, 97200, 95800, 94000, 87000]
for equity in simulated_equity:
result = risk_mgr.update(equity)
print(
f"权益: {equity:,.0f} | "
f"回撤: {result['max_drawdown']:.1f}% | "
f"单日亏损: {result['daily_loss']:.1f}% | "
f"状态: {result['action']}"
)
if result["is_locked"]:
print("⚠️ 策略已锁仓,请人工介入检查")
break
工程预警:上述风控模块是本地层面的保护。如果你的策略托管在云服务器上,建议同时设置云端监控(CloudWatch / GCS Monitoring)并在断连时自动触发平仓指令。
六、填平五道鸿沟的系统方法论
五道鸿沟不是孤立的,它们相互影响:
- 滑点高估利润 → 心态失衡 → 人为干预 → 策略失效
- 延迟导致信号失效 → 过拟合掩盖真相 → 实盘崩溃
- 断连未处理 → 错误头寸 → 心态崩溃 → 连锁亏损
填平它们需要一套系统工程:
第一步:数据层
→ 使用 TickDB 获取清洗对齐的历史 K 线数据(前复权、停牌过滤)
→ WebSocket 实时订阅,消除轮询延迟
第二步:回测层
→ 分场景校准滑点参数(低波动/高波动/极端行情)
→ 滚动窗口 + 样本外测试,验证过拟合程度
→ 记录每笔回测成交的预估滑点,积累数据库
第三步:实盘层
→ RobustWebSocketClient 处理断连重连
→ OrderBookMonitor 实时评估滑点风险
→ RiskManager 设置最大回撤阈值自动锁仓
第四步:心态层
→ 规则化所有交易决策,不留手动干预空间
→ 日志化所有手动操作,事后复盘
→ 模拟盘阶段强制只看不执行,建立信任
结语
回测和实盘之间的差距,本质上是信息完整度的差距。
回测给你的是已知的过去,实盘给你的是未知的未来。信息不完整的地方,就是风险潜伏的地方。
滑点、延迟、断连、过拟合、心态——这五道鸿沟不是玄学,它们每一条都可以被量化、被监控、被工程化处理。
把策略从回测迁移到实盘,不是简单地“跑起来”。是把这五道鸿沟逐一填平,然后让系统自己跑,你在旁边看着。
下一步行动
如果你正在用历史数据回测策略:
访问 tickdb.ai 注册,获取 10 年级别的美股历史 K 线数据(已清洗对齐,支持前复权),确保回测数据质量。
如果你想消除实盘延迟和断连风险:
在控制台生成 API Key,配置 TICKDB_API_KEY 环境变量,部署本文提供的 RobustWebSocketClient,实盘连接 TickDB WebSocket 实时频道。
如果你需要滑点校准的历史 depth 数据:
联系 [email protected],咨询历史订单簿数据服务,支持分场景滑点参数校准。
风险提示:本文不构成任何投资建议。回测结果不代表未来收益,策略实盘前请充分评估风险,并在模拟盘阶段完成至少 3 个月的验证。量化交易存在风险,市场有风险,投资需谨慎。