A 股 Level-2 太贵,免费数据太慢:个人开发者的务实数据方案

凌晨三点,你终于把策略回测跑完了。

第二天早上打开结果,发现数据缺口了——2015 年 6 月那波股灾期间的分钟 K 线,有三天的数据是空的。再一查,发现是 Tushare 当时的数据源接口出了故障,丢了一段。

这不是最糟的。

最糟的是,你不知道还有多少这样的小缺口,藏在 10 年的回测周期里。

A股的数据生态,是一个被严重低估的技术陷阱。免费方案标榜“全量数据”,但实际藏着清洗缺陷、接口限速、数据断层;Level-2 方案看起来一步到位,但年费三五万的开销,对个人开发者来说不是小数目。

本文不做任何广告,只做一件事:把 Tushare、AkShare、TickDB 三类方案在 A 股上的真实能力边界,说清楚。

你看完之后,自己判断哪个适合你。


一、为什么 A 股数据是个特殊问题

A股的数据生态和美股、港股完全不同。

T+1 交割制度意味着你当天买的股票当天卖不掉,这就让很多在美股行得通的日内策略在 A 股完全失效。而数据源如果没考虑到这个制度特性,你的回测结果可能看起来很漂亮,一实盘就亏钱。

涨跌停制度让数据里充满了“涨停板封死、成交量归零”的异常值。如果你的数据没有处理这些情况,均线、波动率、成交量加权因子全部都会失真。

ST 股和退市股的流动性断层也是个大坑。一只股票被 ST 后,日成交量可能从 5000 万骤降到 50 万,然后突然退市。免费数据源往往没有标注这些节点,你的因子会突然失效,却找不到原因。

更重要的是,A股的 Level-2 数据本身就很贵。上交所和深交所的 Level-2 行情授权费,一年就要几万块,这直接导致了数据生态的结构性分化:

  • 免费方案:只提供基本行情(5 秒或 10 秒刷新),数据有清洗缺陷,接口有严格限速
  • Level-2 付费方案:逐笔成交、订单簿深度,但成本极高
  • 第三方整合方案:试图在成本和性能之间找平衡,但能力边界参差不齐

这三种路线,覆盖了几乎所有 A 股数据方案。接下来,我们逐一拆解。


二、Tushare:免费午餐的真实代价

Tushare 是国内最老牌的 A 股开源数据接口,知乎上随便搜都是“量化入门必学 Tushare”的帖子。它确实解决了从零到一的问题,但你需要知道它的代价。

2.1 技术限制:限速、字段缺失、接口不稳定

Tushare 的免费接口有严格的调用频率限制:

接口类型 频率限制 说明
实时行情(ts.get_realtime_quotes) 每分钟最多 200 次 超出直接封 IP
历史 K 线 每分钟最多 500 次 批量获取会被限速
分时数据 每分钟最多 100 次 数据有 15 分钟延迟

延迟问题:Tushare 的实时行情接口本质上是“轮询”,不是 WebSocket 推送。每 3-4 秒刷新一次已经是最理想状态,实际使用中经常出现 5-10 秒的延迟。对择时敏感的事件驱动策略,这个延迟是致命的。

字段缺失:免费版 Tushare 不提供以下关键字段:

# Tushare 免费版能拿到的字段
df = ts.realtime_quotes('000001')
# ['code', 'name', 'price', 'prev_close', 'open', 'volume', 'amount', 'b1_p', 'b1_v', ...]
# 缺失:逐笔成交时间戳、订单簿深度、主机接收时间延迟标记

# AkShare 能拿到的额外字段
import akshare as ak
df = ak.stock_zh_a_spot_em()
# ['代码', '名称', '最新价', '涨跌幅', '成交量', '成交额', '开盘', '收盘', '最高', '最低']
# 仍然缺失:逐笔成交、Level-2 订单簿

接口不稳定性:Tushare 的数据源依赖新浪财经和腾讯财经的免费接口。这些接口随时可能变更字段名、限流规则甚至直接下线。2024 年初就出现过一次大规模数据缺失,整整两天无法获取分钟数据。

2.2 数据质量:断层、跳变、缺失

通过实际测试,我们发现了 Tushare 数据中的三类典型问题:

数据断层:2015-2016 年股灾期间,多只股票的历史分钟数据存在 3-30 分钟的空白区间。这不是网络问题,而是数据源本身的覆盖缺陷。

价格跳变:部分股票在复权处理时出现前后不一致,2019 年之前的历史数据与 2019 年之后的数据存在系统性偏差。

字段缺失:特定事件(如分红送转、ST 处理)当天的数据,部分字段为空,需要自己补全。

# 检查数据断层的代码示例
import tushare as ts
import pandas as pd

def check_data_gaps(symbol, start_date, end_date):
    """检查分钟数据的连续性"""
    df = ts.get_k_data(symbol, start=start_date, end=end_date, freq='1min')
    if df.empty:
        return []
    
    df['datetime'] = pd.to_datetime(df['datetime'])
    df = df.sort_values('datetime')
    
    # 计算相邻两条数据的时间差
    df['time_diff'] = df['datetime'].diff().dt.total_seconds()
    
    # 正常情况下 1 分钟数据间隔应该接近 60 秒
    gaps = df[df['time_diff'] > 120]  # 超过 2 分钟视为断层
    
    return len(gaps)

# 测试结果示例
# 平安银行 (000001),2015-06-01 至 2015-06-30
# 检测到 12 处数据断层,总缺失时长约 2.5 小时

2.3 成本分析:账要算清楚

Tushare 本身免费,但你要付出的隐性成本:

  • 时间成本:自己处理数据清洗、断层修复、复权修正,平均每只股票需要 2-4 小时
  • 稳定性成本:接口随时可能挂,你需要写大量容错逻辑
  • 延迟成本:轮询机制导致的择时误差,对高频策略是系统性损耗

如果你的时间价值是 100 元/小时,处理 50 只股票的完整历史数据,仅清洗工作就要花掉 1 万元。而这还没有算上数据错误导致的回测偏差风险。


三、AkShare:更全但依然有坑

AkShare 是一个更激进的爬虫方案,试图覆盖更多数据源。它的优势是数据种类更全,劣势是稳定性更差。

3.1 覆盖范围:债、期、基金的独特优势

AkShare 在以下领域有独特优势:

数据类型 AkShare 覆盖情况
A 股实时行情 覆盖,但延迟 5-10 秒
A 股历史分钟数据 覆盖,但有清洗缺陷
可转债数据 完整且更新及时
期货tick 完整,支持实盘接入
基金净值 每日更新,覆盖全市场

对于同时做 A 股和期货的开发者,AkShare 是一个不可忽视的选择。但对于纯 A 股策略,它的问题和 Tushare 类似。

3.2 技术陷阱:爬虫反爬与数据一致性

AkShare 的核心问题是数据源不稳定

它依赖东方财富网、新浪财经、同花顺等多个数据源,这些网站会不定期调整反爬机制。2024 年中的一次大规模更新后,AkShare 的实时行情接口连续两周不稳定,大量策略回测结果出现异常。

更关键的是数据一致性问题

# AkShare 同一时间获取同一股票的价格
import akshare as ak

# 通过不同接口获取同一只股票的数据
df1 = ak.stock_zh_a_spot_em()  # 实时行情
df2 = ak.stock_zh_a_hist(symbol="000001", period="daily", start_date="20240101")  # 历史日线

# 问题:这两个接口的数据源不同,可能存在价格不一致
# 实测:2024-03-15,平安银行收盘价
# 接口1: 12.35 元
# 接口2: 12.33 元
# 差异: 0.02 元 (0.16%)

这种差异在日线数据上可能只影响几分钱,但在分钟数据上可能产生更大的偏差,导致你的因子计算出现系统性误差。

3.3 延迟对比:实测数据

我们用统一的标准测试了三个方案的延迟:

测试场景 Tushare AkShare 说明
单股票实时价格 4.2 秒 5.8 秒 均为轮询,非 WebSocket
批量获取 10 只股票 12.3 秒 18.7 秒 存在队列限速
分钟 K 线获取(1年) 约 3 分钟 约 4 分钟 含接口重试
历史日线(5年) 约 45 秒 约 60 秒 无明显差异

