程序员的午夜惊魂:那条没存进去的数据去哪了

凌晨 3 点,你的行情监控服务突然被云厂商重启了。WebSocket 断开的瞬间,最后一笔深度数据——买卖盘口里那个关键的 45,000 股卖压堆积——还没来得及落盘就没了。

重启后的程序空空如也。它不知道刚才发生了什么,订单簿初始化为零,从头开始重建。10 分钟后,它才重新捕捉到类似的价格信号,但彼时市场环境早已不同——那次机会窗口,就这样永远错过了。

这不是虚构的事故。这是每一个写过生产级 WebSocket 客户端的工程师,或早或晚都会撞上的墙。

问题出在哪里?不是 WebSocket 不稳定,而是程序根本没有为“被中断”这件事做任何准备。连接是临时的,数据在内存里,进程挂了就是丢了——就像在一张没有保存功能的 Excel 里做表格,停电就全没了。

这篇文章要解决的就是这个问题:让程序在任意时刻被终止、重启、迁移,都能从中断处无缝接续,不丢失任何关键状态。

我们把这个问题拆成三块:

  1. 信号处理:怎么让程序收到终止信号后有序收尾,而不是突然暴毙?
  2. 检查点机制:把关键状态定期写到磁盘,重启后能恢复?
  3. 断点续传:订阅数据从哪条开始补,才能既不漏也不重复?

配合生产级代码,从 WebSocket 心跳重连到 SQLite 本地缓冲,完整闭环。


一、为什么你的程序总是在最关键的时刻崩

在谈方案之前,先弄清楚问题的本质。WebSocket 连接天然是不可靠的,这不算什么秘密。但工程师们容易低估的是不可靠的范围——它不只是网络层面的断开,还包括操作系统层面的信号。

当你执行 docker stopkubectl rollout restart,或云厂商触发机器重启时,进程收到的不是 SIGKILL(立即杀死),而是 SIGTERM(优雅终止)。默认超时 10 秒,超时后才会强制 SIGKILL

这意味着你有 10 秒窗口来做清理工作。但大多数程序根本没接管这个信号,收到 SIGTERM 后直接退出了——白瞎了这 10 秒。

SIGTERM (15) → 程序收到 → 收尾逻辑(你有 ~10 秒)→ 退出
SIGKILL (9)  → 程序收到 → 立即终止 → 数据丢失(无法捕获)

所以第一步,是让程序正确响应 SIGTERM,在宝贵的窗口期内把状态刷到磁盘。


二、信号处理:让程序优雅谢幕

2.1 基础信号捕获

Python 的 signal 模块可以拦截系统信号。以下是一个最简信号处理框架:

import signal
import sys
import time
from typing import Callable

class GracefulShutdown:
    """优雅关闭管理器"""
    
    def __init__(self):
        self.shutdown_requested = False
        self._handlers = []
    
    def request_shutdown(self, signum, frame):
        """SIGTERM/SIGINT 信号处理器"""
        print(f"\n[收到信号 {signum}] 优雅关闭中...", flush=True)
        self.shutdown_requested = True
        # 执行注册的所有收尾回调
        for handler in self._handlers:
            try:
                handler()
            except Exception as e:
                print(f"[收尾失败] {e}", flush=True)
    
    def register(self, callback: Callable[[], None]):
        """注册收尾回调,按注册顺序执行"""
        self._handlers.append(callback)
        return callback  # 支持装饰器用法
    
    def is_shutting_down(self) -> bool:
        return self.shutdown_requested


# 全局实例
shutdown_mgr = GracefulShutdown()

# 注册信号处理器(主进程)
signal.signal(signal.SIGTERM, shutdown_mgr.request_shutdown)
signal.signal(signal.SIGINT, shutdown_mgr.request_shutdown)

这个 GracefulShutdown 类做了三件事:

  • 捕获 SIGTERMSIGINTCtrl+C
  • 标记关闭状态,外部代码可以轮询这个标志来决定是否继续处理
  • 注册回调函数,在收到关闭信号后统一执行

2.2 让 WebSocket 客户端感知关闭信号

