策略是系统,不是圣杯

“在量化交易的漫长职业生涯中,你最大的敌人不是市场,而是你自己构建的那个'完美策略'。”

这是一个几乎每个量化交易者都会经历的心路历程:花费数月研究一个策略,在历史数据上跑出漂亮的夏普比率,上线实盘后却迅速失效。有人怪罪于市场风格的急剧切换,有人归咎于滑点和佣金的吞噬,更多人陷入无尽的自证循环——不断优化参数,不断过拟合,直到彻底失去对策略的信任。

这篇文章不提供“永不失效”的策略配方,因为这样的配方根本不存在。我们要做的是拆解策略为什么会失效失效前有哪些可观测的信号、以及如何构建一个能够自我感知、自我调整的迭代系统


一、为什么策略必然会失效

在讨论“如何构建系统”之前,必须先接受一个事实:所有策略都有生命周期。这不是悲观主义,而是量化交易的底层约束。

1.1 策略失效的三层机制

策略失效并非单一因素导致,而是三层机制叠加的结果:

第一层:市场微观结构的动态演化

订单路由的优化、做市商策略的迭代、高频交易者的博弈——这些构成了一个持续变化的微观战场。当你的策略基于某一时刻的价差分布设计,而竞争对手在三个月后压缩了这个价差,你的策略优势就会消失。

第二层:参与者行为的适应性收敛

这是行为金融学中的经典问题:当一个策略被足够多的参与者使用,策略的盈利来源本身就会被侵蚀。动量效应会因为反向交易者的增加而衰减,统计套利空间会因套利者的相互竞争而收窄。

第三层:参数空间的过拟合陷阱

即使你的策略逻辑是稳健的,过度优化参数也会导致策略在实盘中脆弱不堪。一组在历史数据上表现最优的参数,往往是对未来最无知的估计。

1.2 失效不是失败,是信号

关键在于:策略失效是一个可被观测的系统现象,而不是一个神秘的黑箱事件。

失效前的典型信号包括:

信号类型 可量化指标 触发阈值建议
盈利质量下降 单笔平均盈利 / 单笔平均亏损 连续 20 个交易日低于历史均值 30%
市场适应性减弱 策略盈亏比 vs 大盘相关性 盈亏比 < 1.0 超过 10 个交易日
波动率异常 策略收益标准差 vs 基准 标准差偏离 > 2σ
流动性敏感度上升 滑点占盈利比 超过 50%

如果你的监控系统中没有这些指标,你就是在“盲目飞行”。而如果你的策略没有预设的失效响应机制,你就是在“等待坠机”。


二、策略生命周期的四阶段模型

理解策略失效机制后,下一步是将策略视为一个具有明确生命周期的系统。

2.1 阶段定义

一个量化策略的生命周期通常包含以下四个阶段:

阶段一:研发验证期
├── 逻辑假设检验
├── 小样本回测
└── 参数敏感性分析

阶段二:实盘磨合期
├── 小资金实盘验证
├── 交易成本实测
└── 与模拟盘差异分析

阶段三:稳定盈利期
├── 仓位逐步放大
├── 日常监控完善
└── 冗余机制部署

阶段四:衰退与迭代期
├── 失效信号识别
├── 逻辑诊断与修复
└── 策略版本迭代或下线

2.2 每个阶段的核心交付物

阶段 时长参考 核心交付物 进入下一阶段的条件
研发验证期 2-4 周 策略逻辑文档、回测报告 样本外测试胜率 > 55%
实盘磨合期 1-3 月 实盘日志、滑点分析 连续 30 个交易日累计盈利
稳定盈利期 3-18 月 监控体系、应急响应手册 触发失效阈值次数 = 0
衰退与迭代期 不确定 诊断报告、迭代方案 完成诊断并给出行动决策

重要警示:许多交易者死在“阶段二”。他们在模拟盘上赚钱就迫不及待放大仓位,却忽略了实盘与模拟盘之间存在巨大的执行差异鸿沟——滑点、流动性、延迟,每一项都能让一个“完美策略”瞬间失效。


