你的策略不会说话,但 AI 可以替你开口

凌晨三点,你的手机震了。

你迷迷糊糊拿起手机,看到飞书推送:「AAPL.US 买盘压力比突破 2.5,建议关注」——这不是你自己写的监控脚本,是 AI Agent 在替你看着。

这不是科幻。是你接下来的 20 分钟能亲手搭出来的东西。

本文手把手演示:用 AI Agent 调用 TickDB 的行情 SKILL,把自然语言查询变成可执行的量化信号。不需要你会 Python,不需要你懂 WebSocket,连 API 文档都不用翻。你只需要知道你想问什么。


一、痛点:你和行情数据之间,隔着一整本手册

量化入门有三道坎:

第一道坎:数据获取。OpenBB、yfinance、Polygon……每家的 API 文档都是一本砖头。认证方式不一样,返回格式不一样,错误码含义不一样。等你好不容易跑通一个,数据质量还不一定够用。

第二道坎:实时监控。K 线数据能拿到了,但「价格突破 180 块时提醒我」需要自己写轮询逻辑,实时 depth 推送更是要从头学 WebSocket。

第三道坎:策略自动化。你有交易逻辑,但把它翻译成代码、维护状态、处理异常,比策略本身还复杂。

这三道坎的共同问题是:你的想法和代码之间,需要一个翻译层。

AI Agent 就是这个翻译层。而 TickDB SKILL 是给这个翻译层提供的标准工具包。


二、解法架构:三层分离的信号链路

在动手之前,先理解整个系统的运行逻辑。架构不复杂,分三层:

┌──────────────────────────────────────────────────────┐
│                  自然语言层                           │
│          "AAPL 财报后订单簿有异动吗?"                │
└────────────────────┬─────────────────────────────────┘
                     │ LLM 解析意图
                     ▼
┌──────────────────────────────────────────────────────┐
│               AI Agent 调度层                        │
│     决策用哪个 SKILL → 调用 SKILL → 解析结果        │
└────────────────────┬─────────────────────────────────┘
                     │ Tool Call
                     ▼
┌──────────────────────────────────────────────────────┐
│              TickDB SKILL 执行层                     │
│   depth 频道订阅  │  kline 历史查询  │  信号计算    │
└──────────────────────────────────────────────────────┘

Layer 1 是你的脑子:你用自然语言描述需求,不需要知道数据在哪、怎么取。

Layer 2 是 AI Agent:它理解你的问题,决定调用哪个工具,把工具返回的原始数据翻译成人类能理解的信息。

Layer 3 是 TickDB SKILL:它封装了所有行情数据的获取逻辑——REST 拉历史、WebSocket 推实时、统一鉴权、统一错误处理——你不用碰底层。

三层各司其职,你只需要会提问。


三、工具链全景:TickDB SKILL 是什么

3.1 SKILL 协议的本质

如果你用过 OpenAI 的 Function Calling 或者 Claude 的 Tool Use,SKILL 的概念不难理解:SKILL 是一种标准化的工具描述协议,让 AI Agent 知道某个工具能做什么、接受什么参数、返回什么格式。

区别在于:传统 Function Calling 是每个应用自己定义工具,而 SKILL 是一个通用层。任何支持 SKILL 协议的平台(比如 ClawHub 上的 AI 助手)都可以发现和调用这些工具。

TickDB 在 ClawHub 发布的 SKILL 目前包括:

SKILL 名称 功能 输入 输出
tickdb-market-data 市场数据查询 品种、指标、时间范围 K 线数据、实时行情
tickdb-depth-stream 订单簿实时订阅 品种、档位数量 WebSocket depth 快照流
tickdb-signal-monitor 条件监控与告警 品种、触发条件、通知方式 触发状态、告警记录

3.2 SKILL 的使用门槛

使用 TickDB SKILL 有两种路径:

路径 A:AI 助手内直接调用(零代码)

