Prometheus 指标暴露:量化系统的可观测性第一步
想象这个场景:凌晨 3:17,Slack 的告警机器人开始尖叫。订单执行延迟从 50ms 飙升到 2.3 秒。你打开终端,试图用 ps aux | grep python 定位问题,但进程还在跑,CPU 不高,内存也没爆。你盯着日志滚动,大海捞针。
这不是某个小概率事件。这是每个量化团队迟早会面对的真相:系统出问题时,你最需要的往往不是日志,而是指标。
日志告诉你发生了什么,指标告诉你为什么发生的频率、持续多久、影响多大。没有可观测性,量化系统就是一个黑盒——交易顺利时风平浪静,一旦异常,你只能靠猜。
本文是 TickDB 技术文章系列的第一篇,聚焦可观测性的基础设施:用 Prometheus + Grafana 为你的量化系统构建监控层。从指标模型到生产级代码实现,从 WebSocket 连接数到协程池水位,覆盖你需要的每一个监控维度。
一、为什么量化系统需要可观测性
1.1 监控与可观测性的本质区别
很多团队把“监控”等同于“在 Grafana 里放几个图表”。这不是监控,这是看板。
监控是主动的:设置阈值,超标告警,人工响应。可观测性是系统 intrinsic 的能力:无论发生什么异常,你都能从外部输出推断内部状态,不需要提前为每种故障编写规则。
对于量化系统,可观测性之所以关键,源于三个特殊性:
第一,延迟敏感。执行链路中任何一处抖动都直接影响盈亏。50ms 的延迟差可能跨越了滑点最恶劣的那个档位。如果你的监控系统只能看到“订单执行失败”,而无法定位是 WebSocket 推送延迟、消息队列堵塞、还是交易所接口超时,修复就是盲目的。
第二,状态复杂。现代量化系统往往是异步架构:WebSocket 维护长连接、多个交易所并行订阅、协程池管理并发。如果协程数量超过阈值导致 OOM,进程不会崩溃,而是会变得极其缓慢。从外部看,这就是“延迟高”,但根因完全不同。
第三,夜间运行。量化策略经常在非交易时段运行。问题可能在凌晨出现,如果你没有可观测性,等到第二天开盘可能已经错过了风控窗口。
1.2 Prometheus 在量化监控中的位置
Prometheus 是 CNCF 生态中最成熟的监控时序数据库,专为云原生环境下的服务监控设计。它的核心优势在于:
| 特性 | 量化场景下的价值 |
|---|---|
| Pull 模式 | 不需要在业务代码中 push 数据,Prometheus 按固定间隔拉取,简化部署 |
| 多维度标签 | 可以用 symbol=AAPL,exchange=US 标签区分不同标的和市场 |
| 4 秒采集间隔 | 满足高频监控需求(相比之下,云监控通常 1 分钟) |
| PromQL | 强大的聚合查询语言,支持即时计算衍生指标 |
| 服务发现 | K8s 环境自动发现新上线的组件 |
对于量化系统,Prometheus 通常部署在架构中的位置如下:
┌─────────────────────────────────────────────────────────────┐
│ Prometheus Server │
│ (pull metrics every 15s) │
└────────────────────────────┬────────────────────────────────┘
│ scrape
┌────────────────────┼────────────────────┐
│ │ │
▼ ▼ ▼
┌───────────────┐ ┌───────────────┐ ┌───────────────┐
│ TickDB 行情网关 │ │ 策略引擎 (Python)│ │ 订单路由器 │
│ :8000/metrics │ │ :9090/metrics │ │ :9091/metrics│
└───────────────┘ └───────────────┘ └───────────────┘
每个组件独立暴露 /metrics 端点,Prometheus server 集中拉取后存储,Grafana 读取进行可视化。
二、Prometheus 指标模型与 Python 实现
2.1 四种指标类型的选择逻辑
Prometheus 定义了四种指标类型,但它们的语义和适用场景不同:
| 类型 | 语义 | 典型用途 | 量化场景 |
|---|---|---|---|
| Counter | 只增不减的计数器 | 请求次数、订单成交笔数 | 累计成交单数、WS 消息发送总量 |
| Gauge | 可增可减的瞬时值 | 当前连接数、内存使用量 | WS 连接数、协程池活跃数 |
| Histogram | 统计分布,自动计算分位数 | 请求耗时、响应大小 | 消息处理延迟、K 线生成耗时 |
| Summary | 与 Histogram 类似,但服务端计算分位数 | 高基数标签场景 | 通常用 Histogram 替代 |
对于量化系统,最常用的是 Gauge 和 Histogram:
- Gauge:反映资源水位——连接数、队列深度、协程数,这些是越界就告警的指标
- Histogram:反映性能分布——消息延迟、订单执行耗时,这些需要分位数来定位 P99 问题
2.2 生产级 Python 代码:指标暴露
以下代码是一个完整的 Prometheus 指标暴露模块,基于 prometheus_client 库。它不是教学演示代码,而是可以直接集成进生产量化系统的监控基础设施。
"""
TickDB Monitor - Prometheus Metrics Exporter
生产级量化系统指标暴露模块
"""
import os
import time
import asyncio
import logging
from typing import Optional, Dict, Any
from dataclasses import dataclass, field
from prometheus_client import (
Counter,
Gauge,
Histogram,
CollectorRegistry,
generate_latest,
CONTENT_TYPE_LATEST,
start_http_server,
)
from aiohttp import web
logger = logging.getLogger(__name__)
@dataclass
class MetricsConfig:
"""指标暴露配置"""
port: int = 9090
namespace: str = "tickdb"
subsystem: str = "ws_gateway"
enable_server: bool = True
class QuantMetrics:
"""
量化系统 Prometheus 指标采集器
核心设计原则:
1. 指标标签统一命名空间,避免冲突
2. Histogram 的 buckets 根据量化场景调优
3. 所有操作线程安全
"""
def __init__(self, config: Optional[MetricsConfig] = None):
self.config = config or MetricsConfig()
self.registry = CollectorRegistry()
# ─────────────────────────────────────────────────────────────
# 第一类:连接与资源指标 (Gauge)
# ─────────────────────────────────────────────────────────────
# WebSocket 连接相关
self.ws_connections_active = Gauge(
name="ws_connections_active",
documentation="当前活跃的 WebSocket 连接数",
namespace=self.config.namespace,
subsystem=f"{self.config.subsystem}_connection",
registry=self.registry,
labelnames=["exchange", "account_id"],
)
self.ws_connections_reconnect_total = Counter(
name="ws_connections_reconnect_total",
documentation="WebSocket 重连次数",
namespace=self.config.namespace,
subsystem=f"{self.config.subsystem}_connection",
registry=self.registry,
labelnames=["exchange", "reason"],
)
self.ws_connections_error_total = Counter(
name="ws_connections_error_total",
documentation="WebSocket 连接错误总数",
namespace=self.config.namespace,
subsystem=f"{self.config.subsystem}_connection",
registry=self.registry,
labelnames=["exchange", "error_type"],
)
# 协程池指标
self.coroutine_pool_active = Gauge(
name="coroutine_pool_active",
documentation="当前活跃协程数",
namespace=self.config.namespace,
subsystem="executor",
registry=self.registry,
labelnames=["pool_name"],
)
self.coroutine_pool_queued = Gauge(
name="coroutine_pool_queued",
documentation="协程池队列深度",
namespace=self.config.namespace,
subsystem="executor",
registry=self.registry,
labelnames=["pool_name"],
)
self.coroutine_pool_max = Gauge(
name="coroutine_pool_max",
documentation="协程池最大容量",
namespace=self.config.namespace,
subsystem="executor",
registry=self.registry,
)
# ─────────────────────────────────────────────────────────────
# 第二类:消息与延迟指标 (Histogram)
# ─────────────────────────────────────────────────────────────
# 消息延迟分布 - 量化场景优化 buckets
self.ws_message_delay_seconds = Histogram(
name="ws_message_delay_seconds",
documentation="WebSocket 消息从接收到达处理的延迟",
namespace=self.config.namespace,
subsystem=f"{self.config.subsystem}_message",
registry=self.registry,
labelnames=["exchange", "msg_type"],
buckets=(0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5),
)
# 消息处理耗时
self.message_process_seconds = Histogram(
name="message_process_seconds",
documentation="消息处理总耗时",
namespace=self.config.namespace,
subsystem=f"{self.config.subsystem}_message",
registry=self.registry,
labelnames=["exchange", "msg_type"],
buckets=(0.0005, 0.001, 0.005, 0.01, 0.05, 0.1, 0.5),
)
# ─────────────────────────────────────────────────────────────
# 第三类:业务指标 (Counter)
# ─────────────────────────────────────────────────────────────
self.messages_received_total = Counter(
name="messages_received_total",
documentation="收到的消息总数",
namespace=self.config.namespace,
subsystem=f"{self.config.subsystem}_message",
registry=self.registry,
labelnames=["exchange", "msg_type"],
)
self.messages_sent_total = Counter(
name="messages_sent_total",
documentation="发送的消息总数",
namespace=self.config.namespace,
subsystem=f"{self.config.subsystem}_message",
registry=self.registry,
labelnames=["exchange", "msg_type"],
)
self.orders_submitted_total = Counter(
name="orders_submitted_total",
documentation="提交订单总数",
namespace=self.config.namespace,
subsystem="order",
registry=self.registry,
labelnames=["exchange", "order_type", "status"],
)
self.market_data_requests_total = Counter(
name="market_data_requests_total",
documentation="行情数据请求总数",
namespace=self.config.namespace,
subsystem="market_data",
registry=self.registry,
labelnames=["exchange", "data_type"],
)
# 限频触发计数
self.rate_limit_hits_total = Counter(
name="rate_limit_hits_total",
documentation="触发频率限制的次数",
namespace=self.config.namespace,
subsystem="api",
registry=self.registry,
labelnames=["exchange", "endpoint"],
)
# ─────────────────────────────────────────────────────────────
# 第四类:健康状态指标 (Gauge)
# ─────────────────────────────────────────────────────────────
self.health_status = Gauge(
name="health_status",
documentation="组件健康状态 (1=健康, 0=异常)",
namespace=self.config.namespace,
subsystem="health",
registry=self.registry,
labelnames=["component", "instance"],
)
self.last_heartbeat_timestamp = Gauge(
name="last_heartbeat_timestamp",
documentation="上次心跳时间戳",
namespace=self.config.namespace,
subsystem="health",
registry=self.registry,
labelnames=["component"],
)
logger.info(f"QuantMetrics initialized with namespace={self.config.namespace}")
def record_message_received(
self,
exchange: str,
msg_type: str,
delay_seconds: float
):
"""
记录消息接收
Args:
exchange: 交易所标识,如 'binance', 'okx'
msg_type: 消息类型,如 'kline', 'depth', 'trade'
delay_seconds: 从网络收到到代码处理的延迟
"""
self.messages_received_total.labels(
exchange=exchange,
msg_type=msg_type
).inc()
self.ws_message_delay_seconds.labels(
exchange=exchange,
msg_type=msg_type
).observe(delay_seconds)
def record_connection_event(
self,
exchange: str,
account_id: str,
event: str, # 'connected', 'disconnected', 'reconnect', 'error'
reason: str = None,
error_type: str = None,
):
"""
记录连接状态变更事件
event 映射关系:
- connected: 活跃连接数 +1
- disconnected: 活跃连接数 -1
- reconnect: 重连计数器 +1
- error: 错误计数器 +1
"""
if event == "connected":
self.ws_connections_active.labels(
exchange=exchange,
account_id=account_id
).inc()
elif event == "disconnected":
self.ws_connections_active.labels(
exchange=exchange,
account_id=account_id
).dec()
elif event == "reconnect":
self.ws_connections_reconnect_total.labels(
exchange=exchange,
reason=reason or "unknown"
).inc()
elif event == "error":
self.ws_connections_error_total.labels(
exchange=exchange,
error_type=error_type or "unknown"
).inc()
def update_coroutine_pool(
self,
pool_name: str,
active: int,
queued: int,
max_size: int,
):
"""
更新协程池状态
当 active / max_size > 0.8 时,应触发告警
当 queued 持续 > 0 时,说明有任务堆积
"""
self.coroutine_pool_active.labels(pool_name=pool_name).set(active)
self.coroutine_pool_queued.labels(pool_name=pool_name).set(queued)
self.coroutine_pool_max.set(max_size) # 通常不随时间变化
def record_api_rate_limit(
self,
exchange: str,
endpoint: str,
retry_after: float,
):
"""
记录 API 限频事件
当某 endpoint 的限频频率超过阈值时,考虑:
1. 降低请求频率
2. 增加退避时间
3. 切换备用数据源
"""
self.rate_limit_hits_total.labels(
exchange=exchange,
endpoint=endpoint
).inc()
# 将 retry_after 记录到 Histogram,帮助分析限频严重程度
# 注意:这需要额外一个 Histogram 来跟踪 retry_after 分布
def update_health(
self,
component: str,
instance: str,
is_healthy: bool,
):
"""更新组件健康状态"""
self.health_status.labels(
component=component,
instance=instance
).set(1 if is_healthy else 0)
self.last_heartbeat_timestamp.labels(
component=component
).set_to_current_time()
def get_metrics(self) -> bytes:
"""暴露 /metrics 端点"""
return generate_latest(self.registry)
async def metrics_handler(self, request: web.Request) -> web.Response:
"""HTTP metrics 暴露端点"""
metrics_output = self.get_metrics()
return web.Response(
body=metrics_output,
content_type=CONTENT_TYPE_LATEST,
)
def start_server(self, host: str = "0.0.0.0") -> None:
"""
在独立端口启动 metrics HTTP 服务
⚠️ 生产环境建议:
- 使用独立端口,不与应用主服务混用
- 绑定到 localhost 或内网 IP,不对外暴露
- 配合 Prometheus 的 scrape 间隔配置
"""
start_http_server(
port=self.config.port,
addr=host,
registry=self.registry,
)
logger.info(f"Metrics server started on {host}:{self.config.port}")
2.3 集成进 WebSocket 网关
以上指标类需要与实际业务代码集成。以下是 WebSocket 客户端集成示例,展示如何采集真实数据:
import asyncio
import websockets
import json
import time
from typing import Optional, Callable, Dict, Any
from prometheus_client import Histogram
class WebSocketClientWithMetrics:
"""
带 Prometheus 指标的 WebSocket 客户端
核心监控能力:
1. 连接状态追踪(活跃连接数、断开、错误)
2. 消息延迟测量(到达时间 vs 处理时间)
3. 重连机制与重连计数
"""
def __init__(
self,
uri: str,
metrics: QuantMetrics,
exchange: str,
account_id: str,
):
self.uri = uri
self.metrics = metrics
self.exchange = exchange
self.account_id = account_id
self._ws: Optional[websockets.WebSocketClientProtocol] = None
self._connected = False
self._running = False
# 重连配置:指数退避 + 抖动
self._base_delay = 1.0
self._max_delay = 60.0
self._max_retries = float('inf')
# 限频配置
self._rate_limit_code = 3001
self._default_retry_after = 5
async def connect(self) -> bool:
"""
建立 WebSocket 连接
⚠️ 工程注意事项:
- 生产环境必须有超时控制,避免挂死在连接建立
- 需要处理 SSL 证书验证(生产环境不应跳过)
- 建议设置 User-Agent,便于交易所定位问题
"""
try:
headers = {
"User-Agent": "TickDB-Monitor/1.0",
}
self._ws = await asyncio.wait_for(
self._ws = await websockets.connect(
self.uri,
extra_headers=headers,
ping_interval=20, # 20 秒一次心跳
ping_timeout=10, # 10 秒超时
close_timeout=5,
open_timeout=10,
)
return True
except asyncio.TimeoutError:
self.metrics.record_connection_event(
self.exchange, self.account_id,
"error", error_type="connection_timeout"
)
return False
async def send_with_metrics(
self,
payload: Dict[str, Any],
msg_type: str = "subscribe",
) -> bool:
"""
发送消息并记录指标
包括:
- 发送成功/失败计数
- 发送延迟
"""
if not self._ws or not self._connected:
return False
try:
# 发送前时间戳
send_time = time.perf_counter()
await self._ws.send(json.dumps(payload))
self.metrics.messages_sent_total.labels(
exchange=self.exchange,
msg_type=msg_type
).inc()
return True
except websockets.exceptions.ConnectionClosed as e:
self.metrics.record_connection_event(
self.exchange, self.account_id,
"error", error_type="send_after_close"
)
await self._handle_disconnect(e)
return False
async def receive_with_metrics(self) -> Optional[Dict[str, Any]]:
"""
接收消息并记录延迟指标
延迟定义:消息到达业务代码的时间 - 消息携带的时间戳(如果交易所提供)
这可以帮助你测量:
1. 网络延迟
2. 消息队列等待时间
3. 反序列化耗时
"""
if not self._ws or not self._connected:
return None
try:
message = await asyncio.wait_for(
self._ws.recv(),
timeout=30.0,
)
# 到达时间
arrival_time = time.perf_counter()
data = json.loads(message)
# 尝试提取消息内嵌时间戳
# 多数交易所 API 会在 payload 中包含 event_time 或 ts 字段
embedded_time = data.get("event_time") or data.get("ts")
if embedded_time:
# 延迟 = 到达时间 - 消息产生时间
# 注意:这里需要根据交易所时间格式转换
try:
delay_seconds = arrival_time - (embedded_time / 1000)
delay_seconds = max(0, delay_seconds) # 防止负值
except (TypeError, ValueError):
delay_seconds = None
else:
delay_seconds = None
# 推断消息类型
msg_type = self._infer_msg_type(data)
self.metrics.messages_received_total.labels(
exchange=self.exchange,
msg_type=msg_type
).inc()
if delay_seconds is not None:
self.metrics.ws_message_delay_seconds.labels(
exchange=self.exchange,
msg_type=msg_type
).observe(delay_seconds)
return data
except asyncio.TimeoutError:
# 30 秒未收到消息,可能是连接假死
self.metrics.record_connection_event(
self.exchange, self.account_id,
"error", error_type="receive_timeout"
)
return None
def _infer_msg_type(self, data: Dict) -> str:
"""从消息内容推断类型"""
if "depth" in data:
return "depth"
elif "kline" in data:
return "kline"
elif "trade" in data:
return "trade"
elif "ticker" in data:
return "ticker"
return "unknown"
async def _handle_disconnect(self, exc: Exception):
"""断开连接处理:记录指标 + 触发重连"""
self._connected = False
self.metrics.record_connection_event(
self.exchange, self.account_id,
"disconnected"
)
# 根据异常类型判断重连原因
reason = self._classify_disconnect(exc)
await self._reconnect_with_backoff(reason)
async def _reconnect_with_backoff(self, reason: str, retry: int = 0):
"""
指数退避重连
退避公式:min(base * 2^retry + jitter, max_delay)
- base: 初始延迟 1s
- jitter: ±10% 随机抖动,避免惊群效应
- max_delay: 最大 60s
"""
delay = min(self._base_delay * (2 ** retry), self._max_delay)
jitter = delay * 0.1 * (2 * asyncio.get_event_loop().time() % 1 - 1)
total_delay = delay + jitter
self.metrics.record_connection_event(
self.exchange, self.account_id,
"reconnect", reason=reason
)
logger.warning(
f"WebSocket disconnected ({reason}), "
f"reconnecting in {total_delay:.1f}s (retry #{retry})"
)
await asyncio.sleep(total_delay)
success = await self.connect()
if success:
self._connected = True
self.metrics.record_connection_event(
self.exchange, self.account_id,
"connected"
)
else:
await self._reconnect_with_backoff(reason, retry + 1)
def _classify_disconnect(self, exc: Exception) -> str:
"""对断开原因分类,便于重连策略决策"""
exc_type = type(exc).__name__
if "ConnectionClosed" in exc_type:
if hasattr(exc, "code"):
if exc.code == 1000:
return "normal_closure"
elif exc.code == 1001:
return "server_going_away"
elif exc.code >= 4000:
return "server_error"
return "abnormal_close"
if "TimeoutError" in exc_type:
return "connection_timeout"
if "SSL" in str(exc):
return "ssl_error"
return "unknown_error"
三、核心监控指标体系
3.1 指标设计原则
为量化系统设计监控指标,需要遵循三个原则:
原则一:分层设计。将指标分为资源层(CPU、内存)、连接层(WS 连接状态)、消息层(延迟、吞吐量)、业务层(订单、成交),每一层的问题会表现为不同层的指标异常。
原则二:Goldilocks 粒度。标签太多会导致 Prometheus cardinality 爆炸;标签太少会无法定位问题。对于量化系统,建议按 exchange、account_id(或 strategy_id)、msg_type 三个维度打标签。
原则三:延迟分位数。不要只看平均值,P50 会掩盖长尾延迟。量化系统关注 P95 和 P99,异常延迟可能直接影响撮合。
3.2 核心指标清单
以下是 TickDB 推荐的生产级量化系统必选监控指标:
连接层指标
| 指标名 | 类型 | 标签 | 告警阈值建议 | 告警原因 |
|---|---|---|---|---|
tickdb_ws_gateway_connection_ws_connections_active |
Gauge | exchange, account_id | > 50 或 < 预期 | 连接数异常(被踢/异常断开) |
tickdb_ws_gateway_connection_ws_connections_reconnect_total |
Counter | exchange, reason | 增长率 > 5/min | 频繁重连,网络或服务器问题 |
tickdb_ws_gateway_connection_ws_connections_error_total |
Counter | exchange, error_type | 增长率 > 1/min | 持续错误,需排查 |
延迟层指标
| 指标名 | 类型 | 标签 | 告警阈值建议 | 含义 |
|---|---|---|---|---|
tickdb_ws_gateway_message_ws_message_delay_seconds |
Histogram | exchange, msg_type | P99 > 500ms | 消息到达延迟过高 |
tickdb_ws_gateway_message_message_process_seconds |
Histogram | exchange, msg_type | P99 > 100ms | 业务处理耗时过长 |
协程层指标
| 指标名 | 类型 | 标签 | 告警阈值建议 | 含义 |
|---|---|---|---|---|
tickdb_executor_coroutine_pool_active |
Gauge | pool_name | > max * 0.8 | 协程池接近满载 |
tickdb_executor_coroutine_pool_queued |
Gauge | pool_name | > 100 持续 5min | 任务堆积,可能 OOM 前兆 |
业务层指标
| 指标名 | 类型 | 标签 | 告警阈值建议 | 含义 |
|---|---|---|---|---|
tickdb_order_order_orders_submitted_total |
Counter | exchange, order_type, status | 失败率 > 5% | 订单执行异常 |
tickdb_api_rate_limit_hits_total |
Counter | exchange, endpoint | 增长率 > 10/min | 频繁触发限频 |
3.3 PromQL 查询示例
以下是几个关键场景的 PromQL 查询,可直接用于 Grafana 面板或告警规则:
# 1. WebSocket 连接数(按交易所聚合)
sum(tickdb_ws_gateway_connection_ws_connections_active) by (exchange)
# 2. 消息延迟 P99(按消息类型)
histogram_quantile(0.99,
rate(tickdb_ws_gateway_message_ws_message_delay_seconds_bucket[5m])
) by (msg_type)
# 3. 协程池利用率(超过 80% 告警)
tickdb_executor_coroutine_pool_active / tickdb_executor_coroutine_pool_max
# 4. 重连频率(每分钟超过 5 次触发告警)
increase(tickdb_ws_gateway_connection_ws_connections_reconnect_total[1m]) > 5
# 5. 消息吞吐量(按交易所和类型)
rate(tickdb_ws_gateway_message_messages_received_total[1m]) by (exchange, msg_type)
四、Grafana 仪表盘设计
4.1 仪表盘分层结构
一个完整的量化系统监控仪表盘应包含四层,从全局概览到细节下钻:
┌─────────────────────────────────────────────────────────────────┐
│ Layer 1: 系统总览 (System Overview) │
│ - 全部连接数、消息吞吐量、错误率 │
└─────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ Layer 2: 连接健康 (Connection Health) │
│ - 各交易所连接状态、重连次数、错误分布 │
└─────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ Layer 3: 延迟分析 (Latency Analysis) │
│ - 消息延迟 P50/P95/P99 分布、处理耗时热力图 │
└─────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ Layer 4: 协程池详情 (Coroutine Pool) │
│ - 各池活跃数、队列深度、利用率趋势 │
└─────────────────────────────────────────────────────────────────┘
4.2 仪表盘 JSON 配置
以下是 Layer 2(连接健康)面板的 Grafana JSON 配置,可直接导入:
{
"title": "Connection Health - By Exchange",
"uid": "ws-connection-health",
"type": "row",
"panels": [
{
"title": "Active Connections",
"type": "timeseries",
"gridPos": {"x": 0, "y": 0, "w": 12, "h": 8},
"targets": [
{
"expr": "sum(tickdb_ws_gateway_connection_ws_connections_active) by (exchange)",
"legendFormat": "{{exchange}}"
}
],
"fieldConfig": {
"defaults": {
"unit": "short",
"thresholds": {
"mode": "absolute",
"steps": [
{"color": "green", "value": null},
{"color": "yellow", "value": 40},
{"color": "red", "value": 50}
]
}
}
},
"options": {
"legend": {"displayMode": "table", "placement": "right"}
}
},
{
"title": "Reconnect Rate (per minute)",
"type": "timeseries",
"gridPos": {"x": 12, "y": 0, "w": 12, "h": 8},
"targets": [
{
"expr": "increase(tickdb_ws_gateway_connection_ws_connections_reconnect_total[1m])",
"legendFormat": "{{exchange}} - {{reason}}"
}
],
"fieldConfig": {
"defaults": {
"unit": "short",
"thresholds": {
"mode": "absolute",
"steps": [
{"color": "green", "value": null},
{"color": "orange", "value": 5},
{"color": "red", "value": 10}
]
}
}
},
"options": {
"legend": {"displayMode": "table", "placement": "right"}
}
},
{
"title": "Error Distribution",
"type": "piechart",
"gridPos": {"x": 0, "y": 8, "w": 8, "h": 8},
"targets": [
{
"expr": "sum(increase(tickdb_ws_gateway_connection_ws_connections_error_total[1h])) by (error_type)",
"legendFormat": "{{error_type}}"
}
],
"options": {
"displayLabels": ["name", "value", "percent"]
}
},
{
"title": "Message Latency P99",
"type": "timeseries",
"gridPos": {"x": 8, "y": 8, "w": 16, "h": 8},
"targets": [
{
"expr": "histogram_quantile(0.99, rate(tickdb_ws_gateway_message_ws_message_delay_seconds_bucket[5m])) by (exchange, msg_type)",
"legendFormat": "{{exchange}} - {{msg_type}}"
}
],
"fieldConfig": {
"defaults": {
"unit": "s",
"custom": {
"lineWidth": 2,
"fillOpacity": 10
},
"thresholds": {
"mode": "absolute",
"steps": [
{"color": "green", "value": null},
{"color": "yellow", "value": 0.3},
{"color": "red", "value": 0.5}
]
}
}
}
}
]
}
4.3 关键面板配置技巧
分位数折线图:使用 histogram_quantile(0.99, ...) 配合 rate() 函数,可以展示 P99 延迟随时间的变化。添加 P50 和 P95 作为对比线,便于观察延迟分布变化。
热力图:对于 Histogram 类型指标,Grafana 支持热力图展示。配置方式:
# Grafana 热力图面板数据源
sum(rate(tickdb_ws_gateway_message_message_process_seconds_bucket[5m])) by (le)
告警规则配置:
# Prometheus alerting rules
groups:
- name: quant_system
rules:
- alert: HighReconnectRate
expr: increase(tickdb_ws_gateway_connection_ws_connections_reconnect_total[1m]) > 10
for: 2m
labels:
severity: warning
annotations:
summary: "WebSocket 重连频率过高"
description: "{{ $labels.exchange }} 在过去 1 分钟内重连 {{ $value }} 次"
- alert: CoroutinePoolNearCapacity
expr: tickdb_executor_coroutine_pool_active / tickdb_executor_coroutine_pool_max > 0.8
for: 5m
labels:
severity: critical
annotations:
summary: "协程池接近容量上限"
description: "池 {{ $labels.pool_name }} 利用率 {{ $value | humanizePercentage }}"
- alert: MessageLatencyHigh
expr: histogram_quantile(0.99, rate(tickdb_ws_gateway_message_ws_message_delay_seconds_bucket[5m])) > 0.5
for: 3m
labels:
severity: warning
annotations:
summary: "消息延迟过高"
description: "{{ $labels.exchange }}-{{ $labels.msg_type }} P99 延迟 {{ $value }}s"
五、协程池监控实现
5.1 为什么协程池监控容易被忽视
协程(asyncio Task)是 Python 异步量化系统的核心资源。策略执行、API 调用、数据处理都依赖协程调度。
但协程池问题往往在系统稳定运行时被忽视,直到以下症状出现:
- 订单延迟增加,但网络正常
- 日志中开始出现
RuntimeError: Event loop is running - 内存持续增长,最终 OOM
这些问题都指向同一根本原因:协程泄漏。
5.2 生产级协程池监控代码
import asyncio
import functools
import weakref
from typing import Dict, Set, Optional, Callable, Any
from dataclasses import dataclass, field
from prometheus_client import Gauge, Counter
import time
@dataclass
class CoroutinePool:
"""
协程池 + Prometheus 监控
监控能力:
- 活跃协程数
- 队列深度
- 协程执行时长
- 协程泄漏检测
"""
name: str
max_workers: int
metrics: QuantMetrics
_active_coroutines: Set[asyncio.Task] = field(default_factory=set)
_pending_tasks: asyncio.Queue = field(default_factory=asyncio.Queue)
_leaked_tasks: Set[asyncio.Task] = field(default_factory=set)
_closed = False
def __post_init__(self):
"""初始化 Prometheus 指标"""
self.execution_time = Histogram(
name="coroutine_execution_seconds",
documentation="协程执行时长",
namespace=self.metrics.config.namespace,
subsystem="coroutine_pool",
labelnames=["pool_name", "task_name"],
buckets=(0.001, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0),
registry=self.metrics.registry,
)
self.tasks_submitted = Counter(
name="coroutine_pool_tasks_submitted_total",
documentation="提交的任务总数",
namespace=self.metrics.config.namespace,
subsystem="coroutine_pool",
labelnames=["pool_name"],
registry=self.metrics.registry,
)
self.tasks_completed = Counter(
name="coroutine_pool_tasks_completed_total",
documentation="完成的任务总数",
namespace=self.metrics.config.namespace,
subsystem="coroutine_pool",
labelnames=["pool_name", "status"],
registry=self.metrics.registry,
)
async def submit(
self,
coro: Callable,
task_name: Optional[str] = None,
timeout: Optional[float] = None,
) -> Any:
"""
提交协程任务到池中执行
Args:
coro: 协程函数(未执行)
task_name: 任务名称(用于监控)
timeout: 超时时间(秒)
Returns:
协程返回值
Raises:
asyncio.TimeoutError: 任务超时
RuntimeError: 池已关闭
"""
if self._closed:
raise RuntimeError(f"CoroutinePool {self.name} is closed")
task_name = task_name or coro.__name__
self.tasks_submitted.labels(pool_name=self.name).inc()
async def wrapped_coro():
start_time = time.perf_counter()
try:
if timeout:
result = await asyncio.wait_for(coro(), timeout=timeout)
else:
result = await coro()
self.tasks_completed.labels(
pool_name=self.name, status="success"
).inc()
return result
except asyncio.TimeoutError:
self.tasks_completed.labels(
pool_name=self.name, status="timeout"
).inc()
raise
except Exception as e:
self.tasks_completed.labels(
pool_name=self.name, status="error"
).inc()
raise
finally:
execution_time = time.perf_counter() - start_time
self.execution_time.labels(
pool_name=self.name, task_name=task_name
).observe(execution_time)
# 更新活跃协程数
self._active_coroutines.discard(asyncio.current_task())
self._update_metrics()
task = asyncio.create_task(wrapped_coro())
self._active_coroutines.add(task)
task.add_done_callback(self._on_task_done)
self._update_metrics()
return await task
def _on_task_done(self, task: asyncio.Task):
"""
任务完成回调
关键功能:
1. 从活跃集合移除
2. 检测协程泄漏(任务完成但未正确清理)
"""
self._active_coroutines.discard(task)
# 检查是否有异常未被捕获
if task.done() and not task.cancelled():
exc = task.exception()
if exc:
logger.error(f"Task in pool {self.name} failed: {exc}")
self._update_metrics()
def _update_metrics(self):
"""更新 Prometheus 指标"""
self.metrics.update_coroutine_pool(
pool_name=self.name,
active=len(self._active_coroutines),
queued=self._pending_tasks.qsize(),
max_size=self.max_workers,
)
def get_pool_status(self) -> Dict[str, Any]:
"""
获取池状态快照
用于健康检查接口和手动调试
"""
return {
"name": self.name,
"max_workers": self.max_workers,
"active_count": len(self._active_coroutines),
"utilization": len(self._active_coroutines) / self.max_workers,
"pending_count": self._pending_tasks.qsize(),
"is_full": len(self._active_coroutines) >= self.max_workers,
}
def force_cancel_stale_tasks(self, max_age_seconds: float = 300):
"""
强制取消超长运行的任务
⚠️ 警告:这是最后的手段,会导致任务中断
用于处理协程泄漏(任务卡住但未释放)的情况
Args:
max_age_seconds: 超过此时间的任务将被取消
"""
current_task = asyncio.current_task()
stale_tasks = []
for task in self._active_coroutines:
if task is current_task:
continue
if task.done():
continue
# 注意:asyncio.Task 没有直接的开始时间属性
# 实际实现需要包装 task 或使用自定义 Task 类
# 这里仅为概念演示
for task in stale_tasks:
task.cancel()
logger.warning(f"Cancelled stale task in pool {self.name}")
return len(stale_tasks)
async def close(self, timeout: float = 10.0):
"""
关闭协程池
等待现有任务完成或超时后强制取消
"""
self._closed = True
if self._active_coroutines:
logger.info(f"Waiting for {len(self._active_coroutines)} tasks in pool {self.name}")
await asyncio.wait_for(
asyncio.gather(*self._active_coroutines, return_exceptions=True),
timeout=timeout,
)
self._active_coroutines.clear()
self._update_metrics()
5.3 协程泄漏排查思路
当发现协程池持续增长时,按以下步骤排查:
# 1. 导出当前活跃协程的堆栈(调试用)
import traceback
def dump_active_coroutines(pool: CoroutinePool):
"""打印活跃协程的堆栈追踪"""
for i, task in enumerate(pool._active_coroutines):
print(f"\n=== Task {i} ===")
print(f"State: {task.get_stack()}")
print(f"Coro: {task.get_coro()}")
# Python 3.11+ 可用 task.print_stack()
if hasattr(task, 'print_stack'):
task.print_stack()
# 2. 常见泄漏原因
"""
1. 事件循环阻塞:
- 在协程中使用了同步阻塞调用(time.sleep 而非 asyncio.sleep)
- 解决方法:使用 asyncio.to_thread 包装同步代码
2. 回调地狱:
- 嵌套的 asyncio.create_task 没有 await
- 解决方法:确保所有 create_task 都在监控范围内
3. 异常未处理:
- 协程内部抛出异常,但没有 try-except
- 解决方法:在协程入口添加统一的异常处理
4. 资源未释放:
- 协程持有数据库连接、WebSocket 等资源
- 解决方法:使用 async with 或显式清理
"""
六、完整监控集成示例
以下是整合了所有组件的完整运行示例:
"""
TickDB 量化系统监控集成示例
完整演示:Prometheus + Grafana + 协程池 + WebSocket 监控
"""
import asyncio
import logging
from aiohttp import web
from prometheus_client import REGISTRY
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s"
)
logger = logging.getLogger(__name__)
async def health_check(request: web.Request) -> web.Response:
"""健康检查端点"""
return web.json_response({"status": "ok"})
async def run_demo():
"""演示完整监控流程"""
# ─────────────────────────────────────────────────────────────
# Step 1: 初始化指标采集器
# ─────────────────────────────────────────────────────────────
metrics_config = MetricsConfig(
port=9090,
namespace="tickdb",
subsystem="demo",
)
metrics = QuantMetrics(config=metrics_config)
# 启动独立的 metrics HTTP 服务
metrics.start_server(host="0.0.0.0")
logger.info("Metrics server started on :9090")
# ─────────────────────────────────────────────────────────────
# Step 2: 初始化协程池
# ─────────────────────────────────────────────────────────────
pool = CoroutinePool(
name="strategy_executor",
max_workers=100,
metrics=metrics,
)
logger.info(f"Coroutine pool initialized: {pool.get_pool_status()}")
# ─────────────────────────────────────────────────────────────
# Step 3: 模拟消息处理(模拟量化系统工作)
# ─────────────────────────────────────────────────────────────
async def simulate_market_data_processing():
"""模拟行情数据处理"""
# 模拟不同消息类型的延迟
import random
delay = random.uniform(0.001, 0.1)
await asyncio.sleep(delay)
return {"processed": True}
async def simulate_order_submission(order_id: str):
"""模拟订单提交"""
await asyncio.sleep(random.uniform(0.01, 0.2))
return {"order_id": order_id, "status": "filled"}
# ─────────────────────────────────────────────────────────────
# Step 4: 模拟运行场景
# ─────────────────────────────────────────────────────────────
# 模拟 WebSocket 连接事件
metrics.record_connection_event(
exchange="binance",
account_id="acc_001",
event="connected"
)
# 模拟消息接收(持续 30 秒)
start_time = asyncio.get_event_loop().time()
message_count = 0
while asyncio.get_event_loop().time() - start_time < 30:
# 模拟不同类型的消息
msg_types = ["kline", "depth", "trade", "ticker"]
msg_type = random.choice(msg_types)
# 模拟消息延迟(正常范围 5-50ms,偶尔有抖动)
if random.random() > 0.95:
delay = random.uniform(0.1, 0.5) # 异常延迟
else:
delay = random.uniform(0.005, 0.05) # 正常延迟
metrics.record_message_received(
exchange="binance",
msg_type=msg_type,
delay_seconds=delay,
)
# 模拟协程池任务
await pool.submit(
simulate_market_data_processing(),
task_name=f"process_{msg_type}",
)
message_count += 1
await asyncio.sleep(0.1) # 100ms 间隔
# 模拟连接断开
metrics.record_connection_event(
exchange="binance",
account_id="acc_001",
event="disconnected"
)
# ─────────────────────────────────────────────────────────────
# Step 5: 输出最终统计
# ─────────────────────────────────────────────────────────────
logger.info(f"Demo completed. Processed {message_count} messages.")
logger.info(f"Pool status: {pool.get_pool_status()}")
# 健康状态更新
metrics.update_health(
component="demo_gateway",
instance="instance_001",
is_healthy=True,
)
await pool.close(timeout=5.0)
if __name__ == "__main__":
# 运行演示
asyncio.run(run_demo())
# 保持主进程运行,以便 Prometheus 抓取
print("\n✅ Demo complete. Metrics available at http://localhost:9090/metrics")
print("📊 Import the Grafana dashboard JSON from the article to visualize the data.")
运行以上代码后,访问 http://localhost:9090/metrics,你应该能看到类似以下格式的输出:
# HELP tickdb_ws_gateway_connection_ws_connections_active Current active WebSocket connections
# TYPE tickdb_ws_gateway_connection_ws_connections_active gauge
tickdb_ws_gateway_connection_ws_connections_active{account_id="acc_001",exchange="binance"} 1.0
# HELP tickdb_ws_gateway_message_ws_message_delay_seconds WebSocket message delay from receive to process
# TYPE tickdb_ws_gateway_message_ws_message_delay_seconds histogram
tickdb_ws_gateway_message_ws_message_delay_seconds_bucket{exchange="binance",msg_type="kline",le="0.01"} 45.0
tickdb_ws_gateway_message_ws_message_delay_seconds_bucket{exchange="binance",msg_type="kline",le="0.05"} 120.0
tickdb_ws_gateway_message_ws_message_delay_seconds_bucket{exchange="binance",msg_type="kline",le="+Inf"} 150.0
tickdb_ws_gateway_message_ws_message_delay_seconds_sum{exchange="binance",msg_type="kline"} 3.45
tickdb_ws_gateway_message_ws_message_delay_seconds_count{exchange="binance",msg_type="kline"} 150.0
结语:可观测性是量化系统的基础设施
本文覆盖了量化系统可观测性的核心基础设施:
- 指标分层设计:从连接层、消息层到协程层、业务层,每一层的问题都有对应的指标可以发现
- Prometheus 集成:使用 Gauge、Histogram、Counter 三种核心类型,覆盖资源、延迟、吞吐量
- 生产级代码:包含心跳、重连、限频、超时的完整监控方案
- Grafana 可视化:完整的仪表盘结构和告警规则配置
- 协程池监控:容易被忽视但直接影响系统稳定性的组件
可观测性不是“出问题后用的工具”,而是“系统设计的一部分”。在问题发生之前就规划好指标,在系统异常时才能快速定位。
下一步行动
如果你正在搭建量化系统:
- 将本文的
QuantMetrics类集成进你的 WebSocket 网关 - 配置 Prometheus scrape 间隔为 15 秒(对于延迟敏感场景可缩短到 5 秒)
- 导入 Grafana 仪表盘,观察基线指标
如果你希望更深入:
- 研究 Prometheus 的
remote_write方案,将数据写入 long-term storage - 探索
OpenTelemetry作为 vendor-neutral 的可观测性标准 - 联系 [email protected] 获取 TickDB 行情网关的内置监控方案
如果你习惯用 AI 辅助开发:
在 AI 助手中搜索安装 tickdb-market-data SKILL,TickDB 提供开箱即用的行情数据 API,集成 Prometheus 指标暴露,可直接对接 Grafana 监控体系。
风险提示:本文不构成任何投资建议。量化交易存在固有风险,回测结果不代表未来表现。生产环境中请确保监控覆盖与告警机制完善后再进行实盘交易。