三、反馈回路:持续迭代的核心引擎

策略失效是必然的,但策略的“死亡”不是。关键是构建一套有效的反馈回路,让系统能够感知自身状态的变化并做出响应。

3.1 反馈回路的三要素

一个有效的反馈回路必须包含三个核心组件:

1. 感知层:数据采集与指标计算

这是整个系统的“感官”。你需要持续采集与策略表现相关的多维数据,包括但不限于:

  • 收益指标:日收益率、累计收益率、盈亏比、夏普比率
  • 风险指标:最大回撤、回撤持续时间、波动率
  • 执行指标:滑点实测值、执行延迟、订单簿深度变化
  • 市场状态指标:买卖价差、订单流失衡度、波动率曲面变化

2. 分析层:阈值判定与信号生成

感知层采集的数据需要与预设阈值比较,超出阈值时生成明确的告警信号。阈值设计需要遵循两个原则:

  • 保守性原则:宁可误报,不可漏报。设置较宽松的初始阈值,确认信号后再收紧。
  • 动态调整原则:阈值应随市场状态动态调整。牛市中的回撤阈值应高于熊市,因为系统性波动本身会更大。

3. 执行层:响应动作与日志记录

告警信号触发后,必须有预设的响应动作。这包括:

  • 仓位调整:减仓 X%,或暂停新开仓
  • 策略隔离:将当前策略实例与新策略分开运行
  • 日志记录:完整记录触发条件、响应动作、执行结果

3.2 反馈回路的代码架构

以下是一个简化但完整的策略监控反馈回路实现:

import os
import time
import json
import logging
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional, Callable
from collections import deque
import numpy as np

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(message)s'
)
logger = logging.getLogger(__name__)


@dataclass
class StrategyMetrics:
    """策略核心指标"""
    daily_returns: deque = field(default_factory=deque)
    drawdown: float = 0.0
    max_drawdown: float = 0.0
    sharpe_ratio: float = 0.0
    win_rate: float = 0.0
    profit_loss_ratio: float = 0.0
    slippage_ratio: float = 0.0
    
    # 订单簿状态指标(由外部数据源注入)
    bid_ask_spread: float = 0.0
    order_imbalance: float = 0.0


@dataclass
class AlertThresholds:
    """告警阈值配置"""
    # 收益质量
    profit_loss_ratio_min: float = 1.0
    win_rate_min: float = 0.45
    
    # 风险指标
    max_drawdown_limit: float = 0.15  # 15% 最大回撤
    daily_volatility_threshold: float = 0.03  # 单日波动 3%
    
    # 执行质量
    slippage_ratio_max: float = 0.50  # 滑点占盈利比不超过 50%
    
    # 市场状态
    spread_expansion_threshold: float = 2.0  # 价差扩大超过 2 倍历史均值
    imbalance_threshold: float = 3.0  # 订单流失衡度超过 3
    
    # 连续性要求(防止偶发性误报)
    consecutive_trigger_count: int = 3  # 连续 N 个周期触发才告警


