我已经写了很多有关InterruptedException和中断线程的文章 。 简而言之,如果您没有Future.cancel()调用Future.cancel()那么Future将终止待处理的get() ,但还将尝试中断基础线程。 这是一个非常重要的功能,可以更好地利用线程池。 我还写信总是比标准Future更喜欢CompletableFuture 。 事实证明,功能更强大的Future弟兄没有那么优雅地处理cancel() 。 考虑以下任务,我们稍后将在整个测试中使用以下任务:
class InterruptibleTask implements Runnable {private final CountDownLatch started = new CountDownLatch(1)private final CountDownLatch interrupted = new CountDownLatch(1)@Overridevoid run() {started.countDown()try {Thread.sleep(10_000)} catch (InterruptedException ignored) {interrupted.countDown()}}void blockUntilStarted() {started.await()}void blockUntilInterrupted() {assert interrupted.await(1, TimeUnit.SECONDS)}} 客户端线程可以检查InterruptibleTask以查看其是否已开始或被中断。 首先,让我们看一下InterruptibleTask对外部的cancel()反应:
def "Future is cancelled without exception"() {given:def task = new InterruptibleTask()def future = myThreadPool.submit(task)task.blockUntilStarted()and:future.cancel(true)when:future.get()then:thrown(CancellationException)
}def "CompletableFuture is cancelled via CancellationException"() {given:def task = new InterruptibleTask()def future = CompletableFuture.supplyAsync({task.run()} as Supplier, myThreadPool)task.blockUntilStarted()and:future.cancel(true)when:future.get()then:thrown(CancellationException)
} 到目前为止,一切都很好。 显然, Future和CompletableFuture工作方式几乎相同-在取消结果后检索结果会引发CancellationException 。 但是myThreadPool线程呢? 我以为它会被游泳池打断,从而被回收,我怎么了!
def "should cancel Future"() {given:def task = new InterruptibleTask()def future = myThreadPool.submit(task)task.blockUntilStarted()when:future.cancel(true)then:task.blockUntilInterrupted()
}@Ignore("Fails with CompletableFuture")
def "should cancel CompletableFuture"() {given:def task = new InterruptibleTask()def future = CompletableFuture.supplyAsync({task.run()} as Supplier, myThreadPool)task.blockUntilStarted()when:future.cancel(true)then:task.blockUntilInterrupted()
} 第一个测试将普通的Runnable提交给ExecutorService并等待其启动。 稍后,我们取消Future并等待直到观察到InterruptedException 。 当基础线程被中断时, blockUntilInterrupted()将返回。 但是,第二次测试失败。 CompletableFuture.cancel()永远不会中断基础线程,因此尽管Future看起来好像已被取消,但后备线程仍在运行,并且sleep()不会抛出InterruptedException 。 错误或功能? 它已记录在案 ,因此很遗憾地提供一个功能:
参数:
mayInterruptIfRunning–此值在此实现中无效,因为不使用中断来控制处理。
您说RTFM,但是为什么CompletableFuture这样工作? 首先,让我们研究“旧的” Future实现与CompletableFuture有何不同。 从ExecutorService.submit()返回的FutureTask具有以下cancel()实现(我使用类似的非线程安全Java代码删除了Unsafe ,因此仅将其视为伪代码):
public boolean cancel(boolean mayInterruptIfRunning) {if (state != NEW)return false;state = mayInterruptIfRunning ? INTERRUPTING : CANCELLED;try {if (mayInterruptIfRunning) {try {Thread t = runner;if (t != null)t.interrupt();} finally { // final statestate = INTERRUPTED;}}} finally {finishCompletion();}return true;
} FutureTask具有一个遵循该状态图的state变量:
在的情况下, cancel()我们可以进入CANCELLED状态或去INTERRUPTED通过INTERRUPTING 。 核心部分是我们拿runner线程(如果存在的,也就是说,如果当前正在执行的任务),我们尽量打断它。 该分支负责急切和强制中断已运行的线程。 最后,我们必须通知阻塞的所有线程Future.get()中finishCompletion()这里无关紧要)。 因此,很明显,多大的Future取消已经运行的任务。 那CompletableFuture呢? cancel()伪代码:
public boolean cancel(boolean mayInterruptIfRunning) {boolean cancelled = false;if (result == null) {result = new AltResult(new CancellationException());cancelled = true;}postComplete();return cancelled || isCancelled();
} 令人失望的是,我们几乎没有将result设置为CancellationException ,而忽略了mayInterruptIfRunning标志。 postComplete()也有类似的作用finishCompletion() -在通知未来注册的所有悬而未决的回调。 它的实现相当令人不快(使用非阻塞式Treiber stack ),但是它绝对不会中断任何底层线程。
原因和含义
在CompletableFuture情况下,有限的cancel()不是错误,而是设计决定。 CompletableFuture并非固有地绑定到任何线程,而Future几乎总是代表后台任务。 从零开始创建CompletableFuture ( new CompletableFuture<>() )是完美的,其中根本没有要取消的底层线程。 我仍然不禁感到大多数CompletableFuture 都将具有关联的任务和后台线程。 在这种情况下, cancel()可能会出现故障。 我不再建议用CompletableFuture盲目地替换Future ,因为它可能会更改依赖cancel()的应用程序的行为。 这意味着CompletableFuture故意违反了Liskov替换原则 -这是需要考虑的严重问题。
翻译自: https://www.javacodegeeks.com/2015/03/completablefuture-cant-be-interrupted.html