0%

springboot异步执行

Spring Boot 异步执行详解:从基础配置到自定义线程池实战

在 Spring Boot 应用中,同步执行任务(如耗时的 IO 操作、第三方接口调用)会阻塞主线程,导致接口响应变慢、资源利用率降低。异步执行通过 “后台线程处理任务” 的方式,让主线程快速返回,大幅提升应用并发能力。从 “异步执行基础→开启异步→自定义线程池→异常处理→实战场景” 五个维度,系统讲解 Spring Boot 异步执行的实现方法与最佳实践,帮你高效处理耗时任务。

异步执行的核心概念与价值

1. 同步 vs 异步:执行流程对比

执行方式 核心流程 优点 缺点 适用场景
同步 主线程发起任务→等待任务完成→返回结果 逻辑简单,无需处理线程安全 阻塞主线程,并发能力低,响应慢 耗时短的任务(如内存计算、简单 DB 查询)
异步 主线程发起任务→交给后台线程执行→直接返回→后台线程完成后通知主线程(可选) 不阻塞主线程,提升并发,响应快 需处理线程安全、结果回调、异常捕获 耗时长的任务(如文件上传、邮件发送、第三方 API 调用)

2. 异步执行的核心价值

  • 提升接口响应速度:主线程无需等待耗时任务完成,快速返回 “任务已受理” 等结果,减少用户等待时间;
  • 提高资源利用率:主线程可继续处理其他请求,后台线程并行处理耗时任务,充分利用 CPU 资源;
  • 解耦任务执行与结果返回:支持 “fire-and-forget”(发送后无需关注结果)或 “回调通知”(任务完成后处理结果)两种模式,灵活应对不同场景。

Spring Boot 异步执行的基础配置

Spring Boot 基于 Spring 的 @Async 注解实现异步执行,核心步骤仅需 “开启异步支持” 和 “标注异步方法”,无需手动创建线程。

步骤 1:开启异步支持(@EnableAsync)

在 Spring Boot 主程序类或配置类上添加 @EnableAsync 注解,开启全局异步支持:

1
2
3
4
5
6
7
8
9
10
11
12
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;

// 关键:@EnableAsync 开启异步执行
@EnableAsync
@SpringBootApplication
public class AsyncDemoApplication {
public static void main(String[] args) {
SpringApplication.run(AsyncDemoApplication.class, args);
}
}
注意:
  • @EnableAsync 需添加在 “能被 Spring 扫描到的类” 上(如主程序类、@Configuration 配置类);
  • 若未添加 @EnableAsync@Async 注解会失效,方法仍以同步方式执行。

步骤 2:标注异步方法(@Async)

在需要异步执行的方法上添加 @Async 注解,Spring 会自动将该方法的执行交给 “默认线程池” 处理:

示例 1:无返回值的异步方法(fire-and-forget)

适合 “无需关注结果” 的场景(如日志记录、邮件发送):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

@Service
public class AsyncTaskService {

// 关键:@Async 标注该方法为异步方法
@Async
public void sendEmail(String to, String content) {
// 模拟耗时操作(如调用邮件 API,耗时 3 秒)
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("邮件发送完成:收件人=" + to + ",内容=" + content);
}
}
示例 2:有返回值的异步方法(Future 结果)

适合 “需要获取任务结果” 的场景(如异步计算、批量数据处理),需返回 Future 或其实现类(如 CompletableFuture):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.util.concurrent.CompletableFuture;

@Service
public class AsyncTaskService {

// 异步方法返回 CompletableFuture(支持链式调用处理结果)
@Async
public CompletableFuture<String> calculateAsync(int num) {
// 模拟耗时计算(如复杂数据处理,耗时 2 秒)
String result = "计算结果:" + (num * 10);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return CompletableFuture.failedFuture(e); // 异常时返回失败结果
}
return CompletableFuture.completedFuture(result); // 成功时返回结果
}
}
调用有返回值的异步方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

