一个人、一台云服务器、一套量化系统:个人开发者全栈指南

凌晨三点,你被一条告警推送叫醒。揉了揉眼睛,打开手机看了一眼——策略还在跑,订单正常执行,没有发生你想象中的爆仓惨剧。

这是你独立运行的量化系统,连续稳定运行第 847 天。

没有交易团队,没有基础设施团队,没有 24 小时值班运维。你只有一台月均 40 美元的云服务器,和一套自己从零搭建的量化系统。

这不是天方夜谭。这是每个个人量化开发者都应该能实现的状态。

本文是写给那些预算有限、时间有限、但对系统可靠性和代码质量有要求的个人开发者。我们不追求“花小钱办大事”的营销叙事——那是骗子的话术。我们追求的是:在有限的资源下,做出稳定、可维护、经得起时间考验的量化系统。


一、现实约束:个人开发者的四重困境

在动手之前,先清醒地认识自己的处境。

个人开发者搭建量化系统,通常面临四重约束:

约束维度 具体表现 潜在风险
计算资源 单台 2 核 4G 云服务器,磁盘 50-100G 多策略并发时 CPU 打满,内存 OOM
网络带宽 入门级带宽(通常 1-5Mbps) 高频轮询 API 触发限频,WebSocket 连接不稳定
运维精力 兼职开发,白天有主业工作 凌晨故障无人值守,系统可用性依赖手动维护
预算上限 月均 50-200 美元 无法使用商业级的行情源和基础设施

大多数个人开发者的失败路径是这样的:第一周兴致勃勃,用 Node.js、Python、Go 搭了一套华丽架构;第三周发现服务器内存不够,开始删代码;第六周策略逻辑还没跑通,系统稳定性问题已经耗尽了所有热情;第三个月,系统彻底放弃,重头再来。

根本原因是:用团队开发的思维,做个人开发的资源规划。

本文的方法论核心是:极简主义架构 + 自动化运维 + 可量化的成本控制


二、架构设计原则:能省则省,但不要省错地方

2.1 一台服务器能跑多少东西?

先回答一个常见误区:量化系统需要很贵的服务器吗?

对于个人开发者而言,绝大多数策略的计算量,根本用不到 2 核以上的 CPU。以下是一个参考基准:

场景 CPU 占用 内存占用 带宽需求
日线均值回归策略(持有 1-5 天) <5% <200MB 低频轮询即可
30 分钟级别趋势跟踪 <15% <500MB 中频,WebSocket 推荐
5 分钟级别 CTA(10 个标的以内) <30% <800MB 需要稳定的 WebSocket
1 分钟级别高频信号(本人不推荐个人做) >60% >1.5GB 高带宽,需要专线

结论:2 核 4G 的云服务器,对于 95% 的个人策略来说,硬件层面是够用的。

真正的问题不在硬件,在架构。

2.2 三层分离原则

个人开发者最容易犯的错误是:把所有东西堆在一个进程里

行情接收、信号计算、订单管理、数据库、监控告警,全塞进一个 Python 脚本。然后每次修改策略都要重启,有时候重启着重启着,订单还在挂着,就出问题了。

正确的做法是三层分离

┌─────────────────────────────────────────────────────┐
│                   云服务器 (2C4G)                    │
├─────────────────────────────────────────────────────┤
│  Layer 1: 数据层                                     │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐  │
│  │  TickDB     │  │  SQLite     │  │  Redis      │  │
│  │  (行情订阅)  │  │  (本地存储)  │  │  (缓存)     │  │
│  └─────────────┘  └─────────────┘  └─────────────┘  │
├─────────────────────────────────────────────────────┤
│  Layer 2: 策略层                                      │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐  │
│  │ Signal      │  │ Position    │  │ Risk        │  │
│  │ Engine      │  │ Manager     │  │ Controller  │  │
│  └─────────────┘  └─────────────┘  └─────────────┘  │
├─────────────────────────────────────────────────────┤
│  Layer 3: 执行层                                      │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐  │
│  │ Order       │  │ Alert       │  │ Dashboard   │  │
│  │ Executor    │  │ Notifier    │  │ (Optional)  │  │
│  └─────────────┘  └─────────────┘  └─────────────┘  │
└─────────────────────────────────────────────────────┘

