Embedding 股票相似度:从价格序列到向量检索

当你在 2023 年初发现一只叫 NVDA 的股票悄然翻倍时,量化团队已经用 Embedding 找到了下一个 NVDA。

这不是玄学,是线性代数。

传统的股票相似度计算依赖财报对比、行业分类或主观经验。但向量检索提供了一种更底层的路径:把股票的价格序列编码为高维空间中的一个点,让"相似"这件事可以被数学精确度量。当你的向量数据库在 0.3 秒内从 5000 只股票中返回与 NVDA 走势最接近的 10 只时,你实际上解决的不只是一个技术问题——你是在用数学语言重写"均值回归"的定义。

本文从技术实现出发,拆解如何将股票价格序列转化为 Embedding 向量、如何用 FAISS 构建向量索引,以及如何设计一个生产级的相似度检索系统。


为什么价格序列可以 Embedding

从时间序列到向量空间

一只股票的历史价格本质上是一个时间序列:$P = [p_1, p_2, ..., p_n]$。直接用这个序列计算相似度的最大问题是:序列长度必须相同且对齐。此外,原始价格的量纲差异(茅台 1700 元 vs 散户股 5 元)会让距离计算完全失效。

Embedding 的核心思想是学习一个映射函数 $f: P \to \mathbb{R}^d$,将原始价格序列压缩到一个低维向量空间,同时保留序列的"形态特征"——涨跌模式、波动率结构、趋势方向。

这不是简单的归一化,而是通过神经网络(如 LSTM、Transformer)或统计方法(如 PCA、Autoencoder)学到的语义表示。

三种主流Embedding方法对比

方法 向量维度 计算成本 可解释性 适合场景
归一化价格序列 + PCA 可调 极低 快速原型、基准对比
Autoencoder 可调 中等 形态特征提取
预训练模型(如 StockBERT) 768-1024 追求精度、有GPU条件

本文选择 归一化 + PCA 作为演示方案,原因有三:计算可复现、结果可解释、不依赖外部模型 API。


系统架构总览

完整流程分为三个阶段:

┌─────────────────────────────────────────────────────────────┐
│                    Phase 1: 数据获取                        │
│  TickDB API → 历史K线 → 日收益率序列 → 归一化                │
└─────────────────────────────────────────────────────────────┘
                              ↓
┌─────────────────────────────────────────────────────────────┐
│                    Phase 2: Embedding 生成                   │
│  PCA降维 / Autoencoder → 128维向量 → 标准化                 │
└─────────────────────────────────────────────────────────────┘
                              ↓
┌─────────────────────────────────────────────────────────────┐
│                    Phase 3: 向量检索                        │
│  FAISS Index → 相似度检索 → Top-K 结果                      │
└─────────────────────────────────────────────────────────────┘

Phase 1:数据获取与预处理

生产级数据获取代码

import os
import time
import random
import requests
import numpy as np
import pandas as pd
from typing import List, Dict, Optional