class StrategyFeedbackLoop:
    """
    策略反馈回路:感知-分析-执行
    
    设计原则:
    1. 感知层与执行层完全解耦,便于独立测试
    2. 阈值配置外部化,支持热更新
    3. 所有告警必须记录,供后续分析
    4. 响应动作支持回调注入,便于扩展
    """
    
    def __init__(
        self,
        thresholds: AlertThresholds,
        alert_callbacks: Optional[list[Callable]] = None
    ):
        self.thresholds = thresholds
        self.alert_callbacks = alert_callbacks or []
        
        self.metrics = StrategyMetrics()
        self.consecutive_violations = {}
        self.alert_history = []
        
        # 统计状态(用于动态阈值调整)
        self.historical_spreads = deque(maxlen=100)
        self.historical_returns = deque(maxlen=60)
    
    def update_metrics(
        self,
        daily_return: float,
        trade_results: list[dict],
        order_book_state: Optional[dict] = None
    ):
        """
        更新策略指标
        
        Args:
            daily_return: 当日收益率(小数形式,如 0.02 表示 2%)
            trade_results: 当日交易结果列表
                每次交易应包含: { 'pnl': float, 'slippage': float, 'direction': str }
            order_book_state: 订单簿状态(可选)
                应包含: { 'bid_ask_spread': float, 'order_imbalance': float }
        """
        # 更新收益序列
        self.metrics.daily_returns.append(daily_return)
        self.historical_returns.append(daily_return)
        
        # 更新统计指标
        if len(self.metrics.daily_returns) > 1:
            returns = list(self.metrics.daily_returns)
            
            # 计算当前回撤
            cumulative = np.cumprod(1 + np.array(returns))
            running_max = np.maximum.accumulate(cumulative)
            self.metrics.drawdown = (cumulative[-1] - running_max[-1]) / running_max[-1]
            self.metrics.max_drawdown = max(self.metrics.max_drawdown, self.metrics.drawdown)
            
            # 计算夏普比率(年化,假设 252 交易日)
            if np.std(returns) > 0:
                self.metrics.sharpe_ratio = (
                    np.mean(returns) / np.std(returns) * np.sqrt(252)
                )
        
        # 分析交易结果
        if trade_results:
            winning_trades = [t for t in trade_results if t['pnl'] > 0]
            losing_trades = [t for t in trade_results if t['pnl'] <= 0]
            
            self.metrics.win_rate = len(winning_trades) / len(trade_results)
            
            if winning_trades and losing_trades:
                avg_win = np.mean([t['pnl'] for t in winning_trades])
                avg_loss = abs(np.mean([t['pnl'] for t in losing_trades]))
                self.metrics.profit_loss_ratio = avg_win / avg_loss if avg_loss > 0 else 0
            
            total_slippage = sum(t.get('slippage', 0) for t in trade_results)
            total_pnl = sum(t['pnl'] for t in trade_results)
            if total_pnl > 0:
                self.metrics.slippage_ratio = total_slippage / total_pnl
        
        # 更新订单簿状态
        if order_book_state:
            self.metrics.bid_ask_spread = order_book_state.get('bid_ask_spread', 0)
            self.metrics.order_imbalance = order_book_state.get('order_imbalance', 0)
            
            self.historical_spreads.append(self.metrics.bid_ask_spread)
    
    def _check_threshold(
        self,
        metric_value: float,
        threshold: float,
        metric_name: str,
        direction: str = 'below'
    ) -> bool:
        """
        检查指标是否触发阈值
        
        Args:
            metric_value: 当前指标值
            threshold: 阈值
            metric_name: 指标名称(用于日志)
            direction: 'below' 或 'above'
        """
        triggered = (
            (direction == 'below' and metric_value < threshold) or
            (direction == 'above' and metric_value > threshold)
        )
        
        if triggered:
            logger.warning(
                f"[{metric_name}] 触发阈值: 当前值={metric_value:.4f}, "
                f"阈值={threshold:.4f}, 方向={direction}"
            )
        
        return triggered
    
    def _get_dynamic_threshold(
        self,
        base_threshold: float,
        historical_values: deque,
        multiplier: float = 1.0
    ) -> float:
        """基于历史数据计算动态阈值"""
        if len(historical_values) < 10:
            return base_threshold
        
        hist_mean = np.mean(historical_values)
        hist_std = np.std(historical_values)
        
        return base_threshold * multiplier + hist_std * 0.5
    
    def run_analysis(self) -> list[dict]:
        """
        执行反馈分析,返回触发的告警列表
        
        包含三层检测:
        1. 绝对阈值检测(静态阈值)
        2. 动态阈值检测(基于历史分布)
        3. 连续性检测(防止偶发性误报)
        """
        alerts = []
        now = datetime.now().isoformat()
        
        # ========== 绝对阈值检测 ==========
        
        # 收益质量检测
        if self._check_threshold(
            self.metrics.profit_loss_ratio,
            self.thresholds.profit_loss_ratio_min,
            'profit_loss_ratio'
        ):
            alerts.append({
                'timestamp': now,
                'level': 'WARNING',
                'type': 'PROFIT_QUALITY',
                'message': f'盈亏比 {self.metrics.profit_loss_ratio:.2f} 低于阈值',
                'action': 'REDUCE_POSITION'
            })
        
        # 胜率检测
        if self._check_threshold(
            self.metrics.win_rate,
            self.thresholds.win_rate_min,
            'win_rate'
        ):
            alerts.append({
                'timestamp': now,
                'level': 'WARNING',
                'type': 'WIN_RATE',
                'message': f'胜率 {self.metrics.win_rate:.2%} 低于阈值',
                'action': 'REDUCE_POSITION'
            })
        
        # 最大回撤检测
        if self._check_threshold(
            self.metrics.max_drawdown,
            self.thresholds.max_drawdown_limit,
            'max_drawdown'
        ):
            alerts.append({
                'timestamp': now,
                'level': 'CRITICAL',
                'type': 'MAX_DRAWDOWN',
                'message': f'最大回撤 {self.metrics.max_drawdown:.2%} 超过限制',
                'action': 'STOP_STRATEGY'
            })
        
        # 滑点占比检测
        if self._check_threshold(
            self.metrics.slippage_ratio,
            self.thresholds.slippage_ratio_max,
            'slippage_ratio'
        ):
            alerts.append({
                'timestamp': now,
                'level': 'WARNING',
                'type': 'SLIPPAGE',
                'message': f'滑点占比 {self.metrics.slippage_ratio:.2%} 过高',
                'action': 'REDUCE_POSITION'
            })
        
        # ========== 动态阈值检测(订单簿状态) ==========
        
        if len(self.historical_spreads) >= 20:
            dynamic_spread_threshold = self._get_dynamic_threshold(
                np.mean(self.historical_spreads),
                self.historical_spreads,
                multiplier=2.0
            )
            
            if self.metrics.bid_ask_spread > dynamic_spread_threshold:
                alerts.append({
                    'timestamp': now,
                    'level': 'INFO',
                    'type': 'MARKET_CONDITION',
                    'message': f'买卖价差扩大,可能影响策略表现',
                    'action': 'MONITOR_ONLY'
                })
        
        # ========== 连续性检测 ==========
        
        for i, alert in enumerate(alerts):
            alert_type = alert['type']
            
            # 初始化或更新连续计数
            if alert_type not in self.consecutive_violations:
                self.consecutive_violations[alert_type] = 0
            
            self.consecutive_violations[alert_type] += 1
            
            # 检查是否连续触发
            if self.consecutive_violations[alert_type] >= self.thresholds.consecutive_trigger_count:
                alert['consecutive_count'] = self.consecutive_violations[alert_type]
                alert['confirmed'] = True
                
                # 升级告警级别
                if alert['level'] == 'WARNING':
                    alert['level'] = 'CRITICAL'
                    alert['message'] = f"[确认] {alert['message']}"
            else:
                alert['consecutive_count'] = self.consecutive_violations[alert_type]
                alert['confirmed'] = False
        
        # 重置未触发的告警计数
        all_types = {'PROFIT_QUALITY', 'WIN_RATE', 'MAX_DRAWDOWN', 'SLIPPAGE', 'MARKET_CONDITION'}
        triggered_types = {a['type'] for a in alerts}
        for t in all_types - triggered_types:
            if t in self.consecutive_violations:
                self.consecutive_violations[t] = 0
        
        return alerts
    
    def execute_response(self, alerts: list[dict]):
        """
        执行响应动作
        
        ⚠️ 生产环境建议:
        - STOP_STRATEGY 操作应要求二次确认
        - 执行后应发送通知到监控渠道
        - 所有响应操作必须记录到审计日志
        """
        for alert in alerts:
            if not alert.get('confirmed', False):
                continue
            
            action = alert['action']
            
            logger.critical(f"[响应执行] {alert['message']}, 动作: {action}")
            
            # 触发回调
            for callback in self.alert_callbacks:
                try:
                    callback(alert)
                except Exception as e:
                    logger.error(f"[响应回调异常] {e}")
            
            # 记录到告警历史
            self.alert_history.append(alert)
            
            # ⚠️ 生产环境应实现具体动作逻辑:
            # if action == 'STOP_STRATEGY':
            #     self._stop_strategy()
            # elif action == 'REDUCE_POSITION':
            #     self._reduce_position(factor=0.5)
    
    def run_full_cycle(
        self,
        daily_return: float,
        trade_results: list[dict],
        order_book_state: Optional[dict] = None
    ):
        """
        运行完整的反馈循环
        
        封装了 update -> analyze -> response 的完整流程
        """
        self.update_metrics(daily_return, trade_results, order_book_state)
        alerts = self.run_analysis()
        
        if alerts:
            self.execute_response(alerts)
        
        return alerts

