Java线程池之ThreadPoolExecutor

如下图所示,ThreadPoolExecutor实现的顶层接口是Executor ,本文主要介绍了如何创建线程池、线程池底层原理等内容,其中重点内容是线程的生命周期。

关于ThreadPoolExecutor的具体方法和属性,本文不作详细介绍,可以直接查阅Java官方API文档。ThreadPoolExecutor中比较特殊的一个字段是ctl。它是一个复合属性,保存两部分信息: 线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount),这里可以看到,使用了Integer类型来保存,高3位保存runState,低29位保存workerCount。COUNT_BITS 就是29,CAPACITY就是1左移29位减1(29个1),这个常量表示workerCount的上限值,大约是5亿。

创建线程池

创建ThreadPoolExecutor线程池有两种方式:new和Executors工厂类。

  • 直接new

    ThreadPoolExecutor类提供了很4种构造方法,构造方法的参数包括:

    • corePoolSize:线程池中保留的核心线程数量
    • maximumPoolSize:线程池允许的最大线程数量
    • keepAliveTime:线程数量超过核心数量后,空闲线程最多存活多长时间
    • workQueue:任务队列,有很多类型,比如阻塞队列、无边界队列等等
    • threadFactory:线程池用来创建线程的线程工程,用户可以自定义,也可以用系统默认的
    • handler:阻塞队列满了,并且没有空闲线程时,线程池需要采取的策略。默认有4钟,用户也可以自定义。
  • Executors工厂类

    Executors提供了很多静态工厂方法,用于创建Executor、ThreadFactory、Callable等实例对象。里面提供了多种用于创建ThreadPoolExecutor的静态方法:

    • newFixedThreadPool

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      public static ExecutorService newFixedThreadPool(int nThreads) {
      return new ThreadPoolExecutor(nThreads, nThreads,
      0L, TimeUnit.MILLISECONDS,
      new LinkedBlockingQueue<Runnable>());
      }
      public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
      return new ThreadPoolExecutor(nThreads, nThreads,
      0L, TimeUnit.MILLISECONDS,
      new LinkedBlockingQueue<Runnable>(),
      threadFactory);
      }
    • newSingleThreadExecutor

      1
      2
      3
      4
      5
      6
      public static ExecutorService newSingleThreadExecutor() {
      return new FinalizableDelegatedExecutorService
      (new ThreadPoolExecutor(1, 1,
      0L, TimeUnit.MILLISECONDS,
      new LinkedBlockingQueue<Runnable>()));
      }
    • newCachedThreadPool

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      public static ExecutorService newCachedThreadPool() {
      return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
      60L, TimeUnit.SECONDS,
      new SynchronousQueue<Runnable>());
      }
      public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
      return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
      60L, TimeUnit.SECONDS,
      new SynchronousQueue<Runnable>(),
      threadFactory);
      }

使用线程池

ThreadPoolExecutor是如何运行,如何同时维护线程和执行任务的呢?其运行机制如下图所示:

主要由两部分组成:线程池和任务缓冲队列。用户提交一个新任务后,判断能否立即分配线程执行,能就执行任务,不能则将任务放入等待队列中。整个线程池在运行过程中具有5个状态:

  1. RUNNING :能接受新提交的任务,并且也能处理阻塞队列中的任务;

  2. SHUTDOWN:关闭状态,不再接受新提交的任务,但却可以继续处理阻塞队列中已保存的任务。在线程池处于 RUNNING 状态时,调用 shutdown()方法会使线程池进入到该状态。(finalize() 方法在执行过程中也会调用shutdown()方法进入该状态);

  3. STOP:不能接受新任务,也不处理队列中的任务,会中断正在处理任务的线程。在线程池处于 RUNNING 或 SHUTDOWN 状态时,调用 shutdownNow() 方法会使线程池进入到该状态;

  4. TIDYING:如果所有的任务都已终止了,workerCount (有效线程数) 为0,线程池进入该状态后会调用 terminated() 方法进入TERMINATED 状态。

  5. TERMINATED:在terminated() 方法执行完后进入该状态,默认terminated()方法中什么也没有做。

    进入TERMINATED的条件如下:

    • 线程池不是RUNNING状态;
    • 线程池状态不是TIDYING状态或TERMINATED状态;
    • 如果线程池状态是SHUTDOWN并且workerQueue为空;
    • workerCount为0;
    • 设置TIDYING状态成功。

