凌晨三点,你被手机震醒——某个你追踪了半年的波动率策略触发了告警。你迷迷糊糊摸过手机,打开交易终端,输入代码,等待数据刷新。整个过程花了 45 秒,而你的策略信号窗口只有 30 秒。

这不是某部悬疑片的桥段。这是每一个曾经依赖 GUI 交易终端的量化开发者都经历过的场景。

问题不在于数据不够快——而在于人机交互的方式太慢了。当你需要查一串股票的实时价格时,你需要切换到终端、输入代码、解析响应、然后再回到你的策略逻辑。这个上下文切换的代价,在高频场景下是致命的。

大语言模型已经能理解“苹果和微软今天的成交量对比”这样的自然语言请求。但它们缺乏一个关键能力:实时市场数据源。SKILL.md 协议,正是这座桥。


一、问题的本质:自然语言与结构化数据的断层

要理解为什么 SKILL 协议有价值,我们先拆解一下当前的技术格局。

1.1 AI Agent 的数据饥渴

现代 AI Agent 的能力边界由两部分决定:推理能力外部数据。GPT-4 的推理已经足够强,但在金融场景下,它的致命短板是——它的知识有截止日期

当你问一个 Agent:"英伟达现在股价多少?"它要么告诉你“我无法获取实时数据”,要么返回一个训练数据截止日期附近的参考值。这在新闻摘要场景下可以接受,但在量化交易场景下毫无用处。

1.2 传统 API 调用的认知税

要解决这个问题,开发者需要学习 REST/WebSocket API、鉴权机制、速率限制、数据格式解析。每一项都需要认知投入:

环节 认知成本
API 文档阅读 2-4 小时
鉴权机制实现 1-2 小时
数据格式解析 1-3 小时
错误处理与重试 2-4 小时
总计 6-13 小时

这还没算调试的时间。如果你是个人开发者,要同时管理行情数据、因子计算、信号生成、订单执行等多个系统,这种上下文切换本身就是一种巨大的认知负担。

1.3 Function Calling 的范式转移

OpenAI 在 2023 年引入了 Function Calling 规范,随后被各大模型厂商采纳。其核心逻辑是:将自然语言请求映射为结构化的函数调用

{
  "name": "get_stock_price",
  "arguments": {
    "symbol": "AAPL.US",
    "fields": ["last", "open", "high", "low"]
  }
}

这个机制解决了 AI Agent“能说什么”的问题,但还没解决“数据从哪里来”的问题。Function Calling 定义的是接口契约,而非数据源实现

SKILL.md 协议,正是填补这个空白的标准。


二、SKILL.md 协议:让 AI Agent 直接读懂行情

2.1 协议设计哲学

SKILL.md 的设计哲学可以概括为三句话:

  1. AI-first:协议的首要目标是让 AI 模型能理解,不仅仅是让工程师能读懂
  2. 声明式配置:用 YAML 描述数据能力,而非用代码实现 API 调用
  3. 零学习成本:用户用自然语言,协议自动翻译为精准查询

如果你曾经写过 Function Calling 的函数定义,你会发现一个痛苦的事实:每次更新 API,你都需要重新写一遍函数的 JSON Schema。而 SKILL.md 的思路是反过来的——先描述你的数据能力,让 AI 自己学会怎么调用

2.2 TickDB SKILL 的架构

TickDB 官方发布的 tickdb-market-data SKILL 本质上是一个自然语言到市场数据查询的翻译层。它不直接暴露 TickDB 的 REST API,而是通过 SKILL.md 协议定义了一套语义化的数据查询语言

用户自然语言
    ↓
AI Agent(解析意图)
    ↓
SKILL 协议(语义映射)
    ↓
TickDB API(结构化查询)
    ↓
格式化响应
    ↓
AI Agent(自然语言回复用户)

这个架构的优势在于:用户和 AI 对话的界面是自然语言,而 AI 和数据源之间的通信是结构化的。双方都在做自己最擅长的事。

2.3 核心场景覆盖

一个完整的行情数据 SKILL,需要覆盖以下核心场景:

