7x24 永不停歇:数字货币监控系统的特殊设计

“凌晨三点,手机震动。你睡眼惺忪地摸起手机,看见一条告警:BTC/USDT 深度异常。屏幕上显示的波动幅度是 0.3%——对于日内交易员,这甚至算不上噪声。但你没有上下文:现在是北京时间凌晨三点,纽约的交易员正在吃早餐,东京的刚刚下班。没有任何一个地方处于'正常交易时段'。

你盯着那条告警,陷入哲学思考:这条告警是真的异常,还是只是数字货币市场永远处于'交易时段'的正常波动?

这不是技术问题,这是存在主义危机。”

数字货币市场没有收盘时间。这句话听起来像常识,但在设计监控系统时,它会撕开一整套假设的裂缝。

当你为 A 股、美股设计监控时,"交易日"是清晰的:早上 9:30 到下午 4:00(美东时间)是交易时段,收盘后是风控复盘时间,第二天开盘前是数据准备窗口。你的调度任务可以安心地设定在"非交易时段"执行,你的告警可以区分"盘中剧烈波动"和"盘后正常调整"。

但加密货币交易所不会关门。7x24,永不停歇。你的监控系统必须承认这个现实,并在此基础上重建所有时间相关的概念。

本文拆解三个核心问题:

  1. 没有收盘,"交易日"怎么定义?
  2. 7x24 调度系统怎么设计?
  3. 告警在凌晨三点和下午三点有什么区别?

一、问题的根源:传统系统的三个隐含假设

让我们先解剖一下传统监控系统依赖的"交易日"假设。

1.1 假设一:存在"非交易时段"

传统系统的调度任务通常分为两类:

  • 盘中任务:实时监控、价格预警、订单簿分析
  • 盘后任务:日线计算、风险敞口汇总、数据清洗

这套机制的前提是"盘后存在"。但对于 BTC/USDT,这个假设不成立。

# 传统调度(美股)
cron: "0 16 * * 1-5"    # 每天16:00(收盘后)执行日线汇总
cron: "0 9 * * 1-5"     # 每个交易日9:00执行开盘准备

# 数字货币?这些 cron 表达式没有意义

没有收盘意味着:盘中和盘后的边界消失了。你的"日线"是 UTC 0 点到 24 点,还是北京时间 0 点到 24 点?如果你的团队在纽约,你用美东时间还是 UTC?

每个选择都有代价。

1.2 假设二:异常有参照系

传统告警系统依赖"正常波动范围"。这个范围通常基于历史数据计算,但隐含了一个时间维度:波动率在交易时段内和盘后是不同的

以苹果(AAPL)为例:

时段 平均日内波动 盘后平均波动
交易时段 1.2% -
盘后 30 分钟 - 0.3%
盘后 2 小时 - 0.1%

如果你在盘后看到 0.5% 的波动,这是一个值得关注的信号。但如果你把这个阈值套用到 BTC/USDT,你会发现:0.5% 在加密货币市场是家常便饭,24 小时随时可能发生。

没有"盘后"作为参照,"异常"的定义必须重新设计。

1.3 假设三:团队在"工作时间"响应

告警的价值在于有人响应。如果凌晨 3 点的告警淹没在噪音中,团队会进入"告警疲劳"——要么忽略一切告警,要么直接关闭通知。

传统系统可以通过"只在工作时间发送非紧急告警"来缓解这个问题。但 7x24 市场意味着:任何时候都可能是"工作时间",也可能不是。


二、解决方案一:交易日的重新定义

2.1 为什么 UTC 0 点是合理基准

在数字货币监控中,"交易日"的定义需要满足三个约束:

  1. 内部一致性:所有数据指标使用同一基准
  2. 跨团队对齐:不同地理位置的团队看到相同的"今天"
  3. 交易所对齐:至少与主流交易所的统计口径接近

综合这三个约束,UTC 0 点作为"交易日"起点是最优选择:

方案 一致性 跨团队对齐 交易所对齐 综合评价
UTC 0 点 ✅ 完全一致 ✅ 全球通用 ✅ Binance 等以此为基础 推荐
北京时间 0 点 ⚠️ 差(UTC 8:00) ❌ 纽约团队混乱 ⚠️ 与部分交易所统计差 8 小时 不推荐
本地时间 0 点 ❌ 不可行 ❌ 不同人看到不同"今天" ❌ 完全无法对齐 禁止

Binance、OKX 等主流交易所的"日线数据"默认以 UTC 0 点为分界。这意味着一旦你选择 UTC 0 点,可以直接使用交易所提供的日统计数据,无需自己计算。

2.2 滚动窗口:超越"日"的概念

对于 7x24 市场,固定窗口(每天 0 点到 24 点)有一个问题:边界处的数据不连续

例如,BTC 在 UTC 23:59:59 发生了剧烈波动,这条数据属于"今天"还是"明天"?

更好的方案是滚动窗口(Sliding Window):

传统方案(固定窗口):
|------- Day 1 -------|------- Day 2 -------|------- Day 3 -------|

滚动窗口方案(以 24 小时为周期,持续滑动):
|---24h---| → |---24h---| → |---24h---|
     ↑ 每小时滑动一次,重新计算过去 24 小时的指标

滚动窗口的优势:

  • 无边界:任何时刻都有完整的 24 小时数据
  • 平滑过渡:避免 0 点前后数据跳变
  • 支持自定义周期:可以计算 6 小时、12 小时、72 小时的滚动指标

2.3 交易时段切片:数字货币的"伪盘"

虽然没有收盘,但数字货币市场确实存在相对平静期。以 UTC 时间为基准:

UTC 时段 对应北京时间 市场特征
0:00 - 8:00 8:00 - 16:00 亚洲时段,波动相对较低
8:00 - 16:00 16:00 - 0:00 欧洲时段,流动性增加
16:00 - 24:00 0:00 - 8:00 美亚重叠时段,波动最剧烈

这不是"交易时段/非交易时段"的二元划分,而是一个连续的光谱。 你的监控系统可以利用这个规律,对不同时段设置不同的告警阈值。


三、解决方案二:7x24 调度系统设计

3.1 调度器的核心挑战

传统调度器(如 cron)的设计假设是:任务在"需要时"运行,不需要时休息。但在 7x24 场景下,这个假设需要翻转:任务需要持续运行,只是在不同时间做不同的事。

两种调度范式的对比:

维度 传统调度(cron) 7x24 调度(Continuous)
触发方式 时间点触发 条件触发 + 时间触发
状态保持 无状态,每次独立 需维护状态(上次执行结果等)
异常恢复 依赖外部监控 内置重试和状态恢复
资源占用 低(非运行时段几乎不占用) 持续占用
适用场景 批处理、定时任务 实时监控、持续计算

3.2 分层调度架构

推荐的分层调度架构:

┌─────────────────────────────────────────────────────────────┐
│                     调度协调层(Scheduler)                   │
├─────────────────────────────────────────────────────────────┤
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐      │
│  │  高频任务     │  │  中频任务     │  │  低频任务     │      │
│  │  (每 1-60s)  │  │  (每 1-60m)  │  │  (每小时+)   │      │
│  └──────────────┘  └──────────────┘  └──────────────┘      │
├─────────────────────────────────────────────────────────────┤
│                      数据层(TickDB)                        │
│              实时 depth/trades + 历史 kline                  │
└─────────────────────────────────────────────────────────────┘

各层职责

  • 高频任务:实时订单簿监控、买卖压力比计算、深度异常检测
  • 中频任务:滚动窗口统计、波动率更新、趋势判断
  • 低频任务:日线汇总、告警汇总报告、系统健康检查

3.3 生产级代码:7x24 调度引擎

以下是基于 Python 的 7x24 调度引擎核心实现:

import asyncio
import logging
from datetime import datetime, timezone
from typing import Callable, Dict, List, Optional
import os

logger = logging.getLogger(__name__)


