在 Java 中,并行流 (parallelStream
) 和 CompletableFuture
都是处理并发和异步编程的工具,但它们在使用场景和适用性上有一些区别。下面是一些指导原则,可以帮助你在选择使用并行流或 CompletableFuture
时做出决策:
使用场景:
1. 并行流 (parallelStream
)
-
适用场景:
- 需要对集合进行并行处理。
- 操作是独立的,无需等待其他元素的结果。
- 数据量较大,适合并行分割和处理。
-
优点:
- 语法简洁,易于使用。
- 不需要显式的线程管理。
parallelStream
是 Java 8 引入的一个功能,允许对集合进行并行处理。这是传统顺序 stream
的并行版本。以下是一个简要的解释:
-
用法:
List<T> myList = // 一些列表或集合 myList.parallelStream().forEach(element -> {// 要在每个元素上并行执行的代码 });
-
解释:
parallelStream()
将顺序流转换为并行流。- 流的元素由多个线程并发处理,这有可能提高在多核处理器上的性能。
- 然而,重要的是要注意,并非所有操作都适合并行处理,这取决于任务的性质。
2. CompletableFuture
-
适用场景:
- 需要更细粒度的控制,例如异步任务之间的依赖关系。
- 需要等待多个异步任务全部完成或任一完成。
- 需要处理异常和结果。
-
优点:
- 提供了更灵活的异步编程能力。
- 可以使用回调、组合和链式调用等方式。
CompletableFuture.runAsync
是 Java 8 引入的 CompletableFuture
API 的一部分,提供了进行异步编程的一种方式。以下是它的工作原理:
-
用法:
// 1、使用ForkJoinPool.commonPool()作为它的线程池执行异步代码System.out.println("当前调用者线程为:" + Thread.currentThread().getName()); CompletableFuture.runAsync(() -> {// 异步方法内当前执行线程为:ForkJoinPool.commonPool-worker-1System.out.println("异步方法内当前执行线程为:" + Thread.currentThread().getName());System.out.println("---------------------"); });CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {// 异步执行的代码 });//2、使用指定的线程池执行异步代码。此异步方法无法返回值。System.out.println("当前调用者线程为:" + Thread.currentThread().getName()); // fixme 根据阿里规约 建议真实开发时使用 ThreadPoolExecutor 定义线程池 ExecutorService threadPool = Executors.newFixedThreadPool(10);CompletableFuture.runAsync(() -> {// 异步方法内当前执行线程为:pool-1-thread-1System.out.println("异步方法内当前执行线程为:" + Thread.currentThread().getName());System.out.println(111); }, threadPool);// 演示代码,执行完后关闭线程池 threadPool.shutdown();
-
解释:
runAsync
是一个工厂方法,创建一个新的CompletableFuture
并在单独的线程中运行指定的Runnable
。- 它返回一个
CompletableFuture<Void>
,因为Runnable
不产生结果。 - 你可以附加回调或组合多个
CompletableFuture
实例以链接异步操作。
具体指导原则:
并行流parallelStream 与 CompletableFuture的选择
计算密集型操作:推荐使用 parallelStream
:
-
原因
- 计算密集型操作可以充分利用并行流的简单实现和高效性能。
- 并行流的底层实现会自动分配工作给可用的处理器核心,提高整体计算速度。
含有I/O操作:推荐使用 CompletableFuture
:
-
原因
CompletableFuture
提供更灵活的异步编程能力,特别适用于等待I/O的操作。- 可以根据等待和计算的比率,灵活地控制线程数,避免不必要的线程创建。
- 在涉及到等待I/O的操作时,使用
CompletableFuture
会更加灵活,可以避免并行流的延迟特性造成的难以判断的情况。 - 通过
CompletableFuture.supplyAsync
创建异步任务,然后使用CompletableFuture.allOf
等待它们全部完成。
- 示例
List<CompletableFuture<Integer>> futures = myList.stream().map(element -> CompletableFuture.supplyAsync(() -> performIoOperation(element))).collect(Collectors.toList());CompletableFuture<Void> allOf = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
allOf.join();List<Integer> result = futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
1. 简单的并行处理:
-
如果任务可以独立并行处理:
- 选择并行流 (
parallelStream
):- 示例:对集合中的元素执行相同的操作,互不影响。
myList.parallelStream().forEach(element -> { // 并行处理的操作 });
- 选择并行流 (
2. 复杂的异步任务:
-
如果任务之间有依赖关系,需要更细粒度的控制:
- 选择
CompletableFuture
:- 示例:一个异步任务的结果依赖于另一个异步任务的完成。
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {// 异步任务1 }).thenRun(() -> {// 异步任务2,依赖于任务1的完成 });
- 选择
3. 等待多个任务完成:
-
如果需要等待多个任务全部完成或任一完成:
- 选择
CompletableFuture
:- 示例:等待多个异步任务的全部完成或任一完成。
CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {// 异步任务1 });CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {// 异步任务2 });CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(future1, future2); combinedFuture.join(); // 等待全部完成
- 选择
4. 异常处理:
-
如果需要更丰富的异常处理:
- 选择
CompletableFuture
:- 示例:对每个异步任务进行异常处理。
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {// 异步任务 }).exceptionally(throwable -> {// 异常处理return null; // 或提供默认值 });
- 选择
5. 性能考虑:
-
如果是对集合进行简单操作,数据量较大:
- 选择并行流 (
parallelStream
):- 示例:对集合进行简单的映射、过滤等操作。
myList.parallelStream().map(element -> {// 简单操作return transformedElement; });
- 选择并行流 (
总结:
-
简单的并行处理和集合操作:
- 选择并行流 (
parallelStream
)
- 选择并行流 (
-
复杂的异步任务、依赖关系、等待多个任务完成:
- 选择
CompletableFuture
- 选择
在实际应用中,通常你会根据具体的需求和场景选择合适的工具。有时候也可以结合使用这两者以发挥各自的优势。