Python asyncio 并发模式:从协程原理到 Rust 开发者的思维转换

Python asyncio 并发模式:从协程原理到 Rust 开发者的思维转换

一、Rust 开发者看 Python asyncio:相似但不同

我学 Rust 的 Tokio 之前先学了 Python 的 asyncio,当时觉得两者差不多——都是事件循环 + 协程。后来深入对比才发现,虽然概念相似,实现差异很大。Python 的协程是基于生成器的,运行时动态调度;Rust 的 Future 是编译期状态机,零成本抽象。

最大的思维差异在于错误处理。Python 用 try/except,协程里的异常会冒泡到事件循环;Rust 用 Result,编译器强制你处理每个可能的错误。在 asyncio 中忘记捕获异常,整个事件循环可能崩溃;在 Tokio 中,JoinError 会被类型系统捕获。

另一个差异是取消机制。Python 的协程取消通过asyncio.CancelledError实现,但协程可以捕获并忽略这个异常;Rust 的CancellationToken是协作式的,Future 必须主动检查取消信号。Python 的方式更灵活但更危险,Rust 的方式更安全但需要更多样板代码。

二、asyncio 的底层机制:事件循环与协程调度

Python asyncio 的核心是事件循环(Event Loop)。事件循环不断从就绪队列中取出协程执行,遇到 IO 操作时注册回调,IO 完成后把协程放回就绪队列。协程的切换发生在await点,和 Rust 的.await类似。

