lianshufeng 1 rok temu
rodzic
commit
346bde7473
13 zmienionych plików z 257 dodań i 496 usunięć
  1. 0 9
      components/data/MongodbData/src/main/java/com/github/microservice/components/data/mongo/queue/dao/ExecQueueDao.java
  2. 0 18
      components/data/MongodbData/src/main/java/com/github/microservice/components/data/mongo/queue/dao/extend/ExecQueueDaoExtend.java
  3. 0 71
      components/data/MongodbData/src/main/java/com/github/microservice/components/data/mongo/queue/dao/impl/ExecQueueDaoImpl.java
  4. 0 79
      components/data/MongodbData/src/main/java/com/github/microservice/components/data/mongo/queue/domain/ExecQueue.java
  5. 0 89
      components/data/MongodbData/src/main/java/com/github/microservice/components/data/mongo/queue/service/ExecQueueService.java
  6. 0 138
      components/data/MongodbData/src/main/java/com/github/microservice/components/data/mongo/queue/timer/ExecQueueTimer.java
  7. 0 23
      components/data/MongodbData/src/test/java/demo/simple/queue/config/QueueConfig.java
  8. 0 34
      components/data/MongodbData/src/test/java/demo/simple/queue/controller/QueueController.java
  9. 0 27
      components/data/MongodbData/src/test/java/demo/simple/queue/event/QuEvent.java
  10. 2 7
      super/PCore/src/main/java/com/github/microservice/core/components/queue/config/ExecQueueConfig.java
  11. 1 1
      super/PCore/src/main/java/com/github/microservice/core/components/queue/event/ExecQueueEvent.java
  12. 73 0
      super/PCore/src/main/java/com/github/microservice/core/components/queue/service/ExecQueueService.java
  13. 181 0
      super/PCore/src/main/java/com/github/microservice/core/util/queue/ExecuteQueueUtil.java

+ 0 - 9
components/data/MongodbData/src/main/java/com/github/microservice/components/data/mongo/queue/dao/ExecQueueDao.java

@@ -1,9 +0,0 @@
-package com.github.microservice.components.data.mongo.queue.dao;
-
-import com.github.microservice.components.data.mongo.mongo.dao.MongoDao;
-import com.github.microservice.components.data.mongo.queue.dao.extend.ExecQueueDaoExtend;
-import com.github.microservice.components.data.mongo.queue.domain.ExecQueue;
-
-public interface ExecQueueDao extends MongoDao<ExecQueue>, ExecQueueDaoExtend {
-
-}

+ 0 - 18
components/data/MongodbData/src/main/java/com/github/microservice/components/data/mongo/queue/dao/extend/ExecQueueDaoExtend.java

@@ -1,18 +0,0 @@
-package com.github.microservice.components.data.mongo.queue.dao.extend;
-
-import com.github.microservice.components.data.mongo.queue.domain.ExecQueue;
-
-import java.util.List;
-
-public interface ExecQueueDaoExtend {
-
-    boolean updateStatus(String id, ExecQueue.ExecQueueStatus status);
-
-
-    List<ExecQueue> scanWait(int maxCount);
-
-
-    //回复待执行队列
-    long restoreWaitStatus(long executeTimeOut);
-
-}

+ 0 - 71
components/data/MongodbData/src/main/java/com/github/microservice/components/data/mongo/queue/dao/impl/ExecQueueDaoImpl.java

