企业级行情扩展:从标准 API 到自定义 SKILL 的实战指南

想象一个场景:你的量化团队需要实时监控 200 个标的的异常波动,当检测到流动性枯竭时自动触发对冲信号,同时将这些数据同步到内部的数据仓库。当团队规模从 3 人扩展到 30 人时,每个人都在重复造轮子——有人封装了 WebSocket,有人写了 K 线缓存,有人做了告警推送。

这是企业级行情系统的典型困境:标准 API 能解决 80% 的基础需求,但剩下的 20% 恰恰是企业竞争力的所在。TickDB SKILL 协议正是为解决这个断层而生——它不是让你换一套 API,而是让你在 TickDB 的基础上,用统一的方式扩展自己的业务逻辑。

本文将从 SKILL 的设计哲学出发,完整展示一个企业级行情监控 SKILL 的开发流程,涵盖开发规范、Function 扩展机制、以及私有部署的最佳实践。代码可直接运行,架构可直接复制。


一、为什么标准 API 不够用

在进入 SKILL 之前,先明确一个问题:TickDB 的 REST/WebSocket API 已经足够强大,为什么还需要 SKILL?

让我们看一个实际场景的对比:

维度 标准 REST/WebSocket API SKILL 扩展
数据获取 ✅ TickDB 全量数据 ✅ 在获取后叠加自定义逻辑
异常检测 ❌ 需要自己写 ✅ 内置常见模式,注册即用
告警推送 ❌ 需要对接飞书/企微/邮件 ✅ 标准化接入,一次配置多处复用
多标的协同 ❌ 需要自己管理状态 ✅ 跨标的状态机原生支持
团队共享 ❌ 每人复制一份代码 ✅ 一次发布,团队同步
企业合规 ❌ 数据出境/审计问题 ✅ 私有部署,数据不出域

标准 API 是工具,SKILL 是能力。工具解决“怎么做”,能力解决“谁能用、怎么管、出了事谁负责”。


二、SKILL 协议的核心设计

2.1 协议架构概览

TickDB SKILL 基于 JSON-RPC 2.0 协议扩展,其核心设计理念是**“声明即能力”**——开发者声明 SKILL 的输入、输出、依赖关系,系统自动处理部署、版本管理、跨 SKILL 协作。

一个 SKILL 由以下核心组件构成:

# skill.yaml - SKILL 声明文件
metadata:
  name: liquidity-monitor          # SKILL 唯一标识
  version: "1.0.0"                # 语义化版本
  description: 企业级流动性监控与异常告警
  
  author:                          # 开发者信息
    name: YourTeam
    contact: [email protected]
    
  capabilities:                    # SKILL 声明的能力
    - data.input: tickdb.depth     # 声明依赖 TickDB depth 数据
    - data.output: alert.push      # 声明输出为告警推送
    - trigger.interval: 1s         # 声明 1 秒级触发周期
    
  config_schema:                   # 配置项定义
    threshold_pressure_ratio:
      type: float
      default: 2.5
      description: "买卖压力比阈值,超过此值触发告警"
    alert_channels:
      type: array
      default: ["feishu"]
      description: "告警渠道列表"

2.2 Function 扩展机制

Function 是 SKILL 的核心扩展单元。一个 SKILL 可以定义多个 Function,每个 Function 完成一个独立的业务逻辑。

# functions/liquidity_detector.py
"""
企业级流动性检测 Function
"""

