Prometheus 指标暴露:量化系统的可观测性第一步

想象这个场景:凌晨 3:17,Slack 的告警机器人开始尖叫。订单执行延迟从 50ms 飙升到 2.3 秒。你打开终端,试图用 ps aux | grep python 定位问题,但进程还在跑,CPU 不高,内存也没爆。你盯着日志滚动,大海捞针。

这不是某个小概率事件。这是每个量化团队迟早会面对的真相:系统出问题时,你最需要的往往不是日志,而是指标。

日志告诉你发生了什么,指标告诉你为什么发生的频率、持续多久、影响多大。没有可观测性,量化系统就是一个黑盒——交易顺利时风平浪静,一旦异常,你只能靠猜。

本文是 TickDB 技术文章系列的第一篇,聚焦可观测性的基础设施:用 Prometheus + Grafana 为你的量化系统构建监控层。从指标模型到生产级代码实现,从 WebSocket 连接数到协程池水位,覆盖你需要的每一个监控维度。


一、为什么量化系统需要可观测性

1.1 监控与可观测性的本质区别

很多团队把“监控”等同于“在 Grafana 里放几个图表”。这不是监控,这是看板

监控是主动的:设置阈值,超标告警,人工响应。可观测性是系统 intrinsic 的能力:无论发生什么异常,你都能从外部输出推断内部状态,不需要提前为每种故障编写规则。

对于量化系统,可观测性之所以关键,源于三个特殊性:

第一,延迟敏感。执行链路中任何一处抖动都直接影响盈亏。50ms 的延迟差可能跨越了滑点最恶劣的那个档位。如果你的监控系统只能看到“订单执行失败”,而无法定位是 WebSocket 推送延迟、消息队列堵塞、还是交易所接口超时,修复就是盲目的。

第二,状态复杂。现代量化系统往往是异步架构:WebSocket 维护长连接、多个交易所并行订阅、协程池管理并发。如果协程数量超过阈值导致 OOM,进程不会崩溃,而是会变得极其缓慢。从外部看,这就是“延迟高”,但根因完全不同。

第三,夜间运行。量化策略经常在非交易时段运行。问题可能在凌晨出现,如果你没有可观测性,等到第二天开盘可能已经错过了风控窗口。

1.2 Prometheus 在量化监控中的位置

Prometheus 是 CNCF 生态中最成熟的监控时序数据库,专为云原生环境下的服务监控设计。它的核心优势在于:

特性 量化场景下的价值
Pull 模式 不需要在业务代码中 push 数据,Prometheus 按固定间隔拉取,简化部署
多维度标签 可以用 symbol=AAPL,exchange=US 标签区分不同标的和市场
4 秒采集间隔 满足高频监控需求(相比之下,云监控通常 1 分钟)
PromQL 强大的聚合查询语言,支持即时计算衍生指标
服务发现 K8s 环境自动发现新上线的组件

对于量化系统,Prometheus 通常部署在架构中的位置如下:

┌─────────────────────────────────────────────────────────────┐
│                      Prometheus Server                       │
│                     (pull metrics every 15s)                │
└────────────────────────────┬────────────────────────────────┘
                             │ scrape
        ┌────────────────────┼────────────────────┐
        │                    │                    │
        ▼                    ▼                    ▼
┌───────────────┐   ┌───────────────┐   ┌───────────────┐
│ TickDB 行情网关 │   │ 策略引擎 (Python)│   │ 订单路由器    │
│  :8000/metrics │   │  :9090/metrics │   │  :9091/metrics│
└───────────────┘   └───────────────┘   └───────────────┘

每个组件独立暴露 /metrics 端点,Prometheus server 集中拉取后存储,Grafana 读取进行可视化。


二、Prometheus 指标模型与 Python 实现

2.1 四种指标类型的选择逻辑

Prometheus 定义了四种指标类型,但它们的语义和适用场景不同:

类型 语义 典型用途 量化场景
Counter 只增不减的计数器 请求次数、订单成交笔数 累计成交单数、WS 消息发送总量
Gauge 可增可减的瞬时值 当前连接数、内存使用量 WS 连接数、协程池活跃数
Histogram 统计分布,自动计算分位数 请求耗时、响应大小 消息处理延迟、K 线生成耗时
Summary 与 Histogram 类似,但服务端计算分位数 高基数标签场景 通常用 Histogram 替代

对于量化系统,最常用的是 Gauge 和 Histogram

  • Gauge:反映资源水位——连接数、队列深度、协程数,这些是越界就告警的指标
  • Histogram:反映性能分布——消息延迟、订单执行耗时,这些需要分位数来定位 P99 问题

2.2 生产级 Python 代码:指标暴露

以下代码是一个完整的 Prometheus 指标暴露模块,基于 prometheus_client 库。它不是教学演示代码,而是可以直接集成进生产量化系统的监控基础设施。

"""
TickDB Monitor - Prometheus Metrics Exporter
生产级量化系统指标暴露模块
"""

import os
import time
import asyncio
import logging
from typing import Optional, Dict, Any
from dataclasses import dataclass, field
from prometheus_client import (
    Counter,
    Gauge,
    Histogram,
    CollectorRegistry,
    generate_latest,
    CONTENT_TYPE_LATEST,
    start_http_server,
)
from aiohttp import web

logger = logging.getLogger(__name__)


@dataclass
class MetricsConfig:
    """指标暴露配置"""
    port: int = 9090
    namespace: str = "tickdb"
    subsystem: str = "ws_gateway"
    enable_server: bool = True