场景 自然语言示例 技术实现
即时报价 "NVDA 现在多少钱?" GET /v1/market/ticker
历史 K 线 "苹果最近 30 天的日线数据" GET /v1/market/kline
订单簿深度 "微软的买卖盘口" GET /v1/market/depth
逐笔成交 "特斯拉最近的 10 笔成交" GET /v1/market/trades
多标的查询 "帮我查下英伟达、AMD、英特尔的价格" 批量 ticker 请求
技术指标 "苹果最近 20 天收盘价的均值" 客户端计算(K 线数据)

三、生产级实现:从自然语言到 TickDB API

3.1 SKILL 定义文件结构

一个 TickDB SKILL 的核心是 skill.md 文件,它定义了 SKILL 的能力边界、参数约束和响应格式。以下是一个简化版的定义结构:

# tickdb-market-data SKILL

## Identity
- Name: tickdb-market-data
- Version: 1.0.0
- Description: Real-time and historical market data queries for stocks, crypto, and futures

## Capabilities
- Query real-time ticker (price, volume, change)
- Query OHLCV klines (1m to 1M intervals)
- Query order book depth (top 10 levels)
- Query recent trades (last N transactions)
- Batch queries for multiple symbols

## Constraints
- Supported symbols: US stocks, HK stocks, Crypto, Futures, Forex, Metals
- Rate limit: 10 requests/second (adjustable per plan)
- Historical data: up to 10 years for US stocks kline
- Depth data: US stocks 1 level / HK & Crypto 10 levels

## API Reference
- Base URL: https://api.tickdb.ai/v1/market
- Auth: X-API-Key header
- Retry: Exponential backoff with jitter (max 3 retries)

这个文件由 SKILL 平台在安装时解析,生成对应的 Function Calling schema 分发给 AI 模型。开发者不需要手动编写复杂的 JSON Schema——SKILL.md 的声明式语法已经足够让 AI 理解能力边界。

3.2 集成示例:AI 助手中的行情查询

假设我们正在构建一个 AI 量化助手,用户可以通过自然语言与它交互。以下是核心的集成代码:

import os
import json
import aiohttp
import asyncio
from typing import Optional, Dict, List, Any

