赞
踩
线程池的工作原理大致分为4步,如下图所示:
【上图解释如下】
首先,当有任务要执行的时候,会计算线程池中存在的线程数量与核心线程数量(corePoolSize)进行比较,如果小于,则在线程池中创建线程,否则,进行下一步判断。
其次,如果不满足上面的条件,则会将任务添加到阻塞队列<>/font(1.6、 阻塞队列)中。等待线程池中的线程空闲下来后,获取队列中的任务进行执行。
然后,如果队列中也塞满了任务,那么会计算线程池中存在的线程数量与最大线程数量(maxnumPoolSize)进行比较,如果小于,则在线程池中创建线程。
最后,如果上面都不满足,则会执行对应的拒绝策略(1.5、拒绝策略)。
如上已经了解到了线程池的基本工作流程,那么如何使用线程池呢?如下图实例了解如何使用线程池,才可以基于它对应的方法进行源码分析。
通过上面的使用例子,我们针对源码的解析就要针对红框的这两点进行深入探索,一个是ThreadPoolExecutor的构造函数,另一个就是execute方法。
首先,我们先要了解一下ThreadPoolExecutor线程池类的继承关系,好对它有一个宏观的认知,如下图所示:
那么针对它的构造函数,其实我们能够发现,线程池的工作原理在构造函数的入参中都是有映射的,如下所示:
【上图解释如下】
corePoolSize: 核心线程数。
maximumPoolSize: 最大线程数。
keepAliveTime: 线程池中线程的最大闲置生命周期。
unit: 针对keepAliveTime的时间单位。
workQueue: 阻塞队列。
threadFactory: 创建线程的线程工厂。
handler: 拒绝策略。
在Executors中,已经给我们提供了很多种线程池的实现。如下图所示:
【上图解释如下】
newCachedThreadPool
创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
newFixedThreadPool
创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
newScheduledThreadPool
创建一个定长线程池,支持定时及周期性任务执行。
newSingleThreadScheduleExecutor
创建一个单线程执行程序,它可安排在给定延迟后运行命令或者定期地执行。(注意,如果因为在关闭前的执行期间出现失败而终止了此单个线程,那么如果需要,一个新线程会代替它执行后续的任务)。可保证顺序地执行各个任务,并且在任意给定的时间不会有多个线程是活动的。与其他等效的 newScheduledThreadPool(1) 不同,可保证无需重新配置此方法所返回的执行程序即可使用其他的线程。
newSingleThreadExecutor
创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
线程池中提供了如下拒绝策略:
【上图解释如下】
AbortPolicy
丢弃任务并抛出RejectedExecutionException异常。
DiscardPolicy
丢弃任务,但是不抛出异常。
DiscardOldestPolicy
丢弃队列中最前面的任务,然后重新尝试执行任务。
CallerRunsPolicy
由调用线程处理该任务。
线程池中提供了如下任务队列:
【上图解释如下】
ArrayBlockingQueue
它是一个有界的阻塞队列,其内部实现是将对象放到一个数组里。一但初始化,大小就无法修改
LinkedBlockingQueue
它内部以一个链式结构(链接节点)对其元素进行存储。可以指定元素上限,否则,上限则为Integer.MAX_VALUE。
DelayQueue
它对元素进行持有直到一个特定的延迟到期。注意:进入其中的元素必须实现Delayed接口。
PriorityBlockingQueue
它是一个无界的并发队列。无法向这个队列中插入null值。所有插入到这个队列中的元素必须实现Comparable接口。因此该队列中元素的排序就取决于你自己的Comparable实现。
SynchronousQueue
它是一个特殊的队列,它的内部同时只能够容纳单个元素。如果该队列已有一个元素的话,那么试图向队列中插入一个新元素的线程将会阻塞,直到另一个新线程将该元素从队列中抽走。同样的,如果队列为空,试图向队列中抽取一个元素的线程将会被阻塞,直到另一个线程向队列中插入了一条新的元素。因此,它其实不太像是一个队列,而更像是一个汇合点。
execute的整体流程如下所示:
【上图解释如下】
在上面的流程图中,我们看到三块绿色的addWorker方法,和两块红色的reject方法,这两个方法是我们解析的重点。
当我们看execute方法的时候,首先看到的就是ctl。
那么ctl到底是什么东西呢?下面截图源码和注释就是ctl所包含的方法
针对ctl的值,其实是两部分组成的:【高3位】表示:线程池状态 + 【低29位】表示:线程池中线程数量,如下图所示:
【上图解释如下】
上图中的runState:-1、0、1、2、3,其实是针对于红色3位来计算的,其实应该是32位来计算,这么写是为了方便大家直观感受到这5种状态值的大小。
其实如果按照32位来计算的哈,从小到大状态的排序依然是:RUNNING<SHUTDOWN<STOP<TIDYING<TERMINATED,后面对于多种状态的判断也是通过大小来判断的。
CAPACITY=(1<<COUNT_BITS)-1的计算方式如下所示:
针对ctl.get()获得的int值,其中有3个重要方法,如下所示:
int runStateOf(int c)
获取运行状态
RUNNING/SHUTDOWN/STOP/TIDYING/TERMINATED。
int workerCountOf(int c)
取出低位29位的值,表示获得当前活动的线程数。
int ctlOf(int rs, int wc)
计算ctl的值,ctl=[3位]线程池状态 + [29位]线程池中线程数量。
源码部分如下所示:
【上图解释如下】
其中的workerCountOf©用来获得当前线程池中的线程数,
如果小于核心线程数,则直接调用addWorker方法来向线程池中创建线程。
如果添加成功,则直接return返回。
如果添加失败了,则重新通过ctl.get()获取最新的ctl值。用于下面逻辑的判断。
addWorkder的流程图如下所示:
【上图解释如下】
关于流程图中的绿色方块,我们会在后面的章节中展开介绍。
addWorkder的整个流程,其实可以分为两部分:
(第一部分:上图中红色框部分):试图将workerCount+1。
(第二部分:上图中蓝色框部分):workerCount成功+1后,创建Worker(也是一个Runnable),加入集合workers中,并启动Worker线程。
我们先看一下第一部分的源码和注释:
在第一部分中,首先还是从类型为AtomicInteger的ctl中获得最新值,然后调用runStateOf来获得当前线程池的运行状态。
下面的if判断,这块我们其实需要以相反的判断去思考——也就是,如果if中的判断是false,那么就不用return false而可以继续往下执行了。那么,什么情况下,程序可以继续往下执行呢?
if (rs >= SHUTDOWN && !(rs == SHUTDOWN
&& firstTask == null && !workQueue.isEmpty()))
(1)、case1:如果线程池的状态时运行中(RUNNING),就可以继续往下执行了。
(2)、case2:如果线程池的状态时关闭(SHUTDOWN)并且firstTask=null并且阻塞队列workQueue中存在没有执行完毕的任务,那么就可以继续往下执行了。
(3)、也就是说,只有这两个情况,才不会直接返回false。还有继续往下执行的“机会”。
代码在往下,就是一个无限的循环了,在循环里我们发现,如果线程池中的线程数等于或者超过了最大线程数量(CAPACITY),或者已经等于或者超越了核心线程数(corePoolSize)/最大线程数(maximumPoolSize),那么就会直接返回false,没有“机会”继续下去了。而具体是对比corePoolSize还是maximumPoolSize,是根据我们addWorker方法第二个入参boolean core来决定的。
跳过这层阻碍,我们就要真正的去给当前的工作线程数加1了。这次依然采用的是CAS的方式去加1。如果加一成功,则Part1执行完毕,跳出循环,开始Part2的旅程。如果加一不成功,说明与其他线程操作冲突了,那么会重新获取最新的ctl值,再次循环执行上面的步骤。
我们先看一下第二部分的源码和注释:
在第二部分的逻辑中,我们就真正的开始了线程池的操作了。这部分才是主要部分。我们来往下看。
首先,我们new了一个Worker。Worker是什么东西呢?顾名思义,它是一个工人,一个线程池中负责给我们工作的工人,我们来看一下它的具体实现把:
【上图解释如下】
Worker是ThreadPoolExecutor的一个内部类,它虽然看着小,但是“五脏俱全”。
首先:变量thread,它就是我们线程池中运行的那个线程。它被包含在了Worker中。
其次:getThreadFactory()方法,其实就是我们线程池构造方法中的那个入参ThreadFactory,它就是用来创建线程用的。
之后:非常关键的一个方法,runWorker,线程池中关于线程复用的精华都在这个方法里了。
其中,runWorker方法我们暂时先不深入进去,后面会介绍,我们还是回到第二部分,先一层一层的了解代码的实现逻辑,以免调用代码调用的“深渊”中。
我们继续第二部分,创建了Worker后,从Worker中获取线程t,如果线程t不为空的话,我们就去尝试加锁了,如果可以成功的获得锁,就可以向线程池(其实就是HashSet workers)中添加线程了,不过在这之前,还需要任意的满足以下两个条件:
(1)、case1:线程池状态为RUNNING。
(2)、case2:线程池状态为SHUTDOWN并且firstTask为null。(什么是firstTask为null?其实就是单纯的创建线程,而不用附带执行firstTask的任务)
这一切执行完毕之后,在finally中调用了mainLock.unlock(),解开了锁。并且执行了t.start(),由于Worker是Runnable,所以,调用的其实就是Worker的run方法,而Worker的run方法里,调用了非常重要的那个方法——runWorker(this);
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。