从 tick 到 K 线:价格是如何被"压缩"的
你盯着屏幕上跳动的数字——每一笔成交的单子,都在改变着市场的轨迹。但当你打开交易终端,看到的却是一根根整齐的 K 线:开盘价、最高价、最低价、收盘价。
这中间发生了什么?
那些毫秒级别的微观事件,如何被压缩成你每天看到的"一根线"?为什么同一个市场、同一个时间段,你和他画出来的 K 线可能不一样?
这不是一个无聊的技术细节。聚合规则的选择,直接决定了你的策略信号从哪里开始、到哪里结束。 一个在"标准"K 线上看起来完美的策略,可能因为边界对齐方式的不同,在实盘中亏损。
本文拆解这个被大多数教程一笔带过的底层问题。
一、问题的本质:信息压缩的代价
1.1 tick 数据的原始形态
每一笔成交 tick,本质上是一个五元组:
(time, price, volume, side, venue)
- time:精确到毫秒甚至微秒的时间戳
- price:成交价格
- volume:成交量
- side:主动买入还是主动卖出(撮合方向)
- venue:交易所代码
一个活跃股票每天产生数十万到数百万条 tick。处理全量 tick 数据需要相当的算力和存储成本。
K 线,是对这个信息流的压缩。
1.2 压缩的数学本质
从 N 个 tick 聚合到 1 根 K 线,是一个信息损失过程:
| 原始数据 | 压缩后 |
|---|---|
| N 个精确时间戳 | 1 个时间边界 |
| N 个价格 | 4 个价格(OHLC) |
| N 个成交量 | 1 个成交量 |
| N 个方向信息 | 全部丢弃 |
不同的聚合规则,决定了"损失哪些信息、保留哪些信息"。
这就像把一部高清电影压缩成一帧截图——你选择截哪一帧,决定了观众看到什么。
二、三种主流聚合规则:技术细节与陷阱
2.1 固定时间法(Fixed Time Boundaries)
这是最常见的聚合方式。将时间轴按固定周期切分:
Period 1: [00:00.000 - 00:59.999] → K线 1
Period 2: [01:00.000 - 01:59.999] → K线 2
...
实现逻辑:
from datetime import datetime, timedelta
from typing import List, Dict, Optional
class FixedTimeAggregator:
"""固定时间边界 K 线聚合器"""
def __init__(self, interval_seconds: int = 60):
self.interval = interval_seconds
self.current_bar: Optional[Dict] = None
self.current_bar_start: Optional[datetime] = None
def _get_bar_time(self, tick_time: datetime) -> datetime:
"""计算 tick 所属的 K 线起始时间"""
epoch = datetime(1970, 1, 1)
total_seconds = (tick_time - epoch).total_seconds()
bar_seconds = int(total_seconds // self.interval) * self.interval
return epoch + timedelta(seconds=bar_seconds)
def ingest_tick(self, tick: Dict) -> Optional[Dict]:
"""
摄入单个 tick,返回收盘的 K 线(如果有)
Args:
tick: {"time": datetime, "price": float, "volume": float}
Returns:
如果当前 K 线结束,返回 K 线字典;否则返回 None
"""
tick_time = tick["time"]
bar_time = self._get_bar_time(tick_time)
# 新 K 线开始
if self.current_bar is None or bar_time > self.current_bar_start:
# 返回上一个 K 线
if self.current_bar is not None:
finished_bar = self._close_bar()
self.current_bar = self._init_bar(bar_time, tick)
return finished_bar
else:
self.current_bar = self._init_bar(bar_time, tick)
self.current_bar_start = bar_time
# 更新当前 K 线
self._update_bar(tick)
return None
def _init_bar(self, bar_time: datetime, tick: Dict) -> Dict:
return {
"start": bar_time,
"open": tick["price"],
"high": tick["price"],
"low": tick["price"],
"close": tick["price"],
"volume": tick["volume"],
}
def _update_bar(self, tick: Dict):
self.current_bar["high"] = max(self.current_bar["high"], tick["price"])
self.current_bar["low"] = min(self.current_bar["low"], tick["price"])
self.current_bar["close"] = tick["price"]
self.current_bar["volume"] += tick["volume"]
def _close_bar(self) -> Dict:
self.current_bar["end"] = self.current_bar_start + timedelta(seconds=self.interval)
return self.current_bar.copy()
def flush(self) -> Optional[Dict]:
"""强制输出当前未收盘的 K 线"""
if self.current_bar is not None:
return self._close_bar()
return None
固定时间法的特点:
| 优点 | 缺点 |
|---|---|
| 实现简单,逻辑清晰 | 可能切断连续的价格波动 |
| 不同数据源对齐方便 | 边界 tick 的归属取决于精确到毫秒的切分 |
| 便于跨市场对比 | 实时计算时,K 线"未完成"状态持续存在 |
一个关键陷阱:假设某笔成交恰好发生在 09:30:00.000,它属于 9:30 的 K 线还是 9:29 的最后一笔?这取决于数据源的时间戳精度和服务器时钟配置。
2.2 滚动窗口法(Rolling Window)
滚动窗口不依赖固定的时间边界,而是从"第一笔成交"开始计算周期:
窗口 1: [第一笔 tick → 60秒后] → K线 1
窗口 2: [K线1结束后 → 60秒后] → K线 2
...
实现逻辑:
from datetime import datetime, timedelta
from collections import deque
class RollingWindowAggregator:
"""滚动窗口 K 线聚合器"""
def __init__(self, window_seconds: int = 60):
self.window = window_seconds
self.ticks: deque = deque() # 存储窗口内的所有 tick
self.window_start: Optional[datetime] = None
self.current_bar: Optional[Dict] = None
def ingest_tick(self, tick: Dict) -> List[Dict]:
"""
摄入 tick,返回期间可能产生的多根 K 线
"""
tick_time = tick["time"]
closed_bars = []
# 初始化第一个窗口
if self.window_start is None:
self.window_start = tick_time
self.ticks.append(tick)
self.current_bar = self._init_bar(tick_time, tick)
return []
# 判断是否需要开新窗口
while (tick_time - self.window_start).total_seconds() >= self.window:
# 关闭当前 K 线
closed_bar = self._close_current_bar()
closed_bars.append(closed_bar)
# 开新窗口:从关闭的 K 线的下一笔 tick 开始
self.window_start = tick_time
self.ticks.clear()
self.current_bar = self._init_bar(tick_time, tick)
# 更新当前窗口
self.ticks.append(tick)
self._update_bar(tick)
return closed_bars
def _init_bar(self, bar_start: datetime, tick: Dict) -> Dict:
return {
"start": bar_start,
"open": tick["price"],
"high": tick["price"],
"low": tick["price"],
"close": tick["price"],
"volume": tick["volume"],
}
def _update_bar(self, tick: Dict):
self.current_bar["high"] = max(self.current_bar["high"], tick["price"])
self.current_bar["low"] = min(self.current_bar["low"], tick["price"])
self.current_bar["close"] = tick["price"]
self.current_bar["volume"] += tick["volume"]
def _close_current_bar(self) -> Dict:
self.current_bar["end"] = self.window_start + timedelta(seconds=self.window)
return self.current_bar.copy()
def flush(self) -> Optional[Dict]:
if self.current_bar is not None:
return self._close_current_bar()
return None
滚动窗口法的特点:
| 优点 | 缺点 |
|---|---|
| 边界对齐更自然 | 不同数据源产生的 K 线可能不同步 |
| 避免"刚好切在高低点"的问题 | 实现复杂度更高 |
| 实时场景下 K 线状态更稳定 | 跨市场横向对比需要额外处理 |
2.3 成交量加权法(Volume-Based)
不以时间为维度,而以成交量为切分单位:
K线 1: 前 1000 股的总成交
K线 2: 接下来 1000 股的总成交
实现逻辑:
class VolumeBarAggregator:
"""成交量加权 K 线聚合器"""
def __init__(self, target_volume: float = 1000):
self.target_volume = target_volume
self.current_bar: Optional[Dict] = None
self.bar_volume: float = 0
def ingest_tick(self, tick: Dict) -> Optional[Dict]:
"""摄入 tick,返回可能产生的成交量加权 K 线"""
tick_time = tick["time"]
price = tick["price"]
volume = tick["volume"]
# 新 K 线开始
if self.current_bar is None:
self.current_bar = self._init_bar(tick_time, price, volume)
self.bar_volume = volume
return None
# 累计成交量
self.bar_volume += volume
self.current_bar["volume"] = self.bar_volume
# 更新价格信息
self.current_bar["high"] = max(self.current_bar["high"], price)
self.current_bar["low"] = min(self.current_bar["low"], price)
self.current_bar["close"] = price
# 判断是否达到目标成交量
if self.bar_volume >= self.target_volume:
closed_bar = self.current_bar.copy()
closed_bar["end"] = tick_time
closed_bar["vwap"] = self._calculate_vwap()
self.current_bar = None
self.bar_volume = 0
return closed_bar
return None
def _init_bar(self, bar_start: datetime, price: float, volume: float) -> Dict:
return {
"start": bar_start,
"open": price,
"high": price,
"low": price,
"close": price,
"volume": volume,
}
def _calculate_vwap(self) -> float:
"""计算成交量加权平均价格"""
total_pv = sum(t["price"] * t["volume"] for t in self.ticks)
return total_pv / self.bar_volume if self.bar_volume > 0 else 0
def flush(self) -> Optional[Dict]:
if self.current_bar is not None:
closed_bar = self.current_bar.copy()
closed_bar["end"] = self.current_bar.get("start")
return closed_bar
return None
三、三种规则的核心对比
| 维度 | 固定时间法 | 滚动窗口法 | 成交量加权法 |
|---|---|---|---|
| 切分依据 | 绝对时间边界 | 相对时间窗口 | 累积成交量 |
| K 线时长 | 严格等长 | 可能不等长 | 不等长 |
| 边界可预测性 | 事前可知 | 事后才知 | 事后才知 |
| 跨市场对齐 | 容易 | 困难 | 困难 |
| 适合策略类型 | 均值回归、突破 | 趋势跟随、事件驱动 | 订单簿分析、机构策略 |
| 实盘复杂度 | 低 | 中 | 高 |
一个关键洞察:固定时间法的 K 线边界是"硬切"的,而滚动窗口的边界是"软切"的。这个差异在事件驱动策略中尤为关键。
想象一个场景:某重要数据在 09:30:00.500 发布——恰好在 9:30 整点 K 线的边界上:
- 固定时间法:
09:30:00.000-09:30:59.999这根 K 线可能完整记录了数据发布后的全部冲击 - 滚动窗口法:如果前一根 K 线在
09:29:45.000结束,那么数据发布会被新窗口的起始 tick 捕获
两种方式记录的都是"真实发生的价格",但它们对策略信号的触发时点完全不同。
四、边界 case:那些让 K 线"失真"的陷阱
4.1 零成交时间窗
市场并非每时每刻都有成交。在低流动性品种或盘前盘后时段,可能出现"空白 K 线":
| 时间 | 固定时间法输出 | 滚动窗口法输出 |
|---|---|---|
| 09:30:00 - 09:30:59 | K 线 A(有成交) | K 线 A(有成交) |
| 09:31:00 - 09:31:59 | K 线 B(无成交) | 无输出(不生成空 K 线) |
| 09:32:00 - 09:32:59 | K 线 C(有成交) | K 线 C(有成交) |
处理方式取决于策略需求:
- 技术分析类策略:通常需要填充空 K 线(使用前一根收盘价)
- 量化因子类策略:可能需要标记"无效 K 线",避免误用
4.2 非标准时间对齐
不同数据源的服务器时钟可能存在毫秒级差异。假设:
- 数据源 A 的 tick 时间戳:
09:30:00.000 - 数据源 B 的 tick 时间戳:
09:30:00.003
即使使用完全相同的聚合规则,两个数据源产生的 K 线也会有微小差异。这种差异在高频策略中可能累积成显著的交易成本。
4.3 历史回测 vs 实盘的边界差异
这是最容易被忽视的问题。
回测时:你通常使用已经"切好"的 K 线数据(来自 TickDB 或其他数据服务),K 线边界已经固定。
实盘时:你需要实时聚合 tick 数据,而实时产生的 K 线边界可能与回测时使用的边界不完全一致。
这意味着:你在回测中验证有效的策略,在实盘时可能因为 K 线边界对齐方式的不同,产生完全不同的信号。
解决方案:
- 使用统一的数据源和聚合规则进行回测和实盘
- 在实盘代码中实现与回测相同的聚合逻辑
- 或者,直接使用提供实时 K 线聚合的数据服务
五、TickDB 的 K 线数据处理方式
如果你使用 TickDB 获取 K 线数据,有几个技术细节值得了解:
5.1 时间边界对齐
TickDB 的 /kline 接口返回的 K 线使用固定时间边界,时间戳对齐到 UTC 0 点开始的整分钟/整小时/整日。这确保了不同品种、不同日期的 K 线可以直接横向对比。
例如:获取 AAPL.US 的 1 小时 K 线
返回的时间戳会是 09:00:00, 10:00:00, 11:00:00...
而非 09:00:15, 10:00:22, 11:00:08...
5.2 OHLC 的计算规则
TickDB 的 K 线 OHLC 来自对应周期的聚合数据:
| 字段 | 定义 |
|---|---|
| Open | 周期内第一笔成交价格 |
| High | 周期内最高成交价格 |
| Low | 周期内最低成交价格 |
| Close | 周期内最后一笔成交价格 |
| Volume | 周期内总成交量 |
注意:这里的"成交价格"指的是撮合价格,与订单簿的报价(bid/ask)不同。如果需要分析订单簿深度的变化,应使用 depth 频道而非 K 线数据。
5.3 获取 K 线数据的代码示例
import os
import requests
import time
class TickDBKlineClient:
"""TickDB K 线数据客户端(生产级)"""
def __init__(self, api_key: str = None):
self.api_key = api_key or os.environ.get("TICKDB_API_KEY")
if not self.api_key:
raise ValueError("请设置 TICKDB_API_KEY 环境变量")
self.base_url = "https://api.tickdb.ai/v1"
self.session = requests.Session()
self.session.headers.update({"X-API-Key": self.api_key})
def get_historical_klines(
self,
symbol: str,
interval: str = "1h",
limit: int = 100,
start_time: int = None,
end_time: int = None
) -> list:
"""
获取历史 K 线数据
Args:
symbol: 交易品种,如 "BTC.USDT"
interval: K 线周期,"1m", "5m", "1h", "1d"
limit: 返回数量上限
start_time: 起始时间戳(毫秒)
end_time: 结束时间戳(毫秒)
Returns:
K 线列表,每根包含 [time, open, high, low, close, volume]
"""
params = {
"symbol": symbol,
"interval": interval,
"limit": limit,
}
if start_time:
params["start"] = start_time
if end_time:
params["end"] = end_time
max_retries = 3
retry_count = 0
while retry_count < max_retries:
try:
response = self.session.get(
f"{self.base_url}/market/kline",
params=params,
timeout=(3.05, 10) # (connect_timeout, read_timeout)
)
if response.status_code == 200:
data = response.json()
if data.get("code") == 0:
return data.get("data", [])
elif data.get("code") == 3001:
# 限频处理
retry_after = int(response.headers.get("Retry-After", 5))
print(f"触发限频,等待 {retry_after} 秒后重试...")
time.sleep(retry_after)
retry_count += 1
continue
else:
raise RuntimeError(f"API 错误: {data.get('message')}")
else:
raise RuntimeError(f"HTTP 错误: {response.status_code}")
except requests.exceptions.Timeout:
retry_count += 1
if retry_count >= max_retries:
raise RuntimeError("请求超时,已达最大重试次数")
# 指数退避
delay = min(2 ** retry_count, 8)
time.sleep(delay)
raise RuntimeError("获取 K 线数据失败")
def get_latest_kline(self, symbol: str, interval: str = "1h") -> dict:
"""获取当前未收盘的 K 线"""
response = self.session.get(
f"{self.base_url}/market/kline/latest",
params={"symbol": symbol, "interval": interval},
timeout=(3.05, 10)
)
data = response.json()
if data.get("code") == 0:
return data.get("data")
raise RuntimeError(f"获取最新 K 线失败: {data.get('message')}")
六、实战建议:如何选择聚合规则
6.1 按策略类型选择
| 策略类型 | 推荐聚合规则 | 原因 |
|---|---|---|
| 趋势跟随 | 滚动窗口法 | 避免边界切在趋势启动点 |
| 均值回归 | 固定时间法 | 边界可预测,便于计算指标 |
| 事件驱动 | 滚动窗口法 | 事件不被边界切断 |
| 高频剥头皮 | 成交量加权法 | 捕捉订单流密度变化 |
| 机器学习特征 | 多种规则对比 | 特征工程需要多角度采样 |
6.2 按数据频率选择
| 数据频率 | 推荐聚合规则 | 注意事项 |
|---|---|---|
| 日线级别 | 固定时间法 | 时区问题可能影响边界 |
| 小时级别 | 固定时间法 | 跨交易所数据对齐较容易 |
| 分钟级别 | 滚动窗口法 | 避免关键时点被切断 |
| 秒级/tick级 | 成交量加权法 | 需要专业订单簿数据支持 |
6.3 一个被验证有效的实践
回测与实盘使用同一套聚合逻辑:
不要在回测时用一种聚合规则、实盘时用另一种。哪怕你使用的是 TickDB 这类专业数据服务,也建议在本地实现一层聚合封装,确保回测和实盘的行为完全一致。
# 统一封装示例
class UnifiedAggregator:
"""
统一聚合器:同时支持回测和实盘场景
回测时传入历史 tick 数据
实盘时实时摄入 tick
"""
def __init__(self, interval_seconds: int = 60, mode: str = "fixed"):
"""
Args:
interval_seconds: K 线周期(秒)
mode: "fixed" 固定时间 / "rolling" 滚动窗口
"""
if mode == "fixed":
self.aggregator = FixedTimeAggregator(interval_seconds)
else:
self.aggregator = RollingWindowAggregator(interval_seconds)
def backtest(self, historical_ticks: list) -> list:
"""回测模式:返回完整 K 线序列"""
bars = []
for tick in historical_ticks:
bar = self.aggregator.ingest_tick(tick)
if bar:
bars.append(bar)
last_bar = self.aggregator.flush()
if last_bar:
bars.append(last_bar)
return bars
def live(self, tick: dict) -> Optional[dict]:
"""实盘模式:返回可能收盘的 K 线"""
return self.aggregator.ingest_tick(tick)
结语
K 线是市场价格的"压缩快照"。理解这个压缩过程,不是为了成为学术专家,而是为了在正确的层级使用正确的数据。
当你知道一根 5 分钟 K 线是如何从数百个 tick 中生成,你就能理解:
- 为什么回测盈利,实盘可能亏损
- 为什么两个数据源的 K 线会有细微差异
- 为什么你的止损单"刚好"被扫了
价格是结果,订单簿是原因。K 线,是连接两者的桥梁。
下一步行动
如果你刚接触量化交易:
推荐从固定时间法开始,先跑通"获取数据—计算指标—生成信号—回测验证"的完整流程,再根据策略需求考虑其他聚合方式。
如果你在构建实时交易系统:
建议在本地实现与回测相同的聚合逻辑,确保回测和实盘的一致性。或者选择 TickDB 这样提供统一数据聚合服务的平台,减少你自己维护聚合逻辑的复杂度。
如果你在研究市场微观结构:
建议深入了解 tick 数据本身的特性——成交间隔、成交量分布、主动买卖比例——这些信息在 K 线层面会被压缩丢失,但在 tick 层面保留完整。
本文不构成任何投资建议。市场有风险,投资需谨慎。