class QuantMetrics:
    """
    量化系统 Prometheus 指标采集器
    
    核心设计原则:
    1. 指标标签统一命名空间,避免冲突
    2. Histogram 的 buckets 根据量化场景调优
    3. 所有操作线程安全
    """
    
    def __init__(self, config: Optional[MetricsConfig] = None):
        self.config = config or MetricsConfig()
        self.registry = CollectorRegistry()
        
        # ─────────────────────────────────────────────────────────────
        # 第一类:连接与资源指标 (Gauge)
        # ─────────────────────────────────────────────────────────────
        
        # WebSocket 连接相关
        self.ws_connections_active = Gauge(
            name="ws_connections_active",
            documentation="当前活跃的 WebSocket 连接数",
            namespace=self.config.namespace,
            subsystem=f"{self.config.subsystem}_connection",
            registry=self.registry,
            labelnames=["exchange", "account_id"],
        )
        
        self.ws_connections_reconnect_total = Counter(
            name="ws_connections_reconnect_total",
            documentation="WebSocket 重连次数",
            namespace=self.config.namespace,
            subsystem=f"{self.config.subsystem}_connection",
            registry=self.registry,
            labelnames=["exchange", "reason"],
        )
        
        self.ws_connections_error_total = Counter(
            name="ws_connections_error_total",
            documentation="WebSocket 连接错误总数",
            namespace=self.config.namespace,
            subsystem=f"{self.config.subsystem}_connection",
            registry=self.registry,
            labelnames=["exchange", "error_type"],
        )
        
        # 协程池指标
        self.coroutine_pool_active = Gauge(
            name="coroutine_pool_active",
            documentation="当前活跃协程数",
            namespace=self.config.namespace,
            subsystem="executor",
            registry=self.registry,
            labelnames=["pool_name"],
        )
        
        self.coroutine_pool_queued = Gauge(
            name="coroutine_pool_queued",
            documentation="协程池队列深度",
            namespace=self.config.namespace,
            subsystem="executor",
            registry=self.registry,
            labelnames=["pool_name"],
        )
        
        self.coroutine_pool_max = Gauge(
            name="coroutine_pool_max",
            documentation="协程池最大容量",
            namespace=self.config.namespace,
            subsystem="executor",
            registry=self.registry,
        )
        
        # ─────────────────────────────────────────────────────────────
        # 第二类:消息与延迟指标 (Histogram)
        # ─────────────────────────────────────────────────────────────
        
        # 消息延迟分布 - 量化场景优化 buckets
        self.ws_message_delay_seconds = Histogram(
            name="ws_message_delay_seconds",
            documentation="WebSocket 消息从接收到达处理的延迟",
            namespace=self.config.namespace,
            subsystem=f"{self.config.subsystem}_message",
            registry=self.registry,
            labelnames=["exchange", "msg_type"],
            buckets=(0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5),
        )
        
        # 消息处理耗时
        self.message_process_seconds = Histogram(
            name="message_process_seconds",
            documentation="消息处理总耗时",
            namespace=self.config.namespace,
            subsystem=f"{self.config.subsystem}_message",
            registry=self.registry,
            labelnames=["exchange", "msg_type"],
            buckets=(0.0005, 0.001, 0.005, 0.01, 0.05, 0.1, 0.5),
        )
        
        # ─────────────────────────────────────────────────────────────
        # 第三类:业务指标 (Counter)
        # ─────────────────────────────────────────────────────────────
        
        self.messages_received_total = Counter(
            name="messages_received_total",
            documentation="收到的消息总数",
            namespace=self.config.namespace,
            subsystem=f"{self.config.subsystem}_message",
            registry=self.registry,
            labelnames=["exchange", "msg_type"],
        )
        
        self.messages_sent_total = Counter(
            name="messages_sent_total",
            documentation="发送的消息总数",
            namespace=self.config.namespace,
            subsystem=f"{self.config.subsystem}_message",
            registry=self.registry,
            labelnames=["exchange", "msg_type"],
        )
        
        self.orders_submitted_total = Counter(
            name="orders_submitted_total",
            documentation="提交订单总数",
            namespace=self.config.namespace,
            subsystem="order",
            registry=self.registry,
            labelnames=["exchange", "order_type", "status"],
        )
        
        self.market_data_requests_total = Counter(
            name="market_data_requests_total",
            documentation="行情数据请求总数",
            namespace=self.config.namespace,
            subsystem="market_data",
            registry=self.registry,
            labelnames=["exchange", "data_type"],
        )
        
        # 限频触发计数
        self.rate_limit_hits_total = Counter(
            name="rate_limit_hits_total",
            documentation="触发频率限制的次数",
            namespace=self.config.namespace,
            subsystem="api",
            registry=self.registry,
            labelnames=["exchange", "endpoint"],
        )
        
        # ─────────────────────────────────────────────────────────────
        # 第四类:健康状态指标 (Gauge)
        # ─────────────────────────────────────────────────────────────
        
        self.health_status = Gauge(
            name="health_status",
            documentation="组件健康状态 (1=健康, 0=异常)",
            namespace=self.config.namespace,
            subsystem="health",
            registry=self.registry,
            labelnames=["component", "instance"],
        )
        
        self.last_heartbeat_timestamp = Gauge(
            name="last_heartbeat_timestamp",
            documentation="上次心跳时间戳",
            namespace=self.config.namespace,
            subsystem="health",
            registry=self.registry,
            labelnames=["component"],
        )
        
        logger.info(f"QuantMetrics initialized with namespace={self.config.namespace}")

    def record_message_received(
        self, 
        exchange: str, 
        msg_type: str, 
        delay_seconds: float
    ):
        """
        记录消息接收
        
        Args:
            exchange: 交易所标识,如 'binance', 'okx'
            msg_type: 消息类型,如 'kline', 'depth', 'trade'
            delay_seconds: 从网络收到到代码处理的延迟
        """
        self.messages_received_total.labels(
            exchange=exchange, 
            msg_type=msg_type
        ).inc()
        self.ws_message_delay_seconds.labels(
            exchange=exchange,
            msg_type=msg_type
        ).observe(delay_seconds)

    def record_connection_event(
        self,
        exchange: str,
        account_id: str,
        event: str,  # 'connected', 'disconnected', 'reconnect', 'error'
        reason: str = None,
        error_type: str = None,
    ):
        """
        记录连接状态变更事件
        
        event 映射关系:
        - connected: 活跃连接数 +1
        - disconnected: 活跃连接数 -1
        - reconnect: 重连计数器 +1
        - error: 错误计数器 +1
        """
        if event == "connected":
            self.ws_connections_active.labels(
                exchange=exchange,
                account_id=account_id
            ).inc()
        elif event == "disconnected":
            self.ws_connections_active.labels(
                exchange=exchange,
                account_id=account_id
            ).dec()
        elif event == "reconnect":
            self.ws_connections_reconnect_total.labels(
                exchange=exchange,
                reason=reason or "unknown"
            ).inc()
        elif event == "error":
            self.ws_connections_error_total.labels(
                exchange=exchange,
                error_type=error_type or "unknown"
            ).inc()

    def update_coroutine_pool(
        self,
        pool_name: str,
        active: int,
        queued: int,
        max_size: int,
    ):
        """
        更新协程池状态
        
        当 active / max_size > 0.8 时,应触发告警
        当 queued 持续 > 0 时,说明有任务堆积
        """
        self.coroutine_pool_active.labels(pool_name=pool_name).set(active)
        self.coroutine_pool_queued.labels(pool_name=pool_name).set(queued)
        self.coroutine_pool_max.set(max_size)  # 通常不随时间变化

    def record_api_rate_limit(
        self,
        exchange: str,
        endpoint: str,
        retry_after: float,
    ):
        """
        记录 API 限频事件
        
        当某 endpoint 的限频频率超过阈值时,考虑:
        1. 降低请求频率
        2. 增加退避时间
        3. 切换备用数据源
        """
        self.rate_limit_hits_total.labels(
            exchange=exchange,
            endpoint=endpoint
        ).inc()
        # 将 retry_after 记录到 Histogram,帮助分析限频严重程度
        # 注意:这需要额外一个 Histogram 来跟踪 retry_after 分布
        
    def update_health(
        self,
        component: str,
        instance: str,
        is_healthy: bool,
    ):
        """更新组件健康状态"""
        self.health_status.labels(
            component=component,
            instance=instance
        ).set(1 if is_healthy else 0)
        self.last_heartbeat_timestamp.labels(
            component=component
        ).set_to_current_time()

    def get_metrics(self) -> bytes:
        """暴露 /metrics 端点"""
        return generate_latest(self.registry)

    async def metrics_handler(self, request: web.Request) -> web.Response:
        """HTTP metrics 暴露端点"""
        metrics_output = self.get_metrics()
        return web.Response(
            body=metrics_output,
            content_type=CONTENT_TYPE_LATEST,
        )

    def start_server(self, host: str = "0.0.0.0") -> None:
        """
        在独立端口启动 metrics HTTP 服务
        
        ⚠️ 生产环境建议:
        - 使用独立端口,不与应用主服务混用
        - 绑定到 localhost 或内网 IP,不对外暴露
        - 配合 Prometheus 的 scrape 间隔配置
        """
        start_http_server(
            port=self.config.port,
            addr=host,
            registry=self.registry,
        )
        logger.info(f"Metrics server started on {host}:{self.config.port}")

