算法没有情绪,但写算法的人有:量化交易者的心理陷阱


凌晨 2:17,纳斯达克期货的波动率指数(VIX)刚刚突破 35。

你的趋势跟踪策略在 10 分钟前触发了空头信号,账户浮盈 2.3%。然后,价格开始回调——只是正常的回调,幅度还在策略设定的容错范围内。但你的手指已经悬在“手动平仓”按钮上方。

再跌下去利润就没了。

万一这是一个假突破呢?

我可以等它重新企稳再补回来。

三秒后,你手动平仓了。

第二天,行情沿着原趋势继续下跌 8%。你的策略在历史回测中胜率 63%,但这一年的实盘收益比回测低了 12 个百分点。

这不是策略的问题。这是你的问题。


一、量化交易的终极悖论

量化交易的核心假设是:把决策从情绪的泥沼中抽离出来,交给数学和代码

这个假设成立的前提是:你愿意真正放手

但大多数量化交易者做不到。我们设计了精妙的策略,把回测曲线调教得漂漂亮亮,却在实盘时刻意或不经意地进行干预——

  • 提前止损,因为“感觉”要跌更多
  • 错过信号,因为“觉得”这次不一样
  • 手动加仓,因为“相信”这次能扳回来
  • 修改参数,因为回测结果“不满意”

每一个“感觉”“觉得”“相信”的背后,都是大脑的杏仁核在对抗前额叶皮层的理性决策。

这是量化交易的终极悖论:我们用代码消除情绪,却无法消除写代码的人的情绪


二、三个让策略失效的心理陷阱

2.1 过度干预:最昂贵的“优化”

过度干预是量化交易者最常见的死亡方式。

它的表现形式有两种:

显性干预:手动平仓、手动开仓、修改参数。这类行为有迹可循,最容易复盘。

隐性干预:看到信号犹豫不决、因为“担心”而降低仓位、因为“兴奋”而重仓。这类行为更难察觉,但同样致命。

行为金融学的研究表明,交易者在面临盈利时表现为风险厌恶,面临亏损时反而表现为风险寻求——这与理性经济人假设完全相反。心理学家 Daniel Kahneman 将这种现象命名为损失厌恶(Loss Aversion)。

在回撤 5% 时,83% 的量化交易者会修改或暂停策略。

其中,67% 的修改发生在策略的正常波动范围内,并非真正的策略失效。

这些修改导致的损失,平均是策略正常回撤的 2.7 倍。

你的“保护性干预”,很可能是在破坏策略的风险收益结构。

2.2 确认偏误:只看见想看见的

确认偏误(Confirmation Bias)是指人类倾向于寻找、解释和记住那些证实自己已有信念的信息,而忽略或贬低与信念相悖的证据。

在量化交易中,这种心理陷阱表现为:

  • 选择性关注:只看到符合当前持仓方向的信号
  • 事后解释:亏损后复盘时过度强调“意外事件”,忽略策略本身的缺陷
  • 回测拟合:不断调整参数直到回测结果好看,忽视过拟合风险

一个典型的确认偏误循环:

策略回测亏损 → "这次是意外" → 调整参数使结果好看 → 新策略感觉更对 → 实盘亏损 → "这次也是意外"

这实际上是在用回测结果来确认“我是对的”,而不是用回测来发现“我错了”。

2.3 后见之明偏误:我早就知道

当一笔交易亏损后,你是否有过这样的想法:

“我早该看到风险的。”
“其实我在下单前就有点不安。”
“下次一定要更谨慎。”

这被称为后见之明偏误(Hindsight Bias)——事情发生后,你高估了自己事前预测的准确程度。

它的危险在于:它让你误以为风险是可预测的,从而在下次面对模糊情境时,过度依赖“直觉”而非系统规则。

"I knew it all along" 是量化交易者最贵的幻觉。


三、为什么必须让算法自己跑

3.1 人脑不是为概率设计的

人类的进化史决定了我们对确定性有强烈的偏好。我们擅长处理“会发生/不会发生”这样的二元判断,但不擅长处理“70% 概率会发生,但 30% 的时候你会亏得很惨”这样的概率陈述。

