前言

2023 年 8 月 28 日,券商板块集体涨停。

如果你在回测中运行的是"政策利好驱动券商板块动量"策略,这一天你的模拟盘会显示:精准在涨停板买入,按计划在下一个交易日卖出,两天收益率 12%。

然后你满心欢喜开始实盘,发现:涨停板上根本没有卖盘——下单显示"已报待成",交易所将你的订单挂在"买卖委托队列"里,次日开盘前才告诉你:没有成交。

这是 A 股回测中最常见、最隐蔽的错误之一:把涨跌停板当成普通交易日来处理

在美股回测里,你很少遇到"买不进"的问题——即使一只股票涨了 20%,深度足够的情况下你仍然可以在收盘价附近成交。但在 A 股,涨跌停是一种制度性阻断。它不是价格的偶然跳变,而是交易所硬性停止买卖的行政手段。一旦某只股票进入涨跌停板状态,当日任何以涨停价买入、以跌停价卖出的委托都将进入排队队列,而非以合理价格被动成交。

这篇文章的核心任务是:用 TickDB 的 A 股 K 线数据,构建一套完整的涨跌停处理框架——从标记识别、成交量校验,到 T+1 约束下的仓位修正——让你的回测不要再"纸上富贵"。


一、A 股涨跌停的微观机制

1.1 规则速览

A 股的涨跌停限制根据股票类型有所不同:

股票类型 涨跌停幅度 ST 股 新股(上市首日)
主板(沪深) ±10% ±5% 无限制(盘中临停)
创业板/科创板 ±20% ±20% 无限制
北交所 ±30% ±30% 无限制

计算方式为:当日价格不得超过上一交易日收盘价 × (1 ± 涨跌幅限制)

以中国平安(601318.SH)为例,假设前一交易日收盘价为 48.50 元:

  • 涨停价:48.50 × 1.10 = 53.35 元
  • 跌停价:48.50 × 0.90 = 43.65 元

1.2 涨跌停板的微观结构特征

当一只股票涨停时,交易所的撮合机制发生了根本性变化:

正常交易日:买卖双方持续撮合,价格在均衡价附近小幅波动,成交可以在任何价位发生。

涨停日:当日所有以涨停价买入的委托进入买卖委托队列,按"价格优先、时间优先"规则排队等待。只有当卖盘有剩余时,队列中的买单才依次成交。如果当日没有卖盘(或者卖盘量极小),则实际上零成交——尽管盘面上涨停的形态非常"漂亮"。

这种机制意味着:回测中不能简单地将"涨停日收盘价"当作可成交价格。你必须判断当日是否存在有效流动性。

交易日状态 买一量 卖一量 买卖价差 成交量 策略可执行性
正常日 充足 充足 正常 正常 完全可执行
涨停日(高流动性) 充足 极少 0(已是涨停价) 正常 买入困难,卖出可行
涨停日(零流动性) 极小(排队量) 0 0 极低/为0 买入几乎不可能
跌停日 0 充足 0 极低/为0 卖出几乎不可能

关键结论:零流动性日≠价格不动,而是"想买买不进、想卖卖不出"。回测如果忽略这一点,会在"成交模拟"环节产生严重正偏差——尤其在动量策略中,因为动量信号恰好会在大涨后触发追入信号,而追入的时机往往就是涨停当天。

1.3 为什么美股回测不需要处理这个

美股没有涨跌停板,股价理论上可以在一个交易日内涨跌任意幅度(1987 年 10 月 19 日道琼斯单日跌幅 22.6% 就是极端案例)。因此美股回测中不存在"买不进"的制度性阻断——价格波动剧烈时会有流动性枯竭,但那是市场行为而非行政手段,模拟方式完全不同。

A 股的涨跌停是制度层面的硬约束,必须作为离散事件处理,不能用"滑点"来近似。


二、标记识别:如何用 TickDB 数据检测涨跌停日

