当前位置:   article > 正文

线程池(三)ThreadPoolTaskExecutor(2)两种提交方法_threadpooltaskexecutor.execute

threadpooltaskexecutor.execute

ThreadPoolTaskExecutor有两种提交方法execute和submit:

无返回值的任务使用public void execute(Runnable command) 方法提交;

有返回值的任务使用public <T> Future<T> submit(Callable) 方法提交

下面具体来看下两者的应用以及区别。

一、与主线程执行顺序的区别:

1、(1)public void execute(Runnable command) 方法提交,子线程可能在主线程结束之后结束;

举例:

  1. @RequestMapping("/execute")
  2. public String execute(){
  3. System.out.println("进入方法");
  4. threadPoolTaskExecutor.execute(new Runnable() {
  5. @Override
  6. public void run() {
  7. try {
  8. Thread.sleep(20000);
  9. System.out.println("sleep后");
  10. } catch (InterruptedException e) {
  11. e.printStackTrace();
  12. }
  13. }
  14. });
  15. System.out.println("执行提交后");
  16. return "aa";
  17. }

请求后打印:

  1. 进入方法
  2. 执行提交后
  3. sleep

可见,由于子线程比较耗时,主线程结束后子线程还没有执行完。 

(2)public <T> Future<T> submit(Callable) 方法提交,因为提交任务后有个取数据的过程,在从Future取数据的过程中,Callable自带的阻塞机制,这个机制保证主线程一定在子线程结束之后结束。反之如果没有取数据,子线程可能会在主线程结束之后才结束。

举例说明:

  1. package exceldemo.task;
  2. import exceldemo.dto.User;
  3. import exceldemo.service.UserService;
  4. import java.util.List;
  5. import java.util.concurrent.Callable;
  6. public class UserTaskTest implements Callable<List<User>> {
  7. private List<Integer> ids;
  8. private UserService userService;
  9. public UserTaskTest(List<Integer> childIds, UserService userService) {
  10. System.out.println("构造");
  11. this.ids = childIds;
  12. this.userService = userService;
  13. }
  14. @Override
  15. public List<User> call() throws Exception {
  16. Thread.sleep(4000);
  17. System.out.println("执行");
  18. return userService.getByIds(ids);
  19. }
  20. }

例a:submit提交任务之后没有取数据:

  1. package exceldemo.rest;
  2. import exceldemo.dto.User;
  3. import exceldemo.service.UserService;
  4. import exceldemo.task.UserTaskTest;
  5. import org.springframework.beans.factory.annotation.Autowired;
  6. import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
  7. import org.springframework.web.bind.annotation.RequestMapping;
  8. import org.springframework.web.bind.annotation.RestController;
  9. import java.util.ArrayList;
  10. import java.util.List;
  11. import java.util.concurrent.Future;
  12. @RestController
  13. @RequestMapping("/order")
  14. public class OrderTest {
  15. @Autowired
  16. private ThreadPoolTaskExecutor threadPoolTaskExecutor;
  17. @Autowired
  18. private UserService userService;
  19. @RequestMapping("/submit")
  20. public List<User> submit(){
  21. List<Integer> ids = new ArrayList<>();
  22. for(int i = 0;i<=500;i++){
  23. ids.add(i);
  24. }
  25. //异步获取所有用户
  26. List<User> users = new ArrayList<>();
  27. List<Future> futures = new ArrayList<>();
  28. for (int i = 0; i < ids.size(); i += 100) {
  29. int startIndex = i;
  30. int endIndex = startIndex + 100 > ids.size() ? ids.size() : startIndex + 100;
  31. UserTaskTest task = new UserTaskTest(ids.subList(startIndex, endIndex),userService);
  32. Future<List<User>> future = threadPoolTaskExecutor.submit(task);
  33. System.out.println("加入futurn");
  34. futures.add(future);
  35. }
  36. System.out.println("返回结果"+users.size());
  37. return users;
  38. }
  39. }

