0%

Python3 协程:异步编程的核心机制与实战

Python3 协程:异步编程的核心机制与实战

协程(Coroutine)是 Python 实现高效异步编程的核心技术,尤其适合处理 I/O 密集型任务(如网络请求、文件读写、数据库操作)。它通过「协作式调度」实现并发,避免了线程切换的开销,能在单线程内高效处理成千上万的并发任务。本文将从基础概念到实战应用,全面解析 Python3 协程的使用。

协程的核心概念:为什么需要协程?

在理解协程前,先明确它与线程、进程的区别,以及解决的核心问题:

并发模型 调度方式 切换开销 适用场景 缺点
进程 操作系统调度 最大 CPU 密集型、独立内存空间 内存占用高,切换成本高
线程 操作系统调度 中等 I/O 密集型、共享内存 受 GIL 限制(Python),切换有开销
协程 程序自身调度 最小 高并发 I/O 密集型 需手动处理异步逻辑,不适合 CPU 密集

协程的核心优势:

  1. 轻量级:一个进程可包含上千个协程,每个协程仅占用几 KB 内存(线程通常需 MB 级)。
  2. 无锁竞争:协程在单线程内执行,共享资源无需加锁(避免死锁问题)。
  3. 高效切换:协程切换由程序主动控制(如遇到 I/O 时暂停),无需操作系统介入,切换速度比线程快 100+ 倍。

Python3 协程的演进:从 yieldasync/await

Python 协程的语法经历了多次迭代,目前推荐使用 Python3.5+ 引入的 async/await 语法(简洁、直观,是官方标准)。

版本 协程实现方式 特点
Python3.3- yield/yield from 基于生成器模拟,语法繁琐
Python3.5+ async def/await 原生协程语法,清晰直观,支持异步库

关键语法区别:

  • 生成器协程(旧方式,不推荐):用 def 定义,通过 yield 暂停 / 恢复,需手动处理调度。
  • 原生协程(新方式,推荐):用 async def 定义,通过 await 暂停 / 恢复,配合 asyncio 库实现自动调度。

原生协程基础:async/await 语法

1. 定义协程函数

async def 定义协程函数,调用后返回协程对象(而非直接执行函数体):

1
2
3
4
5
6
7
8
9
10
11
# 定义协程函数
async def hello(name):
print(f"Hello, {name} (协程开始)")
# 模拟 I/O 操作(如网络请求、文件读写),此处用 asyncio.sleep 代替
await asyncio.sleep(1) # await 暂停协程,等待异步操作完成
print(f"Hello, {name} (协程结束)")
return f"Result: {name}"

# 调用协程函数:返回协程对象,不执行函数体
coro = hello("Python")
print(type(coro)) # 输出:<class 'coroutine'>

2. 运行协程:必须通过异步事件循环

协程不能直接调用执行,需通过 asyncio 库的事件循环(Event Loop) 调度。事件循环是协程的 “管理者”,负责:

  • 启动协程
  • 暂停 / 恢复协程(遇到 await 时暂停,异步操作完成后恢复)
  • 调度多个协程并发执行
方式 1:asyncio.run()(Python3.7+ 推荐)

asyncio.run() 是简化版接口,自动创建事件循环、运行协程、关闭循环:

1
2
3
4
5
6
7
8
9
10
11
import asyncio

async def hello(name):
print(f"Hello, {name} (开始)")
await asyncio.sleep(1) # 模拟 I/O 等待,释放事件循环
print(f"Hello, {name} (结束)")
return f"Result: {name}"

# 运行协程(自动管理事件循环)
result = asyncio.run(hello("Async"))
print(result) # 输出:Result: Async
执行流程解析:
  1. asyncio.run(hello("Async")) 创建事件循环,启动协程。
  2. 协程执行到 await asyncio.sleep(1) 时,暂停自身,并将控制权交还给事件循环。
  3. 事件循环等待 1 秒(期间可调度其他协程),待 sleep 完成后,恢复协程继续执行。
  4. 协程执行完毕,事件循环关闭,返回结果。

3. 并发运行多个协程

事件循环可同时调度多个协程,实现 “并发”(单线程内交替执行)。常用方式有两种:asyncio.gather()asyncio.create_task()

方式 1:asyncio.gather()(批量运行,等待所有完成)

适合已知所有协程,需等待全部执行完毕后获取结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import asyncio

