Python 线程:什么时候该用,什么时候别用 写并发代码的时候,很多人第一反应就是”开线程”。
但 Python 的线程有个特殊的地方——GIL(全局解释器锁) 。这东西让 Python 同一时刻只能有一个线程在执行 Python 字节码。所以:
I/O 密集型任务 (网络请求、文件读写、数据库查询):多线程有用,线程在等 I/O 的时候 GIL 会释放
CPU 密集型任务 (大量计算、图像处理):多线程没用,甚至更慢
一句话:Python 多线程适合”等”的场景,不适合”算”的场景。
创建线程:两种方式,用第一种就够了 方式1:直接传函数(最常用) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 import threadingimport timedef worker (name, delay ): for i in range (3 ): print (f"{name} 第 {i+1 } 次" ) time.sleep(delay) t1 = threading.Thread(target=worker, args=("A" , 1 )) t2 = threading.Thread(target=worker, args=("B" , 2 )) t1.start() t2.start() t1.join() t2.join() print ("全部完成" )
start() 启动线程,join() 等待线程结束。主线程如果不 join(),子线程可能还没来得及执行完,程序就退出了。
方式2:继承 Thread 类(封装复杂状态时用) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 class DownloadTask (threading.Thread): def __init__ (self, url ): super ().__init__() self .url = url self .result = None def run (self ): print (f"下载:{self.url} " ) time.sleep(2 ) self .result = f"{self.url} 完成" task = DownloadTask("https://example.com/file.zip" ) task.start() task.join() print (task.result)
日常开发用方式1就够了,方式2在需要把线程状态封装在对象里的时候用。
守护线程:主程序退出,它也跟着退 普通线程会阻止程序退出——主线程结束了,还要等所有子线程结束才退出。
守护线程(daemon=True)不一样:主线程一退出,守护线程直接被终止。
1 2 3 4 5 6 7 8 9 10 def background (): while True : print ("后台运行..." ) time.sleep(1 ) t = threading.Thread(target=background, daemon=True ) t.start() time.sleep(3 ) print ("主程序结束" )
什么时候用? 后台监控、心跳检测这种”主程序退出了你也没必要继续跑”的任务。
线程同步:多个线程改同一个变量,会乱 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 counter = 0 def increment (): global counter for _ in range (100000 ): counter += 1 t1 = threading.Thread(target=increment) t2 = threading.Thread(target=increment) t1.start() t2.start() t1.join() t2.join() print (counter)
counter += 1 不是原子操作(读-算-写三步),两个线程同时操作就会互相覆盖。
解决方案:用 Lock 1 2 3 4 5 6 7 8 9 counter = 0 lock = threading.Lock() def increment (): global counter for _ in range (100000 ): with lock: counter += 1
with lock 是最安全的写法,不用手动 acquire() 和 release(),不会忘记释放。
线程间通信:用 Queue,别自己折腾锁 多个线程之间传数据,最省事的办法是 queue.Queue——它内部已经实现了线程安全。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 import queueimport threadingq = queue.Queue() def producer (): for i in range (5 ): q.put(i) print (f"生产:{i} " ) q.put(None ) def consumer (): while True : item = q.get() if item is None : break print (f"消费:{item} " ) t1 = threading.Thread(target=producer) t2 = threading.Thread(target=consumer) t1.start() t2.start() t1.join() t2.join()
生产者-消费者模式 是线程间通信最经典的场景,Queue 就是为这个设计的。
Queue 有三种:
类型
特点
Queue
先进先出(默认)
LifoQueue
后进先出(栈)
PriorityQueue
按优先级取
大部分场景用默认的 Queue 就够了。
其他同步工具:认识一下,用到的时候知道有这东西
工具
作用
什么时候用
Lock
互斥锁,同一时刻只能一个线程访问
保护共享变量
RLock
可重入锁,同一线程可多次获取
递归函数里加锁
Semaphore
信号量,限制同时访问的线程数
限流,比如最多3个线程同时请求API
Event
事件通知,一个线程等另一个线程的信号
等待初始化完成、等待任务结束
Condition
条件变量,更灵活的通知机制
生产者-消费者(但通常用 Queue 就够了)
实际开发中,Lock 和 Queue 是最常用的两个。 其他的用到的时候再查文档就行。
线程池:别手动创建一堆线程 手动创建线程没有问题,但如果你要创建几十上百个线程,用线程池更省事——复用线程,不用频繁创建销毁。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 from concurrent.futures import ThreadPoolExecutordef task (n ): return n * 2 with ThreadPoolExecutor(max_workers=3 ) as executor: future = executor.submit(task, 5 ) print (future.result()) results = executor.map (task, range (5 )) print (list (results))
with 语句会自动等待所有任务完成并清理线程池。
submit vs map:
submit:提交单个任务,返回 Future 对象,可以单独拿结果
map:批量提交,返回结果的迭代器,顺序跟输入一致
1 2 3 4 5 with ThreadPoolExecutor(max_workers=3 ) as executor: futures = [executor.submit(task, i) for i in range (5 )] for future in as_completed(futures): print (future.result())
as_completed() 在需要”谁先完成先处理谁”的时候很有用。
什么时候用线程,什么时候用别的
任务类型
推荐方案
原因
网络请求、爬虫
线程 / 线程池
I/O 等待时释放 GIL
文件读写
线程
I/O 操作,线程有效
大量计算(CPU密集)
多进程(multiprocessing)
GIL 限制,线程无效
高并发网络服务
asyncio(异步IO)
单线程更高效
简单并发
concurrent.futures
统一接口,切换方便
一个简单的判断逻辑:
任务在等外部(网络、磁盘、数据库)→ 用线程
任务在算内部(循环、数学运算)→ 用多进程或异步
一个完整的例子:批量下载 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 from concurrent.futures import ThreadPoolExecutorimport requestsimport timeurls = [ "https://httpbin.org/delay/1" , "https://httpbin.org/delay/2" , "https://httpbin.org/delay/1" , "https://httpbin.org/delay/3" , ] def download (url ): start = time.time() resp = requests.get(url, timeout=10 ) elapsed = time.time() - start return {"url" : url, "status" : resp.status_code, "time" : elapsed} start = time.time() for url in urls: print (download(url)) print (f"串行耗时:{time.time() - start:.2 f} s" )start = time.time() with ThreadPoolExecutor(max_workers=4 ) as executor: results = executor.map (download, urls) for r in results: print (r) print (f"并行耗时:{time.time() - start:.2 f} s" )
4 个请求各等 1-3 秒,串行要等 7 秒,并行只等最长的那个 3 秒。