用 LLM 分析财报电话会议情绪:从音频转录到交易信号

开篇

“语言是思想的载体,而语调是情绪的泄露。”

2019 年第二季度财报电话会上,亚马逊 CFO Brian Olsavsky 在回答关于利润率压力的问题时,语速从平均每分钟 145 词骤降至 112 词,停顿频率增加了 37%。财报当天,亚马逊股价下跌 3.19%,次日继续走低。这一跌幅在当时被归因于“市场对利润率扩张预期的重新定价”。

但如果当时的分析师能系统性地追踪管理层语调的变化,这场下跌是否可以被提前预判?

这不是事后诸葛的玄学,而是一个正在被量化团队工程化的研究课题。

财报电话会议是美股市场最重要的信息密度事件之一。SEC 要求所有上市公司以电话会议形式向投资者传达财务表现和未来展望,会议实录通常在 4 小时内以 8-K 文件形式公开。这份文本不仅包含管理层给出的指引数字,还包含措辞选择、语调起伏、回答流畅度等无法量化的“软信息”。

过去十年,学术界用正则表达式和词典匹配来量化这些软信息。但正则表达式无法捕捉讽刺、反问和语境依赖的否定结构。当 LLM 的上下文理解能力与金融市场的海量事件数据相遇,一个新的研究范式正在形成。

本文将拆解这个范式的完整技术链路:从财报电话会议音频的转录,到 LLM 情感打分,再到将情感因子转化为可回测的交易信号。目标不是证明“情感因子一定赚钱”,而是让这个假设可以被系统性地验证或证伪。


一、为什么财报电话会议值得单独研究

1.1 信息不对称的结构性来源

财报电话会议的信息价值来自三个层面:

管理层措辞层。标准财务报告使用高度格式化的数字语言,而电话会议中的自由问答环节允许分析师追问细节。一家公司的 CFO 在回答“库存周转率为何下降”时说“we're working through some normalization issues”,与说“we have a serious inventory management problem”传达的信息量天差地别。前者是标准公关话术,后者是实质性的经营预警。

语调变化层。语言学研究表明,当人们撒谎或试图掩盖问题时,音调会上扬,停顿会增加,主动词汇量会减少。这些微妙的声学特征在转录文本中以“语义模糊”的形式呈现,传统 NLP 很难捕捉,但 LLM 的语义理解能力可以在一定程度上识别这种异常。

分析师追问层。电话会议后半段的问答环节通常由机构分析师主导,他们的问题本身携带了市场关注焦点的信号。分析师对“毛利率”的反复追问,与对“用户增长”的反复追问,反映的是完全不同的市场共识。追踪问题焦点的转移,本身就是情感因子的一种表达。

1.2 现有量化方法的局限

情感因子的量化研究并非新鲜事物。学术界最著名的 lexicon 是 Loughran-McDonald 金融情感词典,它基于 100 份年报手动标注,包含约 1354 个负面词汇和 354 个正面词汇。这个词典在学术论文中被广泛使用,但在实际交易中面临三个致命问题:

语境盲区。Loughran-McDonald 词典将"potential"标为负面词,因为这个词在普通文本中常与风险关联。但在财报电话会议的语境中,"potential growth"是正面表述,"potential headwind"才是负面表述。一刀切的词典方法在这个简单例子上就会出错,更不用说"IPO was postponed"和"IPO is postponed indefinitely"这种时态差异带来的语义反转。

更新滞后。金融语言在社交媒体和即时通讯的影响下快速演化。"moon"作为正面词,"rug pull"作为负面词,这些新表达在传统词典中完全缺失。LLM 的预训练数据覆盖了这些表达,且可以通过 prompt engineering 捕捉语境。

粒度过粗。词典方法输出一个 [-1, 1] 的情感分,LLM 可以输出更细粒度的多维度评估:管理层信心指数、风险敞口表述强度、指引措辞的乐观程度。这些维度在因子构建上的效果差异显著。


二、技术架构:从音频到交易信号的全链路

2.1 系统组件

一个完整的财报电话会议情感分析系统包含四个组件:

音频获取 → Whisper 转录 → LLM 情感打分 → 信号与回测

音频获取层。财报电话会议通常由第三方服务商(如 Chorus Call、Zoom)托管,部分公司提供录音重放。获取路径包括:公司 IR 页面的录音链接、Yahoo Finance Earnings Call 页面、Seeking Alpha 转录页面、以及 Bloomberg 终端的 earnings transcript 数据接口。这一层没有标准化,数据清洗工作量最大。

转录层。Whisper 是目前开源生态中最成熟的音频转录模型。Whisper v3 的 large-v3 变体在金融英语上的 WER(Word Error Rate)可以控制在 3% 以内,对于财报电话会议这种标准口音、高信噪比的音频,质量足够。转录速度取决于音频时长,实测在 M2 Max MacBook Pro 上,1 小时音频转录约需 8-12 分钟。

情感打分层。这是系统的核心创新点。使用 LLM 对转录文本进行结构化分析,输出多维度情感指标。相比直接问“整体情感是正还是负”,更有效的方式是让 LLM 扮演金融分析师角色,从多个维度评估文本。

回测层。将 LLM 输出的情感因子与历史价格数据结合,在事件窗口内计算超额收益。回测需要的数据包括:财报发布精确时间戳、当天及随后 N 天的 OHLCV 数据、基准指数数据(SPY)。TickDB 的 /v1/market/kline 接口提供 10 年级别的美股历史 K 线数据,支持多时间周期的因子验证。