三层分离的核心价值:

  • 独立演进:行情接入改了,不影响策略逻辑
  • 故障隔离:策略崩溃,不会丢订单
  • 资源可控:各层可以独立设置超时和重试策略

2.3 状态外置原则

第二个常见错误:把策略状态放在内存里

进程重启,状态丢失,不知道当前持仓是什么,不知道上次信号是什么时候发的,不知道风控阈值被改成了什么。

所有持久化状态,必须写入数据库。

这不是性能问题,是可靠性问题。SQLite 对于个人开发者来说完全够用,文件只有几 MB,启动速度毫秒级,数据用 sqlitebrowser 随时可查。

# 状态外置示例:持仓表结构
import sqlite3
from dataclasses import dataclass
from datetime import datetime
from decimal import Decimal

@dataclass
class Position:
    symbol: str
    quantity: Decimal
    avg_price: Decimal
    updated_at: datetime

class PositionStore:
    def __init__(self, db_path: str = "positions.db"):
        self.db_path = db_path
        self._init_db()
    
    def _init_db(self):
        with sqlite3.connect(self.db_path) as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS positions (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    symbol TEXT NOT NULL UNIQUE,
                    quantity REAL NOT NULL DEFAULT 0,
                    avg_price REAL NOT NULL DEFAULT 0,
                    updated_at TEXT NOT NULL,
                    created_at TEXT NOT NULL DEFAULT (datetime('now'))
                )
            """)
            # 创建 updated_at 的索引,加速查询
            conn.execute("""
                CREATE INDEX IF NOT EXISTS idx_positions_updated 
                ON positions(updated_at)
            """)
    
    def upsert(self, position: Position):
        with sqlite3.connect(self.db_path) as conn:
            conn.execute("""
                INSERT INTO positions (symbol, quantity, avg_price, updated_at)
                VALUES (?, ?, ?, ?)
                ON CONFLICT(symbol) DO UPDATE SET
                    quantity = excluded.quantity,
                    avg_price = excluded.avg_price,
                    updated_at = excluded.updated_at
            """, (
                position.symbol,
                float(position.quantity),
                float(position.avg_price),
                position.updated_at.isoformat()
            ))
    
    def get(self, symbol: str) -> Position | None:
        with sqlite3.connect(self.db_path) as conn:
            row = conn.execute(
                "SELECT symbol, quantity, avg_price, updated_at FROM positions WHERE symbol = ?",
                (symbol,)
            ).fetchone()
            if row:
                return Position(
                    symbol=row[0],
                    quantity=Decimal(str(row[1])),
                    avg_price=Decimal(str(row[2])),
                    updated_at=datetime.fromisoformat(row[3])
                )
        return None
    
    def get_all(self) -> list[Position]:
        with sqlite3.connect(self.db_path) as conn:
            rows = conn.execute(
                "SELECT symbol, quantity, avg_price, updated_at FROM positions ORDER BY updated_at DESC"
            ).fetchall()
            return [
                Position(
                    symbol=r[0],
                    quantity=Decimal(str(r[1])),
                    avg_price=Decimal(str(r[2])),
                    updated_at=datetime.fromisoformat(r[3])
                )
                for r in rows
            ]

三、成本控制:每一分钱的去向

3.1 云服务器选型

个人量化系统对云服务器的要求其实很低,以下是 2024 年主流选择的成本对比:

供应商 规格 月均成本 备注
阿里云 ECS 轻量应用服务器 2核2G,流量包 1TB ¥40-60 国内首选,延迟低
腾讯云轻量应用服务器 2核2G,流量包 1TB ¥40-60 同上
AWS EC2 t3.micro 2核2G,按量付费 $10-25 流量另算,需设置预算告警
阿里云国际版 2核1G $5-10 最便宜,但需信用卡
静电云 (Vultr) 2核4G $20-25 全球节点,支持 WebSocket

推荐配置:2 核 2G,流量包足够(至少 500GB/月)。对于需要接收实时行情的个人策略,这个配置绑绑够用。

不推荐:4 核以上的配置。CPU 利用率低,但费用翻倍。不如用省下的钱买更好的行情数据源。

3.2 行情数据源成本

行情数据是量化系统最核心的输入,也是成本差异最大的地方。

数据源 月费(个人版) 数据类型 适合场景
TickDB 免费层(限制额度) 美股/港股/加密货币 K 线 + depth 个人入门,首选
Polygon.io 免费层 美股 ticker 轻量级需求
Alpaca 免费层 美股行情 配合美股券商使用
Interactive Brokers $0 美股行情(需开户) 整合交易
付费专业数据 $30-500/月 各级别数据 高频/机构

对于个人开发者,TickDB 免费层 + 港股/加密货币的组合,已经足够完成绝大多数策略的回测和实盘验证。

3.3 隐性成本清单

很多开发者只算服务器成本,忽略了隐性成本:

隐性成本 估算 说明
域名(可选) ¥30-50/年 用于访问自建 Dashboard
监控告警服务 免费 ~ ¥20/月 飞书/钉钉免费,PagerDuty ¥20/月
SSL 证书 免费 Let's Encrypt,自动续期
日志存储 <1GB SQLite 日志,几乎不占空间
备份 免费 定时打包上传 OSS,¥0.01/GB

实际月均成本:服务器 ¥40 + 可选服务 ¥20 = ¥60(约 $8)


四、生产级行情接入:TickDB WebSocket 实操

行情接入是量化系统的入口,也是出问题最多的地方。手动轮询?被限频。心跳没处理?连接假死。断线没重连?数据断了一上午浑然不知。

以下是 TickDB WebSocket 的生产级接入代码,包含所有必要的工程健壮性处理:

"""
TickDB WebSocket 行情接入 - 生产级实现
功能:订阅美股/港股/加密货币实时 K 线和深度数据
特性:心跳保活、指数退避重连、限频自适应、状态外置
"""

import os
import json
import time
import random
import sqlite3
import threading
from dataclasses import dataclass, field
from datetime import datetime, timezone
from decimal import Decimal
from typing import Callable, Optional
from enum import Enum

import websocket


class Market(Enum):
    US = "US"      # 美股
    HK = "HK"      # 港股
    CRYPTO = "CRYPTO"  # 加密货币


@dataclass
class TickData:
    """Tick 数据结构"""
    symbol: str
    market: str
    timestamp: datetime
    open: Decimal
    high: Decimal
    low: Decimal
    close: Decimal
    volume: Decimal


@dataclass
class DepthData:
    """订单簿深度数据结构"""
    symbol: str
    market: str
    timestamp: datetime
    asks: list[tuple[Decimal, Decimal]]  # [(price, quantity), ...]
    bids: list[tuple[Decimal, Decimal]]


class TickDBWebSocketClient:
    """
    TickDB WebSocket 客户端 - 生产级实现
    
    工程特性:
    - 心跳保活:每 30 秒发送 ping,超时自动重连
    - 指数退避重连:初始延迟 1 秒,最大延迟 60 秒,含抖动
    - 限频自适应:识别 3001 错误码,等待 Retry-After
    - 线程安全:独立接收线程,数据通过队列传递
    - 状态外置:断线时间戳、订阅列表持久化
    """
    
    # ⚠️ 常量配置
    PING_INTERVAL = 30  # 心跳间隔(秒)
    PING_TIMEOUT = 10   # 心跳超时(秒)
    BASE_DELAY = 1      # 初始重连延迟(秒)
    MAX_DELAY = 60      # 最大重连延迟(秒)
    MAX_RETRIES = 10   # 最大重试次数(0 = 无限重试)
    
    def __init__(
        self,
        api_key: str,
        market: Market = Market.US,
        on_tick: Optional[Callable[[TickData], None]] = None,
        on_depth: Optional[Callable[[DepthData], None]] = None,
        db_path: str = "tickdb_state.db"
    ):
        self.api_key = api_key
        self.market = market
        self.on_tick = on_tick
        self.on_depth = on_depth
        self.db_path = db_path
        
        # 内部状态
        self._ws: Optional[websocket.WebSocketApp] = None
        self._receive_thread: Optional[threading.Thread] = None
        self._running = False
        self._reconnect_delay = self.BASE_DELAY
        self._retry_count = 0
        self._last_pong_time: Optional[datetime] = None
        self._subscriptions: set[str] = set()
        self._lock = threading.Lock()
        
        # 初始化状态数据库
        self._init_state_db()
    
    def _init_state_db(self):
        """初始化状态持久化数据库"""
        with sqlite3.connect(self.db_path) as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS connection_log (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    event TEXT NOT NULL,
                    timestamp TEXT NOT NULL,
                    details TEXT
                )
            """)
            conn.execute("""
                CREATE TABLE IF NOT EXISTS subscriptions (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    symbol TEXT NOT NULL UNIQUE,
                    market TEXT NOT NULL,
                    subscribed_at TEXT NOT NULL
                )
            """)
    
    def _log_event(self, event: str, details: str = ""):
        """记录连接事件日志"""
        with sqlite3.connect(self.db_path) as conn:
            conn.execute(
                "INSERT INTO connection_log (event, timestamp, details) VALUES (?, ?, ?)",
                (event, datetime.now(timezone.utc).isoformat(), details)
            )
    
    def _get_ws_url(self) -> str:
        """构建 WebSocket URL(含鉴权参数)"""
        base_url = "wss://api.tickdb.ai/v1/market/stream"
        return f"{base_url}?api_key={self.api_key}"
    
    def connect(self):
        """启动 WebSocket 连接"""
        self._running = True
        self._retry_count = 0
        self._reconnect_delay = self.BASE_DELAY
        self._connect_inner()
    
    def _connect_inner(self):
        """内部连接逻辑"""
        while self._running:
            try:
                ws_url = self._get_ws_url()
                self._ws = websocket.WebSocketApp(
                    ws_url,
                    on_message=self._on_message,
                    on_error=self._on_error,
                    on_close=self._on_close,
                    on_open=self._on_open
                )
                
                self._log_event("connecting", f"URL: {ws_url.split('?')[0]}")
                
                # 启动接收线程(非阻塞)
                self._receive_thread = threading.Thread(
                    target=self._ws.run_forever,
                    kwargs={
                        "ping_interval": self.PING_INTERVAL,
                        "ping_timeout": self.PING_TIMEOUT
                    },
                    daemon=True
                )
                self._receive_thread.start()
                
                return  # 连接成功,退出循环
                
            except Exception as e:
                self._log_event("connect_error", str(e))
                if not self._running:
                    return
                self._schedule_reconnect()
    
    def _on_open(self, ws):
        """WebSocket 连接打开回调"""
        self._log_event("connected")
        self._retry_count = 0
        self._reconnect_delay = self.BASE_DELAY
        print(f"[TickDB] 连接已建立,时间: {datetime.now():%H:%M:%S}")
        
        # 重订阅之前的标的
        with self._lock:
            for symbol in self._subscriptions:
                self._subscribe_symbol(ws, symbol)
    
    def _on_message(self, ws, message: str):
        """处理接收到的消息"""
        try:
            data = json.loads(message)
            
            # 处理心跳响应
            if data.get("type") == "pong":
                self._last_pong_time = datetime.now(timezone.utc)
                return
            
            # 处理限频错误
            code = data.get("code", 0)
            if code == 3001:
                retry_after = int(data.get("retry_after", 5))
                self._log_event("rate_limited", f"等待 {retry_after} 秒")
                time.sleep(retry_after)
                return
            
            # 处理心跳请求
            if data.get("type") == "ping":
                ws.send(json.dumps({"type": "pong", "ts": data.get("ts")}))
                return
            
            # 处理行情数据
            if "data" in data:
                self._process_data(data)
                
        except json.JSONDecodeError as e:
            self._log_event("parse_error", str(e))
    
    def _process_data(self, data: dict):
        """处理行情数据,分发到对应的回调"""
        data_type = data.get("type", "")
        payload = data.get("data", {})
        
        if data_type in ("kline", "tick"):
            tick = self._parse_tick(payload)
            if self.on_tick:
                self.on_tick(tick)
                
        elif data_type == "depth":
            depth = self._parse_depth(payload)
            if self.on_depth:
                self.on_depth(depth)
    
    def _parse_tick(self, data: dict) -> TickData:
        """解析 K 线/tick 数据"""
        return TickData(
            symbol=data.get("symbol", ""),
            market=data.get("market", self.market.value),
            timestamp=datetime.fromtimestamp(data.get("ts", 0) / 1000, tz=timezone.utc),
            open=Decimal(str(data.get("open", 0))),
            high=Decimal(str(data.get("high", 0))),
            low=Decimal(str(data.get("low", 0))),
            close=Decimal(str(data.get("close", 0))),
            volume=Decimal(str(data.get("volume", 0)))
        )
    
    def _parse_depth(self, data: dict) -> DepthData:
        """解析订单簿深度数据"""
        asks = [
            (Decimal(str(p)), Decimal(str(q))) 
            for p, q in data.get("asks", [])
        ]
        bids = [
            (Decimal(str(p)), Decimal(str(q))) 
            for p, q in data.get("bids", [])
        ]
        return DepthData(
            symbol=data.get("symbol", ""),
            market=data.get("market", self.market.value),
            timestamp=datetime.fromtimestamp(data.get("ts", 0) / 1000, tz=timezone.utc),
            asks=asks,
            bids=bids
        )
    
    def _on_error(self, ws, error):
        """错误回调"""
        self._log_event("websocket_error", str(error))
        print(f"[TickDB] 错误: {error}")
    
    def _on_close(self, ws, close_status_code, close_msg):
        """连接关闭回调"""
        self._log_event(
            "disconnected", 
            f"code={close_status_code}, msg={close_msg}"
        )
        print(f"[TickDB] 连接断开 (code={close_status_code})")
        
        if self._running:
            self._schedule_reconnect()
    
    def _schedule_reconnect(self):
        """调度重连(含指数退避和抖动)"""
        self._retry_count += 1
        
        if self.MAX_RETRIES > 0 and self._retry_count > self.MAX_RETRIES:
            self._log_event("max_retries_reached", f"重试 {self._retry_count} 次后放弃")
            self._running = False
            return
        
        # 指数退避
        delay = min(self.BASE_DELAY * (2 ** (self._retry_count - 1)), self.MAX_DELAY)
        # 加抖动(避免惊群效应)
        jitter = random.uniform(0, delay * 0.1)
        total_delay = delay + jitter
        
        self._log_event("scheduling_reconnect", f"等待 {total_delay:.1f} 秒")
        print(f"[TickDB] {total_delay:.1f} 秒后重连 (重试 #{self._retry_count})")
        
        time.sleep(total_delay)
        
        if self._running:
            self._connect_inner()
    
    def subscribe(self, symbols: list[str], channels: list[str] = None):
        """
        订阅标的
        
        Args:
            symbols: 标的列表,如 ["AAPL.US", "TSLA.US"]
            channels: 频道列表,默认 ["kline"],可选 ["kline", "depth"]
        """
        if channels is None:
            channels = ["kline"]
        
        with self._lock:
            for symbol in symbols:
                self._subscriptions.add(symbol)
            
            if self._ws and self._ws.sock and self._ws.sock.connected:
                for symbol in symbols:
                    self._subscribe_symbol(self._ws, symbol, channels)
    
    def _subscribe_symbol(self, ws, symbol: str, channels: list[str] = None):
        """发送订阅命令"""
        if channels is None:
            channels = ["kline"]
        
        for channel in channels:
            sub_msg = {
                "cmd": "sub",
                "symbol": symbol,
                "channel": channel,
                "market": self.market.value
            }
            ws.send(json.dumps(sub_msg))
            print(f"[TickDB] 订阅: {symbol} @ {channel}")
            
            # 持久化订阅记录
            with sqlite3.connect(self.db_path) as conn:
                conn.execute(
                    """INSERT OR REPLACE INTO subscriptions 
                       (symbol, market, subscribed_at) VALUES (?, ?, ?)""",
                    (symbol, self.market.value, datetime.now(timezone.utc).isoformat())
                )
    
    def unsubscribe(self, symbols: list[str]):
        """取消订阅"""
        with self._lock:
            for symbol in symbols:
                self._subscriptions.discard(symbol)
                
                if self._ws and self._ws.sock and self._ws.sock.connected:
                    unsub_msg = {
                        "cmd": "unsub",
                        "symbol": symbol,
                        "market": self.market.value
                    }
                    self._ws.send(json.dumps(unsub_msg))
    
    def disconnect(self):
        """主动断开连接"""
        self._running = False
        if self._ws:
            self._ws.close()
    
    def get_connection_status(self) -> dict:
        """获取连接状态(用于监控)"""
        return {
            "running": self._running,
            "retry_count": self._retry_count,
            "subscriptions_count": len(self._subscriptions),
            "last_pong": self._last_pong_time.isoformat() if self._last_pong_time else None
        }


