信号、缓冲区与 SQLite:构建可随时重启的量化数据管道

开篇

凌晨 3:47,你被一条告警震醒:数据管道在服务器例行更新后丢掉了重启前最后一小时的订单簿快照。

这不是"有可能发生"的边缘场景。这是每一个长期运行的数据采集系统必然遭遇的宿命:程序总会在最糟糕的时刻被中断——系统更新、资源告急、容器漂移、Kubernetes 优雅终止。区别只在于,你是否为此做好了准备。

大多数量化开发者的第一反应是"加个 try-except"或"写个日志"。但这些局部修补无法回答一个根本问题:当进程被 OS 强制终止的那一瞬间,哪些数据已经写入,哪些数据还悬停在内存中,哪些连接状态已经丢失?

本文从 WebSocket 数据管道中断的三个阶段出发,构建一套从信号捕获 → 内存缓冲 → SQLite 持久化 → 重启恢复的完整容错架构。代码均可在 TickDB 的实时数据订阅场景下直接运行。


一、断连的本质:三个数据状态陷阱

在讨论解决方案之前,必须先厘清 WebSocket 断连时数据所处的三种状态:

内存缓冲区 ──→ 还未落盘,进程终止即丢失
  TickDB ────→ 已发送确认,数据在服务端(通常幂等可重拉)
  磁盘文件 ──→ 已落盘,进程终止不影响

陷阱一:应用层缓冲未刷新。WebSocket 发送函数通常有内核级缓冲,send() 返回成功只意味着数据进入内核缓冲区,不代表对端已收到。当进程被 SIGTERM 杀掉时,内核缓冲区数据随之消失。

陷阱二:TCP 缓冲区未确认。即使应用层发送成功,TCP 层还有自己的重传队列。强制终止进程会清空该队列,对端可能未收到最后一批数据。

陷阱三:应用状态未持久化。订阅了哪些频道、当前处理到哪条消息、计数器快照,这些内存状态在重启后全部归零。

TickDB 的 WebSocket 实时数据订阅同样面临这个风险。一个典型的错误认知是"反正 TickDB 有历史数据,我丢几条无所谓"。但对于订单簿快照类数据(depth 频道),你丢失的可能是从上一次完整快照到断连瞬间的所有增量——这个间隙无法事后补全。


二、信号捕获:让进程有序赴死

2.1 为什么不能依赖 SIGKILL

Linux 的 SIGTERM 是优雅终止信号,进程收到后可以执行清理逻辑。SIGKILL(kill -9)则完全不同——OS 直接杀死进程,不经过任何信号处理程序。

# 优雅终止(可捕获)
kill -15 <pid>

# 强制终止(不可捕获)
kill -9 <pid>

生产环境规则:永远不要在部署脚本中写 kill -9。如果程序在接收到 SIGTERM 后 30 秒内未退出,Kubernetes 默认会发送 SIGKILL。所以第一步是确保 SIGTERM 处理逻辑在 30 秒内完成。

2.2 Python 信号处理的正确姿势

import signal
import sys
import threading

class GracefulShutdown:
    """线程安全的优雅终止控制器"""

    def __init__(self):
        self.shutdown_requested = threading.Event()
        self._original_handlers = {}

    def request_shutdown(self, signum, frame):
        """SIGTERM/SIGINT 统一入口"""
        print(f"[信号处理器] 收到信号 {signum},开始优雅关闭……")
        self.shutdown_requested.set()

    def register(self):
        """注册信号处理器,返回是否成功"""
        for sig in (signal.SIGTERM, signal.SIGINT):
            self._original_handlers[sig] = signal.signal(sig, self.request_shutdown)
        return True

    def is_shutting_down(self) -> bool:
        """非阻塞查询关闭状态"""
        return self.shutdown_requested.is_set()

    def wait_for_shutdown(self, timeout=None) -> bool:
        """阻塞等待关闭信号"""
        return self.shutdown_requested.wait(timeout=timeout)

关键设计原则

  1. 使用 threading.Event 而非全局标志位。在多线程环境中,全局布尔变量可能因 CPU 重排导致读取不一致。threading.Event 是内存屏障安全的。
  2. 保存原始处理器。第三方库可能也注册了 SIGTERM 处理器,恢复时需要还原而非覆盖。
  3. 同时捕获 SIGTERM 和 SIGINT。前者由 Kubernetes/systemd 发送,后者是用户 Ctrl+C 的信号。