下图展示了线程池的状态转换过程:

任务提交:submit方法

提交任务用的是AbstractExecutorService.submit(),可以获取执行完的返回值, 而ThreadPoolExecutor AbstractExecutorService.submit()的子类,所以submit方法也是ThreadPoolExecutor的方法。

1
2
3
4
5
6
7
8
// submit方法在AbstractExecutorService中的实现
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
// 通过submit方法提交的Callable任务会被封装成了一个FutureTask对象。
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}

通过submit方法提交的Callable任务会被封装成了一个FutureTask对象。通过Executor.execute方法提交FutureTask到线程池中等待被执行,最终执行的是FutureTask的run方法.

任务执行:execute方法

execute()方法源代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* clt记录着runState和workerCount
*/
int c = ctl.get();
/*
* workerCountOf方法取出低29位的值,表示当前活动的线程数;
* 如果当前活动线程数小于corePoolSize,则新建一个线程放入线程池中;
* 并把任务添加到该线程中。
*/
if (workerCountOf(c) < corePoolSize) {
/*
* addWorker中的第二个参数表示限制添加线程的数量是根据corePoolSize来判断还是maximumPoolSize来判断;
* 如果为true,根据corePoolSize来判断;
* 如果为false,则根据maximumPoolSize来判断
*/
if (addWorker(command, true))
return;
/*
* 如果添加失败,则重新获取ctl值
*/
c = ctl.get();
}
/*
* 如果当前线程池是运行状态并且任务添加到队列成功
*/
if (isRunning(c) && workQueue.offer(command)) {
// 重新获取ctl值
int recheck = ctl.get();
// 再次判断线程池的运行状态,如果不是运行状态,由于之前已经把command添加到workQueue中了,
// 这时需要移除该command
// 执行过后通过handler使用拒绝策略对该任务进行处理,整个方法返回
if (! isRunning(recheck) && remove(command))
reject(command);
/*
* 获取线程池中的有效线程数,如果数量是0,则执行addWorker方法
* 这里传入的参数表示:
* 1. 第一个参数为null,表示在线程池中创建一个线程,但不去启动;
* 2. 第二个参数为false,将线程池的有限线程数量的上限设置为maximumPoolSize,添加线程时根据maximumPoolSize来判断;
* 如果判断workerCount大于0,则直接返回,在workQueue中新增的command会在将来的某个时刻被执行。
*/
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
/*
* 如果执行到这里,有两种情况:
* 1. 线程池已经不是RUNNING状态;
* 2. 线程池是RUNNING状态,但workerCount >= corePoolSize并且workQueue已满。
* 这时,再次调用addWorker方法,但第二个参数传入为false,将线程池的有限线程数量的上限设置为maximumPoolSize;
* 如果失败则拒绝该任务
*/
else if (!addWorker(command, false))
reject(command);
}

这里有一个点就是:判断线程池状态的时候进行了double check,任务添加到队列前后都进行了状态检查。因为在多线程环境下,线程池的状态时刻在变化,而ctl.get()是非原子操作,很有可能刚获取了线程池状态后线程池状态就改变了。 倘若没有double check,万一线程池处于非running状态(在多线程环境下很有可能发生),那么command永远不会执行。

在执行execute()方法时如果状态一直是RUNNING时,执行过程如下:

创建线程:addWorker方法

addWorker方法的主要工作是在线程池中创建一个新的线程并执行,firstTask参数 用于指定新增的线程执行的第一个任务,core参数为true表示在新增线程时会判断当前活动线程数是否少于corePoolSize,false表示新增线程前需要判断当前活动线程数是否少于maximumPoolSize,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
// 获取运行状态
int rs = runStateOf(c);