如果你使用集成了 ClawHub 的 AI 助手,直接在对话中描述需求,Agent 会自动发现并调用 TickDB SKILL。你不需要写任何代码,甚至不需要知道 SKILL 是什么。

路径 B:通过 SDK 集成到自己的 Agent(低代码)

如果你的团队有自己的 AI Agent 系统,可以通过 TickDB 提供的 SKILL 描述文件(遵循 SKILL 协议规范)将其接入,自定义调度逻辑。下面演示的就是这种路径。


四、亲手实现:构建一个行情查询 Agent

下面我们用代码搭建一个完整的 Agent,原型可直接运行,核心逻辑约 150 行。

4.1 环境准备

pip install openai aiohttp python-dotenv

所需依赖极简:我们只需要一个大模型客户端(OpenAI 兼容接口)、一个异步 HTTP 库、以及环境变量管理。TickDB SKILL 的封装已经内置了 WebSocket 和 REST 的所有复杂度。

4.2 核心实现

import os
import json
import asyncio
import aiohttp
from typing import Optional

# ============================
# SKILL 协议定义层
# ============================

class TickDBSkill:
    """
    TickDB SKILL 协议封装
    遵循 SKILL 规范:工具名、描述、参数 schema、端点
    """
    SKILL_ID = "tickdb-market-data@latest"
    BASE_URL = "https://api.tickdb.ai/v1"

    def __init__(self, api_key: str):
        self.api_key = api_key

    @property
    def manifest(self) -> dict:
        """
        SKILL Manifest:AI Agent 发现和使用此工具的依据
        """
        return {
            "skill_id": self.SKILL_ID,
            "name": "get_market_data",
            "description": "查询金融品种的行情数据,支持 K 线历史和实时报价",
            "parameters": {
                "type": "object",
                "properties": {
                    "symbol": {
                        "type": "string",
                        "description": "交易品种代码,如 AAPL.US、BTC.USDT",
                        "examples": ["AAPL.US", "NVDA.US", "BTC.USDT"]
                    },
                    "data_type": {
                        "type": "string",
                        "enum": ["kline", "quote", "depth"],
                        "description": "数据类型:kline=K线历史,quote=实时报价,depth=订单簿深度"
                    },
                    "interval": {
                        "type": "string",
                        "description": "K线周期,仅 data_type=kline 时使用",
                        "examples": ["1m", "5m", "1h", "1d"]
                    },
                    "limit": {
                        "type": "integer",
                        "description": "返回数据条数",
                        "default": 100
                    }
                },
                "required": ["symbol", "data_type"]
            }
        }

    async def execute(self, symbol: str, data_type: str,
                      interval: Optional[str] = None,
                      limit: int = 100) -> dict:
        """
        SKILL 执行入口:统一路由到具体数据接口
        """
        headers = {"X-API-Key": self.api_key}

        if data_type == "kline":
            params = {
                "symbol": symbol,
                "interval": interval or "1h",
                "limit": limit
            }
            async with aiohttp.ClientSession() as session:
                async with session.get(
                    f"{self.BASE_URL}/market/kline",
                    headers=headers,
                    params=params,
                    timeout=aiohttp.ClientTimeout(total=10)
                ) as resp:
                    result = await resp.json()
                    return self._parse_response(result)

        elif data_type == "quote":
            async with aiohttp.ClientSession() as session:
                async with session.get(
                    f"{self.BASE_URL}/market/quote",
                    headers=headers,
                    params={"symbol": symbol},
                    timeout=aiohttp.ClientTimeout(total=10)
                ) as resp:
                    result = await resp.json()
                    return self._parse_response(result)

        elif data_type == "depth":
            # depth 数据通过 WebSocket 实时订阅
            return {"data_type": "depth", "note": "depth 需要 WebSocket 订阅,见下方实现"}

        raise ValueError(f"不支持的数据类型: {data_type}")

    def _parse_response(self, response: dict) -> dict:
        """标准化响应解析"""
        code = response.get("code", 0)
        if code == 0:
            return {"success": True, "data": response.get("data", [])}
        if code in (1001, 1002):
            raise PermissionError("API Key 无效,请检查环境变量 TICKDB_API_KEY")
        if code == 2002:
            raise ValueError(f"交易品种不存在: {response.get('message')}")
        if code == 3001:
            retry_after = int(response.headers.get("Retry-After", 5))
            raise RuntimeError(f"请求频率超限,请在 {retry_after} 秒后重试")
        raise RuntimeError(f"API 错误 {code}: {response.get('message')}")

