当前位置:   article > 正文

线程池详解+springboot整合线程池(超级详细简洁代码可直接执行)

springboot整合线程池

一、概念

数据库连接池的原理类似,线程池就是将多个线程对象放入一个池子里面,之后从该池子中获取、实用和回收线程。有两点需要明确。1. 每一个线程,在一段时间内只能执行一个任务。2. 线程池中的各个线程是可以重复使用的。

二、线程池的创建方式

  1. Executors.newSingleThreadExecutor() 创建只有一个线程的线程池。其底层源码如下:

    1. public static ExecutorService newSingleThreadExecutor() {
    2.        return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));
    3.   }

    其中可以发现,线程只有一个,但是等待队列是Integer.MAX_VALUE大的。

  2. Executors.newCachedThreadPool() 创建可以放多个线程的线程池。其底层源码如下:

  1. public static ExecutorService newCachedThreadPool() {
  2. return new ThreadPoolExecutor(0,Integer.MAX_VALUE,60L,TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
  3. }

        其中可以发现,线程有Integer.MAX_VALUE个,其中SynchronousQueue没有容量,是无缓冲等待队列,是一个不存储元素的阻塞队列。

    3. Executors.newFixedThreadPool(5) 创建固定线程数的线程池。其底层源码如下:

  1. public static ExecutorService newFixedThreadPool(int nThreads) {
  2. return new ThreadPoolExecutor(nThreads,nThreads,0L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
  3. }

其中可以发现,线程数是指定的,等待队列是Integer.MAX_VALUE大的。

  4. Executors.newSingleThreadScheduledExecutor() 创建一个可用于任务调度的线程池。其底层源码如下:

  1. public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
  2.    return new DelegatedScheduledExecutorService(new ScheduledThreadPoolExecutor(1));
  3. }
  4. public ScheduledThreadPoolExecutor(int corePoolSize) {
  5.    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue());
  6. }

其中可以发现核心线程数为1,但是最大线程数为Integer.MAX_VALUE,等待队列是一个延迟无界队列

5. Executors.newScheduledThreadPool(5) 创建一个可用于任务调度的线程池,并且线程是有多个的。

  1. public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
  2.    return new ScheduledThreadPoolExecutor(corePoolSize);
  3. }
  4. public ScheduledThreadPoolExecutor(int corePoolSize) {
  5.    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue());
  6. }

可以发现,除了核心线程数跟newSingleThreadScheduledExecutor()不一样,其他的都一样。

特别说明:

        阿里巴巴编程规范中特意强调,禁止使用上面五种方式创建线程池。其实我们也可以从中发现,以上线程池中,要么是最大线程数没有限制,要么是其中的等待队列的大小没有限制(Integer.MAX_VALUE可以被认为是没有限制),那么就有可能将内存撑爆,导致系统崩溃。所以工作中一定不要用上面的方式创建数据库。

那么应该怎样创建数据库呢?

创建数据库的正确姿势如下:

ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1,2,3, TimeUnit.SECONDS,new ArrayBlockingQueue<>(10),new ThreadPoolExecutor.AbortPolicy());

其实就是上面那些创建线程池有系统自动赋值的参数,改成我们手动赋值

  1. public ThreadPoolExecutor(int corePoolSize,
  2.                              int maximumPoolSize,
  3.                              long keepAliveTime,
  4.                              TimeUnit unit,
  5.                              BlockingQueue<Runnable> workQueue,
  6.                              RejectedExecutionHandler handler) {
  7.        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
  8.             Executors.defaultThreadFactory(), handler);
  9.   }

线程池的参数说明:

  1. corePoolSize:核心线程数

  2. maximumPoolSize:最大线程数

  3. keepAliveTime:最大空闲时间

  4. unit:最大空闲时间单位

  5. workQueue:任务队列

  6. handler:拒绝策略,有以下四种

    (1)ThreadPoolExecutor.AbortPolicy 丢弃任务,并抛出 RejectedExecutionException 异常。

    (2)ThreadPoolExecutor.CallerRunsPolicy:该任务被线程池拒绝,由调用 execute方法的线程执行该任务。

    (3)ThreadPoolExecutor.DiscardOldestPolicy : 抛弃队列最前面的任务,然后重新尝试执行任务。

    (4)ThreadPoolExecutor.DiscardPolicy,丢弃任务,不过也不抛出异常。

    也可以自己实现RejectedExecutionHandler接口来自定义拒绝策略

