赞
踩
ThreadPoolExecutor:这个是JAVA自己实现的线程池执行类,基本上创建线程池都是通过这个类进行的创建!
ThreadPoolTaskExecutor :这个是springboot基于ThreadPoolExecutor实现的一个线程池执行类。(SpringBoot中用这个!!!!)
# 线程池配置
# 核心线程数
threadPool.spring.corePoolSize: 20
# 最大线程数
threadPool.spring.maxPoolSize: 40
# 线程队列长度
threadPool.spring.queueCapacity: 100
# 超过核心线程数的线程所允许的空闲时间
threadPool.spring.keepAliveSeconds: 300
package com.biaogexf.Tools.config; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.concurrent.*; /** * 线程池配置 */ @Configuration public class ThreadPoolConfig { @Value("${threadPool.spring.corePoolSize}") private Integer corePoolSize; @Value("${threadPool.spring.maxPoolSize}") private Integer maxPoolSize; @Value("${threadPool.spring.queueCapacity}") private Integer queueCapacity; @Value("${threadPool.spring.keepAliveSeconds}") private Integer keepAliveSeconds; // SpringBoot中使用ThreadPoolExecutor @Bean(name = "threadPoolExecutor") public ExecutorService threadPoolExecutor() { ExecutorService executor = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveSeconds, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(5), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); return executor; } }
package com.biaogexf.Tools.thread.springboot; import lombok.extern.slf4j.Slf4j; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import com.biaogexf.tools.ToolsApplication; import java.util.concurrent.ThreadPoolExecutor; @RunWith(SpringRunner.class) @SpringBootTest(classes = ToolsApplication.class) @Slf4j public class ThreadPoolExecutorTest { @Autowired ThreadPoolExecutor threadPoolExecutor; //会去匹配 @Bean(name = "threadPoolExecutor") 这个线程池 @Test public void testThreadPoolExecutor() { for (int i = 0; i < 10; i++) { final int index = i; // execute用来提交线程的执行 threadPoolExecutor.execute(new Runnable() { @Override public void run() { log.info("{}:{}", Thread.currentThread().getName(), index); } }); } } }
在springboot当中,根据官方文档的说明,如果没有配置线程池的话,springboot会自动配置一个ThreadPoolTaskExecutor 线程池到bean当中,我们只需要按照他的方式调用就可以了!!!
方式一:
第一步:在Application启动类上面加上@EnableAsync 第二步:在需要异步执行的方法上加@Async
- 1
- 2
- 3
方式二:
直接注入ThreadPoolTaskExecutor
- 1
package com.biaogexf.tools; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.scheduling.annotation.EnableAsync; /** * 启动程序 */ @SpringBootApplication @EnableAsync // 开启异步 1.在Application启动类上面加上@EnableAsync !!!!!!!!!! public class ToolsApplication { public static void main(String[] args) { System.setProperty("spring.devtools.restart.enabled", "false"); SpringApplication.run(ToolsApplication.class, args); } }
package com.biaogexf.tools.thread.springboot; import com.biaogexf.tools.ToolsApplication; import com.biaogexf.tools.service.AsyncService; import lombok.extern.slf4j.Slf4j; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.test.context.junit4.SpringRunner; import java.util.concurrent.Executor; /** * 在springboot当中,根据 官方文档的说明,如果没有配置线程池的话, * springboot会自动配置一个ThreadPoolTaskExecutor 线程池到bean当中,我们只需要按照他的方式调用就可以了!!! */ @RunWith(SpringRunner.class) @SpringBootTest(classes = ToolsApplication.class) @Slf4j public class SpringBootDefaultThreadPoolTest { // 方式一:@Enable+@Async @Autowired AsyncService asyncService; // 方式二:直接注入ThreadPoolTaskExecutor @Autowired Executor threadPoolTaskExecutor; @Test public void testDefaultPool() { // 方式一 for (int i = 0; i < 10; i++) { asyncService.helllo(i); } // 方式二 for (int j = 0; j < 10; j++) { final int index = j; threadPoolTaskExecutor.execute(new Runnable() { @Override public void run() { log.info("threadPoolTaskExecutor 创建线程 异步执行:{}", index); } }); } } }
package com.biaogexf.tools.service; import lombok.extern.slf4j.Slf4j; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; @Service public class AsyncService { private final Logger log = LoggerFactory.getLogger(AsyncService.class); @Async // 2.在需要异步执行的方法上加@Async public void helllo(int i) { log.info("异步线程启动:{}", i); } }
package com.biaogexf.tools.config; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.concurrent.*; /** * 线程池配置 * ThreadPoolTaskExecutor的使用 * 自定义多个线程池 & 指定线程池使用 */ @Configuration public class ThreadPoolConfig { @Value("${threadPool.spring.corePoolSize}") private Integer corePoolSize; @Value("${threadPool.spring.maxPoolSize}") private Integer maxPoolSize; @Value("${threadPool.spring.queueCapacity}") private Integer queueCapacity; @Value("${threadPool.spring.keepAliveSeconds}") private Integer keepAliveSeconds; // SpringBoot中使用ThreadPoolExecutor @Bean(name = "threadPoolExecutor") public ThreadPoolExecutor threadPoolExecutor() { ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveSeconds, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(5), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); return executor; } // 自定义ThreadPoolTaskExecutor线程池 @Bean(name = "customThreadPoolTaskExecutor") public ThreadPoolTaskExecutor customThreadPoolTaskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); // 设置线程池参数 executor.setCorePoolSize(corePoolSize); executor.setMaxPoolSize(maxPoolSize); executor.setQueueCapacity(queueCapacity); executor.setKeepAliveSeconds(keepAliveSeconds); executor.setThreadNamePrefix("myExecutor--"); executor.setWaitForTasksToCompleteOnShutdown(true); executor.setAwaitTerminationSeconds(60); // 修改拒绝策略为使用当前线程执行 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); // 初始化线程池 executor.initialize(); return executor; } // 自定义ThreadPoolTaskExecutor线程池 @Bean(name = "customThreadPoolTaskExecutor11111") public ThreadPoolTaskExecutor customThreadPoolTaskExecutor1() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); // 设置线程池参数 executor.setCorePoolSize(corePoolSize); executor.setMaxPoolSize(maxPoolSize); executor.setQueueCapacity(queueCapacity); executor.setKeepAliveSeconds(keepAliveSeconds); executor.setThreadNamePrefix("myExecutor11111--"); executor.setWaitForTasksToCompleteOnShutdown(true); executor.setAwaitTerminationSeconds(60); // 修改拒绝策略为使用当前线程执行 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); // 初始化线程池 executor.initialize(); return executor; } }
package com.biaogexf.tools.thread.springboot; import com.biaogexf.tools.ToolsApplication; import lombok.extern.slf4j.Slf4j; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.test.context.junit4.SpringRunner; /** * SpringBoot中自定义多个线程池 & 指定线程池使用 */ @RunWith(SpringRunner.class) @SpringBootTest(classes = ToolsApplication.class) @Slf4j public class SpringBootCustomThreadPoolTest { // SpringBoot中使用个性化配置后的ThreadPoolTaskExecutor线程池 自定义线程池 customThreadPoolTaskExecutor @Autowired private ThreadPoolTaskExecutor customThreadPoolTaskExecutor; // 会去匹配 @Bean("customThreadPoolTaskExecutor") 这个线程池 // SpringBoot中使用个性化配置后的ThreadPoolTaskExecutor线程池 自定义线程池 customThreadPoolTaskExecutor1 @Autowired private ThreadPoolTaskExecutor customThreadPoolTaskExecutor11111; // 会去匹配 @Bean("customThreadPoolTaskExecutor11111") 这个线程池 @Test public void testCustomThreadPoolTaskExecutor() { // SpringBoot中使用自定义线程池 customThreadPoolTaskExecutor for (int j = 0; j < 10; j++) { final int index = j; customThreadPoolTaskExecutor.execute(new Runnable() { @Override public void run() { log.info("###SpringBoot中使用自定义线程池 customThreadPoolTaskExecutor 创建线程 异步执行:{}", index); } }); } // SpringBoot中使用自定义线程池 customThreadPoolTaskExecutor1 for (int j = 0; j < 10; j++) { final int index = j; customThreadPoolTaskExecutor11111.execute(new Runnable() { @Override public void run() { log.info("###SpringBoot中使用自定义线程池 customThreadPoolTaskExecutor11111 创建线程 异步执行:{}", index); } }); } } }
springboot之线程池ThreadPoolTaskExecutor以及@Async异步注解
在spring boot中使用java线程池ExecutorService
在spring中,可以通过@EnableAsync + @Async两个注解非常快捷的实现异步。
但是默认情况下,每一次调用都是开启一个新的线程!
@Async用的是SimpleAsyncTaskExecutor线程池,但是如果没有对SimpleAsyncTaskExecutor做策略配置的话,是不复用线程的,这是对服务器资源的浪费。
/** * 启动程序 */ @SpringBootApplication @EnableAsync // 开启异步 public class ToolsApplication { public static void main(String[] args) { System.setProperty("spring.devtools.restart.enabled", "true"); SpringApplication.run(ToolsApplication.class, args); } } /** * 使用@Enable + @Async快速实现异步 */ @RestController @RequestMapping("/WxworkAsyncController") public class WxworkAsyncController { @Autowired private WxworkAsyncService wxworkAsyncService; @GetMapping("async") public String async() { // 模拟异步处理任务 wxworkAsyncService.async(); return "success123213213fffff"; } } /** * 使用@Enable + @Async快速实现异步 */ @Service public class WxworkAsyncService { private static final Logger log = LoggerFactory.getLogger(WxworkAsyncService.class); @Async public void async() { try { // 模拟业务执行操作 Thread.sleep(1000); } catch (InterruptedException e) { log.error("### WxworkAsyncService InterruptedException ", e); } log.info("{} -- {}", System.currentTimeMillis(), Thread.currentThread().getName()); } }
21-05-17.10:21:16.441 [SimpleAsyncTaskExecutor-1] INFO WxworkAsyncService - 1621218076441 – SimpleAsyncTaskExecutor-1
21-05-17.10:21:32.944 [SimpleAsyncTaskExecutor-2] INFO WxworkAsyncService - 1621218092944 – SimpleAsyncTaskExecutor-2
21-05-17.10:21:33.445 [SimpleAsyncTaskExecutor-3] INFO WxworkAsyncService - 1621218093445 – SimpleAsyncTaskExecutor-3
21-05-17.10:21:33.764 [SimpleAsyncTaskExecutor-4] INFO WxworkAsyncService - 1621218093764 – SimpleAsyncTaskExecutor-4
21-05-17.10:21:34.052 [SimpleAsyncTaskExecutor-5] INFO WxworkAsyncService - 1621218094052 – SimpleAsyncTaskExecutor-5
21-05-17.10:21:34.509 [SimpleAsyncTaskExecutor-6] INFO WxworkAsyncService - 1621218094509 – SimpleAsyncTaskExecutor-6
21-05-17.10:21:34.935 [SimpleAsyncTaskExecutor-7] INFO WxworkAsyncService - 1621218094935 – SimpleAsyncTaskExecutor-7
21-05-17.10:21:35.139 [SimpleAsyncTaskExecutor-8] INFO WxworkAsyncService - 1621218095139 – SimpleAsyncTaskExecutor-8
21-05-17.10:21:35.523 [SimpleAsyncTaskExecutor-9] INFO WxworkAsyncService - 1621218095523 – SimpleAsyncTaskExecutor-9
21-05-17.10:21:35.905 [SimpleAsyncTaskExecutor-10] INFO WxworkAsyncService - 1621218095905 – SimpleAsyncTaskExecutor-10
21-05-17.10:21:36.272 [SimpleAsyncTaskExecutor-11] INFO WxworkAsyncService - 1621218096272 – SimpleAsyncTaskExecutor-11
21-05-17.10:21:36.630 [SimpleAsyncTaskExecutor-12] INFO WxworkAsyncService - 1621218096630 – SimpleAsyncTaskExecutor-12
21-05-17.10:21:36.990 [SimpleAsyncTaskExecutor-13] INFO WxworkAsyncService - 1621218096990 – SimpleAsyncTaskExecutor-13
21-05-17.10:21:37.341 [SimpleAsyncTaskExecutor-14] INFO WxworkAsyncService - 1621218097341 – SimpleAsyncTaskExecutor-14
21-05-17.10:21:37.687 [SimpleAsyncTaskExecutor-15] INFO WxworkAsyncService - 1621218097687 – SimpleAsyncTaskExecutor-15
21-05-17.10:21:38.032 [SimpleAsyncTaskExecutor-16] INFO WxworkAsyncService - 1621218098032 – SimpleAsyncTaskExecutor-16
21-05-17.10:21:38.364 [SimpleAsyncTaskExecutor-17] INFO WxworkAsyncService - 1621218098364 – SimpleAsyncTaskExecutor-17
21-05-17.10:21:38.726 [SimpleAsyncTaskExecutor-18] INFO WxworkAsyncService - 1621218098726 – SimpleAsyncTaskExecutor-18
自定义线程池很简单,只要实现AsyncConfigurer接口,并且重写getAsyncExecutor方法。
也可以重新getAsyncUncaughtExceptionHandler方法,实现自定义异常处理类。
当implements AsyncConfigurer 接口,实现了自定义线程池后,会默认覆盖springboot自带的异步线程池
package com.biaogexf.tools.config; import lombok.extern.slf4j.Slf4j; import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.AsyncConfigurer; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.lang.reflect.Method; import java.util.concurrent.Executor; import java.util.concurrent.ThreadPoolExecutor; /** * 实现AsyncConfigurer接口,自定义线程池,并通过@Async注解使用 (该注解使用在需要在新线程中执行的方法上) * * @Async 注解在service内的方法上(@EnableAsync开启异步),可以实现在controller的异步调用, * 调用的被@Async注解的方法会在一个单独线程内运行,适合即使返回,异步解耦,service慢慢去处理 * * @Async 注解的方法只能 返回void或者future类型的返回值,其他值会使 注解无效,因为不能异步执行 * * 当implements AsyncConfigurer接口实现了自定义的异步线程池后,会默认覆盖spring自带的异步线程池 */ @Configuration @EnableAsync @Slf4j public class SpringThreadPoolConfig implements AsyncConfigurer { @Value("${threadPool.spring.corePoolSize}") private Integer corePoolSize; @Value("${threadPool.spring.maxPoolSize}") private Integer maxPoolSize; @Value("${threadPool.spring.queueCapacity}") private Integer queueCapacity; @Value("${threadPool.spring.keepAliveSeconds}") private Integer keepAliveSeconds; @Override public Executor getAsyncExecutor() { ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); taskExecutor.setCorePoolSize(corePoolSize); taskExecutor.setMaxPoolSize(maxPoolSize); taskExecutor.setQueueCapacity(queueCapacity); taskExecutor.setKeepAliveSeconds(keepAliveSeconds); taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); taskExecutor.setThreadNamePrefix("MyAsync-"); taskExecutor.initialize(); return taskExecutor; } @Override public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { return new MyAsyncExceptionHandler(); } } /** * 自定义异常处理类 * 被 @Async 修饰的方法在独立线程调用,不能被@ControllerAdvice全局异常处理器捕获,所以需要自己设置异常处理 */ @Slf4j class MyAsyncExceptionHandler implements AsyncUncaughtExceptionHandler { @Override public void handleUncaughtException(Throwable throwable, Method method, Object... objects) { log.info("Exception message - " + throwable.getMessage()); log.info("Method name - " + method.getName()); for (Object param: objects) { log.info("Parameter value - " + param); } log.error("handleUncaughtException method:" + method.getName(), throwable); } }
21-05-17.11:00:32.289 [MyAsync-1 ] INFO WxworkAsyncService - 1621220432289 – MyAsync-1
21-05-17.11:00:34.512 [MyAsync-2 ] INFO WxworkAsyncService - 1621220434512 – MyAsync-2
21-05-17.11:00:34.880 [MyAsync-3 ] INFO WxworkAsyncService - 1621220434880 – MyAsync-3
21-05-17.11:00:35.252 [MyAsync-4 ] INFO WxworkAsyncService - 1621220435252 – MyAsync-4
21-05-17.11:00:35.639 [MyAsync-5 ] INFO WxworkAsyncService - 1621220435639 – MyAsync-5
21-05-17.11:00:36.188 [MyAsync-1 ] INFO WxworkAsyncService - 1621220436188 – MyAsync-1
21-05-17.11:00:36.569 [MyAsync-2 ] INFO WxworkAsyncService - 1621220436569 – MyAsync-2
21-05-17.11:00:39.163 [MyAsync-3 ] INFO WxworkAsyncService - 1621220439163 – MyAsync-3
21-05-17.11:00:39.522 [MyAsync-4 ] INFO WxworkAsyncService - 1621220439522 – MyAsync-4
21-05-17.11:00:40.003 [MyAsync-5 ] INFO WxworkAsyncService - 1621220440003 – MyAsync-5
21-05-17.11:00:43.637 [MyAsync-1 ] INFO WxworkAsyncService - 1621220443637 – MyAsync-1
21-05-17.11:00:51.359 [MyAsync-2 ] INFO WxworkAsyncService - 1621220451359 – MyAsync-2
21-05-17.11:01:01.652 [MyAsync-3 ] INFO WxworkAsyncService - 1621220461652 – MyAsync-3
@ASYNC,@ENABLEASYNC,ASYNCCONFIGURER 自定义线程池
在SpringBoot中使用线程池必须根据业务场景自定义配置线程池核心参数&实现自定义线程池!!!不可用SpringBoot中默认自带的线程池去执行任务,因为这样不可控!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。