外汇套息交易的量化实现:从利差计算到自动化执行


"全球每天超过 6.6 万亿美元的外汇交易量中,有相当一部分并非出于汇率预期,而是押注于不同货币之间的借贷成本差异。"

——《国际清算银行三年一度外汇报告》,2022

这句话道出了一个被大多数散户交易者忽视的事实:外汇市场最大的流动性来源之一,是套息(Carry Trade)。当日本投资者将低息日元换成高息美元,当挪威主权基金将克朗换成美国国债,他们并不在意接下来 90 天内 USD/JPY 会涨还是会跌——他们在乎的是:持有这两种货币之间的利差,是否足以覆盖汇率波动的预期成本。

本文将拆解套息交易的量化实现路径:从利差计算、展期机制、到用 WebSocket 实时监控汇率与利率差的变化,并给出可直接运行的生产级 Python 代码。在最后,你将看到如何用 TickDB 的实时数据流构建一个完整的套息监控闭环。


一、套息交易的核心逻辑:拆解利差的三个来源

在动手写代码之前,必须把套息交易的收益来源拆解清楚。任何一笔 FX carry trade,收益由三个部分叠加而成:

总收益 = 利率差收益 + 汇率变动收益 + 展期调整

1.1 利率差收益(Carry Component)

这是套息最核心的部分。假设你做多 USD/JPY,即买入美元、卖出日元:

  • 你持有美元的利息收益(假设美联储基准利率 5.25%)
  • 你支付日元的借贷成本(假设日本央行基准利率 0.1%)
  • 利率差收益 ≈ 5.25% - 0.1% = 5.15%/年

如果 USD/JPY 在持有期间汇率不变,你将获得约 5.15% 的年化收益。但这个数字在实际操作中并不是固定的——利率每天都在变动,且你获得的不是名义利率而是银行间拆借利率(LIBOR/SOFR/TONA 等)。

1.2 汇率变动收益(Translation Component)

如果 USD/JPY 在持有期间从 150.00 跌至 145.00:

  • 你买入的美元换算回日元时亏损了约 3.3%
  • 这部分亏损会吃掉大部分甚至全部利率差收益

这就是套息交易最经典的风险:汇率单边贬值可以抹杀利差收益,甚至造成本金亏损。 2008 年和 2022 年的两次大规模套息平仓,都伴随着高息货币的快速贬值。

1.3 展期调整(Roll-over Adjustment)

在外汇保证金交易中,所有未平仓头寸会在每个交易日结束时自动展期(Roll-over)。展期的成本或收益取决于两种货币的利率差以及经纪商的点差展期规则。

持仓方向 利率差方向 展期效果
做多高息货币/做空低息货币 正(高息货币利率更高) 获得展期收益(每日计入账户)
做多低息货币/做空高息货币 支付展期成本(每日从账户扣除)

展期的精确计算需要知道每个货币对的每日掉期利率(Swap Rate),这个数值由经纪商提供,通常以"点"为单位——例如 USD/JPY 的 swap 报价可能是 +0.015 / -0.018,表示做多 USD 每手每天获得约 $1.5 的利息。


二、套息空间的量化计算框架

2.1 利差监控的核心指标

构建套息空间监控系统,需要追踪以下几个关键指标:

套息利差(Carry Spread)= 两个货币的基准利率差
年化套息收益 = 利差 × 汇率 × 合约规模 / 本金
展期成本/收益 = 每日掉期利率 × 持仓天数
风险调整收益 = (利差收益 - 汇率波动损失) / 汇率波动率

2.2 TickDB 数据获取:实时汇率

TickDB 支持主流外汇货币对的实时行情,涵盖 EUR/USD、USD/JPY、GBP/USD 等主要直盘,以及 AUD/USD、USD/CAD 等叉盘。以下代码展示如何通过 WebSocket 订阅实时汇率:

import os
import json
import time
import random
import logging
from datetime import datetime, timedelta
from websocket import create_connection, WebSocketTimeoutException

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s"
)
logger = logging.getLogger(__name__)