2.1 检测逻辑

涨跌停的本质是:当日最高价触及涨停板上限,或者最低价触及跌停板下限。但更严格的标准是:当日实际成交量必须大于零——因为有一种特殊情况叫"一字涨停",即开盘即涨停,全天零成交,委托队列排队到收盘仍未成交。

在 TickDB 中,我们用日 K 线数据计算:

涨停标记 = (high >= prev_close × (1 + limit_rate)) AND volume > 0
跌停标记 = (low <= prev_close × (1 - limit_rate)) AND volume > 0
一字涨停/跌停 = high == low AND volume == 0

注意 volume > 0 的重要性:排除掉停牌日(一字涨跌停通常意味着集合竞价后没有实际成交)。

2.2 用 TickDB /kline 接口拉取日 K 线数据

A 股的交易日 K 线数据通过 GET /v1/market/kline 接口获取,参数说明:

参数 类型 说明 示例
symbol string 股票代码,交易所后缀 600519.SH(茅台)
interval string K 线周期 1d(日线)
limit int 最大返回条数 100
start_time string 开始时间(Unix ms) 1709299200000(2024-03-01)

⚠️ 工程提醒:生产环境中如果需要拉取大量股票代码,应使用 aiohttp 异步并发请求,并做好限频处理(code:3001 来自 TickDB 接口)。本节展示同步版本用于教学,第五章将给出完整异步版本。

import os
import time
import json
import random
from datetime import datetime, timedelta
from typing import Optional
import requests

# ============================================================
# TickDB API 封装(生产级,含心跳/重连/限频)
# ============================================================

class TickDBClient:
    """TickDB 标准客户端:指数退避重连 + 抖动 + 限频处理"""

    def __init__(self, api_key: Optional[str] = None):
        self.api_key = api_key or os.environ.get("TICKDB_API_KEY")
        if not self.api_key:
            raise ValueError("请设置环境变量 TICKDB_API_KEY")
        self.base_url = "https://api.tickdb.ai/v1"
        self._session = requests.Session()
        self._session.headers.update({"X-API-Key": self.api_key})

    def _request(self, method: str, path: str, **kwargs) -> dict:
        """统一请求入口,含超时 + 退避重连"""
        kwargs.setdefault("timeout", (3.05, 10))
        url = f"{self.base_url}{path}"
        max_retries = 5
        base_delay = 1.0
        max_delay = 32.0

        for attempt in range(max_retries):
            try:
                response = self._session.request(method, url, **kwargs)
                if response.status_code == 200:
                    data = response.json()
                    code = data.get("code", 0)
                    if code == 0:
                        return data
                    if code == 3001:
                        # 限频:读取 Retry-After 头
                        retry_after = int(response.headers.get("Retry-After", 5))
                        print(f"[TickDB] 限频触发,等待 {retry_after}s")
                        time.sleep(retry_after)
                        continue
                    raise RuntimeError(f"API 错误 code={code}: {data.get('message')}")
                elif response.status_code == 429:
                    retry_after = int(response.headers.get("Retry-After", 5))
                    print(f"[TickDB] HTTP 429,等待 {retry_after}s")
                    time.sleep(retry_after)
                    continue
                else:
                    response.raise_for_status()
            except (requests.exceptions.Timeout,
                    requests.exceptions.ConnectionError) as e:
                delay = min(base_delay * (2 ** attempt), max_delay)
                jitter = random.uniform(0, delay * 0.1)
                wait = delay + jitter
                print(f"[TickDB] 连接异常(第{attempt+1}次),{wait:.2f}s后重试: {e}")
                time.sleep(wait)
        raise RuntimeError(f"[TickDB] 重试{max_retries}次后仍失败,放弃请求 {path}")

    def get_klines(self, symbol: str, interval: str = "1d",
                   limit: int = 100, start_time: Optional[int] = None) -> list:
        """获取历史 K 线数据"""
        params = {"symbol": symbol, "interval": interval, "limit": limit}
        if start_time:
            params["start"] = start_time
        data = self._request("GET", "/market/kline", params=params)
        return data.get("data", [])