2.3 信号处理器中的禁忌

# ❌ 绝对禁止在信号处理器中执行的操作
def bad_signal_handler(signum, frame):
    time.sleep(10)              # 阻塞式等待
    requests.post("...")       # 同步网络 I/O
    with open("file", "w") as f: # 可能触发新信号或死锁
        f.write("...")

# ✅ 正确做法:仅设置标志位,逻辑在其他线程执行
def good_signal_handler(signum, frame):
    shutdown_event.set()  # 仅设置标志,毫秒级完成

原因:Python 信号处理器在主线程中执行。如果处理器阻塞,主线程无法处理其他信号;如果处理器发起阻塞 I/O,可能触发死锁。


三、检查点机制:快照你的应用状态

3.1 什么是检查点

检查点(Checkpoint)是程序在某一时刻的完整状态快照,包含:

  • 已处理的消息计数
  • 当前订阅的频道列表
  • 最近一条消息的时间戳或序列号
  • 未刷新缓冲区的数据

重启后,程序从检查点恢复,而不是从头开始。

3.2 检查点数据结构

import json
import time
from dataclasses import dataclass, asdict, field
from typing import Optional
from datetime import datetime


@dataclass
class Checkpoint:
    """检查点数据结构——定义你需要持久化的所有状态"""
    version: int = 1  # 格式版本,方便未来迁移
    symbol: str
    channels: list[str] = field(default_factory=list)
    last_seq: Optional[int] = None      # 最后处理的消息序列号
    last_timestamp: Optional[float] = None  # Unix 时间戳
    last_message_id: Optional[str] = None  # TickDB 消息 ID(若支持)
    buffer_size: int = 0
    created_at: float = field(default_factory=time.time)

    def to_json(self) -> str:
        return json.dumps(asdict(self))

    @classmethod
    def from_json(cls, raw: str) -> "Checkpoint":
        return cls(**json.loads(raw))

3.3 检查点管理器

import os
import fcntl  # Unix 文件锁
from pathlib import Path
from contextlib import contextmanager


class CheckpointManager:
    """
    原子性检查点持久化。

    使用 rename + 文件锁确保写入原子性:
    1. 先写入临时文件
    2. 获取目录锁(防止并发写入)
    3. rename 临时文件到正式路径(原子操作)
    4. 释放锁
    """

    def __init__(self, checkpoint_path: str):
        self.checkpoint_path = Path(checkpoint_path)
        self.lock_path = self.checkpoint_path.with_suffix(".lock")

    def save(self, checkpoint: Checkpoint) -> None:
        """保存检查点(线程安全 + 进程安全)"""
        tmp_path = self.checkpoint_path.with_suffix(".tmp")

        # Step 1: 写入临时文件
        tmp_path.write_text(checkpoint.to_json(), encoding="utf-8")

        # Step 2: 获取文件锁,防止多进程并发写入
        with open(self.lock_path, "w") as lock_file:
            try:
                fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX)
                # Step 3: 原子 rename
                tmp_path.rename(self.checkpoint_path)
            finally:
                fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN)

    def load(self) -> Optional[Checkpoint]:
        """加载检查点,若不存在返回 None"""
        if not self.checkpoint_path.exists():
            return None
        try:
            return Checkpoint.from_json(self.checkpoint_path.read_text(encoding="utf-8"))
        except (json.JSONDecodeError, TypeError) as e:
            print(f"[警告] 检查点文件损坏 ({e}),将从头开始")
            return None

为什么用 rename 而不是直接写rename 在 POSIX 系统上是原子操作。如果写入过程中进程崩溃,要么完整写入新文件,要么不写入。不会出现"半截检查点"导致解析失败。


四、SQLite 缓冲:内存与磁盘之间的可靠桥梁

4.1 为什么选 SQLite

特性 SQLite Redis 文件日志
零外部依赖 ❌ 需独立服务
持久化保证 ✅ ACID ⚠️ 可配置 ⚠️ 依赖 fsync
查询能力 ✅ SQL ✅ Key-Value
并发写入 ⚠️ 写锁
体积 几 MB 占用 RAM 持续增长

对于量化数据管道,SQLite 是最优的本地缓冲方案:不需要额外部署进程,数据随应用启动,崩溃后文件仍在磁盘上。