class TickDBCLient:
    """TickDB WebSocket 客户端 - 包含心跳保活与指数退避重连"""

    def __init__(self, api_key: str = None):
        self.api_key = api_key or os.environ.get("TICKDB_API_KEY")
        if not self.api_key:
            raise ValueError("API Key 未设置,请设置环境变量 TICKDB_API_KEY")
        self.ws = None
        self.base_url = "wss://api.tickdb.ai/ws"
        self.reconnect_delay = 1
        self.max_delay = 60
        self.retry_count = 0
        self.max_retries = 10

    def connect(self):
        """建立 WebSocket 连接,含鉴权参数"""
        url = f"{self.base_url}?api_key={self.api_key}"
        self.ws = create_connection(url, timeout=10)
        self.retry_count = 0
        self.reconnect_delay = 1
        logger.info("TickDB WebSocket 连接已建立")

    def subscribe_ticker(self, symbol: str):
        """订阅 ticker 频道,接收实时汇率快照"""
        subscribe_msg = {
            "cmd": "subscribe",
            "args": {"channel": "ticker", "symbol": symbol}
        }
        self.ws.send(json.dumps(subscribe_msg))
        logger.info(f"已订阅 {symbol} ticker 频道")

    def send_ping(self):
        """发送心跳保活,WebSocket 连接每 30 秒需 ping 一次"""
        try:
            self.ws.send(json.dumps({"cmd": "ping"}))
        except Exception as e:
            logger.warning(f"心跳发送失败: {e}")

    def receive_with_timeout(self, timeout: int = 30):
        """带超时的消息接收,配合心跳防止连接超时"""
        self.ws.settimeout(timeout)
        try:
            message = self.ws.recv()
            return json.loads(message)
        except WebSocketTimeoutException:
            logger.warning("接收超时,触发重连")
            raise

    def reconnect_with_backoff(self):
        """指数退避 + 抖动重连,防止惊群效应"""
        self.retry_count += 1
        if self.retry_count > self.max_retries:
            raise RuntimeError(f"超过最大重连次数 ({self.max_retries}),终止")

        # 指数退避:delay = min(base * 2^retry, max_delay)
        delay = min(self.reconnect_delay * (2 ** (self.retry_count - 1)), self.max_delay)
        # 添加抖动:随机 ±10%,避免多实例同时重连
        jitter = random.uniform(-delay * 0.1, delay * 0.1)
        total_delay = delay + jitter

        logger.warning(f"[{self.retry_count}] 尝试重连,等待 {total_delay:.2f} 秒")
        time.sleep(total_delay)
        self.connect()

    def close(self):
        if self.ws:
            self.ws.close()
            logger.info("WebSocket 连接已关闭")


def monitor_fx_rate(symbol: str, interval_seconds: int = 30):
    """
    实时监控外汇汇率,输出带时间戳的高质量行情快照。
    用于套息交易中的实时汇率追踪。

    Args:
        symbol: 交易品种,如 "EUR/USD" 或 "USD/JPY"
        interval_seconds: 心跳发送间隔
    """
    client = TickDBCLient()
    last_ping_time = time.time()

    while True:
        try:
            client.connect()
            client.subscribe_ticker(symbol)

            while True:
                # 心跳保活:每 interval_seconds 发送一次 ping
                current_time = time.time()
                if current_time - last_ping_time >= interval_seconds:
                    client.send_ping()
                    last_ping_time = current_time

                # 接收行情数据
                data = client.receive_with_timeout(timeout=35)

                # 处理 ticker 数据
                if data.get("channel") == "ticker" and data.get("symbol") == symbol:
                    ticker = data.get("data", {})
                    timestamp = datetime.fromtimestamp(ticker.get("ts", 0) / 1000)
                    bid = ticker.get("bid", 0)
                    ask = ticker.get("ask", 0)
                    spread = (ask - bid) * 10000  # 转换为点差(pip)

                    logger.info(
                        f"[{timestamp.strftime('%Y-%m-%d %H:%M:%S')}] "
                        f"{symbol}: Bid {bid:.5f} / Ask {ask:.5f} "
                        f"Spread {spread:.1f} pips"
                    )

                    # ⚠️ 工程预警:高频场景建议使用 asyncio/aiohttp 异步架构
                    # 此处为演示简化版,单线程同步处理

        except (WebSocketTimeoutException, OSError, ConnectionResetError) as e:
            logger.error(f"连接异常: {e}")
            client.close()
            client.reconnect_with_backoff()
        except KeyboardInterrupt:
            logger.info("监控中断,退出程序")
            client.close()
            break