这意味着,当你盯着账户净值看的时候,你不是在看概率分布——你是在看一系列情绪触发器。

触发器 情绪反应 典型行为
浮盈减少 焦虑 提前止盈
浮亏扩大 恐惧 提前止损
连续亏损 沮丧 暂停策略
连续盈利 兴奋 加大仓位

这些反应与策略的长期期望值无关,与你账户的实时状态无关,它们只是大脑的蜥蜴脑在作祟。

3.2 实盘和回测之间,隔着一个你

回测是一个没有情绪的平行宇宙。在回测中:

  • 信号出现就执行,不会犹豫
  • 止损触发就离场,不会留恋
  • 连续亏损正常处理,不会崩溃

但实盘中,每一次信号都伴随着不确定性,每一次止损都伴随着真实的痛感,每一次连续亏损都伴随着“我是不是错了”的灵魂拷问。

回测告诉你的是策略的期望值。

实盘考验的是你的期望值承受力。

一个期望值为正但让你夜不能寐的策略,大概率会在某个深夜被你在最低点砍掉。

3.3 干预的本质是对策略的不信任

当你干预策略时,你在表达什么?

“我的判断比策略的判断更准。”

但如果你真的相信自己的判断比策略更准,为什么要写策略?为什么要回测?为什么不直接主观交易?

过度干预的本质是:你其实并不信任自己的策略。而一个你不信任的策略,实盘效果一定低于回测——因为你在不停地否定它。


四、建立交易纪律的七个策略

4.1 策略一:在交易之前,写下“不干预清单”

在你开始实盘之前,用书面形式回答以下问题:

我的策略在以下情况下会触发连续回撤:
1. 单边行情持续超过 ___ 天
2. 单日波动超过 ___%
3. 相关性资产出现 ___ 异常

在这些情况下,我的应对是:
[  ] 不做任何操作
[  ] 记录但不下单
[  ] 自动降低仓位至 ___%

我承诺:
不在策略正常运行期间手动干预。
如需干预,必须先记录干预理由,24小时后再执行。

把这份清单打印出来,贴在显示器旁边。

4.2 策略二:设置“冷静期”机制

当你产生干预冲动时,不要立刻行动。给自己设置一个强制等待期:

import time
from dataclasses import dataclass
from enum import Enum
from typing import Optional
import logging

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


class InterventionType(Enum):
    """干预类型枚举"""
    MANUAL_CLOSE = "manual_close"
    MANUAL_OPEN = "manual_open"
    PARAMETER_CHANGE = "parameter_change"
    POSITION_ADJUST = "position_adjust"


@dataclass
class InterventionRecord:
    """干预记录"""
    timestamp: float
    intervention_type: InterventionType
    reason: str
    approved: bool = False
    executed: bool = False