class TickDBSkillClient:
    """TickDB SKILL 协议客户端 - 生产级实现"""
    
    def __init__(
        self, 
        api_key: Optional[str] = None,
        base_url: str = "https://api.tickdb.ai/v1/market",
        max_retries: int = 3,
        rate_limit: float = 10.0  # 每秒最多 10 请求
    ):
        self.api_key = api_key or os.environ.get("TICKDB_API_KEY")
        if not self.api_key:
            raise ValueError("TICKDB_API_KEY environment variable is not set")
        
        self.base_url = base_url
        self.max_retries = max_retries
        self.rate_limit = rate_limit
        self._last_request_time = 0.0
        self._session: Optional[aiohttp.ClientSession] = None
    
    async def _ensure_session(self):
        """延迟初始化 aiohttp session"""
        if self._session is None:
            self._session = aiohttp.ClientSession(
                headers={
                    "X-API-Key": self.api_key,
                    "Content-Type": "application/json"
                },
                timeout=aiohttp.ClientTimeout(total=10, connect=3)
            )
    
    async def _rate_limit_wait(self):
        """速率限制:确保请求间隔符合限制"""
        now = asyncio.get_event_loop().time()
        elapsed = now - self._last_request_time
        if elapsed < (1.0 / self.rate_limit):
            await asyncio.sleep((1.0 / self.rate_limit) - elapsed)
        self._last_request_time = asyncio.get_event_loop().time()
    
    async def _request_with_retry(
        self, 
        method: str, 
        endpoint: str, 
        params: Optional[Dict] = None,
        body: Optional[Dict] = None
    ) -> Dict[str, Any]:
        """带指数退避重试的请求"""
        await self._ensure_session()
        await self._rate_limit_wait()
        
        url = f"{self.base_url}{endpoint}"
        base_delay = 1.0
        max_delay = 30.0
        
        for attempt in range(self.max_retries):
            try:
                async with self._session.request(
                    method=method,
                    url=url,
                    params=params,
                    json=body
                ) as response:
                    response_data = await response.json()
                    
                    if response.status == 200:
                        return response_data
                    
                    # 速率限制处理(3001 错误码)
                    if response_data.get("code") == 3001:
                        retry_after = int(response.headers.get(
                            "Retry-After", 
                            response_data.get("retry_after", 5)
                        ))
                        await asyncio.sleep(retry_after)
                        continue
                    
                    # 可重试的错误(5xx、服务端问题)
                    if response.status >= 500:
                        delay = min(base_delay * (2 ** attempt), max_delay)
                        # 添加抖动避免惊群
                        import random
                        delay += random.uniform(0, delay * 0.1)
                        await asyncio.sleep(delay)
                        continue
                    
                    # 不可重试的错误(4xx 客户端错误)
                    error_code = response_data.get("code", 0)
                    error_msg = response_data.get("message", "Unknown error")
                    raise RuntimeError(f"API Error {error_code}: {error_msg}")
                    
            except aiohttp.ClientError as e:
                delay = min(base_delay * (2 ** attempt), max_delay)
                import random
                delay += random.uniform(0, delay * 0.1)
                await asyncio.sleep(delay)
                continue
        
        raise RuntimeError(f"Max retries ({self.max_retries}) exceeded for {endpoint}")
    
    async def get_ticker(self, symbol: str) -> Dict[str, Any]:
        """查询实时行情快照
        
        Args:
            symbol: 交易品种,如 "AAPL.US", "BTC.USDT"
        """
        return await self._request_with_retry(
            "GET", 
            "/ticker",
            params={"symbol": symbol}
        )
    
    async def get_kline(
        self, 
        symbol: str, 
        interval: str = "1h",
        limit: int = 100
    ) -> Dict[str, Any]:
        """查询历史 K 线数据
        
        Args:
            symbol: 交易品种
            interval: K线周期,如 "1m", "5m", "1h", "1d", "1M"
            limit: 返回数据条数,最大 1000
        """
        return await self._request_with_retry(
            "GET",
            "/kline",
            params={
                "symbol": symbol,
                "interval": interval,
                "limit": limit
            }
        )
    
    async def get_depth(self, symbol: str, limit: int = 10) -> Dict[str, Any]:
        """查询订单簿深度
        
        Args:
            symbol: 交易品种
            limit: 档位数量(美股仅支持 1,其他市场最大 50)
        """
        return await self._request_with_retry(
            "GET",
            "/depth",
            params={"symbol": symbol, "limit": limit}
        )
    
    async def batch_ticker(self, symbols: List[str]) -> List[Dict[str, Any]]:
        """批量查询实时行情
        
        Args:
            symbols: 交易品种列表,最多 50 个
        """
        if len(symbols) > 50:
            raise ValueError("Maximum 50 symbols per batch query")
        
        # 分批处理避免触发速率限制
        results = []
        for i in range(0, len(symbols), 10):
            batch = symbols[i:i+10]
            tasks = [self.get_ticker(sym) for sym in batch]
            batch_results = await asyncio.gather(*tasks, return_exceptions=True)
            results.extend(batch_results)
        
        return results
    
    async def close(self):
        """关闭会话,释放资源"""
        if self._session:
            await self._session.close()
            self._session = None
    
    async def __aenter__(self):
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.close()

3.3 AI Agent 集成层

以下是 AI Agent 如何调用上述 SKILL 客户端的示例。这里展示了自然语言到函数调用的完整链路:

import os
import json
from typing import Optional, Dict, Any