现在把这个信号处理器和 WebSocket 客户端集成。下面的代码展示了一个完整的生产级 WebSocket 连接管理器:

import asyncio
import json
import logging
import random
import signal
import time
import os
from datetime import datetime
from typing import Optional

import websockets
import websockets.exceptions

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
)
logger = logging.getLogger(__name__)


class ResilientWebSocketClient:
    """
    具有优雅关闭和断点续传能力的 WebSocket 客户端。
    """
    
    BASE_URL = "wss://api.tickdb.ai/ws"
    
    def __init__(self, api_key: str, channels: list[str]):
        self.api_key = api_key
        self.channels = channels
        self.shutdown_mgr = GracefulShutdown()
        self.ws: Optional[websockets.WebSocketClientProtocol] = None
        self.reconnect_delay = 1.0
        self.max_reconnect_delay = 60.0
        self.retry_count = 0
        
        # 注册 WebSocket 相关的收尾回调
        self.shutdown_mgr.register(self._flush_buffer)
        self.shutdown_mgr.register(self._save_checkpoint)
    
    async def connect(self):
        """
        建立 WebSocket 连接,支持断点续传。
        """
        uri = f"{self.BASE_URL}?api_key={self.api_key}"
        
        # 从检查点恢复订阅位置
        checkpoint = self._load_checkpoint()
        if checkpoint:
            logger.info(f"[断点续传] 从 checkpoint 恢复: {checkpoint}")
        
        while not self.shutdown_mgr.is_shutting_down():
            try:
                self.ws = await websockets.connect(
                    uri,
                    ping_interval=20,
                    ping_timeout=10,
                    close_timeout=5,
                )
                logger.info("[连接已建立]")
                self.retry_count = 0
                self.reconnect_delay = 1.0
                
                # 订阅频道
                subscribe_msg = {
                    "cmd": "subscribe",
                    "args": self.channels
                }
                await self.ws.send(json.dumps(subscribe_msg))
                logger.info(f"[订阅] {self.channels}")
                
                # 启动心跳保活协程
                ping_task = asyncio.create_task(self._keepalive())
                
                # 消息处理循环
                await self._message_loop(ping_task, checkpoint)
                
            except websockets.exceptions.ConnectionClosed as e:
                logger.warning(f"[连接关闭] code={e.code} reason={e.reason}")
            except OSError as e:
                logger.error(f"[网络错误] {e}")
            except Exception as e:
                logger.exception(f"[异常] {e}")
            
            if self.shutdown_mgr.is_shutting_down():
                break
            
            # 指数退避 + 抖动重连
            jitter = random.uniform(0, self.reconnect_delay * 0.1)
            wait_time = self.reconnect_delay + jitter
            logger.info(f"[重连] {wait_time:.1f}秒后尝试 (尝试 {self.retry_count + 1})")
            
            await asyncio.sleep(wait_time)
            self.retry_count += 1
            self.reconnect_delay = min(
                self.reconnect_delay * 2, 
                self.max_reconnect_delay
            )
    
    async def _keepalive(self):
        """心跳保活"""
        while not self.shutdown_mgr.is_shutting_down():
            try:
                await asyncio.sleep(18)
                if self.ws and self.ws.open:
                    await self.ws.send(json.dumps({"cmd": "ping"}))
                    logger.debug("[心跳] ping 发送")
            except Exception:
                break
    
    async def _message_loop(self, ping_task, checkpoint):
        """消息处理主循环"""
        async for message in self.ws:
            if self.shutdown_mgr.is_shutting_down():
                logger.info("[关闭] 消息循环退出")
                ping_task.cancel()
                await self.ws.close()
                return
            
            try:
                data = json.loads(message)
                await self._process_message(data, checkpoint)
            except json.JSONDecodeError:
                logger.warning(f"[解析错误] {message}")
            except Exception:
                logger.exception("[处理异常]")
    
    async def _process_message(self, data, checkpoint):
        """处理接收到的消息"""
        # 消息处理逻辑
        # 此处调用数据处理方法
        pass
    
    def _flush_buffer(self):
        """收尾回调:刷新缓冲区到磁盘"""
        logger.info("[收尾] 正在刷新缓冲区...")
        # 强制将内存中的数据写入检查点
        pass
    
    def _save_checkpoint(self):
        """收尾回调:保存当前状态快照"""
        logger.info("[收尾] 保存检查点...")
        # 将订阅位置、时间戳等状态持久化
        pass
    
    def _load_checkpoint(self) -> Optional[dict]:
        """从磁盘恢复检查点"""
        checkpoint_path = os.path.join(
            os.path.dirname(os.path.abspath(__file__)),
            "checkpoint.json"
        )
        try:
            with open(checkpoint_path, "r") as f:
                return json.load(f)
        except (FileNotFoundError, json.JSONDecodeError):
            return None

