当前位置:   article > 正文

2.深入理解juc-Fixed线程池_线程池 fix

线程池 fix

首先看一个线程池的简单例子:

  1. import java.util.concurrent.ExecutorService;
  2. import java.util.concurrent.Executors;
  3. import java.util.concurrent.TimeUnit;
  4. public class FixThreadPoolTest {
  5. public static void main(String[] args) {
  6. ExecutorService pool = Executors.newFixedThreadPool(6);
  7. for (int i = 0; i < 50; i++) {
  8. pool.execute(new Runnable() {
  9. @Override
  10. public void run() {
  11. try {
  12. TimeUnit.SECONDS.sleep(1);
  13. } catch (InterruptedException e) {
  14. e.printStackTrace();
  15. }
  16. System.out.println("hello world! Execute ThreadName=" + Thread.currentThread().getName());
  17. }
  18. });
  19. }
  20. pool.shutdown();
  21. }
  22. }

本文将从源码角度分析线程池的创建,线程的提交,线程池的执行策略,线程池阻塞队列,线程池参数含义,最后线程池的关闭角度来分析。

首先看第一点,分析ExecutorService,Executors源码

 

  1. public interface ExecutorService extends Executor {
  2. void shutdown();
  3. List<Runnable> shutdownNow();
  4. boolean isShutdown();
  5. boolean isTerminated();
  6. boolean awaitTermination(long timeout, TimeUnit unit)
  7. throws InterruptedException;
  8. <T> Future<T> submit(Callable<T> task);
  9. <T> Future<T> submit(Runnable task, T result);
  10. Future<?> submit(Runnable task);
  11. <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
  12. throws InterruptedException;
  13. <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
  14. long timeout, TimeUnit unit)
  15. throws InterruptedException;
  16. <T> T invokeAny(Collection<? extends Callable<T>> tasks)
  17. throws InterruptedException, ExecutionException;
  18. <T> T invokeAny(Collection<? extends Callable<T>> tasks,
  19. long timeout, TimeUnit unit)
  20. throws InterruptedException, ExecutionException, TimeoutException;
  21. }

先来看ExecutorService接口的定义:ExecutorService 是个接口,并且继承了Executor接口,Executor中只定义了一个execute方法。该方法接收一个线程作为参数,显然是用来执行一个提交上来的线程。

对于ExecutorService接口,它主要定义了终止线程的方法和异步追踪每个线程回调结果的能力。并且它是可以被关闭的,当他被关闭的时候,会拒绝新的任务的提交。ExecutorService线程池的关闭有两种方法:shutdown和shutdownnow,前者被调用后,会拒绝新的任务提交,但是会等待队列里的线程和线程池中的线程执行完毕。后者会阻止新任务的启动,并且会尝试终止线程池中正在运行的任务。

我们基本可以认为ExecutorService就是一个线程池的父接口。该接口定义了线程池的一些必备比较高层次抽象的方法。

 

再来看第二点,来分析一下线程池的核心参数

对于线程池的具体创建,这里定义了一个固定线程数量为6的线程池.创建过程调用了newFixedThreadPool方法:

  1. public static ExecutorService newFixedThreadPool(int nThreads) {
  2. return new ThreadPoolExecutor(nThreads, nThreads,
  3. 0L, TimeUnit.MILLISECONDS,
  4. new LinkedBlockingQueue<Runnable>());
  5. }

该方法调用了ThreadPoolExecutor这个构造方法。

 

  1. public ThreadPoolExecutor(int corePoolSize,
  2. int maximumPoolSize,
  3. long keepAliveTime,
  4. TimeUnit unit,
  5. BlockingQueue<Runnable> workQueue) {
  6. this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
  7. Executors.defaultThreadFactory(), defaultHandler);
  8. }

