代码在凌晨崩溃,而不是在白天

凌晨 3:17,你的交易监控系统告警响起。500 个协程正在运行,其中 3 个因为网络抖动永久挂起,既不报错也不退出。内存持续增长,但你不知道哪一部分在泄漏。你尝试发信号取消,但主进程无响应——它不知道如何正确地等待子协程结束。

这不是想象中的场景。这是每一个用 asyncio 构建过生产级系统的人迟早会遇见的问题。

协程的生命周期不只是 awaitreturn。当你需要管理 100 个并发任务、优雅地处理取消、处理部分失败、确保资源释放时,你会发现 asyncio 的官方文档只告诉你“如何启动”,却不告诉你“如何收场”。

本文从实战角度拆解协程生命周期管理的完整闭环:启动、监控、取消、清理。我们会用真实场景的错误模式和正确解法来说明每一个决策点的权衡。


一、为什么协程的生命周期比你想象的更复杂

asyncio.run() 看起来很简单。几行代码,一个主函数,几百个 await,好像就能跑起来。但当你真正进入生产环境,会遇到三类教科书不教的问题:

第一类:挂起陷阱。 网络请求没有设置超时,或者超时设置错误,协程会永久等待。一个这样的协程不可怕,100 个就会耗尽所有连接配额。

第二类:取消盲区。 task.cancel() 发出去,但没有 try/except asyncio.CancelledError,协程的清理逻辑永远不会执行。文件句柄、数据库连接、锁,全都会泄漏。

第三类:僵尸任务。 协程因为异常退出,但没有正确地报告,主进程还以为它在运行。内存持续增长,直到进程 OOM。

解决这三个问题的关键,是把协程看作一个有明确生命周期阶段的对象,而不是一段可以随时丢弃的代码。


二、协程生命周期的四个阶段

一个协程从创建到销毁,经历四个阶段:

PENDING → RUNNING → CANCELLED / FINISHED

asyncio.create_task() 把协程从 PENDING 推进到 RUNNING,同时返回一个 Task 对象——这是生命周期管理的核心。你所有的监控、取消、清理操作,都围绕这个 Task 对象展开。

但问题在于,协程内部代码必须配合生命周期管理才能正确工作。很多人写的代码会忽略这一点。

2.1 阶段一:启动——不要用裸协程

最常见的错误是直接调用协程函数但不管理结果:

# ❌ 错误示范:协程被启动但没有任何引用
async def fetch_market_data(symbol):
    return await client.get(symbol)

# 某处
fetch_market_data("AAPL.US")  # 协程被创建了,但没有保存返回值
# Python 事件循环可能根本还没来得及执行它,函数就已经结束了

正确做法是用 create_task 显式创建任务:

# ✅ 正确:创建 Task 并保存引用
task = asyncio.create_task(fetch_market_data("AAPL.US"))
tasks.append(task)

create_task 的核心作用是:给协程一个明确的引用,纳入事件循环的调度体系,并在任务完成后提供统一的结果访问接口。没有这个引用,任务的生命周期就无法被追踪。

2.2 阶段二:并发启动——gather 的正确和错误用法

批量启动协程时,asyncio.gather 是最常用的工具。但它的行为比你想象的更复杂。

# gather 的默认行为:任意一个协程抛异常,全部结果被丢弃
async def fetch(symbol):
    if symbol == "BROKEN":
        raise ValueError("Symbol not found")
    return await client.get(symbol)

# ❌ 如果 BREAK 被放在第三个位置,全部结果都收不到
results = await asyncio.gather(
    fetch("AAPL.US"),
    fetch("BROKEN"),       # 这里抛异常
    fetch("TSLA.US"),
)  # CancelledError 被抛出,results 永远拿不到

gather 的第一个参数 return_exceptions=True 能改变这个行为:

# ✅ 用 return_exceptions=True 让失败的协程返回异常对象而非抛出
results = await asyncio.gather(
    fetch("AAPL.US"),
    fetch("BROKEN"),
    fetch("TSLA.US"),
    return_exceptions=True
)
# results: [正常数据, ValueError(...), 正常数据]

但这只是解决了“收不到结果”的问题。更深一层的问题是:你需要知道哪个失败了、失败原因是什么、失败后要不要重试。

这时你需要更精细的控制。


三、TaskGroup:结构化的并发管理

Python 3.11 引入了 asyncio.TaskGroup,这是我认为 asyncio 历史上最重要的 API 更新之一。它解决了一个核心问题:gather 无法主动管理子任务的取消

TaskGroup 的核心语义是:当你用 group.start_soon() 注册一个任务后,组内的任意异常会导致组内所有其他任务被同步取消,然后 group 本身会等所有任务完成后再抛出异常。