/*
* 这个if判断
* 如果rs >= SHUTDOWN,则表示此时不再接收新任务;
* 接着判断以下3个条件,只要有1个不满足,则返回false:
* 1. rs == SHUTDOWN,这时表示关闭状态,不再接受新提交的任务,但却可以继续处理阻塞队列中已保存的任务
* 2. firsTask为空
* 3. 阻塞队列不为空
*
* 首先考虑rs == SHUTDOWN的情况
* 这种情况下不会接受新提交的任务,所以在firstTask不为空的时候会返回false;
* 然后,如果firstTask为空,并且workQueue也为空,则返回false,
* 因为队列中已经没有任务了,不需要再添加线程了
*/
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
// 获取线程数
int wc = workerCountOf(c);
// 如果wc超过CAPACITY,也就是ctl的低29位的最大值(二进制是29个1),返回false;
// 这里的core是addWorker方法的第二个参数,如果为true表示根据corePoolSize来比较,
// 如果为false则根据maximumPoolSize来比较。
//
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 尝试增加workerCount,如果成功,则跳出第一个for循环
if (compareAndIncrementWorkerCount(c))
break retry;
// 如果增加workerCount失败,则重新获取ctl的值
c = ctl.get(); // Re-read ctl
// 如果当前的运行状态不等于rs,说明状态已被改变,返回第一个for循环继续执行
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 根据firstTask来创建Worker对象
w = new Worker(firstTask);
// 每一个Worker对象都会创建一个线程
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
// rs < SHUTDOWN表示是RUNNING状态;
// 如果rs是RUNNING状态或者rs是SHUTDOWN状态并且firstTask为null,向线程池中添加线程。
// 因为在SHUTDOWN时不会在添加新的任务,但还是会执行workQueue中的任务
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// workers是一个HashSet
workers.add(w);
int s = workers.size();
// largestPoolSize记录着线程池中出现过的最大线程数量
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 启动线程
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}

从上面代码可以看到,addWork()方法的核心是创建一个Worker实例,然后在t.start()处启动该worker。需要注意的是,这里启动线程会调用Worker类中的run方法,Worker本身也实现了Runnable接口,所以 一个Worker类型的对象也是一个线程。 整个添加线程的流程如下:

Worker类

线程池中的每一个线程被封装成一个Worker对象,ThreadPool维护的其实就是一组Worker对象,看一下Worker的定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}

Worker继承了AQS, 并实现了Runnable接口,注意其中的firstTask和thread属性:

  • firstTask: 用它来保存传入的任务
  • thread: 在调用构造方法时通过ThreadFactory来创建的线程,是用来处理任务的线程

这里有两个问题,需要关注:

  1. 为什么Worker要加锁?

    简单来说就是,防止线程在执行任务过程中被中断。从后文介绍的runWorker源代码中我们可以看到,线程执行任务之前会上锁,任务完成后会释放锁。

    线程池执行中断方法时,会先获取该线程的锁,如果可以获取到,说明此时线程处于空闲状态,否则线程正在执行任务,禁止被中断。

  2. 为什么通过继承AQS来实现独占锁功能?

    可以看到tryAcquire方法,它是不允许重入的,而ReentrantLock是允许重入的:

    1. lock方法一旦获取了独占锁,表示当前线程正在执行任务中;
    2. 如果正在执行任务,则不应该中断线程;
    3. 如果该线程现在不是独占锁的状态,也就是空闲的状态,说明它没有在处理任务,这时可以对该线程进行中断;
    4. 线程池在执行shutdown方法或tryTerminate方法时会调用interruptIdleWorkers方法来中断空闲的线程,interruptIdleWorkers方法会使用tryLock方法来判断线程池中的线程是否是空闲状态;
    5. 之所以设置为不可重入,是因为我们不希望任务在调用像setCorePoolSize这样的线程池控制方法时重新获取锁。如果使用ReentrantLock,它是可重入的,这样如果在任务中调用了如setCorePoolSize这类线程池控制的方法,会中断正在运行的线程

    所以,Worker继承自AQS,用于判断线程是否空闲以及是否可以被中断。

runWorker方法

从前面我们知道线程池中的所有线程都被封装成了Worker实例, 在Worker类中的run方法调用了runWorker方法来执行任务,runWorker方法的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
// 获取第一个任务
Runnable task = w.firstTask;
w.firstTask = null;
// 允许中断
w.unlock(); // allow interrupts
// 是否因为异常退出循环
boolean completedAbruptly = true;
try {
// 如果task为空,则通过getTask来获取任务
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}

getTask方法

