程序员入局量化:你写的第一行代码,应该用来交易

很多程序员学量化,第一件事就踩了坑——跑去研究 K 线形态、缠论、波浪理论。

不是说这些没用,而是对于一个刚入门的程序员来说,这些东西的学习路径太陡了。你花三个月研究技术分析,最后发现:代码写不出来。

量化入门的正确姿势,应该反过来——先让代码跑起来,再理解市场。

本文的目标很简单:用你熟悉的编程方式,拿真实的美股历史数据,写一个能跑出结果的均线策略。整个过程不会超过 200 行代码,但覆盖了量化系统的核心要素:数据获取、策略逻辑、回测验证、结果可视化。

当你跑通第一个策略,哪怕只是一个简单的双均线,你会发现:原来量化交易没有那么神秘,它本质上就是一个数据处理流程。


一、量化系统的基本架构:一个数据处理流程

在动手之前,先建立一个正确的认知框架。

量化交易系统并不复杂,它本质上就是一个数据驱动的自动化交易流程

数据获取 → 策略计算 → 信号生成 → 回测验证 → 实盘执行
模块 核心问题 你的工作
数据获取 从哪里拿数据?格式是什么? 调用 API,写数据清洗脚本
策略计算 什么时候买、什么时候卖? 写策略逻辑(条件判断)
信号生成 当前应该持仓还是空仓? 输出 0/1 或仓位比例
回测验证 这个策略在过去赚不赚钱? 计算收益率、夏普比率、最大回撤
实盘执行 自动下单 接入券商 API(本文不涉及)

对于入门阶段,你需要掌握前三步,数据获取、策略逻辑、回测验证。 实盘执行涉及券商对接和风控合规,更复杂,留到你有了稳定策略之后。

接下来,我们从数据获取开始。


二、数据获取:你需要的不是实时行情,而是历史 K 线

2.1 选数据的坑

新手最容易犯的错,是在数据上花太多时间。

有些人先去研究彭博终端怎么接入,有些人去爬财经网站,有些人在 Reddit 上找别人分享的 CSV 文件。这些方法的共同问题是:数据质量不可控,格式不统一,你还没开始写策略,就被数据搞死了。

对于入门级回测,你只需要一种数据:清洗对齐的历史 K 线

什么是"清洗对齐"?

  • 清洗:去除停牌日、异常值(如拆股导致的价格跳变)
  • 对齐:确保时间戳按统一时区和频率处理

TickDB 提供了 10 年级别的美股历史 K 线数据,覆盖 NYSE、NASDAQ 主流标的,已完成清洗和对齐,可以直接用于回测。

2.2 用 Python 获取历史 K 线

下面是一个完整的数据获取脚本,使用 TickDB REST API 获取指定股票的历史日线数据:

import os
import requests
import pandas as pd
from datetime import datetime, timedelta

# ===========================================
# 生产级配置
# ===========================================
# API Key 从环境变量读取,避免硬编码在代码中
TICKDB_API_KEY = os.environ.get("TICKDB_API_KEY")
if not TICKDB_API_KEY:
    raise ValueError("请设置环境变量 TICKDB_API_KEY")

BASE_URL = "https://api.tickdb.ai/v1/market"

# HTTP 超时设置:连接超时 3.05 秒,读取超时 10 秒
# 3.05 是为了适配云厂商负载均衡器的 TCP 半连接超时机制
TIMEOUT = (3.05, 10)