# ============================================================
# 使用示例
# ============================================================

def on_tick_callback(tick: TickData):
    """K 线数据回调示例"""
    print(f"[Tick] {tick.symbol} @ {tick.close} (vol: {tick.volume})")


def on_depth_callback(depth: DepthData):
    """订单簿数据回调示例"""
    # 计算买卖压力比
    bid_volume = sum(q for _, q in depth.bids)
    ask_volume = sum(q for _, q in depth.asks)
    pressure_ratio = bid_volume / ask_volume if ask_volume > 0 else 0
    
    print(f"[Depth] {depth.symbol} 买卖压力比: {pressure_ratio:.2f}")


if __name__ == "__main__":
    # 从环境变量读取 API Key(⚠️ 生产环境必须使用环境变量)
    api_key = os.environ.get("TICKDB_API_KEY")
    if not api_key:
        raise ValueError("请设置环境变量 TICKDB_API_KEY")
    
    # 初始化客户端
    client = TickDBWebSocketClient(
        api_key=api_key,
        market=Market.US,
        on_tick=on_tick_callback,
        on_depth=on_depth_callback
    )
    
    # 连接并订阅
    client.connect()
    client.subscribe(["AAPL.US", "TSLA.US"], channels=["kline", "depth"])
    
    # ⚠️ 生产环境:使用 signal.SIGINT 捕获优雅退出
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print("\n[系统] 收到中断信号,正在优雅退出...")
        client.disconnect()
        print("[系统] 已断开连接")