@RestController
public class AsyncTaskController {

@Resource
private AsyncTaskService asyncTaskService;

@GetMapping("/calculate/{num}")
public String calculate(@PathVariable int num) throws ExecutionException, InterruptedException {
// 1. 发起异步任务(主线程不阻塞,直接返回 CompletableFuture)
CompletableFuture<String> futureResult = asyncTaskService.calculateAsync(num);

// 2. 处理异步结果(可选:主线程可继续做其他事,再获取结果)
System.out.println("主线程继续处理其他任务..."); // 立即执行,不阻塞

// 3. 获取异步结果(get() 方法会阻塞主线程,直到任务完成)
String result = futureResult.get();
return result;
}
}

3. 默认线程池的局限性

Spring Boot 异步执行的默认线程池由 SimpleAsyncTaskExecutor 实现,存在以下局限性:

  • 无线程池复用:每次执行异步任务都会创建新线程,频繁调用会导致线程爆炸,消耗系统资源;
  • 无队列缓冲:任务直接提交给新线程,无队列缓冲,高并发时易触发系统瓶颈;
  • 无异常处理:默认仅打印异常日志,无法自定义异常处理逻辑(如告警、重试)。

因此,生产环境必须自定义线程池,避免默认线程池的缺陷。

自定义异步线程池(核心实战)

Spring Boot 提供 AsyncConfigurer 接口,通过实现该接口可自定义异步线程池(Executor)和异步异常处理器(AsyncUncaughtExceptionHandler),完全掌控线程池的参数配置(核心线程数、最大线程数、队列容量等)。

1. 自定义线程池的核心参数

线程池的参数配置直接影响异步任务的执行效率和系统稳定性,关键参数如下:

参数名 作用描述 配置建议
corePoolSize 核心线程数(线程池长期保持的线程数量) 建议为 CPU 核心数 + 1(如 4 核 CPU 设为 5)
maxPoolSize 最大线程数(线程池可创建的最大线程数量) 建议为核心线程数的 2-4 倍(避免线程过多导致上下文切换开销)
queueCapacity 任务队列容量(核心线程满时,任务暂存队列) 建议为 100-1000(根据业务并发量调整)
keepAliveSeconds 空闲线程存活时间(核心线程外的线程空闲时存活时间) 建议为 60 秒(释放空闲资源)
threadNamePrefix 线程名称前缀(便于日志排查线程归属) 自定义前缀(如 my-async-thread-
rejectedExecutionHandler 任务拒绝策略(线程池满且队列满时的处理方式) 推荐 CallerRunsPolicy(主线程执行,避免任务丢失)

2. 实现 AsyncConfigurer 自定义线程池

完善自定义线程池配置(添加参数说明和最佳实践):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

// 关键:@Configuration 声明为配置类,Spring 会自动扫描并应用
@Configuration
public class CustomAsyncConfig implements AsyncConfigurer {

/**
* 自定义异步线程池
* @return 线程池实例
*/
@Override
public Executor getAsyncExecutor() {
// 1. 创建线程池实例(Spring 提供的 ThreadPoolTaskExecutor,封装了 JDK 的 ThreadPoolExecutor)
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

// 2. 配置核心参数
executor.setCorePoolSize(5); // 核心线程数:4 核 CPU 设为 5
executor.setMaxPoolSize(20); // 最大线程数:核心线程数的 4 倍
executor.setQueueCapacity(200); // 任务队列容量:200(支持 200 个任务排队)
executor.setKeepAliveSeconds(60); // 空闲线程存活时间:60 秒
executor.setThreadNamePrefix("my-async-thread-"); // 线程名称前缀:便于日志排查

// 3. 配置任务拒绝策略(核心:避免任务丢失)
// CallerRunsPolicy:线程池满时,由主线程执行任务(降低并发,但避免任务丢失)
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

// 4. 配置线程池关闭时的等待策略(核心:确保未完成的任务执行完毕)
executor.setWaitForTasksToCompleteOnShutdown(true); // 关闭时等待任务完成
executor.setAwaitTerminationSeconds(30); // 等待时间:30 秒(超时后强制关闭)

// 5. 初始化线程池(必须调用,否则线程池不生效)
executor.initialize();

return executor;
}

/**
* 自定义异步任务异常处理器(处理无返回值异步方法的未捕获异常)
* @return 异常处理器实例
*/
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return (ex, method, params) -> {
// 1. 打印异常日志(包含方法信息和参数,便于排查)
System.err.println("===== 异步任务异常 =====");
System.err.println("异常方法:" + method.getName());
System.err.println("方法参数:" + params.toString());
System.err.println("异常信息:" + ex.getMessage());
ex.printStackTrace();

// 2. 扩展:异常告警(如发送邮件、钉钉通知,生产环境必备)
// sendAlarmEmail("异步任务异常", ex.getMessage());
};
}

// 扩展:异常告警方法(生产环境实现)
// private void sendAlarmEmail(String title, String content) {
// // 调用邮件发送 API 实现告警
// }
}