请求后后端打印:

  1. 构造
  2. 加入futurn
  3. 构造
  4. 加入futurn
  5. 构造
  6. 加入futurn
  7. 构造
  8. 加入futurn
  9. 构造
  10. 加入futurn
  11. 构造
  12. 加入futurn
  13. 返回结果0
  14. 执行
  15. 执行
  16. 执行
  17. 执行
  18. 执行
  19. 执行

 可以看到子线程比较耗时,主线程结束之后,子线程还没有执行完;

例b:submit提交任务之后取数据:

  1. @RequestMapping("/submit")
  2. public List<User> submit(){
  3. List<Integer> ids = new ArrayList<>();
  4. for(int i = 0;i<=500;i++){
  5. ids.add(i);
  6. }
  7. //异步获取所有用户
  8. List<User> users = new ArrayList<>();
  9. List<Future> futures = new ArrayList<>();
  10. for (int i = 0; i < ids.size(); i += 100) {
  11. int startIndex = i;
  12. int endIndex = startIndex + 100 > ids.size() ? ids.size() : startIndex + 100;
  13. UserTaskTest task = new UserTaskTest(ids.subList(startIndex, endIndex),userService);
  14. Future<List<User>> future = threadPoolTaskExecutor.submit(task);
  15. System.out.println("加入futurn");
  16. futures.add(future);
  17. }
  18. //取数据
  19. try{
  20. System.out.println("获取数据");
  21. for(Future future : futures){
  22. System.out.println("获取数据内部");
  23. users.addAll((List<User>) future.get());
  24. }
  25. }catch (Exception e){
  26. }
  27. System.out.println("返回结果"+users.size());
  28. return users;
  29. }

请求后后端打印:

  1. 构造
  2. 加入futurn
  3. 构造
  4. 加入futurn
  5. 构造
  6. 加入futurn
  7. 构造
  8. 加入futurn
  9. 构造
  10. 加入futurn
  11. 构造
  12. 加入futurn
  13. 获取数据
  14. 获取数据内部
  15. 执行
  16. 执行
  17. 执行
  18. 执行
  19. 执行
  20. 获取数据内部
  21. 获取数据内部
  22. 获取数据内部
  23. 获取数据内部
  24. 获取数据内部
  25. 执行
  26. 返回结果501

可以看到,即使子线程比主线程耗时,主线程也等子线程结束后才结束。

这两个例子证明了使用submit提交任务,提交后只要有从Future取数据的操作,就可以保证主线程在子线程结束后才结束。

************************************************************分割线****************************************************************************

2、下面再举个完整的例子,在子线程同样耗时以及主线程执行步骤一样的情况下比较execute和submit这两种方法:

线程池:

  1. package exceldemo.config;
  2. import org.springframework.context.annotation.Bean;
  3. import org.springframework.context.annotation.Configuration;
  4. import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
  5. @Configuration
  6. public class ThreadPoolTaskExecutorConfig {
  7. private static int CORE_POOL_SIZE = 5;
  8. private static int MAX_POOL_SIZE = 1000;
  9. @Bean(name="threadPoolTaskExecutor")
  10. public ThreadPoolTaskExecutor serviceJobTaskExecutor(){
  11. ThreadPoolTaskExecutor poolTaskExecutor = new ThreadPoolTaskExecutor();
  12. //线程池维护线程的最少数量
  13. poolTaskExecutor.setCorePoolSize(CORE_POOL_SIZE);
  14. //线程池维护线程的最大数量
  15. poolTaskExecutor.setMaxPoolSize(MAX_POOL_SIZE);
  16. //线程池所使用的缓冲队列
  17. poolTaskExecutor.setQueueCapacity(200);
  18. //线程池维护线程所允许的空闲时间
  19. poolTaskExecutor.setKeepAliveSeconds(30000);
  20. poolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);
  21. System.out.println(poolTaskExecutor);
  22. return poolTaskExecutor;
  23. }
  24. }