@@ -1,71 +0,0 @@
-package com.github.microservice.components.data.mongo.queue.dao.impl;
-
-import com.github.microservice.components.data.mongo.mongo.helper.DBHelper;
-import com.github.microservice.components.data.mongo.queue.dao.extend.ExecQueueDaoExtend;
-import com.github.microservice.components.data.mongo.queue.domain.ExecQueue;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.data.domain.Sort;
-import org.springframework.data.mongodb.core.BulkOperations;
-import org.springframework.data.mongodb.core.MongoTemplate;
-import org.springframework.data.mongodb.core.query.Criteria;
-import org.springframework.data.mongodb.core.query.Query;
-import org.springframework.data.mongodb.core.query.Update;
-
-import java.util.List;
-
-public class ExecQueueDaoImpl implements ExecQueueDaoExtend {
-
-    @Autowired
-    private MongoTemplate mongoTemplate;
-
-    @Autowired
-    private DBHelper dbHelper;
-
-    @Override
-    public boolean updateStatus(String id, ExecQueue.ExecQueueStatus status) {
-        Update update = new Update();
-        update.set("status", status);
-        this.dbHelper.updateTime(update);
-        return this.mongoTemplate.updateFirst(Query.query(Criteria.where("_id").is(id)), update, ExecQueue.class).getModifiedCount() > 0;
-    }
-
-    @Override
-    public List<ExecQueue> scanWait(int maxCount) {
-        final long workTime = this.dbHelper.getTime();
-
-        Query query = new Query(
-                Criteria.where("status").is(ExecQueue.ExecQueueStatus.Wait)
-                        .and("nextTime").lt(this.dbHelper.getTime())
-                        // 尝试次数未超过最大次数
-                        .and("$expr").lte(new String[]{"$currentTryCount", "$maxTryCount"})
-        );
-        query.with(Sort.by(Sort.Direction.ASC, "nextTime", "workTime"));
-
-        Update update = new Update();
-        update.set("status", ExecQueue.ExecQueueStatus.Work);
-        update.set("workTime", workTime);
-
-        final BulkOperations bulkOperations = this.mongoTemplate.bulkOps(BulkOperations.BulkMode.ORDERED, ExecQueue.class);
-        for (int i = 0; i < maxCount; i++) {
-            bulkOperations.updateOne(query, update);
-        }
-        bulkOperations.execute();
-
-        return this.mongoTemplate.find(Query.query(Criteria.where("workTime").is(workTime)), ExecQueue.class);
-    }
-
-    @Override
-    public long restoreWaitStatus(long executeTimeOut) {
-        Query query = new Query(
-                Criteria.where("status").is(ExecQueue.ExecQueueStatus.Work)
-                        .and("workTime").lt(this.dbHelper.getTime() - executeTimeOut)
-        );
-
-        Update update = new Update();
-        update.set("status", ExecQueue.ExecQueueStatus.Wait);
-        update.inc("currentTryCount", 1);
-        this.dbHelper.updateTime(update);
-        return this.mongoTemplate.updateMulti(query, update, ExecQueue.class).getModifiedCount();
-    }
-
-}

+ 0 - 79
components/data/MongodbData/src/main/java/com/github/microservice/components/data/mongo/queue/domain/ExecQueue.java

@@ -1,79 +0,0 @@
-package com.github.microservice.components.data.mongo.queue.domain;
-
-import com.github.microservice.components.data.mongo.mongo.domain.SuperEntity;
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.Getter;
-import lombok.NoArgsConstructor;
-import org.springframework.data.mongodb.core.index.Indexed;
-import org.springframework.data.mongodb.core.mapping.Document;
-
-import java.util.Date;
-import java.util.Map;
-
-@Data
-@Document
-@NoArgsConstructor
-@AllArgsConstructor
-public class ExecQueue extends SuperEntity {
-
-    @Indexed
-    private long nextTime;
-
-
-    //尝试次数
-    @Indexed
-    private int maxTryCount;
-
-    //尝试次数
-    @Indexed
-    private int currentTryCount;
-
-    // 数据
-    private Map<String, Object> data;
-
-
-    // 处理事件的代码
-    private byte[] execQueueEvent;
-
-    //状态
-    @Indexed
-    private ExecQueueStatus status;
-
-
-    // 正在执行的时间,如果超时则需要重置等待队列
-    @Indexed
-    private long workTime;
-
-    // 过期时间
-    @Indexed(expireAfterSeconds = 0)
-    private Date TTL;
-
-
-    /**
-     * 队列的状态
-     */
-    public enum ExecQueueStatus {
-
-        Wait("等待中"),
-
-        Work("进行中"),
-
-        Done("完成"),
-
-        Stop("停止"),
-
-        ;
-
-        ExecQueueStatus(String name) {
-            this.name = name;
-        }
-
-        @Getter
-        private String name;
-
-
-    }
-
-
-}