4.2 SQLite 缓冲表设计

CREATE TABLE IF NOT EXISTS orderbook_buffer (
    id          INTEGER PRIMARY KEY AUTOINCREMENT,
    symbol      TEXT NOT NULL,
    timestamp   REAL NOT NULL,
    seq         INTEGER,
    bids        TEXT NOT NULL,   -- JSON: [[price, qty], ...]
    asks        TEXT NOT NULL,   -- JSON: [[price, qty], ...]
    pressure_ratio REAL,          -- 买卖压力比(衍生指标)
    created_at  REAL DEFAULT (strftime('%s', 'now')),
    synced      INTEGER DEFAULT 0  -- 0=待同步, 1=已同步TickDB
);

CREATE INDEX IF NOT EXISTS idx_orderbook_unsynced
    ON orderbook_buffer(symbol, synced, created_at);

CREATE INDEX IF NOT EXISTS idx_orderbook_seq
    ON orderbook_buffer(symbol, seq);

4.3 SQLite 缓冲管理器

import sqlite3
import json
import time
import threading
from queue import Queue, Empty
from contextlib import contextmanager
from dataclasses import dataclass
from typing import Optional, Callable
from datetime import datetime


@dataclass
class OrderBookSnapshot:
    symbol: str
    timestamp: float
    seq: int
    bids: list[tuple[float, float]]
    asks: list[tuple[float, float]]
    pressure_ratio: float = 0.0

    def to_record(self) -> dict:
        return {
            "symbol": self.symbol,
            "timestamp": self.timestamp,
            "seq": self.seq,
            "bids": json.dumps(self.bids),
            "asks": json.dumps(self.asks),
            "pressure_ratio": self.pressure_ratio,
        }