def get_historical_klines(symbol: str, interval: str = "1d", 
                          start_time: int = None, limit: int = 300) -> pd.DataFrame:
    """
    获取历史 K 线数据
    
    Args:
        symbol: 交易品种,如 'AAPL.US'
        interval: K 线周期,如 '1d' (日线)、'1h' (小时线)
        start_time: 开始时间戳(毫秒),默认取最近 limit 条
        limit: 每页数量,最大 1000
    
    Returns:
        包含 OHLCV 数据的 DataFrame
    """
    headers = {
        "X-API-Key": TICKDB_API_KEY,
        "Content-Type": "application/json"
    }
    
    params = {
        "symbol": symbol,
        "interval": interval,
        "limit": min(limit, 1000)  # 单次最多 1000 条
    }
    
    if start_time:
        params["start_time"] = start_time
    
    all_data = []
    current_start = start_time
    
    # 循环获取直到拿到足够数据
    while len(all_data) < limit:
        if current_start:
            params["start_time"] = current_start
        
        response = requests.get(
            f"{BASE_URL}/kline",
            headers=headers,
            params=params,
            timeout=TIMEOUT
        )
        
        if response.status_code != 200:
            raise RuntimeError(f"请求失败: {response.status_code} - {response.text}")
        
        result = response.json()
        
        # 错误码处理(见手册核心知识库)
        if result.get("code") != 0:
            code = result.get("code")
            if code in (1001, 1002):
                raise ValueError("API Key 无效,请检查环境变量 TICKDB_API_KEY")
            if code == 2002:
                raise KeyError(f"交易品种 {symbol} 不存在,请检查 symbol 格式")
            raise RuntimeError(f"API 错误 {code}: {result.get('message')}")
        
        data = result.get("data", [])
        if not data:
            break
        
        all_data.extend(data)
        current_start = data[-1]["start_time"]  # 用于下一页
        
        # 如果返回数据少于请求量,说明已经到头了
        if len(data) < params["limit"]:
            break
    
    # 转换为 DataFrame
    df = pd.DataFrame(all_data)
    
    # 时间戳转换
    df["datetime"] = pd.to_datetime(df["start_time"], unit="ms", utc=True)
    df.set_index("datetime", inplace=True)
    
    # 列重命名,保留 OHLCV
    df.rename(columns={
        "open": "Open",
        "high": "High",
        "low": "Low",
        "close": "Close",
        "volume": "Volume"
    }, inplace=True)
    
    return df[["Open", "High", "Low", "Close", "Volume"]]


if __name__ == "__main__":
    # 示例:获取苹果公司近一年日线数据
    end_time = int(datetime.now().timestamp() * 1000)
    start_time = int((datetime.now() - timedelta(days=365)).timestamp() * 1000)
    
    print("正在获取 AAPL.US 近一年日线数据...")
    df = get_historical_klines(
        symbol="AAPL.US",
        interval="1d",
        start_time=start_time
    )
    
    print(f"\n数据概览(共 {len(df)} 条):")
    print(df.tail(10))
    print(f"\n数据范围:{df.index.min()} ~ {df.index.max()}")

这段代码的关键设计

  1. 环境变量存储 API Key:敏感信息不硬编码在代码中
  2. HTTP 超时设置timeout=(3.05, 10) 适配云服务架构,防止请求挂死
  3. 错误码处理:区分了 Key 无效、品种不存在等不同错误类型
  4. 分页循环:超过 1000 条数据自动翻页
  5. 时区处理:统一转换为 UTC 时间戳,避免夏令时陷阱

运行这段代码,你会得到一个包含日期、开盘价、最高价、最低价、收盘价、成交量的 DataFrame,这就是你策略的原料。


三、策略开发:双均线交叉策略

3.1 为什么从均线策略开始

均线策略是量化入门最经典的选择,原因有三:

  1. 逻辑简单:短期均线上穿长期均线买,下穿卖,不需要理解复杂的技术指标
  2. 可解释性强:你清楚知道策略在做什么,便于调试和优化
  3. 基准参照:均线策略是很多复杂策略的性能基准,如果你的策略跑不赢均线,说明有问题

3.2 双均线策略的逻辑

短期均线(MA_short):反映近期价格趋势,敏感度高
长期均线(MA_long):反映长期趋势,噪声较少

买入信号:MA_short 从下方穿越 MA_long(金叉)
卖出信号:MA_short 从上方穿越 MA_long(死叉)

注意,这里的"买入信号"不是让你立刻买入,而是策略告诉你应该持仓。回测系统会根据信号生成实际的交易指令。

3.3 策略代码实现

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt


class DualMovingAverageStrategy:
    """双均线交叉策略"""
    
    def __init__(self, short_window: int = 20, long_window: int = 60):
        """
        Args:
            short_window: 短期均线周期
            long_window: 长期均线周期
        """
        if short_window >= long_window:
            raise ValueError("短期窗口必须小于长期窗口")
        
        self.short_window = short_window
        self.long_window = long_window
    
    def generate_signals(self, data: pd.DataFrame) -> pd.DataFrame:
        """
        生成交易信号
        
        Args:
            data: 包含 Close 列的 DataFrame
        
        Returns:
            添加了 MA 和 signal 列的 DataFrame
        """
        df = data.copy()
        
        # 计算均线
        df["MA_short"] = df["Close"].rolling(window=self.short_window).mean()
        df["MA_long"] = df["Close"].rolling(window=self.long_window).mean()
        
        # 生成信号:1 持仓,0 空仓
        # 当短期均线 >= 长期均线时,持仓
        df["signal"] = 0
        df.loc[df["MA_short"] >= df["MA_long"], "signal"] = 1
        
        # 前向填充第一个有效信号之前的空仓状态
        df["signal"] = df["signal"].fillna(0)
        
        return df
    
    def backtest(self, data: pd.DataFrame, initial_capital: float = 100000,
                 commission_rate: float = 0.001) -> dict:
        """
        简单回测(不包含仓位管理,仅做多)
        
        Args:
            data: 包含 Close 和 signal 列的 DataFrame
            initial_capital: 初始资金
            commission_rate: 交易佣金率(双边)
        
        Returns:
            包含回测结果的字典
        """
        df = self.generate_signals(data)
        
        # 过滤掉均线未生效的初期数据
        df = df.dropna(subset=["MA_short", "MA_long"])
        
        # 计算每日收益率
        df["daily_return"] = df["Close"].pct_change()
        
        # 策略收益:signal 滞后一期(避免偷价)
        # signal=1 时,第二天享受收益;signal=0 时,不持仓
        df["strategy_return"] = df["signal"].shift(1) * df["daily_return"]
        
        # 扣除交易成本
        # 只有在 signal 变化时(即实际交易)才扣除成本
        df["trade"] = df["signal"].diff().abs()
        df["strategy_return"] -= df["trade"] * commission_rate
        
        # 计算累计收益
        df["cumulative_return"] = (1 + df["strategy_return"]).cumprod()
        df["portfolio_value"] = initial_capital * df["cumulative_return"]
        
        # 基准收益(买入持有)
        df["benchmark_return"] = (1 + df["daily_return"]).cumprod()
        
        # 计算指标
        metrics = self._calculate_metrics(df, initial_capital)
        metrics["data"] = df
        
        return metrics
    
    def _calculate_metrics(self, df: pd.DataFrame, initial_capital: float) -> dict:
        """计算回测指标"""
        total_return = df["cumulative_return"].iloc[-1] - 1
        
        # 年化收益率
        trading_days = 252
        years = len(df) / trading_days
        annual_return = (1 + total_return) ** (1 / years) - 1
        
        # 夏普比率
        risk_free_rate = 0.04  # 假设无风险利率 4%
        excess_returns = df["strategy_return"] - risk_free_rate / trading_days
        sharpe_ratio = np.sqrt(trading_days) * excess_returns.mean() / excess_returns.std()
        
        # 最大回撤
        cumulative = df["cumulative_return"]
        running_max = cumulative.cummax()
        drawdown = (cumulative - running_max) / running_max
        max_drawdown = drawdown.min()
        
        # 胜率
        winning_days = (df["strategy_return"] > 0).sum()
        total_trading_days = (df["strategy_return"] != 0).sum()
        win_rate = winning_days / total_trading_days if total_trading_days > 0 else 0
        
        # 交易次数
        trade_count = int(df["trade"].sum() / 2)  # 买卖各算一次
        
        return {
            "total_return": total_return,
            "annual_return": annual_return,
            "sharpe_ratio": sharpe_ratio,
            "max_drawdown": max_drawdown,
            "win_rate": win_rate,
            "trade_count": trade_count,
            "final_value": df["portfolio_value"].iloc[-1]
        }


# ===========================================
# 运行回测
# ===========================================
if __name__ == "__main__":
    # 使用上一节获取的数据
    # 假设 df 是 get_historical_klines 返回的 DataFrame
    
    # 演示用:生成模拟数据(实际使用时替换为真实数据)
    dates = pd.date_range(start="2023-01-01", end="2024-12-31", freq="B")
    np.random.seed(42)
    price = 150 + np.cumsum(np.random.randn(len(dates)) * 2)
    df = pd.DataFrame({
        "Close": price,
        "Volume": np.random.randint(1000000, 5000000, len(dates))
    }, index=dates)
    
    # 初始化策略
    strategy = DualMovingAverageStrategy(short_window=20, long_window=60)
    
    # 运行回测
    results = strategy.backtest(df, initial_capital=100000, commission_rate=0.001)
    
    # 打印结果
    print("=" * 50)
    print("双均线策略回测结果")
    print("=" * 50)
    print(f"回测周期:{df.index.min().strftime('%Y-%m-%d')} ~ {df.index.max().strftime('%Y-%m-%d')}")
    print(f"初始资金:${100000:,.2f}")
    print(f"最终价值:${results['final_value']:,.2f}")
    print("-" * 50)
    print(f"总收益率:{results['total_return']*100:.2f}%")
    print(f"年化收益率:{results['annual_return']*100:.2f}%")
    print(f"夏普比率:{results['sharpe_ratio']:.2f}")
    print(f"最大回撤:{results['max_drawdown']*100:.2f}%")
    print(f"胜率:{results['win_rate']*100:.1f}%")
    print(f"交易次数:{results['trade_count']}")
    print("=" * 50)