代码工程要点说明

特性 实现位置 重要性
心跳保活 _on_open + _on_message 处理 pong 防止连接假死
指数退避重连 _schedule_reconnect 避免高频重连触发限频
抖动 random.uniform 避免多实例同时重连
限频自适应 _on_message 处理 code 3001 遵守 API 契约
状态外置 SQLite 存储连接日志和订阅列表 断线恢复后可重订阅
线程安全 threading.Lock 保护共享状态 并发安全
超时设置 ping_timeout 参数 快速感知断线

五、自动化运维:让系统自己跑,不要人盯着

5.1 监控告警体系

个人开发者最大的焦虑来源是:我不知道系统什么时候崩了

等到第二天早上起来一看,策略已经停了 8 个小时,错过了一整波行情。这种事发生几次,信心就崩了。

解决方案:分层告警

层级 监控内容 告警方式 响应时间要求
连接层 WebSocket 断连、重连次数 飞书/钉钉机器人 5 分钟内
数据层 数据流中断、异常值 飞书/钉钉机器人 5 分钟内
策略层 信号触发异常、持仓不一致 飞书/钉钉机器人 10 分钟内
执行层 订单失败、滑点超阈值 飞书/钉钉机器人 + SMS 立即
系统层 CPU/内存/磁盘告警 基础监控 30 分钟内