观察上面runWorker方法的源代码,其中一个非常重要的环节就是调用getTask方法,获取可执行任务。getTask源代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
private Runnable getTask() {
// timeOut变量的值表示上次从阻塞队列中取任务时是否超时
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
/*
* 如果线程池状态rs >= SHUTDOWN,也就是非RUNNING状态,再进行以下判断:
* 1. rs >= STOP,线程池是否正在stop;
* 2. 阻塞队列是否为空。
* 如果以上条件满足,则将workerCount减1并返回null。
* 因为如果当前线程池状态的值是SHUTDOWN或以上时,不允许再向阻塞队列中添加任务。
*/
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
// timed变量用于判断是否需要进行超时控制。
// allowCoreThreadTimeOut默认是false,也就是核心线程不允许进行超时;
// wc > corePoolSize,表示当前线程池中的线程数量大于核心线程数量;
// 对于超过核心线程数量的这些线程,需要进行超时控制
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

/*
* wc > maximumPoolSize的情况是因为可能在此方法执行阶段同时执行了setMaximumPoolSize方法;
* timed && timedOut 如果为true,表示当前操作需要进行超时控制,并且上次从阻塞队列中获取任务发生了超时
* 接下来判断,如果有效线程数量大于1,或者阻塞队列是空的,那么尝试将workerCount减1;
* 如果减1失败,则返回重试。
* 如果wc == 1时,也就说明当前线程是线程池中唯一的一个线程了。
*/
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
/*
* 根据timed来判断,如果为true,则通过阻塞队列的poll方法进行超时控制,如果在keepAliveTime时间内没有获取到任务,则返回null;
* 否则通过take方法,如果这时队列为空,则take方法会阻塞直到队列不为空。
*
*/
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
// 如果 r == null,说明已经超时,timedOut设置为true
timedOut = true;
} catch (InterruptedException retry) {
// 如果获取任务时当前线程发生了中断,则设置timedOut为false并返回循环重试
timedOut = false;
}
}
}

getTask方法功能非常简单,就是从任务队列里面获取任务。但是如果任务队列为空怎么办?阅读上述源代码后半部分的try…catch…,可以看到这时候keepAliveTime就派上用场了。

在研究线程池的时候,我们心里可能有个疑惑:我们知道线程池中的线程分为核心线程和非核心线程,核心线程是不会被销毁的,而非核心线程在空闲状态下超过一定时间会被销毁。线程池是如何实现这一功能的呢?getTask方法的源码给了我们答案。

  • 变量boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;表示是否对当前线程进行超时控制
  • timed==true,获取任务时候执行 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS),队列为空,超时后返回null,当runWork方法中获取null任务时,运行结束,线程也就被销毁了
  • timed==false,获取任务时候执行workQueue.take(),队列为空时,会一直阻塞,因此getTask方法不会运行结束,runWork方法也就不会结束,线程也就不会被销毁。

默认allowCoreThreadTimeOut 参数为false,这样当运行任务数小于核心线程数时,timed变量始终为false,就不会再销毁线程。

销毁线程

什么时候会销毁?当然是runWorker方法执行完之后,也就是Worker中的run方法执行完,由JVM自动回收。

getTask方法返回null时,在runWorker方法中会跳出while循环,然后会执行processWorkerExit方法。processWorkerExit方法源代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 如果completedAbruptly值为true,则说明线程执行时出现了异常,需要将workerCount减1;
// 如果线程执行时没有出现异常,说明在getTask()方法中已经已经对workerCount进行了减1操作,这里就不必再减了。
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//统计完成的任务数
completedTaskCount += w.completedTasks;
// 从workers中移除,也就表示着从线程池中移除了一个工作线程
workers.remove(w);
} finally {
mainLock.unlock();
}
// 根据线程池状态进行判断是否结束线程池
tryTerminate();
int c = ctl.get();
/*
* 当线程池是RUNNING或SHUTDOWN状态时,如果worker是异常结束,那么会直接addWorker;
* 如果allowCoreThreadTimeOut=true,并且等待队列有任务,至少保留一个worker;
* 如果allowCoreThreadTimeOut=false,workerCount不少于corePoolSize。
*/
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}

线程生命周期

以上就是整个工作线程的生命周期,从execute方法开始,Worker使用ThreadFactory创建新的工作线程,runWorker通过getTask获取任务,然后执行任务,如果getTask返回null,进入processWorkerExit方法,整个线程结束,如图所示:

关闭线程池

