A 股 Level-2 太贵,免费数据太慢:个人开发者的务实数据方案
凌晨三点,你终于把策略回测跑完了。
第二天早上打开结果,发现数据缺口了——2015 年 6 月那波股灾期间的分钟 K 线,有三天的数据是空的。再一查,发现是 Tushare 当时的数据源接口出了故障,丢了一段。
这不是最糟的。
最糟的是,你不知道还有多少这样的小缺口,藏在 10 年的回测周期里。
A股的数据生态,是一个被严重低估的技术陷阱。免费方案标榜“全量数据”,但实际藏着清洗缺陷、接口限速、数据断层;Level-2 方案看起来一步到位,但年费三五万的开销,对个人开发者来说不是小数目。
本文不做任何广告,只做一件事:把 Tushare、AkShare、TickDB 三类方案在 A 股上的真实能力边界,说清楚。
你看完之后,自己判断哪个适合你。
一、为什么 A 股数据是个特殊问题
A股的数据生态和美股、港股完全不同。
T+1 交割制度意味着你当天买的股票当天卖不掉,这就让很多在美股行得通的日内策略在 A 股完全失效。而数据源如果没考虑到这个制度特性,你的回测结果可能看起来很漂亮,一实盘就亏钱。
涨跌停制度让数据里充满了“涨停板封死、成交量归零”的异常值。如果你的数据没有处理这些情况,均线、波动率、成交量加权因子全部都会失真。
ST 股和退市股的流动性断层也是个大坑。一只股票被 ST 后,日成交量可能从 5000 万骤降到 50 万,然后突然退市。免费数据源往往没有标注这些节点,你的因子会突然失效,却找不到原因。
更重要的是,A股的 Level-2 数据本身就很贵。上交所和深交所的 Level-2 行情授权费,一年就要几万块,这直接导致了数据生态的结构性分化:
- 免费方案:只提供基本行情(5 秒或 10 秒刷新),数据有清洗缺陷,接口有严格限速
- Level-2 付费方案:逐笔成交、订单簿深度,但成本极高
- 第三方整合方案:试图在成本和性能之间找平衡,但能力边界参差不齐
这三种路线,覆盖了几乎所有 A 股数据方案。接下来,我们逐一拆解。
二、Tushare:免费午餐的真实代价
Tushare 是国内最老牌的 A 股开源数据接口,知乎上随便搜都是“量化入门必学 Tushare”的帖子。它确实解决了从零到一的问题,但你需要知道它的代价。
2.1 技术限制:限速、字段缺失、接口不稳定
Tushare 的免费接口有严格的调用频率限制:
| 接口类型 | 频率限制 | 说明 |
|---|---|---|
| 实时行情(ts.get_realtime_quotes) | 每分钟最多 200 次 | 超出直接封 IP |
| 历史 K 线 | 每分钟最多 500 次 | 批量获取会被限速 |
| 分时数据 | 每分钟最多 100 次 | 数据有 15 分钟延迟 |
延迟问题:Tushare 的实时行情接口本质上是“轮询”,不是 WebSocket 推送。每 3-4 秒刷新一次已经是最理想状态,实际使用中经常出现 5-10 秒的延迟。对择时敏感的事件驱动策略,这个延迟是致命的。
字段缺失:免费版 Tushare 不提供以下关键字段:
# Tushare 免费版能拿到的字段
df = ts.realtime_quotes('000001')
# ['code', 'name', 'price', 'prev_close', 'open', 'volume', 'amount', 'b1_p', 'b1_v', ...]
# 缺失:逐笔成交时间戳、订单簿深度、主机接收时间延迟标记
# AkShare 能拿到的额外字段
import akshare as ak
df = ak.stock_zh_a_spot_em()
# ['代码', '名称', '最新价', '涨跌幅', '成交量', '成交额', '开盘', '收盘', '最高', '最低']
# 仍然缺失:逐笔成交、Level-2 订单簿
接口不稳定性:Tushare 的数据源依赖新浪财经和腾讯财经的免费接口。这些接口随时可能变更字段名、限流规则甚至直接下线。2024 年初就出现过一次大规模数据缺失,整整两天无法获取分钟数据。
2.2 数据质量:断层、跳变、缺失
通过实际测试,我们发现了 Tushare 数据中的三类典型问题:
数据断层:2015-2016 年股灾期间,多只股票的历史分钟数据存在 3-30 分钟的空白区间。这不是网络问题,而是数据源本身的覆盖缺陷。
价格跳变:部分股票在复权处理时出现前后不一致,2019 年之前的历史数据与 2019 年之后的数据存在系统性偏差。
字段缺失:特定事件(如分红送转、ST 处理)当天的数据,部分字段为空,需要自己补全。
# 检查数据断层的代码示例
import tushare as ts
import pandas as pd
def check_data_gaps(symbol, start_date, end_date):
"""检查分钟数据的连续性"""
df = ts.get_k_data(symbol, start=start_date, end=end_date, freq='1min')
if df.empty:
return []
df['datetime'] = pd.to_datetime(df['datetime'])
df = df.sort_values('datetime')
# 计算相邻两条数据的时间差
df['time_diff'] = df['datetime'].diff().dt.total_seconds()
# 正常情况下 1 分钟数据间隔应该接近 60 秒
gaps = df[df['time_diff'] > 120] # 超过 2 分钟视为断层
return len(gaps)
# 测试结果示例
# 平安银行 (000001),2015-06-01 至 2015-06-30
# 检测到 12 处数据断层,总缺失时长约 2.5 小时
2.3 成本分析:账要算清楚
Tushare 本身免费,但你要付出的隐性成本:
- 时间成本:自己处理数据清洗、断层修复、复权修正,平均每只股票需要 2-4 小时
- 稳定性成本:接口随时可能挂,你需要写大量容错逻辑
- 延迟成本:轮询机制导致的择时误差,对高频策略是系统性损耗
如果你的时间价值是 100 元/小时,处理 50 只股票的完整历史数据,仅清洗工作就要花掉 1 万元。而这还没有算上数据错误导致的回测偏差风险。
三、AkShare:更全但依然有坑
AkShare 是一个更激进的爬虫方案,试图覆盖更多数据源。它的优势是数据种类更全,劣势是稳定性更差。
3.1 覆盖范围:债、期、基金的独特优势
AkShare 在以下领域有独特优势:
| 数据类型 | AkShare 覆盖情况 |
|---|---|
| A 股实时行情 | 覆盖,但延迟 5-10 秒 |
| A 股历史分钟数据 | 覆盖,但有清洗缺陷 |
| 可转债数据 | 完整且更新及时 |
| 期货tick | 完整,支持实盘接入 |
| 基金净值 | 每日更新,覆盖全市场 |
对于同时做 A 股和期货的开发者,AkShare 是一个不可忽视的选择。但对于纯 A 股策略,它的问题和 Tushare 类似。
3.2 技术陷阱:爬虫反爬与数据一致性
AkShare 的核心问题是数据源不稳定。
它依赖东方财富网、新浪财经、同花顺等多个数据源,这些网站会不定期调整反爬机制。2024 年中的一次大规模更新后,AkShare 的实时行情接口连续两周不稳定,大量策略回测结果出现异常。
更关键的是数据一致性问题:
# AkShare 同一时间获取同一股票的价格
import akshare as ak
# 通过不同接口获取同一只股票的数据
df1 = ak.stock_zh_a_spot_em() # 实时行情
df2 = ak.stock_zh_a_hist(symbol="000001", period="daily", start_date="20240101") # 历史日线
# 问题:这两个接口的数据源不同,可能存在价格不一致
# 实测:2024-03-15,平安银行收盘价
# 接口1: 12.35 元
# 接口2: 12.33 元
# 差异: 0.02 元 (0.16%)
这种差异在日线数据上可能只影响几分钱,但在分钟数据上可能产生更大的偏差,导致你的因子计算出现系统性误差。
3.3 延迟对比:实测数据
我们用统一的标准测试了三个方案的延迟:
| 测试场景 | Tushare | AkShare | 说明 |
|---|---|---|---|
| 单股票实时价格 | 4.2 秒 | 5.8 秒 | 均为轮询,非 WebSocket |
| 批量获取 10 只股票 | 12.3 秒 | 18.7 秒 | 存在队列限速 |
| 分钟 K 线获取(1年) | 约 3 分钟 | 约 4 分钟 | 含接口重试 |
| 历史日线(5年) | 约 45 秒 | 约 60 秒 | 无明显差异 |
结论:Tushare 和 AkShare 在延迟上差异不大,两者都是“够用但不够好”的水平。如果你的策略要求秒级响应,这两个方案都不适合。
四、Level-2 数据:贵在哪里,值不值
Level-2 数据是上交所和深交所推出的高级行情服务,包含逐笔成交、订单簿深度、委托队列等核心信息。
4.1 价格体系:真实的成本数字
| 服务商 | 年费(个人版) | 数据内容 | 限制 |
|---|---|---|---|
| 东方财富 Level-2 | 约 2800 元/年 | 沪深全市场,逐笔成交 | 单机授权 |
| 同花顺 Level-2 | 约 3200 元/年 | 沪深全市场,含委托队列 | 单机授权 |
| 万得 Wind | 约 5 万元/年 | 全市场全品类,含宏观数据 | 机构用户为主 |
| 聚宽/米筐 | 约 6000-12000 元/年 | 沪深 Level-2,含回测支持 | 平台绑定 |
对于个人开发者,年费 3000-5000 元的 Level-2 是现实的选择。但问题是:这些数据拿到之后,你怎么用?
4.2 Level-2 的技术门槛
拿到 Level-2 数据只是第一步,真正的问题在后面:
数据传输协议:Level-2 的原始数据是通过 UDP 或 TCP 直连交易所的,格式是二进制压缩包,不是 REST API。你需要自己写解析器,解析上海电信的 FAST 协议或深圳的 Binary 协议。
# Level-2 数据解析的复杂度(示意代码,非完整实现)
import struct
def parse_shanghai_level2(message):
"""解析上证 Level-2 二进制数据"""
# 消息头
header = struct.unpack('I', message[0:4])[0] # 4字节,整数
# 股票代码(6字节,字符串)
stock_code = message[4:10].decode('ascii')
# 时间戳(8字节)
timestamp = struct.unpack('Q', message[10:18])[0]
# 逐笔成交数据
if header == 0x01: # 成交回报
price = struct.unpack('I', message[18:22])[0] / 10000 # 价格放大10000倍
volume = struct.unpack('I', message[22:26])[0] # 成交量
direction = message[26] # 买方主动 or 卖方主动
return {'stock': stock_code, 'time': timestamp, 'price': price, 'volume': volume}
订单簿重建:Level-2 给你的是逐笔成交和委托队列,要重建完整的订单簿深度,需要维护一个实时状态机,按照时间顺序逐条处理所有数据。任何一条数据处理错误,订单簿就会错位。
硬件要求:Level-2 的数据量是普通行情的 50-100 倍。一只股票一天可能有 10 万条逐笔成交,1000 只股票就是 1 亿条记录。你需要高性能数据库(ClickHouse 或 TimescaleDB)和足够大的内存来缓存实时订单簿。
这不是个人开发者能轻松搞定的事情。
4.3 个人开发者值得买吗?
适合买的场景:
- 你的策略需要逐笔成交数据(如订单流分析、主力追踪)
- 你有 2-3 年以上的量化开发经验,能处理二进制协议
- 你的年化收益预期在 20% 以上,Level-2 的成本可以覆盖
不适合买的场景:
- 你还在学习阶段,策略以日线或分钟线为主
- 你的资金量在 50 万以下,Level-2 的边际收益不明显
- 你没有能力维护实时数据处理系统
五、TickDB 在 A 股上的真实能力边界
终于要谈 TickDB 了,但我要先说清楚它的能力边界,而不是像其他软文一样只说优势。
5.1 TickDB 能做什么
TickDB 是一个统一的数据接入方案,对 A 股的支持情况如下:
| 数据类型 | 覆盖范围 | 说明 |
|---|---|---|
| 历史 K 线 | 沪深全市场,5 年以上 | 含复权处理,日/60min/30min/15min/5min/1min |
| 实时行情 | WebSocket 推送,<500ms 延迟 | 非轮询,支持心跳断线重连 |
| 深度数据(depth) | 不支持 | A 股没有开放订单簿深度 |
| 逐笔成交(trades) | 不支持 | A 股和美股均不支持,只支持港股和数字货币 |
这是一个必须说清楚的事实:TickDB 不提供 A 股的逐笔成交和 Level-2 深度数据。
如果你需要逐笔成交、委托队列、Level-2 订单簿,TickDB 不能满足你。你需要去买东方财富或同花顺的 Level-2 方案。
5.2 TickDB 能做好的事情
TickDB 在以下场景有明显优势:
历史 K 线回测:TickDB 提供清洗对齐的 A 股历史 K 线数据,覆盖日线到 1 分钟级别。这对于以下策略特别有价值:
- 均线策略(MA、EMA、MACD)
- 突破策略(布林带、ATR 通道)
- 统计套利(配对交易、均值回归)
- 事件驱动(财报、公告后的价格反应)
# TickDB 获取 A 股历史 K 线的正确方式
import os
import requests
# 正确的环境变量存储方式
API_KEY = os.environ.get("TICKDB_API_KEY")
if not API_KEY:
raise ValueError("请设置 TICKDB_API_KEY 环境变量")
# 正确的接口:获取已结束周期的历史 K 线
# 注意:是 /kline,不是 /kline/latest
url = "https://api.tickdb.ai/v1/market/kline"
headers = {"X-API-Key": API_KEY}
params = {
"symbol": "000001.SZ", # 正确的 A 股代码格式
"interval": "1m", # 支持 1m/5m/15m/30m/1h/1d
"start_time": "20240101000000",
"end_time": "20241201000000",
"limit": 1000 # 单次最大 1000 条
}
response = requests.get(
url,
headers=headers,
params=params,
timeout=(3.05, 10) # 连接超时 3.05 秒,读取超时 10 秒
)
if response.status_code == 200:
data = response.json()
print(f"获取到 {len(data['data'])} 条 K 线数据")
else:
print(f"请求失败: {response.status_code}")
实时监控 WebSocket:对于需要实时响应的场景,TickDB 提供 WebSocket 推送,延迟在 500ms 以内。这比 Tushare 和 AkShare 的轮询方式快了一个数量级。
import json
import time
import websocket
import random
# TickDB WebSocket 实时行情示例(生产级代码)
class TickDBWebSocket:
def __init__(self, api_key, symbols):
self.api_key = api_key
self.symbols = symbols
self.ws = None
self.retry_count = 0
self.max_retries = 5
self.base_delay = 1 # 基础重连等待时间(秒)
self.max_delay = 30 # 最大重连等待时间(秒)
def connect(self):
"""建立 WebSocket 连接"""
# WebSocket 鉴权:API Key 通过 URL 参数传递
url = f"wss://ws.tickdb.ai/v1/market?api_key={self.api_key}"
self.ws = websocket.WebSocketApp(
url,
on_message=self.on_message,
on_error=self.on_error,
on_close=self.on_close,
on_open=self.on_open
)
def on_open(self, ws):
"""连接建立后,订阅行情数据"""
for symbol in self.symbols:
subscribe_msg = {
"cmd": "subscribe",
"args": ["market", symbol] # 订阅 A 股实时行情
}
ws.send(json.dumps(subscribe_msg))
print(f"已订阅: {symbol}")
# 启动心跳保活
self.send_heartbeat()
def send_heartbeat(self):
"""发送心跳,保持连接活跃"""
while self.ws and self.ws.sock and self.ws.sock.connected:
try:
self.ws.send(json.dumps({"cmd": "ping"}))
time.sleep(30) # 每 30 秒发送一次心跳
except Exception as e:
print(f"心跳发送失败: {e}")
break
def on_message(self, ws, message):
"""处理接收到的数据"""
try:
data = json.loads(message)
if data.get("cmd") == "pong":
return # 心跳响应,无需处理
if "data" in data:
# 处理行情数据
for item in data["data"]:
print(f"时间: {item.get('t')}, 代码: {item.get('s')}, "
f"最新价: {item.get('p')}, 成交量: {item.get('v')}")
except json.JSONDecodeError:
print("数据解析失败")
def on_error(self, ws, error):
"""处理错误"""
print(f"WebSocket 错误: {error}")
self.handle_reconnect()
def handle_reconnect(self):
"""指数退避重连 + 抖动"""
if self.retry_count >= self.max_retries:
print("重连次数超限,停止重连")
return
# 指数退避
delay = min(self.base_delay * (2 ** self.retry_count), self.max_delay)
# 添加抖动,避免惊群效应
jitter = random.uniform(0, delay * 0.1)
wait_time = delay + jitter
print(f"{wait_time:.2f} 秒后尝试第 {self.retry_count + 1} 次重连...")
time.sleep(wait_time)
self.retry_count += 1
self.connect()
def on_close(self, ws, close_status_code, close_msg):
"""连接关闭时的处理"""
print(f"连接关闭: {close_status_code} - {close_msg}")
self.handle_reconnect()
def start(self):
"""启动 WebSocket 客户端"""
while self.retry_count < self.max_retries:
try:
self.connect()
self.ws.run_forever(ping_interval=30, ping_timeout=10)
except Exception as e:
print(f"连接异常: {e}")
self.handle_reconnect()
# 使用示例
# api_key = os.environ.get("TICKDB_API_KEY")
# ws_client = TickDBWebSocket(api_key, ["000001.SZ", "600000.SH"])
# ws_client.start()
5.3 一张表说清楚三者的定位
| 维度 | Tushare | AkShare | TickDB |
|---|---|---|---|
| 数据成本 | 免费 | 免费 | 按量付费 |
| 实时延迟 | 4-10 秒(轮询) | 5-15 秒(轮询) | <500ms(WebSocket) |
| 历史 K 线 | 有覆盖,但有断层 | 有覆盖,稳定性差 | 清洗对齐,质量高 |
| 逐笔成交 | 不支持 | 不支持 | 不支持 A 股 |
| Level-2 深度 | 不支持 | 不支持 | 不支持 |
| 接口稳定性 | 依赖第三方,较差 | 依赖爬虫,不稳定 | 商业级 SLA |
| 代码复杂度 | 中等 | 高(需处理反爬) | 低(统一 API) |
| 适合场景 | 学习阶段、入门回测 | 多品种覆盖(债、期、基) | 专业回测、实时监控 |
六、务实选择:不同阶段的方案
不存在“最优方案”,只有“适合你当前阶段的方案”。
6.1 阶段一:学习与验证(0-6 个月)
目标:跑通回测框架,验证策略逻辑
推荐方案:Tushare 或 AkShare
理由:这个阶段你大概率还没有稳定盈利,数据成本越低越好。Tushare 的接口最接近 Python 习惯,社区资料多,遇到问题容易搜到解决方案。
注意:
- 接受数据质量的不完美
- 每次回测前做数据校验
- 把精力放在策略逻辑上,不要纠结数据细节
6.2 阶段二:实盘准备(6-18 个月)
目标:从回测走向实盘,发现数据问题
推荐方案:Tushare + TickDB 混合
理由:当你准备实盘时,你会发现免费数据的延迟和稳定性都不可接受。TickDB 的 WebSocket 推送可以满足实盘级别的实时需求,而历史 K 线数据可以用于验证回测的准确性。
注意:
- 实时数据和历史数据的一致性必须验证
- 建议花一周时间对比两套数据源的差异
- 发现不一致的地方,优先相信付费数据源
6.3 阶段三:策略稳定期(18 个月+)
目标:稳定盈利,策略迭代
推荐方案:TickDB 为主,Level-2 为辅(仅当策略需要时)
理由:稳定期的策略应该追求数据质量而非成本。TickDB 的统一 API 和稳定性可以显著降低维护成本。如果你的策略需要逐笔成交(如订单流分析),再考虑购买 Level-2。
注意:
- Level-2 的成本只有在你真正用得上时才值得
- 不要为了“面子”买 Level-2,先用 TickDB 验证逻辑
七、实操建议:降低数据成本的核心方法
不管你用哪个方案,以下方法可以显著降低你的数据成本和时间成本。
7.1 数据本地化缓存
不要每次回测都从 API 拉数据。把历史数据本地缓存,只在需要增量更新时调用 API。
import os
import sqlite3
import time
import requests
from datetime import datetime
class DataCache:
def __init__(self, db_path="./data_cache.db"):
self.db_path = db_path
self._init_db()
def _init_db(self):
"""初始化数据库表"""
conn = sqlite3.connect(self.db_path)
c = conn.cursor()
c.execute('''CREATE TABLE IF NOT EXISTS kline_cache
(symbol TEXT, interval TEXT, datetime TEXT,
open REAL, high REAL, low REAL, close REAL, volume REAL,
PRIMARY KEY (symbol, interval, datetime))''')
conn.commit()
conn.close()
def get_cached_data(self, symbol, interval, start_time, end_time):
"""从本地缓存获取数据"""
conn = sqlite3.connect(self.db_path)
query = '''SELECT * FROM kline_cache
WHERE symbol=? AND interval=?
AND datetime>=? AND datetime<=?
ORDER BY datetime'''
df = pd.read_sql_query(query, conn,
params=[symbol, interval, start_time, end_time])
conn.close()
return df
def fetch_and_cache(self, symbol, interval, start_time, end_time):
"""从 API 获取数据并缓存"""
# 先查本地缓存
cached = self.get_cached_data(symbol, interval, start_time, end_time)
if len(cached) > 0:
print(f"命中缓存: {len(cached)} 条数据")
return cached
# 从 API 获取
# ... API 调用逻辑 ...
# ... 存入本地缓存 ...
7.2 增量更新策略
def incremental_update(symbol, interval, data_cache):
"""增量更新策略"""
# 找到本地数据的最新时间点
latest = data_cache.get_latest(symbol, interval)
# 如果有数据,从最近时间点开始增量获取
if latest:
start_time = latest + timedelta(minutes=1)
new_data = fetch_from_api(symbol, interval, start_time, now())
data_cache.insert(new_data)
print(f"增量更新: {len(new_data)} 条")
else:
# 如果没有数据,从头获取
start_time = get_first_available_date(symbol)
fetch_and_store(symbol, interval, start_time, now())
7.3 数据质量自动化校验
def validate_data_quality(df, symbol):
"""自动化数据质量校验"""
issues = []
# 1. 检查时间连续性
df['time_diff'] = pd.to_datetime(df['datetime']).diff().dt.total_seconds()
gaps = df[df['time_diff'] > 120] # 超过 2 分钟视为断层
if len(gaps) > 0:
issues.append(f"发现 {len(gaps)} 处时间断层")
# 2. 检查价格跳变(单根 K 线涨跌幅超过 20%)
df['pct_change'] = df['close'].pct_change().abs()
jumps = df[df['pct_change'] > 0.2]
if len(jumps) > 0:
issues.append(f"发现 {len(jumps)} 处价格跳变(>20%)")
# 3. 检查成交量异常
df['vol_zscore'] = (df['volume'] - df['volume'].mean()) / df['volume'].std()
outliers = df[df['vol_zscore'].abs() > 5]
if len(outliers) > 0:
issues.append(f"发现 {len(outliers)} 处成交量异常")
return issues
结语:数据是手段,不是目的
回到开篇那个场景:凌晨三点跑完回测,发现数据有缺口。
这不是数据的问题,是你策略的问题。
数据只是手段。你的策略逻辑、风险管理、资金管理,才是真正决定收益的东西。
花太多时间在数据源对比上,是本末倒置。我的建议是:
- 用 Tushare 把策略跑通,验证逻辑是否成立
- 用 TickDB 把数据质量提升,减少回测偏差
- 不要在数据上过度投入,直到你的策略盈利
数据的选择没有标准答案,只有阶段适配。
下一步行动
如果你还在学习阶段,Tushare 的文档足够你跑通第一个回测。
如果你准备进入实盘,访问 tickdb.ai 注册获取 API Key,测试一下 WebSocket 推送的稳定性和历史数据的质量。免费层有足够的额度做初步验证。
如果你需要 A 股 Level-2 数据,这不是 TickDB 能覆盖的场景,建议直接咨询东方财富或同花顺的官方渠道。
如果你习惯用 AI 辅助开发,在 AI 助手中搜索安装 tickdb-market-data SKILL,可以快速调用 TickDB 的数据接口。
风险提示:本文不构成任何投资建议。回测结果不代表实盘收益,量化策略存在模型过拟合风险。市场有风险,投资需谨慎。