0%

java并发10_ForkJoin框架任务提交流程&常见坑

外部提交任务

ForkJoinPool 中的任务有两个来源:

  • 外部提交的大任务;
  • 内部拆分的小任务。

submit

1
2
3
4
5
6
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
if (task == null)
throw new NullPointerException();
externalPush(task);
return task;
}

externalPush

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
final void externalPush(ForkJoinTask<?> task) {
WorkQueue[] ws; WorkQueue q; int m;
int r = ThreadLocalRandom.getProbe();
int rs = runState;
//符合条件的直接往数组里面放
if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&
(q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 &&
U.compareAndSwapInt(q, QLOCK, 0, 1)) {
ForkJoinTask<?>[] a; int am, n, s;
if ((a = q.array) != null &&
(am = a.length - 1) > (n = (s = q.top) - q.base)) {
int j = ((am & s) << ASHIFT) + ABASE;
U.putOrderedObject(a, j, task);
U.putOrderedInt(q, QTOP, s + 1);
U.putIntVolatile(q, QLOCK, 0);
if (n <= 1)
signalWork(ws, q);
return;
}
U.compareAndSwapInt(q, QLOCK, 1, 0);
}
//不符合条件的走这里
externalSubmit(task);
}

这里符合的“条件”是:

  • (ws = workQueues) != null,线程池的 WorkQueue 数组已经被初始化;
  • (m = (ws.length - 1)) >= 0,初始化的 WorkQueue 数组 length >= 1,正常情况初始化出来的 length 一定 >= 1,实际上一定 >= 4,这个判断其实主要是为 m 赋值;
  • (q = ws[m & r & SQMASK]) != null,当前提交任务的线程已经拥有了一个 WorkQueue;
  • r != 0,当前提交任务的线程已经生成过随机数;
  • rs > 0,线程池没有 shutdown;
  • U.compareAndSwapInt(q, QLOCK, 0, 1),队列此前没有加锁,并且自己此时加锁成功。

看上去这么多条件,其实就一个,这个线程已经提交过任务。已经提交过任务的线程,必然已经通过 ThreadLocalRandom 生成了随机数,这个随机数一般是不会变的,所以再次提交任务,会直接提交到上次提交任务的队列里面去。

externalSubmit

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
private void externalSubmit(ForkJoinTask<?> task) {
int r; // initialize caller's probe
if ((r = ThreadLocalRandom.getProbe()) == 0) {
//初始化随机数种子,并取一个随机数
ThreadLocalRandom.localInit();
r = ThreadLocalRandom.getProbe();
}
for (;;) {
WorkQueue[] ws; WorkQueue q; int rs, m, k;
boolean move = false;
//shutdown 不可以提交任务
if ((rs = runState) < 0) {
tryTerminate(false, false); // help terminate
throw new RejectedExecutionException();
}
//线程池还没有 started,初始化线程池
else if ((rs & STARTED) == 0 || // initialize
((ws = workQueues) == null || (m = ws.length - 1) < 0)) {
int ns = 0;
rs = lockRunState();
try {
if ((rs & STARTED) == 0) {
U.compareAndSwapObject(this, STEALCOUNTER, null,
new AtomicLong());
// create workQueues array with size a power of two
int p = config & SMASK; // ensure at least 2 slots
int n = (p > 1) ? p - 1 : 1;
n |= n >>> 1; n |= n >>> 2; n |= n >>> 4;
n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1;
workQueues = new WorkQueue[n];
ns = STARTED;
}
} finally {
unlockRunState(rs, (rs & ~RSLOCK) | ns);
}
}
//目标队列已经存在
else if ((q = ws[k = r & m & SQMASK]) != null) {
if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) {
ForkJoinTask<?>[] a = q.array;
int s = q.top;
boolean submitted = false; // initial submission or resizing
try { // locked version of push
if ((a != null && a.length > s + 1 - q.base) ||
(a = q.growArray()) != null) {
int j = (((a.length - 1) & s) << ASHIFT) + ABASE;
U.putOrderedObject(a, j, task);
U.putOrderedInt(q, QTOP, s + 1);
submitted = true;
}
} finally {
U.compareAndSwapInt(q, QLOCK, 1, 0);
}
if (submitted) {
//尝试安排一个线程,但是否真的能安排上由方法里面控制
signalWork(ws, q);
return;
}
}
move = true; // move on failure
}
//目标队列不存在,先初始化一个队列
else if (((rs = runState) & RSLOCK) == 0) { // create new queue
//注意这里初始化的是一个没有自己工作线程的共享队列
q = new WorkQueue(this, null);
q.hint = r;
q.config = k | SHARED_QUEUE;
q.scanState = INACTIVE;
rs = lockRunState(); // publish index
if (rs > 0 && (ws = workQueues) != null &&
k < ws.length && ws[k] == null)
ws[k] = q; // else terminated
unlockRunState(rs, rs & ~RSLOCK);
}
//目标队列很忙,没拿到锁
else
move = true; // move if busy
if (move)
//重新生成一个随机数
r = ThreadLocalRandom.advanceProbe(r);
}
}