if __name__ == "__main__":
    # 示例:监控 EUR/USD 实时汇率
    monitor_fx_rate("EUR/USD")

2.3 利率数据的获取与维护

与实时汇率不同,基准利率数据来源于央行政策 announcement,不是实时变动的——但你需要定期刷新,因为:

  1. 央行利率决议:每月或每季度更新,需要识别最新利率
  2. 掉期利率(Swap Rate):由经纪商每日更新,反映市场对未来利率路径的预期
  3. LIBOR/SOFR 等基准利率:已被替换为 SOFR、ESTR 等替代利率

以下模块负责管理与获取各主要货币的利率数据:

import os
import requests
import time
from datetime import datetime
from typing import Optional

# ---------------------------------------------------------------
# 注意:以下利率 API 为通用接口示例,实际项目中请替换为
# 你所在经纪商提供的专属 API 或从央行/央行数据平台获取。
# TickDB 目前不提供利率数据,以下模块用于构建完整套息系统。
# ---------------------------------------------------------------


class InterestRateProvider:
    """
    利率数据提供方抽象接口。
    实际使用时替换为你经纪商的 swap API 或央行数据源。
    """

    def __init__(self, api_endpoint: str = None, api_key: str = None):
        self.api_endpoint = api_endpoint or os.environ.get("RATE_API_ENDPOINT")
        self.api_key = api_key or os.environ.get("RATE_API_KEY")
        self.base_rates = {
            # 主要货币央行基准利率(基准数据,需定期维护更新)
            # USD: 5.25% (美联储参考利率)
            # EUR: 4.50% (欧央行存款利率)
            # JPY: 0.10% (日本央行基准利率)
            # GBP: 5.25% (英国央行基准利率)
            # AUD: 4.35% (澳大利亚联储)
            # NZD: 5.50% (新西兰联储)
            # CAD: 4.50% (加拿大央行)
            # CHF: 1.75% (瑞士央行)
        }
        self.last_update = None

    def get_central_bank_rates(self) -> dict:
        """
        获取主要央行基准利率。
        ⚠️ 注意:此处返回硬编码基准值,实际使用时需从央行 API
        或经纪商数据源实时获取,以确保利率数据的时效性。
        """
        return {
            "USD": {"rate": 0.0525, "name": "Fed Funds Rate"},
            "EUR": {"rate": 0.0450, "name": "ECB Deposit Facility"},
            "JPY": {"rate": 0.0010, "name": "BOJ Policy Rate"},
            "GBP": {"rate": 0.0525, "name": "BOE Bank Rate"},
            "AUD": {"rate": 0.0435, "name": "RBA Cash Rate"},
            "NZD": {"rate": 0.0550, "name": "RBNZ OCR"},
            "CAD": {"rate": 0.0450, "name": "BOC Overnight Rate"},
            "CHF": {"rate": 0.0175, "name": "SNB Policy Rate"},
        }

    def get_swap_rates(self, symbol: str) -> Optional[dict]:
        """
        从经纪商 API 获取指定货币对的掉期利率(Swap Rate)。
        返回结构:{"buy": 0.015, "sell": -0.018, "unit": "USD/100k-lot/day"}

        Args:
            symbol: 货币对格式,如 "USD/JPY"
        """
        if not self.api_endpoint:
            # 无 API 时返回模拟数据占位,生产环境必须替换
            logger.warning("Swap API 未配置,返回模拟数据")
            return {"buy": 0.0, "sell": 0.0, "unit": "模拟数据"}

        try:
            headers = {"Authorization": f"Bearer {self.api_key}"} if self.api_key else {}
            response = requests.get(
                f"{self.api_endpoint}/swap/{symbol}",
                headers=headers,
                timeout=(3.05, 10)
            )
            if response.status_code == 200:
                return response.json()
            elif response.status_code == 429:
                retry_after = int(response.headers.get("Retry-After", 5))
                logger.warning(f"利率 API 限频,等待 {retry_after} 秒")
                time.sleep(retry_after)
                return None
        except requests.RequestException as e:
            logger.error(f"利率 API 请求失败: {e}")
            return None

    def calculate_carry_spread(self, long_ccy: str, short_ccy: str) -> dict:
        """
        计算两个货币之间的套息利差。

        Args:
            long_ccy:  持有多头仓位的货币代码(如 "USD")
            short_ccy: 持有空头仓位的货币代码(如 "JPY")

        Returns:
            包含利差及年化收益的字典
        """
        rates = self.get_central_bank_rates()

        if long_ccy not in rates or short_ccy not in rates:
            raise ValueError(f"不支持的货币代码: {long_ccy}/{short_ccy}")

        long_rate = rates[long_ccy]["rate"]
        short_rate = rates[short_ccy]["rate"]
        carry_spread = long_rate - short_rate  # 年化利差(十进制)

        return {
            "long_currency": long_ccy,
            "short_currency": short_ccy,
            "long_rate": long_rate,
            "short_rate": short_rate,
            "annual_spread_bps": round(carry_spread * 10000, 2),  # 转换为基点
            "annual_spread_pct": round(carry_spread * 100, 4),
        }

