Java并发之线程池

线程池介绍

线程池为线程生命周期的开销和资源不足问题提供了解决方案。通过对多个任务重用线程,线程创建的开销被分摊到了多个任务上。

使用线程池的好处

  • 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  • 提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行。
  • 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

类图

Executor 接口

只有一个方法void execute(Runnable command),用来提交一个任务,根据不同的Executor实现,可能是创建一个新工作线程或者是复用工作线程运行
根据线程池的容量或阻塞队列的容量决定是否放入阻塞队列中,或者拒绝接受传入的任务。

ExecutorService接口

继承自Executor,并提供了管理终止的方法shutdown()(不再接受新任务,等待任务结束)shutdownNow()(除此之外还尝试终止运行中的任务)
Future submit(Runnable command) 提供一个能查询结果的接口

ScheduledExecutorService接口

ScheduledExecutorService扩展ExecutorService接口并增加了schedule方法。调用schedule方法可以在指定的延时后执行一个Runnable或者Callable任务。ScheduledExecutorService接口还定义了按照指定时间间隔定期执行任务的scheduleAtFixedRate()方法和scheduleWithFixedDelay()方法。

ThreadPoolExecutor分析

继承自AbstractExecutorService,也是实现了ExecutorService接口。
重要字段:

1
2
3
4
5
6
7
8
9
10
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;

ctl是对线程池的运行状态和线程池中有效线程的数量进行控制的一个字段, 它包含两部分的信息: 线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount),这里可以看到,使用了Integer类型来保存,高3位保存runState,低29位保存workerCount。COUNT_BITS 就是29,CAPACITY就是1左移29位减1(29个1),这个常量表示workerCount的上限值,大约是5亿。
为什么要一个Integer表示两个值:在多线程的环境下,运行状态和有效线程数量往往需要保证统一,不能出现一个改而另一个没有改的情况,如果将他们放在同一个 AtomicInteger中,利用 AtomicInteger 的原子操作,就可以保证这两个值始终是统一的。Doug Lea大神牛逼!

1
2
3
4
5
RUNNING :能接受新提交的任务,并且也能处理阻塞队列中的任务;
SHUTDOWN:关闭状态,不再接受新提交的任务,但却可以继续处理阻塞队列中已保存的任务。在线程池处于 RUNNING 状态时,调用 shutdown()方法会使线程池进入到该状态。(finalize() 方法在执行过程中也会调用shutdown()方法进入该状态);
STOP:不能接受新任务,也不处理队列中的任务,会中断正在处理任务的线程。在线程池处于 RUNNING 或 SHUTDOWN 状态时,调用 shutdownNow() 方法会使线程池进入到该状态;
TIDYING:如果所有的任务都已终止了,workerCount (有效线程数) 为0,线程池进入该状态后会调用 terminated() 方法进入TERMINATED 状态。
TERMINATED:在terminated() 方法执行完后进入该状态,默认terminated()方法中什么也没有做。
ctl计算方法
1
2
3
4
private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
private static boolean isRunning(int c) { return c < SHUTDOWN; }

构造方法