这个方法看起来复杂,但主体逻辑很清晰:

  • 如果线程池 shutdown,不让提交任务;
  • 如果提交任务的线程还没有生成随机数,那么先生成一个随机数,每个 Thread 对象都有一个自己的随机数;
  • 如果线程池还没有初始化,则先初始化;
  • 如果目标队列还没有初始化,则先初始化;
  • 都初始化好了,再加锁,提交任务到目标队列;
  • 如果抢锁失败,再重新生成一个随机数,重新确定目标队列;
  • 任务成功入队之后,通知线程池需要一个活跃线程。

这里创建的队列有几个特征:

  • 没有工作线程;
  • 共享模式;
  • 初始状态就是 inactive;
  • 在 WorkQueue 数组中的索引值一定是偶数,因为索引值 k = r & m & SQMASK,而 SQMASK 是个偶数;

线程随机数的作用:
外部提交的任务会进入一个目标队列,而目标队列则是由线程随机数来决定的,只要随机数不变,WorkQueue 数组不扩容,那么同一个线程提交的任务一定会进入同一个队列。

线程随机数一般不变,发生变化的情况是,目标队列竞争激烈,这会带来性能损耗,这时候就重新生成一个随机数,那么下次提交任务就会换到另一个队列,不跟其它线程竞争。

总结来说,外部提交的任务会根据线程随机数进入一个没有工作线程的队列里等着。

窃取任务

run

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public void run() {
if (workQueue.array == null) { // only run once
Throwable exception = null;
try {
// hook
onStart();
//执行任务在这里
pool.runWorker(workQueue);
} catch (Throwable ex) {
exception = ex;
} finally {
try {
// hook
onTermination(exception);
} catch (Throwable ex) {
if (exception == null)
exception = ex;
} finally {
//线程退出
pool.deregisterWorker(this, exception);
}
}
}
}

主要逻辑在 runWorker 方法

runWorker

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
final void runWorker(WorkQueue w) {
w.growArray(); // allocate queue
int seed = w.hint; // initially holds randomization hint
int r = (seed == 0) ? 1 : seed; // avoid 0 for xorShift
//死循环,避免线程退出
for (ForkJoinTask<?> t;;) {
//扫描任务,可能是自己队列的,也可能是其它线程队列的
if ((t = scan(w, r)) != null)
//执行任务
w.runTask(t);
//没有任务,看情况挂起或者销毁线程
else if (!awaitWork(w, r))
//如果销毁线程,从这里退出
break;
//重新生成随机数
r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
}
}

