"你收到了一条 PagerDuty 告警:延迟超过 500ms。但你不知道是哪一路消息慢了,也不知道是网络问题还是你的协程泄漏了。"

凌晨三点的告警,是每个做实盘系统的开发者最不愿面对的场景。你翻开监控面板,发现只有 CPU 和内存——一个真正量化的系统需要什么指标?WebSocket 连接是否还在心跳?消息从接收到处理的端到端延迟是多少?你的协程池是否正在悄悄耗尽?

可观测性不是运维的附属品。对于量化系统,它是你在黑夜里定位问题的眼睛。


一、量化系统需要什么样的可观测性

传统的服务器监控只看两个指标:CPU 和内存。但这对于量化系统远远不够。

量化系统是 I/O 密集型 + 事件驱动型的混合体。你的系统可能同时维护着数十个 WebSocket 连接,实时处理消息流,背后是一个 asyncio 协程池。如果只监控 CPU,你永远不知道消息处理是否积压;如果只看内存,你也发现不了协程泄漏导致的慢性死亡。

量化系统的可观测性需要覆盖三个层次:

层次 监控目标 核心指标
连接层 WebSocket 健康状态 连接数、断开次数、心跳成功率
处理层 消息流水线 消息延迟(p50/p95/p99)、处理速率、积压队列深度
资源层 系统资源饱和度 协程数量、内存分配速率、GC 频率

这不是为了炫技,而是当你在凌晨三点收到告警时,这些指标能让你在五分钟内定位问题,而不是花两个小时翻日志。


二、Prometheus + Grafana:开源监控的事实标准

Prometheus 是 CNCF 毕业项目,专为云原生和微服务设计。它的核心优势是:

  • Pull 模型:Prometheus 服务主动拉取指标,不依赖客户端开启端口
  • 时序数据库:内置高效的时序存储,支持多维标签
  • 生态系统完善:Grafana 是 Prometheus 的官方推荐可视化层

整个架构如下:

┌─────────────────────────────────────────────────────────────┐
│                        Grafana                              │
│                  (Dashboard 可视化)                         │
└────────────────────────┬────────────────────────────────────┘
                         │ Query (PromQL)
                         ▼
┌─────────────────────────────────────────────────────────────┐
│                       Prometheus                            │
│                  (指标拉取 + 存储)                            │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐          │
│  │ /metrics    │  │ /metrics    │  │ /metrics    │          │
│  │ (instance1) │  │ (instance2) │  │ (instance3) │          │
│  └──────┬──────┘  └──────┬──────┘  └──────┬──────┘          │
└─────────┼────────────────┼────────────────┼─────────────────┘
          │ Pull           │ Pull           │ Pull
          ▼                ▼                ▼
┌─────────────────────────────────────────────────────────────┐
│                    Your Quant System                         │
│  ┌─────────────────────────────────────────────────────┐    │
│  │  Prometheus Client Library                           │    │
│  │  ┌──────────┐ ┌──────────┐ ┌──────────┐              │    │
│  │  │ Counter  │ │  Gauge   │ │ Histogram│              │    │
│  │  └──────────┘ └──────────┘ └──────────┘              │    │
│  └─────────────────────────────────────────────────────┘    │
│                                                              │
│  WebSocket ──► Message Queue ──► Coroutine Pool ──► Storage  │
└─────────────────────────────────────────────────────────────┘

Prometheus 的指标类型只有四种,理解它们就能覆盖 90% 的监控场景:

类型 用途 典型场景
Counter 只增不减的计数器 请求总数、错误总数
Gauge 可增可减的瞬时值 当前连接数、内存使用量
Histogram 统计分布,直方图 请求延迟、消息大小
Summary 统计分位数 端到端延迟(客户端计算)

对于量化系统,Histogram 是最重要的类型——它能自动计算 p50、p95、p99,让你在不存储原始数据的情况下了解延迟分布。


三、生产级代码:Python Prometheus Client 实战

