CompletableFuture
一、前言
Java 支持的多线程开启方式
根据 Oracle 官方出具的 Java 文档说明,创建线程的方式只有两种:继承 Thread 或者实现 Runnable 接口。但是这两种方法都存在一个缺陷,没有返回值,也就是说我们无法得知线程执行结果。虽然简单场景下已经满足,但是当我们需要返回值的时候怎么办呢?Java1.5 以后得 Callable 和 Future 接口就解决了这个问题,我们可以通过向线程池提交一个 Callable 来获取一个包含返回值的 Future 对象,从此,我们的程序逻辑就不再是同步顺序。
下面是 Java8 实战书籍的原文:
1Future接口在Java5中被引入,设计初衷是对将来某个时刻会产生的结果进行建模。它建模了一种异步运算,返回了一个执行结果的引用,当运算结束后,这个引用被返回给调用方。在Future中触发那些潜在耗时的操作完成,我们从最初的串行操作变成了并行,在异步的同时,我们还可以做其他事情来解决程序运行时间。
Future 接口的局限性
当我们得到包含结果的 Future 时,我们可以使用 get()方法等待线程完成并获取返回值,但是 Future 的 get()方法会阻塞主线程。Future 文档原文如下:
1A {@code Future} represents the result of an asynchronous computation. Methods are provided to check if the computation is complete, to wait for its completion, and to retrieve the result of the computation.
翻译:
1{@Code Future}代表异步*计算的结果。提供了一些方法来检查计算是否完成,等待其完成并检索计算结果。
Future 执行耗时任务
由此我们可以得知,Future 获取线程执行结果之前,我们的主线程 get()得到结果需要一直阻塞等待,即使我们使用 isDone()方法轮训去查看线程执行状态,但是这样也非常浪费 cpu 资源。
当 Future 的线程进行了一个非常耗时的操作,那我们的主线程也就阻塞了。当我们在简单业务上,可以使用 Future 的另一个重载方法 get(long, TimeUnit)来设置超时时间,避免我们的主线程被无穷尽的阻塞。
我们需要更加强大的异步能力
不仅如此,当我们在碰到以下业务场景的时候,单纯使用 Future 接口或者 FutureTask 类并不能很好的完成以下我们所需的业务
- 将两个异步计算合并成一个,这两个异步计算之间相互独立,同时第二个又依赖于第一个的结果;
- 仅等待 Future 集合中最快结束的任务完成(有可能因为他们试图通过不同的方式计算同一个值),并返回它的结果;
- 通过编程方式完成一个 Future 任务的执行(即以手工设定异步操作结果的方式);
- 应对 Future 的完成时间(即当 Future 的完成时间完成时会收到通知,并能使用 Future 的计算结果进行下一步的操作,不只是简单地阻塞等待操作的结果);
二、正文
什么是 CompletableFuture
在 Java 8 中,新增加了一个包含 50 个方法左右的类:CompletableFuture,结合了 Future 的优点,提供了非常强大的 Future 的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果,并且提供了转换和组合 CompletableFuture 的方法。
CompletableFuture 被设计在 Java 中进行异步编程。异步编程意味着在主线程之外创建一个独立的线程,与主线程分隔开,并在上面允许一个非阻塞的任务,然后通知主线程进展,成功或者失败。
通过这种方式,你的主线程不用为了任务的完成而阻塞/等待,你可以用主线程去并行执行其他的任务。使用这种并行方式,极大的提升了程序的表现。
Java 8 源码 doc 注释:
译文:
1当一个Future可能需要显示地完成时,使用CompletionStage接口去支持完成时触发的函数和操作。
2当2个以上线程同时尝试完成、异常完成、取消一个CompletableFuture时,只有一个能成功。
3
4CompletableFuture实现了CompletionStage接口的如下策略
5
61. 为了完成当前的CompletableFuture接口或者其他完成方法的回调函数的线程,提供了非异步的完成操作;
7
82. 没有显示入参Executor的所有async方法都是用ForkJoinPool.commonPool(),为了简化、调试和跟踪,所有生成的异步任务都是标记接口AsynchronousCompletionTask的实例;
9
103. 所有的CompletionStage方法都是独立于其他共有方法实现的,因此一个方法的行为不会受到子类中其他方法的覆盖;
11
12CompletableFuture实现了Future接口的如下策略:
13
141. CmopletableFuture无法直接控制完成,所以cancel操作被视为是另一种异常完成形式。方法isCompletedExceptionally可以用来确定一个CompletableFuture是否以任务异常的方式完成;
15
162. 以一个CompletionException为例,方法get()和get(long, TimeUnit)抛出一个ExecutionException,对应CompletionException。为了在大多数上下文中简化用法,这个类还定义了方法join()和getNow,而不是直接在这些情况中直接抛出CompletionException;
三、CompletableFuture API
实例化 CompletableFuture
实例化方式
1public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
2public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor);
3
4public static CompletableFuture<Void> runAsync(Runnable runnable);
5public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);
6
有两种格式,一种是 supply 开头的方法,一种是 run 开头的方法
- supply 开头:这种方法可以返回异步线程执行之后的结果;
- run 开头:这种不会返回结果,就只是执行线程任务;
或者可以通过一个简单的无参构造器
1CompletableFuture<String> completableFuture = new CompletableFuture<String>();
2
小贴士:我们注意到,在实例化方法中,我们是可以指定 Executor 参数的,当我们不指定的时候,我们所开的并行线程使用的是默认系统及公共线程池 ForkJoinPool,而且这些线程都是守护线程。我们在编程的时候需要谨慎使用守护线程,如果将我们普通的用户线程设置成守护线程,当我们的程序主线程结束,JVM 中不存在其余用户线程,那么 CompletableFuture 的守护线程会直接退出,造成任务无法完成的问题。
1其中supplyAsync用于有返回值的任务,runAsync则用于没有返回值的任务。Executor参数可以手动指定线程池,否则默认ForkJoinPool.commonPool()系统级公共线程池,注意:这些线程都是Daemon线程,主线程结束Daemon线程不结束,只有JVM关闭时,生命周期终止。
获取结果
同步获取结果
1public T get();
2public T get(long timeout, TimeUnit unit);
3public T join();
4public T getNow(T valueIfAbsent);
简单的例子
1CompletableFuture<Integer> future = new CompletableFuture<>();
2Integer integer = future.get();
get() 方法同样会阻塞直到任务完成,上面的代码,主线程会一直阻塞,因为这种方式创建的 future 从未完成
前两个方法比较通俗易懂,认真看完上面 Future 部分的小伙伴肯定知道什么意思。getNow() 则有所区别,参数 valueIfAbsent 的意思是当计算结果不存在或者 Now 时刻没有完成任务,给定一个确定的值。
join() 与 get() 区别在与 join() 返回计算的结果或者抛出一个 unchecked 异常(CompletionException),而 get() 返回一个具体的异常。
计算完成后续操作 1 — complete
1public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
2public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
3public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
4public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)
方法 1 和 2 的区别在于是否使用异步处理,2 和 3 的区别在于是否使用自定义的线程池,前三个方法都会提供一个返回结果和可抛出异常,我们可以使用 lambda 表达式来接收这两个参数,然后自己处理。方法 4,接收一个可抛出的异常,且必须 return 一个返回值,类型与钻石表达式中的类型一样;
1 CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 10086);
2 future.whenComplete((result, error) -> {
3 System.out.println("拨打" + result);
4 error.printStackTrace();
5 });
计算完成后续操作 2 — handle
1public <U> CompletableFuture<U> handle(BiFunction<? super T,Throwable,? extends U> fn)
2public <U> CompletableFuture<U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn)
3public <U> CompletableFuture<U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn, Executor executor)
4
handle 方法集和上面的 complete 方法集没有区别,同样有两个参数一个返回值和可抛出异常,区别在于返回值,虽然同样返回 CompletableFuture 类型,但是里面的参数类型,handle 方法是可以自定义的。
1 // 开启一个异步方法
2 CompletableFuture<List<String>> future = CompletableFuture.supplyAsync(() -> {
3 List<String> list = new ArrayList<>();
4 list.add("语文");
5 list.add("数学");
6 // 获取今天的所有课程
7 return list;
8 });
9 // 使用handle()方法接受list数据和error异常
10 CompletableFuture<Integer> future2 = future.handle((list, error) -> {
11 // 如果报错,就打印出异常
12 error.printStackTrace();
13 // 如果不报错,返回一个包含Integer的全新的CompletableFuture
14 return list.size();
15 });
计算完成后续操作 3 — apply
1public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
2public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
3public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
为什么这三个方法都称作计算完成的后续操作 2 呢?因为 apply 方法和 handle 方法一样,都是结束计算之后的后续操作,唯一的不同是,handle 方法会给出异常,可以让用户自己在内部处理,而 apply 方法只有一个返回结果,如果异常了,会被直接抛出,交给上一层处理。如果不想每个链式调用都处理异常,那么就使用 apply 吧。
计算完成后续操作 4 — accept
1public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
2public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
3public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor)
accpet()三个方法只做最终结果的消费,注意此时返回的 CompletableFuture 是空返回,只消费,无返回,有点像流式编程的终端操作。
捕获中间产生的异常 — exceptionally
1public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)
2
exceptionally()可以帮我们捕捉到所有中间过程的异常,方法会给我们一个异常作为参数,我们可以处理这个异常,同时返回一个默认值,跟服务降级有点像,默认值的类型和上一个操作的返回值相同。ps:向线程池提交任务的时候发生的异常属于外部异常,是无法捕捉到的,毕竟还没有开始执行任务。exceptionally()无法捕捉 RejectedExecutionException()。
1 // 实例化一个CompletableFuture, 返回值是Integer
2 CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> null);
3
4 CompletableFuture<String> exceptionally = future.thenApply(result -> {
5 // 制造一个空指针异常NPE
6 return (int) result;
7 }).thenApply(result -> {
8 // 这里不会执行,因为上面出现了异常
9 return "现在是" + result + "点钟";
10 }).exceptionally(error -> {
11 error.printStackTrace();
12 return "出错啊~";
13 });
14 exceptionally.thenAccept(System.out::println);
最后输出结果
组合式异步编程
组合两个 CompletableFuture
- 将两个异步计算合并为一个,这两个异步计算之间相互独立,同时第二个又依赖于第一个的结果。
thenApply()
假设一个场景,我是一个小学生,我想知道今天我需要上几门课程,此时我需要两个步骤,1. 根据我的名字获取我得学生信息 2. 根据我的学生信息查询课程 我们可以用下面这种方式来链式调用 api,使用上一步的结果进行下一步操作。
1 CompletableFuture<List<String>> future = CompletableFuture.supplyAsync(() -> {
2 // 查询学生信息,比如模拟返回学生ID
3 return 1L;
4 }).thenApply(studentId -> {
5 // 查询学生课程,模拟返回
6 List<String> list = new ArrayList<>();
7 list.add("语文");
8 list.add("数学");
9 return list;
10 });
11
我们模拟返回学生 ID,然后使用上一步得到的学生 ID 传递到 apply()方法在获取到学生今天的课程列表。
- 将两个异步计算合并为一个,这两个异步计算之间相互独立,互不依赖;
thenCompose()
假设一个场景,我是一个小学生,今天有劳技课和美术课,我需要查询到今天需要带什么东西到学校
1 CompletableFuture<List<String>> total = CompletableFuture.supplyAsync(() -> {
2 // 第一个任务获取美术课需要带的东西,返回一个list
3 List<String> stuff = new ArrayList<>();
4 stuff.add("画笔");
5 stuff.add("颜料");
6 return stuff;
7 }).thenCompose(list -> {
8 // 向第二个任务传递参数list(上一个美术课所需要的东西集合)
9 CompletableFuture<List<String>> insideFuture = CompletableFuture.supplyAsync(() -> {
10 List<String> stuff = new ArrayList<>();
11 // 第二个任务是获取劳技课所需的工具
12 stuff.add("剪刀");
13 stuff.add("折纸");
14 // 合并两个list,获取课程所需所有工具
15 return Stream.of(list, stuff).flatMap(Collection::stream).collect(Collectors.toList());
16 });
17 return insideFuture;
18 });
19 System.out.println(total.join().size());
我们通过 CompletableFuture.supplyAsync() 方法创建第一个任务,获取美术课所需的物品 list,然后使用 thenComponse() 接口传递 list 到第二个任务,然后第二个任务获取劳技课所需的物品,整合之后再返回。至此我们完成了两个任务的合并。
- 将两个异步计算合并为一个,这两个异步计算之间相互独立,互不依赖;
thenCombine()
还是上面那个场景,我是一个小学生,今天有劳技课和美术课,我需要查询到今天需要带什么东西到学校
1 CompletableFuture<List<String>> painting = CompletableFuture.supplyAsync(() -> {
2 // 第一个任务获取美术课需要带的东西,返回一个list
3 List<String> stuff = new ArrayList<>();
4 stuff.add("画笔");
5 stuff.add("颜料");
6 return stuff;
7 });
8 CompletableFuture<List<String>> handWork = CompletableFuture.supplyAsync(() -> {
9 List<String> stuff = new ArrayList<>();
10 // 第二个任务是获取劳技课所需的工具
11 stuff.add("剪刀");
12 stuff.add("折纸");
13 return stuff;
14 });
15 CompletableFuture<List<String>> total = painting.thenCombine(handWork, (stuff1, stuff2) -> {
16 // 传入handWork列表,然后得到两个CompletableFuture的参数stuff1和stuff2
17 // 合并成新的list
18 return Stream.of(stuff1, stuff2).flatMap(Collection::stream).collect(Collectors.toList());
19 });
20 System.out.println(JSON.toJSONString(total.join()));
- 等待 Future 集合中的任务都完成。
获取所有完成结果 — allOf
1public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
allOf 方法,当所有给定的任务完成后,返回一个全新的已完成 CompletableFuture
1 CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
2 // 使用sleep()模拟耗时操作
3 try {
4 TimeUnit.SECONDS.sleep(2);
5 } catch (InterruptedException e) {
6 throw new RuntimeException(e);
7 }
8 return 1;
9 });
10 CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 2);
11 CompletableFuture.allOf(future1, future2);
12
13 // 输出3
14 System.out.println(future1.join() + future2.join());
获取率先完成的任务结果 — anyOf
- 仅等待 Future 集合中最快结束的任务完成(有可能因为它们试图通过不同的方式计算同一个值),并返回它的结果。PS:如果最快完成的任务出现了异常,也会先返回异常,如果害怕出错可以加个 exceptionally()去处理一下可能发生的异常并设定默认返回值。
1 CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
2// throw new NullPointerException();
3// try {
4// TimeUnit.SECONDS.sleep(4);
5// } catch (InterruptedException e) {
6// throw new RuntimeException(e);
7// }
8 return 3;
9 });
10 CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
11 // 睡眠3s模拟延时
12 try {
13 TimeUnit.SECONDS.sleep(3);
14 } catch (InterruptedException e) {
15 throw new RuntimeException(e);
16 }
17 return 1;
18 });
19 CompletableFuture<Object> anyOf = CompletableFuture.anyOf(future, future2).exceptionally(error -> {
20 error.printStackTrace();
21 return 2;
22 });
23 System.out.println(anyOf.join());
多个方法组合使用
- 通过编程方式完成一个 Future 任务的执行(即以手工设定异步操作结果的方式);
- 应对 Future 的完成时间(即当 Future 的完成时间完成时会收到通知,并能使用 Future 的计算结果进行下一步的操作,不只是简单的阻塞等待操作的结果);
1 CompletableFuture.supplyAsync(() -> 1)
2 .whenComplete((result, error) -> {
3 System.out.println(result);
4 error.printStackTrace();
5 })
6 .handle((result, error) -> {
7 error.printStackTrace();
8 return error;
9 })
10 .thenApply(Object::toString)
11 .thenApply(Integer::valueOf)
12 .thenAccept((param) -> System.out.println("done"));
循环创建并发任务
1 long begin = System.currentTimeMillis();
2 // 自定义一个线程池
3 ExecutorService executorService = Executors.newFixedThreadPool(10);
4 // 循环创建10个CompletableFuture
5 List<CompletableFuture<Integer>> collect = IntStream.range(1, 10).mapToObj(i -> CompletableFuture.supplyAsync(() -> {
6 // 在i == 5的时候抛出一个NPE
7 if (i == 5) {
8 throw new NullPointerException();
9 }
10 try {
11 // 每个依次睡眠1-9秒,模拟线程耗时
12 TimeUnit.SECONDS.sleep(i);
13 } catch (InterruptedException e) {
14 throw new RuntimeException(e);
15 }
16 System.out.println(i);
17 return i;
18 }, executorService)
19 // 这里处理一下 i == 5时出现的NPE
20 // 如果这里不处理异常,那么异常会在所有任务完成后抛出
21 .exceptionally(error -> {
22 error.printStackTrace();
23 try {
24 TimeUnit.SECONDS.sleep(5);
25 } catch (InterruptedException e) {
26 throw new RuntimeException(e);
27 }
28 return 100;
29 })).collect(Collectors.toList());
30 // List列表转成CompletableFuture的Array数组,使其可以作为allOf()的参数
31 // 使用join()方法使得主线程阻塞,并等待所有并行线程完成
32 CompletableFuture.allOf(collect.toArray(new CompletableFuture[]{})).join();
33 System.out.println("最终耗时" + (System.currentTimeMillis() - begin) + "毫秒");
34 executorService.shutdown();
四、使用 CompletableFuture 场景
- 执行比较耗时的操作时,尤其是那些依赖一个或多个远程服务的操作,使用异步任务可以改善程序的性能,加快程序的响应速度;
- 使用 CompletableFuture 类,它提供了异常管理的机制,让你有机会抛出、管理异步任务执行中发生的异常;
- 使用这些异步任务之间相互独立,或者它们之间的某一些的结果是另一些的输入,你可以将这些异步任务构造或合并成一个;