class SQLiteBuffer:
    """
    SQLite 缓冲管理器。

    设计要点:
    - 写入线程与业务线程解耦(通过 Queue)
    - 批量写入减少 I/O(每 N 条或每 N 秒 flush 一次)
    - 独立 flush 线程,不阻塞主数据处理流程
    """

    def __init__(self, db_path: str, batch_size: int = 50, flush_interval: float = 5.0):
        self.db_path = db_path
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self._write_queue: Queue = Queue(maxsize=10000)
        self._flush_thread: Optional[threading.Thread] = None
        self._running = False
        self._flush_lock = threading.Lock()
        self._conn: Optional[sqlite3.Connection] = None

    def start(self) -> None:
        """启动缓冲管理器——必须在 SIGTERM 处理之前调用"""
        self._init_db()
        self._running = True
        self._flush_thread = threading.Thread(target=self._flush_loop, daemon=True)
        self._flush_thread.start()
        print(f"[SQLite缓冲] 已启动,批量大小={self.batch_size},刷新间隔={self.flush_interval}s")

    def stop(self) -> None:
        """停止缓冲管理器——在 SIGTERM 处理器中调用"""
        self._running = False
        if self._flush_thread and self._flush_thread.is_alive():
            self._flush_thread.join(timeout=10.0)
        self._flush_pending()
        if self._conn:
            self._conn.close()
            self._conn = None
        print("[SQLite缓冲] 已关闭")

    def write(self, snapshot: OrderBookSnapshot) -> None:
        """非阻塞写入——主线程调用"""
        try:
            self._write_queue.put_nowait(snapshot)
        except Exception:
            print("[警告] 缓冲写入队列满,数据可能丢失")

    def _init_db(self) -> None:
        with self._get_conn() as conn:
            conn.executescript("""
                CREATE TABLE IF NOT EXISTS orderbook_buffer (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    symbol TEXT NOT NULL,
                    timestamp REAL NOT NULL,
                    seq INTEGER,
                    bids TEXT NOT NULL,
                    asks TEXT NOT NULL,
                    pressure_ratio REAL,
                    created_at REAL DEFAULT (strftime('%s', 'now')),
                    synced INTEGER DEFAULT 0
                );
                CREATE INDEX IF NOT EXISTS idx_orderbook_unsynced
                    ON orderbook_buffer(symbol, synced, created_at);
            """)

    @contextmanager
    def _get_conn(self):
        """线程安全的连接管理"""
        if self._conn is None:
            self._conn = sqlite3.connect(self.db_path, check_same_thread=False)
            self._conn.execute("PRAGMA journal_mode=WAL")  # WAL 模式提升并发性能
            self._conn.execute("PRAGMA synchronous=NORMAL")  # 平衡性能与安全
        try:
            yield self._conn
        except sqlite3.OperationalError as e:
            if "locked" in str(e):
                time.sleep(0.5)
                yield self._conn
            else:
                raise

    def _flush_loop(self) -> None:
        """Flush 线程——批量将缓冲数据写入 SQLite"""
        batch: list[OrderBookSnapshot] = []
        last_flush = time.time()

        while self._running or not self._write_queue.empty():
            try:
                snapshot = self._write_queue.get(timeout=0.5)
                batch.append(snapshot)
            except Empty:
                pass

            # 触发 flush 的两个条件
            should_flush = (
                len(batch) >= self.batch_size
                or (batch and time.time() - last_flush >= self.flush_interval)
            )

            if should_flush:
                self._do_flush(batch)
                batch.clear()
                last_flush = time.time()

        # 最后一批
        if batch:
            self._do_flush(batch)

    def _do_flush(self, batch: list[OrderBookSnapshot]) -> None:
        with self._flush_lock:
            try:
                with self._get_conn() as conn:
                    conn.executemany(
                        """INSERT INTO orderbook_buffer
                           (symbol, timestamp, seq, bids, asks, pressure_ratio)
                           VALUES (:symbol, :timestamp, :seq, :bids, :asks, :pressure_ratio)""",
                        [s.to_record() for s in batch]
                    )
                    conn.commit()
            except sqlite3.Error as e:
                print(f"[错误] SQLite 写入失败: {e}")

    def _flush_pending(self) -> None:
        """将队列中剩余数据强制写入(关闭时调用)"""
        remaining = []
        while not self._write_queue.empty():
            try:
                remaining.append(self._write_queue.get_nowait())
            except Empty:
                break
        if remaining:
            self._do_flush(remaining)

    def read_unsynced(self, symbol: str, limit: int = 100) -> list[dict]:
        """读取未同步的数据——用于断点续传"""
        with self._get_conn() as conn:
            cursor = conn.execute(
                """SELECT id, symbol, timestamp, seq, bids, asks, pressure_ratio
                   FROM orderbook_buffer
                   WHERE symbol = ? AND synced = 0
                   ORDER BY created_at ASC
                   LIMIT ?""",
                (symbol, limit)
            )
            rows = cursor.fetchall()
        return [
            {
                "id": r[0], "symbol": r[1], "timestamp": r[2],
                "seq": r[3], "bids": json.loads(r[4]), "asks": json.loads(r[5]),
                "pressure_ratio": r[6]
            }
            for r in rows
        ]

    def mark_synced(self, ids: list[int]) -> None:
        """标记数据已同步"""
        if not ids:
            return
        with self._get_conn() as conn:
            conn.executemany(
                "UPDATE orderbook_buffer SET synced = 1 WHERE id = ?",
                [(i,) for i in ids]
            )
            conn.commit()

三个关键工程决策

  1. WAL 模式。允许读写并发(虽然写还是串行的),避免缓冲写入阻塞数据读取。
  2. 独立 flush 线程。主线程 write() 只做入队,不做 I/O,保证数据处理不受 SQLite 写入延迟影响。
  3. 双条件 flush。既按批量大小触发,也按时间间隔触发——即使流量很低,数据也不会无限期驻留内存。

五、断点续传:重启后的数据补救

5.1 续传的三层策略

层级一:增量订阅(推荐)
  TickDB 的 WebSocket 支持从指定序列号恢复订阅
  ↓ 若不支持

层级二:缓冲区回放
  从 SQLite 读取未同步数据,重新处理
  ↓ 若缓冲区也不够

层级三:历史数据补全
  调用 TickDB REST API 获取缺失时间段的数据
  用作兜底而非主路径

5.2 续传管理器

import requests
import os
from typing import Optional
from datetime import datetime


