Java Thread Pool Source Code Explained

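Thread pools offer two main advantages:

  1. When a large number of asynchronous tasks must be run, they improve application performance, because a pool reduces the per-task invocation overhead (threads are reused instead of being created for every task);
  2. They make threads easier to manage, for example when monitoring thread usage or tuning the system.

The class structure of the thread pool is shown below: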

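[Class diagram: Executor (void execute(Runnable command)) <- ExecutorService (Future<?> submit(Runnable task, ...), void shutdown(), List<Runnable> shutdownNow()) <- AbstractExecutorService <- ThreadPoolExecutor and ForkJoinPool. Executors creates the pools (new); one pool holds 0~n Worker objects, each associated with a Thread and with the user's Runnable tasks.]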

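Thread pools fall into two broad categories: ThreadPoolExecutor is the general-purpose thread pool, while ForkJoinPool can be thought of as a thread pool for lightweight tasks.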

Managing the number of threads in the pool

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)
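  1. corePoolSize (core thread count): while the pool has fewer than corePoolSize threads, a new thread is created to run each new task, even if other threads are idle.
  2. maximumPoolSize (maximum thread count): once the thread count has reached corePoolSize, new tasks are first added to the queue to wait for execution. If the queue is full and the add fails, and the thread count is below maximumPoolSize, a new thread is created to run the task. Threads beyond the core count are effectively "borrowed": when a borrowed thread has been idle for longer than keepAliveTime, it is reclaimed. The timeout is detected in getTask(), after which runWorker lets the thread exit.
  3. If the task queue is full and the thread count has already reached maximumPoolSize, RejectedExecutionHandler.rejectedExecution is called back so that the caller can decide how to handle the task.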
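[Flowchart: submit a new task -> core threads full? no: create a core thread to run the task; yes -> task queue full? no: add the task to the queue to wait; yes -> maximum thread count reached? no: create a new thread to run the task; yes: apply the rejection policy.]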
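The decision flow above can be condensed into a small pure function. This is only a sketch for reading along: the class AdmissionSketch, the enum Decision and the method decide are hypothetical names, not part of the JDK, and the sketch ignores the pool's run state and all concurrency races that the real execute() has to handle.

```java
class AdmissionSketch {
    // Hypothetical labels for the four outcomes in the flowchart.
    enum Decision { NEW_CORE_THREAD, ENQUEUE, NEW_EXTRA_THREAD, REJECT }

    // Mirrors the three numbered rules; not the JDK implementation.
    static Decision decide(int workerCount, int queued, int queueCapacity,
                           int corePoolSize, int maximumPoolSize) {
        if (workerCount < corePoolSize) {
            return Decision.NEW_CORE_THREAD;   // rule 1: below core size
        }
        if (queued < queueCapacity) {
            return Decision.ENQUEUE;           // rule 2: queue still has room
        }
        if (workerCount < maximumPoolSize) {
            return Decision.NEW_EXTRA_THREAD;  // rule 2: queue full, borrow a thread
        }
        return Decision.REJECT;                // rule 3: queue full and pool at max
    }

    public static void main(String[] args) {
        // Illustrative limits: core = 2, max = 10, queue capacity = 5
        System.out.println(decide(0, 0, 5, 2, 10));   // NEW_CORE_THREAD
        System.out.println(decide(2, 0, 5, 2, 10));   // ENQUEUE
        System.out.println(decide(2, 5, 5, 2, 10));   // NEW_EXTRA_THREAD
        System.out.println(decide(10, 5, 5, 2, 10));  // REJECT
    }
}
```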

The following code tests the thread-count limits described above:

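import java.util.concurrent.ExecutorService
import java.util.concurrent.LinkedBlockingDeque
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit

fun main() {
    val server: ExecutorService = ThreadPoolExecutor(
        2,                       // corePoolSize
        10,                      // maximumPoolSize
        5, TimeUnit.MINUTES,     // keepAliveTime
        LinkedBlockingDeque(5))  // task queue with a maximum capacity of 5

    // Submit 21 tasks (0..20); each blocks forever, so no thread is ever freed
    for (i in 0..20) {
        server.execute {
            println("task $i run")
            Thread.sleep(Long.MAX_VALUE)
        }
    }
}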

The code above prints:

task 0 run
task 1 run
task 7 run
task 8 run
task 9 run
task 10 run
task 11 run
task 12 run
task 13 run
task 14 run
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task MainKt$$Lambda$16/0x0000017ddf003400@70177ecd rejected from java.util.concurrent.ThreadPoolExecutor@1e80bfe8[Running, pool size = 10, active threads = 10, queued tasks = 5, completed tasks = 0]
at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2065)
at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:833)
at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1365)
at MainKt.main(Main.kt:20)

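The output shows that tasks 0 and 1 are run by the core threads, while tasks 2~6 are added to the queue to wait (the queue holds 5 tasks). Because the queue is then full, tasks 7~14 are run on newly created threads (the thread count has not yet exceeded maximumPoolSize). Tasks 0, 1 and 7~14 make 10 running tasks in total, which reaches the maximum thread count set by maximumPoolSize, so submitting task 15 throws the exception. The order of the "task N run" lines may differ between runs, since each task prints from its own thread.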
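The same experiment can be written so that it terminates and reports the numbers directly. The sketch below uses the same limits (2 core threads, 10 maximum, queue capacity 5) but makes two assumptions of its own: the tasks wait on a CountDownLatch instead of sleeping forever, and a counting lambda replaces the default AbortPolicy so that rejections are tallied rather than thrown. PoolLimitsDemo and run are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class PoolLimitsDemo {
    // Returns {pool size, queued tasks, rejected tasks} after submitting 21 blocking tasks.
    static int[] run() {
        CountDownLatch release = new CountDownLatch(1);
        AtomicInteger rejected = new AtomicInteger();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 10, 5, TimeUnit.MINUTES,
                new LinkedBlockingDeque<>(5),
                (task, executor) -> rejected.incrementAndGet()); // count instead of throwing
        try {
            for (int i = 0; i <= 20; i++) {
                pool.execute(() -> {
                    try {
                        release.await();               // keep the worker busy
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            // All workers are blocked, so this snapshot is stable.
            int[] snapshot = { pool.getPoolSize(), pool.getQueue().size(), rejected.get() };
            release.countDown();                       // let every task finish
            pool.shutdown();
            pool.awaitTermination(10, TimeUnit.SECONDS);
            return snapshot;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        int[] r = run();
        // prints: pool size = 10, queued = 5, rejected = 6
        System.out.println("pool size = " + r[0] + ", queued = " + r[1] + ", rejected = " + r[2]);
    }
}
```

Of the 21 tasks, 10 occupy threads and 5 sit in the queue, which leaves tasks 15~20 (6 tasks) to be rejected.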

Submitting a new task

// ThreadPoolExecutor.java
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    // 1. If the pool currently has fewer than corePoolSize threads, create a new thread to run the task
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }

    // 2. The thread count has reached corePoolSize and the pool is RUNNING: queue the task to wait for execution
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 3. The queue is full: create a new thread if the count is below maximumPoolSize, otherwise reject
    else if (!addWorker(command, false))
        reject(command);
}

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (int c = ctl.get();;) {
        // Check if queue empty only if necessary.
        if (runStateAtLeast(c, SHUTDOWN) &&
            (runStateAtLeast(c, STOP) || firstTask != null || workQueue.isEmpty()))
            return false;

        for (;;) {
            // a) Adding a core thread: fail if the thread count >= corePoolSize;
            // b) Queue is full, adding a non-core thread: fail if the thread count >= maximumPoolSize
            if (workerCountOf(c) >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
                return false;
            // CAS-increment the worker count, then break out of both loops
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get(); // Re-read ctl
            if (runStateAtLeast(c, SHUTDOWN))
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // Create the worker; the Worker constructor creates the new Thread
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int c = ctl.get();

                if (isRunning(c) ||
                    (runStateLessThan(c, STOP) && firstTask == null)) {
                    if (t.getState() != Thread.State.NEW)
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    workerAdded = true;
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

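Executing tasks

Each thread in the pool corresponds to one Worker. After a Worker finishes its first task it does not exit immediately; instead it blocks in getTask() waiting for a task from the queue, and runs each task it obtains. If the pool has not been shut down and the thread count exceeds the core thread count, getTask() blocks with a timeout of keepAliveTime. If the timeout expires (keepAliveTime has passed and the queue still has no new task), getTask() returns null. runWorker then falls through to processWorkerExit to reclaim the "borrowed" thread, and once processWorkerExit completes the thread exits.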

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
    ......
    public void run() {
        runWorker(this);
    }
    ......
}

// runWorker and getTask are methods of the enclosing ThreadPoolExecutor, not of Worker.

// Repeatedly fetches tasks from the queue and runs them
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // Run firstTask first; once it completes, call getTask() to fetch tasks from the queue.
        // If getTask() returns null, the loop ends and the thread exits.
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted. This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                try {
                    task.run(); // run the task
                    afterExecute(task, null);
                } catch (Throwable ex) {
                    afterExecute(task, ex);
                    throw ex;
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();

        // Check if queue empty only if necessary.
        if (runStateAtLeast(c, SHUTDOWN)
            && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        // Return null (so the thread exits) if the thread count exceeds maximumPoolSize,
        // or if this thread has been idle for longer than keepAliveTime
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            // Fetch a task from the queue: timed poll if this worker may be culled, blocking take otherwise
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}
......
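The reclaiming of "borrowed" threads can be observed from outside the pool. The following is a rough sketch, assuming a short keepAliveTime of 100 ms and a polling loop with a 5 second upper bound; KeepAliveDemo and run are hypothetical names, and the sizes (1 core thread, 3 maximum, queue capacity 1) are chosen only for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class KeepAliveDemo {
    // Returns {pool size while all tasks are blocked, pool size after idling past keepAliveTime}.
    static int[] run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 3, 100, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        try {
            // Task 0 -> core thread, task 1 -> queue, tasks 2 and 3 -> two extra threads
            for (int i = 0; i < 4; i++) {
                pool.execute(() -> {
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            int busy = pool.getPoolSize();
            release.countDown();
            // Wait for the extra threads to time out in getTask() and exit.
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
            while (pool.getPoolSize() > 1 && System.nanoTime() < deadline) {
                Thread.sleep(20);
            }
            int idle = pool.getPoolSize();
            pool.shutdown();
            return new int[] { busy, idle };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println("busy = " + r[0] + ", after keepAliveTime = " + r[1]);
    }
}
```

The pool shrinks back to corePoolSize and no further, because the last remaining worker sees wc > corePoolSize as false and switches to the blocking workQueue.take().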

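Thread pool states

  • RUNNING: accepts new tasks and processes queued tasks;
  • SHUTDOWN: does not accept new tasks, but continues to process queued tasks;
  • STOP: does not accept new tasks, does not process queued tasks, and interrupts tasks that are in progress;
  • TIDYING: all tasks have terminated and the thread count is 0; the thread that moves the pool into TIDYING runs the terminated() method;
  • TERMINATED: terminated() has completed;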
State transitions:
  • RUNNING -> SHUTDOWN: shutdown
  • RUNNING -> STOP: shutdownNow
  • SHUTDOWN -> STOP: shutdownNow
  • SHUTDOWN -> TIDYING: when both queue and pool are empty
  • STOP -> TIDYING: when pool is empty
  • TIDYING -> TERMINATED: when terminated() method has completed
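The RUNNING -> STOP -> TIDYING -> TERMINATED path can be exercised with a single-threaded pool. This is a sketch under assumptions: the class and method names are hypothetical, and the running task simply sleeps so that the interrupt sent by shutdownNow() is observable.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class ShutdownNowDemo {
    static String run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        CountDownLatch started = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean();
        try {
            // One task occupies the only thread...
            pool.execute(() -> {
                started.countDown();
                try {
                    Thread.sleep(60_000);
                } catch (InterruptedException e) {
                    interrupted.set(true);   // STOP interrupts in-progress tasks
                }
            });
            // ...and three more wait in the queue.
            for (int i = 0; i < 3; i++) {
                pool.execute(() -> { });
            }
            started.await();
            List<Runnable> pending = pool.shutdownNow();   // RUNNING -> STOP
            boolean terminated = pool.awaitTermination(10, TimeUnit.SECONDS);
            return "pending=" + pending.size()
                    + " interrupted=" + interrupted.get()
                    + " terminated=" + terminated;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // prints: pending=3 interrupted=true terminated=true
        System.out.println(run());
    }
}
```

With shutdown() instead of shutdownNow(), the three queued tasks would still be executed and the running task would not be interrupted.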

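The pool state and the thread count are stored in a single int variable (ctl). An int has 32 bits: the high 3 bits hold the state and the remaining low 29 bits hold the thread count. RUNNING=111, SHUTDOWN=000, STOP=001, TIDYING=010, TERMINATED=011.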
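The bit layout can be reproduced in a few lines. The sketch below is modeled on the private constants and helpers inside ThreadPoolExecutor, but it is a standalone illustration: the exact names differ between JDK versions, and CtlSketch and highBits are hypothetical additions.

```java
class CtlSketch {
    static final int COUNT_BITS = Integer.SIZE - 3;        // 29 low bits for the worker count
    static final int COUNT_MASK = (1 << COUNT_BITS) - 1;   // 000 followed by 29 ones

    // Run states live in the high 3 bits
    static final int RUNNING    = -1 << COUNT_BITS;        // 111
    static final int SHUTDOWN   =  0 << COUNT_BITS;        // 000
    static final int STOP       =  1 << COUNT_BITS;        // 001
    static final int TIDYING    =  2 << COUNT_BITS;        // 010
    static final int TERMINATED =  3 << COUNT_BITS;        // 011

    static int ctlOf(int runState, int workerCount) { return runState | workerCount; }
    static int runStateOf(int c)    { return c & ~COUNT_MASK; }
    static int workerCountOf(int c) { return c & COUNT_MASK; }

    // Hypothetical helper: the high 3 bits as a zero-padded binary string.
    static String highBits(int c) {
        String s = Integer.toBinaryString(c >>> COUNT_BITS);
        return "000".substring(s.length()) + s;
    }

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 5);
        System.out.println(highBits(c));                   // 111
        System.out.println(workerCountOf(c));              // 5
        System.out.println(runStateOf(c) == RUNNING);      // true
        System.out.println(highBits(TIDYING));             // 010
    }
}
```

Because RUNNING is negative and the other states are non-negative and increasing, the states are numerically ordered RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED, which is what lets checks such as runStateAtLeast(c, SHUTDOWN) be simple integer comparisons.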

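Thread pool queuing strategies

There are three commonly used strategies for queuing thread pool tasks.

Direct handoff: a SynchronousQueue is used as the work queue. Every insertion must wait for another thread to take the task. When a task is offered and no thread is ready to consume it immediately, the offer fails (SynchronousQueue.offer without a timeout does not wait), and a new thread is created to run the task. Direct handoff usually requires an unbounded maximumPoolSize to avoid rejecting tasks; otherwise, new tasks are rejected once the thread count reaches the limit.

  • Executors#newCachedThreadPool() uses this strategy: the core thread count is 0, the maximum thread count is Integer.MAX_VALUE, and the thread keepAliveTime is 1 minute.

Unbounded queue: for example a LinkedBlockingQueue created without a maximum capacity. When all core threads are busy, new tasks wait in the queue, so no more than corePoolSize threads are ever created (the queue never fills up, so maximumPoolSize has no effect). If tasks frequently block, for example on I/O, the queue can keep growing until system memory is exhausted.

  • Executors#newFixedThreadPool(int nThreads) uses this strategy, with the maximum and core thread counts both set to nThreads;
  • Executors#newSingleThreadExecutor() uses this strategy as well, equivalent to nThreads=1;

Bounded queue: for example ArrayBlockingQueue. Combined with a finite maximumPoolSize, a bounded queue helps prevent resource exhaustion. The queue capacity and maximumPoolSize must be traded off against each other, however. A large queue with a small maximumPoolSize minimizes CPU usage, OS resources and context-switching overhead, but can noticeably lower throughput. A small queue with a large pool keeps the CPUs busy, but may run into heavy scheduling overhead, which also lowers throughput.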
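The difference between the three strategies comes down to how each queue answers offer(), which is the call execute() uses to enqueue a task. A minimal sketch, with QueueStrategySketch, handoffAccepted and accepted as hypothetical names and no pool involved:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;

class QueueStrategySketch {
    static final Runnable NOOP = () -> { };

    // Direct handoff: with no consumer waiting, offer() fails immediately.
    static boolean handoffAccepted() {
        return new SynchronousQueue<Runnable>().offer(NOOP);
    }

    // Counts how many of n non-blocking offers the queue accepts.
    static int accepted(BlockingQueue<Runnable> queue, int n) {
        int ok = 0;
        for (int i = 0; i < n; i++) {
            if (queue.offer(NOOP)) {
                ok++;
            }
        }
        return ok;
    }

    public static void main(String[] args) {
        System.out.println(handoffAccepted());                           // false
        System.out.println(accepted(new LinkedBlockingQueue<>(), 1000)); // 1000
        System.out.println(accepted(new ArrayBlockingQueue<>(2), 1000)); // 2
    }
}
```

A failed offer is what pushes execute() on to addWorker(command, false): it happens on every submission with direct handoff when no worker is idle, never with an unbounded queue, and only after the capacity is used up with a bounded one.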