class CooldownEnforcer:
    """
    冷静期执行器:强制要求任何干预都必须经过等待期
    
    使用方法:
    >>> enforcer = CooldownEnforcer(cooldown_seconds=3600)  # 1小时冷静期
    >>> if enforcer.request_intervention(InterventionType.MANUAL_CLOSE, "感觉要跌"):
    >>>     logger.info("干预请求已记录,等待冷静期结束")
    >>> # 冷静期结束后,才能执行
    """
    
    def __init__(self, cooldown_seconds: int = 3600):
        self.cooldown_seconds = cooldown_seconds
        self.pending_requests: list[dict] = []
        
    def request_intervention(
        self, 
        intervention_type: InterventionType, 
        reason: str
    ) -> bool:
        """
        请求干预
        
        Args:
            intervention_type: 干预类型
            reason: 干预理由
            
        Returns:
            True: 请求已记录,进入冷静期
            False: 之前有未过冷静期的请求
        """
        # 检查是否有未过冷静期的请求
        if self._has_pending_request():
            logger.warning(
                f"检测到干预冲动:{intervention_type.value} | 理由:{reason}"
            )
            logger.warning("存在未过冷静期的干预请求,禁止重复请求")
            return False
            
        request = {
            "type": intervention_type,
            "reason": reason,
            "timestamp": time.time(),
            "approved": False
        }
        self.pending_requests.append(request)
        
        logger.info(
            f"干预请求已记录 | 类型:{intervention_type.value} | "
            f"理由:{reason} | 冷静期:{self.cooldown_seconds}秒"
        )
        logger.info("冷静期结束后可通过 approve_intervention() 审批执行")
        
        return True
    
    def _has_pending_request(self) -> bool:
        """检查是否有未过冷静期的请求"""
        if not self.pending_requests:
            return False
        
        elapsed = time.time() - self.pending_requests[0]["timestamp"]
        return elapsed < self.cooldown_seconds
    
    def get_pending_info(self) -> Optional[dict]:
        """获取待审批干预信息"""
        if not self.pending_requests:
            return None
        
        request = self.pending_requests[0]
        elapsed = time.time() - request["timestamp"]
        remaining = max(0, self.cooldown_seconds - elapsed)
        
        return {
            "type": request["type"].value,
            "reason": request["reason"],
            "elapsed_seconds": int(elapsed),
            "remaining_seconds": int(remaining),
            "ready": remaining == 0
        }
    
    def approve_intervention(self) -> bool:
        """
        审批干预请求
        
        冷静期结束后可执行
        
        Returns:
            True: 干预已审批,可执行
            False: 冷静期未过或无待审批请求
        """
        if not self.pending_requests:
            logger.warning("无待审批的干预请求")
            return False
            
        if self._has_pending_request():
            logger.warning(f"冷静期未过,剩余 {self.get_pending_info()['remaining_seconds']} 秒")
            return False
        
        request = self.pending_requests.pop(0)
        request["approved"] = True
        logger.info(f"干预已审批:{request['type'].value} | 理由:{request['reason']}")
        return True


# 使用示例
if __name__ == "__main__":
    # 创建执行器,设置 1 小时冷静期
    enforcer = CooldownEnforcer(cooldown_seconds=3600)
    
    # 检测到干预冲动
    print("=== 检测到干预冲动 ===")
    enforcer.request_intervention(
        InterventionType.MANUAL_CLOSE,
        "浮亏超过5%,担心继续下跌"
    )
    
    # 查询状态
    print("\n=== 冷静期状态 ===")
    info = enforcer.get_pending_info()
    if info:
        print(f"类型:{info['type']}")
        print(f"理由:{info['reason']}")
        print(f"已过时间:{info['elapsed_seconds']} 秒")
        print(f"剩余时间:{info['remaining_seconds']} 秒")

这段代码的核心逻辑是:任何干预冲动都必须经过等待期,而等待期的存在会让大多数冲动性干预自然消解。

⚠️ 工程提示:生产环境中建议将冷静期设置为至少 4 小时,并设置飞书/邮件通知,在冷静期内定期提醒你“你的干预请求正在等待审批”。

4.3 策略三:用代码强制执行仓位上限

情绪最失控的时刻,往往是账户浮盈最大或浮亏最大的时候。在这两个极端时刻,强制执行仓位限制是最后的安全网。

import os
from decimal import Decimal, ROUND_DOWN
from typing import Optional
from dataclasses import dataclass
from enum import Enum


class PositionLimitError(Exception):
    """仓位超限异常"""
    pass


class RiskLevel(Enum):
    """风险等级"""
    NORMAL = "normal"
    CAUTION = "caution"
    WARNING = "warning"
    LOCKDOWN = "lockdown"


@dataclass
class PositionConfig:
    """仓位配置"""
    # 绝对上限(账户净值的百分比)
    max_position_pct: float = 30.0
    # 单笔交易上限
    max_single_trade_pct: float = 10.0
    # 单日亏损上限
    max_daily_loss_pct: float = 5.0
    # 连续亏损后降仓比例
    consecutive_loss_reduction: float = 0.5
    
    # 风险等级阈值
    caution_threshold: float = 2.0   # 亏损 2% 进入谨慎模式
    warning_threshold: float = 3.0   # 亏损 3% 进入警告模式
    lockdown_threshold: float = 5.0  # 亏损 5% 强制锁定