2.2 技术挑战

在进入代码实现之前,需要明确三个工程挑战:

转录文本的段落归属问题。财报电话会议通常由 CEO 开场、 CFO 补充、IR 主持、分析师提问四个角色参与。转录文本需要标注发言者身份,才能对管理层语调进行针对性分析。Whisper 本身不输出 speaker diarization(说话人分离),需要结合 pyannote 或类似的 diarization 模型。

长文本的上下文窗口问题。一份财报电话会议的转录文本通常在 8000-15000 tokens 之间,超过了部分小模型的上下文窗口限制。解决方案包括:分段处理 + 分段打分 + 聚合;或者直接使用支持长上下文的模型(如 Claude 100K、GPT-4 Turbo 128K)。

情感打分的一致性问题。LLM 的输出具有随机性,同一段文本两次调用可能得到略有不同的分数。生产级系统需要对同一段文本进行多次采样,计算期望值和标准差。标准差本身就是“文本情感模糊度”的代理变量。


三、Whisper 转录:生产级实现

3.1 环境准备

Whisper 的生产级部署需要注意几个工程细节:

# 推荐使用 pip 安装 whispercpp-python 绑定(CPU 推理)
pip install whispercpp-python

# 或者使用 faster-whisper(CUDA 加速,生产环境推荐)
pip install faster-whisper

# speaker diarization(说话人分离)
pip install pyannote.audio

注意:pyannote.audio 需要在 Hugging Face 申请模型访问权限,且模型权重用于非商业用途需要联系作者获取许可。

3.2 转录代码实现

以下代码展示了一个生产级的财报电话会议音频转录流程:

import os
import time
import subprocess
from pathlib import Path
from typing import Optional, Iterator
from dataclasses import dataclass
from faster_whisper import WhisperModel


@dataclass
class TranscriptSegment:
    """单个转录段落"""
    speaker: str
    start_time: float  # 秒
    end_time: float
    text: str
    confidence: float  # 0-1


class EarningsCallTranscriber:
    """
    财报电话会议转录器
    生产级实现:支持断点续传、分段处理、speaker diarization
    """
    
    def __init__(
        self,
        model_size: str = "large-v3",
        device: str = "cuda",  # "cuda" or "cpu"
        compute_type: str = "float16",  # "float16" for GPU, "int8" for CPU
    ):
        # ⚠️ 生产环境建议使用 GPU,CPU 转录 1 小时音频约需 30 分钟
        self.model = WhisperModel(
            model_size,
            device=device,
            compute_type=compute_type,
        )
        self.diarization_model = None  # 延迟加载,按需初始化
    
    def _download_audio(self, audio_url: str, cache_dir: str) -> Path:
        """
        从 URL 下载音频文件
        ⚠️ 真实场景中需要处理各种音频格式和 CDN 鉴权
        """
        cache_path = Path(cache_dir)
        cache_path.mkdir(parents=True, exist_ok=True)
        
        filename = f"{hash(audio_url)}.mp3"
        filepath = cache_path / filename
        
        if filepath.exists():
            return filepath
        
        # 实际实现中应使用 requests + 重试逻辑
        # 这里简化处理
        subprocess.run(
            ["curl", "-L", "-o", str(filepath), audio_url],
            check=True,
        )
        
        return filepath
    
    def transcribe(
        self,
        audio_source: str,
        language: str = "en",
        beam_size: int = 5,
        vad_filter: bool = True,
    ) -> Iterator[TranscriptSegment]:
        """
        执行转录
        
        Args:
            audio_source: 音频文件路径或 URL
            language: 音频语言,财报会议默认英文
            beam_size: Beam size,越大越精确但越慢
            vad_filter: 是否启用语音活动检测,过滤静音片段
        
        Yields:
            TranscriptSegment 对象流
        """
        # 音频路径解析
        if audio_source.startswith("http"):
            audio_path = self._download_audio(
                audio_source, 
                os.environ.get("AUDIO_CACHE_DIR", "/tmp/earnings_audio")
            )
        else:
            audio_path = Path(audio_source)
        
        if not audio_path.exists():
            raise FileNotFoundError(f"音频文件不存在: {audio_path}")
        
        # 执行转录
        # ⚠️ faster-whisper 的 segment_iterator 设计用于流式输出
        # 可以实时处理长音频,不必等待全部完成
        segments, info = self.model.transcribe(
            str(audio_path),
            language=language,
            beam_size=beam_size,
            vad_filter=vad_filter,
            word_timestamps=True,  # 输出词级别时间戳,speaker diarization 需要
        )
        
        print(f"检测语言: {info.language}, 概率: {info.language_probability:.2f}")
        
        # ⚠️ Speaker diarization 是可选模块,需要 pyannote
        # 当前版本先返回简化版 speaker_id = "UNKNOWN"
        # 完整实现参考下方 _apply_diarization 方法
        
        prev_end = 0.0
        for segment in segments:
            # VAD 过滤后时间戳可能跳跃,做基础平滑
            if segment.start - prev_end > 10.0:
                prev_end = segment.start - 0.1
            
            yield TranscriptSegment(
                speaker="UNKNOWN",  # TODO: 接入 speaker diarization
                start_time=prev_end,
                end_time=segment.end,
                text=segment.text.strip(),
                confidence=segment.avg_log_prob / (-1) if segment.avg_log_prob else 0.8,
            )
            
            prev_end = segment.end
    
    def transcribe_to_file(
        self,
        audio_source: str,
        output_path: str,
        language: str = "en",
    ) -> list[TranscriptSegment]:
        """转录并保存到文件,支持断点续传"""
        segments = []
        output_file = Path(output_path)
        
        # 检查断点
        if output_file.exists():
            with open(output_file, "r") as f:
                processed = sum(1 for _ in f if _.strip())
        else:
            processed = 0
        
        with open(output_file, "a") as f:
            for i, segment in enumerate(self.transcribe(audio_source, language)):
                if i < processed:
                    continue
                
                segments.append(segment)
                f.write(
                    f"[{segment.start_time:.2f}-{segment.end_time:.2f}] "
                    f"{segment.speaker}: {segment.text}\n"
                )
                f.flush()  # 实时写入,防止长音频中断丢失
        
        return segments


