当前位置:   article > 正文

【定时任务】SpringBoot多线程并发动态执行定时任务_多线程定时任务并发执行

多线程定时任务并发执行

通过读取数据库的方式来进行定时任务信息获取,再通过多线程进行并发动态执行!

一、创建业务表

  1. CREATE TABLE `scheduled_task` (
  2. `id` int(11) NOT NULL AUTO_INCREMENT,
  3. `task_key` varchar(128) DEFAULT NULL COMMENT '任务key值(使用bean名称)',
  4. `task_desc` varchar(128) DEFAULT NULL COMMENT '任务描述',
  5. `task_cron` varchar(128) DEFAULT NULL COMMENT '任务表达式',
  6. `init_start_flag` int(2) DEFAULT '1' COMMENT '程序初始化是否启动 1 是 0 否',
  7. `create_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  8. `update_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  9. PRIMARY KEY (`id`),
  10. UNIQUE KEY `uniqu_task_key` (`task_key`)
  11. ) ENGINE=InnoDB AUTO_INCREMENT=101 DEFAULT CHARSET=utf8;

二、插入数据

  1. INSERT INTO `scheduled_task`(`id`, `task_key`, `task_desc`, `task_cron`, `init_start_flag`, `create_time`, `update_time`) VALUES (1, 'scheduledTask01', '定时任务01', '0/5 * * * * ?', 1, NOW(), NOW());
  2. INSERT INTO `scheduled_task`(`id`, `task_key`, `task_desc`, `task_cron`, `init_start_flag`, `create_time`, `update_time`) VALUES (2, 'scheduledTask02', '定时任务02', '0/2 * * * * ?', 0, NOW(), NOW());
  3. INSERT INTO `scheduled_task`(`id`, `task_key`, `task_desc`, `task_cron`, `init_start_flag`, `create_time`, `update_time`) VALUES (3, 'scheduledTask03', '定时任务03', '0/2 * * * * ?', 1, NOW(), NOW());

三、SpringBoot启动类添加相应注解

  1. import org.mybatis.spring.annotation.MapperScan;
  2. import org.springframework.boot.SpringApplication;
  3. import org.springframework.boot.autoconfigure.SpringBootApplication;
  4. import org.springframework.scheduling.annotation.EnableScheduling;
  5. /**
  6. * @author lzw
  7. * @create 2022-07-14-9:30
  8. */
  9. @SpringBootApplication
  10. @EnableScheduling
  11. @MapperScan("cn.sdata.mapper")
  12. public class SpringBootController {
  13. public static void main(String[] args) {
  14. SpringApplication.run(SpringBootController.class,args);
  15. }
  16. }

四、定时任务执行核心实现类

  1. import cn.sdata.entity.LogTask;
  2. import com.baomidou.mybatisplus.core.toolkit.StringUtils;
  3. import org.springframework.scheduling.annotation.SchedulingConfigurer;
  4. import org.springframework.scheduling.config.CronTask;
  5. import org.springframework.scheduling.config.ScheduledTaskRegistrar;
  6. import org.springframework.stereotype.Component;
  7. import javax.annotation.PreDestroy;
  8. import java.util.List;
  9. import java.util.Set;
  10. import java.util.concurrent.ConcurrentHashMap;
  11. import java.util.concurrent.Executors;
  12. import java.util.concurrent.ScheduledFuture;
  13. @Component
  14. public class ScheduledTask implements SchedulingConfigurer {
  15. private static volatile ScheduledTaskRegistrar registrar;
  16. private static volatile ConcurrentHashMap<Integer, ScheduledFuture<?>> scheduledFutures = new ConcurrentHashMap<Integer, ScheduledFuture<?>>();
  17. private static volatile ConcurrentHashMap<Integer, CronTask> cronTasks = new ConcurrentHashMap<Integer, CronTask>();
  18. @Override
  19. public void configureTasks(ScheduledTaskRegistrar registrar) {
  20. //设置20个线程,默认单线程
  21. registrar.setScheduler(Executors.newScheduledThreadPool(20));
  22. this.registrar = registrar;
  23. }
  24. public void refresh(List<LogTask> tasks){
  25. //取消已经删除的策略任务
  26. Set<Integer> sids = scheduledFutures.keySet();
  27. for (Integer sid : sids) {
  28. if(!exists(tasks, sid)){
  29. scheduledFutures.get(sid).cancel(false);
  30. }
  31. }
  32. for (LogTask logTask : tasks) {
  33. // ScheduledTaskRunnable t = new ScheduledTaskRunnable(logTask.getTask_id(), logTask.getRule_db_id());
  34. String expression = logTask.getExpression();
  35. //计划任务表达式为空则跳过
  36. if(StringUtils.isEmpty(expression)){
  37. continue;
  38. }
  39. //计划任务已存在并且表达式未发生变化则跳过
  40. if(scheduledFutures.containsKey(logTask.getTask_id()) && cronTasks.get(logTask.getTask_id()).getExpression().equals(expression)){
  41. continue;
  42. }
  43. //如果策略执行时间发生了变化,则取消当前策略的任务
  44. if(scheduledFutures.containsKey(logTask.getTask_id())){
  45. scheduledFutures.get(logTask.getTask_id()).cancel(false);
  46. scheduledFutures.remove(logTask.getTask_id());
  47. cronTasks.remove(logTask.getTask_id());
  48. }
  49. CronTask task = new CronTask(new Runnable() {
  50. @Override
  51. public void run() {
  52. //每个计划任务实际需要执行的具体业务逻辑
  53. System.out.println("正在执行的任务ID: "+logTask.getTask_id()+" |执行的cron表达式: "+logTask.getExpression());
  54. }
  55. }, expression);
  56. ScheduledFuture<?> future = registrar.getScheduler().schedule(task.getRunnable(), task.getTrigger());
  57. cronTasks.put(logTask.getTask_id(), task);
  58. scheduledFutures.put(logTask.getTask_id(), future);
  59. }
  60. }
  61. private boolean exists(List<LogTask> tasks, Integer tid){
  62. for(LogTask logTask:tasks){
  63. if(logTask.getTask_id() == tid){
  64. return true;
  65. }
  66. }
  67. return false;
  68. }
  69. @PreDestroy
  70. public void destroy() {
  71. registrar.destroy();
  72. }
  73. }

