赞
踩
(一)解决的问题: core表示式无法灵活修改,定时任务无法用数据库控制开关等问题,定时任务执行数据库相关操作需要额外写代码。
(二)原理: ThreadPoolTaskScheduler 线程池可以管理spring的定时任务
(三)思路:
1 一个定时任务管理实体类,主要字段有core表达式,bean名称,方法名,状态,定时任务名称等
2 使用反射根据bean名称和方法名找到定时任务需要执行的方法
3 使用ThreadPoolTaskScheduler提供的方法进行定时任务添加/关闭/执行一次等
4 如何实现动态开关:将数据存在数据库中,配置类初始化方法使用**@PostConstruct**初始化需要执行的定时任务列表,添加到ThreadPoolTaskScheduler 线程池中执行
(四)代码
ScheduleConfigDO 实体类,mapper,service等不贴出了 public class ScheduleConfigDO { /** * 自增主键 */ private Integer id; /** * 定时任务名称 */ private String jobName; /** * 类名称 */ private String className; /** * 方法 */ private String method; /** * cron 表达式 */ private String cron; /** * 状态:1正常,0停用 */ private Integer enabled; /** * 创建时间 */ private Date createTime; /** * 创建人 */ private String createBy; /** * 更新时间 */ private Date updateTime; /** * 更新人 */ private String updateBy; }
AsyncAndScheduleTaskExecutePool 线程池配置类 @Slf4j @EnableAsync @Configuration @EnableScheduling public class AsyncAndScheduleTaskExecutePool implements SchedulingConfigurer,AsyncConfigurer { @Override public void configureTasks(ScheduledTaskRegistrar scheduledTaskRegistrar) { ThreadPoolTaskScheduler taskScheduler = taskScheduler(); scheduledTaskRegistrar.setTaskScheduler(taskScheduler); } /** * 定时任务使用的线程池 * @return */ @Bean(destroyMethod = "shutdown", name = "taskScheduler") public ThreadPoolTaskScheduler taskScheduler(){ ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler(); scheduler.setPoolSize(50); scheduler.setThreadNamePrefix("taskScheduler-"); scheduler.setAwaitTerminationSeconds(600); scheduler.setErrorHandler(throwable -> log.error("定时调度任务发生异常{}", throwable.toString())); scheduler.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); scheduler.setWaitForTasksToCompleteOnShutdown(true); scheduler.initialize(); return scheduler; } @Override public Executor getAsyncExecutor() { return asyncExecutor(); } /** * 异步任务执行线程池 * @return */ @Bean(name = "asyncExecutor") public ThreadPoolTaskExecutor asyncExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); //核心线程池大小 executor.setCorePoolSize(50); //最大线程数 executor.setMaxPoolSize(100); //队列容量 executor.setQueueCapacity(100); //活跃时间 executor.setKeepAliveSeconds(60); //设置线程池关闭的时候等待所有任务都完成再继续销毁其他的Bean executor.setWaitForTasksToCompleteOnShutdown(true); //线程名字前缀 executor.setThreadNamePrefix("asyncExecutor-"); // setRejectedExecutionHandler:当pool已经达到max size的时候,如何处理新任务 // CallerRunsPolicy:不在新线程中执行任务,而是由调用者所在的线程来执行 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); executor.initialize(); return executor; } /** * 异步任务异常处理 * @return */ @Override public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { return (throwable, method, objects) -> { log.error("异步任务执行出现异常{}", throwable.toString()); log.error("exception method:{}", method.getName()); log.error("exception objects:{}", objects.toString()); }; } }
ScheduleTaskComponent 定时任务管理类 @Component public class ScheduleTaskComponent { // 保存任务 private Map<String, ScheduledFuture<?>> futuresMap = new ConcurrentHashMap<String, ScheduledFuture<?>>(); @Autowired private ScheduleConfigService scheduleConfigService; // 创建ThreadPoolTaskScheduler线程池 @Autowired private ThreadPoolTaskScheduler threadPoolTaskScheduler; // 初始化任务 @PostConstruct public void initSchedule(){ List<ScheduleConfigDTO> list = scheduleConfigService.getList(null); for (ScheduleConfigDTO config : list){ ScheduledFuture<?> future = threadPoolTaskScheduler.schedule(getRunnable(config), getTrigger(config)); futuresMap.put(config.getJobName(), future); } } /** * 暂停任务 * @param key * @return */ public boolean pauseeTask(String key) { ScheduledFuture toBeRemovedFuture = futuresMap.remove(key); if (toBeRemovedFuture != null) { toBeRemovedFuture.cancel(true); return true; } else { return false; } } /** * 添加任务 * @param config */ public void addTask(ScheduleConfigDTO config){ ScheduledFuture<?> future = threadPoolTaskScheduler.schedule(getRunnable(config), getTrigger(config)); futuresMap.put(config.getJobName(), future); } /** * 更新任务 * @param config */ public void updateTask(ScheduleConfigDTO config) { ScheduledFuture toBeRemovedFuture = futuresMap.remove(config.getJobName()); if (toBeRemovedFuture != null) { toBeRemovedFuture.cancel(true); } addTask(config); } /** * 转换首字母小写 * * @param str * @return */ public static String lowerFirstCapse(String str) { char[] chars = str.toCharArray(); chars[0] += 32; return String.valueOf(chars); } /** * //反射使用newInstance静态方法来实例化对象 获取不了spring中注入的对象 * runnable * @param scheduleConfig * @return */ private Runnable getRunnable(ScheduleConfigDTO scheduleConfig) { return new Runnable() { @Override public void run() { Class<?> clazz; try { //注意这里如果@Service注解没指定名字,生成的默认首字母小写 Object o = ApplicationContextUtil.getBean(scheduleConfig.getClassName()); clazz = o.getClass(); Method method = clazz.getMethod(scheduleConfig.getMethod()); //method.invoke(clazz.newInstance()); method.invoke(o); } catch (Exception e) { log.error(e.getMessage()); } } }; } /** * Trigger * @param scheduleConfig * @return */ private Trigger getTrigger(ScheduleConfigDTO scheduleConfig) { return new Trigger() { @Override public Date nextExecutionTime(TriggerContext triggerContext) { CronTrigger trigger = new CronTrigger(scheduleConfig.getCron()); Date nextExec = trigger.nextExecutionTime(triggerContext); return nextExec; } }; } }
控制器controller类 @RestController @Api(tags = "定时任务配置") @RequestMapping("/scheduleConfig") public class ScheduleConfigController { private static final Logger LOGGER = LoggerFactory.getLogger(ScheduleConfigController.class); @Autowired private ScheduleConfigService scheduleConfigService; @Autowired private ScheduleTaskComponent scheduleTaskComponent; /** * 分页查询 * @param scheduleConfigQueryVO 分页查询参数 * @return 分页参数 */ @ApiOperation("分页查询") @PostMapping("getPageList") public ScheduleConfigPageInfoVO selectPageList(@RequestBody @Valid ScheduleConfigQueryVO scheduleConfigQueryVO) { String info = String.format("The method name[selectPageList] params:%s", scheduleConfigQueryVO.toString()); LOGGER.info(info); ScheduleConfigDTO scheduleConfig = BeanUtil.convert(scheduleConfigQueryVO, ScheduleConfigDTO.class); Page<ScheduleConfigResponseVO> page = PageHelper.startPage(scheduleConfigQueryVO.getPageNum(), scheduleConfigQueryVO.getPageSize()); List<ScheduleConfigDTO> scheduleConfigList = scheduleConfigService.getList(scheduleConfig); ScheduleConfigPageInfoVO scheduleConfigPageInfo = new ScheduleConfigPageInfoVO(); BeanUtils.copyProperties(page.toPageInfo(), scheduleConfigPageInfo); List<ScheduleConfigResponseVO> voList = BeanUtil.convert(scheduleConfigList, ScheduleConfigResponseVO.class); scheduleConfigPageInfo.setList(voList); return scheduleConfigPageInfo; } /** * 添加任务 * @return */ @Log(type = Log.OperationType.ADD,button = "添加任务",menu = "定时任务管理") @ApiOperation("添加任务") @PostMapping("/insert") public void insertTask(@RequestBody @Valid ScheduleConfigUpdateVO vo) throws BusinessException { ScheduleConfigDTO dto = BeanUtil.convert(vo, ScheduleConfigDTO.class); if(StringUtils.isBlank(dto.getClassName())||StringUtils.isBlank(dto.getMethod())||StringUtils.isBlank(dto.getCron())){ throw new BusinessException("类名,方法名,core表达式不能为空"); } if(!CronExpression.isValidExpression(dto.getCron())){ throw new BusinessException("core表达式格式错误"); } dto.setCreateBy(SecurityUtils.getUsername()); dto.setUpdateBy(SecurityUtils.getUsername()); dto.setUpdateTime(DateUtils.getNowDate()); dto.setCreateTime(DateUtils.getNowDate()); dto.setEnabled(1); scheduleConfigService.insert(dto); //新增任务task scheduleTaskComponent.addTask(dto); } /** * 修改任务 * @return */ @Log(type = Log.OperationType.UPDATE,button = "修改任务",menu = "定时任务管理") @ApiOperation("修改任务") @PostMapping("/update") public void updateTask(@RequestBody @Valid ScheduleConfigUpdateVO vo){ if(StringUtils.isBlank(vo.getCron())){ vo.setCron(" 0 0/1 * * * ?"); } if(vo.getId() == null||StringUtils.isBlank(vo.getClassName())||StringUtils.isBlank(vo.getMethod())){ throw new BusinessException("ID,类名,方法名,core表达式不能为空"); } if(!CronExpression.isValidExpression(vo.getCron())){ throw new BusinessException("core表达式格式错误"); } ScheduleConfigDTO dto = BeanUtil.convert(vo, ScheduleConfigDTO.class); dto.setUpdateBy(SecurityUtils.getUsername()); dto.setUpdateTime(new Date()); scheduleConfigService.update(dto); //更新任务task scheduleTaskComponent.updateTask(dto); } /** * 暂停任务 */ @Log(type = Log.OperationType.UPDATE,button = "暂停任务",menu = "定时任务管理") @ApiOperation("暂停任务") @GetMapping("/pause") public void pauseTask(@RequestParam Integer id){ ScheduleConfigDTO dto = new ScheduleConfigDTO(); dto.setId(id); //1正常,0停用 dto.setEnabled(0); dto.setUpdateBy(SecurityUtils.getUsername()); dto.setUpdateTime(new Date()); scheduleConfigService.update(dto); //暂停任务task scheduleTaskComponent.pauseeTask(scheduleConfigService.getById(id).getJobName()); } /** * 开启任务 */ @Log(type = Log.OperationType.UPDATE,button = "开启任务",menu = "定时任务管理") @ApiOperation("开启任务") @GetMapping("/start") public void startTask(@RequestParam Integer id){ ScheduleConfigDTO dto = new ScheduleConfigDTO(); dto.setId(id); dto.setEnabled(1); dto.setUpdateBy(SecurityUtils.getUsername()); dto.setUpdateTime(new Date()); scheduleConfigService.update(dto); //开启任务task scheduleTaskComponent.addTask(scheduleConfigService.getById(id)); } /** * 执行一次 */ @Log(type = Log.OperationType.UPDATE,button = "执行一次",menu = "定时任务管理") @ApiOperation("执行一次") @GetMapping("/executeOnce") public void executeOnce(@RequestParam Integer id) throws Exception{ ScheduleConfigDTO dto = scheduleConfigService.getById(id); Class<?> clazz; //注意这里如果@Service注解没指定名字,生成的默认首字母小写 Object o = ApplicationContextUtil.getBean(dto.getClassName()); clazz = o.getClass(); Method method = clazz.getMethod(dto.getMethod()); //method.invoke(clazz.newInstance()); method.invoke(o); } /** * 删除任务 */ @Log(type = Log.OperationType.DELETE,button = "删除任务",menu = "定时任务管理") @ApiOperation("删除任务") @DeleteMapping("/delete") public void delTask(@RequestParam Integer id){ ScheduleConfigDTO dto = scheduleConfigService.getById(id); scheduleConfigService.delete(dto); scheduleTaskComponent.pauseeTask(dto.getJobName()); } }
前端效果(这里还没做对接)

后端启动效果:


Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。