0%

多进程

Python 多进程:冲破 GIL 枷锁,榨干多核 CPU 的每一分性能

你的程序跑满了 CPU,但 8 个核心只有 1 个在工作?这不是硬件问题,是 Python 的 GIL(全局解释器锁)在“从中作梗”。今天我们来聊聊:如何用多进程让 Python 真正实现并行计算


一、一个让人沮丧的场景

你写了一个计算密集型的任务——比如计算一万以内有多少个素数。代码跑起来,CPU 占用率显示 100%,但仔细观察却发现:8 个 CPU 核心,只有一个在满负荷运转,其余 7 个几乎闲置

问题出在 Python 的 GIL(全局解释器锁)

简单说,GIL 是一个“门卫”,规定任何时候只能有一个线程执行 Python 代码。这是 CPython 的设计选择——简化了内存管理,但代价是:多线程在 CPU 密集型任务上形同虚设

任务类型 多线程效果 多进程效果
CPU 密集型(计算、加密、图像处理) ❌ 无效,甚至更慢 ✅ 线性加速
I/O 密集型(网络、文件、数据库) ✅ 有效 ⚠️ 可用但开销大

结论很明确:CPU 密集型任务,忘掉多线程,拥抱多进程。


二、多进程的本质:各立山头,各自为政

多进程的核心原理很简单:

  • 独立内存空间:每个进程有自己的 Python 解释器和内存,互不干扰
  • 真正的并行:操作系统将不同进程调度到不同 CPU 核心上,同时执行
  • 代价是通信成本:进程间不共享内存,数据传递需要额外的拷贝

一个直观的比喻

多线程模式:4 个人围坐在一张桌子旁,但只有一支笔。谁拿到笔谁写,其他人只能干等。这就是 GIL 的效果。

多进程模式:给每个人发一支笔、一张桌子、一间独立房间。4 个人同时开工,互不干扰。这才是真正的并行。

代码对比:验证差距

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import multiprocessing
import threading
import time

def cpu_intensive(n):
"""计算密集型任务:统计素数个数"""
count = 0
for i in range(2, n):
is_prime = all(i % j != 0 for j in range(2, int(i**0.5) + 1))
if is_prime:
count += 1
return count

# 单线程
start = time.time()
result = cpu_intensive(100000)
print(f"单线程: {time.time() - start:.2f}s")

# 多线程(4个)
with ThreadPoolExecutor(4) as executor:
futures = [executor.submit(cpu_intensive, 100000) for _ in range(4)]
results = [f.result() for f in futures]
print(f"多线程: {time.time() - start:.2f}s") # 约单线程的1.2倍?甚至更慢!

# 多进程(4个)
with multiprocessing.Pool(4) as pool:
results = pool.map(cpu_intensive, [100000] * 4)
print(f"多进程: {time.time() - start:.2f}s") # 约单线程的1/4时间!

典型输出

1
2
3
单线程: 2.35s
多线程: 2.78s
多进程: 0.68s

差距一目了然:多线程不仅没提速,反而更慢;多进程实现了接近 4 倍的加速。


三、核心用法:进程池

实际开发中,频繁创建和销毁进程成本很高。正确的做法是使用进程池——预先创建一批进程,任务来了分配,干完了回收,避免重复“招人”的开销。

基础用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import multiprocessing

def worker(n):
return n ** 2

if __name__ == "__main__":
# 创建进程池,默认大小 = CPU核心数
with multiprocessing.Pool() as pool:
# map:批量提交,保持顺序
results = pool.map(worker, range(100))

# apply_async:异步提交单个任务,适合需要回调的场景
async_result = pool.apply_async(worker, (10,))
result = async_result.get() # 获取结果(阻塞)

三个关键参数

1
2
3
4
5
6
Pool(
processes=4, # 进程数:通常设为CPU核心数
maxtasksperchild=100, # 每个进程最多执行100个任务后重启(防止内存泄漏)
initializer=init_worker, # 每个进程启动时的初始化函数
initargs=(db_config,) # 初始化函数的参数
)

进程数多少合适?

  • CPU 密集型:进程数 = CPU核心数
  • I/O 密集型:进程数 = CPU核心数 × 2
  • 内存密集型:进程数 = CPU核心数 ÷ 2

设置过多会导致操作系统频繁切换进程,反而降低效率。


四、进程间通信

进程不共享内存,那怎么交换数据?Python 提供了几种方式。

4.1 Queue:最常用的方式

像一条传送带,一头放数据,另一头取数据。适合生产者-消费者模式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import multiprocessing

def producer(queue, items):
for item in items:
queue.put(item)
queue.put(None) # 结束信号

def consumer(queue):
while True:
item = queue.get()
if item is None:
break
print(f"处理: {item}")