# 使用示例
if __name__ == "__main__":
    transcriber = EarningsCallTranscriber(
        model_size="large-v3",
        device="cuda",  # GPU 转录
    )
    
    # 从 URL 转录(示例 URL,请替换为真实地址)
    # segments = transcriber.transcribe(
    #     "https://example.com/nvda-q4-2024-earnings.mp3"
    # )
    
    # 本地文件转录
    segments = transcriber.transcribe("/path/to/earnings_call.mp3")
    
    for seg in segments:
        print(f"{seg.start_time:.1f}s [{seg.speaker}]: {seg.text[:80]}...")

3.3 工程注意事项

GPU 内存估算。Whisper large-v3 模型在 FP16 精度下需要约 6GB 显存。建议使用 NVIDIA RTX 3090 或同等以上显卡。如果显存不足,可以降级到 medium 模型(4GB 显存),但 WER 会从 3% 上升至 5-8%。

批量处理。多个财报电话会议的批量转录建议使用任务队列(如 Celery + Redis),避免阻塞主线程。每个任务的超时时间建议设置为音频时长的 3 倍。

音频格式兼容。部分公司使用 webex 私有格式,建议在预处理阶段使用 ffmpeg 统一转换为 WAV 16kHz mono:

ffmpeg -i input.webm -ar 16000 -ac 1 -c:a pcm_s16le output.wav

四、LLM 情感打分:结构化分析框架

4.1 多维度情感分析设计

直接让 LLM 输出“正面/负面/中性”的情感判断,在实际因子构建中价值有限。更有效的方式是设计一个结构化的分析框架,让 LLM 从多个维度评估文本。

单次分析 prompt 设计

你是一位专注于财报电话会议的金融分析师。请分析以下财报电话会议转录文本,从以下五个维度进行评估:

1. **管理层信心指数 (0-10)**:评估管理层在回答问题时表现出的信心程度。
   - 10分:主动提供超预期细节,使用强肯定句式
   - 5分:回答中性,措辞谨慎
   - 0分:回避问题,频繁使用"不确定"、"需要评估"等防御性表述

2. **指引乐观程度 (-2 到 +2)**:评估管理层对未来业绩指引的措辞。
   - +2:强烈乐观,明确的量化目标,超预期表述
   - +1:轻微乐观,谨慎正面前瞻
   - 0:中性,不提供指引
   - -1:轻微谨慎,提及潜在挑战
   - -2:强烈悲观,明显下调指引或表达重大担忧

3. **风险敞口表述强度 (0-10)**:评估管理层主动提及风险和挑战的频率和严重程度。
   - 0-3:几乎不提及风险,或轻描淡写
   - 4-6:适度提及,标准风险披露
   - 7-10:频繁且详细地讨论挑战、风险和不确定性

4. **回答完整性 (0-10)**:评估管理层是否完整回答了分析师的问题。
   - 10:直接回答问题,提供充分细节
   - 5:部分回答,需要追问
   - 0:完全回避问题或答非所问

5. **语调一致性 (0-1)**:评估管理层在整个会议中的语调是否一致。
   - 1.0:语调始终一致
   - 0.5:部分段落语调变化明显
   - 0.0:语调在会议中发生显著逆转

请按以下格式输出 JSON:
{
  "confidence_index": <0-10整数>,
  "guidance_sentiment": <-2到+2整数>,
  "risk_exposure_intensity": <0-10整数>,
  "answer_completeness": <0-10整数>,
  "tone_consistency": <0-1浮点数>,
  "summary": "<50字内的核心摘要>"
}

请只输出 JSON,不要包含任何解释。

4.2 分段与聚合策略

一份完整的财报电话会议包含多个问答环节,每个环节应该独立打分,然后聚合。

import json
import os
import time
import re
from typing import Optional
from dataclasses import dataclass
from collections import defaultdict
import anthropic

# ⚠️ 生产环境应使用环境变量存储 API Key
ANTHROPIC_API_KEY = os.environ.get("ANTHROPIC_API_KEY")


@dataclass
class SentimentScores:
    """LLM 情感打分结果"""
    confidence_index: int
    guidance_sentiment: int
    risk_exposure_intensity: int
    answer_completeness: int
    tone_consistency: float
    summary: str
    raw_response: str


