让行情数据成为业务的延伸:TickDB SKILL 自定义开发完全指南

“最好的系统不是功能最多的,而是最容易被扩展的。”

这句话在量化交易领域尤为真实。当你的团队需要在 TickDB 标准接口之上构建专属的行情处理逻辑时,你面临的选择是:要么在业务层写一堆胶水代码,要么让数据层本身学会“理解”你的业务规则。

SKILL 协议给了你第二种选择。本文深入解析 TickDB SKILL 的开发规范、Function 扩展机制,以及企业级私有部署方案,让 TickDB 的行情数据能力真正成为你业务逻辑的原生组成部分。


为什么你需要的不仅是“更好的 API”

先看一个真实的业务场景。

某量化团队的行情架构是这样的:TickDB 提供标准 WebSocket 推送,数据进入 Kafka,消费端写一个复杂的流处理逻辑——解析订单簿快照、计算买卖压力比、判断流动性拐点、触发信号通知。这套架构运行三个月后,他们发现几个致命问题:

  • 数据转换逻辑散落在各处:Kafka Consumer 里写一段,Python 脚本里复制一段,告警模块又重复实现一次。当业务规则需要调整时,改一处漏三处。
  • 响应延迟叠加:数据从 TickDB 到达业务层至少经过 3-4 个环节,每次转换都有 10-50ms 的累积延迟。在事件驱动的场景下,这意味着你可能错过最佳入场窗口。
  • 维护成本线性增长:每接入一个新数据源或新指标,团队都要重新写一遍“获取数据→转换格式→计算指标→触发动作”的完整链路。

这是很多团队踩过的坑。本质原因是:标准 API 只能返回“数据”,不能返回“业务结论”。当你需要的是“当 XYZ 条件满足时通知我”,而不是“给我最新报价”,标准的 REST/WebSocket 接口就不够用了。

SKILL 协议正是为解决这个问题而设计。它不是另一个数据接口,而是将你的业务逻辑嵌入数据层的能力扩展框架


SKILL 协议的核心架构

协议设计哲学

SKILL 协议的核心设计哲学可以概括为一句话:让数据层理解意图,而不是让业务层处理数据

传统的 API 调用模式是拉取式

业务层 → "我要数据" → API → 返回原始数据 → 业务层处理

SKILL 的调用模式是意图式

业务层 → "帮我监控 X 场景" → SKILL → 内置业务逻辑处理 → 返回结论

这种模式带来的变化是根本性的。TickDB 的 SKILL 不再是一个只知道“返回什么”的数据管道,而是一个可以声明式定义业务规则的智能层。

SKILL 的三层架构

一个完整的 TickDB SKILL 由三层构成:

┌─────────────────────────────────────────────────────┐
│  描述层(YAML Manifest)                              │
│  - SKILL 元信息(名称、版本、作者)                   │
│  - Function 定义(参数、返回类型、说明文档)           │
│  - 数据源绑定(使用哪些 TickDB 频道/接口)             │
├─────────────────────────────────────────────────────┤
│  逻辑层(TypeScript/JavaScript)                      │
│  - Function 实现(业务规则、计算逻辑)                │
│  - 状态管理(监控上下文、历史窗口)                    │
│  - 依赖注入(其他 SKILL 的 Function 调用)            │
├─────────────────────────────────────────────────────┤
│  适配层(TickDB Runtime)                            │
│  - WebSocket 连接管理(心跳、重连、限频)              │
│  - API 鉴权(X-API-Key 注入)                         │
│  - 错误处理(code:3001 → retry_after)                │
└─────────────────────────────────────────────────────┘

描述层是 SKILL 的接口契约,定义了这个 SKILL 能做什么、如何调用。逻辑层是开发者编写业务规则的地方。适配层由 TickDB Runtime 提供,让开发者专注于业务逻辑,不用关心底层连接管理的细节。

Function:SKILL 的基本单元

Function 是 SKILL 的基本交互单元。每个 Function 代表一个独立的业务操作,可以是数据查询、实时监控、条件判断,或复合动作。