这段代码的结构值得细说几点:

为什么心跳和消息循环是分离的? 如果心跳放在主循环里,一旦某条消息处理卡住(IO 阻塞),心跳就无法发送,服务端会在 20 秒后主动断开连接。分离为独立的 asyncio.create_task 可以确保心跳不受消息处理速度影响。

指数退避的意义是什么? 断开后立即重连是新手最容易犯的错误。如果你的程序有 bug 导致连接反复断开又重连,瞬时间可能发出数千个连接请求,把服务器打爆,甚至触发限频(TickDB 的 3001 错误码)。指数退避 + 抖动能有效平滑重连节奏。


三、检查点机制:让状态可以“存档读档”

3.1 什么是检查点

检查点(Checkpoint)的核心思想很简单:把程序运行到某个时刻的关键状态序列化到磁盘,需要时再反序列化回来。

类比游戏:你在 BOSS 战之前手动存档,重启电脑后读档,从 BOSS 战开始打,而不是从第一关重新开始。

对于 WebSocket 行情程序,关键状态通常包括:

状态字段 说明
订阅频道列表 程序订阅了哪些市场、哪些数据频道
最后处理时间戳 防止重复处理已收到的数据
当前持仓快照 如果程序在管理仓位,需要持久化
缓冲区未刷新的数据 还在内存里等待批量写入磁盘的数据
序号位置(sequence ID) 如果数据有全局递增序号,这就是断点

3.2 SQLite 检查点:生产级状态持久化方案

对于需要频繁写入的状态,SQLite 是最合适的选择。它是嵌入式数据库,不需要单独的进程,文件即数据库,且完全支持事务。

import sqlite3
import json
import threading
from datetime import datetime
from contextlib import contextmanager
from pathlib import Path


