赞
踩
ThreadPoolExecutor 有四个构造方法,前三个都是调用最后一个(最后一个参数最全)
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); } // 都调用它 public ThreadPoolExecutor(// 核心线程数 int corePoolSize, // 最大线程数 int maximumPoolSize, // 闲置线程存活时间 long keepAliveTime, // 时间单位 TimeUnit unit, // 线程队列 BlockingQueue<Runnable> workQueue, // 线程工厂 ThreadFactory threadFactory, // 队列已满,而且当前线程数已经超过最大线程数时的异常处理策略 RejectedExecutionHandler handler ) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
这个方法接收一个Runnable实例,并且异步的执行
executorService.execute(new Runnable() {
public void run() {
System.out.println("Asynchronous task");
}
});
executorService.shutdown();
submit(Runnable)和execute(Runnable)区别是前者可以返回一个Future对象,通过返回的Future对象,我们可以检查提交的任务是否执行完毕,请看下面执行的例子:
Future future = executorService.submit(new Runnable() {
public void run() {
System.out.println("Asynchronous task");
}
});
future.get(); //returns null if the task has finished correctly.
submit(Callable)和submit(Runnable)类似,也会返回一个Future对象,但是除此之外,submit(Callable)接收的是一个Callable的实现,Callable接口中的call()方法有一个返回值,可以返回任务的执行结果,而Runnable接口中的run()方法是void的,没有返回值。请看下面实例:
Future future = executorService.submit(new Callable(){
public Object call() throws Exception {
System.out.println("Asynchronous Callable");
return "Callable Result";
}
});
System.out.println("future.get() = " + future.get());
如果任务执行完成,future.get()方法会返回Callable任务的执行结果。注意,future.get()方法会产生阻塞。
invokeAny(…)方法接收的是一个Callable的集合,执行这个方法不会返回Future,但是会返回所有Callable任务中其中一个任务的执行结果。这个方法也无法保证返回的是哪个任务的执行结果,反正是其中的某一个。
ExecutorService executorService = Executors.newSingleThreadExecutor(); Set<Callable<String>> callables = new HashSet<Callable<String>>(); callables.add(new Callable<String>() { public String call() throws Exception { return "Task 1"; } }); callables.add(new Callable<String>() { public String call() throws Exception { return "Task 2"; } }); callables.add(new Callable<String>() { public String call() throws Exception { return "Task 3"; } }); String result = executorService.invokeAny(callables); System.out.println("result = " + result); executorService.shutdown();
invokeAll(…)与 invokeAny(…)类似也是接收一个Callable集合,但是前者执行之后会返回一个Future的List,其中对应着每个Callable任务执行后的Future对象。
List<Future<String>> futures = executorService.invokeAll(callables);
for(Future<String> future : futures){
System.out.println("future.get = " + future.get());
}
executorService.shutdown();
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * 数据收集配置,主要作用在于Spring启动时自动加载一个ExecutorService对象. * @author Bruce * @date 2017/2/22 * * update by Cliff at 2027/11/03 */ @Configuration public class ThreadPoolConfig { @Bean public ExecutorService getThreadPool(){ return Executors.newFixedThreadPool(); } }
在@service 中注入 ExecutorService 然后就可以直接用了。
@Autowired
private ExecutorService executorService;
public void test(){
executorService.execute(new Runnable() {
public void run() {
System.out.println("Asynchronous task");
}
});
}
配置一个线程池的bean,参数设置模拟生产环境。
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.concurrent.*; /** * @author: yong_jiang@suishouji.com * @create: 2018-10-15 17:15 * @desc: springboot线程池配置类 **/ @Configuration @EnableAsync public class ExecutorConfiguration { private static final Logger logger= LoggerFactory.getLogger(ExecutorConfiguration.class); @Bean public ThreadPoolTaskExecutor asyncServiceExecutor(){ logger.info("start asyncServiceExecutor"); ThreadPoolTaskExecutor executor=new ThreadPoolTaskExecutor(); //配置核心线程数 executor.setCorePoolSize(10); //配置最大线程数 executor.setMaxPoolSize(200); //线程池维护线程所允许的空闲时间 executor.setKeepAliveSeconds(5); //配置队列大小 executor.setQueueCapacity(500); // rejection-policy:当pool已经达到max size的时候,如何处理新任务 // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //执行初始化 executor.initialize(); return executor; } @Bean public ScheduledThreadPoolExecutor asyncScheduledThreadPoolExecutor(){ logger.info("start asyncScheduledThreadPoolExecutor"); ScheduledThreadPoolExecutor executor=new ScheduledThreadPoolExecutor(10); executor.setMaximumPoolSize(200); executor.setKeepAliveTime(5, TimeUnit.SECONDS); return executor; } }
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Service; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** * @author: yong_jiang@suishouji.com * @create: 2018-10-15 17:31 * @desc: 线程池在spring boot中的几种使用方式 **/ @Service public class ThreadoolTaskExecutorService { @Autowired private ThreadPoolTaskExecutor asyncServiceExecutor; @Autowired private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor; private static final Logger logger= LoggerFactory.getLogger(ThreadoolTaskExecutorService.class); public void executeAsync() { logger.info("start executeAsync"); try{ System.out.println(asyncServiceExecutor); asyncServiceExecutor.execute(new Runnable() { @Override public void run() { try { Thread.sleep(100000); logger.info("end executeAsync"); } catch (InterruptedException e) { e.printStackTrace(); } } }); }catch(Exception e){ e.printStackTrace(); } } @Async("asyncServiceExecutor") public void executeAsyncAnotation() { logger.info("start executeAsync"); try { Thread.sleep(100000); } catch (InterruptedException e) { e.printStackTrace(); } logger.info("end executeAsync"); } public void scheduledThreadPool() { try { System.out.println(scheduledThreadPoolExecutor); logger.info("start scheduledThreadPool"); scheduledThreadPoolExecutor.schedule(new Runnable() { @Override public void run() { try { logger.info("start executeAsync"); Thread.sleep(100000); logger.info("end executeAsync"); } catch (InterruptedException e) { e.printStackTrace(); } } },50000, TimeUnit.MILLISECONDS); } catch (Exception e) { e.printStackTrace(); } } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。