0%

线程池

Java 线程池详解:从原理到实践

线程池是 Java 并发编程中不可或缺的组件,它通过复用线程、统一管理任务,显著提升了多线程程序的性能和稳定性。本文将深入解析线程池的核心原理、常用类型、工作机制及实践技巧。

线程池的核心价值

在没有线程池的场景下,每次任务执行都需要创建和销毁线程,这会带来显著的性能开销(线程创建涉及操作系统内核调用)。线程池通过以下方式解决这一问题:

  • 线程复用:避免频繁创建 / 销毁线程的开销;
  • 资源管控:限制最大线程数,防止线程过多导致的系统资源竞争和阻塞;
  • 任务管理:提供任务排队、定时执行、拒绝策略等功能,简化并发编程。

线程池的核心组件与接口

Java 线程池基于 Executor 框架实现,核心接口和类的关系如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 最顶层接口:定义任务执行入口
public interface Executor {
void execute(Runnable command);
}

// 扩展 Executor,增加线程池生命周期管理和任务提交能力
public interface ExecutorService extends Executor {
void shutdown(); // 优雅关闭
List<Runnable> shutdownNow(); // 强制关闭
<T> Future<T> submit(Callable<T> task); // 提交有返回值的任务
// 其他方法...
}

// 线程池的具体实现类
public class ThreadPoolExecutor extends AbstractExecutorService {
// 核心实现...
}

Executors 工具类提供了多种预定义线程池的创建方法,但实际开发中更推荐直接使用 ThreadPoolExecutor 自定义线程池(避免资源耗尽风险)。

ThreadPoolExecutor 核心参数与工作原理

核心构造参数

1
2
3
4
5
6
7
8
9
10
11
  public ThreadPoolExecutor(int corePoolSize, // 核心线程数,决定新提交的任务是新开线程去执行还是放到任务队列中,当线程数量小于corePoolSize时才会去创建线程,如果大于corePoolSize会将任务放入workQueue队列中
int maximumPoolSize, // 最大线程数,线程池能创建的最大线程数,达到该数值后将不会创建新的线程,再添加任务则采用拒绝策略
long keepAliveTime, // 线程超过corePoolSize后,多余线程的最大闲置时间,如果超过,则会终止,但是对于核心线程,如果allowCoreThreadTimeOut(boolean value)设置为true的话,核心线程也会被终止
TimeUnit unit, // 时间单位
BlockingQueue<Runnable> workQueue, // 线程数如果大于corePoolSize会将任务放入workQueue队列中,存放未处理的任务的阻塞队列,常用的队列有
//1)ArrayBlockingQueue:基于数组的先进先出队列,此队列创建时必须指定大小
//2)LinkedBlockingQueue:基于链表的先进先出队列,如果创建时没有指定此队列大小,则默认为Integer.MAX_VALUE
//3)SynchronousQueue:这个队列比较特殊,它不会进行存储,而是将直接新建一个线程来执行新来的任务
//4)PriorityBlockingQuene:具有优先级的无界阻塞队列
ThreadFactory threadFactory, // 生产线程的工厂类,可以用来定义线程名称以及线程的优先级
RejectedExecutionHandler handler)//拒绝策略

工作流程

线程池处理任务的逻辑如下:

  1. 当任务提交时,若当前线程数 < corePoolSize,创建核心线程执行任务;
  2. 若线程数 ≥ corePoolSize,将任务加入 workQueue 等待;
  3. workQueue 已满,且当前线程数 < maximumPoolSize,创建临时线程执行任务;
  4. 若线程数 ≥ maximumPoolSize,触发 handler 拒绝策略;
  5. 临时线程空闲时间超过 keepAliveTime 时,被销毁释放资源。

注意:若设置 allowCoreThreadTimeOut(true),核心线程也会因空闲超时被销毁。

核心方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
// 变量的高3位代表线程池的状态,后29位(从低位往高位数)代表该线程数量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 线程池数量的位数 29
private static final int COUNT_BITS = Integer.SIZE - 3;
// 线程最大个数
private static final int CAPACITY = (1 << COUNT_BITS) - 1;

