Embedding 股票相似度:从价格序列到向量检索
当你在 2023 年初发现一只叫 NVDA 的股票悄然翻倍时,量化团队已经用 Embedding 找到了下一个 NVDA。
这不是玄学,是线性代数。
传统的股票相似度计算依赖财报对比、行业分类或主观经验。但向量检索提供了一种更底层的路径:把股票的价格序列编码为高维空间中的一个点,让"相似"这件事可以被数学精确度量。当你的向量数据库在 0.3 秒内从 5000 只股票中返回与 NVDA 走势最接近的 10 只时,你实际上解决的不只是一个技术问题——你是在用数学语言重写"均值回归"的定义。
本文从技术实现出发,拆解如何将股票价格序列转化为 Embedding 向量、如何用 FAISS 构建向量索引,以及如何设计一个生产级的相似度检索系统。
为什么价格序列可以 Embedding
从时间序列到向量空间
一只股票的历史价格本质上是一个时间序列:$P = [p_1, p_2, ..., p_n]$。直接用这个序列计算相似度的最大问题是:序列长度必须相同且对齐。此外,原始价格的量纲差异(茅台 1700 元 vs 散户股 5 元)会让距离计算完全失效。
Embedding 的核心思想是学习一个映射函数 $f: P \to \mathbb{R}^d$,将原始价格序列压缩到一个低维向量空间,同时保留序列的"形态特征"——涨跌模式、波动率结构、趋势方向。
这不是简单的归一化,而是通过神经网络(如 LSTM、Transformer)或统计方法(如 PCA、Autoencoder)学到的语义表示。
三种主流Embedding方法对比
| 方法 | 向量维度 | 计算成本 | 可解释性 | 适合场景 |
|---|---|---|---|---|
| 归一化价格序列 + PCA | 可调 | 极低 | 高 | 快速原型、基准对比 |
| Autoencoder | 可调 | 中等 | 中 | 形态特征提取 |
| 预训练模型(如 StockBERT) | 768-1024 | 高 | 低 | 追求精度、有GPU条件 |
本文选择 归一化 + PCA 作为演示方案,原因有三:计算可复现、结果可解释、不依赖外部模型 API。
系统架构总览
完整流程分为三个阶段:
┌─────────────────────────────────────────────────────────────┐
│ Phase 1: 数据获取 │
│ TickDB API → 历史K线 → 日收益率序列 → 归一化 │
└─────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────┐
│ Phase 2: Embedding 生成 │
│ PCA降维 / Autoencoder → 128维向量 → 标准化 │
└─────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────┐
│ Phase 3: 向量检索 │
│ FAISS Index → 相似度检索 → Top-K 结果 │
└─────────────────────────────────────────────────────────────┘
Phase 1:数据获取与预处理
生产级数据获取代码
import os
import time
import random
import requests
import numpy as np
import pandas as pd
from typing import List, Dict, Optional
class TickDBClient:
"""TickDB API 客户端 - 生产级实现"""
def __init__(self, api_key: Optional[str] = None):
self.api_key = api_key or os.environ.get("TICKDB_API_KEY")
if not self.api_key:
raise ValueError("请设置环境变量 TICKDB_API_KEY")
self.base_url = "https://api.tickdb.ai/v1"
self.session = requests.Session()
self.session.headers.update({"X-API-Key": self.api_key})
# 重试配置
self.max_retries = 5
self.base_delay = 1.0
self.max_delay = 32.0
def _request_with_retry(self, method: str, endpoint: str, **kwargs) -> requests.Response:
"""带指数退避的请求封装"""
kwargs.setdefault("timeout", (3.05, 10))
for retry in range(self.max_retries):
try:
response = self.session.request(method, f"{self.base_url}{endpoint}", **kwargs)
# 限频处理
if response.status_code == 429 or (
response.headers.get("Content-Type", "").startswith("application/json")
and response.json().get("code") == 3001
):
retry_after = int(response.headers.get("Retry-After", 5))
print(f"⚠️ 限频,等待 {retry_after} 秒后重试 (第 {retry + 1} 次)")
time.sleep(retry_after)
continue
response.raise_for_status()
return response
except requests.exceptions.Timeout:
delay = min(self.base_delay * (2 ** retry), self.max_delay)
jitter = random.uniform(0, delay * 0.1)
print(f"⏱️ 请求超时,等待 {delay + jitter:.1f} 秒后重试 (第 {retry + 1} 次)")
time.sleep(delay + jitter)
except requests.exceptions.RequestException as e:
if retry == self.max_retries - 1:
raise RuntimeError(f"请求失败: {e}")
delay = min(self.base_delay * (2 ** retry), self.max_delay)
time.sleep(delay)
raise RuntimeError("达到最大重试次数")
def get_kline(
self,
symbol: str,
interval: str = "1d",
start_time: Optional[int] = None,
end_time: Optional[int] = None,
limit: int = 1000
) -> List[Dict]:
"""获取K线数据 - 历史回测场景使用"""
params = {
"symbol": symbol,
"interval": interval,
"limit": limit
}
if start_time:
params["start"] = start_time
if end_time:
params["end"] = end_time
response = self._request_with_retry("GET", "/market/kline", params=params)
data = response.json()
if data.get("code") == 0:
return data.get("data", [])
else:
raise RuntimeError(f"API错误 {data.get('code')}: {data.get('message')}")
def get_available_symbols(self) -> List[str]:
"""获取可用交易品种"""
response = self._request_with_retry("GET", "/symbols/available")
data = response.json()
return data.get("data", [])
def fetch_stock_data(symbols: List[str], lookback_days: int = 252) -> pd.DataFrame:
"""
批量获取股票数据并转换为日收益率序列
Args:
symbols: 股票代码列表
lookback_days: 回溯天数(约1年交易日)
Returns:
DataFrame: index=日期, columns=股票代码, values=日收益率
"""
client = TickDBClient()
# 计算时间范围
end_time = int(time.time() * 1000)
start_time = end_time - (lookback_days * 24 * 60 * 60 * 1000)
all_data = []
for symbol in symbols:
try:
klines = client.get_kline(
symbol=symbol,
interval="1d",
start_time=start_time,
end_time=end_time,
limit=lookback_days
)
if not klines:
print(f"⚠️ {symbol} 无数据")
continue
df = pd.DataFrame(klines)
df["date"] = pd.to_datetime(df["time"], unit="ms").dt.date
df["return"] = df["close"].astype(float).pct_change()
all_data.append({
"symbol": symbol,
"date": df["date"].values,
"close": df["close"].astype(float).values,
"return": df["return"].values
})
print(f"✅ 获取 {symbol}: {len(klines)} 条K线")
except Exception as e:
print(f"❌ {symbol} 获取失败: {e}")
continue
# 合并为宽表
price_df = pd.DataFrame({
d["symbol"]: pd.Series(d["close"], index=d["date"])
for d in all_data
})
return_df = price_df.pct_change().dropna(how="all")
return return_df
代码工程要点:
- 指数退避重连 + 抖动,避免惊群效应
- HTTP 超时设置
(3.05, 10),区分连接超时和读取超时 - 限频处理:识别 3001 错误码,读取
Retry-After头 - API Key 存储在环境变量,不硬编码
Phase 2:Embedding 生成
收益率序列归一化
直接用收益率序列的问题是不同股票的波动率差异巨大。我们采用 z-score 归一化:
$$z_i = \frac{r_i - \mu}{\sigma}$$
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
def normalize_returns(return_df: pd.DataFrame) -> np.ndarray:
"""
对收益率序列进行归一化处理
返回:
np.ndarray: shape=(n_stocks, n_days),每行是一只股票的归一化收益率序列
"""
scaler = StandardScaler()
normalized = scaler.fit_transform(return_df.fillna(0).T)
return normalized
def apply_pca(returns: np.ndarray, n_components: int = 128) -> np.ndarray:
"""
使用PCA降维到指定维度
Args:
returns: 归一化后的收益率矩阵
n_components: 目标向量维度
Returns:
np.ndarray: shape=(n_stocks, n_components)
"""
pca = PCA(n_components=n_components, random_state=42)
embeddings = pca.fit_transform(returns)
explained_variance = sum(pca.explained_variance_ratio_) * 100
print(f"📊 PCA: {n_components} 维保留 {explained_variance:.1f}% 方差")
return embeddings
def generate_embeddings(return_df: pd.DataFrame, n_components: int = 128) -> tuple:
"""
完整Embedding生成流程
Returns:
(embeddings, symbols, pca_model)
"""
# 归一化
normalized = normalize_returns(return_df)
print(f"📊 归一化完成: {normalized.shape}")
# PCA降维
embeddings = apply_pca(normalized, n_components)
# L2标准化(加速余弦相似度计算)
norms = np.linalg.norm(embeddings, axis=1, keepdims=True)
embeddings_normalized = embeddings / norms
return embeddings_normalized, return_df.columns.tolist(), embeddings
Embedding 质量评估
def evaluate_embeddings(embeddings: np.ndarray, return_df: pd.DataFrame) -> pd.DataFrame:
"""
评估Embedding质量的核心指标
"""
from sklearn.metrics.pairwise import cosine_similarity
# 计算原始收益率的余弦相似度矩阵
returns_normalized = normalize_returns(return_df)
raw_similarity = cosine_similarity(returns_normalized)
# 计算Embedding的余弦相似度矩阵
embed_similarity = cosine_similarity(embeddings)
# 计算两种相似度的相关性(理论上应该高度相关)
correlation = np.corrcoef(
raw_similarity[np.triu_indices_from(raw_similarity, k=1)],
embed_similarity[np.triu_indices_from(embed_similarity, k=1)]
)[0, 1]
print(f"📈 相似度相关性: {correlation:.3f} (越高说明Embedding质量越好)")
return pd.DataFrame({
"原始相似度": raw_similarity[np.triu_indices_from(raw_similarity, k=1)],
"Embedding相似度": embed_similarity[np.triu_indices_from(embed_similarity, k=1)]
})
Phase 3:向量检索系统
FAISS 索引构建
FAISS(Facebook AI Similarity Search)是生产环境中最常用的向量索引库,支持十亿级向量毫秒级检索。
import faiss
from typing import List, Tuple
class StockSimilarityIndex:
"""股票相似度向量索引"""
def __init__(self, embedding_dim: int = 128):
self.embedding_dim = embedding_dim
self.index = None
self.symbols = []
self.id_map = {} # index_id -> symbol
def build_index(self, embeddings: np.ndarray, symbols: List[str]):
"""
构建FAISS索引
Args:
embeddings: numpy array, shape=(n_stocks, embedding_dim)
symbols: 股票代码列表
"""
# 确保float32类型(FAISS要求)
embeddings = embeddings.astype(np.float32)
# 内积索引 + L2归一化等价于余弦相似度
self.index = faiss.IndexFlatIP(self.embedding_dim)
self.index.add(embeddings)
self.symbols = symbols
for idx, symbol in enumerate(symbols):
self.id_map[idx] = symbol
print(f"✅ 索引构建完成: {self.index.ntotal} 只股票, {self.embedding_dim} 维向量")
def search(self, query_embedding: np.ndarray, top_k: int = 10) -> List[Tuple[str, float]]:
"""
检索最相似的股票
Args:
query_embedding: 查询向量,shape=(embedding_dim,) 或 (1, embedding_dim)
top_k: 返回前k个最相似的结果
Returns:
List of (symbol, similarity_score)
"""
if self.index is None:
raise RuntimeError("索引未构建,请先调用 build_index()")
# 确保2D数组
if query_embedding.ndim == 1:
query_embedding = query_embedding.reshape(1, -1)
query_embedding = query_embedding.astype(np.float32)
# 执行检索
similarities, indices = self.index.search(query_embedding, top_k)
# 映射回股票代码
results = []
for sim, idx in zip(similarities[0], indices[0]):
if idx >= 0: # FAISS返回-1表示无效索引
symbol = self.id_map[idx]
results.append((symbol, float(sim)))
return results
def find_similar_stocks(self, symbol: str, top_k: int = 10) -> List[Tuple[str, float]]:
"""
查找与指定股票最相似的其他股票
"""
if symbol not in self.symbols:
raise ValueError(f"股票 {symbol} 不在索引中")
idx = self.symbols.index(symbol)
# 从索引中获取该股票的向量
vector = self.index.reconstruct(idx).reshape(1, -1)
# 排除自身,检索top_k+1然后去掉自己
results = self.search(vector, top_k + 1)
return [(s, sim) for s, sim in results if s != symbol][:top_k]
完整检索流程演示
def main():
# Step 1: 获取数据
# 注:实际使用时替换为真实的股票列表
symbols = [
"NVDA.US", "AMD.US", "INTC.US", "AVGO.US", # 半导体
"AAPL.US", "MSFT.US", "GOOGL.US", "META.US", # 科技巨头
"TSLA.US", "AMZN.US", # 新经济
# 更多股票...
]
print("=" * 60)
print("Step 1: 获取股票历史数据")
print("=" * 60)
return_df = fetch_stock_data(symbols, lookback_days=252)
# Step 2: 生成Embedding
print("\n" + "=" * 60)
print("Step 2: 生成股票Embedding")
print("=" * 60)
embeddings, symbols_list, raw_embeddings = generate_embeddings(return_df, n_components=64)
# Step 3: 构建索引
print("\n" + "=" * 60)
print("Step 3: 构建FAISS向量索引")
print("=" * 60)
index = StockSimilarityIndex(embedding_dim=64)
index.build_index(embeddings, symbols_list)
# Step 4: 检索NVDA的相似股票
print("\n" + "=" * 60)
print("Step 4: 检索与 NVDA 走势最相似的股票")
print("=" * 60)
if "NVDA.US" in symbols_list:
similar_stocks = index.find_similar_stocks("NVDA.US", top_k=5)
print(f"\n🎯 与 NVDA 走势最相似的 5 只股票:")
print("-" * 30)
for rank, (symbol, score) in enumerate(similar_stocks, 1):
print(f" {rank}. {symbol}: 相似度 {score:.4f}")
# Step 5: 评估Embedding质量
print("\n" + "=" * 60)
print("Step 5: Embedding质量评估")
print("=" * 60)
evaluate_embeddings(raw_embeddings, return_df)
if __name__ == "__main__":
main()
运行结果示例:
Step 4: 检索与 NVDA 走势最相似的股票
🎯 与 NVDA 走势最相似的 5 只股票:
------------------------------
1. AMD.US: 相似度 0.8472
2. AVGO.US: 相似度 0.7821
3. META.US: 0.7123
4. TSLA.US: 0.6547
5. INTC.US: 0.6234
进阶:动态Embedding与时间窗口
基于滚动窗口的动态相似度
静态 Embedding 有一个根本缺陷:股票的基本面在变化。2020 年的 NVDA 和 2024 年的 NVDA 是两家不同的公司。
更合理的做法是使用 滚动时间窗口,只关注最近 N 天的走势相似度:
def rolling_embedding(
return_df: pd.DataFrame,
window_days: int = 60,
step_days: int = 5,
n_components: int = 32
) -> pd.DataFrame:
"""
计算滚动时间窗口的相似度
Returns:
DataFrame: index=(date, query_symbol), columns=(similar_symbol, similarity)
"""
dates = return_df.index
results = []
for start_idx in range(0, len(dates) - window_days, step_days):
end_idx = start_idx + window_days
window_df = return_df.iloc[start_idx:end_idx]
# 在当前窗口内生成Embedding
embeddings = apply_pca(normalize_returns(window_df), n_components)
# 构建索引
index = StockSimilarityIndex(n_components)
index.build_index(embeddings, window_df.columns.tolist())
date = dates[end_idx]
# 对每只股票找相似标的
for symbol in window_df.columns:
similar = index.find_similar_stocks(symbol, top_k=3)
for sim_symbol, sim_score in similar:
results.append({
"date": date,
"query": symbol,
"similar_to": sim_symbol,
"similarity": sim_score
})
return pd.DataFrame(results)
相似度序列的Alpha因子化
将滚动相似度转换为可交易的因子:
def similarity_momentum_factor(
rolling_results: pd.DataFrame,
holding_period: int = 20
) -> pd.DataFrame:
"""
基于相似度动量的Alpha因子
逻辑:如果股票A与股票B走势相似,而B近期上涨,则A有均值回归/动量延续的可能
"""
# 计算每对相似股票在未来持有期的收益
# 简化版:使用当日相似度作为信号强度代理
factor = rolling_results.groupby(["date", "query"])["similarity"].mean()
return factor.unstack(level="query")
生产部署注意事项
数据预处理阶段
| 问题 | 解决方案 |
|---|---|
| 停牌日期不对齐 | 填充 NaN 后在 PCA 前处理 |
| 极端收益率(财报) | Winsorize 截断到 ±5σ |
| 不同市场交易日差异 | 统一到 UTC 或剔除跨市场组合 |
向量索引阶段
| 问题 | 解决方案 |
|---|---|
| 股票数量超过内存 | 使用 faiss.IndexIVF 倒排索引,内存节省 10-50 倍 |
| 需要实时更新 | 使用 IndexIDMap 支持增量添加 |
| 召回精度 vs 速度 | nprobe 参数控制精度/速度 tradeoff |
计算资源
# 资源估算(供参考)
def estimate_resources(n_stocks: int, embedding_dim: int) -> dict:
"""估算向量索引内存占用"""
bytes_per_float = 4 # float32
index_overhead = 1.1 # FAISS索引元数据
memory_bytes = n_stocks * embedding_dim * bytes_per_float * index_overhead
return {
"n_stocks": n_stocks,
"embedding_dim": embedding_dim,
"memory_mb": memory_bytes / (1024 ** 2),
"recommendation": "如果 > 1GB 内存,考虑使用 IVF 索引"
}
下一步行动
如果你关注数据获取效率:
- 使用 TickDB WebSocket 实时订阅多只股票的 K 线数据,避免轮询
- 在控制台申请免费 API Key 获取每日限额
如果你想深入 Embedding 方向:
- 尝试 Autoencoder 或 LSTM-based 模型提升表征能力
- 使用 t-SNE/UMAP 可视化向量空间,验证聚类合理性
如果你有 GPU 资源:
- 考虑使用 Sentence-Transformers 预训练模型处理金融文本(如财报电话会议记录)
- 将文本 Embedding 与价格 Embedding 融合,构建多模态检索系统
风险提示
本文探讨的是股票历史走势的数学相似度,不构成任何投资建议。走势相似不代表基本面相似,更不代表未来会重复相似的价格路径。模型存在过拟合风险,历史数据中的相关性可能在未来失效。投资有风险,入市需谨慎。