Python 多进程:冲破 GIL 枷锁,榨干多核 CPU 的每一分性能
你的程序跑满了 CPU,但 8 个核心只有 1 个在工作?这不是硬件问题,是 Python 的 GIL(全局解释器锁)在“从中作梗”。今天我们来聊聊:如何用多进程让 Python 真正实现并行计算 。
一、一个让人沮丧的场景 你写了一个计算密集型的任务——比如计算一万以内有多少个素数。代码跑起来,CPU 占用率显示 100%,但仔细观察却发现:8 个 CPU 核心,只有一个在满负荷运转,其余 7 个几乎闲置 。
问题出在 Python 的 GIL(全局解释器锁) 。
简单说,GIL 是一个“门卫”,规定任何时候只能有一个线程执行 Python 代码 。这是 CPython 的设计选择——简化了内存管理,但代价是:多线程在 CPU 密集型任务上形同虚设 。
任务类型
多线程效果
多进程效果
CPU 密集型(计算、加密、图像处理)
❌ 无效,甚至更慢
✅ 线性加速
I/O 密集型(网络、文件、数据库)
✅ 有效
⚠️ 可用但开销大
结论很明确 :CPU 密集型任务,忘掉多线程,拥抱多进程。
二、多进程的本质:各立山头,各自为政 多进程的核心原理很简单:
独立内存空间 :每个进程有自己的 Python 解释器和内存,互不干扰
真正的并行 :操作系统将不同进程调度到不同 CPU 核心上,同时执行
代价是通信成本 :进程间不共享内存,数据传递需要额外的拷贝
一个直观的比喻 多线程模式 :4 个人围坐在一张桌子旁,但只有一支笔。谁拿到笔谁写,其他人只能干等。这就是 GIL 的效果。
多进程模式 :给每个人发一支笔、一张桌子、一间独立房间。4 个人同时开工,互不干扰。这才是真正的并行。
代码对比:验证差距
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 import multiprocessingimport threadingimport timedef cpu_intensive (n ): """计算密集型任务:统计素数个数""" count = 0 for i in range (2 , n): is_prime = all (i % j != 0 for j in range (2 , int (i**0.5 ) + 1 )) if is_prime: count += 1 return count start = time.time() result = cpu_intensive(100000 ) print (f"单线程: {time.time() - start:.2 f} s" )with ThreadPoolExecutor(4 ) as executor: futures = [executor.submit(cpu_intensive, 100000 ) for _ in range (4 )] results = [f.result() for f in futures] print (f"多线程: {time.time() - start:.2 f} s" ) with multiprocessing.Pool(4 ) as pool: results = pool.map (cpu_intensive, [100000 ] * 4 ) print (f"多进程: {time.time() - start:.2 f} s" )
典型输出 :1 2 3 单线程: 2.35s 多线程: 2.78s 多进程: 0.68s
差距一目了然:多线程不仅没提速,反而更慢;多进程实现了接近 4 倍的加速。
三、核心用法:进程池 实际开发中,频繁创建和销毁进程成本很高。正确的做法是使用进程池 ——预先创建一批进程,任务来了分配,干完了回收,避免重复“招人”的开销。
基础用法 1 2 3 4 5 6 7 8 9 10 11 12 13 14 import multiprocessingdef worker (n ): return n ** 2 if __name__ == "__main__" : with multiprocessing.Pool() as pool: results = pool.map (worker, range (100 )) async_result = pool.apply_async(worker, (10 ,)) result = async_result.get()
三个关键参数 1 2 3 4 5 6 Pool( processes=4 , maxtasksperchild=100 , initializer=init_worker, initargs=(db_config,) )
进程数多少合适?
CPU 密集型:进程数 = CPU核心数
I/O 密集型:进程数 = CPU核心数 × 2
内存密集型:进程数 = CPU核心数 ÷ 2
设置过多会导致操作系统频繁切换进程,反而降低效率。
四、进程间通信 进程不共享内存,那怎么交换数据?Python 提供了几种方式。
4.1 Queue:最常用的方式 像一条传送带,一头放数据,另一头取数据。适合生产者-消费者模式。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 import multiprocessingdef producer (queue, items ): for item in items: queue.put(item) queue.put(None ) def consumer (queue ): while True : item = queue.get() if item is None : break print (f"处理: {item} " ) if __name__ == "__main__" : q = multiprocessing.Queue() p1 = multiprocessing.Process(target=producer, args=(q, [1 ,2 ,3 ,4 ,5 ])) p2 = multiprocessing.Process(target=consumer, args=(q,)) p1.start(); p2.start() p1.join(); p2.join()
注意 :如果队列满了,put() 会阻塞;如果不及时取,可能造成死锁。
4.2 Pipe:轻量级双向通道 适合两个进程之间的点对点通信。
1 2 parent_conn, child_conn = multiprocessing.Pipe()
4.3 共享内存:高性能但需加锁 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 import multiprocessingdef worker (counter, lock ): for _ in range (1000 ): with lock: counter.value += 1 if __name__ == "__main__" : counter = multiprocessing.Value('i' , 0 ) lock = multiprocessing.Lock() processes = [multiprocessing.Process(target=worker, args=(counter, lock)) for _ in range (10 )] for p in processes: p.start() for p in processes: p.join() print (counter.value)
没有 lock 保护的话,多个进程同时修改 counter.value 会导致数据错乱。
通信方式选择指南 :
场景
推荐方案
多对多通信
Queue
一对一双向通信
Pipe
高频读写简单数据
Value/Array + Lock
复杂数据结构(列表、字典)
Manager(性能最差,但最方便)
五、避坑指南 坑1:Windows 上不写 if __name__ == "__main__" 这是最经典的错误。Windows 创建新进程时会重新导入主模块,如果不加保护,程序会无限创建进程,直到系统崩溃。
1 2 3 4 5 6 7 8 p = multiprocessing.Process(target=worker) p.start() if __name__ == "__main__" : p = multiprocessing.Process(target=worker) p.start()
坑2:传递超大对象 进程间传递数据需要序列化(pickle),一个 500MB 的 DataFrame 在父子进程间传递,光序列化就可能花费数秒。
解决思路 :用共享内存代替传递,或者把数据分割后再传递。
坑3:死锁 典型场景:生产者往队列放数据,队列满了阻塞;主进程又在等待生产者结束,形成死锁。
解决 :使用不限大小的队列,或者确保消费者及时取数据。
坑4:日志混乱 多个进程同时写一个日志文件,会导致内容穿插、格式错乱。
解决 :用 QueueHandler 把日志集中到一个进程处理。
1 2 3 4 5 6 import logging.handlersqueue = multiprocessing.Queue() handler = logging.handlers.QueueHandler(queue)
六、实战:批量图片处理 下面是一个真实场景:你有 1 万张图片,每张需要缩放、加水印、调色。如何设计?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 import multiprocessingfrom PIL import Imageimport osdef process_one_image (args ): """处理单张图片""" input_path, output_path, size = args try : with Image.open (input_path) as img: img.thumbnail(size) img.save(output_path, optimize=True , quality=85 ) return True , input_path except Exception as e: return False , f"{input_path} : {e} " def batch_process (input_dir, output_dir, size=(800 , 800 ) ): tasks = [] for f in os.listdir(input_dir): if f.lower().endswith(('.png' , '.jpg' )): tasks.append(( os.path.join(input_dir, f), os.path.join(output_dir, f), size )) with multiprocessing.Pool(8 ) as pool: results = pool.map (process_one_image, tasks) success = sum (1 for ok, _ in results if ok) print (f"成功: {success} /{len (tasks)} " ) if __name__ == "__main__" : batch_process("./raw" , "./processed" )
设计要点 :
每张图片独立处理 → 天然可并行
进程数设为核心数(假设 8 核)
用 pool.map 批量提交
异常捕获,单张失败不影响整体
如需进度条,可用 tqdm 配合 apply_async 回调
七、总结
要点
结论
什么时候用多进程
CPU 密集型任务(计算、加密、图像处理)
什么时候不用
I/O 密集型(用线程或协程)、任务粒度过小、需频繁通信
进程数设置
CPU 密集型 = 核心数;I/O 密集型 = 核心数 × 2
进程间通信
Queue(多对多)、Pipe(一对一)、共享内存(高频)
Windows 注意
必须用 if __name__ == "__main__" 保护
性能陷阱
避免传递大对象、避免频繁创建销毁进程
一句话总结 :GIL 让多线程在计算任务面前“徒有其表”,而多进程通过独立的内存空间和真正的并行调度,才是压榨多核 CPU 性能的正确姿势。