def register_function(registry):
    """Function 注册函数"""
    
    @registry.function(
        name="detect_liquidity_vacuum",
        description="检测流动性真空",
        parameters={
            "type": "object",
            "properties": {
                "symbol": {"type": "string", "description": "交易标的"},
                "depth_window": {"type": "integer", "default": 5, "description": "深度窗口档数"},
                "time_window": {"type": "integer", "default": 60, "description": "时间窗口(秒)"}
            }
        }
    )
    def detect_liquidity_vacuum(params: dict, context: dict) -> dict:
        """
        检测流动性真空的核心逻辑
        
        Args:
            params: Function 输入参数
            context: 执行上下文(包含 TickDB 连接、配置、历史状态)
            
        Returns:
            dict: {
                "signal": "normal"|"warning"|"critical",
                "pressure_ratio": float,
                "spread_bps": float,
                "recommendation": str
            }
        """
        symbol = params["symbol"]
        depth_window = params.get("depth_window", 5)
        time_window = params.get("time_window", 60)
        
        # 从 context 获取 TickDB 连接
        tickdb = context["tickdb"]
        
        # 获取订单簿深度数据
        depth_data = tickdb.get_depth(symbol, limit=depth_window)
        
        # 计算买卖压力比
        bid_volume = sum(d["bid_size"] for d in depth_data["bids"])
        ask_volume = sum(d["ask_size"] for d in depth_data["asks"])
        pressure_ratio = ask_volume / bid_volume if bid_volume > 0 else float('inf')
        
        # 计算买卖价差(BPS)
        best_bid = depth_data["bids"][0]["price"]
        best_ask = depth_data["asks"][0]["price"]
        spread_bps = (best_ask - best_bid) / best_bid * 10000
        
        # 信号判定
        if pressure_ratio > 3.0 or spread_bps > 50:
            signal = "critical"
            recommendation = "建议减仓或启用对冲"
        elif pressure_ratio > 2.5 or spread_bps > 30:
            signal = "warning"
            recommendation = "关注流动性变化,准备应急方案"
        else:
            signal = "normal"
            recommendation = "流动性正常"
        
        return {
            "signal": signal,
            "pressure_ratio": round(pressure_ratio, 4),
            "spread_bps": round(spread_bps, 2),
            "recommendation": recommendation
        }
    
    return detect_liquidity_vacuum

2.3 生命周期与状态管理

企业级场景中,跨标的的状态追踪至关重要。SKILL 提供了原生的状态管理机制:

# functions/state_manager.py
"""
跨标的的状态管理器
"""

class LiquidityStateManager:
    """流动性监控状态管理器"""
    
    def __init__(self, persistence_path: str = "./state"):
        self.persistence_path = persistence_path
        self._states = {}  # symbol -> state dict
        self._history = {}  # symbol -> list of state changes
        
    def update_state(self, symbol: str, signal_data: dict) -> dict:
        """更新标的的状态"""
        import time
        
        prev_state = self._states.get(symbol, {})
        prev_signal = prev_state.get("signal", "unknown")
        curr_signal = signal_data["signal"]
        
        # 状态变化检测
        state_changed = prev_signal != curr_signal
        
        # 记录历史
        if symbol not in self._history:
            self._history[symbol] = []
        
        self._history[symbol].append({
            "timestamp": time.time(),
            "prev_signal": prev_signal,
            "curr_signal": curr_signal,
            "pressure_ratio": signal_data["pressure_ratio"]
        })
        
        # 保持最近 1000 条历史
        if len(self._history[symbol]) > 1000:
            self._history[symbol] = self._history[symbol][-1000:]
        
        # 更新当前状态
        self._states[symbol] = {
            "signal": curr_signal,
            "pressure_ratio": signal_data["pressure_ratio"],
            "spread_bps": signal_data["spread_bps"],
            "last_update": time.time(),
            "state_changed": state_changed
        }
        
        return self._states[symbol]
    
    def get_alert_trigger(self, symbol: str, config: dict) -> dict:
        """判断是否需要触发告警"""
        state = self._states.get(symbol, {})
        
        # 新状态变为 warning 或 critical
        if state.get("state_changed") and state.get("signal") != "normal":
            return {
                "should_alert": True,
                "alert_level": state["signal"],
                "message": f"[{symbol}] 流动性告警: {state['signal']}, "
                          f"压力比={state['pressure_ratio']}, "
                          f"价差={state['spread_bps']}bps"
            }
        
        # 持续 critical 状态,每 5 分钟告警一次
        if state.get("signal") == "critical":
            last_update = state.get("last_update", 0)
            if time.time() - last_update > 300:
                return {
                    "should_alert": True,
                    "alert_level": "critical_repeat",
                    "message": f"[{symbol}] 流动性持续紧张,请关注"
                }
        
        return {"should_alert": False}

三、生产级 SKILL 开发实战

3.1 项目结构

一个完整的企业级 SKILL 应遵循以下目录结构:

liquidity-monitor-skill/
├── skill.yaml                 # SKILL 声明文件
├── README.md                  # 使用文档
├── requirements.txt           # Python 依赖
│
├── functions/                 # Function 扩展目录
│   ├── __init__.py
│   ├── liquidity_detector.py  # 流动性检测
│   ├── state_manager.py       # 状态管理
│   └── alert_dispatcher.py     # 告警分发
│
├── handlers/                   # 事件处理器
│   ├── __init__.py
│   ├── on_depth_update.py     # 深度数据更新处理
│   └── on_signal_change.py    # 信号变化处理
│
├── utils/                      # 工具函数
│   ├── __init__.py
│   ├── tickdb_client.py       # TickDB 封装
│   └── alert_client.py        # 告警客户端
│
├── tests/                      # 测试目录
│   ├── test_liquidity_detector.py
│   └── test_state_manager.py
│
└── deploy/                     # 部署配置
    ├── docker-compose.yml
    └── Dockerfile