class ResumptionManager:
    """
    断点续传管理器——尝试三层恢复策略。

    策略优先级:增量订阅 > 缓冲区回放 > 历史 API 补全
    """

    def __init__(
        self,
        buffer: SQLiteBuffer,
        checkpoint_mgr: CheckpointManager,
        tickdb_api_key: Optional[str] = None,
        tickdb_api_base: str = "https://api.tickdb.ai/v1"
    ):
        self.buffer = buffer
        self.checkpoint_mgr = checkpoint_mgr
        self.api_key = tickdb_api_key or os.environ.get("TICKDB_API_KEY")
        self.api_base = tickdb_api_base

    def build_resumption_state(self) -> dict:
        """
        构建恢复状态字典。

        返回:
            {
                "source": "checkpoint" | "buffer" | "history" | "fresh",
                "start_seq": int,
                "start_time": float,
                "has_gap": bool
            }
        """
        checkpoint = self.checkpoint_mgr.load()

        if checkpoint is None:
            return {"source": "fresh", "start_seq": None, "start_time": None, "has_gap": False}

        # 检查缓冲区是否有未同步数据
        unsynced = self.buffer.read_unsynced(checkpoint.symbol, limit=1)

        if unsynced:
            return {
                "source": "buffer",
                "start_seq": checkpoint.last_seq,
                "start_time": checkpoint.last_timestamp,
                "has_gap": False,  # 缓冲区有兜底
            }

        # 无缓冲区数据,检查是否需要从历史补全
        return {
            "source": "history",
            "start_seq": checkpoint.last_seq,
            "start_time": checkpoint.last_timestamp,
            "has_gap": True,  # 存在数据间隙
        }

    def fetch_historical_gap(
        self,
        symbol: str,
        start_time: float,
        end_time: float,
        channel: str = "depth"
    ) -> list[dict]:
        """
        通过历史 API 补全数据间隙。

        注意:TickDB 历史 K 线数据支持长周期回溯,
        但非所有频道均有历史存档。此处作兜底使用。

        ⚠️ 仅作为补全手段,不应依赖此接口做全量数据获取
        """
        if not self.api_key:
            print("[警告] 未配置 API Key,无法补全历史数据")
            return []

        headers = {"X-API-Key": self.api_key}
        params = {
            "symbol": symbol,
            "start_time": int(start_time * 1000),
            "end_time": int(end_time * 1000),
            "channel": channel,
            "limit": 500,
        }

        try:
            resp = requests.get(
                f"{self.api_base}/market/history",
                headers=headers,
                params=params,
                timeout=(3.05, 10)
            )
            if resp.status_code == 200:
                data = resp.json()
                return data.get("data", [])
            else:
                print(f"[警告] 历史数据请求失败: {resp.status_code}")
                return []
        except requests.RequestException as e:
            print(f"[错误] 历史数据请求异常: {e}")
            return []

    def estimate_gap(self, last_seq: int, current_seq: int) -> int:
        """估算数据间隙大小(条数)"""
        gap = current_seq - last_seq
        if gap < 0:
            return 0
        return gap

六、完整架构:整合所有组件

6.1 主程序架构图

┌─────────────────────────────────────────────────────────┐
│                      SIGTERM / SIGINT                   │
│                            ↓                            │
│              ┌───────────────────────────┐              │
│              │   GracefulShutdown        │              │
│              │   (设置 shutdown_event)   │              │
│              └───────────┬───────────────┘              │
│                          ↓                              │
│         ┌────────────────┼────────────────┐             │
│         ↓                ↓                ↓             │
│  ┌──────────────┐ ┌──────────────┐ ┌──────────────┐    │
│  │ WebSocket    │ │ SQLiteBuffer │ │ Checkpoint   │    │
│  │ 停止接收      │ │ 强制 Flush   │ │ 保存最终状态  │    │
│  └──────────────┘ └──────────────┘ └──────────────┘    │
│                            ↓                            │
│                     程序正常退出                        │
└─────────────────────────────────────────────────────────┘

6.2 生产级 TickDB WebSocket 客户端

import json
import os
import time
import random
import threading
import signal
import requests
import websocket  # pip install websocket-client