⚠️ 工程笔记:SKILL 的 manifest 是给 AI Agent 看的接口契约。参数描述要足够精确(枚举值、示例值、默认值缺一不可),否则 Agent 会随机填参数。上线前建议用 pydantic 做 schema 校验,而非仅依赖 LLM 的理解。

4.3 AI Agent 调度层

import openai

# ============================
# Agent 调度层
# ============================

class QuantAgent:
    """
    量化 AI Agent:自然语言 → SKILL 调用 → 自然语言回复
    """
    SYSTEM_PROMPT = """你是一个量化交易助手。用户会提出关于行情的问题,
    你需要判断是否需要调用工具,并将工具返回的数据翻译成可操作的洞察。

    可用工具:
    - get_market_data(symbol, data_type, interval, limit): 查询 K 线历史、实时报价

    回答规范:
    - 数据要具体(时间、价格、涨跌幅)
    - 要有行情解读(趋势、异动、对比基准)
    - 不给买卖建议"""

    def __init__(self, skill: TickDBSkill):
        self.skill = skill
        self.client = openai.OpenAI(
            api_key=os.environ.get("OPENAI_API_KEY"),
            base_url=os.environ.get("OPENAI_BASE_URL", "https://api.openai.com/v1")
        )
        self.tools = [skill.manifest]

    async def chat(self, user_message: str) -> str:
        """单轮对话处理:解析意图 → 调用工具 → 返回解读"""

        messages = [
            {"role": "system", "content": self.SYSTEM_PROMPT},
            {"role": "user", "content": user_message}
        ]

        # 第一步:让 LLM 决定是否调用工具
        response = self.client.chat.completions.create(
            model="gpt-4o",
            messages=messages,
            tools=self.tools,
            tool_choice="auto"
        )

        assistant_message = response.choices[0].message

        # 第二步:如果 LLM 选择调用工具
        if assistant_message.tool_calls:
            tool_call = assistant_message.tool_calls[0]
            func_name = tool_call.function.name
            func_args = json.loads(tool_call.function.arguments)

            # ⚠️ 生产环境高频场景建议使用 aiohttp/asyncio 并发调用多个工具
            result = await self.skill.execute(**func_args)

            # 第三步:把工具结果反馈给 LLM 生成最终回复
            messages.append(assistant_message)
            messages.append({
                "role": "tool",
                "tool_call_id": tool_call.id,
                "content": json.dumps(result, ensure_ascii=False)
            })

            final_response = self.client.chat.completions.create(
                model="gpt-4o",
                messages=messages
            )
            return final_response.choices[0].message.content

        # 无需工具,直接回答
        return assistant_message.content

4.4 WebSocket 实时订阅(depth 频道)

import asyncio
import websockets
import json
import random

# ============================
# depth 频道 WebSocket 订阅
# ============================

