从 tick 到 K 线:价格是如何被"压缩"的

凌晨 3:17,你在复盘软件上随手画了一根阳线。开盘价 150.23,最高价 151.80,最低价 149.95,收盘价 151.45——四个数字,勾勒出这一分钟市场的全部面貌。

但这根 K 线背后发生了什么?

在这一分钟里,可能有 47 笔成交,价格在 149.95 到 151.80 之间跳动了 23 次,其中有一笔 5000 股的大单在 151.20 成交,还有一笔 100 股的散户单在 149.98 成交。这些信息被"压缩"成四个数字,写进你的图表。

这就是 K 线聚合(Aggregation)的本质:信息的有损压缩。

理解这个压缩过程,不是学术游戏。2012 年 8 月 22 日,Knight Capital 的算法因为一个交易所规则更新,在 45 分钟内亏损 4.4 亿美元——那个更新的本质,就是改变了某些债券报价的聚合方式。当你知道数据是怎么来的,才能知道它"丢失"了什么,以及这种丢失会在什么时候让你的策略失效。

本文拆解 tick 到 K 线的完整聚合链路,涵盖时间边界对齐规则、OHLC 计算逻辑、主流数据源的规则差异,以及生产级的 Python 实现。


一、tick 数据的本质

在讨论聚合之前,必须先理解被聚合的对象——tick。

tick 是市场数据的最小单位,指一笔成交或一次报价更新。在 Level 1 行情中,tick 通常指成交(Trade);在 Level 2 行情中,tick 还包括逐档的买卖盘变化。

一个标准的美股成交 tick 包含以下字段:

@dataclass
class Tick:
    symbol: str           # 标的代码,如 "AAPL.US"
    timestamp: int        # 毫秒级 Unix 时间戳
    price: float           # 成交价格
    volume: int            # 成交量(股数)
    direction: str         # "buy" 或 "sell"(T+n 显示主动成交方向)
    exchange: str          # 交易所代码,如 "NSDQ", "NYSE"
    condition: str         # 成交条件代码,如 " @", "OPG", "CLS"

实际的数据源通常还会附加买卖盘快照(Bid/Ask),用于分析订单簿与成交的关联。

关键事实:tick 数据是离散的、带时间戳的原始事件流。同一时间戳可能有多笔 tick(如跨交易所成交),同一价格也可能在短时间内多次成交。K 线聚合的任务,就是把这个事件流转换为周期性的统计摘要。


二、聚合的核心问题:时间边界对齐

从 tick 到 K 线的第一步,是确定时间边界——这一分钟的 K 线,究竟"包含"哪些 tick?

这里存在三种主流的对齐规则:

2.1 固定时间对齐(Fixed Interval)

以整点时刻为边界,强制切分。比如 09:30:00 到 09:31:00 之间的所有 tick,都归入这一分钟的 K 线。

时间轴:|---09:30:00---|---09:30:01---|...|---09:31:00---|
K线边界:     ↑              ↑                    ↑
             |←──── 09:30 K 线  ───→|←── 09:31 K 线 →|

规则:无论区间内有多少笔 tick,边界是固定的。

优点:计算简单,不同数据源之间可复现。

缺点:无法反映实际交易的连续性——美股盘前(04:00-09:30)和盘中(09:30-16:00)的 tick 被同等对待,而这两个时段的流动性差异高达数十倍。

2.2 滚动窗口对齐(Rolling Window)

以第一笔 tick 的时间戳为锚点,之后的 tick 依次填入,直到窗口"满"为止。

第一笔 tick 发生在 09:30:03
窗口 1:09:30:03 起的 60 秒 → 包含 09:30:03 到 09:31:02 的所有 tick
窗口 2:09:31:02 起的 60 秒 → 包含 09:31:02 到 09:32:01 的所有 tick

规则:边界跟随数据流"滚动",窗口之间可能重叠或存在间隙。

优点:能更真实反映市场流动性的时间分布。

缺点:同一时间点可能属于两个不同的窗口,跨数据源无法精确对齐。

2.3 交易所对齐(Exchange-Aligned)