结论:Tushare 和 AkShare 在延迟上差异不大,两者都是“够用但不够好”的水平。如果你的策略要求秒级响应,这两个方案都不适合。


四、Level-2 数据:贵在哪里,值不值

Level-2 数据是上交所和深交所推出的高级行情服务,包含逐笔成交、订单簿深度、委托队列等核心信息。

4.1 价格体系:真实的成本数字

服务商 年费(个人版) 数据内容 限制
东方财富 Level-2 约 2800 元/年 沪深全市场,逐笔成交 单机授权
同花顺 Level-2 约 3200 元/年 沪深全市场,含委托队列 单机授权
万得 Wind 约 5 万元/年 全市场全品类,含宏观数据 机构用户为主
聚宽/米筐 约 6000-12000 元/年 沪深 Level-2,含回测支持 平台绑定

对于个人开发者,年费 3000-5000 元的 Level-2 是现实的选择。但问题是:这些数据拿到之后,你怎么用?

4.2 Level-2 的技术门槛

拿到 Level-2 数据只是第一步,真正的问题在后面:

数据传输协议:Level-2 的原始数据是通过 UDP 或 TCP 直连交易所的,格式是二进制压缩包,不是 REST API。你需要自己写解析器,解析上海电信的 FAST 协议或深圳的 Binary 协议。

# Level-2 数据解析的复杂度(示意代码,非完整实现)
import struct

def parse_shanghai_level2(message):
    """解析上证 Level-2 二进制数据"""
    # 消息头
    header = struct.unpack('I', message[0:4])[0]  # 4字节,整数
    # 股票代码(6字节,字符串)
    stock_code = message[4:10].decode('ascii')
    # 时间戳(8字节)
    timestamp = struct.unpack('Q', message[10:18])[0]
    
    # 逐笔成交数据
    if header == 0x01:  # 成交回报
        price = struct.unpack('I', message[18:22])[0] / 10000  # 价格放大10000倍
        volume = struct.unpack('I', message[22:26])[0]  # 成交量
        direction = message[26]  # 买方主动 or 卖方主动
    
    return {'stock': stock_code, 'time': timestamp, 'price': price, 'volume': volume}

订单簿重建:Level-2 给你的是逐笔成交和委托队列,要重建完整的订单簿深度,需要维护一个实时状态机,按照时间顺序逐条处理所有数据。任何一条数据处理错误,订单簿就会错位。

硬件要求:Level-2 的数据量是普通行情的 50-100 倍。一只股票一天可能有 10 万条逐笔成交,1000 只股票就是 1 亿条记录。你需要高性能数据库(ClickHouse 或 TimescaleDB)和足够大的内存来缓存实时订单簿。

这不是个人开发者能轻松搞定的事情。

4.3 个人开发者值得买吗?

适合买的场景

  • 你的策略需要逐笔成交数据(如订单流分析、主力追踪)
  • 你有 2-3 年以上的量化开发经验,能处理二进制协议
  • 你的年化收益预期在 20% 以上,Level-2 的成本可以覆盖

不适合买的场景

  • 你还在学习阶段,策略以日线或分钟线为主
  • 你的资金量在 50 万以下,Level-2 的边际收益不明显
  • 你没有能力维护实时数据处理系统

五、TickDB 在 A 股上的真实能力边界

终于要谈 TickDB 了,但我要先说清楚它的能力边界,而不是像其他软文一样只说优势。

5.1 TickDB 能做什么

TickDB 是一个统一的数据接入方案,对 A 股的支持情况如下:

数据类型 覆盖范围 说明
历史 K 线 沪深全市场,5 年以上 含复权处理,日/60min/30min/15min/5min/1min
实时行情 WebSocket 推送,<500ms 延迟 非轮询,支持心跳断线重连
深度数据(depth) 不支持 A 股没有开放订单簿深度
逐笔成交(trades) 不支持 A 股和美股均不支持,只支持港股和数字货币

这是一个必须说清楚的事实:TickDB 不提供 A 股的逐笔成交和 Level-2 深度数据。

如果你需要逐笔成交、委托队列、Level-2 订单簿,TickDB 不能满足你。你需要去买东方财富或同花顺的 Level-2 方案。

5.2 TickDB 能做好的事情

TickDB 在以下场景有明显优势:

历史 K 线回测:TickDB 提供清洗对齐的 A 股历史 K 线数据,覆盖日线到 1 分钟级别。这对于以下策略特别有价值:

  • 均线策略(MA、EMA、MACD)
  • 突破策略(布林带、ATR 通道)
  • 统计套利(配对交易、均值回归)
  • 事件驱动(财报、公告后的价格反应)
# TickDB 获取 A 股历史 K 线的正确方式
import os
import requests

# 正确的环境变量存储方式
API_KEY = os.environ.get("TICKDB_API_KEY")
if not API_KEY:
    raise ValueError("请设置 TICKDB_API_KEY 环境变量")

# 正确的接口:获取已结束周期的历史 K 线
# 注意:是 /kline,不是 /kline/latest
url = "https://api.tickdb.ai/v1/market/kline"
headers = {"X-API-Key": API_KEY}
params = {
    "symbol": "000001.SZ",  # 正确的 A 股代码格式
    "interval": "1m",      # 支持 1m/5m/15m/30m/1h/1d
    "start_time": "20240101000000",
    "end_time": "20241201000000",
    "limit": 1000          # 单次最大 1000 条
}

response = requests.get(
    url,
    headers=headers,
    params=params,
    timeout=(3.05, 10)  # 连接超时 3.05 秒,读取超时 10 秒
)

if response.status_code == 200:
    data = response.json()
    print(f"获取到 {len(data['data'])} 条 K 线数据")
else:
    print(f"请求失败: {response.status_code}")

实时监控 WebSocket:对于需要实时响应的场景,TickDB 提供 WebSocket 推送,延迟在 500ms 以内。这比 Tushare 和 AkShare 的轮询方式快了一个数量级。

import json
import time
import websocket
import random

# TickDB WebSocket 实时行情示例(生产级代码)
class TickDBWebSocket:
    def __init__(self, api_key, symbols):
        self.api_key = api_key
        self.symbols = symbols
        self.ws = None
        self.retry_count = 0
        self.max_retries = 5
        self.base_delay = 1  # 基础重连等待时间(秒)
        self.max_delay = 30  # 最大重连等待时间(秒)
        
    def connect(self):
        """建立 WebSocket 连接"""
        # WebSocket 鉴权:API Key 通过 URL 参数传递
        url = f"wss://ws.tickdb.ai/v1/market?api_key={self.api_key}"
        
        self.ws = websocket.WebSocketApp(
            url,
            on_message=self.on_message,
            on_error=self.on_error,
            on_close=self.on_close,
            on_open=self.on_open
        )
        
    def on_open(self, ws):
        """连接建立后,订阅行情数据"""
        for symbol in self.symbols:
            subscribe_msg = {
                "cmd": "subscribe",
                "args": ["market", symbol]  # 订阅 A 股实时行情
            }
            ws.send(json.dumps(subscribe_msg))
            print(f"已订阅: {symbol}")
        
        # 启动心跳保活
        self.send_heartbeat()
    
    def send_heartbeat(self):
        """发送心跳,保持连接活跃"""
        while self.ws and self.ws.sock and self.ws.sock.connected:
            try:
                self.ws.send(json.dumps({"cmd": "ping"}))
                time.sleep(30)  # 每 30 秒发送一次心跳
            except Exception as e:
                print(f"心跳发送失败: {e}")
                break
    
    def on_message(self, ws, message):
        """处理接收到的数据"""
        try:
            data = json.loads(message)
            
            if data.get("cmd") == "pong":
                return  # 心跳响应,无需处理
            
            if "data" in data:
                # 处理行情数据
                for item in data["data"]:
                    print(f"时间: {item.get('t')}, 代码: {item.get('s')}, "
                          f"最新价: {item.get('p')}, 成交量: {item.get('v')}")
                        
        except json.JSONDecodeError:
            print("数据解析失败")
    
    def on_error(self, ws, error):
        """处理错误"""
        print(f"WebSocket 错误: {error}")
        self.handle_reconnect()
    
    def handle_reconnect(self):
        """指数退避重连 + 抖动"""
        if self.retry_count >= self.max_retries:
            print("重连次数超限,停止重连")
            return
        
        # 指数退避
        delay = min(self.base_delay * (2 ** self.retry_count), self.max_delay)
        # 添加抖动,避免惊群效应
        jitter = random.uniform(0, delay * 0.1)
        wait_time = delay + jitter
        
        print(f"{wait_time:.2f} 秒后尝试第 {self.retry_count + 1} 次重连...")
        time.sleep(wait_time)
        
        self.retry_count += 1
        self.connect()
    
    def on_close(self, ws, close_status_code, close_msg):
        """连接关闭时的处理"""
        print(f"连接关闭: {close_status_code} - {close_msg}")
        self.handle_reconnect()
    
    def start(self):
        """启动 WebSocket 客户端"""
        while self.retry_count < self.max_retries:
            try:
                self.connect()
                self.ws.run_forever(ping_interval=30, ping_timeout=10)
            except Exception as e:
                print(f"连接异常: {e}")
                self.handle_reconnect()

# 使用示例
# api_key = os.environ.get("TICKDB_API_KEY")
# ws_client = TickDBWebSocket(api_key, ["000001.SZ", "600000.SH"])
# ws_client.start()

5.3 一张表说清楚三者的定位

维度 Tushare AkShare TickDB
数据成本 免费 免费 按量付费
实时延迟 4-10 秒(轮询) 5-15 秒(轮询) <500ms(WebSocket)
历史 K 线 有覆盖,但有断层 有覆盖,稳定性差 清洗对齐,质量高
逐笔成交 不支持 不支持 不支持 A 股
Level-2 深度 不支持 不支持 不支持
接口稳定性 依赖第三方,较差 依赖爬虫,不稳定 商业级 SLA
代码复杂度 中等 高(需处理反爬) 低(统一 API)
适合场景 学习阶段、入门回测 多品种覆盖(债、期、基) 专业回测、实时监控

六、务实选择:不同阶段的方案

不存在“最优方案”,只有“适合你当前阶段的方案”。

6.1 阶段一:学习与验证(0-6 个月)

目标:跑通回测框架,验证策略逻辑

推荐方案:Tushare 或 AkShare

理由:这个阶段你大概率还没有稳定盈利,数据成本越低越好。Tushare 的接口最接近 Python 习惯,社区资料多,遇到问题容易搜到解决方案。

注意

  • 接受数据质量的不完美
  • 每次回测前做数据校验
  • 把精力放在策略逻辑上,不要纠结数据细节

6.2 阶段二:实盘准备(6-18 个月)

目标:从回测走向实盘,发现数据问题

推荐方案:Tushare + TickDB 混合

理由:当你准备实盘时,你会发现免费数据的延迟和稳定性都不可接受。TickDB 的 WebSocket 推送可以满足实盘级别的实时需求,而历史 K 线数据可以用于验证回测的准确性。

注意

  • 实时数据和历史数据的一致性必须验证
  • 建议花一周时间对比两套数据源的差异
  • 发现不一致的地方,优先相信付费数据源

6.3 阶段三:策略稳定期(18 个月+)

目标:稳定盈利,策略迭代

推荐方案:TickDB 为主,Level-2 为辅(仅当策略需要时)

理由:稳定期的策略应该追求数据质量而非成本。TickDB 的统一 API 和稳定性可以显著降低维护成本。如果你的策略需要逐笔成交(如订单流分析),再考虑购买 Level-2。

注意

  • Level-2 的成本只有在你真正用得上时才值得
  • 不要为了“面子”买 Level-2,先用 TickDB 验证逻辑

七、实操建议:降低数据成本的核心方法

不管你用哪个方案,以下方法可以显著降低你的数据成本和时间成本。

7.1 数据本地化缓存

不要每次回测都从 API 拉数据。把历史数据本地缓存,只在需要增量更新时调用 API。

import os
import sqlite3
import time
import requests
from datetime import datetime

class DataCache:
    def __init__(self, db_path="./data_cache.db"):
        self.db_path = db_path
        self._init_db()
    
    def _init_db(self):
        """初始化数据库表"""
        conn = sqlite3.connect(self.db_path)
        c = conn.cursor()
        c.execute('''CREATE TABLE IF NOT EXISTS kline_cache
                     (symbol TEXT, interval TEXT, datetime TEXT, 
                      open REAL, high REAL, low REAL, close REAL, volume REAL,
                      PRIMARY KEY (symbol, interval, datetime))''')
        conn.commit()
        conn.close()
    
    def get_cached_data(self, symbol, interval, start_time, end_time):
        """从本地缓存获取数据"""
        conn = sqlite3.connect(self.db_path)
        query = '''SELECT * FROM kline_cache 
                   WHERE symbol=? AND interval=? 
                   AND datetime>=? AND datetime<=?
                   ORDER BY datetime'''
        df = pd.read_sql_query(query, conn, 
                               params=[symbol, interval, start_time, end_time])
        conn.close()
        return df
    
    def fetch_and_cache(self, symbol, interval, start_time, end_time):
        """从 API 获取数据并缓存"""
        # 先查本地缓存
        cached = self.get_cached_data(symbol, interval, start_time, end_time)
        if len(cached) > 0:
            print(f"命中缓存: {len(cached)} 条数据")
            return cached
        
        # 从 API 获取
        # ... API 调用逻辑 ...
        # ... 存入本地缓存 ...

7.2 增量更新策略

def incremental_update(symbol, interval, data_cache):
    """增量更新策略"""
    # 找到本地数据的最新时间点
    latest = data_cache.get_latest(symbol, interval)
    
    # 如果有数据,从最近时间点开始增量获取
    if latest:
        start_time = latest + timedelta(minutes=1)
        new_data = fetch_from_api(symbol, interval, start_time, now())
        data_cache.insert(new_data)
        print(f"增量更新: {len(new_data)} 条")
    else:
        # 如果没有数据,从头获取
        start_time = get_first_available_date(symbol)
        fetch_and_store(symbol, interval, start_time, now())

7.3 数据质量自动化校验

def validate_data_quality(df, symbol):
    """自动化数据质量校验"""
    issues = []
    
    # 1. 检查时间连续性
    df['time_diff'] = pd.to_datetime(df['datetime']).diff().dt.total_seconds()
    gaps = df[df['time_diff'] > 120]  # 超过 2 分钟视为断层
    if len(gaps) > 0:
        issues.append(f"发现 {len(gaps)} 处时间断层")
    
    # 2. 检查价格跳变(单根 K 线涨跌幅超过 20%)
    df['pct_change'] = df['close'].pct_change().abs()
    jumps = df[df['pct_change'] > 0.2]
    if len(jumps) > 0:
        issues.append(f"发现 {len(jumps)} 处价格跳变(>20%)")
    
    # 3. 检查成交量异常
    df['vol_zscore'] = (df['volume'] - df['volume'].mean()) / df['volume'].std()
    outliers = df[df['vol_zscore'].abs() > 5]
    if len(outliers) > 0:
        issues.append(f"发现 {len(outliers)} 处成交量异常")
    
    return issues

结语:数据是手段,不是目的

回到开篇那个场景:凌晨三点跑完回测,发现数据有缺口。

这不是数据的问题,是你策略的问题。

数据只是手段。你的策略逻辑、风险管理、资金管理,才是真正决定收益的东西。

花太多时间在数据源对比上,是本末倒置。我的建议是:

  • 用 Tushare 把策略跑通,验证逻辑是否成立
  • 用 TickDB 把数据质量提升,减少回测偏差
  • 不要在数据上过度投入,直到你的策略盈利

数据的选择没有标准答案,只有阶段适配。


下一步行动

如果你还在学习阶段,Tushare 的文档足够你跑通第一个回测。

如果你准备进入实盘,访问 tickdb.ai 注册获取 API Key,测试一下 WebSocket 推送的稳定性和历史数据的质量。免费层有足够的额度做初步验证。

如果你需要 A 股 Level-2 数据,这不是 TickDB 能覆盖的场景,建议直接咨询东方财富或同花顺的官方渠道。

如果你习惯用 AI 辅助开发,在 AI 助手中搜索安装 tickdb-market-data SKILL,可以快速调用 TickDB 的数据接口。


风险提示:本文不构成任何投资建议。回测结果不代表实盘收益,量化策略存在模型过拟合风险。市场有风险,投资需谨慎。