3. 多线程池的差异化配置(进阶)

若应用中存在 “高优先级任务” 和 “低优先级任务”(如 “订单支付通知” vs “日志统计”),可配置多个线程池,通过 @Async("线程池Bean名") 指定任务使用的线程池:

示例:配置两个线程池
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@Configuration
public class MultiAsyncConfig {

/**
* 线程池 1:高优先级任务(如订单通知)
*/
@Bean("highPriorityExecutor")
public Executor highPriorityExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10); // 核心线程数更多,优先处理
executor.setMaxPoolSize(30);
executor.setQueueCapacity(100);
executor.setThreadNamePrefix("high-priority-thread-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}

/**
* 线程池 2:低优先级任务(如日志统计)
*/
@Bean("lowPriorityExecutor")
public Executor lowPriorityExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(3); // 核心线程数少,节省资源
executor.setMaxPoolSize(10);
executor.setQueueCapacity(500); // 队列容量大,允许更多任务排队
executor.setThreadNamePrefix("low-priority-thread-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
}
指定线程池执行任务:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Service
public class AsyncTaskService {

// 高优先级任务:使用 "highPriorityExecutor" 线程池
@Async("highPriorityExecutor")
public void sendOrderNotification(String orderId) {
// 处理订单通知(高优先级,快速执行)
}

// 低优先级任务:使用 "lowPriorityExecutor" 线程池
@Async("lowPriorityExecutor")
public void statUserLog(String userId) {
// 处理日志统计(低优先级,允许排队)
}
}

异步执行的关键注意事项

1. 异步方法的调用限制

@Async 注解仅对 “外部调用” 生效,以下场景会导致异步失效:

  • 同一类内调用:异步方法与调用方法在同一个类中(Spring 无法通过 AOP 代理拦截);

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    @Service
    public class WrongAsyncService {
    // 错误:同一类内调用异步方法,@Async 失效
    public void callAsyncMethod() {
    this.asyncMethod(); // 同步执行,非异步
    }

    @Async
    public void asyncMethod() {
    // 异步逻辑
    }
    }
  • 静态方法@Async 标注在静态方法上(Spring 无法代理静态方法);

  • 非 Spring 管理的 Bean:异步方法所在的类未添加 @Service/@Component 等注解,未被 Spring 扫描为 Bean。

解决方案:
  • 同一类内调用:将异步方法抽离到独立的 @Service 类中;
  • 确保方法非静态,且类被 Spring 管理(添加 @Service/@Component)。

2. 异常处理的覆盖场景

  • 无返回值的异步方法:异常由 AsyncUncaughtExceptionHandler 处理(自定义的异常处理器);

  • 有返回值的异步方法(CompletableFuture):AsyncUncaughtExceptionHandler失效,需通过exceptionally()方法捕获异常:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    @Async
    public CompletableFuture<String> calculateAsync(int num) {
    try {
    // 耗时逻辑
    } catch (Exception e) {
    // 主动捕获异常,返回失败结果
    return CompletableFuture.failedFuture(e);
    }
    }

    // 调用时捕获异常
    public void handleAsyncResult() {
    asyncTaskService.calculateAsync(10)
    .thenAccept(result -> System.out.println("成功:" + result)) // 成功处理
    .exceptionally(ex -> { // 异常处理
    System.err.println("异步任务失败:" + ex.getMessage());
    return null;
    });
    }

3. 线程池的监控与调优

生产环境需监控线程池状态,避免线程泄漏或资源耗尽,推荐两种监控方式:

  • Spring Boot Actuator 监控:引入 spring-boot-starter-actuator 依赖,通过 /actuator/metrics 端点查看线程池指标(如 executor.activeThreadsexecutor.queueSize);

  • 自定义监控:通过线程池的getActiveCount()、getQueueSize()等方法,定时打印线程池状态:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    @Scheduled(fixedRate = 60000) // 每分钟打印一次
    public void monitorThreadPool() {
    ThreadPoolTaskExecutor executor = (ThreadPoolTaskExecutor) applicationContext.getBean("highPriorityExecutor");
    System.out.println("===== 线程池监控 =====");
    System.out.println("核心线程数:" + executor.getCorePoolSize());
    System.out.println("活跃线程数:" + executor.getActiveCount());
    System.out.println("队列任务数:" + executor.getQueueSize());
    System.out.println("已完成任务数:" + executor.getCompletedTaskCount());
    }

异步执行实战场景

场景 1:异步发送邮件(无返回值)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
@Service
public class EmailService {

@Resource
private JavaMailSender mailSender;

// 异步发送邮件,无需等待发送结果
@Async
public void sendAsyncEmail(String to, String subject, String content) {
SimpleMailMessage message = new SimpleMailMessage();
message.setTo(to);
message.setSubject(subject);
message.setText(content);
message.setFrom("sender@example.com");

try {
mailSender.send(message);
System.out.println("邮件已发送至:" + to);
} catch (MailException e) {
throw new RuntimeException("邮件发送失败:" + e.getMessage());
}
}
}

// 控制器调用
@RestController
public class UserController {
@Resource
private EmailService emailService;

@PostMapping("/register")
public String register(String email) {
// 1. 处理用户注册逻辑(主线程同步执行)
System.out.println("用户注册成功:" + email);

// 2. 异步发送注册邮件(主线程不阻塞,直接返回)
emailService.sendAsyncEmail(email, "注册成功", "欢迎加入我们!");

return "注册成功,邮件已发送(异步处理)";
}
}

场景 2:异步批量数据处理(有返回值)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
@Service
public class DataProcessService {

// 异步处理单批数据
@Async
public CompletableFuture<Long> processBatchData(List<String> dataList) {
// 模拟批量处理(耗时 1 秒)
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return CompletableFuture.failedFuture(e);
}

long processedCount = dataList.size();
return CompletableFuture.completedFuture(processedCount);
}

// 批量处理多批数据(并行执行)
public CompletableFuture<Long> processMultiBatchData(List<List<String>> multiBatchData) {
// 1. 为每批数据创建异步任务
List<CompletableFuture<Long>> futureList = multiBatchData.stream()
.map(this::processBatchData)
.toList();

// 2. 等待所有异步任务完成,汇总结果
return CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0]))
.thenApply(v -> futureList.stream()
.map(CompletableFuture::join) // 获取每批结果
.reduce(0L, Long::sum)); // 汇总总处理数
}
}