3.3 反馈回路的工程警示

以上代码是一个最小化可行产品。在生产环境中,以下问题必须解决:

问题 建议
数据持久化 告警历史应写入数据库,支持长期分析和回溯
异步处理 反馈循环应与交易执行线程分离,避免阻塞
告警通知 必须有飞书/邮件/Slack 等通知渠道
阈值热更新 生产环境应支持不重启服务更新阈值配置
测试覆盖 必须有单元测试覆盖各检测逻辑
二次确认 STOP_STRATEGY 等高风险动作应有确认机制

四、冗余设计:系统韧性的底层保障

反馈回路解决的是“感知和响应”的问题,而冗余设计解决的是“在响应过程中系统不崩溃”的问题。

4.1 冗余的三层含义

在量化交易系统的语境下,“冗余”不是简单的“备机冗余”,而是一个多层次的设计原则:

第一层:数据冗余

任何策略所依赖的数据源都可能出现延迟、断连或数据错误。系统在设计时必须考虑:

  • 多个数据源并行获取,选取最早返回且通过校验的数据
  • 断连时的数据补全机制
  • 异常数据的识别和剔除规则

第二层:逻辑冗余

策略的交易逻辑应有多个层次,当主逻辑失效时,备用逻辑可以继续运行或安全退出:

  • 主策略触发止损条件 → 执行预设止损,同时降低新开仓频率
  • 主策略连续亏损 N 个周期 → 暂停主策略,自动切换到低频观察模式
  • 市场状态异常 → 缩小仓位,同时扩大止盈止损阈值