接着调用了ThreadPoolExecutor的重载方法:

  1. public ThreadPoolExecutor(int corePoolSize,
  2. int maximumPoolSize,
  3. long keepAliveTime,
  4. TimeUnit unit,
  5. BlockingQueue<Runnable> workQueue,
  6. ThreadFactory threadFactory,
  7. RejectedExecutionHandler handler) {
  8. if (corePoolSize < 0 ||
  9. maximumPoolSize <= 0 ||
  10. maximumPoolSize < corePoolSize ||
  11. keepAliveTime < 0)
  12. throw new IllegalArgumentException();
  13. if (workQueue == null || threadFactory == null || handler == null)
  14. throw new NullPointerException();
  15. this.acc = System.getSecurityManager() == null ?
  16. null :
  17. AccessController.getContext();
  18. this.corePoolSize = corePoolSize;
  19. this.maximumPoolSize = maximumPoolSize;
  20. this.workQueue = workQueue;
  21. this.keepAliveTime = unit.toNanos(keepAliveTime);
  22. this.threadFactory = threadFactory;
  23. this.handler = handler;
  24. }

这样看来,最后这个线程池就是指的是这个ThreadPoolExecutor类了。再来看看这个类的定义:

public class ThreadPoolExecutor extends AbstractExecutorService

发现他继承自一个抽象的AbstractExecutorService,再来看AbstractExecutorService

public abstract class AbstractExecutorService implements ExecutorService

AbstractExecutorService 是实现了ExecutorService接口。而这个接口正式我们代码里写的那个接口:ExecutorService pool = Executors.newFixedThreadPool(6); 那么这段代码就清晰了:

返回的这个pool 是创建了ExecutorService 的子子类,赋给了这个ExecutorService 接口,通过对ExecutorService 接口的分析,ThreadPoolExecutor就是我们创建的那个线程池了,下面就对该线程池的具体实现做详细分析。

我们还是用上面的ThreadPoolExecutor的重载方法来分析,里面有这么几个参数:

int corePoolSize, 核心线程数量

int maximumPoolSize, 最大线程数量

long keepAliveTime, 核心线程外的那个线程池存活时间

TimeUnit unit, keepAliveTime的时间单位

BlockingQueue<Runnable> workQueue, 线程池中的那个queue

ThreadFactory threadFactory, 线程工厂,用来创建线程池中常驻的worker线程

RejectedExecutionHandler handler 拒绝策略

实际上,从参数中我们可以看出线程池的构成,可以用一张图表示:

 

 

 

下面来看第三点:线程的提交。

再次来看我们提交过程提交了哪些参数:

  1. public static ExecutorService newFixedThreadPool(int nThreads) {
  2. return new ThreadPoolExecutor(nThreads=6, nThreads=6,
  3. 0L, TimeUnit.MILLISECONDS,
  4. new LinkedBlockingQueue<Runnable>());
  5. }

第一个参数:核心线程nThreads为6,

第二个参数:最大线程数nThreads为6,

第三个参数:非core线程存活时间为0,

第四个参数时间单位为秒,

第五个参数:线程池的queue为LinkedBlockingQueue的一个线程池,对于LinkedBlockingQueue,这里使用的是默认的构造函数。

  1. public LinkedBlockingQueue() {
  2. this(Integer.MAX_VALUE);
  3. }

传入的是int的最大值,重载方法参数是初始化该queue的最大值:

  1. public LinkedBlockingQueue(int capacity) {
  2. if (capacity <= 0) throw new IllegalArgumentException();
  3. this.capacity = capacity;
  4. last = head = new Node<E>(null);
  5. }

所以该queue是个初始化大小是Integer.MAX_VALUE的队列,可以认为是个无界队列。

那么这个fix线程池的构成就变成了这样:

上面new ThreadPoolExecutor构造函数调用的重载方法:

  1. public ThreadPoolExecutor(int corePoolSize,
  2. int maximumPoolSize,
  3. long keepAliveTime,
  4. TimeUnit unit,
  5. BlockingQueue<Runnable> workQueue) {
  6. this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
  7. Executors.defaultThreadFactory(), defaultHandler);
  8. }

再贴一次上面的代码:

 

  1. public ThreadPoolExecutor(int corePoolSize,
  2. int maximumPoolSize,
  3. long keepAliveTime,
  4. TimeUnit unit,
  5. BlockingQueue<Runnable> workQueue,
  6. ThreadFactory threadFactory,
  7. RejectedExecutionHandler handler) {
  8. if (corePoolSize < 0 ||
  9. maximumPoolSize <= 0 ||
  10. maximumPoolSize < corePoolSize ||
  11. keepAliveTime < 0)
  12. throw new IllegalArgumentException();
  13. if (workQueue == null || threadFactory == null || handler == null)
  14. throw new NullPointerException();
  15. this.acc = System.getSecurityManager() == null ?
  16. null :
  17. AccessController.getContext();
  18. this.corePoolSize = corePoolSize;
  19. this.maximumPoolSize = maximumPoolSize;
  20. this.workQueue = workQueue;
  21. this.keepAliveTime = unit.toNanos(keepAliveTime);
  22. this.threadFactory = threadFactory;
  23. this.handler = handler;
  24. }

