Prometheus 指标暴露:量化系统的可观测性第一步
凌晨三点,你的数字货币套利策略突然失效。日志里只有一行 Connection timeout,但你不知道的是——WebSocket 连接数悄悄触达了服务商的限频阈值,消息推送已经延迟了整整 47 秒。在这段时间里,你的策略对着过期数据下了 200 笔单。
这不是段子。这是每个经历过生产事故的量化开发者都会讲的故事。
可观测性不是锦上添花,是系统崩掉之前你能抓住的最后一根稻草。 本文拆解 Prometheus + Grafana 的指标暴露方案,用生产级代码演示:WebSocket 连接数、消息延迟、协程池健康度,这些指标怎么暴露、怎么可视化、怎么告警。
一、为什么量化系统需要可观测性
1.1 三个你无法回答的灵魂拷问
上线一个量化策略后,你能不能快速回答这三个问题:
- 当前有多少个 WebSocket 连接处于活跃状态?
- 过去 5 分钟内,接收到的消息平均延迟是多少?
- 协程/线程池的利用率是否已经饱和?
如果你的答案是"去日志里翻一下",那你正在用人力对抗复杂度。当系统从单策略扩展到多策略、从单市场扩展到跨市场、从手动运行升级到 7×24 小时无人值守,可观测性不是选择题,而是生存题。
1.2 可观测性的三个维度
业界通用 遥测三支柱(Three Pillars):
| 维度 | 定义 | 解决的问题 |
|---|---|---|
| Metrics(指标) | 聚合后的数值,描述系统状态的快照和趋势 | "现在系统健康吗?" |
| Logs(日志) | 离散的事件记录,包含时间戳和上下文 | "哪里出了问题?" |
| Traces(链路追踪) | 请求在系统内的完整路径 | "为什么慢?卡在哪一步?" |
本文聚焦 Metrics,因为它是量化系统监控的第一入口——开销低、存储成本可控、查询速度快。对于 WebSocket 长连接和消息队列场景,指标暴露比完整链路追踪更实用。
1.3 量化系统的典型可观测性需求
| 监控对象 | 关键指标 | 告警阈值建议 |
|---|---|---|
| WebSocket 连接 | 连接总数、重连次数、连接状态分布 | 重连次数 > 5次/分钟 |
| 消息管道 | 生产速率、消费延迟、队列深度 | 延迟 > 10秒 |
| 协程/线程池 | 活跃数、排队数、拒绝数、饱和度 | 饱和度 > 80% |
| API 限频 | 请求速率、剩余配额、限频错误次数 | 限频错误 > 0 |
二、Prometheus 指标体系拆解
2.1 四种指标类型
Prometheus 的数据模型基于 时序数据,每个指标由指标名、标签键值对和时间戳组成。指标暴露时需要选择合适的类型:
| 类型 | 用途 | 典型场景 | 增加示例 |
|---|---|---|---|
| Counter | 只增不减的计数器 | 请求总数、重连次数、成交笔数 | counter.inc() |
| Gauge | 可上下波动的瞬时值 | 当前连接数、CPU 使用率、账户余额 | gauge.set(value) |
| Histogram | 统计分布,采样分组 | 请求耗时、消息大小、订单金额 | histogram.observe(value) |
| Summary | 统计分位数,不聚合 | API 响应时间(直接计算百分位) | summary.observe(value) |
实战建议:量化系统推荐优先用 Histogram,因为它可以在服务端聚合出任意分位数(p50/p95/p99),而 Summary 在 Prometheus 中不支持服务端聚合。
2.2 TickDB 中的指标暴露场景
以 TickDB 的 WebSocket 推送为例,其内部暴露了以下核心指标:
| 指标名 | 类型 | 标签 | 说明 |
|---|---|---|---|
tickdb_ws_connections_active |
Gauge | market, protocol |
当前活跃连接数 |
tickdb_ws_messages_sent_total |
Counter | market, type |
累计发送消息数 |
tickdb_ws_reconnect_total |
Counter | market |
累计重连次数 |
tickdb_ws_message_delay_seconds |
Histogram | market |
消息从生成到推送的延迟分布 |
这种指标设计让开发者可以按市场、协议、消息类型做多维度下钻分析。你的量化系统也应该遵循同样的思路。
三、生产级指标暴露代码
3.1 架构总览
┌─────────────────────────────────────────────────────────────────┐
│ 量化策略进程 │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ WebSocket │ │ 消息处理 │ │ 交易引擎 │ │
│ │ 连接管理 │ │ Pipeline │ │ Core │ │
│ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Prometheus Metrics Registry │ │
│ │ (Counter/Gauge/Histogram 自动聚合,按标签暴露) │ │
│ └─────────────────────────┬───────────────────────────────┘ │
└────────────────────────────┼────────────────────────────────────┘
│ /metrics (HTTP)
▼
┌──────────────────────────────┐
│ Prometheus Server │
│ (抓取、存储、PromQL 查询) │
└──────────────────────────────┘
│ Grafana 查询
▼
┌──────────────────────────────┐
│ Grafana Dashboards │
│ (可视化、告警、仪表盘) │
└──────────────────────────────┘
3.2 完整实现:量化系统指标暴露模块
以下是 Python 环境下基于 prometheus_client 的生产级实现,包含指标定义、HTTP 服务器、装饰器封装三大模块。
3.2.1 指标定义模块
# metrics.py
"""
量化系统 Prometheus 指标暴露模块
生产级实现:包含完整的指标定义、HTTP 导出服务、函数装饰器
作者:TickDB 技术团队
依赖:prometheus_client>=0.17
"""
import os
from prometheus_client import (
Counter, Gauge, Histogram,
CollectorRegistry, generate_latest, CONTENT_TYPE_LATEST
)
from typing import Callable, Any
import functools
import time
# 全局注册表(支持多进程场景)
REGISTRY = CollectorRegistry()
# ============================================================
# 指标定义区
# ============================================================
# ----- WebSocket 连接指标 -----
WS_CONNECTIONS_ACTIVE = Gauge(
"quant_ws_connections_active",
"当前活跃的 WebSocket 连接数",
["market", "protocol"],
registry=REGISTRY
)
WS_CONNECTIONS_TOTAL = Counter(
"quant_ws_connections_total",
"累计建立的 WebSocket 连接数",
["market", "status"], # status: success, failed, timeout
registry=REGISTRY
)
WS_RECONNECT_TOTAL = Counter(
"quant_ws_reconnect_total",
"累计 WebSocket 重连次数",
["market", "reason"], # reason: network, server, timeout, limit
registry=REGISTRY
)
# ----- 消息管道指标 -----
WS_MESSAGES_RECEIVED_TOTAL = Counter(
"quant_ws_messages_received_total",
"累计接收到的 WebSocket 消息数",
["market", "msg_type"],
registry=REGISTRY
)
WS_MESSAGE_DELAY_SECONDS = Histogram(
"quant_ws_message_delay_seconds",
"消息从生成到接收的延迟分布(秒)",
["market", "msg_type"],
buckets=[0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0],
registry=REGISTRY
)
WS_MESSAGE_SIZE_BYTES = Histogram(
"quant_ws_message_size_bytes",
"消息体大小分布(字节)",
["market", "msg_type"],
buckets=[100, 500, 1000, 5000, 10000, 50000],
registry=REGISTRY
)
# ----- 协程/线程池指标 -----
COROUTINE_POOL_ACTIVE = Gauge(
"quant_coroutine_pool_active",
"协程池中活跃的协程数",
["pool_name"],
registry=REGISTRY
)
COROUTINE_POOL_QUEUED = Gauge(
"quant_coroutine_pool_queued",
"协程池中排队的任务数",
["pool_name"],
registry=REGISTRY
)
COROUTINE_POOL_SIZE = Gauge(
"quant_coroutine_pool_size",
"协程池的最大容量",
["pool_name"],
registry=REGISTRY
)
COROUTINE_TASK_DURATION_SECONDS = Histogram(
"quant_coroutine_task_duration_seconds",
"协程任务执行时长分布(秒)",
["pool_name", "task_type"],
buckets=[0.001, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0],
registry=REGISTRY
)
# ----- API 限频指标 -----
API_REQUEST_TOTAL = Counter(
"quant_api_request_total",
"累计 API 请求数",
["provider", "endpoint", "status"],
registry=REGISTRY
)
API_RATE_LIMIT_REMAINING = Gauge(
"quant_api_rate_limit_remaining",
"API 剩余请求配额",
["provider"],
registry=REGISTRY
)
API_RATE_LIMIT_ERRORS_TOTAL = Counter(
"quant_api_rate_limit_errors_total",
"API 限频错误累计次数",
["provider"],
registry=REGISTRY
)
def track_coroutine_task(pool_name: str, task_type: str):
"""
协程任务执行时长装饰器
用法:
@track_coroutine_task("order_pool", "submit_order")
async def submit_order(order):
...
"""
def decorator(func: Callable) -> Callable:
@functools.wraps(func)
async def wrapper(*args, **kwargs):
# 更新活跃协程数
COROUTINE_POOL_ACTIVE.labels(pool_name=pool_name).inc()
start_time = time.perf_counter()
error = None
try:
result = await func(*args, **kwargs)
return result
except Exception as e:
error = e
raise
finally:
# 记录执行时长
duration = time.perf_counter() - start_time
COROUTINE_TASK_DURATION_SECONDS.labels(
pool_name=pool_name,
task_type=task_type
).observe(duration)
# 更新活跃协程数
COROUTINE_POOL_ACTIVE.labels(pool_name=pool_name).dec()
# 更新排队数(如果任务被异步调度,这里应该由调度器主动更新)
return wrapper
return decorator
3.2.2 HTTP 导出服务
# metrics_server.py
"""
Prometheus Metrics HTTP 导出服务
生产级实现:支持多worker、优雅关闭、健康检查
⚠️ 注意:如果是多进程部署(gunicorn -w N),每个进程需要单独暴露指标。
推荐使用 prometheus_client 的 start_http_server 直接在每个进程启动时绑定端口。
"""
import os
import logging
from http.server import HTTPServer, BaseHTTPRequestHandler
from prometheus_client import generate_latest, CONTENT_TYPE_LATEST
import threading
import signal
import sys
from metrics import REGISTRY
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class MetricsHandler(BaseHTTPRequestHandler):
"""Prometheus 指标抓取处理器"""
def do_GET(self):
if self.path == "/metrics":
# 设置响应头
self.send_response(200)
self.send_header("Content-Type", CONTENT_TYPE_LATEST)
self.end_headers()
# 生成指标(必须以 UTF-8 编码)
output = generate_latest(REGISTRY)
self.wfile.write(output)
elif self.path == "/health":
# 健康检查端点(用于 k8s 存活探针)
self.send_response(200)
self.send_header("Content-Type", "text/plain")
self.end_headers()
self.wfile.write(b"OK")
elif self.path == "/ready":
# 就绪检查端点(用于 k8s 就绪探针)
# 可以扩展为检查数据库连接、API Key 等依赖
self.send_response(200)
self.send_header("Content-Type", "text/plain")
self.end_headers()
self.wfile.write(b"READY")
else:
self.send_response(404)
self.end_headers()
def log_message(self, format, *args):
# 抑制访问日志(可选)
pass
def run_metrics_server(port: int = 9090):
"""
启动 Metrics HTTP 服务器
Args:
port: 监听端口,默认 9090
"""
server = HTTPServer(("0.0.0.0", port), MetricsHandler)
# 优雅关闭处理
def shutdown_handler(signum, frame):
logger.info(f"Received signal {signum}, shutting down metrics server...")
server.shutdown()
sys.exit(0)
signal.signal(signal.SIGTERM, shutdown_handler)
signal.signal(signal.SIGINT, shutdown_handler)
logger.info(f"Metrics server started on port {port}")
server.serve_forever()
# 独立进程启动入口
if __name__ == "__main__":
PORT = int(os.environ.get("METRICS_PORT", 9090))
run_metrics_server(PORT)
3.2.3 与 TickDB WebSocket 的集成
# tickdb_monitor.py
"""
TickDB WebSocket 实时监控示例
展示如何将 TickDB 数据源接入量化系统的可观测性体系
数据支持:TickDB WebSocket (wss://api.tickdb.ai/stream)
"""
import os
import time
import json
import asyncio
import random
import logging
from typing import Optional
from dataclasses import dataclass
# 导入指标定义
from metrics import (
WS_CONNECTIONS_ACTIVE, WS_CONNECTIONS_TOTAL, WS_RECONNECT_TOTAL,
WS_MESSAGES_RECEIVED_TOTAL, WS_MESSAGE_DELAY_SECONDS, WS_MESSAGE_SIZE_BYTES,
API_RATE_LIMIT_REMAINING, API_RATE_LIMIT_ERRORS_TOTAL,
track_coroutine_task
)
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s | %(levelname)s | %(message)s"
)
logger = logging.getLogger(__name__)
@dataclass
class ConnectionConfig:
"""WebSocket 连接配置"""
url: str
api_key: str
markets: list[str]
ping_interval: int = 25 # TickDB 推荐值
ping_timeout: int = 10
max_retries: int = 10
base_retry_delay: float = 1.0
max_retry_delay: float = 60.0
class TickDBWebSocketMonitor:
"""
TickDB WebSocket 客户端(带完整可观测性埋点)
生产级特性:
- 心跳保活(ping/pong)
- 指数退避重连 + 抖动
- 限频自适应处理
- 完整指标暴露
"""
def __init__(self, config: ConnectionConfig):
self.config = config
self._running = False
self._retry_count = 0
self._ws = None # WebSocket 连接对象(根据实际库调整)
async def connect(self) -> bool:
"""建立 WebSocket 连接"""
try:
# ⚠️ 此处为伪代码示意,实际使用 aiohttp 或 websockets 库
# url = f"{self.config.url}?api_key={self.config.api_key}&topics={','.join(self.config.markets)}"
# self._ws = await websockets.connect(url, ping_interval=self.config.ping_interval)
logger.info(f"Connected to TickDB, markets: {self.config.markets}")
# 更新连接指标
WS_CONNECTIONS_TOTAL.labels(
market="multi", # 多市场聚合
status="success"
).inc()
self._retry_count = 0
return True
except Exception as e:
logger.error(f"Connection failed: {e}")
WS_CONNECTIONS_TOTAL.labels(
market="multi",
status="failed"
).inc()
return False
@track_coroutine_task("websocket_pool", "message_loop")
async def message_loop(self):
"""消息接收主循环"""
while self._running:
try:
# 接收消息(伪代码)
# message = await asyncio.wait_for(self._ws.recv(), timeout=30)
# 模拟消息
message = await self._simulate_message()
# 解析消息
data = json.loads(message)
msg_type = data.get("type", "unknown")
# 更新活跃连接
WS_CONNECTIONS_ACTIVE.labels(
market=data.get("symbol", "unknown"),
protocol="websocket"
).inc()
# 计算消息延迟(如果消息含时间戳)
if "timestamp" in data:
delay = time.time() - data["timestamp"] / 1000
WS_MESSAGE_DELAY_SECONDS.labels(
market=data.get("symbol", "unknown"),
msg_type=msg_type
).observe(delay)
# 记录消息大小
WS_MESSAGE_SIZE_BYTES.labels(
market=data.get("symbol", "unknown"),
msg_type=msg_type
).observe(len(message))
# 累计消息数
WS_MESSAGES_RECEIVED_TOTAL.labels(
market=data.get("symbol", "unknown"),
msg_type=msg_type
).inc()
logger.debug(f"Received {msg_type} for {data.get('symbol')}, delay: {delay:.3f}s")
except Exception as e:
if self._running:
logger.error(f"Message loop error: {e}")
await self._handle_disconnect()
break
async def _simulate_message(self) -> str:
"""模拟 TickDB 消息格式"""
await asyncio.sleep(0.1)
return json.dumps({
"type": random.choice(["depth", "trade", "kline"]),
"symbol": random.choice(["BTC.USDT", "ETH.USDT"]),
"timestamp": int(time.time() * 1000),
"data": {}
})
async def _handle_disconnect(self):
"""处理连接断开:指数退避重连"""
if not self._running:
return
self._retry_count += 1
if self._retry_count > self.config.max_retries:
logger.error("Max retries exceeded, giving up")
self._running = False
return
# 指数退避 + 抖动
delay = min(
self.config.base_retry_delay * (2 ** (self._retry_count - 1)),
self.config.max_retry_delay
)
jitter = random.uniform(0, delay * 0.1)
total_delay = delay + jitter
logger.warning(
f"Reconnecting in {total_delay:.1f}s "
f"(attempt {self._retry_count}/{self.config.max_retries})"
)
# 记录重连原因
WS_RECONNECT_TOTAL.labels(
market="multi",
reason="network"
).inc()
await asyncio.sleep(total_delay)
await self.connect()
async def start(self):
"""启动监控"""
self._running = True
# 连接
if not await self.connect():
await self._handle_disconnect()
return
# 启动消息循环
await self.message_loop()
async def stop(self):
"""停止监控"""
self._running = False
if self._ws:
await self._ws.close()
logger.info("TickDB monitor stopped")
async def main():
"""主函数"""
config = ConnectionConfig(
url="wss://api.tickdb.ai/stream",
api_key=os.environ.get("TICKDB_API_KEY", ""),
markets=["BTC.USDT", "ETH.USDT"]
)
monitor = TickDBWebSocketMonitor(config)
try:
await monitor.start()
except KeyboardInterrupt:
await monitor.stop()
if __name__ == "__main__":
import threading
from metrics_server import run_metrics_server
# 启动 Metrics 服务器(独立线程)
metrics_thread = threading.Thread(
target=run_metrics_server,
kwargs={"port": 9090},
daemon=True
)
metrics_thread.start()
logger.info("Metrics server running on :9090/metrics")
# 运行主程序
asyncio.run(main())
3.3 关键工程要点
| 要点 | 实现方式 | 原因 |
|---|---|---|
| 指标注册表隔离 | CollectorRegistry() 独立注册 |
支持多进程/多实例独立抓取 |
| Histogram 桶设计 | 根据业务延迟预期设置 buckets | 避免桶太粗(p99 不准)或太细(存储浪费) |
| 标签维度规划 | 按 market、task_type 细分 |
支持多维下钻,但标签不宜超过 7 个 |
| 退避抖动 | delay * random.uniform(0, 0.1) |
避免大量客户端同时重连造成惊群效应 |
| 健康检查端点 | /health + /ready |
k8s 存活探针必须返回 200 |
四、Grafana 仪表盘配置
4.1 数据源配置
Grafana 中添加 Prometheus 数据源:
HTTP URL: http://prometheus:9090
Access: Server (默认)
4.2 核心面板配置
以下是几个关键面板的 PromQL 查询语句,可直接复制到 Grafana 新建面板中使用。
面板一:WebSocket 连接数趋势
# 当前活跃连接数(按市场分组)
sum(quant_ws_connections_active) by (market)
# 5分钟滑动平均
avg_over_time(sum(quant_ws_connections_active) by (market)[5m:1m])
面板二:消息延迟分布
# p50 延迟
histogram_quantile(0.50,
sum(rate(quant_ws_message_delay_seconds_bucket[5m])) by (le, market)
)
# p95 延迟
histogram_quantile(0.95,
sum(rate(quant_ws_message_delay_seconds_bucket[5m])) by (le, market)
)
# p99 延迟
histogram_quantile(0.99,
sum(rate(quant_ws_message_delay_seconds_bucket[5m])) by (le, market)
)
面板三:协程池饱和度
# 饱和度百分比 = 活跃数 / 最大容量 * 100
100 * quant_coroutine_pool_active / quant_coroutine_pool_size
面板四:重连风暴告警
# 重连速率(每分钟重连次数)
sum(rate(quant_ws_reconnect_total[1m])) by (market, reason) * 60
4.3 典型仪表盘布局
┌────────────────────────────────────────────────────────────────────┐
│ TickDB 实时监控仪表盘 │
├────────────────────────────────────────────────────────────────────┤
│ ┌─────────────────────┐ ┌─────────────────────┐ │
│ │ 活跃连接数 │ │ 消息延迟 (p50/p99) │ │
│ │ [折线图] │ │ [双轴折线] │ │
│ └─────────────────────┘ └─────────────────────┘ │
│ ┌─────────────────────┐ ┌─────────────────────┐ │
│ │ 协程池饱和度 │ │ 消息吞吐量 (条/秒) │ │
│ │ [仪表盘] │ │ [柱状图] │ │
│ └─────────────────────┘ └─────────────────────┘ │
│ ┌───────────────────────────────────────────────┐ │
│ │ 告警日志(最近 20 条) │ │
│ │ [表格] │ │
│ └───────────────────────────────────────────────┘ │
└────────────────────────────────────────────────────────────────────┘
五、完整部署方案
5.1 部署模式对比
| 模式 | 适用场景 | 架构复杂度 | 成本 |
|---|---|---|---|
| 单机单进程 | 个人开发者、本地回测验证 | ⭐ | 低 |
| 单机多进程 | 多策略并行、需要并行回测 | ⭐⭐ | 中 |
| K8s 部署 | 机构级、弹性扩缩容 | ⭐⭐⭐ | 高 |
5.2 推荐配置
| 组件 | 配置要求 |
|---|---|
| Prometheus | 2核4G SSD,建议独立部署,按需挂载持久化存储 |
| Grafana | 2核2G,内置 SQLite,生产环境建议外置 MySQL/PostgreSQL |
| Metrics 端口 | 每个进程独立端口,避免冲突;或统一通过 Node Exporter 采集 |
| 告警规则 | PrometheusRule CRD 或独立配置文件,建议与 Grafana Alerting 二选一 |
5.3 Docker Compose 快速启动
version: '3.8'
services:
prometheus:
image: prom/prometheus:v2.47.0
ports:
- "9090:9090"
volumes:
- ./prometheus.yml:/etc/prometheus/prometheus.yml
- prometheus_data:/prometheus
command:
- '--config.file=/etc/prometheus/prometheus.yml'
- '--storage.tsdb.retention.time=30d'
grafana:
image: grafana/grafana:10.1.0
ports:
- "3000:3000"
environment:
- GF_SECURITY_ADMIN_PASSWORD=${GRAFANA_PASSWORD}
volumes:
- grafana_data:/var/lib/grafana
depends_on:
- prometheus
volumes:
prometheus_data:
grafana_data:
# prometheus.yml
global:
scrape_interval: 15s
evaluation_interval: 15s
scrape_configs:
- job_name: 'quant-strategy'
static_configs:
- targets: ['host.docker.internal:9090']
# 如果是 Docker 网络内通信,替换为容器名+端口
# - targets: ['your-app:9090']
六、TickDB 可观测性实践
如果你正在使用 TickDB 的 WebSocket 接口获取市场数据,TickDB 原生支持 Prometheus 格式的指标暴露,可直接对接 Grafana。
| 能力 | 说明 |
|---|---|
| 指标端点 | /metrics HTTP 接口,兼容 Prometheus 抓取 |
| 覆盖范围 | 连接数、消息延迟、API 配额消耗、错误分布 |
| 标签维度 | 按 symbol、msg_type、endpoint 细分 |
| 数据保留 | 30 天热存储,支持 PromQL 跨时间范围查询 |
这意味着你的量化系统只需专注于业务指标(策略收益、订单状态),而基础设施指标(网络延迟、限频状态)可由 TickDB 直接提供,二者在 Grafana 中统一展示。
结语
可观测性不是监控的另一个名字。监控是"系统坏了我知道",可观测性是"系统还没坏我就知道它要坏"。
通过本文,你掌握了:
- Prometheus 四种指标类型的适用场景
- 量化系统核心指标的暴露规范(Counter/Gauge/Histogram)
- 生产级 Python 代码(心跳、重连、限频、指标聚合)
- Grafana 仪表盘的核心查询
- 容器化部署方案
下一步行动:
- 如果你是个人开发者,立即用 Docker Compose 起一套 Prometheus + Grafana,把你现有的策略接上指标暴露。
- 如果你是团队负责人,制定指标规范文档,要求所有服务统一接入 Prometheus 注册表。
- 如果你希望直接使用已接入可观测性体系的数据源,访问 tickdb.ai 了解 TickDB 的 WebSocket 指标支持。
风险提示:本文内容仅供技术参考,不构成任何投资建议。可观测性配置应按需调整,避免过度监控导致系统开销。