// RUNNING -> SHUTDOWN On invocation of shutdown(), perhaps implicitly in finalize()
// (RUNNING or SHUTDOWN) -> STOP On invocation of shutdownNow()
// SHUTDOWN -> TIDYING When both queue and pool are empty
// STOP -> TIDYING When pool is empty
// TIDYING -> TERMINATED When the terminated() hook method has completed

// 线程池状态 高3位
// 高三位111,表示运行中的状态标识,此时可以接受新任务并且执行队列中的任务 runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
// 高三位000,表示关闭中的状态标识,调用了shutdown方法,此时不再接收新任务,但是队列里的任务还得执行
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 高三位001,表示已停止的状态标识,调用了shutdownNow方法不再接受新任务,同时抛弃阻塞队列中的所有任务并中断正在执行的任务
private static final int STOP = 1 << COUNT_BITS;
// 高三位010,表示当前所有任务已经终止,任务数量为0时的状态标识,在调用shutdown方法和shutdownNow方法时都会尝试更新这个状态
private static final int TIDYING = 2 << COUNT_BITS;
// 高三位011,表示线程池已经完全终止(关闭),调用了terminated方法会更新为该状态
private static final int TERMINATED = 3 << COUNT_BITS;

// 获取运行状态 高3位
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 获取线程数量 低29位
private static int workerCountOf(int c) { return c & CAPACITY; }
// 计算ctl值 线程状态和线程个数 或操作
private static int ctlOf(int rs, int wc) { return rs | wc; }

// 用来保存等待任务执行的阻塞队列
private final BlockingQueue<Runnable> workQueue;
// 可重入锁,对线程池状态等属性进行修改时需要持有该锁
private final ReentrantLock mainLock = new ReentrantLock();
// 包含了所有在工作的线程
private final HashSet<Worker> workers = new HashSet<Worker>();
// 锁条件队列,主要用于 awaitTermination 终止条件
private final Condition termination = mainLock.newCondition();
// 记录线程池最大工作线程的数量
private int largestPoolSize;
// 完成任务的数量,仅在中止工作任务时更新
private long completedTaskCount;
// 用于创建线程的工厂
private volatile ThreadFactory threadFactory;
// 拒绝策略
private volatile RejectedExecutionHandler handler;
// 空闲存活时间,如果线程池中的线程数量比核心线程数量还要多时,并且多出的这些线程都是闲置状态,该变量则是这些闲置状态的线程的存活时间
private volatile long keepAliveTime;
// 默认为 false,如果设为 true那么核心线程也会遵循 keepAliveTime的时间来做闲置处理
private volatile boolean allowCoreThreadTimeOut;
// 线程池核心线程数量
private volatile int corePoolSize;
// 线程池最大线程数量
private volatile int maximumPoolSize;

// submit是将任务进行一次封装,然后执行execute方法
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}

public void execute(Runnable command) {
// command为空,则抛出空指针
if (command == null)
throw new NullPointerException();
// 获取线程池状态 和 线程数量的组合值
int c = ctl.get();
// workerCountOf方法根据ctl的低29位,得到线程池的当前线程数,如果线程数小于corePoolSize,则执行addWorker方法创建新的线程执行任务
// 判断是否创建核心线程
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true)) // 创建核心线程
// 成功则直接返回
return;
// 失败需要再次获取线程池的状态
c = ctl.get();
}
// 经过上面的if语句,走到这里可能会因为两种情况1:当前工作线程数大于等于核心线程数 2:创建线程失败
// 判断线程池是否在运行
// 如果是RUNNING状态,则添加任务到阻塞队列中
if (isRunning(c) && workQueue.offer(command)) {
// 双重检查,再次获取线程池状态,插入成功再次验证线程池是否运行,防止任务入队的过程中ctl的值发生变化
int recheck = ctl.get();
//再次检查如果不在运行,移除已入队的任务,然后抛出拒绝策略
if (! isRunning(recheck) && remove(command))
reject(command);
// 如果在运行,当前线程数为0,就启用一个非核心线程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 加入阻塞队列失败(队列已满),尝试创建非核心线程
//如果创建非核心线程失败,直接拒绝
else if (!addWorker(command, false))
reject(command);
}