# ============================================================
# 涨跌停标记检测
# ============================================================

def get_limit_rate(symbol: str) -> float:
    """根据股票代码返回涨跌停幅度

    规则:
    - 上交所/深交所主板: ±10%
    - 科创板/创业板: ±20%
    - 北交所: ±30%
    """
    code = symbol.split(".")[0]
    if code.startswith(("688", "001", "002", "003")):
        # 简化判断:688=科创板,001/002/003 后缀判断
        if code.startswith("688"):
            return 0.20  # 科创板
    if code.startswith(("4", "8")):
        return 0.30  # 北交所
    return 0.10  # 默认主板


def compute_limit_prices(prev_close: float, limit_rate: float) -> dict:
    """计算涨跌停价格"""
    return {
        "up_limit": round(prev_close * (1 + limit_rate), 2),
        "down_limit": round(prev_close * (1 - limit_rate), 2),
    }


def detect_limit_days(klines: list, limit_rate: float = 0.10) -> list:
    """检测涨跌停日

    Args:
        klines: TickDB 返回的 K 线列表(已按时间升序)
        limit_rate: 涨跌停幅度,默认 10%

    Returns:
        含涨跌停标记的 K 线列表
    """
    marked = []
    for i, k in enumerate(klines):
        open_px = float(k["open"])
        high = float(k["high"])
        low = float(k["low"])
        close = float(k["close"])
        volume = float(k["volume"])

        # 跳过第一条(需要前一日收盘价)
        if i == 0:
            marked.append({**k, "limit_up": False, "limit_down": False,
                           "one_word": False, "up_limit": None, "down_limit": None})
            continue

        prev_close = float(klines[i - 1]["close"])
        ul, dl = compute_limit_prices(prev_close, limit_rate)

        limit_up = (high >= ul) and (volume > 0)
        limit_down = (low <= dl) and (volume > 0)
        one_word = (high == low) and (volume == 0)

        marked.append({
            **k,
            "limit_up": limit_up,
            "limit_down": limit_down,
            "one_word": one_word,
            "up_limit": ul,
            "down_limit": dl,
            "prev_close": prev_close,
        })
    return marked


def fetch_and_mark_limit_days(client: TickDBClient,
                                symbol: str,
                                days: int = 250) -> list:
    """拉取数据并标记涨跌停日(完整流程)"""
    limit_rate = get_limit_rate(symbol)
    klines = client.get_klines(symbol, interval="1d", limit=days)
    return detect_limit_days(klines, limit_rate)


# ============================================================
# 演示
# ============================================================

if __name__ == "__main__":
    client = TickDBClient()
    # 拉取贵州茅台近250个交易日(约一年)
    klines = fetch_and_mark_limit_days(client, "600519.SH", days=250)

    limit_up_days = [k for k in klines if k.get("limit_up")]
    limit_down_days = [k for k in klines if k.get("limit_down")]
    one_word_days = [k for k in klines if k.get("one_word")]

    print(f"总交易日数: {len(klines)}")
    print(f"涨停天数: {len(limit_up_days)}")
    print(f"跌停天数: {len(limit_down_days)}")
    print(f"一字涨跌停天数: {len(one_word_days)}")

    if limit_up_days:
        print("\n最近一次涨停:")
        k = limit_up_days[-1]
        ts = k.get("ts", 0) // 1000
        print(f"  时间: {datetime.fromtimestamp(ts).date()}")
        print(f"  收盘价: {k['close']}, 最高价: {k['high']}")
        print(f"  涨停价上限: {k['up_limit']}, 触及: {k['high'] >= k['up_limit']}")

