当回测说"年化 80%"时,你在为什么付出代价?

"策略在回测里赚钱,在实盘里亏钱"——这不是策略的错,是你看不见回测框架的假设边界。

每一个量化交易者迟早会遇到这个时刻:对着历史数据跑出的漂亮曲线信心满满,结果一上实盘就被打得满地找牙。有人怪市场变了,有人怪滑点太狠,但真正的问题往往藏在回测框架的底层假设里——你用的是什么引擎,它牺牲了什么来换取速度?

回测引擎主要分为两大流派:向量化回测事件驱动回测。前者代码简洁、速度快,是大多数回测框架的选择;后者实现复杂,但能模拟更真实的市场微观结构。今天,我们从零开始,设计一个事件驱动回测引擎。


一、向量化回测的三个致命假设

在动手之前,先理解为什么我们需要事件驱动。

向量化回测的典型代码是这样的:

signals = (
    pd.DataFrame({'price': prices})
    .assign(sma20=lambda x: x['price'].rolling(20).mean())
    .assign(sma50=lambda x: x['price'].rolling(50).mean())
    .assign(position=lambda x: np.where(x['sma20'] > x['sma50'], 1, -1))
)
returns = signals['position'].shift(1) * signals['price'].pct_change()

这段代码优雅、简洁,一行 returns = ... 就完成了整个回测。但优雅是有代价的:

假设一:信号与成交无缝衔接。 代码中 shift(1) 假设你能以第二天开盘价成交。但实盘中,订单从发出到成交有延迟——可能是 100ms,可能是 5 秒。向量化回测无法模拟这种延迟带来的影响。

假设二:成交价就是报价。 returns = position.shift(1) * price.pct_change() 假设你能以当根 K 线的收盘价成交,且成交数量对价格没有冲击。但实盘中,大单会冲击市场,你买的时候价格已经在涨了。

假设三:数据采样代表真实行情。 如果你用日线数据做回测,框架假设整天只有一个价格序列。但真实的盘中可能有无数次价格穿越你的止损线又收回来,你永远不会知道——因为你只看日线。

这三个假设叠加在一起,就是"回测过度拟合"的根源:你的策略在人工构造的、无摩擦的、无延迟的市场里运行,怎么可能不出错?


二、事件驱动框架的核心思想

事件驱动回测的核心只有一个:时间是真实的,事件是顺序的

在向量化框架里,所有时间点的数据同时存在,你可以随意索引。但在事件驱动框架里,时间是不可逆的——你只能看见当前时刻及之前的事件,不能"偷看"未来。

事件驱动回测的典型循环:

while not end_of_data:
    event = event_queue.get_next_event()  # 获取下一个时间戳的事件
    
    if event.type == EventType.MARKET_DATA:
        strategy.on_market_data(event)     # 策略接收数据,产生信号
        for order in strategy.orders:
            risk_manager.check(order)       # 风控检查
            order_router.submit(order)      # 提交订单
    
    elif event.type == EventType.ORDER_UPDATE:
        broker.process_update(event)       # 处理订单状态变更
    
    elif event.type == EventType.FILL:
        portfolio.update(event)            # 更新持仓和资金
    
    event_queue.pop()                      # 事件出队

这个循环的关键在于:每个时间戳,事件依次流转。你无法在处理 FILL 事件时"顺带"看看下一根 K 线是什么。这意味着你必须在设计框架时就想清楚:每个事件类型需要什么信息,谁负责产生,谁负责消费。


三、事件系统设计:事件类型与层次结构

事件驱动框架的第一个组件是事件类型定义。

from dataclasses import dataclass
from enum import Enum, auto
from datetime import datetime
from typing import Optional, Any


class EventType(Enum):
    """事件类型枚举"""
    MARKET_DATA = auto()      # 市场数据事件
    SIGNAL = auto()           # 策略信号事件
    ORDER_NEW = auto()        # 新订单提交
    ORDER_SUBMITTED = auto()  # 订单已提交至交易所
    ORDER_PARTIAL_FILL = auto()  # 部分成交
    ORDER_FILLED = auto()     # 全部成交
    ORDER_CANCELLED = auto()  # 订单撤销
    ORDER_REJECTED = auto()   # 订单被拒绝
    POSITION_UPDATE = auto()  # 持仓更新
    CASH_UPDATE = auto()      # 资金更新


@dataclass
class Event:
    """事件基类"""
    timestamp: datetime
    type: EventType
    data: Any = None
    
    def __repr__(self):
        return f"<Event {self.timestamp} {self.type.name}>"


