|
@@ -6,6 +6,8 @@ import com.zhongshu.card.server.core.dao.schedule.ScheduleTaskConfigDao;
|
|
|
import com.zhongshu.card.server.core.domain.schedule.ScheduleLog;
|
|
import com.zhongshu.card.server.core.domain.schedule.ScheduleLog;
|
|
|
import com.zhongshu.card.server.core.domain.schedule.ScheduleTaskConfig;
|
|
import com.zhongshu.card.server.core.domain.schedule.ScheduleTaskConfig;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
|
|
+import org.apache.commons.lang3.ObjectUtils;
|
|
|
|
|
+import org.apache.commons.lang3.StringUtils;
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
import org.springframework.scheduling.annotation.SchedulingConfigurer;
|
|
import org.springframework.scheduling.annotation.SchedulingConfigurer;
|
|
|
import org.springframework.scheduling.config.CronTask;
|
|
import org.springframework.scheduling.config.CronTask;
|
|
@@ -13,7 +15,6 @@ import org.springframework.scheduling.config.ScheduledTaskRegistrar;
|
|
|
import org.springframework.stereotype.Component;
|
|
import org.springframework.stereotype.Component;
|
|
|
import org.springframework.util.CollectionUtils;
|
|
import org.springframework.util.CollectionUtils;
|
|
|
import org.springframework.util.StopWatch;
|
|
import org.springframework.util.StopWatch;
|
|
|
-import org.springframework.util.StringUtils;
|
|
|
|
|
|
|
|
|
|
import javax.annotation.PreDestroy;
|
|
import javax.annotation.PreDestroy;
|
|
|
import java.util.List;
|
|
import java.util.List;
|
|
@@ -48,8 +49,8 @@ public class ScheduledTask implements SchedulingConfigurer {
|
|
|
|
|
|
|
|
@Override
|
|
@Override
|
|
|
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
|
|
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
|
|
|
- registrar.setScheduler(Executors.newScheduledThreadPool(16));
|
|
|
|
|
- this.registrar = registrar;
|
|
|
|
|
|
|
+ taskRegistrar.setScheduler(Executors.newScheduledThreadPool(16));
|
|
|
|
|
+ this.registrar = taskRegistrar;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
@PreDestroy
|
|
@PreDestroy
|
|
@@ -62,7 +63,7 @@ public class ScheduledTask implements SchedulingConfigurer {
|
|
|
// 删除已经取消任务
|
|
// 删除已经取消任务
|
|
|
scheduledFutures.keySet().forEach(key -> {
|
|
scheduledFutures.keySet().forEach(key -> {
|
|
|
//如果为空,取消所有
|
|
//如果为空,取消所有
|
|
|
- if (Objects.isNull(tasks) || tasks.size() == 0) {
|
|
|
|
|
|
|
+ if (ObjectUtils.isEmpty(tasks)) {
|
|
|
scheduledFutures.get(key).cancel(false);
|
|
scheduledFutures.get(key).cancel(false);
|
|
|
scheduledFutures.remove(key);
|
|
scheduledFutures.remove(key);
|
|
|
cronTasks.remove(key);
|
|
cronTasks.remove(key);
|
|
@@ -79,85 +80,86 @@ public class ScheduledTask implements SchedulingConfigurer {
|
|
|
});
|
|
});
|
|
|
});
|
|
});
|
|
|
|
|
|
|
|
- // 添加新任务、更改执行规则任务
|
|
|
|
|
- tasks.forEach(item -> {
|
|
|
|
|
- String expression = item.getExpression();
|
|
|
|
|
- // 任务表达式为空则跳过
|
|
|
|
|
- if (StringUtils.isEmpty(expression)) {
|
|
|
|
|
- return;
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ if (ObjectUtils.isNotEmpty(tasks)) {
|
|
|
|
|
+ // 添加新任务、更改执行规则任务
|
|
|
|
|
+ tasks.forEach(item -> {
|
|
|
|
|
+ String expression = item.getExpression();
|
|
|
|
|
+ // 任务表达式为空则跳过
|
|
|
|
|
+ if (StringUtils.isEmpty(expression)) {
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
|
|
|
- // 任务已存在并且表达式未发生变化则跳过
|
|
|
|
|
- if (scheduledFutures.containsKey(item.getAboutDataId()) && cronTasks.get(item.getAboutDataId()).getExpression().equals(expression)) {
|
|
|
|
|
- return;
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ // 任务已存在并且表达式未发生变化则跳过
|
|
|
|
|
+ if (scheduledFutures.containsKey(item.getAboutDataId()) && cronTasks.get(item.getAboutDataId()).getExpression().equals(expression)) {
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
|
|
|
- // 任务执行时间发生了变化,则删除该任务
|
|
|
|
|
- if (scheduledFutures.containsKey(item.getAboutDataId())) {
|
|
|
|
|
- scheduledFutures.get(item.getAboutDataId()).cancel(false);
|
|
|
|
|
- scheduledFutures.remove(item.getAboutDataId());
|
|
|
|
|
- cronTasks.remove(item.getAboutDataId());
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ // 任务执行时间发生了变化,则删除该任务
|
|
|
|
|
+ if (scheduledFutures.containsKey(item.getAboutDataId())) {
|
|
|
|
|
+ scheduledFutures.get(item.getAboutDataId()).cancel(false);
|
|
|
|
|
+ scheduledFutures.remove(item.getAboutDataId());
|
|
|
|
|
+ cronTasks.remove(item.getAboutDataId());
|
|
|
|
|
+ }
|
|
|
|
|
|
|
|
- CronTask task = new CronTask(new Runnable() {
|
|
|
|
|
- @Override
|
|
|
|
|
- public void run() {
|
|
|
|
|
- StopWatch stopWatch = new StopWatch("flow-unlock-task-watch-" + Thread.currentThread().getName());
|
|
|
|
|
- ScheduleLog scheduleLog = new ScheduleLog();
|
|
|
|
|
- scheduleLog.setExecuteTime(System.currentTimeMillis());
|
|
|
|
|
- scheduleLog.setExecuteTimeStr(DateUtils.paresTime(System.currentTimeMillis(), DateUtils.FORMAT_LONG));
|
|
|
|
|
- // 执行业务逻辑
|
|
|
|
|
- try {
|
|
|
|
|
- log.info("====执行单个任务,配置ID【{}】,任务ID【{}】执行规则【{}】=======",
|
|
|
|
|
- item.getId(), item.getAboutDataId(), item.getExpression());
|
|
|
|
|
- scheduleLog.setScheduleTaskConfig(item);
|
|
|
|
|
-
|
|
|
|
|
- String token = scheduleTaskConfigDao.acquire(item.getId(), 10000L);
|
|
|
|
|
- boolean newLock = org.apache.commons.lang3.StringUtils.isNotEmpty(token);
|
|
|
|
|
- if (newLock) {
|
|
|
|
|
- log.info("获取到执行锁,开始执行");
|
|
|
|
|
-
|
|
|
|
|
- Long begin = DateUtils.getCurrentDayStartTime().getTime();
|
|
|
|
|
- Long end = DateUtils.getCurrentDayEndTime().getTime();
|
|
|
|
|
- List<ScheduleLog> existsLogs = scheduleLogDao.findByScheduleTaskConfigAndCreateTimeBetweenAndStatus(item, begin, end, ScheduleLog.STATUS_SUCESS);
|
|
|
|
|
-
|
|
|
|
|
- if (CollectionUtils.isEmpty(existsLogs)) {
|
|
|
|
|
- stopWatch.start("item:" + item.getAboutDataId());
|
|
|
|
|
- taskContextService.execute(item);
|
|
|
|
|
- scheduleLog.setStatus(ScheduleLog.STATUS_SUCESS);
|
|
|
|
|
- scheduleLog.setResult("执行成功");
|
|
|
|
|
|
|
+ CronTask cronTask = new CronTask(new Runnable() {
|
|
|
|
|
+ @Override
|
|
|
|
|
+ public void run() {
|
|
|
|
|
+ StopWatch stopWatch = new StopWatch("flow-unlock-task-watch-" + Thread.currentThread().getName());
|
|
|
|
|
+ ScheduleLog scheduleLog = new ScheduleLog();
|
|
|
|
|
+ scheduleLog.setExecuteTime(System.currentTimeMillis());
|
|
|
|
|
+ scheduleLog.setExecuteTimeStr(DateUtils.paresTime(System.currentTimeMillis(), DateUtils.FORMAT_LONG));
|
|
|
|
|
+ // 执行业务逻辑
|
|
|
|
|
+ try {
|
|
|
|
|
+ log.info("====执行单个任务,配置ID【{}】,任务ID【{}】执行规则【{}】=======", item.getId(), item.getAboutDataId(), item.getExpression());
|
|
|
|
|
+ scheduleLog.setScheduleTaskConfig(item);
|
|
|
|
|
+ // 得到任务执行锁
|
|
|
|
|
+ String token = scheduleTaskConfigDao.acquire(item.getId(), 10000L);
|
|
|
|
|
+ if (StringUtils.isNotEmpty(token)) {
|
|
|
|
|
+ log.info("获取到执行锁,开始执行");
|
|
|
|
|
+
|
|
|
|
|
+ Long begin = DateUtils.getCurrentDayStartTime().getTime();
|
|
|
|
|
+ Long end = DateUtils.getCurrentDayEndTime().getTime();
|
|
|
|
|
+ List<ScheduleLog> existsLogs = scheduleLogDao.findByScheduleTaskConfigAndCreateTimeBetweenAndStatus(item, begin, end, ScheduleLog.STATUS_SUCESS);
|
|
|
|
|
+
|
|
|
|
|
+ if (CollectionUtils.isEmpty(existsLogs)) {
|
|
|
|
|
+ stopWatch.start("item:" + item.getAboutDataId());
|
|
|
|
|
+ taskContextService.execute(item);
|
|
|
|
|
+ scheduleLog.setStatus(ScheduleLog.STATUS_SUCESS);
|
|
|
|
|
+ scheduleLog.setResult("执行成功");
|
|
|
|
|
+ } else {
|
|
|
|
|
+ log.warn("已查询到执行结果,跳过执行,已执行成功记录数:{}", existsLogs.size());
|
|
|
|
|
+ scheduleLog.setStatus(ScheduleLog.STATUS_PASS);
|
|
|
|
|
+ scheduleLog.setResult("已查询到执行结果,跳过执行");
|
|
|
|
|
+ }
|
|
|
} else {
|
|
} else {
|
|
|
- log.warn("已查询到执行结果,跳过执行,已执行成功记录数:{}", existsLogs.size());
|
|
|
|
|
|
|
+ log.warn("未获取到执行锁,跳过执行");
|
|
|
scheduleLog.setStatus(ScheduleLog.STATUS_PASS);
|
|
scheduleLog.setStatus(ScheduleLog.STATUS_PASS);
|
|
|
- scheduleLog.setResult("已查询到执行结果,跳过执行");
|
|
|
|
|
|
|
+ scheduleLog.setResult("未获取到执行锁,跳过执行");
|
|
|
}
|
|
}
|
|
|
- } else {
|
|
|
|
|
- log.warn("未获取到执行锁,跳过执行");
|
|
|
|
|
- scheduleLog.setStatus(ScheduleLog.STATUS_PASS);
|
|
|
|
|
- scheduleLog.setResult("未获取到执行锁,跳过执行");
|
|
|
|
|
- }
|
|
|
|
|
- } catch (Exception e) {
|
|
|
|
|
- log.error("执行任务异常,异常信息:", e);
|
|
|
|
|
- scheduleLog.setStatus(ScheduleLog.STATUS_FAILED);
|
|
|
|
|
- scheduleLog.setException("执行出错:" + e.getStackTrace().toString());
|
|
|
|
|
- } finally {
|
|
|
|
|
- if (stopWatch.isRunning()) {
|
|
|
|
|
- stopWatch.stop();
|
|
|
|
|
- scheduleLog.setCostTime(stopWatch.getLastTaskTimeMillis());
|
|
|
|
|
- log.info("任务执行结束,耗时:{}ms", stopWatch.getLastTaskTimeMillis());
|
|
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ log.error("执行任务异常,异常信息:", e);
|
|
|
|
|
+ scheduleLog.setStatus(ScheduleLog.STATUS_FAILED);
|
|
|
|
|
+ scheduleLog.setException("执行出错:" + e.getStackTrace().toString());
|
|
|
|
|
+ } finally {
|
|
|
|
|
+ if (stopWatch.isRunning()) {
|
|
|
|
|
+ stopWatch.stop();
|
|
|
|
|
+ scheduleLog.setCostTime(stopWatch.getLastTaskTimeMillis());
|
|
|
|
|
+ log.info("任务执行结束,耗时:{}ms", stopWatch.getLastTaskTimeMillis());
|
|
|
|
|
+ }
|
|
|
|
|
+ // 保存执行日志
|
|
|
|
|
+ scheduleLogDao.save(scheduleLog);
|
|
|
|
|
+ // 释放执行锁
|
|
|
|
|
+ scheduleTaskConfigDao.release(item.getId());
|
|
|
}
|
|
}
|
|
|
-
|
|
|
|
|
- scheduleLogDao.save(scheduleLog);
|
|
|
|
|
- scheduleTaskConfigDao.release(item.getId());
|
|
|
|
|
}
|
|
}
|
|
|
- }
|
|
|
|
|
- }, expression);
|
|
|
|
|
-
|
|
|
|
|
- ScheduledFuture<?> future = registrar.getScheduler().schedule(task.getRunnable(), task.getTrigger());
|
|
|
|
|
- cronTasks.put(item.getAboutDataId(), task);
|
|
|
|
|
- scheduledFutures.put(item.getAboutDataId(), future);
|
|
|
|
|
- });
|
|
|
|
|
|
|
+ }, expression);
|
|
|
|
|
|
|
|
|
|
+ ScheduledFuture<?> future = registrar.getScheduler().
|
|
|
|
|
+ schedule(cronTask.getRunnable(), cronTask.getTrigger());
|
|
|
|
|
+ cronTasks.put(item.getAboutDataId(), cronTask);
|
|
|
|
|
+ scheduledFutures.put(item.getAboutDataId(), future);
|
|
|
|
|
+ });
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
}
|
|
}
|