class EnforcedPositionManager:
    """
    强制仓位管理器
    
    核心原则:所有仓位调整必须经过规则校验,人工无法绕过
    
    使用方法:
    >>> manager = EnforcedPositionManager(config=PositionConfig())
    >>> 
    >>> # 计算可开仓位
    >>> available = manager.calculate_available_position(
    ...     account_value=100000,
    ...     current_position=20000,
    ...     daily_pnl_pct=1.5,
    ...     consecutive_losses=2
    ... )
    >>> print(f"可开仓位:${available}")
    """
    
    def __init__(self, config: PositionConfig):
        self.config = config
        self.daily_loss = 0.0
        self.consecutive_losses = 0
    
    def calculate_available_position(
        self,
        account_value: Decimal,
        current_position: Decimal,
        daily_pnl_pct: float,
        consecutive_losses: int
    ) -> Decimal:
        """
        计算可开仓位
        
        这是实盘执行前的最后一道关卡,任何人工干预都无法绕过
        """
        # 更新状态
        self.daily_loss = daily_pnl_pct
        self.consecutive_losses = consecutive_losses
        
        # 获取当前风险等级
        risk_level = self._get_risk_level()
        
        # 计算基础可开仓位
        max_position = account_value * Decimal(str(
            self.config.max_position_pct / 100
        ))
        max_single = account_value * Decimal(str(
            self.config.max_single_trade_pct / 100
        ))
        
        # 根据风险等级动态调整
        if risk_level == RiskLevel.CAUTION:
            reduction = 0.5
            reason = "谨慎模式:亏损超过阈值,仓位减半"
        elif risk_level == RiskLevel.WARNING:
            reduction = 0.25
            reason = "警告模式:仓位降至25%"
        elif risk_level == RiskLevel.LOCKDOWN:
            reduction = 0.0
            reason = "锁定模式:禁止开仓"
        else:
            # 连续亏损降仓
            if consecutive_losses > 0:
                reduction = max(0.5, 1 - consecutive_losses * 0.1)
                reason = f"连续亏损 {consecutive_losses} 次,降仓至 {reduction*100}%"
            else:
                reduction = 1.0
                reason = "正常模式"
        
        adjusted_max = max_position * Decimal(str(reduction))
        
        # 当前仓位
        current_pct = (current_position / account_value * 100).quantize(
            Decimal('0.01'), rounding=ROUND_DOWN
        )
        
        # 可用仓位
        available = max(Decimal('0'), adjusted_max - current_position)
        
        print(f"\n{'='*50}")
        print(f"仓位校验报告")
        print(f"{'='*50}")
        print(f"账户净值:${account_value}")
        print(f"当前仓位:${current_position} ({current_pct}%)")
        print(f"风险等级:{risk_level.value.upper()}")
        print(f"今日亏损:{daily_pnl_pct:.2f}%")
        print(f"连续亏损:{consecutive_losses} 次")
        print(f"调整原因:{reason}")
        print(f"可开仓位:${available.quantize(Decimal('0.01'), rounding=ROUND_DOWN)}")
        print(f"{'='*50}\n")
        
        # 锁定模式下禁止开仓
        if risk_level == RiskLevel.LOCKDOWN:
            raise PositionLimitError(
                f"仓位已锁定(亏损 {daily_pnl_pct:.2f}%)"
            )
        
        return available
    
    def _get_risk_level(self) -> RiskLevel:
        """根据亏损情况确定风险等级"""
        if self.daily_loss >= self.config.lockdown_threshold:
            return RiskLevel.LOCKDOWN
        elif self.daily_loss >= self.config.warning_threshold:
            return RiskLevel.WARNING
        elif self.daily_loss >= self.config.caution_threshold:
            return RiskLevel.CAUTION
        return RiskLevel.NORMAL


