Java的CompletableFuture是Java 8引入的一个非常强大的异步编程工具,它实现了Future和CompletionStage接口,提供了丰富的方法来组合、转换和处理异步计算的结果。下面我将从创建、转换、组合、错误处理等方面详细讲解CompletableFuture的使用。
1. 创建CompletableFuture
1.1 使用runAsync和supplyAsync
-
runAsync(Runnable runnable): 执行一个Runnable任务,不返回结果,返回一个CompletableFuture<Void>。 -
supplyAsync(Supplier<U> supplier): 执行一个Supplier任务,并返回一个CompletableFuture<U>,其中包含Supplier返回的值。
示例:
// 无返回值的异步任务
CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {System.out.println("执行无返回值的异步任务");
});// 有返回值的异步任务
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {return "Hello, CompletableFuture!";
});
1.2 使用completedFuture创建一个已经完成并带有结果的Future
CompletableFuture<String> future = CompletableFuture.completedFuture("立即完成的结果");
1.3 使用不完整的CompletableFuture
可以通过new关键字创建一个不完整的CompletableFuture,然后在某个时刻通过complete方法手动完成它。
CompletableFuture<String> future = new CompletableFuture<>();
// 在某个时刻,比如另一个线程中,完成这个Future
future.complete("手动完成的结果");
2. 获取结果
2.1 get() 和 get(long timeout, TimeUnit unit)
-
get(): 阻塞直到任务完成,然后返回结果。 -
get(long timeout, TimeUnit unit): 在指定时间内阻塞等待结果,超时则抛出TimeoutException。
2.2 join()
与get()类似,但不会抛出受检异常(ExecutionException和InterruptedException),而是抛出未经检查的CompletionException。
2.3 getNow(T valueIfAbsent)
如果任务已经完成,返回结果;否则返回指定的valueIfAbsent。
3. 转换和组合
3.1 thenApply 和 thenApplyAsync
-
thenApply(Function<? super T,? extends U> fn): 当当前阶段完成后,对结果应用一个函数,返回一个新的CompletableFuture。 -
thenApplyAsync是在另一个线程中执行函数。
示例:
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello").thenApply(s -> s + " World").thenApply(String::toUpperCase);
System.out.println(future.get()); // 输出: HELLO WORLD
3.2 thenAccept 和 thenRun
-
thenAccept(Consumer<? super T> action): 对结果进行消费,不返回新的值。 -
thenRun(Runnable action): 不关心结果,只是在前一个阶段完成后执行一个动作。
3.3 thenCompose 和 thenCombine
-
thenCompose(Function<? super T, ? extends CompletionStage<U>> fn): 用于连接两个有依赖关系的CompletableFuture,第二个CompletableFuture依赖于第一个的结果。 -
thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn): 当两个不相关的CompletableFuture都完成后,对两个结果进行组合。
示例:
// thenCompose
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2 = future1.thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World"));// thenCombine
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future4 = CompletableFuture.supplyAsync(() -> " World");
CompletableFuture<String> combined = future3.thenCombine(future4, (s1, s2) -> s1 + s2);
4. 并行执行多个任务
4.1 allOf 和 anyOf
-
allOf(CompletableFuture<?>... cfs): 当所有的CompletableFuture都完成时,返回一个新的CompletableFuture。 -
anyOf(CompletableFuture<?>... cfs): 当任意一个CompletableFuture完成时,返回一个新的CompletableFuture。
示例:
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Task1");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "Task2");
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> "Task3");CompletableFuture<Void> all = CompletableFuture.allOf(future1, future2, future3);
all.thenRun(() -> {// 所有任务都完成后的操作try {String result1 = future1.get();String result2 = future2.get();String result3 = future3.get();System.out.println(result1 + ", " + result2 + ", " + result3);} catch (Exception e) {e.printStackTrace();}
});
5. 异常处理
5.1 exceptionally
类似于try-catch中的catch,当阶段中抛出异常时,可以通过exceptionally方法返回一个备选值。
示例:
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {if (true) throw new RuntimeException("异常发生");return "正常结果";
}).exceptionally(ex -> {System.out.println("捕获异常: " + ex.getMessage());return "备选结果";
});
5.2 handle 和 handleAsync
无论阶段是否正常完成,都会执行给定的函数。函数接收两个参数:结果和异常。如果阶段正常完成,则异常参数为null;如果异常完成,则结果参数为null。
示例:
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {// 可能正常执行,也可能抛出异常return "结果";
}).handle((result, ex) -> {if (ex != null) {return "异常处理";}return result;
});
5.3 whenComplete
与handle类似,但whenComplete不改变结果,只是对结果或异常进行消费,返回的CompletableFuture的结果与原阶段相同。
6. 其他方法
-
completeExceptionally(Throwable ex): 手动将CompletableFuture设置为异常完成。 -
isDone(): 判断是否完成(正常完成或异常完成)。 -
isCompletedExceptionally(): 判断是否异常完成。
7. 使用线程池
默认情况下,runAsync和supplyAsync使用ForkJoinPool.commonPool()。你也可以指定自定义的线程池。
示例:
ExecutorService executor = Executors.newFixedThreadPool(10);
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {// 任务逻辑return "结果";
}, executor);
8. 注意事项
-
避免在CompletableFuture的回调方法(如thenApply)中执行长时间阻塞的操作,因为这会阻塞后续任务的执行(除非使用Async后缀的方法,它们会在新的线程中执行)。
-
合理使用异常处理,避免异常被吞没。
-
注意线程池的选择,避免资源耗尽。
CompletableFuture提供了非常灵活的异步编程模型,通过组合各种方法,可以构建复杂的异步工作流。希望以上详解能帮助你更好地使用CompletableFuture。
CompletableFuture 是 Java 8 引入的强大的异步编程工具,它实现了 Future 和 CompletionStage 接口,提供了丰富的异步编程能力。下面我将从基础使用到高级特性进行全面详解。
📚 核心概念
1. 创建 CompletableFuture
| 创建方式 | 方法签名 | 说明 |
|---|---|---|
| 直接创建 | CompletableFuture<Void> runAsync(Runnable runnable) |
执行无返回值的异步任务 |
CompletableFuture<U> supplyAsync(Supplier<U> supplier) |
执行有返回值的异步任务 | |
| 已完成 | CompletableFuture<U> completedFuture(U value) |
创建已完成的 Future |
| 手动控制 | new CompletableFuture<>() |
手动控制的 Future |
// 1. 无返回值的异步任务
CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {System.out.println("异步执行无返回值任务");
});// 2. 有返回值的异步任务
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {sleep(1000);return "Hello, CompletableFuture!";
});// 3. 使用自定义线程池
ExecutorService executor = Executors.newFixedThreadPool(10);
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {return "使用自定义线程池";
}, executor);
🔄 结果处理与转换
2. 结果转换 (thenApply)
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "hello");// thenApply - 同步转换
CompletableFuture<String> upperFuture = future.thenApply(String::toUpperCase);
System.out.println(upperFuture.get()); // 输出: HELLO// thenApplyAsync - 异步转换
CompletableFuture<String> asyncFuture = future.thenApplyAsync(s -> {sleep(1000);return s + " world";
});
System.out.println(asyncFuture.get()); // 输出: hello world
3. 结果消费 (thenAccept / thenRun)
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "result");// thenAccept - 消费结果
future.thenAccept(result -> {System.out.println("接收到结果: " + result);
});// thenRun - 不关心结果,只执行操作
future.thenRun(() -> {System.out.println("任务完成,执行清理操作");
});// 异步版本
future.thenAcceptAsync(result -> {System.out.println("异步消费: " + result);
});
🔗 任务组合
4. 任务链 (thenCompose)
用于连接两个有依赖关系的异步任务:
// 模拟异步获取用户ID
CompletableFuture<String> getUserById(int id) {return CompletableFuture.supplyAsync(() -> "user_" + id);
}// 模拟异步获取用户详情
CompletableFuture<String> getUserDetail(String user) {return CompletableFuture.supplyAsync(() -> user + "_details");
}// 使用 thenCompose 连接两个依赖的任务
CompletableFuture<String> result = getUserById(123).thenCompose(this::getUserDetail);System.out.println(result.get()); // 输出: user_123_details
5. 任务合并 (thenCombine)
合并两个独立的异步任务结果:
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {sleep(1000);return "Hello";
});CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {sleep(500);return "World";
});// 合并两个任务的结果
CompletableFuture<String> combined = future1.thenCombine(future2, (r1, r2) -> {return r1 + " " + r2;
});System.out.println(combined.get()); // 输出: Hello World
🎯 多任务处理
6. 所有任务完成 (allOf)
List<CompletableFuture<String>> futures = Arrays.asList(CompletableFuture.supplyAsync(() -> "Task1"),CompletableFuture.supplyAsync(() -> "Task2"),CompletableFuture.supplyAsync(() -> "Task3")
);CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])
);// 等待所有任务完成,然后处理结果
CompletableFuture<List<String>> allResults = allFutures.thenApply(v -> futures.stream().map(CompletableFuture::join).collect(Collectors.toList())
);List<String> results = allResults.get();
System.out.println(results); // 输出: [Task1, Task2, Task3]
7. 任一任务完成 (anyOf)
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {sleep(2000);return "Slow Task";
});CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {sleep(500);return "Fast Task";
});CompletableFuture<Object> anyFuture = CompletableFuture.anyOf(future1, future2);
System.out.println(anyFuture.get()); // 输出: Fast Task (先完成的任务)
🚨 异常处理
8. 异常处理方式
// 1. exceptionally - 异常时提供默认值
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {if (true) throw new RuntimeException("出错啦!");return "Success";
}).exceptionally(ex -> {System.out.println("异常处理: " + ex.getMessage());return "Default Value";
});// 2. handle - 无论成功失败都会执行
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {return "Success";
}).handle((result, ex) -> {if (ex != null) {return "Error: " + ex.getMessage();}return "Result: " + result;
});// 3. whenComplete - 类似 handle 但不改变结果
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {return "Data";
}).whenComplete((result, ex) -> {if (ex != null) {System.out.println("任务失败: " + ex.getMessage());} else {System.out.println("任务成功: " + result);}
});
⏱️ 超时控制
9. 超时处理 (Java 9+)
// Java 9+ 的 orTimeout 方法
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {sleep(3000);return "Slow Result";
}).orTimeout(1000, TimeUnit.MILLISECONDS) // 1秒超时.exceptionally(ex -> {if (ex instanceof TimeoutException) {return "Timeout Fallback";}return "Other Error";
});System.out.println(future.get()); // 输出: Timeout Fallback// completeOnTimeout - 超时时提供默认值
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {sleep(3000);return "Slow Result";
}).completeOnTimeout("Default Value", 1000, TimeUnit.MILLISECONDS);System.out.println(future2.get()); // 输出: Default Value
🏗️ 实际应用示例
10. 电商订单处理示例
public class OrderService {public CompletableFuture<OrderResult> processOrder(Order order) {// 并行执行多个任务CompletableFuture<Boolean> inventoryCheck = checkInventory(order);CompletableFuture<Boolean> paymentProcess = processPayment(order);CompletableFuture<Boolean> fraudCheck = checkFraud(order);// 等待所有检查通过return CompletableFuture.allOf(inventoryCheck, paymentProcess, fraudCheck).thenCompose(v -> {// 验证所有检查结果if (inventoryCheck.join() && paymentProcess.join() && fraudCheck.join()) {return shipOrder(order).thenApply(trackingNumber -> new OrderResult(true, "订单处理成功", trackingNumber));} else {return CompletableFuture.completedFuture(new OrderResult(false, "订单检查未通过", null));}}).exceptionally(ex -> new OrderResult(false, "处理过程中出错: " + ex.getMessage(), null));}private CompletableFuture<Boolean> checkInventory(Order order) {return CompletableFuture.supplyAsync(() -> {sleep(200);return true; // 模拟库存检查});}private CompletableFuture<Boolean> processPayment(Order order) {return CompletableFuture.supplyAsync(() -> {sleep(300);return true; // 模拟支付处理});}private CompletableFuture<Boolean> checkFraud(Order order) {return CompletableFuture.supplyAsync(() -> {sleep(150);return true; // 模拟欺诈检查});}private CompletableFuture<String>