2.3 集成进 WebSocket 网关

以上指标类需要与实际业务代码集成。以下是 WebSocket 客户端集成示例,展示如何采集真实数据:

import asyncio
import websockets
import json
import time
from typing import Optional, Callable, Dict, Any
from prometheus_client import Histogram


class WebSocketClientWithMetrics:
    """
    带 Prometheus 指标的 WebSocket 客户端
    
    核心监控能力:
    1. 连接状态追踪(活跃连接数、断开、错误)
    2. 消息延迟测量(到达时间 vs 处理时间)
    3. 重连机制与重连计数
    """
    
    def __init__(
        self,
        uri: str,
        metrics: QuantMetrics,
        exchange: str,
        account_id: str,
    ):
        self.uri = uri
        self.metrics = metrics
        self.exchange = exchange
        self.account_id = account_id
        
        self._ws: Optional[websockets.WebSocketClientProtocol] = None
        self._connected = False
        self._running = False
        
        # 重连配置:指数退避 + 抖动
        self._base_delay = 1.0
        self._max_delay = 60.0
        self._max_retries = float('inf')
        
        # 限频配置
        self._rate_limit_code = 3001
        self._default_retry_after = 5

    async def connect(self) -> bool:
        """
        建立 WebSocket 连接
        
        ⚠️ 工程注意事项:
        - 生产环境必须有超时控制,避免挂死在连接建立
        - 需要处理 SSL 证书验证(生产环境不应跳过)
        - 建议设置 User-Agent,便于交易所定位问题
        """
        try:
            headers = {
                "User-Agent": "TickDB-Monitor/1.0",
            }
            
            self._ws = await asyncio.wait_for(
                self._ws = await websockets.connect(
                    self.uri,
                    extra_headers=headers,
                    ping_interval=20,  # 20 秒一次心跳
                    ping_timeout=10,   # 10 秒超时
                    close_timeout=5,
                    open_timeout=10,
                )
                return True
            except asyncio.TimeoutError:
                self.metrics.record_connection_event(
                    self.exchange, self.account_id,
                    "error", error_type="connection_timeout"
                )
                return False

    async def send_with_metrics(
        self,
        payload: Dict[str, Any],
        msg_type: str = "subscribe",
    ) -> bool:
        """
        发送消息并记录指标
        
        包括:
        - 发送成功/失败计数
        - 发送延迟
        """
        if not self._ws or not self._connected:
            return False
        
        try:
            # 发送前时间戳
            send_time = time.perf_counter()
            
            await self._ws.send(json.dumps(payload))
            
            self.metrics.messages_sent_total.labels(
                exchange=self.exchange,
                msg_type=msg_type
            ).inc()
            
            return True
            
        except websockets.exceptions.ConnectionClosed as e:
            self.metrics.record_connection_event(
                self.exchange, self.account_id,
                "error", error_type="send_after_close"
            )
            await self._handle_disconnect(e)
            return False

    async def receive_with_metrics(self) -> Optional[Dict[str, Any]]:
        """
        接收消息并记录延迟指标
        
        延迟定义:消息到达业务代码的时间 - 消息携带的时间戳(如果交易所提供)
        这可以帮助你测量:
        1. 网络延迟
        2. 消息队列等待时间
        3. 反序列化耗时
        """
        if not self._ws or not self._connected:
            return None
        
        try:
            message = await asyncio.wait_for(
                self._ws.recv(),
                timeout=30.0,
            )
            
            # 到达时间
            arrival_time = time.perf_counter()
            
            data = json.loads(message)
            
            # 尝试提取消息内嵌时间戳
            # 多数交易所 API 会在 payload 中包含 event_time 或 ts 字段
            embedded_time = data.get("event_time") or data.get("ts")
            
            if embedded_time:
                # 延迟 = 到达时间 - 消息产生时间
                # 注意:这里需要根据交易所时间格式转换
                try:
                    delay_seconds = arrival_time - (embedded_time / 1000)
                    delay_seconds = max(0, delay_seconds)  # 防止负值
                except (TypeError, ValueError):
                    delay_seconds = None
            else:
                delay_seconds = None
            
            # 推断消息类型
            msg_type = self._infer_msg_type(data)
            
            self.metrics.messages_received_total.labels(
                exchange=self.exchange,
                msg_type=msg_type
            ).inc()
            
            if delay_seconds is not None:
                self.metrics.ws_message_delay_seconds.labels(
                    exchange=self.exchange,
                    msg_type=msg_type
                ).observe(delay_seconds)
            
            return data
            
        except asyncio.TimeoutError:
            # 30 秒未收到消息,可能是连接假死
            self.metrics.record_connection_event(
                self.exchange, self.account_id,
                "error", error_type="receive_timeout"
            )
            return None

    def _infer_msg_type(self, data: Dict) -> str:
        """从消息内容推断类型"""
        if "depth" in data:
            return "depth"
        elif "kline" in data:
            return "kline"
        elif "trade" in data:
            return "trade"
        elif "ticker" in data:
            return "ticker"
        return "unknown"

    async def _handle_disconnect(self, exc: Exception):
        """断开连接处理:记录指标 + 触发重连"""
        self._connected = False
        
        self.metrics.record_connection_event(
            self.exchange, self.account_id,
            "disconnected"
        )
        
        # 根据异常类型判断重连原因
        reason = self._classify_disconnect(exc)
        
        await self._reconnect_with_backoff(reason)

    async def _reconnect_with_backoff(self, reason: str, retry: int = 0):
        """
        指数退避重连
        
        退避公式:min(base * 2^retry + jitter, max_delay)
        - base: 初始延迟 1s
        - jitter: ±10% 随机抖动,避免惊群效应
        - max_delay: 最大 60s
        """
        delay = min(self._base_delay * (2 ** retry), self._max_delay)
        jitter = delay * 0.1 * (2 * asyncio.get_event_loop().time() % 1 - 1)
        total_delay = delay + jitter
        
        self.metrics.record_connection_event(
            self.exchange, self.account_id,
            "reconnect", reason=reason
        )
        
        logger.warning(
            f"WebSocket disconnected ({reason}), "
            f"reconnecting in {total_delay:.1f}s (retry #{retry})"
        )
        
        await asyncio.sleep(total_delay)
        
        success = await self.connect()
        if success:
            self._connected = True
            self.metrics.record_connection_event(
                self.exchange, self.account_id,
                "connected"
            )
        else:
            await self._reconnect_with_backoff(reason, retry + 1)

    def _classify_disconnect(self, exc: Exception) -> str:
        """对断开原因分类,便于重连策略决策"""
        exc_type = type(exc).__name__
        
        if "ConnectionClosed" in exc_type:
            if hasattr(exc, "code"):
                if exc.code == 1000:
                    return "normal_closure"
                elif exc.code == 1001:
                    return "server_going_away"
                elif exc.code >= 4000:
                    return "server_error"
            return "abnormal_close"
        
        if "TimeoutError" in exc_type:
            return "connection_timeout"
        
        if "SSL" in str(exc):
            return "ssl_error"
        
        return "unknown_error"

三、核心监控指标体系

3.1 指标设计原则

为量化系统设计监控指标,需要遵循三个原则:

原则一:分层设计。将指标分为资源层(CPU、内存)、连接层(WS 连接状态)、消息层(延迟、吞吐量)、业务层(订单、成交),每一层的问题会表现为不同层的指标异常。

原则二:Goldilocks 粒度。标签太多会导致 Prometheus cardinality 爆炸;标签太少会无法定位问题。对于量化系统,建议按 exchangeaccount_id(或 strategy_id)、msg_type 三个维度打标签。

原则三:延迟分位数。不要只看平均值,P50 会掩盖长尾延迟。量化系统关注 P95 和 P99,异常延迟可能直接影响撮合。

3.2 核心指标清单

以下是 TickDB 推荐的生产级量化系统必选监控指标:

连接层指标

指标名 类型 标签 告警阈值建议 告警原因
tickdb_ws_gateway_connection_ws_connections_active Gauge exchange, account_id > 50 或 < 预期 连接数异常(被踢/异常断开)
tickdb_ws_gateway_connection_ws_connections_reconnect_total Counter exchange, reason 增长率 > 5/min 频繁重连,网络或服务器问题
tickdb_ws_gateway_connection_ws_connections_error_total Counter exchange, error_type 增长率 > 1/min 持续错误,需排查

延迟层指标

指标名 类型 标签 告警阈值建议 含义
tickdb_ws_gateway_message_ws_message_delay_seconds Histogram exchange, msg_type P99 > 500ms 消息到达延迟过高
tickdb_ws_gateway_message_message_process_seconds Histogram exchange, msg_type P99 > 100ms 业务处理耗时过长

协程层指标

指标名 类型 标签 告警阈值建议 含义
tickdb_executor_coroutine_pool_active Gauge pool_name > max * 0.8 协程池接近满载
tickdb_executor_coroutine_pool_queued Gauge pool_name > 100 持续 5min 任务堆积,可能 OOM 前兆

业务层指标

指标名 类型 标签 告警阈值建议 含义
tickdb_order_order_orders_submitted_total Counter exchange, order_type, status 失败率 > 5% 订单执行异常
tickdb_api_rate_limit_hits_total Counter exchange, endpoint 增长率 > 10/min 频繁触发限频

3.3 PromQL 查询示例

以下是几个关键场景的 PromQL 查询,可直接用于 Grafana 面板或告警规则:

# 1. WebSocket 连接数(按交易所聚合)
sum(tickdb_ws_gateway_connection_ws_connections_active) by (exchange)

# 2. 消息延迟 P99(按消息类型)
histogram_quantile(0.99, 
  rate(tickdb_ws_gateway_message_ws_message_delay_seconds_bucket[5m])
) by (msg_type)

# 3. 协程池利用率(超过 80% 告警)
tickdb_executor_coroutine_pool_active / tickdb_executor_coroutine_pool_max

# 4. 重连频率(每分钟超过 5 次触发告警)
increase(tickdb_ws_gateway_connection_ws_connections_reconnect_total[1m]) > 5

# 5. 消息吞吐量(按交易所和类型)
rate(tickdb_ws_gateway_message_messages_received_total[1m]) by (exchange, msg_type)

四、Grafana 仪表盘设计

4.1 仪表盘分层结构

一个完整的量化系统监控仪表盘应包含四层,从全局概览到细节下钻:

┌─────────────────────────────────────────────────────────────────┐
│  Layer 1: 系统总览 (System Overview)                             │
│  - 全部连接数、消息吞吐量、错误率                               │
└─────────────────────────────────────────────────────────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────────────┐
│  Layer 2: 连接健康 (Connection Health)                          │
│  - 各交易所连接状态、重连次数、错误分布                         │
└─────────────────────────────────────────────────────────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────────────┐
│  Layer 3: 延迟分析 (Latency Analysis)                            │
│  - 消息延迟 P50/P95/P99 分布、处理耗时热力图                    │
└─────────────────────────────────────────────────────────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────────────┐
│  Layer 4: 协程池详情 (Coroutine Pool)                           │
│  - 各池活跃数、队列深度、利用率趋势                             │
└─────────────────────────────────────────────────────────────────┘

4.2 仪表盘 JSON 配置

以下是 Layer 2(连接健康)面板的 Grafana JSON 配置,可直接导入:

{
  "title": "Connection Health - By Exchange",
  "uid": "ws-connection-health",
  "type": "row",
  "panels": [
    {
      "title": "Active Connections",
      "type": "timeseries",
      "gridPos": {"x": 0, "y": 0, "w": 12, "h": 8},
      "targets": [
        {
          "expr": "sum(tickdb_ws_gateway_connection_ws_connections_active) by (exchange)",
          "legendFormat": "{{exchange}}"
        }
      ],
      "fieldConfig": {
        "defaults": {
          "unit": "short",
          "thresholds": {
            "mode": "absolute",
            "steps": [
              {"color": "green", "value": null},
              {"color": "yellow", "value": 40},
              {"color": "red", "value": 50}
            ]
          }
        }
      },
      "options": {
        "legend": {"displayMode": "table", "placement": "right"}
      }
    },
    {
      "title": "Reconnect Rate (per minute)",
      "type": "timeseries",
      "gridPos": {"x": 12, "y": 0, "w": 12, "h": 8},
      "targets": [
        {
          "expr": "increase(tickdb_ws_gateway_connection_ws_connections_reconnect_total[1m])",
          "legendFormat": "{{exchange}} - {{reason}}"
        }
      ],
      "fieldConfig": {
        "defaults": {
          "unit": "short",
          "thresholds": {
            "mode": "absolute",
            "steps": [
              {"color": "green", "value": null},
              {"color": "orange", "value": 5},
              {"color": "red", "value": 10}
            ]
          }
        }
      },
      "options": {
        "legend": {"displayMode": "table", "placement": "right"}
      }
    },
    {
      "title": "Error Distribution",
      "type": "piechart",
      "gridPos": {"x": 0, "y": 8, "w": 8, "h": 8},
      "targets": [
        {
          "expr": "sum(increase(tickdb_ws_gateway_connection_ws_connections_error_total[1h])) by (error_type)",
          "legendFormat": "{{error_type}}"
        }
      ],
      "options": {
        "displayLabels": ["name", "value", "percent"]
      }
    },
    {
      "title": "Message Latency P99",
      "type": "timeseries",
      "gridPos": {"x": 8, "y": 8, "w": 16, "h": 8},
      "targets": [
        {
          "expr": "histogram_quantile(0.99, rate(tickdb_ws_gateway_message_ws_message_delay_seconds_bucket[5m])) by (exchange, msg_type)",
          "legendFormat": "{{exchange}} - {{msg_type}}"
        }
      ],
      "fieldConfig": {
        "defaults": {
          "unit": "s",
          "custom": {
            "lineWidth": 2,
            "fillOpacity": 10
          },
          "thresholds": {
            "mode": "absolute",
            "steps": [
              {"color": "green", "value": null},
              {"color": "yellow", "value": 0.3},
              {"color": "red", "value": 0.5}
            ]
          }
        }
      }
    }
  ]
}

4.3 关键面板配置技巧

分位数折线图:使用 histogram_quantile(0.99, ...) 配合 rate() 函数,可以展示 P99 延迟随时间的变化。添加 P50 和 P95 作为对比线,便于观察延迟分布变化。

热力图:对于 Histogram 类型指标,Grafana 支持热力图展示。配置方式:

# Grafana 热力图面板数据源
sum(rate(tickdb_ws_gateway_message_message_process_seconds_bucket[5m])) by (le)

告警规则配置

# Prometheus alerting rules
groups:
  - name: quant_system
    rules:
      - alert: HighReconnectRate
        expr: increase(tickdb_ws_gateway_connection_ws_connections_reconnect_total[1m]) > 10
        for: 2m
        labels:
          severity: warning
        annotations:
          summary: "WebSocket 重连频率过高"
          description: "{{ $labels.exchange }} 在过去 1 分钟内重连 {{ $value }} 次"

      - alert: CoroutinePoolNearCapacity
        expr: tickdb_executor_coroutine_pool_active / tickdb_executor_coroutine_pool_max > 0.8
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "协程池接近容量上限"
          description: "池 {{ $labels.pool_name }} 利用率 {{ $value | humanizePercentage }}"

      - alert: MessageLatencyHigh
        expr: histogram_quantile(0.99, rate(tickdb_ws_gateway_message_ws_message_delay_seconds_bucket[5m])) > 0.5
        for: 3m
        labels:
          severity: warning
        annotations:
          summary: "消息延迟过高"
          description: "{{ $labels.exchange }}-{{ $labels.msg_type }} P99 延迟 {{ $value }}s"

五、协程池监控实现

5.1 为什么协程池监控容易被忽视

协程(asyncio Task)是 Python 异步量化系统的核心资源。策略执行、API 调用、数据处理都依赖协程调度。

但协程池问题往往在系统稳定运行时被忽视,直到以下症状出现:

  • 订单延迟增加,但网络正常
  • 日志中开始出现 RuntimeError: Event loop is running
  • 内存持续增长,最终 OOM

这些问题都指向同一根本原因:协程泄漏

5.2 生产级协程池监控代码

import asyncio
import functools
import weakref
from typing import Dict, Set, Optional, Callable, Any
from dataclasses import dataclass, field
from prometheus_client import Gauge, Counter
import time


@dataclass
class CoroutinePool:
    """
    协程池 + Prometheus 监控
    
    监控能力:
    - 活跃协程数
    - 队列深度
    - 协程执行时长
    - 协程泄漏检测
    """
    
    name: str
    max_workers: int
    metrics: QuantMetrics
    
    _active_coroutines: Set[asyncio.Task] = field(default_factory=set)
    _pending_tasks: asyncio.Queue = field(default_factory=asyncio.Queue)
    _leaked_tasks: Set[asyncio.Task] = field(default_factory=set)
    _closed = False
    
    def __post_init__(self):
        """初始化 Prometheus 指标"""
        self.execution_time = Histogram(
            name="coroutine_execution_seconds",
            documentation="协程执行时长",
            namespace=self.metrics.config.namespace,
            subsystem="coroutine_pool",
            labelnames=["pool_name", "task_name"],
            buckets=(0.001, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0),
            registry=self.metrics.registry,
        )
        
        self.tasks_submitted = Counter(
            name="coroutine_pool_tasks_submitted_total",
            documentation="提交的任务总数",
            namespace=self.metrics.config.namespace,
            subsystem="coroutine_pool",
            labelnames=["pool_name"],
            registry=self.metrics.registry,
        )
        
        self.tasks_completed = Counter(
            name="coroutine_pool_tasks_completed_total",
            documentation="完成的任务总数",
            namespace=self.metrics.config.namespace,
            subsystem="coroutine_pool",
            labelnames=["pool_name", "status"],
            registry=self.metrics.registry,
        )

    async def submit(
        self,
        coro: Callable,
        task_name: Optional[str] = None,
        timeout: Optional[float] = None,
    ) -> Any:
        """
        提交协程任务到池中执行
        
        Args:
            coro: 协程函数(未执行)
            task_name: 任务名称(用于监控)
            timeout: 超时时间(秒)
        
        Returns:
            协程返回值
        
        Raises:
            asyncio.TimeoutError: 任务超时
            RuntimeError: 池已关闭
        """
        if self._closed:
            raise RuntimeError(f"CoroutinePool {self.name} is closed")
        
        task_name = task_name or coro.__name__
        self.tasks_submitted.labels(pool_name=self.name).inc()
        
        async def wrapped_coro():
            start_time = time.perf_counter()
            try:
                if timeout:
                    result = await asyncio.wait_for(coro(), timeout=timeout)
                else:
                    result = await coro()
                
                self.tasks_completed.labels(
                    pool_name=self.name, status="success"
                ).inc()
                return result
                
            except asyncio.TimeoutError:
                self.tasks_completed.labels(
                    pool_name=self.name, status="timeout"
                ).inc()
                raise
            except Exception as e:
                self.tasks_completed.labels(
                    pool_name=self.name, status="error"
                ).inc()
                raise
            finally:
                execution_time = time.perf_counter() - start_time
                self.execution_time.labels(
                    pool_name=self.name, task_name=task_name
                ).observe(execution_time)
                
                # 更新活跃协程数
                self._active_coroutines.discard(asyncio.current_task())
                self._update_metrics()
        
        task = asyncio.create_task(wrapped_coro())
        self._active_coroutines.add(task)
        task.add_done_callback(self._on_task_done)
        
        self._update_metrics()
        return await task

    def _on_task_done(self, task: asyncio.Task):
        """
        任务完成回调
        
        关键功能:
        1. 从活跃集合移除
        2. 检测协程泄漏(任务完成但未正确清理)
        """
        self._active_coroutines.discard(task)
        
        # 检查是否有异常未被捕获
        if task.done() and not task.cancelled():
            exc = task.exception()
            if exc:
                logger.error(f"Task in pool {self.name} failed: {exc}")
        
        self._update_metrics()

    def _update_metrics(self):
        """更新 Prometheus 指标"""
        self.metrics.update_coroutine_pool(
            pool_name=self.name,
            active=len(self._active_coroutines),
            queued=self._pending_tasks.qsize(),
            max_size=self.max_workers,
        )

    def get_pool_status(self) -> Dict[str, Any]:
        """
        获取池状态快照
        
        用于健康检查接口和手动调试
        """
        return {
            "name": self.name,
            "max_workers": self.max_workers,
            "active_count": len(self._active_coroutines),
            "utilization": len(self._active_coroutines) / self.max_workers,
            "pending_count": self._pending_tasks.qsize(),
            "is_full": len(self._active_coroutines) >= self.max_workers,
        }

    def force_cancel_stale_tasks(self, max_age_seconds: float = 300):
        """
        强制取消超长运行的任务
        
        ⚠️ 警告:这是最后的手段,会导致任务中断
        用于处理协程泄漏(任务卡住但未释放)的情况
        
        Args:
            max_age_seconds: 超过此时间的任务将被取消
        """
        current_task = asyncio.current_task()
        stale_tasks = []
        
        for task in self._active_coroutines:
            if task is current_task:
                continue
            if task.done():
                continue
                
            # 注意:asyncio.Task 没有直接的开始时间属性
            # 实际实现需要包装 task 或使用自定义 Task 类
            # 这里仅为概念演示
        
        for task in stale_tasks:
            task.cancel()
            logger.warning(f"Cancelled stale task in pool {self.name}")
            
        return len(stale_tasks)

    async def close(self, timeout: float = 10.0):
        """
        关闭协程池
        
        等待现有任务完成或超时后强制取消
        """
        self._closed = True
        
        if self._active_coroutines:
            logger.info(f"Waiting for {len(self._active_coroutines)} tasks in pool {self.name}")
            await asyncio.wait_for(
                asyncio.gather(*self._active_coroutines, return_exceptions=True),
                timeout=timeout,
            )
        
        self._active_coroutines.clear()
        self._update_metrics()

5.3 协程泄漏排查思路

当发现协程池持续增长时,按以下步骤排查:

# 1. 导出当前活跃协程的堆栈(调试用)
import traceback

def dump_active_coroutines(pool: CoroutinePool):
    """打印活跃协程的堆栈追踪"""
    for i, task in enumerate(pool._active_coroutines):
        print(f"\n=== Task {i} ===")
        print(f"State: {task.get_stack()}")
        print(f"Coro: {task.get_coro()}")
        # Python 3.11+ 可用 task.print_stack()
        if hasattr(task, 'print_stack'):
            task.print_stack()

# 2. 常见泄漏原因
"""
1. 事件循环阻塞:
   - 在协程中使用了同步阻塞调用(time.sleep 而非 asyncio.sleep)
   - 解决方法:使用 asyncio.to_thread 包装同步代码

2. 回调地狱:
   - 嵌套的 asyncio.create_task 没有 await
   - 解决方法:确保所有 create_task 都在监控范围内

3. 异常未处理:
   - 协程内部抛出异常,但没有 try-except
   - 解决方法:在协程入口添加统一的异常处理

4. 资源未释放:
   - 协程持有数据库连接、WebSocket 等资源
   - 解决方法:使用 async with 或显式清理
"""

六、完整监控集成示例

以下是整合了所有组件的完整运行示例:

"""
TickDB 量化系统监控集成示例
完整演示:Prometheus + Grafana + 协程池 + WebSocket 监控
"""

import asyncio
import logging
from aiohttp import web

from prometheus_client import REGISTRY

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s"
)
logger = logging.getLogger(__name__)