下面附上SpringBoot整合线程的代码:

代码的结构图,方便读者查看

 

  1. @RestController
  2. public class ThreadController {
  3.    @Autowired
  4.    private AsyncService asyncService;
  5.    @Autowired
  6.    private ThreadPoolTaskExecutor threadPool;
  7.    @RequestMapping("test")
  8.    public void testAsync() throws InterruptedException {
  9.        System.out.println("testAsync 执行开始");
  10.        TimeUnit.SECONDS.sleep(2);
  11.        asyncService.executeAsync();
  12.        System.out.println("testAsync 执行完毕");
  13.   }
  14.    @RequestMapping("testThreadPool")
  15.    public void testThreadPool() throws InterruptedException {
  16.        System.out.println("testThreadPool start");
  17.        TimeUnit.SECONDS.sleep(2);
  18.        threadPool.execute(() -> System.out.println("threadPool testThreadPool"));
  19.        System.out.println("testThreadPool end");
  20.   }
  21. }
  1. public interface AsyncService {
  2.    /**
  3.     * 执行异步任务
  4.     */
  5.    void executeAsync();
  6. }
  1. @Service
  2. public class AsyncServiceImpl implements AsyncService {
  3.    private static final Logger logger = LoggerFactory.getLogger(AsyncServiceImpl.class);
  4.    @Async("asyncServiceExecutor")
  5. //   @Resource(name = "asyncServiceExecutor")
  6.    @Override
  7.    public void executeAsync() {
  8.        logger.info("start executeAsync");
  9.        System.out.println("start executeAsync");
  10.        System.out.println("当前运行的线程名称:" + Thread.currentThread().getName());
  11.        logger.info("end executeAsync");
  12.        System.out.println("end executeAsync");
  13.   }
  14. }
  1. @Configuration
  2. @EnableAsync
  3. @Data
  4. public class ExecutorConfig {
  5.    @Value("${async.executor.thread.core_pool_size}")
  6.    private int corePoolSize;
  7.    @Value("${async.executor.thread.max_pool_size}")
  8.    private int maxPoolSize;
  9.    @Value("${async.executor.thread.queue_capacity}")
  10.    private int queueCapacity;
  11.    @Value("${async.executor.thread.name.prefix}")
  12.    private String namePrefix;
  13.    @Bean(name = "asyncServiceExecutor")
  14.    public ThreadPoolTaskExecutor asyncServiceExecutor(){
  15.        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
  16.        threadPoolTaskExecutor.setCorePoolSize(corePoolSize);
  17.        threadPoolTaskExecutor.setMaxPoolSize(maxPoolSize);
  18.        threadPoolTaskExecutor.setQueueCapacity(queueCapacity);
  19.        threadPoolTaskExecutor.setKeepAliveSeconds(3);
  20.        threadPoolTaskExecutor.setThreadNamePrefix(namePrefix);
  21.        // rejection-policy:当pool已经达到max size的时候,如何处理新任务
  22.        // CALLER_RUNS:不在新线程中执行任务,而是由调用者所在的线程来执行
  23.        threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
  24.        //加载
  25.        threadPoolTaskExecutor.initialize();
  26.        return threadPoolTaskExecutor;
  27.   }
  28. }

application.yml配置

  1. async:
  2. executor:
  3.   thread:
  4.      # 配置核心线程数
  5.     core_pool_size: 10
  6.      # 配置最大线程数
  7.     max_pool_size: 20
  8.      # 配置队列大小
  9.     queue_capacity: 999
  10.     name:
  11.       prefix: async-service-

以上就是springboot整合线程池的代码,读者可以直接拷贝下来运行。这里整合Controller里面有两种用法,一种是用service的实现方法去异步执行;另外一种是直接将ThreadPoolTaskExecutor注入到Controller中来调用执行。读者可以自行体会。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/article/detail/52428
推荐阅读
相关标签
  

闽ICP备14008679号