除了上面分析的五个参数外,这里还多了两个参数,一个是ThreadFactory用的是Executors.defaultThreadFactory(),另一个拒绝策略用的是defaultHandler,拒绝策略的作用是当线程池满了并且queue也满了的时候,再来一个线程提交上来,线程池该怎么处理?这里拒绝策略需要传入一个实现了RejectedExecutionHandler接口的对象 handler,实现该接口需要实现它的rejectedExecution方法,该方法用来实现拒绝策略的具体执行,先来看这个默认拒绝策略:

private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();

他是一个AbortPolicy,从它的rejectedExecution方法可以看出,这个拒绝策略就是直接抛出RejectedExecutionException异常。

  1. /**
  2. * A handler for rejected tasks that throws a
  3. * {@code RejectedExecutionException}.
  4. */
  5. public static class AbortPolicy implements RejectedExecutionHandler {
  6. /**
  7. * Creates an {@code AbortPolicy}.
  8. */
  9. public AbortPolicy() { }
  10. /**
  11. * Always throws RejectedExecutionException.
  12. *
  13. * @param r the runnable task requested to be executed
  14. * @param e the executor attempting to execute this task
  15. * @throws RejectedExecutionException always
  16. */
  17. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  18. throw new RejectedExecutionException("Task " + r.toString() +
  19. " rejected from " +
  20. e.toString());
  21. }
  22. }

 

再来看这个defaultThreadFactory是什么东西:

  1. static class DefaultThreadFactory implements ThreadFactory {
  2. private static final AtomicInteger poolNumber = new AtomicInteger(1);
  3. private final ThreadGroup group;
  4. private final AtomicInteger threadNumber = new AtomicInteger(1);
  5. private final String namePrefix;
  6. DefaultThreadFactory() {
  7. SecurityManager s = System.getSecurityManager();
  8. group = (s != null) ? s.getThreadGroup() :
  9. Thread.currentThread().getThreadGroup();
  10. namePrefix = "pool-" +
  11. poolNumber.getAndIncrement() +
  12. "-thread-";
  13. }
  14. public Thread newThread(Runnable r) {
  15. Thread t = new Thread(group, r,
  16. namePrefix + threadNumber.getAndIncrement(),
  17. 0);
  18. if (t.isDaemon())
  19. t.setDaemon(false);
  20. if (t.getPriority() != Thread.NORM_PRIORITY)
  21. t.setPriority(Thread.NORM_PRIORITY);
  22. return t;
  23. }
  24. }

上面的构造器要求传入的是ThreadFactory接口的实现类,而该类继承自ThreadFactory,那么显然有用的方法就是这个ThreadFactory中定义的newThread,这是生产一个线程,也就是加入到corepool中的线程都要经过该工厂封装处理一下,具体用法后面会讲到。这样这几个参数都清晰了。那么整个fix线程池的构成是这样:

队列:LinkedBlockQueue,相当于无界队列

核心线程池:6个线程

拒绝策略:AbortPolicy

ThreadFactory:DefaultThreadFactory

最大线程数 = 核心线程数

分析到这里,我们的fix线程池就构建完毕了,这里需要注意的是,当前线程池中并没有具体的线程在执行,只是初始化了一个大小为6的池子。

我们来到第三步,线程的提交:

pool.execute(new Runnable() {...}

点进去发现,execute是executor中定义的接口,而代码

ExecutorService pool = Executors.newFixedThreadPool(6);

中的ExecutorService 是executor的子接口,executor中只定义了execute方法,而这里的pool付给了一个ExecutorService 接口,该接口继承了executor,因此可以使用pool调用execute方法。这里的继承关系如下图:

最终的实现类是ThreadPoolExecutor,来看该类的execute方法:

  1. public void execute(Runnable command) {
  2. if (command == null)
  3. throw new NullPointerException();
  4. int c = ctl.get();
  5. if (workerCountOf(c) < corePoolSize) {
  6. if (addWorker(command, true))
  7. return;
  8. c = ctl.get();
  9. }
  10. if (isRunning(c) && workQueue.offer(command)) {
  11. int recheck = ctl.get();
  12. if (! isRunning(recheck) && remove(command))
  13. reject(command);
  14. else if (workerCountOf(recheck) == 0)
  15. addWorker(null, false);
  16. }
  17. else if (!addWorker(command, false))
  18. reject(command);

这里分为三种情况:

1.当线程池中worker数量没有达到corepoolsize的话,那么将会调用addworker方法把一个worker加入到core线程池里。加入的过程中也会检查worker的数量和线程池的状态。

2.当corepool中的线程数量达到了corepoolsize,那么会将提交上来的command(Runnable)放入到队列里。在放入过程中,也会去检查线程池的状态等。

3.如果放入队列失败,如果此时还有最大线程池的设置,并且最大线程池数量大于corepoolsize的话,那么就会启动新的线程,否则,执行拒绝策略。

首先,看int c = ctl.get();这里的ctl:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

一个atomic类型的变量,该变量身兼两职:与ReentrantReadWriteLock类似,利用变量的不同位记录了线程池的状态和线程数量。比如runState通过对该变量位运算拿到线程池状态,workerCountOf利用位运算拿到线程数量:

  1. private static int runStateOf(int c) { return c & ~CAPACITY; }
  2. private static int workerCountOf(int c) { return c & CAPACITY; }
  3. private static int ctlOf(int rs, int wc) { return rs | wc; }

它的低29位用于存放当前的线程数, 因此一个线程池在理论上最大的线程数是 536870911; 高 3 位是用于表示当前线程池的状态, 其中高三位的值和状态对应如下:

  • 111: RUNNING
  • 000: SHUTDOWN
  • 001: STOP
  • 010: TIDYING
  • 110: TERMINATED

总之这个变量是用来记录线程池状态和当前线程数量的。

 

  1. //如果当前线程池的数量小于corepoolsize的数量,就去执行addWorker(command, true)
  2. if (workerCountOf(c) < corePoolSize) {
  3. if (addWorker(command, true))
  4. return;
  5. c = ctl.get();
  6. }

进入addworker方法,传入的参数分别是需要执行的提交的线程,和一个bool变量,该变量来标识该线程是否是提交到core线程池内。

  1. private boolean addWorker(Runnable firstTask, boolean core) {
  2. //标识,方便程序跳转
  3. retry:
  4. for (;;) {
  5. //获取当前的ctl值
  6. int c = ctl.get();
  7. //通过ctl获取当前线程池状态
  8. int rs = runStateOf(c);
  9. // Check if queue empty only if necessary.
  10. if (rs >= SHUTDOWN &&
  11. ! (rs == SHUTDOWN &&
  12. firstTask == null &&
  13. ! workQueue.isEmpty()))
  14. return false;
  15. for (;;) {
  16. //获取当前线程池线程数量
  17. int wc = workerCountOf(c);
  18. //这里core为true,也就是说,如果当前正在执行的线程数量大于等于CAPACITY
  19. //或者大于等于corepoolsize,就直接返回false
  20. if (wc >= CAPACITY ||
  21. wc >= (core ? corePoolSize : maximumPoolSize))
  22. return false;
  23. //否则的话,进入这里意味着corepool还没有满
  24. //cas操作对当前线程数的记录进行原子加操作
  25. //如果cas成功,就跳出当前两层循环向下执行
  26. if (compareAndIncrementWorkerCount(c))
  27. break retry;
  28. //运行到这里意味着上面对workercount的cas加操作失败
  29. //重新上面的操作,直到cas成功跳出循环或者线程池满导致条件判断失败返回false
  30. c = ctl.get(); // Re-read ctl
  31. if (runStateOf(c) != rs)
  32. continue retry;
  33. // else CAS failed due to workerCount change; retry inner loop
  34. }
  35. }
  36. //设置状态标志worker线程
  37. boolean workerStarted = false;
  38. boolean workerAdded = false;
  39. Worker w = null;
  40. try {
  41. //firstTask是传进来的线程,Worker的构造看下面的源码
  42. w = new Worker(firstTask);
  43. //t这个threa就是worker中通过线程工厂创建的线程
  44. final Thread t = w.thread;
  45. if (t != null) {
  46. //加锁
  47. final ReentrantLock mainLock = this.mainLock;
  48. mainLock.lock();
  49. try {
  50. // Recheck while holding lock.
  51. // Back out on ThreadFactory failure or if
  52. // shut down before lock acquired.
  53. //再次检查线程池运行状态
  54. int rs = runStateOf(ctl.get());
  55. //小于shutdown意味着running状态
  56. if (rs < SHUTDOWN ||
  57. (rs == SHUTDOWN && firstTask == null)) {
  58. //检查一下,t是新加入的线程,正常情况下不应该已经启动
  59. if (t.isAlive()) // precheck that t is startable
  60. throw new IllegalThreadStateException();
  61. //worker是个HashSet 就是core线程池,加入新建的worker
  62. workers.add(w);
  63. int s = workers.size();
  64. if (s > largestPoolSize)
  65. largestPoolSize = s;
  66. //加入成功设置状态
  67. workerAdded = true;
  68. }
  69. } finally {
  70. mainLock.unlock();
  71. }
  72. if (workerAdded) {
  73. //wroker加入到线程池后,将worker内的那个thread启动。并修改状态
  74. //该threa的启动并是执行的worker中的run方法
  75. t.start();
  76. workerStarted = true;
  77. }
  78. }
  79. } finally {
  80. if (! workerStarted)
  81. //如果worker启动失败,则要做处理
  82. addWorkerFailed(w);
  83. }
  84. //最后返回线程是否启动成功
  85. return workerStarted;
  86. }

 

其中worker线程的构造过程:

  1. Worker(Runnable firstTask) {
  2. setState(-1); // inhibit interrupts until runWorker
  3. this.firstTask = firstTask;
  4. this.thread = getThreadFactory().newThread(this);
  5. }

将提交上来的thread:firstTask赋值给Worker的成员变量,并将自己传递给线程工厂来创建出一个新的线程,这里有点绕,实际上是Worker本身就是一个Runnable类,它持有两个线程,一个是提交上来的线程firstTask,一个是将本身传递给线程工厂产生的一个线程thread:

  1. /** Thread this worker is running in. Null if factory fails. */
  2. final Thread thread;
  3. /** Initial task to run. Possibly null. */
  4. Runnable firstTask;

线程工厂主要做了一些封装,给线程设置个名字,优先级等。

  1. public Thread newThread(Runnable r) {
  2. Thread t = new Thread(group, r,
  3. namePrefix + threadNumber.getAndIncrement(),
  4. 0);
  5. if (t.isDaemon())
  6. t.setDaemon(false);
  7. if (t.getPriority() != Thread.NORM_PRIORITY)
  8. t.setPriority(Thread.NORM_PRIORITY);
  9. return t;
  10. }

下面来看,如果线程启动失败了怎么办,也就是代码中的addWorkerFailed(w);方法:

  1. private void addWorkerFailed(Worker w) {
  2. final ReentrantLock mainLock = this.mainLock;
  3. mainLock.lock();
  4. try {
  5. if (w != null)
  6. workers.remove(w);
  7. decrementWorkerCount();
  8. tryTerminate();
  9. } finally {
  10. mainLock.unlock();
  11. }
  12. }

该方法首先获取锁,因为workers是非线程安全的类,所以前面的addworker和这里的remove操作都需要枷锁。这里首先从worker中删除刚加入的worker线程,然后将worker计数器减一,并调用tryTerminate方法,最后释放锁。最后tryTerminate方法去关掉线程。

在上面worker的执行t.start();这段代码调用的是worker的run方法:

 

  1. public void run() {
  2. runWorker(this);
  3. }

runWorker方法:

 

  1. final void runWorker(Worker w) {
  2. Thread wt = Thread.currentThread();
  3. Runnable task = w.firstTask;
  4. w.firstTask = null;
  5. w.unlock(); // allow interrupts
  6. boolean completedAbruptly = true;
  7. try {
  8. //第一轮循环task!=null,进入循环,执行提交上来的任务
  9. //第二轮循环,task=null,getTask()从队列中获取任务,如果队列是空
  10. //则调用getTask方法从队列中获取任务
  11. while (task != null || (task = getTask()) != null) {
  12. w.lock();
  13. // If pool is stopping, ensure thread is interrupted;
  14. // if not, ensure thread is not interrupted. This
  15. // requires a recheck in second case to deal with
  16. // shutdownNow race while clearing interrupt
  17. if ((runStateAtLeast(ctl.get(), STOP) ||
  18. (Thread.interrupted() &&
  19. runStateAtLeast(ctl.get(), STOP))) &&
  20. !wt.isInterrupted())
  21. wt.interrupt();
  22. try {
  23. beforeExecute(wt, task);
  24. Throwable thrown = null;
  25. try {
  26. task.run();
  27. } catch (RuntimeException x) {
  28. thrown = x; throw x;
  29. } catch (Error x) {
  30. thrown = x; throw x;
  31. } catch (Throwable x) {
  32. thrown = x; throw new Error(x);
  33. } finally {
  34. afterExecute(task, thrown);
  35. }
  36. } finally {
  37. task = null;
  38. w.completedTasks++;
  39. w.unlock();
  40. }
  41. }
  42. completedAbruptly = false;
  43. } finally {
  44. processWorkerExit(w, completedAbruptly);
  45. }
  46. }

做了一些状态的变更,最终调用的还是提交上来的那个线程的run方法。这里还有个比较重要的方法就是在循环中的getTask()方法:

while (task != null || (task = getTask()) != null) {... ...}

getTask方法就是不断的从队列中获取任务交给runWorker里的循环来启动:

  1. private Runnable getTask() {
  2. boolean timedOut = false; // Did the last poll() time out?
  3. for (;;) {
  4. int c = ctl.get();
  5. int rs = runStateOf(c);
  6. // Check if queue empty only if necessary.
  7. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
  8. decrementWorkerCount();
  9. return null;
  10. }
  11. int wc = workerCountOf(c);
  12. // Are workers subject to culling?
  13. boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
  14. if ((wc > maximumPoolSize || (timed && timedOut))
  15. && (wc > 1 || workQueue.isEmpty())) {
  16. if (compareAndDecrementWorkerCount(c))
  17. return null;
  18. continue;
  19. }
  20. try {
  21. Runnable r = timed ?
  22. workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
  23. //fix pool走这里,如果队列中没有任务则阻塞等待
  24. workQueue.take();
  25. if (r != null)
  26. return r;
  27. timedOut = true;
  28. } catch (InterruptedException retry) {
  29. timedOut = false;
  30. }
  31. }
  32. }

因为fix线程池中没有超时的设置,所有如果队列中没有任务需要执行了,则take方法阻塞住:

  1. public E take() throws InterruptedException {
  2. E x;
  3. int c = -1;
  4. final AtomicInteger count = this.count;
  5. final ReentrantLock takeLock = this.takeLock;
  6. takeLock.lockInterruptibly();
  7. try {
  8. while (count.get() == 0) {
  9. //阻塞在这里
  10. notEmpty.await();
  11. }
  12. x = dequeue();
  13. c = count.getAndDecrement();
  14. if (c > 1)
  15. notEmpty.signal();
  16. } finally {
  17. takeLock.unlock();
  18. }
  19. if (c == capacity)
  20. signalNotFull();
  21. return x;
  22. }

也就是阻塞队列里获取数据的时候,阻塞在notEmpty.await();方法。而后面的offer(command)向队列中添加任务的时候又会调用signalNotEmpty();来唤醒被阻塞的worker线程中的take方法,这样worker线程获取了新的任务继续在线程池里执行了。

那么第一种情况:当线程池中worker数量没有达到corepoolsize的话已经分析完了。

 

下面分析第二种情况:当corepool中的线程数量达到了corepoolsize,那么会将提交上来的command(Runnable)放入到队列里。再贴一次上面的源码:

 

  1. public void execute(Runnable command) {
  2. if (command == null)
  3. throw new NullPointerException();
  4. int c = ctl.get();
  5. //第一种情况
  6. //如果工作线程数小于核心线程数,
  7. if (workerCountOf(c) < corePoolSize) {
  8. if (addWorker(command, true))
  9. return;
  10. c = ctl.get();
  11. }
  12. //第二种情况
  13. //如果工作线程数大于核心线程数,则检查线程池状态是否是正在运行,
  14. //且将新线程向阻塞队列提交。
  15. if (isRunning(c) && workQueue.offer(command)) {
  16. int recheck = ctl.get();
  17. //如果线程池不在存活状态,那么从队列里把任务移除,并执行reject策略
  18. if (! isRunning(recheck) && remove(command))
  19. reject(command);
  20. //如果线程池的工作线程为零,则调用addWoker提交任务
  21. //如果走到这里,说明上步的remove方法失败了,任务已经提交到queue里了
  22. //所以这里addWorker传入null,然后从队列里拿就行了,否则任务就重复
  23. //提交了
  24. else if (workerCountOf(recheck) == 0)
  25. addWorker(null, false);
  26. }
  27. //第三种情况
  28. else if (!addWorker(command, false))
  29. reject(command);

这种情况下,只要线程池存活,就会尝试将这个任务放到workQueue中,通过前面的分析指导该queue是个LinkedBlockingQueue,来看它的offer方法:

  1. public boolean offer(E e) {
  2. if (e == null) throw new NullPointerException();
  3. final AtomicInteger count = this.count;
  4. if (count.get() == capacity)
  5. return false;
  6. int c = -1;
  7. Node<E> node = new Node<E>(e);
  8. //加锁
  9. final ReentrantLock putLock = this.putLock;
  10. putLock.lock();
  11. try {
  12. if (count.get() < capacity) {
  13. enqueue(node);
  14. c = count.getAndIncrement();
  15. if (c + 1 < capacity)
  16. //队列没有满,通知添加被阻塞的线程继续加入元素
  17. notFull.signal();
  18. }
  19. } finally {
  20. putLock.unlock();
  21. }
  22. //队列里没有任务了,那么所有的core线程池中的worker都被take阻塞住了
  23. if (c == 0)
  24. //通知所有等待的worker,队列有数据了,worker中的阻塞take方法被唤醒执行
  25. signalNotEmpty();
  26. return c >= 0;
  27. }

 

加入队列后,再次去判断线程池状态,如果是不再运行,那么再从队列中把任务删除掉并执行reject策略。如果remove失败,则提交一个空任务的worker到core线程池,消费这个queue里的任务就行了。

第三种情况比较简单了,如果corepoll满了,并且队列也满了,那么就会创建额外的线程池

 

  1. else if (!addWorker(command, false))
  2. reject(command);

在addworker中

  1. if (wc >= CAPACITY ||
  2. wc >= (core ? corePoolSize : maximumPoolSize))
  3. return false;

如果当前线程数量已经大于maximumPoolSize了,就执行拒绝策略。

实际上,在fix线程池中是不会走到第三种情况的,因为只有在队列满了的情况下才会出现第三者情况,但是这个队列是int型的最大值,在到第三种情况之前早就内存溢出了。

最后看下线程池关闭的方法shutdown:

 

  1. /**
  2. * Initiates an orderly shutdown in which previously submitted
  3. * tasks are executed, but no new tasks will be accepted.
  4. * Invocation has no additional effect if already shut down.
  5. *
  6. * <p>This method does not wait for previously submitted tasks to
  7. * complete execution. Use {@link #awaitTermination awaitTermination}
  8. * to do that.
  9. *
  10. * @throws SecurityException {@inheritDoc}
  11. */
  12. public void shutdown() {
  13. final ReentrantLock mainLock = this.mainLock;
  14. mainLock.lock();
  15. try {
  16. checkShutdownAccess();
  17. advanceRunState(SHUTDOWN);
  18. interruptIdleWorkers();
  19. onShutdown(); // hook for ScheduledThreadPoolExecutor
  20. } finally {
  21. mainLock.unlock();
  22. }
  23. tryTerminate();
  24. }

 

 

这个方法大意就是不再接受新的任务提交,但是会等待所有的已经提交的任务全部执行完才去关闭线程池。

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/246485
推荐阅读
相关标签
  

闽ICP备14008679号