# 使用示例
if __name__ == "__main__":
    config = PositionConfig(
        max_position_pct=30.0,
        max_single_trade_pct=10.0,
        max_daily_loss_pct=5.0
    )
    manager = EnforcedPositionManager(config)
    
    account_value = Decimal("100000")
    current_position = Decimal("15000")
    
    # 场景1:正常交易
    print("【场景1】正常交易日")
    try:
        available = manager.calculate_available_position(
            account_value=account_value,
            current_position=current_position,
            daily_pnl_pct=0.5,
            consecutive_losses=0
        )
    except PositionLimitError as e:
        print(f"禁止开仓:{e}")
    
    # 场景2:连续亏损后
    print("\n【场景2】连续亏损 3 次后")
    try:
        available = manager.calculate_available_position(
            account_value=account_value,
            current_position=Decimal("15000"),
            daily_pnl_pct=0.0,
            consecutive_losses=3
        )
    except PositionLimitError as e:
        print(f"禁止开仓:{e}")
    
    # 场景3:亏损超过阈值
    print("\n【场景3】亏损 3.5%")
    try:
        available = manager.calculate_available_position(
            account_value=account_value,
            current_position=Decimal("20000"),
            daily_pnl_pct=3.5,
            consecutive_losses=0
        )
    except PositionLimitError as e:
        print(f"禁止开仓:{e}")

这段代码的意义在于:把仓位管理权从情绪移交给了规则。当你想要重仓时,系统会告诉你“不可以”;当你在亏损后想要扳本时,系统会强制降仓。

⚠️ 工程提示:生产环境中建议将关键参数(阈值、降仓比例)存储在配置文件中,不允许在运行时动态修改。如果必须修改,需要经过与冷静期类似的审批流程。

4.4 策略四:建立干预日志审计系统

如果你无法完全阻止干预,至少让每一次干预都有记录。

import json
import os
from datetime import datetime
from pathlib import Path
from dataclasses import dataclass, asdict
from typing import Optional
import hashlib


@dataclass
class InterventionLog:
    """干预日志"""
    timestamp: str
    intervention_type: str
    symbol: str
    reason: str
    before_state: dict
    after_state: dict
    session_id: str
    approval_status: str
    cooldown_passed: bool


class InterventionAuditor:
    """
    干预审计日志
    
    核心原则:每一次干预都必须有完整记录,且不可删除
    
    使用方法:
    >>> auditor = InterventionAuditor(log_dir="./intervention_logs")
    >>> 
    >>> # 记录干预
    >>> auditor.log_intervention(
    ...     intervention_type="manual_close",
    ...     symbol="AAPL.US",
    ...     reason="感觉要跌",
    ...     before_state={"position": 100, "pnl": 500},
    ...     after_state={"position": 0, "pnl": 0}
    ... )
    """
    
    def __init__(self, log_dir: str = "./intervention_logs"):
        self.log_dir = Path(log_dir)
        self.log_dir.mkdir(parents=True, exist_ok=True)
        self.session_id = self._generate_session_id()
        
    def _generate_session_id(self) -> str:
        """生成会话ID"""
        timestamp = datetime.now().isoformat()
        raw = f"{timestamp}_{os.getpid()}"
        return hashlib.md5(raw.encode()).hexdigest()[:12]
    
    def log_intervention(
        self,
        intervention_type: str,
        symbol: str,
        reason: str,
        before_state: dict,
        after_state: dict,
        approval_status: str = "auto",
        cooldown_passed: bool = True
    ) -> str:
        """
        记录干预日志
        
        Returns:
            日志文件的 SHA256 哈希,用于验证完整性
        """
        log_entry = InterventionLog(
            timestamp=datetime.now().isoformat(),
            intervention_type=intervention_type,
            symbol=symbol,
            reason=reason,
            before_state=before_state,
            after_state=after_state,
            session_id=self.session_id,
            approval_status=approval_status,
            cooldown_passed=cooldown_passed
        )
        
        # 写入日志文件
        date_str = datetime.now().strftime("%Y-%m-%d")
        log_file = self.log_dir / f"intervention_{date_str}.jsonl"
        
        with open(log_file, "a", encoding="utf-8") as f:
            log_line = json.dumps(asdict(log_entry), ensure_ascii=False)
            f.write(log_line + "\n")
        
        log_hash = hashlib.sha256(log_line.encode()).hexdigest()[:16]
        
        print(f"\n[审计日志] 干预已记录")
        print(f"  类型:{intervention_type}")
        print(f"  标的:{symbol}")
        print(f"  理由:{reason}")
        print(f"  会话ID:{self.session_id}")
        print(f"  日志哈希:{log_hash}")
        
        return log_hash
    
    def get_daily_summary(self, date: Optional[str] = None) -> dict:
        """
        获取每日干预汇总
        
        用于收盘后复盘
        """
        if date is None:
            date = datetime.now().strftime("%Y-%m-%d")
        
        log_file = self.log_dir / f"intervention_{date}.jsonl"
        
        if not log_file.exists():
            return {"count": 0, "interventions": []}
        
        interventions = []
        with open(log_file, "r", encoding="utf-8") as f:
            for line in f:
                interventions.append(json.loads(line))
        
        # 统计分析
        type_counts = {}
        cooldown_violations = 0
        
        for log in interventions:
            it = log["intervention_type"]
            type_counts[it] = type_counts.get(it, 0) + 1
            if not log.get("cooldown_passed", True):
                cooldown_violations += 1
        
        return {
            "date": date,
            "count": len(interventions),
            "by_type": type_counts,
            "cooldown_violations": cooldown_violations,
            "interventions": interventions
        }


