多云灾备架构:当 AWS 美东宕机时,30 秒内切换到备用数据源
凌晨三点,你的手机响了。
Slack 上跳出一条告警:"AWS us-east-1 区域服务不可用,影响行情数据订阅"。
你揉了揉眼睛,打开 Grafana。订单簿频道的延迟从 80ms 飙升到 8000ms,然后彻底归零。某家头部云厂商的公开状态页显示:S3 存储服务出现异常,影响多个可用区。
这不是演习。这是真实发生的 2021 年 7 月——当时 AWS 美东(us-east-1)的大规模宕机导致数千个项目瘫痪超过 7 小时。
对于量化交易系统而言,数据源的不可用不是"体验降级",而是直接的业务中断。错过一秒钟的行情,可能错过一整波趋势的起点。错过一次财报发布瞬间的订单簿快照,可能错过因子构建的关键信号。
本文要解决的问题是:当主数据源(无论是 AWS、GCP 还是第三方数据供应商)发生区域性故障时,你的系统如何在 30 秒内自动切换到 TickDB 备用源,实现真正的容错灾备?
一、为什么你的行情系统需要一个"Plan B"
1.1 行情系统的三个高危场景
在展开架构之前,我们先说清楚为什么这件事值得你花时间做。
场景一:云厂商区域性故障
这不是小概率事件。近三年内:
- 2021 年 7 月:AWS us-east-1 宕机 7 小时,影响数千 SaaS 服务
- 2023 年 11 月:阿里云香港可用区故障,延续 12 小时
- 2024 年 3 月:GCP us-central1 存储服务降级 4 小时
云厂商的 SLA 通常是 99.9%(三个九),换算下来每年有约 8.76 小时的计划内/外停机。对于普通 Web 应用,这或许可以接受;对于行情系统,这意味着每年至少有一天你的用户看不到实时行情。
场景二:第三方数据供应商限频或服务降级
假设你用 Polygon.io 作为美股数据源。他们的免费层限制是 5 API 请求/分钟,专业层是 100 请求/分钟。在高频行情波动时,这个配额可能在几分钟内耗尽——你的应用会在毫秒级延迟后收到 429 Too Many Requests。
场景三:网络链路抖动
即使云厂商本身没问题,你的数据中心到云厂商的专线路由也可能出现问题。BGP 路由震荡、光纤切割事故、DDoS 攻击——这些都会导致临时性的网络不可达。
1.2 传统方案的三个缺陷
面对这些问题,大多数团队的"灾备方案"是:
- 手动切换:等告警来了,人工登录后台改配置。这是最常见的做法,也是响应时间最长的方案——平均恢复时间(MTTR)可能超过 30 分钟。
- 主备镜像:两个数据源并行运行,主挂了切到备。问题是:备源的数据延迟可能比主源高,你怎么判断"主真的挂了"还是"只是暂时抖动"?
- DNS 权重切换:通过调整 DNS 权重实现流量切换。问题是 DNS 传播有延迟(通常 5-30 分钟),而且频繁修改 DNS 权重会被某些递归 DNS 服务器缓存拒绝服务。
这三个方案要么响应慢,要么判断不准,要么切换不干净。
1.3 我们的目标:30 秒自动切换
"30 秒" 这个数字不是随便定的:
- 30 秒足够让人类操作员响应一个真实告警
- 30 秒足够让健康检查完成至少 3 次探活(10 秒间隔)
- 30 秒足够短,不会错过大多数日内交易的关键窗口
接下来的章节,我会展示一个完整的、生产级的多云灾备架构实现。
二、架构总览:三层容错设计
整个灾备系统的设计遵循**"感知层-决策层-执行层"**三层分离原则:
┌─────────────────────────────────────────────────────────────────┐
│ 感知层:健康检查器 │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ AWS 主源 │ │ GCP 备源 │ │ TickDB 备源 │ │
│ │ 健康检查 │ │ 健康检查 │ │ 健康检查 │ │
│ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │
└─────────┼────────────────┼────────────────┼────────────────────┘
│ │ │
▼ ▼ ▼
┌─────────────────────────────────────────────────────────────────┐
│ 决策层:仲裁中心 │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ 状态机:HEALTHY → DEGRADED → FAILOVER → RECOVERING │ │
│ │ 触发条件:连续 3 次检查失败 或 单次检查延迟 > 500ms │ │
│ │ 回切策略:主源连续 10 次检查成功后自动回切 │ │
│ └─────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ 执行层:流量控制器 │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ DNS 故障 │ │ WebSocket │ │ 数据源 │ │
│ │ 转移器 │ │ 订阅重连 │ │ 权重调整 │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
└─────────────────────────────────────────────────────────────────┘
核心设计哲学:
- 主动探活,而非被动等死:不是等连接断了才切换,而是定期检查数据源健康状况,提前感知即将发生的问题
- 渐进式降级,而非一刀切:状态机设计了 DEGRADED 中间态,允许在性能下降时先告警、再切换
- 可配置的回切策略:主源恢复后,是否立即回切?连续多少次健康检查才能确认稳定?这都是可配置的参数
三、生产级代码实现
3.1 健康检查器:感知层的核心组件
健康检查不是简单的"发个请求看有没有响应"。一个健壮的健康检查需要:
- 检测数据新鲜度(数据是否在预期时间内更新)
- 检测延迟阈值(响应时间是否在可接受范围)
- 检测数据完整性(返回的数据结构是否完整)
- 处理自身容错(健康检查服务本身不能成为单点)
import asyncio
import time
import random
import logging
from dataclasses import dataclass, field
from typing import Optional, Callable, List
from enum import Enum
from abc import ABC, abstractmethod
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class HealthStatus(Enum):
"""数据源健康状态枚举"""
HEALTHY = "healthy" # 完全健康
DEGRADED = "degraded" # 性能降级(延迟高但可用)
UNHEALTHY = "unhealthy" # 不可用
@dataclass
class HealthCheckResult:
"""健康检查结果"""
status: HealthStatus
latency_ms: float
timestamp: float
error_message: Optional[str] = None
data_freshness_seconds: Optional[float] = None
@dataclass
class DataSourceConfig:
"""数据源配置"""
name: str
source_type: str # "aws", "gcp", "tickdb"
check_endpoint: str # 健康检查端点
api_key: Optional[str] = None # API Key(从环境变量读取)
max_latency_ms: float = 500 # 最大可接受延迟
max_staleness_seconds: float = 10 # 数据最大陈旧时间
is_primary: bool = False # 是否主数据源
class HealthChecker:
"""
行情数据源健康检查器
功能:
- 定期对多个数据源进行健康检查
- 计算健康状态和延迟
- 支持异步并行检查
"""
def __init__(self, sources: List[DataSourceConfig]):
self.sources = {s.name: s for s in sources}
self.results: dict[str, HealthCheckResult] = {}
self._check_interval = 10 # 10秒检查一次
self._failure_threshold = 3 # 连续3次失败触发切换
self._consecutive_failures: dict[str, int] = {s.name: 0 for s in sources}
async def check_single_source(
self,
source: DataSourceConfig
) -> HealthCheckResult:
"""
检查单个数据源的健康状态
检查逻辑:
1. 测量 API 响应延迟
2. 解析返回数据,检查数据新鲜度
3. 根据延迟阈值判断 DEGRADED 或 UNHEALTHY
"""
start_time = time.time()
try:
# ⚠️ 生产环境建议使用 aiohttp 做异步 HTTP 请求
# 这里使用同步 requests 是为了代码可读性
import requests
headers = {}
if source.api_key:
headers["X-API-Key"] = source.api_key
response = requests.get(
source.check_endpoint,
headers=headers,
timeout=(3.05, 10) # 连接超时 3.05s,读取超时 10s
)
latency_ms = (time.time() - start_time) * 1000
# 解析返回数据(示例以 TickDB 格式为例)
# 实际生产中根据不同数据源适配
data = response.json()
server_timestamp = data.get("data", {}).get("ts", 0)
local_timestamp = time.time() * 1000
staleness_ms = local_timestamp - server_timestamp
# 判断状态
if response.status_code != 200:
return HealthCheckResult(
status=HealthStatus.UNHEALTHY,
latency_ms=latency_ms,
timestamp=time.time(),
error_message=f"HTTP {response.status_code}",
data_freshness_seconds=staleness_ms / 1000
)
if latency_ms > source.max_latency_ms:
logger.warning(
f"[{source.name}] 延迟过高: {latency_ms:.2f}ms "
f"(阈值: {source.max_latency_ms}ms)"
)
return HealthCheckResult(
status=HealthStatus.DEGRADED,
latency_ms=latency_ms,
timestamp=time.time(),
data_freshness_seconds=staleness_ms / 1000
)
if staleness_ms / 1000 > source.max_staleness_seconds:
logger.warning(
f"[{source.name}] 数据陈旧: {staleness_ms/1000:.2f}s "
f"(阈值: {source.max_staleness_seconds}s)"
)
return HealthCheckResult(
status=HealthStatus.DEGRADED,
latency_ms=latency_ms,
timestamp=time.time(),
data_freshness_seconds=staleness_ms / 1000
)
return HealthCheckResult(
status=HealthStatus.HEALTHY,
latency_ms=latency_ms,
timestamp=time.time(),
data_freshness_seconds=staleness_ms / 1000
)
except requests.exceptions.Timeout:
return HealthCheckResult(
status=HealthStatus.UNHEALTHY,
latency_ms=(time.time() - start_time) * 1000,
timestamp=time.time(),
error_message="请求超时"
)
except requests.exceptions.ConnectionError as e:
return HealthCheckResult(
status=HealthStatus.UNHEALTHY,
latency_ms=(time.time() - start_time) * 1000,
timestamp=time.time(),
error_message=f"连接失败: {str(e)}"
)
except Exception as e:
logger.error(f"[{source.name}] 健康检查异常: {e}")
return HealthCheckResult(
status=HealthStatus.UNHEALTHY,
latency_ms=(time.time() - start_time) * 1000,
timestamp=time.time(),
error_message=str(e)
)
async def check_all(self) -> dict[str, HealthCheckResult]:
"""
并行检查所有数据源
使用 asyncio.gather 实现并行检查,提高效率
"""
tasks = [
self.check_single_source(source)
for source in self.sources.values()
]
results = await asyncio.gather(*tasks, return_exceptions=True)
# 更新结果记录
source_names = list(self.sources.keys())
for name, result in zip(source_names, results):
if isinstance(result, Exception):
logger.error(f"[{name}] 检查异常: {result}")
self.results[name] = HealthCheckResult(
status=HealthStatus.UNHEALTHY,
latency_ms=0,
timestamp=time.time(),
error_message=str(result)
)
else:
self.results[name] = result
return self.results
def get_best_source(self) -> Optional[str]:
"""
获取当前最优数据源
策略:
1. 优先选择 HEALTHY 状态且延迟最低的
2. 其次选择 DEGRADED 状态且延迟最低的
3. 如果都不可用,返回 None
"""
healthy_sources = [
(name, result) for name, result in self.results.items()
if result.status == HealthStatus.HEALTHY
]
if healthy_sources:
healthy_sources.sort(key=lambda x: x[1].latency_ms)
return healthy_sources[0][0]
degraded_sources = [
(name, result) for name, result in self.results.items()
if result.status == HealthStatus.DEGRADED
]
if degraded_sources:
degraded_sources.sort(key=lambda x: x[1].latency_ms)
return degraded_sources[0][0]
return None
def should_failover(self, source_name: str) -> bool:
"""
判断是否应该触发故障转移
规则:连续 N 次检查失败
"""
result = self.results.get(source_name)
if not result:
self._consecutive_failures[source_name] += 1
elif result.status == HealthStatus.UNHEALTHY:
self._consecutive_failures[source_name] += 1
else:
self._consecutive_failures[source_name] = 0
return self._consecutive_failures.get(source_name, 0) >= self._failure_threshold
3.2 状态机与故障转移控制器
光有健康检查还不够。你需要一个状态机来管理数据源的切换逻辑,以及一个控制器来执行具体的切换操作。
import asyncio
import logging
from enum import Enum
from typing import Optional, Callable, Dict, Any
import time
logger = logging.getLogger(__name__)
class FailoverState(Enum):
"""故障转移状态机状态"""
NORMAL = "normal" # 正常运行,主数据源活跃
DEGRADED = "degraded" # 性能降级,监控告警
FAILOVER_IN_PROGRESS = "failover_in_progress" # 正在切换
FAILED_OVER = "failed_over" # 已切换到备用源
RECOVERING = "recovering" # 主源恢复中,等待确认稳定
ROLLBACK_IN_PROGRESS = "rollback_in_progress" # 正在回切
class FailoverController:
"""
故障转移控制器
管理状态机转换,执行切换操作
核心逻辑:
- NORMAL → DEGRADED:检测到延迟超标
- DEGRADED → FAILOVER_IN_PROGRESS:连续失败达到阈值
- FAILOVER_IN_PROGRESS → FAILED_OVER:备用源确认可用
- FAILED_OVER → RECOVERING:主源恢复
- RECOVERING → ROLLBACK_IN_PROGRESS:主源连续健康
- ROLLBACK_IN_PROGRESS → NORMAL:回切成功
"""
def __init__(
self,
primary_source: str,
backup_sources: list[str],
health_checker: HealthChecker,
on_failover_callback: Optional[Callable] = None,
on_rollback_callback: Optional[Callable] = None
):
self.state = FailoverState.NORMAL
self.primary_source = primary_source
self.backup_sources = backup_sources
self.health_checker = health_checker
self.current_active_source = primary_source
# 回切参数:主源需要连续健康 N 次才能回切
self.rollback_success_threshold = 10 # 默认连续 10 次健康检查成功
self._consecutive_primary_healthy = 0
# 切换回调
self.on_failover_callback = on_failover_callback
self.on_rollback_callback = on_rollback_callback
# 切换记录
self.last_failover_time: Optional[float] = None
self.last_rollback_time: Optional[float] = None
self.failover_count = 0
logger.info(
f"FailoverController 初始化: 主源={primary_source}, "
f"备用源={backup_sources}"
)
async def run_once(self) -> bool:
"""
执行一次状态检查和可能的切换
Returns:
True 表示发生了切换,False 表示无变化
"""
await self.health_checker.check_all()
old_state = self.state
old_source = self.current_active_source
# 状态机转换
if self.state == FailoverState.NORMAL:
await self._handle_normal_state()
elif self.state == FailoverState.DEGRADED:
await self._handle_degraded_state()
elif self.state == FailoverState.FAILED_OVER:
await self._handle_failed_over_state()
# 记录切换
if old_source != self.current_active_source:
self._record_switch(old_source, self.current_active_source)
return True
return False
async def _handle_normal_state(self):
"""正常状态处理"""
primary_result = self.health_checker.results.get(self.primary_source)
# 检查主源是否降级
if self.health_checker.should_failover(self.primary_source):
logger.warning(
f"[{self.primary_source}] 连续健康检查失败,"
f"进入 DEGRADED 状态"
)
self.state = FailoverState.DEGRADED
return
# 检查是否只是延迟高(降级)
if primary_result and primary_result.status.value == "degraded":
logger.warning(
f"[{self.primary_source}] 性能降级,延迟: "
f"{primary_result.latency_ms:.2f}ms"
)
# 降级状态在下次检查时判断是否切换
async def _handle_degraded_state(self):
"""降级状态处理"""
if self.health_checker.should_failover(self.primary_source):
await self._execute_failover()
async def _handle_failed_over_state(self):
"""已故障转移状态处理"""
primary_result = self.health_checker.results.get(self.primary_source)
# 检查主源是否恢复
if primary_result and primary_result.status.value == "healthy":
self._consecutive_primary_healthy += 1
if self._consecutive_primary_healthy >= self.rollback_success_threshold:
logger.info(
f"[{self.primary_source}] 主源已稳定 "
f"({self._consecutive_primary_healthy} 次健康检查),"
f"准备回切"
)
await self._execute_rollback()
else:
self._consecutive_primary_healthy = 0
async def _execute_failover(self):
"""执行故障转移"""
logger.warning(
f"⚠️ 开始故障转移: {self.primary_source} → {self.backup_sources[0]}"
)
self.state = FailoverState.FAILOVER_IN_PROGRESS
# 找到最优的备用源
best_backup = None
for backup in self.backup_sources:
result = self.health_checker.results.get(backup)
if result and result.status.value in ("healthy", "degraded"):
if best_backup is None:
best_backup = backup
elif result.latency_ms < self.health_checker.results[best_backup].latency_ms:
best_backup = backup
if best_backup is None:
logger.error("❌ 没有可用的备用数据源!")
self.state = FailoverState.DEGRADED
return
# 执行切换
self.current_active_source = best_backup
self.last_failover_time = time.time()
self.failover_count += 1
self.state = FailoverState.FAILED_OVER
# 调用切换回调
if self.on_failover_callback:
await self.on_failover_callback(
from_source=self.primary_source,
to_source=best_backup
)
logger.info(
f"✅ 故障转移完成: 当前活跃源 = {self.current_active_source}"
)
async def _execute_rollback(self):
"""执行回切"""
logger.info(
f"⚡ 开始回切: {self.current_active_source} → {self.primary_source}"
)
self.state = FailoverState.ROLLBACK_IN_PROGRESS
# 执行回切
old_source = self.current_active_source
self.current_active_source = self.primary_source
self.last_rollback_time = time.time()
self.state = FailoverState.NORMAL
self._consecutive_primary_healthy = 0
# 调用回切回调
if self.on_rollback_callback:
await self.on_rollback_callback(
from_source=old_source,
to_source=self.primary_source
)
logger.info(f"✅ 回切完成: 当前活跃源 = {self.current_active_source}")
def _record_switch(self, old: str, new: str):
"""记录切换事件"""
duration = 0
if self.last_failover_time:
duration = time.time() - self.last_failover_time
logger.info(
f"📍 切换记录: {old} → {new}, "
f"切换耗时: {duration:.2f}s, "
f"累计切换次数: {self.failover_count}"
)
@property
def stats(self) -> Dict[str, Any]:
"""获取统计信息"""
return {
"state": self.state.value,
"active_source": self.current_active_source,
"primary_source": self.primary_source,
"failover_count": self.failover_count,
"last_failover_time": self.last_failover_time,
"last_rollback_time": self.last_rollback_time,
"consecutive_primary_healthy": self._consecutive_primary_healthy,
"rollback_threshold": self.rollback_success_threshold
}
3.3 带断路器模式的数据源客户端
故障转移控制器解决了"何时切换"的问题,但"切换后如何重连"同样重要。
这里使用断路器模式(Circuit Breaker Pattern)来管理数据源连接。当某个数据源连续失败时,断路器会"跳闸",暂时停止向该数据源发起请求,避免雪崩效应。
import asyncio
import random
from typing import Optional, Any, Dict
import logging
logger = logging.getLogger(__name__)
class CircuitBreakerState(Enum):
CLOSED = "closed" # 正常状态,允许请求
OPEN = "open" # 断路器打开,拒绝请求
HALF_OPEN = "half_open" # 半开状态,允许试探性请求
class CircuitBreaker:
"""
断路器实现
状态转换:
- CLOSED → OPEN:失败次数超过阈值
- OPEN → HALF_OPEN:冷却时间到期
- HALF_OPEN → CLOSED:试探请求成功
- HALF_OPEN → OPEN:试探请求失败
"""
def __init__(
self,
failure_threshold: int = 5,
recovery_timeout: float = 30.0,
half_open_max_calls: int = 3
):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.half_open_max_calls = half_open_max_calls
self.state = CircuitBreakerState.CLOSED
self.failure_count = 0
self.success_count = 0
self.last_failure_time: Optional[float] = None
self.half_open_calls = 0
def record_success(self):
"""记录成功调用"""
if self.state == CircuitBreakerState.HALF_OPEN:
self.success_count += 1
if self.success_count >= self.half_open_max_calls:
logger.info("🔄 断路器关闭,服务恢复")
self.state = CircuitBreakerState.CLOSED
self.failure_count = 0
self.success_count = 0
else:
self.failure_count = 0
def record_failure(self):
"""记录失败调用"""
self.failure_count += 1
self.last_failure_time = time.time()
if self.state == CircuitBreakerState.HALF_OPEN:
logger.warning("⚡ 断路器重新打开(半开状态试探失败)")
self.state = CircuitBreakerState.OPEN
self.half_open_calls = 0
self.success_count = 0
elif self.failure_count >= self.failure_threshold:
logger.warning(
f"⚠️ 断路器打开(连续 {self.failure_count} 次失败)"
)
self.state = CircuitBreakerState.OPEN
def can_execute(self) -> bool:
"""检查是否可以执行请求"""
if self.state == CircuitBreakerState.CLOSED:
return True
if self.state == CircuitBreakerState.OPEN:
# 检查冷却时间是否到期
if self.last_failure_time:
elapsed = time.time() - self.last_failure_time
if elapsed >= self.recovery_timeout:
logger.info("🔄 断路器进入半开状态")
self.state = CircuitBreakerState.HALF_OPEN
self.half_open_calls = 0
return True
return False
if self.state == CircuitBreakerState.HALF_OPEN:
return self.half_open_calls < self.half_open_max_calls
return False
@property
def status(self) -> Dict[str, Any]:
return {
"state": self.state.value,
"failure_count": self.failure_count,
"success_count": self.success_count,
"last_failure_time": self.last_failure_time
}
class ResilientDataSourceClient:
"""
带灾备能力的行情数据客户端
特性:
- 自动故障检测与切换
- 断路器保护
- 指数退避重试
- 请求级超时控制
"""
def __init__(
self,
failover_controller: FailoverController,
health_checker: HealthChecker
):
self.failover_controller = failover_controller
self.health_checker = health_checker
# 为每个数据源初始化断路器
self.circuit_breakers: Dict[str, CircuitBreaker] = {}
for source_name in self.health_checker.sources.keys():
self.circuit_breakers[source_name] = CircuitBreaker(
failure_threshold=5,
recovery_timeout=30.0
)
async def get_market_data(
self,
symbol: str,
max_retries: int = 3,
base_delay: float = 1.0,
max_delay: float = 30.0
) -> Optional[Dict[str, Any]]:
"""
获取市场数据(带自动重试和故障转移)
重试策略:指数退避 + 抖动
"""
retry_count = 0
last_error: Optional[Exception] = None
while retry_count < max_retries:
# 获取当前活跃数据源
active_source = self.failover_controller.current_active_source
# 检查断路器
cb = self.circuit_breakers.get(active_source)
if cb and not cb.can_execute():
logger.warning(
f"[{active_source}] 断路器打开,尝试备用源"
)
# 尝试其他数据源
for backup_name in self.failover_controller.backup_sources:
if backup_name != active_source:
backup_cb = self.circuit_breakers.get(backup_name)
if backup_cb and backup_cb.can_execute():
active_source = backup_name
break
else:
# 所有断路器都打开,等待恢复
await asyncio.sleep(5)
retry_count += 1
continue
try:
# ⚠️ 这里应该是实际的 API 调用
# 示例:使用 TickDB API 获取 K 线数据
data = await self._fetch_from_source(
source=active_source,
symbol=symbol
)
# 成功,重置断路器
if cb:
cb.record_success()
return data
except Exception as e:
last_error = e
logger.warning(
f"[{active_source}] 请求失败 ({retry_count+1}/{max_retries}): {e}"
)
# 记录断路器
if cb:
cb.record_failure()
# 指数退避 + 抖动
delay = min(base_delay * (2 ** retry_count), max_delay)
jitter = random.uniform(0, delay * 0.1) # 10% 抖动
await asyncio.sleep(delay + jitter)
retry_count += 1
# 所有重试都失败
logger.error(
f"❌ 获取行情数据失败,已重试 {max_retries} 次: {last_error}"
)
return None
async def _fetch_from_source(
self,
source: str,
symbol: str
) -> Dict[str, Any]:
"""
从指定数据源获取数据
⚠️ 生产环境需要根据不同数据源适配
这里示例展示 TickDB API 的调用方式
"""
source_config = self.health_checker.sources.get(source)
# ⚠️ 生产环境建议使用 aiohttp 做异步请求
import requests
headers = {}
if source_config and source_config.api_key:
headers["X-API-Key"] = source_config.api_key
# 示例:获取最新 K 线数据
endpoint = source_config.check_endpoint.replace(
"/health", f"/kline/latest"
) if "/health" in source_config.check_endpoint else source_config.check_endpoint
try:
response = requests.get(
endpoint,
headers=headers,
params={"symbol": symbol},
timeout=(3.05, 10)
)
if response.status_code == 429:
# 限频处理
retry_after = int(response.headers.get("Retry-After", 5))
logger.warning(f"[{source}] 请求被限频,等待 {retry_after}s")
await asyncio.sleep(retry_after)
raise Exception("Rate limited")
response.raise_for_status()
return response.json()
except requests.exceptions.Timeout:
raise Exception(f"请求超时: {source}")
except requests.exceptions.ConnectionError as e:
raise Exception(f"连接失败: {e}")
四、部署方案:不同规模的配置建议
灾备架构的复杂度应该与你的业务规模和容错需求匹配。以下是三个档位的推荐配置:
| 维度 | 个人开发者 | 小型团队 | 机构级 |
|---|---|---|---|
| 数据源配置 | 主源 + TickDB 免费层 | 主源 + 2 个备用源 | 多区域主源 + TickDB 企业版 + 自托管备源 |
| 健康检查间隔 | 30 秒 | 10 秒 | 5 秒 |
| 故障转移阈值 | 连续 3 次失败 | 连续 5 次失败 | 连续 3 次失败 |
| 回切等待时间 | 5 分钟 | 10 分钟 | 30 分钟 |
| 告警通知 | 邮件 | 邮件 + Slack | 邮件 + Slack + 电话 |
| 监控面板 | 基础状态页 | Grafana 仪表盘 | 完整可观测性套件 |
| 代码复杂度 | 单进程同步 | 多进程 + Redis | Kubernetes + 服务网格 |
4.1 个人开发者:最小化配置
如果你是个人开发者,目标是"别让半夜宕机毁掉睡眠":
# minimal_disaster_recovery.py
import asyncio
import requests
import os
import time
# 极简版:只检查主源和 TickDB
SOURCES = [
{"name": "aws_primary", "url": "https://api.your-primary.com/health"},
{"name": "tickdb_backup", "url": "https://api.tickdb.ai/v1/health"},
]
API_KEY = os.environ.get("TICKDB_API_KEY")
ACTIVE_SOURCE = "aws_primary"
def check_health(name: str, url: str) -> tuple[str, bool, float]:
try:
headers = {"X-API-Key": API_KEY} if "tickdb" in url else {}
start = time.time()
r = requests.get(url, headers=headers, timeout=5)
latency = (time.time() - start) * 1000
return (name, r.status_code == 200, latency)
except:
return (name, False, 0)
def should_switch():
for name, url in SOURCES:
ok, latency = check_health(name, url)[1:]
if not ok or latency > 500:
return True
return False
# 简单循环检查
while True:
if should_switch() and ACTIVE_SOURCE == "aws_primary":
print("⚠️ 切换到 TickDB 备用源")
ACTIVE_SOURCE = "tickdb_backup"
time.sleep(30)
这 30 行代码实现了最基础的灾备逻辑:每 30 秒检查一次,主源失败就切到 TickDB。
4.2 小型团队:Grafana + AlertManager 集成
对于有专职运维的团队,建议接入现有的可观测性基础设施:
# alerting-rules.yml
groups:
- name: failover_alerts
rules:
- alert: DataSourceFailover
expr: failover_state != 0
for: 1m
labels:
severity: critical
annotations:
summary: "数据源故障转移已触发"
description: "从 {{ $labels.primary }} 切换到 {{ $labels.backup }}"
- alert: DataSourceLatencyHigh
expr: datasource_latency_ms > 500
for: 5m
labels:
severity: warning
annotations:
summary: "数据源延迟过高"
description: "{{ $labels.source }} 延迟 {{ $value }}ms"
4.3 机构级:Kubernetes 部署
生产级别推荐使用 Kubernetes 部署,配合 Helm Chart 实现声明式配置:
# values.yaml (Helm Chart)
replicaCount: 2
image:
repository: your-org/market-data-client
tag: latest
pullPolicy: IfNotPresent
resources:
requests:
memory: "256Mi"
cpu: "100m"
limits:
memory: "512Mi"
cpu: "500m"
env:
- name: TICKDB_API_KEY
valueFrom:
secretKeyRef:
name: tickdb-credentials
key: api-key
config:
failover:
checkIntervalSeconds: 10
failureThreshold: 3
rollbackThreshold: 10
sources:
- name: aws-primary
type: aws
endpoint: https://api.aws-market.com/v1
priority: 1
- name: gcp-backup
type: gcp
endpoint: https://api.gcp-market.com/v1
priority: 2
- name: tickdb
type: tickdb
endpoint: https://api.tickdb.ai/v1
priority: 3
五、DNS 故障转移:最后一公里的可靠性
除了应用层的切换,DNS 层的故障转移也是重要一环。当主数据中心彻底不可达时,需要通过 DNS 切换将流量引导到备用数据中心。
主流方案有两个:
| 方案 | 工具 | 优点 | 缺点 |
|---|---|---|---|
| 加权轮询 DNS | Route 53、Cloudflare | 配置简单,成本低 | DNS 传播有延迟(5-30 分钟) |
| Anycast + BGP | Cloudflare、Akamai | 毫秒级切换,接近零延迟 | 成本高,配置复杂 |
对于行情系统,建议使用加权轮询 DNS + 应用层健康检查组合:
应用层检测到故障 → 修改 DNS 权重 → 旧 DNS 记录缓存过期 → 流量切换
Cloudflare 的 Workers 可以实现这个逻辑:
// cloudflare-worker.js
addEventListener('fetch', event => {
event.respondWith(handleRequest(event.request))
})
async function handleRequest(request) {
const failoverState = await getFailoverState()
// 根据故障转移状态选择后端
let backend
if (failoverState.active === 'aws') {
backend = BACKENDS.aws
} else if (failoverState.active === 'tickdb') {
backend = BACKENDS.tickdb
} else {
backend = BACKENDS.gcp
}
const url = new URL(request.url)
const modifiedRequest = new Request(
`${backend}${url.pathname}${url.search}`,
request
)
return fetch(modifiedRequest)
}
async function getFailoverState() {
// 从 KV 存储读取最新状态
const state = await MARKET_STATE.get('failover_state')
return JSON.parse(state || '{"active": "aws"}')
}
六、TickDB 在灾备体系中的角色
在多云灾备架构中,TickDB 扮演的是**"最后一道防线"**的角色:
| 特性 | TickDB 作为备用源的优势 |
|---|---|
| 全球 CDN 加速 | 数据从最近的边缘节点分发,降低网络延迟 |
| SLA 99.9% | 比肩主流云厂商的可用性承诺 |
| 10 年历史 K 线 | 不仅实时可用,还能无缝衔接历史回测 |
| 统一 API | 一个接口覆盖美股、港股、数字货币等多资产 |
| 免费层可用 | 开发测试零成本,灾备不花钱 |
主数据源(AWS/GCP)→ TickDB 免费层 → 自建备用源
99.5% 可用 99.9% 可用 取决于运维
如果你的主数据源是 AWS,TickDB 作为备用可以将整体可用性提升到 99.9999%(六九级别)。
七、结语
灾备不是"出了问题再想办法",而是"提前设计好在问题发生时该怎么办"。
本文展示的三层容错架构(健康检查层 → 决策层 → 执行层)可以总结为三个核心原则:
- 主动探测,不要被动等待:定期检查数据源健康状况,在用户感知到问题之前就采取行动
- 渐进降级,不要一刀切:设计 DEGRADED 中间态,允许系统在性能下降时继续服务
- 自动恢复,不要只管切换不管回切:主源恢复后要能够自动回切,避免长期依赖备用源导致的成本和延迟问题
架构图镇楼:
┌─────────────────────────────────────────────────────────────────┐
│ 你的行情系统 │
├─────────────────────────────────────────────────────────────────┤
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ 健康检查器 │───▶│ 状态机 │───▶│ 断路器 │ │
│ │ (10s 探测) │ │ (判断何时切) │ │ (防止雪崩) │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌───────────┐ ┌───────────┐ ┌───────────┐ │
│ │ AWS 主源 │ ✗ │ GCP 备源 │ ✗ │ TickDB │ ✓ │
│ └───────────┘ └───────────┘ └───────────┘ │
│ ▲ │
│ "最后一道防线" ────┘ │
└─────────────────────────────────────────────────────────────────┘
下一步行动
如果你想立刻开始搭建灾备系统:
- 访问 tickdb.ai 注册获取免费 API Key
- 在控制台创建告警规则,设置故障转移阈值
- 参考本文代码,实现健康检查 + 状态机的基础版本
如果你需要企业级灾备方案:
- 联系 [email protected] 获取多区域数据源的企业级 SLA
- 申请 TickDB 技术团队一对一架构评审
- 了解更多关于 TickDB 与主流云厂商的集成最佳实践
如果你想深入学习容错架构:
- 推荐书籍:《Designing Data-Intensive Applications》第 8 章
- 推荐论文:Michael N. Birkner 等人,《Circuit Breakers and Bulkheads》
风险提示:本文介绍的技术架构仅供参考,实际部署时需要根据业务场景、数据规模、成本预算等因素进行评估和调整。任何系统设计都存在权衡取舍,没有完美的解决方案,只有适合当前阶段的最佳选择。