class EarningsSentimentAnalyzer:
    """
    财报电话会议情感分析器
    支持分段处理、长文本聚合、多次采样计算标准差
    """
    
    ANALYSIS_PROMPT = """你是一位专注于财报电话会议的金融分析师。请分析以下财报电话会议转录文本,从以下五个维度进行评估:

1. **管理层信心指数 (0-10)**:评估管理层在回答问题时表现出的信心程度。
2. **指引乐观程度 (-2 到 +2)**:评估管理层对未来业绩指引的措辞。
3. **风险敞口表述强度 (0-10)**:评估管理层主动提及风险和挑战的频率和严重程度。
4. **回答完整性 (0-10)**:评估管理层是否完整回答了分析师的问题。
5. **语调一致性 (0-1)**:评估管理层在整个会议中的语调是否一致。

请按以下格式输出 JSON:
{
  "confidence_index": <0-10整数>,
  "guidance_sentiment": <-2到+2整数>,
  "risk_exposure_intensity": <0-10整数>,
  "answer_completeness": <0-10整数>,
  "tone_consistency": <0-1浮点数>,
  "summary": "<50字内的核心摘要>"
}

请只输出 JSON,不要包含任何解释。"""
    
    def __init__(
        self,
        model: str = "claude-3-5-sonnet-20241022",
        max_tokens: int = 500,
        temperature: float = 0.3,
    ):
        """
        初始化分析器
        
        Args:
            model: Claude 模型名称,支持 claude-3-5-sonnet-20241022 等
            max_tokens: 最大输出 token 数
            temperature: 采样温度,0.3 适合结构化评分
        """
        self.client = anthropic.Anthropic(api_key=ANTHROPIC_API_KEY)
        self.model = model
        self.max_tokens = max_tokens
        self.temperature = temperature
        
        # 指数退避重试配置
        self.max_retries = 5
        self.base_delay = 2.0
    
    def _parse_json_response(self, response: str) -> dict:
        """从 LLM 输出中提取 JSON"""
        # 尝试直接解析
        try:
            return json.loads(response)
        except json.JSONDecodeError:
            pass
        
        # 尝试提取 markdown 代码块
        code_block_match = re.search(
            r'```(?:json)?\s*([\s\S]*?)\s*```',
            response,
            re.MULTILINE
        )
        if code_block_match:
            try:
                return json.loads(code_block_match.group(1))
            except json.JSONDecodeError:
                pass
        
        # 尝试提取第一个 { 到最后一个 } 的内容
        json_match = re.search(r'\{[\s\S]*\}', response)
        if json_match:
            try:
                return json.loads(json_match.group())
            except json.JSONDecodeError:
                pass
        
        raise ValueError(f"无法从响应中提取 JSON: {response[:200]}")
    
    def analyze_segment(
        self,
        transcript_segment: str,
        speaker_context: Optional[str] = None,
    ) -> SentimentScores:
        """
        分析单个转录段落
        
        Args:
            transcript_segment: 转录文本段落
            speaker_context: 发言者上下文(如"CFO 回答关于毛利率的问题")
        """
        user_message = transcript_segment
        if speaker_context:
            user_message = f"[发言者上下文]: {speaker_context}\n\n[转录文本]:\n{transcript_segment}"
        
        for retry in range(self.max_retries):
            try:
                response = self.client.messages.create(
                    model=self.model,
                    max_tokens=self.max_tokens,
                    temperature=self.temperature,
                    system=self.ANALYSIS_PROMPT,
                    messages=[
                        {
                            "role": "user",
                            "content": user_message
                        }
                    ],
                    timeout=30.0,  # ⚠️ 生产环境必须设置超时
                )
                
                parsed = self._parse_json_response(response.content[0].text)
                
                return SentimentScores(
                    confidence_index=parsed["confidence_index"],
                    guidance_sentiment=parsed["guidance_sentiment"],
                    risk_exposure_intensity=parsed["risk_exposure_intensity"],
                    answer_completeness=parsed["answer_completeness"],
                    tone_consistency=parsed["tone_consistency"],
                    summary=parsed["summary"],
                    raw_response=response.content[0].text,
                )
                
            except Exception as e:
                delay = self.base_delay * (2 ** retry)
                # 添加抖动避免惊群
                delay += time.time() % 1
                
                if retry == self.max_retries - 1:
                    raise
                
                print(f"请求失败,重试中 ({retry + 1}/{self.max_retries}): {e}")
                time.sleep(min(delay, 60))  # 最大等待 60 秒
    
    def analyze_with_sampling(
        self,
        transcript_segment: str,
        n_samples: int = 3,
    ) -> tuple[SentimentScores, float]:
        """
        多次采样分析,计算期望值和标准差
        
        Returns:
            (平均分, 标准差)
        """
        scores_list = []
        
        for i in range(n_samples):
            scores = self.analyze_segment(transcript_segment)
            scores_list.append(scores)
        
        # 计算各维度均值
        avg_scores = SentimentScores(
            confidence_index=int(sum(s.confidence_index for s in scores_list) / n_samples),
            guidance_sentiment=int(sum(s.guidance_sentiment for s in scores_list) / n_samples),
            risk_exposure_intensity=int(sum(s.risk_exposure_intensity for s in scores_list) / n_samples),
            answer_completeness=int(sum(s.answer_completeness for s in scores_list) / n_samples),
            tone_consistency=sum(s.tone_consistency for s in scores_list) / n_samples,
            summary=scores_list[0].summary,  # 取第一次的摘要
            raw_response="\n".join(s.raw_response for s in scores_list),
        )
        
        # 计算标准差(作为文本模糊度的代理变量)
        variance = (
            sum((s.confidence_index - avg_scores.confidence_index) ** 2 for s in scores_list) / n_samples
        )
        std_dev = variance ** 0.5
        
        return avg_scores, std_dev
    
    def analyze_full_transcript(
        self,
        segments: list,
        aggregate_method: str = "mean",
    ) -> dict:
        """
        分析完整财报电话会议
        
        Args:
            segments: TranscriptSegment 列表(来自 Whisper 转录)
            aggregate_method: 聚合方式,"mean" 或 "weighted"
        
        Returns:
            包含各维度分数和会议整体评分的字典
        """
        all_scores = []
        
        for i, segment in enumerate(segments):
            if len(segment.text.split()) < 20:  # 跳过过短的片段
                continue
            
            try:
                scores, uncertainty = self.analyze_with_sampling(
                    segment.text,
                    n_samples=3,
                )
                all_scores.append({
                    "segment_index": i,
                    "start_time": segment.start_time,
                    "scores": scores,
                    "uncertainty": uncertainty,
                })
            except Exception as e:
                print(f"段落 {i} 分析失败: {e}")
                continue
        
        # 聚合计算
        if not all_scores:
            raise ValueError("没有成功的分析段落")
        
        n = len(all_scores)
        
        # 会议整体评分
        aggregate = {
            "avg_confidence": sum(s["scores"].confidence_index for s in all_scores) / n,
            "avg_guidance": sum(s["scores"].guidance_sentiment for s in all_scores) / n,
            "avg_risk": sum(s["scores"].risk_exposure_intensity for s in all_scores) / n,
            "avg_completeness": sum(s["scores"].answer_completeness for s in all_scores) / n,
            "avg_consistency": sum(s["scores"].tone_consistency for s in all_scores) / n,
            "avg_uncertainty": sum(s["uncertainty"] for s in all_scores) / n,
            "segment_count": n,
        }
        
        # 合成单一信号(简单加权平均)
        # 信心指数越高、指引越乐观、风险暴露越低 = 正面信号
        composite_score = (
            0.3 * (aggregate["avg_confidence"] / 10) +  # 标准化到 0-1
            0.3 * ((aggregate["avg_guidance"] + 2) / 4) +  # -2 到 +2 标准化到 0-1
            -0.2 * (aggregate["avg_risk"] / 10) +  # 风险反向
            0.2 * (aggregate["avg_consistency"])  # 一致性正向
        )
        
        aggregate["composite_signal"] = composite_score
        aggregate["signal_interpretation"] = (
            "正面" if composite_score > 0.6 else
            "中性" if composite_score > 0.4 else
            "负面"
        )
        
        return aggregate