class ContinuousScheduler:
    """
    7x24 持续调度器
    
    设计原则:
    1. 基于 asyncio 实现,不阻塞主线程
    2. 支持不同频率的任务分层调度
    3. 内置心跳和健康检查
    4. 任务失败自动重试(指数退避)
    """
    
    def __init__(self):
        self.tasks: Dict[str, asyncio.Task] = {}
        self.last_run: Dict[str, datetime] = {}
        self.failure_count: Dict[str, int] = {}
        self.running = False
        
        # 任务注册表:{name: (interval_seconds, callback, options)}
        self._registry: Dict[str, tuple] = {}
        
    def register(
        self,
        name: str,
        interval: int,
        callback: Callable,
        *,
        initial_delay: float = 0,
        max_retries: int = 3,
        retry_base_delay: float = 1.0
    ):
        """
        注册调度任务
        
        Args:
            name: 任务名称(唯一标识)
            interval: 执行间隔(秒)
            callback: 异步回调函数
            initial_delay: 初始延迟
            max_retries: 最大重试次数
            retry_base_delay: 重试基础延迟
        """
        self._registry[name] = (
            interval,
            callback,
            {
                'initial_delay': initial_delay,
                'max_retries': max_retries,
                'retry_base_delay': retry_base_delay
            }
        )
        logger.info(f"Registered task: {name} (interval={interval}s)")
    
    async def _run_task(self, name: str):
        """执行单个任务,支持重试"""
        interval, callback, options = self._registry[name]
        max_retries = options['max_retries']
        base_delay = options['retry_base_delay']
        
        while self.running:
            try:
                await asyncio.sleep(interval)
                
                # 记录执行开始时间
                start_time = datetime.now(timezone.utc)
                self.last_run[name] = start_time
                
                # 执行任务
                result = await callback()
                
                # 重置失败计数
                self.failure_count[name] = 0
                
                logger.debug(f"Task {name} completed at {start_time.isoformat()}")
                
            except asyncio.CancelledError:
                logger.info(f"Task {name} cancelled")
                break
                
            except Exception as e:
                self.failure_count[name] = self.failure_count.get(name, 0) + 1
                retry_count = self.failure_count[name]
                
                if retry_count > max_retries:
                    logger.error(
                        f"Task {name} failed {retry_count} times, "
                        f"entering cool-down: {e}"
                    )
                    # 进入冷却期:延长间隔
                    await asyncio.sleep(60 * (retry_count - max_retries))
                else:
                    # 指数退避重试
                    delay = base_delay * (2 ** (retry_count - 1))
                    jitter = asyncio.get_event_loop().time() % 1  # 添加抖动
                    logger.warning(
                        f"Task {name} failed (attempt {retry_count}), "
                        f"retry in {delay + jitter:.1f}s: {e}"
                    )
                    await asyncio.sleep(delay + jitter)
    
    async def _health_checker(self):
        """健康检查任务:监控所有任务状态"""
        while self.running:
            await asyncio.sleep(60)  # 每分钟检查一次
            
            for name in self._registry:
                last = self.last_run.get(name)
                failures = self.failure_count.get(name, 0)
                
                if last:
                    elapsed = (datetime.now(timezone.utc) - last).total_seconds()
                    interval = self._registry[name][0]
                    
                    if elapsed > interval * 3:
                        logger.warning(
                            f"Task {name} may be stuck: "
                            f"last ran {elapsed:.0f}s ago "
                            f"(interval={interval}s)"
                        )
                
                if failures > 0:
                    logger.info(
                        f"Task {name} failure count: {failures}"
                    )
    
    async def start(self):
        """启动调度器"""
        self.running = True
        
        # 启动所有注册的任务
        for name in self._registry:
            interval, _, _ = self._registry[name]
            self.tasks[name] = asyncio.create_task(self._run_task(name))
        
        # 启动健康检查
        self.tasks['_health_check'] = asyncio.create_task(self._health_checker())
        
        logger.info(f"Scheduler started with {len(self._registry)} tasks")
        
        # 保持运行直到被取消
        try:
            await asyncio.gather(*self.tasks.values())
        except asyncio.CancelledError:
            logger.info("Scheduler shutting down...")
    
    async def stop(self):
        """停止调度器"""
        self.running = False
        
        for task in self.tasks.values():
            task.cancel()
        
        await asyncio.gather(*self.tasks.values(), return_exceptions=True)
        logger.info("Scheduler stopped")


