lianshufeng 1 سال پیش
والد
کامیت
38fdb63fc6

+ 4 - 0
components/data/MongodbData/src/main/java/com/github/microservice/components/data/mongo/queue/dao/impl/ExecQueueDaoImpl.java

@@ -32,9 +32,12 @@ public class ExecQueueDaoImpl implements ExecQueueDaoExtend {
     @Override
     public List<ExecQueue> scanWait(int maxCount) {
         final long workTime = this.dbHelper.getTime();
+
         Query query = new Query(
                 Criteria.where("status").is(ExecQueue.ExecQueueStatus.Wait)
                         .and("nextTime").lt(this.dbHelper.getTime())
+                        // 尝试次数未超过最大次数
+                        .and("$expr").lt(new String[]{"$currentTryCount", "$maxTryCount"})
         );
         query.with(Sort.by(Sort.Direction.ASC, "nextTime", "workTime"));
 
@@ -60,6 +63,7 @@ public class ExecQueueDaoImpl implements ExecQueueDaoExtend {
 
         Update update = new Update();
         update.set("status", ExecQueue.ExecQueueStatus.Wait);
+        update.inc("currentTryCount", 1);
         this.dbHelper.updateTime(update);
         return this.mongoTemplate.updateMulti(query, update, ExecQueue.class).getModifiedCount();
     }

+ 1 - 1
components/data/MongodbData/src/test/java/demo/simple/queue/controller/QueueController.java

@@ -26,7 +26,7 @@ public class QueueController {
         return execQueueService.add(Map.of("x", UUID.randomUUID()), 10, (data) -> {
             final ExecQueueService execQueueService =    ApplicationContextHolder.getContext().getBean(ExecQueueService.class);
             log.info("execute : {} -> {} -> {}", Thread.currentThread().getName(), data, execQueueService);
-            return true;
+            return false;
         });
     }