0%

Python3 线程:多任务并发编程详解

Python 线程:什么时候该用,什么时候别用

写并发代码的时候,很多人第一反应就是”开线程”。

但 Python 的线程有个特殊的地方——GIL(全局解释器锁)。这东西让 Python 同一时刻只能有一个线程在执行 Python 字节码。所以:

  • I/O 密集型任务(网络请求、文件读写、数据库查询):多线程有用,线程在等 I/O 的时候 GIL 会释放
  • CPU 密集型任务(大量计算、图像处理):多线程没用,甚至更慢

一句话:Python 多线程适合”等”的场景,不适合”算”的场景。

创建线程:两种方式,用第一种就够了

方式1:直接传函数(最常用)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import threading
import time

def worker(name, delay):
for i in range(3):
print(f"{name}{i+1} 次")
time.sleep(delay)

t1 = threading.Thread(target=worker, args=("A", 1))
t2 = threading.Thread(target=worker, args=("B", 2))

t1.start()
t2.start()

t1.join() # 等 t1 结束
t2.join() # 等 t2 结束
print("全部完成")

start() 启动线程,join() 等待线程结束。主线程如果不 join(),子线程可能还没来得及执行完,程序就退出了。

方式2:继承 Thread 类(封装复杂状态时用)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class DownloadTask(threading.Thread):
def __init__(self, url):
super().__init__()
self.url = url
self.result = None

def run(self):
# 线程启动后自动执行
print(f"下载:{self.url}")
time.sleep(2)
self.result = f"{self.url} 完成"

task = DownloadTask("https://example.com/file.zip")
task.start()
task.join()
print(task.result)

日常开发用方式1就够了,方式2在需要把线程状态封装在对象里的时候用。

守护线程:主程序退出,它也跟着退

普通线程会阻止程序退出——主线程结束了,还要等所有子线程结束才退出。

守护线程(daemon=True)不一样:主线程一退出,守护线程直接被终止。

1
2
3
4
5
6
7
8
9
10
def background():
while True:
print("后台运行...")
time.sleep(1)

t = threading.Thread(target=background, daemon=True)
t.start()

time.sleep(3)
print("主程序结束") # 3秒后退出,守护线程自动终止

什么时候用? 后台监控、心跳检测这种”主程序退出了你也没必要继续跑”的任务。

线程同步:多个线程改同一个变量,会乱

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
counter = 0

def increment():
global counter
for _ in range(100000):
counter += 1

t1 = threading.Thread(target=increment)
t2 = threading.Thread(target=increment)
t1.start()
t2.start()
t1.join()
t2.join()

print(counter) # 期望 200000,实际可能 189xxx

counter += 1 不是原子操作(读-算-写三步),两个线程同时操作就会互相覆盖。

解决方案:用 Lock

1
2
3
4
5
6
7
8
9
counter = 0
lock = threading.Lock()

def increment():
global counter
for _ in range(100000):
with lock: # 拿到锁再操作
counter += 1
# 离开 with 自动释放锁

with lock 是最安全的写法,不用手动 acquire()release(),不会忘记释放。

线程间通信:用 Queue,别自己折腾锁

多个线程之间传数据,最省事的办法是 queue.Queue——它内部已经实现了线程安全。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import queue
import threading

q = queue.Queue()

def producer():
for i in range(5):
q.put(i)
print(f"生产:{i}")
q.put(None) # 结束信号

def consumer():
while True:
item = q.get()
if item is None:
break
print(f"消费:{item}")

t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)
t1.start()
t2.start()
t1.join()
t2.join()

生产者-消费者模式是线程间通信最经典的场景,Queue 就是为这个设计的。

Queue 有三种:

类型 特点
Queue 先进先出(默认)
LifoQueue 后进先出(栈)
PriorityQueue 按优先级取

大部分场景用默认的 Queue 就够了。

其他同步工具:认识一下,用到的时候知道有这东西

工具 作用 什么时候用
Lock 互斥锁,同一时刻只能一个线程访问 保护共享变量
RLock 可重入锁,同一线程可多次获取 递归函数里加锁
Semaphore 信号量,限制同时访问的线程数 限流,比如最多3个线程同时请求API
Event 事件通知,一个线程等另一个线程的信号 等待初始化完成、等待任务结束
Condition 条件变量,更灵活的通知机制 生产者-消费者(但通常用 Queue 就够了)

实际开发中,Lock 和 Queue 是最常用的两个。 其他的用到的时候再查文档就行。

线程池:别手动创建一堆线程

手动创建线程没有问题,但如果你要创建几十上百个线程,用线程池更省事——复用线程,不用频繁创建销毁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from concurrent.futures import ThreadPoolExecutor

def task(n):
return n * 2

# 创建线程池,最多3个线程同时跑
with ThreadPoolExecutor(max_workers=3) as executor:
# 方式1:提交单个任务
future = executor.submit(task, 5)
print(future.result()) # 10

# 方式2:批量提交
results = executor.map(task, range(5))
print(list(results)) # [0, 2, 4, 6, 8]

with 语句会自动等待所有任务完成并清理线程池。

submit vs map

  • submit:提交单个任务,返回 Future 对象,可以单独拿结果
  • map:批量提交,返回结果的迭代器,顺序跟输入一致
1
2
3
4
5
# 获取完成的任务(谁先完成先处理谁)
with ThreadPoolExecutor(max_workers=3) as executor:
futures = [executor.submit(task, i) for i in range(5)]
for future in as_completed(futures):
print(future.result())

as_completed() 在需要”谁先完成先处理谁”的时候很有用。

什么时候用线程,什么时候用别的

任务类型 推荐方案 原因
网络请求、爬虫 线程 / 线程池 I/O 等待时释放 GIL
文件读写 线程 I/O 操作,线程有效
大量计算(CPU密集) 多进程(multiprocessing GIL 限制,线程无效
高并发网络服务 asyncio(异步IO) 单线程更高效
简单并发 concurrent.futures 统一接口,切换方便

一个简单的判断逻辑:

  • 任务在等外部(网络、磁盘、数据库)→ 用线程
  • 任务在算内部(循环、数学运算)→ 用多进程或异步

一个完整的例子:批量下载

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
from concurrent.futures import ThreadPoolExecutor
import requests
import time

urls = [
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/2",
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/3",
]

def download(url):
start = time.time()
resp = requests.get(url, timeout=10)
elapsed = time.time() - start
return {"url": url, "status": resp.status_code, "time": elapsed}

# 串行
start = time.time()
for url in urls:
print(download(url))
print(f"串行耗时:{time.time() - start:.2f}s")

# 并行(线程池)
start = time.time()
with ThreadPoolExecutor(max_workers=4) as executor:
results = executor.map(download, urls)
for r in results:
print(r)
print(f"并行耗时:{time.time() - start:.2f}s")

4 个请求各等 1-3 秒,串行要等 7 秒,并行只等最长的那个 3 秒。

欢迎关注我的其它发布渠道