以交易所发布的"交易时点"为边界,通常用于处理特殊时段。

  • 开盘价(OPG):以 09:30:00 的第一个成交价或报价作为开盘价
  • 收盘价(CLS):以 16:00:00 的最后一个成交价作为收盘价
  • 盘中拍卖:某些交易所会在特定时点进行集合竞价,竞价结果作为该时点的"伪 tick"

规则:用交易所公布的标准化价格锚定边界。

优点:与交易所公布的 OHLC 完全一致,便于对账。

缺点:如果交易所规则变更,历史的聚合结果可能与当时的实际价格不一致。


三、OHLC 的计算逻辑

确定时间边界后,需要把区间内的 tick 聚合为四个统计量:O(Open)、H(High)、L(Low)、C(Close)

3.1 标准 OHLC 计算

def compute_ohlc(ticks: list[Tick]) -> dict:
    """标准 OHLC 聚合计算"""
    if not ticks:
        return None
    
    prices = [t.price for t in ticks]
    
    return {
        "open": ticks[0].price,        # 区间第一笔成交价
        "high": max(prices),           # 区间最高成交价
        "low": min(prices),            # 区间最低成交价
        "close": ticks[-1].price,      # 区间最后一笔成交价
        "volume": sum(t.volume for t in ticks),
        "tick_count": len(ticks),
        "start_time": ticks[0].timestamp,
        "end_time": ticks[-1].timestamp,
    }

这段代码看起来直观,但有三个隐藏的陷阱。

3.2 陷阱一:收盘价不等于最后成交价

在固定时间对齐下,最后一笔 tick 的时间戳可能早于 09:31:00,因为 09:30:00-09:31:00 这个区间内,最后一笔 tick 发生在 09:30:47。

此时,close 字段实际上是"当前可见的最后成交价",而非"09:31:00 时刻的市场价格"。

实战影响:在盘口变化剧烈的时段(如财报发布瞬间),这可能导致 K 线收盘价与实际行情出现数秒的滞后。

3.3 陷阱二:高低价可能被单笔极端价主导

考虑这个场景:

09:30:01  成交 100 股 @ 150.00
09:30:02  成交 1 股 @ 999.99  ← 错误报价/测试单
09:30:03  成交 200 股 @ 150.05
...
09:30:58  成交 50 股 @ 151.80

按照标准算法,high = 999.99。但这笔 999.99 的成交只有 1 股,可能是错误报价或做市商的测试单。用它代表这一分钟的价格波动范围,显然失真。

解决方案:加入成交量过滤或价格偏离阈值。例如,当某笔成交价格偏离成交量加权均价(WAP)超过 5% 时,标记为异常 tick 或降权处理。

3.4 陷阱三:成交量加权均价(VWAP)与 OHLC 的混淆

VWAP 是另一个常见的聚合指标,定义为:

VWAP = Σ(price × volume) / Σ(volume)

OHLC 中的 High/Low 是价格极值,VWAP 是成交量加权均价,两者是完全不同的统计量。 许多新手会把 VWAP 的"最高点"理解为最高价,这是错误的。

指标 计算方式 含义
High max(price) 区间内触及的最高价格
Low min(price) 区间内触及的最低价格
VWAP Σ(p×v) / Σ(v) 按成交量加权的平均成本
TWAP avg(price) 等权重时间平均价

四、主流数据源的规则差异

理解了原理,再来看实际的数据供应商是如何处理这些规则差异的。

4.1 美股三大数据源的对比

数据源 时间对齐规则 特殊处理 已知差异点
TickDB 固定时间对齐(Unix 整点) 提供 1m/5m/15m/1h/1D 等标准周期;1D K 线按交易日切割(非日历日) 缺失 tick 级逐笔成交(trades 接口不支持美股/A 股)
Polygon.io 固定时间对齐 + 交易所时间戳 支持毫秒级精度;提供"实时"与"调整后"两个版本 未调整的分拆/股息数据可能导致历史 K 线断裂
Alpaca 固定时间对齐(UTC) 仅支持分钟级聚合,无小时/日线原生接口 历史 K 线从 2022 年开始,数据深度有限

4.2 港股与 A 股的特殊规则

港股实行交易时段分段制

  • 上午盘:09:30-12:00
  • 下午盘:13:00-16:00