@dataclass
class MarketDataEvent(Event):
    """市场数据事件"""
    symbol: str
    open: float
    high: float
    low: float
    close: float
    volume: float
    
    def __post_init__(self):
        self.type = EventType.MARKET_DATA

这是一个扁平的事件结构,每个事件包含时间戳、类型和数据。在更复杂的框架中,你可能会引入事件层次——比如把 ORDER_FILLED 作为 ORDER_UPDATE 的子类,或者引入事件链来追踪订单从提交到成交的完整生命周期。

但对于大多数回测场景,扁平结构足够用了。


四、撮合引擎:订单簿的设计与撮合算法

撮合引擎是事件驱动回测的核心。没有它,你的订单永远只是"想法";有了它,想法才能变成"成交"。

撮合引擎需要模拟交易所的撮合逻辑:价格优先,时间优先。价格相同的情况下,先提交的订单先成交。

from dataclasses import dataclass, field
from sortedcontainers import SortedList
from enum import Enum
from datetime import datetime
from typing import Dict, List, Optional
from collections import namedtuple


class OrderSide(Enum):
    BUY = "BUY"
    SELL = "SELL"


class OrderType(Enum):
    LIMIT = "LIMIT"
    MARKET = "MARKET"


class OrderStatus(Enum):
    PENDING = "PENDING"
    SUBMITTED = "SUBMITTED"
    PARTIAL_FILLED = "PARTIAL_FILLED"
    FILLED = "FILLED"
    CANCELLED = "CANCELLED"
    REJECTED = "REJECTED"


Order = namedtuple('Order', ['id', 'symbol', 'side', 'price', 'quantity', 'timestamp'])


class OrderBook:
    """
    限价订单簿
    
    买盘按价格降序排列(高价优先)
    卖盘按价格升序排列(低价优先)
    同价格按时间戳升序排列(先到先得)
    """
    
    def __init__(self, symbol: str):
        self.symbol = symbol
        # (price, timestamp, order) - SortedList 用元组比较实现多键排序
        self.bids = SortedList(key=lambda x: (-x[0], x[1]))  # 买盘:价格降序
        self.asks = SortedList(key=lambda x: (x[0], x[1]))    # 卖盘:价格升序
    
    def add_order(self, order: Order) -> List['Fill']:
        """添加订单并执行撮合,返回成交列表"""
        fills = []
        
        if order.side == OrderSide.BUY:
            # 检查卖盘是否有可成交订单
            remaining_qty = order.quantity
            idx = 0
            
            while remaining_qty > 0 and idx < len(self.asks):
                ask_price, _, ask_order = self.asks[idx]
                
                # 买单只能在 ≤ 买价的价格成交
                if ask_price > order.price:
                    break
                
                fill_qty = min(remaining_qty, ask_order.quantity)
                fills.append(Fill(
                    timestamp=order.timestamp,
                    symbol=self.symbol,
                    price=ask_price,
                    quantity=fill_qty,
                    buy_order_id=order.id,
                    sell_order_id=ask_order.id
                ))
                
                remaining_qty -= fill_qty
                
                # 更新对手订单数量
                new_ask_order = ask_order._replace(
                    quantity=ask_order.quantity - fill_qty
                )
                
                if new_ask_order.quantity == 0:
                    self.asks.pop(idx)
                else:
                    self.asks[idx] = (ask_price, ask_order.timestamp, new_ask_order)
                    idx += 1
            
            # 剩余数量挂单
            if remaining_qty > 0:
                new_order = order._replace(quantity=remaining_qty)
                self.bids.add((new_order.price, new_order.timestamp, new_order))
        
        else:  # SELL
            # 检查买盘是否有可成交订单
            remaining_qty = order.quantity
            idx = 0
            
            while remaining_qty > 0 and idx < len(self.bids):
                bid_price, _, bid_order = self.bids[idx]
                
                # 卖单只能在 ≥ 卖价的价格成交
                if bid_price < order.price:
                    break
                
                fill_qty = min(remaining_qty, bid_order.quantity)
                fills.append(Fill(
                    timestamp=order.timestamp,
                    symbol=self.symbol,
                    price=bid_price,
                    quantity=fill_qty,
                    buy_order_id=bid_order.id,
                    sell_order_id=order.id
                ))
                
                remaining_qty -= fill_qty
                
                new_bid_order = bid_order._replace(
                    quantity=bid_order.quantity - fill_qty
                )
                
                if new_bid_order.quantity == 0:
                    self.bids.pop(idx)
                else:
                    self.bids[idx] = (bid_price, bid_order.timestamp, new_bid_order)
                    idx += 1
            
            if remaining_qty > 0:
                new_order = order._replace(quantity=remaining_qty)
                self.asks.add((new_order.price, new_order.timestamp, new_order))
        
        return fills
    
    def cancel_order(self, order_id: str) -> bool:
        """撤销订单"""
        for book in [self.bids, self.asks]:
            for i, (price, ts, order) in enumerate(book):
                if order.id == order_id:
                    book.pop(i)
                    return True
        return False
    
    def get_best_bid(self) -> Optional[float]:
        return self.bids[0][0] if self.bids else None
    
    def get_best_ask(self) -> Optional[float]:
        return self.asks[0][0] if self.asks else None


