目录
- 前言
- 一、Springboot项目如何开启异步?
- 二、存在的问题
- 三、自定义线程池
- 四、自定义线程池使用
- 五、阻塞队列和拒绝策略
前言
当开发中遇到不影响主流程任务时,使用异步去处理。
如有以下场景:
1、业务需要生成一个季度的数据进行员工排名(涉及到的数据很多),数据查询、组装、按规则排名耗时比较长,并且开发方案能接受延时查看具体排名信息数据。在数据变动时,需要调用重新排名的方法,故把排名方法设置为异步。
一、Springboot项目如何开启异步?
启动类 上添加或者 自定义线程池 上添加注解:@EnableAsync。
执行方法上加上注解 @Async。
启动类配置
Controller
Service
打印信息
到这里就可以正常使用异步了。
二、存在的问题
虽然在 Spring 框架中,@Async 注解可以用于异步执行方法。
但是Spring 会自动创建一个默认的线程池用于执行方法。这个默认线程池是由 SimpleAsyncTaskExecutor 管理的,它为每个任务创建一个新的线程。虽然这可以工作,但可能会遇到以下问题:
- 无限制的线程创建:SimpleAsyncTaskExecutor 会为每个任务创建一个新的线程,而没有最大线程数的限制。如果异步任务的数量非常多,这可能导致大量的线程被创建,消耗大量的系统资源,最终可能导致 OutOfMemoryError 或降低系统性能。
- 线程管理:由于每次调用都会创建新线程,没有线程复用,这可能会导致线程管理上的开销,尤其是在高并发场景下。
- 调试和监控困难:默认线程池创建的线程名称没有明确的命名规则,这可能会使得在日志中或监控工具中跟踪异步任务变得困难。
- 资源竞争:大量的线程可能会引起CPU和内存资源的激烈竞争,尤其是在JVM和操作系统层面上的上下文切换。
- 安全性问题:如果异步任务执行的时间过长,而默认线程池没有适当的管理策略,可能会因为线程过多而影响到系统的稳定性和安全性。
三、自定义线程池
鉴于以上问题,建议使用自定义线程池。
import cn.hutool.core.thread.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;/*** 全局异步任务配置类*/
@Slf4j
@EnableAsync
@Configuration
public class GlobalAsyncConfig implements AsyncConfigurer {// 从 application.yml 中注入配置参数@Value("${async.executor.core-pool-size:10}")private int corePoolSize;@Value("${async.executor.max-pool-size:50}")private int maxPoolSize;@Value("${async.executor.keep-alive-seconds:60}")private int keepAliveSeconds;@Value("${async.executor.queue-capacity:200}")private int queueCapacity;/*** 创建主异步线程池** @return Executor*/@Bean(name = "asyncExecutor")public Executor asyncExecutor() {return createThreadPool("async-pool-", "MainAsyncTask");}/*** 创建邮件发送专用线程池(可选扩展)** @return Executor*/@Bean(name = "emailExecutor")public Executor emailExecutor() {return createThreadPool("email-pool-", "EmailTask");}/*** 创建线程池通用方法** @param namePrefix 线程名称前缀* @param taskType 任务类型描述(用于日志区分)* @return ThreadPoolTaskExecutor*/private Executor createThreadPool(String namePrefix, String taskType) {// 使用 Spring 提供的 ThreadPoolTaskExecutor,相较于原生 ThreadPoolExecutor,// 更加适合与 Spring 的 @Async 注解配合使用,并且支持更丰富的配置选项。ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();// 设置核心线程数,线程池初始化时创建的线程数量executor.setCorePoolSize(corePoolSize);// 设置最大线程数,当任务队列满时,线程池最多可扩容到的线程数量executor.setMaxPoolSize(maxPoolSize);// 设置非核心线程空闲存活时间(单位为秒)executor.setKeepAliveSeconds(keepAliveSeconds);// 设置任务队列容量,用于缓存待执行的任务executor.setQueueCapacity(queueCapacity);// 设置线程工厂,用于创建具有指定命名前缀的线程,便于日志追踪和问题定位executor.setThreadFactory(createThreadFactory(namePrefix));// 设置拒绝策略:当线程池和任务队列都已满时,由调用线程(即提交任务的线程)自己执行该任务executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());// 允许核心线程在空闲时超时并被回收,有助于节省资源(适用于负载波动较大的场景)executor.setAllowCoreThreadTimeOut(true);// 设置线程名称前缀,方便在日志中识别不同线程池中的线程executor.setThreadNamePrefix("[" + taskType + "] ");// 必须显式调用 initialize() 来启动线程池executor.initialize();// 返回配置完成的线程池实例return executor;}/*** 创建线程工厂(统一命名格式)** @param prefix 线程名称前缀* @return ThreadFactory*/private ThreadFactory createThreadFactory(String prefix) {return new ThreadFactoryBuilder().setNamePrefix(prefix).build();}@Overridepublic Executor getAsyncExecutor() {return asyncExecutor();}@Overridepublic AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {return new GlobalAsyncUncaughtExceptionHandler();}/*** 异步任务全局异常处理器,这里可以单独放一个类文件(这里鉴于篇幅写在一起)*/@Slf4jpublic static class GlobalAsyncUncaughtExceptionHandler implements AsyncUncaughtExceptionHandler {@Overridepublic void handleUncaughtException(Throwable ex, Method method, Object... params) {log.error("[Async Task Error] Method: {}, Params: {}", method.getName(), Arrays.deepToString(params), ex);}}
}
application.yml配置(已有默认值,看个人需求配置)
# 全局线程池相关配置
async:executor:core-pool-size: 10max-pool-size: 50keep-alive-seconds: 60queue-capacity: 200
四、自定义线程池使用
在业务开发中,建议每块业务区分线程池使用,模块互不影响,便于日志收集。
五、阻塞队列和拒绝策略
Java 中常用的阻塞队列(BlockingQueue)
队列类型 | 特点 | 适用场景 |
---|---|---|
ArrayBlockingQueue | 基于数组、有界、FIFO | 内存敏感、任务量可控的系统 |
LinkedBlockingQueue | 基于链表、可有界也可无界、FIFO | 通用型,吞吐量要求较高 |
PriorityBlockingQueue | 支持优先级排序、无界 | 需要按优先级处理的任务(如报警、日志级别) |
SynchronousQueue | 不存储元素,插入必须等待取出 | 高并发、低延迟场景,任务直接由消费者线程执行 |
🚫 线程池拒绝策略(RejectedExecutionHandler)
策略类名 | 行为说明 | 使用建议 |
---|---|---|
AbortPolicy | 抛出异常 RejectedExecutionException | 默认策略,适用于不能丢失任务的场景 |
CallerRunsPolicy | 由调用线程自己执行任务 | 减缓提交速度,适合临时过载时 |
DiscardOldestPolicy | 丢弃队列中最老的任务,尝试重新提交当前任务 | 可接受部分任务丢失,希望保留最新任务 |
DiscardPolicy | 默默丢弃任务,不做任何处理 | 可容忍任务丢失,如非关键日志、监控等任务 |
默认使用的是LinkedBlockingQueue队列,建议初始化大小,防止内存溢出。