if __name__ == "__main__":
q = multiprocessing.Queue()
p1 = multiprocessing.Process(target=producer, args=(q, [1,2,3,4,5]))
p2 = multiprocessing.Process(target=consumer, args=(q,))
p1.start(); p2.start()
p1.join(); p2.join()

注意:如果队列满了,put() 会阻塞;如果不及时取,可能造成死锁。

4.2 Pipe:轻量级双向通道

适合两个进程之间的点对点通信。

1
2
parent_conn, child_conn = multiprocessing.Pipe()
# parent_conn 和 child_conn 是一对,一个发一个收

4.3 共享内存:高性能但需加锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import multiprocessing

def worker(counter, lock):
for _ in range(1000):
with lock:
counter.value += 1

if __name__ == "__main__":
counter = multiprocessing.Value('i', 0) # 'i' 表示整数
lock = multiprocessing.Lock()

processes = [multiprocessing.Process(target=worker, args=(counter, lock))
for _ in range(10)]
for p in processes: p.start()
for p in processes: p.join()

print(counter.value) # 10000

没有 lock 保护的话,多个进程同时修改 counter.value 会导致数据错乱。

通信方式选择指南

场景 推荐方案
多对多通信 Queue
一对一双向通信 Pipe
高频读写简单数据 Value/Array + Lock
复杂数据结构(列表、字典) Manager(性能最差,但最方便)

五、避坑指南

坑1:Windows 上不写 if __name__ == "__main__"

这是最经典的错误。Windows 创建新进程时会重新导入主模块,如果不加保护,程序会无限创建进程,直到系统崩溃。

1
2
3
4
5
6
7
8
# ❌ 错误
p = multiprocessing.Process(target=worker)
p.start()

# ✅ 正确
if __name__ == "__main__":
p = multiprocessing.Process(target=worker)
p.start()

坑2:传递超大对象

进程间传递数据需要序列化(pickle),一个 500MB 的 DataFrame 在父子进程间传递,光序列化就可能花费数秒。

解决思路:用共享内存代替传递,或者把数据分割后再传递。

坑3:死锁

典型场景:生产者往队列放数据,队列满了阻塞;主进程又在等待生产者结束,形成死锁。

解决:使用不限大小的队列,或者确保消费者及时取数据。

坑4:日志混乱

多个进程同时写一个日志文件,会导致内容穿插、格式错乱。

解决:用 QueueHandler 把日志集中到一个进程处理。

1
2
3
4
5
6
# 多进程日志的正确姿势:用一个进程专门处理日志
import logging.handlers

queue = multiprocessing.Queue()
handler = logging.handlers.QueueHandler(queue)
# 另一个进程监听队列并写入文件

六、实战:批量图片处理

下面是一个真实场景:你有 1 万张图片,每张需要缩放、加水印、调色。如何设计?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import multiprocessing
from PIL import Image
import os

def process_one_image(args):
"""处理单张图片"""
input_path, output_path, size = args
try:
with Image.open(input_path) as img:
img.thumbnail(size)
img.save(output_path, optimize=True, quality=85)
return True, input_path
except Exception as e:
return False, f"{input_path}: {e}"

def batch_process(input_dir, output_dir, size=(800, 800)):
# 1. 收集任务
tasks = []
for f in os.listdir(input_dir):
if f.lower().endswith(('.png', '.jpg')):
tasks.append((
os.path.join(input_dir, f),
os.path.join(output_dir, f),
size
))

# 2. 并行处理(8个进程)
with multiprocessing.Pool(8) as pool:
results = pool.map(process_one_image, tasks)

# 3. 统计结果
success = sum(1 for ok, _ in results if ok)
print(f"成功: {success}/{len(tasks)}")

if __name__ == "__main__":
batch_process("./raw", "./processed")

设计要点

  • 每张图片独立处理 → 天然可并行
  • 进程数设为核心数(假设 8 核)
  • pool.map 批量提交
  • 异常捕获,单张失败不影响整体
  • 如需进度条,可用 tqdm 配合 apply_async 回调

七、总结

要点 结论
什么时候用多进程 CPU 密集型任务(计算、加密、图像处理)
什么时候不用 I/O 密集型(用线程或协程)、任务粒度过小、需频繁通信
进程数设置 CPU 密集型 = 核心数;I/O 密集型 = 核心数 × 2
进程间通信 Queue(多对多)、Pipe(一对一)、共享内存(高频)
Windows 注意 必须用 if __name__ == "__main__" 保护
性能陷阱 避免传递大对象、避免频繁创建销毁进程

一句话总结:GIL 让多线程在计算任务面前“徒有其表”,而多进程通过独立的内存空间和真正的并行调度,才是压榨多核 CPU 性能的正确姿势。

欢迎关注我的其它发布渠道