Fill = namedtuple('Fill', [
    'timestamp', 'symbol', 'price', 'quantity',
    'buy_order_id', 'sell_order_id'
])

这段代码的核心逻辑是:遍历对手盘,寻找可成交的订单。可成交的条件是:

  • 买单:对手卖价 ≤ 买价(你能以不高于你报价的价格买到)
  • 卖单:对手买价 ≥ 卖价(你能以不低于你报价的价格卖出)

撮合完成后,剩余未成交的数量重新挂入订单簿。


五、撮合引擎:完整实现与撮合模式选择

撮合引擎封装订单簿,并提供更高级的接口:

import uuid
from typing import Dict
from dataclasses import dataclass


class MatchingEngine:
    """
    撮合引擎
    
    管理多标的订单簿,执行撮合逻辑
    """
    
    def __init__(self, slippage_model=None):
        self.order_books: Dict[str, OrderBook] = {}
        self.slippge_model = slippage_model  # 滑点模型
        
        # 存储所有订单,用于状态追踪
        self.orders: Dict[str, Order] = {}
        self.fills: List[Fill] = []
    
    def submit_order(self, order: Order) -> List[Fill]:
        """
        提交订单
        
        Args:
            order: 订单对象
            
        Returns:
            成交事件列表
        """
        # 初始化订单簿
        if order.symbol not in self.order_books:
            self.order_books[order.symbol] = OrderBook(order.symbol)
        
        self.orders[order.id] = order
        
        # 市价单转换为限价单
        if order.side == OrderSide.BUY:
            best_ask = self.order_books[order.symbol].get_best_ask()
            effective_price = order.price if order.price > 0 else (
                best_ask if best_ask else float('inf')
            )
        else:
            best_bid = self.order_books[order.symbol].get_best_bid()
            effective_price = order.price if order.price > 0 else (
                best_bid if best_bid else 0
            )
        
        effective_order = order._replace(price=effective_price)
        fills = self.order_books[order.symbol].add_order(effective_order)
        
        # 应用滑点
        if self.slippage_model and fills:
            fills = [
                fill._replace(
                    price=self.slippage_model.apply(fill)
                )
                for fill in fills
            ]
        
        self.fills.extend(fills)
        return fills
    
    def cancel_order(self, order_id: str, symbol: str) -> bool:
        """撤销订单"""
        return self.order_books[symbol].cancel_order(order_id)
    
    def get_position(self, symbol: str) -> int:
        """获取持仓数量"""
        net = 0
        for fill in self.fills:
            if fill.symbol == symbol:
                if fill.buy_order_id:  # 成交发生在买单侧
                    net += fill.quantity
                else:
                    net -= fill.quantity
        return net

撮合模式的选择

上面的撮合引擎是逐笔撮合——每一笔新订单都与对手盘的每一笔挂单逐一撮合。这是最精确的撮合方式,但也是最慢的。

对于不同频率的策略,你需要不同的撮合模式:

撮合模式 适用频率 精度 速度
逐笔撮合 分钟级以下 最高 最慢
档位撮合 分钟级 中等 中等
快照撮合 小时级以上 较低 最快

档位撮合的核心思想是:把订单簿按价格档位聚合,一个档位内的所有订单视为一个整体成交。实现上,把 add_order 方法中对 SortedList 的遍历改为对档位数据的直接操作:

# 档位撮合伪代码
best_price_level = self.order_books[symbol].get_best_ask()
available_qty = sum(order.quantity for order in best_price_level.orders)
fill_qty = min(order.quantity, available_qty)
# 整个档位视为瞬间成交

选择哪种撮合模式,取决于你的策略频率和精度要求。没有最好的撮合模式,只有适合你的撮合模式。


六、回测主循环:事件循环与时间推进

现在,我们把撮合引擎、策略、风险管理组装在一起:

import pandas as pd
from datetime import datetime
from typing import List, Optional, Callable


class BacktestEngine:
    """
    事件驱动回测引擎
    """
    
    def __init__(
        self,
        initial_cash: float = 100_000,
        commission_rate: float = 0.001,
        slippage_model=None
    ):
        self.initial_cash = initial_cash
        self.cash = initial_cash
        self.commission_rate = commission_rate
        
        # 组件
        self.matching_engine = MatchingEngine(slippage_model=slippage_model)
        self.strategy: Optional[Callable] = None
        self.position = {}  # symbol -> quantity
        
        # 事件日志
        self.events = []
        self.equity_curve = []
        
        # 统计
        self.total_trades = 0
        self.total_commission = 0
    
    def set_strategy(self, strategy: Callable):
        """设置策略函数
        
        Strategy signature: (engine, market_data) -> List[Order]
        """
        self.strategy = strategy
    
    def run(self, market_data: pd.DataFrame):
        """
        运行回测
        
        Args:
            market_data: DataFrame with columns [timestamp, symbol, open, high, low, close, volume]
        """
        market_data = market_data.sort_values('timestamp').reset_index(drop=True)
        
        for _, row in market_data.iterrows():
            timestamp = row['timestamp']
            symbol = row['symbol'] if 'symbol' in row else market_data['symbol'].iloc[0]
            
            # 构造市场数据事件
            md_event = MarketDataEvent(
                timestamp=timestamp,
                symbol=symbol,
                open=row['open'],
                high=row['high'],
                low=row['low'],
                close=row['close'],
                volume=row['volume']
            )
            
            # 策略生成订单
            if self.strategy:
                try:
                    orders = self.strategy(self, md_event)
                except Exception as e:
                    print(f"策略执行错误: {e}")
                    orders = []
                
                # 风控检查 + 提交订单
                for order in orders:
                    order = self._risk_check(order)
                    if order:
                        self._submit_order(order)
            
            # 更新权益曲线
            self._update_equity(timestamp)
        
        self._generate_summary()
    
    def _risk_check(self, order: Order) -> Optional[Order]:
        """风控检查"""
        # 检查资金
        estimated_cost = order.price * order.quantity * (1 + self.commission_rate)
        if order.side == OrderSide.BUY and estimated_cost > self.cash:
            return None
        
        # 检查做空限制
        if order.side == OrderSide.SELL:
            current_position = self.position.get(order.symbol, 0)
            if current_position < order.quantity:
                return None
        
        return order
    
    def _submit_order(self, order: Order):
        """提交订单"""
        fills = self.matching_engine.submit_order(order)
        
        for fill in fills:
            self._process_fill(fill)
    
    def _process_fill(self, fill: Fill):
        """处理成交"""
        commission = fill.price * fill.quantity * self.commission_rate
        self.total_commission += commission
        
        if fill.buy_order_id:  # 买方成交
            cost = fill.price * fill.quantity + commission
            self.cash -= cost
            self.position[fill.symbol] = self.position.get(fill.symbol, 0) + fill.quantity
        else:  # 卖方成交
            revenue = fill.price * fill.quantity - commission
            self.cash += revenue
            self.position[fill.symbol] = self.position.get(fill.symbol, 0) - fill.quantity
        
        self.total_trades += 1
    
    def _update_equity(self, timestamp: datetime):
        """更新权益曲线"""
        position_value = sum(
            pos * self.matching_engine.order_books.get(
                sym, OrderBook(sym)
            ).get_best_bid() or 0
            for sym, pos in self.position.items()
        )
        self.equity_curve.append({
            'timestamp': timestamp,
            'cash': self.cash,
            'position_value': position_value,
            'total_equity': self.cash + position_value
        })
    
    def _generate_summary(self):
        """生成回测报告"""
        print(f"总交易次数: {self.total_trades}")
        print(f"总佣金: {self.total_commission:.2f}")
        print(f"最终资金: {self.cash:.2f}")
        print(f"初始资金: {self.initial_cash:.2f}")
        print(f"收益率: {(self.cash / self.initial_cash - 1) * 100:.2f}%")