# 使用示例
if __name__ == "__main__":
    analyzer = EarningsSentimentAnalyzer(
        model="claude-3-5-sonnet-20241022",
        temperature=0.3,
    )
    
    # 示例转录文本(简化)
    sample_text = """
    Analyst: Can you discuss the margin pressure you're seeing in the cloud segment?
    
    CFO Brian Olsavsky: Sure, I'd be happy to address that. We did see some margin compression in Q2, primarily driven by the infrastructure investments we made earlier in the year. However, we're seeing early returns on our optimization efforts, and we expect margins to normalize by Q4. The team has done excellent work on cost efficiency, and our unit economics continue to improve. We're not concerned about the long-term trajectory.
    """
    
    scores = analyzer.analyze_segment(sample_text)
    print(f"信心指数: {scores.confidence_index}/10")
    print(f"指引情感: {scores.guidance_sentiment} (-2 到 +2)")
    print(f"风险敞口: {scores.risk_exposure_intensity}/10")
    print(f"摘要: {scores.summary}")

4.3 工程注意事项

API 成本优化。完整财报转录文本约 8000-15000 tokens,使用 8K context 的模型需要分段处理。每次 API 调用约消耗 $0.003-0.01(取决于模型)。一份财报的完整分析成本约 $0.5-2。如果进行 3 次采样,则约 $1.5-6。相比数据价值和潜在 alpha,这个成本是合理的。

并发限制。Claude API 有 rate limit,建议使用信号量控制并发量。Anthropic 的标准限制是 50 RPM,使用共享的 API Key 时需要特别注意。

异常处理。LLM 可能输出非标准 JSON,需要有 robust 的解析逻辑。同时建议记录 raw response,便于后续审计和调优。


五、事件回测:情感因子验证框架

5.1 回测设计

回测是验证情感因子有效性的核心环节。一个严谨的事件回测需要明确以下设计决策:

事件定义。以财报电话会议结束时间为 t=0。情感因子在 t=0 之后计算,等待一段时间(如 1 小时)让市场吸收信息,然后开仓。持仓期可以是固定窗口(如 5 个交易日)或基于止盈/止损条件。

信号分组。将情感因子分为三组:正面(composite_score > 0.65)、中性(0.35-0.65)、负面(< 0.35)。或者使用十分位数分组,检验信号的单调性。

基准对比。与买入持有对比,计算超额收益(alpha)。与同行业其他股票在该财报季的收益对比(行业内异象)。

