量化系统进程守护:用 supervisor 和 systemd 让你的策略永不宕机


凌晨 3:47,你被手机震动惊醒。策略账户亏损了 12 万。

原因是下午 6 点的那次 OOM Kill——服务器内存不足,Python 进程被系统干掉了。没有人知道。直到美股盘前数据开始推送,你的策略发现数据源断了,开始疯狂重试,然后触发风控阈值。

你盯着日志里那个孤零零的 Killed 关键字,第一次认真思考一个问题:我的策略凭什么假定自己会一直运行?


一、为什么量化系统需要进程守护

在生产环境中,量化策略面临的威胁远比“代码 bug”复杂得多:

威胁类型 典型场景 后果
进程崩溃 内存泄漏、段错误、未捕获异常 策略停止,订单悬空
网络中断 WebSocket 断连、API 超时 数据流中断,重试风暴
系统资源耗尽 OOM、磁盘写满、文件句柄耗尽 进程被 kill
服务器重启 手动重启、IDC 故障、实例迁移 策略不自动恢复
依赖服务宕机 Redis 挂了、数据库断了 策略异常退出

很多新手写的策略脚本是这样的:

python my_strategy.py &

然后祈祷它一直跑。这不是“运行策略”,这是“在运行期间赌运气”。

进程守护的本质:让系统帮你盯着进程。进程挂了,帮你拉起来;服务器重启了,帮你自动启动;进程不对劲了,帮你告警。


二、方案选型: supervisor 和 systemd 怎么选

量化系统的进程守护有两套成熟方案:

维度 supervisor systemd
学习曲线 低,上手快 中,需要理解 unit 文件
功能边界 专注进程管理 整个系统的 init 方案
适用场景 个人/小团队,进程数量少 服务器运维、进程多、需要系统级集成
进程隔离 非常好
依赖管理 弱(需要自己写启动顺序) 强(支持 After=Requires=
资源限制 基本 完整(CPU、内存、文件句柄)
日志管理 自带 logrotate journald 自动管理

实操建议

  • 个人服务器、量化容器:先用 supervisor,配置简单,调试方便
  • 机构环境、生产服务器:上 systemd,稳定性更强,资源隔离更完整
  • 两者不冲突,可以共存(但建议统一用 systemd,避免管理碎片化)

三、supervisor 深度配置:从安装到生产级

3.1 安装与基础架构

# Ubuntu/Debian
sudo apt-get update
sudo apt-get install supervisor

# CentOS/RHEL
sudo yum install supervisor

# 启动并设置开机自启
sudo systemctl enable supervisord
sudo systemctl start supervisord

supervisor 的核心架构:

supervisord (主进程,daemon)
    ├── supervisord.conf (主配置)
    │   └── include 目录下的 .ini 文件
    └──supervisorctl (命令行控制工具)

主配置文件位于 /etc/supervisor/supervisord.conf,业务配置放在 /etc/supervisor/conf.d/ 目录下。

3.2 生产级策略配置模板

假设你的量化策略是这样的结构:

/home/quant/
├── strategies/
│   ├── momentum_strategy.py
│   └── mean_reversion_strategy.py
├── config/
│   └── config.yaml
├── logs/
│   └── (由 supervisor 管理)
└── requirements.txt

创建一个专门给 momentum_strategy 用的 supervisor 配置:

; /etc/supervisor/conf.d/momentum_strategy.ini

[program:momentum_strategy]
; 命令行必须是绝对路径
command=/home/quant/miniconda3/envs/quant/bin/python /home/quant/strategies/momentum_strategy.py

; 工作目录
directory=/home/quant

; 环境变量(API Key 等敏感信息)
environment=PYTHONPATH="/home/quant",TICKDB_API_KEY="%(ENV_TICKDB_API_KEY)s"

; 用户身份(不要用 root 运行策略!)
user=quant

; 自动重启策略
autorestart=true
startretries=3
exitcodes=0,2          ; 只有正常退出或被 SIGTERM 才认为需要重启

; 进程启动行为
startsecs=10           ; 启动后运行 10 秒才算成功(给 WebSocket 预热时间)
stopwaitsecs=30        ; 等待进程优雅退出的时间

; 状态检测(关键!)
; 进程状态文件位置
stdout_logfile=/var/log/supervisor/momentum_strategy.log
stdout_logfile_maxbytes=100MB
stdout_logfile_backups=5
stderr_logfile=/var/log/supervisor/momentum_strategy_err.log
stderr_logfile_maxbytes=50MB
stderr_logfile_backups=5

; 重定向 stdin(某些策略需要)
; stdin_none_file=/dev/null
; stdout_none_file=/dev/null

; 优先级(数字越小越先启动,数字越大越先停止)
priority=999

; 杀掉进程时发送 SIGKILL 的等待时间(超过则强制 kill)
stopasgroup=true
killasgroup=true

3.3 让 WebSocket 策略稳定运行

对于使用 WebSocket 接收 TickDB 数据的策略,核心诉求是:断线重连。这需要在代码层面和 supervisor 层面双重保障。

策略代码中的重连逻辑(关键部分):

# /home/quant/strategies/momentum_strategy.py

import os
import time
import json
import websocket
from datetime import datetime

class TickDBWebSocket:
    """TickDB WebSocket 连接管理器(生产级)"""
    
    def __init__(self, symbols: list, api_key: str):
        self.symbols = symbols
        self.api_key = api_key
        self.ws = None
        self.reconnect_delay = 1  # 初始重连延迟(秒)
        self.max_reconnect_delay = 60  # 最大重连延迟
        self.max_reconnect_attempts = 0  # 0 表示无限重连
        self._running = False
    
    def connect(self):
        """建立 WebSocket 连接"""
        # TickDB WebSocket 认证通过 URL 参数
        url = f"wss://api.tickdb.ai/ws/v1/market?api_key={self.api_key}"
        
        self.ws = websocket.WebSocketApp(
            url,
            on_message=self._on_message,
            on_error=self._on_error,
            on_close=self._on_close,
            on_open=self._on_open,
        )
        
        # ⚠️ 生产环境建议使用线程运行,而非阻塞主线程
        self._running = True
        self.ws.run_forever(
            ping_interval=30,  # 心跳保活
            ping_timeout=10,
            reconnect=0  # 关闭自动重连,手动控制
        )
    
    def _on_message(self, ws, message):
        """处理接收到的消息"""
        try:
            data = json.loads(message)
            # 消息类型判断
            if data.get("type") == "depth":
                self._handle_depth(data)
            elif data.get("type") == "trade":
                self._handle_trade(data)
        except json.JSONDecodeError:
            pass  # 心跳包等非 JSON 数据
    
    def _on_error(self, ws, error):
        """WebSocket 错误处理"""
        print(f"[{datetime.now()}] WebSocket 错误: {error}")
        # ⚠️ 关键:不要在回调中重连,会阻塞事件循环
        # 应该让外部监控线程处理
    
    def _on_close(self, ws, close_status_code, close_msg):
        """连接关闭回调"""
        print(f"[{datetime.now()}] WebSocket 关闭,状态码: {close_status_code}")
        if self._running:
            self._schedule_reconnect()
    
    def _on_open(self, ws):
        """连接建立后,订阅数据频道"""
        # 订阅订单簿深度数据(美股 1 档)
        subscribe_depth = {
            "cmd": "sub",
            "channel": "depth",
            "symbol": self.symbols
        }
        self.ws.send(json.dumps(subscribe_depth))
        
        # 重置重连延迟
        self.reconnect_delay = 1
        print(f"[{datetime.now()}] WebSocket 连接成功,已订阅: {self.symbols}")
    
    def _schedule_reconnect(self):
        """指数退避重连调度"""
        print(f"[{datetime.now()}] 计划 {self.reconnect_delay} 秒后重连...")
        time.sleep(self.reconnect_delay)
        
        # 指数退避:延迟翻倍,但不超过最大值
        self.reconnect_delay = min(
            self.reconnect_delay * 2 + random.uniform(0, 1),  # 加抖动
            self.max_reconnect_delay
        )
        
        # ⚠️ 这里使用 daemon=False 的线程,以便 supervisor 能正确检测进程状态
        reconnect_thread = threading.Thread(target=self.connect, daemon=False)
        reconnect_thread.start()
    
    def _handle_depth(self, data):
        """处理订单簿数据"""
        symbol = data.get("symbol")
        bids = data.get("bids", [])
        asks = data.get("asks", [])
        
        # 计算买卖压力比
        total_bid_volume = sum(float(b[1]) for b in bids)
        total_ask_volume = sum(float(a[1]) for a in asks)
        
        if total_ask_volume > 0:
            pressure_ratio = total_bid_volume / total_ask_volume
            # 压力比 > 2 通常意味着买盘强势
            if pressure_ratio > 2:
                self._emit_signal(symbol, "BUY", pressure_ratio)
            elif pressure_ratio < 0.5:
                self._emit_signal(symbol, "SELL", pressure_ratio)
    
    def _emit_signal(self, symbol, direction, ratio):
        """发出交易信号"""
        print(f"[{datetime.now()}] 信号: {symbol} {direction} (压力比: {ratio:.2f})")
        # 这里连接你的下单引擎
    
    def start(self):
        """启动连接(供 supervisor 管理的主入口)"""
        print(f"[{datetime.now()}] 策略启动,环境检查...")
        
        # 前置检查:验证 API Key 有效
        if not self.api_key:
            raise ValueError("TICKDB_API_KEY 环境变量未设置")
        
        # ⚠️ 生产环境:建议在这里启动健康检查线程
        health_thread = threading.Thread(target=self._health_check_loop, daemon=True)
        health_thread.start()
        
        self.connect()


def main():
    api_key = os.environ.get("TICKDB_API_KEY")
    symbols = ["AAPL.US", "NVDA.US", "TSLA.US"]
    
    ws_manager = TickDBWebSocket(symbols, api_key)
    ws_manager.start()


if __name__ == "__main__":
    main()

3.4 supervisor 管理命令速查

# 读取并加载新配置
sudo supervisorctl reread
sudo supervisorctl update

# 查看所有进程状态
sudo supervisorctl status

# 单独管理某个进程
sudo supervisorctl start momentum_strategy
sudo supervisorctl stop momentum_strategy
sudo supervisorctl restart momentum_strategy

# 实时查看日志
sudo supervisorctl tail -f momentum_strategy
sudo supervisorctl tail -f momentum_strategy stderr

# 进入交互式控制台
sudo supervisorctl

3.5 健康检查脚本(可选增强)

supervisor 本身不提供 HTTP 健康检查端点,如果你的系统需要被外部监控(如 Prometheus、飞书机器人),可以给策略加一个轻量的 HTTP 健康检查端口:

# /home/quant/strategies/momentum_strategy.py

from http.server import HTTPServer, BaseHTTPRequestHandler
import threading

class HealthCheckHandler(BaseHTTPRequestHandler):
    """轻量健康检查端点"""
    
    def do_GET(self):
        if self.path == "/health":
            # 检查策略核心状态
            is_healthy = (
                self.server.strategy_ws.ws is not None and
                self.server.strategy_ws.ws.sock is not None and
                self.server.strategy_ws.ws.sock.connected
            )
            
            self.send_response(200 if is_healthy else 503)
            self.send_header("Content-Type", "application/json")
            self.end_headers()
            self.wfile.write(json.dumps({
                "status": "healthy" if is_healthy else "unhealthy",
                "uptime": time.time() - self.server.start_time
            }).encode())
        else:
            self.send_response(404)
            self.end_headers()
    
    def log_message(self, format, *args):
        pass  # 抑制日志输出

def start_health_server(ws_manager):
    """启动健康检查 HTTP 服务"""
    server = HTTPServer(("0.0.0.0", 8080), HealthCheckHandler)
    server.strategy_ws = ws_manager
    server.start_time = time.time()
    server.serve_forever()

四、systemd 深度配置:企业级守护方案

4.1 基础概念速览

systemd 是 Linux 系统的 init 进程(PID 1),它的核心概念:

  • unit:系统管理的资源单元,类型包括 .service.socket.timer.path
  • target:类似“运行级别”的概念
  • journal:systemd 的日志系统(journalctl 查看)

我们的目标是创建 .service 文件来管理量化策略。

4.2 systemd Unit 文件编写

; /etc/systemd/system/quant-momentum.service

[Unit]
Description=Momentum Strategy - TickDB WebSocket Market Data
Documentation=https://your-docs.internal/strategies/momentum
After=network-online.target    ; 网络完全就绪后再启动
Wants=network-online.target

[Service]
Type=simple                     ; 简单进程(默认,推荐)
User=quant
Group=quant

; ⚠️ 重要:工作目录
WorkingDirectory=/home/quant

; 完整命令(含路径)
ExecStart=/home/quant/miniconda3/envs/quant/bin/python /home/quant/strategies/momentum_strategy.py

; 环境变量文件(推荐,API Key 不写在 unit 文件里)
EnvironmentFile=/home/quant/config/secrets.env

; 优雅停止(给进程 60 秒的优雅退出时间)
; systemd 会发送 SIGTERM,策略需要捕获并做清理
TimeoutStopSec=60

; 重启策略
Restart=on-failure              ; 仅在异常退出时重启
RestartSec=5                    ; 等待 5 秒后重启
StartLimitIntervalSec=300       ; 5 分钟内
StartLimitBurst=5               ; 最多重启 5 次

; 防止 systemd 杀死整个进程组(防止子进程变成孤儿)
KillMode=process

; 日志管理(交给 journald)
StandardOutput=journal
StandardError=journal
SyslogIdentifier=momentum_strategy

; 资源限制(生产环境强烈建议设置)
; 防止策略耗尽系统资源
LimitNOFILE=65535               ; 最大文件句柄数
LimitNPROC=4096                 ; 最大进程数
MemoryMax=2G                    ; 最大内存(2GB)
CPUQuota=80%                    ; CPU 时间配额

[Install]
WantedBy=multi-user.target      ; 开机自启

环境变量文件/home/quant/config/secrets.env,权限设为 600):

