一、异步编程基础概念
1.1 同步 vs 异步
特性 | 同步 | 异步 |
---|---|---|
执行方式 | 顺序执行,阻塞调用 | 非阻塞,调用后立即返回 |
线程使用 | 单线程完成所有任务 | 多线程并行处理 |
响应性 | 较差,需等待前任务完成 | 较好,可立即响应新请求 |
复杂度 | 简单直观 | 较复杂,需处理线程安全 |
适用场景 | 简单流程,短时间任务 | IO密集型,长时间任务 |
通俗理解:同步就像在银行柜台排队办理业务,必须等前面的人办完才能轮到你;异步则像取号后可以坐着玩手机,等叫号时再去办理。
1.2 为什么要使用异步
- 提高吞吐量:服务器能同时处理更多请求
- 增强用户体验:避免用户长时间等待
- 资源优化:合理利用系统资源,避免阻塞主线程
- 解耦:将耗时操作与主流程分离
1.3 Java中的异步编程方式
// 1. 传统Thread方式
new Thread(() -> {// 异步任务
}).start();// 2. Future模式
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<String> future = executor.submit(() -> {Thread.sleep(1000);return "Result";
});// 3. CompletableFuture (Java8+)
CompletableFuture.supplyAsync(() -> {// 异步任务return "Result";
}).thenAccept(result -> {// 处理结果
});// 4. Spring @Async (本文重点)
@Async
public void asyncMethod() {// 异步方法体
}
二、@Async基础使用
2.1 启用@Async支持
步骤1:在Spring Boot主类或配置类上添加@EnableAsync
@SpringBootApplication
@EnableAsync // 启用异步支持
public class AsyncApplication {public static void main(String[] args) {SpringApplication.run(AsyncApplication.class, args);}
}
步骤2:创建异步服务类
@Service
public class EmailService {// 无返回值异步方法@Asyncpublic void sendEmail(String to, String content) {// 模拟邮件发送耗时try {Thread.sleep(3000);System.out.println("邮件已发送至: " + to + ", 内容: " + content);} catch (InterruptedException e) {e.printStackTrace();}}// 有返回值异步方法@Asyncpublic Future<String> sendEmailWithResult(String to, String content) {try {Thread.sleep(3000);String result = "邮件已发送至: " + to;return new AsyncResult<>(result);} catch (InterruptedException e) {return new AsyncResult<>("发送失败");}}
}
2.2 调用异步方法
@RestController
@RequestMapping("/api/email")
public class EmailController {@Autowiredprivate EmailService emailService;@GetMapping("/send")public String sendEmail() {long start = System.currentTimeMillis();// 调用异步方法emailService.sendEmail("user@example.com", "您的订单已创建");long elapsed = System.currentTimeMillis() - start;return "请求已处理,耗时: " + elapsed + "ms"; // 立即返回,不会等待邮件发送完成}@GetMapping("/send-with-result")public String sendEmailWithResult() throws Exception {Future<String> future = emailService.sendEmailWithResult("user@example.com", "订单详情");// 可以在这里做其他事情// 当需要结果时(阻塞等待)String result = future.get();return "处理结果: " + result;}
}
2.3 @Async方法限制
- 必须public修饰:因为基于Spring AOP实现
- 同类调用无效:
this.asyncMethod()
不会异步执行 - 返回值限制:
- void
- Future及其子类(如AsyncResult)
- CompletableFuture (Spring 4.2+)
- ListenableFuture (Spring 4.2+)
三、线程池配置详解
3.1 默认线程池问题
Spring默认使用SimpleAsyncTaskExecutor
,它不重用线程,每次调用都创建新线程,生产环境不推荐使用。
3.2 自定义线程池配置
方式1:配置类方式(推荐)
@Configuration
@EnableAsync
public class AsyncConfig implements AsyncConfigurer {@Overridepublic Executor getAsyncExecutor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();// 核心线程数:线程池创建时初始化的线程数executor.setCorePoolSize(5);// 最大线程数:线程池最大的线程数,只有在缓冲队列满了之后才会申请超过核心线程数的线程executor.setMaxPoolSize(10);// 缓冲队列:用来缓冲执行任务的队列executor.setQueueCapacity(50);// 线程名前缀executor.setThreadNamePrefix("Async-Executor-");// 线程池关闭时等待所有任务完成executor.setWaitForTasksToCompleteOnShutdown(true);// 等待时间executor.setAwaitTerminationSeconds(60);// 拒绝策略executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());executor.initialize();return executor;}@Overridepublic AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {return new CustomAsyncExceptionHandler();}
}// 自定义异常处理器
public class CustomAsyncExceptionHandler implements AsyncUncaughtExceptionHandler {@Overridepublic void handleUncaughtException(Throwable ex, Method method, Object... params) {System.err.println("异步任务异常 - 方法: " + method.getName());System.err.println("异常信息: " + ex.getMessage());// 这里可以添加自定义处理逻辑,如记录日志、发送告警等}
}
方式2:使用@Bean定义多个线程池
@Configuration
public class TaskExecutorConfig {@Bean(name = "emailExecutor")public Executor emailTaskExecutor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(3);executor.setMaxPoolSize(5);executor.setQueueCapacity(30);executor.setThreadNamePrefix("Email-Executor-");executor.initialize();return executor;}@Bean(name = "reportExecutor")public Executor reportTaskExecutor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(2);executor.setMaxPoolSize(4);executor.setQueueCapacity(20);executor.setThreadNamePrefix("Report-Executor-");executor.initialize();return executor;}
}
使用指定线程池:
@Async("emailExecutor")
public void sendEmail(String to) { /*...*/ }@Async("reportExecutor")
public void generateReport() { /*...*/ }
3.3 线程池参数详解
参数名 | 说明 | 推荐设置建议 |
---|---|---|
corePoolSize | 核心线程数,即使空闲也不会被回收 | CPU密集型:CPU核数+1 IO密集型:2*CPU核数 |
maxPoolSize | 最大线程数,当队列满时才会创建新线程直到此值 | 建议为核心线程数的2-3倍 |
queueCapacity | 任务队列容量,超过核心线程数的任务会进入队列 | 根据业务量调整,太大可能导致OOM |
keepAliveSeconds | 非核心线程空闲存活时间(秒) | 60-300秒 |
threadNamePrefix | 线程名前缀,便于监控和日志追踪 | 建议按业务命名,如"Order-Async-" |
allowCoreThreadTimeOut | 是否允许核心线程超时退出 | 默认false,长时间空闲应用可设为true |
waitForTasksToCompleteOnShutdown | 应用关闭时是否等待异步任务完成 | 生产环境建议true |
awaitTerminationSeconds | 等待任务完成的超时时间 | 根据业务最长执行时间设置 |
rejectedExecutionHandler | 拒绝策略,当线程池和队列都满时的处理方式 | 根据业务需求选择 |
拒绝策略选项:
AbortPolicy
:默认,直接抛出RejectedExecutionExceptionCallerRunsPolicy
:由调用者线程执行该任务DiscardPolicy
:直接丢弃任务DiscardOldestPolicy
:丢弃队列中最老的任务并重试
3.4 线程池监控
@Service
public class ThreadPoolMonitor {@Autowiredprivate ThreadPoolTaskExecutor emailExecutor;@Scheduled(fixedRate = 5000) // 每5秒监控一次public void monitor() {System.out.println("=== 线程池监控 ===");System.out.println("当前线程数: " + emailExecutor.getPoolSize());System.out.println("活跃线程数: " + emailExecutor.getActiveCount());System.out.println("完成任务数: " + emailExecutor.getCompletedTaskCount());System.out.println("队列剩余容量: " + emailExecutor.getThreadPoolExecutor().getQueue().remainingCapacity());}
}
四、@Async高级特性
4.1 返回值处理
1. Future模式
@Async
public Future<String> processData(String input) {// 模拟处理耗时try {Thread.sleep(2000);return new AsyncResult<>("处理完成: " + input.toUpperCase());} catch (InterruptedException e) {Thread.currentThread().interrupt();return new AsyncResult<>("处理中断");}
}// 调用方
Future<String> future = service.processData("hello");
// 可以做其他事情...
String result = future.get(3, TimeUnit.SECONDS); // 带超时的等待
2. CompletableFuture (Java8+)
@Async
public CompletableFuture<String> fetchData(String param) {return CompletableFuture.supplyAsync(() -> {try {Thread.sleep(1000);return "Data for " + param;} catch (InterruptedException e) {throw new RuntimeException(e);}});
}// 链式调用
service.fetchData("user123").thenApply(String::toUpperCase).thenAccept(System.out::println).exceptionally(ex -> {System.err.println("Error: " + ex.getMessage());return null;});
4.2 基于条件的异步执行
1. 使用Spring Expression Language (SpEL)
@Async("#{systemProperties['async.enabled'] ? 'emailExecutor' : 'syncExecutor'}")
public void conditionalAsync() {// 根据系统属性决定使用哪个执行器
}
2. 基于配置的开关
@Async
@ConditionalOnProperty(name = "app.async.enabled", havingValue = "true")
public void configBasedAsync() {// 当app.async.enabled=true时才异步执行
}
4.3 事务处理
异步方法与事务的特殊关系:
- 事务边界:
@Async
方法会在新线程中执行,与原方法不在同一事务中 - 传播行为:需要在异步方法上单独声明
@Transactional
@Async
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void asyncWithTransaction() {// 这个方法会在新事务中执行userRepository.save(new User("AsyncUser"));// 如果发生异常,只会回滚当前方法内的操作
}
4.4 组合异步操作
场景:需要等待多个异步任务全部完成
@Async
public CompletableFuture<String> fetchUserInfo(String userId) {// 模拟获取用户信息return CompletableFuture.completedFuture("UserInfo-" + userId);
}@Async
public CompletableFuture<String> fetchOrderInfo(String userId) {// 模拟获取订单信息return CompletableFuture.completedFuture("OrderInfo-" + userId);
}// 组合多个异步任务
public CompletableFuture<Void> combineAsyncTasks(String userId) {return CompletableFuture.allOf(fetchUserInfo(userId),fetchOrderInfo(userId)).thenRun(() -> {// 所有任务完成后的处理System.out.println("所有异步任务已完成");});
}
五、异常处理机制
5.1 异常处理方式对比
处理方式 | 适用场景 | 优点 | 缺点 |
---|---|---|---|
AsyncUncaughtExceptionHandler | 处理void返回类型的异步方法异常 | 集中处理,统一管理 | 无法获取方法返回值 |
Future.get() | 处理有返回值的异步方法异常 | 可以获取具体异常信息 | 需要手动调用get() |
CompletableFuture.exceptionally | Java8+的优雅异常处理方式 | 链式调用,代码简洁 | 仅适用于CompletableFuture |
5.2 实践示例
1. 全局异常处理器
public class GlobalAsyncExceptionHandler implements AsyncUncaughtExceptionHandler {private static final Logger logger = LoggerFactory.getLogger(GlobalAsyncExceptionHandler.class);@Overridepublic void handleUncaughtException(Throwable ex, Method method, Object... params) {logger.error("异步任务异常 - 方法: {}, 参数: {}", method.getName(), Arrays.toString(params), ex);// 可以根据异常类型进行不同处理if (ex instanceof BusinessException) {// 业务异常处理sendAlert("业务异常警报: " + ex.getMessage());} else if (ex instanceof TimeoutException) {// 超时处理retryTask(method, params);}}private void sendAlert(String message) { /*...*/ }private void retryTask(Method method, Object... params) { /*...*/ }
}
2. Future方式的异常处理
@Async
public Future<String> asyncTaskWithException() {try {// 业务逻辑if (someCondition) {throw new BusinessException("业务异常");}return new AsyncResult<>("成功");} catch (BusinessException e) {return new AsyncResult<>("失败: " + e.getMessage());}
}// 调用方处理
Future<String> future = service.asyncTaskWithException();
try {String result = future.get();if (result.startsWith("失败")) {// 处理失败情况}
} catch (ExecutionException e) {// 处理执行时异常
}
3. CompletableFuture的异常处理
@Async
public CompletableFuture<String> asyncProcess(String input) {return CompletableFuture.supplyAsync(() -> {if (input == null) {throw new IllegalArgumentException("输入不能为空");}return "处理结果: " + input.toUpperCase();});
}// 调用方处理
service.asyncProcess(null).exceptionally(ex -> {System.err.println("发生异常: " + ex.getMessage());return "默认返回值";}).thenAccept(result -> {System.out.println("最终结果: " + result);});
六、性能优化与最佳实践
6.1 性能优化建议
-
线程池参数调优
- 根据业务类型调整线程池大小
- 监控线程池状态动态调整参数
- 使用有界队列防止OOM
-
避免长时间阻塞
- 异步方法内避免同步阻塞操作
- 使用带超时的阻塞调用
-
资源清理
- 确保异步方法正确释放资源
- 使用try-with-resources管理资源
-
上下文传递
- 注意ThreadLocal变量在异步线程中的传递问题
- 使用
TaskDecorator
传递上下文
executor.setTaskDecorator(new ContextCopyingDecorator());public class ContextCopyingDecorator implements TaskDecorator {@Overridepublic Runnable decorate(Runnable runnable) {// 获取当前线程的上下文RequestAttributes context = RequestContextHolder.currentRequestAttributes();return () -> {try {// 在新线程中设置上下文RequestContextHolder.setRequestAttributes(context);runnable.run();} finally {RequestContextHolder.resetRequestAttributes();}};}
}
6.2 最佳实践清单
-
命名规范
- 异步方法名以
Async
后缀标识,如sendEmailAsync
- 线程池按业务命名,如
orderTaskExecutor
- 异步方法名以
-
日志记录
- 记录异步任务开始/结束时间
- 为异步线程设置可追踪的上下文ID
@Async
public void asyncWithLogging() {String traceId = UUID.randomUUID().toString();MDC.put("traceId", traceId);try {log.info("异步任务开始");// 业务逻辑log.info("异步任务完成");} finally {MDC.clear();}
}
-
防御性编程
- 检查异步方法参数有效性
- 添加合理的超时控制
-
资源限制
- 限制并发异步任务数量
- 对重要任务设置优先级
-
监控告警
- 监控线程池关键指标
- 设置异常告警阈值
七、与其他技术的整合
7.1 与Spring Retry整合
实现异步任务失败重试:
@Async
@Retryable(value = {RemoteAccessException.class}, maxAttempts = 3, backoff = @Backoff(delay = 1000, multiplier = 2))
public CompletableFuture<String> callExternalService() {// 调用可能失败的外部服务return CompletableFuture.completedFuture(externalService.call());
}// 重试全部失败后的处理
@Recover
public CompletableFuture<String> recover(RemoteAccessException e) {return CompletableFuture.completedFuture("默认返回值");
}
7.2 与Spring Cache整合
异步缓存更新:
@Async
@CacheEvict(value = "users", key = "#userId")
public void evictUserCacheAsync(String userId) {// 异步清理缓存
}@Async
@CachePut(value = "users", key = "#user.id")
public CompletableFuture<User> updateUserAsync(User user) {// 异步更新用户并更新缓存return CompletableFuture.completedFuture(userRepository.save(user));
}
7.3 与WebFlux响应式编程对比
特性 | @Async | WebFlux |
---|---|---|
编程模型 | 命令式 | 响应式 |
线程模型 | 线程池-based | 事件循环 |
资源消耗 | 较高(每个请求一个线程) | 较低(少量线程处理所有请求) |
学习曲线 | 较低 | 较高 |
适用场景 | 传统Servlet应用 | 高并发IO密集型应用 |
背压支持 | 不支持 | 支持 |
集成复杂度 | 简单 | 中等 |
八、常见问题与解决方案
8.1 问题排查表
问题现象 | 可能原因 | 解决方案 |
---|---|---|
@Async方法不异步执行 | 同类调用 | 确保通过Spring代理调用,使用@Autowired注入自身 |
未加@EnableAsync | 在主配置类添加@EnableAsync | |
异步方法抛出异常不显示 | 未正确处理AsyncUncaughtException | 实现AsyncUncaughtExceptionHandler |
线程池不生效 | 未正确命名或注入 | 确保@Async(“executorName”)与@Bean名称一致 |
应用关闭时任务丢失 | 未配置优雅关闭 | 设置setWaitForTasksToCompleteOnShutdown(true)和awaitTerminationSeconds |
性能未提升反而下降 | 线程池配置不合理 | 调整核心/最大线程数和队列容量 |
ThreadLocal值丢失 | 线程切换导致上下文丢失 | 使用TaskDecorator传递上下文 |
8.2 实战问题案例
案例1:数据库连接泄漏
@Async
public void processData() {// 错误示范:未关闭ConnectionConnection conn = dataSource.getConnection();// 使用conn...
}
解决方案:
@Async
public void processData() {try (Connection conn = dataSource.getConnection()) {// 使用conn...} catch (SQLException e) {// 异常处理}
}
案例2:订单超时未支付取消
@Async
@Scheduled(fixedDelay = 60000) // 每分钟检查一次
public void cancelUnpaidOrders() {List<Order> unpaidOrders = orderRepository.findByStatusAndCreateTimeBefore(OrderStatus.UNPAID, LocalDateTime.now().minusMinutes(30));unpaidOrders.forEach(order -> {order.setStatus(OrderStatus.CANCELLED);orderRepository.save(order);notificationService.sendCancelNotice(order);});
}
九、总结
9.1 核心要点总结
-
基础使用:
@EnableAsync
启用支持@Async
标注异步方法- 避免同类调用
-
线程池配置:
- 生产环境必须自定义线程池
- 合理设置核心参数
- 监控线程池状态
-
异常处理:
- 区分返回值类型选择处理方式
- 实现全局异常处理器
- 日志记录完整上下文
-
高级特性:
- 组合多个异步操作
- 事务边界处理
- 条件异步执行
-
最佳实践:
- 命名规范
- 防御性编程
- 资源清理
- 上下文传递
9.2 完整示例代码
订单处理服务示例:
@Service
public class OrderProcessingService {private static final Logger logger = LoggerFactory.getLogger(OrderProcessingService.class);@Autowiredprivate OrderRepository orderRepository;@Autowiredprivate PaymentService paymentService;@Autowiredprivate NotificationService notificationService;@Async("orderTaskExecutor")@Transactional(propagation = Propagation.REQUIRES_NEW)public CompletableFuture<OrderResult> processOrderAsync(Order order) {logger.info("开始异步处理订单: {}", order.getId());long startTime = System.currentTimeMillis();try {// 1. 保存订单Order savedOrder = orderRepository.save(order);// 2. 处理支付PaymentResult paymentResult = paymentService.processPayment(savedOrder);if (!paymentResult.isSuccess()) {throw new PaymentException("支付处理失败: " + paymentResult.getErrorMessage());}// 3. 更新订单状态savedOrder.setStatus(OrderStatus.PAID);orderRepository.save(savedOrder);// 4. 发送通知notificationService.sendOrderConfirmation(savedOrder);long elapsed = System.currentTimeMillis() - startTime;logger.info("订单处理完成: {}, 耗时: {}ms", savedOrder.getId(), elapsed);return CompletableFuture.completedFuture(new OrderResult(true, "订单处理成功", savedOrder));} catch (PaymentException e) {logger.error("订单支付异常: {}", order.getId(), e);return CompletableFuture.completedFuture(new OrderResult(false, e.getMessage(), order));} catch (Exception e) {logger.error("订单处理未知异常: {}", order.getId(), e);return CompletableFuture.failedFuture(e);}}// 批量异步处理订单@Async("batchOrderExecutor")public CompletableFuture<Void> processOrdersBatch(List<Order> orders) {List<CompletableFuture<OrderResult>> futures = orders.stream().map(this::processOrderAsync).collect(Collectors.toList());return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).exceptionally(ex -> {logger.error("批量处理订单异常", ex);return null;});}
}// 配置类
@Configuration
@EnableAsync
public class AsyncConfig implements AsyncConfigurer {@Overridepublic Executor getAsyncExecutor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(10);executor.setMaxPoolSize(20);executor.setQueueCapacity(100);executor.setThreadNamePrefix("Order-Async-");executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());executor.setWaitForTasksToCompleteOnShutdown(true);executor.setAwaitTerminationSeconds(60);executor.initialize();return executor;}@Bean(name = "batchOrderExecutor")public Executor batchOrderExecutor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(5);executor.setMaxPoolSize(10);executor.setQueueCapacity(50);executor.setThreadNamePrefix("Batch-Order-");executor.initialize();return executor;}@Overridepublic AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {return new OrderAsyncExceptionHandler();}
}// 全局异常处理器
public class OrderAsyncExceptionHandler implements AsyncUncaughtExceptionHandler {private static final Logger logger = LoggerFactory.getLogger(OrderAsyncExceptionHandler.class);@Overridepublic void handleUncaughtException(Throwable ex, Method method, Object... params) {logger.error("异步订单处理异常 - 方法: {}, 参数: {}", method.getName(), Arrays.toString(params), ex);// 发送告警邮件if (ex instanceof CriticalOrderException) {sendCriticalAlert(method, ex, params);}}private void sendCriticalAlert(Method method, Throwable ex, Object... params) {// 实现告警逻辑}
}