幸存者偏差。必须包含已退市股票,否则会高估收益。使用全量历史数据(含退市标的)是 TickDB 数据覆盖的重要优势。

交易成本假设。假设每笔交易 0.1% 滑点 + $0.005/股佣金。对于日内事件策略,成本可能占总收益的 15-30%。

5.2 回测代码实现

以下代码展示如何使用 TickDB 历史 K 线数据进行事件回测:

import os
import json
import time
import requests
from datetime import datetime, timedelta
from typing import Optional
from dataclasses import dataclass
from collections import defaultdict
import pandas as pd
import numpy as np


@dataclass
class TickDBConfig:
    """TickDB 连接配置"""
    api_key: str
    base_url: str = "https://api.tickdb.ai/v1"
    
    def get_headers(self) -> dict:
        return {"X-API-Key": self.api_key}


class TickDBKlineClient:
    """
    TickDB K 线数据客户端
    生产级实现:限频处理、指数退避重连、超时设置
    """
    
    def __init__(self, config: TickDBConfig):
        self.config = config
        self.session = requests.Session()
        self.session.headers.update(config.get_headers())
        
        # 限频状态
        self.last_request_time = 0
        self.min_request_interval = 0.1  # 最小请求间隔(秒)
    
    def _rate_limit_wait(self):
        """简单的限频等待"""
        elapsed = time.time() - self.last_request_time
        if elapsed < self.min_request_interval:
            time.sleep(self.min_request_interval - elapsed)
        self.last_request_time = time.time()
    
    def _handle_response(self, response: requests.Response) -> dict:
        """统一响应处理和错误处理"""
        if response.status_code == 429:
            retry_after = int(response.headers.get("Retry-After", 5))
            # ⚠️ 限频错误按 Retry-After 等待
            print(f"触发限频,等待 {retry_after} 秒")
            time.sleep(retry_after)
            return self._retry_request(response.request)
        
        if response.status_code != 200:
            raise RuntimeError(f"API 错误 {response.status_code}: {response.text}")
        
        data = response.json()
        
        code = data.get("code", 0)
        if code != 0:
            if code in (1001, 1002):
                raise ValueError("API Key 无效")
            elif code == 2002:
                raise KeyError("交易品种不存在")
            else:
                raise RuntimeError(f"API 业务错误 {code}: {data.get('message')}")
        
        return data
    
    def _retry_request(
        self, 
        original_request, 
        base_delay: float = 1.0,
        max_retries: int = 5,
    ) -> dict:
        """指数退避重连"""
        for retry in range(max_retries):
            delay = min(base_delay * (2 ** retry), 60)
            # 添加抖动
            delay *= (1 + np.random.uniform(-0.1, 0.1))
            
            time.sleep(delay)
            
            try:
                response = self.session.send(
                    self.session.prepare_request(original_request),
                    timeout=(3.05, 10),
                )
                
                if response.status_code != 429:
                    return self._handle_response(response)
                    
            except requests.exceptions.RequestException as e:
                if retry == max_retries - 1:
                    raise
                print(f"请求异常,重试中 ({retry + 1}/{max_retries}): {e}")
        
        raise RuntimeError("重试次数耗尽")
    
    def get_klines(
        self,
        symbol: str,
        interval: str = "1d",
        start_time: Optional[int] = None,
        end_time: Optional[int] = None,
        limit: int = 1000,
    ) -> pd.DataFrame:
        """
        获取 K 线数据
        
        Args:
            symbol: 交易品种代码,如 "AAPL.US"
            interval: K 线周期,"1m"/"5m"/"15m"/"1h"/"4h"/"1d"/"1w"
            start_time: 开始时间戳(毫秒)
            end_time: 结束时间戳(毫秒)
            limit: 每次请求最大条数
        
        Returns:
            DataFrame,包含 OHLCV 数据
        """
        self._rate_limit_wait()
        
        params = {
            "symbol": symbol,
            "interval": interval,
            "limit": limit,
        }
        
        if start_time:
            params["start"] = start_time
        if end_time:
            params["end"] = end_time
        
        # ⚠️ 生产环境必须设置 timeout
        response = self.session.get(
            f"{self.config.base_url}/market/kline",
            params=params,
            timeout=(3.05, 10),
        )
        
        data = self._handle_response(response)
        
        klines = data.get("data", {}).get("klines", [])
        
        if not klines:
            return pd.DataFrame()
        
        df = pd.DataFrame(klines)
        df["timestamp"] = pd.to_datetime(df["timestamp"], unit="ms")
        
        # 数值列类型转换
        numeric_cols = ["open", "high", "low", "close", "volume"]
        for col in numeric_cols:
            if col in df.columns:
                df[col] = df[col].astype(float)
        
        return df


@dataclass
class EarningsEvent:
    """财报事件"""
    symbol: str
    earnings_date: datetime  # 财报发布日
    conference_call_end: Optional[datetime] = None  # 电话会议结束时间
    sentiment_scores: Optional[dict] = None  # LLM 情感评分


@dataclass
class BacktestResult:
    """回测结果"""
    symbol: str
    event_date: datetime
    composite_score: float
    signal_direction: str
    holding_days: int
    entry_price: float
    exit_price: float
    raw_return: float
    cost_adjusted_return: float
    benchmark_return: float
    alpha: float


