CompletableFuture
从JDK 8开始,在Concurrent包中提供了一个强大的异步编程工具CompletableFuture。在JDK8之前,异步编程可以通过线程池和Future来实现,但功能还不够强大。CompletableFuture的出现,使Java的异步编程能力向前迈进了一大步。
CompletableFuture主要用处是将多个任务进行编排。
几个重要 Lambda 函数
CompletableFuture 在 Java1.8 的版本中出现,自然也得搭上 Lambda 的顺风车,为了更好的理解 CompletableFuture,这里我需要先介绍一下几个 Lambda 函数,我们只需要关注它们的以下几点就可以:

类结构

实现了 Future 接口 和 CompletionStage 接口
理解 CompletionStage
任务是有时序关系的,比如有串行关系、并行关系、汇聚关系等。
CompletionStage 接口的作用用于描述任务之间的时序关系,总结起来就是这个样子:


串行关系
开头的都是串行,then 后面的单词(比如run/apply/accept)就是上面说的函数式接口中的抽象方法名称了。1 2 3 4 5 6 7 8 9 10 11 12 13 14
| **thenApply 和 thenCompose区别** * thenApply 调用get方法返回的是函数返回值,是一个计算结果。 * thenCompose 返回的是一个CompletableFuture
```java CompletionStage<R> thenApply(fn); CompletionStage<R> thenApplyAsync(fn); CompletionStage<Void> thenAccept(consumer); CompletionStage<Void> thenAcceptAsync(consumer); CompletionStage<Void> thenRun(action); CompletionStage<Void> thenRunAsync(action); CompletionStage<R> thenCompose(fn); CompletionStage<R> thenComposeAsync(fn);
|
AND聚合
1 2 3 4 5 6
| CompletionStage<R> thenCombine(other, fn); CompletionStage<R> thenCombineAsync(other, fn); CompletionStage<Void> thenAcceptBoth(other, consumer); CompletionStage<Void> thenAcceptBothAsync(other, consumer); CompletionStage<Void> runAfterBoth(other, action); CopletionStage<Void> runAfterBothAsync(other, action);
|
OR聚合
1 2 3 4 5 6
| CompletionStage applyToEither(other, fn); CompletionStage applyToEitherAsync(other, fn); CompletionStage acceptEither(other, consumer); CompletionStage acceptEitherAsync(other, consumer); CompletionStage runAfterEither(other, action); CompletionStage runAfterEitherAsync(other, action);
|
异常处理
1 2 3 4 5
| CompletionStage exceptionally(fn); CompletionStage<R> whenComplete(consumer); CompletionStage<R> whenCompleteAsync(consumer); CompletionStage<R> handle(fn); CompletionStage<R> handleAsync(fn);
|
- exceptionally()的使用非常类似于try{}catch{}中的catch{}
- whenComplete(),类似于try{}finally{}中的finally{}, 不支持返回结果
- handle()系列方法就类似于try{}finally{}中的finally{},支持返回结果
CompletableFuture以thenApply为例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
| public <U> CompletableFuture<U> thenApply( Function<? super T,? extends U> fn) { return uniApplyStage(null, fn); }
private <V> CompletableFuture<V> uniApplyStage( Executor e, Function<? super T,? extends V> f) { if (f == null) throw new NullPointerException(); CompletableFuture<V> d = new CompletableFuture<V>(); if (e != null || !d.uniApply(this, f, null)) { UniApply<T,V> c = new UniApply<T,V>(e, d, this, f); push(c); c.tryFire(SYNC); } return d; }
static final class UniApply<T,V> extends UniCompletion<T,V> { Function<? super T,? extends V> fn; UniApply(Executor executor, CompletableFuture<V> dep, CompletableFuture<T> src, Function<? super T,? extends V> fn) { super(executor, dep, src); this.fn = fn; } final CompletableFuture<V> tryFire(int mode) { CompletableFuture<V> d; CompletableFuture<T> a; if ((d = dep) == null || !d.uniApply(a = src, fn, mode > 0 ? null : this)) return null; dep = null; src = null; fn = null; return d.postFire(a, mode); } }
final <S> boolean uniApply(CompletableFuture<S> a, Function<? super S,? extends T> f, UniApply<S,T> c) { Object r; Throwable x; if (a == null || (r = a.result) == null || f == null) return false; tryComplete: if (result == null) { if (r instanceof AltResult) { if ((x = ((AltResult)r).ex) != null) { completeThrowable(x, r); break tryComplete; } r = null; } try { if (c != null && !c.claim()) return false; @SuppressWarnings("unchecked") S s = (S) r; completeValue(f.apply(s)); } catch (Throwable ex) { completeThrowable(ex); } } return true; }
|
最终任务的调度逻辑在uniApply方法内。
CompletionService
假设我们有 4 个任务(A, B, C, D)用来执行复杂的计算,每个任务的执行时间随着输入参数的不同而不同。
ExecutorService实现
1 2 3 4 5 6 7 8 9 10 11 12
| ExecutorService executorService = Executors.newFixedThreadPool(4); List<Future> futures = new ArrayList<Future<Integer>>(); futures.add(executorService.submit(A)); futures.add(executorService.submit(B)); futures.add(executorService.submit(C)); futures.add(executorService.submit(D));
for (Future future:futures) { Integer result = future.get(); }
|
CompletionService实现同样的场景
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| ExecutorService executorService = Executors.newFixedThreadPool(4);
CompletionService executorCompletionService= new ExecutorCompletionService<>(executorService );
List<Future> futures = new ArrayList<Future<Integer>>(); futures.add(executorCompletionService.submit(A)); futures.add(executorCompletionService.submit(B)); futures.add(executorCompletionService.submit(C)); futures.add(executorCompletionService.submit(D));
for (int i=0; i<futures.size(); i++) { Integer result = executorCompletionService.take().get(); }
|
两种方式对比


原理
CompletionService的实现原理也是内部维护了一个阻塞队列,当任务执行结束就把任务的执行结果加入到
阻塞队列中,不同的是CompletionService是把任务执行结果的Future对象加入到阻塞队列中,而上面的示
例代码是把任务最终的执行结果放入了阻塞队列中。
参考
https://dayarch.top/p/java8-completablefuture-tutorial.html
https://dayarch.top/p/executorservice-vs-completionservice.html
java并发实现原理:JDK源码剖析
Java 并发编程之美