如果用固定时间对齐,12:00-13:00 这个"午休一小时"会被错误地包含在 K 线聚合中。正确的处理方式是按会话(Session)对齐,分别聚合上午盘和下午盘的 K 线。

A股则有涨跌停制度,当股价触及涨跌停板时,后续成交会骤降甚至归零。此时 OHLC 可能出现同一价格连续多根 K 线不变的情况——这是制度约束,不是数据错误。

4.3 数字货币:24/7 无休的特殊性

加密货币交易所(如 Binance)没有休市概念,K 线边界按 UTC 时间固定切分。

一个常见的坑:Binance API 提供的 K 线数据,close 时间戳是区间结束时间,而非区间开始时间。而某些绘图库默认把时间戳当作"开始时间",导致图表向右偏移一个周期。

# Binance K 线的时间戳含义
kline = {
    "open_time": 1700000000000,  # 毫秒时间戳,表示这个 K 线覆盖 [open_time, close_time)
    "close_time": 1700000060000, # 毫秒时间戳
    "open": "42000.00",
    "close": "42150.00",
}

五、生产级聚合引擎实现

理解了规则差异后,来实现一个生产级的 tick-to-K 线聚合引擎。

import os
import time
import json
import hmac
import hashlib
import requests
from datetime import datetime, timezone
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Optional
import random


@dataclass
class Tick:
    """行情 tick 数据结构"""
    symbol: str
    timestamp: int       # 毫秒级 Unix 时间戳
    price: float
    volume: int
    side: str            # "buy" | "sell"(主动成交方向)


@dataclass
class Kline:
    """K 线数据结构"""
    symbol: str
    interval: str        # "1m", "5m", "1h", "1d"
    open_time: int       # K 线开始时间(毫秒)
    close_time: int      # K 线结束时间(毫秒)
    open: float
    high: float
    low: float
    close: float
    volume: int
    tick_count: int