3.1 安装与初始化

pip install prometheus-client aiohttp
from prometheus_client import Counter, Gauge, Histogram, start_http_server
import os

# ============================================================
# 指标定义区
# ============================================================

# Counter:记录累积发生的事件
WS_CONNECTION_TOTAL = Counter(
    "ws_connections_total",
    "WebSocket 连接总数(包含成功和失败)",
    ["status"]  # labels: success, failure
)

WS_MESSAGES_TOTAL = Counter(
    "ws_messages_total",
    "WebSocket 消息总数",
    ["msg_type"]  # labels: tick, depth, trade
)

# Gauge:记录当前的瞬时值
WS_CONNECTIONS_ACTIVE = Gauge(
    "ws_connections_active",
    "当前活跃的 WebSocket 连接数"
)

COROUTINE_POOL_SIZE = Gauge(
    "coroutine_pool_size",
    "协程池当前大小"
)

COROUTINE_RUNNING = Gauge(
    "coroutine_running",
    "当前正在运行的协程数"
)

# Histogram:记录分布,用于计算延迟分位数
MESSAGE_LATENCY = Histogram(
    "message_processing_latency_seconds",
    "消息从收到到处理完成的延迟(秒)",
    ["msg_type"],
    buckets=(0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5)
)

3.2 WebSocket 连接管理器(带完整监控埋点)

import asyncio
import aiohttp
import time
import random
from contextlib import asynccontextmanager

class MonitoredWebSocketManager:
    """带 Prometheus 监控的 WebSocket 连接管理器"""
    
    def __init__(self, api_key: str, metrics_port: int = 8000):
        self.api_key = api_key
        self.connections = {}  # symbol -> session
        self.running_tasks = set()
        
        # 启动 Prometheus metrics HTTP 服务
        # ⚠️ 生产环境建议与主服务分离部署
        start_http_server(metrics_port)
        print(f"Prometheus metrics exposed at :{metrics_port}/metrics")
    
    @asynccontextmanager
    async def connect(self, symbol: str, url: str):
        """上下文管理器:自动处理连接生命周期和监控埋点"""
        retry_count = 0
        max_retries = 5
        base_delay = 1.0
        
        while retry_count < max_retries:
            try:
                headers = {"X-API-Key": self.api_key}
                async with aiohttp.ClientSession() as session:
                    # ⚠️ 协程数 +1
                    COROUTINE_RUNNING.inc()
                    COROUTINE_POOL_SIZE.inc()
                    
                    async with session.ws_connect(
                        url,
                        headers=headers,
                        timeout=aiohttp.WSMsgType.TEXT,
                        heartbeat=30  # 30秒心跳保活
                    ) as ws:
                        self.connections[symbol] = ws
                        WS_CONNECTIONS_ACTIVE.inc()
                        WS_CONNECTION_TOTAL.labels(status="success").inc()
                        
                        print(f"[{symbol}] WebSocket connected")
                        
                        try:
                            yield ws
                        finally:
                            # 正常断开时标记
                            self.connections.pop(symbol, None)
                            WS_CONNECTIONS_ACTIVE.dec()
                            # ⚠️ 协程数 -1
                            COROUTINE_RUNNING.dec()
                            COROUTINE_POOL_SIZE.dec()
                            
            except aiohttp.ClientError as e:
                WS_CONNECTION_TOTAL.labels(status="failure").inc()
                retry_count += 1
                
                # 指数退避 + 抖动
                delay = min(base_delay * (2 ** retry_count), 60)
                jitter = random.uniform(0, delay * 0.1)
                wait_time = delay + jitter
                
                print(f"[{symbol}] Connection failed: {e}, "
                      f"retry {retry_count}/{max_retries} in {wait_time:.1f}s")
                await asyncio.sleep(wait_time)
                
            except Exception as e:
                # 未预期异常,打印日志但不重试(防止无限循环)
                print(f"[{symbol}] Unexpected error: {e}")
                raise
        
        raise RuntimeError(f"[{symbol}] Max retries exceeded after {max_retries} attempts")
    
    async def subscribe_depth(self, symbol: str):
        """订阅 depth 频道,实时计算消息延迟"""
        ws_url = f"wss://api.tickdb.ai/ws?api_key={self.api_key}"
        
        async with self.connect(symbol, ws_url) as ws:
            # 发送订阅命令
            await ws.send_json({
                "cmd": "subscribe",
                "channel": "depth",
                "symbol": symbol
            })
            
            async for msg in ws:
                if msg.type == aiohttp.WSMsgType.TEXT:
                    # ========== 核心监控埋点 ==========
                    receive_time = time.perf_counter()
                    
                    try:
                        data = msg.json()
                        
                        # 从消息中提取时间戳计算延迟
                        # 假设服务端在 payload 中附带 server_time
                        if "server_time" in data:
                            latency = receive_time - data["server_time"]
                            MESSAGe_LATENCY.labels(msg_type="depth").observe(latency)
                        
                        WS_MESSAGES_TOTAL.labels(msg_type="depth").inc()
                        
                    except Exception as e:
                        print(f"[{symbol}] Parse error: {e}")
                        continue
                        
                elif msg.type == aiohttp.WSMsgType.PING:
                    await ws.pong()

