1. 异步生成器的核心概念
异步生成器与普通生成器的区别
在 Python 中,普通生成器通过 yield 提供惰性迭代,而 异步生成器则通过 async def 与 yield 的组合实现异步的逐步产出。它们遵循 异步迭代协议,在需要等待 I/O 的场景下能让事件循环并行处理。
要点在于异步生成器只能在 async for 或 anext 的上下文中消费。
异步生成器的基本语法
在一个异步生成器中,函数定义使用 async def,内部可以使用 yield 来产出值,同时可以在产出前后使用 await 来等待异步操作。这种结构使得生成器既具备惰性产出,也具备异步 I/O 等待能力。这种模式是实现高并发后端数据流的基础。
import asyncioasync def numbers():for i in range(5):await asyncio.sleep(0.1) # 模拟 I/Oyield i
异步遍历的基本模式
消费异步生成器的常见方式是 async for,它会在遇到可用的数据时继续执行,遇到等待时释放事件循环给其他任务。
通过这种模式,后端可以实现高吞吐的数据流处理。
async def main():async for v in numbers():print(v)# 运行需要在事件循环中执行
# asyncio.run(main())
2. 异步生成器的实战用法
定义一个简单的异步数据流
最常见的用法是定义一个数据流源,它从某个 IO 源产生数据。定义阶段要确保使用 async def 与 yield 的组合,里面的等待操作用 await 表示。
这样后续的消费者就可以通过 async for 逐个接收数据。
import asyncio
from random import randintasync def fast_source():for _ in range(10):await asyncio.sleep(0.05)yield randint(1, 100)
把生成器放进并发框架
异步生成器本身是耗时操作的产物,常与 asyncio.gather、asyncio.as_completed 等组合,用于并发执行多个数据源或任务。
通过组合,可以实现对多路数据源的并发聚合。
async def aggregate_sources():sources = [fast_source() for _ in range(3)]tasks = [asyncio.create_task(s) for s in sources]for t in asyncio.as_completed(tasks):yield await t
生产者-消费者模式的协同
在后端流式处理中,常将异步生成器作为生产者,配合一个或多个 消费者 协程一起工作。
需要注意适当的缓冲区与背压控制,避免任一端成为瓶颈。
from asyncio import Queueasync def producer(queue: Queue):async for item in fast_source():await queue.put(item)async def consumer(queue: Queue):while True:item = await queue.get()if item is None:break# 处理 itemqueue.task_done()async def run_pipeline():q = Queue(maxsize=10)await asyncio.gather(producer(q), consumer(q))
3. 异步生成器在后端的并发模型中的应用
流式 API 与服务器端渲染
在面向前端的 API 设计中,流式响应可以逐步返回数据,降低端到端延迟。
使用异步生成器,我们可以把数据库查询、网络请求、数据加工等阶段逐步串联,形成一个管道。
# 以 FastAPI 为例
from fastapi import FastAPI
from starlette.responses import StreamingResponse
import asyncioapp = FastAPI()async def stream_numbers():for i in range(100):await asyncio.sleep(0.05)yield f"data: {i}\\n\\n"@app.get("/stream")
async def stream():return StreamingResponse(stream_numbers(), media_type="text/event-stream")
数据库与外部服务的并发拉取
通过异步生成器,可以把数据库游标或外部服务的逐条响应组合成一个统一的数据流。避免阻塞,提升请求的并发吞吐量。
注意对连接进行正确的清理与超时控制。
async def fetch_rows(conn, query):async with conn.cursor() as cur:await cur.execute(query)while True:row = await cur.fetchone()if row is None:breakyield row
错误处理与资源清理
在异步生成器中,错误传播与 资源清理尤为重要。
使用 try/except 捕获异常,并在必要时用 aclose 或 finally 块进行清理。
async def safe_stream():agen = numbers()try:async for item in agen:yield itemexcept Exception as exc:# 记录日志、清理资源raisefinally:await agen.aclose()
4. 性能优化要点:并发性与资源利用
合理的并发粒度与缓冲区
要点在于将 生产速度 与 消费速度 匹配,通过缓冲区控制背压,避免内存膨胀或阻塞。
可以通过设置队列的 maxsize、限速器等手段来实现。

QUEUE_SIZE = 20async def bounded_producer(queue: asyncio.Queue):async for item in fast_source():await queue.put(item)async def bounded_consumer(queue: asyncio.Queue):while True:item = await queue.get()if item is None:breakprocess(item)queue.task_done()async def main():q = asyncio.Queue(maxsize=QUEUE_SIZE)await asyncio.gather(bounded_producer(q), bounded_consumer(q))
取消、超时与容错
在长时间运行的后端任务中,取消 与 超时机制至关重要。
对异步生成器的消费端应实现对取消的优雅处理,确保资源被正确回收。
async def cancellable_consumer(agen):try:async for item in agen:process(item)except asyncio.CancelledError:await agen.aclose()raise
监控与诊断
对事件循环、任务数、等待时间等指标进行监控,能帮助定位阻塞点与内存瓶颈。
结合 logging 和 tracing,能够快速定位问题。
5. 实战场景:后端 API 的数据流与示例
来自数据库的分页流式查询
利用异步生成器对数据库分页查询进行分页流式输出,降低峰值内存占用。
在后端接口中,客户端可以边接收数据边处理。分页控制、错误处理、资源释放等是设计要点。
# 假设使用 asyncpg
import asyncpg
import asyncioasync def stream_users(conn):async with conn.transaction():async for row in conn.cursor("SELECT id, name FROM users", timeout=60):yield row['id'], row['name']
与服务端事件(SSE)结合
服务器端事件是常见的前端数据流场景,通过异步生成器逐条推送事件。
这种模式天然支持回压与渐进加载。
# 伪代码:在 FastAPI 中的 SSE 实现
async def events():for i in range(100):yield f"data: {i}\\n\\n"await asyncio.sleep(0.2)
与 WebSocket 的协同工作
结合 WebSocket,可以将异步生成器的产出作为消息源,持续向客户端推送数据。
需要处理连接管理、心跳与异常重连。
# WebSocket 案例(简化)
from fastapi import WebSocket@app.websocket("/ws")
async def ws_endpoint(ws: WebSocket):await ws.accept()async for msg in numbers():await ws.send_text(str(msg))
6. 常见坑与调试技巧
资源泄漏风险
不正确关闭异步生成器会导致资源泄漏,务必在结束时调用 aclose,或使用 finally 块确保清理。
agen = numbers()
try:async for n in agen:process(n)
finally:await agen.aclose()
异常传播与隔离
在异步生成器中抛出的异常应被合理捕获,避免阻塞事件循环或使其他协程异常终止。
使用 try/except 进行局部处理,必要时重新抛出。
async def robust_consumer(agen):try:async for x in agen:if x is None:continueprocess(x)except SomeError:log.exception("数据流处理错误")raise