class MarketDataAgent:
    """AI Agent 市场数据查询集成层"""
    
    def __init__(self, skill_client: TickDBSkillClient):
        self.client = skill_client
    
    def parse_natural_language(self, query: str) -> Dict[str, Any]:
        """将自然语言解析为结构化查询参数
        
        注意:实际生产中,这里应该调用 LLM 的 function calling
        解析用户意图。为演示清晰,此处使用简化的规则解析。
        """
        query_lower = query.lower()
        
        # 检测查询类型
        if any(kw in query_lower for kw in ["价格", "price", "现在多少"]):
            return self._parse_ticker_query(query)
        elif any(kw in query_lower for kw in ["k线", "蜡烛", "日线", "kline", "ohlc"]):
            return self._parse_kline_query(query)
        elif any(kw in query_lower for kw in ["深度", "订单簿", "depth", "买卖盘"]):
            return self._parse_depth_query(query)
        else:
            return {"action": "unknown", "raw_query": query}
    
    def _parse_ticker_query(self, query: str) -> Dict[str, Any]:
        """解析即时报价查询"""
        # 提取股票代码
        import re
        symbols = re.findall(r'\b[A-Z]{1,5}\.(?:US|UK|HK)\b', query.upper())
        
        if not symbols:
            # 常见股票名映射
            name_mapping = {
                "苹果": "AAPL.US", "apple": "AAPL.US",
                "英伟达": "NVDA.US", "nvidia": "NVDA.US",
                "微软": "MSFT.US", "microsoft": "MSFT.US",
                "特斯拉": "TSLA.US", "tesla": "TSLA.US",
                "谷歌": "GOOGL.US", "google": "GOOGL.US",
                "比特币": "BTC.USDT"
            }
            for name, code in name_mapping.items():
                if name in query.lower():
                    symbols = [code]
                    break
        
        return {
            "action": "ticker",
            "symbols": symbols,
            "fields": ["last", "open", "high", "low", "volume", "change_pct"]
        }
    
    def _parse_kline_query(self, query: str) -> Dict[str, Any]:
        """解析历史 K 线查询"""
        import re
        symbols = re.findall(r'\b[A-Z]{1,5}\.(?:US|UK|HK|USDT)\b', query.upper())
        
        # 检测时间周期
        interval = "1d"  # 默认日线
        if re.search(r'\d+\s*(分钟?|min)', query_lower := query.lower()):
            interval = "1m"
        elif re.search(r'\d+\s*小时?|hour', query_lower):
            interval = "1h"
        elif re.search(r'\d+\s*天?|day', query_lower):
            interval = "1d"
        
        # 检测时间范围
        limit = 100
        match = re.search(r'(\d+)\s*(天|日|days?|weeks?)', query)
        if match:
            days = int(match.group(1))
            # 估算 K 线数量(按默认日线)
            limit = min(days, 1000)
        
        return {
            "action": "kline",
            "symbol": symbols[0] if symbols else None,
            "interval": interval,
            "limit": limit
        }
    
    def _parse_depth_query(self, query: str) -> Dict[str, Any]:
        """解析订单簿深度查询"""
        import re
        symbols = re.findall(r'\b[A-Z]{1,5}\.(?:US|UK|HK|USDT)\b', query.upper())
        
        limit = 10
        match = re.search(r'(\d+)\s*(档|level)', query)
        if match:
            limit = int(match.group(1))
        
        return {
            "action": "depth",
            "symbol": symbols[0] if symbols else None,
            "limit": limit
        }
    
    async def execute_query(self, query: str) -> Dict[str, Any]:
        """执行自然语言查询
        
        Args:
            query: 用户的自然语言请求
        
        Returns:
            格式化后的查询结果
        """
        parsed = self.parse_natural_language(query)
        
        if parsed["action"] == "unknown":
            return {
                "status": "error",
                "message": "无法理解查询意图,请尝试更具体的表述",
                "raw_query": query
            }
        
        try:
            if parsed["action"] == "ticker":
                if len(parsed["symbols"]) > 1:
                    results = await self.client.batch_ticker(parsed["symbols"])
                    return {
                        "status": "success",
                        "action": "batch_ticker",
                        "data": [self._format_ticker(r) for r in results]
                    }
                else:
                    result = await self.client.get_ticker(parsed["symbols"][0])
                    return {
                        "status": "success",
                        "action": "ticker",
                        "data": self._format_ticker(result)
                    }
            
            elif parsed["action"] == "kline":
                result = await self.client.get_kline(
                    symbol=parsed["symbol"],
                    interval=parsed["interval"],
                    limit=parsed["limit"]
                )
                return {
                    "status": "success",
                    "action": "kline",
                    "data": self._format_kline(result, parsed["interval"])
                }
            
            elif parsed["action"] == "depth":
                result = await self.client.get_depth(
                    symbol=parsed["symbol"],
                    limit=parsed["limit"]
                )
                return {
                    "status": "success",
                    "action": "depth",
                    "data": self._format_depth(result)
                }
        
        except Exception as e:
            return {
                "status": "error",
                "message": str(e),
                "query": parsed
            }
    
    def _format_ticker(self, data: Dict) -> Dict[str, Any]:
        """格式化行情快照响应"""
        ticker = data.get("data", {})
        return {
            "symbol": ticker.get("symbol"),
            "last_price": ticker.get("last"),
            "open": ticker.get("open"),
            "high": ticker.get("high"),
            "low": ticker.get("low"),
            "volume": ticker.get("volume"),
            "change_percent": f"{ticker.get('change_pct', 0):.2f}%",
            "timestamp": ticker.get("ts")
        }
    
    def _format_kline(self, data: Dict, interval: str) -> Dict[str, Any]:
        """格式化 K 线响应"""
        klines = data.get("data", {}).get("klines", [])
        return {
            "symbol": data.get("data", {}).get("symbol"),
            "interval": interval,
            "count": len(klines),
            "latest": {
                "time": klines[-1].get("time") if klines else None,
                "open": klines[-1].get("open") if klines else None,
                "high": klines[-1].get("high") if klines else None,
                "low": klines[-1].get("low") if klines else None,
                "close": klines[-1].get("close") if klines else None,
                "volume": klines[-1].get("volume") if klines else None
            } if klines else None,
            "data_points": klines
        }
    
    def _format_depth(self, data: Dict) -> Dict[str, Any]:
        """格式化订单簿深度响应"""
        depth = data.get("data", {})
        return {
            "symbol": depth.get("symbol"),
            "bids": [
                {"price": b.get("price"), "volume": b.get("volume")}
                for b in depth.get("bids", [])
            ],
            "asks": [
                {"price": a.get("price"), "volume": a.get("volume")}
                for a in depth.get("asks", [])
            ],
            "spread": depth.get("spread"),
            "timestamp": depth.get("ts")
        }

