赞
踩
1.1 在线程池的实践中,我们一般要通过ThreadPoolExecutor的构造函数来声明线程池,避免使用Executors类创建线程池,否则会有OOM风险,原因如下:
FixedThreadPool 和 SingleThreadExecutor : 使用的是无界的 LinkedBlockingQueue,任务队列最大长度为 Integer.MAX_VALUE,可能堆积大量的请求,从而导致 OOM。CachedThreadPool :使用的是同步队列 SynchronousQueue, 允许创建的线程数量为 Integer.MAX_VALUE ,可能会创建大量线程,从而导致 OOM。**ScheduledThreadPool 和 SingleThreadScheduledExecutor ** : 使用的无界的延迟阻塞队列DelayedWorkQueue,任务队列最大长度为 Integer.MAX_VALUE,可能堆积大量的请求,从而导致 OOM。
1.2 不同业务用不同的线程池 。配置线程池的时候要根据当前业务的情况对当前线程池进行配置,因为不同的业务的并发以及对资源的使用情况都不同,而且当多个业务使用同一个线程池时,可能会因为线程池使用不当导致死锁
1.3 监测线程池运行状态
1.4 给线程池命名
1.5 正确配置线程池参数。公式如下:
CPU 密集型任务(N+1): 这种任务消耗的主要是 CPU 资源,可以将线程数设置为 N(CPU 核心数)+1。比 CPU 核心数多出来的一个线程是为了防止线程偶发的缺页中断,或者其它原因导致的任务暂停而带来的影响。一旦任务暂停,CPU 就会处于空闲状态,而在这种情况下多出来的一个线程就可以充分利用 CPU 的空闲时间。I/O 密集型任务(2N): 这种任务应用起来,系统会用大部分的时间来处理 I/O 交互,而线程在处理 I/O 的时间段内不会占用 CPU 来处理,这时就可以将 CPU 交出给其它线程使用。因此在 I/O 密集型任务的应用中,我们可以多配置一些线程,具体的计算方法是 2N。
具体的线程池使用事项可参考下列文章: Java线程池最佳实践
创建一个maven项目,引入下列依赖
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <artifactId>spring-boot-starter-parent</artifactId> <groupId>org.springframework.boot</groupId> <version>2.7.0</version> </parent> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> </dependencies> <groupId>org.example</groupId> <artifactId>threadPool01</artifactId> <version>1.0-SNAPSHOT</version> <properties> <maven.compiler.source>11</maven.compiler.source> <maven.compiler.target>11</maven.compiler.target> </properties> </project>
项目目录结构如下:

