TRX 1 rok pred
rodič
commit
c4a40d3f6d

+ 8 - 0
src/main/java/com/zswl/dataservice/domain/mqtt/OperationMessage.java

@@ -6,6 +6,7 @@ import io.swagger.v3.oas.annotations.media.Schema;
 import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.NoArgsConstructor;
+import org.apache.commons.lang3.StringUtils;
 import org.springframework.data.mongodb.core.index.Indexed;
 import org.springframework.data.mongodb.core.mapping.Document;
 
@@ -71,6 +72,13 @@ public class OperationMessage extends SuperEntity {
     @Schema(description = "网关ID")
     private String gateWayId;
 
+    public String getGateWayId() {
+        if (StringUtils.isNotEmpty(gateWayId)) {
+            return gateWayId;
+        }
+        return "";
+    }
+
     @Schema(description = "mqtt消息类型: 发送 接收")
     OperationType operationType;
 

+ 153 - 132
src/main/java/com/zswl/dataservice/service/artemis/OperationMessageService.java

@@ -53,6 +53,7 @@ import java.lang.reflect.Method;
 import java.util.Date;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 
 /**
  * 指令数据管理
@@ -219,8 +220,10 @@ public class OperationMessageService {
             Long time = jsonObject.getLong("time");
             Long ttl = jsonObject.getLong("ttl");
             String event = jsonObject.getStr("event");
-            if (event.equals("ping")) {
-                return;
+            if (ObjectUtils.isEmpty(event)) {
+                String[] arr = topicName.split("/");
+                event = arr[arr.length - 1];
+                jsonObject.put("event", event);
             }
             log.info("Topic: {}", topicName);
             String gateWayId = jsonObject.getStr("gatewayId");
@@ -236,9 +239,9 @@ public class OperationMessageService {
 
             // --------------------处理业务 start------------------
             OperationMessage operationMessage = new OperationMessage();
-            operationMessage.setMessageId(messageId);
-            operationMessage.setClientId(clientId);
-            operationMessage.setTopic(topicName); // top名称
+            operationMessage.setMessageId(messageId); // 消息ID
+            operationMessage.setClientId(clientId); // 终端ID
+            operationMessage.setTopic(topicName); // topic名称
             operationMessage.setOperationType(OperationType.Sub);
             operationMessage.setMessageClass(messageClass);
             operationMessage.setData(jsonObject);
@@ -327,22 +330,21 @@ public class OperationMessageService {
             String handleMsg = "处理成功";
 
             // 业务处理的消息内容
-            JSONObject jsonObject1 = (JSONObject) json.get("data");
-            jsonObject1.put("mqttDataId", entity.getDataId());
-            jsonObject1.put("GateWayId", entity.getGateWayId());
-            String DeviceId = jsonObject1.getStr("DeviceId");
+            JSONObject requestData = (JSONObject) json.get("data");
+            requestData.put("mqttDataId", entity.getDataId());
+            requestData.put("GateWayId", entity.getGateWayId());
+            String DeviceId = requestData.getStr("DeviceId");
             if (StringUtils.isEmpty(DeviceId)) {
                 DeviceId = entity.getDeviceId();
-                jsonObject1.put("DeviceId", DeviceId);
+                requestData.put("DeviceId", DeviceId);
             }
             entity.setDeviceId(DeviceId);
 
             // 查询有对应事件的设备
             List<IotMain> events = iotMainDao.findByRealIotTopicAndFunctionTypeOrderByCreateTimeAsc(entity.getTopic(), FunctionType.Event);
             if (ObjectUtils.isNotEmpty(events)) {
-                String dataStr = JSONUtil.toJsonStr(jsonObject1);
                 for (IotMain iotMain : events) {
-                    executeOperationMessage(entity, dataStr, iotMain);
+                    executeOperationMessage(entity, requestData, iotMain);
                 }
             } else {
                 isHandleSuccess = false;
@@ -356,145 +358,164 @@ public class OperationMessageService {
                 entity.setReTimeStr(DateUtils.paresTime(System.currentTimeMillis(), DateUtils.patternyyyySSS));
                 operationMessageDao.save(entity);
             }
+            // 如果是ping,则平台也会处理
+            if (event != null && event.equals("ping")) {
+                pingHandler(entity, requestData);
+            }
+        }
+        return ResultContent.buildSuccess();
+    }
 
-            if (event.equals("ping")) {
-                // 查询业务处理端信息
-                try {
-                    ExecuteMethodInfo executeMethodInfo = executeMethodInfoDao.findTopByEvent(event);
-                    if (ObjectUtils.isNotEmpty(executeMethodInfo)) {
-                        String dataStr = JSONUtil.toJsonStr(jsonObject1);
-
-                        String beanName = executeMethodInfo.getBeanName();
-                        String methodName = executeMethodInfo.getMethodName();
-                        Class c = applicationContext.getBean(beanName).getClass();
-                        SuperService t = (SuperService) applicationContext.getBean(beanName);
-                        Method method = c.getMethod(methodName, String.class);
-                        ResultContent<Object> resultContent = (ResultContent<Object>) method.invoke(t, dataStr);
-                        if (resultContent.isSuccess()) {
-                            isHandleSuccess = true;
-                            handleMsg = "处理成功";
-                            result = resultContent.getContent();
-                        } else {
-                            isHandleSuccess = false;
-                            handleMsg = resultContent.getMsg();
-                        }
-                        entity.setBeanName(beanName);
-                        entity.setMethodName(methodName);
-                    } else {
-                        isHandleSuccess = false;
-                        handleMsg = "消息处理方法未找到";
-                    }
-                } catch (Exception e) {
-                    e.printStackTrace();
-                    isHandleSuccess = false;
-                    handleMsg = String.format("业务处理出错:%S", e.getMessage());
-                }
-                stopWatch.stop();
-                entity.setHandlerTime(stopWatch.getLastTaskTimeMillis());
-                // 返回结果
-                entity.setResultData(result);
-                // 业务处理失败
-                entity.setIsHandleSuccess(isHandleSuccess);
-                entity.setHandleMsg(handleMsg);
-                log.info("消息处理结果: {} {}", isHandleSuccess, handleMsg);
-                if (isHandleSuccess) {
-                    // 处理成功,返回响应
-                    responseMessage(entity);
+    /**
+     * 处理ping消息
+     *
+     * @param entity
+     * @param requestData
+     * @return
+     */
+    public ResultContent pingHandler(OperationMessage entity, JSONObject requestData) {
+        String event = entity.getEvent();
+        Object result = null;
+        StopWatch stopWatch = new StopWatch();
+        stopWatch.start();
+
+        boolean isHandleSuccess = true;
+        String handleMsg = "处理成功";
+        // 查询业务处理端信息
+        try {
+            ExecuteMethodInfo executeMethodInfo = executeMethodInfoDao.findTopByEvent(event);
+            if (ObjectUtils.isNotEmpty(executeMethodInfo)) {
+                String dataStr = JSONUtil.toJsonStr(requestData);
+
+                String beanName = executeMethodInfo.getBeanName();
+                String methodName = executeMethodInfo.getMethodName();
+                Class c = applicationContext.getBean(beanName).getClass();
+                SuperService t = (SuperService) applicationContext.getBean(beanName);
+                Method method = c.getMethod(methodName, String.class);
+                ResultContent<Object> resultContent = (ResultContent<Object>) method.invoke(t, dataStr);
+                if (resultContent.isSuccess()) {
+                    isHandleSuccess = true;
+                    handleMsg = "处理成功";
+                    result = resultContent.getContent();
                 } else {
-                    // 处理失败,记录数据
-                    entity.setReTime(System.currentTimeMillis());
-                    entity.setReTimeStr(DateUtils.paresTime(System.currentTimeMillis(), DateUtils.patternyyyySSS));
-                    operationMessageDao.save(entity);
-                    return ResultContent.buildFail(handleMsg);
+                    isHandleSuccess = false;
+                    handleMsg = resultContent.getMsg();
                 }
+                entity.setBeanName(beanName);
+                entity.setMethodName(methodName);
+            } else {
+                isHandleSuccess = false;
+                handleMsg = "消息处理方法未找到";
             }
+        } catch (Exception e) {
+            e.printStackTrace();
+            isHandleSuccess = false;
+            handleMsg = String.format("业务处理出错:%S", e.getMessage());
+        }
+        stopWatch.stop();
+        entity.setHandlerTime(stopWatch.getLastTaskTimeMillis());
+        // 返回结果
+        entity.setResultData(result);
+        // 业务处理失败
+        entity.setIsHandleSuccess(isHandleSuccess);
+        entity.setHandleMsg(handleMsg);
+        log.info("消息处理结果: {} {}", isHandleSuccess, handleMsg);
+        if (isHandleSuccess) {
+            // 处理成功,返回响应
+            return responseMessage(entity);
+        } else {
+            // 处理失败,记录数据
+            entity.setReTime(System.currentTimeMillis());
+            entity.setReTimeStr(DateUtils.paresTime(System.currentTimeMillis(), DateUtils.patternyyyySSS));
+            operationMessageDao.save(entity);
+            return ResultContent.buildFail(handleMsg);
         }
-        return ResultContent.buildSuccess();
     }
 
     /**
      * 执行
      *
      * @param entity
-     * @param dataStr
+     * @param data
      * @param iotMain
      */
-    public void executeOperationMessage(OperationMessage entity, String dataStr, IotMain iotMain) {
-        OperationMessageResult messageResult = new OperationMessageResult();
-        messageResult.setOperationMessage(entity);
-        messageResult.setIotMain(iotMain);
-        messageResult.setDeviceId(iotMain.getDeviceId());
-        messageResult.setProjectCode(iotMain.getProjectCode());
-        messageResult.setGateWayId(entity.getGateWayId());
-        messageResult.setRealIotTopic(iotMain.getRealIotTopic());
-
-        boolean isHandleSuccess = true;
-        String handleMsg = "";
-        Long handlerTime = System.currentTimeMillis();
+    public void executeOperationMessage(OperationMessage entity, Object data, IotMain iotMain) {
+        CompletableFuture.runAsync(() -> {
+            OperationMessageResult messageResult = new OperationMessageResult();
+            messageResult.setOperationMessage(entity);
+            messageResult.setIotMain(iotMain);
+            messageResult.setDeviceId(iotMain.getDeviceId());
+            messageResult.setProjectCode(iotMain.getProjectCode());
+            messageResult.setGateWayId(entity.getGateWayId());
+            messageResult.setRealIotTopic(iotMain.getRealIotTopic());
 
-        String remoteUrl = iotMain.getRemoteUrl();
-        if (StringUtils.isEmpty(remoteUrl)) {
-            isHandleSuccess = false;
-            handleMsg = "";
-        } else {
-            org.springframework.util.StopWatch stopWatch = new org.springframework.util.StopWatch();
-            stopWatch.start();
+            boolean isHandleSuccess = true;
+            String handleMsg = "处理成功";
+            Long handlerTime = System.currentTimeMillis();
 
-            APIResponseModel apiResponseModel = apiRequestService.requestAPI(remoteUrl, dataStr);
-            messageResult.setResultData(apiResponseModel);
-
-            if (apiResponseModel.isSuccess()) {
-                // 处理成功
-                String content = apiResponseModel.getContent();
-                entity.setResultData(content);
-                if (iotMain.getIsReturnData() != null && iotMain.getIsReturnData()) {
-                    // 返回
-                    JSONObject jsonObject = new JSONObject();
-                    jsonObject.put("id", entity.getDataId());
-
-                    JSONObject object = new JSONObject();
-                    if (ObjectUtils.isNotEmpty(content)) {
-                        object = JSONUtil.parseObj(content);
-                    }
-                    jsonObject.put("data", object);
-                    jsonObject.put("time", System.currentTimeMillis());
-                    jsonObject.put("ttl", entity.getTtlTime());
-                    jsonObject.put("event", entity.getEvent());
-                    jsonObject.put("deviceId", entity.getDeviceId());
-                    jsonObject.put("gateWayId", entity.getGateWayId());
-
-                    String reTopic = String.format("%s/reply", entity.getTopic());
-                    String reMsg = "响应成功";
-                    Boolean reIsSuccess = Boolean.TRUE;
-                    try {
-                        mqClient.sendObject(reTopic, JSONUtil.toJsonStr(jsonObject), entity.getDataId());
-                    } catch (Exception e) {
-                        e.printStackTrace();
-                        reIsSuccess = Boolean.FALSE;
-                        reMsg = "mqtt响应出错:" + e.getMessage();
+            String remoteUrl = iotMain.getRemoteUrl();
+            if (StringUtils.isEmpty(remoteUrl)) {
+                isHandleSuccess = false;
+                handleMsg = "业务处理地址为空";
+            } else {
+                org.springframework.util.StopWatch stopWatch = new org.springframework.util.StopWatch();
+                stopWatch.start();
+
+                APIResponseModel apiResponseModel = apiRequestService.requestAPI(remoteUrl, data);
+                messageResult.setResultData(apiResponseModel);
+                if (apiResponseModel.isSuccess()) {
+                    // 处理成功
+                    String content = apiResponseModel.getContent();
+                    entity.setResultData(content);
+                    if (iotMain.getIsReturnData() != null && iotMain.getIsReturnData()) {
+                        // 返回
+                        JSONObject jsonObject = new JSONObject();
+                        jsonObject.put("id", entity.getDataId());
+
+                        JSONObject object = new JSONObject();
+                        if (ObjectUtils.isNotEmpty(content)) {
+                            object = JSONUtil.parseObj(content);
+                        }
+                        jsonObject.put("data", object);
+                        jsonObject.put("time", System.currentTimeMillis());
+                        jsonObject.put("ttl", entity.getTtlTime());
+                        jsonObject.put("event", entity.getEvent());
+                        jsonObject.put("deviceId", entity.getDeviceId());
+                        jsonObject.put("gateWayId", entity.getGateWayId());
+
+                        String reTopic = String.format("%s/reply", entity.getTopic());
+                        String reMsg = "响应成功";
+                        Boolean reIsSuccess = Boolean.TRUE;
+                        try {
+                            mqClient.sendObject(reTopic, JSONUtil.toJsonStr(jsonObject), entity.getDataId());
+                        } catch (Exception e) {
+                            e.printStackTrace();
+                            reIsSuccess = Boolean.FALSE;
+                            reMsg = "mqtt响应出错:" + e.getMessage();
+                        }
+                        messageResult.setIsResult(Boolean.TRUE);
+                        messageResult.setReIsSuccess(reIsSuccess);
+                        messageResult.setReMsg(reMsg);
+                        messageResult.setReTime(System.currentTimeMillis());
+                        messageResult.setReTimeStr(DateUtils.paresTime(System.currentTimeMillis(), DateUtils.patternyyyySSS));
+                        messageResult.setReTopic(reTopic);
+                    } else {
+                        messageResult.setIsResult(Boolean.TRUE);
+                        messageResult.setReIsSuccess(Boolean.FALSE);
+                        messageResult.setReMsg("无需返回");
                     }
-                    messageResult.setIsResult(Boolean.TRUE);
-                    messageResult.setReIsSuccess(reIsSuccess);
-                    messageResult.setReMsg(reMsg);
-                    messageResult.setReTime(System.currentTimeMillis());
-                    messageResult.setReTimeStr(DateUtils.paresTime(System.currentTimeMillis(), DateUtils.patternyyyySSS));
-                    messageResult.setReTopic(reTopic);
                 } else {
-                    messageResult.setIsResult(Boolean.TRUE);
-                    messageResult.setReIsSuccess(Boolean.FALSE);
-                    messageResult.setReMsg("无需返回");
+                    isHandleSuccess = false;
+                    handleMsg = apiResponseModel.getMsg();
                 }
-            } else {
-                isHandleSuccess = false;
-                handleMsg = apiResponseModel.getMsg();
+                stopWatch.stop();
+                messageResult.setUseTime(stopWatch.getTotalTimeMillis());
             }
-            stopWatch.stop();
-            messageResult.setUseTime(stopWatch.getTotalTimeMillis());
-        }
-        messageResult.setIsHandleSuccess(isHandleSuccess);
-        messageResult.setHandleMsg(handleMsg);
-        messageResult.setHandlerTime(handlerTime);
-        operationMessageResultDao.save(messageResult);
+            messageResult.setIsHandleSuccess(isHandleSuccess);
+            messageResult.setHandleMsg(handleMsg);
+            messageResult.setHandlerTime(handlerTime);
+            operationMessageResultDao.save(messageResult);
+        });
     }
 
     /**