当前位置:   article > 正文

SpringBoot中的异步操作与线程池_destroymethod = "shutdown

destroymethod = "shutdown

线程池类型

Java通过 java.util.concurrent.Executors 的静态方法提供五种线程池

  1. newCachedThreadPool 创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
  2. newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
  3. newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。
  4. newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
  5. newWorkStealingPool 这是java8新增的线程池类型。创建一个含有足够多线程的线程池,来维持相应的并行级别,它会通过工作窃取的方式,使得多核的CPU不会闲置,总会有活着的线程让CPU去运行。

五种线程池的底层实现

  • ThreadPoolExecutor 是CachedThreadPool、FixedThreadPool、SingleThreadExecutor、ScheduledThreadPool 这四种类型线程池的底层实现
  • ForkJoinPool (java7已有) 是WorkStealingPool线程池的底层实现

使用线程池的优点

  • 重用存在的线程,减少对象创建、消亡的开销,性能佳。
  • 可有效控制最大并发线程数,提高系统资源的使用率,同时避免过多资源竞争,避免堵塞。
  • 提供定时执行、定期执行、单线程、并发数控制等功能。

如何在SpringBoot中优雅的使用线程池

注册线程池

在config目录下创建 AsyncConfig 配置类,在配置类中定义线程池

  1. package com.example.async_demo.config;
  2. import org.springframework.context.annotation.Bean;
  3. import org.springframework.context.annotation.Configuration;
  4. import org.springframework.context.annotation.Lazy;
  5. import org.springframework.scheduling.annotation.EnableAsync;
  6. import org.springframework.scheduling.annotation.EnableScheduling;
  7. import org.springframework.scheduling.annotation.SchedulingConfigurer;
  8. import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
  9. import org.springframework.scheduling.config.ScheduledTaskRegistrar;
  10. import java.util.concurrent.ExecutorService;
  11. import java.util.concurrent.Executors;
  12. @Configuration
  13. @EnableAsync
  14. @EnableScheduling
  15. public class AsyncConfig implements SchedulingConfigurer {
  16. //第一种线程池定义方式,可代替CachedThreadPool、FixedThreadPool、SingleThreadExecutor这三种
  17. // Spring线程池
  18. @Lazy //线程池懒加载
  19. @Bean(name="threadPoolTaskExecutor",destroyMethod="shutdown") //name为线程池名称,destroyMethod="shutdown"在spring bean回收后释放资源
  20. public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
  21. //封装的是原生的ThreadPoolExecutor类型线程池
  22. ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
  23. //核心线程数(获取硬件):线程池创建时候初始化的线程数
  24. int corePoolSize = Runtime.getRuntime().availableProcessors();
  25. System.out.println(corePoolSize);
  26. executor.setCorePoolSize(corePoolSize);
  27. //最大线程数+5:线程池最大的线程数,只有在缓冲队列满了之后才会申请超过核心线程数的线程
  28. executor.setMaxPoolSize(corePoolSize+5);
  29. //缓冲队列500:用来缓冲执行任务的队列
  30. executor.setQueueCapacity(500);
  31. //允许线程的空闲时间60秒:当超过了核心线程出之外的线程在空闲时间到达之后会被销毁
  32. executor.setKeepAliveSeconds(60);
  33. //线程池名的前缀:设置好了之后可以方便我们定位处理任务所在的线程池
  34. executor.setThreadNamePrefix("MyAsync-");
  35. executor.initialize();
  36. return executor;
  37. }
  38. //第二种线程池定义方式,使用的是WorkStealingPool
  39. //java8 抢占式线程池
  40. @Lazy
  41. @Bean(name="workStealingPool",destroyMethod="shutdown")
  42. public ExecutorService workStealingPool(){
  43. ExecutorService executorService = Executors.newWorkStealingPool();
  44. return executorService;
  45. }
  46. //第三种线程池定义方式,为周期任务线程池
  47. //周期任务线程池
  48. @Lazy
  49. @Bean(name="scheduledThreadPool",destroyMethod="shutdown")
  50. public ExecutorService scheduledThreadPool() {
  51. return Executors.newScheduledThreadPool(3);
  52. }
  53. @Override
  54. public void configureTasks(ScheduledTaskRegistrar scheduledTaskRegistrar) {
  55. scheduledTaskRegistrar.setScheduler(scheduledThreadPool());
  56. }
  57. }

我在上述案例代码中定义了三种类型的线程池

  1. 第一种是ThreadPoolTaskExecutor线程池,他是Spring中的 org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor 线程池,
    底层是对 java.util.concurrent.ThreadPoolExecutor 的封装,综合了CachedThreadPool、FixedThreadPool、SingleThreadExecutor这三种线程池的优点;
  2. 第二种是java8新增的 workStealingPool 线程池。第一种和第二种使用时可以在配置类上使用@EnableAsync注解,这样就能优雅的使用@Async注解方法来实现线程run逻辑了;
  3. 第三种是ScheduledThreadPool线程池,不过在Spring中使用需要配置类实现SchedulingConfigurer接口,重写configureTasks方法。在配置类上使用
    @EnableScheduling注解,就可以优雅的使用@Scheduled注解方法来实现周期逻辑了

使用线程池