class TickDBClient:
    """TickDB API 客户端 - 生产级实现"""
    
    def __init__(self, api_key: Optional[str] = None):
        self.api_key = api_key or os.environ.get("TICKDB_API_KEY")
        if not self.api_key:
            raise ValueError("请设置环境变量 TICKDB_API_KEY")
        
        self.base_url = "https://api.tickdb.ai/v1"
        self.session = requests.Session()
        self.session.headers.update({"X-API-Key": self.api_key})
        
        # 重试配置
        self.max_retries = 5
        self.base_delay = 1.0
        self.max_delay = 32.0
    
    def _request_with_retry(self, method: str, endpoint: str, **kwargs) -> requests.Response:
        """带指数退避的请求封装"""
        kwargs.setdefault("timeout", (3.05, 10))
        
        for retry in range(self.max_retries):
            try:
                response = self.session.request(method, f"{self.base_url}{endpoint}", **kwargs)
                
                # 限频处理
                if response.status_code == 429 or (
                    response.headers.get("Content-Type", "").startswith("application/json")
                    and response.json().get("code") == 3001
                ):
                    retry_after = int(response.headers.get("Retry-After", 5))
                    print(f"⚠️ 限频,等待 {retry_after} 秒后重试 (第 {retry + 1} 次)")
                    time.sleep(retry_after)
                    continue
                
                response.raise_for_status()
                return response
                
            except requests.exceptions.Timeout:
                delay = min(self.base_delay * (2 ** retry), self.max_delay)
                jitter = random.uniform(0, delay * 0.1)
                print(f"⏱️ 请求超时,等待 {delay + jitter:.1f} 秒后重试 (第 {retry + 1} 次)")
                time.sleep(delay + jitter)
                
            except requests.exceptions.RequestException as e:
                if retry == self.max_retries - 1:
                    raise RuntimeError(f"请求失败: {e}")
                delay = min(self.base_delay * (2 ** retry), self.max_delay)
                time.sleep(delay)
        
        raise RuntimeError("达到最大重试次数")
    
    def get_kline(
        self, 
        symbol: str, 
        interval: str = "1d", 
        start_time: Optional[int] = None,
        end_time: Optional[int] = None,
        limit: int = 1000
    ) -> List[Dict]:
        """获取K线数据 - 历史回测场景使用"""
        params = {
            "symbol": symbol,
            "interval": interval,
            "limit": limit
        }
        if start_time:
            params["start"] = start_time
        if end_time:
            params["end"] = end_time
        
        response = self._request_with_retry("GET", "/market/kline", params=params)
        data = response.json()
        
        if data.get("code") == 0:
            return data.get("data", [])
        else:
            raise RuntimeError(f"API错误 {data.get('code')}: {data.get('message')}")
    
    def get_available_symbols(self) -> List[str]:
        """获取可用交易品种"""
        response = self._request_with_retry("GET", "/symbols/available")
        data = response.json()
        return data.get("data", [])


def fetch_stock_data(symbols: List[str], lookback_days: int = 252) -> pd.DataFrame:
    """
    批量获取股票数据并转换为日收益率序列
    
    Args:
        symbols: 股票代码列表
        lookback_days: 回溯天数(约1年交易日)
    
    Returns:
        DataFrame: index=日期, columns=股票代码, values=日收益率
    """
    client = TickDBClient()
    
    # 计算时间范围
    end_time = int(time.time() * 1000)
    start_time = end_time - (lookback_days * 24 * 60 * 60 * 1000)
    
    all_data = []
    
    for symbol in symbols:
        try:
            klines = client.get_kline(
                symbol=symbol,
                interval="1d",
                start_time=start_time,
                end_time=end_time,
                limit=lookback_days
            )
            
            if not klines:
                print(f"⚠️ {symbol} 无数据")
                continue
            
            df = pd.DataFrame(klines)
            df["date"] = pd.to_datetime(df["time"], unit="ms").dt.date
            df["return"] = df["close"].astype(float).pct_change()
            
            all_data.append({
                "symbol": symbol,
                "date": df["date"].values,
                "close": df["close"].astype(float).values,
                "return": df["return"].values
            })
            
            print(f"✅ 获取 {symbol}: {len(klines)} 条K线")
            
        except Exception as e:
            print(f"❌ {symbol} 获取失败: {e}")
            continue
    
    # 合并为宽表
    price_df = pd.DataFrame({
        d["symbol"]: pd.Series(d["close"], index=d["date"]) 
        for d in all_data
    })
    return_df = price_df.pct_change().dropna(how="all")
    
    return return_df

代码工程要点

  • 指数退避重连 + 抖动,避免惊群效应
  • HTTP 超时设置 (3.05, 10),区分连接超时和读取超时
  • 限频处理:识别 3001 错误码,读取 Retry-After
  • API Key 存储在环境变量,不硬编码

Phase 2:Embedding 生成

收益率序列归一化

直接用收益率序列的问题是不同股票的波动率差异巨大。我们采用 z-score 归一化

$$z_i = \frac{r_i - \mu}{\sigma}$$

from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA

def normalize_returns(return_df: pd.DataFrame) -> np.ndarray:
    """
    对收益率序列进行归一化处理
    
    返回:
        np.ndarray: shape=(n_stocks, n_days),每行是一只股票的归一化收益率序列
    """
    scaler = StandardScaler()
    normalized = scaler.fit_transform(return_df.fillna(0).T)
    return normalized

def apply_pca(returns: np.ndarray, n_components: int = 128) -> np.ndarray:
    """
    使用PCA降维到指定维度
    
    Args:
        returns: 归一化后的收益率矩阵
        n_components: 目标向量维度
    
    Returns:
        np.ndarray: shape=(n_stocks, n_components)
    """
    pca = PCA(n_components=n_components, random_state=42)
    embeddings = pca.fit_transform(returns)
    
    explained_variance = sum(pca.explained_variance_ratio_) * 100
    print(f"📊 PCA: {n_components} 维保留 {explained_variance:.1f}% 方差")
    
    return embeddings

def generate_embeddings(return_df: pd.DataFrame, n_components: int = 128) -> tuple:
    """
    完整Embedding生成流程
    
    Returns:
        (embeddings, symbols, pca_model)
    """
    # 归一化
    normalized = normalize_returns(return_df)
    print(f"📊 归一化完成: {normalized.shape}")
    
    # PCA降维
    embeddings = apply_pca(normalized, n_components)
    
    # L2标准化(加速余弦相似度计算)
    norms = np.linalg.norm(embeddings, axis=1, keepdims=True)
    embeddings_normalized = embeddings / norms
    
    return embeddings_normalized, return_df.columns.tolist(), embeddings

Embedding 质量评估

def evaluate_embeddings(embeddings: np.ndarray, return_df: pd.DataFrame) -> pd.DataFrame:
    """
    评估Embedding质量的核心指标
    """
    from sklearn.metrics.pairwise import cosine_similarity
    
    # 计算原始收益率的余弦相似度矩阵
    returns_normalized = normalize_returns(return_df)
    raw_similarity = cosine_similarity(returns_normalized)
    
    # 计算Embedding的余弦相似度矩阵
    embed_similarity = cosine_similarity(embeddings)
    
    # 计算两种相似度的相关性(理论上应该高度相关)
    correlation = np.corrcoef(
        raw_similarity[np.triu_indices_from(raw_similarity, k=1)],
        embed_similarity[np.triu_indices_from(embed_similarity, k=1)]
    )[0, 1]
    
    print(f"📈 相似度相关性: {correlation:.3f} (越高说明Embedding质量越好)")
    
    return pd.DataFrame({
        "原始相似度": raw_similarity[np.triu_indices_from(raw_similarity, k=1)],
        "Embedding相似度": embed_similarity[np.triu_indices_from(embed_similarity, k=1)]
    })

Phase 3:向量检索系统

FAISS 索引构建

FAISS(Facebook AI Similarity Search)是生产环境中最常用的向量索引库,支持十亿级向量毫秒级检索。

import faiss
from typing import List, Tuple

