0%

java并发09_ForkJoin框架整体介绍

ForkJoin框架

ForkJoin 是 JDK 特地为分治算法实现的一个框架。适合在CUP密集场景下使用。

ForkJoin 有两大核心思想:分治和窃取

分治思想

分治法的基本思想是将一个规模为N的问题分解为K个规模较小的子问题,这些子问题的相互独立且与原问题的性质相同,求出子问题的解之后,将这些解合并,就可以得到原有问题的解。

工作窃取

工作窃取是指当某个线程的任务队列中没有可执行任务的时候,从其他线程的任务队列中窃取任务来执行,以充分利用工作线程的计算能力,减少线程由于获取不到任务而造成的空闲浪费。在ForkJoinpool中,工作任务的队列都采用双端队列Deque容器。我们知道,在通常使用队列的过程中,我们都在队尾插入,而在队头消费以实现FIFO。而为了实现工作窃取。一般我们会改成工作线程在工作队列上LIFO,而窃取其他线程的任务的时候,从队列头部取获取。示意图如下:

窃取的具体实现体现了递归的思想,因此ForkJoin框架使用不当,会出现栈溢出的情况。

主要成员

ForkJoinTask

ForkJoinPool 唯一可以接受的任务类型,RecursiveTask 和 RecursiveAction 都是 ForkJoinTask 的子类,如果直接提交 Callable 或 Runnable 也会被自动包装成 ForkJoinTask;

ForkJoinTask存在多个内部类,对这些场景做了适配

  • AdaptedRunnable
  • AdaptedRunnableAction
  • RunnableExecuteAction
  • AdaptedCallable

同时通过ForkJoinPool进行提交的时候,对传入的对象,进行了一次包装

1
2
3
4
5
public <T> ForkJoinTask<T> submit(Runnable task, T result) {
ForkJoinTask<T> job = new ForkJoinTask.AdaptedRunnable<T>(task, result);
externalPush(job);
return job;
}

fork方法

fork方法并不是如其方法名会fork一个新线程来执行任务,只是将任务提交到任务队列中而已,然后立即返回,不会等待任务执行完成,其实现如下:

1
2
3
4
5
6
7
8
public final ForkJoinTask<V> fork() {
Thread t;
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) //判断是不是ForkJoinWorkerThread,push到worker线程的队列中去
((ForkJoinWorkerThread)t).workQueue.push(this);
else
ForkJoinPool.common.externalPush(this); //不是worker线程,直接提交到common线程池的任务队列中
return this;
}

join / quietlyJoin

join方法用于阻塞当前线程,等待任务执行完成并返回结果。部分情形下会通过当前线程执行任务,如果异常结束或者被取消需要抛出异常;

quietlyJoin 加入此任务,而不返回其结果或异常

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
 public final V join() {
int s;
if ((s = doJoin() & DONE_MASK) != NORMAL) //运行doJoin逻辑,判断结果;如果任务非正常结束
reportException(s); // 如果是CANCELLED(取消),抛出取消异常;如果是EXCEPTIONAL(任务异常),抛出相关异常;
return getRawResult();// 任务正常结束,获取结果 V
}

public final void quietlyJoin() {
doJoin(); //只是等待任务执行完成
}

private void reportException(int s) {
if (s == CANCELLED)
throw new CancellationException();
if (s == EXCEPTIONAL)
rethrow(getThrowableException());
}

private int doJoin() {
int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
return (s = status) < 0 ? s : //status小于0说明任务已结束,直接返回
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? //如果当前线程是ForkJoinWorkerThread,执行tryUnpush,
(w = (wt = (ForkJoinWorkerThread)t).workQueue).
tryUnpush(this) && (s = doExec()) < 0 ? s ://如果tryUnpush返回true且或者doExec返回小于0,直接返回状态s(s被重新赋值过)
wt.pool.awaitJoin(w, this, 0L) : // 否则调用awaitJoin方法等待并返回状态
externalAwaitDone(); //如果当前线程是普通的Java线程
}

final int doExec() {
int s; boolean completed;
if ((s = status) >= 0) {
try {
completed = exec();
} catch (Throwable rex) {
return setExceptionalCompletion(rex);
}
if (completed)
s = setCompletion(NORMAL);//如果任务调用成功,s为负数
}
return s;
}

invoke / quietlyInvoke

invoke,当前线程会立即执行当前任务并返回结果。如果doExec方法返回值大于等于0说明还有其他的子任务未完成,则等待其他子任务执行完成,典型的应用场景就是CountedCompleter,RecursiveAction和RecursiveTask通常doExec返回值小于0,会在compute方法即执行exec方法时等待所有的子任务执行完成;
quietlyInvoke,当前线程会立即执行当前任务,不返回结果或者抛出异常

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public final V invoke() {
int s;
if ((s = doInvoke() & DONE_MASK) != NORMAL)
reportException(s);
return getRawResult();
}

private int doInvoke() {
int s; Thread t; ForkJoinWorkerThread wt;
return (s = doExec()) < 0 ? s :
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
(wt = (ForkJoinWorkerThread)t).pool.
awaitJoin(wt.workQueue, this, 0L) :
externalAwaitDone();
}

get之间的区别

get方法继承自 Future类。
内部实现逻辑和Join大致相同。

1
2
3
4
5
6
7
8
9
10
public final V get() throws InterruptedException, ExecutionException {
int s = (Thread.currentThread() instanceof ForkJoinWorkerThread) ?
doJoin() : externalInterruptibleAwaitDone();
Throwable ex;
if ((s &= DONE_MASK) == CANCELLED)
throw new CancellationException();
if (s == EXCEPTIONAL && (ex = getThrowableException()) != null)
throw new ExecutionException(ex);
return getRawResult();
}

join/invoke/get之间的区别

join方法用于让当前线程阻塞,直到对应的子任务完成运行并返回执行结果。(调用join方法之前必须要吧task提交到线程池,否则会一直阻塞。

invoke,立即执行任务,任务无需提交到线程池(无需调用fork)

get,方法继承自 Future类,阻塞并返回结果。

ForkJoinPool

ForkJoin 专用线程池。下面会详细说明

WorkQueue

ForkJoin 专用工作队列。

WorkQueue 里面的成员变量有很多,这里我们只关注其中一部分。

scanState,32 位的 int 变量,记录了四个信息:

  • 线程的 inactive 状态;
  • 版本号;
  • 队列索引值;
  • scanning 状态。

ForkJoinTask 数组的存取如下图所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14

* array 初始容量 8192;
* 第一个任务放在 4096,似乎是因为操作系统内存的原因;
* 8191 的位置放入任务之后,还是会回到 0 的位置;
* 初始 base = top = 4096;
* 从上面放入一个任务 top + 1,不会从下面放入任务;
* LIFO 模式自己线程从上面取走任务 top - 1;
* FIFO 模式自己线程从下面取走任务 base + 1;
* 被其它线程从下面窃取任务,base + 1,其它线程不会从上面窃取任务;
* 数组 size 由 top - base 获得;
* 从 8191 回到 0 之后,top 和 base 会继续往上加,索引值通过取余获得。
* pop操作也仅限于工作线程。


关于WorkQueue的思考

为什么作者会自己实现一个Deque,而不是使用juc中已存在的容器?

1
2
3
这就是因为这个队列全程都是采用Unsafe来实现的,在开篇作者也说了,需要@Contented修饰,就是为了避免缓存的伪代共享。这样来实现一个高效的Deque,以供ForkJoinPool来操作。
这与学习ConcurrentHashMap等容器的源码一样,可以看出作者为了性能的优化,采用了很多独特的方式来实现。这些地方都是我们值得学习和借鉴之处。这也是ForkJoin性能高效的关键。在作者的论文中也可以看出,java的实现,由于抽象在jvm之上,性能比c/c++的实现要低很多。这也是作者尽可能将性能做到最优的原因之一。

WorkQueue对队列怎么加锁的?

贴一个poll的源码,底层基于Unsafe的cas对象方法来实现的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
* Takes next task, if one exists, in FIFO order.
*/
final ForkJoinTask<?> poll() {
ForkJoinTask<?>[] a; int b; ForkJoinTask<?> t;
//判断 base-top小于0说明存在task 切array不为空
while ((b = base) - top < 0 && (a = array) != null) {
//计算出unsafe操作的索引 实际上就是拿到b
int j = (((a.length - 1) & b) << ASHIFT) + ABASE;
//之后拿到这个task 用volatile的方式
t = (ForkJoinTask<?>)U.getObjectVolatile(a, j);
//之后如果base和b相等
if (base == b) {
//如果拿到的task不为空
if (t != null) {
//那么将这个位置的元素移除 base+1 然后返回t
if (U.compareAndSwapObject(a, j, t, null)) {
base = b + 1;
return t;
}
}
//在上述操作之后,如果base比top小1说明已经为空了 直接退出循环
else if (b + 1 == top) // now empty
break;
}
}
//默认返回null
return null;
}

ForkJoinWorkerThread

ForkJoin 专用工作线程。

ForkJoinPool常量以及成员变量

  • ForkJoinPool 里面有一个 WorkQueue 数组;
  • 每个 WorkQueue 里面都有一个 ForkJoinTask 数组,存放提交的任务;
  • WorkQueue 数组:
    • 偶数位,存放外部调用提交的任务,如通过execute和submit等方法。这种队列称为SubmissionQueue。
    • 奇数位,存放前者(偶数位)在执行过程种通过fork方法产生的新任务。这种队列称为workQueue。

奇数位的WorkQueue有自己的ForkJoinWorkerThread为成员变量owner;偶数下标的没有,即owner为null

重要变量

ForkJoinPool.config 变量:

  • 高 16 位存储 mode 信息,包括:
    • SHARED_QUEUE:共享队列,没有自己的工作线程,只能被其它线程窃取任务;
    • FIFO_QUEUE:先进先出队列;
    • LIFO_QUEUE:后进先出队列;
  • 低 16 位存储并发度,parallelism size,也可以理解为类似于 ThreadPoolExecutor 的 core pool size,但创建额外线程的逻辑不太一样。

ForkJoinPool.runState 变量:

  • 第 1 位的 1 表示 RSLOCK 状态,runState 变量被锁,其它线程暂时不可以修改;
  • 第 2 位的 1 表示 RSIGNAL 状态,线程阻塞等其它线程释放 RSLOCK;
  • 其它状态同 ThreadPoolExecutor。

ForkJoinPool构造函数

1
2
3
4
5
6
7
8
9
10
11
12
13

public ForkJoinPool(int parallelism,
ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler,
boolean asyncMode) {
this(checkParallelism(parallelism),
checkFactory(factory),
handler,
asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
"ForkJoinPool-" + nextPoolId() + "-worker-");
checkPermission();
}

  • parallelism 表示ForkJoinPool支持的最大并行度
  • factory 用于产生线程的工厂方法
  • handler UncaughtExceptionHandler 用于异常处理的拒绝策略
  • asyncMode 异步模式,如果为ture,则队列将采用FIFO_QUEUE,实现先进先出,反之则LIFO_QUEUE 实现后进先出

asyncMode是个啥?

asyncMode是影响WorkQueue的一个参数,WorkQueue的模式。决定了WorkQueue上的线程,从自己的WorkQueue中安LIFO或者FIFO的方式执行任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
java.util.concurrent.ForkJoinPool#runWorker

final void runWorker(WorkQueue w) {
//初始化队列,这个方法会根据任务进行判断是否需要扩容
w.growArray(); // allocate queue
//hint是采用的魔数的方式增加
int seed = w.hint; // initially holds randomization hint
//如果seed为0 则使用1
int r = (seed == 0) ? 1 : seed; // avoid 0 for xorShift
//死循环
for (ForkJoinTask<?> t;;) {
//调用scan方法 对经过魔数计算的r 之后开始进行窃取过程 如果能够窃取 则task不为空
if ((t = scan(w, r)) != null)
//运行窃取之后的task
w.runTask(t);
//反之则当前线程进行等待
else if (!awaitWork(w, r))
break;
r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
}
}

final void runTask(ForkJoinTask<?> task) {
if (task != null) {
//位运算更改队列状态为正在执行任务
scanState &= ~SCANNING; // mark as busy
//当前 task 的执行
(currentSteal = task).doExec();
U.putOrderedObject(this, QCURRENTSTEAL, null); // release for GC
//执行队列中的其它任务
execLocalTasks();
ForkJoinWorkerThread thread = owner;
if (++nsteals < 0) // collect on overflow
transferStealCount(pool);
//任务执行完,再用位运算把状态改回扫描任务
scanState |= SCANNING;
if (thread != null)
//hook
thread.afterTopLevelExec();
}
}

java.util.concurrent.ForkJoinPool.WorkQueue#execLocalTasks

final void execLocalTasks() {
int b = base, m, s;
ForkJoinTask<?>[] a = array;
//如果队列里面有任务
//通过 base 和 top 判断,如果 top > base,说明有任务
if (b - (s = top - 1) <= 0 && a != null &&
(m = a.length - 1) >= 0) {
//LIFO 模式,FIFO 标志位为 0
if ((config & FIFO_QUEUE) == 0) {
//遍历取出所有任务,依次执行
for (ForkJoinTask<?> t;;) {
if ((t = (ForkJoinTask<?>)U.getAndSetObject
//从 top 取任务
(a, ((m & s) << ASHIFT) + ABASE, null)) == null)
break;
//取走一个任务,top 变更为 s,也就是 top - 1
U.putOrderedInt(this, QTOP, s);
//又进入了前面看到的 doExec 方法
t.doExec();
//重新计算 s,继续迭代
if (base - (s = top - 1) > 0)
break;
}
}
//FIFO 模式
else
//与 LIFO 模式类似,也是遍历取出所有任务依次执行
//区别是,这里从 base 位开始往上遍历
pollAndExecAll();
}
}


ForkJoinPool基本组成

ForkJoinPool的实际组成是,由一个WorkQueue的数组构成。但是这个数组比较特殊,在偶数位,存放外部调用提交的任务,如通过execute和submit等方法。这种队列称为SubmissionQueue。

而另外一种任务是前者在执行过程种通过fork方法产生的新任务。这种队列称为workQueue。

SubmissionQueue与WorkQueue的区别在于,WorkQueue的属性“final ForkJoinWorkerThread owner;”是有值的。也就是说,WorkQueue将有ForkJoinWorkerThread线程与之绑定。在运行过程中不断的从WorkQueue中获取任务。如果没有可执行的任务,则将从其他的SubmissionQueue和WorkQueue中窃取任务来执行。

前面学习过了工作窃取算法,实际上在WorkQueue上的ForkJoinWorkerThread就是一个窃取者,它从SubmissionQueue中或者去偷的WorkQueue中,按FIFO的方式窃取任务。之后执行。也从自己的WorkQueue中安LIFO或者FIFO的方式执行任务。这取决于模式的设定。默认情况下是采用LIFO的方式在执行。组成如下图所示: