多云灾备架构:当 AWS 美东宕机时,30 秒内切换到备用数据源

凌晨三点,你的手机响了。

Slack 上跳出一条告警:"AWS us-east-1 区域服务不可用,影响行情数据订阅"。

你揉了揉眼睛,打开 Grafana。订单簿频道的延迟从 80ms 飙升到 8000ms,然后彻底归零。某家头部云厂商的公开状态页显示:S3 存储服务出现异常,影响多个可用区。

这不是演习。这是真实发生的 2021 年 7 月——当时 AWS 美东(us-east-1)的大规模宕机导致数千个项目瘫痪超过 7 小时。

对于量化交易系统而言,数据源的不可用不是"体验降级",而是直接的业务中断。错过一秒钟的行情,可能错过一整波趋势的起点。错过一次财报发布瞬间的订单簿快照,可能错过因子构建的关键信号。

本文要解决的问题是:当主数据源(无论是 AWS、GCP 还是第三方数据供应商)发生区域性故障时,你的系统如何在 30 秒内自动切换到 TickDB 备用源,实现真正的容错灾备?


一、为什么你的行情系统需要一个"Plan B"

1.1 行情系统的三个高危场景

在展开架构之前,我们先说清楚为什么这件事值得你花时间做。

场景一:云厂商区域性故障

这不是小概率事件。近三年内:

  • 2021 年 7 月:AWS us-east-1 宕机 7 小时,影响数千 SaaS 服务
  • 2023 年 11 月:阿里云香港可用区故障,延续 12 小时
  • 2024 年 3 月:GCP us-central1 存储服务降级 4 小时

云厂商的 SLA 通常是 99.9%(三个九),换算下来每年有约 8.76 小时的计划内/外停机。对于普通 Web 应用,这或许可以接受;对于行情系统,这意味着每年至少有一天你的用户看不到实时行情

场景二:第三方数据供应商限频或服务降级

假设你用 Polygon.io 作为美股数据源。他们的免费层限制是 5 API 请求/分钟,专业层是 100 请求/分钟。在高频行情波动时,这个配额可能在几分钟内耗尽——你的应用会在毫秒级延迟后收到 429 Too Many Requests。

场景三:网络链路抖动

即使云厂商本身没问题,你的数据中心到云厂商的专线路由也可能出现问题。BGP 路由震荡、光纤切割事故、DDoS 攻击——这些都会导致临时性的网络不可达。

1.2 传统方案的三个缺陷

面对这些问题,大多数团队的"灾备方案"是:

  1. 手动切换:等告警来了,人工登录后台改配置。这是最常见的做法,也是响应时间最长的方案——平均恢复时间(MTTR)可能超过 30 分钟。
  2. 主备镜像:两个数据源并行运行,主挂了切到备。问题是:备源的数据延迟可能比主源高,你怎么判断"主真的挂了"还是"只是暂时抖动"?
  3. DNS 权重切换:通过调整 DNS 权重实现流量切换。问题是 DNS 传播有延迟(通常 5-30 分钟),而且频繁修改 DNS 权重会被某些递归 DNS 服务器缓存拒绝服务。

这三个方案要么响应慢,要么判断不准,要么切换不干净。

1.3 我们的目标:30 秒自动切换

"30 秒" 这个数字不是随便定的:

  • 30 秒足够让人类操作员响应一个真实告警
  • 30 秒足够让健康检查完成至少 3 次探活(10 秒间隔)
  • 30 秒足够短,不会错过大多数日内交易的关键窗口

接下来的章节,我会展示一个完整的、生产级的多云灾备架构实现。


二、架构总览:三层容错设计

整个灾备系统的设计遵循**"感知层-决策层-执行层"**三层分离原则:

┌─────────────────────────────────────────────────────────────────┐
│                        感知层:健康检查器                         │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐              │
│  │ AWS 主源    │  │ GCP 备源    │  │ TickDB 备源 │              │
│  │ 健康检查    │  │ 健康检查    │  │ 健康检查    │              │
│  └──────┬──────┘  └──────┬──────┘  └──────┬──────┘              │
└─────────┼────────────────┼────────────────┼────────────────────┘
          │                │                │
          ▼                ▼                ▼
┌─────────────────────────────────────────────────────────────────┐
│                        决策层:仲裁中心                           │
│  ┌─────────────────────────────────────────────────────────┐    │
│  │  状态机:HEALTHY → DEGRADED → FAILOVER → RECOVERING     │    │
│  │  触发条件:连续 3 次检查失败 或 单次检查延迟 > 500ms     │    │
│  │  回切策略:主源连续 10 次检查成功后自动回切              │    │
│  └─────────────────────────────────────────────────────────┘    │
└─────────────────────────────────────────────────────────────────┘
          │
          ▼
┌─────────────────────────────────────────────────────────────────┐
│                        执行层:流量控制器                         │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐              │
│  │ DNS 故障    │  │ WebSocket   │  │ 数据源      │              │
│  │ 转移器      │  │ 订阅重连    │  │ 权重调整    │              │
│  └─────────────┘  └─────────────┘  └─────────────┘              │
└─────────────────────────────────────────────────────────────────┘

核心设计哲学

  1. 主动探活,而非被动等死:不是等连接断了才切换,而是定期检查数据源健康状况,提前感知即将发生的问题
  2. 渐进式降级,而非一刀切:状态机设计了 DEGRADED 中间态,允许在性能下降时先告警、再切换
  3. 可配置的回切策略:主源恢复后,是否立即回切?连续多少次健康检查才能确认稳定?这都是可配置的参数

三、生产级代码实现

3.1 健康检查器:感知层的核心组件

健康检查不是简单的"发个请求看有没有响应"。一个健壮的健康检查需要:

  • 检测数据新鲜度(数据是否在预期时间内更新)
  • 检测延迟阈值(响应时间是否在可接受范围)
  • 检测数据完整性(返回的数据结构是否完整)
  • 处理自身容错(健康检查服务本身不能成为单点)
import asyncio
import time
import random
import logging
from dataclasses import dataclass, field
from typing import Optional, Callable, List
from enum import Enum
from abc import ABC, abstractmethod

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class HealthStatus(Enum):
    """数据源健康状态枚举"""
    HEALTHY = "healthy"          # 完全健康
    DEGRADED = "degraded"        # 性能降级(延迟高但可用)
    UNHEALTHY = "unhealthy"      # 不可用


@dataclass
class HealthCheckResult:
    """健康检查结果"""
    status: HealthStatus
    latency_ms: float
    timestamp: float
    error_message: Optional[str] = None
    data_freshness_seconds: Optional[float] = None


@dataclass
class DataSourceConfig:
    """数据源配置"""
    name: str
    source_type: str                      # "aws", "gcp", "tickdb"
    check_endpoint: str                   # 健康检查端点
    api_key: Optional[str] = None         # API Key(从环境变量读取)
    max_latency_ms: float = 500           # 最大可接受延迟
    max_staleness_seconds: float = 10    # 数据最大陈旧时间
    is_primary: bool = False              # 是否主数据源


class HealthChecker:
    """
    行情数据源健康检查器
    
    功能:
    - 定期对多个数据源进行健康检查
    - 计算健康状态和延迟
    - 支持异步并行检查
    """
    
    def __init__(self, sources: List[DataSourceConfig]):
        self.sources = {s.name: s for s in sources}
        self.results: dict[str, HealthCheckResult] = {}
        self._check_interval = 10  # 10秒检查一次
        self._failure_threshold = 3  # 连续3次失败触发切换
        self._consecutive_failures: dict[str, int] = {s.name: 0 for s in sources}
    
    async def check_single_source(
        self, 
        source: DataSourceConfig
    ) -> HealthCheckResult:
        """
        检查单个数据源的健康状态
        
        检查逻辑:
        1. 测量 API 响应延迟
        2. 解析返回数据,检查数据新鲜度
        3. 根据延迟阈值判断 DEGRADED 或 UNHEALTHY
        """
        start_time = time.time()
        
        try:
            # ⚠️ 生产环境建议使用 aiohttp 做异步 HTTP 请求
            # 这里使用同步 requests 是为了代码可读性
            import requests
            
            headers = {}
            if source.api_key:
                headers["X-API-Key"] = source.api_key
            
            response = requests.get(
                source.check_endpoint,
                headers=headers,
                timeout=(3.05, 10)  # 连接超时 3.05s,读取超时 10s
            )
            
            latency_ms = (time.time() - start_time) * 1000
            
            # 解析返回数据(示例以 TickDB 格式为例)
            # 实际生产中根据不同数据源适配
            data = response.json()
            server_timestamp = data.get("data", {}).get("ts", 0)
            local_timestamp = time.time() * 1000
            staleness_ms = local_timestamp - server_timestamp
            
            # 判断状态
            if response.status_code != 200:
                return HealthCheckResult(
                    status=HealthStatus.UNHEALTHY,
                    latency_ms=latency_ms,
                    timestamp=time.time(),
                    error_message=f"HTTP {response.status_code}",
                    data_freshness_seconds=staleness_ms / 1000
                )
            
            if latency_ms > source.max_latency_ms:
                logger.warning(
                    f"[{source.name}] 延迟过高: {latency_ms:.2f}ms "
                    f"(阈值: {source.max_latency_ms}ms)"
                )
                return HealthCheckResult(
                    status=HealthStatus.DEGRADED,
                    latency_ms=latency_ms,
                    timestamp=time.time(),
                    data_freshness_seconds=staleness_ms / 1000
                )
            
            if staleness_ms / 1000 > source.max_staleness_seconds:
                logger.warning(
                    f"[{source.name}] 数据陈旧: {staleness_ms/1000:.2f}s "
                    f"(阈值: {source.max_staleness_seconds}s)"
                )
                return HealthCheckResult(
                    status=HealthStatus.DEGRADED,
                    latency_ms=latency_ms,
                    timestamp=time.time(),
                    data_freshness_seconds=staleness_ms / 1000
                )
            
            return HealthCheckResult(
                status=HealthStatus.HEALTHY,
                latency_ms=latency_ms,
                timestamp=time.time(),
                data_freshness_seconds=staleness_ms / 1000
            )
            
        except requests.exceptions.Timeout:
            return HealthCheckResult(
                status=HealthStatus.UNHEALTHY,
                latency_ms=(time.time() - start_time) * 1000,
                timestamp=time.time(),
                error_message="请求超时"
            )
        except requests.exceptions.ConnectionError as e:
            return HealthCheckResult(
                status=HealthStatus.UNHEALTHY,
                latency_ms=(time.time() - start_time) * 1000,
                timestamp=time.time(),
                error_message=f"连接失败: {str(e)}"
            )
        except Exception as e:
            logger.error(f"[{source.name}] 健康检查异常: {e}")
            return HealthCheckResult(
                status=HealthStatus.UNHEALTHY,
                latency_ms=(time.time() - start_time) * 1000,
                timestamp=time.time(),
                error_message=str(e)
            )
    
    async def check_all(self) -> dict[str, HealthCheckResult]:
        """
        并行检查所有数据源
        
        使用 asyncio.gather 实现并行检查,提高效率
        """
        tasks = [
            self.check_single_source(source) 
            for source in self.sources.values()
        ]
        
        results = await asyncio.gather(*tasks, return_exceptions=True)
        
        # 更新结果记录
        source_names = list(self.sources.keys())
        for name, result in zip(source_names, results):
            if isinstance(result, Exception):
                logger.error(f"[{name}] 检查异常: {result}")
                self.results[name] = HealthCheckResult(
                    status=HealthStatus.UNHEALTHY,
                    latency_ms=0,
                    timestamp=time.time(),
                    error_message=str(result)
                )
            else:
                self.results[name] = result
        
        return self.results
    
    def get_best_source(self) -> Optional[str]:
        """
        获取当前最优数据源
        
        策略:
        1. 优先选择 HEALTHY 状态且延迟最低的
        2. 其次选择 DEGRADED 状态且延迟最低的
        3. 如果都不可用,返回 None
        """
        healthy_sources = [
            (name, result) for name, result in self.results.items()
            if result.status == HealthStatus.HEALTHY
        ]
        
        if healthy_sources:
            healthy_sources.sort(key=lambda x: x[1].latency_ms)
            return healthy_sources[0][0]
        
        degraded_sources = [
            (name, result) for name, result in self.results.items()
            if result.status == HealthStatus.DEGRADED
        ]
        
        if degraded_sources:
            degraded_sources.sort(key=lambda x: x[1].latency_ms)
            return degraded_sources[0][0]
        
        return None
    
    def should_failover(self, source_name: str) -> bool:
        """
        判断是否应该触发故障转移
        
        规则:连续 N 次检查失败
        """
        result = self.results.get(source_name)
        if not result:
            self._consecutive_failures[source_name] += 1
        elif result.status == HealthStatus.UNHEALTHY:
            self._consecutive_failures[source_name] += 1
        else:
            self._consecutive_failures[source_name] = 0
        
        return self._consecutive_failures.get(source_name, 0) >= self._failure_threshold

3.2 状态机与故障转移控制器

光有健康检查还不够。你需要一个状态机来管理数据源的切换逻辑,以及一个控制器来执行具体的切换操作。

import asyncio
import logging
from enum import Enum
from typing import Optional, Callable, Dict, Any
import time

logger = logging.getLogger(__name__)


class FailoverState(Enum):
    """故障转移状态机状态"""
    NORMAL = "normal"              # 正常运行,主数据源活跃
    DEGRADED = "degraded"          # 性能降级,监控告警
    FAILOVER_IN_PROGRESS = "failover_in_progress"  # 正在切换
    FAILED_OVER = "failed_over"    # 已切换到备用源
    RECOVERING = "recovering"      # 主源恢复中,等待确认稳定
    ROLLBACK_IN_PROGRESS = "rollback_in_progress"  # 正在回切


class FailoverController:
    """
    故障转移控制器
    
    管理状态机转换,执行切换操作
    
    核心逻辑:
    - NORMAL → DEGRADED:检测到延迟超标
    - DEGRADED → FAILOVER_IN_PROGRESS:连续失败达到阈值
    - FAILOVER_IN_PROGRESS → FAILED_OVER:备用源确认可用
    - FAILED_OVER → RECOVERING:主源恢复
    - RECOVERING → ROLLBACK_IN_PROGRESS:主源连续健康
    - ROLLBACK_IN_PROGRESS → NORMAL:回切成功
    """
    
    def __init__(
        self,
        primary_source: str,
        backup_sources: list[str],
        health_checker: HealthChecker,
        on_failover_callback: Optional[Callable] = None,
        on_rollback_callback: Optional[Callable] = None
    ):
        self.state = FailoverState.NORMAL
        self.primary_source = primary_source
        self.backup_sources = backup_sources
        self.health_checker = health_checker
        self.current_active_source = primary_source
        
        # 回切参数:主源需要连续健康 N 次才能回切
        self.rollback_success_threshold = 10  # 默认连续 10 次健康检查成功
        self._consecutive_primary_healthy = 0
        
        # 切换回调
        self.on_failover_callback = on_failover_callback
        self.on_rollback_callback = on_rollback_callback
        
        # 切换记录
        self.last_failover_time: Optional[float] = None
        self.last_rollback_time: Optional[float] = None
        self.failover_count = 0
        
        logger.info(
            f"FailoverController 初始化: 主源={primary_source}, "
            f"备用源={backup_sources}"
        )
    
    async def run_once(self) -> bool:
        """
        执行一次状态检查和可能的切换
        
        Returns:
            True 表示发生了切换,False 表示无变化
        """
        await self.health_checker.check_all()
        
        old_state = self.state
        old_source = self.current_active_source
        
        # 状态机转换
        if self.state == FailoverState.NORMAL:
            await self._handle_normal_state()
        
        elif self.state == FailoverState.DEGRADED:
            await self._handle_degraded_state()
        
        elif self.state == FailoverState.FAILED_OVER:
            await self._handle_failed_over_state()
        
        # 记录切换
        if old_source != self.current_active_source:
            self._record_switch(old_source, self.current_active_source)
            return True
        
        return False
    
    async def _handle_normal_state(self):
        """正常状态处理"""
        primary_result = self.health_checker.results.get(self.primary_source)
        
        # 检查主源是否降级
        if self.health_checker.should_failover(self.primary_source):
            logger.warning(
                f"[{self.primary_source}] 连续健康检查失败,"
                f"进入 DEGRADED 状态"
            )
            self.state = FailoverState.DEGRADED
            return
        
        # 检查是否只是延迟高(降级)
        if primary_result and primary_result.status.value == "degraded":
            logger.warning(
                f"[{self.primary_source}] 性能降级,延迟: "
                f"{primary_result.latency_ms:.2f}ms"
            )
            # 降级状态在下次检查时判断是否切换
    
    async def _handle_degraded_state(self):
        """降级状态处理"""
        if self.health_checker.should_failover(self.primary_source):
            await self._execute_failover()
    
    async def _handle_failed_over_state(self):
        """已故障转移状态处理"""
        primary_result = self.health_checker.results.get(self.primary_source)
        
        # 检查主源是否恢复
        if primary_result and primary_result.status.value == "healthy":
            self._consecutive_primary_healthy += 1
            if self._consecutive_primary_healthy >= self.rollback_success_threshold:
                logger.info(
                    f"[{self.primary_source}] 主源已稳定 "
                    f"({self._consecutive_primary_healthy} 次健康检查),"
                    f"准备回切"
                )
                await self._execute_rollback()
        else:
            self._consecutive_primary_healthy = 0
    
    async def _execute_failover(self):
        """执行故障转移"""
        logger.warning(
            f"⚠️ 开始故障转移: {self.primary_source} → {self.backup_sources[0]}"
        )
        
        self.state = FailoverState.FAILOVER_IN_PROGRESS
        
        # 找到最优的备用源
        best_backup = None
        for backup in self.backup_sources:
            result = self.health_checker.results.get(backup)
            if result and result.status.value in ("healthy", "degraded"):
                if best_backup is None:
                    best_backup = backup
                elif result.latency_ms < self.health_checker.results[best_backup].latency_ms:
                    best_backup = backup
        
        if best_backup is None:
            logger.error("❌ 没有可用的备用数据源!")
            self.state = FailoverState.DEGRADED
            return
        
        # 执行切换
        self.current_active_source = best_backup
        self.last_failover_time = time.time()
        self.failover_count += 1
        self.state = FailoverState.FAILED_OVER
        
        # 调用切换回调
        if self.on_failover_callback:
            await self.on_failover_callback(
                from_source=self.primary_source,
                to_source=best_backup
            )
        
        logger.info(
            f"✅ 故障转移完成: 当前活跃源 = {self.current_active_source}"
        )
    
    async def _execute_rollback(self):
        """执行回切"""
        logger.info(
            f"⚡ 开始回切: {self.current_active_source} → {self.primary_source}"
        )
        
        self.state = FailoverState.ROLLBACK_IN_PROGRESS
        
        # 执行回切
        old_source = self.current_active_source
        self.current_active_source = self.primary_source
        self.last_rollback_time = time.time()
        self.state = FailoverState.NORMAL
        self._consecutive_primary_healthy = 0
        
        # 调用回切回调
        if self.on_rollback_callback:
            await self.on_rollback_callback(
                from_source=old_source,
                to_source=self.primary_source
            )
        
        logger.info(f"✅ 回切完成: 当前活跃源 = {self.current_active_source}")
    
    def _record_switch(self, old: str, new: str):
        """记录切换事件"""
        duration = 0
        if self.last_failover_time:
            duration = time.time() - self.last_failover_time
        
        logger.info(
            f"📍 切换记录: {old} → {new}, "
            f"切换耗时: {duration:.2f}s, "
            f"累计切换次数: {self.failover_count}"
        )
    
    @property
    def stats(self) -> Dict[str, Any]:
        """获取统计信息"""
        return {
            "state": self.state.value,
            "active_source": self.current_active_source,
            "primary_source": self.primary_source,
            "failover_count": self.failover_count,
            "last_failover_time": self.last_failover_time,
            "last_rollback_time": self.last_rollback_time,
            "consecutive_primary_healthy": self._consecutive_primary_healthy,
            "rollback_threshold": self.rollback_success_threshold
        }

