|
|
@@ -1,13 +1,21 @@
|
|
|
package com.github.microservice.components.data.mongo.queue.timer;
|
|
|
|
|
|
|
|
|
+import com.github.microservice.components.data.mongo.queue.config.ExecQueueConfig;
|
|
|
+import com.github.microservice.components.data.mongo.queue.dao.ExecQueueDao;
|
|
|
+import com.github.microservice.components.data.mongo.queue.domain.ExecQueue;
|
|
|
+import com.github.microservice.components.data.mongo.queue.event.ExecQueueEvent;
|
|
|
+import com.github.microservice.components.data.mongo.queue.service.ExecQueueService;
|
|
|
import com.github.microservice.components.data.mongo.token.service.ResourceTokenService;
|
|
|
import com.github.microservice.core.util.os.SystemUtil;
|
|
|
import lombok.Cleanup;
|
|
|
+import lombok.SneakyThrows;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
import org.springframework.boot.ApplicationArguments;
|
|
|
import org.springframework.boot.ApplicationRunner;
|
|
|
+import org.springframework.context.ApplicationContext;
|
|
|
+import org.springframework.context.ApplicationEvent;
|
|
|
import org.springframework.stereotype.Component;
|
|
|
|
|
|
import javax.annotation.PostConstruct;
|
|
|
@@ -23,6 +31,18 @@ public class ExecQueueTimer implements ApplicationRunner {
|
|
|
@Autowired
|
|
|
private ResourceTokenService resourceTokenService;
|
|
|
|
|
|
+ @Autowired
|
|
|
+ private ExecQueueDao execQueueDao;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private ExecQueueConfig execQueueConfig;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private ApplicationContext applicationContext;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private ExecQueueService execQueueService;
|
|
|
+
|
|
|
private final static String RESOURCE_NAME = "ExecQueueTimer";
|
|
|
|
|
|
|
|
|
@@ -50,7 +70,7 @@ public class ExecQueueTimer implements ApplicationRunner {
|
|
|
executorService.schedule(() -> {
|
|
|
|
|
|
try {
|
|
|
- updateWorkStatus();
|
|
|
+ restoreWaitStatus();
|
|
|
} catch (Exception e) {
|
|
|
e.printStackTrace();
|
|
|
}
|
|
|
@@ -62,19 +82,50 @@ public class ExecQueueTimer implements ApplicationRunner {
|
|
|
e.printStackTrace();
|
|
|
}
|
|
|
|
|
|
-
|
|
|
next();
|
|
|
|
|
|
- }, 1, TimeUnit.SECONDS);
|
|
|
+ }, execQueueConfig.loopEventTime(), TimeUnit.MILLISECONDS);
|
|
|
}
|
|
|
|
|
|
- // 更新队列的工作状态
|
|
|
- private void updateWorkStatus() {
|
|
|
-
|
|
|
+ // 恢复状态为等待
|
|
|
+ private void restoreWaitStatus() {
|
|
|
+ long count = this.execQueueDao.restoreWaitStatus(execQueueConfig.executeTimeOut());
|
|
|
+ if (count > 0) {
|
|
|
+ log.info("restoreWaitStatus : {}", count);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
//执行队列
|
|
|
private void executeWork() {
|
|
|
+ execQueueDao.scanWait(execQueueConfig.maxBatchExecuteCount()).forEach((it) -> {
|
|
|
+ try {
|
|
|
+ execute(it);
|
|
|
+ } catch (Exception e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ log.error(e.getMessage());
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ //触发事件
|
|
|
+ @SneakyThrows
|
|
|
+ private void execute(ExecQueue execQueue) {
|
|
|
+ Class<? extends ExecQueueEvent> cls = (Class<? extends ExecQueueEvent>) Class.forName(execQueue.getEventClass());
|
|
|
+ // 发布事件
|
|
|
+ final ExecQueueEvent execQueueEvent = this.applicationContext.getBean(cls);
|
|
|
+
|
|
|
+ //执行任务之前状态修改为工作中
|
|
|
+ this.execQueueDao.updateStatus(execQueue.getId(), ExecQueue.ExecQueueStatus.Work);
|
|
|
+
|
|
|
+ // 修改当前状态为执行完成
|
|
|
+ if (execQueueEvent.execute(execQueue.getData())) {
|
|
|
+ this.execQueueDao.updateStatus(execQueue.getId(), ExecQueue.ExecQueueStatus.Done);
|
|
|
+ } else if (this.execQueueService.updateNextExecTime(execQueue)) {
|
|
|
+ this.execQueueDao.save(execQueue);
|
|
|
+ } else {
|
|
|
+ this.execQueueDao.updateStatus(execQueue.getId(), ExecQueue.ExecQueueStatus.Stop);
|
|
|
+ }
|
|
|
+
|
|
|
|
|
|
}
|
|
|
|