Python3 协程:异步编程的核心机制与实战
协程(Coroutine)是 Python 实现高效异步编程的核心技术,尤其适合处理 I/O 密集型任务(如网络请求、文件读写、数据库操作)。它通过「协作式调度」实现并发,避免了线程切换的开销,能在单线程内高效处理成千上万的并发任务。本文将从基础概念到实战应用,全面解析 Python3 协程的使用。
协程的核心概念:为什么需要协程?
在理解协程前,先明确它与线程、进程的区别,以及解决的核心问题:
| 并发模型 | 调度方式 | 切换开销 | 适用场景 | 缺点 |
|---|---|---|---|---|
| 进程 | 操作系统调度 | 最大 | CPU 密集型、独立内存空间 | 内存占用高,切换成本高 |
| 线程 | 操作系统调度 | 中等 | I/O 密集型、共享内存 | 受 GIL 限制(Python),切换有开销 |
| 协程 | 程序自身调度 | 最小 | 高并发 I/O 密集型 | 需手动处理异步逻辑,不适合 CPU 密集 |
协程的核心优势:
- 轻量级:一个进程可包含上千个协程,每个协程仅占用几 KB 内存(线程通常需 MB 级)。
- 无锁竞争:协程在单线程内执行,共享资源无需加锁(避免死锁问题)。
- 高效切换:协程切换由程序主动控制(如遇到 I/O 时暂停),无需操作系统介入,切换速度比线程快 100+ 倍。
Python3 协程的演进:从 yield 到 async/await
Python 协程的语法经历了多次迭代,目前推荐使用 Python3.5+ 引入的 async/await 语法(简洁、直观,是官方标准)。
| 版本 | 协程实现方式 | 特点 |
|---|---|---|
| Python3.3- | yield/yield from |
基于生成器模拟,语法繁琐 |
| Python3.5+ | async def/await |
原生协程语法,清晰直观,支持异步库 |
关键语法区别:
- 生成器协程(旧方式,不推荐):用
def定义,通过yield暂停 / 恢复,需手动处理调度。 - 原生协程(新方式,推荐):用
async def定义,通过await暂停 / 恢复,配合asyncio库实现自动调度。
原生协程基础:async/await 语法
1. 定义协程函数
用 async def 定义协程函数,调用后返回协程对象(而非直接执行函数体):
1 | # 定义协程函数 |
2. 运行协程:必须通过异步事件循环
协程不能直接调用执行,需通过 asyncio 库的事件循环(Event Loop) 调度。事件循环是协程的 “管理者”,负责:
- 启动协程
- 暂停 / 恢复协程(遇到
await时暂停,异步操作完成后恢复) - 调度多个协程并发执行
方式 1:asyncio.run()(Python3.7+ 推荐)
asyncio.run() 是简化版接口,自动创建事件循环、运行协程、关闭循环:
1 | import asyncio |
执行流程解析:
asyncio.run(hello("Async"))创建事件循环,启动协程。- 协程执行到
await asyncio.sleep(1)时,暂停自身,并将控制权交还给事件循环。 - 事件循环等待 1 秒(期间可调度其他协程),待
sleep完成后,恢复协程继续执行。 - 协程执行完毕,事件循环关闭,返回结果。
3. 并发运行多个协程
事件循环可同时调度多个协程,实现 “并发”(单线程内交替执行)。常用方式有两种:asyncio.gather() 和 asyncio.create_task()。
方式 1:asyncio.gather()(批量运行,等待所有完成)
适合已知所有协程,需等待全部执行完毕后获取结果:
1 | import asyncio |
输出与分析:
1 | 任务 A 开始,延迟 2 秒 |
- 三个任务 “并发” 执行,总耗时 ≈ 2 秒(等于最长任务的延迟),而非 2+1+1.5=4.5 秒。
asyncio.gather()会按协程传入顺序返回结果,即使任务完成顺序不同。
方式 2:asyncio.create_task()(动态创建任务,灵活控制)
适合动态添加任务(如循环创建),可单独跟踪每个任务的状态:
1 | import asyncio |
输出:
1 | 主协程:等待任务完成... |
协程的核心机制:await 关键字的作用
await 是协程的 “暂停开关”,仅能在 async def 函数内使用,作用是:
- 暂停当前协程:当执行到
await 异步对象时,当前协程释放事件循环控制权,进入 “等待” 状态。 - 等待异步操作完成:异步对象(如
asyncio.sleep()、异步网络请求)完成后,事件循环唤醒当前协程,继续执行后续代码。 - 传递结果:异步对象的返回值会作为
await表达式的结果,供协程后续使用。
哪些对象可以被 await?
只有可等待对象(Awaitable) 才能跟在 await 后,主要包括:
- 协程对象(
async def函数返回的对象) - 任务对象(
asyncio.create_task()创建的对象) - Future 对象(底层异步结果容器,较少直接使用)
错误示例:await 不能用于普通函数或同步操作(如 time.sleep()):
1 | import asyncio |
实战:协程处理异步 I/O 任务
协程的核心价值体现在异步 I/O 场景(如并发爬取网页、异步读写数据库)。以下以 “并发爬取多个网页” 为例,展示协程的高效性。
示例:异步爬取网页(使用 aiohttp 库)
requests 库是同步的,需用 aiohttp(异步 HTTP 客户端)配合协程实现异步爬取:
1. 安装依赖:
1 | pip install aiohttp # 异步 HTTP 库 |
2. 异步爬取代码:
1 | import asyncio |
输出分析:
- 5 个网页并发爬取,总耗时 ≈ 1-2 秒(取决于网络延迟),而同步爬取需 5 倍左右时间。
aiohttp.ClientSession复用 HTTP 连接,避免频繁建立连接的开销,进一步提升效率。
协程 vs 线程:如何选择?
| 对比维度 | 协程(async/await) |
线程(threading) |
|---|---|---|
| 调度方式 | 程序主动调度(协作式) | 操作系统调度(抢占式) |
| 切换开销 | 极低(内存操作) | 中等(保存线程上下文) |
| 并发能力 | 单线程支持上万协程 | 单进程支持数百线程(受内存限制) |
| 共享资源 | 无锁竞争(单线程) | 需加锁(多线程共享内存) |
| 适用场景 | I/O 密集型(网络、文件、数据库) | I/O 密集型(简单场景)、轻 CPU 密集 |
| 兼容性 | 需异步库支持(如 aiohttp、asyncpg) |
支持所有同步库(如 requests、sqlite3) |
选择建议:
- 优先用协程:处理高并发 I/O 任务(如 API 服务、爬虫、消息队列消费者),效率远超线程。
- 用线程:
- 场景简单,无需学习异步语法;
- 依赖仅支持同步的库(无法用协程改写);
- 轻量级 CPU 密集任务(如简单计算)。
- CPU 密集型任务:无论是协程还是线程,都受 GIL 限制,建议用多进程(
multiprocessing)。
常见问题与避坑指南
1. 协程中不能使用同步 I/O 库
问题:在协程中使用 requests(同步 HTTP)、time.sleep()(同步睡眠)等,会阻塞整个事件循环,导致所有协程无法并发。解决:替换为对应的异步库:
- 同步
requests→ 异步aiohttp - 同步
time.sleep()→ 异步asyncio.sleep() - 同步
sqlite3→ 异步asyncpg(PostgreSQL)/aiomysql(MySQL)
2. await 只能在 async def 函数内使用
问题:在普通 def 函数中使用 await,会报错 SyntaxError: 'await' outside async function。解决:将普通函数改为协程函数(async def),或在协程函数内调用该普通函数(若函数无异步操作)。
3. 事件循环的线程安全问题
问题:一个事件循环只能在一个线程内运行,不能在多线程中共享事件循环。解决:
- 每个线程创建独立的事件循环;
- 用
asyncio.run_coroutine_threadsafe()在其他线程调度协程(复杂场景,需谨慎)。
4. 协程的异常处理
问题:协程中未捕获的异常会导致程序崩溃,需正确处理。解决:用 try-except 捕获 await 后的异常:
1 | async def task(): |
核心语法速查表
| 功能 | 语法 / 代码示例 |
|---|---|
| 定义协程函数 | async def 函数名(参数): ... |
| 调用协程(获对象) | coro = 协程函数(参数) |
| 运行协程(Python3.7+) | asyncio.run(协程函数(参数)) |
| 暂停协程 | await 可等待对象(如 await asyncio.sleep(1)) |
| 并发运行多个协程 | await asyncio.gather(协程1, 协程2, ...) |
| 动态创建任务 | task = asyncio.create_task(协程) |
| 异步 HTTP 请求 | async with aiohttp.ClientSession() as session: await session.get(url) |