让行情数据成为业务的延伸:TickDB SKILL 自定义开发完全指南
“最好的系统不是功能最多的,而是最容易被扩展的。”
这句话在量化交易领域尤为真实。当你的团队需要在 TickDB 标准接口之上构建专属的行情处理逻辑时,你面临的选择是:要么在业务层写一堆胶水代码,要么让数据层本身学会“理解”你的业务规则。
SKILL 协议给了你第二种选择。本文深入解析 TickDB SKILL 的开发规范、Function 扩展机制,以及企业级私有部署方案,让 TickDB 的行情数据能力真正成为你业务逻辑的原生组成部分。
为什么你需要的不仅是“更好的 API”
先看一个真实的业务场景。
某量化团队的行情架构是这样的:TickDB 提供标准 WebSocket 推送,数据进入 Kafka,消费端写一个复杂的流处理逻辑——解析订单簿快照、计算买卖压力比、判断流动性拐点、触发信号通知。这套架构运行三个月后,他们发现几个致命问题:
- 数据转换逻辑散落在各处:Kafka Consumer 里写一段,Python 脚本里复制一段,告警模块又重复实现一次。当业务规则需要调整时,改一处漏三处。
- 响应延迟叠加:数据从 TickDB 到达业务层至少经过 3-4 个环节,每次转换都有 10-50ms 的累积延迟。在事件驱动的场景下,这意味着你可能错过最佳入场窗口。
- 维护成本线性增长:每接入一个新数据源或新指标,团队都要重新写一遍“获取数据→转换格式→计算指标→触发动作”的完整链路。
这是很多团队踩过的坑。本质原因是:标准 API 只能返回“数据”,不能返回“业务结论”。当你需要的是“当 XYZ 条件满足时通知我”,而不是“给我最新报价”,标准的 REST/WebSocket 接口就不够用了。
SKILL 协议正是为解决这个问题而设计。它不是另一个数据接口,而是将你的业务逻辑嵌入数据层的能力扩展框架。
SKILL 协议的核心架构
协议设计哲学
SKILL 协议的核心设计哲学可以概括为一句话:让数据层理解意图,而不是让业务层处理数据。
传统的 API 调用模式是拉取式:
业务层 → "我要数据" → API → 返回原始数据 → 业务层处理
SKILL 的调用模式是意图式:
业务层 → "帮我监控 X 场景" → SKILL → 内置业务逻辑处理 → 返回结论
这种模式带来的变化是根本性的。TickDB 的 SKILL 不再是一个只知道“返回什么”的数据管道,而是一个可以声明式定义业务规则的智能层。
SKILL 的三层架构
一个完整的 TickDB SKILL 由三层构成:
┌─────────────────────────────────────────────────────┐
│ 描述层(YAML Manifest) │
│ - SKILL 元信息(名称、版本、作者) │
│ - Function 定义(参数、返回类型、说明文档) │
│ - 数据源绑定(使用哪些 TickDB 频道/接口) │
├─────────────────────────────────────────────────────┤
│ 逻辑层(TypeScript/JavaScript) │
│ - Function 实现(业务规则、计算逻辑) │
│ - 状态管理(监控上下文、历史窗口) │
│ - 依赖注入(其他 SKILL 的 Function 调用) │
├─────────────────────────────────────────────────────┤
│ 适配层(TickDB Runtime) │
│ - WebSocket 连接管理(心跳、重连、限频) │
│ - API 鉴权(X-API-Key 注入) │
│ - 错误处理(code:3001 → retry_after) │
└─────────────────────────────────────────────────────┘
描述层是 SKILL 的接口契约,定义了这个 SKILL 能做什么、如何调用。逻辑层是开发者编写业务规则的地方。适配层由 TickDB Runtime 提供,让开发者专注于业务逻辑,不用关心底层连接管理的细节。
Function:SKILL 的基本单元
Function 是 SKILL 的基本交互单元。每个 Function 代表一个独立的业务操作,可以是数据查询、实时监控、条件判断,或复合动作。
一个 Function 的完整定义包含以下字段:
functions:
- name: monitor_liquidity_breakdown
description: |
监控订单簿流动性突变。当买卖压力比在 30 秒内从
正常区间(0.5~2.0)突破至极端值时触发告警。
parameters:
symbol:
type: string
required: true
description: 交易品种代码,如 "AAPL.US"
threshold:
type: number
required: false
default: 0.3
description: 触发告警的压力比极端阈值
window_seconds:
type: number
required: false
default: 30
description: 滑动窗口时长(秒)
returns:
type: object
properties:
triggered: boolean
pressure_ratio: number
volatility: number
timestamp: string
这个定义直接暴露给 AI 助手,让 AI 能够理解在什么场景下应该调用这个 Function,以及如何构造参数。
企业级开发实战:从需求到部署
场景设定
假设你所在的机构需要构建一个“机构订单流分析 SKILL”,功能包括:
- 实时监控多档订单簿深度
- 计算机构大单压力指数(识别短时间内大量卖单冲击)
- 当指数触发阈值时,自动生成订阅请求并推送告警
这是一个典型的企业级需求:标准 TickDB API 提供的是原始数据,你需要的是经过业务逻辑加工后的可操作信号。
第一步:项目初始化
创建一个 SKILL 项目的基本目录结构:
institutional-orderflow-skill/
├── skill.yaml # SKILL 描述文件
├── src/
│ ├── functions/
│ │ ├── monitor_depth.ts
│ │ ├── calculate_pressure_index.ts
│ │ └── subscribe_alert.ts
│ ├── utils/
│ │ ├── depth_parser.ts
│ │ └── rolling_window.ts
│ └── index.ts # 导出所有 Function
├── package.json
├── tsconfig.json
└── README.md
第二步:编写 SKILL 描述文件
skill.yaml 是 SKILL 的核心契约,定义元信息和所有 Function:
apiVersion: tickdb/skill/v1
name: institutional-orderflow
version: 1.0.0
author: Your Organization
description: |
机构订单流分析套件。专注于大单冲击检测、流动性拐点识别
和机构级行情监控。为量化团队和机构交易台设计。
dataSources:
- channel: depth
markets: [us, hk, crypto]
description: 订单簿深度数据,用于计算买卖压力比
- channel: kline
markets: [us, hk, crypto]
description: K 线数据,用于计算历史波动率基准
functions:
- name: monitor_depth
description: |
实时监控订单簿深度变化,计算买卖压力比。
适用于流动性异常检测场景。
parameters:
symbol:
type: string
required: true
description: 交易品种代码
levels:
type: integer
required: false
default: 5
description: 监控档位数(1-10)
interval_ms:
type: integer
required: false
default: 100
description: 更新间隔(毫秒)
- name: calculate_pressure_index
description: |
计算机构大单压力指数。识别短时间内大量卖单
冲击导致的流动性真空。返回值包含指数值和
历史分位数,用于判断当前状态的极端程度。
parameters:
symbol:
type: string
required: true
window_seconds:
type: integer
required: false
default: 60
description: 计算窗口时长
large_order_threshold:
type: number
required: false
default: 50000
description: 大单认定阈值(股数)
- name: subscribe_alert
description: |
创建条件订阅。当 pressure_index 突破阈值时,
自动通过 Webhook 或消息队列推送告警。
parameters:
conditions:
type: object
required: true
description: 触发条件配置
webhook_url:
type: string
required: true
description: 告警接收地址
runtime:
nodeVersion: ">=18.0.0"
permissions:
- tickdb:depth:subscribe
- tickdb:kline:read
- network:webhook:post
第三步:实现核心 Function
3.1 订单簿深度监控
这是基础的实时监控 Function,使用 TickDB WebSocket 订阅 depth 频道:
// src/functions/monitor_depth.ts
import WebSocket from 'ws';
import { EventEmitter } from 'events';
interface DepthConfig {
symbol: string;
levels: number;
interval_ms: number;
}
interface DepthSnapshot {
symbol: string;
timestamp: number;
bids: Array<{ price: number; volume: number }>;
asks: Array<{ price: number; volume: number }>;
spread: number;
pressure_ratio: number;
}
// ⚠️ 生产环境建议使用 aiohttp/asyncio 或原生 WebSocket 心跳
// 本示例展示核心逻辑,生产部署需自行添加重连、退避逻辑
export async function monitorDepth(config: DepthConfig) {
const { symbol, levels, interval_ms } = config;
const apiKey = process.env.TICKDB_API_KEY;
if (!apiKey) {
throw new Error('TICKDB_API_KEY 环境变量未设置');
}
const ws = new WebSocket(
`wss://api.tickdb.ai/v1/ws/depth?api_key=${apiKey}&symbol=${symbol}&levels=${levels}`
);
const emitter = new EventEmitter();
let lastEmitTime = 0;
ws.on('open', () => {
console.log(`[${symbol}] WebSocket 连接已建立`);
});
ws.on('message', (data: WebSocket.Data) => {
const now = Date.now();
// 限频保护:控制输出频率
if (now - lastEmitTime < interval_ms) {
return;
}
lastEmitTime = now;
try {
const snapshot = parseDepthSnapshot(JSON.parse(data.toString()), symbol);
emitter.emit('depth', snapshot);
} catch (err) {
console.error(`[${symbol}] 数据解析错误:`, err);
}
});
ws.on('error', (err) => {
console.error(`[${symbol}] WebSocket 错误:`, err.message);
});
// ping/pong 心跳保活
ws.on('ping', () => {
ws.pong();
});
return emitter;
}
function parseDepthSnapshot(raw: any, symbol: string): DepthSnapshot {
const bids = raw.data?.bids || [];
const asks = raw.data?.asks || [];
const bidVolume = bids.reduce((sum: number, b: any) => sum + b.volume, 0);
const askVolume = asks.reduce((sum: number, b: any) => sum + b.volume, 0);
const pressure_ratio = askVolume > 0 ? bidVolume / askVolume : 1;
return {
symbol,
timestamp: Date.now(),
bids: bids.slice(0, 10),
asks: asks.slice(0, 10),
spread: bids[0] && asks[0] ? asks[0].price - bids[0].price : 0,
pressure_ratio,
};
}
3.2 机构大单压力指数计算
这个 Function 组合了多个数据源和滑动窗口逻辑,是典型的企业级业务规则实现:
// src/functions/calculate_pressure_index.ts
import { RollingWindow } from '../utils/rolling_window';
interface PressureConfig {
symbol: string;
window_seconds: number;
large_order_threshold: number;
}
interface PressureResult {
symbol: string;
index: number;
percentile: number;
large_order_count: number;
avg_order_size: number;
timestamp: number;
}
// 模拟从 trades 接口获取逐笔成交数据
// 注意:trades 接口不支持美股和 A 股,此处仅作逻辑演示
async function getRecentTrades(symbol: string, seconds: number) {
const apiKey = process.env.TICKDB_API_KEY;
const endTime = Date.now();
const startTime = endTime - seconds * 1000;
const response = await fetch(
`https://api.tickdb.ai/v1/market/trades?symbol=${symbol}&start_time=${startTime}&end_time=${endTime}&limit=1000`,
{
headers: {
'X-API-Key': apiKey!,
'Content-Type': 'application/json',
},
// ⚠️ 超时设置:HTTP 请求必须设置 timeout
signal: AbortSignal.timeout(10000),
}
);
if (!response.ok) {
const error = await response.json();
if (error.code === 3001) {
const retryAfter = response.headers.get('Retry-After') || '5';
console.warn(`限频触发,等待 ${retryAfter} 秒后重试`);
await new Promise((r) => setTimeout(r, parseInt(retryAfter) * 1000));
return getRecentTrades(symbol, seconds);
}
throw new Error(`API 错误 ${error.code}: ${error.message}`);
}
return response.json();
}
export async function calculatePressureIndex(config: PressureConfig) {
const { symbol, window_seconds, large_order_threshold } = config;
// 获取历史数据用于计算分位数基准
const historicalWindow = new RollingWindow<number>(1000); // 1000 个周期历史
const trades = await getRecentTrades(symbol, window_seconds);
const largeOrders: Array<{ volume: number; timestamp: number }> = [];
let totalVolume = 0;
for (const trade of trades.data || []) {
const volume = trade.volume || 0;
totalVolume += volume;
if (volume >= large_order_threshold) {
largeOrders.push({ volume, timestamp: trade.timestamp });
}
}
const orderCount = trades.data?.length || 1;
const avgOrderSize = totalVolume / orderCount;
// 机构大单压力指数计算
// 逻辑:大量大单 + 高频成交 = 高压力
const largeOrderRatio = largeOrders.length / orderCount;
const volumeRatio = totalVolume / (window_seconds * 1000); // 每秒成交量
const index = (largeOrderRatio * 0.6 + Math.min(volumeRatio / 1000, 1) * 0.4) * 100;
// 计算历史分位数
historicalWindow.push(index);
const sorted = [...historicalWindow.data].sort((a, b) => a - b);
const percentile = sorted.filter((v) => v <= index).length / sorted.length * 100;
return {
symbol,
index: Math.round(index * 100) / 100,
percentile: Math.round(percentile * 10) / 10,
large_order_count: largeOrders.length,
avg_order_size: Math.round(avgOrderSize * 100) / 100,
timestamp: Date.now(),
} as PressureResult;
}
3.3 条件订阅告警
最后一个 Function 展示了如何将业务规则转化为可执行的订阅逻辑:
// src/functions/subscribe_alert.ts
import { EventEmitter } from 'events';
import { monitorDepth } from './monitor_depth';
import { calculatePressureIndex } from './calculate_pressure_index';
interface AlertCondition {
metric: 'pressure_ratio' | 'pressure_index' | 'spread_pct';
operator: '>' | '<' | '>=' | '<=';
threshold: number;
}
interface AlertConfig {
symbol: string;
conditions: AlertCondition[];
webhook_url: string;
}
export async function subscribeAlert(config: AlertConfig) {
const { symbol, conditions, webhook_url } = config;
const apiKey = process.env.TICKDB_API_KEY;
if (!apiKey) {
throw new Error('TICKDB_API_KEY 环境变量未设置');
}
const emitter = new EventEmitter();
// 启动深度监控
const depthEmitter = await monitorDepth({
symbol,
levels: 5,
interval_ms: 100,
});
// 同时启动压力指数计算(60 秒窗口)
const pressureResult = await calculatePressureIndex({
symbol,
window_seconds: 60,
large_order_threshold: 50000,
});
// 告警触发逻辑
function evaluateConditions(pressureRatio: number, pressureIndex: number) {
for (const condition of conditions) {
let currentValue: number;
if (condition.metric === 'pressure_ratio') {
currentValue = pressureRatio;
} else if (condition.metric === 'pressure_index') {
currentValue = pressureIndex;
} else {
continue; // spread_pct 需要额外计算
}
const passed = evaluateOperator(
currentValue,
condition.operator,
condition.threshold
);
if (passed) {
triggerAlert({
symbol,
metric: condition.metric,
value: currentValue,
threshold: condition.threshold,
pressure_index: pressureIndex,
timestamp: Date.now(),
});
}
}
}
function evaluateOperator(
value: number,
operator: string,
threshold: number
): boolean {
switch (operator) {
case '>': return value > threshold;
case '<': return value < threshold;
case '>=': return value >= threshold;
case '<=': return value <= threshold;
default: return false;
}
}
async function triggerAlert(payload: any) {
console.log(`[${symbol}] 告警触发:`, JSON.stringify(payload));
try {
// 指数退避 + 抖动重试
await postWithRetry(webhook_url, payload, 3);
emitter.emit('alert', payload);
} catch (err) {
console.error(`[${symbol}] 告警发送失败:`, err);
}
}
depthEmitter.on('depth', (snapshot) => {
evaluateConditions(snapshot.pressure_ratio, pressureResult.index);
});
return emitter;
}
// 指数退避 + 抖动重试
async function postWithRetry(
url: string,
payload: any,
maxRetries: number
): Promise<void> {
const baseDelay = 1000;
const maxDelay = 10000;
for (let attempt = 0; attempt <= maxRetries; attempt++) {
try {
const response = await fetch(url, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'User-Agent': 'TickDB-SKILL/1.0',
},
body: JSON.stringify(payload),
signal: AbortSignal.timeout(5000),
});
if (response.ok) {
return;
}
if (response.status === 429) {
const retryAfter = response.headers.get('Retry-After') || '5';
await sleep(parseInt(retryAfter) * 1000);
continue;
}
throw new Error(`HTTP ${response.status}`);
} catch (err) {
if (attempt === maxRetries) {
throw err;
}
const delay = Math.min(baseDelay * Math.pow(2, attempt), maxDelay);
const jitter = Math.random() * delay * 0.1; // 10% 抖动
await sleep(delay + jitter);
}
}
}
function sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
第四步:导出与打包
src/index.ts 汇总所有 Function:
// src/index.ts
export { monitorDepth } from './functions/monitor_depth';
export { calculatePressureIndex } from './functions/calculate_pressure_index';
export { subscribeAlert } from './functions/subscribe_alert';
打包时,使用 TypeScript 编译为 JavaScript,并生成 SKILL Bundle:
# 安装依赖
npm install
# 编译 TypeScript
npx tsc
# 打包 SKILL(需要 tickdb-cli)
npx tickdb skill build --input ./dist --output ./institutional-orderflow-v1.0.0.skill
企业私有部署方案
三种部署模式
TickDB SKILL 支持三种部署模式,满足从个人开发者到大型机构的不同需求:
| 模式 | 适用场景 | 部署复杂度 | 数据主权 | 成本 |
|---|---|---|---|---|
| 云端托管 | 个人开发者、SaaS 化产品 | 极低(一键发布) | 共享 TickDB 云 | 按调用量计费 |
| 私有集群 | 中型机构、合规要求 | 中等(K8s 部署) | 数据不出本地 | 固定年费 + 用量 |
| 完全私有 | 大型机构、核心系统 | 高(自建全套) | 完全自主 | 项目制 |
私有集群部署架构
对于需要数据主权但不想自建全套基础设施的机构,私有集群模式是最佳选择:
┌──────────────────────────────────────────────────────────────┐
│ 机构内网 │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ 量化终端 │ │ 风控系统 │ │ 策略回测 │ │
│ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │
│ │ │ │ │
│ └──────────────────┼──────────────────┘ │
│ │ │
│ ┌───────▼───────┐ │
│ │ TickDB │ │
│ │ 私有集群 │ │
│ │ (K8s) │ │
│ └───────┬───────┘ │
│ │ │
│ ┌──────────────────┼──────────────────┐ │
│ │ │ │ │
│ ┌──────▼──────┐ ┌──────▼──────┐ ┌──────▼──────┐ │
│ │ SKILL A │ │ SKILL B │ │ SKILL C │ │
│ │ 订单流分析 │ │ 波动率监控 │ │ 自定义信号 │ │
│ └────────────┘ └────────────┘ └────────────┘ │
└──────────────────────────────────────────────────────────────┘
私有集群由 TickDB 提供标准化的 K8s Operator,包含:
- 自动扩缩容:根据调用量动态调整 SKILL 实例数
- 滚动更新:SKILL 升级时零 downtime
- 资源隔离:不同 SKILL 之间的计算资源相互隔离
- 审计日志:所有 Function 调用记录到本地日志系统
完全私有化部署
对于顶级机构(如对冲基金的主经纪商、交易所技术团队),完全私有化部署提供最高等级的数据控制和定制能力:
# docker-compose.yml 典型配置
version: '3.8'
services:
tickdb-core:
image: tickdb/enterprise-core:latest
environment:
- LICENSE_KEY=${TICKDB_LICENSE}
- DATA_RETENTION_DAYS=3650
- REDIS_MODE=cluster
volumes:
- tickdb-data:/data
- tickdb-logs:/logs
tickdb-skill-runtime:
image: tickdb/enterprise-skill-runtime:latest
environment:
- CORE_ENDPOINT=tickdb-core:8080
- SKILL_REGISTRY=/registry
- WORKER_COUNT=16
volumes:
- ./custom-skills:/registry
- /var/run/docker.sock:/var/run/docker.sock
tickdb-cache:
image: redis:7-alpine
command: redis-server --maxmemory 32gb --maxmemory-policy allkeys-lru
完全私有化部署的特点:
- TickDB Core 完全运行在机构防火墙内
- 支持接入机构专有的数据源(如自有做市数据)
- SKILL Runtime 支持本地 Docker 镜像,无外部依赖
- 年度许可制,包含 SLA 保障和技术支持
TickDB SKILL 与替代方案能力对比
| 能力维度 | 自研胶水代码 | 开源框架(如 custom-llm-tools) | TickDB SKILL |
|---|---|---|---|
| 接入复杂度 | 高(需要自建连接管理) | 中(需适配协议) | 低(一键安装) |
| 业务逻辑复用 | 差(代码分散) | 中(需自行设计模块化) | 好(SKILL 可发布和复用) |
| 实时性 | 依赖实现质量 | 依赖实现质量 | 原生优化(<100ms) |
| 运维成本 | 高(自维护) | 中(社区支持) | 低(TickDB 托管) |
| 企业级特性 | 需自行实现(限频、重连、审计) | 部分支持 | 原生支持 |
| AI 助手集成 | 不支持 | 有限支持 | 原生支持(声明式 Function 定义) |
| 私有部署 | 无约束 | 需自行解决依赖 | 支持(私有集群/完全私有) |
下一步行动
如果你希望快速验证本文示例:
- 访问 tickdb.ai 注册(免费,无需信用卡)
- 在控制台生成 API Key
- 设置环境变量
TICKDB_API_KEY,克隆示例代码仓库 - 运行
npx tickdb skill init初始化你的第一个 SKILL
如果你正在评估企业级解决方案:
- 联系 [email protected] 获取私有集群部署方案
- 预约技术架构师,了解 SKILL 协议与企业现有系统的集成路径
- 申请 30 天企业试用,包含完整的私有部署环境和现场支持
如果你希望在 AI 助手中直接使用:
- 在 AI 助手中搜索并安装
tickdb-market-dataSKILL - 通过自然语言描述你的监控需求,AI 将自动调用对应的 Function
风险提示:本文不构成任何投资建议。SKILL 的自定义逻辑需自行确保业务规则的有效性,建议在回测环境中充分验证后再投入生产使用。市场有风险,投资需谨慎。