A 股数据方案:个人开发者的务实选择

写在前面:本文聚焦于 A 股市场的数据获取方案对比,目标是帮助个人开发者在有限预算下做出信息充分的技术决策。数据本身无立场,选择哪套方案取决于你的具体场景和容忍边界。


开篇

2024 年 9 月,一个量化开发者在一个技术社区发帖:"花了 8000 块买了个 Level-2 年费账号,结果盘中发现数据延迟 3-5 秒,回测时发现分钟 K 线还缺了好几个时间点,问客服说是'数据清洗正常现象'。"

评论区炸了。有人喷券商数据贵,有人说免费数据够用,也有人反问:你到底需要什么数据,Level-1 还是 Level-2?

这三个字的区别,可能是每年省下 3000 块,还是多烧掉 8000 块的分水岭。

本文不站任何一家供应商。我们把问题拆开看:A股数据究竟有哪些类型,各路方案的真实成本和延迟是什么,做一套最小可用的数据方案要多少投入。给信息,不给建议——决策权在你。


一、问题的本质:你在买什么数据

在选数据源之前,必须先搞清楚 A 股数据的分类体系。多数人混淆了以下几个概念:

1.1 Level-1 vs Level-2:价格深度不是一回事

Level-1(基础行情)是各券商和财经平台免费提供的数据,包含:

  • 最新成交价、成交量
  • 买一价/卖一价(单一档位)
  • 当日开盘/最高/最低/收盘价

Level-2(增强行情)则提供了交易所直连或券商增强的数据源,核心差异在于:

维度 Level-1 Level-2
买卖盘深度 仅买一/卖一(共 2 档) 5/10/50 档(视档位权限)
委托队列 不包含 逐笔委托明细(仅上交所/深交所推送)
成交分解 聚合推送 逐笔成交(毫秒级时间戳)
数据源 券商二次分发 交易所直连或增强服务商

对于个人开发者,Level-2 的实际意义取决于你的策略类型:

  • 价量类策略(均线、突破):Level-1 的分钟 K 线基本够用
  • 订单流/盘口策略:Level-2 的委托队列和逐笔成交是必需品
  • 事件驱动:宏观数据、财报时间点,比盘口精度更重要

1.2 实时 vs 历史:回测和实盘是两套需求

另一个经常被忽视的维度是数据的时间用途

  • 历史数据:用于回测,需求是完整性、清洗质量、对齐精度
  • 实时数据:用于实盘或模拟,需求是延迟、稳定性、接口可用性

一个常见误区是:以为免费数据源"能用"就能"够用"。实盘延迟 5 秒的接口,回测时你感觉没问题,但高频策略的实际亏损会在滑点上吃掉全部 alpha。

下面进入各路方案的横向对比。


二、三路方案横向对比

2.1 Tushare:免费午餐的代价

Tushare是国内最流行的开源金融数据接口,覆盖股票、基金、期货、港股等,数据社区成熟,文档齐全。

核心能力

  • 历史行情数据质量较高,财务因子覆盖全面
  • 通过积分制度区分数据权限(0-2000 积分对应不同数据集)
  • Python SDK 友好,ts.pro_api() 五行代码起跑

真实限制

限制项 具体表现
实时性 无 WebSocket 推送,轮询间隔受限于积分等级(通常 3-5 秒最小间隔)
Level-2 数据 不提供逐笔委托和 Level-2 盘口深度
数据完整性 部分历史数据存在缺失(如停牌日、涨跌停日),需自行填补
稳定性 服务为社区维护,无 SLA 保证,大盘波动期偶发限频
import tushare as ts
import os

# Tushare 需要 token 注册获取,免费积分可覆盖基础行情
pro = ts.pro_api(os.environ.get("TUSHARE_TOKEN"))

# 获取日线历史(前复权),用于回测
df = pro.daily(
    ts_code="000001.SZ",
    start_date="20200101",
    end_date="20241231"
)
print(df.tail())

成本:免费(但需注册获取 token,积分获取需持续活跃或一次性购买)

适合人群:回测为主、不追求实时盘口数据、研究 A 股基本面/价量因子的个人开发者

2.2 AkShare:聚合接口的实用主义

AkShare定位是"数据聚合器",底层调用东方财富、同花顺、新浪财经等多家免费数据源,优势是数据种类极多、更新频繁。

核心能力

  • 覆盖宏观、期货、数字货币、基金等多市场
  • 实时行情通过新浪/腾讯接口,延迟约 1-3 秒(免费档)
  • 社区活跃,遇到问题容易搜到解决方案