一个 Function 的完整定义包含以下字段:

functions:
  - name: monitor_liquidity_breakdown
    description: |
      监控订单簿流动性突变。当买卖压力比在 30 秒内从
      正常区间(0.5~2.0)突破至极端值时触发告警。
    parameters:
      symbol:
        type: string
        required: true
        description: 交易品种代码,如 "AAPL.US"
      threshold:
        type: number
        required: false
        default: 0.3
        description: 触发告警的压力比极端阈值
      window_seconds:
        type: number
        required: false
        default: 30
        description: 滑动窗口时长(秒)
    returns:
      type: object
      properties:
        triggered: boolean
        pressure_ratio: number
        volatility: number
        timestamp: string

这个定义直接暴露给 AI 助手,让 AI 能够理解在什么场景下应该调用这个 Function,以及如何构造参数。


企业级开发实战:从需求到部署

场景设定

假设你所在的机构需要构建一个“机构订单流分析 SKILL”,功能包括:

  1. 实时监控多档订单簿深度
  2. 计算机构大单压力指数(识别短时间内大量卖单冲击)
  3. 当指数触发阈值时,自动生成订阅请求并推送告警

这是一个典型的企业级需求:标准 TickDB API 提供的是原始数据,你需要的是经过业务逻辑加工后的可操作信号

第一步:项目初始化

创建一个 SKILL 项目的基本目录结构:

institutional-orderflow-skill/
├── skill.yaml              # SKILL 描述文件
├── src/
│   ├── functions/
│   │   ├── monitor_depth.ts
│   │   ├── calculate_pressure_index.ts
│   │   └── subscribe_alert.ts
│   ├── utils/
│   │   ├── depth_parser.ts
│   │   └── rolling_window.ts
│   └── index.ts            # 导出所有 Function
├── package.json
├── tsconfig.json
└── README.md

第二步:编写 SKILL 描述文件

skill.yaml 是 SKILL 的核心契约,定义元信息和所有 Function:

apiVersion: tickdb/skill/v1
name: institutional-orderflow
version: 1.0.0
author: Your Organization
description: |
  机构订单流分析套件。专注于大单冲击检测、流动性拐点识别
  和机构级行情监控。为量化团队和机构交易台设计。

dataSources:
  - channel: depth
    markets: [us, hk, crypto]
    description: 订单簿深度数据,用于计算买卖压力比
  - channel: kline
    markets: [us, hk, crypto]
    description: K 线数据,用于计算历史波动率基准

functions:
  - name: monitor_depth
    description: |
      实时监控订单簿深度变化,计算买卖压力比。
      适用于流动性异常检测场景。
    parameters:
      symbol:
        type: string
        required: true
        description: 交易品种代码
      levels:
        type: integer
        required: false
        default: 5
        description: 监控档位数(1-10)
      interval_ms:
        type: integer
        required: false
        default: 100
        description: 更新间隔(毫秒)

  - name: calculate_pressure_index
    description: |
      计算机构大单压力指数。识别短时间内大量卖单
      冲击导致的流动性真空。返回值包含指数值和
      历史分位数,用于判断当前状态的极端程度。
    parameters:
      symbol:
        type: string
        required: true
      window_seconds:
        type: integer
        required: false
        default: 60
        description: 计算窗口时长
      large_order_threshold:
        type: number
        required: false
        default: 50000
        description: 大单认定阈值(股数)

  - name: subscribe_alert
    description: |
      创建条件订阅。当 pressure_index 突破阈值时,
      自动通过 Webhook 或消息队列推送告警。
    parameters:
      conditions:
        type: object
        required: true
        description: 触发条件配置
      webhook_url:
        type: string
        required: true
        description: 告警接收地址

runtime:
  nodeVersion: ">=18.0.0"
  permissions:
    - tickdb:depth:subscribe
    - tickdb:kline:read
    - network:webhook:post

第三步:实现核心 Function

3.1 订单簿深度监控