第三层:执行冗余

订单执行层面可能出现滑点超出预期、订单被拒、成交报告延迟等情况:

  • 订单发送后必须等待确认,超时重试
  • 同一信号的多笔订单应有成交汇总机制
  • 交易记录与账户实际持仓应定期对账

4.2 冗余机制的代码示例

以下是一个简化的仓位管理和应急响应的实现框架:

import threading
from enum import Enum
from typing import Optional
from dataclasses import dataclass


class StrategyState(Enum):
    """策略状态机"""
    NORMAL = 'normal'
    DEGRADED = 'degraded'      # 降级运行(降低仓位)
    OBSERVATION = 'observation' # 仅观察,不交易
    STOPPED = 'stopped'        # 完全停止


@dataclass
class PositionConfig:
    """仓位配置"""
    base_size: float = 1.0       # 基础仓位
    degraded_factor: float = 0.5  # 降级时的仓位缩减系数
    observation_factor: float = 0.0  # 观察模式不下单
    
    max_position: float = 10.0   # 最大仓位上限
    emergency_stop_loss: float = 0.15  # 紧急止损线(15%)


@dataclass
class StrategyContext:
    """策略运行上下文(线程安全)"""
    state: StrategyState = StrategyState.NORMAL
    current_size: float = 1.0
    consecutive_losses: int = 0
    last_update: Optional[datetime] = None
    
    _lock: threading.Lock = field(default_factory=threading.Lock)