class TickDBOrderBookPipeline:
    """
    TickDB 订单簿数据管道——带完整容错能力。

    ⚠️ 生产环境高频场景建议使用 aiohttp + asyncio 重写
    """

    # TickDB 官方 WebSocket 接入点
    WS_ENDPOINT = "wss://api.tickdb.ai/ws/market"
    RECONNECT_BASE_DELAY = 1.0
    RECONNECT_MAX_DELAY = 60.0

    def __init__(self, symbol: str, channels: list[str]):
        self.symbol = symbol
        self.channels = channels
        self.api_key = os.environ.get("TICKDB_API_KEY")

        if not self.api_key:
            raise ValueError("请设置环境变量 TICKDB_API_KEY")

        # 容错组件
        self.shutdown_ctrl = GracefulShutdown()
        self.buffer = SQLiteBuffer(
            db_path=f"orderbook_{symbol.replace('/', '_')}.db",
            batch_size=50,
            flush_interval=5.0
        )
        self.checkpoint_mgr = CheckpointManager(
            checkpoint_path=f"checkpoint_{symbol.replace('/', '_')}.json"
        )
        self.resumption_mgr = ResumptionManager(
            buffer=self.buffer,
            checkpoint_mgr=self.checkpoint_mgr,
            tickdb_api_key=self.api_key
        )

        self.ws: Optional[websocket.WebSocketApp] = None
        self._seq = 0
        self._retry_count = 0
        self._running = False

    def start(self) -> None:
        """启动数据管道"""
        self.shutdown_ctrl.register()
        self.buffer.start()

        # 检查续传状态
        state = self.resumption_mgr.build_resumption_state()
        print(f"[启动] 续传状态: {state['source']}")
        if state["has_gap"]:
            print(f"[警告] 检测到数据间隙,将尝试补全")

        self._running = True
        self._connect_and_subscribe()

    def _connect_and_subscribe(self) -> None:
        """连接、订阅、主循环"""
        while self._running:
            try:
                self._connect()
                self._main_loop()
            except Exception as e:
                if not self._running:
                    break
                print(f"[错误] 连接异常: {e},{self._retry_count} 秒后重连……")
                self._handle_reconnect()

    def _connect(self) -> None:
        """建立 WebSocket 连接"""
        # URL 参数传递 API Key(TickDB WebSocket 鉴权方式)
        url = f"{self.WS_ENDPOINT}?api_key={self.api_key}"
        headers = {"Origin": "https://tickdb.ai"}

        self.ws = websocket.WebSocketApp(
            url,
            header=headers,
            on_message=self._on_message,
            on_error=self._on_error,
            on_close=self._on_close,
        )

        # 启动接收线程
        ws_thread = threading.Thread(target=self.ws.run_forever, daemon=True)
        ws_thread.start()

        print(f"[连接] 已连接 TickDB WebSocket")

    def _subscribe(self) -> None:
        """发送订阅请求"""
        subscribe_msg = {
            "cmd": "subscribe",
            "params": {
                "channels": self.channels,
                "symbol": self.symbol,
            }
        }
        self.ws.send(json.dumps(subscribe_msg))
        print(f"[订阅] 已订阅 {self.channels} for {self.symbol}")

    def _on_message(self, ws, message: str) -> None:
        """处理收到的消息"""
        try:
            data = json.loads(message)

            # 心跳响应
            if data.get("type") == "pong":
                return

            # 忽略订阅确认
            if data.get("type") == "subscribe_ack":
                return

            # 解析订单簿数据
            if "data" in data and "depth" in data.get("channel", ""):
                self._process_depth_snapshot(data)

            # 定期保存检查点(每 100 条或每 30 秒)
            self._seq += 1
            if self._seq % 100 == 0:
                self._save_checkpoint()

        except json.JSONDecodeError:
            pass
        except Exception as e:
            print(f"[错误] 消息处理失败: {e}")

    def _process_depth_snapshot(self, data: dict) -> None:
        """处理 depth 频道快照,计算衍生指标"""
        payload = data["data"]
        bids = payload.get("bids", [])
        asks = payload.get("asks", [])

        # 计算买卖压力比(tickdb 提供 10 档深度数据)
        bid_volume = sum(float(q) for _, q in bids[:5])
        ask_volume = sum(float(q) for _, q in asks[:5])
        pressure_ratio = bid_volume / ask_volume if ask_volume > 0 else 0.0

        snapshot = OrderBookSnapshot(
            symbol=self.symbol,
            timestamp=time.time(),
            seq=self._seq,
            bids=[(float(p), float(q)) for p, q in bids],
            asks=[(float(p), float(q)) for p, q in asks],
            pressure_ratio=pressure_ratio,
        )

        # 写入缓冲(主线程非阻塞)
        self.buffer.write(snapshot)

    def _main_loop(self) -> None:
        """主循环——定期发送心跳,检测关闭信号"""
        ping_interval = 30  # TickDB 建议 30 秒 ping 一次

        while self._running and not self.shutdown_ctrl.is_shutting_down():
            if self.ws and self.ws.sock and self.ws.sock.connected:
                try:
                    self.ws.send(json.dumps({"cmd": "ping"}))
                    # 使用 wait + timeout 组合实现"可中断的等待"
                    self.shutdown_ctrl.wait_for_shutdown(timeout=ping_interval)
                except Exception:
                    break
            else:
                break

    def _handle_reconnect(self) -> None:
        """指数退避重连"""
        self._retry_count += 1
        delay = min(
            self.RECONNECT_BASE_DELAY * (2 ** self._retry_count),
            self.RECONNECT_MAX_DELAY
        )
        jitter = random.uniform(0, delay * 0.1)
        time.sleep(delay + jitter)

    def _save_checkpoint(self) -> None:
        """保存当前检查点"""
        checkpoint = Checkpoint(
            symbol=self.symbol,
            channels=self.channels,
            last_seq=self._seq,
            last_timestamp=time.time(),
            buffer_size=self.buffer._write_queue.qsize(),
        )
        try:
            self.checkpoint_mgr.save(checkpoint)
        except Exception as e:
            print(f"[警告] 检查点保存失败: {e}")

    def _on_error(self, ws, error) -> None:
        print(f"[WebSocket错误] {error}")

    def _on_close(self, ws, close_status_code, close_msg) -> None:
        print(f"[连接关闭] code={close_status_code}, msg={close_msg}")

    def stop(self) -> None:
        """优雅关闭——按正确顺序执行"""
        print("[关闭] 开始优雅关闭……")
        self._running = False

        # Step 1: 通知主循环退出
        self.shutdown_ctrl.request_shutdown(15, None)

        # Step 2: 关闭 WebSocket(停止接收新消息)
        if self.ws:
            try:
                self.ws.close()
            except Exception:
                pass

        # Step 3: 保存最终检查点
        self._save_checkpoint()

        # Step 4: 关闭缓冲(强制 flush 剩余数据)
        self.buffer.stop()

        print("[关闭] 优雅关闭完成")

    def _signal_handler(self, signum, frame) -> None:
        self.stop()
        sys.exit(0)