ThreadPoolConfig.class
package com.young.config; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @Configuration public class ThreadPoolConfig { //在某个项目中看到的一个线程池创建方式 @Bean @Qualifier(value = "threadPoolExecutor") public ThreadPoolExecutor threadPoolExecutor(){ return new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), (int)(Runtime.getRuntime().availableProcessors()/(1-0.9)), 60, TimeUnit.SECONDS, new LinkedBlockingDeque<>(Runtime.getRuntime().availableProcessors()), Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy()); } //这里设置核心和最大线程数都是2,方便后面的测试 @Bean @Qualifier(value = "commonThreadPoolExecutor") public ThreadPoolExecutor commonThreadPoolExecutor(){ return new ThreadPoolExecutor(2,2, 60,TimeUnit.SECONDS, new LinkedBlockingDeque<>(Runtime.getRuntime().availableProcessors()),Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy()); } //检查业务对应的线程池 @Bean @Qualifier(value = "checkThreadPoolExecutor") public ThreadPoolExecutor checkThreadPoolExecutor(){ return new ThreadPoolExecutor(2,2, 60,TimeUnit.SECONDS, new LinkedBlockingDeque<>(Runtime.getRuntime().availableProcessors()),Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy()); } //支付业务对应的线程池 @Bean @Qualifier(value = "payThreadPoolExecutor") public ThreadPoolExecutor payThreadPoolExecutor(){ return new ThreadPoolExecutor(2,2, 60,TimeUnit.SECONDS, new LinkedBlockingDeque<>(Runtime.getRuntime().availableProcessors()),Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy()); } //通知业务对应的线程池 @Bean @Qualifier(value = "noticeThreadPoolExecutor") public ThreadPoolExecutor noticeThreadPoolExecutor(){ return new ThreadPoolExecutor(2,2, 60,TimeUnit.SECONDS, new LinkedBlockingDeque<>(Runtime.getRuntime().availableProcessors()),Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy()); } //用来模拟线程池使用不当导致死锁的情况 @Bean @Qualifier(value = "deadThreadPoolExecutor") public ThreadPoolExecutor deadThreadPoolExecutor(){ return new ThreadPoolExecutor(1,1, 60,TimeUnit.SECONDS, new LinkedBlockingDeque<>(Runtime.getRuntime().availableProcessors()),Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy()); } }
ThreadPool01App.class
@SpringBootApplication
public class ThreadPool01App {
public static void main(String[] args) {
SpringApplication.run(ThreadPool01App.class,args);
}
}
TestController.java
package com.young.controller; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.ThreadPoolExecutor; @RestController public class TestController { @Resource @Qualifier(value = "commonThreadPoolExecutor") private ThreadPoolExecutor commonThreadPoolExecutor; @Resource @Qualifier(value = "checkThreadPoolExecutor") private ThreadPoolExecutor checkThreadPoolExecutor; @Resource @Qualifier(value = "payThreadPoolExecutor") private ThreadPoolExecutor payThreadPoolExecutor; @Resource @Qualifier(value = "noticeThreadPoolExecutor") private ThreadPoolExecutor noticeThreadPoolExecutor; @Resource @Qualifier(value = "deadThreadPoolExecutor") private ThreadPoolExecutor deadThreadPoolExecutor; //所有业务使用同一个线程池 @GetMapping("/onePool") public String onePool(){ //因为每个方法耗时2秒,而线程池中最多只能有两个线程,所有总共耗时4秒 long start = System.currentTimeMillis(); Future<Integer> checkSubmit = commonThreadPoolExecutor.submit(() -> { return check(); }); Future<Integer> paySubmit = commonThreadPoolExecutor.submit(() -> { return pay(); }); Future<Integer> noticeSubmit = commonThreadPoolExecutor.submit(() -> { return notice(); }); try{ int sum=checkSubmit.get()+paySubmit.get()+noticeSubmit.get(); System.out.println(sum); }catch (Exception e){ e.printStackTrace(); } long end=System.currentTimeMillis(); System.out.println("onePool用时:"+(end-start)); return "success"; } //不同业务使用对应的线程池 @GetMapping("/multiPool") public String multiPool(){ //每个方法耗时2秒,不过使用的是不同的线程池池,因此最终用时2秒 long start = System.currentTimeMillis(); Future<Integer> checkSubmit = checkThreadPoolExecutor.submit(() -> { return check(); }); Future<Integer> paySubmit = payThreadPoolExecutor.submit(() -> { return pay(); }); Future<Integer> noticeSubmit = noticeThreadPoolExecutor.submit(() -> { return notice(); }); try{ int sum=checkSubmit.get()+paySubmit.get()+noticeSubmit.get(); System.out.println(sum); }catch (Exception e){ e.printStackTrace(); } long end=System.currentTimeMillis(); System.out.println("multiPool用时:"+(end-start)); return "success"; } //模拟不同业务使用同一个线程池时可能出现的死锁现象 @GetMapping("/dead") public String deadPool(){ long start = System.currentTimeMillis(); Future<Integer> submit = deadThreadPoolExecutor.submit(() -> { //线程池中最大线程数为1,这个线程用于处理如下逻辑 //但是在deadNotice()方法中,也有用到同一个线程池,但此时该线程池没有线程池可用,因此deadNotice()迟迟无法结束,进而导致匿名函数无法结束让出线程,从而出现死锁 int notice = deadNotice(); return notice + pay(); }); try { System.out.println(submit.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } long end=System.currentTimeMillis(); System.out.println("deadPool用时:"+(end-start)); return "success"; } public int deadNotice() throws ExecutionException, InterruptedException { return deadThreadPoolExecutor.submit(() -> { return notice(); }).get(); } //支付前检查 public int check(){ try { System.out.println("检查余额,选择支付方式============="); Thread.sleep(2000); return 1; } catch (InterruptedException e) { e.printStackTrace(); return 0; } } //支付 public int pay(){ try{ System.out.println("进行支付==================="); Thread.sleep(2000); return 1; }catch (InterruptedException e){ e.printStackTrace(); return 0; } } //发送支付消息 public int notice(){ try{ System.out.println("发送通知================"); Thread.sleep(2000); return 1; }catch (InterruptedException e){ e.printStackTrace(); return 0; } } }
application.yml
server:
port: 8001
运行项目,访问对应的方法,结果如下:




创建maven项目,导入下列依赖
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <artifactId>spring-boot-starter-parent</artifactId> <groupId>org.springframework.boot</groupId> <version>2.7.0</version> </parent> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.83</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> </dependency> <dependency> <groupId>com.baomidou</groupId> <artifactId>mybatis-plus-boot-starter</artifactId> <version>3.4.3</version> </dependency> </dependencies> <groupId>com.young</groupId> <artifactId>threadPool03</artifactId> <version>1.0-SNAPSHOT</version> <properties> <maven.compiler.source>11</maven.compiler.source> <maven.compiler.target>11</maven.compiler.target> </properties> </project>
数据库中,创建m_log表,用来保存日志

Mybatis-Plus的IService提供了批量插入数据的功能,但它的实现却是下面这样的:

不过,因为Mybatis-plus能兼容mybatis,因此我们创建对应的mapper.xml文件,用来实现批量插入
Log实体类:
package com.young.entity; import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableName; import lombok.Data; import java.io.Serializable; import java.time.LocalDateTime; @Data @TableName(value = "m_log") public class Log implements Serializable { @TableId(type = IdType.ASSIGN_ID) private String id; private String msg; private String url; private String type; private LocalDateTime createTime; }
LogMapper.java
package com.young.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.young.entity.Log;
import org.apache.ibatis.annotations.Mapper;
import java.util.List;
@Mapper
public interface LogMapper extends BaseMapper<Log> {
public int batchInsert(List<Log>list);
}
对应的mapper.xml文件如下:
<?xml version="1.0" encoding="UTF-8" ?> <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd"> <mapper namespace="com.young.mapper.LogMapper"> <insert id="batchInsert" parameterType="java.util.List"> <!-- <selectKey resultType="string" order="AFTER" keyProperty="id">--> <!-- select uuid()--> <!-- </selectKey>--> insert into m_log (id,msg,url,type,create_time) values <foreach collection="list" item="item" index="index" separator=","> ( #{item.id} ,#{item.msg},#{item.url},#{item.type},#{item.createTime} ) </foreach> </insert> </mapper>
LogService.java
package com.young.service; import com.young.entity.Log; import com.young.mapper.LogMapper; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.util.List; @Service public class LogService { @Autowired private LogMapper logMapper; public void batchInsert(List<Log>list){ logMapper.batchInsert(list); } }
LogConstant的常量类
package com.young.constants;
public class LogConstant {
public final static String INFO="info";
public final static String DEBUG="debug";
public final static String ERROR="error";
public final static String WARN="warn";
}
线程池配置如下。实际项目中,不同业务,要配置不同的线程池,因此我这里用到@Qualifier注解,来区分不同业务的线程池。
package com.young.config; import com.young.service.LogService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import javax.sound.midi.MidiSystem; import java.util.concurrent.*; @Configuration public class BatchInsertLogPoolConfig { //日志队列的最大容量 private int MAX_QUEUE_SIZE=100; //线程睡眠时间,具体时间需要结合项目实际情况,单位毫秒 private int SLEEP_TIME=500; //创建一个单线程的线程池 @Bean @Qualifier(value = "batchInsertLogPool") public ThreadPoolExecutor batchInsertLogPool(){ // return new ThreadPoolExecutor(1,0,500,TimeUnit.MILLISECONDS, // new LinkedBlockingDeque<>(Runtime.getRuntime().availableProcessors()) // ); return new ThreadPoolExecutor(2,2,60,TimeUnit.SECONDS, new LinkedBlockingDeque<>(Runtime.getRuntime().availableProcessors()), Executors.defaultThreadFactory(),new ThreadPoolExecutor.CallerRunsPolicy()); } }
这里还有一个坑,就是线程池的coreSize设置为1,maxSize设置为0的时候,会报错,报错截图如下:

这个坑也不清楚是什么原因,我这里是仿照Executors.newFixedThreadPoll中的设置来实现的,设置一个单线程的线程池,但是没有成功,最后把coreSize和maxSize都改成了2
用于批量插入日志的线程
package com.young.logpool; import com.young.entity.Log; import com.young.service.LogService; import java.util.List; //用于批量插入日志的线程 public class BatchInsertLogThread implements Runnable{ private List<Log>logList; private LogService logService; public BatchInsertLogThread(List<Log>logList,LogService logService){ this.logList=logList; this.logService=logService; System.out.println("new BatchInsertLogThread==========="+logList.size()); } @Override public void run() { System.out.println("批量插入======================"); this.logService.batchInsert(logList); } }
批量插入日志的线程管理类,这里的BATCH_SIZE等参数,可以改一下,我这里是为了方便看结果才设置得比较小
package com.young.logpool; import com.young.entity.Log; import com.young.service.LogService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import java.util.ArrayList; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @Component @Slf4j public class BatchInsertLogThreadManager { //日志批量插入的数量,10条日志 private int BATCH_SIZE=5; //日志插入执行的最大时间间隔,单位毫秒 private long MAX_EXE_TIME=10000; //日志队列的最大容量 private int MAX_QUEUE_SIZE=100; //线程睡眠时间,具体时间需要结合项目实际情况,单位毫秒 private int SLEEP_TIME=500; @Autowired @Qualifier(value = "batchInsertLogPool") private ThreadPoolExecutor batchInsertLogPool; @Autowired private LogService logService; //任务队列,存放日志内容 private BlockingQueue<Log>queue=new LinkedBlockingDeque<>(MAX_QUEUE_SIZE); //原子变量,用来判断是否循环 private AtomicBoolean run=new AtomicBoolean(true); //记录任务队列中的任务数量 private AtomicInteger logCount=new AtomicInteger(0); //上次执行日志插入时的时间 private long lastExecuteTime; @PostConstruct public void init(){ log.info("init--------------------"); lastExecuteTime=System.currentTimeMillis(); batchInsertLogPool.execute(new Runnable() { @Override public void run() { while (run.get()){ try{ //线程休眠,具体时间根据项目的实际情况配置 Thread.sleep(SLEEP_TIME); }catch (InterruptedException e){ } //满足放入10个日志 if (logCount.get()>=BATCH_SIZE||(System.currentTimeMillis()-lastExecuteTime)>MAX_EXE_TIME){ log.info("满足要求了===================,queue.size():{},isEmpty:{}",queue.size(),queue.isEmpty()); if (logCount.get()>0){ List<Log>list=new ArrayList<>(); /** * drainTo (): 一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数), * 通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。 * 将取出的数据放入指定的list集合中 */ queue.drainTo(list); //任务队列中任务数量置为0 logCount.set(0); //从线程池中取出线程执行日志插入 batchInsertLogPool.execute(new BatchInsertLogThread(list,logService)); } //获取当前执行的时间 lastExecuteTime=System.currentTimeMillis(); } } } }); } /** * 将日志放入队列中 */ public boolean addLog(Log log)throws Exception{ if (logCount.get()>=MAX_QUEUE_SIZE){ //当队列满时,直接将日志丢弃 return false; } //将日志放入任务队列中 this.queue.offer(log); //队列中的任务数量+1 logCount.incrementAndGet(); return true; } /** * 关闭线程池 */ public void shutdown(){ //结束while循环 run.set(false); //关闭线程池 batchInsertLogPool.shutdown(); batchInsertLogPool.shutdownNow(); } }
DemoController.java
package com.young.controller; import com.young.constants.LogConstant; import com.young.entity.Log; import com.young.logpool.BatchInsertLogThreadManager; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; import javax.servlet.http.HttpServletRequest; import java.time.LocalDateTime; @RestController @RequestMapping("/log") public class DemoController { @Resource private BatchInsertLogThreadManager batchInsertLogThreadManager; @PostMapping public String testLog(HttpServletRequest request) { Log log=new Log(); log.setMsg("hello world"+System.currentTimeMillis()); log.setCreateTime(LocalDateTime.now()); log.setUrl(request.getRequestURL().toString()); log.setType(LogConstant.DEBUG); try { batchInsertLogThreadManager.addLog(log); } catch (Exception e) { e.printStackTrace(); } return "success"; } } application.yml ```yml server: port: 8089 spring: datasource: username: root password: 123456 driver-class-name: com.mysql.cj.jdbc.Driver url: jdbc:mysql://localhost:3306/young?useSSL=false&serverTimezone=UTC mybatis-plus: global-config: db-config: logic-delete-value: 1 logic-not-delete-value: 0 mapper-locations: classpath:mapper/*.xml
运行项目,连续发送8个请求,结果如下:
当发送第6个请求后,执行一次批量操作任务,再过5秒后,执行第二次批量插入任务,执行批量插入任务前,先判断队列中的任务数量,不为0时才执行任务

查看数据库

关于线程池的更具体的使用,可以参考下面这个项目:
springboot_log
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。