这段回测引擎的逻辑非常直接:按时间顺序遍历数据,每个时间点执行一次策略 + 订单撮合。关键的改进空间在于:

  1. 滑点模型:上面的实现使用固定的佣金率。实盘中,滑点与订单大小、市场流动性相关。更精确的模型需要引入流动性调整订单大小影响

  2. 多标的处理:上面的实现假设数据只有一个标的。如果有多标的,你需要按时间戳聚合同一时刻的所有标的,然后并行处理。

  3. 事件流处理:上面的实现是同步的。如果你想支持实时回测或流式处理,需要引入异步架构。


七、从回测到实盘:架构升级路径

上面的回测引擎是同步的、单线程的,适合日级到小时级的策略。但如果你要做分钟级秒级的高频策略,你需要异步架构。

import asyncio
from typing import AsyncGenerator


class AsyncBacktestEngine(BacktestEngine):
    """
    异步事件驱动回测引擎
    
    支持实时数据流和异步策略
    """
    
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.event_queue = asyncio.Queue()
        self.running = False
    
    async def run_stream(
        self, 
        data_stream: AsyncGenerator[MarketDataEvent, None]
    ):
        """
        异步运行回测
        
        Args:
            data_stream: 异步数据流(可以是 WebSocket 连接或文件读取器)
        """
        self.running = True
        
        async for event in data_stream:
            if not self.running:
                break
            
            # 策略执行(异步)
            if self.strategy:
                strategy_coro = self.strategy(self, event)
                
                if asyncio.iscoroutine(strategy_coro):
                    orders = await strategy_coro
                else:
                    orders = strategy_coro
                
                # 风控 + 提交订单
                for order in orders:
                    if self._risk_check(order):
                        await self._submit_order_async(order)
            
            # 更新权益
            self._update_equity(event.timestamp)
    
    async def _submit_order_async(self, order: Order):
        """异步提交订单"""
        fills = self.matching_engine.submit_order(order)
        for fill in fills:
            self._process_fill(fill)
    
    def stop(self):
        """停止回测"""
        self.running = False

异步架构的核心变化:

  • 策略函数可以是 async def,支持 await 调用外部 API(如 TickDB 的 WebSocket 推送)
  • 数据源改为异步生成器,可以是文件、网络流或真实的 WebSocket 连接
  • 事件处理与数据接收解耦,避免阻塞

如果你的策略需要同时订阅多个数据源(多个标的或多个市场),异步架构几乎是必选项。


八、回测局限性:你在为什么付出代价

回到开篇的问题:为什么回测赚钱,实盘亏钱?

在理解了事件驱动框架的原理后,我们现在可以系统性地回答这个问题:

回测假设 实盘现实 影响
订单立即成交 有网络延迟和处理时间 低估交易成本
成交价 = 报价 大单冲击市场 低估市场冲击
可交易任意数量 流动性约束 无法执行大仓位
数据无缺失 可能丢包、断线 事件处理不连续
固定采样频率 真实行情连续 高频策略严重失真

事件驱动框架无法消除这些局限性,但它能让你清晰地看到每个假设在哪里。 当你发现回测结果与实盘差距大时,首先检查的是框架的假设,而不是策略本身。

回测局限性说明:上述回测框架基于历史数据模拟,不代表未来收益。框架中存在以下未充分模拟的因素:滑点采用固定比例(0.1%),未考虑订单大小对流动性的冲击;撮合采用逐笔撮合,与实际交易所的订单簿维护存在差异;未模拟涨跌停、停牌等特殊市场状态;未考虑资金规模对市场的影响。建议在实际使用前进行更严格的样本外测试和实盘模拟。


九、架构选型决策树

最后,一句话总结如何选择回测框架:

  • 策略频率 ≥ 小时级,数据量 < 100 万行 → 同步事件驱动框架足够
  • **策略频率 < 小时级**,数据量 > 100 万行 → 向量化框架 + 事件验证
  • 策略频率 < 分钟级,需要多数据源 → 异步事件驱动框架
  • 策略频率秒级或更高 → 你需要专业的 C++/Rust 低延迟框架,这不是 Python 能稳定处理的场景

十、性能扩展:从 Pandas 到 ClickHouse

当数据量超过内存限制时,你需要扩展存储和查询能力。

数据规模 存储方案 查询方式
< 10GB Pandas/Parquet 全量加载
10GB - 100GB Parquet + 分区 按时间范围加载
> 100GB ClickHouse / DuckDB SQL 查询 + 流式加载