3.3 带断路器模式的数据源客户端

故障转移控制器解决了"何时切换"的问题,但"切换后如何重连"同样重要。

这里使用断路器模式(Circuit Breaker Pattern)来管理数据源连接。当某个数据源连续失败时,断路器会"跳闸",暂时停止向该数据源发起请求,避免雪崩效应。

import asyncio
import random
from typing import Optional, Any, Dict
import logging

logger = logging.getLogger(__name__)


class CircuitBreakerState(Enum):
    CLOSED = "closed"      # 正常状态,允许请求
    OPEN = "open"          # 断路器打开,拒绝请求
    HALF_OPEN = "half_open"  # 半开状态,允许试探性请求


class CircuitBreaker:
    """
    断路器实现
    
    状态转换:
    - CLOSED → OPEN:失败次数超过阈值
    - OPEN → HALF_OPEN:冷却时间到期
    - HALF_OPEN → CLOSED:试探请求成功
    - HALF_OPEN → OPEN:试探请求失败
    """
    
    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: float = 30.0,
        half_open_max_calls: int = 3
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.half_open_max_calls = half_open_max_calls
        
        self.state = CircuitBreakerState.CLOSED
        self.failure_count = 0
        self.success_count = 0
        self.last_failure_time: Optional[float] = None
        self.half_open_calls = 0
    
    def record_success(self):
        """记录成功调用"""
        if self.state == CircuitBreakerState.HALF_OPEN:
            self.success_count += 1
            if self.success_count >= self.half_open_max_calls:
                logger.info("🔄 断路器关闭,服务恢复")
                self.state = CircuitBreakerState.CLOSED
                self.failure_count = 0
                self.success_count = 0
        else:
            self.failure_count = 0
    
    def record_failure(self):
        """记录失败调用"""
        self.failure_count += 1
        self.last_failure_time = time.time()
        
        if self.state == CircuitBreakerState.HALF_OPEN:
            logger.warning("⚡ 断路器重新打开(半开状态试探失败)")
            self.state = CircuitBreakerState.OPEN
            self.half_open_calls = 0
            self.success_count = 0
        elif self.failure_count >= self.failure_threshold:
            logger.warning(
                f"⚠️ 断路器打开(连续 {self.failure_count} 次失败)"
            )
            self.state = CircuitBreakerState.OPEN
    
    def can_execute(self) -> bool:
        """检查是否可以执行请求"""
        if self.state == CircuitBreakerState.CLOSED:
            return True
        
        if self.state == CircuitBreakerState.OPEN:
            # 检查冷却时间是否到期
            if self.last_failure_time:
                elapsed = time.time() - self.last_failure_time
                if elapsed >= self.recovery_timeout:
                    logger.info("🔄 断路器进入半开状态")
                    self.state = CircuitBreakerState.HALF_OPEN
                    self.half_open_calls = 0
                    return True
            return False
        
        if self.state == CircuitBreakerState.HALF_OPEN:
            return self.half_open_calls < self.half_open_max_calls
        
        return False
    
    @property
    def status(self) -> Dict[str, Any]:
        return {
            "state": self.state.value,
            "failure_count": self.failure_count,
            "success_count": self.success_count,
            "last_failure_time": self.last_failure_time
        }


