从 tick 到 K 线:价格是如何被"压缩"的

你盯着屏幕上跳动的数字——每一笔成交的单子,都在改变着市场的轨迹。但当你打开交易终端,看到的却是一根根整齐的 K 线:开盘价、最高价、最低价、收盘价。

这中间发生了什么?

那些毫秒级别的微观事件,如何被压缩成你每天看到的"一根线"?为什么同一个市场、同一个时间段,你和他画出来的 K 线可能不一样?

这不是一个无聊的技术细节。聚合规则的选择,直接决定了你的策略信号从哪里开始、到哪里结束。 一个在"标准"K 线上看起来完美的策略,可能因为边界对齐方式的不同,在实盘中亏损。

本文拆解这个被大多数教程一笔带过的底层问题。


一、问题的本质:信息压缩的代价

1.1 tick 数据的原始形态

每一笔成交 tick,本质上是一个五元组:

(time, price, volume, side, venue)
  • time:精确到毫秒甚至微秒的时间戳
  • price:成交价格
  • volume:成交量
  • side:主动买入还是主动卖出(撮合方向)
  • venue:交易所代码

一个活跃股票每天产生数十万到数百万条 tick。处理全量 tick 数据需要相当的算力和存储成本。

K 线,是对这个信息流的压缩。

1.2 压缩的数学本质

从 N 个 tick 聚合到 1 根 K 线,是一个信息损失过程:

原始数据 压缩后
N 个精确时间戳 1 个时间边界
N 个价格 4 个价格(OHLC)
N 个成交量 1 个成交量
N 个方向信息 全部丢弃

不同的聚合规则,决定了"损失哪些信息、保留哪些信息"。

这就像把一部高清电影压缩成一帧截图——你选择截哪一帧,决定了观众看到什么。


二、三种主流聚合规则:技术细节与陷阱

2.1 固定时间法(Fixed Time Boundaries)

这是最常见的聚合方式。将时间轴按固定周期切分:

Period 1: [00:00.000 - 00:59.999] → K线 1
Period 2: [01:00.000 - 01:59.999] → K线 2
...

实现逻辑

from datetime import datetime, timedelta
from typing import List, Dict, Optional