+ 0 - 89
components/data/MongodbData/src/main/java/com/github/microservice/components/data/mongo/queue/service/ExecQueueService.java

@@ -1,89 +0,0 @@
-package com.github.microservice.components.data.mongo.queue.service;
-
-import com.github.microservice.components.data.mongo.mongo.helper.DBHelper;
-import com.github.microservice.components.data.mongo.queue.config.ExecQueueConfig;
-import com.github.microservice.components.data.mongo.queue.dao.ExecQueueDao;
-import com.github.microservice.components.data.mongo.queue.domain.ExecQueue;
-import com.github.microservice.components.data.mongo.queue.event.ExecQueueEvent;
-import org.apache.commons.lang3.SerializationUtils;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-import org.springframework.util.Assert;
-
-import java.util.Date;
-import java.util.Map;
-import java.util.Optional;
-
-@Component
-public class ExecQueueService {
-
-
-    @Autowired
-    private ExecQueueDao execQueueDao;
-
-    @Autowired
-    private ExecQueueConfig execQueueConfig;
-
-    @Autowired
-    private DBHelper dbHelper;
-
-    /**
-     * @param data
-     * @param maxTryCount
-     * @param execQueueEvent , 注意,事件必须是可序列化的
-     * @return
-     */
-    public String add(Map<String, Object> data, int maxTryCount, ExecQueueEvent execQueueEvent) {
-        return this.add(data, maxTryCount, execQueueEvent, null);
-    }
-
-
-    public String add(Map<String, Object> data, int maxTryCount, ExecQueueEvent execQueueEvent, Long ttlTime) {
-        Assert.state(maxTryCount > 0, "最大尝试次数必须大于0");
-        Assert.notNull(execQueueEvent, "事件不能为空");
-
-        final ExecQueue execQueue = new ExecQueue();
-        execQueue.setData(data);
-
-        // 事件
-        execQueue.setExecQueueEvent(SerializationUtils.serialize(execQueueEvent));
-
-        // 初始化参数
-        execQueue.setCurrentTryCount(0);
-        execQueue.setMaxTryCount(maxTryCount);
-        execQueue.setStatus(ExecQueue.ExecQueueStatus.Wait);
-
-        // 设置过期时间
-        Optional.ofNullable(ttlTime).ifPresent(it -> execQueue.setTTL(new Date(System.currentTimeMillis() + it)));
-
-        // 更新下次执行时间
-        this.updateNextExecTime(execQueue);
-
-        this.execQueueDao.save(execQueue);
-        return execQueue.getId();
-    }
-
-
-    // 删除
-    public void delete(String id) {
-        execQueueDao.deleteById(id);
-    }
-
-
-    /**
-     * 更新下次执行时间
-     */
-    public boolean updateNextExecTime(ExecQueue execQueue) {
-        if (execQueue.getCurrentTryCount() >= execQueue.getMaxTryCount()) {
-            return false;
-        }
-        execQueue.setCurrentTryCount(execQueue.getCurrentTryCount() + 1);
-        final long sleepTime = execQueueConfig.sleepTime(execQueue.getCurrentTryCount());
-        execQueue.setNextTime(this.dbHelper.getTime() + sleepTime);
-        execQueue.setStatus(ExecQueue.ExecQueueStatus.Wait);
-        this.dbHelper.updateTime(execQueue);
-        return true;
-    }
-
-
-}

+ 0 - 138
components/data/MongodbData/src/main/java/com/github/microservice/components/data/mongo/queue/timer/ExecQueueTimer.java

