价格的"快照":Tick 数据如何变成你看到的 K 线

凌晨三点,你盯着屏幕上的一根 1 分钟 K 线发呆——最高价 152.38,最低价 151.92,开盘和收盘几乎重合,成交量是昨天的三倍。

这根 K 线背后发生了什么?

也许是某家量化基金的算法在 151.95 到 152.20 之间来回扫了 47 笔单,每笔 200 股;也许是某个大户在收盘前集中抛了 8 万股;也许只是买卖盘在 152.00 附近僵持了 30 秒,没有人愿意先动手。

但这些细节,你从 K 线上完全看不到

这不是信息丢失——这是故意的抽象。K 线是一种压缩格式,就像 JPEG 对照片的压缩:保留轮廓,丢弃噪点。但如果你不知道压缩算法是怎么设计的,你可能会把 JPEG 的块效应当成真实纹理。

本文拆解从 tick 到 K 线的完整压缩链条,聚焦三个核心问题:聚合规则如何影响 OHLC 的计算结果?时间边界对齐有什么陷阱?以及你在使用 TickDB 历史 K 线数据时,需要理解哪些底层设计。


一、一切从 tick 开始

在讨论聚合之前,我们先明确输入数据的结构。

一个 tick 是市场上一次成交的最小记录单元。在 TickDB 的 trades 接口中,一条 tick 数据包含:

字段 说明 示例
symbol 交易品种 BTC.USDT
timestamp 成交时间(毫秒 Unix 时间戳) 1714060800123
price 成交价格 71245.50
volume 成交量 0.5234
side 成交方向(buy/sell) buy

这是最底层的原子数据。每一次撮合引擎完成一笔交易,就产生一条 tick。

一个关键事实:tick 数据是无结构的。它们不是"每秒钟恰好有一条",而是"每成交一次就有一条"。在流动性好的标的上,一秒钟可能产生上百条 tick;在流动性差的标的盘后,可能 30 秒才有一条。

这就是第一个需要理解的概念:K 线不是"采样"出来的,而是"聚合"出来的


二、OHLC 的计算逻辑:为什么总是这四个价格

K 线最常见的形态是 OHLC——Open(开盘价)、High(最高价)、Low(最低价)、Close(收盘价)。

你可能觉得这是约定俗成,但实际上这四个价格有明确的统计意义:

  • Open:周期起始时刻的价格。代表市场在"起点"的共识。
  • High:周期内的最大值。代表多方在这一时段能达到的最高进攻水位。
  • Low:周期内的最小值。代表空方在这一时段能达到的最高进攻水位。
  • Close:周期结束时刻的价格。代表市场在"终点"的共识。

这四个值能完整描述一个周期内的价格分布区间,同时保留起点和终点的状态。如果只给你最高价和最低价,你知道范围但不知道方向;如果只给你开盘和收盘,你知道起止但不知道日内振幅。

数学上,OHLC 是对一组价格序列 {p₁, p₂, ..., pₙ} 的四阶矩压缩:

Open  = p₁
Close = pₙ
High  = max(p₁, p₂, ..., pₙ)
Low   = min(p₁, p₂, ..., pₙ)

n 是该周期内的 tick 总数。


三、时间边界对齐:聚合规则的核心分歧

现在我们知道了输入(tick)和输出(OHLC),但还缺一个关键环节:时间边界在哪里?

这是最容易出错的地方。

3.1 三种主流对齐策略

对齐策略 规则 优点 缺点
固定时间边界 每 N 分钟从整点/整分开始 简单、对齐、可拼接 可能切断跨边界事件
浮动时间边界 从第一笔 tick 到达时开始计时 不丢失边界事件 难以跨周期拼接
交易所对齐 按交易所公布的交易时段切分 与交易所数据完全一致 不同时区处理复杂

TickDB 使用的是固定时间边界——以 UTC 时间为准,按固定周期对齐。

这个设计选择带来的直接影响:跨边界的成交会被切分

举例:假设有一笔大单在 09:00:45 触发,成交价从 150.10 扫到 150.25。如果这个大单恰好跨越了 09:00 和 09:01 的分钟边界,它会被切成两部分:一部分计入 09:00 这根 K 线,一部分计入 09:01 这根 K 线。

对于量化策略来说,这意味着:

  • 日内短线策略可能在边界附近出现"虚假信号"
  • 大单拆分信号可能无法被单根 K 线完整捕获

3.2 时区陷阱:夏令时的幽灵

这是另一个高频踩坑点。

