赞
踩
// 将线程池状态与线程个数合二为一存储在一个原子变量 ctl 中,目的是只用一次 cas 原子操作就可以进行赋值更新两个信息 // 初始是 RUNNING 状态,线程数为 0 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // 根据线程池状态和工作线程数量进行或运算,得到ctl的值 // rs 为高 3 位代表线程池状态, wc 为低 29 位代表线程个数,通过或运算合并它们,存放在 ctl 中 private static int ctlOf(int rs, int wc) { return rs | wc; } // Integer.SIZE=32表示Integer的长度,32-3=29,低 29 位代表线程个数 private static final int COUNT_BITS = Integer.SIZE - 3; // 1 << COUNT_BITS 表示 1左移29位,即 00100000 00000000 00000000 00000000 // (1 << COUNT_BITS) - 1 即 00100000 00000000 00000000 00000000 - 1 = 00011111 11111111 11111111 11111111 (十进制是2^29 - 1) // 线程池允许最大线程个数 2^29 - 1(即低 29 位全为 1) private static final int CAPACITY = (1 << COUNT_BITS) - 1; // 从数字上比较,TERMINATED > TIDYING > STOP > SHUTDOWN > RUNNING // 线程池的5种状态需要3位二进制才能表示 // -1 的二进制是 11111111 11111111 11111111 11111111 左移29位后为 11100000 00000000 00000000 00000000 即高3位为111 private static final int RUNNING = -1 << COUNT_BITS; // 高3位:111,接受新任务并处理排队任务 private static final int SHUTDOWN = 0 << COUNT_BITS; // 高3位:000,不接受新任务,但处理排队任务 private static final int STOP = 1 << COUNT_BITS; // 高3位:001,不接受新任务,不处理排队任务,并中断正在进行的任务 private static final int TIDYING = 2 << COUNT_BITS; // 高3位:010,所有任务都已终止,workerCount 为零,转换到状态 TIDYING 的线程将运行 terminate() 钩子方法,terminate()是个空方法,需要自己实现逻辑 private static final int TERMINATED = 3 << COUNT_BITS; // 高3位:011,terminate() 已执行完 // 计算线程池的运行状态,c & ~CAPACITY 的结果是 c的高3位 private static int runStateOf(int c) { return c & ~CAPACITY; } // 计算工作线程的数量,c & CAPACITY 的结果是 c的低29位 private static int workerCountOf(int c) { return c & CAPACITY; }
execute方法描述:
public void execute(Runnable command) { // 非空判断(程序健壮性判断) if (command == null) throw new NullPointerException(); // 获取 ctl 属性 int c = ctl.get(); // 1.判断工作线程数 是否小于 核心线程数 if (workerCountOf(c) < corePoolSize) { // 小于,则通过addWorker方法创建核心线程,true代表核心线程 if (addWorker(command, true)) // t.start();线程被启动了,才算addWorker成功 return; // 再重新获取ctl属性值,因为如果有多个线程同时执行addWorker,只有一个线程会成功,并且会改变ctl属性值(例如:工作线程数加1), // 所以其他失败的线程需要重新获取ctl属性值 c = ctl.get(); } // private final BlockingQueue<Runnable> workQueue; // 2.工作线程数超过核心线程数,线程池处于RUNNING状态时,尝试将任务加入队列 // 创建线程失败可能造成线程池停止,所以需要再次判断线程池运行状态 if (isRunning(c) && workQueue.offer(command)) { // 添加任务到队列成功后,再次获取最新ctl属性值 int recheck = ctl.get(); // 加入队列后再次检查线程池状态(防止任务加入队列后,线程池状态改变了) // 线程池不是RUNNING状态时,需要移除已加入队列的任务,因为加入了也执行不了 if (! isRunning(recheck) && remove(command)) // 如果线程池不是RUNNING状态,并且移除任务成功,则执行拒绝策略 reject(command); // 判断工作线程数是否为0,如果为0,则需要创建非核心线程将队列中任务执行完 // 例如:上一步已经判断了不是RUNNING状态,并且移除队列中任务失败,此时可能是SHUTDOWN状态, // SHUTDOWN要变为TIDYING时,需要满足工作线程为0、工作队列为空的条件 // 所以执行到这一步,如果工作线程数为0后,则需要创建非核心线程将队列中的任务执行完 else if (workerCountOf(recheck) == 0) // 如果之前的工作线程已被销毁完,则传入一个空任务null, // 因为进入该判断之前已经将任务加入队列 workQueue.offer(command), // 只需进入addWorker方法中启动线程 t.start, // 然后进入runWorker方法执行 while (task != null || (task = getTask()) != null) { task.run(); } // 当task=null时,执行task = getTask(),从而可以到队列中获取任务并执行 addWorker(null, false); } // 3.如果队列满了,则创建非核心线程,false代表非核心线程 else if (!addWorker(command, false)) // 4.核心线程数 + 非核心线程数 超过了最大线程数,则执行拒绝策略 reject(command); }
小结:
execute 方法中调用了 3 次 addWorker 方法
addWorker 方法描述:
private boolean addWorker(Runnable firstTask, boolean core) { // 一、 整个for循环的作用是:外层for循环判断线程状态,内层for循环判断工作线程数量 // 跳出循环的标记 retry: for (;;) { // 通过ctl值获取线程池状态 int c = ctl.get(); int rs = runStateOf(c); // 如果线程池不是RUNNING状态,则需要做后续判断,当前任务是否可以不处理 if (rs >= SHUTDOWN && // 线程池状态为SHUTDOWN、任务为空、工作队列不为空 // 如果同时满足这3个条件,表示当前工作队列中的任务需要处理,取反表示不返回false,而是继续往下执行 // 此处与execute方法中的addWorker(null, false);相呼应 ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()) ) // 以下情况将不创建线程处理任务: // (1) SHUTDOWN状态,firstTask不为null (此处验证了SHUTDOWN状态时,不再接受新任务) // (2) STOP状态同理 // (3) 工作队列为空 return false; for (;;) { // 通过ctl值获取线程数量 int wc = workerCountOf(c); // 分情况判断,core = true为核心线程,core = false为非核心线程 // 工作线程数超过了线程容量,则不能创建新的工作线程 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // 通过CAS对ctl中的工作线程数量+1 if (compareAndIncrementWorkerCount(c)) // 如果ctl中的工作线程数量+1成功,退出最外层for循环,进入下面启动线程流程 break retry; // 并发时,让失败的线程重新获取ctl值 c = ctl.get(); // 如果ctl中的工作线程数量+1 失败,并且线程池状态 改变,需要重新开始 外层for循环(重新获取线程池状态) if (runStateOf(c) != rs) continue retry; // 如果工作线程数量+1 失败,并且线程池状态 没变,需要重新开始 内层for循环(重新获取线程数量) } } // 二、以下是启动工作线程的流程 boolean workerStarted = false; // 工作线程是否启动的标志 boolean workerAdded = false; // 工作线程是否添加的标志 Worker w = null; // Worker 就是工作线程 try { // 1.如果ctl中的工作线程数量+1成功,则将提交的任务封装为Worker对象,Worker对象中还包含了一个 Thread线程 属性 w = new Worker(firstTask); // new Worker时,会在Worker的有参构造中通过线程工厂实例化Thread(this.thread = getThreadFactory().newThread(this);) final Thread t = w.thread; if (t != null) {// 非空判断,程序健壮性判断 // 加锁执行,该对象锁mainLock,是线程池ThreadPoolExecutor类已经声明好的(private final ReentrantLock mainLock = new ReentrantLock();) final ReentrantLock mainLock = this.mainLock; // 创建新线程加锁的原因是:防止在创建线程的过程中再次执行shutdown、shutdownNow方法,因为它们加的都是同一把锁(一次只有一个操作能成功) mainLock.lock(); try { // 再次重新获取线程池状态,因为并发时可能有其他线程修改状态 int rs = runStateOf(ctl.get()); // 线程池处于RUNNING状态 或者 线程池关闭并且任务为空,则可以执行启动线程操作 if (rs < SHUTDOWN || // 此处与execute方法中的addWorker(null, false);相呼应 (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable // 预先检测是否已启动线程t,在启动线程t之前已存在t,则报异常 throw new IllegalThreadStateException(); // private final HashSet<Worker> workers = new HashSet<Worker>(); // 线程池中所有工作线程的集合。仅在持有 mainLock 时访问。 // 2.将工作线程加入集合 workers.add(w); int s = workers.size(); // 如果当前的工作线程数 大于 历史最大的工作线程数,则更新历史最大工作线程数 if (s > largestPoolSize) // private int largestPoolSize;实时记录工作线程最大数量。只能在 mainLock 下访问。 // 例如:第一次为1,则 largestPoolSize = 1;第二次为2,则 largestPoolSize = 2;第三次为1,则 largestPoolSize 还是为 2; largestPoolSize = s; // 工作线程添加成功的标志 workerAdded = true; } } finally { // 释放锁 mainLock.unlock(); } if (workerAdded) { // 3.Worker内的线程被启动了,才算addWorker成功 // Worker内的线程被启动后,会执行Worker内的run方法,run方法又调用了runWorker方法,public void run() { runWorker(this); } // runWorker是工作线程执行任务的核心方法 t.start(); // 线程启动成功的标志 workerStarted = true; } } } finally { if (! workerStarted) // 线程启动失败,需要移除workers集合中刚加的Worker工作线程、让ctl上对应的工作线程数自减、尝试修改线程状态为TIYDING状态 addWorkerFailed(w); } return workerStarted; } // 线程启动失败后的回滚操作 private void addWorkerFailed(Worker w) { final ReentrantLock mainLock = this.mainLock; // 加锁 mainLock.lock(); try { if (w != null) // 将刚加入到集合中的线程移除 workers.remove(w); // 将刚增加的工作线程数减去 decrementWorkerCount(); // 线程启动失败的原因可能是线程状态变了,当线程状态变为SHUTDOWN、STOP状态时,再尝试将线程状态转为TIDING状态 tryTerminate(); } finally { // 释放锁 mainLock.unlock(); } }
描述:
private final class Worker extends AbstractQueuedSynchronizer // 继承AQS,主要处理线程中断相关操作 implements Runnable // 实现Runnable,主要存储需要执行的任务 { private static final long serialVersionUID = 6138294804551838833L; // 执行工作的线程对象 final Thread thread; // Worker要执行的第一个任务 Runnable firstTask; // Per-thread task counter volatile long completedTasks; // 执行Worker的有参构造创建Worker Worker(Runnable firstTask) { // 为了在线程真正开始运行任务之前抑制中断,所以将锁定状态初始化为负值,并在启动时将其清除(在 runWorker 中) setState(-1); // 第一次new Worker时,会将任务赋值给firstTask this.firstTask = firstTask; // 使用工厂对象创建线程, 并把worker本身传入 this.thread = getThreadFactory().newThread(this); } // 调用t.start()时,会执行run方法,进一步执行runWorker方法 public void run() { // Worker的核心工作方法,需要传入Worker对象 runWorker(this); } // 以下是通过AQS实现了不可重入锁,1表示加锁,0表示解锁 // 中断线程不是立即让线程停止,只是将Thread的中断标记设置为true protected boolean isHeldExclusively() { return getState() != 0; } protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } // 将state置为0,在runWorker中,会调用Worker.unlock(); -> release(1); -> tryRelease(1),最终调到这里,表示当前线程允许被中断 protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }
runWorker方法描述:
final void runWorker(Worker w) { // 获取当前线程 Thread wt = Thread.currentThread(); // 获取工作线程的任务 Runnable task = w.firstTask; // 将工作线程的任务置空 w.firstTask = null; // 先将工作线程解锁 // 会调用w.unlock(); -> release(1); -> tryRelease(1),最终将state置为0,表示当前线程允许被中断 w.unlock(); // allow interrupts // 执行任务时,beforeExecute(wt, task); 或 afterExecute(task, thrown); 钩子函数中是否出现异常 boolean completedAbruptly = true; try { // 获取任务的第一种方式:执行execute、submit时,传入的任务直接被处理 // 获取任务的第二种方式:从工作队列中获取任务执行,与execute中的addWorker(null, false);相呼应 // getTask()不断从队列中获取任务,通过task.run();执行任务,从而使线程复用 // 提交任务优先级:核心线程 > 任务队列 > 非核心线程 // 执行任务优先级:核心线程 > 非核心线程 > 任务队列,因为首先判断的是task != null,task包含核心线程和非核心线程中的任务 while (task != null || (task = getTask()) != null) { // 获取到任务后,加锁,这里与addWorker中的mainLock.lock();不同 // Worker内部的锁是通过AQS实现的不可重入锁,w.lock(); -> acquire(1); -> tryAcquire(1), // 最终将state置为1,表示在SHUTDOWN状态下,当前线程不允许被中断 w.lock(); // 如果线程池状态变为STOP状态,则必须中断当前线程 // 1.第一个判断runStateAtLeast(ctl.get(), STOP)表示线程池状态>=STOP,即线程池不再接收新任务 // 2.第二个判断Thread.interrupted() 查看当前线程是否被中断。无论是否中断都会清除线程的中断状态,即中断标记置为false // (1) 如果Thread.interrupted()返回false,表示没被中断,返回false后,中断标记置为false(返回值和设置中断标记为false是2个步骤) // (2) 如果Thread.interrupted()返回true,再次执行runStateAtLeast(ctl.get(), STOP))判断线程池是否为STOP状态(虽然刚判断过,再次判断是因为并发场景下可能每时每刻的值都不同),返回true后,中断标记置为false(返回值和设置中断标记为false是2个步骤) if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) // (1) 如果是STOP状态,再查询当前线程中断标记是否为false,是false,则执行wt.interrupt(); && !wt.isInterrupted()) // 将中断标记置为true wt.interrupt(); try { // 这是钩子函数,里面是空实现,需要自己实现 beforeExecute(wt, task); Throwable thrown = null; try { // task是封装到Worker对象中的Runnable任务 task.run(); } catch (RuntimeException x) { // 抛出异常后,会依次执行下面3个finally thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { // 这是钩子函数,里面是空实现,需要自己实现 afterExecute(task, thrown); } } finally { // 任务执行完后,再将task重置为null task = null; // 任务执行成功的个数+1 w.completedTasks++; // 将state设置为0 w.unlock(); } } // 上面抛异常后,不会执行这里 completedAbruptly = false; } finally { // 上面抛异常后,都会进入该方法,此时completedAbruptly = true,将会从workers集合中移除当前工作线程,重新创建线程继续执行下一个任务 // while (task != null || (task = getTask()) != null)不成立时,也会执行该方法,此时completedAbruptly = false // (即getTask()返回null时,可能是非核心线程超时未获取到任务) processWorkerExit(w, completedAbruptly); } }
getTask方法描述:
// 从工作队列中获取任务 private Runnable getTask() { // 是否需要通过队列中的poll方法获取任务,poll方法可以设置超时时间,超时后将清除工作线程 boolean timedOut = false; // Did the last poll() time out? for (;;) { // =====================一、判断线程池状态========================= // 通过ctl值获取线程池状态 int c = ctl.get(); int rs = runStateOf(c); // 1.rs >= SHUTDOWN判断线程池状态是否为SHUTDOWN、STOP // 2.进一步判断rs >= STOP是否为STOP,STOP状态表示需要从线程集合中移除当前线程(因为STOP状态不会接受新任务和不处理工作队列中的任务) // 3.如果不是STOP而是SHUTDOWN,进一步判断workQueue.isEmpty()工作队列是否为空,若为空,此时也需要从线程集合中移除当前线程 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { // 通过CAS让ctl中的工作线程数自减 decrementWorkerCount(); // 返回到while (task != null || (task = getTask()) != null)这一步,不满足该条件,退出while循环 // 执行processWorkerExit(w, completedAbruptly);将工作线程从集合中移除 return null; } // =====================二、判断工作线程数量========================= int wc = workerCountOf(c); // 1.private volatile boolean allowCoreThreadTimeOut;此属性默认为false,表示核心线程即使在空闲时也保持活动状态。如果为true,表示核心线程使用 keepAliveTime 来超时等待工作 // 2.wc > corePoolSize;判断工作线程数是否大于核心线程数,若大于,说明当前线程是非核心线程,则要进一步判断当前非核心线程是否超时,若超时,则需移除当前非核心线程 // (注意:一个线程是否属于核心或非核心线程,是通过当下的工作线程数与核心线程数比较后判定的,所以某个线程t有时是核心线程,有时是非核心线程,而不是说只要第一次是核心线程以后就一直是) boolean timed = (allowCoreThreadTimeOut || wc > corePoolSize); // 3.wc > maximumPoolSize再判断工作线程数是否大于最大线程容量CAPACITY(一般不会超过) // 4.timed为true的情况,一般是满足wc > corePoolSize(因为allowCoreThreadTimeOut 默认为false),工作线程数大于核心线程数,timedOut如果为true,表示当前线程等待超时还没获取到任务,所以需要执行下一步尝试移除当前线程 if ((wc > maximumPoolSize || (timed && timedOut)) // 5.工作线程数超过1,或者工作队列为空,则可以移除当前线程 // (1)第一种情况:工作线程数超过1,是为了保证工作队列不为空时,移除当前线程后,还有剩余线程可以处理工作队列中的任务 // (2)第二种情况:工作线程数没超过1,但是工作队列为空,这时移除当前线程也没问题 && (wc > 1 || workQueue.isEmpty())) { // CAS将工作线程数-1 if (compareAndDecrementWorkerCount(c)) // 返回到while (task != null || (task = getTask()) != null)这一步,不满足该条件,退出while循环 // 执行processWorkerExit(w, completedAbruptly);将工作线程从集合中移除 return null; // CAS失败则continue继续进入for循环重试 continue; } // =====================三、从工作队列中获取任务========================= // 能执行到这里,说明不满足上面2个if条件,可以正常从队列中获取任务 try { // timed为true的情况,一般是满足wc > corePoolSize(因为allowCoreThreadTimeOut 默认为false),表示工作线程数大于核心线程数,说明核心线程全在工作,此时的非核心线程通过poll带超时的方法获取任务,超过存活时间keepAliveTime,则继续进入for循环,进入上面的if条件中去移除当前线程 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : // timed为false,表示当前是核心线程,需要通过阻塞式take从队列获取任务,获取不到任务会在这里阻塞住,不再往下执行,除非有异常 workQueue.take(); if (r != null) // 能获取到任务,则将任务返回,去runWorker中执行 return r; // 只有执行poll时,超时获取不到任务,会执行到这里,将超时标记 timedOut 置为 true, // 再次自旋时,进入(timed && timedOut)判断,满足条件则移除当前线程 timedOut = true; } catch (InterruptedException retry) { // 如果workQueue.take();再阻塞时被打断,则设置 超时标记 timedOut 置为 false,再次自旋重试 timedOut = false; } } }
processWorkerExit 方法描述:
// 移除当前工作线程 private void processWorkerExit(Worker w, boolean completedAbruptly) { // completedAbruptly为true,一般是beforeExecute(wt, task) 或 afterExecute(task, null)钩子函数抛出了异常 if (completedAbruptly) // 将ctl中的工作线程数自减 decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; // 加可重入锁 mainLock.lock(); try { // 将当前线程已完成的任务数同步到线程池中记录的完成任务数 completedTaskCount += w.completedTasks; // 有异常 或者 getTask() = null(即可能是非核心线程超时未获取到任务),都会执行到这里,从workers集合中移除该工作线程 workers.remove(w); } finally { mainLock.unlock(); } // 尝试终止线程池 // 主要是判断线程池是否满足终止的状态 // 如果满足终止的状态,但线程池还有线程时,需要尝试对其发出中断响应,使其能进入退出流程 // 如果满足终止的状态,但线程池没有线程时,更新状态为TIDYING -> TERMINATED tryTerminate(); // 重新获取ctl int c = ctl.get(); // 线程状态值 < STOP,表示此时线程状态为RUNNING 或 SHUTDOWN,即 上面 尝试终止线程池 失败 if (runStateLessThan(c, STOP)) { // completedAbruptly为false,表示是正常状态下移除了当前线程 if (!completedAbruptly) { // min表示最小核心线程数,若allowCoreThreadTimeOut为true表示设置了允许核心线程数超时,则最小核心线程数为0,否则就是corePoolSize值 int min = allowCoreThreadTimeOut ? 0 : corePoolSize; // 如果核心线程数最小为0,并且工作队列不为空,则重新设置核心线程数最小为1,保证队列中任务有线程处理 if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; } // 1.第一种情况:completedAbruptly为true,表示是异常状态下移除了当前线程,需要再创建一个非核心线程 // 2.第二种情况:队列不为空,并且没有工作线程,需要再创建一个非核心线程 addWorker(null, false); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。