@@ -1,138 +0,0 @@
-package com.github.microservice.components.data.mongo.queue.timer;
-
-
-import com.github.microservice.components.data.mongo.queue.config.ExecQueueConfig;
-import com.github.microservice.components.data.mongo.queue.dao.ExecQueueDao;
-import com.github.microservice.components.data.mongo.queue.domain.ExecQueue;
-import com.github.microservice.components.data.mongo.queue.event.ExecQueueEvent;
-import com.github.microservice.components.data.mongo.queue.service.ExecQueueService;
-import com.github.microservice.components.data.mongo.token.service.ResourceTokenService;
-import com.github.microservice.core.util.os.SystemUtil;
-import lombok.Cleanup;
-import lombok.SneakyThrows;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.SerializationUtils;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.ApplicationArguments;
-import org.springframework.boot.ApplicationRunner;
-import org.springframework.context.ApplicationContext;
-import org.springframework.context.ApplicationEvent;
-import org.springframework.stereotype.Component;
-
-import javax.annotation.PostConstruct;
-import java.util.Optional;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-@Component
-@Slf4j
-public class ExecQueueTimer implements ApplicationRunner {
-
-    @Autowired
-    private ResourceTokenService resourceTokenService;
-
-    @Autowired
-    private ExecQueueDao execQueueDao;
-
-    @Autowired
-    private ExecQueueConfig execQueueConfig;
-
-    @Autowired
-    private ApplicationContext applicationContext;
-
-    @Autowired
-    private ExecQueueService execQueueService;
-
-    private final static String RESOURCE_NAME = "ExecQueueTimer";
-
-
-    private ScheduledExecutorService executorService;
-
-
-    @PostConstruct
-    private void shutdown() {
-        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
-            Optional.ofNullable(executorService).ifPresent(it -> it.shutdownNow());
-        }));
-    }
-
-    @Override
-    public void run(ApplicationArguments args) throws Exception {
-
-        new Thread(() -> {
-            ResourceTokenService.Token token = this.resourceTokenService.token(RESOURCE_NAME);
-            executorService = Executors.newScheduledThreadPool(SystemUtil.getCpuCoreCount() * 2);
-            log.info("ExecQueueTimer start");
-            next();
-
-        }).start();
-    }
-
-    private void next() {
-        executorService.schedule(() -> {
-
-            try {
-                restoreWaitStatus();
-            } catch (Exception e) {
-                e.printStackTrace();
-            }
-
-
-            try {
-                executeWork();
-            } catch (Exception e) {
-                e.printStackTrace();
-            }
-
-            next();
-
-        }, execQueueConfig.loopEventTime(), TimeUnit.MILLISECONDS);
-    }
-
-    // 恢复状态为等待
-    private void restoreWaitStatus() {
-        long count = this.execQueueDao.restoreWaitStatus(execQueueConfig.executeTimeOut());
-        if (count > 0) {
-            log.info("restoreWaitStatus : {}", count);
-        }
-    }
-
-    //执行队列
-    private void executeWork() {
-        execQueueDao.scanWait(execQueueConfig.maxBatchExecuteCount()).forEach((it) -> {
-            try {
-                execute(it);
-            } catch (Exception e) {
-                e.printStackTrace();
-                log.error(e.getMessage());
-            }
-        });
-    }
-
-    //触发事件
-    @SneakyThrows
-    private void execute(ExecQueue execQueue) {
-//        Class<? extends ExecQueueEvent> cls = (Class<? extends ExecQueueEvent>) Class.forName(execQueue.getEventClass());
-//        // 发布事件
-//        final ExecQueueEvent execQueueEvent = this.applicationContext.getBean(cls);
-
-        // 转换为具体的实现类
-        final ExecQueueEvent execQueueEvent = SerializationUtils.deserialize(execQueue.getExecQueueEvent());
-
-        //执行任务之前状态修改为工作中
-        this.execQueueDao.updateStatus(execQueue.getId(), ExecQueue.ExecQueueStatus.Work);
-
-        // 修改当前状态为执行完成
-        if (execQueueEvent.execute(execQueue.getData())) {
-            this.execQueueDao.updateStatus(execQueue.getId(), ExecQueue.ExecQueueStatus.Done);
-        } else if (this.execQueueService.updateNextExecTime(execQueue)) {
-            this.execQueueDao.save(execQueue);
-        } else {
-            this.execQueueDao.updateStatus(execQueue.getId(), ExecQueue.ExecQueueStatus.Stop);
-        }
-
-
-    }
-
-}

+ 0 - 23
components/data/MongodbData/src/test/java/demo/simple/queue/config/QueueConfig.java

@@ -1,23 +0,0 @@
-package demo.simple.queue.config;
-
-import com.github.microservice.components.data.mongo.queue.config.ExecQueueConfig;
-import org.springframework.context.annotation.Configuration;
-
-import java.util.Map;
-
-@Configuration
-public class QueueConfig extends ExecQueueConfig {
-
-    private static final Map<Integer, Long> SleepTime = Map.of(
-            1, 2000L,
-            2, 3000L,
-            3, 5000L,
-            4, 7000L,
-            5, 11000L
-    );
-
-    @Override
-    public long sleepTime(int tryIndex) {
-        return SleepTime.getOrDefault(tryIndex, 1000L * 60);
-    }
-}

+ 0 - 34
components/data/MongodbData/src/test/java/demo/simple/queue/controller/QueueController.java

@@ -1,34 +0,0 @@
-package demo.simple.queue.controller;
-
-import com.github.microservice.components.data.mongo.queue.service.ExecQueueService;
-import com.github.microservice.core.helper.ApplicationContextHolder;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.web.bind.annotation.RequestMapping;
-import org.springframework.web.bind.annotation.RestController;
-
-import java.util.Map;
-import java.util.UUID;
-
-@RequestMapping("queue")
-@RestController
-@Slf4j
-public class QueueController {
-
-    @Autowired
-    private ExecQueueService execQueueService;
-
-
-
-    @RequestMapping("add")
-    public Object add() {
-
-        return execQueueService.add(Map.of("x", UUID.randomUUID()), 10, (data) -> {
-            final ExecQueueService execQueueService =    ApplicationContextHolder.getContext().getBean(ExecQueueService.class);
-            log.info("execute : {} -> {} -> {}", Thread.currentThread().getName(), data, execQueueService);
-            return false;
-        });
-    }
-
-
-}

+ 0 - 27
components/data/MongodbData/src/test/java/demo/simple/queue/event/QuEvent.java

@@ -1,27 +0,0 @@
-package demo.simple.queue.event;
-
-import com.github.microservice.components.data.mongo.queue.event.ExecQueueEvent;
-import com.github.microservice.core.util.random.RandomUtil;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.stereotype.Component;
-
-import java.util.Map;
-
-@Slf4j
-@Component
-public class QuEvent implements ExecQueueEvent {
-
-
-    @Override
-    public boolean execute(Map<String,Object> o) {
-        log.info("time : {} -> {}", System.currentTimeMillis(), o);
-        int random = RandomUtil.nextInt(1, 100);
-        if (random % 11 == 0) {
-            return true;
-        }
-//        else if (random % 19 == 0) {
-//            System.exit(0);
-//        }
-        return false;
-    }
-}

+ 2 - 7
components/data/MongodbData/src/main/java/com/github/microservice/components/data/mongo/queue/config/ExecQueueConfig.java → super/PCore/src/main/java/com/github/microservice/core/components/queue/config/ExecQueueConfig.java

@@ -1,15 +1,10 @@
-package com.github.microservice.components.data.mongo.queue.config;
+package com.github.microservice.core.components.queue.config;
 
-import com.github.microservice.components.data.mongo.token.config.ResourceTokenConfiguration;
 import org.springframework.context.annotation.ComponentScan;
 import org.springframework.context.annotation.Configuration;
-import org.springframework.context.annotation.Import;
-import org.springframework.data.mongodb.repository.config.EnableMongoRepositories;
 
 @Configuration