关键参数:

  1. corePoolSize:核心线程数量

  2. maximumPoolSize:最大线程数量

  3. workQueue:等待队列,当任务提交时,如果线程池中的线程数量大于等于corePoolSize的时候,把该任务封装成一个Worker对象放入等待队列
    当有新任务用execute()提交时会执行以下判断:

    1. 运行线程少于corePoolSize则创建新的线程来处理任务,即使线程池中其他线程是空闲的
    2. 当线程池中的线程数量大于等于corePoolSize且小于maximumPoolSize,则当workQueue满的时候才创建新的线程去处理任务
    3. 若设置的corePoolSize和maximumPoolSize相同,则创建的线程池大小固定,workQueue未满时直接放到workQueue,等待有空闲的线程去取任务
    4. 当运行线程数大于等于maximumPoolSize时,若workQueue已满,则执行handler来拒绝任务
      有如下的阻塞队列:
    5. SynchronousQueue(CachedThreadPool中使用)必须匹配生产者和消费者才会结束阻塞过程,否则一直阻塞到其他的线程执行take/offer,超时返回false。
    6. LinkedBlockingQueue 无界队列,链表实现。如果使用这种方式,那么线程池中能够创建的最大线程数就是corePoolSize,而maximumPoolSize就不会起作用了。当线程池中所有的核心线程都是RUNNING状态时,这时一个新的任务提交就会放入等待队列中。
    7. ArrayBlockingQueue 有界队列。使用该方式可以将线程池的最大线程数量限制为maximumPoolSize,这样能够降低资源的消耗,但同时这种方式也使得线程池对线程的调度变得更困难,因为线程池和队列的容量都是有限的值,所以要想使线程池处理任务的吞吐率达到一个相对合理的范围,又想使线程调度相对简单,并且还要尽可能的降低线程池对资源的消耗,就需要合理的设置这两个数量。
  4. keepAliveTime:线程池维护线程所允许的空闲时间。
    线程池维护线程所允许的空闲时间。当线程池中的线程数量大于corePoolSize的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了keepAliveTime;

  5. threadFactory:用于创建自定义线程,指定优先级、名称。

  6. handler:它是RejectedExecutionHandler类型的变量,表示线程池的饱和策略。如果阻塞队列满了并且没有空闲的线程,这时如果继续提交任务,就需要采取一种策略处理该任务。

    1. AbortPolicy:直接抛出异常,这是默认策略
    2. CallerRunsPolicy:用调用者所在的线程来执行任务;
    3. DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
    4. DiscardPolicy:直接丢弃任务;

execute方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0) // 防止唯一的线程挂掉,无法从工作队列取数据
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}

大概进行过程如下,addWorker中也有判断线程数的逻辑,加上这里双重校验线程池状态,所以有点混乱:

  1. 如果workerCount < corePoolSize,则创建并启动一个线程(addWorker)来执行新提交的任务;
  2. 如果workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中;
  3. 如果workerCount >= corePoolSize && workerCount < maximumPoolSize(addWorker内检查)且线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务;
  4. 如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。
    即是,尝试下面的操作,满足则提交运行,新建线程或者是放入队列中:
  5. 核心线程数是否已满?
  6. 阻塞队列是否已满?
  7. 最大线程数是否已满?
  8. 都不满足则拒绝任务

addWorker方法

addWorker方法的主要工作是在线程池中创建一个新的线程并执行,firstTask参数 用于指定新增的线程执行的第一个任务,core参数为true表示在新增线程时会判断当前活动线程数是否少于corePoolSize,false表示新增线程前需要判断当前活动线程数是否少于maximumPoolSize。
主要思路:

  1. 第一层for,检查运行状态。获取当前线程池的运行状态,如果大于等于SHUTDOWN,则表示不再接收新的任务。并且以下条件有一个不满足则返回false:
    1. 线程池的状态等于SHUTDOWN(则可继续执行阻塞队列中的任务)
    2. firstTask为空(不能接受新的任务)
    3. 阻塞队列不为空(队列中没有任务,不需要再添加线程)
  2. 第二层for,尝试增加workCount,获取当前的线程数,若大于最大线程数(maximumPoolSize)不能再创建返回false。
  3. 否则尝试增加workCount(AtomicInteger,底层是CAS),成功跳出for,进入创建进程的代码块
  4. 失败则重新获取线程池的状态,于开始记录下的值对比,不等说明运行状态被改变,继续第一层for。相等时继续第二层for
  5. 此时已经完成了workCount的增加,开始执行Worker的创建代码。new Worker(firstTask)
  6. 获取一把可重入锁(mainLock),再次判断线程池的状态(是RUNNINGSHUTDOWNfirstTask为空),不符合则跳出try块,执行清理刚才的线程工作(addWorkerFaild(w))。返回false
  7. 否则添加刚才创建的worker到workers(HashSet)里,并开启调用worker里的thread成员start,开启线程执行,更新largestPoolSize(记录出现的最大线程数量),返回true
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
// largestPoolSize记录着线程池中出现过的最大线程数量
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}

