0%

Python3 线程:多任务并发编程详解

Python3 线程:多任务并发编程详解

线程(Thread)是 Python 实现并发编程的重要方式,适合处理 I/O 密集型任务(如网络请求、文件读写)。Python3 通过 threading 模块提供了完善的线程支持,本文将详细介绍线程的创建、管理、同步及实战应用。

线程的基本概念

  • 线程:操作系统调度的最小单位,属于进程的一部分,多个线程共享进程的内存空间。
  • 并发:多个任务交替执行(单核 CPU 模拟多任务)。
  • 并行:多个任务同时执行(多核 CPU 真正同时运行)。
  • Python 线程特点:
    • 受 GIL(全局解释器锁)限制,CPU 密集型任务多线程效率可能不如单线程;
    • I/O 密集型任务(如网络请求、文件读写)能显著提升效率。

线程的创建与启动

Python3 中创建线程主要有两种方式:使用 threading.Thread继承 threading.Thread 类重写 run() 方法

方式一:直接使用 threading.Thread

通过 threading.Thread(target=函数名, args=参数) 创建线程对象,调用 start() 启动线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import threading
import time

# 定义线程执行的函数
def task(name, delay):
for i in range(3):
print(f"线程 {name} 执行第 {i+1} 次,时间:{time.ctime()}")
time.sleep(delay) # 模拟I/O操作(如网络请求)

# 创建线程对象
t1 = threading.Thread(target=task, args=("Thread-1", 1)) # args为元组,传递参数
t2 = threading.Thread(target=task, args=("Thread-2", 2))

# 启动线程(自动调用target函数)
t1.start()
t2.start()

# 等待线程结束(主程序阻塞,直到t1、t2执行完毕)
t1.join()
t2.join()

print("所有线程执行完毕")

输出说明:两个线程交替执行,Thread-1 间隔 1 秒,Thread-2 间隔 2 秒,体现并发特性。

方式二:继承 threading.Thread

重写 run() 方法(线程执行的核心逻辑),通过 start() 启动。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import threading
import time

class MyThread(threading.Thread):
def __init__(self, name, delay):
super().__init__() # 调用父类构造方法
self.name = name
self.delay = delay

# 重写run()方法,线程启动后自动执行
def run(self):
for i in range(3):
print(f"线程 {self.name} 执行第 {i+1} 次,时间:{time.ctime()}")
time.sleep(self.delay)

# 创建线程实例
t1 = MyThread("Thread-1", 1)
t2 = MyThread("Thread-2", 2)

# 启动线程
t1.start()
t2.start()

# 等待线程结束
t1.join()
t2.join()

print("所有线程执行完毕")

线程的核心方法与属性

方法 / 属性 说明
start() 启动线程,自动调用 run() 方法(不能重复调用)
run() 线程执行的核心逻辑,可重写
join([timeout]) 阻塞当前线程,等待被调用线程执行完毕(timeout 为最大等待秒数)
is_alive() 判断线程是否在运行中
getName()/setName() 获取 / 设置线程名称
ident 线程标识符(整数),线程启动后有效,结束后为 None
daemon 布尔值,设置为 True 时为守护线程(主程序结束后自动退出,需在 start() 前设置)

示例:守护线程(后台线程)

守护线程用于服务其他线程(如日志记录),主程序结束后自动退出,无需等待其完成。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import threading
import time

def daemon_task():
while True: # 无限循环,模拟后台服务
print("守护线程运行中...")
time.sleep(1)

# 创建守护线程
daemon_thread = threading.Thread(target=daemon_task)
daemon_thread.daemon = True # 设置为守护线程(必须在start()前)

daemon_thread.start()

# 主程序执行3秒后结束
time.sleep(3)
print("主程序结束,守护线程自动退出")

输出:主程序结束后,守护线程的无限循环会被强制终止。

线程同步:解决资源竞争

多个线程共享全局变量时,可能因资源竞争导致数据不一致。需通过同步机制(如锁、信号量)保证操作的原子性。