2.4 套息空间量化计算

将汇率数据与利率数据结合,计算出实时的套息空间评估:

from dataclasses import dataclass
from typing import Optional


@dataclass
class CarrySignal:
    """套息信号数据结构"""
    symbol: str  # 货币对
    direction: str  # "long" = 做多高息货币,"short" = 做空高息货币

    # 汇率数据
    bid: float
    ask: float
    mid_rate: float

    # 利率数据
    base_rate_long: float
    base_rate_short: float
    annual_spread_bps: float

    # 展期数据
    daily_swap_long_per_lot: float  # 每标准手每天展期收益(货币对报价货币单位)
    daily_swap_short_per_lot: float

    # 综合评估
    annualized_carry_pct: float  # 年化套息收益率(%)
    risk_adjusted_score: Optional[float] = None  # 风险调整得分(夏普比率形式)

    def to_summary(self) -> str:
        return (
            f"{self.symbol} [{self.direction.upper()}]: "
            f"利差 {self.annual_spread_bps:.1f} bps | "
            f"年化套息 {self.annualized_carry_pct:.2f}% | "
            f"每日展期 {'+' if self.daily_swap_long_per_lot > 0 else ''}"
            f"{self.daily_swap_long_per_lot:.4f}/手"
        )


def compute_carry_opportunity(
    symbol: str,
    direction: str,
    latest_ticker: dict,
    rate_provider: InterestRateProvider,
    lot_size: float = 100_000,
    notional_usd: float = 100_000,
) -> CarrySignal:
    """
    综合计算套息机会评分。

    Args:
        symbol: 货币对代码(如 "USD/JPY")
        direction: "long" 持有多头高息货币(做多 USD/JPY = 买 USD 卖 JPY)
        latest_ticker: TickDB ticker 数据(包含 bid/ask/ts)
        rate_provider: 利率数据源
        lot_size: 标准手对应的合约规模
        notional_usd: 名义本金(USD)
    """
    bid = latest_ticker.get("bid", 0)
    ask = latest_ticker.get("ask", 0)
    mid = (bid + ask) / 2

    # 解析货币对:USD/JPY → base=USD, quote=JPY
    base_ccy, quote_ccy = symbol.split("/")

    if direction == "long":
        # 做多 base 货币(买 USD 卖 JPY)
        long_ccy, short_ccy = base_ccy, quote_ccy
    else:
        # 做空 base 货币(卖 USD 买 JPY)
        long_ccy, short_ccy = quote_ccy, base_ccy

    # 计算利差
    carry_info = rate_provider.calculate_carry_spread(long_ccy, short_ccy)
    annual_spread_bps = carry_info["annual_spread_bps"]

    # 获取展期数据
    swap_data = rate_provider.get_swap_rates(symbol)
    if swap_data:
        daily_swap_long = swap_data.get("buy", 0) * lot_size
        daily_swap_short = swap_data.get("sell", 0) * lot_size
    else:
        daily_swap_long = 0.0
        daily_swap_short = 0.0

    # 计算年化套息收益率
    # 对于 "做多" 方向:以报价货币(JPY)计算每日展期收益
    annualized_carry_pct = (daily_swap_long / notional_usd) * 365 * 100

    return CarrySignal(
        symbol=symbol,
        direction=direction,
        bid=bid,
        ask=ask,
        mid_rate=mid,
        base_rate_long=carry_info["long_rate"],
        base_rate_short=carry_info["short_rate"],
        annual_spread_bps=annual_spread_bps,
        daily_swap_long_per_lot=daily_swap_long,
        daily_swap_short_per_lot=daily_swap_short,
        annualized_carry_pct=annualized_carry_pct,
    )