scan

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
//w 是当前线程的队列,r 是当前线程的随机数
private ForkJoinTask<?> scan(WorkQueue w, int r) {
WorkQueue[] ws; int m;
//如果线程池已经初始化
if ((ws = workQueues) != null && (m = ws.length - 1) > 0 && w != null) {
int ss = w.scanState; // initially non-negative
//死循环扫描 WorkQueue[] 所有队列,从 origin = r & m 的索引开始
for (int origin = r & m, k = origin, oldSum = 0, checkSum = 0;;) {
WorkQueue q; ForkJoinTask<?>[] a; ForkJoinTask<?> t;
int b, n; long c;
//如果这个索引处存在队列
if ((q = ws[k]) != null) {
//如果队列中还有任务
if ((n = (b = q.base) - q.top) < 0 &&
(a = q.array) != null) { // non-empty
// (a.length - 1) & b 应该不陌生,在 ConcurrentHashMap 中也遇到过
// 相当于 b % a.length,即取索引为 b 的元素
long i = (((a.length - 1) & b) << ASHIFT) + ABASE;
if ((t = ((ForkJoinTask<?>)
U.getObjectVolatile(a, i))) != null &&
q.base == b) {
//scanState >= 0 表示 active
if (ss >= 0) {
if (U.compareAndSwapObject(a, i, t, null)) {
//从 base 取走一个,base + 1
q.base = b + 1;
if (n < -1) // signal others
signalWork(ws, q);
//返回扫描到的任务
return t;
}
}
else if (oldSum == 0 && // try to activate
w.scanState < 0)
tryRelease(c = ctl, ws[m & (int)c], AC_UNIT);
}
if (ss < 0) // refresh
ss = w.scanState;
r ^= r << 1; r ^= r >>> 3; r ^= r << 10;
origin = k = r & m; // move and rescan
oldSum = checkSum = 0;
continue;
}
checkSum += b;
}
//回到了 origin,表示遍历了一圈,没有发现任务
if ((k = (k + 1) & m) == origin) { // continue until stable
if ((ss >= 0 || (ss == (ss = w.scanState))) &&
oldSum == (oldSum = checkSum)) {
if (ss < 0 || w.qlock < 0) // already inactive
break;
int ns = ss | INACTIVE; // try to inactivate
long nc = ((SP_MASK & ns) |
(UC_MASK & ((c = ctl) - AC_UNIT)));
w.stackPred = (int)c; // hold prev stack top
U.putInt(w, QSCANSTATE, ns);
if (U.compareAndSwapLong(this, CTL, c, nc))
ss = ns;
else
w.scanState = ss; // back out
}
checkSum = 0;
}
}
}
return null;
}

这个方法里面涉及比较多的实现细节,比如:

  • oldSum,checkSum:主要是为了判断 WorkQueue[] 是否稳定,在扫描之前,记录一个 oldSum,扫描过程中根据扫过的 WorkQueue 的 base 位置累加出一个 checkSum,如果两者不相等,就认为 WorkQueue[] 在扫描过程中发生了变化,那么即使扫描一圈之后发现没有任务,也要再扫描一次。
  • 一些 try active,try inactive,already inactive 的操作,也是为了可以多扫描几圈,尽可能延迟线程阻塞。

工作线程会根据自己的线程随机数,确定一个初始位置,然后从这里开始依次遍历所有队列,发现任务就从队列底部取走。这个过程就体现了任务窃取。

runTask

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
final void runTask(ForkJoinTask<?> task) {
if (task != null) {
//位运算更改队列状态为正在执行任务
scanState &= ~SCANNING; // mark as busy
//当前 task 的执行
(currentSteal = task).doExec();
U.putOrderedObject(this, QCURRENTSTEAL, null); // release for GC
//执行队列中的其它任务
execLocalTasks();
ForkJoinWorkerThread thread = owner;
if (++nsteals < 0) // collect on overflow
transferStealCount(pool);
//任务执行完,再用位运算把状态改回扫描任务
scanState |= SCANNING;
if (thread != null)
//hook
thread.afterTopLevelExec();
}
}

这里执行任务的方法有两个:

  • doExec,执行当前任务;
  • execLocalTasks,执行队列里的其它任务。

一个任务执行完之后,就会进入 execLocalTasks 方法执行自己队列里的其它任务。