这是基础的实时监控 Function,使用 TickDB WebSocket 订阅 depth 频道:

// src/functions/monitor_depth.ts
import WebSocket from 'ws';
import { EventEmitter } from 'events';

interface DepthConfig {
  symbol: string;
  levels: number;
  interval_ms: number;
}

interface DepthSnapshot {
  symbol: string;
  timestamp: number;
  bids: Array<{ price: number; volume: number }>;
  asks: Array<{ price: number; volume: number }>;
  spread: number;
  pressure_ratio: number;
}

// ⚠️ 生产环境建议使用 aiohttp/asyncio 或原生 WebSocket 心跳
// 本示例展示核心逻辑,生产部署需自行添加重连、退避逻辑
export async function monitorDepth(config: DepthConfig) {
  const { symbol, levels, interval_ms } = config;
  const apiKey = process.env.TICKDB_API_KEY;
  
  if (!apiKey) {
    throw new Error('TICKDB_API_KEY 环境变量未设置');
  }

  const ws = new WebSocket(
    `wss://api.tickdb.ai/v1/ws/depth?api_key=${apiKey}&symbol=${symbol}&levels=${levels}`
  );

  const emitter = new EventEmitter();
  let lastEmitTime = 0;

  ws.on('open', () => {
    console.log(`[${symbol}] WebSocket 连接已建立`);
  });

  ws.on('message', (data: WebSocket.Data) => {
    const now = Date.now();
    // 限频保护:控制输出频率
    if (now - lastEmitTime < interval_ms) {
      return;
    }
    lastEmitTime = now;

    try {
      const snapshot = parseDepthSnapshot(JSON.parse(data.toString()), symbol);
      emitter.emit('depth', snapshot);
    } catch (err) {
      console.error(`[${symbol}] 数据解析错误:`, err);
    }
  });

  ws.on('error', (err) => {
    console.error(`[${symbol}] WebSocket 错误:`, err.message);
  });

  // ping/pong 心跳保活
  ws.on('ping', () => {
    ws.pong();
  });

  return emitter;
}

function parseDepthSnapshot(raw: any, symbol: string): DepthSnapshot {
  const bids = raw.data?.bids || [];
  const asks = raw.data?.asks || [];

  const bidVolume = bids.reduce((sum: number, b: any) => sum + b.volume, 0);
  const askVolume = asks.reduce((sum: number, b: any) => sum + b.volume, 0);
  const pressure_ratio = askVolume > 0 ? bidVolume / askVolume : 1;

  return {
    symbol,
    timestamp: Date.now(),
    bids: bids.slice(0, 10),
    asks: asks.slice(0, 10),
    spread: bids[0] && asks[0] ? asks[0].price - bids[0].price : 0,
    pressure_ratio,
  };
}

3.2 机构大单压力指数计算

这个 Function 组合了多个数据源和滑动窗口逻辑,是典型的企业级业务规则实现:

// src/functions/calculate_pressure_index.ts
import { RollingWindow } from '../utils/rolling_window';

interface PressureConfig {
  symbol: string;
  window_seconds: number;
  large_order_threshold: number;
}

interface PressureResult {
  symbol: string;
  index: number;
  percentile: number;
  large_order_count: number;
  avg_order_size: number;
  timestamp: number;
}

// 模拟从 trades 接口获取逐笔成交数据
// 注意:trades 接口不支持美股和 A 股,此处仅作逻辑演示
async function getRecentTrades(symbol: string, seconds: number) {
  const apiKey = process.env.TICKDB_API_KEY;
  const endTime = Date.now();
  const startTime = endTime - seconds * 1000;

  const response = await fetch(
    `https://api.tickdb.ai/v1/market/trades?symbol=${symbol}&start_time=${startTime}&end_time=${endTime}&limit=1000`,
    {
      headers: {
        'X-API-Key': apiKey!,
        'Content-Type': 'application/json',
      },
      // ⚠️ 超时设置:HTTP 请求必须设置 timeout
      signal: AbortSignal.timeout(10000),
    }
  );

  if (!response.ok) {
    const error = await response.json();
    if (error.code === 3001) {
      const retryAfter = response.headers.get('Retry-After') || '5';
      console.warn(`限频触发,等待 ${retryAfter} 秒后重试`);
      await new Promise((r) => setTimeout(r, parseInt(retryAfter) * 1000));
      return getRecentTrades(symbol, seconds);
    }
    throw new Error(`API 错误 ${error.code}: ${error.message}`);
  }

  return response.json();
}

