lianshufeng пре 1 година
родитељ
комит
e04b9646d7

+ 3 - 2
components/data/MongodbData/src/main/java/com/github/microservice/components/data/mongo/queue/domain/ExecQueue.java

@@ -31,8 +31,9 @@ public class ExecQueue extends SuperEntity {
     // 数据
     private Map<String,Object> data;
 
-    //事件类型
-    private String eventClass;
+
+    // 处理事件的代码
+    private byte[] execQueueEvent;
 
     //状态
     @Indexed

+ 2 - 1
components/data/MongodbData/src/main/java/com/github/microservice/components/data/mongo/queue/event/ExecQueueEvent.java

@@ -1,8 +1,9 @@
 package com.github.microservice.components.data.mongo.queue.event;
 
+import java.io.Serializable;
 import java.util.Map;
 
-public interface ExecQueueEvent {
+public interface ExecQueueEvent extends Serializable {
 
     boolean execute(Map<String,Object> o);
 }

+ 12 - 5
components/data/MongodbData/src/main/java/com/github/microservice/components/data/mongo/queue/service/ExecQueueService.java

@@ -5,6 +5,7 @@ import com.github.microservice.components.data.mongo.queue.config.ExecQueueConfi
 import com.github.microservice.components.data.mongo.queue.dao.ExecQueueDao;
 import com.github.microservice.components.data.mongo.queue.domain.ExecQueue;
 import com.github.microservice.components.data.mongo.queue.event.ExecQueueEvent;
+import org.apache.commons.lang3.SerializationUtils;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 import org.springframework.util.Assert;
@@ -24,15 +25,21 @@ public class ExecQueueService {
     @Autowired
     private DBHelper dbHelper;
 
-    // 添加
-    public String add(Map<String, Object> data, int maxTryCount, Class<? extends ExecQueueEvent> eventClass) {
+    /**
+     * @param data
+     * @param maxTryCount
+     * @param execQueueEvent , 注意,事件必须是可序列化的
+     * @return
+     */
+    public String add(Map<String, Object> data, int maxTryCount, ExecQueueEvent execQueueEvent) {
         Assert.state(maxTryCount > 0, "最大尝试次数必须大于0");
-        Assert.notNull(eventClass, "事件不能为空");
+        Assert.notNull(execQueueEvent, "事件不能为空");
 
         final ExecQueue execQueue = new ExecQueue();
-
         execQueue.setData(data);
-        execQueue.setEventClass(eventClass.getName());
+//        execQueue.setEventClass(eventClass.getName());
+
+        execQueue.setExecQueueEvent(SerializationUtils.serialize(execQueueEvent));
 
         execQueue.setCurrentTryCount(0);
         execQueue.setMaxTryCount(maxTryCount);

+ 7 - 3
components/data/MongodbData/src/main/java/com/github/microservice/components/data/mongo/queue/timer/ExecQueueTimer.java

@@ -11,6 +11,7 @@ import com.github.microservice.core.util.os.SystemUtil;
 import lombok.Cleanup;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.SerializationUtils;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.ApplicationArguments;
 import org.springframework.boot.ApplicationRunner;
@@ -110,9 +111,12 @@ public class ExecQueueTimer implements ApplicationRunner {
     //触发事件
     @SneakyThrows
     private void execute(ExecQueue execQueue) {
-        Class<? extends ExecQueueEvent> cls = (Class<? extends ExecQueueEvent>) Class.forName(execQueue.getEventClass());
-        // 发布事件
-        final ExecQueueEvent execQueueEvent = this.applicationContext.getBean(cls);
+//        Class<? extends ExecQueueEvent> cls = (Class<? extends ExecQueueEvent>) Class.forName(execQueue.getEventClass());
+//        // 发布事件
+//        final ExecQueueEvent execQueueEvent = this.applicationContext.getBean(cls);
+
+        // 转换为具体的实现类
+        final ExecQueueEvent execQueueEvent = SerializationUtils.deserialize(execQueue.getExecQueueEvent());
 
         //执行任务之前状态修改为工作中
         this.execQueueDao.updateStatus(execQueue.getId(), ExecQueue.ExecQueueStatus.Work);

+ 12 - 4
components/data/MongodbData/src/test/java/demo/simple/queue/controller/QueueController.java

@@ -1,7 +1,8 @@
 package demo.simple.queue.controller;
 
 import com.github.microservice.components.data.mongo.queue.service.ExecQueueService;
-import demo.simple.queue.event.QuEvent;
+import com.github.microservice.core.helper.ApplicationContextHolder;
+import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RestController;
@@ -11,16 +12,23 @@ import java.util.UUID;
 
 @RequestMapping("queue")
 @RestController
+@Slf4j
 public class QueueController {
 
     @Autowired
     private ExecQueueService execQueueService;
 
+
+
     @RequestMapping("add")
-    public Object add(){
-        return execQueueService.add(Map.of("x", UUID.randomUUID()),10, QuEvent.class);
-    }
+    public Object add() {
 
+        return execQueueService.add(Map.of("x", UUID.randomUUID()), 10, (data) -> {
+            final ExecQueueService execQueueService =    ApplicationContextHolder.getContext().getBean(ExecQueueService.class);
+            log.info("execute : {} -> {} -> {}", Thread.currentThread().getName(), data, execQueueService);
+            return true;
+        });
+    }
 
 
 }