class EarningsSentimentBacktester:
    """
    财报情感因子回测器
    """
    
    def __init__(
        self,
        tickdb_config: TickDBConfig,
        cost_per_trade: float = 0.001,  # 0.1% 交易成本
        slippage: float = 0.0005,  # 0.05% 滑点
    ):
        self.kline_client = TickDBKlineClient(tickdb_config)
        self.cost_per_trade = cost_per_trade
        self.slippage = slippage
    
    def calculate_event_returns(
        self,
        event: EarningsEvent,
        benchmark_symbol: str = "SPY.US",
        post_event_days: int = 5,
        interval: str = "1d",
    ) -> Optional[BacktestResult]:
        """
        计算单个事件的收益
        
        Args:
            event: 财报事件
            benchmark_symbol: 基准代码
            post_event_days: 事件后持有天数
            interval: K 线周期
        """
        # 获取事件后 K 线数据
        start_ts = int(event.earnings_date.timestamp() * 1000)
        end_ts = int(
            (event.earnings_date + timedelta(days=post_event_days + 5)).timestamp() * 1000
        )
        
        try:
            stock_df = self.kline_client.get_klines(
                symbol=event.symbol,
                interval=interval,
                start_time=start_ts,
                end_time=end_ts,
                limit=100,
            )
            
            if len(stock_df) < 2:
                return None
            
            # 计算日收益率
            stock_df = stock_df.sort_values("timestamp").reset_index(drop=True)
            
            # 事件日收盘价作为入场价(假设 T+1 开盘入场)
            entry_price = stock_df.iloc[1]["open"] if len(stock_df) > 1 else stock_df.iloc[0]["close"]
            
            # 持有期最后一天收盘价作为出场价
            hold_end_idx = min(post_event_days + 1, len(stock_df) - 1)
            exit_price = stock_df.iloc[hold_end_idx]["close"]
            
            # 计算基准收益
            benchmark_df = self.kline_client.get_klines(
                symbol=benchmark_symbol,
                interval=interval,
                start_time=start_ts,
                end_time=end_ts,
                limit=100,
            )
            
            benchmark_df = benchmark_df.sort_values("timestamp").reset_index(drop=True)
            
            if len(benchmark_df) < hold_end_idx + 1:
                benchmark_return = 0.0
            else:
                bench_entry = benchmark_df.iloc[1]["open"] if len(benchmark_df) > 1 else benchmark_df.iloc[0]["close"]
                bench_exit = benchmark_df.iloc[hold_end_idx]["close"]
                benchmark_return = (bench_exit - bench_entry) / bench_entry
            
            # 原始收益
            raw_return = (exit_price - entry_price) / entry_price
            
            # 成本调整后收益(买入+卖出)
            total_cost = self.cost_per_trade * 2 + self.slippage * 2
            cost_adjusted_return = raw_return - total_cost
            
            # Alpha
            alpha = cost_adjusted_return - benchmark_return
            
            # 信号方向
            if event.sentiment_scores and "composite_signal" in event.sentiment_scores:
                signal = event.sentiment_scores["composite_signal"]
            else:
                signal = 0.5
            
            signal_direction = (
                "long" if signal > 0.6 else
                "short" if signal < 0.4 else
                "neutral"
            )
            
            return BacktestResult(
                symbol=event.symbol,
                event_date=event.earnings_date,
                composite_score=signal,
                signal_direction=signal_direction,
                holding_days=hold_end_idx,
                entry_price=entry_price,
                exit_price=exit_price,
                raw_return=raw_return,
                cost_adjusted_return=cost_adjusted_return,
                benchmark_return=benchmark_return,
                alpha=alpha,
            )
            
        except Exception as e:
            print(f"计算 {event.symbol} {event.earnings_date.date()} 收益失败: {e}")
            return None
    
    def run_backtest(
        self,
        events: list[EarningsEvent],
        benchmark_symbol: str = "SPY.US",
        post_event_days: int = 5,
    ) -> pd.DataFrame:
        """
        运行完整回测
        
        Returns:
            包含所有事件收益的 DataFrame
        """
        results = []
        
        for i, event in enumerate(events):
            print(f"处理事件 {i + 1}/{len(events)}: {event.symbol} @ {event.earnings_date.date()}")
            
            result = self.calculate_event_returns(
                event,
                benchmark_symbol,
                post_event_days,
            )
            
            if result:
                results.append(result)
            
            # ⚠️ 避免触发限频
            time.sleep(0.2)
        
        df = pd.DataFrame(results)
        
        if len(df) == 0:
            return pd.DataFrame()
        
        # 计算分组统计
        df = self._calculate_group_stats(df)
        
        return df
    
    def _calculate_group_stats(self, df: pd.DataFrame) -> pd.DataFrame:
        """计算分组统计"""
        df["score_decile"] = pd.qcut(df["composite_score"], q=10, labels=False, duplicates="drop")
        
        grouped = df.groupby("score_decile").agg({
            "raw_return": ["mean", "std", "count"],
            "cost_adjusted_return": ["mean", "std"],
            "alpha": ["mean", "std"],
            "composite_score": "mean",
        }).round(4)
        
        print("\n=== 分组收益统计 ===")
        print(grouped)
        
        # Long/Short 组合
        long_returns = df[df["composite_score"] > 0.7]["cost_adjusted_return"].mean()
        short_returns = df[df["composite_score"] < 0.3]["cost_adjusted_return"].mean()
        
        print(f"\nTop tertile 平均收益: {long_returns:.4f}")
        print(f"Bottom tertile 平均收益: {short_returns:.4f}")
        print(f"Long-Short 组合: {long_returns - short_returns:.4f}")
        
        return df


