ForkJoin框架
ForkJoin 是 JDK 特地为分治算法实现的一个框架。适合在CUP密集场景下使用。
ForkJoin 有两大核心思想:分治和窃取
分治思想 分治法的基本思想是将一个规模为N的问题分解为K个规模较小的子问题,这些子问题的相互独立且与原问题的性质相同,求出子问题的解之后,将这些解合并,就可以得到原有问题的解。
工作窃取 工作窃取是指当某个线程的任务队列中没有可执行任务的时候,从其他线程的任务队列中窃取任务来执行,以充分利用工作线程的计算能力,减少线程由于获取不到任务而造成的空闲浪费。在ForkJoinpool中,工作任务的队列都采用双端队列Deque容器。我们知道,在通常使用队列的过程中,我们都在队尾插入,而在队头消费以实现FIFO。而为了实现工作窃取。一般我们会改成工作线程在工作队列上LIFO,而窃取其他线程的任务的时候,从队列头部取获取。示意图如下:
窃取的具体实现体现了递归的思想,因此ForkJoin框架使用不当,会出现栈溢出的情况。
主要成员 ForkJoinTask ForkJoinPool 唯一可以接受的任务类型,RecursiveTask 和 RecursiveAction 都是 ForkJoinTask 的子类,如果直接提交 Callable 或 Runnable 也会被自动包装成 ForkJoinTask;
ForkJoinTask存在多个内部类,对这些场景做了适配
AdaptedRunnable
AdaptedRunnableAction
RunnableExecuteAction
AdaptedCallable
同时通过ForkJoinPool进行提交的时候,对传入的对象,进行了一次包装
1 2 3 4 5 public <T> ForkJoinTask<T> submit(Runnable task, T result) { ForkJoinTask<T> job = new ForkJoinTask.AdaptedRunnable<T>(task, result); externalPush(job); return job; }
fork方法 fork方法并不是如其方法名会fork一个新线程来执行任务,只是将任务提交到任务队列中而已,然后立即返回,不会等待任务执行完成,其实现如下:
1 2 3 4 5 6 7 8 public final ForkJoinTask<V> fork () { Thread t; if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ((ForkJoinWorkerThread)t).workQueue.push(this ); else ForkJoinPool.common.externalPush(this ); return this ; }
join / quietlyJoin join方法用于阻塞当前线程,等待任务执行完成并返回结果 。部分情形下会通过当前线程执行任务,如果异常结束或者被取消需要抛出异常;
quietlyJoin 加入此任务,而不返回其结果或异常 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 public final V join() { int s; if ((s = doJoin() & DONE_MASK) != NORMAL) //运行doJoin逻辑,判断结果;如果任务非正常结束 reportException(s); // 如果是CANCELLED(取消),抛出取消异常;如果是EXCEPTIONAL(任务异常),抛出相关异常; return getRawResult();// 任务正常结束,获取结果 V } public final void quietlyJoin() { doJoin(); //只是等待任务执行完成 } private void reportException(int s) { if (s == CANCELLED) throw new CancellationException(); if (s == EXCEPTIONAL) rethrow(getThrowableException()); } private int doJoin() { int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w; return (s = status) < 0 ? s : //status小于0说明任务已结束,直接返回 ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? //如果当前线程是ForkJoinWorkerThread,执行tryUnpush, (w = (wt = (ForkJoinWorkerThread)t).workQueue). tryUnpush(this) && (s = doExec()) < 0 ? s ://如果tryUnpush返回true且或者doExec返回小于0,直接返回状态s(s被重新赋值过) wt.pool.awaitJoin(w, this, 0L) : // 否则调用awaitJoin方法等待并返回状态 externalAwaitDone(); //如果当前线程是普通的Java线程 } final int doExec() { int s; boolean completed; if ((s = status) >= 0) { try { completed = exec(); } catch (Throwable rex) { return setExceptionalCompletion(rex); } if (completed) s = setCompletion(NORMAL);//如果任务调用成功,s为负数 } return s; }
invoke / quietlyInvoke invoke,当前线程会立即执行当前任务并返回结果。如果doExec方法返回值大于等于0说明还有其他的子任务未完成,则等待其他子任务执行完成,典型的应用场景就是CountedCompleter,RecursiveAction和RecursiveTask通常doExec返回值小于0,会在compute方法即执行exec方法时等待所有的子任务执行完成; quietlyInvoke,当前线程会立即执行当前任务,不返回结果或者抛出异常 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public final V invoke() { int s; if ((s = doInvoke() & DONE_MASK) != NORMAL) reportException(s); return getRawResult(); } private int doInvoke() { int s; Thread t; ForkJoinWorkerThread wt; return (s = doExec()) < 0 ? s : ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? (wt = (ForkJoinWorkerThread)t).pool. awaitJoin(wt.workQueue, this, 0L) : externalAwaitDone(); }
get之间的区别 get方法继承自 Future类。 内部实现逻辑和Join大致相同。
1 2 3 4 5 6 7 8 9 10 public final V get() throws InterruptedException, ExecutionException { int s = (Thread.currentThread() instanceof ForkJoinWorkerThread) ? doJoin() : externalInterruptibleAwaitDone(); Throwable ex; if ((s &= DONE_MASK) == CANCELLED) throw new CancellationException(); if (s == EXCEPTIONAL && (ex = getThrowableException()) != null) throw new ExecutionException(ex); return getRawResult(); }
join/invoke/get之间的区别
join方法用于让当前线程阻塞,直到对应的子任务完成运行并返回执行结果。(调用join方法之前必须要吧task提交到线程池,否则会一直阻塞。 )
invoke,立即执行任务,任务无需提交到线程池(无需调用fork )
get,方法继承自 Future类,阻塞并返回结果。
ForkJoinPool ForkJoin 专用线程池。下面会详细说明
WorkQueue ForkJoin 专用工作队列。
WorkQueue 里面的成员变量有很多,这里我们只关注其中一部分。
scanState,32 位的 int 变量,记录了四个信息:
线程的 inactive 状态;
版本号;
队列索引值;
scanning 状态。
ForkJoinTask 数组的存取如下图所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 * array 初始容量 8192; * 第一个任务放在 4096,似乎是因为操作系统内存的原因; * 8191 的位置放入任务之后,还是会回到 0 的位置; * 初始 base = top = 4096; * 从上面放入一个任务 top + 1,不会从下面放入任务; * LIFO 模式自己线程从上面取走任务 top - 1; * FIFO 模式自己线程从下面取走任务 base + 1; * 被其它线程从下面窃取任务,base + 1,其它线程不会从上面窃取任务; * 数组 size 由 top - base 获得; * 从 8191 回到 0 之后,top 和 base 会继续往上加,索引值通过取余获得。 * pop操作也仅限于工作线程。
关于WorkQueue的思考
为什么作者会自己实现一个Deque,而不是使用juc中已存在的容器?
1 2 3 这就是因为这个队列全程都是采用Unsafe来实现的,在开篇作者也说了,需要@Contented修饰,就是为了避免缓存的伪代共享。这样来实现一个高效的Deque,以供ForkJoinPool来操作。 这与学习ConcurrentHashMap等容器的源码一样,可以看出作者为了性能的优化,采用了很多独特的方式来实现。这些地方都是我们值得学习和借鉴之处。这也是ForkJoin性能高效的关键。在作者的论文中也可以看出,java的实现,由于抽象在jvm之上,性能比c/c++的实现要低很多。这也是作者尽可能将性能做到最优的原因之一。
WorkQueue对队列怎么加锁的?
贴一个poll的源码,底层基于Unsafe的cas对象方法来实现的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 /** * Takes next task, if one exists, in FIFO order. */ final ForkJoinTask<?> poll() { ForkJoinTask<?>[] a; int b; ForkJoinTask<?> t; //判断 base-top小于0说明存在task 切array不为空 while ((b = base) - top < 0 && (a = array) != null) { //计算出unsafe操作的索引 实际上就是拿到b int j = (((a.length - 1) & b) << ASHIFT) + ABASE; //之后拿到这个task 用volatile的方式 t = (ForkJoinTask<?>)U.getObjectVolatile(a, j); //之后如果base和b相等 if (base == b) { //如果拿到的task不为空 if (t != null) { //那么将这个位置的元素移除 base+1 然后返回t if (U.compareAndSwapObject(a, j, t, null)) { base = b + 1; return t; } } //在上述操作之后,如果base比top小1说明已经为空了 直接退出循环 else if (b + 1 == top) // now empty break; } } //默认返回null return null; }
ForkJoinWorkerThread ForkJoin 专用工作线程。
ForkJoinPool常量以及成员变量
ForkJoinPool 里面有一个 WorkQueue 数组;
每个 WorkQueue 里面都有一个 ForkJoinTask 数组,存放提交的任务;
WorkQueue 数组:
偶数位,存放外部调用提交的任务,如通过execute和submit等方法。这种队列称为SubmissionQueue。
奇数位,存放前者(偶数位)在执行过程种通过fork方法产生的新任务。这种队列称为workQueue。
奇数位的WorkQueue有自己的ForkJoinWorkerThread为成员变量owner;偶数下标的没有,即owner为null
重要变量 ForkJoinPool.config 变量:
高 16 位存储 mode 信息,包括:
SHARED_QUEUE:共享队列,没有自己的工作线程,只能被其它线程窃取任务;
FIFO_QUEUE:先进先出队列;
LIFO_QUEUE:后进先出队列;
低 16 位存储并发度,parallelism size,也可以理解为类似于 ThreadPoolExecutor 的 core pool size,但创建额外线程的逻辑不太一样。
ForkJoinPool.runState 变量:
第 1 位的 1 表示 RSLOCK 状态,runState 变量被锁,其它线程暂时不可以修改;
第 2 位的 1 表示 RSIGNAL 状态,线程阻塞等其它线程释放 RSLOCK;
其它状态同 ThreadPoolExecutor。
ForkJoinPool构造函数 1 2 3 4 5 6 7 8 9 10 11 12 13 public ForkJoinPool (int parallelism, ForkJoinWorkerThreadFactory factory, UncaughtExceptionHandler handler, boolean asyncMode) { this (checkParallelism(parallelism), checkFactory(factory), handler, asyncMode ? FIFO_QUEUE : LIFO_QUEUE, "ForkJoinPool-" + nextPoolId() + "-worker-" ); checkPermission(); }
parallelism 表示ForkJoinPool支持的最大并行度
factory 用于产生线程的工厂方法
handler UncaughtExceptionHandler 用于异常处理的拒绝策略
asyncMode 异步模式,如果为ture,则队列将采用FIFO_QUEUE ,实现先进先出,反之则LIFO_QUEUE 实现后进先出
asyncMode是个啥?
asyncMode是影响WorkQueue的一个参数,WorkQueue的模式。决定了WorkQueue上的线程,从自己的WorkQueue中安LIFO或者FIFO的方式执行任务。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 java.util.concurrent.ForkJoinPool#runWorker final void runWorker(WorkQueue w) { //初始化队列,这个方法会根据任务进行判断是否需要扩容 w.growArray(); // allocate queue //hint是采用的魔数的方式增加 int seed = w.hint; // initially holds randomization hint //如果seed为0 则使用1 int r = (seed == 0) ? 1 : seed; // avoid 0 for xorShift //死循环 for (ForkJoinTask<?> t;;) { //调用scan方法 对经过魔数计算的r 之后开始进行窃取过程 如果能够窃取 则task不为空 if ((t = scan(w, r)) != null) //运行窃取之后的task w.runTask(t); //反之则当前线程进行等待 else if (!awaitWork(w, r)) break; r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift } } final void runTask(ForkJoinTask<?> task) { if (task != null) { //位运算更改队列状态为正在执行任务 scanState &= ~SCANNING; // mark as busy //当前 task 的执行 (currentSteal = task).doExec(); U.putOrderedObject(this, QCURRENTSTEAL, null); // release for GC //执行队列中的其它任务 execLocalTasks(); ForkJoinWorkerThread thread = owner; if (++nsteals < 0) // collect on overflow transferStealCount(pool); //任务执行完,再用位运算把状态改回扫描任务 scanState |= SCANNING; if (thread != null) //hook thread.afterTopLevelExec(); } } java.util.concurrent.ForkJoinPool.WorkQueue#execLocalTasks final void execLocalTasks() { int b = base, m, s; ForkJoinTask<?>[] a = array; //如果队列里面有任务 //通过 base 和 top 判断,如果 top > base,说明有任务 if (b - (s = top - 1) <= 0 && a != null && (m = a.length - 1) >= 0) { //LIFO 模式,FIFO 标志位为 0 if ((config & FIFO_QUEUE) == 0) { //遍历取出所有任务,依次执行 for (ForkJoinTask<?> t;;) { if ((t = (ForkJoinTask<?>)U.getAndSetObject //从 top 取任务 (a, ((m & s) << ASHIFT) + ABASE, null)) == null) break; //取走一个任务,top 变更为 s,也就是 top - 1 U.putOrderedInt(this, QTOP, s); //又进入了前面看到的 doExec 方法 t.doExec(); //重新计算 s,继续迭代 if (base - (s = top - 1) > 0) break; } } //FIFO 模式 else //与 LIFO 模式类似,也是遍历取出所有任务依次执行 //区别是,这里从 base 位开始往上遍历 pollAndExecAll(); } }
ForkJoinPool基本组成 ForkJoinPool的实际组成是,由一个WorkQueue的数组构成。但是这个数组比较特殊,在偶数位,存放外部调用提交的任务,如通过execute和submit等方法。这种队列称为SubmissionQueue。
而另外一种任务是前者在执行过程种通过fork方法产生的新任务。这种队列称为workQueue。
SubmissionQueue与WorkQueue的区别在于,WorkQueue的属性“final ForkJoinWorkerThread owner;”是有值的。也就是说,WorkQueue将有ForkJoinWorkerThread线程与之绑定。在运行过程中不断的从WorkQueue中获取任务。如果没有可执行的任务,则将从其他的SubmissionQueue和WorkQueue中窃取任务来执行。
前面学习过了工作窃取算法,实际上在WorkQueue上的ForkJoinWorkerThread就是一个窃取者,它从SubmissionQueue中或者去偷的WorkQueue中,按FIFO的方式窃取任务。之后执行。也从自己的WorkQueue中安LIFO或者FIFO的方式执行任务。这取决于模式的设定。默认情况下是采用LIFO的方式在执行。组成如下图所示: