3 个人的量化团队,如何搭建生产级数据基础设施
你一定听过这个故事。
三个程序员凑在一起,决定做点不一样的事——用量化策略在市场里捞金。A 股、港股、数字货币,策略文档写得漂亮,代码跑得通,历史回测数据也调得漂亮。然后呢?
然后各自用自己的电脑跑策略,数据各自存一份,API Key 藏在本地 .env 文件里,代码通过微信发来发去,版本号靠文件名后缀——v3_final_真的_final.py。
三个月后,他们发现策略回测结果对不上。不是策略的问题,是数据源不一致——你用的日线数据是前复权,我用的是不复权,他用的是后复权。
这不是段子。这是大多数小型量化团队的真实处境。
本文不聊策略,不聊因子,不聊阿尔法。聊的是:三个人的量化团队,怎么从「三个独立开发者拼凑」进化到「一个能跑生产级策略的协作单元」。核心解决三个问题:数据怎么共享、代码怎么管理、权限怎么控制。
一、小团队做量化,比想象中更「孤独」
我见过太多「民间量化小组」的协作状态:一个人写策略,一个人调参数,一个人盯风控。听起来分工明确,实际上是三个「数据孤岛」在各自运转。
1.1 数据孤岛的三个典型症状
症状一:数据版本混乱
三个人各自下载历史数据,本地存储路径不一致,文件名不规范。有人用 BTCUSDT_1h.csv,有人用 btc_usdt_1hour.csv,有人干脆叫 data_final.csv。回测时用的数据源不同,结果自然对不上。
症状二:API Key 散落各处
TickDB 的 API Key、交易所的 API Key、风控系统的 webhook URL,统统写在代码里或者本地 .env 文件。新机器要跑策略?先把所有 Key 重新配一遍。人员变动?祈祷交接文档还在。
症状三:代码「融合」靠人工
两个人同时改了 signal_generator.py,合并靠微信对线:「你那版改了什么?」「我加了均线策略。」「啊那我把你那段删了……」
这不是协作,这是「协作事故」的高发地带。
1.2 小团队协作的独特挑战
大型量化机构有专门的运维团队、数据库管理员、代码审查流程。小团队没有。但小团队面临的复杂度并不比大团队低——反而因为「人少所以没流程」,更容易出问题。
| 维度 | 大团队 | 小团队(1-5 人) |
|---|---|---|
| 代码管理 | Git Flow + PR 审查 | 手动合并,靠信任 |
| 数据存储 | 共享数据库 + 版本管理 | 各自本地,无版本 |
| 权限控制 | LDAP/SSO + 细粒度权限 | 无,随意访问 |
| 故障恢复 | 备份 + 监控 + 值班 | 无,挂了才知道 |
| 新人上手 | 文档 + 培训 | 「你看看 XXX 的代码吧」 |
小团队要解决的,不是「有没有这些机制」,而是「如何在有限资源下实现最基础的协作基础设施」。
二、核心架构:三层结构解决数据共享
解决小团队协作问题的核心思路很简单:把数据和配置「中央化」,把代码「版本化」,把权限「分层化」。
2.1 架构总览
┌─────────────────────────────────────────────────────────┐
│ 协作基础设施架构 │
├─────────────────────────────────────────────────────────┤
│ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ 成员 A │ │ 成员 B │ │ 成员 C │ │
│ └────┬────┘ └────┬────┘ └────┬────┘ │
│ │ │ │ │
│ └─────────────┼─────────────┘ │
│ │ │
│ ┌──────────────────▼──────────────────┐ │
│ │ Git 仓库 │ │
│ │ ├── strategies/ (策略代码) │ │
│ │ ├── configs/ (配置文件) │ │
│ │ ├── notebooks/ (分析笔记) │ │
│ │ └── .env.shared (共享环境变量) │ │
│ └──────────────────┬──────────────────┘ │
│ │ │
│ ┌──────────────────▼──────────────────┐ │
│ │ 共享数据存储 │ │
│ │ ├── /data/market/ (历史数据) │ │
│ │ ├── /data/signals/ (信号输出) │ │
│ │ └── /data/results/ (回测结果) │ │
│ └──────────────────┬──────────────────┘ │
│ │ │
│ ┌──────────────────▼──────────────────┐ │
│ │ API Gateway │ │
│ │ ├── TickDB (市场数据) │ │
│ │ ├── Broker (交易执行) │ │
│ │ └── Alert (告警通知) │ │
│ └──────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────┘
三层结构各司其职:
- 代码层:Git 仓库管理策略代码、配置文件、notebooks
- 数据层:共享存储管理市场数据、回测结果、信号输出
- 服务层:统一 API 网关管理 TickDB、交易所、风控系统等外部服务
2.2 为什么这个架构适合小团队
大团队用 Kubernetes、Prometheus、完整的 CI/CD 流水线。小团队用不起这些运维成本。这个架构的精髓是:用最少的工具,实现最核心的协作需求。
Git + 共享存储 + 统一 API 网关,三件套搞定 80% 的协作问题。剩下的 20%,靠约定俗成的协作规范来补。
三、生产级代码:API Key 管理与共享数据获取
这一节给出生产级代码示例,解决两个核心问题:如何安全地管理团队共享的 API Key,以及如何从 TickDB 获取市场数据并写入共享存储。
3.1 统一配置管理:共享 .env 文件
先解决 API Key 管理问题。方案:使用加密的共享 .env.shared 文件,配合 git-crypt 做透明加密,或者用更简单的方案——共享存储 + 统一读取层。
这里推荐一个适合小团队的方案:集中式配置管理服务。
# config/team_config.py
# 小团队统一配置管理模块
import os
import json
from pathlib import Path
from typing import Optional
from datetime import datetime
class TeamConfig:
"""团队共享配置管理器
使用规范:
1. 配置文件存储在共享存储的 /config 目录
2. 每个成员的本地机器只需要配置 LOCAL_CONFIG_PATH
3. API Key 不存储在代码仓库,只存储在共享目录的加密文件
"""
def __init__(self, team_storage_path: str = None):
# 共享存储根目录,各成员自行配置本地路径
self.team_storage = team_storage_path or os.environ.get(
"TEAM_STORAGE_PATH",
"/mnt/team-storage" # 默认路径,可根据实际情况修改
)
self._ensure_dirs()
def _ensure_dirs(self):
"""确保目录结构存在"""
Path(self.team_storage, "config").mkdir(parents=True, exist_ok=True)
Path(self.team_storage, "data/market").mkdir(parents=True, exist_ok=True)
Path(self.team_storage, "data/signals").mkdir(parents=True, exist_ok=True)
Path(self.team_storage, "data/results").mkdir(parents=True, exist_ok=True)
@property
def api_keys(self) -> dict:
"""获取团队共享的 API Keys
⚠️ 实际使用时,强烈建议这个文件加密存储
"""
key_file = Path(self.team_storage, "config/api_keys.json")
if not key_file.exists():
raise FileNotFoundError(
f"API Key 配置文件不存在: {key_file}\n"
"请联系团队负责人获取配置或初始化配置文件。"
)
with open(key_file, 'r') as f:
keys = json.load(f)
# 运行时验证关键 Key 是否配置
required_keys = ["TICKDB_API_KEY", "BROKER_API_KEY"]
missing = [k for k in required_keys if k not in keys or not keys[k]]
if missing:
raise ValueError(f"缺少必需的 API Key: {', '.join(missing)}")
return keys
def get_tickdb_key(self) -> str:
"""获取 TickDB API Key"""
return self.api_keys.get("TICKDB_API_KEY", "")
def get_broker_key(self) -> str:
"""获取交易所 API Key"""
return self.api_keys.get("BROKER_API_KEY", "")
def log_access(self, key_name: str, member: str = None):
"""记录 API Key 访问日志(审计用)"""
log_dir = Path(self.team_storage, "logs")
log_dir.mkdir(parents=True, exist_ok=True)
log_file = log_dir / "api_access.log"
timestamp = datetime.now().isoformat()
member = member or os.environ.get("USER", "unknown")
with open(log_file, 'a') as f:
f.write(f"[{timestamp}] {member} accessed {key_name}\n")
# 全局单例
team_config = TeamConfig()
// config/api_keys.json.example
// 示例配置文件,真实文件不提交到 Git
{
"TICKDB_API_KEY": "your-tickdb-api-key-here",
"BROKER_API_KEY": "your-broker-api-key-here",
"BROKER_SECRET": "your-broker-secret-here",
"FEISHU_WEBHOOK": "https://open.feishu.cn/open-apis/bot/v2/hook/xxx",
"last_updated": "2026-04-01",
"updated_by": "team_lead"
}
⚠️ 工程警告:
api_keys.json文件绝对不能提交到 Git 仓库,必须加入.gitignore- 建议对该文件启用额外的加密或权限控制
- 每次代码访问 Key 时自动记录日志,便于审计
3.2 共享数据获取:TickDB 历史数据同步
团队数据一致性的核心:所有成员从同一个数据源获取数据,并存储到共享目录。
# data/market_data_fetcher.py
import os
import time
import json
import logging
from datetime import datetime, timedelta
from pathlib import Path
from typing import List, Optional
import requests
import pandas as pd
from config.team_config import team_config
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class MarketDataFetcher:
"""TickDB 市场数据获取器
功能:
1. 从 TickDB API 获取历史 K 线数据
2. 统一存储到团队共享目录
3. 记录数据版本和来源,保证团队数据一致性
⚠️ 生产环境高频场景建议使用 aiohttp + asyncio 异步架构
"""
BASE_URL = "https://api.tickdb.ai/v1"
def __init__(self):
self.api_key = team_config.get_tickdb_key()
self.storage_path = Path(team_config.team_storage, "data/market")
self.storage_path.mkdir(parents=True, exist_ok=True)
self._retry_count = 3
self._retry_delay = 5
def _headers(self) -> dict:
return {"X-API-Key": self.api_key}
def _request_with_retry(self, method: str, url: str, **kwargs) -> dict:
"""带重试机制的请求封装
处理场景:
- 限频 (code: 3001):读取 Retry-After 头等待
- 网络抖动:指数退避重试
"""
for attempt in range(self._retry_count):
try:
response = requests.request(
method,
url,
headers=self._headers(),
timeout=(3.05, 10), # ⚠️ 超时必须设置
**kwargs
)
result = response.json()
# 处理限频
if result.get("code") == 3001:
retry_after = int(response.headers.get("Retry-After", self._retry_delay))
logger.warning(f"触发限频,等待 {retry_after} 秒后重试...")
time.sleep(retry_after)
continue
# 处理成功
if result.get("code") == 0:
return result.get("data")
# 处理其他错误
error_msg = result.get("message", "Unknown error")
logger.error(f"API 请求失败: {error_msg}")
if result.get("code") in (1001, 1002):
raise ValueError(f"API Key 无效: {error_msg}")
elif result.get("code") == 2002:
raise KeyError(f"交易品种不存在: {error_msg}")
return None
except requests.exceptions.Timeout:
logger.warning(f"请求超时(第 {attempt + 1} 次重试)")
time.sleep(self._retry_delay * (2 ** attempt)) # 指数退避
except requests.exceptions.RequestException as e:
logger.error(f"网络请求异常: {e}")
time.sleep(self._retry_delay * (2 ** attempt))
raise RuntimeError(f"请求失败,已重试 {self._retry_count} 次")
def fetch_klines(
self,
symbol: str,
interval: str = "1h",
start_time: Optional[datetime] = None,
end_time: Optional[datetime] = None,
limit: int = 1000
) -> pd.DataFrame:
"""获取 K 线数据并存储到共享目录
参数:
- symbol: 交易品种,如 "BTC.USDT"
- interval: K 线周期,如 "1m", "5m", "1h", "1d"
- start_time: 开始时间,默认获取最近 limit 条
- end_time: 结束时间
- limit: 每次请求的最大条数(TickDB 上限 1000)
"""
params = {
"symbol": symbol,
"interval": interval,
"limit": limit
}
if start_time:
params["start_time"] = int(start_time.timestamp() * 1000)
if end_time:
params["end_time"] = int(end_time.timestamp() * 1000)
logger.info(f"获取 {symbol} {interval} K 线数据...")
data = self._request_with_retry("GET", f"{self.BASE_URL}/market/kline", params=params)
if not data:
return pd.DataFrame()
df = pd.DataFrame(data)
# 标准化字段名
if "open_time" in df.columns:
df["timestamp"] = pd.to_datetime(df["open_time"], unit="ms")
elif "t" in df.columns:
df["timestamp"] = pd.to_datetime(df["t"], unit="ms")
# 数据验证
if df.empty:
logger.warning(f"{symbol} {interval} 无数据返回")
return df
logger.info(f"获取到 {len(df)} 条数据,时间范围: {df['timestamp'].min()} ~ {df['timestamp'].max()}")
# 存储到共享目录
self._save_to_storage(df, symbol, interval)
return df
def _save_to_storage(self, df: pd.DataFrame, symbol: str, interval: str):
"""保存数据到共享存储,包含版本信息"""
safe_symbol = symbol.replace("/", "_")
filename = f"{safe_symbol}_{interval}.parquet"
filepath = self.storage_path / filename
# 保存数据文件
df.to_parquet(filepath, index=False)
# 保存元数据
metadata = {
"symbol": symbol,
"interval": interval,
"record_count": len(df),
"time_range": {
"start": df["timestamp"].min().isoformat() if not df.empty else None,
"end": df["timestamp"].max().isoformat() if not df.empty else None
},
"fetched_at": datetime.now().isoformat(),
"fetched_by": os.environ.get("USER", "unknown"),
"source": "TickDB"
}
metadata_file = self.storage_path / f"{safe_symbol}_{interval}.meta.json"
with open(metadata_file, 'w') as f:
json.dump(metadata, f, indent=2)
logger.info(f"数据已保存至: {filepath}")
def load_from_storage(self, symbol: str, interval: str) -> pd.DataFrame:
"""从共享存储加载数据(团队成员统一使用此方法)"""
safe_symbol = symbol.replace("/", "_")
filepath = self.storage_path / f"{safe_symbol}_{interval}.parquet"
if not filepath.exists():
logger.warning(f"本地缓存不存在,先拉取数据: {filepath}")
return self.fetch_klines(symbol, interval)
# 读取本地数据
df = pd.read_parquet(filepath)
# 读取元数据,检查是否需要更新
metadata_file = self.storage_path / f"{safe_symbol}_{interval}.meta.json"
if metadata_file.exists():
with open(metadata_file, 'r') as f:
metadata = json.load(f)
# 检查数据新鲜度(超过 1 天需要更新)
fetched_at = datetime.fromisoformat(metadata["fetched_at"])
if (datetime.now() - fetched_at).total_seconds() > 86400:
logger.info("数据已超过 24 小时,尝试增量更新...")
self._incremental_update(symbol, interval, df)
return df
def _incremental_update(self, symbol: str, interval: str, local_df: pd.DataFrame):
"""增量更新数据(避免全量拉取)"""
if local_df.empty:
return
latest_time = local_df["timestamp"].max()
new_data = self.fetch_klines(symbol, interval, start_time=latest_time)
if not new_data.empty:
# 合并数据并去重
combined = pd.concat([local_df, new_data]).drop_duplicates(subset=["timestamp"])
combined = combined.sort_values("timestamp")
self._save_to_storage(combined, symbol, interval)
logger.info(f"增量更新完成,新增 {len(new_data)} 条数据")
# 使用示例
if __name__ == "__main__":
fetcher = MarketDataFetcher()
# 团队成员 A:首次获取数据
btc_1h = fetcher.fetch_klines("BTC.USDT", interval="1h", limit=500)
# 团队成员 B:从共享存储加载(数据与 A 一致)
btc_1h_cached = fetcher.load_from_storage("BTC.USDT", interval="1h")
# 验证数据一致性
print(f"成员 A 获取: {len(btc_1h)} 条")
print(f"成员 B 加载: {len(btc_1h_cached)} 条")
print(f"数据一致: {len(btc_1h) == len(btc_1h_cached)}")
关键设计点解释:
- 统一数据源:所有成员通过
load_from_storage()加载数据,确保数据版本一致 - 元数据追踪:每份数据附带
meta.json,记录来源、获取时间、获取者 - 增量更新:避免重复拉取全量数据,节省 API 调用
- 限频处理:遇到 TickDB 限频 (code: 3001) 时读取
Retry-After等待
四、Git 协作规范:适合小团队的 Git Flow
代码管理是协作基础设施的核心。大型团队的 Git Flow 过于复杂,小团队需要一个「刚刚好」的方案。
4.1 推荐工作流:简化版 Git Flow
┌─ feature/xxx ─────────┐
│ │
main ─────────────┴──┬─────────────────────┘
▲ │
│ ┌──────▼──────┐
│ │ develop │
│ └──────┬──────┘
│ │
│ ┌──────▼──────┐
└────────┤ release │
└─────────────┘
核心规则:
| 角色 | 分支 | 规则 |
|---|---|---|
main |
生产分支 | 只接收 release 分支合并,禁止直接推送 |
develop |
开发分支 | 所有功能先合并到这里,可直接推送(约定俗成) |
feature/* |
功能分支 | 从 develop 创建,完成后合并回 develop |
4.2 协作约定(必须遵守)
约定一:提交信息规范
# 格式:<类型>: <简短描述>
# 好的提交
git commit -m "feat: 添加均线交叉信号生成器"
git commit -m "fix: 修复止盈逻辑的精度问题"
git commit -m "chore: 更新 TickDB API 版本依赖"
# 不好的提交(禁止)
git commit -m "更新"
git commit -m "asdf"
git commit -m "改了代码"
约定二:合并前必须审查
小团队没有 CI/CD,但必须有基本的代码审查流程。规则:
feature/*合并到develop前,至少一个队友 review- Review 方式:GitHub PR / GitLab MR / 线下面对面都行
- 简单改动可放宽,但策略核心逻辑必须 review
约定三:配置分离
量化策略项目/
├── config/
│ ├── api_keys.json # ⚠️ 不提交,存储在共享目录
│ └── strategy_config.py # ✅ 提交,不含敏感信息
├── data/ # ✅ 不提交,存储在共享目录
├── strategies/ # ✅ 提交
├── notebooks/ # ✅ 提交
└── .gitignore
.gitignore 必须包含:
# 敏感配置
config/api_keys.json
.env
.env.*
# 数据文件(统一存储在共享目录)
data/
# Python
__pycache__/
*.py[cod]
*.egg-info/
# IDE
.vscode/
.idea/
# 日志
*.log
logs/
4.3 解决冲突的「君子协定」
小团队不可避免会遇到代码冲突。君子协定:
- 谁改谁负责:修改了某个文件,冲突时优先由修改者解决
- 先沟通再动手:解决冲突前先和对方说一声,别偷偷覆盖
- 策略代码优先:策略逻辑文件冲突时,找第三人做最终裁判
五、权限控制:适度即可,不要过度设计
小团队的权限控制容易被忽视,也容易被过度设计。原则:够用就行,信任为基。
5.1 三级权限模型
| 级别 | 成员 | 权限内容 |
|---|---|---|
| L1:观察者 | 投资人、外部顾问 | 只读数据、可查看策略文档 |
| L2:研究者 | 策略研究员 | 读写策略代码、读写回测结果、可拉取数据 |
| L3:管理者 | 团队 Lead | 全部权限 + 配置文件管理 + Git 仓库管理 |
5.2 实际实现:共享存储的权限设置
# 团队共享目录权限配置(Linux/macOS)
# 假设团队成员:alice, bob, charlie
# 创建共享组
sudo groupadd quant_team
# 添加成员到组
sudo usermod -aG quant_team alice
sudo usermod -aG quant_team bob
sudo usermod -aG quant_team charlie
# 配置共享目录
sudo chown -R :quant_team /mnt/team-storage
sudo chmod -R 2775 /mnt/team-storage # 2775 = rwxrwsr-x
# 设置 SGID(新建文件自动继承组)
find /mnt/team-storage -type d -exec chmod g+s {} \;
# 配置目录级别权限
# 只有 Lead 可写配置目录
sudo chown alice:quant_team /mnt/team-storage/config
sudo chmod 775 /mnt/team-storage/config
# 数据目录全员可写
sudo chmod 2775 /mnt/team-storage/data
5.3 API Key 的访问控制
TickDB API Key 建议的管理方式:
- 不要共享同一个 Key:每个成员用自己的 Key,方便追踪使用和权限管理
- Tiered Access:按功能分级,Junior 成员用只读 Key,Senior 用读写 Key
- 记录访问日志:每次调用 API 时记录操作人(代码中实现)
# config/api_keys.json 多用户版本
{
"members": {
"alice": {
"tickdb_key": "tk_live_xxx_alice",
"role": "L3",
"active": true
},
"bob": {
"tickdb_key": "tk_live_xxx_bob",
"role": "L2",
"active": true
}
},
"shared": {
"broker_key": "broker_xxx",
"broker_secret": "secret_xxx",
"role": "L3", # 只有 L3 角色可访问
"updated_by": "alice"
}
}
六、部署方案:从小到大的演进路径
小团队的协作基础设施不需要一步到位,可以根据团队规模和需求演进。
6.1 分阶段配置建议
| 阶段 | 团队规模 | 存储方案 | Git 托管 | 权限管理 |
|---|---|---|---|---|
| 阶段一:草台班子 | 1-2 人 | NAS / 移动硬盘 | GitHub private | 无(靠信任) |
| 阶段二:初具规模 | 2-4 人 | 云盘(Dropbox/OneDrive)+ 本地备份 | GitHub/GitLab | 基础文件权限 |
| 阶段三:生产级 | 4 人以上 | 云对象存储(OSS/S3)+ 自动化备份 | GitLab self-hosted | LDAP/SSO |
6.2 阶段一的具体配置(适合大多数小团队)
存储:
- 使用 NAS 或云盘(OneDrive/Dropbox 均可)
- 目录结构:
/量化项目/data/、/量化项目/config/、/量化项目/strategies/ - 每日自动备份到另一块磁盘
Git:
- GitHub private 仓库(免费)
- 团队成员邀请为 collaborator
- 启用 two-factor authentication
配置:
- API Key 存储在加密的本地文件 + 共享云盘备份
- 使用 1Password / Bitwarden 等密码管理器存储个人 Key
6.3 TickDB 在协作中的角色
回到本文开头提到的数据一致性问题。TickDB 在小团队协作中扮演的是统一数据源的角色:
- 单一数据源:所有成员从 TickDB 获取市场数据,不从多个来源拼凑
- 版本可控:TickDB 的历史数据经过清洗对齐,团队使用统一的数据版本
- API 调用日志:通过 team_config 的访问日志,可追踪谁在什么时间获取了什么数据
这是解决「三个人回测结果对不上」问题的根本方案:不是靠约定「大家都用前复权」,而是靠基础设施「大家都用 TickDB」。
七、结语:从「三个独立开发者」到「一个协作单元」
协作基础设施的建设,本质上是在回答一个问题:三个人的量化团队,能不能像一个人一样高效运转?
答案是:可以。但需要投入时间搭建基础设施,而不是把所有时间都花在策略迭代上。
本文给出了三个核心组件的解决方案:
- 数据共享:统一数据源(TickDB)+ 共享存储 + 版本追踪
- 代码管理:简化 Git Flow + 协作约定 + 配置分离
- 权限控制:三级权限模型 + 分层 API Key 管理
这些方案不完美,但足够小团队跑生产级策略。当团队从 3 个人扩展到 10 个人时,这些基础设施会成为你们最坚实的基础。
下一步行动
如果你正在独自做量化,发现数据管理越来越混乱:
整理现有的数据文件,建立统一的存储规范和命名约定,这是协作的第一步。
如果你已经有了小团队,数据一致性问题频发:
- 在团队共享存储中建立统一的数据目录结构
- 所有成员从 TickDB 获取数据,统一写入共享目录
- 建立 Git 仓库,将策略代码版本化管理
如果你的团队已有 3 人以上,正在寻找更系统的协作方案:
联系 [email protected],获取 TickDB 机构版方案,包含团队数据权限管理、使用量统计、专属技术支持。
本文不构成任何投资建议。市场有风险,投资需谨慎。