1. 锁(threading.Lock

最常用的同步方式,通过 acquire() 获取锁,release() 释放锁,确保同一时间只有一个线程执行临界区代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import threading

# 全局变量(共享资源)
count = 0
lock = threading.Lock() # 创建锁对象

def increment():
global count
for _ in range(1000000):
# 获取锁(若已被占用则阻塞等待)
lock.acquire()
try:
count += 1 # 临界区:需要同步的操作
finally:
# 确保锁释放,避免死锁
lock.release()

# 创建两个线程,同时修改count
t1 = threading.Thread(target=increment)
t2 = threading.Thread(target=increment)

t1.start()
t2.start()
t1.join()
t2.join()

print(f"最终count值:{count}") # 若不加锁,结果可能小于2000000;加锁后正确为2000000

说明count += 1 实际包含 “读取 - 修改 - 写入” 三步,不加锁时可能因线程交替执行导致数据错误,加锁后确保操作完整执行。

2. 可重入锁(threading.RLock

允许同一线程多次获取锁(避免递归调用时死锁),需释放相同次数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import threading

rlock = threading.RLock()

def inner():
rlock.acquire()
print("inner 获得锁")
rlock.release()

def outer():
rlock.acquire()
print("outer 获得锁")
inner() # 同一线程再次获取锁
rlock.release()

t = threading.Thread(target=outer)
t.start()
t.join()

3. 其他同步工具

工具 用途
threading.Event 线程间通信,通过 set() 发送信号,wait() 等待信号
threading.Semaphore 控制同时访问资源的线程数量(如限制并发连接数)
threading.Condition 更灵活的条件同步,支持 wait()/notify()/notify_all() 机制

线程池:高效管理线程

频繁创建 / 销毁线程会消耗资源,线程池可预先创建固定数量的线程,重复利用以提高效率。Python3 通过 concurrent.futures.ThreadPoolExecutor 实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
import time

def task(num):
print(f"任务 {num} 开始,线程:{threading.current_thread().name}")
time.sleep(2) # 模拟I/O操作
print(f"任务 {num} 结束")
return num * 2 # 返回结果

# 创建线程池(最多3个线程)
with ThreadPoolExecutor(max_workers=3) as executor:
# 提交5个任务
futures = [executor.submit(task, i) for i in range(5)]

# 获取任务结果(按完成顺序)
for future in as_completed(futures):
result = future.result()
print(f"任务结果:{result}")

优势

  • 自动管理线程生命周期,避免频繁创建开销;
  • 方便获取任务结果(future.result());
  • 支持批量提交任务,简化代码。

线程 vs 进程:如何选择?

场景 推荐使用 原因分析
I/O 密集型任务 线程(Thread) 线程切换成本低,适合等待时间长的任务(如网络请求)
CPU 密集型任务 进程(Process) 避开 GIL 限制,利用多核 CPU 并行计算
共享数据频繁 线程 线程共享内存空间,数据交互方便
数据隔离要求高 进程 进程内存独立,互不干扰,安全性高

常见问题与避坑指南

  1. GIL 导致的 CPU 密集型性能问题Python 的 GIL 限制同一时间只有一个线程执行 Python 字节码,因此多线程不适合 CPU 密集型任务(如大规模计算),建议用多进程(multiprocessing)。

  2. 死锁风险多个线程互相等待对方释放锁会导致死锁,避免方式:

    • 按固定顺序获取锁;
    • 使用 with 语句自动管理锁(with lock: ... 替代 acquire()/release())。
    1
    2
    3
    # 安全的锁使用方式(自动释放)
    with lock:
    count += 1 # 临界区操作
  1. 全局变量的线程安全非原子操作(如 count += 1、列表 append()pop())需加锁保护,原子操作(如简单赋值 a = 1)无需同步。

实战案例:多线程爬取网页

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import threading
import requests
from concurrent.futures import ThreadPoolExecutor

# 爬取网页内容
def fetch_url(url):
try:
response = requests.get(url, timeout=5)
return f"{url} 状态码:{response.status_code}"
except Exception as e:
return f"{url} 爬取失败:{str(e)}"

# 待爬取的URL列表
urls = [
"https://www.baidu.com",
"https://www.github.com",
"https://www.python.org",
"https://www.csdn.net"
]

# 使用线程池并发爬取
with ThreadPoolExecutor(max_workers=2) as executor:
results = executor.map(fetch_url, urls) # 批量提交任务

for result in results:
print(result)

说明:网络请求属于 I/O 密集型任务,多线程能显著减少总耗时(无需等待前一个请求完成再发起下一个)。

总结

Python3 线程是处理并发任务的重要工具,尤其适合 I/O 密集型场景。核心要点:

  • 通过 threading.Thread 或继承类创建线程,start() 启动,join() 等待结束;
  • 用锁(Lock)解决共享资源竞争问题;
  • 线程池(ThreadPoolExecutor)高效管理线程,推荐优先使用;
  • 避开 GIL 限制,CPU 密集型任务优先选多进程

欢迎关注我的其它发布渠道