3.2 TickDB 客户端封装

# utils/tickdb_client.py
"""
TickDB 生产级客户端封装
包含:心跳、WebSocket 重连、限频处理、超时设置
"""

import os
import time
import json
import asyncio
import random
import logging
from typing import Optional, Callable, Dict, Any
from websocket import create_connection, WebSocketException
import requests

logger = logging.getLogger(__name__)


class TickDBProductionClient:
    """TickDB 生产级客户端"""
    
    def __init__(
        self,
        api_key: str = None,
        base_url: str = "https://api.tickdb.ai",
        ws_url: str = "wss://ws.tickdb.ai",
        max_retries: int = 5,
        base_delay: float = 1.0,
        max_delay: float = 60.0
    ):
        self.api_key = api_key or os.environ.get("TICKDB_API_KEY")
        if not self.api_key:
            raise ValueError("API Key 未设置,请设置环境变量 TICKDB_API_KEY")
        
        self.base_url = base_url
        self.ws_url = ws_url
        self.max_retries = max_retries
        self.base_delay = base_delay
        self.max_delay = max_delay
        
        self._ws = None
        self._connected = False
        self._ping_interval = 30  # Ping 间隔(秒)
        
        # 速率限制状态
        self._rate_limit_remaining = None
        self._rate_limit_reset = None
    
    def _request_with_retry(
        self,
        method: str,
        endpoint: str,
        params: dict = None,
        json_data: dict = None,
        timeout: tuple = (3.05, 10)
    ) -> dict:
        """
        带重试机制的 HTTP 请求
        
        Args:
            method: HTTP 方法
            endpoint: API 端点
            params: URL 参数
            json_data: JSON 请求体
            timeout: (连接超时, 读取超时)
        """
        url = f"{self.base_url}{endpoint}"
        headers = {
            "X-API-Key": self.api_key,
            "Content-Type": "application/json"
        }
        
        for retry in range(self.max_retries):
            try:
                response = requests.request(
                    method=method,
                    url=url,
                    headers=headers,
                    params=params,
                    json=json_data,
                    timeout=timeout
                )
                
                # 速率限制处理
                if response.status_code == 429:
                    retry_after = int(response.headers.get("Retry-After", 5))
                    logger.warning(f"触发速率限制,等待 {retry_after} 秒")
                    time.sleep(retry_after)
                    continue
                
                # 业务错误码处理 (code: 3001)
                try:
                    data = response.json()
                    if data.get("code") == 3001:
                        retry_after = int(response.headers.get("Retry-After", 5))
                        logger.warning(f"业务限频(code:3001),等待 {retry_after} 秒")
                        time.sleep(retry_after)
                        continue
                except:
                    pass
                
                response.raise_for_status()
                return response.json()
                
            except requests.exceptions.Timeout:
                logger.warning(f"请求超时,重试 ({retry + 1}/{self.max_retries})")
            except requests.exceptions.RequestException as e:
                logger.warning(f"请求异常: {e},重试 ({retry + 1}/{self.max_retries})")
            
            # 指数退避 + 抖动
            if retry < self.max_retries - 1:
                delay = min(self.base_delay * (2 ** retry), self.max_delay)
                jitter = random.uniform(0, delay * 0.1)
                time.sleep(delay + jitter)
        
        raise RuntimeError(f"请求失败,已重试 {self.max_retries} 次")
    
    def get_depth(self, symbol: str, limit: int = 10) -> dict:
        """获取订单簿深度"""
        return self._request_with_retry(
            "GET",
            "/v1/market/depth",
            params={"symbol": symbol, "limit": limit}
        )
    
    def get_kline(
        self,
        symbol: str,
        interval: str = "1m",
        limit: int = 100,
        start_time: int = None,
        end_time: int = None
    ) -> dict:
        """获取 K 线数据"""
        params = {"symbol": symbol, "interval": interval, "limit": limit}
        if start_time:
            params["start_time"] = start_time
        if end_time:
            params["end_time"] = end_time
        
        return self._request_with_retry("GET", "/v1/market/kline", params=params)
    
    # ⚠️ 生产环境高频场景建议使用 aiohttp/asyncio
    def subscribe_depth(
        self,
        symbols: list,
        callback: Callable[[dict], None],
        on_error: Callable[[Exception], None] = None
    ):
        """
        WebSocket 订阅深度数据
        
        Args:
            symbols: 标的列表
            callback: 数据回调
            on_error: 错误回调
        """
        self._subscribe_internal(symbols, "depth", callback, on_error)
    
    def _subscribe_internal(
        self,
        symbols: list,
        channel: str,
        callback: Callable,
        on_error: Callable = None
    ):
        """WebSocket 订阅内部实现"""
        retry_count = 0
        
        while True:
            try:
                # 建立 WebSocket 连接(URL 参数传递 API Key)
                ws = create_connection(
                    f"{self.ws_url}?api_key={self.api_key}",
                    ping_timeout=20
                )
                self._ws = ws
                self._connected = True
                retry_count = 0  # 重置重试计数
                
                logger.info(f"WebSocket 已连接,订阅 {channel} 频道: {symbols}")
                
                # 发送订阅消息
                subscribe_msg = {
                    "cmd": "subscribe",
                    "channel": channel,
                    "symbols": symbols
                }
                ws.send(json.dumps(subscribe_msg))
                
                # 心跳 + 消息接收循环
                while True:
                    # 发送心跳
                    ws.send(json.dumps({"cmd": "ping"}))
                    
                    # 接收消息(带超时)
                    try:
                        message = ws.recv()
                        data = json.loads(message)
                        
                        if data.get("type") == "pong":
                            continue
                        
                        if data.get("channel") == channel:
                            callback(data)
                            
                    except WebSocketException as e:
                        logger.error(f"WebSocket 断开: {e}")
                        raise
                
            except (WebSocketException, ConnectionError) as e:
                self._connected = False
                retry_count += 1
                
                if on_error:
                    on_error(e)
                
                if retry_count >= self.max_retries:
                    logger.error(f"重试次数已达上限 ({self.max_retries})")
                    raise
                
                # 指数退避 + 抖动
                delay = min(self.base_delay * (2 ** retry_count), self.max_delay)
                jitter = random.uniform(0, delay * 0.1)
                logger.warning(f"WebSocket 重连中,{delay + jitter:.1f} 秒后重试 ({retry_count}/{self.max_retries})")
                time.sleep(delay + jitter)
    
    def close(self):
        """关闭连接"""
        if self._ws:
            self._ws.close()
            self._connected = False