runTask方法总结

  • 在工作线程的 run 方法里面,先从其它队列窃取一个任务;
  • 然后执行这个任务;
  • 再执行自己队列里的其它所有任务;(根据线程池的模式,是用FIFO或者LIFO从自己队列里获取任务)
  • 循环上面的流程,直到线程销毁。

doJoin里面的窃取(执行子任务)

dojoin

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
//另一种更具可读性的写法
private int doJoin() {
int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
//如果任务已经执行完成,直接返回任务状态
if ((s = status) < 0) {
return s;
}
t = Thread.currentThread();
boolean isForkJoinThread = t instanceof ForkJoinWorkerThread;
//如果当前线程不是 ForkJoin 线程,即外部线程直接调用 join 方法
if (!isForkJoinThread) {
//等待任务被线程池分配线程执行完,返回任务状态
return externalAwaitDone();
}
//对于 ForkJoinPool 内部线程
wt = (ForkJoinWorkerThread) t;
w = wt.workQueue;
//尝试从队列顶部把当前任务弹出来,可以理解为 tryPop,
//如果当前任务在队列顶部,就可以弹出来
if (w.tryUnpush(this)) {
//执行当前任务
return this.doExec();
}
//如果当前任务不在队列顶部,那么要等
return wt.pool.awaitJoin(w, this, 0L);
}

如果tryUnpush 失败,进入等待流程。(剧透,此时不会把线程挂起,而是去偷任务执行,避免CPU上下文切换)

awaitJoin

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
final int awaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline) {
int s = 0;
if (task != null && w != null) {
ForkJoinTask<?> prevJoin = w.currentJoin;
U.putOrderedObject(w, QCURRENTJOIN, task);
CountedCompleter<?> cc = (task instanceof CountedCompleter) ?
(CountedCompleter<?>)task : null;
//自旋
for (;;) {
//1.任务已经被执行了,退出
if ((s = task.status) < 0)
break;
if (cc != null)
helpComplete(w, cc, 0);
//2.从队列里查找并取出当前任务,直接执行
else if (w.base == w.top || w.tryRemoveAndExec(task))
helpStealer(w, task);
//3.进入阻塞之前,再检查一次状态
if ((s = task.status) < 0)
break;
long ms, ns;
if (deadline == 0L)
ms = 0L;
else if ((ns = deadline - System.nanoTime()) <= 0L)
break;
else if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) <= 0L)
ms = 1L;
//4.尝试补偿一个活跃线程,然后阻塞当前线程,
//目的是避免所有线程都进入阻塞状态,那线程池就没线程可用了
if (tryCompensate(w)) {
//进入阻塞,等待当前任务被其它线程执行完成
task.internalWait(ms);
U.getAndAddLong(this, CTL, AC_UNIT);
}
}
U.putOrderedObject(w, QCURRENTJOIN, prevJoin);
}
return s;
}

当工作线程进入阻塞前,会补偿一个活跃线程(tryCompensate方法),防止线程池没有线程可用。同时如果forkJoinPool使用不当,这可能会导致线程池疯狂创建线程(详见使用避坑)。