// 添加线程 core表示是否为核心线程
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) { // 自旋
int c = ctl.get();
// 当前线程池运行状态
int rs = runStateOf(c);

// Check if queue empty only if necessary.
// 判断线程池状态,如果线程池的状态值大于或等于SHUTDOWN,则不处理提交任务,直接返回
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
// 自旋,更新线程数量
for (;;) {
// 当前线程数
int wc = workerCountOf(c);
// 根据core来判断是否为核心线程
// 当前线程数已经超过容量,就直接返回
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// CAS增加当前线程数量,更改成功结束自旋
if (compareAndIncrementWorkerCount(c))
break retry;
// CAS操作失败了,获取最新值
c = ctl.get(); // Re-read ctl
// 检查线程池状态是不是修改了,线程池状态修改则需要重新获取线程池
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// CAS操作成功,创建线程的所有条件都满足了,可以开始创建线程来执行任务
// worker是否启动
boolean workerStarted = false;
// 是否将worker添加到workers集合中
boolean workerAdded = false;
Worker w = null;
try {
// 创建工作线程
w = new Worker(firstTask);
// 取出worker中的线程对象
final Thread t = w.thread;
if (t != null) {
// 获取线程池主锁,为了保证workers同步
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
// 获取当前线程池运行状态
int rs = runStateOf(ctl.get());

//小于SHUTTDOWN即RUNNING
//等于SHUTDOWN并且firstTask为null,不接受新的任务,但是会继续执行等待队列中的任务
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 判断线程是否是 alive状态
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 添加线程到workers中,workers是一个HashSet private final HashSet<Worker> workers = new HashSet<Worker>();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
// 设置当前工作者加入线程队列的已添加的标识为 true
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 线程添加线程池成功,启动新建的线程
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
// 线程启动失败
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}


Worker

Worker实现了Runnable接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable

public void run() {
runWorker(this);
}
// worker的执行方法
final void runWorker(Worker w) {
// 获取当前线程
Thread wt = Thread.currentThread();
// 获取worker的firstTask
Runnable task = w.firstTask;
w.firstTask = null;
// 释放锁,设置aqs的state为0,允许中断
w.unlock(); // allow interrupts
// 用于标识线程是否异常终止
boolean completedAbruptly = true;
try {
// 循环getTask获取任务
while (task != null || (task = getTask()) != null) {
// 获取到可执行的任务,对worker对象加锁,保证线程在执行任务过程中不会被中断
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) || // 线程池状态大于等于STOP
(Thread.interrupted() && // 线程被中断 且 是线程池内部的状态变化中断的
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted()) // 当前线程未被中断
wt.interrupt(); // 中断
try {
// 线程执行前
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 开始执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 线程执行后
afterExecute(task, thrown);
}
} finally {
task = null;
// 完成数加一
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}

// 获取任务
private Runnable getTask() {
// 标识是否超时
boolean timedOut = false; // Did the last poll() time out?

for (;;) {
// 线程池控制状态
int c = ctl.get();
// 线程池运行状态
int rs = runStateOf(c);

// 如果线程池状态大于等于STOP,或者处于SHUTDOWN状态,并且阻塞队列为空,线程池工作线程数量递减,方法返回null,回收线程
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}

int wc = workerCountOf(c);

// Are workers subject to culling?
// 如果allowCoreThreadTimeOut为ture,或当前线程数大于核心池大小,则需要超时回收
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//如果worker数量大于maximumPoolSize或者超时了
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c)) //线程池工作线程数量递减,方法返回null,回收线程
return null;
continue;//线程池工作线程数量递减失败,跳过剩余部分,继续循环
}

try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}

其他方法

1
2
3
4
5
6
7
8
// 取得第一个完成任务的结果值,当第一个任务执行完成后,会调用interrupt()将其他任务中断,需要结合if(Thread.currentThread().isInterrupted())来判断
// 会阻塞
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException
// 等全部线程任务执行完毕后,取得所有完成任务的结果值
// 会阻塞
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException

常用线程池类型(Executors 工具类)

Executors 提供了便捷的线程池创建方法,但需注意其潜在风险(如无界队列可能导致 OOM):

线程池类型 核心参数 适用场景 风险提示
newFixedThreadPool(n) 核心线程 = 最大线程 = n,队列无界(LinkedBlockingQueue 执行长期稳定的任务 任务过多时队列可能耗尽内存
newSingleThreadExecutor() 核心线程 = 最大线程 = 1,队列无界 串行执行任务(保证顺序) 同上
newCachedThreadPool() 核心线程 = 0,最大线程 = Integer.MAX_VALUE,队列无界(SynchronousQueue 短期小任务(如 RPC 调用) 任务暴增时可能创建过多线程
newScheduledThreadPool(n) 核心线程 = n,支持定时 / 周期性任务(DelayedWorkQueue 定时任务(如心跳检测) 最大线程数过大可能导致问题

任务提交与线程池关闭

任务提交方式

  • execute(Runnable):无返回值,无法捕获任务异常;
  • submit(Runnable/Callable):返回 Future 对象,可通过 get() 获取结果或异常。
1
2
3
4
5
6
7
// 示例:提交有返回值的任务
ExecutorService pool = new ThreadPoolExecutor(...);
Future<Integer> future = pool.submit(() -> {
Thread.sleep(1000);
return 1 + 1;
});
System.out.println(future.get()); // 阻塞等待结果:2

线程池关闭

  • shutdown():优雅关闭,不再接收新任务,等待已提交任务执行完毕;
  • shutdownNow():强制关闭,尝试中断正在执行的任务,返回未执行的任务列表。
1
2
3
4
5
6
// 优雅关闭示例
pool.shutdown();
if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
// 超时后强制关闭
pool.shutdownNow();
}

拒绝策略(RejectedExecutionHandler)

当任务数超过线程池承载能力(maximumPoolSize + workQueue 容量)时,触发拒绝策略:

策略类型 行为描述 适用场景
AbortPolicy 抛出 RejectedExecutionException(默认策略) 需明确感知任务拒绝的场景
CallerRunsPolicy 由提交任务的线程(如主线程)执行任务 临时流量峰值,减缓提交速度
DiscardPolicy 直接丢弃任务,无任何提示 非核心任务,允许丢失
DiscardOldestPolicy 丢弃队列中最旧的任务,尝试提交新任务 任务有时间优先级(如日志)

自定义拒绝策略:实现 RejectedExecutionHandler 接口,例如记录日志后异步重试。

ForkJoinPool:分治任务的并行计算

ForkJoinPool 是 Java 7 引入的线程池,专为分治任务设计(大任务拆分为小任务,并行计算后合并结果),底层采用工作窃取算法(空闲线程主动获取其他线程的任务)。

核心使用方式

  • RecursiveTask<V>:有返回值的分治任务;
  • RecursiveAction:无返回值的分治任务。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// 示例:计算 1~100 的和(RecursiveTask)
public class SumTask extends RecursiveTask<Integer> {
private static final int THRESHOLD = 10; // 拆分阈值
private int start;
private int end;

public SumTask(int start, int end) {
this.start = start;
this.end = end;
}

@Override
protected Integer compute() {
if (end - start <= THRESHOLD) {
// 小任务直接计算
int sum = 0;
for (int i = start; i <= end; i++) sum += i;
return sum;
} else {
// 大任务拆分
int mid = (start + end) / 2;
SumTask left = new SumTask(start, mid);
SumTask right = new SumTask(mid + 1, end);
left.fork(); // 并行执行左任务
right.fork(); // 并行执行右任务
return left.join() + right.join(); // 合并结果
}
}

public static void main(String[] args) {
ForkJoinPool pool = new ForkJoinPool();
System.out.println(pool.invoke(new SumTask(1, 100))); // 输出:5050
}
}

线程池实践建议

  1. 避免使用 Executors 工具类:直接通过 ThreadPoolExecutor 自定义参数,明确队列大小和拒绝策略;
  2. 合理设置核心参数
    • 核心线程数:CPU 密集型任务(如计算)设为 CPU核心数 + 1,IO 密集型任务(如网络请求)设为 2 * CPU核心数
    • 队列选择:短期任务用 ArrayBlockingQueue(有界),避免 OOM;
  3. 监控线程池状态:通过 getActiveCount()getQueue().size() 等方法监控负载,动态调整参数;
  4. 自定义线程工厂:为线程命名(如 OrderProcessing-Thread-1),便于问题排查;
  5. 优雅处理任务异常:通过 afterExecute 钩子方法记录任务异常,避免异常被吞噬。

欢迎关注我的其它发布渠道

表情 | 预览
快来做第一个评论的人吧~
Powered By Valine
v1.3.10