企业级行情扩展:从标准 API 到自定义 SKILL 的实战指南
想象一个场景:你的量化团队需要实时监控 200 个标的的异常波动,当检测到流动性枯竭时自动触发对冲信号,同时将这些数据同步到内部的数据仓库。当团队规模从 3 人扩展到 30 人时,每个人都在重复造轮子——有人封装了 WebSocket,有人写了 K 线缓存,有人做了告警推送。
这是企业级行情系统的典型困境:标准 API 能解决 80% 的基础需求,但剩下的 20% 恰恰是企业竞争力的所在。TickDB SKILL 协议正是为解决这个断层而生——它不是让你换一套 API,而是让你在 TickDB 的基础上,用统一的方式扩展自己的业务逻辑。
本文将从 SKILL 的设计哲学出发,完整展示一个企业级行情监控 SKILL 的开发流程,涵盖开发规范、Function 扩展机制、以及私有部署的最佳实践。代码可直接运行,架构可直接复制。
一、为什么标准 API 不够用
在进入 SKILL 之前,先明确一个问题:TickDB 的 REST/WebSocket API 已经足够强大,为什么还需要 SKILL?
让我们看一个实际场景的对比:
| 维度 | 标准 REST/WebSocket API | SKILL 扩展 |
|---|---|---|
| 数据获取 | ✅ TickDB 全量数据 | ✅ 在获取后叠加自定义逻辑 |
| 异常检测 | ❌ 需要自己写 | ✅ 内置常见模式,注册即用 |
| 告警推送 | ❌ 需要对接飞书/企微/邮件 | ✅ 标准化接入,一次配置多处复用 |
| 多标的协同 | ❌ 需要自己管理状态 | ✅ 跨标的状态机原生支持 |
| 团队共享 | ❌ 每人复制一份代码 | ✅ 一次发布,团队同步 |
| 企业合规 | ❌ 数据出境/审计问题 | ✅ 私有部署,数据不出域 |
标准 API 是工具,SKILL 是能力。工具解决“怎么做”,能力解决“谁能用、怎么管、出了事谁负责”。
二、SKILL 协议的核心设计
2.1 协议架构概览
TickDB SKILL 基于 JSON-RPC 2.0 协议扩展,其核心设计理念是**“声明即能力”**——开发者声明 SKILL 的输入、输出、依赖关系,系统自动处理部署、版本管理、跨 SKILL 协作。
一个 SKILL 由以下核心组件构成:
# skill.yaml - SKILL 声明文件
metadata:
name: liquidity-monitor # SKILL 唯一标识
version: "1.0.0" # 语义化版本
description: 企业级流动性监控与异常告警
author: # 开发者信息
name: YourTeam
contact: [email protected]
capabilities: # SKILL 声明的能力
- data.input: tickdb.depth # 声明依赖 TickDB depth 数据
- data.output: alert.push # 声明输出为告警推送
- trigger.interval: 1s # 声明 1 秒级触发周期
config_schema: # 配置项定义
threshold_pressure_ratio:
type: float
default: 2.5
description: "买卖压力比阈值,超过此值触发告警"
alert_channels:
type: array
default: ["feishu"]
description: "告警渠道列表"
2.2 Function 扩展机制
Function 是 SKILL 的核心扩展单元。一个 SKILL 可以定义多个 Function,每个 Function 完成一个独立的业务逻辑。
# functions/liquidity_detector.py
"""
企业级流动性检测 Function
"""
def register_function(registry):
"""Function 注册函数"""
@registry.function(
name="detect_liquidity_vacuum",
description="检测流动性真空",
parameters={
"type": "object",
"properties": {
"symbol": {"type": "string", "description": "交易标的"},
"depth_window": {"type": "integer", "default": 5, "description": "深度窗口档数"},
"time_window": {"type": "integer", "default": 60, "description": "时间窗口(秒)"}
}
}
)
def detect_liquidity_vacuum(params: dict, context: dict) -> dict:
"""
检测流动性真空的核心逻辑
Args:
params: Function 输入参数
context: 执行上下文(包含 TickDB 连接、配置、历史状态)
Returns:
dict: {
"signal": "normal"|"warning"|"critical",
"pressure_ratio": float,
"spread_bps": float,
"recommendation": str
}
"""
symbol = params["symbol"]
depth_window = params.get("depth_window", 5)
time_window = params.get("time_window", 60)
# 从 context 获取 TickDB 连接
tickdb = context["tickdb"]
# 获取订单簿深度数据
depth_data = tickdb.get_depth(symbol, limit=depth_window)
# 计算买卖压力比
bid_volume = sum(d["bid_size"] for d in depth_data["bids"])
ask_volume = sum(d["ask_size"] for d in depth_data["asks"])
pressure_ratio = ask_volume / bid_volume if bid_volume > 0 else float('inf')
# 计算买卖价差(BPS)
best_bid = depth_data["bids"][0]["price"]
best_ask = depth_data["asks"][0]["price"]
spread_bps = (best_ask - best_bid) / best_bid * 10000
# 信号判定
if pressure_ratio > 3.0 or spread_bps > 50:
signal = "critical"
recommendation = "建议减仓或启用对冲"
elif pressure_ratio > 2.5 or spread_bps > 30:
signal = "warning"
recommendation = "关注流动性变化,准备应急方案"
else:
signal = "normal"
recommendation = "流动性正常"
return {
"signal": signal,
"pressure_ratio": round(pressure_ratio, 4),
"spread_bps": round(spread_bps, 2),
"recommendation": recommendation
}
return detect_liquidity_vacuum
2.3 生命周期与状态管理
企业级场景中,跨标的的状态追踪至关重要。SKILL 提供了原生的状态管理机制:
# functions/state_manager.py
"""
跨标的的状态管理器
"""
class LiquidityStateManager:
"""流动性监控状态管理器"""
def __init__(self, persistence_path: str = "./state"):
self.persistence_path = persistence_path
self._states = {} # symbol -> state dict
self._history = {} # symbol -> list of state changes
def update_state(self, symbol: str, signal_data: dict) -> dict:
"""更新标的的状态"""
import time
prev_state = self._states.get(symbol, {})
prev_signal = prev_state.get("signal", "unknown")
curr_signal = signal_data["signal"]
# 状态变化检测
state_changed = prev_signal != curr_signal
# 记录历史
if symbol not in self._history:
self._history[symbol] = []
self._history[symbol].append({
"timestamp": time.time(),
"prev_signal": prev_signal,
"curr_signal": curr_signal,
"pressure_ratio": signal_data["pressure_ratio"]
})
# 保持最近 1000 条历史
if len(self._history[symbol]) > 1000:
self._history[symbol] = self._history[symbol][-1000:]
# 更新当前状态
self._states[symbol] = {
"signal": curr_signal,
"pressure_ratio": signal_data["pressure_ratio"],
"spread_bps": signal_data["spread_bps"],
"last_update": time.time(),
"state_changed": state_changed
}
return self._states[symbol]
def get_alert_trigger(self, symbol: str, config: dict) -> dict:
"""判断是否需要触发告警"""
state = self._states.get(symbol, {})
# 新状态变为 warning 或 critical
if state.get("state_changed") and state.get("signal") != "normal":
return {
"should_alert": True,
"alert_level": state["signal"],
"message": f"[{symbol}] 流动性告警: {state['signal']}, "
f"压力比={state['pressure_ratio']}, "
f"价差={state['spread_bps']}bps"
}
# 持续 critical 状态,每 5 分钟告警一次
if state.get("signal") == "critical":
last_update = state.get("last_update", 0)
if time.time() - last_update > 300:
return {
"should_alert": True,
"alert_level": "critical_repeat",
"message": f"[{symbol}] 流动性持续紧张,请关注"
}
return {"should_alert": False}
三、生产级 SKILL 开发实战
3.1 项目结构
一个完整的企业级 SKILL 应遵循以下目录结构:
liquidity-monitor-skill/
├── skill.yaml # SKILL 声明文件
├── README.md # 使用文档
├── requirements.txt # Python 依赖
│
├── functions/ # Function 扩展目录
│ ├── __init__.py
│ ├── liquidity_detector.py # 流动性检测
│ ├── state_manager.py # 状态管理
│ └── alert_dispatcher.py # 告警分发
│
├── handlers/ # 事件处理器
│ ├── __init__.py
│ ├── on_depth_update.py # 深度数据更新处理
│ └── on_signal_change.py # 信号变化处理
│
├── utils/ # 工具函数
│ ├── __init__.py
│ ├── tickdb_client.py # TickDB 封装
│ └── alert_client.py # 告警客户端
│
├── tests/ # 测试目录
│ ├── test_liquidity_detector.py
│ └── test_state_manager.py
│
└── deploy/ # 部署配置
├── docker-compose.yml
└── Dockerfile
3.2 TickDB 客户端封装
# utils/tickdb_client.py
"""
TickDB 生产级客户端封装
包含:心跳、WebSocket 重连、限频处理、超时设置
"""
import os
import time
import json
import asyncio
import random
import logging
from typing import Optional, Callable, Dict, Any
from websocket import create_connection, WebSocketException
import requests
logger = logging.getLogger(__name__)
class TickDBProductionClient:
"""TickDB 生产级客户端"""
def __init__(
self,
api_key: str = None,
base_url: str = "https://api.tickdb.ai",
ws_url: str = "wss://ws.tickdb.ai",
max_retries: int = 5,
base_delay: float = 1.0,
max_delay: float = 60.0
):
self.api_key = api_key or os.environ.get("TICKDB_API_KEY")
if not self.api_key:
raise ValueError("API Key 未设置,请设置环境变量 TICKDB_API_KEY")
self.base_url = base_url
self.ws_url = ws_url
self.max_retries = max_retries
self.base_delay = base_delay
self.max_delay = max_delay
self._ws = None
self._connected = False
self._ping_interval = 30 # Ping 间隔(秒)
# 速率限制状态
self._rate_limit_remaining = None
self._rate_limit_reset = None
def _request_with_retry(
self,
method: str,
endpoint: str,
params: dict = None,
json_data: dict = None,
timeout: tuple = (3.05, 10)
) -> dict:
"""
带重试机制的 HTTP 请求
Args:
method: HTTP 方法
endpoint: API 端点
params: URL 参数
json_data: JSON 请求体
timeout: (连接超时, 读取超时)
"""
url = f"{self.base_url}{endpoint}"
headers = {
"X-API-Key": self.api_key,
"Content-Type": "application/json"
}
for retry in range(self.max_retries):
try:
response = requests.request(
method=method,
url=url,
headers=headers,
params=params,
json=json_data,
timeout=timeout
)
# 速率限制处理
if response.status_code == 429:
retry_after = int(response.headers.get("Retry-After", 5))
logger.warning(f"触发速率限制,等待 {retry_after} 秒")
time.sleep(retry_after)
continue
# 业务错误码处理 (code: 3001)
try:
data = response.json()
if data.get("code") == 3001:
retry_after = int(response.headers.get("Retry-After", 5))
logger.warning(f"业务限频(code:3001),等待 {retry_after} 秒")
time.sleep(retry_after)
continue
except:
pass
response.raise_for_status()
return response.json()
except requests.exceptions.Timeout:
logger.warning(f"请求超时,重试 ({retry + 1}/{self.max_retries})")
except requests.exceptions.RequestException as e:
logger.warning(f"请求异常: {e},重试 ({retry + 1}/{self.max_retries})")
# 指数退避 + 抖动
if retry < self.max_retries - 1:
delay = min(self.base_delay * (2 ** retry), self.max_delay)
jitter = random.uniform(0, delay * 0.1)
time.sleep(delay + jitter)
raise RuntimeError(f"请求失败,已重试 {self.max_retries} 次")
def get_depth(self, symbol: str, limit: int = 10) -> dict:
"""获取订单簿深度"""
return self._request_with_retry(
"GET",
"/v1/market/depth",
params={"symbol": symbol, "limit": limit}
)
def get_kline(
self,
symbol: str,
interval: str = "1m",
limit: int = 100,
start_time: int = None,
end_time: int = None
) -> dict:
"""获取 K 线数据"""
params = {"symbol": symbol, "interval": interval, "limit": limit}
if start_time:
params["start_time"] = start_time
if end_time:
params["end_time"] = end_time
return self._request_with_retry("GET", "/v1/market/kline", params=params)
# ⚠️ 生产环境高频场景建议使用 aiohttp/asyncio
def subscribe_depth(
self,
symbols: list,
callback: Callable[[dict], None],
on_error: Callable[[Exception], None] = None
):
"""
WebSocket 订阅深度数据
Args:
symbols: 标的列表
callback: 数据回调
on_error: 错误回调
"""
self._subscribe_internal(symbols, "depth", callback, on_error)
def _subscribe_internal(
self,
symbols: list,
channel: str,
callback: Callable,
on_error: Callable = None
):
"""WebSocket 订阅内部实现"""
retry_count = 0
while True:
try:
# 建立 WebSocket 连接(URL 参数传递 API Key)
ws = create_connection(
f"{self.ws_url}?api_key={self.api_key}",
ping_timeout=20
)
self._ws = ws
self._connected = True
retry_count = 0 # 重置重试计数
logger.info(f"WebSocket 已连接,订阅 {channel} 频道: {symbols}")
# 发送订阅消息
subscribe_msg = {
"cmd": "subscribe",
"channel": channel,
"symbols": symbols
}
ws.send(json.dumps(subscribe_msg))
# 心跳 + 消息接收循环
while True:
# 发送心跳
ws.send(json.dumps({"cmd": "ping"}))
# 接收消息(带超时)
try:
message = ws.recv()
data = json.loads(message)
if data.get("type") == "pong":
continue
if data.get("channel") == channel:
callback(data)
except WebSocketException as e:
logger.error(f"WebSocket 断开: {e}")
raise
except (WebSocketException, ConnectionError) as e:
self._connected = False
retry_count += 1
if on_error:
on_error(e)
if retry_count >= self.max_retries:
logger.error(f"重试次数已达上限 ({self.max_retries})")
raise
# 指数退避 + 抖动
delay = min(self.base_delay * (2 ** retry_count), self.max_delay)
jitter = random.uniform(0, delay * 0.1)
logger.warning(f"WebSocket 重连中,{delay + jitter:.1f} 秒后重试 ({retry_count}/{self.max_retries})")
time.sleep(delay + jitter)
def close(self):
"""关闭连接"""
if self._ws:
self._ws.close()
self._connected = False
# 便捷函数:创建全局单例
_client_instance: Optional[TickDBProductionClient] = None
def get_client() -> TickDBProductionClient:
"""获取 TickDB 客户端单例"""
global _client_instance
if _client_instance is None:
_client_instance = TickDBProductionClient()
return _client_instance
3.3 告警分发器
# functions/alert_dispatcher.py
"""
告警分发器
支持:飞书、企业微信、钉钉、邮件
"""
import os
import time
import logging
from typing import List, Optional
from enum import Enum
logger = logging.getLogger(__name__)
class AlertChannel(Enum):
FEISHU = "feishu"
WECHAT_WORK = "wechat_work"
DINGTALK = "dingtalk"
EMAIL = "email"
WEBHOOK = "webhook"
class AlertDispatcher:
"""告警分发器"""
def __init__(self):
self._channels: dict = {}
self._init_channels()
def _init_channels(self):
"""初始化各渠道客户端"""
# 飞书 Webhook
feishu_webhook = os.environ.get("FEISHU_WEBHOOK_URL")
if feishu_webhook:
self._channels[AlertChannel.FEISHU] = {
"webhook": feishu_webhook,
"enabled": True
}
# 企业微信 Webhook
wechat_webhook = os.environ.get("WECHAT_WORK_WEBHOOK")
if wechat_webhook:
self._channels[AlertChannel.WECHAT_WORK] = {
"webhook": wechat_webhook,
"enabled": True
}
# 钉钉 Webhook
dingtalk_webhook = os.environ.get("DINGTALK_WEBHOOK_URL")
if dingtalk_webhook:
self._channels[AlertChannel.DINGTALK] = {
"webhook": dingtalk_webhook,
"enabled": True
}
# 邮件配置
smtp_host = os.environ.get("SMTP_HOST")
if smtp_host:
self._channels[AlertChannel.EMAIL] = {
"host": smtp_host,
"port": int(os.environ.get("SMTP_PORT", 587)),
"username": os.environ.get("SMTP_USERNAME"),
"password": os.environ.get("SMTP_PASSWORD"),
"from_addr": os.environ.get("ALERT_FROM_ADDR"),
"to_addrs": os.environ.get("ALERT_TO_ADDRS", "").split(","),
"enabled": True
}
logger.info(f"已初始化 {len(self._channels)} 个告警渠道")
def dispatch(
self,
message: str,
level: str = "warning",
channels: List[str] = None,
title: str = None
) -> dict:
"""
分发告警
Args:
message: 告警内容
level: 告警级别 (info/warning/critical)
channels: 指定渠道,不指定则发送到所有已配置渠道
title: 告警标题
"""
if channels is None:
channels = [c.value for c in self._channels.keys() if self._channels[c].get("enabled")]
results = {}
for channel_name in channels:
try:
channel = AlertChannel(channel_name)
if channel not in self._channels or not self._channels[channel].get("enabled"):
results[channel_name] = {"status": "skipped", "reason": "channel not configured"}
continue
result = self._send_to_channel(channel, message, level, title)
results[channel_name] = result
except ValueError:
results[channel_name] = {"status": "error", "reason": f"unknown channel: {channel_name}"}
except Exception as e:
logger.error(f"发送告警到 {channel_name} 失败: {e}")
results[channel_name] = {"status": "error", "reason": str(e)}
return results
def _send_to_channel(
self,
channel: AlertChannel,
message: str,
level: str,
title: str = None
) -> dict:
"""发送到指定渠道"""
import requests
if channel == AlertChannel.FEISHU:
return self._send_feishu(message, level, title)
elif channel == AlertChannel.WECHAT_WORK:
return self._send_wechat_work(message, level, title)
elif channel == AlertChannel.DINGTALK:
return self._send_dingtalk(message, level, title)
elif channel == AlertChannel.EMAIL:
return self._send_email(message, level, title)
return {"status": "unknown"}
def _send_feishu(self, message: str, level: str, title: str = None) -> dict:
"""发送飞书消息"""
import requests
webhook = self._channels[AlertChannel.FEISHU]["webhook"]
# 根据告警级别选择颜色
color_map = {
"info": "blue",
"warning": "yellow",
"critical": "red"
}
payload = {
"msg_type": "interactive",
"card": {
"header": {
"title": {
"tag": "plain_text",
"content": title or f"【{level.upper()}】TickDB 告警"
},
"template": color_map.get(level, "gray")
},
"elements": [
{
"tag": "div",
"text": {
"tag": "lark_md",
"content": message
}
},
{
"tag": "note",
"elements": [
{
"tag": "plain_text",
"content": f"触发时间: {time.strftime('%Y-%m-%d %H:%M:%S')}"
}
]
}
]
}
}
response = requests.post(webhook, json=payload, timeout=10)
return {"status": "success" if response.ok else "error", "response": response.text}
def _send_wechat_work(self, message: str, level: str, title: str = None) -> dict:
"""发送企业微信消息"""
import requests
webhook = self._channels[AlertChannel.WECHAT_WORK]["webhook"]
payload = {
"msgtype": "markdown",
"markdown": {
"content": f"### {title or f'【{level.upper()}】TickDB 告警'}\n{message}\n> 触发时间: {time.strftime('%Y-%m-%d %H:%M:%S')}"
}
}
response = requests.post(webhook, json=payload, timeout=10)
return {"status": "success" if response.ok else "error"}
def _send_dingtalk(self, message: str, level: str, title: str = None) -> dict:
"""发送钉钉消息"""
import requests
webhook = self._channels[AlertChannel.DINGTALK]["webhook"]
payload = {
"msgtype": "markdown",
"markdown": {
"title": title or f"{level.upper()} 告警",
"text": f"### {title or f'【{level.upper()}】TickDB 告警'}\n{message}\n> 触发时间: {time.strftime('%Y-%m-%d %H:%M:%S')}"
}
}
response = requests.post(webhook, json=payload, timeout=10)
return {"status": "success" if response.ok else "error"}
def _send_email(self, message: str, level: str, title: str = None) -> dict:
"""发送邮件"""
import smtplib
from email.mime.text import MIMEText
from email.header import Header
config = self._channels[AlertChannel.EMAIL]
msg = MIMEText(message, 'html', 'utf-8')
msg['Subject'] = Header(title or f"[{level.upper()}] TickDB 行情告警", 'utf-8')
msg['From'] = config["from_addr"]
msg['To'] = ",".join(config["to_addrs"])
try:
with smtplib.SMTP(config["host"], config["port"]) as server:
server.starttls()
server.login(config["username"], config["password"])
server.send_message(msg)
return {"status": "success"}
except Exception as e:
return {"status": "error", "reason": str(e)}
3.4 SKILL 主入口
# main.py
"""
Liquidity Monitor SKILL - 主入口
"""
import os
import time
import logging
import signal
import sys
from typing import List
from utils.tickdb_client import TickDBProductionClient
from utils.alert_client import AlertDispatcher
from functions.liquidity_detector import detect_liquidity_vacuum
from functions.state_manager import LiquidityStateManager
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class LiquidityMonitorSkill:
"""流动性监控 SKILL 主类"""
def __init__(self, symbols: List[str] = None):
"""
初始化 SKILL
Args:
symbols: 监控的标的列表,默认从环境变量 TICKDB_SYMBOLS 读取
"""
# 初始化配置
self.symbols = symbols or os.environ.get(
"TICKDB_SYMBOLS", "BTC.USDT,ETH.USDT,AAPL.US"
).split(",")
self.threshold_pressure_ratio = float(
os.environ.get("THRESHOLD_PRESSURE_RATIO", "2.5")
)
self.threshold_spread_bps = float(
os.environ.get("THRESHOLD_SPREAD_BPS", "30")
)
self.alert_channels = os.environ.get(
"ALERT_CHANNELS", "feishu"
).split(",")
# 初始化组件
self.tickdb = TickDBProductionClient()
self.alert_dispatcher = AlertDispatcher()
self.state_manager = LiquidityStateManager()
# 运行状态
self._running = False
logger.info(f"SKILL 初始化完成,监控标的: {self.symbols}")
logger.info(f"告警阈值 - 压力比: {self.threshold_pressure_ratio}, 价差: {self.threshold_spread_bps}bps")
logger.info(f"告警渠道: {self.alert_channels}")
def start(self):
"""启动 SKILL"""
self._running = True
# 注册信号处理
signal.signal(signal.SIGINT, self._signal_handler)
signal.signal(signal.SIGTERM, self._signal_handler)
logger.info("SKILL 启动,开始监控...")
try:
self.tickdb.subscribe_depth(
symbols=self.symbols,
callback=self._on_depth_update,
on_error=self._on_ws_error
)
except KeyboardInterrupt:
logger.info("收到键盘中断信号")
except Exception as e:
logger.error(f"SKILL 运行异常: {e}")
raise
finally:
self.stop()
def stop(self):
"""停止 SKILL"""
self._running = False
self.tickdb.close()
logger.info("SKILL 已停止")
def _signal_handler(self, signum, frame):
"""处理系统信号"""
logger.info(f"收到信号 {signum},准备停止...")
self._running = False
self.stop()
sys.exit(0)
def _on_depth_update(self, data: dict):
"""
深度数据更新回调
Args:
data: TickDB WebSocket 推送的数据
"""
try:
symbol = data.get("symbol")
depth = data.get("depth", {})
bids = depth.get("bids", [])
asks = depth.get("asks", [])
if not bids or not asks:
return
# 计算买卖压力比
bid_volume = sum(b.get("size", 0) for b in bids)
ask_volume = sum(a.get("size", 0) for a in asks)
pressure_ratio = ask_volume / bid_volume if bid_volume > 0 else float('inf')
# 计算买卖价差(BPS)
best_bid = bids[0].get("price", 0)
best_ask = asks[0].get("price", 0)
spread_bps = (best_ask - best_bid) / best_bid * 10000 if best_bid > 0 else 0
signal_data = {
"signal": "normal",
"pressure_ratio": pressure_ratio,
"spread_bps": spread_bps,
"recommendation": ""
}
# 信号判定
if pressure_ratio > 3.0 or spread_bps > 50:
signal_data["signal"] = "critical"
signal_data["recommendation"] = "建议减仓或启用对冲"
elif pressure_ratio > self.threshold_pressure_ratio or spread_bps > self.threshold_spread_bps:
signal_data["signal"] = "warning"
signal_data["recommendation"] = "关注流动性变化,准备应急方案"
# 更新状态
self.state_manager.update_state(symbol, signal_data)
# 检查是否需要告警
alert_info = self.state_manager.get_alert_trigger(symbol, {
"threshold_pressure_ratio": self.threshold_pressure_ratio,
"threshold_spread_bps": self.threshold_spread_bps
})
if alert_info.get("should_alert"):
self._send_alert(alert_info)
# 调试日志
if signal_data["signal"] != "normal":
logger.warning(
f"[{symbol}] 信号={signal_data['signal']}, "
f"压力比={pressure_ratio:.2f}, 价差={spread_bps:.1f}bps"
)
except Exception as e:
logger.error(f"处理深度数据异常: {e}")
def _send_alert(self, alert_info: dict):
"""发送告警"""
level_map = {
"warning": "warning",
"critical": "critical",
"critical_repeat": "critical"
}
results = self.alert_dispatcher.dispatch(
message=alert_info["message"],
level=level_map.get(alert_info["alert_level"], "warning"),
channels=self.alert_channels,
title="流动性异常告警"
)
for channel, result in results.items():
if result.get("status") == "success":
logger.info(f"告警已发送至 {channel}")
else:
logger.error(f"告警发送至 {channel} 失败: {result.get('reason')}")
def _on_ws_error(self, error: Exception):
"""WebSocket 错误处理"""
logger.error(f"WebSocket 连接错误: {error}")
# SKILL 入口点
def main():
"""SKILL 主入口"""
skill = LiquidityMonitorSkill()
skill.start()
if __name__ == "__main__":
main()
四、私有部署方案
4.1 部署架构
企业级 SKILL 的私有部署需要考虑数据合规、访问控制和高可用。以下是三种典型部署模式:
| 部署模式 | 适用场景 | 数据位置 | 复杂度 |
|---|---|---|---|
| 单机 Docker | 小团队验证、个人使用 | 数据出域 | ⭐ |
| Docker Compose | 团队内部、高可用 | 可选数据本地缓存 | ⭐⭐ |
| K8s 集群 | 大规模部署、多租户 | 完全私有化 | ⭐⭐⭐ |
4.2 Docker Compose 部署配置
# deploy/docker-compose.yml
version: '3.8'
services:
liquidity-monitor:
build:
context: ..
dockerfile: deploy/Dockerfile
container_name: liquidity-monitor
restart: unless-stopped
environment:
# TickDB 配置
- TICKDB_API_KEY=${TICKDB_API_KEY}
- TICKDB_SYMBOLS=${TICKDB_SYMBOLS:-BTC.USDT,ETH.USDT,AAPL.US}
- TICKDB_WS_URL=${TICKDB_WS_URL:-wss://ws.tickdb.ai}
- TICKDB_API_URL=${TICKDB_API_URL:-https://api.tickdb.ai}
# 告警阈值
- THRESHOLD_PRESSURE_RATIO=${THRESHOLD_PRESSURE_RATIO:-2.5}
- THRESHOLD_SPREAD_BPS=${THRESHOLD_SPREAD_BPS:-30}
# 告警渠道
- ALERT_CHANNELS=${ALERT_CHANNELS:-feishu}
- FEISHU_WEBHOOK_URL=${FEISHU_WEBHOOK_URL}
- WECHAT_WORK_WEBHOOK=${WECHAT_WORK_WEBHOOK}
- DINGTALK_WEBHOOK_URL=${DINGTALK_WEBHOOK_URL}
# 邮件配置
- SMTP_HOST=${SMTP_HOST}
- SMTP_PORT=${SMTP_PORT:-587}
- SMTP_USERNAME=${SMTP_USERNAME}
- SMTP_PASSWORD=${SMTP_PASSWORD}
- ALERT_FROM_ADDR=${ALERT_FROM_ADDR}
- ALERT_TO_ADDRS=${ALERT_TO_ADDRS}
volumes:
# 持久化状态数据
- ./state:/app/state
# 持久化日志
- ./logs:/app/logs
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8080/health"]
interval: 30s
timeout: 10s
retries: 3
start_period: 40s
networks:
- monitoring
# 可选:Prometheus 监控
prometheus:
image: prom/prometheus:latest
container_name: prometheus
restart: unless-stopped
ports:
- "9090:9090"
volumes:
- ./prometheus.yml:/etc/prometheus/prometheus.yml
- prometheus_data:/prometheus
networks:
- monitoring
profiles:
- monitoring
# 可选:Grafana 可视化
grafana:
image: grafana/grafana:latest
container_name: grafana
restart: unless-stopped
ports:
- "3000:3000"
environment:
- GF_SECURITY_ADMIN_PASSWORD=${GRAFANA_PASSWORD:-admin}
volumes:
- grafana_data:/var/lib/grafana
networks:
- monitoring
profiles:
- monitoring
networks:
monitoring:
driver: bridge
volumes:
prometheus_data:
grafana_data:
4.3 健康检查与监控
# healthcheck.py
"""
健康检查端点
用于 Docker healthcheck 和 K8s readiness probe
"""
from flask import Flask, jsonify
import psutil
import os
app = Flask(__name__)
@app.route('/health')
def health():
"""健康检查端点"""
# 检查进程状态
process = psutil.Process(os.getpid())
# 检查内存使用
memory_info = process.memory_info()
memory_mb = memory_info.rss / 1024 / 1024
# 检查 CPU 使用
cpu_percent = process.cpu_percent(interval=0.1)
return jsonify({
"status": "healthy",
"pid": os.getpid(),
"memory_mb": round(memory_mb, 2),
"cpu_percent": round(cpu_percent, 2),
"uptime": process.create_time()
})
@app.route('/ready')
def ready():
"""就绪检查端点"""
# 检查 TickDB 连接
from utils.tickdb_client import get_client
try:
client = get_client()
# 简单探测连接
return jsonify({
"status": "ready",
"tickdb_connected": client._connected if hasattr(client, '_connected') else True
})
except Exception as e:
return jsonify({
"status": "not_ready",
"error": str(e)
}), 503
if __name__ == "__main__":
app.run(host="0.0.0.0", port=8080)
五、价值对比:自定义 SKILL vs 其他方案
| 维度 | 自己从头开发 | 使用 TickDB SKILL |
|---|---|---|
| 开发周期 | 2-4 周 | 1-3 天 |
| 代码量 | 2000-5000 行 | 500-1000 行 |
| 重连/限频处理 | 需自行实现 | 原生支持 |
| 跨 SKILL 协作 | 无 | 支持 Function 复用 |
| 版本管理 | Git 管理 | SKILL Registry |
| 企业合规 | 自行处理 | 可私有部署,数据不出域 |
| 团队共享 | 复制代码 | 一次发布,全员同步 |
| 维护成本 | 高 | 低 |
结论:SKILL 不是要替代你的业务逻辑,而是要接管那些“每个项目都要写一遍”的基础设施代码——连接管理、错误处理、监控告警、状态持久化。让你的团队专注于真正的业务价值。
六、总结与下一步
核心要点回顾
- SKILL 协议是 TickDB 生态的扩展层,让企业能够在标准 API 基础上构建专属能力
- Function 扩展机制提供了声明式的业务逻辑注册方式,支持版本管理和跨 SKILL 复用
- 生产级代码规范(心跳、重连、限频、超时)是企业级系统的必备要素
- 私有部署确保数据合规,满足金融机构的安全要求
- Docker Compose 部署兼顾了简单性和可扩展性
立即行动
如果你是个人开发者:
- 访问 tickdb.ai 注册(免费 API Key)
- 复制本文的代码仓库
- 填写
.env配置,运行docker-compose up
如果你是企业团队:
- 联系 [email protected] 获取私有部署方案
- 与技术团队对接 SKILL 定制需求
- 安排 PoC(概念验证),验证数据合规和技术可行性
如果你习惯用 AI 辅助开发:
在 AI 助手中搜索安装 tickdb-market-data SKILL,可以直接用自然语言调用 TickDB 的各项能力。
风险提示:本文仅提供技术实现参考,不构成任何投资建议。企业级部署请咨询合规部门,确保符合当地监管要求。