赞
踩
ThreadPoolTaskExecutor有两种提交方法execute和submit:
无返回值的任务使用public void execute(Runnable command) 方法提交;
有返回值的任务使用public <T> Future<T> submit(Callable) 方法提交
。
下面具体来看下两者的应用以及区别。
一、与主线程执行顺序的区别:
1、(1)public void execute(Runnable command) 方法提交,子线程可能在主线程结束之后结束;
举例:
- @RequestMapping("/execute")
- public String execute(){
- System.out.println("进入方法");
- threadPoolTaskExecutor.execute(new Runnable() {
- @Override
- public void run() {
- try {
- Thread.sleep(20000);
- System.out.println("sleep后");
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- });
- System.out.println("执行提交后");
- return "aa";
- }

请求后打印:
- 进入方法
- 执行提交后
- sleep后
可见,由于子线程比较耗时,主线程结束后子线程还没有执行完。
(2)public <T> Future<T> submit(Callable) 方法提交,因为提交任务后有个取数据的过程,在从
Future取数据的过程中,Callable自带的阻塞机制,这个机制保证主线程一定在子线程结束之后结束。反之如果没有取数据,子线程可能会在主线程结束之后才结束。
举例说明:
- package exceldemo.task;
-
- import exceldemo.dto.User;
- import exceldemo.service.UserService;
-
- import java.util.List;
- import java.util.concurrent.Callable;
-
- public class UserTaskTest implements Callable<List<User>> {
-
- private List<Integer> ids;
- private UserService userService;
-
- public UserTaskTest(List<Integer> childIds, UserService userService) {
- System.out.println("构造");
- this.ids = childIds;
- this.userService = userService;
- }
-
- @Override
- public List<User> call() throws Exception {
- Thread.sleep(4000);
- System.out.println("执行");
- return userService.getByIds(ids);
- }
- }

例a:submit提交任务之后没有取数据:
- package exceldemo.rest;
-
- import exceldemo.dto.User;
- import exceldemo.service.UserService;
- import exceldemo.task.UserTaskTest;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
- import org.springframework.web.bind.annotation.RequestMapping;
- import org.springframework.web.bind.annotation.RestController;
-
- import java.util.ArrayList;
- import java.util.List;
- import java.util.concurrent.Future;
-
- @RestController
- @RequestMapping("/order")
- public class OrderTest {
-
- @Autowired
- private ThreadPoolTaskExecutor threadPoolTaskExecutor;
-
- @Autowired
- private UserService userService;
-
-
- @RequestMapping("/submit")
- public List<User> submit(){
- List<Integer> ids = new ArrayList<>();
- for(int i = 0;i<=500;i++){
- ids.add(i);
- }
-
- //异步获取所有用户
-
- List<User> users = new ArrayList<>();
- List<Future> futures = new ArrayList<>();
-
- for (int i = 0; i < ids.size(); i += 100) {
- int startIndex = i;
- int endIndex = startIndex + 100 > ids.size() ? ids.size() : startIndex + 100;
- UserTaskTest task = new UserTaskTest(ids.subList(startIndex, endIndex),userService);
- Future<List<User>> future = threadPoolTaskExecutor.submit(task);
- System.out.println("加入futurn");
- futures.add(future);
- }
-
- System.out.println("返回结果"+users.size());
- return users;
- }
- }

请求后后端打印:
- 构造
- 加入futurn
- 构造
- 加入futurn
- 构造
- 加入futurn
- 构造
- 加入futurn
- 构造
- 加入futurn
- 构造
- 加入futurn
- 返回结果0
- 执行
- 执行
- 执行
- 执行
- 执行
- 执行

可以看到子线程比较耗时,主线程结束之后,子线程还没有执行完;
例b:submit提交任务之后取数据:
- @RequestMapping("/submit")
- public List<User> submit(){
- List<Integer> ids = new ArrayList<>();
- for(int i = 0;i<=500;i++){
- ids.add(i);
- }
-
- //异步获取所有用户
-
- List<User> users = new ArrayList<>();
- List<Future> futures = new ArrayList<>();
-
- for (int i = 0; i < ids.size(); i += 100) {
- int startIndex = i;
- int endIndex = startIndex + 100 > ids.size() ? ids.size() : startIndex + 100;
- UserTaskTest task = new UserTaskTest(ids.subList(startIndex, endIndex),userService);
- Future<List<User>> future = threadPoolTaskExecutor.submit(task);
- System.out.println("加入futurn");
- futures.add(future);
- }
- //取数据
- try{
- System.out.println("获取数据");
- for(Future future : futures){
- System.out.println("获取数据内部");
- users.addAll((List<User>) future.get());
- }
- }catch (Exception e){
-
- }
- System.out.println("返回结果"+users.size());
- return users;
- }

请求后后端打印:
- 构造
- 加入futurn
- 构造
- 加入futurn
- 构造
- 加入futurn
- 构造
- 加入futurn
- 构造
- 加入futurn
- 构造
- 加入futurn
- 获取数据
- 获取数据内部
- 执行
- 执行
- 执行
- 执行
- 执行
- 获取数据内部
- 获取数据内部
- 获取数据内部
- 获取数据内部
- 获取数据内部
- 执行
- 返回结果501

可以看到,即使子线程比主线程耗时,主线程也等子线程结束后才结束。
这两个例子证明了使用submit提交任务,提交后只要有从Future取数据的操作,就可以保证主线程在子线程结束后才结束。
************************************************************分割线****************************************************************************
2、下面再举个完整的例子,在子线程同样耗时以及主线程执行步骤一样的情况下比较execute和submit这两种方法:
线程池:
- package exceldemo.config;
-
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
-
- @Configuration
- public class ThreadPoolTaskExecutorConfig {
- private static int CORE_POOL_SIZE = 5;
- private static int MAX_POOL_SIZE = 1000;
- @Bean(name="threadPoolTaskExecutor")
- public ThreadPoolTaskExecutor serviceJobTaskExecutor(){
- ThreadPoolTaskExecutor poolTaskExecutor = new ThreadPoolTaskExecutor();
- //线程池维护线程的最少数量
- poolTaskExecutor.setCorePoolSize(CORE_POOL_SIZE);
- //线程池维护线程的最大数量
- poolTaskExecutor.setMaxPoolSize(MAX_POOL_SIZE);
- //线程池所使用的缓冲队列
- poolTaskExecutor.setQueueCapacity(200);
- //线程池维护线程所允许的空闲时间
- poolTaskExecutor.setKeepAliveSeconds(30000);
- poolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);
- System.out.println(poolTaskExecutor);
- return poolTaskExecutor;
- }
- }

controller接口:
- package exceldemo.rest;
-
- import exceldemo.task.MyOrderCallableTask;
- import exceldemo.task.MyOrderRunnableTask;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
- import org.springframework.web.bind.annotation.RequestMapping;
- import org.springframework.web.bind.annotation.RestController;
-
- import java.util.concurrent.ExecutionException;
- import java.util.concurrent.Future;
-
- @RestController
- @RequestMapping("/myOrderDemo")
- public class MyOrderDemo {
-
- @Autowired
- private ThreadPoolTaskExecutor threadPoolTaskExecutor;
-
- @RequestMapping("/execute")
- public void execute(){
- String str = "execute方法";
- threadPoolTaskExecutor.execute(new MyOrderRunnableTask(str));
- System.out.println("主线程调用结束");
-
- }
-
- @RequestMapping("/submit")
- public String submit(){
- String str = "submit方法";
- Future<String> future = threadPoolTaskExecutor.submit(new MyOrderCallableTask(str));
- try {
- str = future.get();
- } catch (InterruptedException e) {
- e.printStackTrace();
- } catch (ExecutionException e) {
- e.printStackTrace();
- }
- System.out.println("主线程调用结束");
- return str;
- }
- }

Callable实现类:
- package exceldemo.task;
-
- import java.util.concurrent.Callable;
-
- public class MyOrderCallableTask implements Callable<String> {
-
- private String name;
-
- public MyOrderCallableTask(String name) {
- this.name = name;
- }
-
- @Override
- public String call() throws Exception {
- try {
- Thread.sleep(2000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- name = "MyOrderCallableTask";
- System.out.println("MyOrderCallableTask已执行");
- return name;
- }
- }

Runnable实现类:
- package exceldemo.task;
-
- public class MyOrderRunnableTask implements Runnable{
-
- private String name;
-
- public MyOrderRunnableTask(String name){
- this.name = name;
- }
-
- @Override
- public void run() {
- try {
- Thread.sleep(2000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- name = "MyOrderRunnableTask";
- System.out.println("MyOrderRunnableTask已执行");
- }
- }

启动类:
- package exceldemo;
- import org.springframework.boot.SpringApplication;
- import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
- import org.springframework.boot.autoconfigure.SpringBootApplication;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Primary;
- import org.springframework.core.task.TaskExecutor;
- import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
-
-
- @SpringBootApplication
- public class Application {
- public static void main(String[] args) {
- SpringApplication.run(Application.class, args);
-
- }
- }
-

启动项目后,
请求execute接口:
后端打印:
- 主线程调用结束
- MyOrderRunnableTask已执行
请求submit方法:
后端打印:
- MyOrderCallableTask已执行
- 主线程调用结束
验证结束。这也和他们的功能是保持一致的。不需要返回结果,主线程就不需要等待子线程执行;需要返回结果,主线程肯定需要等所有的子线程结束后汇总结果。所以在调用的时候也需要注意:
(1)如果主线程调用了ThreadPoolTaskExecutor的execute提交任务,且传递了参数给子线程,并且子线程在修改这个参数,调用后主线程就不应该再使用这个参数,因为这个参数的值已经无法确定了;
(2)如果主线程调用了ThreadPoolTaskExecutor的submit提交任务,记得要在调用的逻辑后面,从Future里面把返回值取出来(调用Future的get方法),否则就和execute的效果一样了。
二、处理异常的区别:Callable执行call时遇到异常会抛出,而Runnable执行run时遇到异常并不会抛出。
举例:
- package com.demo.rest;
-
- import com.demo.service.UserService;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.web.bind.annotation.RequestMapping;
- import org.springframework.web.bind.annotation.RestController;
-
- @RestController
- @RequestMapping("/user")
- public class UserController {
-
- @Autowired
- private UserService userService;
-
- @RequestMapping("/submit")
- public String submit(String param){
-
- param = userService.submit(param);
- return param;
- }
-
- @RequestMapping("/execute")
- public String execute(String param){
- String res = userService.execute(param);
- return res;
- }
- }

- package com.demo.service.impl;
-
- import com.demo.dto.UserDTO;
- import com.demo.mapper.UserMapper;
- import com.demo.service.UserService;
- import com.demo.task.UserCallableTask;
- import com.demo.task.UserRunnableTask;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
- import org.springframework.stereotype.Service;
- import org.springframework.transaction.annotation.Transactional;
-
- import java.util.concurrent.ExecutionException;
- import java.util.concurrent.Future;
-
- @Service("userService")
- public class UserServiceImpl implements UserService {
-
- @Autowired
- private ThreadPoolTaskExecutor threadPoolTaskExecutor;
-
- @Autowired
- private UserMapper userMapper;
-
- @Override
- public String submit(String param) {
-
-
- Future<String> future = threadPoolTaskExecutor.submit(new UserCallableTask(param));
- try {
- param = future.get();
- } catch (InterruptedException e) {
- e.printStackTrace();
- return "error";
- } catch (ExecutionException e) {
- e.printStackTrace();
- return "error";
- }
- UserDTO user = new UserDTO();
- user.setName(param);
- userMapper.insert(user);
- return param;
- }
-
- @Override
- public String execute(String param) {
- threadPoolTaskExecutor.execute(new UserRunnableTask(param,userMapper));
- return "success";
- }
- }

task:
- package com.demo.task;
-
- import com.demo.dto.UserDTO;
- import com.demo.mapper.UserMapper;
-
- import java.util.concurrent.Callable;
-
- public class UserCallableTask implements Callable<String> {
-
- private String param;
-
- public UserCallableTask (String param){
- this.param = param;
- }
- @Override
- public String call() throws Exception {
- param += "UserCallableTask";
- int a = 1/0;
- return param;
- }
- }

- package com.demo.task;
-
- import com.demo.dto.UserDTO;
- import com.demo.mapper.UserMapper;
-
- public class UserRunnableTask implements Runnable {
-
- private String param;
-
- private UserMapper userMapper;
-
- public UserRunnableTask (String param,UserMapper userMapper){
- this.param = param;
- this.userMapper = userMapper;
- }
-
- @Override
- public void run() {
- param += "UserRunnableTask";
- UserDTO user = new UserDTO();
- user.setName(param);
- int a = 1/0;
- userMapper.insert(user);
- }
- }

请求submit:
请求execute:
因为在两个task里面都加了异常1/0,所以请求这两个方法都不会往数据库插入数据。call方法抛出异常,service层捕获到后return就不再插入数据了,run方法自己遇到异常就终止了也不再往下执行。区别就再于二者对于异常的处理,调用sumbit方法执行时可以捕获异常,这样就可以自定义处理如把异常抛出给调用处(controller层),而execute的run方法遇到异常就自己终止了,主线程无法感知其运行成功与否。
有的人可能会想在调用execute时加上try...catch....,这个肯定是不可以的,这个try...catch...捕获的只是
threadPoolTaskExecutor.execute(new UserRunnableTask(param,userMapper));这个任务提交有没有异常,而这个任务和主线程是异步的,它实际执行的run方法主线程是捕获不到的。可以验证一下:
- @Override
- @Transactional
- public String execute(String param) {
- try{
- threadPoolTaskExecutor.execute(new UserRunnableTask(param,userMapper));
- }catch (Exception e){
- System.out.println("有异常");
- return "error";
- }
-
- return "success";
- }
请求:
验证确实没有捕获到。
三、多线程与事务回滚:
上述,如果在事务中调用了多线程,submit遇到异常会抛出且必须被捕获,不会触发回滚,execute遇到异常主线程无法感知,也不会触发回滚。那如果需要在多线程调用时实现事务回滚该怎么做呢?这就需要加入其它的操作了:
1、submit方法与事务回滚:我们知道sumbit方法提交线程在获取返回结果时是需要捕获异常的,那么我们就可以在捕获到异常时手动回滚当前事务。
(1)主线程正常,子线程发生异常,只回滚主线程:这种情况比较简单,主线程捕获异常后直接TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();回滚主线程就可以了:
- package com.demo.service.impl;
-
- import com.demo.dto.UserDTO;
- import com.demo.mapper.UserMapper;
- import com.demo.service.UserService;
- import com.demo.task.UserCallableTask;
- import com.demo.task.UserRunnableTask;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
- import org.springframework.stereotype.Service;
- import org.springframework.transaction.annotation.Transactional;
- import org.springframework.transaction.interceptor.TransactionAspectSupport;
-
- import java.util.concurrent.ExecutionException;
- import java.util.concurrent.Future;
-
- @Service("userService")
- public class UserServiceImpl implements UserService {
-
- @Autowired
- private ThreadPoolTaskExecutor threadPoolTaskExecutor;
-
- @Autowired
- private UserMapper userMapper;
-
- @Override
- @Transactional
- public String submit(String param) {
-
-
- Future<String> future = threadPoolTaskExecutor.submit(new UserCallableTask(param,userMapper));
- UserDTO user = new UserDTO();
- user.setName("我是主线程");
- userMapper.insert(user);
- try {
- param = future.get();
- } catch (InterruptedException e) {
- e.printStackTrace();
- TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
- return "error";
- } catch (ExecutionException e) {
- e.printStackTrace();
- TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
- return "error";
- }
-
- return param;
- }
-
- @Override
- public String execute(String param) {
- // threadPoolTaskExecutor.execute(new UserRunnableTask(param,userMapper));
- return "success";
- }
- }

任务:
- package com.demo.task;
-
- import com.demo.dto.UserDTO;
- import com.demo.mapper.UserMapper;
-
- import java.util.concurrent.Callable;
-
- public class UserCallableTask implements Callable<String> {
-
- private String param;
-
- private UserMapper userMapper;
-
- public UserCallableTask(String param, UserMapper userMapper){
- this.param = param;
- this.userMapper = userMapper;
- }
- @Override
- public String call() throws Exception {
- param += "UserCallableTask";
- UserDTO user = new UserDTO();
- user.setName("我是子线程");
- userMapper.insert(user);
- int a = 1/0;
- return param;
- }
- }

请求
数据库没有主线程的数据插入:
说明主线程回滚成功。
(2)、主线程或子线程异常,主线程、子线程全部回滚:同时回滚主线程和子线程,就需要把主线程和子线程放到同一个事务中。
说明主线程、子线程全部回滚成功。
2、execute方法:
四、
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。