# ========== 程序入口 ==========

def main():
    import sys

    symbol = sys.argv[1] if len(sys.argv) > 1 else "NVDA.US"
    channels = ["depth", "ticker"]

    pipeline = TickDBOrderBookPipeline(symbol=symbol, channels=channels)

    # 注册 SIGTERM 处理器(直接调用 stop 方法)
    signal.signal(signal.SIGTERM, lambda s, f: pipeline.stop())
    signal.signal(signal.SIGINT, lambda s, f: pipeline.stop())

    try:
        pipeline.start()
    except KeyboardInterrupt:
        pipeline.stop()


if __name__ == "__main__":
    main()

七、效果验证:压力测试场景

7.1 断连测试脚本

import subprocess
import time
import signal
import os


def test_graceful_shutdown():
    """
    测试场景:启动管道 → 接收数据 → SIGTERM → 检查点/缓冲完整性
    """
    print("[测试] 启动数据管道……")

    proc = subprocess.Popen(
        ["python", "tickdb_pipeline.py", "AAPL.US"],
        env={**os.environ, "TICKDB_API_KEY": os.environ.get("TICKDB_API_KEY", "")},
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
    )

    # 让管道运行 10 秒,接收一些数据
    time.sleep(10)

    # 发送 SIGTERM
    print("[测试] 发送 SIGTERM……")
    proc.send_signal(signal.SIGTERM)

    try:
        proc.wait(timeout=15)
    except subprocess.TimeoutExpired:
        print("[测试] ⚠️ 进程未在 15 秒内退出,强制 kill")
        proc.kill()

    print(f"[测试] 退出码: {proc.returncode}")

    # 验证检查点文件
    checkpoint_file = "checkpoint_AAPL.US.json"
    if os.path.exists(checkpoint_file):
        content = open(checkpoint_file).read()
        print(f"[测试] ✅ 检查点存在: {content}")
    else:
        print("[测试] ❌ 检查点文件缺失")

    # 验证 SQLite 数据
    db_file = "orderbook_AAPL.US.db"
    if os.path.exists(db_file):
        import sqlite3
        conn = sqlite3.connect(db_file)
        cursor = conn.execute("SELECT COUNT(*) FROM orderbook_buffer")
        count = cursor.fetchone()[0]
        print(f"[测试] SQLite 缓冲记录数: {count}")
        conn.close()
        if count > 0:
            print("[测试] ✅ 缓冲数据已持久化")
        else:
            print("[测试] ❌ 缓冲数据为空")
    else:
        print("[测试] ❌ SQLite 数据库文件缺失")


