程序员的午夜惊魂:那条没存进去的数据去哪了
凌晨 3 点,你的行情监控服务突然被云厂商重启了。WebSocket 断开的瞬间,最后一笔深度数据——买卖盘口里那个关键的 45,000 股卖压堆积——还没来得及落盘就没了。
重启后的程序空空如也。它不知道刚才发生了什么,订单簿初始化为零,从头开始重建。10 分钟后,它才重新捕捉到类似的价格信号,但彼时市场环境早已不同——那次机会窗口,就这样永远错过了。
这不是虚构的事故。这是每一个写过生产级 WebSocket 客户端的工程师,或早或晚都会撞上的墙。
问题出在哪里?不是 WebSocket 不稳定,而是程序根本没有为“被中断”这件事做任何准备。连接是临时的,数据在内存里,进程挂了就是丢了——就像在一张没有保存功能的 Excel 里做表格,停电就全没了。
这篇文章要解决的就是这个问题:让程序在任意时刻被终止、重启、迁移,都能从中断处无缝接续,不丢失任何关键状态。
我们把这个问题拆成三块:
- 信号处理:怎么让程序收到终止信号后有序收尾,而不是突然暴毙?
- 检查点机制:把关键状态定期写到磁盘,重启后能恢复?
- 断点续传:订阅数据从哪条开始补,才能既不漏也不重复?
配合生产级代码,从 WebSocket 心跳重连到 SQLite 本地缓冲,完整闭环。
一、为什么你的程序总是在最关键的时刻崩
在谈方案之前,先弄清楚问题的本质。WebSocket 连接天然是不可靠的,这不算什么秘密。但工程师们容易低估的是不可靠的范围——它不只是网络层面的断开,还包括操作系统层面的信号。
当你执行 docker stop、kubectl rollout restart,或云厂商触发机器重启时,进程收到的不是 SIGKILL(立即杀死),而是 SIGTERM(优雅终止)。默认超时 10 秒,超时后才会强制 SIGKILL。
这意味着你有 10 秒窗口来做清理工作。但大多数程序根本没接管这个信号,收到 SIGTERM 后直接退出了——白瞎了这 10 秒。
SIGTERM (15) → 程序收到 → 收尾逻辑(你有 ~10 秒)→ 退出
SIGKILL (9) → 程序收到 → 立即终止 → 数据丢失(无法捕获)
所以第一步,是让程序正确响应 SIGTERM,在宝贵的窗口期内把状态刷到磁盘。
二、信号处理:让程序优雅谢幕
2.1 基础信号捕获
Python 的 signal 模块可以拦截系统信号。以下是一个最简信号处理框架:
import signal
import sys
import time
from typing import Callable
class GracefulShutdown:
"""优雅关闭管理器"""
def __init__(self):
self.shutdown_requested = False
self._handlers = []
def request_shutdown(self, signum, frame):
"""SIGTERM/SIGINT 信号处理器"""
print(f"\n[收到信号 {signum}] 优雅关闭中...", flush=True)
self.shutdown_requested = True
# 执行注册的所有收尾回调
for handler in self._handlers:
try:
handler()
except Exception as e:
print(f"[收尾失败] {e}", flush=True)
def register(self, callback: Callable[[], None]):
"""注册收尾回调,按注册顺序执行"""
self._handlers.append(callback)
return callback # 支持装饰器用法
def is_shutting_down(self) -> bool:
return self.shutdown_requested
# 全局实例
shutdown_mgr = GracefulShutdown()
# 注册信号处理器(主进程)
signal.signal(signal.SIGTERM, shutdown_mgr.request_shutdown)
signal.signal(signal.SIGINT, shutdown_mgr.request_shutdown)
这个 GracefulShutdown 类做了三件事:
- 捕获
SIGTERM和SIGINT(Ctrl+C) - 标记关闭状态,外部代码可以轮询这个标志来决定是否继续处理
- 注册回调函数,在收到关闭信号后统一执行
2.2 让 WebSocket 客户端感知关闭信号
现在把这个信号处理器和 WebSocket 客户端集成。下面的代码展示了一个完整的生产级 WebSocket 连接管理器:
import asyncio
import json
import logging
import random
import signal
import time
import os
from datetime import datetime
from typing import Optional
import websockets
import websockets.exceptions
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
)
logger = logging.getLogger(__name__)
class ResilientWebSocketClient:
"""
具有优雅关闭和断点续传能力的 WebSocket 客户端。
"""
BASE_URL = "wss://api.tickdb.ai/ws"
def __init__(self, api_key: str, channels: list[str]):
self.api_key = api_key
self.channels = channels
self.shutdown_mgr = GracefulShutdown()
self.ws: Optional[websockets.WebSocketClientProtocol] = None
self.reconnect_delay = 1.0
self.max_reconnect_delay = 60.0
self.retry_count = 0
# 注册 WebSocket 相关的收尾回调
self.shutdown_mgr.register(self._flush_buffer)
self.shutdown_mgr.register(self._save_checkpoint)
async def connect(self):
"""
建立 WebSocket 连接,支持断点续传。
"""
uri = f"{self.BASE_URL}?api_key={self.api_key}"
# 从检查点恢复订阅位置
checkpoint = self._load_checkpoint()
if checkpoint:
logger.info(f"[断点续传] 从 checkpoint 恢复: {checkpoint}")
while not self.shutdown_mgr.is_shutting_down():
try:
self.ws = await websockets.connect(
uri,
ping_interval=20,
ping_timeout=10,
close_timeout=5,
)
logger.info("[连接已建立]")
self.retry_count = 0
self.reconnect_delay = 1.0
# 订阅频道
subscribe_msg = {
"cmd": "subscribe",
"args": self.channels
}
await self.ws.send(json.dumps(subscribe_msg))
logger.info(f"[订阅] {self.channels}")
# 启动心跳保活协程
ping_task = asyncio.create_task(self._keepalive())
# 消息处理循环
await self._message_loop(ping_task, checkpoint)
except websockets.exceptions.ConnectionClosed as e:
logger.warning(f"[连接关闭] code={e.code} reason={e.reason}")
except OSError as e:
logger.error(f"[网络错误] {e}")
except Exception as e:
logger.exception(f"[异常] {e}")
if self.shutdown_mgr.is_shutting_down():
break
# 指数退避 + 抖动重连
jitter = random.uniform(0, self.reconnect_delay * 0.1)
wait_time = self.reconnect_delay + jitter
logger.info(f"[重连] {wait_time:.1f}秒后尝试 (尝试 {self.retry_count + 1})")
await asyncio.sleep(wait_time)
self.retry_count += 1
self.reconnect_delay = min(
self.reconnect_delay * 2,
self.max_reconnect_delay
)
async def _keepalive(self):
"""心跳保活"""
while not self.shutdown_mgr.is_shutting_down():
try:
await asyncio.sleep(18)
if self.ws and self.ws.open:
await self.ws.send(json.dumps({"cmd": "ping"}))
logger.debug("[心跳] ping 发送")
except Exception:
break
async def _message_loop(self, ping_task, checkpoint):
"""消息处理主循环"""
async for message in self.ws:
if self.shutdown_mgr.is_shutting_down():
logger.info("[关闭] 消息循环退出")
ping_task.cancel()
await self.ws.close()
return
try:
data = json.loads(message)
await self._process_message(data, checkpoint)
except json.JSONDecodeError:
logger.warning(f"[解析错误] {message}")
except Exception:
logger.exception("[处理异常]")
async def _process_message(self, data, checkpoint):
"""处理接收到的消息"""
# 消息处理逻辑
# 此处调用数据处理方法
pass
def _flush_buffer(self):
"""收尾回调:刷新缓冲区到磁盘"""
logger.info("[收尾] 正在刷新缓冲区...")
# 强制将内存中的数据写入检查点
pass
def _save_checkpoint(self):
"""收尾回调:保存当前状态快照"""
logger.info("[收尾] 保存检查点...")
# 将订阅位置、时间戳等状态持久化
pass
def _load_checkpoint(self) -> Optional[dict]:
"""从磁盘恢复检查点"""
checkpoint_path = os.path.join(
os.path.dirname(os.path.abspath(__file__)),
"checkpoint.json"
)
try:
with open(checkpoint_path, "r") as f:
return json.load(f)
except (FileNotFoundError, json.JSONDecodeError):
return None
这段代码的结构值得细说几点:
为什么心跳和消息循环是分离的? 如果心跳放在主循环里,一旦某条消息处理卡住(IO 阻塞),心跳就无法发送,服务端会在 20 秒后主动断开连接。分离为独立的 asyncio.create_task 可以确保心跳不受消息处理速度影响。
指数退避的意义是什么? 断开后立即重连是新手最容易犯的错误。如果你的程序有 bug 导致连接反复断开又重连,瞬时间可能发出数千个连接请求,把服务器打爆,甚至触发限频(TickDB 的 3001 错误码)。指数退避 + 抖动能有效平滑重连节奏。
三、检查点机制:让状态可以“存档读档”
3.1 什么是检查点
检查点(Checkpoint)的核心思想很简单:把程序运行到某个时刻的关键状态序列化到磁盘,需要时再反序列化回来。
类比游戏:你在 BOSS 战之前手动存档,重启电脑后读档,从 BOSS 战开始打,而不是从第一关重新开始。
对于 WebSocket 行情程序,关键状态通常包括:
| 状态字段 | 说明 |
|---|---|
| 订阅频道列表 | 程序订阅了哪些市场、哪些数据频道 |
| 最后处理时间戳 | 防止重复处理已收到的数据 |
| 当前持仓快照 | 如果程序在管理仓位,需要持久化 |
| 缓冲区未刷新的数据 | 还在内存里等待批量写入磁盘的数据 |
| 序号位置(sequence ID) | 如果数据有全局递增序号,这就是断点 |
3.2 SQLite 检查点:生产级状态持久化方案
对于需要频繁写入的状态,SQLite 是最合适的选择。它是嵌入式数据库,不需要单独的进程,文件即数据库,且完全支持事务。
import sqlite3
import json
import threading
from datetime import datetime
from contextlib import contextmanager
from pathlib import Path
class CheckpointDB:
"""
基于 SQLite 的检查点持久化。
线程安全,支持并发读写。
"""
def __init__(self, db_path: str = "tickdb_checkpoint.db"):
self.db_path = db_path
self._lock = threading.Lock()
self._init_db()
def _init_db(self):
"""初始化数据库表结构"""
with self._get_conn() as conn:
conn.execute("""
CREATE TABLE IF NOT EXISTS checkpoint (
key TEXT PRIMARY KEY,
value TEXT NOT NULL,
updated_at TEXT NOT NULL
)
""")
conn.execute("""
CREATE TABLE IF NOT EXISTS pending_data (
id INTEGER PRIMARY KEY AUTOINCREMENT,
channel TEXT NOT NULL,
timestamp TEXT NOT NULL,
sequence INTEGER,
payload TEXT NOT NULL,
processed INTEGER DEFAULT 0,
created_at TEXT NOT NULL
)
""")
conn.execute("""
CREATE INDEX IF NOT EXISTS idx_pending_unprocessed
ON pending_data(processed, created_at)
""")
conn.commit()
@contextmanager
def _get_conn(self):
"""线程安全的数据库连接"""
with self._lock:
conn = sqlite3.connect(self.db_path, check_same_thread=False)
conn.row_factory = sqlite3.Row
try:
yield conn
finally:
conn.close()
def save_checkpoint(self, key: str, value: dict):
"""保存单个检查点"""
with self._get_conn() as conn:
conn.execute("""
INSERT OR REPLACE INTO checkpoint (key, value, updated_at)
VALUES (?, ?, ?)
""", (key, json.dumps(value), datetime.utcnow().isoformat()))
conn.commit()
def load_checkpoint(self, key: str) -> Optional[dict]:
"""加载检查点,不存在返回 None"""
with self._get_conn() as conn:
row = conn.execute(
"SELECT value FROM checkpoint WHERE key = ?", (key,)
).fetchone()
if row:
return json.loads(row["value"])
return None
def save_pending(self, channel: str, timestamp: str,
sequence: Optional[int], payload: dict):
"""
将尚未处理完毕的数据写入待处理队列。
WebSocket 断连或程序重启时,未落盘的数据在这里暂存。
"""
with self._get_conn() as conn:
conn.execute("""
INSERT INTO pending_data
(channel, timestamp, sequence, payload, created_at)
VALUES (?, ?, ?, ?, ?)
""", (
channel, timestamp, sequence,
json.dumps(payload), datetime.utcnow().isoformat()
))
conn.commit()
def fetch_unprocessed(self, channel: str, limit: int = 100) -> list[dict]:
"""取出未处理的数据用于断点续传补数"""
with self._get_conn() as conn:
rows = conn.execute("""
SELECT id, timestamp, sequence, payload
FROM pending_data
WHERE channel = ? AND processed = 0
ORDER BY id ASC
LIMIT ?
""", (channel, limit)).fetchall()
return [
{
"id": row["id"],
"timestamp": row["timestamp"],
"sequence": row["sequence"],
"payload": json.loads(row["payload"])
}
for row in rows
]
def mark_processed(self, record_ids: list[int]):
"""标记数据已处理"""
if not record_ids:
return
placeholders = ",".join("?" * len(record_ids))
with self._get_conn() as conn:
conn.execute(f"""
UPDATE pending_data SET processed = 1
WHERE id IN ({placeholders})
""", record_ids)
conn.commit()
def cleanup_processed(self, days: int = 7):
"""清理 7 天前的已处理数据,防止数据库膨胀"""
with self._get_conn() as conn:
conn.execute("""
DELETE FROM pending_data
WHERE processed = 1
AND created_at < datetime('now', ?)
""", (f"-{days} days",))
conn.commit()
这个检查点模块解决了几件实际的事:
为什么用 threading.Lock 而不是直接锁行? SQLite 本身不支持高并发写入,单个写入事务是原子的,但多个线程同时开连接容易触发 database is locked 错误。用一个全局锁串行化所有数据库操作,是最简单可靠的做法。
pending_data 表的设计逻辑是什么? 这张表相当于一个缓冲队列。程序在收到 WebSocket 数据后,立即写入这张表(标记 processed=0),然后再做业务处理。如果程序在处理过程中崩溃,下次启动时从这张表里读出 processed=0 的记录继续处理——数据没有丢。
cleanup 策略怎么定? 保留 7 天的已处理数据是经验值。太短则无法做历史回放分析,太长则数据库体积膨胀。对于高频数据场景,可以按记录数而非天数做清理(如超过 10 万条则清理最旧的 20%)。
四、断点续传:从哪里开始继续
检查点解决了“状态保存”的问题,但还有一个关键问题没解决:重启后,从哪里开始补数据?
TickDB 的 WebSocket 连接在断连重连后,默认从头开始推送最新数据。如果你在处理一条数据后、写入 checkpoint 前崩溃了,那么这条数据已经丢失了——因为你没有持久化它,服务器也没有再推一遍。
这就是为什么需要在 _process_message 中引入一个“先写后处理”的顺序:
async def _process_message(self, data, checkpoint):
"""
处理消息的标准流程:
1. 先写入 pending_data(持久化)
2. 更新内存状态
3. 标记处理完成
4. 更新检查点
"""
channel = data.get("channel", "unknown")
timestamp = data.get("ts", datetime.utcnow().isoformat())
sequence = data.get("seq") # 某些频道支持全局序号
# ⭐ 步骤 1:先把数据写入 SQLite
# 即使后续处理失败,数据也已安全存储
self.checkpoint_db.save_pending(
channel=channel,
timestamp=timestamp,
sequence=sequence,
payload=data,
)
# ⭐ 步骤 2:处理业务逻辑
await self._handle_data(data)
# ⭐ 步骤 3:更新检查点(记录处理到哪了)
self.checkpoint_db.save_checkpoint("last_processed", {
"channel": channel,
"timestamp": timestamp,
"sequence": sequence,
"processed_at": datetime.utcnow().isoformat()
})
# ⭐ 步骤 4:标记 pending 为已处理(可选,提高查询效率)
# 在实际实现中,定期批量标记比逐条标记性能更好
这个“先写后处理”的顺序看起来违反直觉——直觉告诉你应该先处理再存盘。但数据安全的黄金法则是:任何在内存中产生的关键数据,在确认处理完成之前,必须先落盘。
顺序一颠倒,崩溃就丢数据。顺序对了,崩溃只是在重复处理已经落盘的数据——这可以通过检查点去重来避免。
4.1 重启后的恢复流程
async def recovery_loop(self):
"""
启动时运行的恢复流程:
1. 从 pending_data 恢复未处理数据
2. 从 REST API 补全订阅启动后的数据缺口
3. 清理已处理数据
"""
logger.info("[恢复] 开始断点续传...")
# 恢复步骤 1:处理 SQLite 中未落盘的数据
pending = self.checkpoint_db.fetch_unprocessed(limit=500)
if pending:
logger.info(f"[恢复] 发现 {len(pending)} 条未处理数据,开始补处理...")
for record in pending:
try:
await self._handle_data(record["payload"])
self.checkpoint_db.mark_processed([record["id"]])
except Exception:
logger.error(f"[恢复] 处理失败,跳过: {record['id']}")
continue
# 恢复步骤 2:检查时间戳缺口,通过 REST API 补全
last = self.checkpoint_db.load_checkpoint("last_processed")
if last and last.get("timestamp"):
logger.info(f"[恢复] 检查数据缺口,last={last['timestamp']}")
# 通过 TickDB REST API 补齐 WebSocket 断连期间的数据
gap_data = await self._fetch_gap_data(
last["timestamp"],
datetime.utcnow().isoformat()
)
for item in gap_data:
await self._handle_data(item)
# 恢复步骤 3:清理过期数据
self.checkpoint_db.cleanup_processed(days=7)
logger.info("[恢复] 完成")
为什么要分两步恢复? 因为 WebSocket 的 pending_data 只保存了你程序运行期间收到的数据。如果 WebSocket 断开期间(例如凌晨机器重启后)服务器推送了新的数据,而这些数据没进你的程序——那这部分缺口就只有 REST API 能补。
五、TickDB 的数据恢复能力:如何用 API 补全缺口
说到 REST API 补数,这里展开一下。TickDB 提供了一套完整的 REST 接口,核心的两个是:
| 接口 | 用途 | 什么时候用 |
|---|---|---|
GET /v1/market/kline |
获取历史 K 线数据 | 补全历史缺口 |
GET /v1/market/trades |
获取逐笔成交数据 | 补全交易事件 |
import os
import time
import requests
from datetime import datetime, timedelta
API_BASE = "https://api.tickdb.ai/v1"
def fetch_historical_klines(symbol: str, interval: str = "1m",
start_time: str = None, end_time: str = None,
limit: int = 1000) -> list[dict]:
"""
通过 REST API 获取历史 K 线数据,用于断点续传后的缺口补全。
"""
headers = {"X-API-Key": os.environ.get("TICKDB_API_KEY")}
params = {
"symbol": symbol,
"interval": interval,
"limit": limit,
}
if start_time:
params["start"] = start_time
if end_time:
params["end"] = end_time
response = requests.get(
f"{API_BASE}/market/kline",
headers=headers,
params=params,
timeout=(3.05, 10),
)
if response.status_code == 429:
retry_after = int(response.headers.get("Retry-After", 5))
time.sleep(retry_after)
return fetch_historical_klines(symbol, interval, start_time,
end_time, limit)
response.raise_for_status()
data = response.json()
if data.get("code") == 0:
return data.get("data", [])
elif data.get("code") == 3001:
retry_after = int(response.headers.get("Retry-After", 5))
time.sleep(retry_after)
return fetch_historical_klines(symbol, interval, start_time,
end_time, limit)
else:
raise RuntimeError(f"API 错误: {data.get('message')}")
def find_gaps(symbol: str, existing_timestamps: list[str],
interval: str = "1m") -> list[tuple[str, str]]:
"""
找出历史数据中的时间缺口。
输入:已有数据的时间戳列表
返回:(gap_start, gap_end) 元组列表
"""
if not existing_timestamps:
return []
sorted_ts = sorted(existing_timestamps)
gaps = []
for i in range(len(sorted_ts) - 1):
current = datetime.fromisoformat(sorted_ts[i].replace("Z", "+00:00"))
next_ts = datetime.fromisoformat(sorted_ts[i + 1].replace("Z", "+00:00"))
# 如果时间间隔大于 K 线周期的 1.5 倍,认为有缺口
if interval == "1m":
threshold = timedelta(minutes=2)
elif interval == "5m":
threshold = timedelta(minutes=8)
elif interval == "1h":
threshold = timedelta(hours=2)
else:
threshold = timedelta(days=1)
if next_ts - current > threshold:
gaps.append((
sorted_ts[i],
sorted_ts[i + 1]
))
return gaps
一个实际场景:假设你的程序在 14:30 到 14:35 之间因为机器重启断开了。14:35 重启后,先处理 pending_data 中 14:30 之前没处理完的数据,然后调用 fetch_historical_klines 把 14:30–14:35 的 K 线补回来。处理完这些数据后,你的内存状态就和“程序从未中断过”完全一致了。
六、完整集成:把一切串联起来
把前面的所有组件组装在一起,就是一个完整的生产级行情监控程序:
import asyncio
import json
import logging
import os
import signal
import websockets
import websockets.exceptions
from checkpoint import CheckpointDB
from websocket_client import ResilientWebSocketClient
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
)
logger = logging.getLogger(__name__)
class MarketMonitor:
"""
TickDB 行情监控服务。
特性:
- 信号捕获(SIGTERM/SIGINT)
- 优雅关闭
- 检查点持久化(SQLite)
- 断点续传
- 指数退避重连
- 批量补数
"""
def __init__(self):
self.shutdown_mgr = GracefulShutdown()
self.checkpoint_db = CheckpointDB()
# 接收关闭信号
signal.signal(signal.SIGTERM, self.shutdown_mgr.request_shutdown)
signal.signal(signal.SIGINT, self.shutdown_mgr.request_shutdown)
async def run(self):
"""主运行循环"""
logger.info("[启动] TickDB 行情监控服务")
# 先执行恢复流程(如果有未处理数据)
await self.recovery()
# 再启动 WebSocket 实时连接
client = ResilientWebSocketClient(
api_key=os.environ.get("TICKDB_API_KEY", ""),
channels=["depth.AAPL.US", "quote.AAPL.US"]
)
try:
await asyncio.create_task(client.connect())
except asyncio.CancelledError:
logger.info("[主循环] 任务取消")
finally:
await self.cleanup()
async def recovery(self):
"""启动时恢复未处理数据"""
pending = self.checkpoint_db.fetch_unprocessed(
channel="depth.AAPL.US", limit=200
)
if pending:
logger.info(f"[恢复] 发现 {len(pending)} 条待处理数据")
for record in pending:
await self.process_data(record["payload"])
self.checkpoint_db.mark_processed([record["id"]])
async def process_data(self, data: dict):
"""处理单条行情数据"""
# 业务处理逻辑:更新内存状态、计算指标、触发告警等
pass
async def cleanup(self):
"""程序退出前的清理"""
logger.info("[关闭] 执行最终检查点保存")
self.checkpoint_db.save_checkpoint("service_stopped", {
"timestamp": __import__("datetime").datetime.utcnow().isoformat()
})
logger.info("[关闭] 服务已停止")
if __name__ == "__main__":
monitor = MarketMonitor()
try:
asyncio.run(monitor.run())
except KeyboardInterrupt:
pass
七、架构全景:状态在系统中的流动路径
用一个简单的流程图来总结整个机制的工作方式:
正常运行期间:
┌─────────────────────────────────────────────────┐
│ WebSocket 推送数据 │
│ ↓ │
│ save_pending() → SQLite pending_data (processed=0) │
│ ↓ │
│ 业务处理 → mark_processed() │
│ ↓ │
│ save_checkpoint() → 记录处理位置 │
└─────────────────────────────────────────────────┘
程序崩溃 / 收到 SIGTERM:
┌─────────────────────────────────────────────────┐
│ 信号捕获 → GracefulShutdown │
│ ↓ │
│ 收尾回调执行 │
│ save_pending() 强制刷新未落盘数据 │
│ save_checkpoint() 保存最后状态 │
│ ↓ │
│ 进程退出 │
└─────────────────────────────────────────────────┘
程序重启恢复:
┌─────────────────────────────────────────────────┐
│ 启动 → recovery() │
│ ↓ │
│ fetch_unprocessed() → 补处理 pending_data │
│ ↓ │
│ load_checkpoint() → 获取最后处理位置 │
│ ↓ │
│ REST API 补缺口 → fetch_historical_klines() │
│ ↓ │
│ 状态完整 → 启动 WebSocket 实时连接 │
│ ↓ │
│ 正常接收新数据(从断点处继续) │
└─────────────────────────────────────────────────┘
这个流程的核心洞察是:把数据流经程序的每个关键节点都加上持久化checkpoint。 任何节点之后崩溃,都能从最近的有效检查点恢复。
结语
程序员的午夜惊魂,本质上是对“状态管理”的忽视。WebSocket 给了你实时数据的通道,但它不保证你的程序能完整地处理每一条数据。
优雅关闭、状态检查点、断点续传——这三件事凑在一起,就构成了一套可靠的状态管理框架。它的核心原理并不复杂:永远不要信任内存中的数据是安全的,直到它已经被持久化到磁盘。
把这套机制跑通之后,你就不用再提心吊胆地盯着凌晨的告警了。程序知道自己从哪里中断、该从哪里恢复。它比你自己记得更清楚。
下一步行动
如果你正在写生产级的 WebSocket 客户端:直接拿上面的 CheckpointDB 类和 GracefulShutdown 类用,改一改表结构和字段名就能适配你的业务。
如果你在评估 TickDB 作为数据源:它的 REST API(/kline、/trades)加上 WebSocket 的 depth、quote 频道,配合上面的恢复逻辑,可以构建一个零数据丢失的行情采集系统。访问 tickdb.ai 注册获取免费 API Key。
如果你更关心部署层面的可靠性:上面的代码解决了进程级别的容错,而 K8s 的 preStop 钩子配合这个优雅关闭机制,可以把容错延伸到容器层面——让容器在收到停止信号后,有足够时间完成收尾逻辑。
如果你习惯用 AI 辅助开发:在 AI 助手中搜索安装 tickdb-market-data SKILL,可以直接用自然语言查询 TickDB 的数据能力和接口规范。
本文不构成任何投资建议。市场有风险,投资需谨慎。