flowchart TB A[事件循环 Event Loop] --> B[就绪队列<br/>Ready Queue] A --> C[IO 多路复用<br/>epoll/kqueue] A --> D[定时器队列<br/>Timer Heap] B --> E[取出就绪协程执行] E --> F{遇到 await} F -->|IO 操作| G[注册回调到 epoll] F -->|sleep| H[加入定时器队列] F -->|完成| I[返回结果] G -->|IO 完成| J[回调将协程加入就绪队列] H -->|超时| J J --> B subgraph 协程状态 K[创建: coroutine object] L[运行: 正在执行] M[挂起: 等待 IO/定时器] N[完成: 返回结果] O[取消: CancelledError] end K --> L --> M --> N M --> O subgraph 与 Tokio 的对比 P[Python: 动态调度<br/>运行时决定] Q[Rust: 静态状态机<br/>编译期生成] R[Python: 绿色线程<br/>无 Send 约束] S[Rust: Send + 'static<br/>编译期检查] end

关键区别:Python 的协程是"懒"的——创建后不会执行,必须被 await 或调度。Rust 的 Future 也是"懒"的,但通过.await驱动状态机转换。Python 的开销在运行时动态调度,Rust 的开销在编译期代码膨胀。

三、生产级代码实现:asyncio 并发模式

3.1 并发任务编排

import asyncio from dataclasses import dataclass, field from typing import List, Optional, Any import time @dataclass class TaskResult: """任务执行结果""" task_name: str success: bool data: Any = None error: Optional[str] = None duration_ms: float = 0.0 class ConcurrentRunner: """并发任务编排器""" def __init__(self, max_concurrency: int = 10): # 为什么限制并发数:Python 的 asyncio # 虽然是协作式调度,但并发量过大 # 会导致事件循环调度开销增加, # 内存占用上升;10 是一个经验值, # 适用于大多数 IO 密集场景 self.semaphore = asyncio.Semaphore( max_concurrency) async def run_with_limit( self, coro, name: str ) -> TaskResult: """带并发限制的任务执行""" async with self.semaphore: start = time.perf_counter() try: result = await coro duration = (time.perf_counter() - start) * 1000 return TaskResult( task_name=name, success=True, data=result, duration_ms=round(duration, 2)) except asyncio.CancelledError: # 取消不是错误,直接传播 raise except Exception as e: duration = (time.perf_counter() - start) * 1000 return TaskResult( task_name=name, success=False, error=str(e), duration_ms=round(duration, 2)) async def gather_with_limit( self, coros: List, names: List[str] ) -> List[TaskResult]: """带并发限制的批量执行""" tasks = [ self.run_with_limit(coro, name) for coro, name in zip(coros, names) ] # gather 的 return_exceptions=True # 确保单个任务失败不影响其他任务 # 为什么用 return_exceptions: # 默认情况下 gather 会在任意 # 任务抛异常时立即取消其他任务, # return_exceptions=True 让 # 异常作为结果返回 results = await asyncio.gather( *tasks, return_exceptions=True) return [ r if isinstance(r, TaskResult) else TaskResult( task_name=names[i], success=False, error=str(r)) for i, r in enumerate(results) ] async def first_completed( self, coros: List, names: List[str], timeout: float = 5.0 ) -> TaskResult: """竞速执行:返回最先完成的结果""" tasks = { asyncio.create_task( self.run_with_limit(coro, name), name=name ): name for coro, name in zip(coros, names) } try: done, pending = await asyncio.wait( tasks.keys(), timeout=timeout, return_when=asyncio.FIRST_COMPLETED) # 取消未完成的任务 # 为什么必须取消:未取消的 # 任务会继续占用资源, # 即使结果不再需要 for task in pending: task.cancel() # 等待取消完成 await asyncio.gather( *pending, return_exceptions=True) # 返回第一个完成的结果 for task in done: return task.result() except asyncio.TimeoutError: # 全部超时 for task in tasks: task.cancel() return TaskResult( task_name="timeout", success=False, error=f"全部任务超时 {timeout}s") return TaskResult( task_name="none", success=False, error="无任务完成")

3.2 生产者-消费者模式

import asyncio from typing import AsyncIterator class AsyncProducerConsumer: """异步生产者-消费者模式""" def __init__( self, queue_size: int = 100, consumer_count: int = 3 ): # 为什么限制队列大小:无界队列 # 在生产速度大于消费速度时会 # 导致内存溢出;有界队列在满时 # 阻塞生产者,实现自然背压 self.queue = asyncio.Queue(maxsize=queue_size) self.consumer_count = consumer_count self._stopped = False async def producer( self, source: AsyncIterator ): """生产者:从数据源读取并放入队列""" try: async for item in source: if self._stopped: break # 队列满时自动背压 await self.queue.put(item) finally: # 发送结束信号 # 为什么用 None 作为哨兵: # 每个消费者需要一个哨兵 # 才能正常退出循环 for _ in range(self.consumer_count): await self.queue.put(None) async def consumer( self, consumer_id: int, process_fn ): """消费者:从队列取出并处理""" while True: item = await self.queue.get() if item is None: # 收到结束信号 self.queue.task_done() break try: await process_fn(item, consumer_id) except Exception as e: # 消费者不能因单个 # 处理失败而退出 print(f"消费者 {consumer_id} " f"处理失败: {e}") finally: self.queue.task_done() async def run( self, source: AsyncIterator, process_fn ): """运行生产者-消费者""" producer_task = asyncio.create_task( self.producer(source)) consumer_tasks = [ asyncio.create_task( self.consumer(i, process_fn)) for i in range(self.consumer_count) ] # 等待生产者完成 await producer_task # 等待所有消费者完成 await asyncio.gather(*consumer_tasks)

3.3 超时与取消处理

import asyncio async def with_timeout( coro, timeout: float, task_name: str = "unnamed" ): """带超时的协程执行""" try: # 为什么用 wait_for 而非手动计时: # wait_for 在超时时自动取消任务, # 释放资源;手动计时需要自己 # 管理任务取消和清理 result = await asyncio.wait_for( coro, timeout=timeout) return result except asyncio.TimeoutError: print(f"任务 {task_name} 超时 " f"({timeout}s)") raise except asyncio.CancelledError: # 取消时的清理逻辑 # 为什么单独捕获 CancelledError: # 取消是正常的控制流, # 不是错误;需要做清理 # 但不应该记录为错误 print(f"任务 {task_name} 被取消") raise async def graceful_shutdown( tasks: List[asyncio.Task], timeout: float = 5.0 ): """优雅关闭所有任务""" # 发送取消请求 for task in tasks: task.cancel() # 等待任务响应取消 # 为什么给超时:有些任务可能 # 不响应取消(如 C 扩展中的 # 阻塞调用),超时后强制放弃 results = await asyncio.gather( *tasks, return_exceptions=True) for i, result in enumerate(results): if isinstance(result, Exception) and \ not isinstance(result, asyncio.CancelledError): print(f"任务 {i} 关闭异常: {result}")

四、asyncio 的边界:与 Rust Tokio 的关键差异

性能差距:Python asyncio 的单次协程切换开销约 1μs,Rust Tokio 约 0.1μs。在高并发场景下(10 万+ 协程),这个差距会累积。Python 的 GIL 虽然在 IO 密集场景下不阻塞,但 CPU 密集操作会锁住整个解释器。

类型安全:Python 的协程没有 Send/Sync 约束,多协程共享可变状态不会编译报错,只在运行时出问题。Rust 的 Send + 'static 约束在编译期就排除了数据竞争。

取消语义:Python 的取消通过抛出 CancelledError 实现,协程可以捕获并忽略。Rust 的取消是协作式的,Future 必须主动检查。Python 的方式更灵活但更危险——忽略取消会导致资源泄漏。

调试难度:Python 的异步调用栈比同步深得多,异常信息难以追踪。Rust 的编译器错误虽然难读,但至少在编译期就暴露了问题。

五、总结

Python asyncio 和 Rust Tokio 的核心概念相似(事件循环 + 协程),但实现和约束差异很大。Python 更灵活但更危险,Rust 更安全但更严格。从 Rust 回来看 asyncio,最需要注意三点:用 Semaphore 控制并发数,用 return_exceptions=True 防止级联失败,用 CancelledError 做清理而非忽略。asyncio 的优势是生态成熟、开发速度快,劣势是性能和类型安全。如果你的项目对性能和可靠性要求高,Rust 是更好的选择;如果追求开发速度,Python asyncio 足够用。