// 控制器调用
@RestController
public class DataController {
@Resource
private DataProcessService dataProcessService;

@PostMapping("/process-data")
public CompletableFuture<String> processData() {
// 模拟 5 批数据,每批 100 条
List<List<String>> multiBatchData = new ArrayList<>();
for (int i = 0; i < 5; i++) {
List<String> batch = new ArrayList<>();
for (int j = 0; j < 100; j++) {
batch.add("data-" + i + "-" + j);
}
multiBatchData.add(batch);
}

// 异步处理多批数据(并行执行,总耗时约 1 秒,而非 5 秒)
return dataProcessService.processMultiBatchData(multiBatchData)
.thenApply(total -> "数据处理完成,总处理数:" + total);
}
}

总结

Spring Boot 异步执行的核心是 “@EnableAsync 开启支持 + @Async 标注方法 + 自定义线程池”,关键要点可概括为:

  1. 基础配置@EnableAsync 开启异步,@Async 标注异步方法,支持无返回值和 CompletableFuture 有返回值两种模式;
  2. 线程池自定义:实现 AsyncConfigurer 接口,配置核心线程数、队列容量、拒绝策略等参数,生产环境必须自定义(避免默认线程池缺陷);
  3. 注意事项:异步方法需外部调用、非静态、被 Spring 管理;异常处理需区分 “有无返回值” 场景;
  4. 实战价值:适合处理耗时任务(邮件发送、文件上传、第三方 API 调用),提升接口响应速度和系统并发能力

欢迎关注我的其它发布渠道

表情 | 预览
快来做第一个评论的人吧~
Powered By Valine
v1.3.10