7x24 永不停歇:数字货币监控系统的特殊设计
“凌晨三点,手机震动。你睡眼惺忪地摸起手机,看见一条告警:BTC/USDT 深度异常。屏幕上显示的波动幅度是 0.3%——对于日内交易员,这甚至算不上噪声。但你没有上下文:现在是北京时间凌晨三点,纽约的交易员正在吃早餐,东京的刚刚下班。没有任何一个地方处于'正常交易时段'。
你盯着那条告警,陷入哲学思考:这条告警是真的异常,还是只是数字货币市场永远处于'交易时段'的正常波动?
这不是技术问题,这是存在主义危机。”
数字货币市场没有收盘时间。这句话听起来像常识,但在设计监控系统时,它会撕开一整套假设的裂缝。
当你为 A 股、美股设计监控时,"交易日"是清晰的:早上 9:30 到下午 4:00(美东时间)是交易时段,收盘后是风控复盘时间,第二天开盘前是数据准备窗口。你的调度任务可以安心地设定在"非交易时段"执行,你的告警可以区分"盘中剧烈波动"和"盘后正常调整"。
但加密货币交易所不会关门。7x24,永不停歇。你的监控系统必须承认这个现实,并在此基础上重建所有时间相关的概念。
本文拆解三个核心问题:
- 没有收盘,"交易日"怎么定义?
- 7x24 调度系统怎么设计?
- 告警在凌晨三点和下午三点有什么区别?
一、问题的根源:传统系统的三个隐含假设
让我们先解剖一下传统监控系统依赖的"交易日"假设。
1.1 假设一:存在"非交易时段"
传统系统的调度任务通常分为两类:
- 盘中任务:实时监控、价格预警、订单簿分析
- 盘后任务:日线计算、风险敞口汇总、数据清洗
这套机制的前提是"盘后存在"。但对于 BTC/USDT,这个假设不成立。
# 传统调度(美股)
cron: "0 16 * * 1-5" # 每天16:00(收盘后)执行日线汇总
cron: "0 9 * * 1-5" # 每个交易日9:00执行开盘准备
# 数字货币?这些 cron 表达式没有意义
没有收盘意味着:盘中和盘后的边界消失了。你的"日线"是 UTC 0 点到 24 点,还是北京时间 0 点到 24 点?如果你的团队在纽约,你用美东时间还是 UTC?
每个选择都有代价。
1.2 假设二:异常有参照系
传统告警系统依赖"正常波动范围"。这个范围通常基于历史数据计算,但隐含了一个时间维度:波动率在交易时段内和盘后是不同的。
以苹果(AAPL)为例:
| 时段 | 平均日内波动 | 盘后平均波动 |
|---|---|---|
| 交易时段 | 1.2% | - |
| 盘后 30 分钟 | - | 0.3% |
| 盘后 2 小时 | - | 0.1% |
如果你在盘后看到 0.5% 的波动,这是一个值得关注的信号。但如果你把这个阈值套用到 BTC/USDT,你会发现:0.5% 在加密货币市场是家常便饭,24 小时随时可能发生。
没有"盘后"作为参照,"异常"的定义必须重新设计。
1.3 假设三:团队在"工作时间"响应
告警的价值在于有人响应。如果凌晨 3 点的告警淹没在噪音中,团队会进入"告警疲劳"——要么忽略一切告警,要么直接关闭通知。
传统系统可以通过"只在工作时间发送非紧急告警"来缓解这个问题。但 7x24 市场意味着:任何时候都可能是"工作时间",也可能不是。
二、解决方案一:交易日的重新定义
2.1 为什么 UTC 0 点是合理基准
在数字货币监控中,"交易日"的定义需要满足三个约束:
- 内部一致性:所有数据指标使用同一基准
- 跨团队对齐:不同地理位置的团队看到相同的"今天"
- 交易所对齐:至少与主流交易所的统计口径接近
综合这三个约束,UTC 0 点作为"交易日"起点是最优选择:
| 方案 | 一致性 | 跨团队对齐 | 交易所对齐 | 综合评价 |
|---|---|---|---|---|
| UTC 0 点 | ✅ 完全一致 | ✅ 全球通用 | ✅ Binance 等以此为基础 | 推荐 |
| 北京时间 0 点 | ⚠️ 差(UTC 8:00) | ❌ 纽约团队混乱 | ⚠️ 与部分交易所统计差 8 小时 | 不推荐 |
| 本地时间 0 点 | ❌ 不可行 | ❌ 不同人看到不同"今天" | ❌ 完全无法对齐 | 禁止 |
Binance、OKX 等主流交易所的"日线数据"默认以 UTC 0 点为分界。这意味着一旦你选择 UTC 0 点,可以直接使用交易所提供的日统计数据,无需自己计算。
2.2 滚动窗口:超越"日"的概念
对于 7x24 市场,固定窗口(每天 0 点到 24 点)有一个问题:边界处的数据不连续。
例如,BTC 在 UTC 23:59:59 发生了剧烈波动,这条数据属于"今天"还是"明天"?
更好的方案是滚动窗口(Sliding Window):
传统方案(固定窗口):
|------- Day 1 -------|------- Day 2 -------|------- Day 3 -------|
滚动窗口方案(以 24 小时为周期,持续滑动):
|---24h---| → |---24h---| → |---24h---|
↑ 每小时滑动一次,重新计算过去 24 小时的指标
滚动窗口的优势:
- 无边界:任何时刻都有完整的 24 小时数据
- 平滑过渡:避免 0 点前后数据跳变
- 支持自定义周期:可以计算 6 小时、12 小时、72 小时的滚动指标
2.3 交易时段切片:数字货币的"伪盘"
虽然没有收盘,但数字货币市场确实存在相对平静期。以 UTC 时间为基准:
| UTC 时段 | 对应北京时间 | 市场特征 |
|---|---|---|
| 0:00 - 8:00 | 8:00 - 16:00 | 亚洲时段,波动相对较低 |
| 8:00 - 16:00 | 16:00 - 0:00 | 欧洲时段,流动性增加 |
| 16:00 - 24:00 | 0:00 - 8:00 | 美亚重叠时段,波动最剧烈 |
这不是"交易时段/非交易时段"的二元划分,而是一个连续的光谱。 你的监控系统可以利用这个规律,对不同时段设置不同的告警阈值。
三、解决方案二:7x24 调度系统设计
3.1 调度器的核心挑战
传统调度器(如 cron)的设计假设是:任务在"需要时"运行,不需要时休息。但在 7x24 场景下,这个假设需要翻转:任务需要持续运行,只是在不同时间做不同的事。
两种调度范式的对比:
| 维度 | 传统调度(cron) | 7x24 调度(Continuous) |
|---|---|---|
| 触发方式 | 时间点触发 | 条件触发 + 时间触发 |
| 状态保持 | 无状态,每次独立 | 需维护状态(上次执行结果等) |
| 异常恢复 | 依赖外部监控 | 内置重试和状态恢复 |
| 资源占用 | 低(非运行时段几乎不占用) | 持续占用 |
| 适用场景 | 批处理、定时任务 | 实时监控、持续计算 |
3.2 分层调度架构
推荐的分层调度架构:
┌─────────────────────────────────────────────────────────────┐
│ 调度协调层(Scheduler) │
├─────────────────────────────────────────────────────────────┤
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ 高频任务 │ │ 中频任务 │ │ 低频任务 │ │
│ │ (每 1-60s) │ │ (每 1-60m) │ │ (每小时+) │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
├─────────────────────────────────────────────────────────────┤
│ 数据层(TickDB) │
│ 实时 depth/trades + 历史 kline │
└─────────────────────────────────────────────────────────────┘
各层职责:
- 高频任务:实时订单簿监控、买卖压力比计算、深度异常检测
- 中频任务:滚动窗口统计、波动率更新、趋势判断
- 低频任务:日线汇总、告警汇总报告、系统健康检查
3.3 生产级代码:7x24 调度引擎
以下是基于 Python 的 7x24 调度引擎核心实现:
import asyncio
import logging
from datetime import datetime, timezone
from typing import Callable, Dict, List, Optional
import os
logger = logging.getLogger(__name__)
class ContinuousScheduler:
"""
7x24 持续调度器
设计原则:
1. 基于 asyncio 实现,不阻塞主线程
2. 支持不同频率的任务分层调度
3. 内置心跳和健康检查
4. 任务失败自动重试(指数退避)
"""
def __init__(self):
self.tasks: Dict[str, asyncio.Task] = {}
self.last_run: Dict[str, datetime] = {}
self.failure_count: Dict[str, int] = {}
self.running = False
# 任务注册表:{name: (interval_seconds, callback, options)}
self._registry: Dict[str, tuple] = {}
def register(
self,
name: str,
interval: int,
callback: Callable,
*,
initial_delay: float = 0,
max_retries: int = 3,
retry_base_delay: float = 1.0
):
"""
注册调度任务
Args:
name: 任务名称(唯一标识)
interval: 执行间隔(秒)
callback: 异步回调函数
initial_delay: 初始延迟
max_retries: 最大重试次数
retry_base_delay: 重试基础延迟
"""
self._registry[name] = (
interval,
callback,
{
'initial_delay': initial_delay,
'max_retries': max_retries,
'retry_base_delay': retry_base_delay
}
)
logger.info(f"Registered task: {name} (interval={interval}s)")
async def _run_task(self, name: str):
"""执行单个任务,支持重试"""
interval, callback, options = self._registry[name]
max_retries = options['max_retries']
base_delay = options['retry_base_delay']
while self.running:
try:
await asyncio.sleep(interval)
# 记录执行开始时间
start_time = datetime.now(timezone.utc)
self.last_run[name] = start_time
# 执行任务
result = await callback()
# 重置失败计数
self.failure_count[name] = 0
logger.debug(f"Task {name} completed at {start_time.isoformat()}")
except asyncio.CancelledError:
logger.info(f"Task {name} cancelled")
break
except Exception as e:
self.failure_count[name] = self.failure_count.get(name, 0) + 1
retry_count = self.failure_count[name]
if retry_count > max_retries:
logger.error(
f"Task {name} failed {retry_count} times, "
f"entering cool-down: {e}"
)
# 进入冷却期:延长间隔
await asyncio.sleep(60 * (retry_count - max_retries))
else:
# 指数退避重试
delay = base_delay * (2 ** (retry_count - 1))
jitter = asyncio.get_event_loop().time() % 1 # 添加抖动
logger.warning(
f"Task {name} failed (attempt {retry_count}), "
f"retry in {delay + jitter:.1f}s: {e}"
)
await asyncio.sleep(delay + jitter)
async def _health_checker(self):
"""健康检查任务:监控所有任务状态"""
while self.running:
await asyncio.sleep(60) # 每分钟检查一次
for name in self._registry:
last = self.last_run.get(name)
failures = self.failure_count.get(name, 0)
if last:
elapsed = (datetime.now(timezone.utc) - last).total_seconds()
interval = self._registry[name][0]
if elapsed > interval * 3:
logger.warning(
f"Task {name} may be stuck: "
f"last ran {elapsed:.0f}s ago "
f"(interval={interval}s)"
)
if failures > 0:
logger.info(
f"Task {name} failure count: {failures}"
)
async def start(self):
"""启动调度器"""
self.running = True
# 启动所有注册的任务
for name in self._registry:
interval, _, _ = self._registry[name]
self.tasks[name] = asyncio.create_task(self._run_task(name))
# 启动健康检查
self.tasks['_health_check'] = asyncio.create_task(self._health_checker())
logger.info(f"Scheduler started with {len(self._registry)} tasks")
# 保持运行直到被取消
try:
await asyncio.gather(*self.tasks.values())
except asyncio.CancelledError:
logger.info("Scheduler shutting down...")
async def stop(self):
"""停止调度器"""
self.running = False
for task in self.tasks.values():
task.cancel()
await asyncio.gather(*self.tasks.values(), return_exceptions=True)
logger.info("Scheduler stopped")
# ============ 示例:数字货币监控系统任务 ============
async def monitor_depth_anomaly():
"""
高频任务:深度异常检测
订阅 TickDB depth 频道,检测买卖深度失衡
"""
import aiohttp
import json
api_key = os.environ.get("TICKDB_API_KEY")
if not api_key:
logger.warning("TICKDB_API_KEY not set, skipping depth monitor")
return None
url = "wss://api.tickdb.ai/ws/market/depth"
async with aiohttp.ClientSession() as session:
params = {"api_key": api_key, "symbol": "BTC.USDT"}
async with session.ws_connect(url, params=params) as ws:
# 心跳保活
ping_task = asyncio.create_task(_ping_loop(ws))
try:
async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
data = json.loads(msg.data)
await _check_depth_imbalance(data)
elif msg.type == aiohttp.WSMsgType.CLOSING:
break
finally:
ping_task.cancel()
async def _ping_loop(ws):
"""WebSocket 心跳"""
while True:
await asyncio.sleep(30)
await ws.send_json({"cmd": "ping"})
async def _check_depth_imbalance(data: dict):
"""检查深度失衡"""
bids = data.get('bids', [])
asks = data.get('asks', [])
if not bids or not asks:
return
bid_total = sum(float(q) for _, q in bids[:10])
ask_total = sum(float(q) for _, q in asks[:10])
if bid_total == 0 or ask_total == 0:
return
pressure_ratio = bid_total / ask_total
# 检测极端失衡(超过 3:1 或 1:3)
if pressure_ratio > 3.0 or pressure_ratio < 0.33:
logger.warning(
f"Depth imbalance detected: pressure_ratio={pressure_ratio:.2f} "
f"(bid={bid_total:.0f}, ask={ask_total:.0f})"
)
# 触发告警(见下一节)
async def rolling_volatility_update():
"""
中频任务:更新滚动波动率
计算过去 24 小时的滚动波动率
"""
import requests
api_key = os.environ.get("TICKDB_API_KEY")
# 获取过去 24 小时的 1 小时 K 线
response = requests.get(
"https://api.tickdb.ai/v1/market/kline",
headers={"X-API-Key": api_key},
params={
"symbol": "BTC.USDT",
"interval": "1h",
"limit": 24
},
timeout=(3.05, 10)
)
data = response.json()
if data.get('code') != 0:
logger.error(f"Failed to fetch kline: {data}")
return None
klines = data['data']
returns = []
for i in range(1, len(klines)):
prev_close = float(klines[i-1]['close'])
curr_close = float(klines[i]['close'])
if prev_close > 0:
returns.append((curr_close - prev_close) / prev_close)
if not returns:
return None
# 计算标准差(年化波动率)
import statistics
daily_vol = statistics.stdev(returns)
annualized_vol = daily_vol * (24 ** 0.5) * 100 # 转为百分比
logger.info(f"24h Rolling volatility: {annualized_vol:.2f}%")
# 与历史均值比较
HISTORICAL_AVG_VOL = 65.0 # BTC 历史平均波动率(约 65%)
if annualized_vol > HISTORICAL_AVG_VOL * 1.5:
logger.warning(
f"Abnormally high volatility: {annualized_vol:.2f}% "
f"(historical avg: {HISTORICAL_AVG_VOL}%)"
)
return {'volatility': annualized_vol, 'timestamp': datetime.now(timezone.utc).isoformat()}
# ============ 启动调度器 ============
async def main():
scheduler = ContinuousScheduler()
# 注册任务
scheduler.register(
"depth_monitor",
interval=5, # 每 5 秒检查一次
callback=monitor_depth_anomaly,
max_retries=5
)
scheduler.register(
"volatility_update",
interval=300, # 每 5 分钟更新一次
callback=rolling_volatility_update,
initial_delay=10 # 启动后 10 秒开始
)
# 启动
await scheduler.start()
if __name__ == "__main__":
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s | %(levelname)s | %(message)s"
)
asyncio.run(main())
代码说明:
- 心跳保活:
_ping_loop每 30 秒发送一次 ping,防止 WebSocket 连接断开 - 指数退避重试:任务失败后等待时间指数增长(2^n 秒),避免频繁重试
- 抖动:在退避时间上添加随机抖动,防止多任务同时重试造成的"惊群效应"
- 健康检查:每秒检查所有任务的最后执行时间,发现"卡住"的任务
- 任务分层:高频(5 秒)、中频(5 分钟)、低频(可扩展)分层调度
四、解决方案三:告警防疲劳机制
4.1 告警疲劳的根源
告警疲劳(Alert Fatigue)在 7x24 场景下尤为严重:
- 持续压力:没有任何"休息时间",告警随时可能响起
- 信号淹没:噪音告警太多,真正的异常被稀释
- 上下文缺失:凌晨的告警没有足够的背景信息
核心解法:不是减少告警数量,而是提高告警质量——让每一次告警都值得响应。
4.2 四层防疲劳机制
| 层级 | 机制 | 效果 |
|---|---|---|
| 去重 | 同类告警在冷却期内只触发一次 | 减少重复告警 |
| 聚合 | 将短时间内的多个相关告警合并为一条 | 避免告警风暴 |
| 分级 | 按紧急程度分配不同响应策略 | 优化注意力分配 |
| 静默 | 可配置的静默窗口(可叠加) | 允许主动暂停告警 |
4.3 生产级代码:告警防疲劳引擎
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Set
from datetime import datetime, timedelta, timezone
from collections import defaultdict
import asyncio
import logging
import hashlib
logger = logging.getLogger(__name__)
@dataclass
class Alert:
"""告警数据模型"""
level: str # critical / warning / info
title: str
message: str
tags: Set[str] = field(default_factory=set) # 用于聚合的标签
metadata: Dict = field(default_factory=dict) # 额外上下文
def fingerprint(self) -> str:
"""生成唯一指纹,用于去重和聚合"""
# 基于 level + title + tags 生成指纹
content = f"{self.level}:{self.title}:{','.join(sorted(self.tags))}"
return hashlib.md5(content.encode()).hexdigest()[:12]
@dataclass
class AlertChannel:
"""告警渠道配置"""
name: str
min_level: str # 该渠道接收的最低告警级别
cooldown_seconds: int # 该渠道的冷却时间
class AlertManager:
"""
告警防疲劳引擎
功能:
1. 去重:同类告警在冷却期内只发送一次
2. 聚合:将短时间内多个相关告警合并
3. 分级:按级别路由到不同渠道
4. 静默:支持配置的静默窗口
"""
# 告警级别定义
LEVELS = ['debug', 'info', 'warning', 'critical']
LEVEL_PRIORITY = {l: i for i, l in enumerate(LEVELS)}
def __init__(self):
# 去重表:{fingerprint: last_sent_time}
self._dedup_cache: Dict[str, datetime] = {}
# 聚合缓冲区:{fingerprint: [alert, ...]}
self._aggregation_buffer: Dict[str, List[Alert]] = defaultdict(list)
# 静默规则:{rule_name: (start_hour, end_hour, weekdays)}
self._silence_rules: List[Dict] = []
# 冷却配置(秒)
self._cooldown: Dict[str, int] = {
'critical': 300, # 严重告警:5 分钟冷却
'warning': 900, # 警告告警:15 分钟冷却
'info': 3600, # 信息告警:1 小时冷却
}
# 聚合窗口(秒):窗口内的同类告警会被聚合
self._aggregation_window = 60
# 渠道配置
self._channels: Dict[str, AlertChannel] = {
'sms': AlertChannel('sms', 'critical', cooldown_seconds=1800),
'push': AlertChannel('push', 'warning', cooldown_seconds=600),
'log': AlertChannel('log', 'info', cooldown_seconds=60),
}
# 锁
self._lock = asyncio.Lock()
# 聚合定时器
self._aggregation_task: Optional[asyncio.Task] = None
def add_silence_rule(
self,
name: str,
start_hour: int,
end_hour: int,
weekdays: Optional[List[int]] = None,
tags: Optional[Set[str]] = None
):
"""
添加静默规则
Args:
name: 规则名称
start_hour: 静默开始小时(UTC)
end_hour: 静默结束小时(UTC)
weekdays: 静默适用的星期几(0=周一, 6=周日),None 表示每天都适用
tags: 仅静默带有这些标签的告警,None 表示静默所有
"""
self._silence_rules.append({
'name': name,
'start_hour': start_hour,
'end_hour': end_hour,
'weekdays': weekdays,
'tags': tags
})
logger.info(f"Added silence rule: {name}")
async def send(self, alert: Alert) -> bool:
"""
发送告警(入口方法)
Returns:
True if alert was sent, False if suppressed
"""
async with self._lock:
now = datetime.now(timezone.utc)
# 第一层:静默检查
if self._is_silenced(alert, now):
logger.debug(f"Alert silenced: {alert.title}")
return False
# 第二层:去重检查
fingerprint = alert.fingerprint()
if self._is_duplicate(fingerprint, alert.level, now):
logger.debug(f"Alert deduplicated: {alert.title}")
return False
# 第三层:聚合
self._aggregation_buffer[fingerprint].append(alert)
# 如果是立即告警(非聚合场景),直接发送
if alert.level == 'critical':
await self._flush_fingerprint(fingerprint)
return True
def _is_silenced(self, alert: Alert, now: datetime) -> bool:
"""检查告警是否在静默期内"""
current_hour = now.hour
current_weekday = now.weekday()
for rule in self._silence_rules:
# 检查时间窗口
if rule['start_hour'] <= current_hour < rule['end_hour']:
# 检查星期
if rule['weekdays'] and current_weekday not in rule['weekdays']:
continue
# 检查标签
if rule['tags'] and not alert.tags.intersection(rule['tags']):
continue
logger.info(f"Alert matched silence rule: {rule['name']}")
return True
return False
def _is_duplicate(self, fingerprint: str, level: str, now: datetime) -> bool:
"""检查是否重复告警"""
if fingerprint not in self._dedup_cache:
return False
last_sent = self._dedup_cache[fingerprint]
cooldown = self._cooldown.get(level, 600)
if (now - last_sent).total_seconds() < cooldown:
return True
return False
async def _flush_fingerprint(self, fingerprint: str):
"""发送聚合后的告警"""
alerts = self._aggregation_buffer.get(fingerprint, [])
if not alerts:
return
# 合并告警
combined_alert = self._merge_alerts(alerts)
# 按渠道发送
for channel_name, channel in self._channels.items():
if self._should_send_to_channel(combined_alert, channel):
await self._send_to_channel(channel_name, combined_alert)
# 更新去重缓存
self._dedup_cache[fingerprint] = datetime.now(timezone.utc)
# 清理缓冲区
del self._aggregation_buffer[fingerprint]
def _merge_alerts(self, alerts: List[Alert]) -> Alert:
"""合并多条告警"""
if len(alerts) == 1:
return alerts[0]
# 取最高级别
max_level = max(alerts, key=lambda a: self.LEVEL_PRIORITY[a.level]).level
# 合并元数据
merged_metadata = {'count': len(alerts), 'alerts': []}
for a in alerts:
merged_metadata['alerts'].append({
'title': a.title,
'message': a.message,
'timestamp': datetime.now(timezone.utc).isoformat()
})
return Alert(
level=max_level,
title=f"[{len(alerts)}x] {alerts[0].title}",
message=f"Multiple alerts in aggregation window",
tags=alerts[0].tags,
metadata=merged_metadata
)
def _should_send_to_channel(self, alert: Alert, channel: AlertChannel) -> bool:
"""检查是否应发送到该渠道"""
alert_priority = self.LEVEL_PRIORITY.get(alert.level, 0)
channel_priority = self.LEVEL_PRIORITY.get(channel.min_level, 0)
return alert_priority >= channel_priority
async def _send_to_channel(self, channel: str, alert: Alert):
"""发送到指定渠道(示例实现)"""
if channel == 'sms':
# 实际实现:调用短信 API
logger.warning(f"[SMS] {alert.level.upper()}: {alert.title}")
elif channel == 'push':
# 实际实现:调用推送服务
logger.warning(f"[PUSH] {alert.level.upper()}: {alert.title}")
elif channel == 'log':
# 日志渠道:始终记录
logger.info(f"[LOG] {alert.title}")
async def start_aggregation_loop(self):
"""启动聚合定时器(定期 flush 缓冲区)"""
while True:
await asyncio.sleep(self._aggregation_window)
async with self._lock:
fingerprints = list(self._aggregation_buffer.keys())
for fp in fingerprints:
await self._flush_fingerprint(fp)
async def flush_all(self):
"""手动 flush 所有缓冲区"""
async with self._lock:
fingerprints = list(self._aggregation_buffer.keys())
for fp in fingerprints:
await self._flush_fingerprint(fp)
# ============ 使用示例 ============
async def demo_alert_system():
"""演示告警防疲劳系统"""
manager = AlertManager()
# 添加静默规则:UTC 2:00-6:00(北京时间 10:00-14:00)静默 info 级别告警
manager.add_silence_rule(
name="lunch_silence",
start_hour=2,
end_hour=6,
tags={'info', 'news'} # 仅静默 info 和 news 标签
)
# 启动聚合循环
agg_task = asyncio.create_task(manager.start_aggregation_loop())
# 模拟告警场景
test_alerts = [
Alert(
level='critical',
title='BTC深度异常',
message='买卖压力比突破 5:1',
tags={'depth', 'critical'}
),
Alert(
level='warning',
title='波动率升高',
message='24h 波动率触及 1.5 倍历史均值',
tags={'volatility', 'warning'}
),
Alert(
level='info',
title='新地址活动',
message='检测到大额转入',
tags={'onchain', 'info'}
),
]
for alert in test_alerts:
sent = await manager.send(alert)
print(f"Sent: {sent} - {alert.title}")
# 模拟短时间内多次触发同类告警
print("\n--- Simulating alert burst ---")
for i in range(5):
alert = Alert(
level='warning',
title='订单簿刷新',
message=f'检测到订单簿更新 #{i+1}',
tags={'depth'}
)
sent = await manager.send(alert)
print(f"Attempt {i+1}: Sent={sent}")
# 等待聚合
await asyncio.sleep(2)
await manager.flush_all()
agg_task.cancel()
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
asyncio.run(demo_alert_system())
核心机制说明:
- 去重指纹:基于
level + title + tags生成哈希,相同组合在冷却期内只告警一次 - 聚合窗口:60 秒内的同类告警被合并,最后发送一条"5x"的聚合告警
- 静默规则:支持按时间、星期、标签三个维度配置静默
- 分级路由:critical 发所有渠道,warning 发推送和日志,info 只发日志
五、系统架构总览
将上述三个方案整合,完整的 7x24 数字货币监控系统架构如下:
┌─────────────────────────────────────────────────────────────────┐
│ 数据源层 │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ TickDB │ │ 交易所 │ │ 链上数据 │ │
│ │ WebSocket │ │ REST API │ │ 浏览器 │ │
│ │ (depth, │ │ (补充数据) │ │ (可选) │ │
│ │ trades) │ │ │ │ │ │
│ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │
└─────────┼────────────────┼────────────────┼─────────────────────┘
│ │ │
▼ ▼ ▼
┌─────────────────────────────────────────────────────────────────┐
│ 调度执行层 │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ ContinuousScheduler (7x24) │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ 高频任务 │ │ 中频任务 │ │ 低频任务 │ │ │
│ │ │ (5s) │ │ (5min) │ │ (1h+) │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ │ │
│ └─────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ 告警处理层 │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ AlertManager │ │
│ │ ┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐ │ │
│ │ │ 去重 │→ │ 聚合 │→ │ 分级 │→ │ 静默 │ │ │
│ │ └────────┘ └────────┘ └────────┘ └────────┘ │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌───────────┼───────────┐ │
│ ▼ ▼ ▼ │
│ ┌────────┐ ┌────────┐ ┌────────┐ │
│ │ SMS │ │ PUSH │ │ LOG │ │
│ └────────┘ └────────┘ └────────┘ │
└─────────────────────────────────────────────────────────────────┘
六、部署方案对比
| 维度 | 个人开发者 | 小型团队 | 机构级 |
|---|---|---|---|
| 调度方案 | Python asyncio 单进程 | 多进程 + Redis 队列 | Kubernetes + 分布式调度 |
| 告警渠道 | 日志 + Push(免费) | Push + 飞书/钉钉 | 全渠道 + 专人值班 |
| 数据存储 | 本地 SQLite | PostgreSQL | 时序数据库(InfluxDB/Prometheus) |
| 故障恢复 | 手动重启 | Supervisor 自动拉起 | 健康检查 + 自动扩缩容 |
| TickDB 使用 | 免费层(基础 depth) | 标准层(depth + trades) | 专业层(全量数据 + SLA) |
结语
数字货币的 7x24 特性不是一个"特殊情况",而是一个更纯粹的挑战:当"交易日"这个基本假设消失后,你必须从头审视监控系统的每一个时间相关设计。
核心结论:
- 交易日的定义:采用 UTC 0 点作为基准,配合滚动窗口实现无边界数据统计
- 调度系统:从"定时触发"转向"持续运行",支持分层频率和故障自愈
- 告警机制:从"发出去"转向"发对",通过去重、聚合、分级、静默四层过滤提高告警质量
这些问题没有唯一正确答案,但有更好的起点。从承认"7x24 是常态"开始,你的监控系统设计就已经赢了一半。
下一步行动
如果你是个人开发者,想快速验证本文方案:
- 访问 tickdb.ai 注册(免费,无需信用卡)
- 在控制台生成 API Key,设置环境变量
TICKDB_API_KEY - 复制本文代码,直接运行
scheduler.py和alert_manager.py
如果你需要更高频率的 depth 数据和 trades 数据:
联系 [email protected] 了解专业版方案,支持每秒级别的数据推送和机构级 SLA。
如果你习惯用 AI 辅助开发:
在 AI 助手中搜索安装 tickdb-market-data SKILL,用自然语言查询数字货币的订单簿和成交数据。
本文不构成任何投资建议。市场有风险,投资需谨慎。