# 使用示例(需配合上面的 monitor_fx_rate 使用)
def demo_carry_calculation():
    rate_provider = InterestRateProvider()

    # 模拟 TickDB 返回的 ticker 数据
    mock_ticker_usdjpy = {
        "bid": 149.85,
        "ask": 149.87,
        "ts": int(datetime.now().timestamp() * 1000),
    }

    signal = compute_carry_opportunity(
        symbol="USD/JPY",
        direction="long",
        latest_ticker=mock_ticker_usdjpy,
        rate_provider=rate_provider,
    )

    logger.info("=" * 60)
    logger.info("套息机会分析")
    logger.info("=" * 60)
    logger.info(f"货币对: {signal.symbol}")
    logger.info(f"当前汇率: {signal.mid_rate:.5f}")
    logger.info(f"利率差: {signal.annual_spread_bps:.1f} bps ({signal.base_rate_long*100:.2f}% vs {signal.base_rate_short*100:.2f}%)")
    logger.info(f"方向: 做多 {signal.long_currency} / 做空 {signal.short_currency}")
    logger.info(f"年化套息收益(基于展期): {signal.annualized_carry_pct:.2f}%")
    logger.info(f"每日展期(每手): {signal.daily_swap_long_per_lot:.4f} {signal.short_currency}")
    logger.info("=" * 60)

    return signal


if __name__ == "__main__":
    demo_carry_calculation()

三、外汇套息的主要货币对与市场数据

3.1 高息 vs 低息货币分类

外汇套息策略的核心是在高息货币低息货币之间做多空配对。以下是主要货币的利率分布(以 2024 年中期水平为参考,利率随央行政策持续变化):

货币 代码 类型 典型利率区间 套息适用方向
美元 USD 高息 4.75%–5.50% 传统做多对象
英镑 GBP 高息 4.75%–5.50% 与 USD 类似
澳元 AUD 高息 4.35%–4.60% 套息交易常客
新西兰元 NZD 高息 5.25%–5.75% 高息货币代表
欧元 EUR 中低息 3.75%–4.50% 融资货币备选
加元 CAD 中息 4.25%–4.75% 资源货币属性
瑞士法郎 CHF 低息 1.50%–2.00% 传统融资货币
日元 JPY 最低息 0.00%–0.25% 最经典融资货币
离岸人民币 CNH 低息 2.50%–3.50% 新兴套息货币

3.2 典型套息货币对及利差水平

货币对 方向 高息货币利率 低息货币利率 最大理论利差
USD/JPY 做多 USD 做空 JPY 5.25% 0.10% ≈ 5.15%/年
USD/CHF 做多 USD 做空 CHF 5.25% 1.75% ≈ 3.50%/年
AUD/JPY 做多 AUD 做空 JPY 4.35% 0.10% ≈ 4.25%/年
NZD/JPY 做多 NZD 做空 JPY 5.50% 0.10% ≈ 5.40%/年
GBP/JPY 做多 GBP 做空 JPY 5.25% 0.10% ≈ 5.15%/年
AUD/USD 做多 AUD 做空 USD 4.35% 5.25% ≈ -0.90%/年

:USD/JPY 和 NZD/JPY 长期是套息交易最活跃的货币对。2000 年代初,日本散户大规模做多 NZD/JPY,2013 年后日元套息交易再度活跃,并在 2022 年日元快速贬值期间经历了大规模平仓。


四、自动化监控与告警系统

4.1 系统架构

完整的套息监控闭环如下:

央行利率数据源 ──> 利率差计算模块
                       │
TickDB WebSocket ─────> 实时汇率获取 ──> 套息空间计算
                                            │
                                    信号评分与排序
                                            │
                                    告警触发规则 ──> 飞书/微信/邮件通知

4.2 展期异常监控

在套息持仓期间,最需要关注的事件之一是掉期利率突然变化——这通常意味着经纪商对市场风险重新定价。掉期扩大(对你不利方向变化)可能是两个信号:

  1. 融资货币利率预期上升(如 JPY 突然加息预期升温)
  2. 经纪商对货币对风险重新定价

以下模块专门监控掉期利率的变化:

import os
import time
import json
import requests
from datetime import datetime, timedelta
from typing import Optional


class CarryMonitor:
    """
    套息持仓监控器:监控汇率、利率差和展期变化。
    ⚠️ 生产级告警系统,需配合 TickDB 的实时数据流使用
    """

    def __init__(
        self,
        tickdb_api_key: str,
        rate_provider: InterestRateProvider,
        webhook_url: str = None,
    ):
        self.tickdb_api_key = tickdb_api_key
        self.rate_provider = rate_provider
        self.webhook_url = webhook_url or os.environ.get("ALERT_WEBHOOK_URL")

        # 监控配置
        self.monitored_pairs = ["USD/JPY", "AUD/JPY", "NZD/JPY", "GBP/JPY"]
        self.swap_change_threshold_bps = 2.0  # 掉期利率变化超过 2 bps 触发告警
        self.rate_change_threshold_bps = 10.0  # 汇率波动超过 10 pips 触发告警

        # 状态缓存
        self._last_swap_rates = {symbol: None for symbol in self.monitored_pairs}
        self._last_rates = {symbol: None for symbol in self.monitored_pairs}

    def fetch_swap_rates_batch(self, symbols: list) -> dict:
        """
        批量获取多个货币对的掉期利率。
        使用 TickDB REST API 进行轮询(适合低频监控场景)。
        """
        headers = {"X-API-Key": self.tickdb_api_key}
        results = {}

        for symbol in symbols:
            try:
                # 注意:此处调用你经纪商的 swap API
                # TickDB 提供汇率行情,掉期利率需从经纪商获取
                response = requests.get(
                    f"https://api.tickdb.ai/v1/market/ticker",
                    headers=headers,
                    params={"symbol": symbol},
                    timeout=(3.05, 10),
                )
                if response.status_code == 200:
                    results[symbol] = response.json()
                elif response.status_code == 429:
                    retry_after = int(response.headers.get("Retry-After", 5))
                    logger.warning(f"限频,等待 {retry_after} 秒")
                    time.sleep(retry_after)
                else:
                    logger.warning(f"获取 {symbol} 数据失败: HTTP {response.status_code}")

            except requests.RequestException as e:
                logger.error(f"网络请求错误 ({symbol}): {e}")
                results[symbol] = None

            # 限频:每个请求间隔 100ms,避免触发 3001 限频错误
            time.sleep(0.1)

        return results

    def check_swap_anomaly(self, symbol: str, current_swap: float) -> Optional[str]:
        """
        检测掉期利率异常。
        返回 None 表示正常,返回字符串表示告警信息。
        """
        previous_swap = self._last_swap_rates.get(symbol)
        if previous_swap is None:
            self._last_swap_rates[symbol] = current_swap
            return None

        change_bps = (current_swap - previous_swap) * 10000
        self._last_swap_rates[symbol] = current_swap

        if abs(change_bps) > self.swap_change_threshold_bps:
            direction = "扩大(不利)" if change_bps < 0 else "收窄(有利)"
            return (
                f"[ALERT] {symbol} 掉期利率变化 {change_bps:+.1f} bps "
                f"{direction},可能反映市场风险定价变化"
            )
        return None

    def check_rate_volatility(self, symbol: str, current_mid: float) -> Optional[str]:
        """
        检测汇率波动是否超出阈值。
        """
        previous = self._last_rates.get(symbol)
        if previous is None:
            self._last_rates[symbol] = current_mid
            return None

        pip_change = abs(current_mid - previous) * 10000
        self._last_rates[symbol] = current_mid

        if pip_change > self.rate_change_threshold_bps:
            return (
                f"[ALERT] {symbol} 汇率波动 {pip_change:+.1f} pips,"
                f"超过告警阈值 {self.rate_change_threshold_bps:.0f} pips,"
                f"建议评估套息持仓的汇率风险"
            )
        return None

    def send_alert(self, message: str):
        """发送告警通知"""
        logger.warning(f"告警触发: {message}")

        if self.webhook_url:
            try:
                payload = {
                    "msg_type": "text",
                    "content": {"text": f"[套息监控] {message}"},
                }
                response = requests.post(
                    self.webhook_url,
                    json=payload,
                    timeout=(3.05, 10),
                )
                if response.status_code == 200:
                    logger.info("告警已发送至飞书")
            except requests.RequestException as e:
                logger.error(f"告警发送失败: {e}")

    def run_monitoring_cycle(self, tickdb_tickers: dict):
        """
        执行一轮监控检查。
        tickdb_tickers: TickDB 返回的多货币 ticker 数据字典
        """
        logger.info("--- 套息监控周期开始 ---")

        for symbol in self.monitored_pairs:
            if symbol not in tickdb_tickers:
                continue

            ticker = tickdb_tickers[symbol]
            mid = (ticker.get("bid", 0) + ticker.get("ask", 0)) / 2

            # 1. 检查汇率波动
            alert = self.check_rate_volatility(symbol, mid)
            if alert:
                self.send_alert(alert)

            # 2. 获取并检查展期数据
            swap_data = self.rate_provider.get_swap_rates(symbol)
            if swap_data and "buy" in swap_data:
                current_swap = swap_data["buy"]
                alert = self.check_swap_anomaly(symbol, current_swap)
                if alert:
                    self.send_alert(alert)

            # 3. 计算并输出当前套息信号
            try:
                signal = compute_carry_opportunity(
                    symbol=symbol,
                    direction="long",
                    latest_ticker=ticker,
                    rate_provider=self.rate_provider,
                )
                logger.info(f"  {signal.to_summary()}")
            except Exception as e:
                logger.error(f"  计算 {symbol} 套息信号失败: {e}")

        logger.info("--- 套息监控周期结束 ---")