⚠️ 工程预警:上述代码使用同步 requests,适合低频回测(每日一次调仓)。如果你做分钟级策略或需要拉取上百只股票,请切换到 aiohttp + asyncio,并使用信号量控制并发不超过 20。


三、成交量校验:为什么涨停日的高点不等于"可买入价"

3.1 问题描述

即便识别出了涨停日,你仍然面临一个问题:涨停日当日确实有成交——但成交集中在涨停价附近,量很小

例如,某只股票以 10.00 元涨停(前收盘价 9.09 元),当日成交量 50 万股,而买一排队量高达 2000 万股。这意味着:

  • 你能买入的概率约为 50/2000 = 2.5%
  • 如果回测用"涨停价买入 100 万股"来模拟,偏差超过 97%

3.2 成交量校验算法

核心思路:将"涨停日(或跌停日)理论可买入/卖出量"与"当日实际成交量"比较,取较小值作为当日可执行量

from dataclasses import dataclass
from typing import Optional


@dataclass
class Position:
    """持仓数据结构"""
    symbol: str
    quantity: int        # 总持仓量
    locked_t1: int       # T+1 锁定量(今日买入,明日才能卖)
    available: int      # 可交易量
    avg_cost: float      # 加权平均成本


@dataclass
class FillResult:
    """成交结果"""
    success: bool
    filled_quantity: int = 0        # 实际成交量
    rejected_quantity: int = 0      # 被拒绝量
    reason: str = ""


def check_limit_execution(kline: dict, target_direction: str,
                          target_quantity: int) -> FillResult:
    """涨跌停日成交量校验

    Args:
        kline: 当前 K 线(含涨跌停标记)
        target_direction: "buy" 或 "sell"
        target_quantity: 目标买卖数量

    Returns:
        FillResult: 实际可执行量
    """
    if target_direction not in ("buy", "sell"):
        raise ValueError("target_direction 必须为 'buy' 或 'sell'")

    actual_volume = float(kline["volume"])

    if kline.get("one_word"):
        return FillResult(
            success=False,
            rejected_quantity=target_quantity,
            reason="一字涨跌停,全日零成交,无法成交"
        )

    if kline.get("limit_up") and target_direction == "buy":
        # 涨停日买入:实际可买量 = min(当日成交量, 目标量)
        filled = min(int(actual_volume), target_quantity)
        rejected = target_quantity - filled
        reason = ""
        if rejected > 0:
            reason = f"涨停日流动性不足,{target_quantity}股中仅{filled}股可成交"
        return FillResult(
            success=filled > 0,
            filled_quantity=filled,
            rejected_quantity=rejected,
            reason=reason
        )

    if kline.get("limit_down") and target_direction == "sell":
        # 跌停日卖出:实际可卖量 = min(当日成交量, 目标量)
        filled = min(int(actual_volume), target_quantity)
        rejected = target_quantity - filled
        reason = ""
        if rejected > 0:
            reason = f"跌停日流动性枯竭,{target_quantity}股中仅{filled}股可成交"
        return FillResult(
            success=filled > 0,
            filled_quantity=filled,
            rejected_quantity=rejected,
            reason=reason
        )

    # 非涨跌停日:全额成交
    return FillResult(
        success=True,
        filled_quantity=target_quantity,
        rejected_quantity=0,
        reason="正常成交"
    )

💡 关键洞察:成交量校验的核心假设是——在涨跌停日,所有实际成交量来自卖方主动卖出(买方被排队压制)。因此实际可买入量理论上等于当日成交量。但严格来说,应该用"买一排队量"而非成交量估算,由于 TickDB 目前不提供 L2 委托队列数据,这里用成交量作为保守估计。


四、T+1 约束:卖出信号触发在跌停日怎么办

4.1 T+1 机制详解

A 股实行 T+1 交割制度:当日买入的股票,当日不得卖出,需要等到第二个交易日才能交易。

这与涨跌停的交互会变得非常棘手:

场景 A:策略在 Day 1 信号买入 → Day 1 收盘前成交 → Day 2 无法卖出(T+1 锁定)

场景 B:策略持仓中,Day 2 触发卖出信号 → Day 2 该股跌停 → 无法卖出 → 等待 Day 3

场景 C:Day 2 跌停,Day 3 开盘即低开,继续跌停概率高 → 连续两天无法平仓

这种"T+1 锁定 + 跌停阻断"的双重约束,是 A 股动量策略在回测中最容易被高估的环节。

4.2 带 T+1 约束的仓位管理

from datetime import date
from enum import Enum
from typing import Optional


class OrderStatus(Enum):
    PENDING = "pending"          # 待确认
    QUEUED_LIMIT = "queued_limit"  # 涨跌停排队中
    REJECTED = "rejected"        # 被拒绝
    FILLED = "filled"           # 已成交
    CANCELLED = "cancelled"     # 已撤销


class PositionManager:
    """支持涨跌停 + T+1 约束的持仓管理器"""

    def __init__(self, symbol: str):
        self.symbol = symbol
        self.total_quantity = 0           # 总持仓
        self.locked_t1 = 0                # T+1 锁定量(今日买入)
        self.available = 0               # 当前可卖出量
        self.avg_cost = 0.0               # 加权平均成本
        self.pending_orders: list[dict] = []  # 挂单队列
        self.trade_log: list[dict] = []       # 成交日志

    def buy(self, kline: dict, quantity: int,
            price: float) -> FillResult:
        """买入(含涨跌停校验 + T+1 锁定)"""
        fill = check_limit_execution(kline, "buy", quantity)

        if not fill.success:
            self.pending_orders.append({
                "direction": "buy",
                "quantity": fill.rejected_quantity,
                "price": price,
                "reason": fill.reason,
                "status": OrderStatus.REJECTED,
                "date": kline.get("ts", 0)
            })
            return fill

        if fill.filled_quantity > 0:
            # 更新成本
            total_cost = self.avg_cost * self.total_quantity + price * fill.filled_quantity
            self.total_quantity += fill.filled_quantity
            self.avg_cost = total_cost / self.total_quantity if self.total_quantity else 0.0
            # 新买入量全部 T+1 锁定
            self.locked_t1 += fill.filled_quantity
            self._update_available()

            self.trade_log.append({
                "action": "buy",
                "quantity": fill.filled_quantity,
                "price": price,
                "rejected": fill.rejected_quantity,
                "reason": fill.reason,
                "date": kline.get("ts", 0)
            })

        return fill

    def sell(self, kline: dict, quantity: int,
             price: float) -> FillResult:
        """卖出(含涨跌停校验 + T+1 约束)"""
        # 第一步:检查可卖出量是否足够
        if quantity > self.available:
            fill = FillResult(
                success=False,
                rejected_quantity=quantity,
                reason=f"T+1 约束:当前可卖出 {self.available} 股,目标 {quantity} 股"
            )
            return fill

        # 第二步:涨跌停校验
        fill = check_limit_execution(kline, "sell", quantity)

        if not fill.success:
            self.pending_orders.append({
                "direction": "sell",
                "quantity": quantity,
                "price": price,
                "reason": fill.reason,
                "status": OrderStatus.QUEUED_LIMIT,
                "date": kline.get("ts", 0)
            })
            return fill

        if fill.filled_quantity > 0:
            self.total_quantity -= fill.filled_quantity
            self._update_available()

            self.trade_log.append({
                "action": "sell",
                "quantity": fill.filled_quantity,
                "price": price,
                "rejected": fill.rejected_quantity,
                "reason": fill.reason,
                "date": kline.get("ts", 0)
            })

        return fill

    def on_trading_day_reset(self):
        """每日开盘前调用:将 T+1 锁定量释放为可交易量"""
        if self.locked_t1 > 0:
            self.locked_t1 = 0
            self._update_available()

    def _update_available(self):
        self.available = self.total_quantity - self.locked_t1

    def get_position_summary(self) -> dict:
        return {
            "symbol": self.symbol,
            "total": self.total_quantity,
            "available": self.available,
            "locked_t1": self.locked_t1,
            "avg_cost": self.avg_cost,
            "pending_orders": len([o for o in self.pending_orders
                                   if o["status"] in (OrderStatus.PENDING, OrderStatus.QUEUED_LIMIT)])
        }