飞书机器人告警示例

import requests
import os
from datetime import datetime


class FeishuNotifier:
    """
    飞书机器人告警 - 生产级实现
    
    ⚠️ 使用限制:
    - 免费版机器人每月最多发送 500 条消息
    - 建议按层级设置告警阈值,避免无效告警
    """
    
    def __init__(self, webhook_url: str = None):
        # 从环境变量读取飞书 Webhook URL
        self.webhook_url = webhook_url or os.environ.get("FEISHU_WEBHOOK_URL")
        if not self.webhook_url:
            raise ValueError("请设置环境变量 FEISHU_WEBHOOK_URL")
    
    def send_alert(
        self,
        title: str,
        content: str,
        level: str = "warning"  # info / warning / error / critical
    ):
        """
        发送告警消息
        
        Args:
            title: 告警标题
            content: 告警详情
            level: 告警级别
        """
        level_emoji = {
            "info": "ℹ️",
            "warning": "⚠️",
            "error": "🔴",
            "critical": "🚨"
        }
        
        message = {
            "msg_type": "interactive",
            "card": {
                "header": {
                    "title": f"{level_emoji.get(level, '⚠️')} {title}",
                    "template": self._level_to_color(level)
                },
                "elements": [
                    {
                        "tag": "markdown",
                        "content": content
                    },
                    {
                        "tag": "note",
                        "elements": [
                            {
                                "tag": "text",
                                "text": f"触发时间: {datetime.now():%Y-%m-%d %H:%M:%S}"
                            }
                        ]
                    }
                ]
            }
        }
        
        response = requests.post(
            self.webhook_url,
            json=message,
            headers={"Content-Type": "application/json"},
            timeout=10
        )
        
        if response.status_code != 200:
            print(f"[告警发送失败] HTTP {response.status_code}: {response.text}")
            return False
        
        return True
    
    def _level_to_color(self, level: str) -> str:
        """告警级别到卡片颜色的映射"""
        color_map = {
            "info": "grey",
            "warning": "yellow",
            "error": "red",
            "critical": "red"
        }
        return color_map.get(level, "grey")


