CompletableFuture提供了许多回调函数,来处理异步编程
获取任务结果方法
public T get ( ) throws InterruptedException , ExecutionException
public T get ( long timeout, TimeUnit unit) throws InterruptedException , ExecutionException , TimeoutException
public T join ( )
public T getNow ( T valueIfAbsent)
public boolean complete ( T value)
public boolean completeExceptionally ( Throwable ex)
CompletableFuture创建异步任务
1:supplyAsync是创建带有返回值的异步任务。它有如下两个方法,一个是使用默认线程池(ForkJoinPool.commonPool())的方法(线程池大小和硬件有关),一个是带有自定义线程池的重载方法
public static < U > CompletableFuture < U > supplyAsync ( Supplier < U > supplier)
public static < U > CompletableFuture < U > supplyAsync ( Supplier < U > supplier, Executor executor)
2:runAsync是创建没有返回值的异步任务。它有如下两个方法,一个是使用默认线程池(ForkJoinPool.commonPool())的方法(线程池大小和硬件有关),一个是带有自定义线程池的重载方法
public static CompletableFuture < Void > runAsync ( Runnable runnable)
public static CompletableFuture < Void > runAsync ( Runnable runnable, Executor executor)
异步回调处理
1:thenApply(Function<T, U> fn)父子线程是同一个线程和thenApplyAsync(Function<T, U> fn)父子线程不是同一个线程: 对任务结果进行转换;表示某个任务执行完成后执行的动作,即回调方法,会将该任务的执行结果即方法返回值作为入参传递到回调方法中,带有返回值
public static void main ( String [ ] args) throws ExecutionException , InterruptedException { CompletableFuture < Integer > cf1 = CompletableFuture . supplyAsync ( ( ) -> { System . out. println ( Thread . currentThread ( ) + " cf1 do something...." ) ; return 1 ; } ) ; CompletableFuture < Integer > cf2 = cf1. thenApply ( ( result) -> { System . out. println ( Thread . currentThread ( ) + " cf2 do something...." ) ; result += 2 ; return result; } ) ; System . out. println ( "cf1结果->" + cf1. get ( ) ) ; System . out. println ( "cf2结果->" + cf2. get ( ) ) ;
} public static void main ( String [ ] args) throws ExecutionException , InterruptedException { CompletableFuture < Integer > cf1 = CompletableFuture . supplyAsync ( ( ) -> { System . out. println ( Thread . currentThread ( ) + " cf1 do something...." ) ; return 1 ; } ) ; CompletableFuture < Integer > cf2 = cf1. thenApplyAsync ( ( result) -> { System . out. println ( Thread . currentThread ( ) + " cf2 do something...." ) ; result += 2 ; return result; } ) ; System . out. println ( "cf1结果->" + cf1. get ( ) ) ; System . out. println ( "cf2结果->" + cf2. get ( ) ) ;
}
2:thenAccept(Consumeraction)父子线程是同一个线程和thenAcceptAsync(Consumeraction)父子线程不是同一个线程: 对任务结果进行转换;表示某个任务执行完成后执行的动作,即回调方法,会将该任务的执行结果即方法返回值作为入参传递到回调方法中,无返回值
public static void main ( String [ ] args) throws ExecutionException , InterruptedException { CompletableFuture < Integer > cf1 = CompletableFuture . supplyAsync ( ( ) -> { System . out. println ( Thread . currentThread ( ) + " cf1 do something...." ) ; return 1 ; } ) ; CompletableFuture < Void > cf2 = cf1. thenAccept ( ( result) -> { System . out. println ( Thread . currentThread ( ) + " cf2 do something...." ) ; } ) ; System . out. println ( "cf1结果->" + cf1. get ( ) ) ; System . out. println ( "cf2结果->" + cf2. get ( ) ) ;
} public static void main ( String [ ] args) throws ExecutionException , InterruptedException { CompletableFuture < Integer > cf1 = CompletableFuture . supplyAsync ( ( ) -> { System . out. println ( Thread . currentThread ( ) + " cf1 do something...." ) ; return 1 ; } ) ; CompletableFuture < Void > cf2 = cf1. thenAcceptAsync ( ( result) -> { System . out. println ( Thread . currentThread ( ) + " cf2 do something...." ) ; } ) ; System . out. println ( "cf1结果->" + cf1. get ( ) ) ; System . out. println ( "cf2结果->" + cf2. get ( ) ) ;
}
3:thenRun(Runnable action)父子线程是同一个线程和thenRunAsync(Consumeraction)父子线程不是同一个线程: 执行一个额外的任务,不依赖于任务结果;表示某个任务执行完成后执行的动作,即回调方法,无入参,无返回值
public static void main ( String [ ] args) throws ExecutionException , InterruptedException { CompletableFuture < Integer > cf1 = CompletableFuture . supplyAsync ( ( ) -> { System . out. println ( Thread . currentThread ( ) + " cf1 do something...." ) ; return 1 ; } ) ; CompletableFuture < Void > cf2 = cf1. thenRun ( ( ) -> { System . out. println ( Thread . currentThread ( ) + " cf2 do something...." ) ; } ) ; System . out. println ( "cf1结果->" + cf1. get ( ) ) ; System . out. println ( "cf2结果->" + cf2. get ( ) ) ;
} public static void main ( String [ ] args) throws ExecutionException , InterruptedException { CompletableFuture < Integer > cf1 = CompletableFuture . supplyAsync ( ( ) -> { System . out. println ( Thread . currentThread ( ) + " cf1 do something...." ) ; return 1 ; } ) ; CompletableFuture < Void > cf2 = cf1. thenRunAsync ( ( ) -> { System . out. println ( Thread . currentThread ( ) + " cf2 do something...." ) ; } ) ; System . out. println ( "cf1结果->" + cf1. get ( ) ) ; System . out. println ( "cf2结果->" + cf2. get ( ) ) ;
}
双任务组合处理,将两个CompletableFuture组合起来处理,只有两个任务都正常完成时,才进行下阶段任务
1:thenCombine会将两个任务的执行结果作为所提供函数的参数,且该方法有返回值
public static void main ( String [ ] args) throws ExecutionException , InterruptedException { CompletableFuture < Integer > cf1 = CompletableFuture . supplyAsync ( ( ) -> { System . out. println ( Thread . currentThread ( ) + " cf1 do something...." ) ; return 1 ; } ) ; CompletableFuture < Integer > cf2 = CompletableFuture . supplyAsync ( ( ) -> { System . out. println ( Thread . currentThread ( ) + " cf2 do something...." ) ; return 2 ; } ) ; CompletableFuture < Integer > cf3 = cf1. thenCombine ( cf2, ( a, b) -> { System . out. println ( Thread . currentThread ( ) + " cf3 do something...." ) ; return a + b; } ) ; System . out. println ( "cf3结果->" + cf3. get ( ) ) ;
}
2:thenAcceptBoth同样将两个任务的执行结果作为方法入参,但是无返回值
public static void main ( String [ ] args) throws ExecutionException , InterruptedException { CompletableFuture < Integer > cf1 = CompletableFuture . supplyAsync ( ( ) -> { System . out. println ( Thread . currentThread ( ) + " cf1 do something...." ) ; return 1 ; } ) ; CompletableFuture < Integer > cf2 = CompletableFuture . supplyAsync ( ( ) -> { System . out. println ( Thread . currentThread ( ) + " cf2 do something...." ) ; return 2 ; } ) ; CompletableFuture < Void > cf3 = cf1. thenAcceptBoth ( cf2, ( a, b) -> { System . out. println ( Thread . currentThread ( ) + " cf3 do something...." ) ; System . out. println ( a + b) ; } ) ; System . out. println ( "cf3结果->" + cf3. get ( ) ) ;
}
3:runAfterBoth没有入参,也没有返回值。注意两个任务中只要有一个执行异常,则将该异常信息作为指定任务的执行结果
public static void main ( String [ ] args) throws ExecutionException , InterruptedException { CompletableFuture < Integer > cf1 = CompletableFuture . supplyAsync ( ( ) -> { System . out. println ( Thread . currentThread ( ) + " cf1 do something...." ) ; return 1 ; } ) ; CompletableFuture < Integer > cf2 = CompletableFuture . supplyAsync ( ( ) -> { System . out. println ( Thread . currentThread ( ) + " cf2 do something...." ) ; return 2 ; } ) ; CompletableFuture < Void > cf3 = cf1. runAfterBoth ( cf2, ( ) -> { System . out. println ( Thread . currentThread ( ) + " cf3 do something...." ) ; } ) ; System . out. println ( "cf3结果->" + cf3. get ( ) ) ;
}
双任务组合处理,将两个CompletableFuture组合起来处理,有一个任务正常完成时,就会进行下阶段任务
1:applyToEither会将已经完成任务的执行结果作为所提供函数的参数,且该方法有返回值
public static void main ( String [ ] args) throws ExecutionException , InterruptedException { CompletableFuture < Integer > cf1 = CompletableFuture . supplyAsync ( ( ) -> { try { Thread . sleep ( 1000 * 2 ) ; } catch ( InterruptedException e) { throw new RuntimeException ( e) ; } System . out. println ( Thread . currentThread ( ) + " cf1 do something...." ) ; return 1 ; } ) ; CompletableFuture < Integer > cf2 = CompletableFuture . supplyAsync ( ( ) -> { System . out. println ( Thread . currentThread ( ) + " cf2 do something...." ) ; return 2 ; } ) ; CompletableFuture < Integer > cf3 = cf1. applyToEither ( cf2, ( result) -> { System . out. println ( Thread . currentThread ( ) + " cf3 do something...." ) ; return result; } ) ; System . out. println ( "cf3结果->" + cf3. get ( ) ) ; }
2:acceptEither同样将已经完成任务的执行结果作为方法入参,但是无返回值
public static void main ( String [ ] args) throws ExecutionException , InterruptedException { CompletableFuture < Integer > cf1 = CompletableFuture . supplyAsync ( ( ) -> { System . out. println ( Thread . currentThread ( ) + " cf1 do something...." ) ; return 1 ; } ) ; CompletableFuture < Integer > cf2 = CompletableFuture . supplyAsync ( ( ) -> { System . out. println ( Thread . currentThread ( ) + " cf2 do something...." ) ; return 2 ; } ) ; CompletableFuture < Void > cf3 = cf1. acceptEither ( cf2, ( a, b) -> { System . out. println ( Thread . currentThread ( ) + " cf3 do something...." ) ; System . out. println ( a + b) ; } ) ; System . out. println ( "cf3结果->" + cf3. get ( ) ) ;
}
3:runAfterEither没有入参,也没有返回值
public static void main ( String [ ] args) throws ExecutionException , InterruptedException { CompletableFuture < Integer > cf1 = CompletableFuture . supplyAsync ( ( ) -> { System . out. println ( Thread . currentThread ( ) + " cf1 do something...." ) ; return 1 ; } ) ; CompletableFuture < Integer > cf2 = CompletableFuture . supplyAsync ( ( ) -> { System . out. println ( Thread . currentThread ( ) + " cf2 do something...." ) ; return 2 ; } ) ; CompletableFuture < Void > cf3 = cf1. runAfterEither ( cf2, ( ) -> { System . out. println ( Thread . currentThread ( ) + " cf3 do something...." ) ; } ) ; System . out. println ( "cf3结果->" + cf3. get ( ) ) ;
}
合并多个CompletableFuture
1:allOf:CompletableFuture是多个任务都执行完成后才会执行,只要有一个任务执行异常,则返回的CompletableFuture执行get方法时会抛出相应任务的异常,如果都是正常执行,则get返回null
public static void main ( String [ ] args) throws ExecutionException , InterruptedException { CompletableFuture < String > cf1 = CompletableFuture . supplyAsync ( ( ) -> { try { System . out. println ( Thread . currentThread ( ) + " cf1 do something...." ) ; Thread . sleep ( 2000 ) ; } catch ( InterruptedException e) { e. printStackTrace ( ) ; } System . out. println ( "cf1 任务完成" ) ; return "cf1 任务完成" ; } ) ; CompletableFuture < String > cf2 = CompletableFuture . supplyAsync ( ( ) -> { try { System . out. println ( Thread . currentThread ( ) + " cf2 do something...." ) ; int a = 1 / 0 ; Thread . sleep ( 5000 ) ; } catch ( InterruptedException e) { e. printStackTrace ( ) ; } System . out. println ( "cf2 任务完成" ) ; return "cf2 任务完成" ; } ) ; CompletableFuture < Void > cfAll = CompletableFuture . allOf ( cf1, cf2) ; System . out. println ( "cfAll结果->" + cfAll. get ( ) ) ;
}
2:anyOf :CompletableFuture是多个任务只要有一个任务执行完成,则返回的CompletableFuture执行get方法时会抛出异常,如果都是正常执行,则get返回执行完成任务的结果
public static void main ( String [ ] args) throws ExecutionException , InterruptedException { CompletableFuture < String > cf1 = CompletableFuture . supplyAsync ( ( ) -> { try { System . out. println ( Thread . currentThread ( ) + " cf1 do something...." ) ; Thread . sleep ( 2000 ) ; } catch ( InterruptedException e) { e. printStackTrace ( ) ; } System . out. println ( "cf1 任务完成" ) ; return "cf1 任务完成" ; } ) ; CompletableFuture < String > cf2 = CompletableFuture . supplyAsync ( ( ) -> { try { System . out. println ( Thread . currentThread ( ) + " cf2 do something...." ) ; Thread . sleep ( 5000 ) ; } catch ( InterruptedException e) { e. printStackTrace ( ) ; } System . out. println ( "cf2 任务完成" ) ; return "cf2 任务完成" ; } ) ; CompletableFuture < Object > cfAll = CompletableFuture . anyOf ( cf1, cf2) ; System . out. println ( "cfAll结果->" + cfAll. get ( ) ) ;
}
异常完成处理,如果任务执行过程中抛出了异常,可以使用以下方法处理异常
1:exceptionally(Function<Throwable, T> fn): 在任务抛出异常时提供一个默认值;
public static void main ( String [ ] args) throws ExecutionException , InterruptedException { CompletableFuture < Integer > cf1 = CompletableFuture . supplyAsync ( ( ) -> { System . out. println ( Thread . currentThread ( ) + " cf1 do something...." ) ; int a = 1 / 0 ; return 1 ; } ) ; CompletableFuture < Integer > cf2 = cf1. exceptionally ( ex -> { System . out. println ( "Exception: " + ex. getMessage ( ) ) ; return 0 ; } ) ; System . out. println ( "cf2结果->" + cf2. get ( ) ) ; }
2:whenComplete和whenCompleteAsync,whenComplete是当某个任务执行完成后执行的回调方法,会将执行结果或者执行期间抛出的异常传递给回调方法,如果是正常执行则异常为null,回调方法对应的CompletableFuture的result和该任务一致,如果该任务正常执行,则get方法返回执行结果,如果是执行异常,则get方法抛出异常
public static void main ( String [ ] args) throws ExecutionException , InterruptedException { CompletableFuture < Integer > cf1 = CompletableFuture . supplyAsync ( ( ) -> { System . out. println ( Thread . currentThread ( ) + " cf1 do something...." ) ; int a = 1 / 0 ; return 1 ; } ) ; CompletableFuture < Integer > cf2 = cf1. whenComplete ( ( result, e) -> { System . out. println ( "上个任务结果:" + result) ; System . out. println ( "上个任务抛出异常:" + e) ; System . out. println ( Thread . currentThread ( ) + " cf2 do something...." ) ; } ) ;
System . out. println ( "cf2结果->" + cf2. get ( ) ) ; }
3:handle和handleAsync,跟whenComplete方法基本一致,区别在于handle的回调方法有返回值,如果有异常会被抛出
public static void main ( String [ ] args) throws ExecutionException , InterruptedException { CompletableFuture < Integer > cf1 = CompletableFuture . supplyAsync ( ( ) -> { System . out. println ( Thread . currentThread ( ) + " cf1 do something...." ) ; return 1 ; } ) ; CompletableFuture < Integer > cf2 = cf1. handle ( ( result, e) -> { System . out. println ( Thread . currentThread ( ) + " cf2 do something...." ) ; System . out. println ( "上个任务结果:" + result) ; System . out. println ( "上个任务抛出异常:" + e) ; return result+ 2 ; } ) ; System . out. println ( "cf2结果->" + cf2. get ( ) ) ;
}
CompletableFuture 和 Future的区别
Future 接口只提供了 get() 方法来获取计算结果,但如果计算时间过长,我们的线程就会一直堵塞,要等待 10年才会打印值,Future 也没有任何方法可以手动完成任务,例子如下
import java. util. concurrent. * ; public class FutureDemo { public static void main ( String [ ] args) throws ExecutionException , InterruptedException { ExecutorService executor = Executors . newSingleThreadExecutor ( ) ; Future < String > future = executor. submit ( ( ) -> threadSleep ( ) ) ; System . out. println ( "The result is: " + future. get ( ) ) ; } private static String threadSleep ( ) throws InterruptedException { TimeUnit . DAYS . sleep ( 365 * 10 ) ; return "finishSleep" ; }
}
CompletableFuture提供complete() 方法可以手动完成任务解决Future的弊端,示例代码如下:
import java. util. concurrent. * ; public class CompletableFutureDemo { public static void main ( String [ ] args) { CompletableFuture < String > completableFuture = CompletableFuture . supplyAsync ( ( ) -> threadSleep ( ) ) ; completableFuture. complete ( "Completed" ) ; System . out. println ( "result: " + completableFuture. get ( ) ) ; System . out. println ( "completableFuture done ? " + completableFuture. isDone ( ) ) ; } private static String threadSleep ( ) { try { TimeUnit . DAYS . sleep ( 365 * 10 ) ; } catch ( InterruptedException e) { throw new RuntimeException ( e) ; } return "finishSleep" ; }
}
使用 Future 接口,我们无法异步组合这两个操作,只能同步完成,示例代码如下:
import java. util. concurrent. * ; public class FutureDemo { public static void main ( String [ ] args) throws ExecutionException , InterruptedException { ExecutorService executor = Executors . newSingleThreadExecutor ( ) ; Future < Integer > firstFuture = executor. submit ( ( ) -> firstMethod ( 1 ) ) ; int firstMethodResult = firstFuture. get ( ) ; System . out. println ( "firstMethodResult:" + firstMethodResult) ; Future < Integer > secondFuture = executor. submit ( ( ) -> secondMethod ( firstMethodResult) ) ; System . out. println ( "secondMethodResult:" + secondFuture. get ( ) ) ; executor. shutdown ( ) ; } private static int firstMethod ( int num) { return num; } private static int secondMethod ( int firstMethodResult) { return 2 + firstMethodResult; }
}
在上述示例代码中: 首先,通过 ExecutorService 提交一个返回 Future 的任务来调用 firstMethod; 接着,我们必须等到 firstMethod 返回结果,然后再执行 secondMethod 操作,整个流程就变成同步过程
CompletableFuture 是在不阻塞主线程的前提下,异步组合两个操作过程
import java. util. concurrent. * ; public class CompletableFutureDemo { public static void main ( String [ ] args) throws ExecutionException , InterruptedException { var finalResult = CompletableFuture . supplyAsync ( ( ) -> firstMethod ( 1 ) ) . thenApply ( firstMethodResult -> secondMethod ( firstMethodResult) ) ; System . out. println ( "finalResult:" + finalResult. get ( ) ) ; } private static int firstMethod ( int num) { return num; } private static int secondMethod ( int firstMethodResult) { return 2 + firstMethodResult; }
}
在上述示例代码中: 首先,通过 CompletableFuture.supplyAsync 方法,返回一个新的 CompletableFuture,该 CompletableFuture 是在 ForkJoinPool.commonPool() 中异步完成的,并且将结果值赋值给 Supplier;
注意CompletableFuture 异步关于异常的坑
程序存在异常,却返回成功,结果:接口返回成功,控制台没有打印错误信息,如下代码
@ApiOperation ( value = "异步执行异常测试" , code = 800 )
@GetMapping ( "/asyncException" )
public ResponseData < Object > asyncException ( ) { try { try { CompletableFuture . runAsync ( ( ) -> { int i = 1 / 0 ; } ) ; } catch ( Exception e) { log. error ( "异常信息: " + e. getMessage ( ) , e) ; throw new BusinessException ( e. getMessage ( ) ) ; } return new ResponseData < > ( StatusCodeEnum . SUCCESS_CODE . getStatusCode ( ) , "操作成功" ) ; } catch ( Exception e) { return new ResponseData < > ( StatusCodeEnum . ERROR_CODE . getStatusCode ( ) , "操作失败:" + e. getMessage ( ) ) ; }
}
解决方法一:异步调用join(),结果:接口返回失败,控制台打印异常日志
try { CompletableFuture . runAsync ( ( ) -> { int i = 1 / 0 ; } , CUSTOM_THREAD_POOL ) . join ( ) ;
} catch ( Exception e) { log. error ( "外层异常信息: " + e. getMessage ( ) , e) ; throw new BusinessException ( e. getMessage ( ) ) ;
}
解决方法二:异步调用get(),异步方法中get()是阻塞的,在使用时要设置超时时间,结果:接口返回成功,控制台打印异常信息
try { CompletableFuture . runAsync ( ( ) -> { int i = 1 / 0 ; } , CUSTOM_THREAD_POOL ) . get ( 2 , TimeUnit . SECONDS ) ;
} catch ( Exception e) { log. error ( "外层异常信息: " + e. getMessage ( ) , e) ; throw new BusinessException ( e. getMessage ( ) ) ;
}
解决方法三:异步调用exception(),结果:接口返回成功,控制台打印异步线程异常日志,主线程没有打印异常日志
try { CompletableFuture . runAsync ( ( ) -> { int i = 1 / 0 ; } , CUSTOM_THREAD_POOL ) . exceptionally ( e -> { log. error ( "异步运行异常信息: " + e. getMessage ( ) , e) ; throw new BusinessException ( e. getMessage ( ) ) ; } ) ;
} catch ( Exception e) { log. error ( "异常信息: " + e. getMessage ( ) , e) ; throw new BusinessException ( e. getMessage ( ) ) ;
}
解决方法四:异步调用whenComplete(),结果:结果返回成功,控制台打印异步线程异常信息,主线程没有打印异常信息
try { CompletableFuture . runAsync ( ( ) -> { int i = 1 / 0 ; } , CUSTOM_THREAD_POOL ) . whenComplete ( ( r, e) -> { if ( e != null ) { log. error ( "异步执行异常信息: " + e. getMessage ( ) , e) ; throw new BusinessException ( e. getMessage ( ) ) ; } } ) ;
} catch ( Exception e) { log. error ( "异常信息: " + e. getMessage ( ) , e) ; throw new BusinessException ( e. getMessage ( ) ) ;
}
解决方法五:异步调用handle(),结果:结果返回成功,控制台打印异步线程异常信息,主线程没有打印异常信息
try { CompletableFuture . runAsync ( ( ) -> { int i = 1 / 0 ; } , CUSTOM_THREAD_POOL ) . handle ( ( r, e) -> { if ( e != null ) { log. error ( "异步执行异常信息: " + e. getMessage ( ) , e) ; throw new BusinessException ( e. getMessage ( ) ) ; } return null ; } ) ;
} catch ( Exception e) { log. error ( "异常信息: " + e. getMessage ( ) , e) ; throw new BusinessException ( e. getMessage ( ) ) ;
}
总结:在使用异步CompletableFuture时,无论是否有返回值都要调用get()/join()方法,避免程序执行报错了,仍然返回成功。如果在程序报错时需要对上一个异步任务结果做其他操作,可以调用whenComplete()、handle()处理,如果只是对异常做处理,不涉及对上一个异步任务结果的情况,调用exceptionally()处理