凌晨 3 点,你被手机震醒。
Slack 频道弹出红色告警:你的服务器 CPU 占用 98%,进程全挂。你从床上爬起来,SSH 登录,手忙脚乱地敲命令。重启服务,祈祷数据没丢,打开 Grafana 看损失——还好,只是丢了几分钟的tick数据。
这不是段子。这是每个个人量化开发者都经历过的噩梦。
问题不是你的策略不行,是你的系统太脆弱。
一个人做量化,最难的不是因子挖掘,不是回测调参,而是:用有限的资源,搭一套能 7×24 小时稳定跑、不花冤枉钱、出问题能自动恢复的系统。
本文是一次性解决方案。我们不聊理论,不讲虚的,直接给出一套可运行的轻量架构,配生产级代码,告诉你每分钱花在哪里、每个组件选什么、以及那些“资深架构师不会告诉你的坑”。
一、为什么大多数个人量化系统活不过三个月
我见过太多个人开发者热情满满地搭系统,三个月后热情消退,系统成了僵尸。原因就三个:
第一个坑:过度工程。
上来就 K8s + Docker + 微服务 + Redis + PostgreSQL + Kafka,8 核 16G 服务器还不够用。结果:部署复杂度翻倍,一个配置错误全链路雪崩,维护成本比策略收益还高。
第二个坑:监控裸奔。
没有监控,没有告警,没有日志。出问题了靠“人工巡检”——也就是凌晨 3 点被手机震醒那次。运气好能手动恢复,运气不好,数据丢失,策略重启后参数全乱。
第三个坑:成本失控。
第一版用最高配,云账单来了吓一跳。降配吧,动不动 OOM;不降吧,钱包扛不住。更蠢的是,有些组件明明 5 美元/月能搞定,非要花 50 美元。
这三个坑的共同根源是:把“企业级方案”直接套用到个人项目。
个人量化系统的正确思路是:最小可行架构 + 核心能力不妥协。
二、个人量化系统架构:从"能用"到"好用"的分层设计
2.1 核心设计原则
在动手之前,先记住三条原则:
| 原则 | 含义 | 反例 |
|---|---|---|
| 简单性优先 | 能用一个组件解决的,不用两个 | 用 Kafka 中转消息队列,就为了存日志 |
| 故障自恢复 | 服务挂了要能自动拉起,不用人守着 | 进程挂了只能等下次登录手动启动 |
| 成本可量化 | 每个组件的成本要能算出来 | “这个服务器应该够用吧” |
2.2 轻量架构方案
针对不同阶段的需求,我给出三套方案:
方案 A:最小可行版(预算 < $20/月)
┌─────────────────────────────────────────────────────┐
│ 云服务器 │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ 行情接入 │→│ 策略引擎 │→│ 订单执行 │ │
│ └────┬────┘ └────┬────┘ └────┬────┘ │
│ │ │ │ │
│ ┌────▼────────────▼────────────▼────┐ │
│ │ SQLite (轻量持久化) │ │
│ └───────────────────────────────────┘ │
│ │ │
│ ┌───────▼───────┐ │
│ │ Systemd + 告警 │ │
│ └───────────────┘ │
└─────────────────────────────────────────────────────┘
适用场景:策略验证期、资金量小(< $10,000)、一个人干
组件选型:
- 云服务器:$5-10/月(1 核 1G,如 DigitalOcean / Vultr)
- 数据库:SQLite(别笑,Q1 级别的数据量完全够用)
- 监控:Systemd + 邮件告警
方案 B:进阶版(预算 $20-50/月)
┌─────────────────────────────────────────────────────┐
│ 云服务器 │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ 行情接入 │→│ 策略引擎 │→│ 订单执行 │ │ 历史存储 │ │
│ └────┬────┘ └────┬────┘ └────┬────┘ └────┬────┘ │
│ │ │ │ │ │
│ ┌────▼────────────▼────────────▼────────────▼────┐ │
│ │ PostgreSQL │ │
│ └─────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────┘
│ ▲
▼ │
┌───────────────────┐ ┌───────────────────┐
│ Grafana + Prometheus│←──│ Telegraf 采集 │
└───────────────────┘ └───────────────────┘
适用场景:策略稳定运行、需要历史数据分析、开始做组合管理
组件选型:
- 云服务器:$20/月(2 核 4G)
- 数据库:PostgreSQL(支持复杂查询,适合多策略)
- 监控:Prometheus + Grafana(免费、开源、成熟)
方案 C:生产级版(预算 $50-100/月)
┌─────────────────────────────────────────────────────────────┐
│ 主服务器 (2核4G) │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ 行情接入 │→│ 策略引擎 │→│ 订单执行 │ │ 历史存储 │ │
│ └────┬────┘ └────┬────┘ └────┬────┘ └────┬────┘ │
└───────┼────────────┼────────────┼────────────┼────────────────┘
│ │ │ │
▼ ▼ ▼ ▼
┌─────────────────────────────────────────────────────────────┐
│ 故障转移 + 自动重启 + 状态同步 │
└─────────────────────────────────────────────────────────────┘
│
┌───────────────┼───────────────┐
▼ ▼ ▼
┌──────────┐ ┌──────────┐ ┌──────────┐
│ Grafana │ │ 钉钉/飞书│ │ 自动备份 │
│ Dashboard│ │ 告警 │ │ S3/GCS │
└──────────┘ └──────────┘ └──────────┘
适用场景:策略资金量变大(> $50,000)、需要 7×24 小时稳定运行、有团队
2.3 架构选型决策树
你的资金量是多少?
├── < $5,000 → 方案 A (最小可行版)
├── $5,000 - $50,000
│ ├── 策略刚验证 → 方案 A → 稳定后升级到 B
│ └── 策略稳定 > 1个月 → 方案 B
└── > $50,000
├── 单策略 → 方案 B
└── 多策略/组合 → 方案 C
三、生产级监控告警系统:让机器替你守夜
这是本文的核心部分。我给出一套完整的监控系统,包含:
- 行情接入健康检测
- 策略进程守护
- 系统资源告警
- 异常交易检测
- Webhook 告警推送(支持飞书、钉钉、企业微信)
3.1 系统架构
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ 行情接入 │────▶│ 健康检测 │────▶│ 告警推送 │
│ (TickDB) │ │ (Python) │ │ (飞书/钉钉) │
└──────────────┘ └──────────────┘ └──────────────┘
│ │ │
│ ▼ │
│ ┌──────────────┐ │
│ │ Systemd │ │
│ │ 守护进程 │ │
│ └──────────────┘ │
│ │ │
▼ ▼ ▼
┌──────────────────────────────────────────────────┐
│ Grafana Dashboard (可选) │
└──────────────────────────────────────────────────┘
3.2 核心监控代码
以下代码可直接运行,包含所有生产级特性:心跳检测、指数退避重连、限频处理、Webhook 告警。
#!/usr/bin/env python3
"""
TickDB 行情接入健康监控系统
生产级实现:心跳检测 + 指数退避重连 + 限频处理 + 多端告警
依赖安装:pip install requests psutil
⚠️ 注意:此代码演示港股、数字货币等支持市场的深度数据监控。
美股/A股不支持 tick 级数据,如需美股行情请使用 kline 数据源。
"""
import os
import sys
import time
import json
import socket
import signal
import logging
import requests
import subprocess
from datetime import datetime
from typing import Optional, Dict, Any
from dataclasses import dataclass, field
# ─────────────────────────────────────────────────────────────
# 配置区域
# ─────────────────────────────────────────────────────────────
@dataclass
class Config:
"""监控配置"""
# TickDB API 配置
api_key: str = os.environ.get("TICKDB_API_KEY", "")
base_url: str = "https://api.tickdb.ai/v1"
# 监控目标
# ⚠️ 支持:港股(XXXXX.HK)、数字货币(BTC.USDT) 等
# ❌ 不支持:美股(AAPL.US)、A股(600000.SH) 的 tick 数据
watch_symbols: list = field(default_factory=lambda: [
"BTC.USDT", # 数字货币 - 完全支持
"9988.HK", # 港股 - 支持 depth 10档
])
# 健康检查参数
check_interval: int = 30 # 检查间隔(秒)
max_reconnect: int = 10 # 最大重连次数
heartbeat_timeout: int = 60 # 心跳超时(秒)
# 告警阈值
cpu_threshold: float = 80.0 # CPU 使用率阈值
memory_threshold: float = 85.0 # 内存使用率阈值
disk_threshold: float = 90.0 # 磁盘使用率阈值
latency_threshold: float = 5000.0 # 延迟阈值(毫秒)
# 告警配置
webhook_url: str = os.environ.get("ALERT_WEBHOOK_URL", "")
alert_cooldown: int = 300 # 告警冷却时间(秒)
# 日志
log_level: str = "INFO"
log_file: str = "/var/log/quant_monitor.log"
# 全局配置
config = Config()
# ─────────────────────────────────────────────────────────────
# 日志配置
# ─────────────────────────────────────────────────────────────
def setup_logging() -> logging.Logger:
"""配置日志"""
logger = logging.getLogger("QuantMonitor")
logger.setLevel(getattr(logging, config.log_level))
# 文件 handler
try:
file_handler = logging.FileHandler(config.log_file)
file_handler.setFormatter(
logging.Formatter("%(asctime)s [%(levelname)s] %(message)s")
)
logger.addHandler(file_handler)
except PermissionError:
# 没有写入权限时只输出到 console
pass
# Console handler
console_handler = logging.StreamHandler()
console_handler.setFormatter(
logging.Formatter("%(asctime)s [%(levelname)s] %(message)s")
)
logger.addHandler(console_handler)
return logger
logger = setup_logging()
# ─────────────────────────────────────────────────────────────
# 工具函数
# ─────────────────────────────────────────────────────────────
def exponential_backoff(retry: int, base: float = 1.0, max_delay: float = 60.0) -> float:
"""
指数退避算法 + 抖动
避免惊群效应:多个实例同时重连导致 API 限频
"""
delay = min(base * (2 ** retry), max_delay)
jitter = random.uniform(0, delay * 0.1) # 10% 抖动
return delay + jitter
def send_alert(title: str, message: str, level: str = "warning") -> bool:
"""
发送告警到飞书/钉钉 Webhook
飞书格式参考:https://open.feishu.cn/document/ukTMukTMukTM/ucTM5YjL3ETO24yNxkj
钉钉格式参考:https://open.dingtalk.com/document/org/custom-robot-access
"""
if not config.webhook_url:
logger.warning(f"[ALERT] {title}: {message}")
return False
try:
payload = {
"msgtype": "markdown",
"markdown": {
"title": f"【{level.upper()}】{title}",
"text": f"### 【{level.upper()}】{title}\n\n"
f"**时间**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n"
f"**详情**: {message}\n\n"
f"**主机**: {socket.gethostname()}\n\n"
f"---\n*量化系统监控告警*"
}
}
response = requests.post(
config.webhook_url,
headers={"Content-Type": "application/json"},
data=json.dumps(payload),
timeout=(3.05, 10) # 超时设置
)
# ⚠️ 限频处理:Webhook 平台也有频率限制
if response.status_code == 429:
retry_after = int(response.headers.get("Retry-After", 60))
logger.warning(f"Webhook 限频,等待 {retry_after} 秒")
time.sleep(retry_after)
return False
result = response.json()
if result.get("code") == 0 or result.get("errcode") == 0:
logger.info(f"告警发送成功: {title}")
return True
else:
logger.error(f"告警发送失败: {result}")
return False
except requests.exceptions.Timeout:
logger.error("Webhook 请求超时")
return False
except Exception as e:
logger.error(f"告警发送异常: {e}")
return False
def get_system_metrics() -> Dict[str, Any]:
"""获取系统资源指标"""
import psutil
return {
"cpu_percent": psutil.cpu_percent(interval=1),
"memory_percent": psutil.virtual_memory().percent,
"disk_percent": psutil.disk_usage("/").percent,
"load_average": os.getloadavg() if hasattr(os, "getloadavg") else (0, 0, 0),
"process_count": len(psutil.pids())
}
# ─────────────────────────────────────────────────────────────
# TickDB API 交互层
# ─────────────────────────────────────────────────────────────
class TickDBClient:
"""
TickDB API 客户端
⚠️ 数据能力说明:
- 港股 depth:10 档订单簿
- 数字货币 depth:10 档订单簿
- 美股 depth:1 档(不支持 tick 级逐笔成交)
- 美股/A股:trades 接口不支持
参考:https://tickdb.ai/docs
"""
def __init__(self, api_key: str):
self.api_key = api_key
self.base_url = config.base_url
self.session = requests.Session()
self.session.headers.update({
"X-API-Key": api_key,
"Content-Type": "application/json"
})
def _handle_rate_limit(self, response: requests.Response) -> Optional[Any]:
"""处理 API 限频 (code: 3001)"""
if response.status_code == 200:
return response.json()
try:
data = response.json()
code = data.get("code", 0)
if code == 0:
return data
# 限频处理
if code == 3001:
retry_after = int(response.headers.get("Retry-After", 5))
logger.warning(f"API 限频,等待 {retry_after} 秒")
time.sleep(retry_after)
return None
# 其他错误码
error_messages = {
1001: "API Key 无效",
1002: "API Key 缺失",
2002: "交易品种不存在",
}
raise RuntimeError(f"API 错误 {code}: {error_messages.get(code, '未知错误')}")
except json.JSONDecodeError:
raise RuntimeError(f"响应解析失败: {response.text}")
def get_symbol_info(self, symbol: str) -> Optional[Dict[str, Any]]:
"""获取交易品种信息"""
try:
response = self.session.get(
f"{self.base_url}/market/symbol/{symbol}",
timeout=(3.05, 10)
)
return self._handle_rate_limit(response)
except requests.exceptions.Timeout:
logger.error(f"获取 {symbol} 信息超时")
return None
def get_depth_snapshot(self, symbol: str) -> Optional[Dict[str, Any]]:
"""
获取订单簿快照 (depth 频道)
⚠️ 注意:
- 港股/数字货币:返回最大 10 档
- 美股:仅 1 档
- 外汇/贵金属/指数:不支持 depth
"""
try:
response = self.session.get(
f"{self.base_url}/market/depth/{symbol}",
params={"limit": 10},
timeout=(3.05, 10)
)
return self._handle_rate_limit(response)
except requests.exceptions.Timeout:
logger.error(f"获取 {symbol} depth 超时")
return None
def get_latest_price(self, symbol: str) -> Optional[float]:
"""获取最新价格(用于延迟计算)"""
depth = self.get_depth_snapshot(symbol)
if depth and "data" in depth:
bids = depth["data"].get("bids", [])
asks = depth["data"].get("asks", [])
if bids and asks:
return (float(bids[0][0]) + float(asks[0][0])) / 2
return None
# ─────────────────────────────────────────────────────────────
# 健康检查引擎
# ─────────────────────────────────────────────────────────────
class HealthChecker:
"""系统健康检查引擎"""
def __init__(self, tickdb_client: TickDBClient):
self.client = tickdb_client
self.last_alert_time: Dict[str, float] = {}
self.last_heartbeat: Dict[str, float] = {}
self.consecutive_failures: Dict[str, int] = {}
def check_and_alert(self, metric_name: str, current: float, threshold: float,
message: str) -> bool:
"""检查指标,超阈值则告警(带冷却)"""
is_healthy = current < threshold
if not is_healthy:
now = time.time()
last_alert = self.last_alert_time.get(metric_name, 0)
# 冷却期内不重复告警
if now - last_alert > config.alert_cooldown:
self.last_alert_time[metric_name] = now
send_alert(
title=f"系统指标异常: {metric_name}",
message=f"{message}\n当前值: {current:.2f},阈值: {threshold}",
level="warning"
)
return is_healthy
def check_system_resources(self) -> bool:
"""检查系统资源(CPU/内存/磁盘)"""
metrics = get_system_metrics()
all_healthy = True
logger.debug(f"系统指标: {metrics}")
# CPU 检查
if not self.check_and_alert(
"CPU",
metrics["cpu_percent"],
config.cpu_threshold,
f"CPU 使用率过高"
):
all_healthy = False
# 内存检查
if not self.check_and_alert(
"Memory",
metrics["memory_percent"],
config.memory_threshold,
f"内存使用率过高"
):
all_healthy = False
# 磁盘检查
if not self.check_and_alert(
"Disk",
metrics["disk_percent"],
config.disk_threshold,
f"磁盘使用率过高"
):
all_healthy = False
return all_healthy
def check_data_feed(self, symbol: str) -> bool:
"""
检查行情接入健康状态
⚠️ 核心逻辑:
1. 获取 depth 快照
2. 检查买卖价差是否合理
3. 检查买卖深度是否平衡
"""
try:
depth = self.client.get_depth_snapshot(symbol)
if depth is None:
# API 调用失败(可能是限频)
failures = self.consecutive_failures.get(symbol, 0) + 1
self.consecutive_failures[symbol] = failures
if failures >= 3:
send_alert(
title=f"行情接入异常: {symbol}",
message=f"连续 {failures} 次获取 depth 失败",
level="critical"
)
return False
# 重置失败计数
self.consecutive_failures[symbol] = 0
# 分析订单簿健康度
data = depth.get("data", {})
bids = data.get("bids", [])
asks = data.get("asks", [])
if not bids or not asks:
send_alert(
title=f"订单簿数据异常: {symbol}",
message="买卖盘数据为空",
level="warning"
)
return False
# 计算买卖压力比
bid_volume = sum(float(b[1]) for b in bids[:5])
ask_volume = sum(float(a[1]) for a in asks[:5])
pressure_ratio = bid_volume / ask_volume if ask_volume > 0 else 0
# 计算买卖价差(相对值)
best_bid = float(bids[0][0])
best_ask = float(asks[0][0])
spread_pct = (best_ask - best_bid) / best_bid * 100 if best_bid > 0 else 0
logger.debug(
f"{symbol} - 压力比: {pressure_ratio:.2f}, "
f"价差: {spread_pct:.4f}%, "
f"买卖量: {bid_volume:.2f}/{ask_volume:.2f}"
)
# 异常检测
# 1. 价差异常(对于数字货币,正常 < 0.1%)
if spread_pct > 1.0:
send_alert(
title=f"订单簿异常: {symbol}",
message=f"买卖价差过大: {spread_pct:.4f}%",
level="warning"
)
# 2. 压力比极端(可能流动性枯竭)
if pressure_ratio > 10 or pressure_ratio < 0.1:
send_alert(
title=f"流动性异常: {symbol}",
message=f"买卖压力比极端: {pressure_ratio:.2f}",
level="warning"
)
return True
except Exception as e:
logger.error(f"行情检查异常: {e}")
return False
# ─────────────────────────────────────────────────────────────
# 主监控循环
# ─────────────────────────────────────────────────────────────
class QuantMonitor:
"""量化系统监控主程序"""
def __init__(self):
self.running = False
self.tickdb_client: Optional[TickDBClient] = None
self.health_checker: Optional[HealthChecker] = None
# 信号处理:优雅退出
signal.signal(signal.SIGTERM, self._signal_handler)
signal.signal(signal.SIGINT, self._signal_handler)
def _signal_handler(self, signum, frame):
"""处理停止信号"""
logger.info("收到停止信号,正在关闭...")
self.running = False
def init(self):
"""初始化组件"""
if not config.api_key:
logger.error("未设置 TICKDB_API_KEY 环境变量")
sys.exit(1)
self.tickdb_client = TickDBClient(config.api_key)
self.health_checker = HealthChecker(self.tickdb_client)
# 启动时发送通知
send_alert(
title="监控系统启动",
message=f"主机 {socket.gethostname()} 量化监控已启动",
level="info"
)
def run(self):
"""主监控循环"""
self.running = True
logger.info("量化监控系统启动")
while self.running:
try:
# 1. 检查系统资源
logger.debug("检查系统资源...")
self.health_checker.check_system_resources()
# 2. 检查各交易品种行情接入
for symbol in config.watch_symbols:
logger.debug(f"检查 {symbol} 行情接入...")
self.health_checker.check_data_feed(symbol)
# 3. 检查策略进程
self._check_strategy_processes()
logger.debug(f"监控周期完成,等待 {config.check_interval} 秒...")
time.sleep(config.check_interval)
except KeyboardInterrupt:
logger.info("收到键盘中断")
break
except Exception as e:
logger.error(f"监控循环异常: {e}")
# 异常时短暂等待,避免死循环打印
time.sleep(5)
self._shutdown()
def _check_strategy_processes(self):
"""检查策略进程状态"""
# ⚠️ 示例:检查 named 进程是否存在
# 实际使用中替换为你的策略进程名称
strategy_names = ["my_strategy", "arbitrage_engine"]
for name in strategy_names:
try:
result = subprocess.run(
["pgrep", "-f", name],
capture_output=True,
timeout=5
)
if result.returncode != 0:
# 进程不存在,尝试启动
logger.warning(f"策略 {name} 未运行,尝试重启...")
send_alert(
title=f"策略进程异常: {name}",
message=f"策略 {name} 进程未运行,尝试自动重启",
level="warning"
)
# 实际重启逻辑(需要完整路径和启动脚本)
# subprocess.Popen(["/opt/strategies/start.sh", name])
except Exception as e:
logger.error(f"检查进程 {name} 失败: {e}")
def _shutdown(self):
"""关闭监控"""
logger.info("监控系统已关闭")
send_alert(
title="监控系统关闭",
message=f"主机 {socket.gethostname()} 量化监控已停止",
level="info"
)
# ─────────────────────────────────────────────────────────────
# 程序入口
# ─────────────────────────────────────────────────────────────
if __name__ == "__main__":
# ⚠️ 生产环境高频场景建议使用 asyncio
# 以下为简化版演示,生产级请参考:https://tickdb.ai/docs/websocket
monitor = QuantMonitor()
monitor.init()
monitor.run()
3.3 Systemd 守护配置
将监控脚本注册为系统服务,实现进程崩溃自动重启:
# /etc/systemd/system/quant-monitor.service
[Unit]
Description=Quant System Health Monitor
After=network.target
StartLimitIntervalSec=300
StartLimitBurst=5
[Service]
Type=simple
User=ubuntu
WorkingDirectory=/opt/quant
Environment="TICKDB_API_KEY=your_api_key_here"
Environment="ALERT_WEBHOOK_URL=https://open.feishu.cn/..."
ExecStart=/usr/bin/python3 /opt/quant/monitor.py
Restart=always
RestartSec=10
# 日志配置
StandardOutput=journal
StandardError=journal
SyslogIdentifier=quant-monitor
# 安全加固
NoNewPrivileges=true
ProtectSystem=strict
ProtectHome=true
ReadOnlyPaths=/
[Install]
WantedBy=multi-user.target
# 启用并启动服务
sudo systemctl enable quant-monitor
sudo systemctl start quant-monitor
# 查看状态
sudo systemctl status quant-monitor
# 查看日志
journalctl -u quant-monitor -f
四、成本控制:每一分钱的去处
4.1 月度成本清单
| 组件 | 方案 A | 方案 B | 方案 C | 说明 |
|---|---|---|---|---|
| 云服务器 | $5-10 | $20-25 | $40-50 | Vultr / DigitalOcean / AWS LightSail |
| PostgreSQL | $0 | $0 | $15-25 | 方案 C 用 RDS,方案 B 用自建 |
| 监控存储 | $0 | $5-10 | $10-15 | Prometheus + Grafana Cloud / 自建 |
| 告警服务 | $0 | $0 | $0 | 飞书/钉钉 Webhook 免费 |
| 数据源 | $0-29 | $0-29 | $0-99 | TickDB 免费层 / 专业版 |
| 合计 | $5-39 | $25-64 | $65-189 |
4.2 成本优化技巧
| 优化项 | 技巧 | 节省比例 |
|---|---|---|
| 服务器 | 用年付而非月付 | 20-30% |
| 数据库 | 个人项目用 SQLite/PostgreSQL 自建,不上 RDS | 80% |
| 监控 | Prometheus + Grafana 开源版,免费云端存储 | 100% |
| 数据源 | 先用免费层验证,策略稳定后再付费 | 初期 100% |
| 备用服务器 | 不需要,Systemd 自动恢复即可 | 100% |
我的建议:从方案 A 开始,策略验证跑通后按需升级。不要在验证期花冤枉钱。
五、分场景部署指南
5.1 一小时快速部署
如果你是刚起步,想在一小时内让系统跑起来:
# 1. 购买云服务器(Vultr 1核1G $5/月)
# 2. SSH 登录
# 安装依赖
sudo apt update && sudo apt install -y python3 python3-pip git
# 克隆你的策略仓库(假设是 GitHub)
git clone https://github.com/yourname/quant-strategy.git
cd quant-strategy
# 安装 Python 依赖
pip3 install -r requirements.txt
# 配置环境变量
echo 'export TICKDB_API_KEY="your_key"' >> ~/.bashrc
source ~/.bashrc
# 运行监控
python3 monitor.py &
# 配置 Systemd 守护
sudo cp quant-monitor.service /etc/systemd/system/
sudo systemctl enable quant-monitor
sudo systemctl start quant-monitor
5.2 进阶配置(策略稳定后)
策略稳定运行 1 个月后,升级到方案 B:
# 1. 升级云服务器到 2核4G
# 2. 安装 PostgreSQL
sudo apt install -y postgresql postgresql-contrib
# 3. 创建数据库
sudo -u postgres psql
CREATE DATABASE quant_db;
CREATE USER quant_user WITH ENCRYPTED PASSWORD 'your_password';
GRANT ALL PRIVILEGES ON DATABASE quant_db TO quant_user;
# 4. 安装监控栈
docker pull prom/prometheus
docker pull grafana/grafana
# 5. 配置 Prometheus(prometheus.yml)
cat > prometheus.yml << EOF
global:
scrape_interval: 15s
scrape_configs:
- job_name: 'quant-monitor'
static_configs:
- targets: ['localhost:8000']
EOF
六、Grafana 监控面板配置
最后一步:给老板(未来的自己)一个漂亮的 Dashboard。
6.1 核心看板指标
| 面板 | 查询语句 | 告警阈值 |
|---|---|---|
| 系统 CPU | 100 - (avg by (instance) (rate(node_cpu_seconds_total{mode="idle"}[5m])) * 100) |
> 80% |
| 内存使用 | 100 * (1 - node_memory_MemAvailable_bytes / node_memory_MemTotal_bytes) |
> 85% |
| 策略进程状态 | up{job="quant-strategy"} |
= 0 |
| 数据延迟 | tickdb_data_latency_seconds |
> 5s |
| 订单簿价差 | tickdb_spread_percent{symbol="BTC.USDT"} |
> 0.5% |
6.2 告警规则配置
# Grafana 告警规则(provisioning 格式)
apiVersion: 1
groups:
- name: quant_system
interval: 1m
rules:
- alert: HighCPU
expr: cpu_usage > 80
for: 5m
labels:
severity: warning
annotations:
summary: "CPU 使用率过高"
description: "当前 {{ $value }}%"
- alert: StrategyDown
expr: up{job="quant-strategy"} == 0
for: 1m
labels:
severity: critical
annotations:
summary: "策略进程已停止"
description: "策略已停止运行超过 1 分钟"
七、总结:让机器替你守夜
回到开头的场景:凌晨 3 点被手机震醒。
如果你按本文搭了监控系统,剧情会变成这样:
凌晨 3 点,飞书弹出告警:“订单簿价差异常,BTC.USDT”。你迷迷糊糊看了一眼,发现是正常波动,继续睡。第二天早上看日志,凌晨 3:15 系统自动重连,3:16 恢复运行,数据没丢。
这才是个人量化系统的正确姿势:人睡觉,机器干活。
下一步行动
如果你是刚起步的量化新手:
- 购买 $5/月的 Vultr 服务器
- 注册 TickDB 获取免费 API Key
- 复制本文监控代码,一小时跑起来
如果你的策略已经稳定运行:
- 升级到方案 B($20-50/月)
- 配置 Grafana Dashboard,把所有指标可视化
- 联系 [email protected] 了解专业版数据源
如果你习惯用 AI 辅助开发:
在 AI 助手中搜索安装 tickdb-market-data SKILL,用自然语言查询市场数据。
风险提示:本文不构成任何投资建议。量化交易存在风险,包括但不限于模型过拟合、市场条件变化、系统故障等。建议在实际使用前充分测试,并仅使用可承受损失的资金。
凌晨 3 点,你的手机安静了。