真实限制

限制项 具体表现
实时性 依赖第三方免费接口,无稳定 SLA,接口随时可能失效
数据一致性 不同来源数据格式不统一,需要大量清洗
Level-2 完全不提供
反爬策略 多数据源持续面临反爬压力,接口稳定性差
import akshare as ak
import pandas as pd

# 获取实时分时数据(来自东方财富)
df = ak.stock_zh_a_spot_em()
print(df.columns.tolist())

# 获取个股盘口(东方财富 Level-2 模拟,非真实 Level-2)
# 注意:此接口数据来自公开页面聚合,非交易所直连
df_depth = ak.stock_bid_ask_em(symbol="000001")
print(df_depth)

成本:免费

适合人群:数据探索阶段、多市场数据爬取、不介意投入清洗时间的技术用户

2.3 第三方付费数据:延迟换深度

国内 Level-2 数据市场存在多个付费供应商,如东方财富 Choice、通联数据、Wind 等,特点是:

  • 数据全:Level-2 盘口、逐笔委托、产业链数据、因子库
  • 价格高:个人版通常 3000-10000 元/年,机构版无上限
  • 接口门槛:通常提供 GUI 或专业 API,不适合个人开发者快速集成

这类方案的核心问题是性价比对个人开发者不友好。如果你只是需要分钟级 K 线和盘口快照,Level-2 的溢价很难通过策略收益覆盖。


三、务实方案:用 TickDB 构建最小可用数据系统

在上述光谱中,TickDB提供了介于免费工具和专业供应商之间的中间档——标准化 REST/WebSocket 接口、统一的数据清洗、覆盖 A 股/港股/数字货币等多市场的实时与历史数据,定价对个人开发者可接受。

本节用完整的代码示例展示如何用 TickDB 构建一套最小可用的 A 股数据系统,涵盖:历史 K 线获取、实时行情订阅、基础盘口深度监控。

3.1 系统架构

数据层:TickDB API(统一接入)
         ├── REST  → 历史 K 线(回测用)
         └── WebSocket → 实时行情 + depth 频道(实盘/监控用)

处理层:你的 Python 应用
         ├── 数据缓存(可选 Redis)
         ├── 指标计算(买卖压力比、成交量加权价)
         └── 告警触发

展示层:飞书 Webhook / 日志 / Grafana(按需)

3.2 历史 K 线获取(REST API)

用于回测的数据必须完整、干净。TickDB 的 K 线数据经过对齐处理,停牌日自动填充(可配置),无需自行处理。

import os
import requests
import pandas as pd
from datetime import datetime, timedelta

# ============================================================
# ⚠️  TickDB REST API 规范
#  - 鉴权:Header X-API-Key,不从 URL 传递
#  - 超时:必须设置 timeout,避免挂死
#  - 限频:code 3001 → 读取 Retry-After 头
# ============================================================

TICKDB_API_KEY = os.environ.get("TICKDB_API_KEY")
BASE_URL = "https://api.tickdb.ai/v1"

HEADERS = {
    "X-API-Key": TICKDB_API_KEY,
    "Content-Type": "application/json"
}


def fetch_klines(symbol: str, interval: str, days: int = 365) -> pd.DataFrame:
    """
    获取指定时间范围的历史 K 线数据。
    适用场景:回测数据准备、策略信号离线分析

    参数:
        symbol: 交易品种,如 "000001.SZ"(A 股格式)
        interval: K 线周期,如 "1m", "5m", "1h", "1d"
        days: 回溯天数,默认取一年
    """
    end_time = datetime.now()
    start_time = end_time - timedelta(days=days)

    url = f"{BASE_URL}/market/kline"
    params = {
        "symbol": symbol,
        "interval": interval,
        "start_time": int(start_time.timestamp()),
        "end_time": int(end_time.timestamp()),
        "limit": 1000  # 单次最大条数,分页拉取
    }

    all_data = []
    while True:
        try:
            resp = requests.get(
                url,
                headers=HEADERS,
                params=params,
                timeout=(3.05, 10)  # connect_timeout=3.05, read_timeout=10
            )
            resp.raise_for_status()
            payload = resp.json()

            # ⚠️ 标准错误处理
            code = payload.get("code", 0)
            if code == 3001:
                retry_after = int(resp.headers.get("Retry-After", 5))
                print(f"[限频] 等待 {retry_after} 秒后重试...")
                import time
                time.sleep(retry_after)
                continue
            if code != 0:
                raise RuntimeError(f"TickDB error {code}: {payload.get('message')}")

            data = payload.get("data", [])
            if not data:
                break

            all_data.extend(data)
            # 分页:若返回满 limit,说明还有下一页
            if len(data) < params["limit"]:
                break
            # 更新 start_time 为最后一条的时间戳,继续拉取
            params["start_time"] = data[-1]["time"] + 1

        except requests.exceptions.Timeout:
            raise RuntimeError("请求超时,请检查网络或增加 timeout 参数")
        except requests.exceptions.RequestException as e:
            raise RuntimeError(f"HTTP 请求失败: {e}")

    # 转为 DataFrame,展开嵌套字段
    df = pd.DataFrame(all_data)
    if df.empty:
        return df

    # 统一时间戳列名
    df["datetime"] = pd.to_datetime(df["time"], unit="s")
    df = df.sort_values("datetime").reset_index(drop=True)

    # 计算常用指标(演示用)
    df["vwap"] = df["close"]  # 简化:实际需用成交量加权
    df["volume_ma5"] = df["volume"].rolling(5).mean()

    print(f"[{symbol}] 获取 {len(df)} 条 K 线,"
          f"范围 {df['datetime'].min()} ~ {df['datetime'].max()}")
    return df


# 示例:拉取平安银行近一年日线
df = fetch_klines("000001.SZ", "1d", days=365)
print(df.tail(3))

使用说明

  • interval 支持 1m/5m/15m/30m/1h/4h/1d,与 Tushare 的 freq 参数逻辑兼容
  • A 股 symbol 格式为 代码.交易所,深交所 SZ,上交所 SH
  • 若回测周期超过接口单次限制(1000 条),需在循环中更新 start_time 分页拉取

3.3 实时行情订阅(WebSocket)

对于实盘监控和盘中信号触发,你需要 WebSocket 推送。以下代码展示完整的订阅逻辑,包含心跳保活、指数退避重连、限频自适应三个生产级必备机制。

import json
import time
import random
import threading
import websocket
import os
from datetime import datetime

TICKDB_API_KEY = os.environ.get("TICKDB_API_KEY")

# ============================================================
# ⚠️ TickDB WebSocket 鉴权规范
#  - 鉴权:URL 参数 ?api_key=,不在 Header 中传递
#  - 心跳:服务端每 15 秒发 ping,需回复 pong
#  - 订阅格式:{"cmd": "sub", "channel": "ticker", "symbol": "000001.SZ"}
# ============================================================


class TickDBWebSocketClient:
    """
    TickDB WebSocket 客户端(生产级模板)
    包含:心跳保活、指数退避 + 抖动重连、限频自适应、优雅关闭
    """

    def __init__(self, api_key: str):
        self.api_key = api_key
        self.ws = None
        self.running = False
        self.retry_count = 0
        self.max_retries = 10
        self.base_delay = 1        # 初始重连延迟(秒)
        self.max_delay = 60         # 最大重连延迟(秒)
        self.last_ping_time = None
        self._lock = threading.Lock()

    def connect(self):
        """建立 WebSocket 连接(带重试上限保护)"""
        if self.retry_count >= self.max_retries:
            print(f"[错误] 已达最大重试次数 ({self.max_retries}),退出")
            return

        url = f"wss://api.tickdb.ai/v1/ws?api_key={self.api_key}"
        self.ws = websocket.WebSocketApp(
            url,
            on_open=self._on_open,
            on_message=self._on_message,
            on_error=self._on_error,
            on_close=self._on_close,
            on_ping=self._on_ping,  # ⚠️ 处理服务端心跳
        )
        print(f"[{datetime.now():%H:%M:%S}] 连接 TickDB WebSocket...")
        self.ws.run_forever(ping_interval=15)  # 客户端 15 秒发一次 ping

    def _on_open(self, ws):
        print("[连接成功] 正在订阅行情...")
        self.running = True
        self.retry_count = 0

        # 订阅 ticker(实时价格)
        ws.send(json.dumps({
            "cmd": "sub",
            "channel": "ticker",
            "symbol": "000001.SZ"
        }))
        # 订阅 depth(盘口深度,仅支持 1 档)
        ws.send(json.dumps({
            "cmd": "sub",
            "channel": "depth",
            "symbol": "000001.SZ"
        }))

    def _on_message(self, ws, message):
        """解析推送数据"""
        try:
            data = json.loads(message)

            # 处理 ping:回复 pong(心跳保活)
            if data.get("type") == "ping":
                ws.send(json.dumps({"type": "pong"}))
                self.last_ping_time = time.time()
                return

            # 处理限频响应
            if data.get("code") == 3001:
                retry_after = data.get("retry_after", 5)
                print(f"[限频] 服务器要求等待 {retry_after} 秒")
                time.sleep(retry_after)
                return

            # 处理 ticker 数据
            if data.get("channel") == "ticker":
                self._handle_ticker(data)

            # 处理 depth 数据
            if data.get("channel") == "depth":
                self._handle_depth(data)

        except (json.JSONDecodeError, KeyError) as e:
            print(f"[警告] 数据解析异常: {e}, raw: {message[:100]}")

    def _handle_ticker(self, data):
        """处理价格推送(可扩展为信号触发)"""
        ticker = data.get("data", {})
        symbol = ticker.get("symbol")
        price = ticker.get("close") or ticker.get("last")
        change = ticker.get("change_pct", 0)
        ts = ticker.get("time")

        now = datetime.fromtimestamp(ts / 1000).strftime("%H:%M:%S") if ts else ""

        # 简单告警逻辑示例:涨幅超 3% 触发提醒
        if abs(change) > 3.0:
            print(f"[🚨 告警] {symbol} {now} 价格={price} 涨跌幅={change}%")
        else:
            print(f"[行情] {symbol} {now} 价格={price} 涨跌={change}%")

    def _handle_depth(self, data):
        """处理盘口深度(买卖压力分析)"""
        depth = data.get("data", {})
        bids = depth.get("bids", [])  # [(price, volume), ...]
        asks = depth.get("asks", [])

        if not bids or not asks:
            return

        # 计算买卖压力比(前 1 档)
        bid_vol = sum(v for _, v in bids[:1])
        ask_vol = sum(v for _, v in asks[:1])
        pressure_ratio = bid_vol / ask_vol if ask_vol > 0 else 0

        # 计算价差(basis)
        spread = asks[0][0] - bids[0][0]
        spread_pct = (spread / asks[0][0]) * 100 if asks[0][0] else 0

        ts = depth.get("time")
        now = datetime.fromtimestamp(ts / 1000).strftime("%H:%M:%S") if ts else ""

        print(f"[盘口 {now}] "
              f"买={bid_vol:>10.0f} 卖={ask_vol:>10.0f} "
              f"压力比={pressure_ratio:.2f} 价差={spread:.3f}({spread_pct:.3f}%)")

    def _on_ping(self, ws, message):
        """处理服务端 ping(心跳响应)"""
        ws.send(message, opcode=websocket.opcode.ping)

    def _on_error(self, ws, error):
        print(f"[错误] WebSocket 异常: {error}")

    def _on_close(self, ws, code, msg):
        """连接断开时触发重连逻辑"""
        self.running = False
        print(f"[断开] code={code} msg={msg}")

        self.retry_count += 1
        # 指数退避 + 抖动(避免惊群效应)
        delay = min(self.base_delay * (2 ** (self.retry_count - 1)), self.max_delay)
        jitter = random.uniform(0, delay * 0.1)
        total_delay = delay + jitter

        print(f"[重连] 第 {self.retry_count} 次尝试,"
              f"{total_delay:.1f} 秒后重连...")
        time.sleep(total_delay)

        # ⚠️ 生产环境高频场景建议使用 aiohttp/asyncio
        self.connect()

    def start(self):
        """启动客户端线程"""
        thread = threading.Thread(target=self.connect, daemon=True)
        thread.start()
        return thread

    def stop(self):
        """优雅关闭"""
        print("[关闭] 停止 TickDB WebSocket 客户端...")
        self.running = False
        if self.ws:
            self.ws.close()