# ============ 示例:数字货币监控系统任务 ============

async def monitor_depth_anomaly():
    """
    高频任务:深度异常检测
    订阅 TickDB depth 频道,检测买卖深度失衡
    """
    import aiohttp
    import json
    
    api_key = os.environ.get("TICKDB_API_KEY")
    if not api_key:
        logger.warning("TICKDB_API_KEY not set, skipping depth monitor")
        return None
    
    url = "wss://api.tickdb.ai/ws/market/depth"
    
    async with aiohttp.ClientSession() as session:
        params = {"api_key": api_key, "symbol": "BTC.USDT"}
        
        async with session.ws_connect(url, params=params) as ws:
            # 心跳保活
            ping_task = asyncio.create_task(_ping_loop(ws))
            
            try:
                async for msg in ws:
                    if msg.type == aiohttp.WSMsgType.TEXT:
                        data = json.loads(msg.data)
                        await _check_depth_imbalance(data)
                    elif msg.type == aiohttp.WSMsgType.CLOSING:
                        break
            finally:
                ping_task.cancel()


async def _ping_loop(ws):
    """WebSocket 心跳"""
    while True:
        await asyncio.sleep(30)
        await ws.send_json({"cmd": "ping"})


async def _check_depth_imbalance(data: dict):
    """检查深度失衡"""
    bids = data.get('bids', [])
    asks = data.get('asks', [])
    
    if not bids or not asks:
        return
    
    bid_total = sum(float(q) for _, q in bids[:10])
    ask_total = sum(float(q) for _, q in asks[:10])
    
    if bid_total == 0 or ask_total == 0:
        return
    
    pressure_ratio = bid_total / ask_total
    
    # 检测极端失衡(超过 3:1 或 1:3)
    if pressure_ratio > 3.0 or pressure_ratio < 0.33:
        logger.warning(
            f"Depth imbalance detected: pressure_ratio={pressure_ratio:.2f} "
            f"(bid={bid_total:.0f}, ask={ask_total:.0f})"
        )
        # 触发告警(见下一节)


async def rolling_volatility_update():
    """
    中频任务:更新滚动波动率
    计算过去 24 小时的滚动波动率
    """
    import requests
    
    api_key = os.environ.get("TICKDB_API_KEY")
    
    # 获取过去 24 小时的 1 小时 K 线
    response = requests.get(
        "https://api.tickdb.ai/v1/market/kline",
        headers={"X-API-Key": api_key},
        params={
            "symbol": "BTC.USDT",
            "interval": "1h",
            "limit": 24
        },
        timeout=(3.05, 10)
    )
    
    data = response.json()
    if data.get('code') != 0:
        logger.error(f"Failed to fetch kline: {data}")
        return None
    
    klines = data['data']
    returns = []
    
    for i in range(1, len(klines)):
        prev_close = float(klines[i-1]['close'])
        curr_close = float(klines[i]['close'])
        if prev_close > 0:
            returns.append((curr_close - prev_close) / prev_close)
    
    if not returns:
        return None
    
    # 计算标准差(年化波动率)
    import statistics
    daily_vol = statistics.stdev(returns)
    annualized_vol = daily_vol * (24 ** 0.5) * 100  # 转为百分比
    
    logger.info(f"24h Rolling volatility: {annualized_vol:.2f}%")
    
    # 与历史均值比较
    HISTORICAL_AVG_VOL = 65.0  # BTC 历史平均波动率(约 65%)
    
    if annualized_vol > HISTORICAL_AVG_VOL * 1.5:
        logger.warning(
            f"Abnormally high volatility: {annualized_vol:.2f}% "
            f"(historical avg: {HISTORICAL_AVG_VOL}%)"
        )
    
    return {'volatility': annualized_vol, 'timestamp': datetime.now(timezone.utc).isoformat()}


# ============ 启动调度器 ============