# 便捷函数:创建全局单例
_client_instance: Optional[TickDBProductionClient] = None


def get_client() -> TickDBProductionClient:
    """获取 TickDB 客户端单例"""
    global _client_instance
    if _client_instance is None:
        _client_instance = TickDBProductionClient()
    return _client_instance

3.3 告警分发器

# functions/alert_dispatcher.py
"""
告警分发器
支持:飞书、企业微信、钉钉、邮件
"""

import os
import time
import logging
from typing import List, Optional
from enum import Enum

logger = logging.getLogger(__name__)


class AlertChannel(Enum):
    FEISHU = "feishu"
    WECHAT_WORK = "wechat_work"
    DINGTALK = "dingtalk"
    EMAIL = "email"
    WEBHOOK = "webhook"


class AlertDispatcher:
    """告警分发器"""
    
    def __init__(self):
        self._channels: dict = {}
        self._init_channels()
    
    def _init_channels(self):
        """初始化各渠道客户端"""
        # 飞书 Webhook
        feishu_webhook = os.environ.get("FEISHU_WEBHOOK_URL")
        if feishu_webhook:
            self._channels[AlertChannel.FEISHU] = {
                "webhook": feishu_webhook,
                "enabled": True
            }
        
        # 企业微信 Webhook
        wechat_webhook = os.environ.get("WECHAT_WORK_WEBHOOK")
        if wechat_webhook:
            self._channels[AlertChannel.WECHAT_WORK] = {
                "webhook": wechat_webhook,
                "enabled": True
            }
        
        # 钉钉 Webhook
        dingtalk_webhook = os.environ.get("DINGTALK_WEBHOOK_URL")
        if dingtalk_webhook:
            self._channels[AlertChannel.DINGTALK] = {
                "webhook": dingtalk_webhook,
                "enabled": True
            }
        
        # 邮件配置
        smtp_host = os.environ.get("SMTP_HOST")
        if smtp_host:
            self._channels[AlertChannel.EMAIL] = {
                "host": smtp_host,
                "port": int(os.environ.get("SMTP_PORT", 587)),
                "username": os.environ.get("SMTP_USERNAME"),
                "password": os.environ.get("SMTP_PASSWORD"),
                "from_addr": os.environ.get("ALERT_FROM_ADDR"),
                "to_addrs": os.environ.get("ALERT_TO_ADDRS", "").split(","),
                "enabled": True
            }
        
        logger.info(f"已初始化 {len(self._channels)} 个告警渠道")
    
    def dispatch(
        self,
        message: str,
        level: str = "warning",
        channels: List[str] = None,
        title: str = None
    ) -> dict:
        """
        分发告警
        
        Args:
            message: 告警内容
            level: 告警级别 (info/warning/critical)
            channels: 指定渠道,不指定则发送到所有已配置渠道
            title: 告警标题
        """
        if channels is None:
            channels = [c.value for c in self._channels.keys() if self._channels[c].get("enabled")]
        
        results = {}
        
        for channel_name in channels:
            try:
                channel = AlertChannel(channel_name)
                if channel not in self._channels or not self._channels[channel].get("enabled"):
                    results[channel_name] = {"status": "skipped", "reason": "channel not configured"}
                    continue
                
                result = self._send_to_channel(channel, message, level, title)
                results[channel_name] = result
                
            except ValueError:
                results[channel_name] = {"status": "error", "reason": f"unknown channel: {channel_name}"}
            except Exception as e:
                logger.error(f"发送告警到 {channel_name} 失败: {e}")
                results[channel_name] = {"status": "error", "reason": str(e)}
        
        return results
    
    def _send_to_channel(
        self,
        channel: AlertChannel,
        message: str,
        level: str,
        title: str = None
    ) -> dict:
        """发送到指定渠道"""
        import requests
        
        if channel == AlertChannel.FEISHU:
            return self._send_feishu(message, level, title)
        elif channel == AlertChannel.WECHAT_WORK:
            return self._send_wechat_work(message, level, title)
        elif channel == AlertChannel.DINGTALK:
            return self._send_dingtalk(message, level, title)
        elif channel == AlertChannel.EMAIL:
            return self._send_email(message, level, title)
        
        return {"status": "unknown"}
    
    def _send_feishu(self, message: str, level: str, title: str = None) -> dict:
        """发送飞书消息"""
        import requests
        
        webhook = self._channels[AlertChannel.FEISHU]["webhook"]
        
        # 根据告警级别选择颜色
        color_map = {
            "info": "blue",
            "warning": "yellow",
            "critical": "red"
        }
        
        payload = {
            "msg_type": "interactive",
            "card": {
                "header": {
                    "title": {
                        "tag": "plain_text",
                        "content": title or f"【{level.upper()}】TickDB 告警"
                    },
                    "template": color_map.get(level, "gray")
                },
                "elements": [
                    {
                        "tag": "div",
                        "text": {
                            "tag": "lark_md",
                            "content": message
                        }
                    },
                    {
                        "tag": "note",
                        "elements": [
                            {
                                "tag": "plain_text",
                                "content": f"触发时间: {time.strftime('%Y-%m-%d %H:%M:%S')}"
                            }
                        ]
                    }
                ]
            }
        }
        
        response = requests.post(webhook, json=payload, timeout=10)
        return {"status": "success" if response.ok else "error", "response": response.text}
    
    def _send_wechat_work(self, message: str, level: str, title: str = None) -> dict:
        """发送企业微信消息"""
        import requests
        
        webhook = self._channels[AlertChannel.WECHAT_WORK]["webhook"]
        
        payload = {
            "msgtype": "markdown",
            "markdown": {
                "content": f"### {title or f'【{level.upper()}】TickDB 告警'}\n{message}\n> 触发时间: {time.strftime('%Y-%m-%d %H:%M:%S')}"
            }
        }
        
        response = requests.post(webhook, json=payload, timeout=10)
        return {"status": "success" if response.ok else "error"}
    
    def _send_dingtalk(self, message: str, level: str, title: str = None) -> dict:
        """发送钉钉消息"""
        import requests
        
        webhook = self._channels[AlertChannel.DINGTALK]["webhook"]
        
        payload = {
            "msgtype": "markdown",
            "markdown": {
                "title": title or f"{level.upper()} 告警",
                "text": f"### {title or f'【{level.upper()}】TickDB 告警'}\n{message}\n> 触发时间: {time.strftime('%Y-%m-%d %H:%M:%S')}"
            }
        }
        
        response = requests.post(webhook, json=payload, timeout=10)
        return {"status": "success" if response.ok else "error"}
    
    def _send_email(self, message: str, level: str, title: str = None) -> dict:
        """发送邮件"""
        import smtplib
        from email.mime.text import MIMEText
        from email.header import Header
        
        config = self._channels[AlertChannel.EMAIL]
        
        msg = MIMEText(message, 'html', 'utf-8')
        msg['Subject'] = Header(title or f"[{level.upper()}] TickDB 行情告警", 'utf-8')
        msg['From'] = config["from_addr"]
        msg['To'] = ",".join(config["to_addrs"])
        
        try:
            with smtplib.SMTP(config["host"], config["port"]) as server:
                server.starttls()
                server.login(config["username"], config["password"])
                server.send_message(msg)
            return {"status": "success"}
        except Exception as e:
            return {"status": "error", "reason": str(e)}