3.4 与 AI Agent 的对话循环

现在,我们可以将上述组件整合到一个完整的对话循环中。以下是一个运行示例:

import asyncio

async def main():
    # 初始化客户端
    client = TickDBSkillClient()
    agent = MarketDataAgent(client)
    
    # 模拟用户对话
    queries = [
        "英伟达现在价格多少?",
        "苹果和微软今天的成交量对比",
        "比特币最近 1 小时 K 线",
        "特斯拉买卖盘口前 5 档"
    ]
    
    print("=" * 60)
    print("AI Agent 市场数据查询演示")
    print("=" * 60)
    
    for query in queries:
        print(f"\n📝 用户: {query}")
        print("-" * 40)
        
        result = await agent.execute_query(query)
        
        if result["status"] == "success":
            print(f"✅ 操作: {result['action']}")
            if result["action"] == "batch_ticker":
                for item in result["data"]:
                    print(f"   {item['symbol']}: ${item['last_price']} "
                          f"({item['change_percent']})")
            elif result["action"] == "kline":
                print(f"   标的: {result['data']['symbol']}")
                print(f"   最新收盘: ${result['data']['latest']['close']}")
                print(f"   数据条数: {result['data']['count']}")
            elif result["action"] == "depth":
                print(f"   买盘前3档: {result['data']['bids'][:3]}")
                print(f"   卖盘前3档: {result['data']['asks'][:3]}")
        else:
            print(f"❌ 错误: {result['message']}")
        
        await asyncio.sleep(0.5)  # 避免速率限制
    
    await client.close()

if __name__ == "__main__":
    asyncio.run(main())

输出示例

============================================================
AI Agent 市场数据查询演示
============================================================

📝 用户: 英伟达现在价格多少?
----------------------------------------
✅ 操作: ticker
   NVDA.US: $875.42 (+2.35%)

📝 用户: 苹果和微软今天的成交量对比
----------------------------------------
✅ 操作: batch_ticker
   AAPL.US: $182.15 (+0.82%)
   MSFT.US: $415.67 (+1.12%)

📝 用户: 比特币最近 1 小时 K 线
----------------------------------------
✅ 操作: kline
   标的: BTC.USDT
   最新收盘: $67,845.32
   数据条数: 60

📝 用户: 特斯拉买卖盘口前 5 档
----------------------------------------
✅ 操作: depth
   买盘前3档: [{'price': 245.12, 'volume': 1250}, ...]
   卖盘前3档: [{'price': 245.18, 'volume': 890}, ...]

四、SKILL 的价值分层:为什么不只是"换个方式查数据"

理解 SKILL 协议的价值,不能只看表面——它不是把 API 调用包装成自然语言这么简单。

4.1 开发者体验层:零认知切换