class DepthMonitor:
    """
    订单簿深度实时监控
    ⚠️ 生产环境需实现:心跳保活、指数退避重连、限频处理
    """

    def __init__(self, api_key: str):
        self.api_key = api_key
        self.ws_url = "wss://stream.tickdb.ai/v1/ws"

    async def subscribe(self, symbols: list[str]):
        """订阅多品种订单簿深度"""

        async with websockets.connect(
            self.ws_url + f"?api_key={self.api_key}"
        ) as ws:
            # 发送订阅指令
            subscribe_msg = {
                "cmd": "subscribe",
                "params": {
                    "channels": ["depth"],
                    "symbols": symbols
                }
            }
            await ws.send(json.dumps(subscribe_msg))

            # 维持连接,接收实时数据
            while True:
                try:
                    message = await asyncio.wait_for(ws.recv(), timeout=30)

                    # ⚠️ 发送心跳保活(部分 WebSocket 服务需要客户端主动 ping)
                    # await ws.send(json.dumps({"cmd": "ping"}))

                    data = json.loads(message)
                    if data.get("channel") == "depth":
                        await self._analyze_depth(data["data"])

                except asyncio.TimeoutError:
                    # 发送心跳保活
                    await ws.send(json.dumps({"cmd": "ping"}))
                    continue

    async def _analyze_depth(self, depth_data: dict):
        """计算买卖压力比,发出告警"""
        symbol = depth_data["symbol"]
        bids = depth_data.get("bids", [])  # [(price, volume), ...]
        asks = depth_data.get("asks", [])

        # 计算前 5 档累计量
        bid_volume = sum(float(v) for _, v in bids[:5])
        ask_volume = sum(float(v) for _, v in asks[:5])

        if ask_volume == 0:
            return

        pressure_ratio = bid_volume / ask_volume

        print(f"[{symbol}] 买卖压力比: {pressure_ratio:.2f} "
              f"(买盘累计: {bid_volume:.0f}, 卖盘累计: {ask_volume:.0f})")

        # 触发告警条件:压力比超过阈值
        if pressure_ratio > 2.5:
            print(f"🚨 【告警】{symbol} 买盘压力异常,当前压力比 {pressure_ratio:.2f}")
        elif pressure_ratio < 0.4:
            print(f"🚨 【告警】{symbol} 卖盘压力异常,当前压力比 {pressure_ratio:.2f}")

    # ============================
    # 指数退避重连封装(生产级推荐写法)
    # ============================
    async def subscribe_with_reconnect(self, symbols: list[str]):
        """
        带重连机制的订阅入口
        """
        base_delay = 1
        max_delay = 60
        retry = 0

        while True:
            try:
                await self.subscribe(symbols)
            except Exception as e:
                retry += 1
                delay = min(base_delay * (2 ** retry), max_delay)
                jitter = random.uniform(0, delay * 0.1)  # 抖动,避免惊群
                print(f"连接断开: {e},{delay + jitter:.1f}s 后重试...")
                await asyncio.sleep(delay + jitter)

⚠️ 工程笔记:心跳保活(ping/pong)和指数退避重连是 WebSocket 长期稳定运行的基础。上面的 subscribe_with_reconnect 方法展示了生产级的重连策略:每次断开后等待时间翻倍(上限 60 秒),加上随机抖动避免所有客户端同时重连。


五、实战演示:三个常见场景

5.1 场景一:查询历史 K 线并判断趋势

你:最近一周英伟达的走势怎么样?有没有跌破过 20 日均线?

Agent → 调用 get_market_data(symbol=NVDA.US, data_type=kline, interval=1d, limit=30)
Agent ← 收到 K 线数据,计算均线,返回趋势解读

Agent:
> NVDA 过去 10 个交易日收于 20 日均线上方,均线支撑在 $875 附近。
> 近一周最低点 $862,未有效跌破均线。
> 当前价格 $918,偏离均线约 4.9%,处于均线支撑确认后的反弹结构中。
> 过去 5 日成交量均值为 3.2 亿股,较 20 日均值缩量 12%,量能有所收敛。

5.2 场景二:实时监控财报发布窗口

async def earnings_watch():
    """财报发布窗口的订单簿监控"""
    api_key = os.environ.get("TICKDB_API_KEY")
    monitor = DepthMonitor(api_key)

    # 监控标的选择:财报季常见波动标的
    targets = ["NVDA.US", "AAPL.US", "MSFT.US"]

    print(f"开始监控: {targets} 的订单簿深度...")
    await monitor.subscribe_with_reconnect(targets)