async def fetch_with_timeout(symbol, timeout=5.0):
    """带超时的行情数据获取"""
    async with asyncio.timeout(timeout):
        return await client.get(symbol)

async def batch_fetch(symbols: list[str]):
    async with asyncio.TaskGroup() as group:
        tasks = {}
        for symbol in symbols:
            # start_soon 会在任务完成后自动处理生命周期
            tasks[symbol] = group.start_soon(
                fetch_with_timeout, symbol
            )
    
    # TaskGroup 退出时,所有任务都已完成或已取消
    return {symbol: task.result() for symbol, task in tasks}

asyncio.timeout 是另一个重要工具,它和 asyncio.TimeoutError 配合,可以在协程内部直接处理超时:

async def fetch_market_data(symbol):
    try:
        async with asyncio.timeout(5.0):
            return await client.get(symbol)
    except asyncio.TimeoutError:
        # 这里协程已经收到了 CancelledError
        # 可以在这里做清理:关闭连接、释放锁、更新状态
        logger.warning(f"{symbol} fetch timeout, cleanup completed")
        return None

3.1 gather vs TaskGroup:怎么选

场景 工具 原因
固定数量任务,全部完成才继续 gather 结构简单
需要部分失败后继续执行其他任务 gather(..., return_exceptions=True) 隔离错误
组内一个失败,需要全部取消 TaskGroup 原子性取消
需要在运行中途添加新任务 TaskGroup gather 不支持动态添加
需要对每个任务设置独立超时 asyncio.timeout + TaskGroup 最灵活
只需要等待全部完成 两者皆可 根据偏好选择

在交易系统的场景中,一个典型场景是:同时监控 50 只股票的 depth 数据,只要任意一只股票的 WebSocket 连接断了一定时间(比如 30 秒),就认为该连接不可靠,取消所有 50 个监控任务并重建连接。用 TaskGroup 配合 asyncio.timeout 就能优雅实现这个逻辑。


四、取消:信号处理的正确姿势

生产环境中,协程的取消几乎总是由外部事件触发的:用户按 Ctrl+C、收到了 SIGTERM、监控发现某个任务超时。这些外部事件通过信号传递给 Python 进程。

但 asyncio 的信号处理有一个容易踩的坑:asyncio 的事件循环运行在一个线程中,信号处理器在主线程中执行,两者的协调需要正确的方式

4.1 用屏蔽信号保护事件循环

import signal
import asyncio

async def run_trading_system():
    """交易系统主函数"""
    
    # 第一步:创建信号处理器,定义清理行为
    shutdown_event = asyncio.Event()
    
    def signal_handler(signum, frame):
        logger.info(f"Received signal {signum}, initiating graceful shutdown")
        shutdown_event.set()
    
    # 第二步:注册信号处理器到主线程
    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)
    
    # 第三步:让协程监听 shutdown_event,而不是直接抛异常
    tasks = []
    for symbol in SYMBOLS:
        task = asyncio.create_task(
            monitor_depth(symbol, shutdown_event)  # 注意这里传入了 shutdown 事件
        )
        tasks.append(task)
    
    # 等待任意一个任务失败,或者收到 shutdown 信号
    await shutdown_event.wait()
    
    # 第四步:触发取消
    logger.info("Initiating task cancellation")
    pending = [t for t in tasks if not t.done()]
    
    for task in pending:
        task.cancel()
    
    # 第五步:等待所有任务完成清理(关键!)
    await asyncio.gather(*pending, return_exceptions=True)
    logger.info("All tasks cancelled and cleaned up")

这段代码的核心逻辑是:不用信号处理器直接取消任务,而是让信号处理器设置一个 asyncio.Event,主协程监听这个事件,在合适的时机统一触发取消

这样做有三个原因:

  1. 信号处理器在主线程执行,而 asyncio 事件循环也在主线程执行。如果信号处理器直接调用 task.cancel(),可能恰好在事件循环执行另一个协程的中间,造成竞态条件。

  2. 统一取消比分散取消更容易保证清理逻辑的执行。清理代码往往需要访问共享状态(数据库连接、文件句柄),分散取消会导致清理逻辑的执行顺序不可控。

  3. await asyncio.gather(*pending, return_exceptions=True)return_exceptions=True 是关键:它保证即使某个协程的清理代码也抛异常,我们仍然能等待所有任务完成。

4.2 协程内部的取消响应

协程必须正确响应 CancelledError,否则取消操作无法生效:

async def monitor_depth(symbol, shutdown_event):
    """行情监控协程——正确响应取消"""
    try:
        while not shutdown_event.is_set():
            depth = await tickdb_ws.subscribe_depth(symbol)
            await process_depth(depth)
            await asyncio.sleep(1)  # 控制轮询频率
    except asyncio.CancelledError:
        # 必须在 except 块中处理清理逻辑
        logger.info(f"Cleaning up monitor for {symbol}")
        await close_websocket_connection()  # 关闭 WebSocket
        await flush_buffer_to_db()          # 刷新缓冲区
        await release_lock(symbol)           # 释放持有的锁
        raise  # 重新抛出 CancelledError,让 Task 知道协程已正确退出

很多人会在 except 块中直接 pass,不重新抛出 CancelledError。这会导致一个问题:Task 的状态会停留在 CANCELLED,但主进程无法确认协程是否完成了清理。正确的方式是清理完成后重新抛出。


五、监控:知道每个协程在做什么

取消的前提是监控。你不能取消一个你不知道还在运行的任务。

5.1 Task 状态追踪

import asyncio
from dataclasses import dataclass, field
from typing import Optional
from enum import Enum

class TaskStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    DONE = "done"
    FAILED = "failed"
    CANCELLED = "cancelled"

@dataclass
class TaskMonitor:
    """协程任务监控器"""
    _tasks: dict[str, asyncio.Task] = field(default_factory=dict)
    _lock: asyncio.Lock = field(default_factory=asyncio.Lock)
    
    def register(self, name: str, coro):
        task = asyncio.create_task(coro)
        self._tasks[name] = task
        return task
    
    def get_status(self, name: str) -> TaskStatus:
        task = self._tasks.get(name)
        if not task:
            return TaskStatus.PENDING
        if task.done():
            if task.cancelled():
                return TaskStatus.CANCELLED
            if task.exception():
                return TaskStatus.FAILED
            return TaskStatus.DONE
        return TaskStatus.RUNNING
    
    async def cancel_all(self, timeout=10.0):
        """取消所有任务,等待清理完成"""
        to_cancel = [t for t in self._tasks.values() if not t.done()]
        for task in to_cancel:
            task.cancel()
        
        # 给清理逻辑最多 timeout 秒完成
        await asyncio.wait_for(
            asyncio.gather(*to_cancel, return_exceptions=True),
            timeout=timeout
        )
        await self._clear_done_tasks()
    
    async def _clear_done_tasks(self):
        async with self._lock:
            self._tasks = {
                name: task for name, task in self._tasks.items()
                if not task.done()
            }
    
    async def get_summary(self) -> dict:
        """返回所有任务的状态摘要"""
        summary = {}
        for name in self._tasks:
            task = self._tasks[name]
            status = self.get_status(name)
            exc_info = None
            if status == TaskStatus.FAILED:
                exc_info = task.exception()
            summary[name] = {
                "status": status.value,
                "exception": str(exc_info) if exc_info else None
            }
        return summary

这个监控器解决了生产环境中的核心问题:某个协程崩溃了,你知道是哪一個、为什么、什么时候崩溃的。在没有监控的情况下,一个失败的 WebSocket 连接只会沉默地退出,你可能几天后发现内存泄漏了 2GB。

5.2 用 logging 上下文追踪协程

每条日志带有协程标识:

import contextvars

# 每个协程拥有独立的 logging 上下文
task_context = contextvars.ContextVar("task_name", default="main")

async def monitored_task(symbol: str):
    logger = logging.getLogger("trading")
    task_context.set(symbol)
    # 所有日志都会自动带上 symbol 作为上下文
    logger.info(f"Task started")
    try:
        await asyncio.sleep(10)
    finally:
        logger.info(f"Task completed")

六、生产级模板:从启动到清理的完整闭环

整合以上所有技术点,以下是可用于生产环境的协程生命周期管理模板:

import asyncio
import signal
import logging
from dataclasses import dataclass, field

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(task_name)s] %(levelname)s: %(message)s",
    style="%"
)
logger = logging.getLogger("system")