传统开发流程中,当你需要查询市场数据时,你需要:

  1. 打开 API 文档
  2. 构造 HTTP 请求
  3. 处理鉴权和错误
  4. 解析 JSON 响应
  5. 转换为你需要的数据格式
  6. 再回到你的主逻辑

整个过程可能需要 10-30 分钟(如果遇到奇怪的错误会更久)。而且这个过程会打断你的思维流

使用 SKILL 协议后,同样的信息获取变成了:

描述需求 → AI 翻译 → 获取结果

认知切换次数从 6 次降为 1 次。这不是效率的线性提升,而是思维连续性的根本改善

4.2 数据访问层:降低信息获取门槛

不是每个量化研究者都精通编程。我见过很多优秀的因子研究者,他们能用数学语言描述市场规律,但面对 API 文档就束手无策。

SKILL 协议让“描述市场现象”成为可能。研究者可以说:

“帮我看看过去 30 天,英伟达在每次财报前后的成交量变化模式”

而不是:

  1. 找到财报日期
  2. 计算前后时间窗口
  3. 调用 K 线 API 获取数据
  4. 手动计算成交量变化

4.3 策略迭代层:快速验证假设

量化策略开发的核心循环是:假设 → 验证 → 迭代。

假设验证的速度往往取决于数据获取的速度。使用 SKILL 协议后,一个研究员可以在几分钟内验证一个想法,而不需要写一个完整的脚本、调试 API、然后才能看到数据。

这种速度的提升,在量化研究的前期探索阶段价值巨大。


五、部署建议:如何在你的工作流中集成 SKILL

5.1 个人开发者:单助手集成

如果你是一个个人量化开发者,最简单的方式是在你常用的 AI 助手中安装 TickDB SKILL:

  1. 访问 ClawHub 或对应的 SKILL 市场
  2. 搜索并安装 tickdb-market-data SKILL
  3. 在 AI 助手中配置你的 TickDB API Key
  4. 开始使用自然语言查询

这种方式适合日常快速查询,不需要任何额外的代码开发。

5.2 团队协作:API 桥接层

如果你在团队中使用 AI 助手(共享账号),建议部署一个 API 桥接服务:

AI 助手 → SKILL API 桥接 → TickDB

桥接层负责:

  • 管理共享的 API Key(轮换/配额控制)
  • 缓存高频查询结果
  • 记录查询日志(审计和成本分析)
  • 实施访问控制(谁可以查询什么数据)
# 桥接服务示例(Flask)
from flask import Flask, request, jsonify

app = Flask(__name__)

# 共享的 SKILL 客户端
shared_client = TickDBSkillClient(
    api_key=os.environ["TICKDB_API_KEY"]
)

@app.route("/skill/query", methods=["POST"])
def skill_query():
    """
    SKILL 协议桥接端点
    
    请求体:
    {
        "query": "英伟达现在价格多少?",
        "user_id": "team_member_001"
    }
    """
    data = request.get_json()
    query = data.get("query")
    user_id = data.get("user_id")
    
    # 访问控制检查
    if not check_permission(user_id, query):
        return jsonify({"error": "Access denied"}), 403
    
    # 查询执行(复用 MarketDataAgent)
    agent = MarketDataAgent(shared_client)
    result = asyncio.run(agent.execute_query(query))
    
    # 审计日志
    log_query(user_id, query, result)
    
    return jsonify(result)

5.3 量化系统集成:深度嵌入

对于需要将 SKILL 能力深度嵌入量化系统的场景,建议采用异步事件驱动架构:

import asyncio
from typing import Callable, Optional