策略代码的关键设计

  1. 信号滞后一期signal.shift(1) 避免"偷价",确保回测结果可执行
  2. 交易成本模拟:只在 signal 变化时扣除佣金,贴近实际情况
  3. 风险指标全面:总收益、年化、夏普、最大回撤、胜率、交易次数
  4. 封装为类:方便参数调优和策略扩展

四、回测验证:你的策略到底赚不赚钱

4.1 运行回测并解读结果

将上面的数据获取和策略代码串联起来,运行完整的回测流程:

# 主程序
from get_data import get_historical_klines
from strategy import DualMovingAverageStrategy

# 1. 获取数据
df = get_historical_klines(
    symbol="AAPL.US",
    interval="1d",
    start_time=int((pd.Timestamp.now() - pd.DateOffset(years=2)).timestamp() * 1000)
)

# 2. 运行回测
strategy = DualMovingAverageStrategy(short_window=20, long_window=60)
results = strategy.backtest(df, initial_capital=100000)

# 3. 输出结果
print(f"年化收益率:{results['annual_return']*100:.2f}%")
print(f"夏普比率:{results['sharpe_ratio']:.2f}")
print(f"最大回撤:{results['max_drawdown']*100:.2f}%")

假设回测结果是:

指标 数值 解读
年化收益率 18.5% 跑赢了同期标普 500(约 12%)
夏普比率 0.85 大于 0.5 说明有一定风险调整后收益
最大回撤 -15.3% 历史上最大亏损 15.3%,需确认是否可接受
胜率 54% 每两次交易大约赢一次

4.2 如何判断策略是否有效

基准对比是核心原则。任何策略必须和基准比较,常见基准包括:

  • 买入持有:同标的的买入持有策略
  • 市场指数:如标普 500 ETF (SPY)
  • 固定收益:如国债收益率

如果你的均线策略年化 18%,但买入持有同标的是 25%,说明策略反而拖累了收益。这时候需要思考:是参数设置问题,还是策略本身不适合这个标的。

最大回撤是生命线。一个年化 30% 但最大回撤 50% 的策略,实际执行中很难坚持——你会在谷底割肉离场。经验规则:

  • 最大回撤 < 20%:风控优秀
  • 20% < 最大回撤 < 30%:可接受
  • 最大回撤 > 30%:需要重新评估策略逻辑

4.3 常见回测陷阱

陷阱 说明 规避方法
前视偏差 使用了未来才能知道的数据 信号使用 shift(1) 滞后
过拟合 参数在历史数据上拟合太好,新数据失效 用更长周期数据验证,参数不要调太精确
幸存者偏差 只用现在还存在的股票回测 使用包含已退市标的的全量历史数据
流动性陷阱 大资金无法按回测价格成交 加入滑点模拟:price * (1 + slippage)

五、可视化:让策略结果一目了然

数据再漂亮,不如图表直观。下面是一个完整的可视化模块:

import matplotlib.pyplot as plt
import matplotlib.dates as mdates