class TickAggregator:
    """
    Tick 到 K 线聚合器
    
    支持:
    - 多种时间对齐规则(固定时间对齐、滚动窗口、成交量加权)
    - 异常 tick 过滤(价格偏离、WAP 抖动)
    - 限频自适应(识别 API 限流并等待)
    """
    
    def __init__(
        self,
        symbol: str,
        interval: str = "1m",
        alignment_rule: str = "fixed",
        price_deviation_threshold: float = 0.05,
        api_key: Optional[str] = None,
        api_secret: Optional[str] = None,
    ):
        self.symbol = symbol
        self.interval = interval
        self.alignment_rule = alignment_rule
        self.price_deviation_threshold = price_deviation_threshold
        
        # 从环境变量读取 API 凭证
        self.api_key = api_key or os.environ.get("TICKDB_API_KEY")
        if not self.api_key:
            raise ValueError("API Key 未设置,请设置环境变量 TICKDB_API_KEY")
        
        self.api_secret = api_secret
        self.base_url = "https://api.tickdb.ai/v1"
        
        # 聚合状态
        self.pending_ticks: list[Tick] = []
        self.current_kline: Optional[Kline] = None
        self.last_flush_time: int = 0
        
        # 限频状态
        self.retry_count = 0
        self.max_retries = 3
        self.base_delay = 1.0
    
    def _interval_to_ms(self, interval: str) -> int:
        """将时间周期转换为毫秒数"""
        mapping = {
            "1m": 60_000,
            "5m": 300_000,
            "15m": 900_000,
            "30m": 1_800_000,
            "1h": 3_600_000,
            "4h": 14_400_000,
            "1d": 86_400_000,
        }
        if interval not in mapping:
            raise ValueError(f"不支持的时间周期: {interval}")
        return mapping[interval]
    
    def _align_timestamp(self, timestamp: int) -> int:
        """
        时间边界对齐
        
        Args:
            timestamp: 原始时间戳(毫秒)
        
        Returns:
            对齐后的 K 线开始时间戳
        """
        interval_ms = self._interval_to_ms(self.interval)
        
        if self.alignment_rule == "fixed":
            # 固定时间对齐:向下去整
            return (timestamp // interval_ms) * interval_ms
        
        elif self.alignment_rule == "rolling":
            # 滚动窗口:跟随第一笔 tick 滚动
            if self.last_flush_time == 0:
                self.last_flush_time = timestamp
            return self.last_flush_time
        
        else:
            raise ValueError(f"不支持的对齐规则: {self.alignment_rule}")
    
    def _compute_wap(self, ticks: list[Tick]) -> float:
        """计算成交量加权均价"""
        if not ticks:
            return 0.0
        total_pv = sum(t.price * t.volume for t in ticks)
        total_vol = sum(t.volume for t in ticks)
        return total_pv / total_vol if total_vol > 0 else 0.0
    
    def _is_anomaly_tick(self, tick: Tick, wap: float) -> bool:
        """检测异常 tick(价格偏离 WAP 超过阈值)"""
        if wap == 0:
            return False
        deviation = abs(tick.price - wap) / wap
        return deviation > self.price_deviation_threshold
    
    def add_tick(self, tick: Tick) -> Optional[Kline]:
        """
        添加一笔 tick,推进聚合逻辑
        
        Returns:
            如果触发了 K 线闭合,返回已完成的 Kline;否则返回 None
        """
        # 计算当前 tick 应归属的 K 线时间边界
        kline_start = self._align_timestamp(tick.timestamp)
        
        # 检查是否需要闭合当前 K 线
        if self.current_kline is not None:
            if self.current_kline.open_time != kline_start:
                # 时间边界变化,闭合当前 K 线
                completed_kline = self._flush_current_kline()
                self.pending_ticks = [tick]
                self._init_new_kline(kline_start)
                return completed_kline
        
        # 如果还没有初始化 K 线,则初始化
        if self.current_kline is None:
            self._init_new_kline(kline_start)
        
        self.pending_ticks.append(tick)
        
        # 更新 K 线状态
        self.current_kline.high = max(self.current_kline.high, tick.price)
        self.current_kline.low = min(self.current_kline.low, tick.price)
        self.current_kline.close = tick.price
        self.current_kline.volume += tick.volume
        self.current_kline.tick_count += 1
        
        return None
    
    def _init_new_kline(self, open_time: int):
        """初始化新的 K 线"""
        interval_ms = self._interval_to_ms(self.interval)
        self.current_kline = Kline(
            symbol=self.symbol,
            interval=self.interval,
            open_time=open_time,
            close_time=open_time + interval_ms - 1,
            open=0.0,
            high=0.0,
            low=float('inf'),
            close=0.0,
            volume=0,
            tick_count=0,
        )
        self.pending_ticks = []
    
    def _flush_current_kline(self) -> Optional[Kline]:
        """闭合并返回当前 K 线"""
        if self.current_kline is None:
            return None
        
        # 用 pending_ticks 计算 OHLC
        if not self.pending_ticks:
            return None
        
        wap = self._compute_wap(self.pending_ticks)
        
        # 过滤异常 tick(可选)
        valid_ticks = [
            t for t in self.pending_ticks
            if not self._is_anomaly_tick(t, wap)
        ]
        
        if valid_ticks:
            self.current_kline.open = valid_ticks[0].price
            self.current_kline.high = max(t.price for t in valid_ticks)
            self.current_kline.low = min(t.price for t in valid_ticks)
            self.current_kline.close = valid_ticks[-1].price
        
        # 重置高低价(防止 inf 值)
        if self.current_kline.high == 0.0:
            self.current_kline.high = self.current_kline.close
        if self.current_kline.low == float('inf'):
            self.current_kline.low = self.current_kline.close
        
        return self.current_kline
    
    def force_flush(self) -> Optional[Kline]:
        """强制闭合当前 K 线(用于实盘收尾或模拟结束)"""
        return self._flush_current_kline()

六、边界时间点的深度拆解

理解了聚合引擎的实现后,来看看三个高风险的边界场景。

6.1 财报发布瞬间的价格跳跃

财报发布通常发生在美东时间盘后(16:00-17:00),但实际的"盘后交易"价格发现是连续的。

时间节点 事件 OHLC 的表现
16:00:00.000 财报前最后一笔 tick 最后一根"正常"K 线的 close
16:00:00.123 财报发布 数据源可能在 100ms 内推送大量 tick
16:00:00.500 报价趋于稳定 新 K 线的 open 可能与上一根的 close 相差 5%-15%

问题:如果你的策略依赖"收盘价"判断方向,财报瞬间的 K 线边界切割可能导致你"追在最高点"——因为 16:00:00.000 之前的最后一笔成交和 16:00:00.500 的新成交被切割到了不同的 K 线。

建议:在财报、央行决议等高影响事件附近,使用 tick 级数据实时监控,而非等待 K 线收盘。

6.2 涨跌停板的 K 线表现

A股的涨跌停板制度会导致特殊的 K 线形态:

场景 正常 K 线 涨跌停 K 线
O-H-L-C 四个数字通常不同 可能出现 H=L=C(价格一字板)
成交量 与波动幅度正相关 可能接近零(封板后无成交)
下一根 K 线 方向不确定 大概率跳空高/低开

识别方法:当 high == low == close 且成交量异常低时,说明触及涨跌停。

6.3 美股特殊时段的 K 线切分

美股有四个特殊时段,每个时段的 K 线聚合规则不同:

时段 时间范围 K 线聚合规则
盘前 04:00-09:30 允许,但 tick 稀疏,K 线可能失真
盘中 09:30-16:00 标准聚合
盘后 16:00-20:00 允许,tick 密度低于盘中
夜盘(期货) 不定期 按交易所规则

最佳实践:在回测和实盘之间使用统一的时段过滤逻辑,排除盘前盘后或明确标记非标准时段的数据。


七、TickDB 的 K 线接口:从 API 到数据

如果你不想自己实现聚合引擎,可以使用 TickDB 提供的 /v1/market/kline 接口直接获取已聚合的 K 线数据。

import os
import time
import requests
from typing import Literal


class TickDBKlineClient:
    """
    TickDB K 线数据客户端(生产级)
    
    功能:
    - 获取历史 K 线数据用于回测
    - 限频自适应 + 指数退避重连
    - 支持 1m/5m/15m/1h/1d 等标准周期
    """
    
    def __init__(self, api_key: str = None):
        self.api_key = api_key or os.environ.get("TICKDB_API_KEY")
        if not self.api_key:
            raise ValueError("请设置环境变量 TICKDB_API_KEY")
        
        self.base_url = "https://api.tickdb.ai/v1"
        self.session = requests.Session()
        self.session.headers.update({
            "X-API-Key": self.api_key,
            "Content-Type": "application/json",
        })
    
    def _request_with_retry(
        self,
        method: str,
        endpoint: str,
        params: dict = None,
        max_retries: int = 5,
    ) -> dict:
        """
        带指数退避重试的 HTTP 请求
        
        ⚠️ 生产环境高频调用建议使用 aiohttp 异步架构
        """
        delay = 1.0
        
        for attempt in range(max_retries):
            try:
                response = self.session.request(
                    method=method,
                    url=f"{self.base_url}{endpoint}",
                    params=params,
                    timeout=(3.05, 10),  # (connect_timeout, read_timeout)
                )
                
                # 限频处理
                if response.status_code == 429:
                    retry_after = int(response.headers.get("Retry-After", delay))
                    print(f"[限频] 等待 {retry_after} 秒后重试...")
                    time.sleep(retry_after)
                    delay = min(delay * 2, 60)  # 最大等待 60 秒
                    continue
                
                response.raise_for_status()
                return response.json()
                
            except requests.exceptions.Timeout:
                print(f"[超时] 第 {attempt + 1} 次重试...")
                delay *= 2
                delay += random.uniform(0, 0.1 * delay)  # 抖动
                time.sleep(delay)
                
            except requests.exceptions.RequestException as e:
                print(f"[请求错误] {e}")
                if attempt == max_retries - 1:
                    raise
                time.sleep(delay)
        
        raise RuntimeError(f"达到最大重试次数 ({max_retries})")
    
    def get_kline(
        self,
        symbol: str,
        interval: Literal["1m", "5m", "15m", "30m", "1h", "4h", "1d"] = "1m",
        start_time: int = None,
        end_time: int = None,
        limit: int = 1000,
    ) -> list[dict]:
        """
        获取历史 K 线数据
        
        Args:
            symbol: 标的代码,如 "AAPL.US"
            interval: K 线周期
            start_time: 开始时间(毫秒时间戳)
            end_time: 结束时间(毫秒时间戳)
            limit: 最大返回条数(最大 1000)
        
        Returns:
            K 线数据列表,按时间升序排列
        """
        params = {
            "symbol": symbol,
            "interval": interval,
            "limit": min(limit, 1000),
        }
        
        if start_time:
            params["start_time"] = start_time
        if end_time:
            params["end_time"] = end_time
        
        data = self._request_with_retry("GET", "/market/kline", params=params)
        
        return data.get("data", [])
    
    def get_latest_kline(
        self,
        symbol: str,
        interval: Literal["1m", "5m", "15m", "30m", "1h", "4h", "1d"] = "1m",
    ) -> dict:
        """
        获取最新一根 K 线
        
        ⚠️ 用于实时监控,不要用于回测(回测请用 get_kline)
        """
        params = {
            "symbol": symbol,
            "interval": interval,
        }
        
        data = self._request_with_retry("GET", "/market/kline/latest", params=params)
        
        return data.get("data")


# 使用示例
if __name__ == "__main__":
    client = TickDBKlineClient()
    
    # 获取苹果过去 100 根 1 分钟 K 线
    klines = client.get_kline(
        symbol="AAPL.US",
        interval="1m",
        limit=100,
    )
    
    print(f"获取到 {len(klines)} 根 K 线")
    for kline in klines[-5:]:
        print(
            f"时间: {datetime.fromtimestamp(kline['open_time']/1000, tz=timezone.utc)} | "
            f"O: {kline['open']:.2f} H: {kline['high']:.2f} "
            f"L: {kline['low']:.2f} C: {kline['close']:.2f} | "
            f"成交量: {kline['volume']:,}"
        )

八、数据压缩比:信息丢失了多少?

回到开篇的问题:K 线是 tick 的"有损压缩"。让我们量化这个压缩率。

以苹果(AAPL)某日的 1 分钟 K 线为例,估算信息密度:

指标 数值 说明
盘中 tick 总数(估算) ~50,000 笔/天 基于成交量和平均单笔规模估算
1 分钟 K 线数量 390 根 09:30-16:00,共 390 分钟
信息压缩比 128:1 50,000 / 390
压缩后数据点 390 × 5 = 1,950 个 OHLC + 成交量 + 时间
原始数据点 50,000 × 4 = 200,000 个 价格 + 成交量 + 时间 + 方向

结论:从 tick 到 1 分钟 K 线,信息被压缩了约 100 倍。丢失的信息包括:价格变动的微观时间序列、单笔成交的订单簿背景(是主动买还是主动卖)、成交的交易所来源等。

这不是缺陷,而是设计取舍:压缩是为了可读性和存储效率。理解这个压缩过程,才能知道在什么场景下需要用 tick 数据,什么场景下 K 线足够用。


结语

K 线是市场的"快照",不是市场的"全貌"。

当你理解了从 tick 到 K 线的聚合规则——固定时间对齐还是滚动窗口、异常 tick 怎么处理、时间边界怎么切割——你就掌握了量化交易的第一性原理:数据是被加工过的,加工方式会直接影响策略的可靠性。

2012 年的 Knight Capital 事件不是孤例。每一年的"闪崩"背后,几乎都有数据规则变更或聚合边界处理不当的影子。

下一次你看 K 线时,不妨多想一层:这根线"压缩"掉了什么?


下一步行动

如果你需要 tick 级数据自己做聚合
TickDB 提供港股、数字货币的逐笔成交(trades)接口,支持高精度的订单流分析。访问 tickdb.ai 注册,获取免费 API Key。

如果你只需要 K 线数据做策略回测
TickDB 的 /v1/market/kline 接口提供 10 年级别的美股历史 K 线数据,覆盖 6 大类资产。无需自己实现聚合引擎,直接用于回测。

如果你想了解更多市场微观结构
关注 TickDB 公众号,回复"微观结构"获取系列文章目录。


本文不构成任何投资建议。市场有风险,投资需谨慎。