Calling via the @Async annotation
- @Component
- public class AsyncTest {
- @Async
- public void async(String name) throws InterruptedException {
- System.out.println("async" + name + " " + Thread.currentThread().getName());
- Thread.sleep(1000);
- }
- }
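The startup class must carry the @EnableAsync annotation; without it @Async has no effect and the method runs synchronously on the calling thread.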
- @SpringBootApplication
- @EnableAsync
- public class Test1Application {
- public static void main(String[] args) throws InterruptedException {
- ConfigurableApplicationContext run = SpringApplication.run(Test1Application.class, args);
- AsyncTest bean = run.getBean(AsyncTest.class);
- for(int index = 0; index <= 10; ++index){
- bean.async(String.valueOf(index));
- }
- }
- }
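Why the annotation only works once @EnableAsync is present can be pictured with a plain JDK proxy: the caller talks to a wrapper, and the wrapper hands the real call to an executor. The sketch below is only an analogy under that assumption, not Spring's actual implementation; the `Greeter` interface, the `asyncProxy` helper and the thread name `async-1` are hypothetical names chosen for the illustration.

```java
import java.lang.reflect.Proxy;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

final class AsyncProxySketch {

    /** Hypothetical service interface, used only for this illustration. */
    interface Greeter {
        void greet(String name);
    }

    /** Wraps target so that each call is handed to the executor instead of running on the caller. */
    static Greeter asyncProxy(Greeter target, ExecutorService executor) {
        return (Greeter) Proxy.newProxyInstance(
                Greeter.class.getClassLoader(),
                new Class<?>[] { Greeter.class },
                (proxy, method, args) -> {
                    executor.execute(() -> {
                        try {
                            method.invoke(target, args);
                        } catch (ReflectiveOperationException e) {
                            throw new IllegalStateException(e);
                        }
                    });
                    return null; // void method: the caller does not wait for the result
                });
    }

    /** Calls greet() through the proxy and returns the name of the thread that ran it. */
    static String threadUsedByProxy() {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> new Thread(r, "async-1"));
        AtomicReference<String> seen = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        Greeter target = name -> {
            seen.set(Thread.currentThread().getName());
            done.countDown();
        };
        try {
            asyncProxy(target, executor).greet("demo");
            done.await(); // wait until the background thread has run the method
            return seen.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(threadUsedByProxy()); // async-1
    }
}
```

Calling `target.greet(...)` directly, without the wrapper, would run on the caller's own thread, which mirrors what happens when the annotation is not activated.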
ThreadPoolTaskExecutor
When the executor is injected and used directly, the @EnableAsync annotation is not needed.
- @SpringBootTest
- class Test1ApplicationTests {
-
- @Resource
- ThreadPoolTaskExecutor threadPoolTaskExecutor;
-
- @Test
- void contextLoads() {
- Runnable runnable = () -> {
- System.out.println(Thread.currentThread().getName());
- };
-
- for(int index = 0; index <= 10; ++index){
- threadPoolTaskExecutor.submit(runnable);
- }
- }
-
- }

Common Spring Boot thread pool settings:
- spring:
-   task:
-     execution:
-       pool:
-         core-size: 8
-         max-size: 16 # defaults to Integer.MAX_VALUE
-         keep-alive: 60s # when the pool holds more threads than core-size, a thread idle for longer than this is terminated
-         allow-core-thread-timeout: true # whether core threads may also time out; defaults to true
-         queue-capacity: 100 # size of the task queue; defaults to Integer.MAX_VALUE
-       shutdown:
-         await-termination: false # whether to wait for running tasks on shutdown
-       thread-name-prefix: task- # prefix for thread names
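To make the meaning of each property concrete, here is a minimal JDK-only sketch that builds a `ThreadPoolExecutor` with the same numbers as the YAML above. It is an approximation for illustration, not what Spring Boot does internally; the class name `PoolFromProperties` and the counter-based thread naming are assumptions of this example.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class PoolFromProperties {

    /** Builds a JDK pool using the same values as the configuration above. */
    static ThreadPoolExecutor build() {
        AtomicInteger counter = new AtomicInteger();
        // thread-name-prefix: task-
        ThreadFactory factory = r -> new Thread(r, "task-" + counter.incrementAndGet());
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                8,                               // core-size
                16,                              // max-size
                60, TimeUnit.SECONDS,            // keep-alive
                new LinkedBlockingQueue<>(100),  // queue-capacity
                factory);
        pool.allowCoreThreadTimeOut(true);       // allow-core-thread-timeout
        return pool;
    }

    /** Runs one task and returns the name of the worker thread that executed it. */
    static String firstWorkerName() {
        ThreadPoolExecutor pool = build();
        try {
            return pool.submit(() -> Thread.currentThread().getName()).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(firstWorkerName()); // task-1
    }
}
```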
TaskExecutionAutoConfiguration defines a ThreadPoolTaskExecutor bean, whose implementation is built on the JDK's native ThreadPoolExecutor. initializeExecutor() is invoked from the parent class, where the handler field is declared as private RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.AbortPolicy(); and initialize() passes that AbortPolicy into initializeExecutor(). AbortPolicy is therefore the default rejection policy unless another one is set.
Note that in TaskExecutionAutoConfiguration the ThreadPoolTaskExecutor bean is registered under two names: applicationTaskExecutor and taskExecutor.
- // TaskExecutionAutoConfiguration#applicationTaskExecutor()
- @Lazy
- @Bean(name = { APPLICATION_TASK_EXECUTOR_BEAN_NAME,
- AsyncAnnotationBeanPostProcessor.DEFAULT_TASK_EXECUTOR_BEAN_NAME })
- @ConditionalOnMissingBean(Executor.class)
- public ThreadPoolTaskExecutor applicationTaskExecutor(TaskExecutorBuilder builder) {
- return builder.build();
- }
- // ThreadPoolTaskExecutor#initializeExecutor()
- @Override
- protected ExecutorService initializeExecutor(
- ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
-
- BlockingQueue<Runnable> queue = createQueue(this.queueCapacity);
-
- ThreadPoolExecutor executor;
- if (this.taskDecorator != null) {
- executor = new ThreadPoolExecutor(
- this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
- queue, threadFactory, rejectedExecutionHandler) {
- @Override
- public void execute(Runnable command) {
- Runnable decorated = taskDecorator.decorate(command);
- if (decorated != command) {
- decoratedTaskMap.put(decorated, command);
- }
- super.execute(decorated);
- }
- };
- }
- else {
- executor = new ThreadPoolExecutor(
- this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
- queue, threadFactory, rejectedExecutionHandler);
-
- }
-
- if (this.allowCoreThreadTimeOut) {
- executor.allowCoreThreadTimeOut(true);
- }
-
- this.threadPoolExecutor = executor;
- return executor;
- }

- // ExecutorConfigurationSupport#initialize()
- public void initialize() {
- if (logger.isInfoEnabled()) {
- logger.info("Initializing ExecutorService" + (this.beanName != null ? " '" + this.beanName + "'" : ""));
- }
- if (!this.threadNamePrefixSet && this.beanName != null) {
- setThreadNamePrefix(this.beanName + "-");
- }
- this.executor = initializeExecutor(this.threadFactory, this.rejectedExecutionHandler);
- }
To override the default taskExecutor, declare your own bean with that name; its return type can be either ThreadPoolTaskExecutor or Executor.
- @Configuration
- public class ThreadPoolConfiguration {
-
- @Bean("taskExecutor")
- public ThreadPoolTaskExecutor taskExecutor() {
- ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
- // Set the thread pool parameters
- taskExecutor.setCorePoolSize(10);
- taskExecutor.setMaxPoolSize(50);
- taskExecutor.setQueueCapacity(200);
- taskExecutor.setKeepAliveSeconds(60);
- taskExecutor.setThreadNamePrefix("myExecutor--");
- taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
- taskExecutor.setAwaitTerminationSeconds(60);
- // Change the rejection policy so that rejected tasks run on the calling thread
- taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
- // Initialize the thread pool
- taskExecutor.initialize();
- return taskExecutor;
- }
- }

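If several thread pools are defined, for example a second one named taskExecutor2, injecting by type alone fails at startup because more than one candidate bean matches. Specifying the bean name resolves the ambiguity.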
- @Bean("taskExecutor2")
- public ThreadPoolTaskExecutor taskExecutor2() {
- ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
- // Set the thread pool parameters
- taskExecutor.setCorePoolSize(10);
- taskExecutor.setMaxPoolSize(50);
- taskExecutor.setQueueCapacity(200);
- taskExecutor.setKeepAliveSeconds(60);
- taskExecutor.setThreadNamePrefix("myExecutor2--");
- taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
- taskExecutor.setAwaitTerminationSeconds(60);
- // Change the rejection policy so that rejected tasks run on the calling thread
- taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
- // Initialize the thread pool
- taskExecutor.initialize();
- return taskExecutor;
- }

When injecting a pool, name the field after the bean so that @Resource looks it up by name.
- @Resource
- ThreadPoolTaskExecutor taskExecutor2;
For methods that use @Async, pass the bean name to the annotation.
- @Async("taskExecutor2")
- public void async(String name) throws InterruptedException {
- System.out.println("async" + name + " " + Thread.currentThread().getName());
- Thread.sleep(1000);
- }
The four rejection policies of a thread pool
The ThreadPoolExecutor constructor looks like this:
- public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
- BlockingQueue<Runnable> workQueue) {
- this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
- Executors.defaultThreadFactory(), defaultHandler);
- }
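Cached thread pool (newCachedThreadPool): the maximum number of threads is unbounded (maximumPoolSize=Integer.MAX_VALUE). Threads that stay idle beyond the keep-alive time are reclaimed; otherwise existing threads are reused.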
- new ThreadPoolExecutor(0, Integer.MAX_VALUE,
- 60L, TimeUnit.SECONDS,
- new SynchronousQueue<Runnable>());
Fixed-size thread pool (newFixedThreadPool): tasks beyond the number of threads wait in the queue.
- return new ThreadPoolExecutor(nThreads, nThreads,
- 0L, TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue<Runnable>());
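The difference between these two constructor calls can be observed by submitting more blocking tasks than a fixed pool has threads. The sketch below reuses the exact constructor arguments shown above; the class name `PoolGrowthSketch` and the `observe` helper are made up for this example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class PoolGrowthSketch {

    /** Same arguments as the cached pool shown above. */
    static ThreadPoolExecutor cached() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
    }

    /** Same arguments as the fixed pool shown above. */
    static ThreadPoolExecutor fixed(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
    }

    /** Submits blocking tasks and returns {threads created, tasks waiting in the queue}. */
    static int[] observe(ThreadPoolExecutor pool, int tasks) {
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep the worker busy until we have measured
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return new int[] { pool.getPoolSize(), pool.getQueue().size() };
        } finally {
            release.countDown(); // let all tasks finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] c = observe(cached(), 5);
        int[] f = observe(fixed(2), 5);
        System.out.println("cached: threads=" + c[0] + " queued=" + c[1]); // cached: threads=5 queued=0
        System.out.println("fixed:  threads=" + f[0] + " queued=" + f[1]); // fixed:  threads=2 queued=3
    }
}
```

The cached pool never queues because a SynchronousQueue has no capacity, so every task that finds no idle worker gets a new thread; the fixed pool stops at nThreads and parks the rest in its unbounded queue.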
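Scheduled thread pool: like newCachedThreadPool, maximumPoolSize is Integer.MAX_VALUE, but corePoolSize can be specified. Because DelayedWorkQueue is unbounded, the pool in practice does not grow beyond corePoolSize. It supports delayed and periodic execution.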
- public ScheduledThreadPoolExecutor(int corePoolSize) {
- super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
- new DelayedWorkQueue());
- }
Periodic execution:
- ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
- scheduledThreadPool.scheduleAtFixedRate(()->{
- System.out.println("rate");
- }, 1, 1, TimeUnit.SECONDS);
Delayed execution:
- scheduledThreadPool.schedule(()->{
- System.out.println("delay 3 seconds");
- }, 3, TimeUnit.SECONDS);
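The two snippets above never stop the scheduler, so a program using them keeps running. A minimal sketch of the same two calls with cleanup, assuming a single scheduler thread is enough; the class and method names are hypothetical.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

final class ScheduledSketch {

    /** Schedules a one-shot task and returns how many milliseconds passed before it ran. */
    static long measuredDelayMillis(long delayMillis) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        try {
            long start = System.nanoTime();
            Callable<Long> stamp = System::nanoTime; // the task just reports when it ran
            ScheduledFuture<Long> done = scheduler.schedule(stamp, delayMillis, TimeUnit.MILLISECONDS);
            return TimeUnit.NANOSECONDS.toMillis(done.get() - start);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            scheduler.shutdownNow();
        }
    }

    /** Runs a periodic task the given number of times, cancels it, and reports whether it is cancelled. */
    static boolean cancelAfter(int runs, long periodMillis) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        CountDownLatch remaining = new CountDownLatch(runs);
        try {
            ScheduledFuture<?> handle = scheduler.scheduleAtFixedRate(
                    remaining::countDown, 0, periodMillis, TimeUnit.MILLISECONDS);
            remaining.await();    // wait for the requested number of executions
            handle.cancel(false); // a periodic task only ends when cancelled or on shutdown
            return handle.isCancelled();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("delay >= 100 ms: " + (measuredDelayMillis(100) >= 100));
        System.out.println("cancelled: " + cancelAfter(3, 10));
    }
}
```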
Single-thread pool (newSingleThreadExecutor): tasks are executed one after another, in submission order.
- public static ExecutorService newSingleThreadExecutor() {
- return new FinalizableDelegatedExecutorService
- (new ThreadPoolExecutor(1, 1,
- 0L, TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue<Runnable>()));
- }
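The ordering guarantee follows from having one worker and a FIFO queue. A short sketch that records the order in which tasks actually ran; `SingleThreadOrderSketch` and `runInOrder` are illustrative names only.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class SingleThreadOrderSketch {

    /** Submits tasks 0..tasks-1 and returns the order in which they were executed. */
    static List<Integer> runInOrder(int tasks) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        List<Integer> order = Collections.synchronizedList(new ArrayList<>());
        try {
            for (int i = 0; i < tasks; i++) {
                final int id = i;
                executor.execute(() -> order.add(id));
            }
            executor.shutdown(); // already submitted tasks still run
            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
            return new ArrayList<>(order);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(runInOrder(5)); // [0, 1, 2, 3, 4]
    }
}
```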
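CallerRunsPolicy: the rejected task is run by the thread that submitted it.
AbortPolicy: if the pool rejects a task, an exception is thrown.
DiscardPolicy: if the pool rejects a task, the task is silently dropped.
DiscardOldestPolicy: if the pool rejects a task, the oldest task that has not started yet is removed from the queue and the new task is submitted again.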
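The four behaviours can be compared side by side by saturating a deliberately tiny pool. The sketch assumes one worker thread and a queue of one slot: task A occupies the thread, task B fills the queue, and task C is the one the handler has to deal with. The class name, the task labels and the log strings are all inventions of this example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class RejectionPolicySketch {

    /** Saturates a 1-thread, 1-slot pool with A and B, submits C, and returns a log of what ran. */
    static List<String> saturate(RejectedExecutionHandler handler) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1),
                r -> new Thread(r, "worker"), handler);
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch release = new CountDownLatch(1);
        String caller = Thread.currentThread().getName();
        try {
            pool.execute(() -> { awaitQuietly(release); log.add("A"); }); // occupies the only thread
            pool.execute(() -> log.add("B"));                              // fills the queue
            try {
                pool.execute(() -> log.add(                                // goes to the handler
                        Thread.currentThread().getName().equals(caller) ? "C on caller" : "C"));
            } catch (RejectedExecutionException e) {
                log.add("C rejected");
            }
        } finally {
            release.countDown(); // unblock A so the pool can drain
            pool.shutdown();
        }
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(log);
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(saturate(new ThreadPoolExecutor.CallerRunsPolicy()));    // [C on caller, A, B]
        System.out.println(saturate(new ThreadPoolExecutor.AbortPolicy()));         // [C rejected, A, B]
        System.out.println(saturate(new ThreadPoolExecutor.DiscardPolicy()));       // [A, B]
        System.out.println(saturate(new ThreadPoolExecutor.DiscardOldestPolicy())); // [A, C]
    }
}
```

With CallerRunsPolicy the submitting thread is slowed down by doing the work itself, which acts as a simple form of back-pressure; the two discard policies lose work without any signal, so they suit only tasks that are safe to drop.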
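CallerRunsPolicy
Calls the task's run() method directly on the submitting thread (the main thread in the examples here), unless the pool has been shut down.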
- public static class CallerRunsPolicy implements RejectedExecutionHandler {
-
- public CallerRunsPolicy() { }
-
- public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
- if (!e.isShutdown()) {
- r.run();
- }
- }
- }
The effect is similar to:
- Runnable thread = ()->{
- System.out.println(Thread.currentThread().getName());
- try {
- Thread.sleep(0);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- };
-
- thread.run();
AbortPolicy
Throws a RejectedExecutionException directly; the message identifies the task and the thread pool.
- public static class AbortPolicy implements RejectedExecutionHandler {
-
- public AbortPolicy() { }
-
- public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
- throw new RejectedExecutionException("Task " + r.toString() +
- " rejected from " +
- e.toString());
- }
- }
DiscardPolicy
Does nothing.
- public static class DiscardPolicy implements RejectedExecutionHandler {
-
- public DiscardPolicy() { }
-
- public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
- }
- }
DiscardOldestPolicy
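e.getQueue().poll(): removes the oldest task from the queue.
e.execute(r): submits the current task again, which normally places it in the slot that was just freed.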
- public static class DiscardOldestPolicy implements RejectedExecutionHandler {
-
- public DiscardOldestPolicy() { }
-
- public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
- if (!e.isShutdown()) {
- e.getQueue().poll();
- e.execute(r);
- }
- }
- }
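A Java thread pool holds java.util.concurrent.ThreadPoolExecutor.Worker objects, which are kept in private final HashSet<Worker> workers = new HashSet<Worker>();. workQueue is the queue of tasks waiting to be executed: when a new task is submitted and the core threads are all busy, the task is added to workQueue.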
- private final class Worker
- extends AbstractQueuedSynchronizer
- implements Runnable
- {
- /**
- * This class will never be serialized, but we provide a
- * serialVersionUID to suppress a javac warning.
- */
- private static final long serialVersionUID = 6138294804551838833L;
-
- /** Thread this worker is running in. Null if factory fails. */
- final Thread thread;
- /** Initial task to run. Possibly null. */
- Runnable firstTask;
- /** Per-thread task counter */
- volatile long completedTasks;
-
- /**
- * Creates with given first task and thread from ThreadFactory.
- * @param firstTask the first task (null if none)
- */
- Worker(Runnable firstTask) {
- setState(-1); // inhibit interrupts until runWorker
- this.firstTask = firstTask;
- this.thread = getThreadFactory().newThread(this);
- }
-
- /** Delegates main run loop to outer runWorker */
- public void run() {
- runWorker(this);
- }
-
- // Lock methods
- //
- // The value 0 represents the unlocked state.
- // The value 1 represents the locked state.
-
- protected boolean isHeldExclusively() {
- return getState() != 0;
- }
-
- protected boolean tryAcquire(int unused) {
- if (compareAndSetState(0, 1)) {
- setExclusiveOwnerThread(Thread.currentThread());
- return true;
- }
- return false;
- }
-
- protected boolean tryRelease(int unused) {
- setExclusiveOwnerThread(null);
- setState(0);
- return true;
- }
-
- public void lock() { acquire(1); }
- public boolean tryLock() { return tryAcquire(1); }
- public void unlock() { release(1); }
- public boolean isLocked() { return isHeldExclusively(); }
-
- void interruptIfStarted() {
- Thread t;
- if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
- try {
- t.interrupt();
- } catch (SecurityException ignore) {
- }
- }
- }
- }

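A Worker executes through runWorker(). Unlike a thread we would normally write, this thread sits in a loop and keeps fetching new tasks from the queue. That is why threads in a pool can be reused instead of ending as soon as their task finishes, the way an ordinary thread does.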
- final void runWorker(Worker w) {
- Thread wt = Thread.currentThread();
- Runnable task = w.firstTask;
- w.firstTask = null;
- w.unlock(); // allow interrupts
- boolean completedAbruptly = true;
- try {
- while (task != null || (task = getTask()) != null) {
- w.lock();
- // If pool is stopping, ensure thread is interrupted;
- // if not, ensure thread is not interrupted. This
- // requires a recheck in second case to deal with
- // shutdownNow race while clearing interrupt
- if ((runStateAtLeast(ctl.get(), STOP) ||
- (Thread.interrupted() &&
- runStateAtLeast(ctl.get(), STOP))) &&
- !wt.isInterrupted())
- wt.interrupt();
- try {
- beforeExecute(wt, task);
- Throwable thrown = null;
- try {
- task.run();
- } catch (RuntimeException x) {
- thrown = x; throw x;
- } catch (Error x) {
- thrown = x; throw x;
- } catch (Throwable x) {
- thrown = x; throw new Error(x);
- } finally {
- afterExecute(task, thrown);
- }
- } finally {
- task = null;
- w.completedTasks++;
- w.unlock();
- }
- }
- completedAbruptly = false;
- } finally {
- processWorkerExit(w, completedAbruptly);
- }
- }
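Stripped of locking, state checks and hooks, the loop above reduces to "take a task, run it, repeat". The following is a deliberately simplified sketch of that idea with a single worker; `TinyPool` and its `POISON` shutdown marker are inventions of this example and do not appear in ThreadPoolExecutor, which also replaces workers whose task throws.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

final class TinyPool {
    /** Marker task that tells the worker to leave its loop. */
    private static final Runnable POISON = () -> { };

    private final BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
    private final Thread worker;

    TinyPool(String name) {
        worker = new Thread(this::runWorker, name);
        worker.start();
    }

    /** The reuse loop: one thread keeps pulling tasks until it sees the marker. */
    private void runWorker() {
        try {
            Runnable task;
            while ((task = workQueue.take()) != POISON) {
                task.run();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    void execute(Runnable task) {
        workQueue.add(task);
    }

    /** Lets queued tasks finish, then stops the worker and waits for it to exit. */
    void shutdownAndWait() {
        workQueue.add(POISON);
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Runs several tasks and returns the name of the thread that executed each one. */
    static List<String> threadNamesFor(int tasks) {
        TinyPool pool = new TinyPool("tiny-worker");
        List<String> names = new ArrayList<>(); // written only by the worker, read after join()
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> names.add(Thread.currentThread().getName()));
        }
        pool.shutdownAndWait();
        return names;
    }

    public static void main(String[] args) {
        System.out.println(threadNamesFor(3)); // [tiny-worker, tiny-worker, tiny-worker]
    }
}
```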