3.3 协程池健康监控(防止泄漏)

协程泄漏是 asyncio 应用最隐蔽的问题。一个被遗忘的 await 或死循环的协程会悄悄占用资源,直到系统崩溃。

import asyncio
import weakref
from prometheus_client import REGISTRY, CollectorRegistry

class CoroutineLeakDetector:
    """协程生命周期追踪器(用于检测泄漏)"""
    
    def __init__(self):
        # 追踪活跃协程的创建和销毁
        self.created_count = 0
        self.destroyed_count = 0
        self.active_coroutines = weakref.WeakSet()
        
        # 注册自定义收集器
        REGISTRY.register(self)
    
    def track(self, coro):
        """装饰器:追踪协程的生命周期"""
        self.created_count += 1
        self.active_coroutines.add(coro)
        
        async def wrapped():
            try:
                result = await coro
                return result
            finally:
                self.destroyed_count += 1
                self.active_coroutines.discard(coro)
        
        return wrapped()
    
    def collect(self):
        """Prometheus 收集器接口"""
        yield GaugeMetricFamily(
            "quant_coroutines_created_total",
            "历史创建的协程总数",
            value=self.created_count
        )
        yield GaugeMetricFamily(
            "quant_coroutines_active",
            "当前活跃协程数",
            value=len(self.active_coroutines)
        )
        yield GaugeMetricFamily(
            "quant_coroutines_destroyed_total",
            "历史销毁的协程总数",
            value=self.destroyed_count
        )


# 使用示例
detector = CoroutineLeakDetector()

async def background_task(task_id: int):
    """后台任务示例"""
    while True:
        await asyncio.sleep(10)
        print(f"Task {task_id} running")

# 创建任务时自动追踪
async def main():
    tasks = []
    for i in range(10):
        tasks.append(detector.track(background_task(i)))
    
    await asyncio.gather(*tasks)

# 运行后检查:
# curl http://localhost:8000/metrics | grep quant_coroutines
# 如果 active < created - destroyed,说明有协程泄漏

3.4 告警规则配置

# prometheus_rules.yml
groups:
  - name: quant_system_alerts
    rules:
      # WebSocket 连接数异常
      - alert: WebSocketConnectionDrop
        expr: ws_connections_active < 1
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "活跃 WebSocket 连接数为 0"
          description: "持续 1 分钟无活跃连接,请检查网络或 API 服务"
      
      # 消息延迟过高
      - alert: MessageLatencyHigh
        expr: histogram_quantile(0.95, message_processing_latency_seconds) > 0.5
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "消息处理延迟 P95 超过 500ms"
          description: "当前 P95: {{ $value }}s"
      
      # 协程数异常增长
      - alert: CoroutineLeakSuspected
        expr: (quant_coroutines_created_total - quant_coroutines_destroyed_total) > 100
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "协程泄漏可疑"
          description: "未回收协程数超过 100,请检查协程池"

