|
|
@@ -0,0 +1,163 @@
|
|
|
+package com.zhongshu.card.server.core.service.schedule;
|
|
|
+
|
|
|
+import com.zhongshu.card.client.utils.DateUtils;
|
|
|
+import com.zhongshu.card.server.core.dao.schedule.ScheduleLogDao;
|
|
|
+import com.zhongshu.card.server.core.dao.schedule.ScheduleTaskConfigDao;
|
|
|
+import com.zhongshu.card.server.core.domain.schedule.ScheduleLog;
|
|
|
+import com.zhongshu.card.server.core.domain.schedule.ScheduleTaskConfig;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.scheduling.annotation.SchedulingConfigurer;
|
|
|
+import org.springframework.scheduling.config.CronTask;
|
|
|
+import org.springframework.scheduling.config.ScheduledTaskRegistrar;
|
|
|
+import org.springframework.stereotype.Component;
|
|
|
+import org.springframework.util.CollectionUtils;
|
|
|
+import org.springframework.util.StopWatch;
|
|
|
+import org.springframework.util.StringUtils;
|
|
|
+
|
|
|
+import javax.annotation.PreDestroy;
|
|
|
+import java.util.List;
|
|
|
+import java.util.Objects;
|
|
|
+import java.util.concurrent.ConcurrentHashMap;
|
|
|
+import java.util.concurrent.Executors;
|
|
|
+import java.util.concurrent.ScheduledFuture;
|
|
|
+
|
|
|
+/**
|
|
|
+ * @author TRX
|
|
|
+ * @date 2024/11/6
|
|
|
+ */
|
|
|
+@Slf4j
|
|
|
+@Component
|
|
|
+public class ScheduledTask implements SchedulingConfigurer {
|
|
|
+
|
|
|
+ // 动态定时任务
|
|
|
+ private volatile ScheduledTaskRegistrar registrar;
|
|
|
+
|
|
|
+ private final ConcurrentHashMap<String, ScheduledFuture<?>> scheduledFutures = new ConcurrentHashMap<>();
|
|
|
+
|
|
|
+ private final ConcurrentHashMap<String, CronTask> cronTasks = new ConcurrentHashMap<>();
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private ScheduleTaskConfigDao scheduleTaskConfigDao;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private ScheduleLogDao scheduleLogDao;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private TaskContextService taskContextService;
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
|
|
|
+ registrar.setScheduler(Executors.newScheduledThreadPool(16));
|
|
|
+ this.registrar = registrar;
|
|
|
+ }
|
|
|
+
|
|
|
+ @PreDestroy
|
|
|
+ public void destroy() {
|
|
|
+ this.registrar.destroy();
|
|
|
+ }
|
|
|
+
|
|
|
+ public void refreshTask(List<ScheduleTaskConfig> tasks) {
|
|
|
+
|
|
|
+ // 删除已经取消任务
|
|
|
+ scheduledFutures.keySet().forEach(key -> {
|
|
|
+ //如果为空,取消所有
|
|
|
+ if (Objects.isNull(tasks) || tasks.size() == 0) {
|
|
|
+ scheduledFutures.get(key).cancel(false);
|
|
|
+ scheduledFutures.remove(key);
|
|
|
+ cronTasks.remove(key);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ tasks.forEach(task -> {
|
|
|
+ if (!Objects.equals(key, task.getAboutDataId())) {
|
|
|
+ if (scheduledFutures.containsKey(key)) {
|
|
|
+ scheduledFutures.get(key).cancel(false);
|
|
|
+ scheduledFutures.remove(key);
|
|
|
+ cronTasks.remove(key);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ });
|
|
|
+ });
|
|
|
+
|
|
|
+ // 添加新任务、更改执行规则任务
|
|
|
+ tasks.forEach(item -> {
|
|
|
+ String expression = item.getExpression();
|
|
|
+ // 任务表达式为空则跳过
|
|
|
+ if (StringUtils.isEmpty(expression)) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 任务已存在并且表达式未发生变化则跳过
|
|
|
+ if (scheduledFutures.containsKey(item.getAboutDataId()) && cronTasks.get(item.getAboutDataId()).getExpression().equals(expression)) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 任务执行时间发生了变化,则删除该任务
|
|
|
+ if (scheduledFutures.containsKey(item.getAboutDataId())) {
|
|
|
+ scheduledFutures.get(item.getAboutDataId()).cancel(false);
|
|
|
+ scheduledFutures.remove(item.getAboutDataId());
|
|
|
+ cronTasks.remove(item.getAboutDataId());
|
|
|
+ }
|
|
|
+
|
|
|
+ CronTask task = new CronTask(new Runnable() {
|
|
|
+ @Override
|
|
|
+ public void run() {
|
|
|
+ StopWatch stopWatch = new StopWatch("flow-unlock-task-watch-" + Thread.currentThread().getName());
|
|
|
+ ScheduleLog scheduleLog = new ScheduleLog();
|
|
|
+ scheduleLog.setExecuteTime(System.currentTimeMillis());
|
|
|
+ scheduleLog.setExecuteTimeStr(DateUtils.paresTime(System.currentTimeMillis(), DateUtils.FORMAT_LONG));
|
|
|
+ // 执行业务逻辑
|
|
|
+ try {
|
|
|
+ log.info("====执行单个任务,配置ID【{}】,任务ID【{}】执行规则【{}】=======",
|
|
|
+ item.getId(), item.getAboutDataId(), item.getExpression());
|
|
|
+ scheduleLog.setScheduleTaskConfig(item);
|
|
|
+
|
|
|
+ String token = scheduleTaskConfigDao.acquire(item.getId(), 10000L);
|
|
|
+ boolean newLock = org.apache.commons.lang3.StringUtils.isNotEmpty(token);
|
|
|
+ if (newLock) {
|
|
|
+ log.info("获取到执行锁,开始执行");
|
|
|
+
|
|
|
+ Long begin = DateUtils.getCurrentDayStartTime().getTime();
|
|
|
+ Long end = DateUtils.getCurrentDayEndTime().getTime();
|
|
|
+ List<ScheduleLog> existsLogs = scheduleLogDao.findByScheduleTaskConfigAndCreateTimeBetweenAndStatus(item, begin, end, ScheduleLog.STATUS_SUCESS);
|
|
|
+
|
|
|
+ if (CollectionUtils.isEmpty(existsLogs)) {
|
|
|
+ stopWatch.start("item:" + item.getAboutDataId());
|
|
|
+ taskContextService.execute(item);
|
|
|
+ scheduleLog.setStatus(ScheduleLog.STATUS_SUCESS);
|
|
|
+ scheduleLog.setResult("执行成功");
|
|
|
+ } else {
|
|
|
+ log.warn("已查询到执行结果,跳过执行,已执行成功记录数:{}", existsLogs.size());
|
|
|
+ scheduleLog.setStatus(ScheduleLog.STATUS_PASS);
|
|
|
+ scheduleLog.setResult("已查询到执行结果,跳过执行");
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ log.warn("未获取到执行锁,跳过执行");
|
|
|
+ scheduleLog.setStatus(ScheduleLog.STATUS_PASS);
|
|
|
+ scheduleLog.setResult("未获取到执行锁,跳过执行");
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("执行任务异常,异常信息:", e);
|
|
|
+ scheduleLog.setStatus(ScheduleLog.STATUS_FAILED);
|
|
|
+ scheduleLog.setException("执行出错:" + e.getStackTrace().toString());
|
|
|
+ } finally {
|
|
|
+ if (stopWatch.isRunning()) {
|
|
|
+ stopWatch.stop();
|
|
|
+ scheduleLog.setCostTime(stopWatch.getLastTaskTimeMillis());
|
|
|
+ log.info("任务执行结束,耗时:{}ms", stopWatch.getLastTaskTimeMillis());
|
|
|
+ }
|
|
|
+
|
|
|
+ scheduleLogDao.save(scheduleLog);
|
|
|
+ scheduleTaskConfigDao.release(item.getId());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }, expression);
|
|
|
+
|
|
|
+ ScheduledFuture<?> future = registrar.getScheduler().schedule(task.getRunnable(), task.getTrigger());
|
|
|
+ cronTasks.put(item.getAboutDataId(), task);
|
|
|
+ scheduledFutures.put(item.getAboutDataId(), future);
|
|
|
+ });
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+}
|