class CheckpointDB:
    """
    基于 SQLite 的检查点持久化。
    线程安全,支持并发读写。
    """
    
    def __init__(self, db_path: str = "tickdb_checkpoint.db"):
        self.db_path = db_path
        self._lock = threading.Lock()
        self._init_db()
    
    def _init_db(self):
        """初始化数据库表结构"""
        with self._get_conn() as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS checkpoint (
                    key TEXT PRIMARY KEY,
                    value TEXT NOT NULL,
                    updated_at TEXT NOT NULL
                )
            """)
            conn.execute("""
                CREATE TABLE IF NOT EXISTS pending_data (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    channel TEXT NOT NULL,
                    timestamp TEXT NOT NULL,
                    sequence INTEGER,
                    payload TEXT NOT NULL,
                    processed INTEGER DEFAULT 0,
                    created_at TEXT NOT NULL
                )
            """)
            conn.execute("""
                CREATE INDEX IF NOT EXISTS idx_pending_unprocessed
                ON pending_data(processed, created_at)
            """)
            conn.commit()
    
    @contextmanager
    def _get_conn(self):
        """线程安全的数据库连接"""
        with self._lock:
            conn = sqlite3.connect(self.db_path, check_same_thread=False)
            conn.row_factory = sqlite3.Row
            try:
                yield conn
            finally:
                conn.close()
    
    def save_checkpoint(self, key: str, value: dict):
        """保存单个检查点"""
        with self._get_conn() as conn:
            conn.execute("""
                INSERT OR REPLACE INTO checkpoint (key, value, updated_at)
                VALUES (?, ?, ?)
            """, (key, json.dumps(value), datetime.utcnow().isoformat()))
            conn.commit()
    
    def load_checkpoint(self, key: str) -> Optional[dict]:
        """加载检查点,不存在返回 None"""
        with self._get_conn() as conn:
            row = conn.execute(
                "SELECT value FROM checkpoint WHERE key = ?", (key,)
            ).fetchone()
            if row:
                return json.loads(row["value"])
            return None
    
    def save_pending(self, channel: str, timestamp: str, 
                     sequence: Optional[int], payload: dict):
        """
        将尚未处理完毕的数据写入待处理队列。
        WebSocket 断连或程序重启时,未落盘的数据在这里暂存。
        """
        with self._get_conn() as conn:
            conn.execute("""
                INSERT INTO pending_data 
                (channel, timestamp, sequence, payload, created_at)
                VALUES (?, ?, ?, ?, ?)
            """, (
                channel, timestamp, sequence,
                json.dumps(payload), datetime.utcnow().isoformat()
            ))
            conn.commit()
    
    def fetch_unprocessed(self, channel: str, limit: int = 100) -> list[dict]:
        """取出未处理的数据用于断点续传补数"""
        with self._get_conn() as conn:
            rows = conn.execute("""
                SELECT id, timestamp, sequence, payload
                FROM pending_data
                WHERE channel = ? AND processed = 0
                ORDER BY id ASC
                LIMIT ?
            """, (channel, limit)).fetchall()
            return [
                {
                    "id": row["id"],
                    "timestamp": row["timestamp"],
                    "sequence": row["sequence"],
                    "payload": json.loads(row["payload"])
                }
                for row in rows
            ]
    
    def mark_processed(self, record_ids: list[int]):
        """标记数据已处理"""
        if not record_ids:
            return
        placeholders = ",".join("?" * len(record_ids))
        with self._get_conn() as conn:
            conn.execute(f"""
                UPDATE pending_data SET processed = 1
                WHERE id IN ({placeholders})
            """, record_ids)
            conn.commit()
    
    def cleanup_processed(self, days: int = 7):
        """清理 7 天前的已处理数据,防止数据库膨胀"""
        with self._get_conn() as conn:
            conn.execute("""
                DELETE FROM pending_data
                WHERE processed = 1
                AND created_at < datetime('now', ?)
            """, (f"-{days} days",))
            conn.commit()

这个检查点模块解决了几件实际的事:

为什么用 threading.Lock 而不是直接锁行? SQLite 本身不支持高并发写入,单个写入事务是原子的,但多个线程同时开连接容易触发 database is locked 错误。用一个全局锁串行化所有数据库操作,是最简单可靠的做法。

pending_data 表的设计逻辑是什么? 这张表相当于一个缓冲队列。程序在收到 WebSocket 数据后,立即写入这张表(标记 processed=0),然后再做业务处理。如果程序在处理过程中崩溃,下次启动时从这张表里读出 processed=0 的记录继续处理——数据没有丢

cleanup 策略怎么定? 保留 7 天的已处理数据是经验值。太短则无法做历史回放分析,太长则数据库体积膨胀。对于高频数据场景,可以按记录数而非天数做清理(如超过 10 万条则清理最旧的 20%)。


四、断点续传:从哪里开始继续

检查点解决了“状态保存”的问题,但还有一个关键问题没解决:重启后,从哪里开始补数据?

TickDB 的 WebSocket 连接在断连重连后,默认从头开始推送最新数据。如果你在处理一条数据后、写入 checkpoint 前崩溃了,那么这条数据已经丢失了——因为你没有持久化它,服务器也没有再推一遍。

这就是为什么需要在 _process_message 中引入一个“先写后处理”的顺序:

async def _process_message(self, data, checkpoint):
    """
    处理消息的标准流程:
    1. 先写入 pending_data(持久化)
    2. 更新内存状态
    3. 标记处理完成
    4. 更新检查点
    """
    channel = data.get("channel", "unknown")
    timestamp = data.get("ts", datetime.utcnow().isoformat())
    sequence = data.get("seq")  # 某些频道支持全局序号
    
    # ⭐ 步骤 1:先把数据写入 SQLite
    # 即使后续处理失败,数据也已安全存储
    self.checkpoint_db.save_pending(
        channel=channel,
        timestamp=timestamp,
        sequence=sequence,
        payload=data,
    )
    
    # ⭐ 步骤 2:处理业务逻辑
    await self._handle_data(data)
    
    # ⭐ 步骤 3:更新检查点(记录处理到哪了)
    self.checkpoint_db.save_checkpoint("last_processed", {
        "channel": channel,
        "timestamp": timestamp,
        "sequence": sequence,
        "processed_at": datetime.utcnow().isoformat()
    })
    
    # ⭐ 步骤 4:标记 pending 为已处理(可选,提高查询效率)
    # 在实际实现中,定期批量标记比逐条标记性能更好

这个“先写后处理”的顺序看起来违反直觉——直觉告诉你应该先处理再存盘。但数据安全的黄金法则是:任何在内存中产生的关键数据,在确认处理完成之前,必须先落盘。

顺序一颠倒,崩溃就丢数据。顺序对了,崩溃只是在重复处理已经落盘的数据——这可以通过检查点去重来避免。

4.1 重启后的恢复流程

async def recovery_loop(self):
    """
    启动时运行的恢复流程:
    1. 从 pending_data 恢复未处理数据
    2. 从 REST API 补全订阅启动后的数据缺口
    3. 清理已处理数据
    """
    logger.info("[恢复] 开始断点续传...")
    
    # 恢复步骤 1:处理 SQLite 中未落盘的数据
    pending = self.checkpoint_db.fetch_unprocessed(limit=500)
    if pending:
        logger.info(f"[恢复] 发现 {len(pending)} 条未处理数据,开始补处理...")
        for record in pending:
            try:
                await self._handle_data(record["payload"])
                self.checkpoint_db.mark_processed([record["id"]])
            except Exception:
                logger.error(f"[恢复] 处理失败,跳过: {record['id']}")
                continue
    
    # 恢复步骤 2:检查时间戳缺口,通过 REST API 补全
    last = self.checkpoint_db.load_checkpoint("last_processed")
    if last and last.get("timestamp"):
        logger.info(f"[恢复] 检查数据缺口,last={last['timestamp']}")
        # 通过 TickDB REST API 补齐 WebSocket 断连期间的数据
        gap_data = await self._fetch_gap_data(
            last["timestamp"], 
            datetime.utcnow().isoformat()
        )
        for item in gap_data:
            await self._handle_data(item)
    
    # 恢复步骤 3:清理过期数据
    self.checkpoint_db.cleanup_processed(days=7)
    logger.info("[恢复] 完成")

为什么要分两步恢复? 因为 WebSocket 的 pending_data 只保存了你程序运行期间收到的数据。如果 WebSocket 断开期间(例如凌晨机器重启后)服务器推送了新的数据,而这些数据没进你的程序——那这部分缺口就只有 REST API 能补。


五、TickDB 的数据恢复能力:如何用 API 补全缺口

说到 REST API 补数,这里展开一下。TickDB 提供了一套完整的 REST 接口,核心的两个是:

接口 用途 什么时候用
GET /v1/market/kline 获取历史 K 线数据 补全历史缺口
GET /v1/market/trades 获取逐笔成交数据 补全交易事件
import os
import time
import requests
from datetime import datetime, timedelta

API_BASE = "https://api.tickdb.ai/v1"


def fetch_historical_klines(symbol: str, interval: str = "1m",
                             start_time: str = None, end_time: str = None,
                             limit: int = 1000) -> list[dict]:
    """
    通过 REST API 获取历史 K 线数据,用于断点续传后的缺口补全。
    """
    headers = {"X-API-Key": os.environ.get("TICKDB_API_KEY")}
    
    params = {
        "symbol": symbol,
        "interval": interval,
        "limit": limit,
    }
    if start_time:
        params["start"] = start_time
    if end_time:
        params["end"] = end_time
    
    response = requests.get(
        f"{API_BASE}/market/kline",
        headers=headers,
        params=params,
        timeout=(3.05, 10),
    )
    
    if response.status_code == 429:
        retry_after = int(response.headers.get("Retry-After", 5))
        time.sleep(retry_after)
        return fetch_historical_klines(symbol, interval, start_time, 
                                       end_time, limit)
    
    response.raise_for_status()
    data = response.json()
    
    if data.get("code") == 0:
        return data.get("data", [])
    elif data.get("code") == 3001:
        retry_after = int(response.headers.get("Retry-After", 5))
        time.sleep(retry_after)
        return fetch_historical_klines(symbol, interval, start_time,
                                       end_time, limit)
    else:
        raise RuntimeError(f"API 错误: {data.get('message')}")


def find_gaps(symbol: str, existing_timestamps: list[str],
              interval: str = "1m") -> list[tuple[str, str]]:
    """
    找出历史数据中的时间缺口。
    输入:已有数据的时间戳列表
    返回:(gap_start, gap_end) 元组列表
    """
    if not existing_timestamps:
        return []
    
    sorted_ts = sorted(existing_timestamps)
    gaps = []
    
    for i in range(len(sorted_ts) - 1):
        current = datetime.fromisoformat(sorted_ts[i].replace("Z", "+00:00"))
        next_ts = datetime.fromisoformat(sorted_ts[i + 1].replace("Z", "+00:00"))
        
        # 如果时间间隔大于 K 线周期的 1.5 倍,认为有缺口
        if interval == "1m":
            threshold = timedelta(minutes=2)
        elif interval == "5m":
            threshold = timedelta(minutes=8)
        elif interval == "1h":
            threshold = timedelta(hours=2)
        else:
            threshold = timedelta(days=1)
        
        if next_ts - current > threshold:
            gaps.append((
                sorted_ts[i],
                sorted_ts[i + 1]
            ))
    
    return gaps

一个实际场景:假设你的程序在 14:30 到 14:35 之间因为机器重启断开了。14:35 重启后,先处理 pending_data 中 14:30 之前没处理完的数据,然后调用 fetch_historical_klines 把 14:30–14:35 的 K 线补回来。处理完这些数据后,你的内存状态就和“程序从未中断过”完全一致了。


六、完整集成:把一切串联起来

把前面的所有组件组装在一起,就是一个完整的生产级行情监控程序:

import asyncio
import json
import logging
import os
import signal

import websockets
import websockets.exceptions

from checkpoint import CheckpointDB
from websocket_client import ResilientWebSocketClient

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
)
logger = logging.getLogger(__name__)


class MarketMonitor:
    """
    TickDB 行情监控服务。
    特性:
    - 信号捕获(SIGTERM/SIGINT)
    - 优雅关闭
    - 检查点持久化(SQLite)
    - 断点续传
    - 指数退避重连
    - 批量补数
    """
    
    def __init__(self):
        self.shutdown_mgr = GracefulShutdown()
        self.checkpoint_db = CheckpointDB()
        
        # 接收关闭信号
        signal.signal(signal.SIGTERM, self.shutdown_mgr.request_shutdown)
        signal.signal(signal.SIGINT, self.shutdown_mgr.request_shutdown)
    
    async def run(self):
        """主运行循环"""
        logger.info("[启动] TickDB 行情监控服务")
        
        # 先执行恢复流程(如果有未处理数据)
        await self.recovery()
        
        # 再启动 WebSocket 实时连接
        client = ResilientWebSocketClient(
            api_key=os.environ.get("TICKDB_API_KEY", ""),
            channels=["depth.AAPL.US", "quote.AAPL.US"]
        )
        
        try:
            await asyncio.create_task(client.connect())
        except asyncio.CancelledError:
            logger.info("[主循环] 任务取消")
        finally:
            await self.cleanup()
    
    async def recovery(self):
        """启动时恢复未处理数据"""
        pending = self.checkpoint_db.fetch_unprocessed(
            channel="depth.AAPL.US", limit=200
        )
        if pending:
            logger.info(f"[恢复] 发现 {len(pending)} 条待处理数据")
            for record in pending:
                await self.process_data(record["payload"])
                self.checkpoint_db.mark_processed([record["id"]])
    
    async def process_data(self, data: dict):
        """处理单条行情数据"""
        # 业务处理逻辑:更新内存状态、计算指标、触发告警等
        pass
    
    async def cleanup(self):
        """程序退出前的清理"""
        logger.info("[关闭] 执行最终检查点保存")
        self.checkpoint_db.save_checkpoint("service_stopped", {
            "timestamp": __import__("datetime").datetime.utcnow().isoformat()
        })
        logger.info("[关闭] 服务已停止")


if __name__ == "__main__":
    monitor = MarketMonitor()
    try:
        asyncio.run(monitor.run())
    except KeyboardInterrupt:
        pass

七、架构全景:状态在系统中的流动路径

用一个简单的流程图来总结整个机制的工作方式:

正常运行期间:
┌─────────────────────────────────────────────────┐
│  WebSocket 推送数据                              │
│         ↓                                        │
│  save_pending() → SQLite pending_data (processed=0) │
│         ↓                                        │
│  业务处理 → mark_processed()                     │
│         ↓                                        │
│  save_checkpoint() → 记录处理位置                 │
└─────────────────────────────────────────────────┘

程序崩溃 / 收到 SIGTERM:
┌─────────────────────────────────────────────────┐
│  信号捕获 → GracefulShutdown                      │
│         ↓                                        │
│  收尾回调执行                                    │
│  save_pending() 强制刷新未落盘数据               │
│  save_checkpoint() 保存最后状态                  │
│         ↓                                        │
│  进程退出                                        │
└─────────────────────────────────────────────────┘

程序重启恢复:
┌─────────────────────────────────────────────────┐
│  启动 → recovery()                               │
│         ↓                                        │
│  fetch_unprocessed() → 补处理 pending_data      │
│         ↓                                        │
│  load_checkpoint() → 获取最后处理位置            │
│         ↓                                        │
│  REST API 补缺口 → fetch_historical_klines()    │
│         ↓                                        │
│  状态完整 → 启动 WebSocket 实时连接              │
│         ↓                                        │
│  正常接收新数据(从断点处继续)                   │
└─────────────────────────────────────────────────┘

这个流程的核心洞察是:把数据流经程序的每个关键节点都加上持久化checkpoint。 任何节点之后崩溃,都能从最近的有效检查点恢复。


结语

程序员的午夜惊魂,本质上是对“状态管理”的忽视。WebSocket 给了你实时数据的通道,但它不保证你的程序能完整地处理每一条数据。

优雅关闭、状态检查点、断点续传——这三件事凑在一起,就构成了一套可靠的状态管理框架。它的核心原理并不复杂:永远不要信任内存中的数据是安全的,直到它已经被持久化到磁盘。

把这套机制跑通之后,你就不用再提心吊胆地盯着凌晨的告警了。程序知道自己从哪里中断、该从哪里恢复。它比你自己记得更清楚。


下一步行动

如果你正在写生产级的 WebSocket 客户端:直接拿上面的 CheckpointDB 类和 GracefulShutdown 类用,改一改表结构和字段名就能适配你的业务。

如果你在评估 TickDB 作为数据源:它的 REST API(/kline/trades)加上 WebSocket 的 depthquote 频道,配合上面的恢复逻辑,可以构建一个零数据丢失的行情采集系统。访问 tickdb.ai 注册获取免费 API Key。

如果你更关心部署层面的可靠性:上面的代码解决了进程级别的容错,而 K8s 的 preStop 钩子配合这个优雅关闭机制,可以把容错延伸到容器层面——让容器在收到停止信号后,有足够时间完成收尾逻辑。

如果你习惯用 AI 辅助开发:在 AI 助手中搜索安装 tickdb-market-data SKILL,可以直接用自然语言查询 TickDB 的数据能力和接口规范。


本文不构成任何投资建议。市场有风险,投资需谨慎。