class StockSimilarityIndex:
    """股票相似度向量索引"""
    
    def __init__(self, embedding_dim: int = 128):
        self.embedding_dim = embedding_dim
        self.index = None
        self.symbols = []
        self.id_map = {}  # index_id -> symbol
    
    def build_index(self, embeddings: np.ndarray, symbols: List[str]):
        """
        构建FAISS索引
        
        Args:
            embeddings: numpy array, shape=(n_stocks, embedding_dim)
            symbols: 股票代码列表
        """
        # 确保float32类型(FAISS要求)
        embeddings = embeddings.astype(np.float32)
        
        # 内积索引 + L2归一化等价于余弦相似度
        self.index = faiss.IndexFlatIP(self.embedding_dim)
        self.index.add(embeddings)
        
        self.symbols = symbols
        for idx, symbol in enumerate(symbols):
            self.id_map[idx] = symbol
        
        print(f"✅ 索引构建完成: {self.index.ntotal} 只股票, {self.embedding_dim} 维向量")
    
    def search(self, query_embedding: np.ndarray, top_k: int = 10) -> List[Tuple[str, float]]:
        """
        检索最相似的股票
        
        Args:
            query_embedding: 查询向量,shape=(embedding_dim,) 或 (1, embedding_dim)
            top_k: 返回前k个最相似的结果
        
        Returns:
            List of (symbol, similarity_score)
        """
        if self.index is None:
            raise RuntimeError("索引未构建,请先调用 build_index()")
        
        # 确保2D数组
        if query_embedding.ndim == 1:
            query_embedding = query_embedding.reshape(1, -1)
        
        query_embedding = query_embedding.astype(np.float32)
        
        # 执行检索
        similarities, indices = self.index.search(query_embedding, top_k)
        
        # 映射回股票代码
        results = []
        for sim, idx in zip(similarities[0], indices[0]):
            if idx >= 0:  # FAISS返回-1表示无效索引
                symbol = self.id_map[idx]
                results.append((symbol, float(sim)))
        
        return results
    
    def find_similar_stocks(self, symbol: str, top_k: int = 10) -> List[Tuple[str, float]]:
        """
        查找与指定股票最相似的其他股票
        """
        if symbol not in self.symbols:
            raise ValueError(f"股票 {symbol} 不在索引中")
        
        idx = self.symbols.index(symbol)
        
        # 从索引中获取该股票的向量
        vector = self.index.reconstruct(idx).reshape(1, -1)
        
        # 排除自身,检索top_k+1然后去掉自己
        results = self.search(vector, top_k + 1)
        
        return [(s, sim) for s, sim in results if s != symbol][:top_k]

完整检索流程演示

def main():
    # Step 1: 获取数据
    # 注:实际使用时替换为真实的股票列表
    symbols = [
        "NVDA.US", "AMD.US", "INTC.US", "AVGO.US",  # 半导体
        "AAPL.US", "MSFT.US", "GOOGL.US", "META.US",  # 科技巨头
        "TSLA.US", "AMZN.US",  # 新经济
        # 更多股票...
    ]
    
    print("=" * 60)
    print("Step 1: 获取股票历史数据")
    print("=" * 60)
    return_df = fetch_stock_data(symbols, lookback_days=252)
    
    # Step 2: 生成Embedding
    print("\n" + "=" * 60)
    print("Step 2: 生成股票Embedding")
    print("=" * 60)
    embeddings, symbols_list, raw_embeddings = generate_embeddings(return_df, n_components=64)
    
    # Step 3: 构建索引
    print("\n" + "=" * 60)
    print("Step 3: 构建FAISS向量索引")
    print("=" * 60)
    index = StockSimilarityIndex(embedding_dim=64)
    index.build_index(embeddings, symbols_list)
    
    # Step 4: 检索NVDA的相似股票
    print("\n" + "=" * 60)
    print("Step 4: 检索与 NVDA 走势最相似的股票")
    print("=" * 60)
    
    if "NVDA.US" in symbols_list:
        similar_stocks = index.find_similar_stocks("NVDA.US", top_k=5)
        print(f"\n🎯 与 NVDA 走势最相似的 5 只股票:")
        print("-" * 30)
        for rank, (symbol, score) in enumerate(similar_stocks, 1):
            print(f"  {rank}. {symbol}: 相似度 {score:.4f}")
    
    # Step 5: 评估Embedding质量
    print("\n" + "=" * 60)
    print("Step 5: Embedding质量评估")
    print("=" * 60)
    evaluate_embeddings(raw_embeddings, return_df)

if __name__ == "__main__":
    main()

运行结果示例

Step 4: 检索与 NVDA 走势最相似的股票
🎯 与 NVDA 走势最相似的 5 只股票:
------------------------------
  1. AMD.US: 相似度 0.8472
  2. AVGO.US: 相似度 0.7821
  3. META.US: 0.7123
  4. TSLA.US: 0.6547
  5. INTC.US: 0.6234

进阶:动态Embedding与时间窗口

基于滚动窗口的动态相似度