helpStealer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
//仅在 awaitJoin 方法中被调用
private void helpStealer(WorkQueue w, ForkJoinTask<?> task) {
WorkQueue[] ws = workQueues;
int oldSum = 0, checkSum, m;
if (ws != null && (m = ws.length - 1) >= 0 && w != null &&
task != null) {
do { // restart point
checkSum = 0; // for stability check
ForkJoinTask<?> subtask;
WorkQueue j = w, v; // v is subtask stealer
//找到窃取了当前任务的线程
descent: for (subtask = task; subtask.status >= 0; ) {
//其实就是遍历,不过是跳着遍历,因为所有工作线程的队列都在奇数位
for (int h = j.hint | 1, k = 0, i; ; k += 2) {
if (k > m) // can't find stealer
break descent;
if ((v = ws[i = (h + k) & m]) != null) {
if (v.currentSteal == subtask) {
j.hint = i;
break;
}
checkSum += v.base;
}
}
//当前线程帮窃取了当前任务的线程执行一个任务
for (;;) { // help v or descend
ForkJoinTask<?>[] a; int b;
checkSum += (b = v.base);
ForkJoinTask<?> next = v.currentJoin;
//当前任务窃取链断了,跳出 descent 循环
if (subtask.status < 0 || j.currentJoin != subtask ||
v.currentSteal != subtask) // stale
break descent;
//如果窃取了当前任务的线程队列里没有任务,就帮它执行它在 join 的任务
//注意这个 if 块里面对 subtask 重新赋值,如果 subtask 不为空,
//就会回到 descent 循环进行下一个迭代,
//也就是 help stealer's stealer
if (b - v.top >= 0 || (a = v.array) == null) {
//如果它没有在 join,任务窃取链就断了,跳出 descent 循环
if ((subtask = next) == null)
break descent;
//否则跳出当前 for 循环
j = v;
break;
}
//如果窃取了当前任务的线程队列有任务,就从底部窃取一个任务
int i = (((a.length - 1) & b) << ASHIFT) + ABASE;
ForkJoinTask<?> t = ((ForkJoinTask<?>)
U.getObjectVolatile(a, i));
if (v.base == b) {
if (t == null) // stale
break descent;
if (U.compareAndSwapObject(a, i, t, null)) {
v.base = b + 1;
//先把 currentSteal 记下来,因为后面递归会导致其发生变化
ForkJoinTask<?> ps = w.currentSteal;
int top = w.top;
do {
U.putOrderedObject(w, QCURRENTSTEAL, t);
//这里会递归
t.doExec(); // clear local tasks too
} while (task.status >= 0 &&
w.top != top &&
(t = w.pop()) != null);
//递归出来,再把 currentSteal 改回之前的值
U.putOrderedObject(w, QCURRENTSTEAL, ps);
if (w.base != w.top)
return; // can't further help
}
}
}
}
//退出 helpStealer 的条件有两个,满足其一即可:
//1.自己 join 的任务已经执行完了,还是自己的工作优先;
//2.虽然自己 join 的任务还没有执行完,但是也找不到任务可以窃取了,
// 继续循环也只是空跑,浪费 cpu 资源,所以退出去后面阻塞当前线程
} while (task.status >= 0 && oldSum != (oldSum = checkSum));
}
}

这里主要涉及两个成员变量:

  • currentSteal:当前线程窃取过来的任务;
  • currentJoin:导致当前线程阻塞的任务。

helpStealer 方法与 scan 方法之后的 runTask 方法是呼应的,runTask 方法中执行窃取的任务时会先给 currentSteal 变量赋值。

helpStealer 方法其实不仅仅是 help stealer,还会 help stealer’s stealer,help stealer’s stealer’s … stealer’s stealer,比如:

  • thread1 在 task1 上 join;
  • 通过遍历比较发现 task1 == thread2.currentSteal;
  • thread1 从 thread2 队列底部窃取一个任务;
  • thread1 再从 thread2 队列底部窃取一个任务;
  • 继续从底部窃取……
  • thread2 队列没有任务了,thread1 继续寻找窃取了 thread2.currentJoin 的线程;
  • 通过遍历比较发现 thread2.currentJoin == thread3.currentSteal;
  • thread1 从 thread3 队列底部窃取一个任务;
  • 继续从底部窃取……
  • thread3 队列没有任务了,thread1 继续寻找窃取了 thread3.currentJoin 的线程;
  • 不断循环……

所以,currentJoin 和 currentSteal 其实记录了一条链路:thread1.currentJoin -> thread2.currentSteal -> thread2.currentJoin -> thread3.currentSteal -> thread3.currentJoin -> …

需要注意的是:

  • help stealer 只会发生在工作线程的队列之间,所以在上一节的 submit 子任务的示例中,help stealer 不会起作用;
  • help stealer 虽然也会执行子任务,但它在功能上不是必须的,而是一个性能优化,优化点在于减少线程因为 join 而阻塞,join 期间帮其它线程执行一个任务,可能 join 的任务就执行完了。

总结

