lianshufeng 1 anno fa
parent
commit
e29931bcef

+ 31 - 0
components/data/MongodbData/src/main/java/com/github/microservice/components/data/mongo/queue/config/ExecQueueConfig.java

@@ -0,0 +1,31 @@
+package com.github.microservice.components.data.mongo.queue.config;
+
+import com.github.microservice.components.data.mongo.token.config.ResourceTokenConfiguration;
+import org.springframework.context.annotation.ComponentScan;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Import;
+import org.springframework.data.mongodb.repository.config.EnableMongoRepositories;
+
+@Configuration
+@EnableMongoRepositories(basePackages = {"com.github.microservice.components.data.mongo.queue.dao"})
+@ComponentScan("com.github.microservice.components.data.mongo.queue")
+@Import(ResourceTokenConfiguration.class)
+public class ExecQueueConfig {
+
+
+    /**
+     * 队列执行的延迟时间
+     */
+    public Long sleepTime(int tryIndex) {
+        return tryIndex * 1000L;
+    }
+
+    /**
+     * 最大的批量执行执行数
+     */
+    private int maxExecuteBatchCount() {
+        return 10;
+    }
+
+
+}

+ 8 - 0
components/data/MongodbData/src/main/java/com/github/microservice/components/data/mongo/queue/dao/ExecQueueDao.java

@@ -0,0 +1,8 @@
+package com.github.microservice.components.data.mongo.queue.dao;
+
+import com.github.microservice.components.data.mongo.mongo.dao.MongoDao;
+import com.github.microservice.components.data.mongo.queue.domain.ExecQueue;
+
+public interface ExecQueueDao extends MongoDao<ExecQueue> {
+
+}

+ 15 - 0
components/data/MongodbData/src/main/java/com/github/microservice/components/data/mongo/queue/dao/extend/ExecQueueDaoExtend.java

@@ -0,0 +1,15 @@
+package com.github.microservice.components.data.mongo.queue.dao.extend;
+
+import com.github.microservice.components.data.mongo.queue.domain.ExecQueue;
+
+import java.util.List;
+
+public interface ExecQueueDaoExtend {
+
+    boolean updateStatus(String id, ExecQueue.ExecQueueStatus status);
+
+
+    List<ExecQueue> scan(ExecQueue.ExecQueueStatus status, int maxCount);
+
+
+}

+ 38 - 0
components/data/MongodbData/src/main/java/com/github/microservice/components/data/mongo/queue/dao/impl/ExecQueueDaoImpl.java

@@ -0,0 +1,38 @@
+package com.github.microservice.components.data.mongo.queue.dao.impl;
+
+import com.github.microservice.components.data.mongo.mongo.helper.DBHelper;
+import com.github.microservice.components.data.mongo.queue.dao.extend.ExecQueueDaoExtend;
+import com.github.microservice.components.data.mongo.queue.domain.ExecQueue;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.domain.Sort;
+import org.springframework.data.mongodb.core.MongoTemplate;
+import org.springframework.data.mongodb.core.query.Criteria;
+import org.springframework.data.mongodb.core.query.Query;
+import org.springframework.data.mongodb.core.query.Update;
+
+import java.util.List;
+
+public class ExecQueueDaoImpl implements ExecQueueDaoExtend {
+
+    @Autowired
+    private MongoTemplate mongoTemplate;
+
+    @Autowired
+    private DBHelper dbHelper;
+
+    @Override
+    public boolean updateStatus(String id, ExecQueue.ExecQueueStatus status) {
+        Update update = new Update();
+        update.set("status", status);
+        this.dbHelper.updateTime(update);
+        return this.mongoTemplate.updateFirst(Query.query(Criteria.where("_id").is(id)), update, ExecQueue.class).getModifiedCount() > 0;
+    }
+
+    @Override
+    public List<ExecQueue> scan(ExecQueue.ExecQueueStatus status, int maxCount) {
+        Query query = new Query(Criteria.where("status").is(status));
+        query.with(Sort.by(Sort.Direction.ASC, "nextTime", "workTime"));
+        query.limit(maxCount);
+        return this.mongoTemplate.find(query, ExecQueue.class);
+    }
+}

+ 73 - 0
components/data/MongodbData/src/main/java/com/github/microservice/components/data/mongo/queue/domain/ExecQueue.java

@@ -0,0 +1,73 @@
+package com.github.microservice.components.data.mongo.queue.domain;
+
+import com.github.microservice.components.data.mongo.mongo.domain.SuperEntity;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import org.springframework.context.ApplicationEvent;
+import org.springframework.data.mongodb.core.index.Indexed;
+import org.springframework.data.mongodb.core.mapping.Document;
+
+@Data
+@Document
+@NoArgsConstructor
+@AllArgsConstructor
+public class ExecQueue extends SuperEntity {
+
+    @Indexed
+    private long nextTime;
+
+
+    //尝试次数
+    @Indexed
+    private int maxTryCount;
+
+    //尝试次数
+    @Indexed
+    private int currentTryCount;
+
+    // 类名
+    private Class<? extends Object> ObjectClass;
+
+    // 数据
+    private Object object;
+
+    //事件类型
+    private Class<? extends ApplicationEvent> eventClass;
+
+    //状态
+    @Indexed
+    private ExecQueueStatus status;
+
+
+    // 正在执行的时间,如果超时则需要重置等待队列
+    @Indexed
+    private long workTime;
+
+
+    /**
+     * 队列的状态
+     */
+    public enum ExecQueueStatus {
+
+        Wait("等待中"),
+
+        Work("进行中"),
+
+        Done("结束"),
+
+        ;
+
+        ExecQueueStatus(String name) {
+            this.name = name;
+        }
+
+        @Getter
+        private String name;
+
+
+    }
+
+
+}

+ 12 - 0
components/data/MongodbData/src/main/java/com/github/microservice/components/data/mongo/queue/event/ExecQueueEvent.java

@@ -0,0 +1,12 @@
+package com.github.microservice.components.data.mongo.queue.event;
+
+import org.springframework.context.ApplicationEvent;
+
+public class ExecQueueEvent extends ApplicationEvent {
+
+    public ExecQueueEvent(Object source) {
+        super(source);
+    }
+
+
+}

+ 72 - 0
components/data/MongodbData/src/main/java/com/github/microservice/components/data/mongo/queue/service/ExecQueueService.java

@@ -0,0 +1,72 @@
+package com.github.microservice.components.data.mongo.queue.service;
+
+import com.github.microservice.components.data.mongo.mongo.helper.DBHelper;
+import com.github.microservice.components.data.mongo.queue.config.ExecQueueConfig;
+import com.github.microservice.components.data.mongo.queue.dao.ExecQueueDao;
+import com.github.microservice.components.data.mongo.queue.domain.ExecQueue;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.ApplicationEvent;
+import org.springframework.stereotype.Component;
+import org.springframework.util.Assert;
+
+@Component
+public class ExecQueueService {
+
+
+    @Autowired
+    private ExecQueueDao execQueueDao;
+
+    @Autowired
+    private ExecQueueConfig execQueueConfig;
+
+    @Autowired
+    private DBHelper dbHelper;
+
+    // 添加
+    public String add(Object o, int maxTryCount, Class<? extends ApplicationEvent> eventClass) {
+        Assert.state(maxTryCount > 0, "最大尝试次数必须大于0");
+        Assert.notNull(eventClass, "事件不能为空");
+
+        final ExecQueue execQueue = new ExecQueue();
+
+        execQueue.setObject(o);
+        execQueue.setObjectClass(o.getClass());
+        execQueue.setEventClass(eventClass);
+
+        execQueue.setCurrentTryCount(1);
+        execQueue.setMaxTryCount(maxTryCount);
+        execQueue.setStatus(ExecQueue.ExecQueueStatus.Wait);
+
+
+
+        // 更新下次执行时间
+        this.updateNextExecTime(execQueue);
+
+        this.execQueueDao.save(execQueue);
+
+        return execQueue.getId();
+    }
+
+
+    // 删除
+    public void delete(String id) {
+        execQueueDao.deleteById(id);
+    }
+
+
+    /**
+     * 更新下次执行时间
+     */
+    public boolean updateNextExecTime(ExecQueue execQueue) {
+        if (execQueue.getCurrentTryCount() > execQueue.getMaxTryCount()) {
+            return false;
+        }
+        final long sleepTime = execQueueConfig.sleepTime(execQueue.getCurrentTryCount());
+        execQueue.setCurrentTryCount(execQueue.getCurrentTryCount() + 1);
+        execQueue.setNextTime(this.dbHelper.getTime() + sleepTime);
+        execQueue.setStatus(ExecQueue.ExecQueueStatus.Wait);
+        return true;
+    }
+
+
+}

+ 81 - 0
components/data/MongodbData/src/main/java/com/github/microservice/components/data/mongo/queue/timer/ExecQueueTimer.java

@@ -0,0 +1,81 @@
+package com.github.microservice.components.data.mongo.queue.timer;
+
+
+import com.github.microservice.components.data.mongo.token.service.ResourceTokenService;
+import com.github.microservice.core.util.os.SystemUtil;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.ApplicationArguments;
+import org.springframework.boot.ApplicationRunner;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+@Component
+@Slf4j
+public class ExecQueueTimer implements ApplicationRunner {
+
+    @Autowired
+    private ResourceTokenService resourceTokenService;
+
+    private final static String RESOURCE_NAME = "ExecQueueTimer";
+
+
+    private ScheduledExecutorService executorService;
+
+
+    @PostConstruct
+    private void shutdown() {
+        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+            Optional.ofNullable(executorService).ifPresent(it -> it.shutdownNow());
+        }));
+    }
+
+    @Override
+    public void run(ApplicationArguments args) throws Exception {
+        @Cleanup ResourceTokenService.Token token = this.resourceTokenService.token(RESOURCE_NAME);
+        executorService = Executors.newScheduledThreadPool(SystemUtil.getCpuCoreCount() * 2);
+        log.info("ExecQueueTimer start");
+
+        next();
+
+    }
+
+    private void next() {
+        executorService.schedule(() -> {
+
+            try {
+                updateWorkStatus();
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+
+
+            try {
+                executeWork();
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+
+
+            next();
+
+        }, 1, TimeUnit.SECONDS);
+    }
+
+    // 更新队列的工作状态
+    private void updateWorkStatus() {
+
+    }
+
+    //执行队列
+    private void executeWork() {
+
+    }
+
+}