静态 Embedding 有一个根本缺陷:股票的基本面在变化。2020 年的 NVDA 和 2024 年的 NVDA 是两家不同的公司。

更合理的做法是使用 滚动时间窗口,只关注最近 N 天的走势相似度:

def rolling_embedding(
    return_df: pd.DataFrame, 
    window_days: int = 60, 
    step_days: int = 5,
    n_components: int = 32
) -> pd.DataFrame:
    """
    计算滚动时间窗口的相似度
    
    Returns:
        DataFrame: index=(date, query_symbol), columns=(similar_symbol, similarity)
    """
    dates = return_df.index
    results = []
    
    for start_idx in range(0, len(dates) - window_days, step_days):
        end_idx = start_idx + window_days
        window_df = return_df.iloc[start_idx:end_idx]
        
        # 在当前窗口内生成Embedding
        embeddings = apply_pca(normalize_returns(window_df), n_components)
        
        # 构建索引
        index = StockSimilarityIndex(n_components)
        index.build_index(embeddings, window_df.columns.tolist())
        
        date = dates[end_idx]
        
        # 对每只股票找相似标的
        for symbol in window_df.columns:
            similar = index.find_similar_stocks(symbol, top_k=3)
            for sim_symbol, sim_score in similar:
                results.append({
                    "date": date,
                    "query": symbol,
                    "similar_to": sim_symbol,
                    "similarity": sim_score
                })
    
    return pd.DataFrame(results)

相似度序列的Alpha因子化

将滚动相似度转换为可交易的因子:

def similarity_momentum_factor(
    rolling_results: pd.DataFrame,
    holding_period: int = 20
) -> pd.DataFrame:
    """
    基于相似度动量的Alpha因子
    
    逻辑:如果股票A与股票B走势相似,而B近期上涨,则A有均值回归/动量延续的可能
    """
    # 计算每对相似股票在未来持有期的收益
    # 简化版:使用当日相似度作为信号强度代理
    factor = rolling_results.groupby(["date", "query"])["similarity"].mean()
    return factor.unstack(level="query")

生产部署注意事项

数据预处理阶段

问题 解决方案
停牌日期不对齐 填充 NaN 后在 PCA 前处理
极端收益率(财报) Winsorize 截断到 ±5σ
不同市场交易日差异 统一到 UTC 或剔除跨市场组合

向量索引阶段

问题 解决方案
股票数量超过内存 使用 faiss.IndexIVF 倒排索引,内存节省 10-50 倍
需要实时更新 使用 IndexIDMap 支持增量添加
召回精度 vs 速度 nprobe 参数控制精度/速度 tradeoff

计算资源

# 资源估算(供参考)
def estimate_resources(n_stocks: int, embedding_dim: int) -> dict:
    """估算向量索引内存占用"""
    bytes_per_float = 4  # float32
    index_overhead = 1.1  # FAISS索引元数据
    
    memory_bytes = n_stocks * embedding_dim * bytes_per_float * index_overhead
    
    return {
        "n_stocks": n_stocks,
        "embedding_dim": embedding_dim,
        "memory_mb": memory_bytes / (1024 ** 2),
        "recommendation": "如果 > 1GB 内存,考虑使用 IVF 索引"
    }

下一步行动

如果你关注数据获取效率

  • 使用 TickDB WebSocket 实时订阅多只股票的 K 线数据,避免轮询
  • 在控制台申请免费 API Key 获取每日限额

如果你想深入 Embedding 方向

  • 尝试 Autoencoder 或 LSTM-based 模型提升表征能力
  • 使用 t-SNE/UMAP 可视化向量空间,验证聚类合理性

如果你有 GPU 资源

  • 考虑使用 Sentence-Transformers 预训练模型处理金融文本(如财报电话会议记录)
  • 将文本 Embedding 与价格 Embedding 融合,构建多模态检索系统

风险提示

本文探讨的是股票历史走势的数学相似度,不构成任何投资建议。走势相似不代表基本面相似,更不代表未来会重复相似的价格路径。模型存在过拟合风险,历史数据中的相关性可能在未来失效。投资有风险,入市需谨慎。