async def main():
    scheduler = ContinuousScheduler()
    
    # 注册任务
    scheduler.register(
        "depth_monitor",
        interval=5,  # 每 5 秒检查一次
        callback=monitor_depth_anomaly,
        max_retries=5
    )
    
    scheduler.register(
        "volatility_update",
        interval=300,  # 每 5 分钟更新一次
        callback=rolling_volatility_update,
        initial_delay=10  # 启动后 10 秒开始
    )
    
    # 启动
    await scheduler.start()


if __name__ == "__main__":
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s | %(levelname)s | %(message)s"
    )
    asyncio.run(main())

代码说明

  • 心跳保活_ping_loop 每 30 秒发送一次 ping,防止 WebSocket 连接断开
  • 指数退避重试:任务失败后等待时间指数增长(2^n 秒),避免频繁重试
  • 抖动:在退避时间上添加随机抖动,防止多任务同时重试造成的"惊群效应"
  • 健康检查:每秒检查所有任务的最后执行时间,发现"卡住"的任务
  • 任务分层:高频(5 秒)、中频(5 分钟)、低频(可扩展)分层调度

四、解决方案三:告警防疲劳机制

4.1 告警疲劳的根源

告警疲劳(Alert Fatigue)在 7x24 场景下尤为严重:

  1. 持续压力:没有任何"休息时间",告警随时可能响起
  2. 信号淹没:噪音告警太多,真正的异常被稀释
  3. 上下文缺失:凌晨的告警没有足够的背景信息

核心解法:不是减少告警数量,而是提高告警质量——让每一次告警都值得响应。

4.2 四层防疲劳机制

层级 机制 效果
去重 同类告警在冷却期内只触发一次 减少重复告警
聚合 将短时间内的多个相关告警合并为一条 避免告警风暴
分级 按紧急程度分配不同响应策略 优化注意力分配
静默 可配置的静默窗口(可叠加) 允许主动暂停告警

4.3 生产级代码:告警防疲劳引擎

from dataclasses import dataclass, field
from typing import Dict, List, Optional, Set
from datetime import datetime, timedelta, timezone
from collections import defaultdict
import asyncio
import logging
import hashlib

logger = logging.getLogger(__name__)


@dataclass
class Alert:
    """告警数据模型"""
    level: str  # critical / warning / info
    title: str
    message: str
    tags: Set[str] = field(default_factory=set)  # 用于聚合的标签
    metadata: Dict = field(default_factory=dict)  # 额外上下文
    
    def fingerprint(self) -> str:
        """生成唯一指纹,用于去重和聚合"""
        # 基于 level + title + tags 生成指纹
        content = f"{self.level}:{self.title}:{','.join(sorted(self.tags))}"
        return hashlib.md5(content.encode()).hexdigest()[:12]


@dataclass
class AlertChannel:
    """告警渠道配置"""
    name: str
    min_level: str  # 该渠道接收的最低告警级别
    cooldown_seconds: int  # 该渠道的冷却时间


