当前位置:   article > 正文

springboot之多任务并行+线程池处理_springboot 多线程并发

springboot 多线程并发

最近项目中做到一个关于批量发短信的业务,如果用户量特别大的话,不能使用单线程去发短信,只能尝试着使用多任务来完成!我们的项目使用到了方式二,即Future的方案

image

Java 线程池

  1. Java通过Executors提供四种线程池,分别为:
  2. newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
  3. newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
  4. newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。
  5. newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
  6. 优点
  7. 重用存在的线程,减少对象创建、消亡的开销,性能佳。
  8. 可有效控制最大并发线程数,提高系统资源的使用率,同时避免过多资源竞争,避免堵塞。
  9. 提供定时执行、定期执行、单线程、并发数控制等功能。

方式一(CountDownLatch)

  1. public class StatsDemo {
  2. final static SimpleDateFormat sdf = new SimpleDateFormat(
  3. "yyyy-MM-dd HH:mm:ss");
  4. final static String startTime = sdf.format(new Date());
  5. /**
  6. * IO密集型任务 = 一般为2*CPU核心数(常出现于线程中:数据库数据交互、文件上传下载、网络数据传输等等)
  7. * CPU密集型任务 = 一般为CPU核心数+1(常出现于线程中:复杂算法)
  8. * 混合型任务 = 视机器配置和复杂度自测而定
  9. */
  10. private static int corePoolSize = Runtime.getRuntime().availableProcessors();
  11. /**
  12. * public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,
  13. * TimeUnit unit,BlockingQueue<Runnable> workQueue)
  14. * corePoolSize用于指定核心线程数量
  15. * maximumPoolSize指定最大线程数
  16. * keepAliveTime和TimeUnit指定线程空闲后的最大存活时间
  17. * workQueue则是线程池的缓冲队列,还未执行的线程会在队列中等待
  18. * 监控队列长度,确保队列有界
  19. * 不当的线程池大小会使得处理速度变慢,稳定性下降,并且导致内存泄露。如果配置的线程过少,则队列会持续变大,消耗过多内存。
  20. * 而过多的线程又会 由于频繁的上下文切换导致整个系统的速度变缓——殊途而同归。队列的长度至关重要,它必须得是有界的,这样如果线程池不堪重负了它可以暂时拒绝掉新的请求。
  21. * ExecutorService 默认的实现是一个无界的 LinkedBlockingQueue。
  22. */
  23. private static ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, corePoolSize+1, 10l, TimeUnit.SECONDS,
  24. new LinkedBlockingQueue<Runnable>(1000));
  25. public static void main(String[] args) throws InterruptedException {
  26. CountDownLatch latch = new CountDownLatch(5);
  27. //使用execute方法
  28. executor.execute(new Stats("任务A", 1000, latch));
  29. executor.execute(new Stats("任务B", 1000, latch));
  30. executor.execute(new Stats("任务C", 1000, latch));
  31. executor.execute(new Stats("任务D", 1000, latch));
  32. executor.execute(new Stats("任务E", 1000, latch));
  33. latch.await();// 等待所有人任务结束
  34. System.out.println("所有的统计任务执行完成:" + sdf.format(new Date()));
  35. }
  36. static class Stats implements Runnable {
  37. String statsName;
  38. int runTime;
  39. CountDownLatch latch;
  40. public Stats(String statsName, int runTime, CountDownLatch latch) {
  41. this.statsName = statsName;
  42. this.runTime = runTime;
  43. this.latch = latch;
  44. }
  45. public void run() {
  46. try {
  47. System.out.println(statsName+ " do stats begin at "+ startTime);
  48. //模拟任务执行时间
  49. Thread.sleep(runTime);
  50. System.out.println(statsName + " do stats complete at "+ sdf.format(new Date()));
  51. latch.countDown();//单次任务结束,计数器减一
  52. } catch (InterruptedException e) {
  53. e.printStackTrace();
  54. }
  55. }
  56. }
  57. }

结果

方式二(Future)

重点是和springboot整合,采用注解bean方式生成ThreadPoolTaskExecutor

@Bean

  1. //spring依赖包
  2. import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
  3. @Configuration
  4. public class GlobalConfig {
  5. /**
  6. * 默认线程池线程池
  7. *
  8. * @return Executor
  9. */
  10. @Bean
  11. public ThreadPoolTaskExecutor defaultThreadPool() {
  12. ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
  13. //核心线程数目
  14. executor.setCorePoolSize(16);
  15. //指定最大线程数
  16. executor.setMaxPoolSize(64);
  17. //队列中最大的数目
  18. executor.setQueueCapacity(16);
  19. //线程名称前缀
  20. executor.setThreadNamePrefix("defaultThreadPool_");
  21. //rejection-policy:当pool已经达到max size的时候,如何处理新任务
  22. //CALLER_RUNS:不在新线程中执行任务,而是由调用者所在的线程来执行
  23. //对拒绝task的处理策略
  24. executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
  25. //线程空闲后的最大存活时间
  26. executor.setKeepAliveSeconds(60);
  27. //加载
  28. executor.initialize();
  29. return executor;
  30. }
  31. }

使用

  1. //通过注解引入配置
  2. @Resource(name = "defaultThreadPool")
  3. private ThreadPoolTaskExecutor executor;
  1. //使用Future方式执行多任务
  2. //生成一个集合
  3. List<Future> futures = new ArrayList<>();
  4. //获取后台全部有效运营人员的集合
  5. List<AdminUserMsgResponse> adminUserDOList = adminManagerService.GetUserToSentMsg(null);
  6. for (AdminUserMsgResponse response : adminUserDOList) {
  7. //并发处理
  8. if (response.getMobile() != null) {
  9. Future<?> future = executor.submit(() -> {
  10. //发送短信
  11. mobileMessageFacade.sendCustomerMessage(response.getMobile(), msgConfigById.getContent());
  12. });
  13. futures.add(future);
  14. }
  15. }
  16. //查询任务执行的结果
  17. for (Future<?> future : futureList) {
  18. while (true) {//CPU高速轮询:每个future都并发轮循,判断完成状态然后获取结果,这一行,是本实现方案的精髓所在。即有10个future在高速轮询,完成一个future的获取结果,就关闭一个轮询
  19. if (future.isDone()&& !future.isCancelled()) {//获取future成功完成状态,如果想要限制每个任务的超时时间,取消本行的状态判断+future.get(1000*1, TimeUnit.MILLISECONDS)+catch超时异常使用即可。
  20. Integer i = future.get();//获取结果
  21. System.out.println("任务i="+i+"获取完成!"+new Date());
  22. list.add(i);
  23. break;//当前future获取结果完毕,跳出while
  24. } else {
  25. Thread.sleep(1);//每次轮询休息1毫秒(CPU纳秒级),避免CPU高速轮循耗空CPU---》新手别忘记这个
  26. }
  27. }
  28. }

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/article/detail/52472
推荐阅读
相关标签
  

闽ICP备14008679号