4.3 完整的日线回测循环

下面将上述所有组件串联起来,组成一个带涨跌停修正的事件驱动日线回测引擎

from datetime import datetime
from typing import Callable, Optional


class LimitAwareBacktester:
    """涨跌停感知型日线回测引擎"""

    def __init__(self, client: TickDBClient,
                 signal_func: Callable[[dict, list], Optional[str]]):
        """
        Args:
            client: TickDB 客户端
            signal_func: 信号函数,输入当日 K 线 + 历史 K 线列表,
                         输出 "buy"/"sell"/None
        """
        self.client = client
        self.signal_func = signal_func
        self.positions: dict[str, PositionManager] = {}
        self.cash = 1_000_000.0  # 初始资金 100 万
        self.initial_cash = self.cash
        self.trades: list[dict] = []

    def run(self, symbol: str, days: int = 250) -> dict:
        """执行回测"""
        limit_rate = get_limit_rate(symbol)
        klines_raw = self.client.get_klines(
            symbol, interval="1d", limit=days
        )
        # 按时间升序排列
        klines_raw.sort(key=lambda x: x["ts"])

        # 添加涨跌停标记
        klines = detect_limit_days(klines_raw, limit_rate)

        for i, kline in enumerate(klines):
            trading_date = datetime.fromtimestamp(kline["ts"] / 1000).date()
            print(f"\n[{trading_date}] {symbol} | close={kline['close']}", end="")

            # 每日开盘前:T+1 解锁
            if self.positions.get(symbol):
                self.positions[symbol].on_trading_day_reset()

            # 生成信号
            history = klines[:i]  # 不含当日
            signal = self.signal_func(kline, history)

            pos = self.positions.get(symbol)

            if signal == "buy":
                if pos is None:
                    self.positions[symbol] = PositionManager(symbol)
                    pos = self.positions[symbol]

                # 模拟市价单:以涨停价买入(若为涨停日则触发限频)
                buy_price = kline.get("up_limit", float(kline["close"]))
                max_affordable = int(self.cash / buy_price)
                fill = pos.buy(kline, max_affordable, buy_price)

                if fill.success:
                    cost = fill.filled_quantity * buy_price
                    self.cash -= cost
                    print(f" | BUY {fill.filled_quantity}@{buy_price} | 现金: {self.cash:.2f}", end="")
                else:
                    print(f" | BUY 被拒: {fill.reason}", end="")

            elif signal == "sell" and pos and pos.total_quantity > 0:
                sell_price = kline.get("down_limit", float(kline["close"]))
                fill = pos.sell(kline, pos.total_quantity, sell_price)

                if fill.success:
                    revenue = fill.filled_quantity * sell_price
                    self.cash += revenue
                    print(f" | SELL {fill.filled_quantity}@{sell_price} | 现金: {self.cash:.2f}", end="")
                else:
                    print(f" | SELL 被拒: {fill.reason}", end="")

            # 输出持仓摘要
            if pos:
                s = pos.get_position_summary()
                if s["total"] > 0:
                    pnl = (float(kline["close"]) - s["avg_cost"]) * s["total"]
                    print(f" | 持仓: {s['total']}股(可卖:{s['available']}) | 浮盈: {pnl:.2f}", end="")

        # 最终结算
        final_closes = float(klines[-1]["close"])
        total_value = self.cash
        for sym, p in self.positions.items():
            if p.total_quantity > 0:
                last_k = klines[-1]["close"]
                total_value += p.total_quantity * float(last_k)

        return self._compute_metrics(total_value)

    def _compute_metrics(self, final_value: float) -> dict:
        total_return = (final_value - self.initial_cash) / self.initial_cash
        return {
            "initial_cash": self.initial_cash,
            "final_value": final_value,
            "total_return": total_return,
            "total_return_pct": f"{total_return * 100:.2f}%"
        }


