企业级量化数据能力扩展:从数据孤岛到统一行情平台
“你团队的行情数据,有多少个'烟囱'?”
每家量化机构都有自己的数据故事。有的团队用 Polygon 做美股,用 Binance 接入加密货币,用自有爬虫抓港股 level 2——三个月后,他们的行情系统里躺着 4 个 API 密钥、3 套鉴权逻辑、2 种数据格式,和无数个凌晨 3 点因为某个数据源宕机而被叫醒的夜晚。
这不是能力问题,是架构问题。
当团队从单市场、单策略扩展到多资产、跨时区、7×24 小时运行时,数据层的复杂度会以指数级增长。一个合格的行情平台,应该让上层应用感知不到底层数据源的差异——无论数据来自哪里,最终呈现给策略的是一个统一、可靠、可扩展的抽象层。
这正是 TickDB SKILL 协议设计的核心目标。本文深入拆解 SKILL 生态的扩展机制,从协议规范到代码实现,帮助技术负责人理解如何在 TickDB 基础上构建企业级的专属行情能力。
一、为什么量化团队需要 SKILL 扩展
1.1 数据孤岛的典型症状
在引入 SKILL 扩展之前,很多量化团队的数据架构呈现以下特征:
| 症状 | 具体表现 |
|---|---|
| 接口碎片化 | 每个数据源一套 SDK,代码库中散落着 polygon_client、binance_connector、tushare_wrapper |
| 鉴权复杂度 | API Key 分散在环境变量、配置文件、甚至代码注释里 |
| 容错孤岛 | 每个数据源有自己的重连逻辑,但彼此不兼容 |
| 数据格式不统一 | 时间戳有时区差异,价格有精度差异,订单簿有档位差异 |
| 扩展成本高 | 每接入一个新数据源,需要重复开发连接、鉴权、容错模块 |
这些症状的根本原因不是技术能力不足,而是缺乏统一的抽象层。
1.2 SKILL 扩展的解决思路
SKILL 协议的本质是一个标准化的扩展契约。它定义了三个核心能力:
| 能力 | 说明 |
|---|---|
| Function 扩展 | 自定义业务函数,注册到 AI 助手的工具链中 |
| 数据源绑定 | 将私有数据源或第三方 API 封装为标准化的数据获取接口 |
| 执行上下文 | 在安全的沙箱环境中执行用户代码,可访问 TickDB 核心能力 |
通过 SKILL 扩展,团队可以将:
- 自有历史数据库封装为
get_historical_data - 私募数据供应商接入封装为
get_alternative_data - 内部因子库封装为
apply_custom_factors - 跨市场数据清洗逻辑封装为
normalize_market_data
最终,AI 助手在执行任务时,会自动选择合适的 SKILL Function,就像调用本地函数一样自然。
二、TickDB SKILL 协议架构解析
2.1 协议核心组件
一个完整的 SKILL 包由以下组件构成:
my-custom-skill/
├── manifest.json # SKILL 元数据定义
├── functions/ # Function 扩展目录
│ ├── __init__.py
│ ├── get_equity_depth.py
│ ├── apply_factor.py
│ └── normalize_data.py
├── handlers/ # 事件处理(可选)
│ └── on_market_event.py
├── config.yaml # SKILL 私有配置
├── requirements.txt # 依赖声明
└── README.md # 使用文档
2.2 manifest.json:扩展的契约定义
manifest.json 是 SKILL 的核心配置文件,AI 助手通过它理解这个 SKILL 能做什么、接受什么参数、返回什么格式。
{
"name": "enterprise-market-data",
"version": "1.0.0",
"description": "企业级行情数据扩展:支持私有数据源、跨市场归一化、自定义因子",
"author": "QuantTeam-Alpha",
"tickdb_version": ">=2.0.0",
"capabilities": {
"functions": [
{
"name": "get_equity_depth",
"description": "获取美股/港股订单簿深度数据(支持私有数据源兜底)",
"parameters": {
"type": "object",
"properties": {
"symbol": {
"type": "string",
"description": "标的代码,如 AAPL.US、0700.HK"
},
"depth": {
"type": "integer",
"description": "深度档位,默认 10",
"default": 10
},
"fallback_to_private": {
"type": "boolean",
"description": "TickDB 数据不可用时是否查询私有数据源",
"default": true
}
},
"required": ["symbol"]
},
"returns": {
"type": "object",
"description": "订单簿深度快照,包含买卖盘各档数据"
}
},
{
"name": "apply_custom_factors",
"description": "应用团队自定义因子库对行情数据进行处理",
"parameters": {
"type": "object",
"properties": {
"data": {
"type": "object",
"description": "原始行情数据"
},
"factors": {
"type": "array",
"items": {"type": "string"},
"description": "因子名称列表,如 [vwap_alpha, order_imbalance]"
}
},
"required": ["data", "factors"]
}
}
]
},
"config_schema": {
"private_data_endpoint": {
"type": "string",
"description": "私有数据源 API 地址",
"required": true
},
"api_key": {
"type": "string",
"description": "私有数据源鉴权密钥(加密存储)",
"required": true,
"sensitive": true
},
"default_timeout": {
"type": "integer",
"description": "请求超时时间(秒)",
"default": 30
}
},
"permissions": [
"tickdb:read:kline",
"tickdb:read:depth",
"http:request",
"env:read"
]
}
这份 manifest 的关键设计点:
| 设计点 | 说明 |
|---|---|
capabilities.functions |
声明 SKILL 提供的所有 Function,AI 助手会根据描述自动路由调用 |
config_schema |
定义企业私有配置项,支持敏感字段加密存储 |
permissions |
SKILL 运行时的权限边界,遵循最小权限原则 |
tickdb_version |
声明依赖的 TickDB 版本,确保兼容性 |
2.3 Function 注册与执行流程
用户请求
↓
AI 助手解析意图,匹配 Function
↓
加载 SKILL manifest,验证权限
↓
创建执行上下文(注入配置、Token)
↓
执行 Function 代码
↓
返回标准化结果给 AI 助手
这个流程对企业有两个关键意义:
- 权限可控:每个 SKILL 的权限边界在 manifest 中明确定义,不会出现越权访问
- 上下文隔离:私有配置(如 API Key)在 SKILL 沙箱内运行,不会泄露到 AI 助手的全局上下文
三、生产级 Function 开发实战
3.1 环境准备与项目初始化
# 创建 SKILL 项目目录
mkdir -p enterprise-market-data/{functions,handlers,tests}
cd enterprise-market-data
# 创建虚拟环境(推荐)
python -m venv .venv
source .venv/bin/activate
# 安装依赖
cat > requirements.txt << 'EOF'
tickdb>=2.0.0
requests>=2.31.0
pydantic>=2.0.0
python-dotenv>=1.0.0
EOF
pip install -r requirements.txt
3.2 核心 Function:私有数据源兜底 + TickDB 优先
以下是一个典型的企业级 Function 实现:当 TickDB 数据不可用时,自动切换到私有数据源。
# functions/get_equity_depth.py
"""
企业级订单簿深度获取 Function
策略:TickDB 优先 → 私有数据源兜底 → 降级告警
"""
import os
import time
import logging
from typing import Optional, Dict, Any, List
from dataclasses import dataclass
import requests
import tickdb
# 标准 SKILL Function 装饰器
from tickdb.skill import register_function, ExecutionContext
logger = logging.getLogger(__name__)
@dataclass
class DepthLevel:
"""订单簿单档数据"""
price: float
volume: int
order_count: int = 0
@dataclass
class DepthSnapshot:
"""标准化深度快照"""
symbol: str
timestamp: int
bids: List[DepthLevel] # 买盘(按价格降序)
asks: List[DepthLevel] # 卖盘(按价格升序)
source: str # "tickdb" / "private" / "fallback"
def pressure_ratio(self, levels: int = 5) -> float:
"""计算买卖压力比(前 N 档)"""
bid_vol = sum(b.volume for b in self.bids[:levels])
ask_vol = sum(a.volume for a in self.asks[:levels])
return bid_vol / ask_vol if ask_vol > 0 else 0.0
class PrivateDataSourceError(Exception):
"""私有数据源异常"""
pass
def get_private_depth(
endpoint: str,
api_key: str,
symbol: str,
depth: int,
timeout: int
) -> Optional[Dict[str, Any]]:
"""
从私有数据源获取订单簿深度
Args:
endpoint: 私有 API 地址
api_key: 鉴权密钥
symbol: 标的代码
depth: 深度档位
timeout: 超时时间(秒)
Returns:
原始数据字典,失败返回 None
"""
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
payload = {
"symbol": symbol,
"depth": depth,
"fields": ["price", "volume", "order_count"]
}
try:
response = requests.post(
f"{endpoint}/v1/depth/snapshot",
headers=headers,
json=payload,
timeout=(3.05, timeout) # ⚠️ 必须设置超时
)
response.raise_for_status()
return response.json()
except requests.exceptions.Timeout:
logger.error(f"私有数据源超时: {symbol} @ {endpoint}")
raise PrivateDataSourceError(f"请求超时 {timeout}s")
except requests.exceptions.RequestException as e:
logger.error(f"私有数据源请求失败: {e}")
raise PrivateDataSourceError(str(e))
def normalize_private_data(raw: Dict[str, Any], symbol: str) -> DepthSnapshot:
"""将私有数据源响应转换为标准格式"""
# 不同供应商的字段名可能有差异,统一归一化
bids_raw = raw.get("bids", []) or raw.get("buy", []) or []
asks_raw = raw.get("asks", []) or raw.get("sell", []) or []
bids = [
DepthLevel(
price=float(b.get("price", 0)),
volume=int(b.get("volume", 0)),
order_count=int(b.get("order_count", 0))
)
for b in bids_raw[:10]
]
asks = [
DepthLevel(
price=float(a.get("price", 0)),
volume=int(a.get("volume", 0)),
order_count=int(a.get("order_count", 0))
)
for a in asks_raw[:10]
]
return DepthSnapshot(
symbol=symbol,
timestamp=int(time.time() * 1000),
bids=bids,
asks=asks,
source="private"
)
@register_function(
name="get_equity_depth",
description="获取股票订单簿深度数据,支持多数据源兜底"
)
def get_equity_depth(
context: ExecutionContext,
symbol: str,
depth: int = 10,
fallback_to_private: bool = True
) -> Dict[str, Any]:
"""
获取股票订单簿深度
策略:
1. 优先从 TickDB 获取(实时性好)
2. 若 TickDB 失败且 fallback_to_private=True,尝试私有数据源
3. 记录数据来源,便于后续审计
Args:
context: SKILL 执行上下文(包含配置、TickDB 客户端)
symbol: 标的代码,如 AAPL.US
depth: 深度档位,默认 10
fallback_to_private: 是否启用私有数据源兜底
Returns:
标准化深度快照(包含 pressure_ratio 等衍生指标)
"""
# ===== 第一层:TickDB 优先 =====
try:
tickdb_client = context.tickdb
snapshot = tickdb_client.get_depth_snapshot(
symbol=symbol,
depth=depth
)
# 转换为标准格式
result = DepthSnapshot(
symbol=symbol,
timestamp=int(time.time() * 1000),
bids=[DepthLevel(price=b['price'], volume=b['volume']) for b in snapshot.get('bids', [])],
asks=[DepthLevel(price=a['price'], volume=a['volume']) for a in snapshot.get('asks', [])],
source="tickdb"
)
logger.info(f"TickDB 数据获取成功: {symbol}")
return {
"success": True,
"data": {
"symbol": result.symbol,
"timestamp": result.timestamp,
"source": result.source,
"bids": [{"price": b.price, "volume": b.volume} for b in result.bids],
"asks": [{"price": a.price, "volume": a.volume} for a in result.asks],
"pressure_ratio_5": result.pressure_ratio(5),
"pressure_ratio_10": result.pressure_ratio(10)
}
}
except tickdb.exceptions.SymbolNotFoundError:
logger.warning(f"TickDB 不支持该标的: {symbol}")
if not fallback_to_private:
return {"success": False, "error": f"标的 {symbol} 不被支持"}
except tickdb.exceptions.RateLimitError as e:
logger.warning(f"TickDB 限频触发: {e}")
# ⚠️ 限频时直接降级,不立即重试(避免触发更严格的限流)
except Exception as e:
logger.error(f"TickDB 数据获取异常: {e}")
# ===== 第二层:私有数据源兜底 =====
if not fallback_to_private:
return {"success": False, "error": "TickDB 数据不可用"}
config = context.config
private_endpoint = config.get("private_data_endpoint")
private_key = config.get("api_key")
if not private_endpoint or not private_key:
logger.error("私有数据源配置缺失")
return {
"success": False,
"error": "TickDB 数据不可用,且未配置私有数据源"
}
try:
timeout = config.get("default_timeout", 30)
raw = get_private_depth(
endpoint=private_endpoint,
api_key=private_key,
symbol=symbol,
depth=depth,
timeout=timeout
)
result = normalize_private_data(raw, symbol)
logger.info(f"私有数据源兜底成功: {symbol}")
return {
"success": True,
"data": {
"symbol": result.symbol,
"timestamp": result.timestamp,
"source": result.source,
"bids": [{"price": b.price, "volume": b.volume} for b in result.bids],
"asks": [{"price": a.price, "volume": a.volume} for a in result.asks],
"pressure_ratio_5": result.pressure_ratio(5),
"pressure_ratio_10": result.pressure_ratio(10)
}
}
except PrivateDataSourceError as e:
logger.error(f"私有数据源彻底失败: {e}")
return {
"success": False,
"error": f"所有数据源均不可用: {str(e)}",
"fallback_attempted": True
}
# ===== 兜底:降级响应 =====
return {
"success": False,
"error": "数据获取失败",
"symbol": symbol,
"fallback_attempted": fallback_to_private
}
代码质量要点:
| 要点 | 实现方式 |
|---|---|
| 心跳与超时 | HTTP 请求设置 (3.05, timeout) 双超时 |
| 限频处理 | 识别 TickDB 限频异常,直接降级而非立即重试 |
| 降级链路 | TickDB → 私有数据源 → 降级告警,三层保障 |
| 数据归一化 | 统一输出格式,附带衍生指标(pressure_ratio) |
| 敏感信息 | 从 context.config 注入,不硬编码在代码中 |
| 工程预警 | 代码注释标注 ⚠️ 提醒生产环境注意事项 |
3.3 因子扩展 Function
# functions/apply_factor.py
"""
自定义因子库 Function
演示如何封装团队内部因子为可复用的 SKILL Function
"""
import pandas as pd
import numpy as np
from typing import Dict, List, Any
from datetime import datetime, timedelta
from tickdb.skill import register_function, ExecutionContext
# ===== 因子定义 =====
class FactorLibrary:
"""企业内部因子库(示例)"""
@staticmethod
def vwap_alpha(klines: List[Dict]) -> float:
"""
VWAP Alpha:当前价格相对于日内 VWAP 的偏离度
逻辑:偏离度 > 0 表示当前价在 VWAP 上方(偏强)
"""
df = pd.DataFrame(klines)
df['typical_price'] = (df['high'] + df['low'] + df['close']) / 3
df['volume_price'] = df['typical_price'] * df['volume']
vwap = df['volume_price'].sum() / df['volume'].sum()
current_price = klines[-1]['close'] if klines else 0
return (current_price - vwap) / vwap if vwap > 0 else 0
@staticmethod
def order_imbalance_ratio(depth: Dict) -> float:
"""
订单簿失衡比
逻辑:买盘总量 / 卖盘总量,> 1 表示买压更强
"""
bid_vol = sum(b.get('volume', 0) for b in depth.get('bids', []))
ask_vol = sum(a.get('volume', 0) for a in depth.get('asks', []))
return bid_vol / ask_vol if ask_vol > 0 else 1.0
@staticmethod
def momentum_5min(klines: List[Dict]) -> float:
"""
5 分钟动量:最近 5 分钟收益率年化
"""
if len(klines) < 2:
return 0.0
start_price = klines[0]['close']
end_price = klines[-1]['close']
if start_price == 0:
return 0.0
raw_return = (end_price - start_price) / start_price
# 年化系数(假设 390 个 5 分钟交易周期)
annualization_factor = np.sqrt(390 * 252) # 波动率年化
return raw_return * annualization_factor
@staticmethod
def spreadWidening(depth_before: Dict, depth_after: Dict) -> float:
"""
价差扩大因子:事件前后期差变化
逻辑:价差扩大通常意味着流动性紧张
"""
def calc_spread(depth: Dict) -> float:
if not depth.get('bids') or not depth.get('asks'):
return 0.0
best_bid = depth['bids'][0]['price']
best_ask = depth['asks'][0]['price']
return (best_ask - best_bid) / best_bid if best_bid > 0 else 0.0
spread_before = calc_spread(depth_before)
spread_after = calc_spread(depth_after)
return (spread_after - spread_before) / spread_before if spread_before > 0 else 0.0
# ===== Function 注册 =====
AVAILABLE_FACTORS = {
"vwap_alpha": FactorLibrary.vwap_alpha,
"order_imbalance": FactorLibrary.order_imbalance_ratio,
"momentum_5min": FactorLibrary.momentum_5min,
"spread_widening": FactorLibrary.spreadWidening,
}
@register_function(
name="apply_custom_factors",
description="对企业行情数据应用自定义因子库"
)
def apply_custom_factors(
context: ExecutionContext,
data: Dict[str, Any],
factors: List[str]
) -> Dict[str, Any]:
"""
应用自定义因子
Args:
context: 执行上下文
data: 行情数据(支持 klines、depth、或两者的组合)
factors: 要计算的因子名称列表
Returns:
包含各因子计算结果的字典
"""
results = {
"computed_at": datetime.utcnow().isoformat(),
"factors": {}
}
# 检查因子是否在可用列表中
unavailable = [f for f in factors if f not in AVAILABLE_FACTORS]
if unavailable:
results["warnings"] = {
"unavailable_factors": unavailable,
"hint": f"可用因子: {list(AVAILABLE_FACTORS.keys())}"
}
# 计算可用因子
for factor_name in factors:
if factor_name not in AVAILABLE_FACTORS:
continue
try:
factor_func = AVAILABLE_FACTORS[factor_name]
# 根据因子类型提取所需数据
if factor_name == "spread_widening":
# spread_widening 需要两个 depth 快照
before_data = data.get("depth_before", {})
after_data = data.get("depth_after", {})
value = factor_func(before_data, after_data)
elif "depth" in factor_name or "imbalance" in factor_name:
value = factor_func(data.get("depth", {}))
else:
value = factor_func(data.get("klines", []))
results["factors"][factor_name] = round(value, 6)
except Exception as e:
results["factors"][factor_name] = {
"error": str(e),
"status": "failed"
}
# 组合信号(若计算了多个因子)
if len(results["factors"]) > 1:
valid_factors = {
k: v for k, v in results["factors"].items()
if isinstance(v, (int, float))
}
if valid_factors:
# 等权平均组合
results["composite_signal"] = round(
sum(valid_factors.values()) / len(valid_factors), 6
)
return results
3.4 跨市场数据归一化 Function
# functions/normalize_market_data.py
"""
跨市场数据归一化 Function
解决不同市场数据格式不一致的问题
"""
from typing import Dict, Any, List
from datetime import datetime
from dataclasses import dataclass
import pytz
from tickdb.skill import register_function, ExecutionContext
@dataclass
class NormalizedKline:
"""归一化 K 线数据"""
symbol: str
exchange: str # US / HK / CN / CRYPTO
timestamp_utc: int # Unix ms
open: float
high: float
low: float
close: float
volume: int
currency: str # USD / HKD / CNY
@dataclass
class NormalizedDepth:
"""归一化订单簿数据"""
symbol: str
exchange: str
timestamp_utc: int
bids: List[Dict[str, Any]] # [{price, volume}]
asks: List[Dict[str, Any]]
precision: int # 价格小数位数
class MarketNormalizer:
"""市场数据归一化器"""
# 各市场的时区和交易时段
TIMEZONES = {
"US": "America/New_York",
"HK": "Asia/Hong_Kong",
"CN": "Asia/Shanghai",
"CRYPTO": "UTC"
}
# TickDB 标的格式映射
SYMBOL_PATTERNS = {
r"^[A-Z]+\.US$": {"exchange": "US", "currency": "USD"},
r"^[0-9]+\.HK$": {"exchange": "HK", "currency": "HKD"},
r"^[A-Z]+USDT?$": {"exchange": "CRYPTO", "currency": "USDT"},
r"^[A-Z]+CNY$": {"exchange": "CN", "currency": "CNY"}
}
@classmethod
def parse_symbol(cls, symbol: str) -> Dict[str, str]:
"""解析标的代码,返回市场信息"""
for pattern, info in cls.SYMBOL_PATTERNS.items():
import re
if re.match(pattern, symbol):
return {**info, "symbol": symbol}
return {"exchange": "UNKNOWN", "currency": "USD", "symbol": symbol}
@classmethod
def to_utc_timestamp(cls, dt: datetime, exchange: str) -> int:
"""将各市场时间转换为 UTC 时间戳"""
tz = pytz.timezone(cls.TIMEZONES.get(exchange, "UTC"))
if dt.tzinfo is None:
dt = tz.localize(dt)
else:
dt = dt.astimezone(pytz.UTC)
return int(dt.timestamp() * 1000)
@register_function(
name="normalize_market_data",
description="将不同来源、不同市场的行情数据归一化为统一格式"
)
def normalize_market_data(
context: ExecutionContext,
data: Dict[str, Any],
target_format: str = "normalized"
) -> Dict[str, Any]:
"""
跨市场数据归一化
支持的数据结构:
- klines: K 线数据列表
- depth: 订单簿数据
- mixed: 包含 klines 和 depth 的混合数据
Args:
context: 执行上下文
data: 原始数据(来自任意数据源)
target_format: 目标格式,目前仅支持 "normalized"
Returns:
归一化后的标准格式数据
"""
if target_format != "normalized":
return {
"success": False,
"error": f"不支持的目标格式: {target_format}"
}
result = {
"normalized_at": datetime.utcnow().isoformat() + "Z",
"symbols": [],
"data": {}
}
# 检测数据结构类型
if "klines" in data:
result["data"]["klines"] = []
for kline in data["klines"]:
normalized = cls._normalize_kline(kline)
result["data"]["klines"].append(normalized)
if normalized["symbol"] not in result["symbols"]:
result["symbols"].append(normalized["symbol"])
elif "depth" in data:
result["data"]["depth"] = cls._normalize_depth(data["depth"])
if result["data"]["depth"]["symbol"] not in result["symbols"]:
result["symbols"].append(result["data"]["depth"]["symbol"])
elif "mixed" in data:
# 混合数据,分别归一化
result["data"]["klines"] = [
cls._normalize_kline(k) for k in data["mixed"].get("klines", [])
]
result["data"]["depth"] = cls._normalize_depth(data["mixed"].get("depth", {}))
result["symbols"] = list(set([
k["symbol"] for k in result["data"]["klines"]
] + [result["data"]["depth"]["symbol"]]))
return result
# 将方法绑定到类(简化调用)
NormalizeKline = NormalizedKline
normalize_market_data.__class__ = type('Temp', (), {
'cls': MarketNormalizer
})
四、SKILL 配置与企业级部署
4.1 config.yaml:企业私有配置
# config.yaml
# ⚠️ 敏感配置(api_key)应通过环境变量覆盖,不提交到代码仓库
# 私有数据源配置
private_data_endpoint: ${PRIVATE_DATA_ENDPOINT}
api_key: ${PRIVATE_DATA_API_KEY}
# TickDB 连接配置
tickdb:
endpoint: https://api.tickdb.ai
api_key: ${TICKDB_API_KEY}
default_timeout: 30
max_retries: 3
# 数据源优先级
data_source_priority:
- tickdb # 优先 TickDB
- private # 私有数据源兜底
# 告警配置
alerts:
enabled: true
channels:
- feishu # 飞书
threshold:
data_failure_count: 3
consecutive_failures_window: 300 # 5分钟内失败3次触发告警
# 缓存策略
cache:
enabled: true
ttl_seconds: 5 # 深度数据缓存 5 秒
max_size_mb: 512
# 日志配置
logging:
level: INFO
format: json
output: stdout
4.2 企业部署方案对比
| 维度 | 标准 SKILL | 企业私有部署 | 完全自建 |
|---|---|---|---|
| 部署位置 | ClawHub 托管 | 企业私有云 / VPC | 完全自托管 |
| 数据主权 | TickDB + 私有混合 | 企业完全控制 | 100% 自主 |
| 定制能力 | Function 扩展 | Function + 底层协议 | 无限制 |
| 运维成本 | 零运维 | 需维护 K8s 集群 | 全栈运维 |
| 适用规模 | 5 人以下团队 | 20 人以上量化团队 | 百人以上机构 |
| 启动周期 | 1 天 | 2-4 周 | 3-6 个月 |
| 成本结构 | 按量付费 | 年度订阅 + 基础设施 | 研发人力成本 |
4.3 私有部署配置示例
# 企业私有部署 config(tickdb-enterprise.yaml)
# 适用于需要完全数据主权和定制能力的机构
enterprise:
mode: private_cloud
region: cn-hongkong # 或 ap-southeast-1
# 企业级高可用配置
ha:
enabled: true
replicas: 3
failover_timeout: 30s
# 私有数据源白名单
allowed_private_sources:
- internal-mysql
- s3://company-market-data
- proprietary-feed-01
# 安全策略
security:
mfa_required: true
ip_whitelist:
- 10.0.0.0/8
- 172.16.0.0/12
audit_log_enabled: true
data_encryption: AES-256
# SKILL 执行环境
execution:
sandbox_enabled: true
max_execution_time: 60s
memory_limit: 2Gi
cpu_limit: 2 cores
五、价值对比:自建 vs SKILL 扩展 vs 企业部署
| 能力维度 | 完全自建 | TickDB 标准版 | SKILL 扩展 | 企业私有部署 |
|---|---|---|---|---|
| 多数据源整合 | ||||
| 美股 + 港股 + 加密货币 | 需对接 3 个供应商 | TickDB 统一 API | TickDB + 私有源一键切换 | 同左,可扩展 |
| 私有数据源兜底 | 需自行实现 | 不支持 | Function 自动降级 | 同左 |
| 开发效率 | ||||
| 接入新数据源 | 2-4 周 | - | 1-2 天 | 1-2 天 |
| 自定义因子封装 | 从零开发 | 不支持 | Function 复用 | 同左 |
| 可靠性 | ||||
| 心跳/重连机制 | 自行实现 | TickDB SDK | TickDB SDK + Function 封装 | 同左 |
| 限频自适应 | 自行实现 | SDK 原生支持 | SDK 原生支持 | 同左 |
| 多数据源容灾 | 自行实现 | Function 层兜底 | Function 层 + 告警 | 同左 + 企业 SLA |
| 运维 | ||||
| 监控告警 | 自行搭建 | 可选增值服务 | 可选增值服务 | 企业级监控 |
| 版本更新 | 自行维护 | 自动 | ClawHub 自动推送 | 企业内网更新 |
| 数据能力 | ||||
| 历史 K 线(美股 10 年) | 需单独采购 | TickDB | TickDB | TickDB |
| 实时 depth(美股 1 档) | 需单独采购 | TickDB | TickDB | TickDB |
| tick 级逐笔(港股/加密) | 需单独采购 | TickDB | TickDB | TickDB |
| 成本 | ||||
| 初始投入 | 高(人力 + 采购) | 低 | 低 | 中(订阅费 + 基础设施) |
| 边际成本 | 固定 | 按量 | 按量 | 按量 |
六、生命周期管理:从注册到迭代
6.1 SKILL 发布流程
# 1. 本地验证
tickdb skill validate ./enterprise-market-data
# 2. 运行测试套件
tickdb skill test ./enterprise-market-data --coverage
# 3. 打包发布
tickdb skill publish ./enterprise-market-data \
--version 1.0.0 \
--visibility private \
--tag "enterprise,quant-team"
# 4. 版本管理
tickdb skill release ./enterprise-market-data \
--from 1.0.0 \
--to 1.1.0 \
--changelog ./CHANGELOG.md
6.2 版本兼容策略
// manifest.json 中的版本约束
{
"tickdb_version": ">=2.0.0,<3.0.0",
"deprecated_functions": [
{
"name": "get_depth_snapshot",
"deprecated_since": "1.5.0",
"will_remove_in": "2.0.0",
"replacement": "get_equity_depth"
}
]
}
结语
TickDB SKILL 协议的本质,是为量化团队提供了一把钥匙——打开数据孤岛的锁,建立统一的行情能力平台。
通过 SKILL 扩展,你可以:
- 复用:将私有数据源、自定义因子封装为可复用的 Function,避免重复开发
- 统一:在 AI 助手的执行上下文中,自动选择最优数据源,无需手动切换
- 扩展:从标准 SKILL 到企业私有部署,按需演进,不用一开始就过度设计
行情数据的竞争,本质上是数据整合效率的竞争。谁能用更低的成本、更短的时间,将多源异构数据整合为统一的、可供策略直接使用的高质量数据,谁就占据了优势。
SKILL 协议正是为这个目标而设计。
下一步行动
如果你想快速体验 SKILL 扩展:
- 访问 tickdb.ai 注册(免费,无需信用卡)
- 在 ClawHub 搜索
tickdb-market-data安装官方 SKILL - 在 AI 助手中调用
get_kline、get_depth等 Function
如果你需要企业级私有部署:
- 访问 tickdb.ai/enterprise 了解私有化方案
- 联系 [email protected] 获取定制报价
- 安排技术对接,评估数据源接入需求
如果你希望开发自定义 SKILL:
- 阅读 SKILL 协议规范文档
- 参考本文示例代码,从
manifest.json开始构建 - 在 ClawHub 创建私有 SKILL,邀请团队成员协作
风险提示:本文不构成任何投资建议。SKILL 扩展涉及 API 调用和数据处理,实际使用中请确保妥善管理 API Key 密钥,并遵守各数据源的使用条款。市场有风险,投资需谨慎。