class RedundantPositionManager:
    """
    冗余仓位管理器
    
    职责:
    1. 维护策略状态机
    2. 根据状态计算实际仓位
    3. 处理连续亏损等降级条件
    4. 提供紧急停止接口
    """
    
    def __init__(self, config: PositionConfig):
        self.config = config
        self.context = StrategyContext()
        
        # 降级条件配置
        self.consecutive_loss_threshold = 5
        self.loss_recovery_threshold = 3  # 连续盈利 N 次后恢复
    
    def update_state(
        self,
        trade_result: Optional[float] = None,
        current_drawdown: float = 0.0
    ):
        """
        更新策略状态
        
        Args:
            trade_result: 交易结果(盈利为正,亏损为负),None 表示无交易
            current_drawdown: 当前回撤
        """
        with self.context._lock:
            now = datetime.now()
            
            # 处理交易结果
            if trade_result is not None:
                if trade_result < 0:
                    self.context.consecutive_losses += 1
                else:
                    self.context.consecutive_losses = max(
                        0, 
                        self.context.consecutive_losses - 1
                    )
            
            self.context.last_update = now
            
            # 状态转换逻辑
            self._evaluate_state_transition(current_drawdown)
            
            # 更新实际仓位
            self._update_position_size()
    
    def _evaluate_state_transition(self, current_drawdown: float):
        """评估并执行状态转换"""
        current_state = self.context.state
        new_state = current_state
        
        # 紧急停止条件
        if current_drawdown >= self.config.emergency_stop_loss:
            new_state = StrategyState.STOPPED
            print(f"[紧急停止] 回撤 {current_drawdown:.2%} 超过限制")
        
        # 降级条件
        elif (
            self.context.consecutive_losses >= self.consecutive_loss_threshold
            and current_state == StrategyState.NORMAL
        ):
            new_state = StrategyState.DEGRADED
            print(f"[降级运行] 连续亏损 {self.context.consecutive_losses} 次")
        
        # 恢复条件(从降级恢复到正常)
        elif (
            self.context.consecutive_losses < self.consecutive_loss_threshold / 2
            and current_state == StrategyState.DEGRADED
        ):
            new_state = StrategyState.NORMAL
            print(f"[恢复正常] 连续盈利 {self.loss_recovery_threshold}+ 次")
        
        # 完全停止条件
        elif (
            self.context.consecutive_losses >= self.consecutive_loss_threshold * 2
            and current_state == StrategyState.DEGRADED
        ):
            new_state = StrategyState.STOPPED
            print(f"[强制停止] 降级后继续亏损,需要人工介入")
        
        # 仅观察模式(降级后连续盈利未完全恢复)
        elif (
            self.context.consecutive_losses < self.consecutive_loss_threshold
            and self.context.consecutive_losses > 0
            and current_state == StrategyState.STOPPED
        ):
            # 从停止恢复时,先进入仅观察
            new_state = StrategyState.OBSERVATION
        
        self.context.state = new_state
    
    def _update_position_size(self):
        """根据状态更新实际仓位"""
        state = self.context.state
        base = self.config.base_size
        
        if state == StrategyState.NORMAL:
            self.context.current_size = base
        elif state == StrategyState.DEGRADED:
            self.context.current_size = base * self.config.degraded_factor
        elif state == StrategyState.OBSERVATION:
            self.context.current_size = base * self.config.observation_factor
        elif state == StrategyState.STOPPED:
            self.context.current_size = 0.0
        
        # 限制最大仓位
        self.context.current_size = min(
            self.context.current_size,
            self.config.max_position
        )
    
    def get_position_size(self) -> float:
        """获取当前应使用的仓位"""
        with self.context._lock:
            return self.context.current_size
    
    def get_state(self) -> StrategyState:
        """获取当前策略状态"""
        with self.context._lock:
            return self.context.state
    
    def force_stop(self, reason: str):
        """强制停止策略(人工介入)"""
        with self.context._lock:
            self.context.state = StrategyState.STOPPED
            self.context.current_size = 0.0
            print(f"[人工停止] {reason}")
    
    def force_resume(self):
        """强制恢复策略(人工介入)"""
        with self.context._lock:
            self.context.state = StrategyState.OBSERVATION
            self.context.consecutive_losses = 0
            print(f"[人工恢复] 从观察模式开始")