def plot_backtest_results(df: pd.DataFrame, strategy_name: str = "Dual MA"):
    """
    绘制回测结果图表
    """
    fig, axes = plt.subplots(3, 1, figsize=(14, 12), 
                              gridspec_kw={'height_ratios': [3, 1, 2]})
    
    # ===== 图1:价格与均线 =====
    ax1 = axes[0]
    ax1.plot(df.index, df["Close"], label="Close Price", alpha=0.7, linewidth=1)
    ax1.plot(df.index, df["MA_short"], label=f"MA Short ({strategy.short_window})", 
             linewidth=1.2)
    ax1.plot(df.index, df["MA_long"], label=f"MA Long ({strategy.long_window})", 
             linewidth=1.2)
    
    # 标注买卖点
    buy_signals = df[df["trade"] > 0].query("signal == 1")
    sell_signals = df[df["trade"] > 0].query("signal == 0")
    
    ax1.scatter(buy_signals.index, buy_signals["Close"], 
                marker="^", color="green", s=80, label="Buy", zorder=5)
    ax1.scatter(sell_signals.index, sell_signals["Close"], 
                marker="v", color="red", s=80, label="Sell", zorder=5)
    
    ax1.set_title(f"{strategy_name} Strategy - Price Action", fontsize=14)
    ax1.set_ylabel("Price ($)")
    ax1.legend(loc="upper left")
    ax1.grid(True, alpha=0.3)
    
    # ===== 图2:持仓状态 =====
    ax2 = axes[1]
    ax2.fill_between(df.index, 0, df["signal"], 
                     alpha=0.3, color="blue", label="Position")
    ax2.set_ylabel("Position")
    ax2.set_yticks([0, 1])
    ax2.set_yticklabels(["Empty", "Long"])
    ax2.legend(loc="upper right")
    ax2.grid(True, alpha=0.3)
    
    # ===== 图3:累计收益对比 =====
    ax3 = axes[2]
    ax3.plot(df.index, df["cumulative_return"], 
             label="Strategy", linewidth=1.5, color="blue")
    ax3.plot(df.index, df["benchmark_return"], 
             label="Buy & Hold", linewidth=1.5, color="gray", alpha=0.7)
    
    # 标注最大回撤区间
    cumulative = df["cumulative_return"]
    running_max = cumulative.cummax()
    drawdown = (cumulative - running_max) / running_max
    
    ax3.fill_between(df.index, drawdown, 0, 
                     alpha=0.2, color="red", label="Drawdown")
    
    ax3.set_title("Cumulative Returns vs Benchmark", fontsize=14)
    ax3.set_xlabel("Date")
    ax3.set_ylabel("Cumulative Return")
    ax3.legend(loc="upper left")
    ax3.grid(True, alpha=0.3)
    ax3.xaxis.set_major_formatter(mdates.DateFormatter("%Y-%m"))
    ax3.xaxis.set_major_locator(mdates.MonthLocator(interval=3))
    plt.setp(ax3.xaxis.get_majorticklabels(), rotation=45)
    
    plt.tight_layout()
    plt.savefig("backtest_result.png", dpi=150, bbox_inches="tight")
    plt.show()
    
    print("图表已保存至 backtest_result.png")


# 运行可视化
plot_backtest_results(results["data"])

生成的图表会包含三个部分:

  1. 价格与均线:展示股价走势、双均线交叉点、买卖信号位置
  2. 持仓状态:直观看到策略在哪些时间段持仓、哪些时间段空仓
  3. 累计收益对比:策略 vs 买入持有的对比曲线,阴影部分是回撤区间

六、从 0 到 1 之后:你的下一步

恭喜你,已经完成了第一个完整的量化策略开发流程:

数据获取 → 策略设计 → 回测验证 → 结果可视化

但这只是起点。下面是几个自然的演进方向:

6.1 策略优化

当前的双均线策略过于简单,可以尝试的方向:

  • 参数优化:用网格搜索找到最优的短期/长期均线周期组合
  • 仓位管理:不止 0/1 持仓,加入仓位比例(如 50% 持仓)
  • 止损机制:设置固定止损或跟踪止损,控制单次亏损

6.2 标的扩展

一只股票不足以说明策略有效。需要验证:

  • 同一策略在不同股票上的表现
  • 不同市场(港股、数字货币)的适用性
  • 多标的组合的效果

6.3 数据深度

当前使用的是日线数据,可以尝试:

  • 小时线/分钟线:更高频率的数据,信号更灵敏但噪声也更大
  • 订单簿数据:使用 TickDB 的 depth 频道,分析盘口结构
  • 多数据源融合:将价格数据与基本面数据(如财报)结合

下一步行动

如果你希望亲手运行本文代码

  1. 访问 tickdb.ai 注册账号(免费获取 API Key,无需信用卡)
  2. 在控制台生成 API Key
  3. 设置环境变量 TICKDB_API_KEY
  4. 复制本文代码,创建 get_data.pystrategy.pyvisualize.py 三个文件
  5. 运行 get_data.py 获取数据,然后执行回测

如果你想直接看完整可运行的代码示例,TickDB 提供了预置的策略模板,访问文档中心获取。

如果你对更高阶的数据感兴趣:TickDB 的 depth 频道可以获取订单簿深度数据,适合分析流动性结构和盘口博弈——这是从基础策略进阶到量化研究者的关键技能。


风险提示:本文不构成任何投资建议。回测结果基于历史数据模拟,不代表未来收益。量化策略存在市场风险,可能导致本金亏损。请充分理解策略逻辑后再进行实盘操作。