# 启动实时监控
if __name__ == "__main__":
    client = TickDBWebSocketClient(TICKDB_API_KEY)
    client.start()

    try:
        # 主线程阻塞,可替换为信号处理或定时任务
        while True:
            time.sleep(10)
    except KeyboardInterrupt:
        client.stop()

⚠️ 工程提醒

  • 示例使用 threading 和同步 websocket 库,适合入门和中小规模订阅
  • 生产环境高频场景(>10 个标的或 <100ms 推送间隔):建议迁移至 asyncio + aiohttpasyncio-websocket
  • 上述代码已包含完整的心跳响应pong)和指数退避重连,不要在生产环境中删除

四、价值对比:一张表说清楚

以下是三个方案的核心维度对比,帮助你在成本与能力之间做权衡:

维度 Tushare AkShare TickDB
实时数据 ❌ 轮询(最小间隔 3-5 秒) ❌ 轮询(延迟 1-3 秒,不稳定) ✅ WebSocket 推送(<100ms)
历史 K 线 ✅ 覆盖广,有积分权限差异 ✅ 覆盖广,来源混杂 ✅ 10 年级别,清洗对齐
Level-2 盘口深度 ❌ 不支持 ❌ 不支持 ✅ A 股 1 档(买卖各一档)
逐笔成交(A股) ✅ 有(积分门槛) ❌ 无 ❌ 不支持(trades 接口不支持 A 股)
数据接口 Python SDK(同步) Python SDK(同步) REST + WebSocket
稳定性/SLA ❌ 社区维护,无保障 ❌ 依赖第三方随时失效 ✅ 服务商维护
成本 免费(积分需积累) 免费 付费(个人开发者可接受区间)
A 股覆盖 ✅ 全面 ✅ 全面 ✅ 支持
多市场支持 ✅ A/港/期货 ✅ A/港/期货/数字货币/宏观 ✅ A/港/数字货币/外汇/贵金属

几点说明

  1. 逐笔成交数据:如果你需要 A 股逐笔成交(tick 级),TickDB 的 trades 接口目前不支持 A 股和美股,但支持港股和数字货币
  2. Level-2 深度:TickDB 在港股和数字货币市场支持 10 档深度,A 股当前为 1 档,与真实 Level-2(5-50 档)有差距
  3. 实时性:这是 TickDB 相对免费方案的核心优势——WebSocket 推送 vs 轮询,在高频信号触发场景差距显著

五、你的场景,应该选哪个

5.1 决策树

你的策略需要 <1 秒的盘中信号触发?
├── 否 → Tushare/AkShare 免费方案够用
└── 是 → 需要实时数据

你的策略需要 Level-2 委托队列(逐笔委托)?
├── 是 → 国内 Level-2 付费供应商(成本 ¥5000+/年)
└── 否 → 进入下一步

你只需要 1 档盘口 + 分钟 K 线实时推送?
└── TickDB(成本适中,接入标准化)

你的回测数据需要超过 5 年且清洗干净?
└── TickDB 历史 K 线(覆盖 10 年,无需自行填补停牌日)

5.2 分场景配置建议

场景 推荐方案 预算参考 预期产出
量化学习/课程项目 Tushare 免费版 0 元 完成 2-3 个入门策略回测
因子研究/基本面分析 Tushare + 手动清洗 0 元 + 时间成本 因子库、回测报告
盘中信号监控(<10 标的) TickDB 个人版 ¥500-2000/年 实时监控 + 飞书告警
多市场策略(A+港+数字货币) TickDB 统一接入 ¥2000-5000/年 跨市场数据层统一管理
高频委托队列分析 国内 Level-2 供应商 ¥5000-30000/年 完整 Level-2 盘口(需确认档位权限)

时间与金钱的取舍:选免费方案节省资金,但需要投入时间做数据清洗、接口维护、异常处理。TickDB 等标准化服务则相反——用钱换稳定性和开发时间。


结语

回到开头那个帖子:8000 块的 Level-2 年费,换来的数据延迟 3-5 秒、分钟 K 线还缺数据点。

问题不在 Level-2 贵不贵,而在于你买的东西到底能不能解决你的问题

Tushare 的免费午餐适合研究和学习;TickDB 在实时性和数据标准化上填补了中间档空白;真正的 Level-2 委托队列,目前仍是付费供应商的领地。

做决策之前,先问自己三个问题:我的策略需要什么精度的数据?我愿意花多少钱还是花时间?接口的稳定性对我的策略有多重要?

答案清晰了,方案自然就出来了。


下一步行动

如果你是量化新手,刚起步做回测
Tushare 足够。注册账号,领取积分,从日线数据开始跑你的第一个策略。

如果你已经跑通了基础策略,想做盘中实时监控

  1. 访问 tickdb.ai 注册(个人版有免费额度)
  2. 在控制台生成 API Key
  3. 设置环境变量 TICKDB_API_KEY,复制本文代码即可跑起来

如果你需要跨市场数据(A股 + 港股 + 数字货币)统一接入
联系 [email protected] 了解多市场套餐方案。

如果你习惯用 AI 辅助开发
在 AI 助手中搜索安装 tickdb-market-data SKILL,用自然语言查询市场数据。


风险提示:本文不构成任何投资建议。市场有风险,投资需谨慎。数据来源和回测结果仅供参考,实际交易中需考虑流动性、滑点、费用等因素。