美国市场使用东部时间(ET),在夏令时(DST)期间是 UTC-4,非夏令时是 UTC-5。但交易所的 tick 数据内部以UTC 存储,K 线聚合时也以 UTC 边界为基准。

如果你直接用"09:30 美东时间开盘"去匹配 TickDB 的数据,你会发现:

  • 夏令时期间:09:30 ET = 13:30 UTC
  • 非夏令时期间:09:30 ET = 14:30 UTC

如果你在代码里硬编码 09:30 作为起始时间,在非夏令时段,你的"开盘 K 线"实际上是从 14:30 UTC 开始的。

TickDB 的 K 线数据统一以 UTC 时间存储和返回。 使用时需要自行转换时区:

from datetime import datetime
import pytz

def utc_to_et(utc_timestamp_ms: int) -> datetime:
    """将 UTC 毫秒时间戳转换为美东时间"""
    utc_dt = datetime.fromtimestamp(utc_timestamp_ms / 1000, tz=pytz.UTC)
    et_tz = pytz.timezone('US/Eastern')
    return utc_dt.astimezone(et_tz)

def et_to_utc(et_dt: datetime) -> int:
    """将美东时间 datetime 转换为 UTC 毫秒时间戳"""
    et_tz = pytz.timezone('US/Eastern')
    et_aware = et_tz.localize(et_dt)
    return int(et_aware.timestamp() * 1000)

四、tick 聚合算法:从原理到代码

理解了时间边界问题后,我们来实现一个生产级的 tick 聚合器。

4.1 基础版本:固定周期聚合

import os
import time
import json
import random
import threading
from collections import defaultdict
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Optional
import requests

# ============================================================
# TickDB 配置
# ============================================================
API_KEY = os.environ.get("TICKDB_API_KEY")
BASE_URL = "https://api.tickdb.ai"

# ⚠️ TickDB trades 接口不支持美股和 A 股,此处使用数字货币示例
SYMBOL = "BTC.USDT"

@dataclass
class OHLC:
    """K 线数据模型"""
    symbol: str
    open: float
    high: float
    low: float
    close: float
    volume: float
    timestamp: int  # K 线周期起始时间(UTC 毫秒)
    tick_count: int = 0
    
    def to_dict(self):
        return {
            "symbol": self.symbol,
            "open": self.open,
            "high": self.high,
            "low": self.low,
            "close": self.close,
            "volume": self.volume,
            "timestamp": self.timestamp,
            "tick_count": self.tick_count
        }