# 使用示例
if __name__ == "__main__":
    # 初始化 TickDB 客户端
    config = TickDBConfig(api_key=os.environ.get("TICKDB_API_KEY"))
    backtester = EarningsSentimentBacktester(
        tickdb_config=config,
        cost_per_trade=0.001,
        slippage=0.0005,
    )
    
    # 示例事件列表(实际使用中应从公开数据源获取)
    # 每份财报需要:股票代码、财报日期、LLM 情感评分
    sample_events = [
        EarningsEvent(
            symbol="NVDA.US",
            earnings_date=datetime(2024, 11, 20),
            sentiment_scores={"composite_signal": 0.78},
        ),
        EarningsEvent(
            symbol="META.US",
            earnings_date=datetime(2024, 10, 30),
            sentiment_scores={"composite_signal": 0.65},
        ),
        EarningsEvent(
            symbol="TSLA.US",
            earnings_date=datetime(2024, 10, 23),
            sentiment_scores={"composite_signal": 0.42},
        ),
        # ... 更多事件
    ]
    
    results_df = backtester.run_backtest(
        events=sample_events,
        benchmark_symbol="SPY.US",
        post_event_days=5,
    )
    
    if len(results_df) > 0:
        print(f"\n回测样本数: {len(results_df)}")
        print(f"平均成本调整收益: {results_df['cost_adjusted_return'].mean():.4f}")
        print(f"平均 Alpha: {results_df['alpha'].mean():.4f}")

5.3 回测披露与局限性

回测局限性说明:上述回测结果基于历史数据模拟,不构成未来收益保证。回测中存在以下局限性:未完全模拟实际交易中的滑点和市场冲击成本(已假设 0.1% 固定滑点);未考虑极端行情下的流动性枯竭风险;样本量有限,统计显著性可能不足;LLM 情感评分具有主观性,不同 prompt 设计可能导致结果差异;未考虑财报结果与预期的偏离(超预期/低于预期是更强的驱动因素,情感分析可能只是补充)。

建议在实际使用前进行更长时间跨度的验证,并进行样本外测试。


六、产业链与标的覆盖

6.1 财报季时间分布

美股主要财报季的时间窗口:

财报季 财年结束月 财报发布时间 重要关注点
Q1 3 月 4 月中-5 月初 年报后的首个季度,消费/科技开年表现
Q2 6 月 7 月中-8 月初 中期指引调整窗口
Q3 9 月 10 月中-11 月初 下半年趋势确认
Q4 12 月 1 月中-2 月初 全年收官,年度指引发布

6.2 LLM 财报分析的高信息密度标的

以下类型标的从财报电话会议情感分析中获益最多:

标的类型 特点 分析价值
大型科技 (FAANG) 话语权强、分析师关注度高、指引详尽 管理层信心指数对短期走势有显著影响
成长股 对未来指引高度敏感,估值依赖叙事 风险敞口表述与估值重定价强相关
周期股 管理层语调与行业周期交叉验证 避免在周期顶部被乐观指引误导
中小盘 信息覆盖率低,电话会议是主要信源 从管理层措辞中挖掘被主流分析师忽略的信号

七、结语与下一步行动

7.1 核心洞察

情感因子是补充,不是替代。LLM 情感分析的价值不在于替代财报数字分析,而在于捕捉数字背后的“管理层的态度”。一个 beat and raise(超预期并上调指引)的财报,如果管理层在电话会议中表现出明显的防御姿态,可能预示着短期利好已被透支。

信号需要验证。单一维度的情感因子胜率有限。有效的做法是将情感因子与基本面因子(盈利 surprise、市值、动量)结合,构建多因子模型。情感因子在“预期外”的场景中信号强度更高。

工程化是壁垒。情感分析的工程实现比模型选型更重要。稳定的音频获取渠道、标准化的转录流程、带有置信区间的打分机制、严谨的回测框架——这些基础设施决定了因子能否从研究走向实盘。

7.2 分层行动指南

如果你是个人投资者,关注财报电话会议中管理层对“宏观环境”的措辞。当 CFO 连续两个季度提及“consumer spending pressure”或“enterprise spending caution”,可能预示着行业性需求放缓。

如果你希望亲手实现本文策略

  1. 访问 tickdb.ai 注册(免费,无需信用卡)
  2. 在控制台生成 API Key
  3. 设置环境变量 TICKDB_API_KEY,复制本文代码即可开始回测
  4. 从 Whisper 转录开始,逐步搭建完整的情感分析 pipeline

如果你需要 10 年全量历史 K 线数据做策略回测,联系 [email protected] 了解机构方案。完整的财报事件研究需要覆盖至少两个完整牛熊周期,数据广度和质量直接影响结论的可靠性。

如果你习惯用 AI 辅助开发,在 AI 助手中搜索安装 tickdb-market-data SKILL,可以快速调用历史 K 线数据进行验证。


风险提示:本文不构成任何投资建议。市场有风险,投资需谨慎。LLM 情感分析是一种研究工具,其输出具有随机性和主观性,不应作为独立的投资决策依据。回测结果不代表未来收益。