0%

Callable接口

Callable接口与异步任务处理:从基础到实战

在 Java 并发编程中,Callable接口与Future框架是处理异步任务的核心组件,它们弥补了Runnable接口在返回值和异常处理上的不足,为多线程任务提供了更灵活的结果获取机制。本文将深入解析CallableFutureFutureTaskCompletionService的设计原理与实战应用。

Callable 接口:超越 Runnable 的异步任务定义

Runnable 的局限性

Runnable是 Java 早期定义线程任务的接口,但其run()方法存在两大缺陷:

  • 无返回值:无法直接获取任务执行结果,需通过共享变量间接传递,代码繁琐;
  • 无异常抛出run()方法声明不允许抛出受检异常,必须在方法内部捕获处理,增加了代码复杂度。

Callable 的改进

Callable接口专为解决上述问题设计,其定义如下:

1
2
3
4
@FunctionalInterface  
public interface Callable<V> {
V call() throws Exception;
}

核心优势

  • 有返回值call()方法返回泛型V,直接承载任务结果;
  • 支持异常:声明抛出Exception,允许任务将异常传递给调用方处理;
  • 函数式接口:可配合 Lambda 表达式简化代码。

Callable 与 Runnable 的对比

特性 Runnable Callable
方法名 run() call()
返回值 void V(泛型)
异常处理 不允许抛出受检异常 允许抛出Exception
线程启动方式 直接通过Thread启动 需配合FutureTask使用

Future 接口:异步结果的生命周期管理

Callable仅定义任务,而Future接口则负责管理任务的执行过程和结果获取,是 “生产者 - 消费者” 模型中的桥梁。

Future 接口的核心方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public interface Future<V> {  
// 取消任务:mayInterruptIfRunning为true时中断执行中的线程
boolean cancel(boolean mayInterruptIfRunning);

// 判断任务是否被取消(正常结束前)
boolean isCancelled();

// 判断任务是否已完成(正常结束、异常或取消均返回true)
boolean isDone();

// 阻塞获取结果,直到任务完成
V get() throws InterruptedException, ExecutionException;

// 超时阻塞获取结果,超时未完成则抛出TimeoutException
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}

关键作用

  • 非阻塞判断任务状态(isDone()isCancelled());
  • 阻塞或超时获取结果(get()方法);
  • 主动取消任务(cancel()方法)。

Future 的使用场景

  • 异步计算:主线程提交任务后继续执行,稍后获取结果;
  • 任务超时控制:通过get(timeout, unit)避免无限等待;
  • 任务中断:通过cancel(true)终止长时间未完成的任务。

FutureTask:Callable 与 Future 的桥梁

Callable无法直接通过Thread启动(Thread仅接受Runnable),而FutureTask实现了RunnableFuture<V>接口(继承RunnableFuture<V>),因此成为连接Callable与线程的适配器。

FutureTask 的类结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class FutureTask<V> implements RunnableFuture<V> {  
// 任务状态(NEW -> COMPLETING -> NORMAL/EXCEPTIONAL 等)
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1; // 结果赋值中
private static final int NORMAL = 2; // 正常完成
private static final int EXCEPTIONAL = 3; // 异常终止
private static final int CANCELLED = 4; // 已取消(未执行)
private static final int INTERRUPTING = 5; // 中断中
private static final int INTERRUPTED = 6; // 已中断

private Callable<V> callable; // 待执行的任务
private Object outcome; // 任务结果(正常结果或异常)
private volatile Thread runner; // 执行任务的线程
private volatile WaitNode waiters; // 等待结果的线程队列
}

FutureTask 的核心流程

任务执行(run()方法)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public void run() {  
// 仅当状态为NEW且runner未被设置时执行(保证任务只执行一次)
if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
return;

try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call(); // 执行Callable任务
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex); // 捕获异常并设置状态
}
if (ran)
set(result); // 设置正常结果并唤醒等待线程
}
} finally {
runner = null; // 清空执行线程
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
结果获取(get()方法)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public V get() throws InterruptedException, ExecutionException {  
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L); // 阻塞等待任务完成
return report(s); // 返回结果或抛出异常
}

private int awaitDone(boolean timed, long nanos) throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;

for (;;) {
// 响应中断
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}

