尽管CompletableFuture大约是两年前(!)于2014年3月在Java 8中引入的,但它仍然是一个相对较新的概念。但是,此类不是很广为人知是一件好事,因为它很容易被滥用,尤其是在线程和线程方面。一路涉及的线程池。 本文旨在描述如何将线程与CompletableFuture一起使用。
运行任务
这是API的基本部分。 有一个便捷的supplyAsync()方法类似于ExecutorService.submit() ,但是返回CompletableFuture :
CompletableFuture<String> future =CompletableFuture.supplyAsync(() -> {try (InputStream is = new URL("http://www.nurkiewicz.com").openStream()) {log.info("Downloading");return IOUtils.toString(is, StandardCharsets.UTF_8);} catch (IOException e) {throw new RuntimeException(e);}}); 问题是, supplyAsync()默认情况下使用ForkJoinPool.commonPool() ,所有CompletableFuture ,所有并行流以及部署在同一JVM上的所有应用程序之间共享的线程池(如果不幸的是,仍然使用具有许多已部署工件的应用程序服务器) 。 这个硬编码的,不可配置的线程池完全在我们的控制范围之外,难以监视和扩展。 因此,您应该始终指定自己的Executor ,例如此处(并查看我如何创建一个的一些技巧 ):
ExecutorService pool = Executors.newFixedThreadPool(10);final CompletableFuture<String> future =CompletableFuture.supplyAsync(() -> {//...}, pool);但这仅仅是开始……
回调和转换
假设您要转换给定的CompletableFuture ,例如,提取String的长度:
CompletableFuture<Integer> intFuture =future.thenApply(s -> s.length()); 究竟是谁调用s.length()代码? 坦白地说,我亲爱的开发人员,我们不给该死[1] 。 只要像thenApply这样的所有运算符中的lambda表达式thenApply便宜,我们就不在乎谁调用它。 但是,如果此表达式花费一点CPU时间来完成或进行阻塞的网络调用怎么办?
首先,默认情况下会发生什么? 想想看:我们有一个String类型的后台任务,我们想在该值完成后异步应用一些特定的转换。 最简单的实现方法是包装原始任务(返回String ),并在完成任务时对其进行拦截。 内部任务完成后,我们的回调开始,应用转换并返回修改后的值。 这就像介于我们的代码和原始计算结果之间的一个方面。 话虽这么说,很明显s.length()转换将在与原始任务相同的线程中执行,是吗? 不完全的!
CompletableFuture<String> future =CompletableFuture.supplyAsync(() -> {sleepSeconds(2);return "ABC";}, pool);future.thenApply(s -> {log.info("First transformation");return s.length();
});future.get();
pool.shutdownNow();
pool.awaitTermination(1, TimeUnit.MINUTES);future.thenApply(s -> {log.info("Second transformation");return s.length();
}); 当任务仍在运行时,将注册thenApply()的第一个转换。 因此,它将在任务完成后立即在与任务相同的线程中执行。 但是,在注册第二个转换之前,我们要等到任务实际完成为止。 更糟糕的是,我们完全关闭了线程池,以确保在那里没有其他代码可以执行。 那么哪个线程将运行第二次转换? 我们知道它必须立即发生,因为future我们在已经完成的回调上进行注册。 事实证明,默认情况下使用客户端线程(!)! 输出如下:
pool-1-thread-1 | First transformation main | Second transformation 在注册了第二个转换后,它意识到CompletableFuture已经完成,因此它立即执行了转换。 周围没有其他线程,因此在当前main线程的上下文中调用thenApply() 。 当实际的转换成本很高时,就会出现这种行为容易出错的最大原因。 想象一下thenApply() lambda表达式进行了一些繁重的计算或阻塞了网络调用。 突然,我们的异步CompletableFuture阻止了调用线程!
控制回调的线程池
有两种技术可以控制哪个线程执行我们的回调和转换。 请注意,仅当您的转换成本很高时才需要这些解决方案。 否则,差异可以忽略不计。 因此,首先我们可以选择*Async版本的运算符,例如:
future.thenApplyAsync(s -> {log.info("Second transformation");return s.length();
}); 这次,第二个转换自动卸载给我们的朋友ForkJoinPool.commonPool() :
pool-1-thread-1 | First transformation
ForkJoinPool.commonPool-worker-1 | Second transformation 但是我们不喜欢commonPool所以我们提供自己的:
future.thenApplyAsync(s -> {log.info("Second transformation");return s.length();
}, pool2); 请注意,使用了不同的线程池( pool-1与pool-2 ):
pool-1-thread-1 | First transformation
pool-2-thread-1 | Second transformation将回调视为另一个计算步骤
但是我相信,如果您在长时间运行的回调和转换方面遇到麻烦(请记住,本文适用于CompletableFuture上的几乎所有其他方法),则应该简单地使用另一个显式的CompletableFuture ,例如:
//Imagine this is slow and costly
CompletableFuture<Integer> strLen(String s) {return CompletableFuture.supplyAsync(() -> s.length(),pool2);
}//...CompletableFuture<Integer> intFuture = future.thenCompose(s -> strLen(s)); 这种方法更加明确。 知道我们的转换成本很高,因此我们不冒险在任意或不受控制的线程上运行它。 相反,我们将其显式建模为从String到CompletableFuture<Integer>异步操作。 但是,我们必须将thenApply()替换为thenCompose() ,否则最终将获得CompletableFuture<CompletableFuture<Integer>> 。
但是,如果我们的转换没有一个与嵌套CompletableFuture applyToEither()的版本,例如, applyToEither()等待第一个Future完成并应用转换,该怎么办?
CompletableFuture<CompletableFuture<Integer>> poor = future1.applyToEither(future2, s -> strLen(s)); 有一个方便的技巧可以“解包”这种晦涩的数据结构,称为flatten ,可以使用flatMap(identity) (或flatMap(x -> x) )轻松实现。 在我们的例子中, flatMap()称为thenCompose ( duh! ):
CompletableFuture<Integer> good = poor.thenCompose(x -> x); 我由您自己决定如何运作以及为什么运作。 我希望本文CompletableFuture您更清楚地了解如何在CompletableFuture中涉及线程。
翻译自: https://www.javacodegeeks.com/2015/12/thread-executes-completablefutures-tasks-callbacks.html