五、持续迭代的工程实践

反馈回路和冗余设计解决的是“如何感知问题”和“如何防止崩溃”,但策略的真正生命力来自持续的迭代

5.1 迭代的节奏控制

迭代不是“随时优化参数”,而是有节奏、有依据的过程。

日常级别(每个交易日)

  • 监控指标是否在阈值内
  • 记录异常事件(滑点放大、延迟增加)
  • 更新日志,不做参数修改

周级别

  • 汇总一周的监控数据
  • 分析告警事件的原因
  • 评估是否需要进入诊断流程

月级别

  • 策略表现评估(与基准对比)
  • 回顾逻辑假设是否仍然成立
  • 制定迭代或下线决策

5.2 迭代决策树

策略表现低于预期
│
├── 是否在阈值内?
│   ├── 是 → 持续监控,记录异常
│   └── 否 → 进入诊断流程
│
├── 诊断:是市场问题还是策略问题?
│   ├── 市场问题 → 评估是否等待市场恢复
│   └── 策略问题 → 进入逻辑审查
│
├── 逻辑审查:核心假设是否还成立?
│   ├── 是 → 调整参数或仓位,继续运行
│   └── 否 → 制定新版本策略计划
│
└── 决策:迭代 or 下线

5.3 版本管理的工程规范

当决定进行策略迭代时,必须遵循以下工程规范:

1. 保留旧版本

永远不要在线上直接修改策略逻辑。每次迭代应生成新的策略版本,旧版本保留至少 30 天的完整日志。这样当新版本出现问题时可以快速回滚。

2. 并行运行

新版本策略应在隔离环境中与旧版本并行运行,观察期不少于 2 周。比较维度包括:收益、回撤、执行质量。

3. 渐进式切换

新版本验证通过后,不是一次性全量切换,而是逐步增加权重:10% → 30% → 50% → 100%,每个阶段观察 3-5 个交易日。

4. 完整记录

每个版本必须记录:

  • 创建时间和创建人
  • 修改的具体逻辑或参数
  • 验证方式和验证结果
  • 切换时间和切换比例

六、从策略到系统:认知的跃迁

回到最初的问题:为什么没有“永远赚钱”的策略?

答案很简单:因为市场不是静态的,而你的策略如果也不进化,它必然会被市场淘汰。

这不是策略的失败,而是所有交易系统都必须面对的现实约束。

真正的问题不是“哪个策略更好”,而是你是否有能力构建一个能够持续感知、响应、进化的交易系统

这个系统包括:

  • 感知层:实时监控策略表现、市场状态、执行质量
  • 分析层:阈值判定、信号生成、诊断逻辑
  • 执行层:仓位管理、状态切换、应急响应
  • 迭代层:版本管理、并行验证、渐进切换

当你把这个系统构建起来,你会发现:策略不再是那个需要被寻找的“圣杯”,而是系统中一个可以被替换的组件


下一步行动

如果你在构建策略的路上屡次受挫:建议你先暂停策略开发,用一周时间搭建本文所述的监控和反馈系统。没有监控的策略就像没有仪表盘的飞机——你不知道自己在往哪里飞。

如果你已经有了策略但缺少迭代机制:从本文的反馈回路代码开始,在你的策略中嵌入这套监控逻辑。从“连续 N 次亏损降低仓位”这样的简单规则开始,逐步增加复杂性。

如果你希望进一步深入量化系统的工程实践:关注 TickDB 的技术专栏,后续会有更多关于回测框架、订单执行、实盘监控的深度文章。


风险提示:本文不构成任何投资建议。量化交易涉及复杂的金融风险,任何策略在实盘运行前必须经过充分的验证和风险评估。市场有风险,投资需谨慎。