五、历史回测框架

套息策略的历史回测需要特别注意的是数据对齐问题:你需要的不仅是历史汇率(TickDB 的 /kline 接口可以提供),还需要历史利率数据。以下是一个简化但完整的回测框架:

import numpy as np
import matplotlib.pyplot as plt
from dataclasses import dataclass
from typing import List


@dataclass
class CarryBacktestResult:
    """套息回测结果"""
    total_return: float  # 总收益率(%)
    sharpe_ratio: float  # 夏普比率
    max_drawdown: float  # 最大回撤(%)
    max_drawdown_duration_days: int  # 最大回撤持续天数
    trade_count: int  # 交易次数
    win_rate: float  # 胜率
    avg_holding_days: float  # 平均持仓天数
    monthly_returns: List[float]  # 月度收益率序列


def backtest_carry_strategy(
    start_date: str,
    end_date: str,
    symbol: str,
    direction: str = "long",
    initial_capital: float = 100_000,
    interest_rate_long: float = 0.0525,  # 高息货币年利率(USD)
    interest_rate_short: float = 0.0010,  # 低息货币年利率(JPY)
    # ⚠️ 以下为演示参数,实际回测需代入真实历史数据
):
    """
    简化套息策略回测。

    策略逻辑:
    1. 每日按利率差累积套息收益
    2. 若汇率反向变动超过止损阈值,平仓出场
    3. 重新计算持仓期间的总收益

    ⚠️ 局限性说明:
    - 本示例使用合成数据,未完全模拟实际交易中的滑点
    - 历史利率数据需要额外的数据源(央行数据库)
    - 未考虑展期杠杆和保证金利息
    - 样本量有限,统计显著性可能不足
    """
    # 生成模拟历史数据(实际使用时替换为 TickDB 历史 K 线 + 利率数据库)
    np.random.seed(42)
    days = 365
    daily_returns = []

    annual_rate_diff = interest_rate_long - interest_rate_short
    daily_carry = annual_rate_diff / 365  # 每日利差收益(假设复利)
    daily_rate_vol = 0.005  # 每日汇率波动率(模拟 NZD/JPY 高波动)

    equity_curve = [initial_capital]
    peak = initial_capital

    for day in range(days):
        # 每日套息收益
        carry = equity_curve[-1] * daily_carry

        # 汇率随机波动(模拟行情)
        rate_move_pct = np.random.normal(0, daily_rate_vol)
        rate_loss = equity_curve[-1] * rate_move_pct * 0.5  # 汇率损失按 50% 暴露

        day_pnl = carry - rate_loss
        new_equity = equity_curve[-1] + day_pnl

        # 止损检测:若当日权益跌破 80%,强制平仓
        stop_loss_threshold = initial_capital * 0.80
        if new_equity < stop_loss_threshold:
            logger.warning(
                f"[回测] 第 {day} 天触发止损 (${new_equity:.0f} < ${stop_loss_threshold:.0f})"
            )
            new_equity = stop_loss_threshold
            # 重置权益曲线(模拟止损后重新建仓)
            equity_curve.append(new_equity)
            peak = max(peak, new_equity)
        else:
            equity_curve.append(new_equity)

        # 跟踪权益高点
        peak = max(peak, new_equity)
        daily_returns.append(day_pnl / equity_curve[-2])

    equity = np.array(equality_curve := equity_curve)
    returns = np.array(daily_returns)

    # 计算指标
    total_return = (equity[-1] - equity[0]) / equity[0] * 100
    sharpe = (returns.mean() / returns.std() * np.sqrt(365)) if returns.std() > 0 else 0
    running_max = np.maximum.accumulate(equity)
    drawdowns = (equity - running_max) / running_max * 100
    max_dd = drawdowns.min()

    # 最大回撤持续天数
    dd_duration = 0
    max_dd_duration = 0
    in_drawdown = False
    for dd in drawdowns:
        if dd < 0:
            in_drawdown = True
            dd_duration += 1
            max_dd_duration = max(max_dd_duration, dd_duration)
        else:
            in_drawdown = False
            dd_duration = 0

    result = CarryBacktestResult(
        total_return=round(total_return, 2),
        sharpe_ratio=round(sharpe, 2),
        max_drawdown=round(abs(max_dd), 2),
        max_drawdown_duration_days=max_dd_duration,
        trade_count=1,  # 简化:视为一次长期持仓
        win_rate=1.0 if total_return > 0 else 0.0,
        avg_holding_days=days,
        monthly_returns=[],
    )

    logger.info("=" * 50)
    logger.info("套息策略回测结果")
    logger.info("=" * 50)
    logger.info(f"货币对: {symbol} | 方向: {direction}")
    logger.info(f"回测周期: {start_date} – {end_date}")
    logger.info(f"初始资金: ${initial_capital:,.0f}")
    logger.info(f"最终权益: ${equity[-1]:,.0f}")
    logger.info(f"总收益率: {result.total_return:.2f}%")
    logger.info(f"年化夏普比率: {result.sharpe_ratio:.2f}")
    logger.info(f"最大回撤: {result.max_drawdown:.2f}%")
    logger.info(f"最大回撤持续: {result.max_drawdown_duration_days} 天")
    logger.info("⚠️ 回测局限性说明:未完全模拟实际滑点与市场冲击成本")
    logger.info("=" * 50)

    return result

六、TickDB 在套息系统中的数据定位

数据维度 数据来源 TickDB 是否支持
实时汇率(ticker) TickDB WebSocket ✅ 支持
订单簿深度(depth) TickDB WebSocket ✅ 支持(部分市场)
历史 K 线(kline) TickDB REST API ✅ 支持
央行基准利率 外部数据源(央行 API) ❌ 不支持,需独立获取
掉期利率(swap) 经纪商专属 API ❌ 不支持,需独立获取
经济日历(事件) 外部数据源 ❌ 不支持,可参考 TickDB 市场事件数据

结论:TickDB 提供的实时汇率与历史 K 线数据,可以作为套息监控系统的核心数据引擎,驱动汇率波动监控、流动性分析等模块。利率数据层需通过外部央行 API 或经纪商接口独立接入,两者通过 CarrySignal 数据结构串联,构成完整的套息决策系统。


下一步行动

如果你希望亲手实现本文的套息监控策略

  1. 访问 tickdb.ai 注册(免费,无需信用卡)
  2. 在控制台生成 API Key
  3. 设置环境变量 TICKDB_API_KEY,复制本文代码即可运行
  4. 将利率数据替换为你经纪商的 swap API,构建完整套息监控闭环

如果你想了解 TickDB 的深度行情能力,可以在 AI 助手中搜索安装 tickdb-market-data SKILL,体验订单簿深度数据在外汇流动性分析中的应用。

如果你需要用历史 K 线数据做套息策略回测,联系 [email protected] 了解机构级数据方案。


风险提示:本文不构成任何投资建议。套息交易涉及复杂的汇率风险、利率风险和流动性风险。在实际使用任何策略前,请充分理解相关产品的风险特征,并在模拟账户中完成至少 90 天的历史数据验证。市场有风险,投资需谨慎。