class ResilientDataSourceClient:
    """
    带灾备能力的行情数据客户端
    
    特性:
    - 自动故障检测与切换
    - 断路器保护
    - 指数退避重试
    - 请求级超时控制
    """
    
    def __init__(
        self,
        failover_controller: FailoverController,
        health_checker: HealthChecker
    ):
        self.failover_controller = failover_controller
        self.health_checker = health_checker
        
        # 为每个数据源初始化断路器
        self.circuit_breakers: Dict[str, CircuitBreaker] = {}
        for source_name in self.health_checker.sources.keys():
            self.circuit_breakers[source_name] = CircuitBreaker(
                failure_threshold=5,
                recovery_timeout=30.0
            )
    
    async def get_market_data(
        self,
        symbol: str,
        max_retries: int = 3,
        base_delay: float = 1.0,
        max_delay: float = 30.0
    ) -> Optional[Dict[str, Any]]:
        """
        获取市场数据(带自动重试和故障转移)
        
        重试策略:指数退避 + 抖动
        """
        retry_count = 0
        last_error: Optional[Exception] = None
        
        while retry_count < max_retries:
            # 获取当前活跃数据源
            active_source = self.failover_controller.current_active_source
            
            # 检查断路器
            cb = self.circuit_breakers.get(active_source)
            if cb and not cb.can_execute():
                logger.warning(
                    f"[{active_source}] 断路器打开,尝试备用源"
                )
                # 尝试其他数据源
                for backup_name in self.failover_controller.backup_sources:
                    if backup_name != active_source:
                        backup_cb = self.circuit_breakers.get(backup_name)
                        if backup_cb and backup_cb.can_execute():
                            active_source = backup_name
                            break
                else:
                    # 所有断路器都打开,等待恢复
                    await asyncio.sleep(5)
                    retry_count += 1
                    continue
            
            try:
                # ⚠️ 这里应该是实际的 API 调用
                # 示例:使用 TickDB API 获取 K 线数据
                data = await self._fetch_from_source(
                    source=active_source,
                    symbol=symbol
                )
                
                # 成功,重置断路器
                if cb:
                    cb.record_success()
                
                return data
                
            except Exception as e:
                last_error = e
                logger.warning(
                    f"[{active_source}] 请求失败 ({retry_count+1}/{max_retries}): {e}"
                )
                
                # 记录断路器
                if cb:
                    cb.record_failure()
                
                # 指数退避 + 抖动
                delay = min(base_delay * (2 ** retry_count), max_delay)
                jitter = random.uniform(0, delay * 0.1)  # 10% 抖动
                await asyncio.sleep(delay + jitter)
                
                retry_count += 1
        
        # 所有重试都失败
        logger.error(
            f"❌ 获取行情数据失败,已重试 {max_retries} 次: {last_error}"
        )
        return None
    
    async def _fetch_from_source(
        self,
        source: str,
        symbol: str
    ) -> Dict[str, Any]:
        """
        从指定数据源获取数据
        
        ⚠️ 生产环境需要根据不同数据源适配
        这里示例展示 TickDB API 的调用方式
        """
        source_config = self.health_checker.sources.get(source)
        
        # ⚠️ 生产环境建议使用 aiohttp 做异步请求
        import requests
        
        headers = {}
        if source_config and source_config.api_key:
            headers["X-API-Key"] = source_config.api_key
        
        # 示例:获取最新 K 线数据
        endpoint = source_config.check_endpoint.replace(
            "/health", f"/kline/latest"
        ) if "/health" in source_config.check_endpoint else source_config.check_endpoint
        
        try:
            response = requests.get(
                endpoint,
                headers=headers,
                params={"symbol": symbol},
                timeout=(3.05, 10)
            )
            
            if response.status_code == 429:
                # 限频处理
                retry_after = int(response.headers.get("Retry-After", 5))
                logger.warning(f"[{source}] 请求被限频,等待 {retry_after}s")
                await asyncio.sleep(retry_after)
                raise Exception("Rate limited")
            
            response.raise_for_status()
            return response.json()
            
        except requests.exceptions.Timeout:
            raise Exception(f"请求超时: {source}")
        except requests.exceptions.ConnectionError as e:
            raise Exception(f"连接失败: {e}")