# 使用示例
notifier = FeishuNotifier()

# 连接断连告警
notifier.send_alert(
    title="TickDB 连接断开",
    content="**连接状态**: WebSocket 已断开\n**重试次数**: 3\n**影响**: 行情数据中断",
    level="error"
)

5.2 自动重启与自愈

系统崩溃不可怕,可怕的是崩溃后没有人知道、没有人处理。

方案 1:systemd 守护进程(推荐)

# /etc/systemd/system/quant-trader.service
[Unit]
Description=Quant Trader Service
After=network.target

[Service]
Type=simple
User=ubuntu
WorkingDirectory=/home/ubuntu/quant-trader
Environment="PATH=/home/ubuntu/quant-trader/venv/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin"
Environment="TICKDB_API_KEY=your_api_key_here"
Environment="FEISHU_WEBHOOK_URL=https://open.feishu.cn/open-apis/bot/v2/hook/xxxx"
ExecStart=/home/ubuntu/quant-trader/venv/bin/python main.py
Restart=always
RestartSec=10

# 最多重启 10 次,避免死循环
StartLimitBurst=10
StartLimitIntervalSec=300

# 日志输出到 journalctl
StandardOutput=journal
StandardError=journal

[Install]
WantedBy=multi-user.target

启用方式:

sudo systemctl enable quant-trader
sudo systemctl start quant-trader
sudo systemctl status quant-trader  # 查看状态
journalctl -u quant-trader -f      # 实时查看日志