五、定时任务相关实体类(可以根据实际业务场景进行字段添加)

  1. import lombok.Data;
  2. @Data
  3. public class LogTask {
  4. private int task_id;
  5. private String expression;
  6. }

六、进行Controller层代码编写(Service层和Mapper层代码可自己实现)

  1. import cn.sdata.config.ScheduledTask;
  2. import cn.sdata.entity.LogTask;
  3. import cn.sdata.entity.Scheduled;
  4. import cn.sdata.service.ScheduledTaskService;
  5. import lombok.AllArgsConstructor;
  6. import org.springframework.scheduling.config.ScheduledTaskRegistrar;
  7. import org.springframework.web.bind.annotation.GetMapping;
  8. import org.springframework.web.bind.annotation.RequestMapping;
  9. import org.springframework.web.bind.annotation.RestController;
  10. import java.util.ArrayList;
  11. import java.util.List;
  12. /**
  13. * @author lzw
  14. * @create 2022-07-14-9:35
  15. * 定时任务相关
  16. */
  17. @RestController
  18. @RequestMapping("cron")
  19. @AllArgsConstructor
  20. public class CronController {
  21. private final ScheduledTaskService scheduledTaskService;
  22. @GetMapping("cron")
  23. public void cron(){
  24. //创建一个任务的容器
  25. List<LogTask> logTasks = new ArrayList<>();
  26. //读取数据库中的任务信息为LogTask字段进行赋值并添加进容器中
  27. List<Scheduled> list = scheduledTaskService.findAll();
  28. for (Scheduled scheduled : list) {
  29. LogTask logTask = new LogTask();
  30. logTask.setTask_id(scheduled.getId());
  31. logTask.setExpression(scheduled.getTaskCron());
  32. logTasks.add(logTask);
  33. }
  34. //创建一个任务线程池与任务的逻辑连接纽带
  35. ScheduledTask scheduledTask = new ScheduledTask();
  36. //调用configureTasks()方法传入ScheduledTask
  37. scheduledTask.configureTasks(new ScheduledTaskRegistrar());
  38. //调用refresh()方法传入任务数据
  39. scheduledTask.refresh(logTasks);
  40. }
  41. }

七、启动程序调用cron方法

 定时任务正在并发执行

八、修改数据库中的cron数据并再次调用cron方法

当数据库中定时任务信息发生变更时可根据自己的业务场景再次调用refresh()方法

 九、删除数据库中的一条cron数据并再次调用cron方法

 动态多线程执行定时任务成功,并且极其灵活!

本文内容由网友自发贡献,转载请注明出处:https://www.wpsshop.cn/article/detail/52464
推荐阅读
相关标签
  

闽ICP备14008679号