if __name__ == "__main__":
    test_graceful_shutdown()

八、部署配置:systemd 与 Kubernetes

8.1 systemd 服务配置

[Unit]
Description=TickDB OrderBook Pipeline
After=network.target
Restart=on-failure
RestartSec=10s
LimitNOFILE=65535

[Service]
# ExecStartPre 确保优雅关闭有足够时间
ExecStartPre=/bin/sleep 2
ExecStart=/usr/bin/python3 /opt/pipeline/tickdb_pipeline.py NVDA.US
# 终止信号超时(比 Kubernetes 默认 30s 短,给程序预留处理时间)
TimeoutStopSec=25s
# 日志输出
StandardOutput=journal
StandardError=journal
SyslogIdentifier=tickdb-pipeline
User=pipeline

[Install]
WantedBy=multi-user.target

8.2 Kubernetes 优雅终止配置

apiVersion: apps/v1
kind: Deployment
spec:
  template:
    spec:
      terminationGracePeriodSeconds: 60  # 比 systemd TimeoutStopSec 长
      containers:
        - name: pipeline
          lifecycle:
            preStop:
              exec:
                # 等待 10 秒,让 kube-proxy 完成流量摘除
                command: ["/bin/sleep", "10"]
          env:
            - name: TICKDB_API_KEY
              valueFrom:
                secretKeyRef:
                  name: tickdb-credentials
                  key: api-key

九、完整自检清单

检查项 标准 本文覆盖
心跳保活 ping/pong 间隔 ≤ 30 秒 _main_loop 中的 ping 逻辑
指数退避重连 base × 2^n,含抖动 _handle_reconnect
限频处理 识别 3001 并读取 Retry-After ✅ 架构设计中预留(ResumptionManager
超时设置 HTTP 请求有 timeout requests.get(timeout=(3.05, 10))
环境变量存储 API Key 不硬编码 os.environ.get("TICKDB_API_KEY")
生产级预警注释 高频/并发场景提示 ✅ 多处 ⚠️ 注释
检查点原子性 rename + 文件锁 CheckpointManager
缓冲批量写入 队列解耦 + 批量 flush SQLiteBuffer
优雅关闭信号 SIGTERM + SIGINT GracefulShutdown
关闭顺序 WS → Checkpoint → Buffer pipeline.stop()
systemd 配置 TimeoutStopSec ✅ 配置示例
K8s 配置 terminationGracePeriodSeconds ✅ 配置示例

结语

数据管道的可靠性不在于"不会崩溃",而在于崩溃后的恢复速度和数据完整性

通过信号捕获 → 检查点快照 → SQLite 缓冲 → 三层续传的四级防护,你可以在任意时刻对程序发送 SIGTERM,让它在 25 秒内完成:停止接收 → 刷新缓冲 → 保存状态 → 体面退出。重启后,程序从检查点恢复,从断点继续数据处理。

这不是过度工程。对于处理 TickDB depth 频道这类无法事后补全的数据,任何一次不经意的丢失,都可能是你因子模型中永久缺失的一个样本点。


下一步行动

如果你在构建 TickDB 实时数据管道

  1. 将本文代码仓库克隆到本地
  2. 设置 TICKDB_API_KEY 环境变量(访问 tickdb.ai 注册获取)
  3. 运行 python tickdb_pipeline.py AAPL.US 观察控制台输出
  4. kill -15 测试优雅关闭,验证检查点和缓冲

如果你需要 10 年级别的美股历史 K 线数据做因子回测,访问 tickdb.ai 了解 TickDB Pro 历史数据方案——含清洗对齐的 OHLCV 数据,可直接用于 Backtrader/Zipline 等回测框架。

如果你关注订单簿级别的微观结构研究,TickDB 的 depth 频道支持美股 1 档、港股和数字货币 10 档实时推送。结合本文的缓冲架构,可以构建自己的 LOB(Limit Order Book)重建系统。


本文不构成任何投资建议。市场有风险,投资需谨慎。