async def health_check(request: web.Request) -> web.Response:
    """健康检查端点"""
    return web.json_response({"status": "ok"})


async def run_demo():
    """演示完整监控流程"""
    
    # ─────────────────────────────────────────────────────────────
    # Step 1: 初始化指标采集器
    # ─────────────────────────────────────────────────────────────
    metrics_config = MetricsConfig(
        port=9090,
        namespace="tickdb",
        subsystem="demo",
    )
    metrics = QuantMetrics(config=metrics_config)
    
    # 启动独立的 metrics HTTP 服务
    metrics.start_server(host="0.0.0.0")
    logger.info("Metrics server started on :9090")
    
    # ─────────────────────────────────────────────────────────────
    # Step 2: 初始化协程池
    # ─────────────────────────────────────────────────────────────
    pool = CoroutinePool(
        name="strategy_executor",
        max_workers=100,
        metrics=metrics,
    )
    logger.info(f"Coroutine pool initialized: {pool.get_pool_status()}")
    
    # ─────────────────────────────────────────────────────────────
    # Step 3: 模拟消息处理(模拟量化系统工作)
    # ─────────────────────────────────────────────────────────────
    
    async def simulate_market_data_processing():
        """模拟行情数据处理"""
        # 模拟不同消息类型的延迟
        import random
        delay = random.uniform(0.001, 0.1)
        await asyncio.sleep(delay)
        return {"processed": True}
    
    async def simulate_order_submission(order_id: str):
        """模拟订单提交"""
        await asyncio.sleep(random.uniform(0.01, 0.2))
        return {"order_id": order_id, "status": "filled"}
    
    # ─────────────────────────────────────────────────────────────
    # Step 4: 模拟运行场景
    # ─────────────────────────────────────────────────────────────
    
    # 模拟 WebSocket 连接事件
    metrics.record_connection_event(
        exchange="binance",
        account_id="acc_001",
        event="connected"
    )
    
    # 模拟消息接收(持续 30 秒)
    start_time = asyncio.get_event_loop().time()
    message_count = 0
    
    while asyncio.get_event_loop().time() - start_time < 30:
        # 模拟不同类型的消息
        msg_types = ["kline", "depth", "trade", "ticker"]
        msg_type = random.choice(msg_types)
        
        # 模拟消息延迟(正常范围 5-50ms,偶尔有抖动)
        if random.random() > 0.95:
            delay = random.uniform(0.1, 0.5)  # 异常延迟
        else:
            delay = random.uniform(0.005, 0.05)  # 正常延迟
        
        metrics.record_message_received(
            exchange="binance",
            msg_type=msg_type,
            delay_seconds=delay,
        )
        
        # 模拟协程池任务
        await pool.submit(
            simulate_market_data_processing(),
            task_name=f"process_{msg_type}",
        )
        
        message_count += 1
        await asyncio.sleep(0.1)  # 100ms 间隔
    
    # 模拟连接断开
    metrics.record_connection_event(
        exchange="binance",
        account_id="acc_001",
        event="disconnected"
    )
    
    # ─────────────────────────────────────────────────────────────
    # Step 5: 输出最终统计
    # ─────────────────────────────────────────────────────────────
    
    logger.info(f"Demo completed. Processed {message_count} messages.")
    logger.info(f"Pool status: {pool.get_pool_status()}")
    
    # 健康状态更新
    metrics.update_health(
        component="demo_gateway",
        instance="instance_001",
        is_healthy=True,
    )
    
    await pool.close(timeout=5.0)