# asyncio.run(earnings_watch())
输出示例:
[NVDA.US] 买卖压力比: 1.46 (买盘累计: 12400, 卖盘累计: 8490)
[NVDA.US] 买卖压力比: 2.81 (买盘累计: 23600, 卖盘累计: 8400)
🚀 【告警】NVDA.US 买盘压力异常,当前压力比 2.81
[NVDA.US] 买卖压力比: 0.26 (买盘累计: 9100, 卖盘累计: 35200)
🚀 【告警】NVDA.US 卖盘压力异常,当前压力比 0.26

从压力比 1.46 → 2.81 → 0.26,你看到了什么?

这不是随机波动。这是流动性真空(Bid/Ask 都薄)之后,多空双方在财报信息不对称下的极端博弈信号。这个信号出现的时间点,比价格反应提前 3-5 秒。

5.3 场景三:自然语言条件告警

你:当 AAPL 的日内跌幅超过 3% 且成交量超过昨日 1.5 倍时,飞书通知我

Agent → 调用 tickdb-signal-monitor SKILL,设置双条件触发器
Agent ← 配置完成,返回监控 ID

Agent:
> 已设置告警规则:
> 标的:AAPL.US
> 触发条件:日内跌幅 > 3% AND 成交量 > 前日成交量 × 1.5
> 通知方式:飞书
> 监控 ID:alert-20260415-001
> 可随时说「取消 alert-20260415-001」终止监控。

六、部署方案:按需选择

场景 推荐方案 说明
个人尝鲜 ClawHub AI 助手直接调用 零部署,5 分钟上手
个人量化 本地 Python 脚本 + 定时任务 凌晨监控,不依赖第三方
团队协作 部署为 API 服务 + 飞书/钉钉机器人 团队共享监控规则
机构级 Docker 容器 + Kubernetes + 独立数据通道 高可用保障,SLA 支持

个人开发者从「ClawHub 直接调用」起步,体验链路完整性后,再根据需求逐步升级到本地脚本或 API 服务。


七、限制与边界:SKILL 不是银弹

坦诚说清楚这套方案的边界:

SKILL 能做的

  • 把自然语言翻译成数据查询
  • 封装 WebSocket 订阅和 REST 调用的复杂度
  • 提供标准化的工具契约,让多 Agent 协作成为可能

SKILL 不能做的

  • 替你做交易决策(风险自担,且我们不做建议)
  • 预测价格方向(历史数据是后视镜,不是挡风玻璃)
  • 保证告警实时性(WebSocket 网络链路有延迟,极端行情下可能丢失几秒数据)

当前限制

  • depth 频道美股仅支持 1 档,港股和数字货币支持 10 档(外汇、贵金属暂不支持)
  • trades 接口不支持美股和 A 股(订单流分析目前仅限港股和数字货币)
  • SKILL 的条件触发逻辑在边缘节点执行,不适合高频剥头皮策略(延迟敏感场景请直接用 WebSocket 原生接入)

八、下一步行动

如果你想直接体验
在支持 ClawHub 的 AI 助手中,搜索并安装 tickdb-market-data SKILL,在对话中输入「帮我看看 AAPL 最近一周的行情」,感受自然语言到行情数据的完整链路。

如果你想跑通本地脚本

  1. 注册 TickDB(tickdb.ai,免费额度,无需信用卡)
  2. 在控制台生成 API Key
  3. 设置环境变量 TICKDB_API_KEY
  4. 复制本文第四节的代码,替换 API Key 后直接运行

如果你需要 10 年历史 K 线做策略回测
访问 tickdb.ai 联系机构团队,了解企业级数据覆盖和数据导出方案。

如果你想搭团队级监控
发送邮件至 [email protected],说明团队规模和监控场景,获取定制化架构方案。


风险提示:本文不构成任何投资建议。AI Agent 生成的分析基于历史数据和规则引擎,市场存在不确定性,实际交易前请充分评估风险。市场有风险,投资需谨慎。


本文核心代码基于 Python 3.10+,依赖 openai/aiohttp/websockets/python-dotenv,API Key 通过环境变量注入,不从代码中硬编码。