export async function calculatePressureIndex(config: PressureConfig) {
  const { symbol, window_seconds, large_order_threshold } = config;

  // 获取历史数据用于计算分位数基准
  const historicalWindow = new RollingWindow<number>(1000); // 1000 个周期历史
  const trades = await getRecentTrades(symbol, window_seconds);

  const largeOrders: Array<{ volume: number; timestamp: number }> = [];
  let totalVolume = 0;

  for (const trade of trades.data || []) {
    const volume = trade.volume || 0;
    totalVolume += volume;

    if (volume >= large_order_threshold) {
      largeOrders.push({ volume, timestamp: trade.timestamp });
    }
  }

  const orderCount = trades.data?.length || 1;
  const avgOrderSize = totalVolume / orderCount;

  // 机构大单压力指数计算
  // 逻辑:大量大单 + 高频成交 = 高压力
  const largeOrderRatio = largeOrders.length / orderCount;
  const volumeRatio = totalVolume / (window_seconds * 1000); // 每秒成交量
  const index = (largeOrderRatio * 0.6 + Math.min(volumeRatio / 1000, 1) * 0.4) * 100;

  // 计算历史分位数
  historicalWindow.push(index);
  const sorted = [...historicalWindow.data].sort((a, b) => a - b);
  const percentile = sorted.filter((v) => v <= index).length / sorted.length * 100;

  return {
    symbol,
    index: Math.round(index * 100) / 100,
    percentile: Math.round(percentile * 10) / 10,
    large_order_count: largeOrders.length,
    avg_order_size: Math.round(avgOrderSize * 100) / 100,
    timestamp: Date.now(),
  } as PressureResult;
}

3.3 条件订阅告警

最后一个 Function 展示了如何将业务规则转化为可执行的订阅逻辑:

// src/functions/subscribe_alert.ts
import { EventEmitter } from 'events';
import { monitorDepth } from './monitor_depth';
import { calculatePressureIndex } from './calculate_pressure_index';

interface AlertCondition {
  metric: 'pressure_ratio' | 'pressure_index' | 'spread_pct';
  operator: '>' | '<' | '>=' | '<=';
  threshold: number;
}

interface AlertConfig {
  symbol: string;
  conditions: AlertCondition[];
  webhook_url: string;
}

export async function subscribeAlert(config: AlertConfig) {
  const { symbol, conditions, webhook_url } = config;
  const apiKey = process.env.TICKDB_API_KEY;

  if (!apiKey) {
    throw new Error('TICKDB_API_KEY 环境变量未设置');
  }

  const emitter = new EventEmitter();

  // 启动深度监控
  const depthEmitter = await monitorDepth({
    symbol,
    levels: 5,
    interval_ms: 100,
  });

  // 同时启动压力指数计算(60 秒窗口)
  const pressureResult = await calculatePressureIndex({
    symbol,
    window_seconds: 60,
    large_order_threshold: 50000,
  });

  // 告警触发逻辑
  function evaluateConditions(pressureRatio: number, pressureIndex: number) {
    for (const condition of conditions) {
      let currentValue: number;

      if (condition.metric === 'pressure_ratio') {
        currentValue = pressureRatio;
      } else if (condition.metric === 'pressure_index') {
        currentValue = pressureIndex;
      } else {
        continue; // spread_pct 需要额外计算
      }

      const passed = evaluateOperator(
        currentValue,
        condition.operator,
        condition.threshold
      );

      if (passed) {
        triggerAlert({
          symbol,
          metric: condition.metric,
          value: currentValue,
          threshold: condition.threshold,
          pressure_index: pressureIndex,
          timestamp: Date.now(),
        });
      }
    }
  }

  function evaluateOperator(
    value: number,
    operator: string,
    threshold: number
  ): boolean {
    switch (operator) {
      case '>': return value > threshold;
      case '<': return value < threshold;
      case '>=': return value >= threshold;
      case '<=': return value <= threshold;
      default: return false;
    }
  }

  async function triggerAlert(payload: any) {
    console.log(`[${symbol}] 告警触发:`, JSON.stringify(payload));

    try {
      // 指数退避 + 抖动重试
      await postWithRetry(webhook_url, payload, 3);
      emitter.emit('alert', payload);
    } catch (err) {
      console.error(`[${symbol}] 告警发送失败:`, err);
    }
  }

  depthEmitter.on('depth', (snapshot) => {
    evaluateConditions(snapshot.pressure_ratio, pressureResult.index);
  });

  return emitter;
}