这里体现了递归思想,工作线程不断的去帮偷取者,偷取者的偷取者…执行任务,直到没有子任务了,开始进入阻塞状态。等到最小子任务被执行完了,此工作线程被唤醒,再一层层返回。

使用避坑

不提交,直接 join(会一直阻塞)

1
2
3
4
5
6
//拆分出两个子任务
SumTask left = new SumTask(begin, mid);
SumTask right = new SumTask(mid + 1, end);
long leftSum = left.join();
long rightSum = right.join();
return leftSum + rightSum;

分析dojoin源码,如果是工作线程,会在awaitJoin这里阻塞(没有提交任务到队列)。如果是普通线程,则在externalAwaitDone()方法阻塞。

不提交,直接 invoke(当前线程会直接运行任务)

1
2
3
4
5
SumTask left = new SumTask(begin, mid);
SumTask right = new SumTask(mid + 1, end);
long leftSum = left.invoke();
long rightSum = right.invoke();
return leftSum + rightSum;

doInvoke 方法直接调用 doExec 方法就在当前线程里执行子任务了。

这就导致从头到尾就只有一个 ForkJoin 线程在执行任务,而且由于还要 new 一堆子任务以及递归,实际会比单线程执行还要慢得多(而且还要占用更多的栈内存和堆内存,由于所有的递归都在一个线程栈,还会增加栈溢出的风险)。

compute里面用forkJoinPool.submit 提交(疯狂创建线程)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Override
protected Long compute() {
System.out.println("thread [" + Thread.currentThread() + "] sum from [" + begin + "] to [" + end + "], pool size:" + forkJoinPool.getPoolSize() + ", active size: " + forkJoinPool.getActiveThreadCount());
long sum = 0;
if (end - begin <= THRESHOLD) {
for (long i = begin; i <= end; i++) {
sum += i;
}
return sum;
}
long mid = (end + begin) / 2;
SumTask left = new SumTask(begin, mid);
SumTask right = new SumTask(mid + 1, end);
forkJoinPool.submit(left);
forkJoinPool.submit(right);
long leftSum = left.join();
long rightSum = right.join();
return leftSum + rightSum;
}

前面讲过,submit 方法是一个外部提交任务的方法,它会把任务提交到没有工作线程的队列等待被窃取。

问题是这样发生的:

  • 线程 1 向线程池提交两个子任务;
  • 子任务进入 shared 队列,触发创建线程 2,线程 3;
  • 线程 1 join 进入阻塞;
  • 假设线程 2,线程 3 窃取到这两个子任务,执行子任务过程中并发重复以上过程:
  • 再次拆分四个子任务向线程池提交;
  • 新的子任务又进入另外两个 shared 队列,分别触发创建线程 4,线程 5,线程 6,线程 7;
  • 线程 2,线程 3 join 进入阻塞;
  • 假设现在线程数到达了 parallelism size;
  • 线程 4,5,6,7 join 进入阻塞,此前要补偿创建新线程 8,9,10,11;
  • 以此类推,瞬间成千上万的线程(最大 0x7fff = 32767)就被补偿创建出来了。

如果程序还没有崩溃,继续执行下去:

  • 终于某一时刻,子任务被拆分的足够小,大量的子任务开始被执行完成;
  • 在这些子任务上 join 的线程被唤醒,当前任务执行完成,开始 scan 其它任务;
  • shared 队列的任务被瓜分一空,成千上万的线程开始被迅速销毁,回到正常状态。

如果使用的是 ForkJoinPool.commonPool,程序会在创建“256 + cpu 逻辑核心数 - 1”个线程后报错

fork 提交(正确用法)

1
2
3
4
left.fork();
right.fork();
long leftSum = left.join();
long rightSum = right.join();

invokeAll 提交(最佳)

1
2
3
4
5
6
SumTask left = new SumTask(begin, mid);
SumTask right = new SumTask(mid + 1, end);
invokeAll(left, right);
long leftSum = left.join();
long rightSum = right.join();
return leftSum + rightSum;

