0%

java并发08_CompletableFuture & CompletionService

CompletableFuture

从JDK 8开始,在Concurrent包中提供了一个强大的异步编程工具CompletableFuture。在JDK8之前,异步编程可以通过线程池和Future来实现,但功能还不够强大。CompletableFuture的出现,使Java的异步编程能力向前迈进了一大步。

CompletableFuture主要用处是将多个任务进行编排

几个重要 Lambda 函数

CompletableFuture 在 Java1.8 的版本中出现,自然也得搭上 Lambda 的顺风车,为了更好的理解 CompletableFuture,这里我需要先介绍一下几个 Lambda 函数,我们只需要关注它们的以下几点就可以:

  • 参数接受形式
  • 返回值形式
  • 函数名称

类结构

实现了 Future 接口 和 CompletionStage 接口

理解 CompletionStage

任务是有时序关系的,比如有串行关系、并行关系、汇聚关系等。
CompletionStage 接口的作用用于描述任务之间的时序关系,总结起来就是这个样子:

串行关系

开头的都是串行,then 后面的单词(比如run/apply/accept)就是上面说的函数式接口中的抽象方法名称了。
1
2
3
4
5
6
7
8
9
10
11
12
13
14

**thenApply 和 thenCompose区别**
* thenApply 调用get方法返回的是函数返回值,是一个计算结果。
* thenCompose 返回的是一个CompletableFuture

```java
CompletionStage<R> thenApply(fn);
CompletionStage<R> thenApplyAsync(fn);
CompletionStage<Void> thenAccept(consumer);
CompletionStage<Void> thenAcceptAsync(consumer);
CompletionStage<Void> thenRun(action);
CompletionStage<Void> thenRunAsync(action);
CompletionStage<R> thenCompose(fn);
CompletionStage<R> thenComposeAsync(fn);

AND聚合

1
2
3
4
5
6
CompletionStage<R> thenCombine(other, fn);
CompletionStage<R> thenCombineAsync(other, fn);
CompletionStage<Void> thenAcceptBoth(other, consumer);
CompletionStage<Void> thenAcceptBothAsync(other, consumer);
CompletionStage<Void> runAfterBoth(other, action);
CopletionStage<Void> runAfterBothAsync(other, action);

OR聚合

1
2
3
4
5
6
CompletionStage applyToEither(other, fn);
CompletionStage applyToEitherAsync(other, fn);
CompletionStage acceptEither(other, consumer);
CompletionStage acceptEitherAsync(other, consumer);
CompletionStage runAfterEither(other, action);
CompletionStage runAfterEitherAsync(other, action);

异常处理

1
2
3
4
5
CompletionStage	exceptionally(fn); 
CompletionStage<R> whenComplete(consumer);
CompletionStage<R> whenCompleteAsync(consumer);
CompletionStage<R> handle(fn);
CompletionStage<R> handleAsync(fn);
  1. exceptionally()的使用非常类似于try{}catch{}中的catch{}
  2. whenComplete(),类似于try{}finally{}中的finally{}, 不支持返回结果
  3. handle()系列方法就类似于try{}finally{}中的finally{},支持返回结果

CompletableFuture以thenApply为例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
   public <U> CompletableFuture<U> thenApply(
Function<? super T,? extends U> fn) {
return uniApplyStage(null, fn);
}

private <V> CompletableFuture<V> uniApplyStage(
Executor e, Function<? super T,? extends V> f) {
if (f == null) throw new NullPointerException();
CompletableFuture<V> d = new CompletableFuture<V>();
if (e != null || !d.uniApply(this, f, null)) {
UniApply<T,V> c = new UniApply<T,V>(e, d, this, f);
push(c);
c.tryFire(SYNC);
}
return d;
}

static final class UniApply<T,V> extends UniCompletion<T,V> {
Function<? super T,? extends V> fn;
UniApply(Executor executor, CompletableFuture<V> dep,
CompletableFuture<T> src,
Function<? super T,? extends V> fn) {
super(executor, dep, src); this.fn = fn;
}
final CompletableFuture<V> tryFire(int mode) {
CompletableFuture<V> d; CompletableFuture<T> a;
if ((d = dep) == null ||
!d.uniApply(a = src, fn, mode > 0 ? null : this))
return null;
dep = null; src = null; fn = null;
return d.postFire(a, mode);
}
}

final <S> boolean uniApply(CompletableFuture<S> a,
Function<? super S,? extends T> f,
UniApply<S,T> c) {
Object r; Throwable x;
if (a == null || (r = a.result) == null || f == null)
return false;
tryComplete: if (result == null) {
if (r instanceof AltResult) {
if ((x = ((AltResult)r).ex) != null) {
completeThrowable(x, r);
break tryComplete;
}
r = null;
}
try {
if (c != null && !c.claim())
return false;
@SuppressWarnings("unchecked") S s = (S) r;
completeValue(f.apply(s));
} catch (Throwable ex) {
completeThrowable(ex);
}
}
return true;
}

最终任务的调度逻辑在uniApply方法内。

CompletionService

假设我们有 4 个任务(A, B, C, D)用来执行复杂的计算,每个任务的执行时间随着输入参数的不同而不同。

ExecutorService实现

1
2
3
4
5
6
7
8
9
10
11
12
ExecutorService executorService = Executors.newFixedThreadPool(4);
List<Future> futures = new ArrayList<Future<Integer>>();
futures.add(executorService.submit(A));
futures.add(executorService.submit(B));
futures.add(executorService.submit(C));
futures.add(executorService.submit(D));

// 遍历 Future list,通过 get() 方法获取每个 future 结果
for (Future future:futures) {
Integer result = future.get();
// 其他业务逻辑
}

CompletionService实现同样的场景

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
ExecutorService executorService = Executors.newFixedThreadPool(4);

// ExecutorCompletionService 是 CompletionService 唯一实现类
CompletionService executorCompletionService= new ExecutorCompletionService<>(executorService );

List<Future> futures = new ArrayList<Future<Integer>>();
futures.add(executorCompletionService.submit(A));
futures.add(executorCompletionService.submit(B));
futures.add(executorCompletionService.submit(C));
futures.add(executorCompletionService.submit(D));

// 遍历 Future list,通过 get() 方法获取每个 future 结果
for (int i=0; i<futures.size(); i++) {
Integer result = executorCompletionService.take().get();
// 其他业务逻辑
}

两种方式对比

原理

CompletionService的实现原理也是内部维护了一个阻塞队列,当任务执行结束就把任务的执行结果加入到
阻塞队列中,不同的是CompletionService是把任务执行结果的Future对象加入到阻塞队列中,而上面的示
例代码是把任务最终的执行结果放入了阻塞队列中。

参考

https://dayarch.top/p/java8-completablefuture-tutorial.html

https://dayarch.top/p/executorservice-vs-completionservice.html

java并发实现原理:JDK源码剖析

Java 并发编程之美