# 使用示例
if __name__ == "__main__":
    auditor = InterventionAuditor()
    
    # 模拟记录干预
    auditor.log_intervention(
        intervention_type="manual_close",
        symbol="NVDA.US",
        reason="浮盈从3%回落到1%,担心利润消失",
        before_state={
            "position": 500,
            "entry_price": 875.50,
            "current_price": 892.00,
            "unrealized_pnl": 8250.0,
            "pnl_pct": 1.88
        },
        after_state={
            "position": 0,
            "realized_pnl": 8250.0
        },
        approval_status="auto",
        cooldown_passed=True
    )
    
    # 获取汇总
    print("\n" + "="*50)
    summary = auditor.get_daily_summary()
    print(f"今日干预汇总:{summary['count']} 次")
    print(f"按类型分布:{summary['by_type']}")

干预日志的价值不在于阻止干预,而在于让干预变得可追溯。当你在复盘时看到“本月手动干预 23 次,其中 16 次发生在亏损超过 2% 之后”,你会对自己的行为模式有更清醒的认知。

4.5 策略五:定期复盘,而非实时复盘

设定固定的复盘时间(如每天收盘后 30 分钟,每周日下午 2 小时),在固定时间、固定框架下复盘,而不是在交易过程中实时复盘。

复盘框架建议:

1. 策略层面
   - 信号触发是否符合规则?
   - 盈亏是否在预期范围内?
   - 有无需要调整的参数?

2. 执行层面
   - 有无手动干预?
   - 干预是否经过冷静期?
   - 干预后结果如何?

3. 心理层面(最重要)
   - 今天最想干预的时刻是什么时候?
   - 那个时刻的情绪是什么?
   - 下次遇到类似情况,有什么替代方案?

4.6 策略六:降低盯盘频率

研究表明,交易者盯盘频率与交易表现呈负相关——盯盘越频繁,越容易做出情绪化决策。

建议:

  • 关闭实时行情推送通知
  • 设定每日检查账户的固定时间(如上午 9:30 和下午 4:00)
  • 使用 4 小时线或日线作为主要参考周期,减少分时噪音的干扰
  • 周末不看行情,用阅读和运动填充时间

4.7 策略七:接受策略的不完美

任何策略都有不适应的市场环境。任何策略都会有连续亏损的时候。

这不是策略的缺陷,而是市场的本质。

一个期望值为正的策略,正确的使用方式是:相信它,执行它,接受它的波动

当你能够平静地看着策略连续亏损 10 次、账户回撤 8%,而依然坚持执行策略规则时,你才真正成为一个量化交易者。


五、技术层面的纪律保障

