最新要闻
- 全球最资讯丨A卡有能力跟RTX 4090正面刚 AMD:不做是因为太贵、功耗高
- 微头条丨深山红烛 照亮希望
- 每日快报!安全性不行 Win7系统真的别用了 又一个游戏《堡垒之夜》放弃支持
- 每日快播:童年回来了!《灌篮高手》发布流川枫角色海报
- 今日热闻!光速破发?iPhone 14黄色版还未开售便降价600元
- 华硕发布ROG Strix Impact III鼠标:双手通用、可更换微动插槽
- 真香!重庆购买HUAWEI问界M7直补3万:起售仅25.98万
- 当前简讯:滴滴快车预付车费什么时候退款_滴滴快车预付车费什么意思
- 今日播报!客服回应疯狂小杨哥带货翻车:有巡查、违规会处罚
- 怕了吗?长城公布“1000万悬赏计划”:严厉打击网络水军
- 天天通讯!伊朗外长:伊沙恢复外交关系将为两国和地区发展注入巨大动能
- 世界快讯:日系车掀起买一送一热潮:买皓影插混送飞度、买日产楼兰送轩逸
- 天天热资讯!50万以内最舒适二排 理想L7正式交付:31.98万起售
- 环球新资讯:10年分红1000多亿!董明珠鼓励员工“砸锅卖铁”买格力股票
- 资讯推荐:全球最强!传音发布260W有线、110无线快充:8分钟充满
- 世界观热点:美国科学家改变人类科技、终极能源即将实现?真相来了!
手机
iphone11大小尺寸是多少?苹果iPhone11和iPhone13的区别是什么?
警方通报辅警执法直播中被撞飞:犯罪嫌疑人已投案
- iphone11大小尺寸是多少?苹果iPhone11和iPhone13的区别是什么?
- 警方通报辅警执法直播中被撞飞:犯罪嫌疑人已投案
- 男子被关545天申国赔:获赔18万多 驳回精神抚慰金
- 3天内26名本土感染者,辽宁确诊人数已超安徽
- 广西柳州一男子因纠纷杀害三人后自首
- 洱海坠机4名机组人员被批准为烈士 数千干部群众悼念
家电
有赞一面:还有任务没执行,线程池被关闭怎么办?
说在前面
在40岁老架构师 尼恩的读者交流群(50+)中,最近有小伙伴拿到了一线互联网企业如极兔、有赞、希音的面试资格,遇到一几个很重要的面试题:
还有线程池正在执行的任务和线程,如果线程池shutdown怎么怎么办
(相关资料图)
与之类似的、其他小伙伴遇到过的问题还有:
如果还有任务没执行,线程池被关闭了,怎么办?
这里尼恩给大家做一下系统化、体系化的线程池梳理,使得大家可以充分展示一下大家雄厚的 “技术肌肉”,让面试官爱到 “不能自已、口水直流”。
也一并把这个题目以及参考答案,收入咱们的 《尼恩Java面试宝典》V60版本,供后面的小伙伴参考,提升大家的 3高 架构、设计、开发水平。
注:本文以 PDF 持续更新,最新尼恩 架构笔记、面试题 的PDF文件,请从这里获取:码云
一:首先回顾线程池线程池的5种运行状态
ThreadPoolExecutor 使用 runState (运行状态) 变量,管理线程池的生命周期,
runState 一共有以下5种取值:
(1)RUNNING:接收新的任务,并对任务队列里的任务进行处理;
(2)SHUTDOWN:不再接收新的任务,但是会对任务队列中的任务进行处理;
(3)STOP:不接收新任务,也不再对任务队列中的任务进行处理,并中断正在处理的任务;
(4)TIDYING:所有任务都已终止,线程数为0,在转向TIDYING状态的过程中,线程会执行terminated()钩子方法,钩子方法是指在本类中是空方法,而在子类中进行具体实现的方法;
(5)TERMINATED:terminated()方法执行结束后会进入这一状态,表示线程池已关闭。
与线程池关闭有关的状态,不是1个,而是有4个:
状态(2)SHUTDOWN:不再接收新的任务,但是会对任务队列中的任务进行处理;
状态(3)STOP:不接收新任务,也不再对任务队列中的任务进行处理,并中断正在处理的任务;
状态(4)TIDYING:所有任务都已终止,线程数为0,在转向TIDYING状态的过程中,线程会执行terminated()钩子方法,钩子方法是指在本类中是空方法,而在子类中进行具体实现的方法;
状态(5)TERMINATED:terminated()方法执行结束后会进入这一状态,表示线程池已彻底关闭。
从这么多的状态可以知道,线程池的关闭,不是一个简单的问题了。
二:线程池停止相关的五个方法
线程池停止相关的五个方法:
(1)shutdown方法:柔和关闭线程池;
(2)shutdownNow方法:暴力关闭线程池,无论线程池中是否有剩余任务,立刻彻底停止线程池
(3)isShutdown方法:查看线程池是否已进入停止状态了
(4)isTerminated方法:查看线程池是否已经彻底停止了
(5)awaitTermination方法:判断在等待的时间内,线程池是否彻底停止
其中终止线程池主要有2个:
(1)shutdown方法:柔和关闭线程池;
shutdown()后线程池将变成shutdown状态,此时不接收新任务,但会处理完正在运行的 和 在 workQueue 阻塞队列中等待处理的任务。
(2)shutdownNow方法:暴力关闭线程池
无论线程池中是否有剩余任务,shutdownNow()立刻彻底停止线程池。shutdownNow()后线程池将变成stop状态,此时不接收新任务,不再处理在阻塞队列中等待的任务,还会尝试中断正在处理中的工作线程。
其中对线程池关闭状态进行检查的方法,主要有3个:
(3)isShutdown方法:查看线程池是否已进入停止状态了
(4)isTerminated方法:查看线程池是否已经彻底停止了
(5)awaitTermination方法:判断在等待的时间内,线程池是否彻底停止
(1)shutdown柔和关闭线程池;
shutdown柔和关闭线程池,有两个要点:
(1)shutdown方法是关闭线程池;
(2)但是,shutdown只是初始化整个关闭过程, 执行完这个方法后,线程池不一定会立即停止;
所以,在我们调用了shutdown方法后,线程池就知道了 停止线程池的意图;而并不是我们调用shutdown方法后,整个线程池就能停的。比如,线程池在执行到一半时,线程中有正在执行的任务,队列中也可能有等待被执行的任务,线程池需要等这些任务执行完了,才能真正停止。
当然,在我们调用了shutdown方法后,如果还有新的任务过来,线程池就会拒绝。
演示案例,在尼恩的《Java 高并发核心编程 卷2 加强版》随时源码中,有大量的 shutdown 使用案例。
在超级牛逼的rocketmq 源码中,也是shutdown 关闭线程池,具体如下:
说明:
(1)还是强调一下:我们执行了shutdown方法,isShutdown方法就会返回true;isShutdown方法返回true,仅仅代表线程池处于停止状态了,不代表线程池彻底停止了(因为,线程池进入停止状态后,还要等待【正在执行的任务以及队列中等待的任务】都执行完后,才能彻底终止);
(2)那么怎么看,线程池是否彻底停止了呐? 稍微晚点,要讲isTerminated()方法,可以实现这个需求;
(2)shutdownNow 粗暴关闭线程
shutdownNow方法:无论线程池中是否有剩余任务,立刻彻底停止线程池;
如何一个粗暴法呢?
(1) 正在执行任务的线程会被中断;
(2)队列中正在排队的任务,会返回;
来看一个例子:向3个线程的固定大小线程池, 提交10个任务,每个任务 500ms!
执行结果如下:
另外还有 7个 任务,没有来得及执行。
如果数据和任务都不重要,可以 shutdownNow 粗暴关闭线程,否则,这就太野蛮了。
(3)isShutdown方法:查看线程池是否已进入停止状态了;
当调用shutdown方法关闭线程后,线程不是立即关闭,仅仅是启动了关闭流程,不再接收新的任务;
问题是,如何查看线程池是否已进入停止状态呢? 难道,我们只有通过 向线程池添加任务的方式 才能看到shutdown确确实实被执行了吗?
可以通过 isShutdown方法 查看线程池是否已进入停止状态了。只要开始执行了shutdown方法,isShutdown方法就会返回true;
(4)isTerminated方法:判停, 注意是阻塞判停
threadPool.isTerminated方法:查看线程池是否已经彻底停止了
threadPool.isTerminated() 常用来判断线程池是否结束,线程池pool的状态是否为Terminated,如果是,表示线程池pool彻底终止, threadPool.isTerminated() 返回为TRUE
当需要用到isTerminated()函数判断线程池中的所有线程是否执行完毕时候,不能直接使用该函数,
必须在shutdown()方法关闭线程池之后才能使用,否则isTerminated()永不为TRUE,而且线程将一直阻塞在该判断的地方,导致程序最终崩溃。
(5)awaitTermination 等待停止
awaitTermination方法:判断在等待的时间内,线程池是否彻底停止。awaitTermination第一个参数是long类型的超时时间,第二个参数可以为该时间指定单位。
awaitTermination
的功能如下:
- 阻塞当前线程,等已提交和已执行的任务都执行完,解除阻塞
- 当等待超过设置的时间,检查线程池是否停止,如果停止返回
true
,否则返回false
,并解除阻塞
awaitTermination 一般与shutdown()方法结合使用,下面是一个例子:
执行结果如下:
例子中,线程池的有效执行时间为20S,20S之后不管子任务有没有执行完毕,都要关闭线程池。
注意:
与shutdown()方法结合使用时,尤其要注意的是shutdown()方法必须要在awaitTermination()方法之前调用,该方法才会生效。否则会造成死锁。
关闭线程池的正确姿势
关闭线程池的正确姿势= shutdown方法 +awaitTermination方法 组合关闭。
(1)shutdown方法:柔和的关闭ExecutorService,
当此方法被调用时,pool停止接收新的任务并且等待已经提交的任务(包含提交正在执行和提交未执行)执行完成。当所有提交任务执行完毕,线程池即被关闭。
(2)awaitTermination 方法:
接收人timeout和TimeUnit两个参数,用于设定超时时间及单位。
当等待超过设定时间时,会监测ExecutorService是否已经关闭,若关闭则返回true,否则返回false。
三:线程池关闭的源码分析
接下来,分析一下线程池关闭相关的方法的源码,包括各个方法之间的逻辑关系,调用关系和产生的效果。
再次回顾:线程池的5种运行状态
ThreadPoolExecutor 使用 runState (运行状态) 变量,管理线程池的生命周期,
线程池关闭过程中,会涉及到 频繁的 runState 运行状态转化,
所以,首先需要了解线程池的各种 runState 运行状态及 各种 runState 之间的转化关系,
runState 一共有以下5种取值:
(1)RUNNING:接收新的任务,并对任务队列里的任务进行处理;
(2)SHUTDOWN:不再接收新的任务,但是会对任务队列中的任务进行处理;
(3)STOP:不接收新任务,也不再对任务队列中的任务进行处理,并中断正在处理的任务;
(4)TIDYING:所有任务都已终止,线程数为0,在转向TIDYING状态的过程中,线程会执行terminated()钩子方法,钩子方法是指在本类中是空方法,而在子类中进行具体实现的方法;
(5)TERMINATED:terminated()方法执行结束后会进入这一状态,表示线程池已关闭。
运行状态的转化条件和转化关系如下所示:
shutdown操作之后,经历三个状态:
(1)首先最重要的一点变化就是线程池状态变成了SHUTDOWN。
该状态是开始关闭线程池之后,从RUNNING改变状态经过的第一个状态,
(2)等任务队列和线程数为0之后,进入TIDYING第2个状态,
(3)等内部调用的terminated()方法执行结束后,会进入TERMINATED状态,表示线程池已关闭
shutdownNow操作之后,经历3个状态:
(1)直接进STOP,不管任务队列里边是否还有任务要处理,尝试停止所有活动的正在执行的任务,停止等待任务的处理,并排空任务列表
(2)等任务队列和线程数为0之后,进入TIDYING第2个状态,
(3)等内部调用的terminated()方法执行结束后,会进入TERMINATED状态,表示线程池已关闭
源码分析1:shutdown()柔和终止线程池
shutdown()柔和终止线程池的核心流程如下:
step1、抢占线程池的主锁
线程池的主锁是 mainLock ,是可重入锁,
当要操作workers set这个保持线程的HashSet时,需要先获取 mainLock,
另外,当要处理largestPoolSize、completedTaskCount这类统计数据时需要先获取mainLock
step 2、权限校验
java 安全管理器校验 , 判断调用者是否有权限shutdown线程池
step 3、更新线程池状态为shutdown
使用CAS操作将线程池状态设置为shutdown,
shutdown之后将不再接收新任务
step 4、中断所有空闲线程
调用 interruptIdleWorkers() 打断所有的空闲工作线程,即workerQueue.take()阻塞的线程
step 5、onShutdown(),
调用子类回调方法,基类默认为空方法
子类回调方法 可以在shutdown()时做一些处理
子类 ScheduledThreadPoolExecutor中实现了这个方法,
step 6、解锁
step 7、尝试终止线程池 tryTerminate()
public void shutdown() { final ReentrantLock mainLock = this.mainLock; // step1、抢占线程池的主锁 mainLock.lock(); try { // step 2、权限校验 java 安全管理器校验 checkShutdownAccess(); //step 3、更新线程池状态为shutdown advanceRunState(SHUTDOWN); // step 4、 打断所有的空闲工作线程,即workerQueue.take()阻塞的线程 interruptIdleWorkers(); // 调用子类回调方法,基类默认为空方法 onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { //**step 6、解锁** mainLock.unlock(); } //step 7、尝试终止线程池 tryTerminate() tryTerminate();}
最重要的3个步骤是:
step 3 更新线程池状态为shutdown
step 4 中断所有空闲线程、
step 7 tryTerminated()尝试终止线程池
接下来,介绍step 4 、step 7的核心源码
step4: 中断所有空闲线程 interruptIdleWorkers()
step4 是 调用 interruptIdleWorkers() 中断所有空闲线程 完成的。有两个问题:
(1)什么是空闲线程?
(2)interruptIdleWorkers() 是怎么中断空闲线程的?
/** * 中断唤醒后,可以判断线程池状态是否变化来决定是否继续 * * onlyOne如果为true,最多interrupt一个worker * 只有当终止流程已经开始, * 但线程池还有worker线程时,tryTerminate()方法会做调用onlyOne为true的调用 * (终止流程已经开始指的是:shutdown状态 且 workQueue为空,或者 stop状态) * 在这种情况下,最多有一个worker被中断,为了传播shutdown信号,以免所有的线程都在等待 * 为保证线程池最终能终止,这个操作总是中断一个空闲worker * 而shutdown()中断所有空闲worker,来保证空闲线程及时退出 */private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); //上锁 try { for (Worker w : workers) { Thread t = w.thread; if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); //解锁 }}
interruptIdleWorkers() 首先会获取mainLock锁,因为要迭代workers 集合 ,
然后,中断在等待任务的线程(没有上锁的),在中断每个worker前,需要做两个判断:
1、线程是否已经被中断,是就什么都不做
2、worker.tryLock() 是否成功
第二个判断worker.tryLock()比较重要,因为Worker类除了实现了可执行的Runnable,也继承了AQS,
也就说,worker 本身也是一把锁.
尼恩提示,AQS的知识,非常重要,具体请阅读 《Java 高并发核心编程 卷2 加强版》。
该书对 AQS 作为浅显易懂的介绍, 被很多小伙伴称之为最为易懂的版本,pdf 是免费获取的。
worker.tryLock() 为什么要获取worker的锁呢?
Woker类在执行任务的工作线程, 都是上了worker锁的。
在 runWorker()方法中, worker 从pool 中获取task 并执行,但是执行的过程中,涉及到锁:
(1)一个worker 每次通过 getTask() 方法从 pool 获取到task 之后,在执行 task.run() 之前,都需要 worker.lock()上锁,
(2)task 运行结束后 unlock 解锁,
所以说, 只要是 正在执行任务的工作线程, 都是上了worker锁的
参考的源码如下:
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); }}
回顾一下前面 interruptIdleWorkers 的代码,有一个核心要点:
interruptIdleWorkers 中断 work 线程之前, 需要先work.tryLock()获取worker锁,
这就 意味着正在执行task的worker线程,不能被中断。
为啥呢? worker 锁比较特殊: 核心的要点是 worker 锁是不可重入的 , 所以 不管是不是 当前线程,worker.tryLock() 都失败。
怎么证明 worker 锁是不可重入的,可以去看源码: worker 是线程池 ThreadPoolExecutor 的内部类,继承了 AbstractQueuedSynchronizer 抽象队列同步器, 核心的方法如下:
尼恩提示,
这里关键的知识点,还是AQS
所以 AQS 非常重要,具体请阅读 《Java 高并发核心编程 卷2 加强版》。
该书对 AQS 作为浅显易懂的介绍, 被很多小伙伴称之为最为易懂的版本,pdf 是免费获取的。
所以说,shutdown() 只有对能获取到worker锁的空闲线程发送中断信号, 对于忙的worker线程, 要等到拿到锁之后,才能去发中断信号。
由此可以 将worker划分为:
1、闲的worker:没有执行任务的worker,比如正在从workQueue阻塞队列中获取任务的worker,
2、忙的worker:正在task.run()执行任务的worker
线程被中断之后,如何处理
还有一点需要注意: 对于闲着的但是正在被阻塞在getTask()的worker,是可以被中断的,但是在被中断后会抛出InterruptedException,runWorker的while循环被破坏,从而 不再阻塞获取任务
worker 捕获中断异常后,将跳出 while循环,进入 processWorkerExit 方法,
runWorker 的核心代码如下:
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { //...... 省略n行,这里 执行拿到的任务,并处理任务异常 } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { // InterruptedException 中断发生之后, 走到这里 processWorkerExit(w, completedAbruptly); }}
这里特别注意, 一旦出了那个while 循环, 这个thread 的执行,即将 结束了
换句话说,一旦worker 捕获中断异常后,worker 所绑定的thread将跳出 while循环,即将 结束了
具体请参考下面:
虽然worker 绑定的线程,即将 结束了。但是在结束之前,还要执行一下 processWorkerExit方法
processWorkerExit方法解析
来看看 processWorkerExit(Worker w, boolean completedAbruptly) 方法解析
1.参数说明:
Worker w : 工作线程包装器。
boolean completedAbruptly :默认值为true,
只有调用getTask()方法,返回null,线程正常退出,会将completedAbruptly设置为false。
当task.run()任务运行过程中抛出异常,线程异常退出,completedAbruptly还是默认值true。
2.执行过程:
- 统计执行完成的任务个数。
- tryTerminate() 尝试调用terminated()方法。
- RUNNING | SHUTDOWN 状态下,保证工作线程数量 >= corePoolSize,如果不满足,添加新线程。
private void processWorkerExit(Worker w, boolean completedAbruptly) { // 线程异常退出,修改工作线程数量。 if (completedAbruptly) // If abrupt, then workerCount wasn"t adjusted decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 统计执行完成任务个数 completedTaskCount += w.completedTasks; // 移除当前worker workers.remove(w); } finally { mainLock.unlock(); } // 尝试调用terminated() 方法 tryTerminate(); int c = ctl.get(); //如果线程 为 RUNNING | SHUTDOWN 状态下 , 要保证最小工作线程数。 if (runStateLessThan(c, STOP)) { // 线程正常退出,需要退出救急线程 // 线程异常退出,直接添加新线程 if (!completedAbruptly) { // 判断最小线程数量,一般是核心线程数量。 int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) // 最少一个线程 min = 1; if (workerCountOf(c) >= min) // 不需要添加新线程 return; // replacement not needed } // 添加新线程,在RUNNING | SHUTDOWN 状态下,不至于一个线程也没有了,要保证 剩余的任务干完 addWorker(null, false); }}
核心的工作为:
(1) 从pool 的 workers 集合移除当前worker
//Set containing all worker threads in pool. Accessed only when holding mainLock. private final HashSet workers = new HashSet();
(2)尝试调用 pool的 terminated() 方法
这个方法中,首先判断 pool的状态,如果为 RUNNING || (线程池已经被关闭【TIDYING | TERMINATED】) || (SHUTDOWN && 任务队列不为空),直接返回。
这个方法中,然后判断 工作线程数,如果不为0(自己不是最后一个工作线程), 随机打断一个空闲线程,直接返回。
否则,这一个线程修改线程池状态为TIDYING,修改线程状态为TERMINATED,调用terminated()方法,唤醒等待pool终止的线程 ,也就是awaitTermination() 的线程。
尼恩提示:pool的 terminated() 方法,稍微晚点介绍。
(3) 保证最小工作线程数
上面的代码中,使用runStateLessThan(c, STOP) 判断线程的状态 是否比 STOP 小,那么比STOP 小的是谁呢?
(1)RUNNING状态
(2)SHUTDOWN 状态
ThreadPoolExecutor用一个AtomicInteger字段保存了2个状态
- workerCount (有效线程数) (占用29位)
- runState (线程池运行状态) (占用高3位)
//标记线程数和状态的混合值private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));//线程位数private static final int COUNT_BITS = Integer.SIZE - 3;//线程最大个数(低29位)00011111111111111111111111111111private static final int COUNT_MASK = (1 << COUNT_BITS) - 1; //(高3位):11100000000000000000000000000000private static final int RUNNING = -1 << COUNT_BITS;//(高3位):00000000000000000000000000000000private static final int SHUTDOWN = 0 << COUNT_BITS;//(高3位):00100000000000000000000000000000private static final int STOP = 1 << COUNT_BITS;//(高3位):01000000000000000000000000000000private static final int TIDYING = 2 << COUNT_BITS;//(高3位):01100000000000000000000000000000private static final int TERMINATED = 3 << COUNT_BITS; //获取线程池运行状态private static int runStateOf(int c) { return c & ~COUNT_MASK; }//获取线程个数private static int workerCountOf(int c) { return c & COUNT_MASK; }//计算ctl新值private static int ctlOf(int rs, int wc) { return rs | wc; }
从上面的源码可以看出 ,比STOP 小的是RUNNING | SHUTDOWN
processWorkerExit方法需要保证:如果pool在 RUNNING | SHUTDOWN 状态下,不能一个线程也没有了,要保证 workQueue 剩余的任务干完
所以,在RUNNING | SHUTDOWN 状态下, 如果有必要,还要添加新线程,
step7: 尝试终止线程池 tryTerminate()
shutdown()的最后也调用了tryTerminated()方法,下面看看这个方法的逻辑:
/** * 在以下情况将线程池变为TERMINATED终止状态 * shutdown 且 正在运行的worker 和 workQueue队列 都empty * stop 且 没有正在运行的worker * * 这个方法必须在任何可能导致线程池终止的情况下被调用,如: * 减少worker数量 * shutdown时从queue中移除任务 * * 这个方法不是私有的,所以允许子类ScheduledThreadPoolExecutor调用 */final void tryTerminate() { //这个for循环主要是和进入关闭线程池操作的CAS判断结合使用的 for (;;) { int c = ctl.get(); /** * 线程池是否需要终止 * 如果以下3中情况任一为true,return,不进行终止 * 1、还在运行状态 * 2、状态是TIDYING、或 TERMINATED,已经终止过了 * 3、SHUTDOWN 且 workQueue不为空 */ if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return; /** * 只有shutdown状态 且 workQueue为空,或者 stop状态能执行到这一步 * 如果此时线程池还有线程(正在运行任务,正在等待任务) * 中断唤醒一个正在等任务的空闲worker * 唤醒后再次判断线程池状态,会return null,进入processWorkerExit()流程 */ if (workerCountOf(c) != 0) { // Eligible to terminate 资格终止 interruptIdleWorkers(ONLY_ONE); //中断workers集合中的空闲任务,参数为true,只中断一个 return; } /** * 如果状态是SHUTDOWN,workQueue也为空了,正在运行的worker也没有了,开始terminated */ final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //CAS:将线程池的ctl变成TIDYING(所有的任务被终止,workCount为0, // 为此状态时将会调用terminated()方法),期间ctl有变化就会失败,会再次for循环 if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { terminated(); //需子类实现 } finally { ctl.set(ctlOf(TERMINATED, 0)); //将线程池的ctl变成TERMINATED termination.signalAll(); //唤醒调用了 等待线程池终止的线程 awaitTermination() } return; } } finally { mainLock.unlock(); } // else retry on failed CAS // 如果上面的CAS判断false,再次循环 }}
tryTerminate() 执行流程:
1、判断线程池是否需要进入终止流程(只有当shutdown状态+workQueue.isEmpty 或 stop状态,才需要)
2、判断线程池中是否还有线程,有则interruptIdleWorkers(ONLY_ONE)尝试中断一个空闲线程
正是这个逻辑可以再次发出中断信号,中断阻塞在获取任务的线程
3、如果状态是SHUTDOWN,workQueue也为空了,正在运行的worker也没有了,开始terminated
会先上锁,将线程池置为tidying状态,之后调用需子类实现的 terminated(),最后线程池置为terminated状态,并唤醒所有等待线程池终止这个Condition的线程
源码分析2:shutdownNow() 粗暴终止线程池的核心流程
/** * 尝试停止所有活动的正在执行的任务,停止等待任务的处理,并返回正在等待被执行的任务列表 * 这个任务列表是从任务队列中排出(删除)的 * * 这个方法不用等到正在执行的任务结束,要等待线程池终止可使用awaitTermination() * * 除了尽力尝试停止运行中的任务,没有任何保证 * 取消任务是通过Thread.interrupt()实现的,所以任何响应中断失败的任务可能永远不会结束 */public List shutdownNow() { List tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); //上锁 try { //判断调用者是否有权限shutdown线程池 checkShutdownAccess(); //CAS+循环设置线程池状态为stop advanceRunState(STOP); //中断所有线程,包括正在运行任务的 interruptWorkers(); tasks = drainQueue(); //将workQueue中的元素放入一个List并返回 } finally { mainLock.unlock(); //解锁 } //尝试终止线程池 tryTerminate(); return tasks; //返回workQueue中未执行的任务}
shutdownNow() 和 shutdown()的大体流程相似,差别是:
1、将线程池更新为stop状态
2、调用interruptWorkers()中断所有线程,包括正在运行的线程
3、将workQueue中待处理的任务移到一个List中,并在方法最后返回,说明shutdownNow()后不会再处理workQueue中的任务
interruptWorkers()
private void interruptWorkers() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) w.interruptIfStarted(); } finally { mainLock.unlock(); }}
interruptWorkers() 很简单,循环对所有worker调用 interruptIfStarted(),其中会判断worker的AQS state是否大于0,即worker是否已经开始运作,再调用Thread.interrupt()
注意:
对于运行中的线程调用Thread.interrupt()并不能保证线程被终止,为啥呢?
task.run()内部执行的是业务代码,如果业务代码里边捕获了InterruptException,没有上抛,导致这里的结束机制失效。
改怎么办呢?其实也无所谓。
当runWorker 执行下一次或者任务之后,里边会进行 线程池状态的双重检查,如果线程池的状态变了,变为结束,那么 工作线程 也会被 中断了。
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck (线程池状态的双重检查) in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); }}
源码分析3:awaitTermination() 等待线程池终止的核心流程
这个方法,也比较重要,咱们顺便看看源码吧。
awaitTermination() 源码如下
// 参数: timeout:超时时间 unit:timeout超时时间的单位//返回: true:线程池终止 , false:超过timeout指定时间public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (;;) { // 线程池状态如果已经结束,立即返回,无需等待 if (runStateAtLeast(ctl.get(), TERMINATED)) return true; if (nanos <= 0) return false; //阻塞 nanos = termination.awaitNanos(nanos); } } finally { mainLock.unlock(); }}
在发出一个shutdown请求后,在以下3种情况发生之前,awaitTermination()都会被阻塞
1、所有任务完成执行
2、到达超时时间
3、当前线程被中断
这里用到一个 锁条件 termination:
/*** Wait condition to support awaitTermination*/private final Condition termination = mainLock.newCondition();
awaitTermination() 循环的判断线程池是否terminated终止 或 是否已经超过超时时间,然后通过termination这个Condition阻塞等待一段时间
termination 阻塞等待过程中发生以下具体情况会解除阻塞(对上面3种情况的解释):
1、如果发生了 termination.signalAll()(内部实现是 LockSupport.unpark())会唤醒阻塞等待,且由于ThreadPoolExecutor只有在 tryTerminated()尝试终止线程池成功,将线程池更新为terminated状态后才会signalAll(),故awaitTermination()再次判断状态会return true退出
2、如果达到了超时时间 termination.awaitNanos() 也会返回,此时nano==0,再次循环判断return false,等待线程池终止失败
3、如果当前线程被 Thread.interrupt(),termination.awaitNanos()会上抛InterruptException,awaitTermination()继续上抛给调用线程,会以异常的形式解除阻塞
故终止线程池并需要知道其是否终止,可以用如下方式:
executorService.shutdown();try{while(!executorService.awaitTermination(500, TimeUnit.MILLISECONDS)) { LOGGER.debug("Waiting for terminate");}} catch (InterruptedException e) {//中断处理}
参考文献:
清华大学出版社 《尼恩 Java 高并发核心编程 卷2 加强版》
4000页《尼恩Java面试宝典》中 专题29 多线程 面试专题
推荐阅读:
尼恩N篇硬核架构 文章 ,帮你实现 架构自由:
《吃透8图1模板,人人可以做架构》
《10Wqps评论中台,如何架构?B站是这么做的!!!》
《阿里二面:千万级、亿级数据,如何性能优化? 教科书级 答案来了》
《峰值21WQps、亿级DAU,小游戏《羊了个羊》是怎么架构的?》
《100亿级订单怎么调度,来一个大厂的极品方案》
《2个大厂 100亿级 超大流量 红包 架构方案》
....... 更多架构文章,正在添加中
实现你的响应式 自由:
《响应式圣经:10W字,实现Spring响应式编程自由》
这是老版本 《Flux、Mono、Reactor 实战(史上最全)》
实现你的 spring cloud 自由:
《sentinel (史上最全)》
《Nacos (史上最全)》
《SpringCloud+Dubbo3 = 王炸 !》
《Springcloud gateway 底层原理、核心实战 (史上最全)》
《nacos高可用(图解+秒懂+史上最全)》
《分库分表 Sharding-JDBC 底层原理、核心实战(史上最全)》
《一文搞定:SpringBoot、SLF4j、Log4j、Logback、Netty之间混乱关系(史上最全)》
实现你的 linux 自由:
《Linux命令大全:2W多字,一次实现Linux自由》
实现你的 网络 自由:
《TCP协议详解 (史上最全)》
《网络三张表:ARP表, MAC表, 路由表,实现你的网络自由!!》
实现你的 分布式锁 自由:
《Redis分布式锁(图解 - 秒懂 - 史上最全)》
《Zookeeper 分布式锁 - 图解 - 秒懂》
实现你的 王者组件 自由:
《队列之王: Disruptor 原理、架构、源码 一文穿透》
《缓存之王:Caffeine 源码、架构、原理(史上最全,10W字 超级长文)》
《缓存之王:Caffeine 的使用(史上最全)》
《Java Agent 探针、字节码增强 ByteBuddy(史上最全)》
实现你的 面试题 自由:
《阿里一面:你做过哪些代码优化?来一个人人可以用的极品案例》
《网易二面:CPU狂飙900%,该怎么处理?》
《场景题:假设10W人突访,你的系统如何做到不 雪崩?》
《Nginx面试题(史上最全 + 持续更新)》
《K8S面试题(史上最全 + 持续更新)》
《操作系统面试题(史上最全、持续更新)》
《Docker面试题(史上最全 + 持续更新)》
《clickhouse 超底层原理 + 高可用实操 (史上最全)》
《环形队列、 条带环形队列 Striped-RingBuffer (史上最全)》
《单例模式(史上最全)》
《红黑树( 图解 + 秒懂 + 史上最全)》
《分布式事务 (秒懂)》
《Docker原理(图解+秒懂+史上最全)》
《Zookeeper Curator 事件监听 - 10分钟看懂》
《Netty 粘包 拆包 | 史上最全解读》
《Netty 100万级高并发服务器配置》
《Springcloud 高并发 配置 (一文全懂)》
关键词:
有赞一面:还有任务没执行,线程池被关闭怎么办?
全球快看点丨注解与反射
java基础二-面向对象的三大特性
全球最资讯丨A卡有能力跟RTX 4090正面刚 AMD:不做是因为太贵、功耗高
微头条丨深山红烛 照亮希望
环球新消息丨Redis 深度学习
环球快资讯:前端设计模式——策略模式
新消息丨MySQL学习笔记-SQL实践1
每日快报!安全性不行 Win7系统真的别用了 又一个游戏《堡垒之夜》放弃支持
每日快播:童年回来了!《灌篮高手》发布流川枫角色海报
每日报道:使用web client对 vcenter 进行补丁升级
有监督学习——梯度下降
全球百事通!聪明的燕姿
当前视点!Oracle数据库中没有scott账户的方法
今日热闻!光速破发?iPhone 14黄色版还未开售便降价600元
华硕发布ROG Strix Impact III鼠标:双手通用、可更换微动插槽
真香!重庆购买HUAWEI问界M7直补3万:起售仅25.98万
当前简讯:滴滴快车预付车费什么时候退款_滴滴快车预付车费什么意思
世界通讯!网络安全(中职组)-B模块:Web安全应用-2
今日播报!客服回应疯狂小杨哥带货翻车:有巡查、违规会处罚
怕了吗?长城公布“1000万悬赏计划”:严厉打击网络水军
【世界新要闻】四位计数器testbench的设计
世界滚动:chatgpt 集成飞书实践指南
自以为是 与 思考
天天通讯!伊朗外长:伊沙恢复外交关系将为两国和地区发展注入巨大动能
世界快讯:日系车掀起买一送一热潮:买皓影插混送飞度、买日产楼兰送轩逸
天天热资讯!50万以内最舒适二排 理想L7正式交付:31.98万起售
环球新资讯:10年分红1000多亿!董明珠鼓励员工“砸锅卖铁”买格力股票
剪刀石头布的算法
资讯推荐:全球最强!传音发布260W有线、110无线快充:8分钟充满
世界观热点:美国科学家改变人类科技、终极能源即将实现?真相来了!
世界快讯:《原子之心》更新:添加FOV设置、删除种族主义动画
Shell命令-常用操作2
【质因数分解算法详解】C/Java/Go/Python/JS/Dart/Swift/Rust等不同语言实现
今亮点!霸道女总裁加盟《碟中谍8》
天天播报:00后男生每天下班后卖烤肠解压 50元投资日赚200元:厌恶刷视频、打游戏
【时快讯】网络安全(中职组)-B模块:Web安全渗透测试
环球新消息丨K8S 性能优化-K8S Node 参数调优
环球微速讯:比进口价格便宜!海南三亚榴莲今年6月将大规模上市
当废物挺好?委员称年轻人想躺平更多是调侃 奋斗的是大多数
天天亮点!中国足彩网竞彩11日推荐:曼城取胜无悬念
焦点短讯!使用SSM+Shiro+Layui框架,基于RBAC3模型开发的权限管理系统
snort入侵检测基础概述
当前播报:amusements
环球热议:6月上映!《变形金刚7》角色海报发布:擎天柱、猩猩队长亮相
【速看料】降价潮席卷全国 车企尽数参战是为何?乘联会解答:国六B要来了
观热点:01-C语言概述
环球今热点:真正油电同价!比亚迪投放“深水炸弹”:13.4万买宋Pro DM-i超级混动
环球通讯!立春来首场寒潮横扫我国大部:多地将遭遇滑梯式降温 最高降20℃
【新要闻】活久见!女生家中发现神奇圆柱形手机:登QQ、手电筒、拍照 功能多到炸
读Java性能权威指南(第2版)笔记13_堆内存下
最便宜竖折叠继任者!摩托罗拉Razr 2023真机图出炉:首次拼色后壳
当前快报:汽车价格战新进展:南北大众同日入局 丰田买一辆送一辆
世界速递!day05-功能实现04
Vue————Vue v2.7.14 入口文件【二】
【时快讯】《满江红》中国影史票房榜第6:力压《唐人街探案3》 票房突破45.23亿
环球即时:2023开门红!长四丙成功发射“一箭双星”
环球消息!第一批PCIe 5.0 SSD都是残血!14GB/s满血版还早呢
世界快资讯丨有了ChatGPT 动动嘴就能使唤Excel:我的童年梦想实现了
每日热门:8岁男孩单手打破汉诺塔世界纪录:4.305秒搞定4层
当前头条:海绵宝宝卡通图片线条图_海绵宝宝卡通图片
天生要完美电视剧28集完整版_天生要完美电视剧
对C++做爬虫的代码进行简单分析
世界热推荐:2.HelloSpring
孙海洋夫妇餐饮公司被列经营异常:本人回应
今日报丨香港男子深圳上班每天通勤4小时:月薪3万 每天通勤费用80元
【全球独家】63.C++类型转换
世界今亮点!python可变长参数
当前观察:大获成功!《最后生还者》成史上收视率最高的游戏改编剧
爆款椰子鞋停售后:阿迪在中国凉凉了
1.3kg下颜值、性能、屏幕全给你!华硕灵耀14 2023评测:续航惊人
观热点:长城汽车发布Hi4全新新能源技术:4驱享受 2驱能耗
全球关注:杠上比亚迪秦PLUS DM-i 新款日产轩逸上市:9.98万起
8GB、16GB显存的性能差多少?实测多达172%!
明解数据库------数据库存储演变史
AMD最强核显跑分上来了!但是还打不过GTX 1650 Ti
全球最新:买丰田bZ4X电动车 送一辆威驰轿车?4S店回应:活动属实
RTX 30公版显卡突然集体消失!刚刚降价40%
微头条丨公司规定不接董事长电话1次罚10000元 员工:试岗1天就走了
【全球快播报】校友承诺捐赠1100万元却不兑现被告 学校:他具备履约能力
紧跟微信步伐:支付宝掌纹支付设备外观专利获授权
【天天快播报】搅局中端市场!一加Ace2V评测:将16G满血内存进行到底
通讯!破壁机虚标功率后 疯狂小杨哥带货又翻车:面霜因虚假宣传被罚
《王者荣耀》出海“首战告捷”:登顶巴西免费游戏榜
环球报道:记录--vue3+setup+ts 知识总结
【世界速看料】程序员养发神器:拒绝加班熬夜,告别秃头!
【世界聚看点】【希尔排序ShellSort算法详解】Java/Go/Python/JS/C不同语言实现
环球微头条丨【分享贴】项目中为啥总是项目经理一人干着急?
使用PostgreSQL而不是MySQL存储中型数据有什么好处?
3000块多品牌SSD质量大PK:整体比机械硬盘可靠
玩家购入二手Switch主机:可是被卖家坑惨了
航班晚点1小时 机长提速提前20分钟到达帮助乘客换机?山航回应
每人1600元!北京发放首批“京彩·绿色”消费券:买手机PC都能用
当前热文:涉及121万辆!我国2022年新能源汽车召回量创历史新高:电池、电机缺陷多
环球最资讯丨暴风的恋人百度云_暴风的恋人
有监督学习——线性回归
禁用XXE处理漫谈
腾讯-广点通转化归因
来真的!贾跃亭:3月30日生产FF91 百万豪车来了
【天天新视野】30个汽车品牌降价 成都发放消费券:满40万可减8000元