对第一种和第二种线程池在service中实现线程run的逻辑

  1. package com.example.async_demo.service;
  2. import org.springframework.scheduling.annotation.Async;
  3. import org.springframework.scheduling.annotation.AsyncResult;
  4. import org.springframework.stereotype.Service;
  5. import java.util.concurrent.CompletableFuture;
  6. import java.util.concurrent.Future;
  7. @Service
  8. public class AsyncService {
  9. //使用名为threadPoolTaskExecutor的线程池,返回Future
  10. @Async("threadPoolTaskExecutor")
  11. public Future<Double> service1(){
  12. double result = getRand(3000);
  13. return AsyncResult.forValue(result);
  14. }
  15. //使用名为threadPoolTaskExecutor的线程池,返回CompletableFuture
  16. @Async("threadPoolTaskExecutor")
  17. public CompletableFuture<Double> service2(){
  18. double result = getRand(3000);
  19. return CompletableFuture.completedFuture(result);
  20. }
  21. //使用名为workStealingPool的线程池,返回CompletableFuture
  22. @Async("workStealingPool")
  23. public CompletableFuture<Double> service3(){
  24. double result = getRand(3000);
  25. return CompletableFuture.completedFuture(result);
  26. }
  27. private double getRand(long sleep){
  28. System.out.println(Thread.currentThread().getId()+"-start");
  29. try {
  30. Thread.sleep(sleep);
  31. } catch (InterruptedException e) {
  32. e.printStackTrace();
  33. }
  34. double result = Math.random();//方法返回的结果
  35. return result;
  36. }
  37. }

测试第一种和第二种线程池

  1. @SpringBootTest
  2. class AsyncDemoApplicationTests {
  3. @Autowired
  4. private AsyncService asyncService;
  5. @Test
  6. void test1() throws ExecutionException, InterruptedException {
  7. long start = System.currentTimeMillis();
  8. Future<Double> result1 = asyncService.service1();
  9. Future<Double> result2 = asyncService.service1();
  10. Future<Double> result3 = asyncService.service1();
  11. //让主线程等待子线程结束之后才能继续运行
  12. while (!(result1.isDone()&&result2.isDone()&&result3.isDone())){
  13. Thread.sleep(500);
  14. }
  15. long end = System.currentTimeMillis();
  16. System.out.println(end-start+"ms");
  17. System.out.println(result1.get());
  18. System.out.println(result2.get());
  19. System.out.println(result3.get());
  20. }
  21. @Test
  22. void test2() throws ExecutionException, InterruptedException {
  23. long start = System.currentTimeMillis();
  24. CompletableFuture<Double> result1 = asyncService.service2();
  25. CompletableFuture<Double> result2 = asyncService.service2();
  26. CompletableFuture<Double> result3 = asyncService.service2();
  27. //join() 的作用:让主线程等待子线程结束之后才能继续运行
  28. CompletableFuture.allOf(result1,result2,result3).join();
  29. long end = System.currentTimeMillis();
  30. System.out.println(end-start+"ms");
  31. System.out.println(result1.get());
  32. System.out.println(result2.get());
  33. System.out.println(result3.get());
  34. }
  35. @Test
  36. void test3() throws ExecutionException, InterruptedException {
  37. long start = System.currentTimeMillis();
  38. CompletableFuture<Double> result1 = asyncService.service3();
  39. CompletableFuture<Double> result2 = asyncService.service3();
  40. CompletableFuture<Double> result3 = asyncService.service3();
  41. //join() 的作用:让主线程等待子线程结束之后才能继续运行
  42. CompletableFuture.allOf(result1,result2,result3).join();
  43. long end = System.currentTimeMillis();
  44. System.out.println(end-start+"ms");
  45. System.out.println(result1.get());
  46. System.out.println(result2.get());
  47. System.out.println(result3.get());
  48. }
  49. }

test1测试结果

test1

test2测试结果

test1

test3测试结果

test1

通过测试发现Future返回类型不适合主线等待多个子线程全部完成的操作,
因为需要用到while循环去阻塞主线程,而CompletableFuture可以通过CompletableFuture.allOf(cf1,cf2,cf3).join()
去完成这个操作,所以推荐使用CompletableFuture作为返回类型

注意:@Async注解的方法不能在本类中被调用,只能在其他类中调用,如Controller类

对第三种线程池在service中实现线程的逻辑

  1. package com.example.async_demo.service;
  2. import org.springframework.scheduling.annotation.Scheduled;
  3. import org.springframework.stereotype.Service;
  4. @Service
  5. public class AsyncService {
  6. //cron表达式 每5秒执行一次
  7. //@Scheduled(cron = "*/5 * * * * ?")
  8. @Scheduled(cron = "${cron.sec5}") //表达式写在application.yml文件中,则以这种方式取出。
  9. public void service4(){
  10. System.out.println("5s-"+Thread.currentThread().getId()+":"+System.currentTimeMillis()/1000);
  11. }
  12. //cron表达式 每3秒执行一次
  13. @Scheduled(cron = "${cron.sec3}")
  14. public void service5(){
  15. System.out.println("3s-"+Thread.currentThread().getId()+":"+System.currentTimeMillis()/1000);
  16. }
  17. }

application.yml 文件

  1. cron:
  2. sec5: '*/5 * * * * ?'
  3. sec3: '*/3 * * * * ?'

周期任务测试结果(启动Application类)

test1

通过测试结果可发现两个周期任务使用了三个线程,
线程id分别是20、21、25。两个周期任务分别以3s和5s执行一次,
但不固定在某个线程中执行,而是哪个线程空闲则使用哪个线程

注意:若不为周期任务配置线程池,只使用@EnableScheduling和@Scheduled注解的话,
则所有周期任务共用一个子线程,若出现下一个周期开始上一个周期任务还没结束的情况,
则线程阻塞,直到前一个任务完成

CRON表达式

  • cron表达式是定义任务周期的一种表达式
  • 这里不多介绍,可以参考这篇 博客
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/article/detail/52489
推荐阅读
相关标签
  

闽ICP备14008679号