class AlertManager:
    """
    告警防疲劳引擎
    
    功能:
    1. 去重:同类告警在冷却期内只发送一次
    2. 聚合:将短时间内多个相关告警合并
    3. 分级:按级别路由到不同渠道
    4. 静默:支持配置的静默窗口
    """
    
    # 告警级别定义
    LEVELS = ['debug', 'info', 'warning', 'critical']
    LEVEL_PRIORITY = {l: i for i, l in enumerate(LEVELS)}
    
    def __init__(self):
        # 去重表:{fingerprint: last_sent_time}
        self._dedup_cache: Dict[str, datetime] = {}
        
        # 聚合缓冲区:{fingerprint: [alert, ...]}
        self._aggregation_buffer: Dict[str, List[Alert]] = defaultdict(list)
        
        # 静默规则:{rule_name: (start_hour, end_hour, weekdays)}
        self._silence_rules: List[Dict] = []
        
        # 冷却配置(秒)
        self._cooldown: Dict[str, int] = {
            'critical': 300,      # 严重告警:5 分钟冷却
            'warning': 900,       # 警告告警:15 分钟冷却
            'info': 3600,         # 信息告警:1 小时冷却
        }
        
        # 聚合窗口(秒):窗口内的同类告警会被聚合
        self._aggregation_window = 60
        
        # 渠道配置
        self._channels: Dict[str, AlertChannel] = {
            'sms': AlertChannel('sms', 'critical', cooldown_seconds=1800),
            'push': AlertChannel('push', 'warning', cooldown_seconds=600),
            'log': AlertChannel('log', 'info', cooldown_seconds=60),
        }
        
        # 锁
        self._lock = asyncio.Lock()
        
        # 聚合定时器
        self._aggregation_task: Optional[asyncio.Task] = None
    
    def add_silence_rule(
        self,
        name: str,
        start_hour: int,
        end_hour: int,
        weekdays: Optional[List[int]] = None,
        tags: Optional[Set[str]] = None
    ):
        """
        添加静默规则
        
        Args:
            name: 规则名称
            start_hour: 静默开始小时(UTC)
            end_hour: 静默结束小时(UTC)
            weekdays: 静默适用的星期几(0=周一, 6=周日),None 表示每天都适用
            tags: 仅静默带有这些标签的告警,None 表示静默所有
        """
        self._silence_rules.append({
            'name': name,
            'start_hour': start_hour,
            'end_hour': end_hour,
            'weekdays': weekdays,
            'tags': tags
        })
        logger.info(f"Added silence rule: {name}")
    
    async def send(self, alert: Alert) -> bool:
        """
        发送告警(入口方法)
        
        Returns:
            True if alert was sent, False if suppressed
        """
        async with self._lock:
            now = datetime.now(timezone.utc)
            
            # 第一层:静默检查
            if self._is_silenced(alert, now):
                logger.debug(f"Alert silenced: {alert.title}")
                return False
            
            # 第二层:去重检查
            fingerprint = alert.fingerprint()
            if self._is_duplicate(fingerprint, alert.level, now):
                logger.debug(f"Alert deduplicated: {alert.title}")
                return False
            
            # 第三层:聚合
            self._aggregation_buffer[fingerprint].append(alert)
            
            # 如果是立即告警(非聚合场景),直接发送
            if alert.level == 'critical':
                await self._flush_fingerprint(fingerprint)
            
            return True
    
    def _is_silenced(self, alert: Alert, now: datetime) -> bool:
        """检查告警是否在静默期内"""
        current_hour = now.hour
        current_weekday = now.weekday()
        
        for rule in self._silence_rules:
            # 检查时间窗口
            if rule['start_hour'] <= current_hour < rule['end_hour']:
                # 检查星期
                if rule['weekdays'] and current_weekday not in rule['weekdays']:
                    continue
                
                # 检查标签
                if rule['tags'] and not alert.tags.intersection(rule['tags']):
                    continue
                
                logger.info(f"Alert matched silence rule: {rule['name']}")
                return True
        
        return False
    
    def _is_duplicate(self, fingerprint: str, level: str, now: datetime) -> bool:
        """检查是否重复告警"""
        if fingerprint not in self._dedup_cache:
            return False
        
        last_sent = self._dedup_cache[fingerprint]
        cooldown = self._cooldown.get(level, 600)
        
        if (now - last_sent).total_seconds() < cooldown:
            return True
        
        return False
    
    async def _flush_fingerprint(self, fingerprint: str):
        """发送聚合后的告警"""
        alerts = self._aggregation_buffer.get(fingerprint, [])
        if not alerts:
            return
        
        # 合并告警
        combined_alert = self._merge_alerts(alerts)
        
        # 按渠道发送
        for channel_name, channel in self._channels.items():
            if self._should_send_to_channel(combined_alert, channel):
                await self._send_to_channel(channel_name, combined_alert)
        
        # 更新去重缓存
        self._dedup_cache[fingerprint] = datetime.now(timezone.utc)
        
        # 清理缓冲区
        del self._aggregation_buffer[fingerprint]
    
    def _merge_alerts(self, alerts: List[Alert]) -> Alert:
        """合并多条告警"""
        if len(alerts) == 1:
            return alerts[0]
        
        # 取最高级别
        max_level = max(alerts, key=lambda a: self.LEVEL_PRIORITY[a.level]).level
        
        # 合并元数据
        merged_metadata = {'count': len(alerts), 'alerts': []}
        for a in alerts:
            merged_metadata['alerts'].append({
                'title': a.title,
                'message': a.message,
                'timestamp': datetime.now(timezone.utc).isoformat()
            })
        
        return Alert(
            level=max_level,
            title=f"[{len(alerts)}x] {alerts[0].title}",
            message=f"Multiple alerts in aggregation window",
            tags=alerts[0].tags,
            metadata=merged_metadata
        )
    
    def _should_send_to_channel(self, alert: Alert, channel: AlertChannel) -> bool:
        """检查是否应发送到该渠道"""
        alert_priority = self.LEVEL_PRIORITY.get(alert.level, 0)
        channel_priority = self.LEVEL_PRIORITY.get(channel.min_level, 0)
        return alert_priority >= channel_priority
    
    async def _send_to_channel(self, channel: str, alert: Alert):
        """发送到指定渠道(示例实现)"""
        if channel == 'sms':
            # 实际实现:调用短信 API
            logger.warning(f"[SMS] {alert.level.upper()}: {alert.title}")
        elif channel == 'push':
            # 实际实现:调用推送服务
            logger.warning(f"[PUSH] {alert.level.upper()}: {alert.title}")
        elif channel == 'log':
            # 日志渠道:始终记录
            logger.info(f"[LOG] {alert.title}")
    
    async def start_aggregation_loop(self):
        """启动聚合定时器(定期 flush 缓冲区)"""
        while True:
            await asyncio.sleep(self._aggregation_window)
            
            async with self._lock:
                fingerprints = list(self._aggregation_buffer.keys())
                
            for fp in fingerprints:
                await self._flush_fingerprint(fp)
    
    async def flush_all(self):
        """手动 flush 所有缓冲区"""
        async with self._lock:
            fingerprints = list(self._aggregation_buffer.keys())
        
        for fp in fingerprints:
            await self._flush_fingerprint(fp)