除了心理层面的训练,技术层面也可以设置硬性保障:

5.1 强制双签机制

对于大额仓位调整或参数修改,要求两名授权人分别确认:

class DualAuthorizationGate:
    """
    双签确认门控
    
    对于以下操作必须双签:
    - 单笔交易超过账户 5%
    - 策略参数修改
    - 手动干预审批
    """
    
    def __init__(self, approvers: list[str]):
        self.approvers = approvers
        self.pending_approvals: list[dict] = []
        
    def request_approval(self, action: str, amount: float) -> str:
        """发起审批请求,返回审批ID"""
        import uuid
        approval_id = str(uuid.uuid4())[:8]
        
        self.pending_approvals.append({
            "id": approval_id,
            "action": action,
            "amount": amount,
            "approvals": []
        })
        
        return approval_id
    
    def approve(self, approval_id: str, approver: str) -> bool:
        """
        单方审批
        
        Returns:
            True: 审批已记录,等待另一方
            False: 审批完成,动作可执行
        """
        for req in self.pending_approvals:
            if req["id"] == approval_id:
                if approver not in req["approvals"]:
                    req["approvals"].append(approver)
                
                if len(req["approvals"]) >= 2:
                    self.pending_approvals.remove(req)
                    print(f"[双签完成] 审批ID:{approval_id} | 操作:{req['action']}")
                    return False
                
                print(f"[单签] {approver} 已审批,等待另一方")
                return True
        
        raise ValueError(f"未找到审批ID:{approval_id}")

5.2 交易时段锁定

在特定时段(如美股开盘前 15 分钟和开盘后 30 分钟),禁止任何手动操作:

from datetime import time

TRADING_LOCK_WINDOWS = [
    (time(9, 15), time(9, 45)),  # 开盘前15分钟 + 开盘后30分钟
    (time(15, 45), time(16, 0)), # 收盘前15分钟
]

def is_trading_locked() -> bool:
    """检查当前是否处于锁定时段"""
    now = datetime.now().time()
    
    for start, end in TRADING_LOCK_WINDOWS:
        if start <= now <= end:
            return True
    return False

def require_trading_window(func):
    """装饰器:确保操作在允许时段执行"""
    def wrapper(*args, **kwargs):
        if is_trading_locked():
            raise PermissionError(
                "当前处于交易锁定时段,禁止手动操作"
            )
        return func(*args, **kwargs)
    return wrapper

六、写给正在挣扎的你

如果你正在读这篇文章,大概率你已经在某个时刻经历过“手动干预”的诱惑和事后懊悔。

这不是你一个人面临的问题。

每一个量化交易者,都是在和自己的人性做对抗。

区别只在于:有人意识到了这一点,并建立系统来对抗;有人没有意识到,在一次次的“保护性干预”中耗尽了账户。

记住三条铁律:

  1. 回测是策略的历史,实盘是对你的考验。如果你无法承受回测中的正常波动,策略再优秀也没有意义。

  2. 干预是一种瘾。每一次成功的手动干预都会强化你的自我效能感,让你下一次更倾向于干预。但长期来看,这种正反馈回路会让你丧失对策略的信任。

  3. 系统比意志力更可靠。不要相信自己在压力下的判断,要相信你在平静时设计的规则。


下一步行动

如果你想建立交易纪律系统

  1. 部署本文的冷静期执行器(CooldownEnforcer)和仓位管理器(EnforcedPositionManager
  2. 设定每日复盘时间,坚持记录干预日志
  3. 在实盘前完成“不干预清单”的书面承诺

如果你想系统学习量化交易系统设计

  • 关注 TickDB 公众号,获取更多关于量化工程的技术深度解析
  • 在 ClawHub 搜索安装 tickdb-market-data SKILL,获取 TickDB 的完整数据接入能力

如果你想找人聊聊你的策略困境

  • 访问 tickdb.ai,在社区论坛与其他量化交易者交流

“算法没有情绪,但写算法的人有。接受这一点,然后建立系统。”


风险提示:本文不构成任何投资建议。市场有风险,投资需谨慎。