欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 健康 > 养生 > 基于Spring自带的调度机制实现动态任务管理

基于Spring自带的调度机制实现动态任务管理

2025/9/19 12:24:09 来源:https://blog.csdn.net/m0_37840000/article/details/145860861  浏览:    关键词:基于Spring自带的调度机制实现动态任务管理

一、需求背景

移动源需要将数据库中数据依据环保局的对接文档传递给对应省份环保局平台。传递的数据不同存在调度实时性不同,所以,需要一个调度池能够实现配置动态调度,且避免引入第三方调度工具减少维护成本。

二、实现思路

  1. 动态任务注册器:使用ScheduledTaskRegistrar动态注册任务。

  2. 数据库监听:通过定时任务定期检查数据库变化,动态更新调度。

  3. 任务容器:维护一个内存中的任务集合,管理任务状态。

关键说明:

       动态任务注册:

  • 使用 ScheduledTaskRegistrar 和 TaskScheduler 动态注册任务。

  • 通过 ConcurrentHashMap 维护当前运行的任务,确保任务集合的线程安全,在更新任务时,先取消旧任务再注册新任务。

  • 在注册任务前校验cron表达式合法性,避免无效表达式导致调度失败

      数据库变化监听:

  •  定时60秒检测任务change_status配置变化,发现变化后,更新或取消任务

      线程池管理:

  • 配置 ThreadPoolTaskScheduler 提供任务执行线程池。

避免默认单线程池导致任务阻塞。      任务去重:

  • 通过 taskKey 确保每个任务的唯一性,在数据库中设置 task_key 字段为唯一约束

     异常处理:

  •  在任务执行逻辑中添加 try-catch 块,防止任务异常影响调度器,并将异常信息记录于表里error_msg,状态修改为异常

     手动触发任务更新:

  •  提供REST API手动触发任务或强制刷新配置

任务表:

CREATE TABLE `dynamic_task` (`id` int(11) NOT NULL AUTO_INCREMENT,`task_name` varchar(100) NOT NULL COMMENT '任务名',`task_key` varchar(50) CHARACTER SET ujis NOT NULL COMMENT '任务key',`cron_expression` varchar(50) NOT NULL COMMENT '定期策略',`enabled` tinyint(1) DEFAULT '1',`status` tinyint(4) DEFAULT '1' COMMENT '状态:1-执行完成 0-执行中 -1 异常状态',`create_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',`update_time` datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',`error_msg` text COMMENT '错误信息',`notify_payload` json DEFAULT NULL,`biz_type` tinyint(4) NOT NULL COMMENT '业务类型看bizTypeEnum',`change_status` tinyint(4) DEFAULT '0' COMMENT '0:修改(待同步),1:已修改',PRIMARY KEY (`id`),UNIQUE KEY `task_key` (`task_key`) USING BTREE,KEY `change_status` (`change_status`),KEY `enabled_status` (`enabled`,`status`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4;

三、核心代码实现

1、定义任务实体

@Getter
@Setter
@Accessors(chain = true)
@NoArgsConstructor
@TableName("dynamic_task")
public class DynamicTask {@TableId(value = "id", type = IdType.AUTO)private Integer id;private String taskName;private String taskKey;private String cronExpression;private Integer enabled;private Integer status;private Date createTime;private Date updateTime;private String notifyPayload;private Integer bizType;
}

2、动态任务管理器

@Slf4j
@Configuration
public class DynamicTaskManager implements SchedulingConfigurer {private final DynamicTaskService dynamicTaskService;private final TaskScheduler taskScheduler;private final UploadTargetContext uploadTargetContext;@Value("${uploadTarget}")private UploadTargetEnum uploadTarget;private final Map<String, ScheduledFuture<?>> taskMap = new ConcurrentHashMap<>();public DynamicTaskManager(DynamicTaskServiceImpl dynamicTaskService, TaskScheduler taskScheduler,UploadTargetContext uploadTargetContext) {this.dynamicTaskService = dynamicTaskService;this.taskScheduler = taskScheduler;this.uploadTargetContext = uploadTargetContext;}@Overridepublic void configureTasks(ScheduledTaskRegistrar scheduledTaskRegistrar) {// 初始化加载所有启用的任务List<DynamicTask> tasks = dynamicTaskService.taskList(null);List<Integer> taskIdList = new ArrayList<>();for (DynamicTask task : tasks) {Integer taskId = scheduleTask(task);if (taskId != null) {taskIdList.add(taskId);}}if (CollectionUtils.isEmpty(taskIdList)) {return;}dynamicTaskService.processError(taskIdList, "表达式错误");}// 动态注册任务public Integer scheduleTask(DynamicTask task) {if (!isValidCron(task.getCronExpression())) {return task.getId();}String taskKey = task.getTaskKey();cancelTask(taskKey);// 创建新的触发器任务ScheduledFuture<?> scheduledFuture = taskScheduler.schedule(() -> executeBusinessLogic(taskKey, task.getNotifyPayload(), task.getBizType(),task.getId()),new CronTrigger(task.getCronExpression()));// 将任务存入MaptaskMap.put(taskKey, scheduledFuture);return null;}// 执行业务逻辑private void executeBusinessLogic(String taskKey, String notifyPayload, Integer bizType,Integer taskId) {try {log.info("执行任务: " + taskKey + ",时间: " + new Date());UploadTargetAdapter uploadTargetAdapter =uploadTargetContext.resolveRun(uploadTarget);BizTypeEnum byBizType = BizTypeEnum.getByBizType(bizType);if (Objects.nonNull(byBizType)) {if (StringUtil.isEmpty(notifyPayload)) {byBizType.performAction(uploadTargetAdapter, null);} else {Object data = JSON.parseObject(notifyPayload,byBizType.getClazz());byBizType.performAction(uploadTargetAdapter, data);}}} catch (Exception e) {dynamicTaskService.processError(Collections.singletonList(taskId), JSON.toJSONString(e));}}// 定时检查数据库更新(每隔60秒)@Scheduled(fixedRate = 60000)public void checkTaskUpdates() {Date date = new Date();List<DynamicTask> updatedTasks = dynamicTaskService.taskList(date);if (CollectionUtils.isEmpty(updatedTasks)) {return;}List<Integer> taskIdList = new ArrayList<>();updatedTasks.forEach(task -> {if (task.getEnabled() == 1) {scheduleTask(task);} else {cancelTask(task.getTaskKey());}taskIdList.add(task.getId());});// 更新检查状态dynamicTaskService.updateLastCheckTime(taskIdList);}// 取消任务public void cancelTask(String taskKey) {if (taskMap.containsKey(taskKey)) {taskMap.get(taskKey).cancel(false);taskMap.remove(taskKey);}}public boolean isValidCron(String cron) {return CronExpression.isValidExpression(cron);}

3、Spring配置类

@Configuration
public class SchedulerConfig {@Beanpublic TaskScheduler taskScheduler() {ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();scheduler.setPoolSize(10);scheduler.setThreadNamePrefix("dynamic-task-");HttpGlobalConfig.setTimeout(3000);return scheduler;}
}

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词