3.4 SKILL 主入口

# main.py
"""
Liquidity Monitor SKILL - 主入口
"""

import os
import time
import logging
import signal
import sys
from typing import List

from utils.tickdb_client import TickDBProductionClient
from utils.alert_client import AlertDispatcher
from functions.liquidity_detector import detect_liquidity_vacuum
from functions.state_manager import LiquidityStateManager

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


class LiquidityMonitorSkill:
    """流动性监控 SKILL 主类"""
    
    def __init__(self, symbols: List[str] = None):
        """
        初始化 SKILL
        
        Args:
            symbols: 监控的标的列表,默认从环境变量 TICKDB_SYMBOLS 读取
        """
        # 初始化配置
        self.symbols = symbols or os.environ.get(
            "TICKDB_SYMBOLS", "BTC.USDT,ETH.USDT,AAPL.US"
        ).split(",")
        
        self.threshold_pressure_ratio = float(
            os.environ.get("THRESHOLD_PRESSURE_RATIO", "2.5")
        )
        self.threshold_spread_bps = float(
            os.environ.get("THRESHOLD_SPREAD_BPS", "30")
        )
        self.alert_channels = os.environ.get(
            "ALERT_CHANNELS", "feishu"
        ).split(",")
        
        # 初始化组件
        self.tickdb = TickDBProductionClient()
        self.alert_dispatcher = AlertDispatcher()
        self.state_manager = LiquidityStateManager()
        
        # 运行状态
        self._running = False
        
        logger.info(f"SKILL 初始化完成,监控标的: {self.symbols}")
        logger.info(f"告警阈值 - 压力比: {self.threshold_pressure_ratio}, 价差: {self.threshold_spread_bps}bps")
        logger.info(f"告警渠道: {self.alert_channels}")
    
    def start(self):
        """启动 SKILL"""
        self._running = True
        
        # 注册信号处理
        signal.signal(signal.SIGINT, self._signal_handler)
        signal.signal(signal.SIGTERM, self._signal_handler)
        
        logger.info("SKILL 启动,开始监控...")
        
        try:
            self.tickdb.subscribe_depth(
                symbols=self.symbols,
                callback=self._on_depth_update,
                on_error=self._on_ws_error
            )
        except KeyboardInterrupt:
            logger.info("收到键盘中断信号")
        except Exception as e:
            logger.error(f"SKILL 运行异常: {e}")
            raise
        finally:
            self.stop()
    
    def stop(self):
        """停止 SKILL"""
        self._running = False
        self.tickdb.close()
        logger.info("SKILL 已停止")
    
    def _signal_handler(self, signum, frame):
        """处理系统信号"""
        logger.info(f"收到信号 {signum},准备停止...")
        self._running = False
        self.stop()
        sys.exit(0)
    
    def _on_depth_update(self, data: dict):
        """
        深度数据更新回调
        
        Args:
            data: TickDB WebSocket 推送的数据
        """
        try:
            symbol = data.get("symbol")
            depth = data.get("depth", {})
            bids = depth.get("bids", [])
            asks = depth.get("asks", [])
            
            if not bids or not asks:
                return
            
            # 计算买卖压力比
            bid_volume = sum(b.get("size", 0) for b in bids)
            ask_volume = sum(a.get("size", 0) for a in asks)
            pressure_ratio = ask_volume / bid_volume if bid_volume > 0 else float('inf')
            
            # 计算买卖价差(BPS)
            best_bid = bids[0].get("price", 0)
            best_ask = asks[0].get("price", 0)
            spread_bps = (best_ask - best_bid) / best_bid * 10000 if best_bid > 0 else 0
            
            signal_data = {
                "signal": "normal",
                "pressure_ratio": pressure_ratio,
                "spread_bps": spread_bps,
                "recommendation": ""
            }
            
            # 信号判定
            if pressure_ratio > 3.0 or spread_bps > 50:
                signal_data["signal"] = "critical"
                signal_data["recommendation"] = "建议减仓或启用对冲"
            elif pressure_ratio > self.threshold_pressure_ratio or spread_bps > self.threshold_spread_bps:
                signal_data["signal"] = "warning"
                signal_data["recommendation"] = "关注流动性变化,准备应急方案"
            
            # 更新状态
            self.state_manager.update_state(symbol, signal_data)
            
            # 检查是否需要告警
            alert_info = self.state_manager.get_alert_trigger(symbol, {
                "threshold_pressure_ratio": self.threshold_pressure_ratio,
                "threshold_spread_bps": self.threshold_spread_bps
            })
            
            if alert_info.get("should_alert"):
                self._send_alert(alert_info)
            
            # 调试日志
            if signal_data["signal"] != "normal":
                logger.warning(
                    f"[{symbol}] 信号={signal_data['signal']}, "
                    f"压力比={pressure_ratio:.2f}, 价差={spread_bps:.1f}bps"
                )
                
        except Exception as e:
            logger.error(f"处理深度数据异常: {e}")
    
    def _send_alert(self, alert_info: dict):
        """发送告警"""
        level_map = {
            "warning": "warning",
            "critical": "critical",
            "critical_repeat": "critical"
        }
        
        results = self.alert_dispatcher.dispatch(
            message=alert_info["message"],
            level=level_map.get(alert_info["alert_level"], "warning"),
            channels=self.alert_channels,
            title="流动性异常告警"
        )
        
        for channel, result in results.items():
            if result.get("status") == "success":
                logger.info(f"告警已发送至 {channel}")
            else:
                logger.error(f"告警发送至 {channel} 失败: {result.get('reason')}")
    
    def _on_ws_error(self, error: Exception):
        """WebSocket 错误处理"""
        logger.error(f"WebSocket 连接错误: {error}")