controller接口: 

  1. package exceldemo.rest;
  2. import exceldemo.task.MyOrderCallableTask;
  3. import exceldemo.task.MyOrderRunnableTask;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
  6. import org.springframework.web.bind.annotation.RequestMapping;
  7. import org.springframework.web.bind.annotation.RestController;
  8. import java.util.concurrent.ExecutionException;
  9. import java.util.concurrent.Future;
  10. @RestController
  11. @RequestMapping("/myOrderDemo")
  12. public class MyOrderDemo {
  13. @Autowired
  14. private ThreadPoolTaskExecutor threadPoolTaskExecutor;
  15. @RequestMapping("/execute")
  16. public void execute(){
  17. String str = "execute方法";
  18. threadPoolTaskExecutor.execute(new MyOrderRunnableTask(str));
  19. System.out.println("主线程调用结束");
  20. }
  21. @RequestMapping("/submit")
  22. public String submit(){
  23. String str = "submit方法";
  24. Future<String> future = threadPoolTaskExecutor.submit(new MyOrderCallableTask(str));
  25. try {
  26. str = future.get();
  27. } catch (InterruptedException e) {
  28. e.printStackTrace();
  29. } catch (ExecutionException e) {
  30. e.printStackTrace();
  31. }
  32. System.out.println("主线程调用结束");
  33. return str;
  34. }
  35. }

Callable实现类: 

  1. package exceldemo.task;
  2. import java.util.concurrent.Callable;
  3. public class MyOrderCallableTask implements Callable<String> {
  4. private String name;
  5. public MyOrderCallableTask(String name) {
  6. this.name = name;
  7. }
  8. @Override
  9. public String call() throws Exception {
  10. try {
  11. Thread.sleep(2000);
  12. } catch (InterruptedException e) {
  13. e.printStackTrace();
  14. }
  15. name = "MyOrderCallableTask";
  16. System.out.println("MyOrderCallableTask已执行");
  17. return name;
  18. }
  19. }

Runnable实现类: 

  1. package exceldemo.task;
  2. public class MyOrderRunnableTask implements Runnable{
  3. private String name;
  4. public MyOrderRunnableTask(String name){
  5. this.name = name;
  6. }
  7. @Override
  8. public void run() {
  9. try {
  10. Thread.sleep(2000);
  11. } catch (InterruptedException e) {
  12. e.printStackTrace();
  13. }
  14. name = "MyOrderRunnableTask";
  15. System.out.println("MyOrderRunnableTask已执行");
  16. }
  17. }

启动类:

  1. package exceldemo;
  2. import org.springframework.boot.SpringApplication;
  3. import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
  4. import org.springframework.boot.autoconfigure.SpringBootApplication;
  5. import org.springframework.context.annotation.Bean;
  6. import org.springframework.context.annotation.Primary;
  7. import org.springframework.core.task.TaskExecutor;
  8. import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
  9. @SpringBootApplication
  10. public class Application {
  11. public static void main(String[] args) {
  12. SpringApplication.run(Application.class, args);
  13. }
  14. }

 启动项目后,

请求execute接口:

后端打印:

  1. 主线程调用结束
  2. MyOrderRunnableTask已执行

请求submit方法:

后端打印:

  1. MyOrderCallableTask已执行
  2. 主线程调用结束

验证结束。这也和他们的功能是保持一致的。不需要返回结果,主线程就不需要等待子线程执行;需要返回结果,主线程肯定需要等所有的子线程结束后汇总结果。所以在调用的时候也需要注意:

(1)如果主线程调用了ThreadPoolTaskExecutor的execute提交任务,且传递了参数给子线程,并且子线程在修改这个参数,调用后主线程就不应该再使用这个参数,因为这个参数的值已经无法确定了;

(2)如果主线程调用了ThreadPoolTaskExecutor的submit提交任务,记得要在调用的逻辑后面,从Future里面把返回值取出来(调用Future的get方法),否则就和execute的效果一样了。