Worker类

线程池中的每一个线程被封装成一个Worker对象,ThreadPool维护的其实就是一组Worker对象

1
2
3
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable

继承了AQS、实现了Runnable接口,并有firstTask(记录保存传入的任务),thread(在调用构造函数时用ThreadFactory创建的,保存处理任务的线程)。构造函数中执行了this.thread = getThreadFactory().newThread(this);新建一个线程,newThread方法传入的参数是this,因为Worker本身继承了Runnable接口,也就是一个线程,所以一个Worker对象在启动的时候会调用Worker类中的run方法。
Worker继承了AQS,使用AQS来实现独占锁的功能。不使用ReentrantLock(可重入):

  1. lock方法一旦获取了独占锁,表示当前线程正在执行任务中
  2. 如果正在执行任务,则不应该中断线程
  3. 如果该线程现在不是独占锁的状态,也就是空闲的状态,说明它没有在处理任务,这时可以对该线程进行中断
  4. 线程池在执行shutdown方法或tryTerminate方法时会调用interruptIdleWorkers方法来中断空闲的线程,interruptIdleWorkers方法会使用tryLock方法来判断线程池中的线程是否是空闲状态;
  5. 之所以设置为不可重入,是因为我们不希望任务在调用像setCorePoolSize这样的线程池控制方法时重新获取锁。如果使用ReentrantLock,它是可重入的,这样如果在任务中调用了如setCorePoolSize这类线程池控制的方法,会中断正在运行的线程。

Worker.runWorker方法

run方法中调用了该方法来执行任务。主要工作是:

  1. while循环不断地通过getTask()方法获取任务;
  2. getTask()方法从阻塞队列中取任务;
  3. 如果线程池正在停止,那么要保证当前线程是中断状态,否则要保证当前线程不是中断状态;
  4. 调用task.run()执行任务;
  5. 如果task为null则跳出循环,执行processWorkerExit()方法;
  6. runWorker方法执行完毕,也代表着Worker中的run方法执行完毕,销毁线程。
    这里的beforeExecute方法和afterExecute方法在ThreadPoolExecutor类中是空的,留给子类来实现。

getTask方法

getTask方法用来从阻塞队列中取任务
这里重要的地方是第二个if判断,目的是控制线程池的有效线程数量。由上文中的分析可以知道,在执行execute方法时,如果当前线程池的线程数量超过了corePoolSize且小于maximumPoolSize,并且workQueue已满时,则可以增加工作线程,但这时如果超时没有获取到任务,也就是timedOut为true的情况,说明workQueue已经为空了,也就说明了当前线程池中不需要那么多线程来执行任务了,可以把多于corePoolSize数量的线程销毁掉,保持线程数量在corePoolSize即可。
什么时候会销毁?当然是runWorker方法执行完之后,也就是Worker中的run方法执行完,由JVM自动回收。
getTask方法返回null时,在runWorker方法中会跳出while循环,然后会执行processWorkerExit方法。

processWorkerExit方法

至此,processWorkerExit执行完之后,工作线程被销毁,以上就是整个工作线程的生命周期,从execute方法开始,Worker使用ThreadFactory创建新的工作线程,runWorker通过getTask获取任务,然后执行任务,如果getTask返回null,进入processWorkerExit方法,整个线程结束,如图所示:

tryTerminate方法

tryTerminate方法根据线程池状态进行判断是否结束线程池。

  1. 当前线程池的状态为以下几种情况时,直接返回,不能结束线程池:
    1. RUNNING,因为还在运行中,不能停止;
    2. TIDYING或TERMINATED,因为线程池中已经没有正在运行的线程了;
    3. SHUTDOWN并且等待队列非空,这时要执行完workQueue中的task;
  2. 如果线程数量不为0,则中断一个空闲的工作线程,并返回
  3. 获取this.mainLock锁,尝试设置状态TIDYING( 线程数已经为0,调用完terminated,即改变为TERMINATED),调用terminated()后设置状态为TERMINATED

