赞
踩
ctl:表示线程池状态,1、workCount 有效任务线程数 2、runState 运行状态 关闭还是运行
// 直接使用构造函数 new ThreadPoolExecutor(coreSize, maxSize, keepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue<>()); // 创建一个固定线程数量的线程池,可控制线程最大并发数,超出的任务会在队列中等待。 Executors.newFixedThreadPool(4); public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } // 此线程池只有一个线程,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行 Executors.newSingleThreadExecutor(); public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } // 存线程池内线程数量会随时变化,当空闲线程不足则启用新线程,如果相对空闲,闲置线程会被回收 Executors.newCachedThreadPool(); public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } // 固定线程数的线程池,支持定时及周期性任务执行。 Executors.newScheduledThreadPool(4); public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); }
// ThreadPoolExecutor 继承于 AbstractExecutorService
public class ThreadPoolExecutor extends AbstractExecutorService
// 如果要提交任务需要调用 AbstractExecutorService 的 submit方法
executor.submit(new MyRunnable());
整体的提交流程如下图所示:
AbstractExecutorService的submit方法
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
// 将任务转为 FutureTask
RunnableFuture<Void> ftask = newTaskFor(task, null);
// 调用自己的execute,也就是ThreadPoolExecutor的execute
execute(ftask);
return ftask;
}
ThreadPoolExecutor的execute
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); // 当前运行的线程数小于核心线程数,创建新的worker并将提交的任务作为其首个任务 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } // 如果线程数大于coreSize或者创建worker失败了 // 比如workerCount = 1 coreSize = 2 并发调用addWorker 则有一个会失败 // 且线程池还是运行状态,且队列添加任务成功 if (isRunning(c) && workQueue.offer(command)) { // 如果线程池状态变了(非运行状态)就需要从队列移除任务 int recheck = ctl.get(); if (!isRunning(recheck) && remove(command)) reject(command); // 移除成功则执行拒绝策略 // 如果线程为0,则新建一个线程,比如上面的CachedThreadPool中coreSize为0 // 不会去创建核心线程,所以在这里创建线程 else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 线程数量超过了corePoolSize,并且队列已满 // 创建新的worker执行当前任务,如果worker已经超过max,则执行拒绝策略 else if (!addWorker(command, false)) reject(command); }
addWorker方法
/** * firstTask:如果不为空,则应该为创建的线程先执行的任务。 * 对于少于核心线程数的情况或者队列已满,可以用firstTask来绕过队列,让线程直接执行。 * * core:判断的时候使用核心线程数还是最大线程数 */ // 代码过多,省略部分源码 private boolean addWorker(Runnable firstTask, boolean core) { // 下面省略的代码主要目的是判断下线程池状态,并且尝试增加workCount的值, // ...... try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { try { // 将新建的worker加入workers if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } if (workerAdded) { // 如果worker添加成功,则启动内部线程 t.start(); workerStarted = true; } } } finally { if (!workerStarted) // 如果上面线程启动失败则从workers移除worker addWorkerFailed(w); } return workerStarted; // 返回worker是否成功启动 }
上面都只是说了worker的创建,但是都没有说任务什么时候去执行下面就看看任务怎么处理的
worker继承AbstractQueuedSynchronizer主要用于处理并发问题
实现Runnable主要为了实现run方法以方便外部调用
private final class Worker extends AbstractQueuedSynchronizer
implements Runnable {
final Thread thread; // 内部的线程
Runnable firstTask; // 首个任务,可能为空
volatile long completedTasks; // 完成任务数
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
// 使用ThreadFactory新建线程并将自己作为参数传入(自己实现Runnable接口)
// 调用thread.start()会执行下面的run方法
this.thread = getThreadFactory().newThread(this);
}
}
主要的实现方法
// run比较简单 调用自己的方法 public void run() { runWorker(this); } // 主要的处理逻辑 final void runWorker(Worker w) { // 如果有firstTask 则task = firstTask 且将firstTask = null Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { // 如果task为空 则阻塞 尝试从队列获取任务 // 如果超过keepAliveTime或线程池已经SHUTDOWN则返回空 while (task != null || (task = getTask()) != null) { // 源码看着真难受,主要是为了满足下面两个情况,把下面情况代入是通的。 // 如果状态是SHUTDOWN,会修改中断状态,继续跑完所有任务 // 1. 线程池停了,确保线程中断 // 2. 如果没停,确保线程没有中断 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { // 空方法 可以根据业务自己继承实现 beforeExecute(wt, task); Throwable thrown = null; try { task.run(); // 执行真正的业务逻辑 } finally { // 空方法 可以根据业务自己继承实现 afterExecute(task, thrown); } } } completedAbruptly = false; } finally { // 如果上面代码没有正常执行 completedAbruptly = true // processWorkerExit 将 worker从workers里面移除 尝试结束线程池 processWorkerExit(w, completedAbruptly); } }
看看任务是如何获取的
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // 如果线程池已经关闭或者任务队列已经为空 减小worker的数量 返回NULL if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // 如果允许核心线程超时或者当前线程数大于核心线程数 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 如果当前线程数大于最大线程数 或者 已经超时 且 有启动的线程 或 任务为空 if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { // 是否限制超时时间 如果有 则尝试获取任务等待keepAliveTime 否则尝试获取任务 // 如果超时了,设置timedOut为true,上面代码就可能返回null Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
workQueue:缓冲队列,队列类型
当maxPoolSize达到最大,任务数量任在继续增加,线程池处理能力达到最大,无法继续接受新任务,会拒绝接受新任务,调用指定的 RejectedExecutionHandler 来执行
final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}
AbortPolicy:直接抛出RejectedExecutionException异常,默认。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
CallerRunsPolicy:只用调用者所在线程来运行任务。由于饱和,下个任务会由调用execute的线程执行,由于任务提交线程需要去执行任务,那么此线程不会提交新任务,因此请求保存在TCP层的队列而不是线程池队列
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if(!e.isShutdown()) {
// submit->execute->reject都是由提交任务的线程执行的
// 所以r里面的具体逻辑也是由其处理
r.run();
}
}
DiscardOldestPolicy:丢弃队列里最早的一个任务,并让线程池执行这个任务
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r); // 交由线程池去执行,重新跑一遍execute
}
}
DiscardPolicy:不处理,丢弃掉。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
public void shutdown() {
finalReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN); // 循环将状态更新为SHUTDOWN
interruptIdleWorkers(); // 尝试调用所有线程的interrupt()方法
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate(); // 尝试 修改为 TIDYING -> TERMINATED
}
如果这个时候提交任务,不会放入队列,而是直接执行拒绝策略。从上文runWorker方法的源码可以看到,线程不会立即终止,而是等到所有任务跑完才会进入中断。
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP); // 将状态修改为STOP
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
同上,如果这个时候提交任务,不会放入队列,而是直接执行拒绝策略。从上文runWorker方法的源码可以看到,空闲线程如果这个时候去获取任务会调用interrupt,并且可以执行完这个任务,但是下次将获取不到这个任务而退出循环。
private class LaunchTask implements Runnable {
public LaunchTask() {
System.out.println(1);
}
@Override
public void run() {
System.out.println(2);
}
}
// 原来的提交方式
submit(LaunchTask::new);
这种方法会导致LaunchTask的run方法不执行,而只会执行LaunchTask的构造方法。造成此问题主要原因还是自己对lambda不熟悉,目前理解LaunchTask::new会创建一个labmda类型,其run方法包装LaunchTask的构造方法而不是LaunchTask的run方法。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。