二、处理异常的区别:Callable执行call时遇到异常会抛出,而Runnable执行run时遇到异常并不会抛出。

    举例:

  1. package com.demo.rest;
  2. import com.demo.service.UserService;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.web.bind.annotation.RequestMapping;
  5. import org.springframework.web.bind.annotation.RestController;
  6. @RestController
  7. @RequestMapping("/user")
  8. public class UserController {
  9. @Autowired
  10. private UserService userService;
  11. @RequestMapping("/submit")
  12. public String submit(String param){
  13. param = userService.submit(param);
  14. return param;
  15. }
  16. @RequestMapping("/execute")
  17. public String execute(String param){
  18. String res = userService.execute(param);
  19. return res;
  20. }
  21. }
  1. package com.demo.service.impl;
  2. import com.demo.dto.UserDTO;
  3. import com.demo.mapper.UserMapper;
  4. import com.demo.service.UserService;
  5. import com.demo.task.UserCallableTask;
  6. import com.demo.task.UserRunnableTask;
  7. import org.springframework.beans.factory.annotation.Autowired;
  8. import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
  9. import org.springframework.stereotype.Service;
  10. import org.springframework.transaction.annotation.Transactional;
  11. import java.util.concurrent.ExecutionException;
  12. import java.util.concurrent.Future;
  13. @Service("userService")
  14. public class UserServiceImpl implements UserService {
  15. @Autowired
  16. private ThreadPoolTaskExecutor threadPoolTaskExecutor;
  17. @Autowired
  18. private UserMapper userMapper;
  19. @Override
  20. public String submit(String param) {
  21. Future<String> future = threadPoolTaskExecutor.submit(new UserCallableTask(param));
  22. try {
  23. param = future.get();
  24. } catch (InterruptedException e) {
  25. e.printStackTrace();
  26. return "error";
  27. } catch (ExecutionException e) {
  28. e.printStackTrace();
  29. return "error";
  30. }
  31. UserDTO user = new UserDTO();
  32. user.setName(param);
  33. userMapper.insert(user);
  34. return param;
  35. }
  36. @Override
  37. public String execute(String param) {
  38. threadPoolTaskExecutor.execute(new UserRunnableTask(param,userMapper));
  39. return "success";
  40. }
  41. }

task:

  1. package com.demo.task;
  2. import com.demo.dto.UserDTO;
  3. import com.demo.mapper.UserMapper;
  4. import java.util.concurrent.Callable;
  5. public class UserCallableTask implements Callable<String> {
  6. private String param;
  7. public UserCallableTask (String param){
  8. this.param = param;
  9. }
  10. @Override
  11. public String call() throws Exception {
  12. param += "UserCallableTask";
  13. int a = 1/0;
  14. return param;
  15. }
  16. }
  1. package com.demo.task;
  2. import com.demo.dto.UserDTO;
  3. import com.demo.mapper.UserMapper;
  4. public class UserRunnableTask implements Runnable {
  5. private String param;
  6. private UserMapper userMapper;
  7. public UserRunnableTask (String param,UserMapper userMapper){
  8. this.param = param;
  9. this.userMapper = userMapper;
  10. }
  11. @Override
  12. public void run() {
  13. param += "UserRunnableTask";
  14. UserDTO user = new UserDTO();
  15. user.setName(param);
  16. int a = 1/0;
  17. userMapper.insert(user);
  18. }
  19. }

请求submit:

请求execute:

因为在两个task里面都加了异常1/0,所以请求这两个方法都不会往数据库插入数据。call方法抛出异常,service层捕获到后return就不再插入数据了,run方法自己遇到异常就终止了也不再往下执行。区别就再于二者对于异常的处理,调用sumbit方法执行时可以捕获异常,这样就可以自定义处理如把异常抛出给调用处(controller层),而execute的run方法遇到异常就自己终止了,主线程无法感知其运行成功与否。

有的人可能会想在调用execute时加上try...catch....,这个肯定是不可以的,这个try...catch...捕获的只是