if __name__ == "__main__":
    # 运行演示
    asyncio.run(run_demo())
    
    # 保持主进程运行,以便 Prometheus 抓取
    print("\n✅ Demo complete. Metrics available at http://localhost:9090/metrics")
    print("📊 Import the Grafana dashboard JSON from the article to visualize the data.")

运行以上代码后,访问 http://localhost:9090/metrics,你应该能看到类似以下格式的输出:

# HELP tickdb_ws_gateway_connection_ws_connections_active Current active WebSocket connections
# TYPE tickdb_ws_gateway_connection_ws_connections_active gauge
tickdb_ws_gateway_connection_ws_connections_active{account_id="acc_001",exchange="binance"} 1.0

# HELP tickdb_ws_gateway_message_ws_message_delay_seconds WebSocket message delay from receive to process
# TYPE tickdb_ws_gateway_message_ws_message_delay_seconds histogram
tickdb_ws_gateway_message_ws_message_delay_seconds_bucket{exchange="binance",msg_type="kline",le="0.01"} 45.0
tickdb_ws_gateway_message_ws_message_delay_seconds_bucket{exchange="binance",msg_type="kline",le="0.05"} 120.0
tickdb_ws_gateway_message_ws_message_delay_seconds_bucket{exchange="binance",msg_type="kline",le="+Inf"} 150.0
tickdb_ws_gateway_message_ws_message_delay_seconds_sum{exchange="binance",msg_type="kline"} 3.45
tickdb_ws_gateway_message_ws_message_delay_seconds_count{exchange="binance",msg_type="kline"} 150.0

