0%

java并发05_线程池原理

线程池实现原理

线程池的类的继承关系

ThreadPoolExecutor

核心数据结构

ctl 变量用于保存线程数量(workerCount)和线程池状态(runState),高 3 位来保存运行状态,低 29 位来保存线程数量。

mainLock 用于的线程池内部变量进行互斥访问控制。

Worker对象核心数据结构

Worker继承于AQS,也就是说Worker本身就是一把锁。

核心配置参数

  1. corePoolSize:在线程池中始终维护的线程个数。
  2. maxPoolSize:在corePooSize已满、队列也满的情况下,扩充线程至此值。
  3. keepAliveTime/TimeUnit:maxPoolSize 中的空闲线程,销毁所需要的时间,总线程数收缩回corePoolSize。
  4. blockingQueue:线程池所用的队列类型。
  5. threadFactory:线程创建工厂,可以自定义,也有一个默认的。
  6. RejectedExecutionHandler:corePoolSize 已满,队列已满,maxPoolSize已满,最后的拒绝策略。

线程池的4种拒绝策略

  1. AbortPolicy:直接抛出异常,默认策略;
  2. CallerRunsPolicy:用调用者所在的线程来执行任务;
  3. DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
  4. DiscardPolicy:直接丢弃任务;

线程池的生命周期

线程池的状态有五种,分别是RUNNING、SHUTDOWN、STOP、TIDYING和TERMINATED。

这里的状态迁移有一个非常关键的特征:从小到大迁移,-1,0,1,2,3,只会从小的状态值往大的状态值迁移,不会逆向迁移。例如,当线程池的状态在TIDYING=2时,接下来只可能迁移到TERMINATED=3,不可能迁移回STOP=1或者其他状态。

正确关闭线程池的步骤

线程池的关闭需要一个过程,在调用 shutDown()或者shutdownNow()之后,方法会立即返回,不会产生阻塞的情况。

但是线程池并不会立即关闭,线程池的关闭需要一个过程,接下来需要调用awaitTermination 来等待线程池关闭。(isTerminated 方法可以立即返回结果,但是没有阻塞能力)

不断循环判断线程池是否到达了最终状态TERMINATED

shutdown()与shutdownNow()的区别

shutdown()不会清空任务队列,会等所有任务执行完成,只会中断 空闲的工作线程
shutdownNow()会清空任务队列,并中断 所有的工作线程,并返回未执行的任务的list

线程池优雅关闭

当jvm进程需要退出的时候,线程池队列中仍有任务。此时如果jvm直接退出,这些任务都会丢失掉。

此时可以在创建线程池之后,使用Runtime.getRuntime().addShutdownHook(),并在此方法内增加线程池关闭的逻辑。这样jvm要退出时,会等待线程池运行结束后才会完全退出。

但是如果使用kill -9,或者机器crash等场景,还是会出现任务丢失的情况。

线程池核心流程&结合源码

任务提交流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
// 判断有没有达到核心线程数,没达到直接调用addWorker方法,增加线程数
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
// 直接加worker成功,直接返回即可
return;
c = ctl.get();
}
// addWorker失败,可能的原因有
// 1. 线程池状态不是running了
// 2. 外部并发调用execute,导致worker数量已经超过了corePoolSize
// 所以对线程池进行状态检查,并将任务加入队列
if (isRunning(c) && workQueue.offer(command)) {
// 任务入队的时候,线程池状态可能发生变化,所以需要再次检查
// 如果再次检查发现线程池状态不是running了,需要将刚加进去的任务移除掉,并调用reject方法
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
// 如果线程池还是running,但是没有线程运行任务了,需要补偿一个worker
addWorker(null, false);
}
else if (!addWorker(command, false))
// 如果队列满了 加入队列失败,需要增加一个非核心worker
// 如果增加成功,就没什么问题,如果增加失败,调用reject
reject(command);
}

addWorker

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
//循环调用以下流程
// 1. 判断线程池状态
// 2. 获取当前worker数量
// 满足线程条件,增加workerCnt,并继续往下创建线程
int c = ctl.get();
int rs = runStateOf(c);

// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;

for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
//开始创建worker流程
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());

if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
//如果运行worker失败,需要减少workerCnt
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}

Worker主流程(任务执行过程)

worker本身实现了Runnable,本身是一把锁,成员里有具体运行的线程和获取到的任务firstTask,run方法主要代理了ThreadPool的runWorker方法。
还有一个interruptIfStarted()比较重要,这个方法在shutdownNow中会被调用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// 获取锁相关的忽略,重点看 run、interruptIfStarted方法

private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;

/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;

/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}

/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}

// Lock methods
//
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}