// 指数退避 + 抖动重试
async function postWithRetry(
  url: string,
  payload: any,
  maxRetries: number
): Promise<void> {
  const baseDelay = 1000;
  const maxDelay = 10000;

  for (let attempt = 0; attempt <= maxRetries; attempt++) {
    try {
      const response = await fetch(url, {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
          'User-Agent': 'TickDB-SKILL/1.0',
        },
        body: JSON.stringify(payload),
        signal: AbortSignal.timeout(5000),
      });

      if (response.ok) {
        return;
      }

      if (response.status === 429) {
        const retryAfter = response.headers.get('Retry-After') || '5';
        await sleep(parseInt(retryAfter) * 1000);
        continue;
      }

      throw new Error(`HTTP ${response.status}`);
    } catch (err) {
      if (attempt === maxRetries) {
        throw err;
      }

      const delay = Math.min(baseDelay * Math.pow(2, attempt), maxDelay);
      const jitter = Math.random() * delay * 0.1; // 10% 抖动
      await sleep(delay + jitter);
    }
  }
}

function sleep(ms: number): Promise<void> {
  return new Promise((resolve) => setTimeout(resolve, ms));
}

第四步:导出与打包

src/index.ts 汇总所有 Function:

// src/index.ts
export { monitorDepth } from './functions/monitor_depth';
export { calculatePressureIndex } from './functions/calculate_pressure_index';
export { subscribeAlert } from './functions/subscribe_alert';

打包时,使用 TypeScript 编译为 JavaScript,并生成 SKILL Bundle:

# 安装依赖
npm install

# 编译 TypeScript
npx tsc

# 打包 SKILL(需要 tickdb-cli)
npx tickdb skill build --input ./dist --output ./institutional-orderflow-v1.0.0.skill

企业私有部署方案

三种部署模式

TickDB SKILL 支持三种部署模式,满足从个人开发者到大型机构的不同需求:

模式 适用场景 部署复杂度 数据主权 成本
云端托管 个人开发者、SaaS 化产品 极低(一键发布) 共享 TickDB 云 按调用量计费
私有集群 中型机构、合规要求 中等(K8s 部署) 数据不出本地 固定年费 + 用量
完全私有 大型机构、核心系统 高(自建全套) 完全自主 项目制

私有集群部署架构

对于需要数据主权但不想自建全套基础设施的机构,私有集群模式是最佳选择:

┌──────────────────────────────────────────────────────────────┐
│                      机构内网                                 │
│  ┌─────────────┐    ┌─────────────┐    ┌─────────────┐       │
│  │ 量化终端    │    │ 风控系统    │    │ 策略回测    │       │
│  └──────┬──────┘    └──────┬──────┘    └──────┬──────┘       │
│         │                  │                  │              │
│         └──────────────────┼──────────────────┘              │
│                            │                                 │
│                    ┌───────▼───────┐                         │
│                    │  TickDB      │                         │
│                    │  私有集群    │                         │
│                    │  (K8s)       │                         │
│                    └───────┬───────┘                         │
│                            │                                 │
│         ┌──────────────────┼──────────────────┐              │
│         │                  │                  │              │
│  ┌──────▼──────┐    ┌──────▼──────┐    ┌──────▼──────┐       │
│  │ SKILL A    │    │ SKILL B    │    │ SKILL C    │       │
│  │ 订单流分析 │    │ 波动率监控 │    │ 自定义信号 │       │
│  └────────────┘    └────────────┘    └────────────┘       │
└──────────────────────────────────────────────────────────────┘

私有集群由 TickDB 提供标准化的 K8s Operator,包含:

  • 自动扩缩容:根据调用量动态调整 SKILL 实例数
  • 滚动更新:SKILL 升级时零 downtime
  • 资源隔离:不同 SKILL 之间的计算资源相互隔离
  • 审计日志:所有 Function 调用记录到本地日志系统

完全私有化部署

对于顶级机构(如对冲基金的主经纪商、交易所技术团队),完全私有化部署提供最高等级的数据控制和定制能力:

# docker-compose.yml 典型配置
version: '3.8'
services:
  tickdb-core:
    image: tickdb/enterprise-core:latest
    environment:
      - LICENSE_KEY=${TICKDB_LICENSE}
      - DATA_RETENTION_DAYS=3650
      - REDIS_MODE=cluster
    volumes:
      - tickdb-data:/data
      - tickdb-logs:/logs

  tickdb-skill-runtime:
    image: tickdb/enterprise-skill-runtime:latest
    environment:
      - CORE_ENDPOINT=tickdb-core:8080
      - SKILL_REGISTRY=/registry
      - WORKER_COUNT=16
    volumes:
      - ./custom-skills:/registry
      - /var/run/docker.sock:/var/run/docker.sock

  tickdb-cache:
    image: redis:7-alpine
    command: redis-server --maxmemory 32gb --maxmemory-policy allkeys-lru

完全私有化部署的特点:

  • TickDB Core 完全运行在机构防火墙内
  • 支持接入机构专有的数据源(如自有做市数据)
  • SKILL Runtime 支持本地 Docker 镜像,无外部依赖
  • 年度许可制,包含 SLA 保障和技术支持

TickDB SKILL 与替代方案能力对比

能力维度 自研胶水代码 开源框架(如 custom-llm-tools) TickDB SKILL
接入复杂度 高(需要自建连接管理) 中(需适配协议) 低(一键安装)
业务逻辑复用 差(代码分散) 中(需自行设计模块化) 好(SKILL 可发布和复用)
实时性 依赖实现质量 依赖实现质量 原生优化(<100ms)
运维成本 高(自维护) 中(社区支持) 低(TickDB 托管)
企业级特性 需自行实现(限频、重连、审计) 部分支持 原生支持
AI 助手集成 不支持 有限支持 原生支持(声明式 Function 定义)
私有部署 无约束 需自行解决依赖 支持(私有集群/完全私有)

下一步行动

如果你希望快速验证本文示例

  1. 访问 tickdb.ai 注册(免费,无需信用卡)
  2. 在控制台生成 API Key
  3. 设置环境变量 TICKDB_API_KEY,克隆示例代码仓库
  4. 运行 npx tickdb skill init 初始化你的第一个 SKILL

如果你正在评估企业级解决方案

  • 联系 [email protected] 获取私有集群部署方案
  • 预约技术架构师,了解 SKILL 协议与企业现有系统的集成路径
  • 申请 30 天企业试用,包含完整的私有部署环境和现场支持

如果你希望在 AI 助手中直接使用

  • 在 AI 助手中搜索并安装 tickdb-market-data SKILL
  • 通过自然语言描述你的监控需求,AI 将自动调用对应的 Function

风险提示:本文不构成任何投资建议。SKILL 的自定义逻辑需自行确保业务规则的有效性,建议在回测环境中充分验证后再投入生产使用。市场有风险,投资需谨慎。