threadPoolTaskExecutor.execute(new UserRunnableTask(param,userMapper));这个任务提交有没有异常,而这个任务和主线程是异步的,它实际执行的run方法主线程是捕获不到的。可以验证一下:
  1. @Override
  2. @Transactional
  3. public String execute(String param) {
  4. try{
  5. threadPoolTaskExecutor.execute(new UserRunnableTask(param,userMapper));
  6. }catch (Exception e){
  7. System.out.println("有异常");
  8. return "error";
  9. }
  10. return "success";
  11. }

请求:

验证确实没有捕获到。 

三、多线程与事务回滚:

上述,如果在事务中调用了多线程,submit遇到异常会抛出且必须被捕获,不会触发回滚,execute遇到异常主线程无法感知,也不会触发回滚。那如果需要在多线程调用时实现事务回滚该怎么做呢?这就需要加入其它的操作了:

1、submit方法与事务回滚:我们知道sumbit方法提交线程在获取返回结果时是需要捕获异常的,那么我们就可以在捕获到异常时手动回滚当前事务。

(1)主线程正常,子线程发生异常,只回滚主线程:这种情况比较简单,主线程捕获异常后直接TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();回滚主线程就可以了:

  1. package com.demo.service.impl;
  2. import com.demo.dto.UserDTO;
  3. import com.demo.mapper.UserMapper;
  4. import com.demo.service.UserService;
  5. import com.demo.task.UserCallableTask;
  6. import com.demo.task.UserRunnableTask;
  7. import org.springframework.beans.factory.annotation.Autowired;
  8. import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
  9. import org.springframework.stereotype.Service;
  10. import org.springframework.transaction.annotation.Transactional;
  11. import org.springframework.transaction.interceptor.TransactionAspectSupport;
  12. import java.util.concurrent.ExecutionException;
  13. import java.util.concurrent.Future;
  14. @Service("userService")
  15. public class UserServiceImpl implements UserService {
  16. @Autowired
  17. private ThreadPoolTaskExecutor threadPoolTaskExecutor;
  18. @Autowired
  19. private UserMapper userMapper;
  20. @Override
  21. @Transactional
  22. public String submit(String param) {
  23. Future<String> future = threadPoolTaskExecutor.submit(new UserCallableTask(param,userMapper));
  24. UserDTO user = new UserDTO();
  25. user.setName("我是主线程");
  26. userMapper.insert(user);
  27. try {
  28. param = future.get();
  29. } catch (InterruptedException e) {
  30. e.printStackTrace();
  31. TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
  32. return "error";
  33. } catch (ExecutionException e) {
  34. e.printStackTrace();
  35. TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
  36. return "error";
  37. }
  38. return param;
  39. }
  40. @Override
  41. public String execute(String param) {
  42. // threadPoolTaskExecutor.execute(new UserRunnableTask(param,userMapper));
  43. return "success";
  44. }
  45. }

任务: 

  1. package com.demo.task;
  2. import com.demo.dto.UserDTO;
  3. import com.demo.mapper.UserMapper;
  4. import java.util.concurrent.Callable;
  5. public class UserCallableTask implements Callable<String> {
  6. private String param;
  7. private UserMapper userMapper;
  8. public UserCallableTask(String param, UserMapper userMapper){
  9. this.param = param;
  10. this.userMapper = userMapper;
  11. }
  12. @Override
  13. public String call() throws Exception {
  14. param += "UserCallableTask";
  15. UserDTO user = new UserDTO();
  16. user.setName("我是子线程");
  17. userMapper.insert(user);
  18. int a = 1/0;
  19. return param;
  20. }
  21. }

请求

数据库没有主线程的数据插入:

说明主线程回滚成功。 

(2)、主线程或子线程异常,主线程、子线程全部回滚:同时回滚主线程和子线程,就需要把主线程和子线程放到同一个事务中。

说明主线程、子线程全部回滚成功。

2、execute方法:

四、 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/爱喝兽奶帝天荒/article/detail/1008838
推荐阅读
相关标签
  

闽ICP备14008679号