async def task(name, delay):
print(f"任务 {name} 开始,延迟 {delay} 秒")
await asyncio.sleep(delay) # 模拟 I/O 操作
print(f"任务 {name} 结束")
return f"任务 {name} 结果"

async def main():
# 批量创建协程,并发运行
results = await asyncio.gather(
task("A", 2), # 延迟 2 秒
task("B", 1), # 延迟 1 秒
task("C", 1.5) # 延迟 1.5 秒
)
print("所有任务结果:", results)

# 运行主协程
asyncio.run(main())
输出与分析:
1
2
3
4
5
6
7
任务 A 开始,延迟 2 秒
任务 B 开始,延迟 1 秒
任务 C 开始,延迟 1.5 秒
任务 B 结束 # 1 秒后先完成
任务 C 结束 # 1.5 秒后完成
任务 A 结束 # 2 秒后完成
所有任务结果: ['任务 A 结果', '任务 B 结果', '任务 C 结果']
  • 三个任务 “并发” 执行,总耗时 ≈ 2 秒(等于最长任务的延迟),而非 2+1+1.5=4.5 秒。
  • asyncio.gather() 会按协程传入顺序返回结果,即使任务完成顺序不同。
方式 2:asyncio.create_task()(动态创建任务,灵活控制)

适合动态添加任务(如循环创建),可单独跟踪每个任务的状态:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import asyncio

async def task(name, delay):
print(f"任务 {name} 开始,延迟 {delay} 秒")
await asyncio.sleep(delay)
print(f"任务 {name} 结束")
return f"任务 {name} 结果"

async def main():
# 1. 创建任务(添加到事件循环)
task1 = asyncio.create_task(task("A", 2))
task2 = asyncio.create_task(task("B", 1))

# 2. 可在任务运行中做其他操作
print("主协程:等待任务完成...")

# 3. 等待任务完成,获取结果
result1 = await task1
result2 = await task2

print("任务 1 结果:", result1)
print("任务 2 结果:", result2)

asyncio.run(main())
输出:
1
2
3
4
5
6
7
主协程:等待任务完成...
任务 A 开始,延迟 2 秒
任务 B 开始,延迟 1 秒
任务 B 结束
任务 A 结束
任务 1 结果: 任务 A 结果
任务 2 结果: 任务 B 结果

协程的核心机制:await 关键字的作用

await 是协程的 “暂停开关”,仅能在 async def 函数内使用,作用是:

  1. 暂停当前协程:当执行到 await 异步对象 时,当前协程释放事件循环控制权,进入 “等待” 状态。
  2. 等待异步操作完成:异步对象(如 asyncio.sleep()、异步网络请求)完成后,事件循环唤醒当前协程,继续执行后续代码。
  3. 传递结果:异步对象的返回值会作为 await 表达式的结果,供协程后续使用。

哪些对象可以被 await

只有可等待对象(Awaitable) 才能跟在 await 后,主要包括:

  • 协程对象(async def 函数返回的对象)
  • 任务对象(asyncio.create_task() 创建的对象)
  • Future 对象(底层异步结果容器,较少直接使用)

错误示例await 不能用于普通函数或同步操作(如 time.sleep()):

1
2
3
4
5
6
7
8
9
import asyncio
import time

async def bad_task():
# 错误:time.sleep() 是同步函数,不能被 await
# await time.sleep(1) # 报错:TypeError: object NoneType can't be used in 'await' expression

# 正确:用 asyncio.sleep()(异步版本)
await asyncio.sleep(1)

实战:协程处理异步 I/O 任务

协程的核心价值体现在异步 I/O 场景(如并发爬取网页、异步读写数据库)。以下以 “并发爬取多个网页” 为例,展示协程的高效性。

示例:异步爬取网页(使用 aiohttp 库)

requests 库是同步的,需用 aiohttp(异步 HTTP 客户端)配合协程实现异步爬取:

1. 安装依赖:
1
pip install aiohttp  # 异步 HTTP 库
2. 异步爬取代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import asyncio
import aiohttp

# 异步爬取单个网页
async def fetch_url(session, url):
try:
# 异步发送 HTTP 请求(await 等待响应)
async with session.get(url, timeout=5) as response:
status = response.status
content_length = response.headers.get("Content-Length", "未知")
return f"URL: {url} | 状态码: {status} | 内容长度: {content_length}"
except Exception as e:
return f"URL: {url} | 爬取失败: {str(e)}"

