用 LLM 分析财报电话会议情绪:从音频转录到交易信号
"We remain committed to our long-term growth strategy, though we acknowledge near-term headwinds in certain segments."
这不是一句废话。这是 CFO 在财报电话会上精心措辞的信号——"headwinds"意味着下调预期,"long-term"意味着短期无法兑现。当人类分析师还在反复听录音时,量化系统已经完成了 2000 家公司的情绪评分。
财报电话会议是上市公司与投资者最重要的沟通渠道。一场 60 分钟的电话会,包含 CEO 的战略陈述、CFO 的财务回顾、以及分析师的追问,每个词都经过公关团队打磨。但这些信息对散户和非专业分析师来说,天然存在三个壁垒:时间成本(听完所有相关电话会不现实)、语言模糊性("cautiously optimistic"到底是乐观还是悲观?)、情绪主观性(不同分析师对同一表述可能给出截然不同的解读)。
本文构建一套完整的工程 pipeline:从财报电话会的音频抓取,到 Whisper 转录,再到 LLM 批量情感打分,最终将非结构化文本转化为可回测的量化信号。整个系统设计遵循可复现原则——你可以在任何一台有 GPU 的服务器上重建它。
为什么财报电话会情绪值得量化
财报电话会议的核心价值在于管理层前瞻性指引的透明度。与财报本身披露的"后视镜"数据不同,电话会中管理层的措辞往往预示未来业绩走向。
学术界对此有充分研究。,痛点拆解用学术研究的结论来支撑:
- 管理层语调与盈余惊喜:塔菲斯等学者的研究表明,管理层的语调变化能预测未来 2-4 个季度的盈利修正幅度。
- 情感词汇密度与市场反应:电话会中负面情感词密度每上升 10%,未来 5 日股价超额收益平均下降 1.2%。
- 分析师追问中的信息增量:分析师追问环节往往比开场陈述包含更多负面信号,因为此时管理层措辞更难掩饰。
但这些研究的共性问题是:手工标注不可扩展。一位分析师一天最多处理 3-5 场电话会,而美股财报季每天有超过 50 家公司发布财报。
LLM 的介入彻底改变了这个局面。一个 70B 参数的模型可以在 30 秒内完成一场 60 分钟电话会的情感分析,输出结构化的情绪指标。以 GPT-4o 为例,其在情感分析任务上的 F1 分数已超过大多数人类分析师。
整体架构:从音频到信号的四层 Pipeline
在展开代码之前,先理解整个系统的数据流:
┌─────────────────────────────────────────────────────────────────┐
│ 数据采集层 │
│ 财报日历 → 音频 URL 抓取 → 音频文件下载 → 格式标准化 │
└─────────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────────┐
│ 语音转录层 │
│ OpenAI Whisper API → 时间戳标注 → 说话人分离 → 文本分段 │
└─────────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────────┐
│ LLM 情感分析层 │
│ Prompt 工程 → 批量打分 → 情绪指标计算 → 异常值标记 │
└─────────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────────┐
│ 事件回测层 │
│ TickDB 历史 K 线 → 信号对齐 → 事件窗口分析 → 夏普/胜率统计 │
└─────────────────────────────────────────────────────────────────┘
四层职责清晰分离:采集层负责"抓得到",转录层负责"听得准",分析层负责"读得懂",回测层负责"验得了"。
第一步:财报日历与音频抓取
财报日期是整个系统的触发点。我们需要提前获取即将发布财报的公司名单,并抓取电话会的音频 URL。
大多数美股公司的财报电话会音频通过以下平台之一发布:
- Company Investor Relations 网站:直接提供 MP3/AAC 链接
- Seeking Alpha Earnings Call Audio:聚合平台,支持批量查询
- FactSet Earnings Insight:机构级数据源,延迟低
import os
import time
import random
import requests
from datetime import datetime, timedelta
from bs4 import BeautifulSoup
from urllib.parse import urljoin
from dataclasses import dataclass
from typing import Optional, List
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class EarningsCall:
"""财报电话会数据结构"""
ticker: str
company_name: str
report_date: datetime
fiscal_period: str # e.g., "Q4 2024"
audio_url: Optional[str] = None
transcript_url: Optional[str] = None
duration_seconds: Optional[int] = None
class EarningsCalendarFetcher:
"""
财报日历抓取器
从 Seeking Alpha 获取即将发布财报的公司列表,
并抓取对应的电话会音频 URL。
"""
def __init__(self, api_key: str):
self.api_key = api_key
self.base_url = "https://seekingalpha.com/api/v1"
self.session = requests.Session()
self.session.headers.update({
"User-Agent": "Mozilla/5.0 (compatible; EarningsBot/1.0)",
"Accept": "application/json"
})
self._rate_limit_init()
def _rate_limit_init(self):
"""初始化限频参数"""
self.last_request_time = 0
self.min_request_interval = 1.0 # Seeking Alpha 建议 1 秒间隔
self.retry_count = 0
self.max_retries = 3
def _rate_limited_request(self, url: str, params: dict = None) -> dict:
"""
带限频处理的请求
处理策略:
1. 检查距离上次请求的时间间隔
2. 如果被限频(429),读取 Retry-After 头等待
3. 指数退避重试
"""
current_time = time.time()
elapsed = current_time - self.last_request_time
if elapsed < self.min_request_interval:
sleep_time = self.min_request_interval - elapsed
sleep_time += random.uniform(0, 0.5) # 添加抖动避免惊群
time.sleep(sleep_time)
for attempt in range(self.max_retries):
try:
response = self.session.get(
url,
params=params,
timeout=(3.05, 10)
)
self.last_request_time = time.time()
if response.status_code == 200:
self.retry_count = 0
return response.json()
elif response.status_code == 429:
# 被限频,读取 Retry-After
retry_after = int(response.headers.get("Retry-After", 60))
logger.warning(f"Rate limited, waiting {retry_after}s")
time.sleep(retry_after)
continue
elif response.status_code in (500, 502, 503, 504):
# 服务器错误,指数退避
wait_time = min(2 ** attempt * 2, 60)
wait_time += random.uniform(0, wait_time * 0.2)
logger.warning(f"Server error {response.status_code}, retrying in {wait_time:.1f}s")
time.sleep(wait_time)
continue
else:
logger.error(f"Unexpected status {response.status_code}: {response.text[:200]}")
return None
except requests.exceptions.Timeout:
logger.warning(f"Request timeout, attempt {attempt + 1}/{self.max_retries}")
time.sleep(2 ** attempt)
continue
except requests.exceptions.RequestException as e:
logger.error(f"Request failed: {e}")
return None
return None
def get_upcoming_earnings(self, days_ahead: int = 14) -> List[EarningsCall]:
"""
获取未来 N 天内发布财报的公司列表
Args:
days_ahead: 向前查看的天数
Returns:
EarningsCall 对象列表
"""
end_date = datetime.now() + timedelta(days=days_ahead)
url = f"{self.base_url}/earnings_calendar"
params = {
"from": datetime.now().strftime("%Y-%m-%d"),
"to": end_date.strftime("%Y-%m-%d"),
"filter": "with_confirmed_date"
}
data = self._rate_limited_request(url, params)
if not data:
logger.error("Failed to fetch earnings calendar")
return []
earnings_list = []
for item in data.get("data", []):
try:
call = EarningsCall(
ticker=item.get("ticker", "").upper(),
company_name=item.get("company_name", ""),
report_date=datetime.fromisoformat(item.get("report_date").replace("Z", "")),
fiscal_period=item.get("fiscal_period", "")
)
earnings_list.append(call)
except (KeyError, ValueError) as e:
logger.warning(f"Parse error for item: {e}")
continue
logger.info(f"Fetched {len(earnings_list)} upcoming earnings events")
return earnings_list
def fetch_audio_url(self, ticker: str, fiscal_period: str) -> Optional[str]:
"""
获取指定财报电话会的音频 URL
通过公司 IR 页面或聚合平台查询
"""
# 策略1: 直接查询公司 IR
ir_url = f"https://ir.{ticker.lower()}.com" # 简化示例
# 策略2: Seeking Alpha 音频库
sa_url = f"{self.base_url}/articles"
params = {"q": f"{ticker} {fiscal_period} earnings call"}
data = self._rate_limited_request(sa_url, params)
if not data:
return None
# 解析音频链接(简化)
for article in data.get("results", []):
if "earnings call" in article.get("title", "").lower():
return article.get("audio_url")
return None
def download_audio(self, url: str, output_path: str, chunk_size: int = 8192) -> bool:
"""
下载音频文件到本地
Args:
url: 音频文件 URL
output_path: 本地保存路径
chunk_size: 下载块大小
Returns:
是否下载成功
"""
try:
response = self.session.get(url, stream=True, timeout=30)
response.raise_for_status()
total_size = int(response.headers.get("Content-Length", 0))
with open(output_path, "wb") as f:
downloaded = 0
for chunk in response.iter_content(chunk_size=chunk_size):
if chunk:
f.write(chunk)
downloaded += len(chunk)
# 进度日志(每 10MB 打印一次)
if downloaded % (10 * 1024 * 1024) == 0:
progress = (downloaded / total_size * 100) if total_size else 0
logger.info(f"Downloaded {progress:.1f}%: {output_path}")
logger.info(f"Audio saved: {output_path} ({total_size / 1024 / 1024:.1f} MB)")
return True
except Exception as e:
logger.error(f"Download failed: {e}")
# 清理不完整文件
if os.path.exists(output_path):
os.remove(output_path)
return False
# 使用示例
if __name__ == "__main__":
fetcher = EarningsCalendarFetcher(api_key=os.environ.get("SA_API_KEY"))
upcoming = fetcher.get_upcoming_earnings(days_ahead=7)
for call in upcoming[:5]: # 取前 5 个演示
logger.info(f"{call.ticker}: {call.fiscal_period} - {call.report_date.strftime('%Y-%m-%d')}")
⚠️ 工程预警:
- Seeking Alpha 对爬虫有限频,建议申请官方 API Key 访问
- 大型机构可考虑 FactSet、Bloomberg Wall Street Sources 等机构数据源
- 音频下载建议异步处理,配合 aiohttp 提升吞吐量
第二步:Whisper 语音转录
有了音频文件,下一步是将其转为带时间戳的文本。OpenAI 的 Whisper 模型是目前开源社区表现最好的通用 ASR 模型,Whisper Large v3 在英文电话会转录上的 WER(词错误率)低于 2%。
import asyncio
import json
import os
import subprocess
from pathlib import Path
from dataclasses import dataclass
from typing import List, Optional, AsyncIterator
from openai import AsyncOpenAI
@dataclass
class TranscriptSegment:
"""转录段落"""
start_time: float # 秒
end_time: float # 秒
speaker: str # 说话人标签
text: str # 原始文本
words: List[dict] # 单词级时间戳(可选)
@dataclass
class EarningsTranscript:
"""完整电话会转录"""
ticker: str
company_name: str
fiscal_period: str
call_date: str
segments: List[TranscriptSegment]
full_text: str
duration_seconds: float
class WhisperTranscriber:
"""
Whisper 语音转录器
支持:
- 时间戳输出
- 说话人分离(需配合 pyannote)
- 批量处理
"""
def __init__(
self,
model: str = "whisper-1",
api_base: str = "https://api.openai.com/v1",
language: str = "en"
):
self.client = AsyncOpenAI(api_key=os.environ.get("OPENAI_API_KEY"), base_url=api_base)
self.model = model
self.language = language
self._semaphore = asyncio.Semaphore(3) # 限制并发数
async def transcribe_audio(
self,
audio_path: str,
response_format: str = "verbose_json",
timestamp_granularities: List[str] = None
) -> dict:
"""
异步转录音频文件
Args:
audio_path: 音频文件路径
response_format: 输出格式(verbose_json 支持时间戳)
timestamp_granularities: 时间戳粒度
Returns:
Whisper API 原始响应
"""
async with self._semaphore: # 并发控制
with open(audio_path, "rb") as audio_file:
try:
params = {
"model": self.model,
"file": audio_file,
"language": self.language,
"response_format": response_format,
}
if timestamp_granularities:
params["timestamp_granularities"] = timestamp_granularities
response = await asyncio.wait_for(
self.client.audio.transcriptions.create(**params),
timeout=300 # 60分钟电话会需要较长超时
)
return response.model_dump()
except asyncio.TimeoutError:
raise RuntimeError(f"Transcription timeout for {audio_path}")
except Exception as e:
raise RuntimeError(f"Transcription failed: {e}")
async def transcribe_with_speaker_diarization(
self,
audio_path: str,
hf_token: str
) -> EarningsTranscript:
"""
带说话人分离的转录
流程:
1. Whisper 转录(带时间戳)
2. pyannote 说话人分离
3. 合并结果
注意:需要安装 pyannote.audio
"""
try:
from pyannote.audio import Pipeline
except ImportError:
raise ImportError("请安装 pyannote.audio: pip install pyannote.audio")
# 步骤1:转录
transcript_json = await self.transcribe_audio(
audio_path,
response_format="verbose_json",
timestamp_granularities=["word"]
)
# 步骤2:说话人分离
pipeline = Pipeline.from_pretrained(
"pyannote/speaker-diarization-3.1",
use_auth_token=hf_token
)
diarization = pipeline(audio_path)
# 步骤3:合并
segments = self._merge_transcript_with_diarization(
transcript_json,
diarization
)
full_text = " ".join([s.text for s in segments])
return EarningsTranscript(
ticker="", # 调用方填充
company_name="",
fiscal_period="",
call_date="",
segments=segments,
full_text=full_text,
duration_seconds=transcript_json.get("duration", 0)
)
def _merge_transcript_with_diarization(
self,
transcript_json: dict,
diarization
) -> List[TranscriptSegment]:
"""将 Whisper 转录与说话人分离结果合并"""
segments = []
words = transcript_json.get("words", [])
# 按时间窗口分配说话人
for segment in transcript_json.get("segments", []):
seg_start = segment["start"]
seg_end = segment["end"]
# 查找该时间段的说话人
speaker_label = "Unknown"
for turn, _, speaker in diarization.itertracks(yield_label=True):
if turn.start <= seg_start <= turn.end:
speaker_label = speaker
break
segments.append(TranscriptSegment(
start_time=seg_start,
end_time=seg_end,
speaker=speaker_label,
text=segment["text"],
words=[w for w in words if seg_start <= w["start"] <= seg_end]
))
return segments
async def batch_transcribe(
self,
audio_files: List[str],
progress_callback=None
) -> List[dict]:
"""
批量转录
Args:
audio_files: 音频文件路径列表
progress_callback: 进度回调函数
"""
tasks = []
for i, audio_path in enumerate(audio_files):
task = asyncio.create_task(
self._transcribe_with_retry(audio_path, max_retries=3)
)
tasks.append((i, audio_path, task))
results = [None] * len(audio_files)
for i, audio_path, task in tasks:
try:
result = await task
results[i] = result
if progress_callback:
progress_callback(i + 1, len(audio_files))
except Exception as e:
print(f"Failed to transcribe {audio_path}: {e}")
results[i] = None
return results
async def _transcribe_with_retry(
self,
audio_path: str,
max_retries: int = 3
) -> dict:
"""带重试的转录"""
for attempt in range(max_retries):
try:
return await self.transcribe_audio(audio_path)
except Exception as e:
if attempt < max_retries - 1:
wait = (attempt + 1) * 2
await asyncio.sleep(wait)
else:
raise
⚠️ 生产环境建议:
- Whisper API 按分钟计费,建议本地部署 Whisper Large v3 降低长期成本
- 财报电话会通常是 60-90 分钟,注意 ffmpeg 合并和分段策略
- 说话人分离是可选但推荐的增强,提升 LLM 分析的准确性
第三步:LLM 情感打分
转录文本需要转化为结构化指标。我们使用 LLM 对每个发言段落进行情感打分,并计算整体情绪指数。
import os
import json
import asyncio
from dataclasses import dataclass, field
from typing import List, Optional, Dict
from enum import Enum
from openai import AsyncOpenAI
class SentimentLabel(Enum):
"""情感标签枚举"""
VERY_NEGATIVE = "very_negative"
NEGATIVE = "negative"
NEUTRAL = "neutral"
POSITIVE = "positive"
VERY_POSITIVE = "very_positive"
@dataclass
class SegmentSentiment:
"""段落情感分析结果"""
speaker: str
start_time: float
end_time: float
label: SentimentLabel
score: float # 原始分数 [-1, 1]
confidence: float # 模型置信度
key_phrases: List[str] # 关键词列表
commentary: str # 简短解释
@dataclass
class CallSentimentReport:
"""完整电话会情感报告"""
ticker: str
fiscal_period: str
call_date: str
overall_sentiment: float # 整体情绪 [-1, 1]
sentiment_trend: List[float] # 情绪趋势(时间序列)
positive_ratio: float # 正面段落占比
uncertainty_mentions: int # 不确定性词汇提及次数
guidance_tone: str # 前瞻指引语调
segment_analysis: List[SegmentSentiment]
red_flags: List[str] # 红旗信号
green_flags: List[str] # 积极信号
class EarningsSentimentAnalyzer:
"""
财报电话会 LLM 情感分析器
设计原则:
1. 使用结构化输出确保一致性
2. 支持批量处理降低 API 调用成本
3. 内置 Prompt 工程,针对财报场景优化
"""
SYSTEM_PROMPT = """你是一位专业的金融分析师,擅长分析财报电话会的管理层语调。
分析要求:
1. 识别管理层对未来业绩的乐观/谨慎程度
2. 注意负面信号的隐晦表达(如"challenging environment"暗含经营困难)
3. 量化"headwinds"、"uncertainty"、"headwinds"等词的负面程度
4. 标记任何潜在的财务操纵信号
输出格式必须严格遵循 JSON schema,包含所有必需字段。"""
SEGMENT_ANALYSIS_PROMPT = """分析以下财报电话会段落,返回结构化的情感分析:
段落信息:
- 说话人:{speaker}
- 时间:{start_time:.1f}s - {end_time:.1f}s
- 内容:{text}
返回 JSON 格式:
{{
"label": "very_negative|negative|neutral|positive|very_positive",
"score": 浮点数,范围 [-1.0, 1.0],0 为中性,
"confidence": 浮点数,范围 [0.0, 1.0],表示分析置信度,
"key_phrases": ["关键短语列表"],
"commentary": "一句话解释"
}}
注意:
- CFO 和 CEO 的表态权重高于其他高管
- 分析师问答环节的负面追问需重点关注
- "long-term"常用于转移短期失败注意力"""
OVERALL_ANALYSIS_PROMPT = """基于以下分段情感分析,生成整体电话会报告:
分析对象:{ticker} {fiscal_period} 财报电话会
分段结果:
{segment_results}
返回 JSON 格式:
{{
"overall_sentiment": 浮点数 [-1, 1],
"sentiment_trend": [每5分钟的情绪分数序列],
"positive_ratio": 正面段落占比,
"uncertainty_mentions": 不确定性词汇提及次数,
"guidance_tone": "optimistic|cautious|bearish|withheld",
"red_flags": ["红旗信号列表,如未达预期、审计问题等"],
"green_flags": ["积极信号列表,如超预期、上调指引等"]
}}"""
def __init__(
self,
model: str = "gpt-4o",
api_key: Optional[str] = None,
batch_size: int = 10,
temperature: float = 0.1
):
"""
初始化分析器
Args:
model: 使用的 LLM 模型
api_key: API Key,默认从环境变量读取
batch_size: 单次 API 调用的最大段落数
temperature: 生成温度,较低值确保一致性
"""
self.client = AsyncOpenAI(api_key=api_key or os.environ.get("OPENAI_API_KEY"))
self.model = model
self.batch_size = batch_size
self.temperature = temperature
self._semaphore = asyncio.Semaphore(5) # 控制 API 并发
async def analyze_segments(
self,
transcript_segments: List[dict]
) -> List[SegmentSentiment]:
"""
批量分析转录段落
Args:
transcript_segments: Whisper 转录的段落列表
Returns:
SegmentSentiment 列表
"""
results = []
# 分批处理
for i in range(0, len(transcript_segments), self.batch_size):
batch = transcript_segments[i:i + self.batch_size]
try:
batch_results = await self._analyze_batch(batch)
results.extend(batch_results)
except Exception as e:
print(f"Batch analysis failed: {e}")
# 单段落降级处理
for seg in batch:
try:
result = await self._analyze_single_segment(seg)
results.append(result)
except Exception:
results.append(self._default_sentiment(seg))
return results
async def _analyze_batch(self, segments: List[dict]) -> List[SegmentSentiment]:
"""批量调用 LLM 分析"""
async with self._semaphore:
# 构建批量 prompt
batch_text = "\n\n".join([
f"段落 {j+1}:\n说话人: {seg.get('speaker', 'Unknown')}\n"
f"时间: {seg.get('start', 0):.1f}s - {seg.get('end', 0):.1f}s\n"
f"内容: {seg.get('text', '')}"
for j, seg in enumerate(segments)
])
prompt = f"""分析以下多个段落,返回每个段落的情感分析结果:
{batch_text}
对每个段落返回 JSON 对象,格式为:
{{
"segment_index": 段落序号(从1开始),
"label": "情感标签",
"score": 情绪分数 [-1, 1],
"confidence": 置信度 [0, 1],
"key_phrases": ["关键词"],
"commentary": "解释"
}}
所有段落的分析结果用 JSON 数组返回。"""
response = await self.client.chat.completions.create(
model=self.model,
messages=[
{"role": "system", "content": self.SYSTEM_PROMPT},
{"role": "user", "content": prompt}
],
response_format={"type": "json_object"},
temperature=self.temperature
)
result_data = json.loads(response.choices[0].message.content)
analyses = result_data.get("analyses", result_data.get("segments", []))
# 映射回原始段落
results = []
for j, seg in enumerate(segments):
analysis = next((a for a in analyses if a.get("segment_index") == j + 1), None)
if analysis:
results.append(SegmentSentiment(
speaker=seg.get("speaker", "Unknown"),
start_time=seg.get("start", 0),
end_time=seg.get("end", 0),
label=SentimentLabel(analysis["label"]),
score=analysis["score"],
confidence=analysis["confidence"],
key_phrases=analysis.get("key_phrases", []),
commentary=analysis.get("commentary", "")
))
else:
results.append(self._default_sentiment(seg))
return results
async def _analyze_single_segment(self, segment: dict) -> SegmentSentiment:
"""分析单个段落"""
async with self._semaphore:
prompt = self.SEGMENT_ANALYSIS_PROMPT.format(
speaker=segment.get("speaker", "Unknown"),
start_time=segment.get("start", 0),
end_time=segment.get("end", 0),
text=segment.get("text", "")
)
response = await self.client.chat.completions.create(
model=self.model,
messages=[
{"role": "system", "content": self.SYSTEM_PROMPT},
{"role": "user", "content": prompt}
],
response_format={"type": "json_object"},
temperature=self.temperature
)
result = json.loads(response.choices[0].message.content)
return SegmentSentiment(
speaker=segment.get("speaker", "Unknown"),
start_time=segment.get("start", 0),
end_time=segment.get("end", 0),
label=SentimentLabel(result["label"]),
score=result["score"],
confidence=result["confidence"],
key_phrases=result.get("key_phrases", []),
commentary=result.get("commentary", "")
)
def _default_sentiment(self, segment: dict) -> SegmentSentiment:
"""失败时的默认情感"""
return SegmentSentiment(
speaker=segment.get("speaker", "Unknown"),
start_time=segment.get("start", 0),
end_time=segment.get("end", 0),
label=SentimentLabel.NEUTRAL,
score=0.0,
confidence=0.0,
key_phrases=[],
commentary="Analysis failed"
)
async def generate_report(
self,
ticker: str,
fiscal_period: str,
call_date: str,
segment_analyses: List[SegmentSentiment]
) -> CallSentimentReport:
"""
生成完整情感报告
基于分段分析,生成整体评估
"""
# 构建摘要给 LLM
segment_summaries = [
{
"speaker": s.speaker,
"time_range": f"{s.start_time:.0f}s-{s.end_time:.0f}s",
"sentiment": s.label.value,
"score": s.score,
"key_phrases": s.key_phrases[:3]
}
for s in segment_analyses
]
# 调用 LLM 生成整体报告
async with self._semaphore:
prompt = self.OVERALL_ANALYSIS_PROMPT.format(
ticker=ticker,
fiscal_period=fiscal_period,
segment_results=json.dumps(segment_summaries, indent=2, ensure_ascii=False)
)
response = await self.client.chat.completions.create(
model=self.model,
messages=[
{"role": "system", "content": "你是金融分析专家,输出 JSON。"},
{"role": "user", "content": prompt}
],
response_format={"type": "json_object"},
temperature=0.1
)
report_data = json.loads(response.choices[0].message.content)
return CallSentimentReport(
ticker=ticker,
fiscal_period=fiscal_period,
call_date=call_date,
overall_sentiment=report_data["overall_sentiment"],
sentiment_trend=report_data["sentiment_trend"],
positive_ratio=report_data["positive_ratio"],
uncertainty_mentions=report_data["uncertainty_mentions"],
guidance_tone=report_data["guidance_tone"],
segment_analysis=segment_analyses,
red_flags=report_data.get("red_flags", []),
green_flags=report_data.get("green_flags", [])
)
def sentiment_to_signal(self, report: CallSentimentReport) -> dict:
"""
将情感报告转化为交易信号
这部分逻辑可自定义,以下是基础版策略
"""
signal = {
"ticker": report.ticker,
"fiscal_period": report.fiscal_period,
"call_date": report.call_date,
"raw_sentiment": report.overall_sentiment,
"signal_direction": "neutral",
"signal_strength": 0.0,
"confidence": 0.0,
"metadata": {
"positive_ratio": report.positive_ratio,
"uncertainty_count": report.uncertainty_mentions,
"guidance_tone": report.guidance_tone,
"red_flags": report.red_flags,
"green_flags": report.green_flags
}
}
# 简单规则(非最优,实际策略需回测优化)
sentiment = report.overall_sentiment
if sentiment >= 0.3 and report.guidance_tone in ("optimistic",):
signal["signal_direction"] = "bullish"
signal["signal_strength"] = min(abs(sentiment) * 2, 1.0)
signal["confidence"] = 0.6
elif sentiment <= -0.3 or report.guidance_tone == "bearish":
signal["signal_direction"] = "bearish"
signal["signal_strength"] = min(abs(sentiment) * 2, 1.0)
signal["confidence"] = 0.6
elif report.uncertainty_mentions >= 5:
signal["signal_direction"] = "uncertain"
signal["confidence"] = 0.3
elif report.red_flags and not report.green_flags:
signal["signal_direction"] = "cautious"
signal["signal_strength"] = 0.3
signal["confidence"] = 0.4
return signal
⚠️ Prompt 工程建议:
- 当前 Prompt 基于 GPT-4o 优化,使用其他模型需调整
- "headwinds/逆风"等词的情感权重可根据实际回测结果调优
- 建议对 CFO 发言单独设置更高权重
第四步:事件回测验证
得到情感信号后,需要验证其预测能力。TickDB 提供 10 年级别的美股历史 K 线数据,支持跨周期回测。
import os
import time
import requests
import json
from datetime import datetime, timedelta
from dataclasses import dataclass
from typing import List, Optional, Dict
from pathlib import Path
import pandas as pd
@dataclass
class PriceData:
"""价格数据结构"""
timestamp: int # Unix 时间戳(毫秒)
open: float
high: float
low: float
close: float
volume: int
@dataclass
class BacktestEvent:
"""回测事件"""
ticker: str
event_date: str # YYYY-MM-DD
signal_direction: str
signal_strength: float
sentiment_score: float
holding_days: int = 20 # 持有期
forward_return_5d: float = 0.0 # 事件后 5 日收益
forward_return_20d: float = 0.0 # 事件后 20 日收益
benchmark_return_5d: float = 0.0
benchmark_return_20d: float = 0.0
class TickDBClient:
"""
TickDB HTTP 客户端
用于获取历史 K 线数据进行事件回测
"""
def __init__(self, api_key: str = None):
self.api_key = api_key or os.environ.get("TICKDB_API_KEY")
self.base_url = "https://api.tickdb.ai/v1"
self.session = requests.Session()
self.session.headers.update({
"X-API-Key": self.api_key,
"Content-Type": "application/json"
})
self._rate_limit_init()
def _rate_limit_init(self):
"""限频初始化"""
self.min_request_interval = 0.1 # TickDB 标准限制
self.last_request_time = 0
def _rate_limited_get(self, endpoint: str, params: dict = None) -> dict:
"""带限频的 GET 请求"""
current_time = time.time()
elapsed = current_time - self.last_request_time
if elapsed < self.min_request_interval:
time.sleep(self.min_request_interval - elapsed)
response = self.session.get(
f"{self.base_url}{endpoint}",
params=params,
timeout=(3.05, 10)
)
self.last_request_time = time.time()
if response.status_code == 200:
return response.json()
elif response.status_code == 429:
retry_after = int(response.headers.get("Retry-After", 5))
time.sleep(retry_after)
return self._rate_limited_get(endpoint, params)
else:
raise RuntimeError(f"API error {response.status_code}: {response.text}")
def get_kline(
self,
symbol: str,
interval: str = "1d",
start_time: int = None,
end_time: int = None,
limit: int = 1000
) -> List[PriceData]:
"""
获取 K 线数据
Args:
symbol: 交易品种,如 "AAPL.US"
interval: K 线周期,"1m", "5m", "1h", "1d"
start_time: 开始时间(Unix 毫秒)
end_time: 结束时间(Unix 毫秒)
limit: 最大返回条数
Returns:
PriceData 列表
"""
params = {
"symbol": symbol,
"interval": interval,
"limit": limit
}
if start_time:
params["start"] = start_time
if end_time:
params["end"] = end_time
data = self._rate_limited_get("/market/kline", params)
return [
PriceData(
timestamp=k["t"],
open=k["o"],
high=k["h"],
low=k["l"],
close=k["c"],
volume=k["v"]
)
for k in data.get("data", [])
]
def get_latest_kline(
self,
symbol: str,
interval: str = "1d"
) -> Optional[PriceData]:
"""
获取最新一根 K 线
用于实时监控
"""
params = {
"symbol": symbol,
"interval": interval
}
data = self._rate_limited_get("/market/kline/latest", params)
k = data.get("data")
if not k:
return None
return PriceData(
timestamp=k["t"],
open=k["o"],
high=k["h"],
low=k["l"],
close=k["c"],
volume=k["v"]
)
class SentimentBacktestEngine:
"""
情感信号事件回测引擎
将 LLM 情感信号与 TickDB 历史 K 线对齐,
计算事件窗口内的超额收益。
"""
def __init__(self, tickdb_client: TickDBClient):
self.tickdb = tickdb_client
def load_price_data(
self,
ticker: str,
start_date: datetime,
end_date: datetime
) -> pd.DataFrame:
"""
加载指定时间范围的价格数据
"""
start_ts = int(start_date.timestamp() * 1000)
end_ts = int(end_date.timestamp() * 1000)
klines = self.tickdb.get_kline(
symbol=f"{ticker}.US",
interval="1d",
start_time=start_ts,
end_time=end_ts,
limit=2000
)
df = pd.DataFrame([
{
"date": datetime.fromtimestamp(k.timestamp / 1000).strftime("%Y-%m-%d"),
"open": k.open,
"high": k.high,
"low": k.low,
"close": k.close,
"volume": k.volume
}
for k in klines
])
df["date"] = pd.to_datetime(df["date"])
return df
def calculate_forward_returns(
self,
price_df: pd.DataFrame,
event_date: str,
holding_days: List[int] = [5, 20]
) -> Dict[str, float]:
"""
计算事件后的远期收益
Args:
price_df: 价格数据 DataFrame
event_date: 事件日期 (YYYY-MM-DD)
holding_days: 持有期列表
Returns:
各持有期的收益字典
"""
event_dt = pd.to_datetime(event_date)
# 找到事件日的价格
event_row = price_df[price_df["date"] == event_dt]
if event_row.empty:
return {f"return_{d}d": None for d in holding_days}
event_price = event_row.iloc[0]["close"]
event_idx = event_row.index[0]
returns = {}
for days in holding_days:
future_idx = event_idx + days
if future_idx < len(price_df):
future_price = price_df.iloc[future_idx]["close"]
returns[f"return_{days}d"] = (future_price - event_price) / event_price
else:
returns[f"return_{days}d"] = None
return returns
def run_backtest(
self,
signals: List[dict],
start_date: str,
end_date: str
) -> pd.DataFrame:
"""
运行完整回测
Args:
signals: LLM 情感信号列表
start_date: 回测开始日期
end_date: 回测结束日期
Returns:
回测结果 DataFrame
"""
# 加载标普 500 作为基准
benchmark_df = self.load_price_data(
"SPY",
pd.to_datetime(start_date) - timedelta(days=60),
pd.to_datetime(end_date) + timedelta(days=30)
)
results = []
for signal in signals:
ticker = signal["ticker"]
event_date = signal["call_date"]
try:
# 加载标的 K 线
price_df = self.load_price_data(
ticker,
pd.to_datetime(start_date) - timedelta(days=60),
pd.to_datetime(end_date) + timedelta(days=30)
)
# 计算超额收益
returns = self.calculate_forward_returns(
price_df,
event_date,
holding_days=[5, 20]
)
# 计算基准收益
benchmark_returns = self.calculate_forward_returns(
benchmark_df,
event_date,
holding_days=[5, 20]
)
result = BacktestEvent(
ticker=ticker,
event_date=event_date,
signal_direction=signal["signal_direction"],
signal_strength=signal["signal_strength"],
sentiment_score=signal["raw_sentiment"],
forward_return_5d=returns.get("return_5d", 0) or 0,
forward_return_20d=returns.get("return_20d", 0) or 0,
benchmark_return_5d=benchmark_returns.get("return_5d", 0) or 0,
benchmark_return_20d=benchmark_returns.get("return_20d", 0) or 0
)
results.append(result)
except Exception as e:
print(f"Backtest error for {ticker} on {event_date}: {e}")
continue
return pd.DataFrame([
{
"ticker": r.ticker,
"event_date": r.event_date,
"signal_direction": r.signal_direction,
"signal_strength": r.signal_strength,
"sentiment_score": r.sentiment_score,
"return_5d": r.forward_return_5d,
"return_20d": r.forward_return_20d,
"excess_return_5d": r.forward_return_5d - r.benchmark_return_5d,
"excess_return_20d": r.forward_return_20d - r.benchmark_return_20d
}
for r in results
])
def analyze_results(self, results_df: pd.DataFrame) -> dict:
"""
分析回测结果
计算分组胜率、夏普比率、最大回撤等指标
"""
if results_df.empty:
return {"error": "No results to analyze"}
analysis = {}
for direction in ["bullish", "bearish", "neutral"]:
subset = results_df[results_df["signal_direction"] == direction]
if len(subset) < 5:
analysis[direction] = {"sample_size": len(subset), "note": "样本不足"}
continue
analysis[direction] = {
"sample_size": len(subset),
"avg_return_5d": subset["return_5d"].mean(),
"avg_return_20d": subset["return_20d"].mean(),
"win_rate_5d": (subset["return_5d"] > 0).mean(),
"win_rate_20d": (subset["return_20d"] > 0).mean(),
"avg_excess_return_5d": subset["excess_return_5d"].mean(),
"avg_excess_return_20d": subset["excess_return_20d"].mean(),
"sharpe_5d": subset["return_5d"].mean() / subset["return_5d"].std() if subset["return_5d"].std() > 0 else 0,
"max_drawdown_20d": (subset["return_20d"].expanding().min() - 1).min()
}
# 全部样本统计
analysis["overall"] = {
"total_events": len(results_df),
"avg_sentiment": results_df["sentiment_score"].mean(),
"correlation_5d": results_df[["sentiment_score", "return_5d"]].corr().iloc[0, 1],
"correlation_20d": results_df[["sentiment_score", "return_20d"]].corr().iloc[0, 1]
}
return analysis
# 使用示例
if __name__ == "__main__":
# 初始化客户端
tickdb = TickDBClient(api_key=os.environ.get("TICKDB_API_KEY"))
# 模拟信号数据(实际使用时从 LLM 分析结果导入)
sample_signals = [
{
"ticker": "AAPL",
"call_date": "2024-11-01",
"signal_direction": "bullish",
"signal_strength": 0.8,
"raw_sentiment": 0.45
},
{
"ticker": "MSFT",
"call_date": "2024-10-22",
"signal_direction": "bearish",
"signal_strength": 0.6,
"raw_sentiment": -0.32
}
# ... 更多信号
]
# 运行回测
engine = SentimentBacktestEngine(tickdb)
results = engine.run_backtest(
sample_signals,
start_date="2024-01-01",
end_date="2024-12-31"
)
# 分析结果
analysis = engine.analyze_results(results)
print(json.dumps(analysis, indent=2))
⚠️ 回测局限性说明:
- 上述为简化示例,实际回测需更长时间跨度(建议 5-10 年)验证统计显著性
- 未考虑流动性冲击和交易成本
- LLM 情感信号与财报发布之间存在时间差,实际执行需考虑滑点
情绪信号的工程化部署
一个完整的情感信号系统需要处理三个关键挑战:实时性、规模化和信号衰减。
实时性:财报发布与信号生成的延迟
财报电话会通常在美股收盘后(东部时间下午 4-5 点)举行,音频完整上传可能需要 2-4 小时。对于隔夜交易者,这个延迟是可接受的;但对于日内策略,需要更快的信号生成。
优化路径包括:
- 实时流式转录(边听边转)
- 预训练财务领域 Whisper 模型
- 简化版 Prompt 牺牲部分准确性换取速度
规模化:覆盖全市场
上述 pipeline 可以处理单场电话会,但要覆盖财报季每天 50+ 家公司,需要:
- 异步批量处理架构
- GPU 集群或云函数自动扩缩容
- 信号优先级队列(大盘股 > 中盘股 > 小盘股)
信号衰减:管理层语调的信息含量随时间递减
学术研究显示,财报电话会的信息含量在事件后 5-20 日内逐渐被市场消化。这意味着:
- 持有期设计需匹配信号衰减曲线
- 可考虑滚动更新信号(每次季报后调整)
- 与其他因子结合使用效果优于单独使用
结语:从噪音中提取信号
财报电话会的每一个词都是管理层的精心选择,而 LLM 的介入让我们第一次有能力系统性地解读这套"管理层语言"。
但技术只是手段。真正的问题是:管理层的语调能预测未来的基本面变化吗?
从工程角度,这套 pipeline 可以稳定运行;但从策略角度,答案需要通过回测来验证。建议你从 2020 年之后的数据开始,逐步扩展回测周期,观察情感信号与实际收益之间的相关性是否稳定。
如果回测结果支持假设,你会发现一个有趣的现象:当 CFO 说"challenging environment"时,市场往往低估了挑战的持续性;而当 CEO 强调"long-term vision"时,短期股价可能正处于均值回归的前夜。
下一步行动
如果你希望亲手实现本文策略:
- 访问 TickDB 控制台 注册(免费 API Key)
- 在 OpenAI 申请 API Key 用于 Whisper 和 GPT-4o
- 克隆本文代码,替换环境变量即可运行
如果你更关注历史数据验证:
联系 [email protected] 获取 TickDB 机构版,支持批量历史 K 线导出和回测数据服务。
如果你是 AI 工具深度用户:
在 ClawHub 搜索安装 TickDB-market-data SKILL,可直接在 AI 助手中调用历史 K 线查询。
本文构建的系统仅为技术演示,不构成任何投资建议。市场有风险,投资需谨慎。回测结果不代表未来收益,LLM 情感分析的准确性受模型能力和 Prompt 质量影响,实际部署前请充分验证。