@dataclass
class AsyncSystem:
    """协程系统生命周期管理器"""
    tasks: dict[str, asyncio.Task] = field(default_factory=dict)
    shutdown_event: asyncio.Event = field(default_factory=default_factory.asyncio.Event)
    _task_counter: int = field(default=0)
    
    def start(self, name: str, coro) -> asyncio.Task:
        """启动带命名的协程"""
        self._task_counter += 1
        task_id = f"{name}#{self._task_counter}"
        task = asyncio.create_task(self._wrap_coro(task_id, coro))
        self.tasks[task_id] = task
        logger.info(f"Task {task_id} registered")
        return task
    
    async def _wrap_coro(self, task_id: str, coro):
        """协程包装器:处理取消和错误"""
        try:
            return await coro
        except asyncio.CancelledError:
            logger.info(f"Task {task_id} cancelled")
            raise
        except Exception as e:
            logger.error(f"Task {task_id} failed: {e}", exc_info=True)
            raise
    
    async def shutdown(self, timeout=30.0):
        """优雅关闭:取消所有任务,等待清理完成"""
        logger.info(f"Shutdown initiated, {len(self.tasks)} tasks to cancel")
        self.shutdown_event.set()
        
        pending = [t for t in self.tasks.values() if not t.done()]
        for task in pending:
            task.cancel()
        
        if pending:
            done, pending = await asyncio.wait(
                pending, timeout=timeout,
                return_when=asyncio.ALL_COMPLETED
            )
            
            if pending:
                logger.warning(
                    f"{len(pending)} tasks did not complete within {timeout}s"
                )
        
        logger.info("Shutdown completed")
    
    async def wait_until_shutdown(self):
        """等待任意任务失败或收到关闭信号"""
        try:
            # 等待第一个任务完成(正常或异常)
            done, pending = await asyncio.wait(
                self.tasks.values(),
                return_when=asyncio.FIRST_COMPLETED
            )
            
            # 处理第一个完成的任务的异常
            for task in done:
                if task.exception():
                    logger.error(f"Task failed with: {task.exception()}")
                    # 根据业务策略决定是否立即关闭系统
                    await self.shutdown()
                    return
            
            # 否则等待 shutdown 信号
            await self.shutdown_event.wait()
        except asyncio.CancelledError:
            await self.shutdown()


async def main():
    system = AsyncSystem()
    
    # 注册信号处理器
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(
            sig,
            lambda s=sig: system.shutdown_event.set()
        )
    
    # 启动多个监控任务
    symbols = ["AAPL.US", "TSLA.US", "NVDA.US", "META.US", "AMZN.US"]
    for symbol in symbols:
        system.start(
            f"monitor-{symbol}",
            monitor_depth(symbol, system.shutdown_event)
        )
    
    # 启动一个定时任务
    system.start("scheduler", periodic_rebalance(system.shutdown_event))
    
    await system.wait_until_shutdown()


if __name__ == "__main__":
    asyncio.run(main())

这个模板覆盖了以下关键场景:

  • 协程命名与追踪
  • 信号驱动的优雅关闭
  • 超时等待所有任务清理
  • 任务失败后的自动关闭决策

七、最常见的五个错误及修正

在生产环境中,以下五个错误几乎出现在每一个 asyncio 项目的第一版中:

错误一:不等待任务完成就退出。

# ❌
async def main():
    for i in range(100):
        asyncio.create_task(worker(i))
    print("All tasks launched")  # 主函数结束,但任务还在运行

# ✅
async def main():
    tasks = [asyncio.create_task(worker(i)) for i in range(100)]
    await asyncio.gather(*tasks)  # 显式等待

错误二:超时用 time.sleep 而不是 asyncio.sleep

# ❌ 阻塞整个事件循环
await time.sleep(5)

# ✅ 让出控制权
await asyncio.sleep(5)

错误三:在 gather 中不用 return_exceptions,导致一个失败全部丢失。

错误四:协程的 except 块中没有重新抛出 CancelledError

错误五:忘记在 Windows 上 loop.add_signal_handler 只支持 SIGINT 和 SIGTERM。 如果你的生产环境是 Windows Server,用事件轮询代替信号处理。


八、结语

协程的生命周期管理,本质上是在回答一个问题:当你有 1000 个并发任务运行时,如果系统需要关闭,每一个任务能不能安全、完整、按时地退出?

这不是一个可以用“写好代码”来绕过的问题。它需要你在架构层面就设计好监控、取消信号、清理这三个环节的协作方式。

asyncio.TaskGroupasyncio.timeout、contextvars 上下文日志——这些工具每一个都不复杂,但组合在一起,就构成了一个可以跑三年的生产级异步系统的骨架。


下一步行动

如果你在写实时行情监控,访问 tickdb.ai 查看 WebSocket 的深度数据订阅能力,配合本文的 TaskGroup 模式可以同时管理多个标的的连接生命周期。

如果你在构建回测框架,建议先用 asyncio.TaskGroup 重写数据获取模块,用结构化并发替代无组织的 gather 调用,回测的稳定性会显著提升。

如果你想用 AI 辅助开发,在 AI 助手中搜索安装 tickdb-market-data SKILL,它封装了 TickDB 的异步数据获取接口,涵盖了心跳、重连、超时等生产级细节,可以直接集成进你的 asyncio 系统。

风险提示:本文不构成任何投资建议。异步编程中的资源泄漏和任务挂起问题可能影响系统稳定性,请在生产环境部署前充分测试。