# ============ 使用示例 ============

async def demo_alert_system():
    """演示告警防疲劳系统"""
    manager = AlertManager()
    
    # 添加静默规则:UTC 2:00-6:00(北京时间 10:00-14:00)静默 info 级别告警
    manager.add_silence_rule(
        name="lunch_silence",
        start_hour=2,
        end_hour=6,
        tags={'info', 'news'}  # 仅静默 info 和 news 标签
    )
    
    # 启动聚合循环
    agg_task = asyncio.create_task(manager.start_aggregation_loop())
    
    # 模拟告警场景
    test_alerts = [
        Alert(
            level='critical',
            title='BTC深度异常',
            message='买卖压力比突破 5:1',
            tags={'depth', 'critical'}
        ),
        Alert(
            level='warning',
            title='波动率升高',
            message='24h 波动率触及 1.5 倍历史均值',
            tags={'volatility', 'warning'}
        ),
        Alert(
            level='info',
            title='新地址活动',
            message='检测到大额转入',
            tags={'onchain', 'info'}
        ),
    ]
    
    for alert in test_alerts:
        sent = await manager.send(alert)
        print(f"Sent: {sent} - {alert.title}")
    
    # 模拟短时间内多次触发同类告警
    print("\n--- Simulating alert burst ---")
    for i in range(5):
        alert = Alert(
            level='warning',
            title='订单簿刷新',
            message=f'检测到订单簿更新 #{i+1}',
            tags={'depth'}
        )
        sent = await manager.send(alert)
        print(f"Attempt {i+1}: Sent={sent}")
    
    # 等待聚合
    await asyncio.sleep(2)
    await manager.flush_all()
    
    agg_task.cancel()


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    asyncio.run(demo_alert_system())

核心机制说明

  • 去重指纹:基于 level + title + tags 生成哈希,相同组合在冷却期内只告警一次
  • 聚合窗口:60 秒内的同类告警被合并,最后发送一条"5x"的聚合告警
  • 静默规则:支持按时间、星期、标签三个维度配置静默
  • 分级路由:critical 发所有渠道,warning 发推送和日志,info 只发日志

五、系统架构总览