class FixedTimeAggregator:
    """固定时间边界 K 线聚合器"""
    
    def __init__(self, interval_seconds: int = 60):
        self.interval = interval_seconds
        self.current_bar: Optional[Dict] = None
        self.current_bar_start: Optional[datetime] = None
    
    def _get_bar_time(self, tick_time: datetime) -> datetime:
        """计算 tick 所属的 K 线起始时间"""
        epoch = datetime(1970, 1, 1)
        total_seconds = (tick_time - epoch).total_seconds()
        bar_seconds = int(total_seconds // self.interval) * self.interval
        return epoch + timedelta(seconds=bar_seconds)
    
    def ingest_tick(self, tick: Dict) -> Optional[Dict]:
        """
        摄入单个 tick,返回收盘的 K 线(如果有)
        
        Args:
            tick: {"time": datetime, "price": float, "volume": float}
        Returns:
            如果当前 K 线结束,返回 K 线字典;否则返回 None
        """
        tick_time = tick["time"]
        bar_time = self._get_bar_time(tick_time)
        
        # 新 K 线开始
        if self.current_bar is None or bar_time > self.current_bar_start:
            # 返回上一个 K 线
            if self.current_bar is not None:
                finished_bar = self._close_bar()
                self.current_bar = self._init_bar(bar_time, tick)
                return finished_bar
            else:
                self.current_bar = self._init_bar(bar_time, tick)
                self.current_bar_start = bar_time
        
        # 更新当前 K 线
        self._update_bar(tick)
        return None
    
    def _init_bar(self, bar_time: datetime, tick: Dict) -> Dict:
        return {
            "start": bar_time,
            "open": tick["price"],
            "high": tick["price"],
            "low": tick["price"],
            "close": tick["price"],
            "volume": tick["volume"],
        }
    
    def _update_bar(self, tick: Dict):
        self.current_bar["high"] = max(self.current_bar["high"], tick["price"])
        self.current_bar["low"] = min(self.current_bar["low"], tick["price"])
        self.current_bar["close"] = tick["price"]
        self.current_bar["volume"] += tick["volume"]
    
    def _close_bar(self) -> Dict:
        self.current_bar["end"] = self.current_bar_start + timedelta(seconds=self.interval)
        return self.current_bar.copy()
    
    def flush(self) -> Optional[Dict]:
        """强制输出当前未收盘的 K 线"""
        if self.current_bar is not None:
            return self._close_bar()
        return None

固定时间法的特点

优点 缺点
实现简单,逻辑清晰 可能切断连续的价格波动
不同数据源对齐方便 边界 tick 的归属取决于精确到毫秒的切分
便于跨市场对比 实时计算时,K 线"未完成"状态持续存在

一个关键陷阱:假设某笔成交恰好发生在 09:30:00.000,它属于 9:30 的 K 线还是 9:29 的最后一笔?这取决于数据源的时间戳精度和服务器时钟配置。

2.2 滚动窗口法(Rolling Window)

滚动窗口不依赖固定的时间边界,而是从"第一笔成交"开始计算周期:

窗口 1: [第一笔 tick → 60秒后] → K线 1
窗口 2: [K线1结束后 → 60秒后] → K线 2
...

实现逻辑

from datetime import datetime, timedelta
from collections import deque

class RollingWindowAggregator:
    """滚动窗口 K 线聚合器"""
    
    def __init__(self, window_seconds: int = 60):
        self.window = window_seconds
        self.ticks: deque = deque()  # 存储窗口内的所有 tick
        self.window_start: Optional[datetime] = None
        self.current_bar: Optional[Dict] = None
    
    def ingest_tick(self, tick: Dict) -> List[Dict]:
        """
        摄入 tick,返回期间可能产生的多根 K 线
        """
        tick_time = tick["time"]
        closed_bars = []
        
        # 初始化第一个窗口
        if self.window_start is None:
            self.window_start = tick_time
            self.ticks.append(tick)
            self.current_bar = self._init_bar(tick_time, tick)
            return []
        
        # 判断是否需要开新窗口
        while (tick_time - self.window_start).total_seconds() >= self.window:
            # 关闭当前 K 线
            closed_bar = self._close_current_bar()
            closed_bars.append(closed_bar)
            
            # 开新窗口:从关闭的 K 线的下一笔 tick 开始
            self.window_start = tick_time
            self.ticks.clear()
            self.current_bar = self._init_bar(tick_time, tick)
        
        # 更新当前窗口
        self.ticks.append(tick)
        self._update_bar(tick)
        
        return closed_bars
    
    def _init_bar(self, bar_start: datetime, tick: Dict) -> Dict:
        return {
            "start": bar_start,
            "open": tick["price"],
            "high": tick["price"],
            "low": tick["price"],
            "close": tick["price"],
            "volume": tick["volume"],
        }
    
    def _update_bar(self, tick: Dict):
        self.current_bar["high"] = max(self.current_bar["high"], tick["price"])
        self.current_bar["low"] = min(self.current_bar["low"], tick["price"])
        self.current_bar["close"] = tick["price"]
        self.current_bar["volume"] += tick["volume"]
    
    def _close_current_bar(self) -> Dict:
        self.current_bar["end"] = self.window_start + timedelta(seconds=self.window)
        return self.current_bar.copy()
    
    def flush(self) -> Optional[Dict]:
        if self.current_bar is not None:
            return self._close_current_bar()
        return None

滚动窗口法的特点

优点 缺点
边界对齐更自然 不同数据源产生的 K 线可能不同步
避免"刚好切在高低点"的问题 实现复杂度更高
实时场景下 K 线状态更稳定 跨市场横向对比需要额外处理

2.3 成交量加权法(Volume-Based)

不以时间为维度,而以成交量为切分单位:

K线 1: 前 1000 股的总成交
K线 2: 接下来 1000 股的总成交

实现逻辑

class VolumeBarAggregator:
    """成交量加权 K 线聚合器"""
    
    def __init__(self, target_volume: float = 1000):
        self.target_volume = target_volume
        self.current_bar: Optional[Dict] = None
        self.bar_volume: float = 0
    
    def ingest_tick(self, tick: Dict) -> Optional[Dict]:
        """摄入 tick,返回可能产生的成交量加权 K 线"""
        tick_time = tick["time"]
        price = tick["price"]
        volume = tick["volume"]
        
        # 新 K 线开始
        if self.current_bar is None:
            self.current_bar = self._init_bar(tick_time, price, volume)
            self.bar_volume = volume
            return None
        
        # 累计成交量
        self.bar_volume += volume
        self.current_bar["volume"] = self.bar_volume
        
        # 更新价格信息
        self.current_bar["high"] = max(self.current_bar["high"], price)
        self.current_bar["low"] = min(self.current_bar["low"], price)
        self.current_bar["close"] = price
        
        # 判断是否达到目标成交量
        if self.bar_volume >= self.target_volume:
            closed_bar = self.current_bar.copy()
            closed_bar["end"] = tick_time
            closed_bar["vwap"] = self._calculate_vwap()
            self.current_bar = None
            self.bar_volume = 0
            return closed_bar
        
        return None
    
    def _init_bar(self, bar_start: datetime, price: float, volume: float) -> Dict:
        return {
            "start": bar_start,
            "open": price,
            "high": price,
            "low": price,
            "close": price,
            "volume": volume,
        }
    
    def _calculate_vwap(self) -> float:
        """计算成交量加权平均价格"""
        total_pv = sum(t["price"] * t["volume"] for t in self.ticks)
        return total_pv / self.bar_volume if self.bar_volume > 0 else 0
    
    def flush(self) -> Optional[Dict]:
        if self.current_bar is not None:
            closed_bar = self.current_bar.copy()
            closed_bar["end"] = self.current_bar.get("start")
            return closed_bar
        return None

三、三种规则的核心对比

维度 固定时间法 滚动窗口法 成交量加权法
切分依据 绝对时间边界 相对时间窗口 累积成交量
K 线时长 严格等长 可能不等长 不等长
边界可预测性 事前可知 事后才知 事后才知
跨市场对齐 容易 困难 困难
适合策略类型 均值回归、突破 趋势跟随、事件驱动 订单簿分析、机构策略
实盘复杂度

一个关键洞察:固定时间法的 K 线边界是"硬切"的,而滚动窗口的边界是"软切"的。这个差异在事件驱动策略中尤为关键。

想象一个场景:某重要数据在 09:30:00.500 发布——恰好在 9:30 整点 K 线的边界上:

  • 固定时间法09:30:00.000-09:30:59.999 这根 K 线可能完整记录了数据发布后的全部冲击
  • 滚动窗口法:如果前一根 K 线在 09:29:45.000 结束,那么数据发布会被新窗口的起始 tick 捕获

两种方式记录的都是"真实发生的价格",但它们对策略信号的触发时点完全不同。


四、边界 case:那些让 K 线"失真"的陷阱

4.1 零成交时间窗

市场并非每时每刻都有成交。在低流动性品种或盘前盘后时段,可能出现"空白 K 线":

时间 固定时间法输出 滚动窗口法输出
09:30:00 - 09:30:59 K 线 A(有成交) K 线 A(有成交)
09:31:00 - 09:31:59 K 线 B(无成交) 无输出(不生成空 K 线)
09:32:00 - 09:32:59 K 线 C(有成交) K 线 C(有成交)

处理方式取决于策略需求

  • 技术分析类策略:通常需要填充空 K 线(使用前一根收盘价)
  • 量化因子类策略:可能需要标记"无效 K 线",避免误用

4.2 非标准时间对齐

不同数据源的服务器时钟可能存在毫秒级差异。假设:

  • 数据源 A 的 tick 时间戳:09:30:00.000
  • 数据源 B 的 tick 时间戳:09:30:00.003

即使使用完全相同的聚合规则,两个数据源产生的 K 线也会有微小差异。这种差异在高频策略中可能累积成显著的交易成本。

4.3 历史回测 vs 实盘的边界差异

这是最容易被忽视的问题。

回测时:你通常使用已经"切好"的 K 线数据(来自 TickDB 或其他数据服务),K 线边界已经固定。

实盘时:你需要实时聚合 tick 数据,而实时产生的 K 线边界可能与回测时使用的边界不完全一致。

这意味着:你在回测中验证有效的策略,在实盘时可能因为 K 线边界对齐方式的不同,产生完全不同的信号。

解决方案

  1. 使用统一的数据源和聚合规则进行回测和实盘
  2. 在实盘代码中实现与回测相同的聚合逻辑
  3. 或者,直接使用提供实时 K 线聚合的数据服务

五、TickDB 的 K 线数据处理方式

如果你使用 TickDB 获取 K 线数据,有几个技术细节值得了解:

5.1 时间边界对齐

TickDB 的 /kline 接口返回的 K 线使用固定时间边界,时间戳对齐到 UTC 0 点开始的整分钟/整小时/整日。这确保了不同品种、不同日期的 K 线可以直接横向对比。

例如:获取 AAPL.US 的 1 小时 K 线
返回的时间戳会是 09:00:00, 10:00:00, 11:00:00...
而非 09:00:15, 10:00:22, 11:00:08...

5.2 OHLC 的计算规则

TickDB 的 K 线 OHLC 来自对应周期的聚合数据:

字段 定义
Open 周期内第一笔成交价格
High 周期内最高成交价格
Low 周期内最低成交价格
Close 周期内最后一笔成交价格
Volume 周期内总成交量

注意:这里的"成交价格"指的是撮合价格,与订单簿的报价(bid/ask)不同。如果需要分析订单簿深度的变化,应使用 depth 频道而非 K 线数据。

5.3 获取 K 线数据的代码示例

import os
import requests
import time

class TickDBKlineClient:
    """TickDB K 线数据客户端(生产级)"""
    
    def __init__(self, api_key: str = None):
        self.api_key = api_key or os.environ.get("TICKDB_API_KEY")
        if not self.api_key:
            raise ValueError("请设置 TICKDB_API_KEY 环境变量")
        self.base_url = "https://api.tickdb.ai/v1"
        self.session = requests.Session()
        self.session.headers.update({"X-API-Key": self.api_key})
    
    def get_historical_klines(
        self,
        symbol: str,
        interval: str = "1h",
        limit: int = 100,
        start_time: int = None,
        end_time: int = None
    ) -> list:
        """
        获取历史 K 线数据
        
        Args:
            symbol: 交易品种,如 "BTC.USDT"
            interval: K 线周期,"1m", "5m", "1h", "1d"
            limit: 返回数量上限
            start_time: 起始时间戳(毫秒)
            end_time: 结束时间戳(毫秒)
        
        Returns:
            K 线列表,每根包含 [time, open, high, low, close, volume]
        """
        params = {
            "symbol": symbol,
            "interval": interval,
            "limit": limit,
        }
        if start_time:
            params["start"] = start_time
        if end_time:
            params["end"] = end_time
        
        max_retries = 3
        retry_count = 0
        
        while retry_count < max_retries:
            try:
                response = self.session.get(
                    f"{self.base_url}/market/kline",
                    params=params,
                    timeout=(3.05, 10)  # (connect_timeout, read_timeout)
                )
                
                if response.status_code == 200:
                    data = response.json()
                    if data.get("code") == 0:
                        return data.get("data", [])
                    elif data.get("code") == 3001:
                        # 限频处理
                        retry_after = int(response.headers.get("Retry-After", 5))
                        print(f"触发限频,等待 {retry_after} 秒后重试...")
                        time.sleep(retry_after)
                        retry_count += 1
                        continue
                    else:
                        raise RuntimeError(f"API 错误: {data.get('message')}")
                else:
                    raise RuntimeError(f"HTTP 错误: {response.status_code}")
                    
            except requests.exceptions.Timeout:
                retry_count += 1
                if retry_count >= max_retries:
                    raise RuntimeError("请求超时,已达最大重试次数")
                # 指数退避
                delay = min(2 ** retry_count, 8)
                time.sleep(delay)
                
        raise RuntimeError("获取 K 线数据失败")
    
    def get_latest_kline(self, symbol: str, interval: str = "1h") -> dict:
        """获取当前未收盘的 K 线"""
        response = self.session.get(
            f"{self.base_url}/market/kline/latest",
            params={"symbol": symbol, "interval": interval},
            timeout=(3.05, 10)
        )
        data = response.json()
        if data.get("code") == 0:
            return data.get("data")
        raise RuntimeError(f"获取最新 K 线失败: {data.get('message')}")

六、实战建议:如何选择聚合规则

6.1 按策略类型选择

策略类型 推荐聚合规则 原因
趋势跟随 滚动窗口法 避免边界切在趋势启动点
均值回归 固定时间法 边界可预测,便于计算指标
事件驱动 滚动窗口法 事件不被边界切断
高频剥头皮 成交量加权法 捕捉订单流密度变化
机器学习特征 多种规则对比 特征工程需要多角度采样

6.2 按数据频率选择

数据频率 推荐聚合规则 注意事项
日线级别 固定时间法 时区问题可能影响边界
小时级别 固定时间法 跨交易所数据对齐较容易
分钟级别 滚动窗口法 避免关键时点被切断
秒级/tick级 成交量加权法 需要专业订单簿数据支持

6.3 一个被验证有效的实践

回测与实盘使用同一套聚合逻辑

不要在回测时用一种聚合规则、实盘时用另一种。哪怕你使用的是 TickDB 这类专业数据服务,也建议在本地实现一层聚合封装,确保回测和实盘的行为完全一致。

# 统一封装示例
class UnifiedAggregator:
    """
    统一聚合器:同时支持回测和实盘场景
    回测时传入历史 tick 数据
    实盘时实时摄入 tick
    """
    
    def __init__(self, interval_seconds: int = 60, mode: str = "fixed"):
        """
        Args:
            interval_seconds: K 线周期(秒)
            mode: "fixed" 固定时间 / "rolling" 滚动窗口
        """
        if mode == "fixed":
            self.aggregator = FixedTimeAggregator(interval_seconds)
        else:
            self.aggregator = RollingWindowAggregator(interval_seconds)
    
    def backtest(self, historical_ticks: list) -> list:
        """回测模式:返回完整 K 线序列"""
        bars = []
        for tick in historical_ticks:
            bar = self.aggregator.ingest_tick(tick)
            if bar:
                bars.append(bar)
        last_bar = self.aggregator.flush()
        if last_bar:
            bars.append(last_bar)
        return bars
    
    def live(self, tick: dict) -> Optional[dict]:
        """实盘模式:返回可能收盘的 K 线"""
        return self.aggregator.ingest_tick(tick)

结语

K 线是市场价格的"压缩快照"。理解这个压缩过程,不是为了成为学术专家,而是为了在正确的层级使用正确的数据

当你知道一根 5 分钟 K 线是如何从数百个 tick 中生成,你就能理解:

  • 为什么回测盈利,实盘可能亏损
  • 为什么两个数据源的 K 线会有细微差异
  • 为什么你的止损单"刚好"被扫了

价格是结果,订单簿是原因。K 线,是连接两者的桥梁。


下一步行动

如果你刚接触量化交易
推荐从固定时间法开始,先跑通"获取数据—计算指标—生成信号—回测验证"的完整流程,再根据策略需求考虑其他聚合方式。

如果你在构建实时交易系统
建议在本地实现与回测相同的聚合逻辑,确保回测和实盘的一致性。或者选择 TickDB 这样提供统一数据聚合服务的平台,减少你自己维护聚合逻辑的复杂度。

如果你在研究市场微观结构
建议深入了解 tick 数据本身的特性——成交间隔、成交量分布、主动买卖比例——这些信息在 K 线层面会被压缩丢失,但在 tick 层面保留完整。


本文不构成任何投资建议。市场有风险,投资需谨慎。