四、核心指标体系

一个完整的量化系统监控面板,至少需要以下指标:

指标类型 指标名 说明 告警阈值
Gauge ws_connections_active 活跃连接数 = 0 时告警
Counter ws_connection_total{status="failure"} 失败连接数(速率) 5分钟内 > 10 次
Histogram message_processing_latency_seconds 消息延迟 p95 > 500ms
Gauge quant_coroutines_active 活跃协程数 持续增长
Counter ws_messages_total 消息吞吐量(速率) 突然跌至 0
Gauge memory_usage_bytes 内存使用量 > 80%
Counter api_errors_total{code="3001"} 限频错误数 持续出现

这些指标覆盖了连接层、处理层和资源层,是量化系统可观测性的最小可行集。


五、Grafana 仪表盘设计

有了 Prometheus 拉取数据,Grafana 只需要写 PromQL 查询即可。

5.1 连接健康面板

# 当前活跃连接数
ws_connections_active

# 连接失败率(5分钟窗口)
rate(ws_connection_total{status="failure"}[5m])

5.2 延迟分布面板

# P50 延迟
histogram_quantile(0.50, rate(message_processing_latency_seconds_bucket[5m]))

# P95 延迟
histogram_quantile(0.95, rate(message_processing_latency_seconds_bucket[5m]))

# P99 延迟
histogram_quantile(0.99, rate(message_processing_latency_seconds_bucket[5m]))

5.3 协程健康面板

# 活跃协程数趋势
quant_coroutines_active

# 协程创建速率
rate(quant_coroutines_created_total[5m])

# 协程销毁速率
rate(quant_coroutines_destroyed_total[5m])

# 如果创建速率 != 销毁速率,存在泄漏风险

Grafana Dashboard 的最佳实践是分层展示:最上方放 SLA 指标(延迟 P99),中间放业务指标(连接数、吞吐量),底部放资源指标(协程、内存)。


六、总结

可观测性不是监控的升级版,而是一种设计哲学。它要求你在设计系统时就考虑"出了问题怎么排查",而不是出了问题再想办法。

对于量化系统,这意味着:

  • 连接层:知道 WebSocket 是否还活着,心跳是否正常
  • 处理层:知道消息从收到到处理的端到端延迟,是否在积压
  • 资源层:知道协程是否在泄漏,内存是否在膨胀

Prometheus + Grafana 是这个体系的事实标准。Python prometheus-client 库提供了开箱即用的指标类型,而 Histogram 类型的 histogram_quantile 函数是计算延迟分位数的利器——不需要存储原始数据,Prometheus 帮你算好。

凌晨三点的告警不再让你手忙脚乱,因为你有眼睛了。


下一步行动

如果你是个人开发者,现在就可以在本地运行这套监控:

# 1. 安装依赖
pip install prometheus-client aiohttp

# 2. 启动 Prometheus(docker)
docker run -d -p 9090:9090 -v $(pwd)/prometheus.yml:/etc/prometheus/prometheus.yml prom/prometheus

# 3. 启动 Grafana(docker)
docker run -d -p 3000:3000 grafana/grafana

如果你希望有一个开箱即用的 TickDB 监控模板,在 GitHub 搜索 TickDB/observability,有社区贡献的 Grafana Dashboard JSON。

如果你习惯用 AI 辅助开发,在 AI 助手中搜索安装 prometheus-setup SKILL,可以快速生成指标定义和告警规则。


风险提示:本文不构成任何投资建议。Prometheus 和 Grafana 是第三方开源工具,部署时请注意网络安全配置,不要将 metrics 端口暴露在公网。市场有风险,投资需谨慎。