shutdown方法

shutdown方法要将线程池切换到SHUTDOWN状态,并调用interruptIdleWorkers方法请求中断所有空闲的worker,最后调用tryTerminate尝试结束线程池。

interruptIdleWorkers方法

interruptIdleWorkers遍历workers中所有的工作线程,若线程没有被中断tryLock成功,就中断该线程。

shutdownNow方法

shutdownNow方法执行完之后调用tryTerminate方法,该方法在上文已经分析过了,目的就是使线程池的状态设置为TERMINATED。
shutdownNow方法与shutdown方法类似,不同的地方在于:

  1. 设置状态为STOP;
  2. 中断所有工作线程,无论是否是空闲的;
  3. 取出阻塞队列中没有被执行的任务并返回。
    shutdownNow方法执行完之后调用tryTerminate方法,该方法在上文已经分析过了,目的就是使线程池的状态设置为TERMINATED。

问题

为什么需要持有mainLock?因为workers是HashSet类型的,不能保证线程安全。

在runWorker方法中,执行任务时对Worker对象w进行了lock操作,为什么要在执行任务的时候对每个工作线程都加锁呢?
下面仔细分析一下:

  • 在getTask方法中,如果这时线程池的状态是SHUTDOWN并且workQueue为空,那么就应该返回null来结束这个工作线程,而使线程池进入SHUTDOWN状态需要调用shutdown方法;
  • shutdown方法会调用interruptIdleWorkers来中断空闲的线程,interruptIdleWorkers持有mainLock,会遍历workers来逐个判断工作线程是否空闲。但getTask方法中没有mainLock;
  • 在getTask中,如果判断当前线程池状态是RUNNING,并且阻塞队列为空,那么会调用workQueue.take()进行阻塞;
  • 如果在判断当前线程池状态是RUNNING后,这时调用了shutdown方法把状态改为了SHUTDOWN,这时如果不进行中断,那么当前的工作线程在调用了workQueue.take()后会一直阻塞而不会被销毁,因为在SHUTDOWN状态下不允许再有新的任务添加到workQueue中,这样一来线程池永远都关闭不了了;
  • 由上可知,shutdown方法与getTask方法(从队列中获取任务时)存在竞态条件;
  • 解决这一问题就需要用到线程的中断,也就是为什么要用interruptIdleWorkers方法。在调用workQueue.take()时,如果发现当前线程在执行之前或者执行期间是中断状态,则会抛出InterruptedException,解除阻塞的状态;
  • 但是要中断工作线程,还要判断工作线程是否是空闲的,如果工作线程正在处理任务,就不应该发生中断
  • 所以Worker继承自AQS,在工作线程处理任务时会进行lock,interruptIdleWorkers在进行中断时会使用tryLock来判断该工作线程是否正在处理任务,如果tryLock返回true,说明该工作线程当前未执行任务,这时才可以被中断。

总结

  1. 分析了线程的创建,任务的提交,状态的转换以及线程池的关闭;
  2. 这里通过execute方法来展开线程池的工作流程,execute方法通过corePoolSize,maximumPoolSize以及阻塞队列的大小来判断决定传入的任务应该被立即执行,还是应该添加到阻塞队列中,还是应该拒绝任务。
  3. 介绍了线程池关闭时的过程,也分析了shutdown方法与getTask方法存在竞态条件;
  4. 在获取任务时,要通过线程池的状态来判断应该结束工作线程还是阻塞线程等待新的任务,也解释了为什么关闭线程池时要中断工作线程以及为什么每一个worker都需要lock。

参考

https://juejin.im/entry/58fada5d570c350058d3aaad

Author: whllhw
Link: https://whllhw.ml/posts/2019/05/09/Java并发之线程池/
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.