runWorker方法

  1. 通过while,获取worker的锁,执行一下逻辑
  2. 运行task,如果task已经执行完毕,下次会去队列里面拿(getTask方法)
  3. 如果是核心线程数没满,会调用队列take方法一直阻塞。如果核心线程数满了,说明是非核心线程,会调用poll方法+超时时间(即线程池创建参数的空闲时间),直接返回空
  4. 回获取不到task,则退出while循环,执行线程退出逻辑。获取到task则执行,并继续while循环

中断流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}

private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}

private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
//获取worker的锁,通过能不能获取到锁来判断,worker对象是否空闲
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}

private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
//直接调用worker对象的interruptIfStarted方法,不管锁
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}

tryTerminate方法会将线程池的状态转换成TERMINATED,这个方法会在很多地方调用到,比如runWorker方法的最后。

ScheduledThreadPoolExecutor

延迟执行任务依靠的是DelayedWorkQueue。DelayedWorkQueue是BlockingQueue的一种,其实现原理是二叉堆。

ScheduledThreadPoolExecutor实现了按时间调度来执行任务

  1. schedule: 延迟执行任务
  2. scheduleAtFixedRate:按固定频率执行。每次执行完会重新放入队列,时间为固定的 任务开始时间+period。所以如果任务执行时间> period 的话,重新放入队列的时候,每次都会在队列第一个,即每次运行完后马上接着执行下一个。
  3. scheduleWithFixedDelay:按固定间隔执行,和任务执行时间无关。每次任务执行完,都延迟相同的时间。

withFixedDelay和atFixedRate的区别就体现在setNextRunTime里面。

  • 如果是atFixedRate,period>0,下一次开始执行时间等于上一次开始执行时间+period。
  • 如果是withFixedDelay,period < 0,下一次开始执行时间等于triggerTime(-p),为now+(-period),now即上一次执行的结束时间。

Executors工具类

  • newFixedThreadPool:指定线程数量的线程池,核心线程数和最大线程数相同,无限队列
  • newSingleThreadExecutor 只有一个线程的线程池,无限队列
  • newCachedThreadPool: 不限制线程数量的线程,核心线程数为0,丢入一个任务,创建一个线程
  • newSingleThreadScheduledExecutor: 只有一个线程的ScheduledExecutor
  • newScheduledThreadPool:指定核心线程数量的ScheduledThreadPoolExecutor
  • newWorkStealingPool: 1.8之后加入,底层使用ForkJoinPool

线程池注意事项

阿里开发手册不建议使用Executors工具类

线程池容量的动态调整

ThreadPoolExecutor提供了动态调整线程池容量大小的方法:

setCorePoolSize:设置核心池大小
setMaximumPoolSize:设置线 程池最大能创建的线程数目大小

线程池的线程可能根据业务需要进行动态调整,美团甚至封装了一个线程池中间件。

美团-动态化线程池

线程池参数调优

理论上是

1
2
3
4
5
cup利用率 = cpu计算时间/(cup计算时间+cupIO等待时间)

1/cup利用率 = 单核cpu可以开的线程数

总线程数= cup核数 * 单核cpu可以开的线程数

即上述图片第一种推导结果,但是实际操作起来非常的麻烦。还是要根据具体情况,遇到相应问题再去深入调优。

Callable与FutureTask

execute(Runnable command)接口是无返回值的,与之相对应的是一个有返回值的接口Future submit(Callable task)。

Callable也就是一个有返回值的Runnable,其定义如下

submit(Callable task)并不是在ThreadPoolExecutor 里面直接实现的,而是实现在其父类AbstractExecutorService中。

从这段代码中可以看出,Callable其实是用Runnable实现的。在submit内部,把Callable通过FutureTask这个Adapter转化成Runnable,然后通过execute执行。

FutureTask

FutureTask是一个Adapter对象。一方面,它实现了Runnable接口,也实现了Future接口;另一方面,它的内部包含了一个Callable对象,从而实现了把Callable转换成Runnable。

FutureTask.run

一方面,线程池内部的线程在执行RunTask的run()方法;另一方面,外部多个线程又在调用get()方法,等着返回结果,因此这个地方需要一个阻塞—通知机制。

JDK 7开始,直接基于CAS state变量+park/unpark()来实现阻塞—唤醒机制。

注意事项

线程池使用 FutureTask 时如果把拒绝策略设置为DiscardPolicy和DiscardOldestPolicy,并且在被拒绝的任务的 FutureTask 对象上调用了无参get方法,那么调用线程会一直被阻塞。

当使用FutureTask时,尽量使用带超时时间的get方法,这样即使使用了DiscardPolicy拒绝策略也不至于一直等待,超时时间到了就会自动返回。如果非要使用不带参数的get方法则可以重写DiscardPolicy的拒绝策略,在执行策略时设置该Future的状态大于COMPLETING即可。

FutureTask没有被执行,没有调用run方法,所以state状态不会变,初始化的时候是NEW,再看看get方法,会导致调用线程一直阻塞。

参考资料1-Java Future详解与使用

java并发实现原理:JDK源码剖析

Java 并发编程之美