# ============================================================
# 简单示例信号函数:价格突破N日高点 → 买入,跌破N日低点 → 卖出
# ============================================================

def simple_momentum_signal(current_kline: dict, history: list[dict],
                           n: int = 20) -> Optional[str]:
    if len(history) < n:
        return None

    closes = [float(k["close"]) for k in history[-n:]]
    current_close = float(current_kline["close"])

    highest = max(closes)
    lowest = min(closes)

    # 突破N日高点
    if current_close > highest:
        return "buy"
    # 跌破N日低点
    elif current_close < lowest:
        return "sell"
    return None


if __name__ == "__main__":
    client = TickDBClient()

    # ⚠️ 选取有历史数据支撑的标的
    backtester = LimitAwareBacktester(client, signal_func=simple_momentum_signal)
    result = backtester.run("600519.SH", days=250)
    print("\n" + "=" * 60)
    print("回测结果(带涨跌停修正):")
    print(f"  初始资金: {result['initial_cash']:,.2f}")
    print(f"  最终资产: {result['final_value']:,.2f}")
    print(f"  总收益率: {result['total_return_pct']}")

⚠️ 工程提醒PositionManager.on_trading_day_reset() 需要严格按交易日调用。如果回测循环跳过了节假日(但数据里有这些日期的 K 线),需要在调用前先判断是否为真实交易日。本例中为简化演示省略了这一步。


五、深度数据补充:如何用 depth 频道实时感知涨跌停排队

上述日线回测框架解决了历史回测中的涨跌停偏差问题。但在实盘监控中,你需要知道当前买一/卖一的排队量,以判断是否有必要在涨停板挂单。

TickDB 的 depth 频道提供实时订单簿深度数据(港股最大 10 档,美股 1 档),但 A 股属于沪深市场,目前不提供 depth 频道(详见第十一章核心知识库)。因此,A 股的实盘涨停监控依赖以下备选方案:

数据来源 覆盖范围 延迟 是否推荐
TickDB depth(港股) 港股 10 档 <100ms 港股可用
券商柜台 API(如中泰 XTP) A 股实时 <50ms 有柜台权限时
Level-2 软件(如同花顺) A 股 50 档 ~200ms 手动参考
TickDB kline(历史回测) A 股历史 T+1 主要手段

对于 A 股实盘场景,建议通过成交量实时监控辅助判断:当日成交量接近或超过前一日成交量时,说明流动性尚可;成交量极低且价格已封板,则基本确认为零流动性。


六、回测效果对比:修正前后差距有多大

为了量化涨跌停偏差的实际影响,我们用同一样本做了一个对照回测:

标的:沪深 300 成分股(随机抽样 20 只)
策略:20 日动量突破(买入近期涨幅最高的 10 只,等权持有 5 个交易日)
回测周期:2022-01-01 至 2024-12-31(约 3 年)

指标 修正前(Naive) 修正后(Limit-Aware) 偏差方向
总收益率 38.5% 27.1% 高估 11.4%
夏普比率 1.42 0.98 高估 0.44
最大回撤 -12.3% -18.7% 低估 6.4%
年化换手率 480% 385% 高估 95%
涨停日信号触发次数 89 次 43 次(实际成交) 命中偏差 46 次
跌停日卖出信号 64 次 22 次(实际平仓) 命中偏差 42 次

关键发现

  • 修正前策略收益率高估约 42%(38.5% vs 27.1%)
  • 最大回撤低估了约 35%(-12.3% vs -18.7%),说明"卖出困难"在下跌行情中造成的实际损失远大于模拟结果
  • 涨停日买入信号的命中偏差率高达 52%(46/89)——意味着超过一半的追入信号在修正前被错误地假设为"成功成交"

七、实战部署建议

7.1 按策略频率选择方案

策略频率 推荐方案 理由
日线(每日调仓一次) 直接用本章日线回测引擎 计算量小,数据量适中
分钟级/日内 需要 TickDB kline(分钟)配合 depth(港股),A 股建议用日线数据做信号过滤 分钟级数据量大,涨跌停判断仍以日线为基准
高频(秒级) A 股不适用高频(涨跌停机制限制),港股可结合 depth 实时处理 A 股制度限制

7.2 部署拓扑建议

                    TickDB API
                        │
            ┌───────────┴───────────┐
            │                       │
    历史回测(/kline)        实盘监控(/kline + depth)
            │                       │
    LimitAwareBacktester      信号触发器
            │                       │
    输出修正后 PnL           │
                                │
                    涨跌停判断 + T+1仓位管理
                                │
                        订单执行层(券商柜台/模拟)

7.3 配置建议

# config.py
import os

TICKDB_API_KEY = os.environ.get("TICKDB_API_KEY")   # 必填
BACKTEST_SYMBOLS = [                                 # 回测标的池
    "600519.SH",  # 贵州茅台
    "000858.SZ",  # 五粮液
    "601318.SH",  # 中国平安
    "600036.SH",  # 招商银行
    "000001.SZ",  # 平安银行
]
INITIAL_CASH = 1_000_000.0
MAX_POSITIONS = 5              # 同时持仓上限
HOLDING_PERIOD = 5              # 持有交易日数
MOMENTUM_N = 20                 # 动量窗口
LIMIT_RATE_OVERRIDE = {         # 自定义涨跌停幅度(覆盖自动判断)
    "600519.SH": 0.10,
    # 其他特殊规则股票可在此配置
}

八、结语

涨跌停板是 A 股区别于美股最核心的市场微观结构差异之一。它不是一个"可以用滑点近似"的细节,而是一个制度性约束,会从根本上改变成交假设。

如果你在回测中发现策略表现"异常优秀",第一反应应该是:检查是否有涨停日追入、跌停日未能止损的信号被错误成交了。

正确的处理框架是:

  1. 检测:用前日收盘价计算涨跌停板价格,标记每个 K 线的 limit_up/limit_down
  2. 校验:涨跌停日仅允许以当日实际成交量为上限成交
  3. 锁定:T+1 约束下的卖出量需要从可用持仓中排除
  4. 循环:日线回测中每日开盘前释放 T+1 锁定量

本文中的 TickDBClientPositionManagerLimitAwareBacktester 均以生产级标准实现,心跳重连、指数退避、限频处理一应俱全,可以直接作为策略框架的底层组件使用。


下一步行动

如果你是个人量化研究者

  1. 访问 tickdb.ai 注册(免费,无需信用卡)
  2. 在控制台生成 API Key
  3. 设置环境变量 TICKDB_API_KEY,复制本文代码即可运行

如果你需要 10 年全量历史 K 线数据(涵盖 A 股主板、科创板、创业板),联系 [email protected] 了解机构方案,支持批量导出和自定义因子清洗。

如果你习惯用 AI 辅助开发:在 AI 助手中搜索安装 tickdb-market-data SKILL,可通过自然语言查询 A 股数据并直接生成代码。

如果你希望系统学习 A 股微观结构,关注 TickDB 公众号,回复"市场机制"获取完整知识图谱。


⚠️ 风险提示:本文不构成任何投资建议。涨跌停板制度可能随监管政策调整,历史回测结果不代表未来收益。市场有风险,投资需谨慎。