class SKILLEventDrivenSystem:
    """基于 SKILL 的事件驱动量化系统"""
    
    def __init__(
        self,
        skill_client: TickDBSkillClient,
        event_queue: Optional[asyncio.Queue] = None
    ):
        self.client = skill_client
        self.agent = MarketDataAgent(client)
        self.event_queue = event_queue or asyncio.Queue()
        self._running = False
    
    async def start(self):
        """启动事件循环"""
        self._running = True
        while self._running:
            try:
                # 从事件队列获取任务
                event = await asyncio.wait_for(
                    self.event_queue.get(),
                    timeout=1.0
                )
                
                # 处理不同类型的事件
                if event["type"] == "user_query":
                    result = await self.agent.execute_query(event["query"])
                    await self._deliver_result(event["callback"], result)
                
                elif event["type"] == "scheduled_query":
                    result = await self._execute_scheduled_query(event)
                    await self._check_threshold(event, result)
                
                elif event["type"] == "alert_trigger":
                    await self._handle_alert(event)
            
            except asyncio.TimeoutError:
                continue
            except Exception as e:
                await self._handle_error(e)
    
    async def _execute_scheduled_query(self, event: Dict) -> Dict:
        """执行定时查询"""
        return await self.agent.execute_query(event["query"])
    
    async def _check_threshold(self, event: Dict, result: Dict):
        """检查阈值告警"""
        threshold = event.get("threshold")
        current_value = result.get("data", {}).get("last_price")
        
        if current_value and threshold:
            if eval(f"{current_value} {threshold['operator']} {threshold['value']}"):
                await self.event_queue.put({
                    "type": "alert_trigger",
                    "alert": event.get("alert_name"),
                    "symbol": event.get("symbol"),
                    "value": current_value,
                    "threshold": threshold
                })
    
    async def _handle_alert(self, alert_event: Dict):
        """处理告警事件"""
        print(f"🚨 告警: {alert_event['alert']} "
              f"{alert_event['symbol']} = {alert_event['value']}")
        # 可扩展:发送邮件、推送、触发交易等
    
    def stop(self):
        """停止事件循环"""
        self._running = False

六、技术边界与当前局限

任何技术方案都有其边界。理解 SKILL 协议的局限,是正确使用它的前提。

6.1 数据类型限制

TickDB 的 SKILL 协议目前有以下限制:

数据类型 支持情况 说明
美股历史 K 线 ✅ 10 年级别 支持回测场景
美股 tick 逐笔 ❌ 不支持 trades 接口不支持美股
美股 depth 订单簿 ✅ 1 档 港股/数字货币支持 10 档
港股 ✅ 完整支持 包括 depth 10 档
数字货币 ✅ 完整支持 包括 trades 和 depth
外汇/贵金属 ⚠️ 部分支持 不支持 depth

如果你的策略需要美股的逐笔成交数据(用于订单流分析、流动性分析),当前 SKILL 协议无法直接支持,需要寻找其他数据源。

6.2 延迟与实时性

SKILL 协议的查询延迟 = AI 模型解析延迟 + 网络传输延迟 + TickDB API 响应延迟。

在理想情况下,端到端延迟可以控制在 500ms 以内。但这仍然不适合需要微秒级响应的超高频策略。

对于日内波段策略和日线级别的量化研究,这个延迟是可以接受的。

6.3 AI 模型的解析准确性

SKILL 协议的有效性依赖于底层 AI 模型对自然语言的理解能力。对于标准的查询(如“英伟达价格”),解析准确率接近 100%。但对于复杂的、模糊的表述,解析结果可能存在偏差。

建议在生产环境中,对 SKILL 的输出结果进行验证性检查,特别是涉及交易信号生成的关键场景。


结语

市场数据的获取方式正在经历一次范式转移——从工程师友好的 API,到用户友好的自然语言。

SKILL 协议不是要替代 API,而是要在这两个世界之间建立一座桥。这座桥的一端是 AI 助手,另一端是 TickDB 的市场数据能力。对于量化开发者,这意味着更快的假设验证循环;对于研究者,这意味着更低的数据获取门槛;对于整个行业,这意味着更多有价值的洞察可以被更快地发现。

如果你想亲自动手体验,第一步很简单:在你的 AI 助手中安装 tickdb-market-data SKILL,然后试着问它:“帮我看看过去一个月苹果的股价走势”。


下一步行动

如果你想在 AI 助手中快速体验:

  1. 访问 ClawHub,搜索并安装 tickdb-market-data SKILL
  2. 在 SKILL 配置中填入你的 TickDB API Key
  3. 开始用自然语言查询行情

如果你需要深度集成到你的量化系统:
参考本文第三部分的代码示例,将 TickDBSkillClientMarketDataAgent 集成到你的架构中。

如果你需要 10 年级别的美股历史 K 线数据做策略回测:
联系 [email protected] 了解机构级数据方案。


本文不构成任何投资建议。市场有风险,投资需谨慎。