将上述三个方案整合,完整的 7x24 数字货币监控系统架构如下:

┌─────────────────────────────────────────────────────────────────┐
│                         数据源层                                 │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐             │
│  │  TickDB     │  │  交易所      │  │  链上数据    │             │
│  │  WebSocket  │  │  REST API   │  │  浏览器      │             │
│  │  (depth,    │  │  (补充数据)  │  │  (可选)      │             │
│  │   trades)   │  │             │  │             │             │
│  └──────┬──────┘  └──────┬──────┘  └──────┬──────┘             │
└─────────┼────────────────┼────────────────┼─────────────────────┘
          │                │                │
          ▼                ▼                ▼
┌─────────────────────────────────────────────────────────────────┐
│                        调度执行层                                 │
│  ┌─────────────────────────────────────────────────────────┐   │
│  │              ContinuousScheduler (7x24)                  │   │
│  │  ┌──────────┐  ┌──────────┐  ┌──────────┐               │   │
│  │  │ 高频任务  │  │ 中频任务  │  │ 低频任务  │               │   │
│  │  │ (5s)     │  │ (5min)   │  │ (1h+)    │               │   │
│  │  └──────────┘  └──────────┘  └──────────┘               │   │
│  └─────────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────────┘
          │
          ▼
┌─────────────────────────────────────────────────────────────────┐
│                        告警处理层                                 │
│  ┌─────────────────────────────────────────────────────────┐   │
│  │                  AlertManager                            │   │
│  │  ┌────────┐  ┌────────┐  ┌────────┐  ┌────────┐        │   │
│  │  │  去重   │→ │  聚合   │→ │  分级   │→ │  静默   │        │   │
│  │  └────────┘  └────────┘  └────────┘  └────────┘        │   │
│  └─────────────────────────────────────────────────────────┘   │
│                          │                                      │
│              ┌───────────┼───────────┐                         │
│              ▼           ▼           ▼                          │
│         ┌────────┐  ┌────────┐  ┌────────┐                     │
│         │  SMS   │  │  PUSH  │  │  LOG   │                     │
│         └────────┘  └────────┘  └────────┘                     │
└─────────────────────────────────────────────────────────────────┘

六、部署方案对比

维度 个人开发者 小型团队 机构级
调度方案 Python asyncio 单进程 多进程 + Redis 队列 Kubernetes + 分布式调度
告警渠道 日志 + Push(免费) Push + 飞书/钉钉 全渠道 + 专人值班
数据存储 本地 SQLite PostgreSQL 时序数据库(InfluxDB/Prometheus)
故障恢复 手动重启 Supervisor 自动拉起 健康检查 + 自动扩缩容
TickDB 使用 免费层(基础 depth) 标准层(depth + trades) 专业层(全量数据 + SLA)

结语

数字货币的 7x24 特性不是一个"特殊情况",而是一个更纯粹的挑战:当"交易日"这个基本假设消失后,你必须从头审视监控系统的每一个时间相关设计。

核心结论

  1. 交易日的定义:采用 UTC 0 点作为基准,配合滚动窗口实现无边界数据统计
  2. 调度系统:从"定时触发"转向"持续运行",支持分层频率和故障自愈
  3. 告警机制:从"发出去"转向"发对",通过去重、聚合、分级、静默四层过滤提高告警质量

这些问题没有唯一正确答案,但有更好的起点。从承认"7x24 是常态"开始,你的监控系统设计就已经赢了一半。


下一步行动

如果你是个人开发者,想快速验证本文方案

  1. 访问 tickdb.ai 注册(免费,无需信用卡)
  2. 在控制台生成 API Key,设置环境变量 TICKDB_API_KEY
  3. 复制本文代码,直接运行 scheduler.pyalert_manager.py

如果你需要更高频率的 depth 数据和 trades 数据
联系 [email protected] 了解专业版方案,支持每秒级别的数据推送和机构级 SLA。

如果你习惯用 AI 辅助开发
在 AI 助手中搜索安装 tickdb-market-data SKILL,用自然语言查询数字货币的订单簿和成交数据。


本文不构成任何投资建议。市场有风险,投资需谨慎。