方案 2:supervisord(备选)

# /etc/supervisord.conf
[program:quant-trader]
command=/home/ubuntu/quant-trader/venv/bin/python main.py
directory=/home/ubuntu/quant-trader
user=ubuntu
autostart=true
autorestart=true
startretries=10
stderr_logfile=/var/log/quant-trader.err.log
stdout_logfile=/var/log/quant-trader.out.log
environment=TICKDB_API_KEY="your_key",FEISHU_WEBHOOK_URL="your_webhook"

5.3 日志轮转与归档

日志是排查问题的唯一依据,但日志文件会无限增长,必须配置轮转。

# /etc/logrotate.d/quant-trader
/home/ubuntu/quant-trader/logs/*.log {
    daily           # 每天轮转
    rotate 7        # 保留 7 天
    compress        # 压缩旧日志
    missingok        # 不存在也不报错
    notifempty      # 空文件不轮转
    create 0644 ubuntu ubuntu  # 新建文件权限
    postrotate
        systemctl reload quant-trader > /dev/null 2>&1 || true
    endscript
}

六、分场景部署方案

根据个人开发者的不同需求,提供三档部署方案:

维度 入门版 标准版 进阶版
适用场景 学习验证 个人实盘(≤5 个策略) 多策略并发(≤20 个策略)
服务器 2 核 2G,¥40/月 2 核 4G,¥80/月 4 核 8G,¥200/月
行情数据 TickDB 免费层 TickDB 付费层($15/月) TickDB 付费层 + 备用数据源
数据库 SQLite SQLite + Redis PostgreSQL
监控告警 飞书机器人 飞书机器人 + 基础监控 全套监控 + Grafana
自动运维 systemd systemd + logrotate systemd + Ansible
月均成本 ¥50 ¥150 ¥300

入门版详细配置

# 系统配置
OS: Ubuntu 22.04 LTS
Python: 3.11 (venv 隔离)
磁盘: 50GB SSD (够用)

# 目录结构
/home/ubuntu/quant-trader/
├── main.py              # 入口文件
├── config.py            # 配置管理
├── strategy/            # 策略模块
│   ├── __init__.py
│   ├── mean_reversion.py
│   └── trend_following.py
├── engine/              # 引擎模块
│   ├── __init__.py
│   ├── data_client.py   # TickDB 接入
│   ├── signal.py        # 信号计算
│   ├── executor.py      # 订单执行
│   └── risk.py          # 风险管理
├── data/                # 数据存储
│   ├── positions.db     # 持仓状态
│   └── tickdb_state.db  # 连接状态
├── logs/                # 日志目录
└── requirements.txt     # 依赖清单

七、常见陷阱与避坑指南

陷阱一:过度工程

症状:还没开始做策略,先花了三周写了一个「支持多数据源、多市场、多策略的超级框架」。

后果:框架本身成了维护负担,策略逻辑没写几行。

解法:先跑通一个最简单的策略,再逐步重构。80% 的个人策略,用单进程 + 单数据源 + 单策略就够了。

陷阱二:忽略交易成本

症状:回测年化收益 30%,实盘年化收益 3%。

原因:回测没考虑滑点、佣金冲击,实际交易成本吃掉大部分利润。

解法

# 成本估算模板
def estimate_real_cost(backtest_return: float, avg_trade_count: int) -> float:
    """
    估算实盘成本对收益的影响
    
    假设:
    - 单边佣金:0.03% (美股)
    - 滑点假设:0.05% (流动性正常的标的)
    - 每次交易总成本:0.08%
    """
    commission = 0.0003 * 2 * avg_trade_count  # 双边佣金
    slippage = 0.0005 * 2 * avg_trade_count       # 双边滑点
    total_cost = commission + slippage
    
    real_return = backtest_return - total_cost
    return real_return

# 示例:年交易 50 次,回测收益 30%
# 实际成本:50 * 0.08% * 2 = 8%
# 实盘收益:30% - 8% = 22%

陷阱三:没有断点测试

症状:回测跑了好几年,实盘一跑就崩。

原因:回测环境和实盘环境不一致,数据源不同,API 行为有差异。

解法

  1. 先用历史数据做「模拟实盘」(纸交易),跑至少 2 周
  2. 每天对比模拟订单和实际订单的执行差异
  3. 确认无误后,再切换到实盘

八、结语

一个人、一台云服务器、一套量化系统。

这不是一个「小而美」的故事,这是一个关于约束下的工程决策的故事。

在资源有限的情况下,你必须学会:

  • 优先级排序:什么必须自己做,什么可以依赖现成服务
  • 可量化的取舍:每增加一个依赖,就要问自己「它带来的稳定性收益,值不值得这个维护成本」
  • 自动化优于手动:任何需要人盯着的事情,最终都会失败

系统稳定运行的秘诀只有一个:让它自己跑,让它自己报警,让它自己恢复

剩下的时间,去研究策略,去享受生活,去睡一个安稳觉。


下一步行动

如果你是刚入门量化开发

  1. 在 tickdb.ai 注册,获取免费 API Key(无需信用卡)
  2. 复制本文的行情接入代码,跑通第一个订阅
  3. 用 SQLite 实现一个最简单的持仓记录表

如果你已经在跑策略,但系统不够稳定

  1. 用 systemd 重构你的启动脚本,加上自动重启
  2. 配置飞书机器人告警,确保第一时间知道系统状态
  3. 用状态外置原则重构你的策略,至少把持仓数据存到数据库

如果你想系统性地学习量化工程
关注 TickDB 公众号,每周更新产业链深度拆解和工程实践。

如果你习惯用 AI 辅助开发
在 AI 助手中搜索安装 tickdb-market-data SKILL,用自然语言查询 TickDB 的行情数据能力。


本文不构成任何投资建议。市场有风险,投资需谨慎。