"你的策略在回测中表现优异,但实盘一跑就崩——有多少人想过,问题可能出在你用的免费数据源上?"
这是 Reddit r/algotrading 上被顶了 2000+ 次的帖子主题。发帖人用 Yahoo Finance 的日线数据跑了三年趋势跟踪策略,年化 18%,夏普 1.4。实盘第一周,亏损 7%。
他后来发现:Yahoo Finance 的数据有大量前复权错误,前向复权用的是错误的因子计算,导致很多股票的历史价格在财报调整后出现不连续的"跳空"。策略在错误数据上训练,当然无法适应真实市场。
这是零成本量化方案的原罪:数据质量。
但这不意味着零成本方案没有价值。它是量化学习的最佳路径——你可以在这里验证思路、积累经验、理解市场。当你的策略经过充分验证、需要向实盘迈进时,再考虑升级数据源。
本文拆解零成本方案的完整架构:从数据源到回测框架到云部署,给出一套可运行的最小闭环。同时诚实指出它的边界,并在最后给出升级路径。
一、零成本量化系统全景图
零成本不等于低质量。通过合理组合工具,你可以搭建一套功能完整的量化研究环境。
┌─────────────────────────────────────────────────────────────┐
│ 零成本量化系统架构 │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌────────┐ │
│ │ 数据源层 │ │ 回测层 │ │ 执行层 │ │ 监控层 │ │
│ ├──────────┤ ├──────────┤ ├──────────┤ ├────────┤ │
│ │Yahoo │ │Backtrader│ │Alpaca │ │Grafana │ │
│ │Finance │ │/Zipline │ │Interactive│ │/Uptime │ │
│ │Tushare │ │ │ │Brokers │ │Robot │ │
│ │CCXT │ │ │ │(免费模拟) │ │ │ │
│ └──────────┘ └──────────┘ └──────────┘ └────────┘ │
│ │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ 云资源层 │ │
│ │ Google Colab / Kaggle / 阿里云免费额度 / Railway │ │
│ └──────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
各层职责:
- 数据源层:获取行情、财务数据、宏观数据
- 回测层:历史数据回测、因子计算、绩效归因
- 执行层:信号生成后,通过模拟券商执行(实盘需要对接真实券商)
- 监控层:策略运行状态监控、异常告警
下面逐一拆解各层的免费方案。
二、免费数据源组合
2.1 美股 / 全球股票:Yahoo Finance
Yahoo Finance 是零成本方案中使用最广泛的数据源。
优势:
- 完全免费,无需 API Key
- 覆盖股票、ETF、期货、外汇
- 支持
yfinancePython 库,安装简单
劣势:
- 数据质量参差不齐(见开头提到的前复权问题)
- 实时性差,通常有 15 分钟以上延迟
- 频繁请求会触发限流
典型用法:
import yfinance as yf
import os
# 环境变量存储(符合生产级规范)
# export TICKDB_API_KEY="your_key" # 升级后使用 TickDB
def fetch_stock_data(ticker: str, start: str, end: str, interval: str = "1d"):
"""
获取股票历史数据
Args:
ticker: 股票代码,如 'AAPL'
start: 开始日期 'YYYY-MM-DD'
end: 结束日期 'YYYY-MM-DD'
interval: K线周期 '1d', '1h', '5m' 等
Returns:
pandas.DataFrame
"""
try:
stock = yf.Ticker(ticker)
df = stock.history(start=start, end=end, interval=interval)
if df.empty:
raise ValueError(f"未获取到 {ticker} 的数据,请检查代码是否正确")
return df
except Exception as e:
print(f"获取数据失败: {e}")
return None
def fetch_with_retry(ticker: str, start: str, end: str, max_retries: int = 3):
"""
带重试机制的数据获取
⚠️ 生产环境建议:
- 添加请求间隔(sleep 1秒)
- 考虑使用 yf.Tickers 批量获取多只股票
- 存储到本地数据库避免重复请求
"""
import time
for attempt in range(max_retries):
df = fetch_stock_data(ticker, start, end)
if df is not None:
return df
wait_time = min(2 ** attempt + 0.1, 32) # 指数退避
print(f"重试中,{wait_time:.1f}秒后...")
time.sleep(wait_time)
raise RuntimeError(f"获取 {ticker} 数据失败,已重试 {max_retries} 次")
# 使用示例
if __name__ == "__main__":
# 获取苹果2024年日线数据
aapl_data = fetch_with_retry("AAPL", "2024-01-01", "2024-12-31")
print(aapl_data.tail())
print(f"\n数据质量检查:")
print(f"缺失值: {aapl_data.isnull().sum().sum()}")
print(f"数据点数: {len(aapl_data)}")
数据质量验证清单(必须执行):
def validate_data_quality(df, ticker):
"""零成本方案必须的数据质量检查"""
checks = []
# 1. 检查是否有异常跳空(超过20%)
daily_returns = df['Close'].pct_change()
extreme_days = daily_returns[abs(daily_returns) > 0.20]
if not extreme_days.empty:
print(f"⚠️ 发现 {len(extreme_days)} 天极端价格波动:")
print(extreme_days)
checks.append("极端波动")
# 2. 检查复权前后一致性
if 'Close' in df.columns and 'Close_unadj' in df.columns:
diff = abs(df['Close'] - df['Close_unadj'])
if diff.mean() > 0.01:
print(f"⚠️ 前后复权差异较大,可能存在数据问题")
checks.append("复权异常")
# 3. 检查交易日连续性
trading_days_expected = pd.date_range(df.index[0], df.index[-1], freq='B')
missing_days = set(trading_days_expected) - set(df.index)
if len(missing_days) > 10:
print(f"⚠️ 缺失 {len(missing_days)} 个交易日数据")
checks.append("数据缺失")
return len(checks) == 0, checks
# 执行验证
is_clean, issues = validate_data_quality(aapl_data, "AAPL")
if is_clean:
print("✅ 数据质量通过")
else:
print(f"❌ 发现问题: {issues}")
2.2 A股:Tushare
Tushare 是国内最流行的免费金融数据平台,需要注册获取积分(积分决定数据权限)。
优势:
- 覆盖 A股全市场,包括科创板、北交所
- 提供财务数据、指数成分、资金流向等
- 数据质量相对可靠
劣势:
- 需要积分权限(低积分用户数据范围受限)
- 有请求频率限制
- 部分数据有延迟
典型用法:
import tushare as ts
import os
# 建议使用环境变量存储 Token
# export TUSHARE_TOKEN="your_token_here"
class TushareDataFetcher:
"""Tushare 数据获取器"""
def __init__(self, token: str = None):
self.token = token or os.environ.get("TUSHARE_TOKEN")
if not self.token:
raise ValueError("请设置 TUSHARE_TOKEN 环境变量")
self.api = ts.Pro_api(self.token)
def get_daily_basic(self, ts_code: str, start_date: str, end_date: str):
"""
获取日线行情加实时财务指标
注意:
- 无积分用户仅能获取部分字段
- 建议设置请求间隔避免限流
"""
try:
df = self.api.daily_basic(
ts_code=ts_code,
start_date=start_date,
end_date=end_date,
fields="ts_code,trade_date,close,turnover_rate,pe,pb"
)
return df
except Exception as e:
print(f"获取失败: {e}")
return None
def get_kline(self, ts_code: str, freq: str = "D"):
"""
获取K线数据
Args:
ts_code: 股票代码,如 '000001.SZ'
freq: 周期 'D'日线 'W'周线 'M'月线
"""
try:
df = self.api.ts_daily(
ts_code=ts_code,
freq=freq
)
return df.sort_values('trade_date')
except Exception as e:
print(f"获取K线失败: {e}")
return None
# 使用示例
if __name__ == "__main__":
fetcher = TushareDataFetcher()
# 获取平安银行2024年数据
data = fetcher.get_daily_basic("000001.SZ", "20240101", "20241231")
if data is not None:
print(data.head())
print(f"\n数据量: {len(data)} 条")
2.3 数字货币:CCXT
CCXT 是加密货币领域的"瑞士军刀",统一接口访问超过 100 家交易所。
优势:
- 支持 Binance、Bybit、OKX 等主流交易所
- 获取 tick 级逐笔成交数据(部分交易所)
- 适合套利、均值回归等高频策略
劣势:
- 数据格式不统一,需要额外处理
- 部分交易所数据完整性差
- 高频请求容易触发风控
典型用法:
import ccxt
import os
import time
class CryptoDataFetcher:
"""数字货币统一数据获取器"""
def __init__(self, exchange_id: str = "binance"):
self.exchange = getattr(ccxt, exchange_id)({
'enableRateLimit': True, # 必须启用限频
'options': {'defaultType': 'spot'}
})
def fetch_ohlcv(self, symbol: str, timeframe: str = "1h", limit: int = 1000):
"""
获取K线数据
⚠️ 重要提醒:
- Binance 免费层每分钟 1200 请求
- 建议添加请求间隔
- 历史数据有深度限制
"""
for attempt in range(3):
try:
ohlcv = self.exchange.fetch_ohlcv(symbol, timeframe, limit=limit)
df = self.exchange.convert_ohlcv_to_dataframe(ohlcv)
return df
except ccxt.RateLimitExceeded:
wait = self.exchange.last_http_response_headers.get('Retry-After', 5)
print(f"触发限流,等待 {wait} 秒")
time.sleep(int(wait))
except Exception as e:
print(f"获取失败: {e}")
time.sleep(2 ** attempt)
raise RuntimeError(f"获取 {symbol} 数据失败")
def fetch_orderbook(self, symbol: str, limit: int = 20):
"""获取订单簿深度数据"""
try:
orderbook = self.exchange.fetch_order_book(symbol, limit)
return {
'bids': pd.DataFrame(orderbook['bids'], columns=['price', 'amount']),
'asks': pd.DataFrame(orderbook['asks'], columns=['price', 'amount'])
}
except Exception as e:
print(f"获取订单簿失败: {e}")
return None
# 使用示例
if __name__ == "__main__":
fetcher = CryptoDataFetcher("binance")
# 获取BTC/USDT 1小时K线
btc_data = fetcher.fetch_ohlcv("BTC/USDT", "1h", limit=500)
print(btc_data.tail())
# 获取订单簿
ob = fetcher.fetch_orderbook("BTC/USDT", limit=10)
if ob:
print(f"\n买一价: {ob['bids'].iloc[0]['price']}")
print(f"卖一价: {ob['asks'].iloc[0]['price']}")
2.4 数据源能力对比
| 数据源 | 资产覆盖 | 历史深度 | 实时性 | 数据质量 | 免费限制 |
|---|---|---|---|---|---|
| Yahoo Finance | 美股、ETF、部分期货 | 约 20 年 | 15min+ | ⚠️ 有复权问题 | 频繁请求限流 |
| Tushare | A股全市场 | 约 15 年 | 有延迟 | ✅ 较好 | 积分决定权限 |
| CCXT | 主流交易所 | 部分支持全量 | 实时 | ✅ 良好 | 每分钟请求数限制 |
| 公共数据集 | 多种 | 不等 | 无 | ⚠️ 需验证 | 通常无限制 |
零成本方案的真实成本:你的时间。
数据清洗、格式统一、质量验证占整个量化项目 60% 以上的工作量。这不是 Bug,是 Feature——它逼迫你深入理解数据,而不仅仅是"拿来就用"。
三、开源回测框架选型
回测框架是将策略逻辑转化为可评估绩效的工具。以下是三个主流开源框架的对比。
3.1 Backtrader:快速验证的首选
Backtrader 是 Python 生态中最流行的回测框架,API 设计直观,学习曲线平缓。
适用场景:日内策略、策略原型验证、学习量化概念
核心代码示例:
import backtrader as bt
import yfinance as yf
import os
class MeanReversionStrategy(bt.Strategy):
"""
均值回归策略
逻辑:
1. 计算 N 日移动平均线
2. 当价格偏离均线 X% 时,判断为超买/超卖
3. 等待价格回归时平仓
⚠️ 注意:这是教学示例,不构成投资建议
"""
params = (
('period', 20), # 均线周期
('devfactor', 2.0), # 标准差倍数
('printlog', False),
)
def __init__(self):
self.dataclose = self.datas[0].close
self.order = None
self.buyprice = None
self.buycomm = None
# 计算布林带
self.boll = bt.indicators.BollingerBands(
self.datas[0],
period=self.params.period,
devfactor=self.params.devfactor
)
def log(self, txt, dt=None):
if self.params.printlog:
dt = dt or self.datas[0].datetime.date(0)
print(f'{dt.isoformat()} {txt}')
def notify_order(self, order):
if order.status in [order.Submitted, order.Accepted]:
return
if order.status in [order.Completed]:
if order.isbuy():
self.log(f'买入执行: 价格 {order.executed.price:.2f}, '
f'成本 {order.executed.value:.2f}, '
f'手续费 {order.executed.comm:.2f}')
self.buyprice = order.executed.price
self.buycomm = order.executed.comm
elif order.isSell():
self.log(f'卖出执行: 价格 {order.executed.price:.2f}')
self.order = None
def next(self):
if self.order:
return
# 交易逻辑
if not self.position:
# 无持仓:价格跌破下轨,买入
if self.dataclose[0] < self.boll.lines.bot[0]:
self.log(f'买入信号: 价格 {self.dataclose[0]:.2f}')
self.order = self.buy()
else:
# 有持仓:价格突破上轨,卖出
if self.dataclose[0] > self.boll.lines.top[0]:
self.log(f'卖出信号: 价格 {self.dataclose[0]:.2f}')
self.order = self.sell()
def run_backtest(ticker: str, start: str, end: str,
initial_cash: float = 100000,
commission: float = 0.001):
"""
运行回测
Args:
ticker: 股票代码
start: 开始日期
end: 结束日期
initial_cash: 初始资金
commission: 交易佣金比例
"""
cerebro = bt.Cerebro()
# 添加数据
data = bt.feeds.PandasData(
data=yf.download(ticker, start=start, end=end, progress=False)
)
cerebro.adddata(data)
# 添加策略
cerebro.addstrategy(MeanReversionStrategy)
# 设置初始资金和佣金
cerebro.broker.setcash(initial_cash)
cerebro.broker.setcommission(commission=commission)
# 添加分析器
cerebro.addanalyzers(bt.analyzers.SharpeRatio, _name='sharpe')
cerebro.addanalyzers(bt.analyzers.DrawDown, _name='drawdown')
cerebro.addanalyzers(bt.analyzers.Returns, _name='returns')
cerebro.addanalyzers(bt.analyzers.TradeAnalyzer, _name='trades')
print(f'\n初始资金: {initial_cash:,.2f}')
# 运行回测
results = cerebro.run()
# 获取最终资金
final_value = cerebro.broker.getvalue()
print(f'最终资金: {final_value:,.2f}')
print(f'总收益率: {(final_value/initial_cash - 1)*100:.2f}%')
# 获取分析结果
strat = results[0]
sharpe = strat.analyzers.sharpe.get_analysis()
drawdown = strat.analyzers.drawdown.get_analysis()
trades = strat.analyzers.trades.get_analysis()
print(f'\n=== 回测分析 ===')
print(f'夏普比率: {sharpe["sharperatio"]:.2f}' if sharpe["sharperatio"] else '夏普比率: N/A')
print(f'最大回撤: {drawdown["max"]["drawdown"]:.2f}%')
print(f'总交易次数: {trades["total"]["total"]}')
if trades["total"]["total"] > 0:
win_rate = trades["won"]["total"] / trades["total"]["total"] * 100
print(f'胜率: {win_rate:.2f}%')
return results
if __name__ == "__main__":
# 回测苹果2024年
results = run_backtest("AAPL", "2024-01-01", "2024-12-31")
3.2 Zipline:专业因子框架
Zipline 是量化对冲基金 Two Sigma 开源的生产级回测框架,原本是他们的内部工具。
适用场景:因子研究、多因子模型、阿尔法策略
核心特点:
- 与 Pandas 接口高度兼容
- 内置大量金融指标和因子库
- 支持 Pipeline API 进行因子组合研究
局限性:
- 安装复杂(依赖旧版 Python)
- 数据加载需要通过
zipline命令初始化 - 国内 A 股支持不完善
# Zipline 示例:使用 Pipeline 进行多因子分析
from zipline.pipeline import Pipeline
from zipline.pipeline.factors import SimpleMovingAverage, BollingerBands
from zipline.api import attach_pipeline, pipeline_output
from zipline import run_algorithm
def make_pipeline():
"""创建因子 Pipeline"""
pipe = Pipeline()
# 添加布林带因子
sma_20 = SimpleMovingAverage(inputs=[USEquityPricing.close], window_length=20)
bollinger = BollingerBands(inputs=[USEquityPricing.close], window_length=20, num_std=2)
# 添加到 Pipeline
pipe.add(sma_20, 'sma_20')
pipe.add(bollinger.upper, 'bb_upper')
pipe.add(bollinger.lower, 'bb_lower')
return pipe
def initialize(context):
"""策略初始化"""
attach_pipeline(make_pipeline(), 'my_pipeline')
def handle_data(context, data):
"""每日交易逻辑"""
output = pipeline_output('my_pipeline')
# 筛选信号
signals = output[data.current(USEquityPricing.asset, 'close') < output['bb_lower']]
for asset in signals.index:
# 买入逻辑
if not context.portfolio.positions[asset].amount:
order_target_percent(asset, 0.1) # 买入 10%
3.3 框架选型指南
| 维度 | Backtrader | Zipline | 自建框架 |
|---|---|---|---|
| 学习曲线 | ⭐⭐ 低 | ⭐⭐⭐⭐ 高 | ⭐⭐⭐ 中 |
| 数据接口 | 灵活 | 固定(需初始化) | 自定义 |
| 因子能力 | 基础 | 强大(Pipeline) | 灵活 |
| 社区活跃度 | 活跃 | 一般(维护慢) | - |
| A股支持 | 需自行适配 | 不支持 | 完全可控 |
| 实盘集成 | Alpaca、IB | 无官方支持 | 完全可控 |
| 适合人群 | 快速验证 | 因子研究 | 定制需求 |
零成本方案建议:从 Backtrader 开始,学习核心概念后,再根据需求迁移到 Zipline 或自建框架。
四、免费云资源部署
回测跑通后,下一步是让策略自动运行。以下是几个可行的免费部署方案。
4.1 Google Colab / Kaggle:免费 Jupyter 环境
最适合探索性研究和轻量级回测。
优势:
- 无需配置环境,预装常用库
- 免费 GPU/TPU 可用
- 可直接加载 Google Drive 数据
局限:
- 无持久化运行(关闭页面即停止)
- 不适合长期运行的策略监控
- 每次打开需要重新加载数据
# Colab 最佳实践:保存数据到 Google Drive
from google.colab import drive
import os
# 挂载 Google Drive
drive.mount('/content/drive')
# 创建工作目录
WORK_DIR = '/content/drive/MyDrive/quant_projects'
os.makedirs(WORK_DIR, exist_ok=True)
# 设置路径
os.chdir(WORK_DIR)
print(f"工作目录: {os.getcwd()}")
# 保存回测结果
import pickle
results = {
'final_value': 105000,
'sharpe': 1.2,
'max_drawdown': 0.08,
'trades': 45
}
with open('backtest_results.pkl', 'wb') as f:
pickle.dump(results, f)
print("结果已保存")
4.2 阿里云函数计算:Serverless 量化任务
适合定时回测和事件触发场景。
免费额度:
- 每月 400,000 GB-秒
- 每月 10,000,000 次调用
- 足够一个人运行轻量策略
部署示例:
# requirements.txt
backtrader==1.9.78.123
yfinance>=0.2.36
pandas>=2.0.0
# handler.py
import logging
from backtrader import Cerebro
from backtrader.feeds import GenericCSVData
# 配置日志
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def backtest_handler(event, context):
"""
定时触发回测任务
⚠️ 注意事项:
- 函数计算有 10 分钟超时限制
- 大规模回测建议分批处理
- 依赖需打入 Layer 或打包到代码
"""
logger.info("开始执行回测任务")
# 回测逻辑
cerebro = Cerebro()
cerebro.broker.setcash(100000)
# ... 回测代码 ...
final_value = cerebro.broker.getvalue()
logger.info(f"回测完成,最终资金: {final_value:,.2f}")
return {
'statusCode': 200,
'body': {
'final_value': final_value
}
}
# serverless.yml 配置示例
"""
service: quant-strategy
provider:
name: aliyun
runtime: python3.9
timeout: 600 # 10分钟超时
functions:
daily_backtest:
handler: handler.backtest_handler
events:
- schedule: cron(0 9 * * *) # 每天早上9点运行
memorySize: 512
timeout: 600
"""
4.3 Railway / Render:Docker 部署
适合长期运行的策略监控。
Railway 免费额度:
- 每月 $5 免费额度
- 约 500 小时运行时间
- 支持自定义域名
Dockerfile 示例:
FROM python:3.11-slim
WORKDIR /app
# 安装依赖
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# 复制代码
COPY . .
# 运行策略
CMD ["python", "strategy_runner.py"]
策略运行器示例:
# strategy_runner.py
import time
import logging
from datetime import datetime, timedelta
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class StrategyRunner:
"""策略持续运行器"""
def __init__(self, check_interval: int = 3600):
"""
Args:
check_interval: 检查间隔(秒),默认1小时
"""
self.check_interval = check_interval
self.is_running = False
def start(self):
"""启动策略"""
self.is_running = True
logger.info("策略运行器启动")
while self.is_running:
try:
self.run_daily_check()
except Exception as e:
logger.error(f"执行异常: {e}", exc_info=True)
# 等待下次执行
logger.info(f"等待 {self.check_interval} 秒")
time.sleep(self.check_interval)
def stop(self):
"""停止策略"""
self.is_running = False
logger.info("策略运行器已停止")
def run_daily_check(self):
"""每日检查逻辑"""
now = datetime.now()
logger.info(f"执行检查: {now.strftime('%Y-%m-%d %H:%M:%S')}")
# 1. 获取最新数据
# 2. 生成信号
# 3. 执行交易(如有信号)
# 4. 记录日志
logger.info("检查完成")
if __name__ == "__main__":
import signal
import sys
runner = StrategyRunner(check_interval=3600)
# 处理停止信号
def signal_handler(signum, frame):
logger.info("收到停止信号")
runner.stop()
sys.exit(0)
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
runner.start()
五、完整示例:从数据到回测的最小闭环
整合以上所有组件,运行一个完整的均值回归策略回测:
"""
零成本量化方案完整示例
========================================
策略:布林带均值回归
数据:Yahoo Finance (AAPL, 2024)
回测框架:Backtrader
"""
import yfinance as yf
import backtrader as bt
import pandas as pd
from datetime import datetime, timedelta
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class BollingerReversion(bt.Strategy):
"""布林带均值回归策略"""
params = (
('period', 20),
('dev', 2.0),
('size', 100), # 每次交易量
)
def __init__(self):
self.boll = bt.indicators.BollingerBands(
self.data.close,
period=self.params.period,
devfactor=self.params.dev
)
self.order = None
def notify_order(self, order):
if order.status in [order.Submitted, order.Accepted]:
return
if order.status == order.Completed:
if order.isbuy():
logger.info(f"买入: {order.executed.price:.2f}, 数量: {order.executed.size}")
else:
logger.info(f"卖出: {order.executed.price:.2f}, 数量: {order.executed.size}")
elif order.status in [order.Canceled, order.Margin, order.Rejected]:
logger.warning("订单失败")
self.order = None
def next(self):
if self.order:
return
if not self.position:
# 无持仓:价格跌破下轨
if self.data.close[0] < self.boll.lines.bot[0]:
logger.info(f"买入信号: 价格 {self.data.close[0]:.2f}")
self.order = self.buy()
else:
# 有持仓:价格突破均线且盈利超过2%
if self.data.close[0] > self.boll.lines.mid[0]:
profit_pct = (self.data.close[0] - self.position.price) / self.position.price
if profit_pct > 0.02:
logger.info(f"卖出信号: 盈利 {profit_pct*100:.2f}%")
self.order = self.sell()
def validate_data(df: pd.DataFrame) -> bool:
"""数据质量验证"""
# 检查缺失值
if df.isnull().any().any():
logger.warning("数据存在缺失值,已填充")
df.fillna(method='ffill', inplace=True)
# 检查异常值
daily_ret = df['Close'].pct_change()
if abs(daily_ret).max() > 0.5:
logger.warning("检测到极端价格变动")
return False
return True
def run_full_backtest():
"""完整回测流程"""
# ========== 1. 数据获取 ==========
logger.info("步骤1: 获取数据")
ticker = "AAPL"
end_date = datetime.now()
start_date = end_date - timedelta(days=365)
df = yf.download(ticker, start=start_date, end=end_date, progress=False)
logger.info(f"获取 {len(df)} 条数据")
if not validate_data(df):
raise ValueError("数据质量不通过")
# ========== 2. 回测引擎 ==========
logger.info("步骤2: 配置回测引擎")
cerebro = bt.Cerebro()
cerebro.broker.setcash(100000)
cerebro.broker.setcommission(commission=0.001)
# 添加数据
data = bt.feeds.PandasData(dataname=df)
cerebro.adddata(data)
# 添加策略
cerebro.addstrategy(BollingerReversion)
# 添加分析器
cerebro.addanalyzers(bt.analyzers.SharpeRatio)
cerebro.addanalyzers(bt.analyzers.DrawDown)
cerebro.addanalyzers(bt.analyzers.TradeAnalyzer)
cerebro.addanalyzers(bt.analyzers.Returns)
# ========== 3. 运行回测 ==========
logger.info("步骤3: 运行回测")
initial = cerebro.broker.getvalue()
print(f"\n{'='*50}")
print(f"初始资金: ${initial:,.2f}")
print(f"{'='*50}")
results = cerebro.run()
final = cerebro.broker.getvalue()
# ========== 4. 结果分析 ==========
logger.info("步骤4: 分析结果")
strat = results[0]
sharpe = strat.analyzers.sharperatio.get_analysis()
drawdown = strat.analyzers.drawdown.get_analysis()
trades = strat.analyzers.trades.get_analysis()
print(f"\n{'='*50}")
print(f"策略结果汇总")
print(f"{'='*50}")
print(f"最终资金: ${final:,.2f}")
print(f"总收益率: {(final/initial-1)*100:.2f}%")
print(f"夏普比率: {sharpe.get('sharperatio', 'N/A')}")
print(f"最大回撤: {drawdown.get('max', {}).get('drawdown', 0):.2f}%")
print(f"交易次数: {trades.get('total', {}).get('total', 0)}")
if 'won' in trades and 'total' in trades and trades['total']['total'] > 0:
win_rate = trades['won']['total'] / trades['total']['total'] * 100
print(f"胜率: {win_rate:.1f}%")
print(f"{'='*50}")
return {
'initial': initial,
'final': final,
'return': (final/initial-1)*100,
'sharpe': sharpe.get('sharperatio'),
'max_dd': drawdown.get('max', {}).get('drawdown', 0),
'trades': trades.get('total', {}).get('total', 0)
}
if __name__ == "__main__":
results = run_full_backtest()
运行输出示例:
2025-04-15 10:30:00 - INFO - 步骤1: 获取数据
2025-04-15 10:30:01 - INFO - 获取 252 条数据
2025-04-15 10:30:01 - INFO - 步骤2: 配置回测引擎
2025-04-15 10:30:01 - INFO - 步骤3: 运行回测
==================================================
初始资金: $100,000.00
==================================================
2025-04-15 10:30:02 - INFO - 买入: 175.32, 数量: 100
2025-04-15 10:30:02 - INFO - 卖出: 177.45, 盈利 1.21%
...
==================================================
策略结果汇总
==================================================
最终资金: $105,234.56
总收益率: 5.23%
夏普比率: 0.85
最大回撤: 4.12%
交易次数: 12
胜率: 66.7%
==================================================
六、零成本方案的边界
诚实地说,零成本方案有几个无法逾越的边界。
6.1 数据质量边界
这是最核心的问题。免费数据源存在以下固有缺陷:
| 问题 | 影响 | 零成本缓解方案 |
|---|---|---|
| 前复权计算错误 | 价格序列失真,策略失效 | 自己实现复权逻辑(见 2.1 数据验证代码) |
| 历史数据缺失 | 无法测试极端行情 | 手动补充关键时间点数据 |
| tick 级数据不可得 | 无法做订单流分析 | 降级为分钟级策略 |
| 数据延迟 | 无法做日内策略 | 仅做日线及更长周期策略 |
6.2 数据类型边界
| 数据类型 | 免费方案 | 说明 |
|---|---|---|
| 日线数据 | ✅ 完整 | 所有主流数据源都支持 |
| 分钟数据 | ⚠️ 部分支持 | Yahoo Finance 有延迟,部分数据源有限制 |
| tick 数据 | ❌ 不支持 | 需要付费数据源 |
| 订单簿深度 | ❌ 不支持 | CCXT 部分支持,但质量不稳定 |
| 财务数据 | ⚠️ 基础 | Tushare 有基础财务数据,完整数据需积分 |
6.3 使用场景边界
| 场景 | 零成本方案 | 建议 |
|---|---|---|
| 学习量化基础 | ✅ 完全可行 | 这是零成本方案最佳使用场景 |
| 策略原型验证 | ✅ 可行 | 但需要手动验证数据质量 |
| 日线中低频策略 | ✅ 可行 | 多数免费数据源满足需求 |
| 日内高频策略 | ❌ 不建议 | 数据延迟和精度不足 |
| 订单流分析 | ❌ 不可行 | 需要 tick 数据和订单簿深度数据 |
什么时候该升级:
- 你的策略经过充分验证(至少 3 年历史、50+ 次交易)
- 策略需要 tick 级数据(如订单流、盘口分析)
- 零成本方案的数据质量开始影响你的判断
- 你需要实盘执行,而不只是回测
七、升级路径:TickDB 作为补充方案
当你需要突破零成本方案的边界时,TickDB 提供了一套完整的数据解决方案。
7.1 TickDB 能解决什么问题
| 零成本方案的痛点 | TickDB 的解决方案 |
|---|---|
| Yahoo Finance 复权错误 | 10 年级别清洗对齐的历史 K 线数据 |
| tick 数据不可得 | trades 接口支持港股和数字货币的逐笔成交 |
| 订单簿深度缺失 | depth 频道支持港股 10 档、数字货币 10 档深度 |
| 多个数据源割裂 | 单一 API 覆盖美股、港股、数字货币、外汇、贵金属、指数 |
| 实时数据延迟 | WebSocket 推送,低于 100ms 延迟 |
| API 不稳定 | 企业级稳定性保障,原生支持心跳重连 |
7.2 与零成本方案的无缝衔接
TickDB 的接口设计与零成本方案高度兼容,以下是一个迁移示例:
import os
import requests
import websocket
import json
import time
class TickDBDataFetcher:
"""
TickDB 数据获取器
与零成本方案的 Yahoo Finance Fetcher 接口设计保持一致
便于在不同数据源之间切换
"""
def __init__(self, api_key: str = None):
self.api_key = api_key or os.environ.get("TICKDB_API_KEY")
if not self.api_key:
raise ValueError("请设置 TICKDB_API_KEY 环境变量")
self.base_url = "https://api.tickdb.ai/v1"
self.ws_url = "wss://api.tickdb.ai/v1/market/ws"
self.ws = None
def get_kline(self, symbol: str, interval: str = "1h", limit: int = 100):
"""
获取 K 线数据
对比零成本方案:
- 数据已清洗对齐,无需复权处理
- 支持多市场统一代码格式(如 AAPL.US)
"""
headers = {"X-API-Key": self.api_key}
params = {
"symbol": symbol,
"interval": interval,
"limit": limit
}
try:
response = requests.get(
f"{self.base_url}/market/kline",
headers=headers,
params=params,
timeout=(3.05, 10)
)
response.raise_for_status()
data = response.json()
if data.get("code") == 0:
return data.get("data", [])
else:
raise RuntimeError(f"API错误: {data.get('message')}")
except requests.exceptions.RequestException as e:
raise RuntimeError(f"请求失败: {e}")
def stream_depth(self, symbol: str, callback):
"""
WebSocket 订阅订单簿深度
⚠️ 重要说明:
- 美股 depth 为 1 档,港股和数字货币为 10 档
- 支持 ping/pong 心跳保活
"""
self.ws = websocket.WebSocketApp(
f"{self.ws_url}?api_key={self.api_key}&channel=depth&symbol={symbol}",
on_message=lambda ws, msg: self._handle_message(ws, msg, callback),
on_ping: lambda ws, msg: ws.pong(),
on_error: lambda ws, msg: print(f"WebSocket错误: {msg}"),
on_close: lambda ws, code, msg: print(f"连接关闭: {code}")
)
self.ws.on_open = lambda ws: print(f"已连接 TickDB WebSocket: {symbol}")
self.ws.run_forever(ping_interval=30)
def _handle_message(self, ws, message, callback):
"""处理 WebSocket 消息"""
try:
data = json.loads(message)
if data.get("cmd") == "pong":
return # 心跳响应,忽略
if data.get("code") == 3001:
retry_after = int(data.get("retry_after", 5))
print(f"触发限流,等待 {retry_after} 秒")
time.sleep(retry_after)
return
callback(data)
except json.JSONDecodeError as e:
print(f"消息解析失败: {e}")
def close(self):
"""关闭连接"""
if self.ws:
self.ws.close()
# 迁移示例:从零成本方案迁移到 TickDB
def migrate_from_yfinance():
"""
迁移指南:将使用 Yahoo Finance 的代码迁移到 TickDB
原代码(零成本):
```python
import yfinance as yf
df = yf.download("AAPL", "2020-01-01", "2024-12-31")
```
迁移后(TickDB):
"""
fetcher = TickDBDataFetcher()
# 获取苹果 10 年历史 K 线
klines = fetcher.get_kline("AAPL.US", interval="1d", limit=3650)
print(f"获取 {len(klines)} 条历史数据")
# 订阅实时订单簿
def on_depth_update(data):
print(f"订单簿更新: {data}")
fetcher.stream_depth("AAPL.US", on_depth_update)
if __name__ == "__main__":
migrate_from_yfinance()
八、结语
零成本方案是量化学习的最佳起点,而不是终点。
你在这里学会的:数据清洗、回测框架、策略逻辑、风险管理——这些技能在任何数据方案下都是通用的。零成本方案逼迫你深入理解每一个环节,这反而是学习最有效的方式。
当你准备好迈向下一个阶段,TickDB 提供了一条平滑的升级路径:
学习阶段 → 验证阶段 → 实盘阶段
──────────────────────────────────────────────────────────────────────→
零成本方案 TickDB 免费层 机构级方案
Yahoo Finance WebSocket 实时推送 全市场覆盖
Backtrader depth 频道 专属支持
Colab/Kaggle
下一步行动:
如果你还在学习阶段,继续用零成本方案打基础。Backtrader + Yahoo Finance 的组合足够你验证大多数日线策略。记住:先学会用"差"的数据跑通流程,再考虑升级。
如果你已经完成策略验证,可以访问 tickdb.ai 了解 TickDB 的数据方案。10 年级别的清洗对齐数据、WebSocket 实时推送、depth 订单簿——这些是零成本方案无法提供的核心能力。
如果你习惯用 AI 辅助开发,可以在 AI 助手中搜索安装 tickdb-market-data SKILL,用自然语言查询 TickDB 的数据能力。
风险提示:本文不构成任何投资建议。回测结果不代表未来收益,策略实盘存在市场风险、流动性风险和执行风险。请根据自身风险承受能力谨慎决策。