如果你需要快速查询 10 年级别的历史 K 线数据进行策略回测,一个常见的方案是使用 TickDB 的 REST API 按需拉取数据:

import os
import requests
import pandas as pd


def fetch_historical_klines(symbol: str, interval: str = "1d", limit: int = 3650):
    """
    从 TickDB 获取历史 K 线数据用于回测
    
    API 文档: https://docs.tickdb.ai
    
    Args:
        symbol: 交易品种,如 "AAPL.US"
        interval: K 线周期,如 "1d", "1h", "1m"
        limit: 获取数量
    
    Returns:
        DataFrame with [timestamp, open, high, low, close, volume]
    """
    api_key = os.environ.get("TICKDB_API_KEY")
    if not api_key:
        raise ValueError("请设置环境变量 TICKDB_API_KEY")
    
    headers = {"X-API-Key": api_key}
    params = {
        "symbol": symbol,
        "interval": interval,
        "limit": limit
    }
    
    response = requests.get(
        "https://api.tickdb.ai/v1/market/kline",
        headers=headers,
        params=params,
        timeout=(3.05, 10)  # 连接超时 3.05s,读取超时 10s
    )
    
    if response.status_code != 200:
        raise RuntimeError(f"请求失败: {response.status_code}")
    
    data = response.json()
    
    if data.get("code") != 0:
        raise RuntimeError(f"API 错误: {data.get('message')}")
    
    df = pd.DataFrame(data["data"]["klines"])
    df["timestamp"] = pd.to_datetime(df["t"], unit="ms")
    df = df.rename(columns={
        "o": "open", "h": "high", "l": "low", 
        "c": "close", "v": "volume"
    })
    
    return df[["timestamp", "open", "high", "low", "close", "volume"]]


# 使用示例
if __name__ == "__main__":
    klines = fetch_historical_klines("AAPL.US", "1d", 3650)
    
    engine = BacktestEngine(
        initial_cash=100_000,
        commission_rate=0.001
    )
    
    # 设置简单的双均线策略
    def ma_strategy(engine, md_event):
        df = engine.price_history.get(md_event.symbol, pd.DataFrame())
        df = pd.concat([df, pd.DataFrame([{
            'timestamp': md_event.timestamp,
            'close': md_event.close
        }])], ignore_index=True)
        engine.price_history[md_event.symbol] = df
        
        if len(df) < 50:
            return []
        
        ma20 = df['close'].iloc[-20:].mean()
        ma50 = df['close'].iloc[-50:].mean()
        ma20_prev = df['close'].iloc[-21:-1].mean()
        ma50_prev = df['close'].iloc[-51:-1].mean()
        
        position = engine.position.get(md_event.symbol, 0)
        
        if ma20 > ma50 and ma20_prev <= ma50_prev:
            return [Order(
                id=str(uuid.uuid4()),
                symbol=md_event.symbol,
                side=OrderSide.BUY,
                price=md_event.close,
                quantity=100,
                timestamp=md_event.timestamp
            )]
        elif ma20 < ma50 and ma20_prev >= ma50_prev:
            return [Order(
                id=str(uuid.uuid4()),
                symbol=md_event.symbol,
                side=OrderSide.SELL,
                price=md_event.close,
                quantity=position,
                timestamp=md_event.timestamp
            )]
        return []
    
    engine.price_history = {}
    engine.set_strategy(ma_strategy)
    engine.run(klines)

这段代码展示了完整的回测流程:从 TickDB 获取历史 K 线 → 构造事件 → 运行策略 → 输出结果。对于更复杂的策略,你可以把 ma_strategy 替换成你自己的策略逻辑。


下一步行动

如果你想亲手实现一个回测引擎

  1. 从本文的撮合引擎代码开始,添加仓位管理和绩效计算模块
  2. 用 TickDB 的历史 K 线数据跑一个双均线策略,观察实盘差距
  3. 加入滑点模型和流动性约束,重新评估策略有效性

如果你已经有回测框架,想升级为异步架构

  1. 参考 AsyncBacktestEngine 的设计,将策略函数改为 async def
  2. 用 TickDB 的 WebSocket 推送替代 REST API,观察延迟差异

如果你需要更长的历史数据做跨周期回测
TickDB 提供 10 年级别的美股历史 K 线数据,REST API 可按需拉取,适合日线级别的中长期策略回测。访问 tickdb.ai 了解更多。


本文不构成任何投资建议。市场有风险,投资需谨慎。回测结果基于历史数据,不代表未来收益。