int s = state;
if (s > COMPLETING) { // 任务已完成
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING) // 任务正在赋值结果,让出CPU
Thread.yield();
else if (q == null)
q = new WaitNode(); // 创建等待节点
else if (!queued)
// 将当前线程加入等待队列
queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);
else if (timed) { // 超时等待
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
LockSupport.park(this); // 无限期阻塞
}
}
结果通知(finishCompletion()方法)

任务完成后(正常或异常),finishCompletion()会唤醒所有等待结果的线程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private void finishCompletion() {  
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t); // 唤醒等待线程
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // 帮助GC
q = next;
}
break;
}
}
done(); // 钩子方法,可重写扩展
callable = null; // 释放资源
}

FutureTask 的使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class FutureTaskDemo {  
public static void main(String[] args) throws Exception {
// 1. 创建Callable任务
Callable<Integer> task = () -> {
Thread.sleep(1000); // 模拟耗时计算
return 100;
};

// 2. 包装为FutureTask
FutureTask<Integer> futureTask = new FutureTask<>(task);

// 3. 启动线程执行任务
new Thread(futureTask).start();

// 4. 主线程可做其他事情
System.out.println("主线程执行其他任务...");

// 5. 获取结果(阻塞直到任务完成)
int result = futureTask.get();
System.out.println("任务结果:" + result); // 输出:100
}
}

CompletionService:批量异步任务的高效处理

当需要提交多个异步任务并按完成顺序处理结果时,FutureTask的逐个get()会导致效率低下(需等待所有任务完成)。CompletionService通过整合线程池和阻塞队列,解决了这一问题。

CompletionService 的设计原理

ExecutorCompletionServiceCompletionService的唯一实现,其内部包含:

  • 一个线程池(Executor):负责执行任务;
  • 一个阻塞队列(BlockingQueue):缓存已完成的任务结果,按完成顺序排列。

核心方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class ExecutorCompletionService<V> implements CompletionService<V> {  
private final Executor executor;
private final AbstractExecutorService aes;
private final BlockingQueue<Future<V>> completionQueue;

// 提交Callable任务
public Future<V> submit(Callable<V> task) { ... }

// 提交Runnable任务,result为默认返回值
public Future<V> submit(Runnable task, V result) { ... }

// 阻塞获取下一个已完成的任务结果
public Future<V> take() throws InterruptedException { ... }

// 非阻塞获取已完成的任务结果(无则返回null)
public Future<V> poll() { ... }

// 超时获取已完成的任务结果
public Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException { ... }
}

实战示例:按完成顺序处理结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class CompletionServiceDemo {  
public static void main(String[] args) throws Exception {
// 1. 创建线程池和CompletionService
ExecutorService executor = Executors.newFixedThreadPool(3);
CompletionService<String> completionService = new ExecutorCompletionService<>(executor);

// 2. 提交5个任务(随机休眠模拟耗时)
for (int i = 0; i < 5; i++) {
final int taskId = i;
completionService.submit(() -> {
long sleep = (long) (Math.random() * 1000);
Thread.sleep(sleep);
return "任务" + taskId + "(耗时" + sleep + "ms)";
});
}

// 3. 按完成顺序获取结果
for (int i = 0; i < 5; i++) {
Future<String> future = completionService.take(); // 阻塞获取下一个完成的任务
System.out.println("处理结果:" + future.get());
}

// 4. 关闭线程池
executor.shutdown();
}
}

输出示例(顺序由任务完成时间决定):

1
2
3
4
5
处理结果:任务2(耗时120ms)  
处理结果:任务0(耗时300ms)
处理结果:任务4(耗时500ms)
处理结果:任务1(耗时700ms)
处理结果:任务3(耗时900ms)

总结与最佳实践

组件选择指南

  • 简单异步任务Callable + FutureTask
  • 批量任务按完成顺序处理ExecutorCompletionService
  • 无需结果的后台任务Runnable(更轻量)。

注意事项

  • get()方法阻塞风险:避免在主线程无超时调用get(),防止程序卡死;
  • 任务取消与中断cancel(true)仅对阻塞中的线程有效(如sleepwait),需在任务中响应中断;
  • 资源释放:使用线程池时务必调用shutdown(),避免资源泄漏。

欢迎关注我的其它发布渠道

表情 | 预览
快来做第一个评论的人吧~
Powered By Valine
v1.3.10