"你收到了一条 PagerDuty 告警:延迟超过 500ms。但你不知道是哪一路消息慢了,也不知道是网络问题还是你的协程泄漏了。"
凌晨三点的告警,是每个做实盘系统的开发者最不愿面对的场景。你翻开监控面板,发现只有 CPU 和内存——一个真正量化的系统需要什么指标?WebSocket 连接是否还在心跳?消息从接收到处理的端到端延迟是多少?你的协程池是否正在悄悄耗尽?
可观测性不是运维的附属品。对于量化系统,它是你在黑夜里定位问题的眼睛。
一、量化系统需要什么样的可观测性
传统的服务器监控只看两个指标:CPU 和内存。但这对于量化系统远远不够。
量化系统是 I/O 密集型 + 事件驱动型的混合体。你的系统可能同时维护着数十个 WebSocket 连接,实时处理消息流,背后是一个 asyncio 协程池。如果只监控 CPU,你永远不知道消息处理是否积压;如果只看内存,你也发现不了协程泄漏导致的慢性死亡。
量化系统的可观测性需要覆盖三个层次:
| 层次 | 监控目标 | 核心指标 |
|---|---|---|
| 连接层 | WebSocket 健康状态 | 连接数、断开次数、心跳成功率 |
| 处理层 | 消息流水线 | 消息延迟(p50/p95/p99)、处理速率、积压队列深度 |
| 资源层 | 系统资源饱和度 | 协程数量、内存分配速率、GC 频率 |
这不是为了炫技,而是当你在凌晨三点收到告警时,这些指标能让你在五分钟内定位问题,而不是花两个小时翻日志。
二、Prometheus + Grafana:开源监控的事实标准
Prometheus 是 CNCF 毕业项目,专为云原生和微服务设计。它的核心优势是:
- Pull 模型:Prometheus 服务主动拉取指标,不依赖客户端开启端口
- 时序数据库:内置高效的时序存储,支持多维标签
- 生态系统完善:Grafana 是 Prometheus 的官方推荐可视化层
整个架构如下:
┌─────────────────────────────────────────────────────────────┐
│ Grafana │
│ (Dashboard 可视化) │
└────────────────────────┬────────────────────────────────────┘
│ Query (PromQL)
▼
┌─────────────────────────────────────────────────────────────┐
│ Prometheus │
│ (指标拉取 + 存储) │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ /metrics │ │ /metrics │ │ /metrics │ │
│ │ (instance1) │ │ (instance2) │ │ (instance3) │ │
│ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │
└─────────┼────────────────┼────────────────┼─────────────────┘
│ Pull │ Pull │ Pull
▼ ▼ ▼
┌─────────────────────────────────────────────────────────────┐
│ Your Quant System │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Prometheus Client Library │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ Counter │ │ Gauge │ │ Histogram│ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ WebSocket ──► Message Queue ──► Coroutine Pool ──► Storage │
└─────────────────────────────────────────────────────────────┘
Prometheus 的指标类型只有四种,理解它们就能覆盖 90% 的监控场景:
| 类型 | 用途 | 典型场景 |
|---|---|---|
| Counter | 只增不减的计数器 | 请求总数、错误总数 |
| Gauge | 可增可减的瞬时值 | 当前连接数、内存使用量 |
| Histogram | 统计分布,直方图 | 请求延迟、消息大小 |
| Summary | 统计分位数 | 端到端延迟(客户端计算) |
对于量化系统,Histogram 是最重要的类型——它能自动计算 p50、p95、p99,让你在不存储原始数据的情况下了解延迟分布。
三、生产级代码:Python Prometheus Client 实战
3.1 安装与初始化
pip install prometheus-client aiohttp
from prometheus_client import Counter, Gauge, Histogram, start_http_server
import os
# ============================================================
# 指标定义区
# ============================================================
# Counter:记录累积发生的事件
WS_CONNECTION_TOTAL = Counter(
"ws_connections_total",
"WebSocket 连接总数(包含成功和失败)",
["status"] # labels: success, failure
)
WS_MESSAGES_TOTAL = Counter(
"ws_messages_total",
"WebSocket 消息总数",
["msg_type"] # labels: tick, depth, trade
)
# Gauge:记录当前的瞬时值
WS_CONNECTIONS_ACTIVE = Gauge(
"ws_connections_active",
"当前活跃的 WebSocket 连接数"
)
COROUTINE_POOL_SIZE = Gauge(
"coroutine_pool_size",
"协程池当前大小"
)
COROUTINE_RUNNING = Gauge(
"coroutine_running",
"当前正在运行的协程数"
)
# Histogram:记录分布,用于计算延迟分位数
MESSAGE_LATENCY = Histogram(
"message_processing_latency_seconds",
"消息从收到到处理完成的延迟(秒)",
["msg_type"],
buckets=(0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5)
)
3.2 WebSocket 连接管理器(带完整监控埋点)
import asyncio
import aiohttp
import time
import random
from contextlib import asynccontextmanager
class MonitoredWebSocketManager:
"""带 Prometheus 监控的 WebSocket 连接管理器"""
def __init__(self, api_key: str, metrics_port: int = 8000):
self.api_key = api_key
self.connections = {} # symbol -> session
self.running_tasks = set()
# 启动 Prometheus metrics HTTP 服务
# ⚠️ 生产环境建议与主服务分离部署
start_http_server(metrics_port)
print(f"Prometheus metrics exposed at :{metrics_port}/metrics")
@asynccontextmanager
async def connect(self, symbol: str, url: str):
"""上下文管理器:自动处理连接生命周期和监控埋点"""
retry_count = 0
max_retries = 5
base_delay = 1.0
while retry_count < max_retries:
try:
headers = {"X-API-Key": self.api_key}
async with aiohttp.ClientSession() as session:
# ⚠️ 协程数 +1
COROUTINE_RUNNING.inc()
COROUTINE_POOL_SIZE.inc()
async with session.ws_connect(
url,
headers=headers,
timeout=aiohttp.WSMsgType.TEXT,
heartbeat=30 # 30秒心跳保活
) as ws:
self.connections[symbol] = ws
WS_CONNECTIONS_ACTIVE.inc()
WS_CONNECTION_TOTAL.labels(status="success").inc()
print(f"[{symbol}] WebSocket connected")
try:
yield ws
finally:
# 正常断开时标记
self.connections.pop(symbol, None)
WS_CONNECTIONS_ACTIVE.dec()
# ⚠️ 协程数 -1
COROUTINE_RUNNING.dec()
COROUTINE_POOL_SIZE.dec()
except aiohttp.ClientError as e:
WS_CONNECTION_TOTAL.labels(status="failure").inc()
retry_count += 1
# 指数退避 + 抖动
delay = min(base_delay * (2 ** retry_count), 60)
jitter = random.uniform(0, delay * 0.1)
wait_time = delay + jitter
print(f"[{symbol}] Connection failed: {e}, "
f"retry {retry_count}/{max_retries} in {wait_time:.1f}s")
await asyncio.sleep(wait_time)
except Exception as e:
# 未预期异常,打印日志但不重试(防止无限循环)
print(f"[{symbol}] Unexpected error: {e}")
raise
raise RuntimeError(f"[{symbol}] Max retries exceeded after {max_retries} attempts")
async def subscribe_depth(self, symbol: str):
"""订阅 depth 频道,实时计算消息延迟"""
ws_url = f"wss://api.tickdb.ai/ws?api_key={self.api_key}"
async with self.connect(symbol, ws_url) as ws:
# 发送订阅命令
await ws.send_json({
"cmd": "subscribe",
"channel": "depth",
"symbol": symbol
})
async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
# ========== 核心监控埋点 ==========
receive_time = time.perf_counter()
try:
data = msg.json()
# 从消息中提取时间戳计算延迟
# 假设服务端在 payload 中附带 server_time
if "server_time" in data:
latency = receive_time - data["server_time"]
MESSAGe_LATENCY.labels(msg_type="depth").observe(latency)
WS_MESSAGES_TOTAL.labels(msg_type="depth").inc()
except Exception as e:
print(f"[{symbol}] Parse error: {e}")
continue
elif msg.type == aiohttp.WSMsgType.PING:
await ws.pong()
3.3 协程池健康监控(防止泄漏)
协程泄漏是 asyncio 应用最隐蔽的问题。一个被遗忘的 await 或死循环的协程会悄悄占用资源,直到系统崩溃。
import asyncio
import weakref
from prometheus_client import REGISTRY, CollectorRegistry
class CoroutineLeakDetector:
"""协程生命周期追踪器(用于检测泄漏)"""
def __init__(self):
# 追踪活跃协程的创建和销毁
self.created_count = 0
self.destroyed_count = 0
self.active_coroutines = weakref.WeakSet()
# 注册自定义收集器
REGISTRY.register(self)
def track(self, coro):
"""装饰器:追踪协程的生命周期"""
self.created_count += 1
self.active_coroutines.add(coro)
async def wrapped():
try:
result = await coro
return result
finally:
self.destroyed_count += 1
self.active_coroutines.discard(coro)
return wrapped()
def collect(self):
"""Prometheus 收集器接口"""
yield GaugeMetricFamily(
"quant_coroutines_created_total",
"历史创建的协程总数",
value=self.created_count
)
yield GaugeMetricFamily(
"quant_coroutines_active",
"当前活跃协程数",
value=len(self.active_coroutines)
)
yield GaugeMetricFamily(
"quant_coroutines_destroyed_total",
"历史销毁的协程总数",
value=self.destroyed_count
)
# 使用示例
detector = CoroutineLeakDetector()
async def background_task(task_id: int):
"""后台任务示例"""
while True:
await asyncio.sleep(10)
print(f"Task {task_id} running")
# 创建任务时自动追踪
async def main():
tasks = []
for i in range(10):
tasks.append(detector.track(background_task(i)))
await asyncio.gather(*tasks)
# 运行后检查:
# curl http://localhost:8000/metrics | grep quant_coroutines
# 如果 active < created - destroyed,说明有协程泄漏
3.4 告警规则配置
# prometheus_rules.yml
groups:
- name: quant_system_alerts
rules:
# WebSocket 连接数异常
- alert: WebSocketConnectionDrop
expr: ws_connections_active < 1
for: 1m
labels:
severity: critical
annotations:
summary: "活跃 WebSocket 连接数为 0"
description: "持续 1 分钟无活跃连接,请检查网络或 API 服务"
# 消息延迟过高
- alert: MessageLatencyHigh
expr: histogram_quantile(0.95, message_processing_latency_seconds) > 0.5
for: 5m
labels:
severity: warning
annotations:
summary: "消息处理延迟 P95 超过 500ms"
description: "当前 P95: {{ $value }}s"
# 协程数异常增长
- alert: CoroutineLeakSuspected
expr: (quant_coroutines_created_total - quant_coroutines_destroyed_total) > 100
for: 10m
labels:
severity: warning
annotations:
summary: "协程泄漏可疑"
description: "未回收协程数超过 100,请检查协程池"
四、核心指标体系
一个完整的量化系统监控面板,至少需要以下指标:
| 指标类型 | 指标名 | 说明 | 告警阈值 |
|---|---|---|---|
| Gauge | ws_connections_active |
活跃连接数 | = 0 时告警 |
| Counter | ws_connection_total{status="failure"} |
失败连接数(速率) | 5分钟内 > 10 次 |
| Histogram | message_processing_latency_seconds |
消息延迟 p95 | > 500ms |
| Gauge | quant_coroutines_active |
活跃协程数 | 持续增长 |
| Counter | ws_messages_total |
消息吞吐量(速率) | 突然跌至 0 |
| Gauge | memory_usage_bytes |
内存使用量 | > 80% |
| Counter | api_errors_total{code="3001"} |
限频错误数 | 持续出现 |
这些指标覆盖了连接层、处理层和资源层,是量化系统可观测性的最小可行集。
五、Grafana 仪表盘设计
有了 Prometheus 拉取数据,Grafana 只需要写 PromQL 查询即可。
5.1 连接健康面板
# 当前活跃连接数
ws_connections_active
# 连接失败率(5分钟窗口)
rate(ws_connection_total{status="failure"}[5m])
5.2 延迟分布面板
# P50 延迟
histogram_quantile(0.50, rate(message_processing_latency_seconds_bucket[5m]))
# P95 延迟
histogram_quantile(0.95, rate(message_processing_latency_seconds_bucket[5m]))
# P99 延迟
histogram_quantile(0.99, rate(message_processing_latency_seconds_bucket[5m]))
5.3 协程健康面板
# 活跃协程数趋势
quant_coroutines_active
# 协程创建速率
rate(quant_coroutines_created_total[5m])
# 协程销毁速率
rate(quant_coroutines_destroyed_total[5m])
# 如果创建速率 != 销毁速率,存在泄漏风险
Grafana Dashboard 的最佳实践是分层展示:最上方放 SLA 指标(延迟 P99),中间放业务指标(连接数、吞吐量),底部放资源指标(协程、内存)。
六、总结
可观测性不是监控的升级版,而是一种设计哲学。它要求你在设计系统时就考虑"出了问题怎么排查",而不是出了问题再想办法。
对于量化系统,这意味着:
- 连接层:知道 WebSocket 是否还活着,心跳是否正常
- 处理层:知道消息从收到到处理的端到端延迟,是否在积压
- 资源层:知道协程是否在泄漏,内存是否在膨胀
Prometheus + Grafana 是这个体系的事实标准。Python prometheus-client 库提供了开箱即用的指标类型,而 Histogram 类型的 histogram_quantile 函数是计算延迟分位数的利器——不需要存储原始数据,Prometheus 帮你算好。
凌晨三点的告警不再让你手忙脚乱,因为你有眼睛了。
下一步行动
如果你是个人开发者,现在就可以在本地运行这套监控:
# 1. 安装依赖
pip install prometheus-client aiohttp
# 2. 启动 Prometheus(docker)
docker run -d -p 9090:9090 -v $(pwd)/prometheus.yml:/etc/prometheus/prometheus.yml prom/prometheus
# 3. 启动 Grafana(docker)
docker run -d -p 3000:3000 grafana/grafana
如果你希望有一个开箱即用的 TickDB 监控模板,在 GitHub 搜索 TickDB/observability,有社区贡献的 Grafana Dashboard JSON。
如果你习惯用 AI 辅助开发,在 AI 助手中搜索安装 prometheus-setup SKILL,可以快速生成指标定义和告警规则。
风险提示:本文不构成任何投资建议。Prometheus 和 Grafana 是第三方开源工具,部署时请注意网络安全配置,不要将 metrics 端口暴露在公网。市场有风险,投资需谨慎。