class TickAggregator:
    """
    Tick 到 K 线的聚合器
    
    设计要点:
    1. 固定时间边界(UTC)
    2. 非阻塞聚合(用于实盘)
    3. 指数退避重连
    """
    
    def __init__(self, symbol: str, interval_seconds: int = 60):
        self.symbol = symbol
        self.interval_ms = interval_seconds * 1000
        self.ws: Optional[requests.Session] = None
        self.running = False
        self._lock = threading.Lock()
        
        # 当前周期的聚合状态
        self._current_ohlc: Optional[OHLC] = None
        self._pending_ticks: list = []
        
        # 重连状态
        self._retry_count = 0
        self._max_retries = 10
        self._base_delay = 1.0
        self._max_delay = 60.0
    
    def _get_period_start(self, timestamp_ms: int) -> int:
        """计算时间戳所属周期的起始时间(UTC)"""
        return (timestamp_ms // self.interval_ms) * self.interval_ms
    
    def _process_tick(self, tick: dict):
        """处理单条 tick 数据"""
        tick_time = tick["timestamp"]
        tick_price = float(tick["price"])
        tick_volume = float(tick["volume"])
        
        period_start = self._get_period_start(tick_time)
        
        with self._lock:
            # 检查是否进入新周期
            if self._current_ohlc is None or period_start > self._current_ohlc.timestamp:
                # 保存当前 K 线(收盘)
                if self._current_ohlc is not None:
                    self._finalize_ohlc()
                
                # 创建新 K 线
                self._current_ohlc = OHLC(
                    symbol=self.symbol,
                    open=tick_price,
                    high=tick_price,
                    low=tick_price,
                    close=tick_price,
                    volume=tick_volume,
                    timestamp=period_start,
                    tick_count=1
                )
            else:
                # 更新当前 K 线
                self._current_ohlc.high = max(self._current_ohlc.high, tick_price)
                self._current_ohlc.low = min(self._current_ohlc.low, tick_price)
                self._current_ohlc.close = tick_price
                self._current_ohlc.volume += tick_volume
                self._current_ohlc.tick_count += 1
    
    def _finalize_ohlc(self):
        """输出完成的 K 线"""
        if self._current_ohlc:
            ohlc = self._current_ohlc
            dt = datetime.fromtimestamp(ohlc.timestamp / 1000, tz=timezone.utc)
            print(f"[{dt.isoformat()}] OHLC: O={ohlc.open:.2f} H={ohlc.high:.2f} "
                  f"L={ohlc.low:.2f} C={ohlc.close:.2f} V={ohlc.volume:.4f} "
                  f"Ticks={ohlc.tick_count}")
    
    def start(self):
        """启动聚合器(示例使用 REST polling,生产环境应使用 WebSocket)"""
        self.running = True
        self._retry_count = 0
        
        print(f"Starting aggregator for {self.symbol}, interval={self.interval_ms}ms")
        
        # ⚠️ 生产环境应使用 WebSocket,此处演示 REST polling 模式
        headers = {"X-API-Key": API_KEY}
        
        while self.running:
            try:
                # 获取最近成交记录
                # 注意:trades 接口的数据范围和精度可能受限
                response = requests.get(
                    f"{BASE_URL}/v1/market/trades",
                    headers=headers,
                    params={
                        "symbol": self.symbol,
                        "limit": 100
                    },
                    timeout=(3.05, 10)
                )
                
                if response.status_code == 429:
                    # 限频处理
                    retry_after = int(response.headers.get("Retry-After", 5))
                    print(f"Rate limited, waiting {retry_after}s")
                    time.sleep(retry_after)
                    continue
                
                response.raise_for_status()
                data = response.json()
                
                if data.get("code") == 0:
                    trades = data.get("data", [])
                    for tick in trades:
                        self._process_tick(tick)
                    
                    self._retry_count = 0  # 重置重试计数
                else:
                    print(f"API error: {data}")
                
                time.sleep(1)  # Polling 间隔
                
            except requests.exceptions.Timeout:
                print("Request timeout, retrying...")
                self._handle_reconnect()
            except requests.exceptions.RequestException as e:
                print(f"Connection error: {e}")
                self._handle_reconnect()
    
    def _handle_reconnect(self):
        """指数退避重连 + 抖动"""
        self._retry_count += 1
        if self._retry_count > self._max_retries:
            print("Max retries exceeded, stopping.")
            self.running = False
            return
        
        delay = min(self._base_delay * (2 ** self._retry_count), self._max_delay)
        jitter = random.uniform(0, delay * 0.1)  # 10% 抖动
        sleep_time = delay + jitter
        
        print(f"Reconnecting in {sleep_time:.2f}s (attempt {self._retry_count})")
        time.sleep(sleep_time)
    
    def stop(self):
        """停止聚合器"""
        self.running = False
        with self._lock:
            if self._current_ohlc:
                self._finalize_ohlc()
        print("Aggregator stopped.")


# 使用示例
if __name__ == "__main__":
    if not API_KEY:
        print("Warning: TICKDB_API_KEY not set")
    
    aggregator = TickAggregator(symbol=SYMBOL, interval_seconds=60)
    
    try:
        aggregator.start()
    except KeyboardInterrupt:
        aggregator.stop()

4.2 核心逻辑拆解

上述代码展示了 tick 聚合的三个关键步骤:

第一步:时间边界判断

def _get_period_start(self, timestamp_ms: int) -> int:
    return (timestamp_ms // self.interval_ms) * self.interval_ms

这是固定时间边界对齐的数学表达。通过整数除法,将任意时间戳映射到当前周期的起始点。

第二步:状态更新

# 更新当前 K 线
self._current_ohlc.high = max(self._current_ohlc.high, tick_price)
self._current_ohlc.low = min(self._current_ohlc.low, tick_price)
self._current_ohlc.close = tick_price

注意 close 的更新策略:每次 tick 都更新为最新价格。这样当周期结束时,close 就是最后一笔成交价。

第三步:周期切换检测

if self._current_ohlc is None or period_start > self._current_ohlc.timestamp:
    # 保存当前 K 线,创建新 K 线

当新的 tick 到达时,首先检查它的周期起始时间是否大于当前 K 线的周期起始时间。如果是,说明进入了新周期,需要"关闭"旧 K 线并创建新 K 线。


五、成交量加权的变体:VWAP 不是简单求平均

除了标准的 OHLC,还有一个常见的 K 线变体:VWAP K 线(成交量加权平均价 K 线)。

在一些专业工具中,你可能会看到"K 线带有 VWAP"或"VWAP 加权 K 线"。这不是简单地用 (O+H+L+C)/4,而是按成交量加权的均价:

VWAP = Σ(price_i × volume_i) / Σ(volume_i)

这意味着大单成交价对 VWAP 的贡献更大。

一个容易混淆的点:VWAP 是一个时间段的概念,不是"每一 tick 都计算一次 VWAP"。通常是在周期结束时,用周期内的总成交量加权均价作为 close 或额外字段。

如果你需要实现带 VWAP 的 K 线聚合器:

def _update_with_vwap(self, tick_price: float, tick_volume: float):
    """更新带 VWAP 的 K 线状态"""
    if self._current_ohlc is None:
        return
    
    # 维护加权价格和
    if not hasattr(self, '_vwap_sum'):
        self._vwap_sum = 0.0
        self._volume_sum = 0.0
    
    self._vwap_sum += tick_price * tick_volume
    self._volume_sum += tick_volume
    
    # VWAP 计算
    if self._volume_sum > 0:
        self._current_ohlc.vwap = self._vwap_sum / self._volume_sum

六、TickDB 历史 K 线数据的对齐策略

前面讨论的是实盘聚合——你从 tick 数据实时生成 K 线。但在回测和历史分析场景中,你通常直接使用 TickDB 的 /v1/market/kline 接口获取已聚合好的 K 线。

这时候你不需要自己写聚合器,但你需要理解 TickDB 的聚合策略,以便正确解读数据。

6.1 TickDB 的 K 线聚合原则

设计决策 TickDB 选择 含义
时间基准 UTC 所有时间戳均为 UTC
对齐方式 固定时间边界 周期从整周期开始(如 09:00:00 UTC)
缺失周期 返回空 K 线或跳过 取决于市场是否有成交
精度 毫秒 timestamp 精确到毫秒

6.2 调用示例

import requests
import os

API_KEY = os.environ.get("TICKDB_API_KEY")
headers = {"X-API-Key": API_KEY}

# 获取 BTC 1 小时 K 线
response = requests.get(
    "https://api.tickdb.ai/v1/market/kline",
    headers=headers,
    params={
        "symbol": "BTC.USDT",
        "interval": "1h",
        "start_time": 1714057200000,  # 2024-04-25 15:00 UTC
        "end_time": 1714143600000,    # 2024-04-26 15:00 UTC
        "limit": 100
    },
    timeout=(3.05, 10)
)

data = response.json()
if data.get("code") == 0:
    klines = data.get("data", [])
    for k in klines:
        dt = datetime.fromtimestamp(k["timestamp"] / 1000, tz=timezone.utc)
        print(f"{dt.isoformat()} | O={k['open']:.2f} H={k['high']:.2f} "
              f"L={k['low']:.2f} C={k['close']:.2f} V={k['volume']:.2f}")

6.3 获取当前 K 线(未收盘)

对于实时分析场景,你可能需要获取当前正在形成的 K 线(未收盘)。TickDB 提供 /kline/latest 接口:

# ⚠️ 注意:/kline/latest 获取的是**当前已收盘的最近一根 K 线**
# 不是"当前未收盘的 K 线"
# 如果需要实时未收盘 K 线,需要使用 WebSocket 订阅实时数据

response = requests.get(
    "https://api.tickdb.ai/v1/market/kline/latest",
    headers=headers,
    params={
        "symbol": "BTC.USDT",
        "interval": "1h"
    },
    timeout=(3.05, 10)
)

重要区分

接口 用途 返回内容
/v1/market/kline 历史回测 已收盘的 K 线
/v1/market/kline/latest 获取最近已收盘 K 线 最近一根已收盘 K 线
WebSocket kline 频道 实时监控 实时推送,包含当前未收盘 K 线

七、实盘中的非标准周期与特殊规则

除了标准的 1m/5m/1h/1d,量化策略有时会使用非标准周期(如 93 秒、4 小时 27 分)。这些周期的 K 线怎么生成?

TickDB 对非标准周期的支持:标准周期(如 1m、1h、1d)有原生支持;非标准周期需要客户端自行聚合

这意味着:

  • 如果你用 93 秒周期,TickDB 不会帮你切分
  • 你需要自己写聚合器(参考第四节代码),用 trades 数据实时聚合
  • 或者用 depth 数据做非标准周期的订单簿快照

7.1 非标准周期的对齐陷阱

使用非标准周期时,时间边界对齐变得更加敏感。

假设你用 93 秒周期:

  • 第 1 根 K 线:从 T₀ 开始,到 T₀+92,999ms 结束
  • 第 2 根 K 线:从 T₀+93,000ms 开始,到 T₀+185,999ms 结束

如果你和其他数据源(比如另一个供应商)的"第 2 根 K 线"对比,发现数据不一致,可能的原因是:

  1. 起始时间不同:对方从 T₀+1ms 开始,你从 T₀ 开始
  2. 周期长度不同:对方用 93.5 秒,你用 93 秒
  3. 时区不同:UTC vs 本地时间

建议:在协作或对比数据时,明确约定三个参数:

  • 起始基准时间(Unix timestamp)
  • 周期长度(毫秒)
  • 时区(UTC)

八、回测中的 K 线重建问题

在回测场景中,一个常见问题是:我只有历史 K 线,没有 tick 数据,能否重建 tick 级别的信号?

答案:不能完全重建,但可以做近似估计

8.1 K 线 → tick 的信息损失

从数学上看,OHLC 是对 tick 价格序列 {p₁, ..., pₙ}有损压缩

  • 你知道 max(p₁, ..., pₙ)min(p₁, ..., pₙ)
  • 你不知道 p₁pₙn 的具体值
  • 你完全不知道序列的顺序和分布

举例:假设一根 K 线是 O=100, H=105, L=98, C=102, V=10000。

可能的 tick 序列 A

p₁=100, p₂=105, p₃=98, p₄=102, 每笔100股,250笔

可能的 tick 序列 B

p₁=105, p₂=100, p₃=102, p₄=98, 每笔2500股,4笔

两种序列的 OHLC 和成交量完全相同,但:

  • 序列 A 代表"多空反复拉锯"(高频率、小单)
  • 序列 B 代表"一次大幅波动后横盘"(低频率、大单)

如果你用这根 K 线做回测,用序列 A 的假设和用序列 B 的假设,信号可能完全不同

8.2 近似重建策略

如果你的策略必须用 K 线数据,可以考虑以下近似方法:

def estimate_tick_distribution(ohlc: dict, num_ticks_hint: int = None) -> list:
    """
    基于 OHLC 近似估计 tick 序列(用于回测场景的信号生成)
    
    ⚠️ 这是近似方法,不代表真实 tick 序列
    ⚠️ 高频策略不建议依赖此方法
    """
    prices = []
    
    # 添加开盘价
    prices.append(ohlc["open"])
    
    # 添加随机中间价格(假设波动均匀分布)
    if num_ticks_hint and num_ticks_hint > 2:
        for _ in range(num_ticks_hint - 2):
            # 在 H 和 L 之间均匀采样
            mid = (ohlc["high"] + ohlc["low"]) / 2
            half_range = (ohlc["high"] - ohlc["low"]) / 2
            sampled = mid + random.uniform(-half_range, half_range)
            prices.append(round(sampled, 2))
    
    # 添加收盘价
    prices.append(ohlc["close"])
    
    return prices

再次强调:这种方法只适用于信号逻辑的粗略回测。任何依赖 tick 级别精确性的策略(如订单流分析、大单检测)都必须使用 tick 数据。


结语:K 线是工具,不是真相

回到开篇的问题:一根 1 分钟 K 线背后发生了什么?

现在你知道了答案:它发生了一切,但最终被压缩成四个数字

这个压缩过程是有意义的——它去除了高频噪点,暴露了结构性趋势。但它也引入了新的风险:你会把压缩算法的伪影当成市场信号。

三个核心认知

  1. K 线是聚合结果,不是采样结果。不同聚合规则(时间边界、对齐策略)会产生不同的 K 线。
  2. OHLC 丢失了大量信息。用 K 线做信号时,你需要意识到你丢失了什么。
  3. TickDB 的 K 线以 UTC 为基准、固定时间边界对齐。使用时注意时区转换和非标准周期的客户端聚合。

理解 K 线生成机制,不是为了绕过它,而是为了在它的约束下设计更稳健的策略


下一步行动

如果你在构建回测系统

  1. 确认你的 K 线数据源使用何种对齐策略
  2. 测试边界条件(如收盘时间、夏令时切换)
  3. 在回测报告中注明使用的 K 线周期和对齐方式

如果你需要 tick 级别的精确数据

  • 访问 tickdb.ai 了解 tradesdepth 接口
  • 机构级用户可联系 [email protected] 获取完整数据方案

如果你习惯用 AI 辅助开发

  • 在 ClawHub 安装 tickdb-market-data SKILL,可直接用自然语言查询 TickDB 数据接口

风险提示:本文不构成任何投资建议。K 线聚合仅是数据处理概念,与任何特定投资策略的有效性无关。市场有风险,投资需谨慎。