# /home/quant/config/secrets.env
TICKDB_API_KEY=tk_live_xxxxxxxxxxxx
REDIS_HOST=localhost
REDIS_PORT=6379
# 设置正确权限(防止其他用户读取)
sudo chmod 600 /home/quant/config/secrets.env
sudo chown quant:quant /home/quant/config/secrets.env

4.3 量化策略的 systemd 配置模板

对于有多个策略的量化团队,建议统一管理:

/etc/systemd/system/
├── quant-momentum.service      # 动量策略
├── quant-mean-rev.service      # 均值回归策略
├── quant-scanner.service       # 全市场扫描器
└── quant-monitor.service       # 统一监控脚本

每个服务文件的关键差异

# 均值回归策略 - 优先级更高,依赖 Redis
[Unit]
Description=Mean Reversion Strategy
After=network-online.target redis.service
Wants=network-online.target redis.service

# 动量策略 - 可以在均值回归之后启动
[Unit]
Description=Momentum Strategy
After=network-online.target quant-mean-rev.service
Wants=quant-mean-rev.service

4.4 systemd Timer:定时任务与健康检查

systemd 的 Timer 是 cron 的替代品,优势在于:

  • 任务执行日志统一在 journald 中
  • 可以设置精确的调度
  • 可以设置任务执行后多久内必须完成(防止任务堆积)
  • 可以设置任务的依赖关系