# 主协程:并发爬取多个网页
async def main():
# 待爬取的 URL 列表
urls = [
"https://www.baidu.com",
"https://www.github.com",
"https://www.python.org",
"https://www.csdn.net",
"https://www.zhihu.com"
]

# 创建异步 HTTP 会话(复用连接,提高效率)
async with aiohttp.ClientSession() as session:
# 1. 批量创建任务(并发爬取)
tasks = [asyncio.create_task(fetch_url(session, url)) for url in urls]

# 2. 等待所有任务完成,获取结果
results = await asyncio.gather(*tasks)

# 3. 打印结果
for result in results:
print(result)

# 运行主协程
if __name__ == "__main__":
import time
start_time = time.time()
asyncio.run(main())
end_time = time.time()
print(f"总耗时:{end_time - start_time:.2f} 秒")
输出分析:
  • 5 个网页并发爬取,总耗时 ≈ 1-2 秒(取决于网络延迟),而同步爬取需 5 倍左右时间。
  • aiohttp.ClientSession 复用 HTTP 连接,避免频繁建立连接的开销,进一步提升效率。

协程 vs 线程:如何选择?

对比维度 协程(async/await 线程(threading
调度方式 程序主动调度(协作式) 操作系统调度(抢占式)
切换开销 极低(内存操作) 中等(保存线程上下文)
并发能力 单线程支持上万协程 单进程支持数百线程(受内存限制)
共享资源 无锁竞争(单线程) 需加锁(多线程共享内存)
适用场景 I/O 密集型(网络、文件、数据库) I/O 密集型(简单场景)、轻 CPU 密集
兼容性 需异步库支持(如 aiohttpasyncpg 支持所有同步库(如 requestssqlite3

选择建议:

  1. 优先用协程:处理高并发 I/O 任务(如 API 服务、爬虫、消息队列消费者),效率远超线程。
  2. 用线程:
    • 场景简单,无需学习异步语法;
    • 依赖仅支持同步的库(无法用协程改写);
    • 轻量级 CPU 密集任务(如简单计算)。
  3. CPU 密集型任务:无论是协程还是线程,都受 GIL 限制,建议用多进程(multiprocessing)。

常见问题与避坑指南

1. 协程中不能使用同步 I/O 库

问题:在协程中使用 requests(同步 HTTP)、time.sleep()(同步睡眠)等,会阻塞整个事件循环,导致所有协程无法并发。解决:替换为对应的异步库:

  • 同步 requests → 异步 aiohttp
  • 同步 time.sleep() → 异步 asyncio.sleep()
  • 同步 sqlite3 → 异步 asyncpg(PostgreSQL)/ aiomysql(MySQL)

2. await 只能在 async def 函数内使用

问题:在普通 def 函数中使用 await,会报错 SyntaxError: 'await' outside async function解决:将普通函数改为协程函数(async def),或在协程函数内调用该普通函数(若函数无异步操作)。

3. 事件循环的线程安全问题

问题:一个事件循环只能在一个线程内运行,不能在多线程中共享事件循环。解决

  • 每个线程创建独立的事件循环;
  • asyncio.run_coroutine_threadsafe() 在其他线程调度协程(复杂场景,需谨慎)。

4. 协程的异常处理

问题:协程中未捕获的异常会导致程序崩溃,需正确处理。解决:用 try-except 捕获 await 后的异常:

1
2
3
4
5
6
7
8
async def task():
try:
await asyncio.sleep(1)
raise ValueError("自定义异常")
except ValueError as e:
print(f"捕获异常:{e}")

asyncio.run(task()) # 输出:捕获异常:自定义异常

核心语法速查表

功能 语法 / 代码示例
定义协程函数 async def 函数名(参数): ...
调用协程(获对象) coro = 协程函数(参数)
运行协程(Python3.7+) asyncio.run(协程函数(参数))
暂停协程 await 可等待对象(如 await asyncio.sleep(1)
并发运行多个协程 await asyncio.gather(协程1, 协程2, ...)
动态创建任务 task = asyncio.create_task(协程)
异步 HTTP 请求 async with aiohttp.ClientSession() as session: await session.get(url)

欢迎关注我的其它发布渠道