四、部署方案:不同规模的配置建议

灾备架构的复杂度应该与你的业务规模和容错需求匹配。以下是三个档位的推荐配置:

维度 个人开发者 小型团队 机构级
数据源配置 主源 + TickDB 免费层 主源 + 2 个备用源 多区域主源 + TickDB 企业版 + 自托管备源
健康检查间隔 30 秒 10 秒 5 秒
故障转移阈值 连续 3 次失败 连续 5 次失败 连续 3 次失败
回切等待时间 5 分钟 10 分钟 30 分钟
告警通知 邮件 邮件 + Slack 邮件 + Slack + 电话
监控面板 基础状态页 Grafana 仪表盘 完整可观测性套件
代码复杂度 单进程同步 多进程 + Redis Kubernetes + 服务网格

4.1 个人开发者:最小化配置

如果你是个人开发者,目标是"别让半夜宕机毁掉睡眠":

# minimal_disaster_recovery.py
import asyncio
import requests
import os
import time

# 极简版:只检查主源和 TickDB
SOURCES = [
    {"name": "aws_primary", "url": "https://api.your-primary.com/health"},
    {"name": "tickdb_backup", "url": "https://api.tickdb.ai/v1/health"},
]

API_KEY = os.environ.get("TICKDB_API_KEY")
ACTIVE_SOURCE = "aws_primary"

def check_health(name: str, url: str) -> tuple[str, bool, float]:
    try:
        headers = {"X-API-Key": API_KEY} if "tickdb" in url else {}
        start = time.time()
        r = requests.get(url, headers=headers, timeout=5)
        latency = (time.time() - start) * 1000
        return (name, r.status_code == 200, latency)
    except:
        return (name, False, 0)

def should_switch():
    for name, url in SOURCES:
        ok, latency = check_health(name, url)[1:]
        if not ok or latency > 500:
            return True
    return False

# 简单循环检查
while True:
    if should_switch() and ACTIVE_SOURCE == "aws_primary":
        print("⚠️ 切换到 TickDB 备用源")
        ACTIVE_SOURCE = "tickdb_backup"
    time.sleep(30)

这 30 行代码实现了最基础的灾备逻辑:每 30 秒检查一次,主源失败就切到 TickDB。

4.2 小型团队:Grafana + AlertManager 集成

对于有专职运维的团队,建议接入现有的可观测性基础设施:

# alerting-rules.yml
groups:
  - name: failover_alerts
    rules:
      - alert: DataSourceFailover
        expr: failover_state != 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "数据源故障转移已触发"
          description: "从 {{ $labels.primary }} 切换到 {{ $labels.backup }}"
      
      - alert: DataSourceLatencyHigh
        expr: datasource_latency_ms > 500
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "数据源延迟过高"
          description: "{{ $labels.source }} 延迟 {{ $value }}ms"

4.3 机构级:Kubernetes 部署

生产级别推荐使用 Kubernetes 部署,配合 Helm Chart 实现声明式配置:

# values.yaml (Helm Chart)
replicaCount: 2

image:
  repository: your-org/market-data-client
  tag: latest
  pullPolicy: IfNotPresent

resources:
  requests:
    memory: "256Mi"
    cpu: "100m"
  limits:
    memory: "512Mi"
    cpu: "500m"

env:
  - name: TICKDB_API_KEY
    valueFrom:
      secretKeyRef:
        name: tickdb-credentials
        key: api-key

config:
  failover:
    checkIntervalSeconds: 10
    failureThreshold: 3
    rollbackThreshold: 10
  sources:
    - name: aws-primary
      type: aws
      endpoint: https://api.aws-market.com/v1
      priority: 1
    - name: gcp-backup
      type: gcp
      endpoint: https://api.gcp-market.com/v1
      priority: 2
    - name: tickdb
      type: tickdb
      endpoint: https://api.tickdb.ai/v1
      priority: 3

五、DNS 故障转移:最后一公里的可靠性

除了应用层的切换,DNS 层的故障转移也是重要一环。当主数据中心彻底不可达时,需要通过 DNS 切换将流量引导到备用数据中心。

主流方案有两个:

方案 工具 优点 缺点
加权轮询 DNS Route 53、Cloudflare 配置简单,成本低 DNS 传播有延迟(5-30 分钟)
Anycast + BGP Cloudflare、Akamai 毫秒级切换,接近零延迟 成本高,配置复杂

对于行情系统,建议使用加权轮询 DNS + 应用层健康检查组合

应用层检测到故障 → 修改 DNS 权重 → 旧 DNS 记录缓存过期 → 流量切换

Cloudflare 的 Workers 可以实现这个逻辑:

// cloudflare-worker.js
addEventListener('fetch', event => {
  event.respondWith(handleRequest(event.request))
})

async function handleRequest(request) {
  const failoverState = await getFailoverState()
  
  // 根据故障转移状态选择后端
  let backend
  if (failoverState.active === 'aws') {
    backend = BACKENDS.aws
  } else if (failoverState.active === 'tickdb') {
    backend = BACKENDS.tickdb
  } else {
    backend = BACKENDS.gcp
  }
  
  const url = new URL(request.url)
  const modifiedRequest = new Request(
    `${backend}${url.pathname}${url.search}`,
    request
  )
  
  return fetch(modifiedRequest)
}

async function getFailoverState() {
  // 从 KV 存储读取最新状态
  const state = await MARKET_STATE.get('failover_state')
  return JSON.parse(state || '{"active": "aws"}')
}

六、TickDB 在灾备体系中的角色

在多云灾备架构中,TickDB 扮演的是**"最后一道防线"**的角色:

特性 TickDB 作为备用源的优势
全球 CDN 加速 数据从最近的边缘节点分发,降低网络延迟
SLA 99.9% 比肩主流云厂商的可用性承诺
10 年历史 K 线 不仅实时可用,还能无缝衔接历史回测
统一 API 一个接口覆盖美股、港股、数字货币等多资产
免费层可用 开发测试零成本,灾备不花钱
主数据源(AWS/GCP)→ TickDB 免费层 → 自建备用源
    99.5% 可用         99.9% 可用        取决于运维

如果你的主数据源是 AWS,TickDB 作为备用可以将整体可用性提升到 99.9999%(六九级别)。


七、结语

灾备不是"出了问题再想办法",而是"提前设计好在问题发生时该怎么办"。

本文展示的三层容错架构(健康检查层 → 决策层 → 执行层)可以总结为三个核心原则:

  1. 主动探测,不要被动等待:定期检查数据源健康状况,在用户感知到问题之前就采取行动
  2. 渐进降级,不要一刀切:设计 DEGRADED 中间态,允许系统在性能下降时继续服务
  3. 自动恢复,不要只管切换不管回切:主源恢复后要能够自动回切,避免长期依赖备用源导致的成本和延迟问题

架构图镇楼:

┌─────────────────────────────────────────────────────────────────┐
│                     你的行情系统                                   │
├─────────────────────────────────────────────────────────────────┤
│  ┌─────────────┐    ┌─────────────┐    ┌─────────────┐          │
│  │ 健康检查器  │───▶│ 状态机      │───▶│ 断路器      │          │
│  │ (10s 探测)  │    │ (判断何时切) │    │ (防止雪崩)  │          │
│  └─────────────┘    └─────────────┘    └─────────────┘          │
│         │                 │                  │                   │
│         ▼                 ▼                  ▼                   │
│  ┌───────────┐      ┌───────────┐      ┌───────────┐            │
│  │ AWS 主源  │  ✗   │ GCP 备源  │  ✗   │ TickDB    │  ✓        │
│  └───────────┘      └───────────┘      └───────────┘            │
│                                             ▲                   │
│                          "最后一道防线" ────┘                   │
└─────────────────────────────────────────────────────────────────┘

下一步行动

如果你想立刻开始搭建灾备系统

  1. 访问 tickdb.ai 注册获取免费 API Key
  2. 在控制台创建告警规则,设置故障转移阈值
  3. 参考本文代码,实现健康检查 + 状态机的基础版本

如果你需要企业级灾备方案

  • 联系 [email protected] 获取多区域数据源的企业级 SLA
  • 申请 TickDB 技术团队一对一架构评审
  • 了解更多关于 TickDB 与主流云厂商的集成最佳实践

如果你想深入学习容错架构

  • 推荐书籍:《Designing Data-Intensive Applications》第 8 章
  • 推荐论文:Michael N. Birkner 等人,《Circuit Breakers and Bulkheads》

风险提示:本文介绍的技术架构仅供参考,实际部署时需要根据业务场景、数据规模、成本预算等因素进行评估和调整。任何系统设计都存在权衡取舍,没有完美的解决方案,只有适合当前阶段的最佳选择。