# SKILL 入口点
def main():
    """SKILL 主入口"""
    skill = LiquidityMonitorSkill()
    skill.start()


if __name__ == "__main__":
    main()

四、私有部署方案

4.1 部署架构

企业级 SKILL 的私有部署需要考虑数据合规、访问控制和高可用。以下是三种典型部署模式:

部署模式 适用场景 数据位置 复杂度
单机 Docker 小团队验证、个人使用 数据出域
Docker Compose 团队内部、高可用 可选数据本地缓存 ⭐⭐
K8s 集群 大规模部署、多租户 完全私有化 ⭐⭐⭐

4.2 Docker Compose 部署配置

# deploy/docker-compose.yml
version: '3.8'

services:
  liquidity-monitor:
    build:
      context: ..
      dockerfile: deploy/Dockerfile
    container_name: liquidity-monitor
    restart: unless-stopped
    
    environment:
      # TickDB 配置
      - TICKDB_API_KEY=${TICKDB_API_KEY}
      - TICKDB_SYMBOLS=${TICKDB_SYMBOLS:-BTC.USDT,ETH.USDT,AAPL.US}
      - TICKDB_WS_URL=${TICKDB_WS_URL:-wss://ws.tickdb.ai}
      - TICKDB_API_URL=${TICKDB_API_URL:-https://api.tickdb.ai}
      
      # 告警阈值
      - THRESHOLD_PRESSURE_RATIO=${THRESHOLD_PRESSURE_RATIO:-2.5}
      - THRESHOLD_SPREAD_BPS=${THRESHOLD_SPREAD_BPS:-30}
      
      # 告警渠道
      - ALERT_CHANNELS=${ALERT_CHANNELS:-feishu}
      - FEISHU_WEBHOOK_URL=${FEISHU_WEBHOOK_URL}
      - WECHAT_WORK_WEBHOOK=${WECHAT_WORK_WEBHOOK}
      - DINGTALK_WEBHOOK_URL=${DINGTALK_WEBHOOK_URL}
      
      # 邮件配置
      - SMTP_HOST=${SMTP_HOST}
      - SMTP_PORT=${SMTP_PORT:-587}
      - SMTP_USERNAME=${SMTP_USERNAME}
      - SMTP_PASSWORD=${SMTP_PASSWORD}
      - ALERT_FROM_ADDR=${ALERT_FROM_ADDR}
      - ALERT_TO_ADDRS=${ALERT_TO_ADDRS}
    
    volumes:
      # 持久化状态数据
      - ./state:/app/state
      # 持久化日志
      - ./logs:/app/logs
    
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8080/health"]
      interval: 30s
      timeout: 10s
      retries: 3
      start_period: 40s
    
    networks:
      - monitoring

  # 可选:Prometheus 监控
  prometheus:
    image: prom/prometheus:latest
    container_name: prometheus
    restart: unless-stopped
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
      - prometheus_data:/prometheus
    networks:
      - monitoring
    profiles:
      - monitoring

  # 可选:Grafana 可视化
  grafana:
    image: grafana/grafana:latest
    container_name: grafana
    restart: unless-stopped
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=${GRAFANA_PASSWORD:-admin}
    volumes:
      - grafana_data:/var/lib/grafana
    networks:
      - monitoring
    profiles:
      - monitoring

networks:
  monitoring:
    driver: bridge

volumes:
  prometheus_data:
  grafana_data:

4.3 健康检查与监控

# healthcheck.py
"""
健康检查端点
用于 Docker healthcheck 和 K8s readiness probe
"""

from flask import Flask, jsonify
import psutil
import os

app = Flask(__name__)

@app.route('/health')
def health():
    """健康检查端点"""
    # 检查进程状态
    process = psutil.Process(os.getpid())
    
    # 检查内存使用
    memory_info = process.memory_info()
    memory_mb = memory_info.rss / 1024 / 1024
    
    # 检查 CPU 使用
    cpu_percent = process.cpu_percent(interval=0.1)
    
    return jsonify({
        "status": "healthy",
        "pid": os.getpid(),
        "memory_mb": round(memory_mb, 2),
        "cpu_percent": round(cpu_percent, 2),
        "uptime": process.create_time()
    })


@app.route('/ready')
def ready():
    """就绪检查端点"""
    # 检查 TickDB 连接
    from utils.tickdb_client import get_client
    try:
        client = get_client()
        # 简单探测连接
        return jsonify({
            "status": "ready",
            "tickdb_connected": client._connected if hasattr(client, '_connected') else True
        })
    except Exception as e:
        return jsonify({
            "status": "not_ready",
            "error": str(e)
        }), 503


if __name__ == "__main__":
    app.run(host="0.0.0.0", port=8080)

五、价值对比:自定义 SKILL vs 其他方案

维度 自己从头开发 使用 TickDB SKILL
开发周期 2-4 周 1-3 天
代码量 2000-5000 行 500-1000 行
重连/限频处理 需自行实现 原生支持
跨 SKILL 协作 支持 Function 复用
版本管理 Git 管理 SKILL Registry
企业合规 自行处理 可私有部署,数据不出域
团队共享 复制代码 一次发布,全员同步
维护成本

结论:SKILL 不是要替代你的业务逻辑,而是要接管那些“每个项目都要写一遍”的基础设施代码——连接管理、错误处理、监控告警、状态持久化。让你的团队专注于真正的业务价值。


六、总结与下一步

核心要点回顾

  1. SKILL 协议是 TickDB 生态的扩展层,让企业能够在标准 API 基础上构建专属能力
  2. Function 扩展机制提供了声明式的业务逻辑注册方式,支持版本管理和跨 SKILL 复用
  3. 生产级代码规范(心跳、重连、限频、超时)是企业级系统的必备要素
  4. 私有部署确保数据合规,满足金融机构的安全要求
  5. Docker Compose 部署兼顾了简单性和可扩展性

立即行动

如果你是个人开发者

  1. 访问 tickdb.ai 注册(免费 API Key)
  2. 复制本文的代码仓库
  3. 填写 .env 配置,运行 docker-compose up

如果你是企业团队

  1. 联系 [email protected] 获取私有部署方案
  2. 与技术团队对接 SKILL 定制需求
  3. 安排 PoC(概念验证),验证数据合规和技术可行性

如果你习惯用 AI 辅助开发
在 AI 助手中搜索安装 tickdb-market-data SKILL,可以直接用自然语言调用 TickDB 的各项能力。


风险提示:本文仅提供技术实现参考,不构成任何投资建议。企业级部署请咨询合规部门,确保符合当地监管要求。