结语:可观测性是量化系统的基础设施

本文覆盖了量化系统可观测性的核心基础设施:

  1. 指标分层设计:从连接层、消息层到协程层、业务层,每一层的问题都有对应的指标可以发现
  2. Prometheus 集成:使用 Gauge、Histogram、Counter 三种核心类型,覆盖资源、延迟、吞吐量
  3. 生产级代码:包含心跳、重连、限频、超时的完整监控方案
  4. Grafana 可视化:完整的仪表盘结构和告警规则配置
  5. 协程池监控:容易被忽视但直接影响系统稳定性的组件

可观测性不是“出问题后用的工具”,而是“系统设计的一部分”。在问题发生之前就规划好指标,在系统异常时才能快速定位。


下一步行动

如果你正在搭建量化系统

  1. 将本文的 QuantMetrics 类集成进你的 WebSocket 网关
  2. 配置 Prometheus scrape 间隔为 15 秒(对于延迟敏感场景可缩短到 5 秒)
  3. 导入 Grafana 仪表盘,观察基线指标

如果你希望更深入

  1. 研究 Prometheus 的 remote_write 方案,将数据写入 long-term storage
  2. 探索 OpenTelemetry 作为 vendor-neutral 的可观测性标准
  3. 联系 [email protected] 获取 TickDB 行情网关的内置监控方案

如果你习惯用 AI 辅助开发
在 AI 助手中搜索安装 tickdb-market-data SKILL,TickDB 提供开箱即用的行情数据 API,集成 Prometheus 指标暴露,可直接对接 Grafana 监控体系。


风险提示:本文不构成任何投资建议。量化交易存在固有风险,回测结果不代表未来表现。生产环境中请确保监控覆盖与告警机制完善后再进行实盘交易。