前面我们介绍了如何创建线程池,以及线程池底层运行原理,那么如何关闭线程池呢?

tryTerminate方法

在介绍销毁线程的processWorkerExit方法时,其源代码中间部分有一行代码tryTerminate(),即尝试关闭线程池。 tryTerminate方法根据线程池状态进行判断是否结束线程池,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
final void tryTerminate() {
for (;;) {
int c = ctl.get();
/*
* 当前线程池的状态为以下几种情况时,直接返回:
* 1. RUNNING,因为还在运行中,不能停止;
* 2. TIDYING或TERMINATED,因为线程池中已经没有正在运行的线程了;
* 3. SHUTDOWN并且等待队列非空,这时要执行完workQueue中的task;
*/
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
// 如果线程数量不为0,则中断一个空闲的工作线程,并返回
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 这里尝试设置状态为TIDYING,如果设置成功,则调用terminated方法
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
// terminated方法默认什么都不做,留给子类实现
terminated();
} finally {
// 设置状态为TERMINATED
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}

shutdown方法

shutdown方法要将线程池切换到SHUTDOWN状态,并调用interruptIdleWorkers方法请求中断所有空闲的worker,最后调用tryTerminate尝试结束线程池。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 安全策略判断
checkShutdownAccess();
// 切换状态为SHUTDOWN
advanceRunState(SHUTDOWN);
// 中断空闲线程
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
// 尝试结束线程池
tryTerminate();
}

其中核心一句代码就是中断空闲线程:interruptIdleWorkers(),下面分析下这个方法。

interruptIdleWorkers方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}

interruptIdleWorkers遍历workers中所有的工作线程,若线程没有被中断tryLock成功,就中断该线程。 这里有两个知识点:

  1. 和前文我们提到的一个问题“为什么getTask方法中执行任务前后要加锁和释放锁”呼应上了

    我们可以看到在for循环的第一个if条件中一项就是w.tryLock()即获取当前worker的锁,如果当前worker正在执行任务,那么getTask方法中就会获取锁,这里的w.tryLock()就无法获取锁,进而无法中断该线程。

    这样就能达到我们的目的:正在执行任务的线程禁止被中断。

  2. 为什么需要持有mainLock?

    因为workers是HashSet类型的,不能保证线程安全。

shutdownNow方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
// 中断所有工作线程,无论是否空闲
interruptWorkers();
// 取出队列中没有被执行的任务
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}

shutdownNow方法与shutdown方法类似,不同的地方在于:

  1. 设置状态为STOP;
  2. 中断所有工作线程,无论是否是空闲的;
  3. 取出阻塞队列中没有被执行的任务并返回。

shutdownNow方法执行完之后调用tryTerminate方法,该方法在上文已经分析过了,目的就是使线程池的状态设置为TERMINATED。

总结

本文主要介绍了一下知识点:

  1. 如何创建线程池?

    有两种方法:new和Executors工厂类。尽量不要使用Executors去创建线程池,因为其本质也是调用new ThreadPoolExecutor(),但是由于参数固定,存在很多缺陷。

    • newFixedThreadPool和newSingleThreadExecutor:  主要问题是堆积的请求处理队列可能会耗费非常大的内存,甚至OOM。

    • newCachedThreadPool和newScheduledThreadPool:  主要问题是线程数最大数是Integer.MAX_VALUE,可能会创建数量非常多的线程,甚至OOM。

  2. 线程池中线程的生命周期

    threadpool-lifecycle.png

    线程池中线程的生命周期如上图所示,这里面有很多知识点。比如核心线程和非核心线程的销毁,getTask方法的细节,runWorker方法的细节,为什么Worker要加锁之类的。

  3. 线程池关闭

    怎么关闭线程池?shutdown和shutdownNow有什么区别?

参考资料

  1. http://www.ideabuffer.cn/2017/04/04/
  2. https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html
  3. https://pdai.tech/md/java/thread/java-thread-x-juc-executor-ThreadPoolExecutor.html
打赏
  • 版权声明: 本博客所有文章除特别声明外,著作权归作者所有。转载请注明出处!
  • Copyrights © 2021-2022 Yin Peng
  • 引擎: Hexo   |  主题:修改自 Ayer
  • 访问人数: | 浏览次数:

请我喝杯咖啡吧~

支付宝
微信