场景:每隔 5 分钟检查策略是否存活,未存活则重启

; /etc/systemd/system/quant-momentum-healthcheck.timer

[Unit]
Description=定时检查 Momentum 策略健康状态

[Timer]
# 每隔 5 分钟执行一次
OnBootSec=1min       ; 开机 1 分钟后开始
OnUnitActiveSec=5min

# 如果错过了调度时间(比如系统休眠),立即执行
Persistent=true

[Install]
WantedBy=timers.target
; /etc/systemd/system/quant-momentum-healthcheck.service

[Unit]
Description=检查 Momentum 策略进程状态

[Service]
Type=oneshot          ; 一次性任务,执行完就退出
ExecStart=/home/quant/scripts/healthcheck.sh momentum_strategy
User=root
#!/bin/bash
# /home/quant/scripts/healthcheck.sh

SERVICE_NAME=$1
STATUS=$(systemctl is-active ${SERVICE_NAME}.service)

if [ "$STATUS" != "active" ]; then
    echo "[$(date)] 检测到 ${SERVICE_NAME} 不活跃,尝试重启..."
    systemctl restart ${SERVICE_NAME}.service
    
    # 等待几秒后再次检查
    sleep 5
    NEW_STATUS=$(systemctl is-active ${SERVICE_NAME}.service)
    
    if [ "$NEW_STATUS" != "active" ]; then
        # 发送告警(这里用飞书 webhook 示例)
        curl -X POST 'https://open.feishu.cn/open-apis/bot/v2/hook/xxx' \
            -H 'Content-Type: application/json' \
            -d '{
                "msg_type": "text",
                "content": {"text": "【告警】'${SERVICE_NAME}' 重启失败,请检查!"}
            }'
    fi
fi

启动定时器

sudo systemctl daemon-reload
sudo systemctl enable --now quant-momentum-healthcheck.timer

4.5 systemd 常用命令

# 启动、停止、重启
sudo systemctl start quant-momentum.service
sudo systemctl stop quant-momentum.service
sudo systemctl restart quant-momentum.service

# 查看状态(详细信息)
sudo systemctl status quant-momentum.service

# 查看日志
journalctl -u quant-momentum.service -f        # 实时跟踪
journalctl -u quant-momentum.service --since "1 hour ago"
journalctl -u quant-momentum.service -p err    # 仅错误

# 开机自启
sudo systemctl enable quant-momentum.service
sudo systemctl disable quant-momentum.service

# 查看所有 unit 状态
systemctl list-units --type=service --state=running

五、两种方案对比与选型建议

维度 supervisor systemd
适用场景 个人开发者、小团队 机构服务器、生产环境
配置复杂度
进程数量 <10 个推荐 任意数量
资源限制 需配合 ulimit 原生支持(MemoryMax、CPUQuota)
日志轮转 需配置 logrotate journald 自动管理
依赖管理 强(After=Requires=
定时任务 需配合 cron 原生 Timer
容器兼容 在容器内运行有限制 容器化环境标准方案

实际推荐

  • Docker/Kubernetes 环境:用 systemd 管理宿主机,容器内部署策略(容器自带进程管理)
  • 裸金属/虚拟机:直接用 systemd,简单统一
  • 不想学 systemd 的个人开发者:supervisor 完全够用,配置更直观

六、完整监控体系:进程守护 + 数据监控 + 告警

进程守护只是最后一层保险。真正稳健的系统需要多层监控:

层级 监控内容 工具推荐
进程层 进程是否存活、内存/CPU 占用 systemd/supervisor
连接层 WebSocket 是否连接、数据是否推送 策略内心跳 + 监控脚本
业务层 订单是否正常、头寸是否符合预期 风控脚本
系统层 磁盘、内存、网络 Prometheus + Grafana

TickDB + 进程守护的组合

[TickDB] --- WebSocket ---> [量化策略] 
                                |
                                v
                        [systemd/supervisor]
                                |
                                v
                        [心跳检测] ---> [告警]

七、总结

进程守护不是可选项,而是生产环境的入场券

  • 如果你的策略崩溃后不会自动拉起,你就是在用真金白银“实时看护”
  • 如果你的服务器重启后策略不会自动恢复,你就是在“每天手动检查”
  • 如果你的 WebSocket 断连后没有重连机制,你的策略就是在“裸奔”

supervisor 适合快速上手,配置文件直观,适合个人开发者。

systemd 是 Linux 事实标准,功能完整,资源隔离强,适合生产环境。

无论选哪个,记住三个核心配置:

  1. autorestart=trueRestart=on-failure
  2. 日志输出重定向(方便排查问题)
  3. 优雅停止(TimeoutStopSec,给策略时间做清理)

下一步行动

如果你想亲手实现 TickDB 数据的稳定接收

  1. 访问 tickdb.ai 注册(免费,无需信用卡)
  2. 在控制台生成 API Key,配置环境变量 TICKDB_API_KEY
  3. 复制本文的 WebSocket 代码,结合 systemd/supervisor 部署

如果你想了解更多 TickDB 的数据能力

  • 深度频道(depth):实时订单簿深度,最大 50 档
  • 成交流(trades):逐笔成交,港股和数字货币可用
  • K 线数据:10 年级别历史数据,支持回测

如果你习惯用 AI 辅助开发,在 AI 助手中搜索安装 tickdb-market-data SKILL,让 AI 帮你生成符合生产级规范的接入代码。


本文不构成任何投资建议。策略运行有风险,请做好充分的风控措施。