赞
踩
提示:本文介绍的CompletableFuture用法都是基于JDK 1.8
随着业务系统的不断复杂化,业务数据量的不断增加,单线程有时已经很难满足现有的系统要求。为了提升用户体验,有时不得不通过异步多线程的代码去加快系统响应速度。这个时候,CompletableFuture 就可以帮大忙了。
CompletableFuture是 Java JUC 包中的一个工具类,它继承了 Future 接口和 CompletionStage 接口。
Future 可以获取到异步执行代码块的返回值,包含了以下五个方法,相对比较简单。
CompletionStage 可以针对任务执行的步骤进行一系列细粒度操作。方法比较多,这里就不列出,在下面具体使用时进行介绍。
runAsync 方法会开启一个新的线程去执行对应的任务,不存在返回值。并且可以指定对应的线程池执行,不指定的话会使用一个内部默认的线程池 asyncPool。
package com.curtis.demo.future; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.function.Supplier; /** * @author Curtis * @since 2024-04-19 10:56 */ public class CompletableFutureTest { public static void main(String[] args) { // 使用Runnable Runnable runnable = () -> { System.out.println(Thread.currentThread().getName() + ": execute CompletableFuture.runAsync"); }; CompletableFuture.runAsync(runnable); // 使用Runnable 简写 CompletableFuture.runAsync(() -> { System.out.println(Thread.currentThread().getName() + ": execute CompletableFuture.runAsync"); }); // 指定线程池(默认使用CompletableFuture的asyncPool) ExecutorService executorService = Executors.newSingleThreadExecutor(); CompletableFuture.runAsync(() -> { System.out.println(Thread.currentThread().getName() + ": execute CompletableFuture.runAsync"); }, executorService); // 关闭线程池 executorService.shutdown(); } }
输出结果如下:可以观察到默认的线程池为ForkJoinPool.commonPool
ForkJoinPool.commonPool-worker-1: execute CompletableFuture.runAsync
ForkJoinPool.commonPool-worker-1: execute CompletableFuture.runAsync
pool-1-thread-1: execute CompletableFuture.runAsync
supplyAsync 方法会开启一个新的线程去执行对应的任务,存在返回值。同时可以指定对应的线程池执行,不指定线程池会使用默认的线程池asyncPool。
package com.curtis.demo.future; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.function.Supplier; /** * @author Curtis * @since 2024-04-19 10:56 */ public class CompletableFutureTest { public static void main(String[] args) { // 存在返回值 Supplier<String> supplier = () -> { System.out.println("supplier"); return "supplier execute"; }; CompletableFuture.supplyAsync(supplier); // 存在返回值简写 CompletableFuture.supplyAsync(() -> { System.out.println("supplier"); return "supplier execute"; }); // 指定线程池 ExecutorService executorService = Executors.newSingleThreadExecutor(); CompletableFuture.supplyAsync(() -> { System.out.println("supplier"); return "supplier execute"; }, executorService); // 关闭线程池 executorService.shutdown(); } }
输出结果如下:可以观察到默认的线程池为ForkJoinPool.commonPool
ForkJoinPool.commonPool-worker-1: supplier
ForkJoinPool.commonPool-worker-1: supplier
pool-1-thread-1: supplier
completedFuture 方法可以创建出一个具有指定value结果的 CompletableFuture,可能有人会问,这有什么作用?别急,可以在后续的一些方法中使用到。
例如:CompletableFuture.thenCombine 可以接收一个 completableFuture 参数和一个函数式接口。对 completableFuture 参数的结果和当前 completableFuture 实例结果进行函数式接口操作,这时候需要使用到 completedFuture 创建一个实例。
另外 thenAccept 可以接受一个 Consumer 函数式接口,对于 CompletableFuture 实例结果进行自定义操作。
package com.curtis.demo.future; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; /** * @author Curtis * @since 2024-04-19 10:56 */ public class CompletableFutureTest { public static void main(String[] args) { // 创建返回值为supplyAsync1的CompletableFuture CompletableFuture<String> completableFuture1 = CompletableFuture.completedFuture("supplyAsync1"); try { // get阻塞等待任务执行结束,最后获取结果 System.out.println(completableFuture1.get()); } catch (InterruptedException | ExecutionException e) { throw new RuntimeException(e); } // 创建一个具有world返回值的completableFuture CompletableFuture<String> completableFuture = CompletableFuture.completedFuture("world"); // 模拟一段业务逻辑之后,返回了一个值 CompletableFuture<String> completableFuture2 = CompletableFuture.supplyAsync(() -> { // mock一段业务逻辑的返回值(假装执行完一段业务逻辑) return "hello"; }); // completableFuture2接收completableFuture参数 // 和一个BiFunction参数, r1, r2, 分别为两个completableFuture的返回值 // thenAccept可以接收一个Consumer 例如 r -> {}, r为thenCombine之后生成的CompletableFuture的结果 completableFuture2.thenCombine(completableFuture, (r1, r2) -> { return r1 + r2; }).thenAccept(r -> System.out.println(r)); } }
输出结果:
supplyAsync1
helloworld
执行 getNow 方法时将会判断当前任务是否执行结束,若已执行结束则会任务执行完的结果,但是若未执行完,则会直接结束任务返回指定的value结果。
package com.curtis.demo.future; import java.util.concurrent.CompletableFuture; /** * @author Curtis * @since 2024-04-19 10:56 */ public class CompletableFutureTest { public static void main(String[] args) { CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> { return "completableFuture: hello"; }); // completableFuture未执行结束, 调用getNow 将直接返回指定的value System.out.println(completableFuture.getNow("completableFuture: customer value")); CompletableFuture<String> completableFuture2 = CompletableFuture.supplyAsync(() -> { return "completableFuture2: hello"; }); // 这里睡眠一秒钟确保completableFuture2任务已经执行结束 try { Thread.sleep(1000); } catch (InterruptedException e) { throw new RuntimeException(e); } // 执行结束调用getNow将返回任务执行结果 System.out.println(completableFuture2.getNow("completableFuture2: customer value")); } }
结果如下:和预期料想的一样
completableFuture: customer value
completableFuture2: hello
isDone 方法判断当前CompletableFuture任务是否已经执行结束,不论是否正常结束,也就是即使抛出异常或者被取消都会返回true。
isCancelled 方法判断当前CompletableFuture任务是否被取消。
cancel 方法取消当前CompletableFuture任务。
package com.curtis.demo.future; import java.util.concurrent.CompletableFuture; /** * @author Curtis * @since 2024-04-19 10:56 */ public class CompletableFutureTest { public static void main(String[] args) { CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> { return "completableFuture: hello"; }); try { Thread.sleep(500L); } catch (InterruptedException e) { throw new RuntimeException(e); } System.out.println("completableFuture: isDone ? " + completableFuture.isDone()); CompletableFuture<String> completableFuture2 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000L); } catch (InterruptedException e) { throw new RuntimeException(e); } return "completableFuture: hello"; }); // 取消任务, 方便查看cancel之后 isDone结果 completableFuture2.cancel(true); System.out.println("completableFuture2: isCancelled ? " + completableFuture2.isCancelled()); System.out.println("completableFuture2: isDone ? " + completableFuture2.isDone()); CompletableFuture<String> completableFuture3 = CompletableFuture.supplyAsync(() -> { // 抛出异常, 查看抛出异常isDone结果 throw new RuntimeException("eee"); }); try { Thread.sleep(500L); } catch (InterruptedException e) { throw new RuntimeException(e); } System.out.println("completableFuture3: isDone ? " + completableFuture3.isDone()); } }
输出结果
completableFuture: isDone ? true
completableFuture2: isCancelled ? true
completableFuture2: isDone ? true
completableFuture3: isDone ? true
执行 get 方法会阻塞当前线程执行,等待任务执行结束,并获取到返回值。
执行 join 方法之后会优先执行当前 CompletableFuture 的任务,任务执行结束才会执行后续代码。
执行 complete 方法时,若方法未执行结束会直接结束任务,并设置自定义结果。
执行 completeExceptionally 方法时,若方法未执行结束会直接结束任务,并在执行 get 方法时,将直接抛出设置的异常。
package com.curtis.demo.future; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; /** * @author Curtis * @since 2024-04-19 10:56 */ public class CompletableFutureTest { public static void main(String[] args) { CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "CompletableFuture supplyAsync"); try { // 阻塞并获取结果 System.out.println(completableFuture.get()); } catch (InterruptedException | ExecutionException e) { throw new RuntimeException(e); } CompletableFuture<String> completableFuture2 = CompletableFuture.supplyAsync(() -> { // 设置睡眠2s确保超时 try { Thread.sleep(2000); } catch (InterruptedException e) { throw new RuntimeException(e); } return "CompletableFuture supplyAsync"; }); try { // 设置超时时间阻塞并获取结果 System.out.println(completableFuture2.get(1, TimeUnit.SECONDS)); } catch (InterruptedException | ExecutionException | TimeoutException e) { e.printStackTrace(); } // 直接完成任务,若任务未完成返回值设置为value,若已完成则不做处理 CompletableFuture<String> completableFuture3 = CompletableFuture.supplyAsync(() -> "completableFuture3 supplyAsync"); try { // 设置睡眠时间 Thread.sleep(1000L); } catch (InterruptedException e) { throw new RuntimeException(e); } // 直接结束任务并设置自定义结果 completableFuture3.complete("completableFuture3 customer value"); try { System.out.println(completableFuture3.get()); } catch (InterruptedException | ExecutionException e) { throw new RuntimeException(e); } // 直接结束任务,如果任务未完成,则在调用get时抛出指定的异常。若已完成则不做处理 CompletableFuture<String> completableFuture4 = CompletableFuture.supplyAsync(() -> "completableFuture4 supplyAsync"); // 睡眠一秒则任务执行结束不会抛出异常 // try { // Thread.sleep(1000); // } catch (InterruptedException e) { // throw new RuntimeException(e); // } completableFuture4.completeExceptionally(new RuntimeException()); try { System.out.println(completableFuture4.get()); } catch (InterruptedException | ExecutionException e) { throw new RuntimeException(e); } CompletableFuture<Void> completableFuture5 = CompletableFuture.runAsync(() -> { System.out.println(Thread.currentThread().getName() + ": completableFuture5 runAsync"); // 设置睡眠时间 try { Thread.sleep(1000); } catch (InterruptedException e) { throw new RuntimeException(e); } }); // 主线程等待completableFuture5任务执行结束 才会执行后续代码 completableFuture5.join(); System.out.println(Thread.currentThread().getName() + ": 执行我!!!!"); } }
输出结果:
CompletableFuture supplyAsync // completableFuture2 超时结果 java.util.concurrent.TimeoutException at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1784) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928) at com.curtis.demo.future.CompletableFutureTest.main(CompletableFutureTest.java:34) completableFuture3 supplyAsync // completableFuture4 睡眠执行结果 completableFuture4 supplyAsync // completableFuture4 未睡眠 Exception in thread "main" java.lang.RuntimeException: java.util.concurrent.ExecutionException: java.lang.RuntimeException at com.curtis.demo.future.CompletableFutureTest.main(CompletableFutureTest.java:64) Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) at com.curtis.demo.future.CompletableFutureTest.main(CompletableFutureTest.java:62) Caused by: java.lang.RuntimeException at com.curtis.demo.future.CompletableFutureTest.main(CompletableFutureTest.java:60) // completableFuture5 join, 可以看出等待completableFuture5执行结束再执行主线程代码 ForkJoinPool.commonPool-worker-1: completableFuture5 runAsync main: 执行我!!!!
thenApply 获得 completableFuture 的执行结果,并作为参数进行进下一步处理。参数为一个函数式接口 Function,而 thenApplyAsync 只是在 thenApply 的基础上,是一个异步处理,可以指定线程池执行,无其他差异。
package com.curtis.demo.future; import java.util.concurrent.*; /** * @author Curtis * @since 2024-04-19 10:56 */ public class CompletableFutureTest { public static void main(String[] args) { CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "CompletableFuture supplyAsync"); completableFuture.thenApply((r) -> r + " -- thenApply") .thenAccept(r -> System.out.println(Thread.currentThread().getName() + ": " + r)); // 异步 completableFuture.thenApply((r) -> r + " -- thenApply") .thenAcceptAsync(r -> System.out.println(Thread.currentThread().getName() + ": " + r)); // 异步并指定线程 ExecutorService executorService = Executors.newSingleThreadExecutor(); completableFuture.thenApply((r) -> r + " -- thenApply") .thenAcceptAsync(r -> System.out.println(Thread.currentThread().getName() + ": " + r), executorService); // 关闭线程池 executorService.shutdown(); } }
输出结果:可以看出执行线程的差异,未指定线程会有默认的线程池线程执行。
main: CompletableFuture supplyAsync -- thenApply
ForkJoinPool.commonPool-worker-1: CompletableFuture supplyAsync -- thenApply
pool-1-thread-1: CompletableFuture supplyAsync -- thenApply
package com.curtis.demo.future; import java.util.concurrent.*; import java.util.function.Consumer; /** * @author Curtis * @since 2024-04-19 10:56 */ public class CompletableFutureTest { public static void main(String[] args) { CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "CompletableFuture supplyAsync"); // 这里不再写简写,将Consumer直接创建出来,便于加深大家的印象 Consumer<String> consumer = (r) -> System.out.println(Thread.currentThread().getName() + ": consumer " + r); completableFuture.thenAccept(consumer); // 这里异步执行, 可以指定线程池 completableFuture.thenAcceptAsync(consumer); ExecutorService executorService = Executors.newSingleThreadExecutor(); completableFuture.thenAcceptAsync(consumer, executorService); executorService.shutdown(); } }
输出结果:可以看出执行线程的差异,未指定线程会有默认的线程池线程执行。
main: consumer CompletableFuture supplyAsync
ForkJoinPool.commonPool-worker-1: consumer CompletableFuture supplyAsync
pool-1-thread-1: consumer CompletableFuture supplyAsync
thenRun 执行指定的 Runnable ,而 thenRunAsync 也仅是进行异步执行。相信大家都发现了,方法上加了 Async 仅仅至少代表异步而已。
package com.curtis.demo.future; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; /** * @author Curtis * @since 2024-04-19 10:56 */ public class CompletableFutureTest { public static void main(String[] args) { CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "CompletableFuture supplyAsync"); completableFuture.thenRun(() -> { try { System.out.println(Thread.currentThread().getName() + ": thenRun" + completableFuture.get()); } catch (InterruptedException | ExecutionException e) { throw new RuntimeException(e); } }); completableFuture.thenRunAsync(() -> { try { System.out.println(Thread.currentThread().getName() + ": thenRun" + completableFuture.get()); } catch (InterruptedException | ExecutionException e) { throw new RuntimeException(e); } }); ExecutorService executorService = Executors.newSingleThreadExecutor(); completableFuture.thenRunAsync(() -> { try { System.out.println(Thread.currentThread().getName() + ": thenRun" + completableFuture.get()); } catch (InterruptedException | ExecutionException e) { throw new RuntimeException(e); } }, executorService); // 关闭线程池 executorService.shutdown(); } }
执行结果:
main: thenRunCompletableFuture supplyAsync
ForkJoinPool.commonPool-worker-1: thenRunCompletableFuture supplyAsync
pool-1-thread-1: thenRunCompletableFuture supplyAsync
thenAcceptBoth 获取到两个CompletableFuture实例的结果,作为 BiConsumer 的参数传入,执行对应的Consumer逻辑。acceptEither 接收执行最快的CompletableFuture实例的结果,作为 Consumer 参数传入,执行对应的Consumer逻辑。
package com.curtis.demo.future; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; /** * @author Curtis * @since 2024-04-19 10:56 */ public class CompletableFutureTest { public static void main(String[] args) { CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> { try { // 保证completableFuture2优先结束 Thread.sleep(1000); } catch (InterruptedException e) { throw new RuntimeException(e); } return "CompletableFuture supplyAsync"; }); CompletableFuture<String> completableFuture2 = CompletableFuture.supplyAsync(() -> "CompletableFuture2 supplyAsync"); // 获取到completableFuture2和completableFuture的结果, 针对结果进行一系列操作 completableFuture.thenAcceptBoth(completableFuture2, (result1, result2) -> { System.out.println(Thread.currentThread().getName() + ": result1: " + result1); System.out.println(Thread.currentThread().getName() + ": result2: " + result2); }); // 获取到completableFuture2和completableFuture最先执行结束的结果, 针对结果进行一系列操作 completableFuture.acceptEither(completableFuture2, (r) -> { System.out.println(Thread.currentThread().getName() + ": r: " + r); }); // 为了保证主线程不结束,才能看到completableFuture执行结果的打印 try { Thread.sleep(1000); } catch (InterruptedException e) { throw new RuntimeException(e); } } }
需要注意的是:若代码执行到acceptEither或thenAcceptBoth的位置时,不存在执行结束的任务,这个时候主线程不再关心这个代码的执行,会交给CompletableFuture的默认线程池去执行
执行结果:
main: r: CompletableFuture2 supplyAsync
ForkJoinPool.commonPool-worker-1: result1: CompletableFuture supplyAsync
ForkJoinPool.commonPool-worker-1: result2: CompletableFuture2 supplyAsync
// 在CompletableFuture2中加入睡眠一秒钟代码会发现执行结果会变成
ForkJoinPool.commonPool-worker-2: r: CompletableFuture2 supplyAsync
ForkJoinPool.commonPool-worker-1: result1: CompletableFuture supplyAsync
ForkJoinPool.commonPool-worker-1: result2: CompletableFuture2 supplyAsync
package com.curtis.demo.future; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; /** * @author Curtis * @since 2024-04-19 10:56 */ public class CompletableFutureTest { public static void main(String[] args) { CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "CompletableFuture.supplyAsync"); CompletableFuture<String> completableFuture2 = CompletableFuture.supplyAsync(() -> "CompletableFuture2.supplyAsync"); // 在两个CompletableFuture都执行结束后执行 completableFuture.runAfterBoth(completableFuture2, () -> { System.out.println("runAfterBoth"); try { System.out.println(completableFuture2.get()); System.out.println(completableFuture.get()); } catch (InterruptedException | ExecutionException e) { throw new RuntimeException(e); } }); try { // 在两个completableFuture任意一个执行结束后,获取结果对结果进行处理 System.out.println(completableFuture.applyToEither(completableFuture2, (r) -> "completableFuture.applyToEither" + r).get()); } catch (InterruptedException | ExecutionException e) { throw new RuntimeException(e); } // 在两个completableFuture任意一个执行结束后执行 completableFuture.runAfterEither(completableFuture2, () -> System.out.println("completableFuture.runAfterEither")); } }
输出结果:
runAfterBoth
CompletableFuture2.supplyAsync
CompletableFuture.supplyAsync
completableFuture.applyToEitherCompletableFuture.supplyAsync
completableFuture.runAfterEither
thenCompose 可以针对 Completable 进行组合式异步操作,例如:构造了一个异步链
package com.curtis.demo.future; import java.util.concurrent.CompletableFuture; /** * @author Curtis * @since 2024-04-19 10:56 */ public class CompletableFutureTest { public static void main(String[] args) { CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "CompletableFuture.supplyAsync"); // 可以组合 completableFuture 执行异步任务链 completableFuture.thenCompose((s) -> { // completableFuture执行结束后拿到结果s继续执行另一个任务 return CompletableFuture.supplyAsync(() -> { return "anOtherCompletableFuture supplyAsync --- " + s; }); }).thenCompose((r) -> { // 上一个 thenCompose 执行结束后拿到结果r继续执行另一个任务 return CompletableFuture.supplyAsync(() -> { return "thenCompose result"; }); }).thenAccept(System.out::println); } }
输出结果:
thenCompose result
whenComplete 在任务结束时执行,正常结束是会接收到result,异常结束时会接收到异常信息。该方法,没有返回值,只能对于异常进行一些日志记录等。而 handler 方法会在任务结束时执行,同样可以获取到执行结果和异常信息。区别在于 handler 方法能够修改任务执行结果。
package com.curtis.demo.future; import java.util.concurrent.CompletableFuture; /** * @author Curtis * @since 2024-04-19 10:56 */ public class CompletableFutureTest { public static void main(String[] args) { CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> { // return "CompletableFuture.supplyAsync"; // 抛出异常查看打印结果 throw new RuntimeException(); }); // 当任务执行结束时执行,可以获取到执行结果和异常信息 completableFuture.whenComplete((result, ex) -> { System.out.println(result); if (ex != null) { ex.printStackTrace(); } }); // 当任务执行结束时执行,可以获取到执行结果和异常信息,并且可以修改返回值 completableFuture.handle((result, ex) -> { System.out.println(result); if (ex != null) { // ex.printStackTrace(); // 修改执行结果 return "customer result"; } return result; }); } }
执行结果:
null java.util.concurrent.CompletionException: java.lang.RuntimeException at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273) at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280) at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606) at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1596) at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) Caused by: java.lang.RuntimeException at com.curtis.demo.future.CompletableFutureTest.lambda$main$0(CompletableFutureTest.java:15) at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) ... 5 more null customer result
CompletableFuture 内部封装了一系列针对异步任务的操作,能够在细粒度的一些节点进行一些自定义操作,并且可以获取到异步执行后的返回结果,非常灵活。
名称上包含 Async 的方法仅代表是该方法异步操作,并且可以指定线程池执行,默认使用内部线程池。
灵活得组合方法进行使用,可以基本覆盖实现所有场景。优雅,太优雅了。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。