invokeAll会把所有任务都fork掉,剩下一个任务直接执行(也是为了避免线程开销)。

1
2
3
4
5
6
7
8
9
10
public static void invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2) {
int s1, s2;
//t2 入队
t2.fork();
//t1 直接执行
if ((s1 = t1.doInvoke() & DONE_MASK) != NORMAL)
t1.reportException(s1);
if ((s2 = t2.doJoin() & DONE_MASK) != NORMAL)
t2.reportException(s2);
}

永远不要在ForkJoinPool里面做任何IO操作

例如访问mysql,rpc,redis,http调用等。

补充问题

ForkJonPool 初始化线程数?

初始化的时候不创建线程,执行任务的时候再去创建。最大的并发线程数(最大活跃线程数)即为 ForkJoinPool 的并发度。

ForkJonPool 和 ThreadPoolExecutor 不同,ThreadPoolExecutor有最大线程数限制,而ForkJonPool如果使用错误会创建大量线程。

当我们调用 ForkJoinPool 的 submit 方法向线程池中提交一个任务时,发生了什么?这个任务会立即分配一个空闲线程来执行还是会入队?

类似问题 在没有工作线程的队列里等待的任务,它怎么被执行?

扔到共享队列,然后被worker线程窃取

总结

对于一个 new ForkJoinPool(),执行任务全流程如下:

  • ForkJoinPool 初始化 parallelism size = cpu 逻辑核心数,没有队列,没有线程;
  • 向 ForkJoinPool 提交一个任务;
  • 初始化队列数组,容量为 2 * Max { parallelism size, 2 ^ n };
  • 创建一个没有线程的队列,容量为 2 ^ 13,随机放在队列数组的某一个偶数索引处;
  • 任务存入这个队列索引值为 2 ^ 12 处;
  • 再创建一个有线程的队列,容量为 2 ^ 13,随机放在队列数组的某一个奇数索引处;
  • 线程启动;
  • 线程从随机一个队列开始,遍历所有队列,最终扫描找到前面提交的任务,并从其所在队列取出;
  • 线程执行任务,拆分出两个子任务;
  • 如果用 invokeAll 提交,则一个进入线程所在队列,另一个直接在线程里执行;
  • 如果用 fork 提交,则两个都进入线程所在队列;
  • 提交的子任务触发创建新的线程,及与其对应的队列,还是在奇数索引处;
  • 提交的子任务可能仍然被当前线程执行,可能被其它线程窃取;
  • 线程在子任务处 join,join 期间会尝试从窃取自己任务的线程那里窃取任务执行;
  • 优先窃取队列底部;
  • 队列没有任务则窃取其正在 join 的任务;
  • 还没有则阻塞自己等待被唤醒,在阻塞之前会补偿一个活跃线程;
  • 提交的子任务不管被哪个线程执行,仍会重复上述拆分、提交、窃取、阻塞流程;
  • 当任务被拆分的足够细,则会真正开始计算;
  • 计算完成从递归一层一层返回;
  • 最终所有子任务都完成,得到结果;
  • 如果不再提交任务,所有线程扫描不到任务进入 inactive 状态;
  • 最终,所有线程销毁,所有奇数索引位的队列回收,ForkJoinPool 中只剩下一个最初创建的在偶数索引位的队列。

参考资料

【JUC源码解析】ForkJoinPool

java线程池(四):ForkJoinPool的使用及基本原理

java线程池(五):ForkJoinPool源码分析之一(外部提交及worker执行过程)

java线程池(六):ForkJoinPool源码分析之二(WorkQueue源码)

java线程池(七):ForkJoinPool源码分析之三(ForkJoinTask源码)

java线程池(八):ForkJoinPool源码分析之四(ForkJoinWorkerThread源码)

Java的Fork/Join任务,你写对了吗?

ForkJoinPool大型图文现场(一阅到底 vs 直接收藏)

Java并发系列(12)——ForkJoin框架源码解析(上)

Java并发系列(12)——ForkJoin框架源码解析(下)

《Java并发实现原理:JDK源码剖析》