-@EnableMongoRepositories(basePackages = {"com.github.microservice.components.data.mongo.queue.dao"})
-@ComponentScan("com.github.microservice.components.data.mongo.queue")
-@Import(ResourceTokenConfiguration.class)
+@ComponentScan("com.github.microservice.core.components.queue.service")
 public class ExecQueueConfig {
 
 

+ 1 - 1
components/data/MongodbData/src/main/java/com/github/microservice/components/data/mongo/queue/event/ExecQueueEvent.java → super/PCore/src/main/java/com/github/microservice/core/components/queue/event/ExecQueueEvent.java

@@ -1,4 +1,4 @@
-package com.github.microservice.components.data.mongo.queue.event;
+package com.github.microservice.core.components.queue.event;
 
 import java.io.Serializable;
 import java.util.Map;

+ 73 - 0
super/PCore/src/main/java/com/github/microservice/core/components/queue/service/ExecQueueService.java

@@ -0,0 +1,73 @@
+//package com.github.microservice.core.components.queue.service;
+//
+//import com.github.microservice.core.components.queue.config.ExecQueueConfig;
+//import com.github.microservice.core.components.queue.event.ExecQueueEvent;
+//import lombok.SneakyThrows;
+//import org.apache.commons.lang3.SerializationUtils;
+//import org.springframework.beans.factory.annotation.Autowired;
+//import org.springframework.stereotype.Component;
+//import org.springframework.util.Assert;
+//
+//import java.util.Date;
+//import java.util.Map;
+//import java.util.Optional;
+//
+//@Component
+//public class ExecQueueService {
+//
+//
+//
+//    @Autowired
+//    private ExecQueueConfig execQueueConfig;
+//
+//
+//    public String add(Map<String, Object> data, int maxTryCount, ExecQueueEvent execQueueEvent) {
+//        return this.add(data, maxTryCount, execQueueEvent, null);
+//    }
+//
+//
+//    public String add(Map<String, Object> data, int maxTryCount, ExecQueueEvent execQueueEvent, Long ttlTime) {
+//        Assert.state(maxTryCount > 0, "最大尝试次数必须大于0");
+//        Assert.notNull(execQueueEvent, "事件不能为空");
+//
+//        final ExecQueue execQueue = new ExecQueue();
+//        execQueue.setData(data);
+//
+//        // 事件
+//        execQueue.setExecQueueEvent(SerializationUtils.serialize(execQueueEvent));
+//
+//        // 初始化参数
+//        execQueue.setCurrentTryCount(0);
+//        execQueue.setMaxTryCount(maxTryCount);
+//        execQueue.setStatus(ExecQueue.ExecQueueStatus.Wait);
+//
+//        // 设置过期时间
+//        Optional.ofNullable(ttlTime).ifPresent(it -> execQueue.setTTL(new Date(System.currentTimeMillis() + it)));
+//
+//
+//        this.execQueueDao.save(execQueue);
+//        return execQueue.getId();
+//    }
+//
+//
+//
+//
+//    /**
+//     * 更新下次执行时间
+//     */
+////    public boolean updateNextExecTime(ExecQueue execQueue) {
+////        if (execQueue.getCurrentTryCount() >= execQueue.getMaxTryCount()) {
+////            return false;
+////        }
+////        execQueue.setCurrentTryCount(execQueue.getCurrentTryCount() + 1);
+////        final long sleepTime = execQueueConfig.sleepTime(execQueue.getCurrentTryCount());
+////        execQueue.setNextTime(this.dbHelper.getTime() + sleepTime);
+////        execQueue.setStatus(ExecQueue.ExecQueueStatus.Wait);
+////        this.dbHelper.updateTime(execQueue);
+////        return true;
+////    }
+//
+//
+//
+//
+//}

+ 181 - 0
super/PCore/src/main/java/com/github/microservice/core/util/queue/ExecuteQueueUtil.java

@@ -0,0 +1,181 @@
+package com.github.microservice.core.util.queue;
+
+import com.github.microservice.core.util.os.SystemUtil;
+import lombok.*;
+import lombok.experimental.Accessors;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.util.Assert;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+@Slf4j
+public class ExecuteQueueUtil {
+
+    private static ScheduledExecutorService executorService = Executors.newScheduledThreadPool(SystemUtil.getCpuCoreCount() * 2);
+
+
+    static {
+        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+            executorService.shutdownNow();
+        }));
+    }
+
+    @SneakyThrows
+    public static <T> T execute(int maxTryCount, SleepTime sleepTime, Handle<T> handle) {
+        Assert.state(maxTryCount > 0, "最大执行次数必须大于0");
+        Assert.notNull(sleepTime, "休眠时间不能为空");
+        Assert.notNull(handle, "处理器不能为空");
+
+        final long startTime = System.currentTimeMillis();
+        final CountDownLatch countDownLatch = new CountDownLatch(1);
+        final Item<T> item = new Item<T>() {{
+            setMaxTryCount(maxTryCount);
+            setStartTime(startTime);
+            setSleepTime(sleepTime);
+            setCountDownLatch(countDownLatch);
+            setHandle(handle);
+        }};
+        sleepExecute(1, item);
+        countDownLatch.await();
+
+        return item.getResult().getData();
+    }
+
+    // 延迟执行
+    private static <T> void sleepExecute(int index, Item<T> item) {
+        executorService.schedule(() -> {
+            try {
+                final RunTime runTime = new RunTime() {{
+                    setIndex(index);
+                    setStartTime(item.getStartTime());
+                }};
+                //记录结果
+                item.setResult(item.getHandle().execute(runTime));
+            } catch (Exception e) {
+                e.printStackTrace();
+                log.error(e.getMessage());
+            }
+
+            // 异步则退出阻塞
+            if (item.getResult().isAsync()) {
+                countDown(item.getCountDownLatch());
+            }
+
+            // 完成则退出阻塞
+            if (item.getResult().isDone()) {
+                countDown(item.getCountDownLatch());
+                // 结束,不在执行循环
+                return;
+            }
+
+            if (index < item.getMaxTryCount()) {
+                sleepExecute(index + 1, item);
+            } else {
+                countDown(item.getCountDownLatch());
+                return;
+            }
+        }, item.getSleepTime().get(index), TimeUnit.MILLISECONDS);
+    }
+
+    private static void countDown(CountDownLatch countDownLatch) {
+        if (countDownLatch.getCount() > 0) {
+            countDownLatch.countDown();
+        }
+    }
+
+    @Data
+    @AllArgsConstructor
+    @NoArgsConstructor
+    public static class Item<T> {
+        private int maxTryCount;
+        private long startTime;
+        private SleepTime sleepTime;
+        private Handle<T> handle;
+        private CountDownLatch countDownLatch;
+
+        //结果集
+        private Result<T> result;
+    }
+
+
+    @Data
+    @Builder
+    @NoArgsConstructor
+    @AllArgsConstructor
+    @Accessors(chain = true)
+    public static class Result<T> {
+        // 返回的数据
+        private T data;
+
+        // 是否异步
+        private boolean async;
+
+        //是否结束
+        private boolean done;
+
+    }
+
+
+    // 休眠时间
+    public interface SleepTime {
+        long get(int index);
+    }
+
+
+    // 处理器
+    public interface Handle<T> {
+        Result<T> execute(RunTime runTime);
+    }
+
+    @Data
+    @AllArgsConstructor
+    @NoArgsConstructor
+    public static class RunTime {
+        private int index;
+        private long startTime;
+    }
+
+    
+//    @SneakyThrows
+//    public static void main(String[] args) {
+//
+//        String a = execute(10, (index) -> {
+//            return Map.of(1, 1000L, 2, 1000L, 3, 1000L, 4, 5000L).getOrDefault(index, 5000L);
+//        }, (runTime) -> {
+//            long time = System.currentTimeMillis() - runTime.getStartTime();
+//            System.out.println("------------" + Thread.currentThread());
+//            System.out.println(runTime.getIndex() + "," + time);
+//
+//            //异常
+////            if (runTime.getIndex() >= 3) {
+////                throw new RuntimeException("模拟异常");
+////            }
+//
+////            if (runTime.getIndex() >= 5) {
+////                return new Result<String>().setData("放弃").setDone(true);
+////            }
+//
+//            // 时间异步
+//            if (time > 3000) {
+//                return new Result<String>().setData("异步线程继续").setAsync(true);
+//            }
+//
+//            // 次数模拟完成
+////            if (runTime.getIndex() >= 5) {
+////                return new Result<String>().setData("完成").setDone(true);
+////            }
+//
+//
+//            return new Result<String>().setData("未完成");
+//        });
+//
+//
+//        System.out.println("结果: " + a);
+//
+//    }
+
+
+}