TRX 1 rok pred
rodič
commit
d18225d3a0

+ 12 - 7
src/main/java/com/zswl/dataservice/model/artemis/OperationMessageResultModel.java

@@ -1,8 +1,10 @@
 package com.zswl.dataservice.model.artemis;
 
+import cn.hutool.json.JSONObject;
 import com.zswl.dataservice.domain.iot.IotMain;
 import com.zswl.dataservice.domain.mqtt.OperationMessage;
 import com.zswl.dataservice.model.baseParam.SuperModel;
+import com.zswl.dataservice.model.iot.IotMainModel;
 import io.swagger.v3.oas.annotations.media.Schema;
 import lombok.AllArgsConstructor;
 import lombok.Data;
@@ -17,6 +19,8 @@ import org.springframework.data.mongodb.core.mapping.DBRef;
 @AllArgsConstructor
 @NoArgsConstructor
 public class OperationMessageResultModel extends SuperModel {
+
+    //---------------------消息相关信息 start------------------
     @Schema(description = "设备ID")
     private String deviceId;
 
@@ -30,7 +34,7 @@ public class OperationMessageResultModel extends SuperModel {
     private OperationMessageModel operationMessage;
 
     @Schema(description = "关联的事件")
-    private IotMain iotMain;
+    private IotMainModel iotMain;
 
     @Schema(description = "topic")
     private String realIotTopic;
@@ -49,15 +53,16 @@ public class OperationMessageResultModel extends SuperModel {
     @Schema(description = "处理结果")
     private String handleMsg;
 
-    @Schema(description = "业务处理的bean")
-    private String beanName;
-
-    @Schema(description = "业务处理的方法")
-    private String methodName;
-
     @Schema(description = "处理时间")
     private Long handlerTime;
 
+    //--------------------返回数据 start(就是把结果发送给响应网关)-------------
+    @Schema(description = "是否需要返回")
+    private Boolean isNeedReplay = Boolean.TRUE;
+
+    @Schema(description = "replay数据")
+    private JSONObject replayData;
+
     //--------------------返回数据 start(就是把结果发送给响应网关)-------------
     @Schema(description = "是否响应成功")
     private Boolean isResult = Boolean.FALSE;

+ 9 - 2
src/main/java/com/zswl/dataservice/service/artemis/ArtemisListenerService.java

@@ -39,11 +39,18 @@ public class ArtemisListenerService {
     // 网关来的消息
     @JmsListener(destination = "/v1/gateway/#", containerFactory = MQConstant.TopicListenerContainerFactory)
     @JmsListener(destination = ".v1.gateway.#", containerFactory = MQConstant.TopicListenerContainerFactory)
-    public void receiveMessage(Message message) {
+    public void receiveGateWayMessage(Message message) {
         operationMessageService.handlerGateWayMessage(message);
     }
 
-    // 网关来的消息
+    // 设备来的消息
+    @JmsListener(destination = "/v1/device/#", containerFactory = MQConstant.TopicListenerContainerFactory)
+    @JmsListener(destination = ".v1.device.#", containerFactory = MQConstant.TopicListenerContainerFactory)
+    public void receiveDeviceMessage(Message message) {
+        operationMessageService.handlerGateWayMessage(message);
+    }
+
+    // 终端连接的消息
     @JmsListener(destination = "$sys.mqtt.sessions", containerFactory = MQConstant.TopicListenerContainerFactory)
     public void receiveOnLineChangeMessage(Message message, Session session) {
         try {

+ 22 - 20
src/main/java/com/zswl/dataservice/service/artemis/OperationMessageService.java

@@ -68,23 +68,20 @@ import java.util.concurrent.CompletableFuture;
 public class OperationMessageService {
 
     @Autowired
-    OperationMessageDao operationMessageDao;
+    private OperationMessageDao operationMessageDao;
 
     @Autowired
-    DeviceInfoService deviceInfoService;
+    private DeviceInfoService deviceInfoService;
 
     @Autowired
-    GateWayInfoService gateWayInfoService;
+    private GateWayInfoService gateWayInfoService;
 
     @Autowired
-    MQClient mqClient;
+    private MQClient mqClient;
 
     @Autowired
-    DeviceInfoDao deviceInfoDao;
-
-    @Autowired
-    HxzService hxzService;
-
+    private DeviceInfoDao deviceInfoDao;
+    
     // 保存90天
     @Value("${artemisstore.time}")
     private Long ttlMill = 30 * 24L * 60 * 60 * 1000L;
@@ -96,16 +93,16 @@ public class OperationMessageService {
     private ApplicationContext applicationContext;
 
     @Autowired
-    IotMainDao iotMainDao;
+    private IotMainDao iotMainDao;
 
     @Autowired
-    OperationMessageResultDao operationMessageResultDao;
+    private OperationMessageResultDao operationMessageResultDao;
 
     @Autowired
-    ApiRequestService apiRequestService;
+    private ApiRequestService apiRequestService;
 
     @Autowired
-    IotDataVerifyService iotDataVerifyService;
+    private IotDataVerifyService iotDataVerifyService;
 
     /**
      * 发送指令
@@ -190,6 +187,7 @@ public class OperationMessageService {
      */
     public void handlerGateWayMessage(Message message) {
         // 处理接收到的消息
+        String msg = "";
         try {
             ActiveMQTopic activeMQTopic = (ActiveMQTopic) message.getJMSDestination();
             String topicName = activeMQTopic.getTopicName();
@@ -201,11 +199,9 @@ public class OperationMessageService {
 //            log.info("receiveMessage {} 消息监听clientId: {}", messageId, clientId);
             if (StringUtils.isNotEmpty(topicName) && topicName.endsWith("reply")) {
                 // 这是响应的数据,不处理
-//                log.warn("回复消息不处理");
                 return;
             }
             String messageClass = "";
-            String msg = "";
             if (message instanceof ActiveMQBytesMessage) {
                 ActiveMQBytesMessage activeMQBytesMessage = (ActiveMQBytesMessage) message;
                 messageClass = ActiveMQBytesMessage.class.getSimpleName();
@@ -234,6 +230,9 @@ public class OperationMessageService {
             String gateWayId = jsonObject.getStr("gatewayId");
             String deviceId = jsonObject.getStr("deviceId");
             boolean isTimeOut = false;
+            if (time == null) {
+                time = System.currentTimeMillis();
+            }
             if (System.currentTimeMillis() > (time + ttl)) {
                 isTimeOut = true;
             }
@@ -242,7 +241,7 @@ public class OperationMessageService {
             String timeStr = DateUtils.paresTime(time, DateUtils.patternyyyySSS);
             jsonObject.set("timeStr", timeStr);
 
-            // --------------------处理业务 start------------------
+            // --------------------消息处理 start------------------
             OperationMessage operationMessage = new OperationMessage();
             operationMessage.setMessageId(messageId); // 消息ID
             operationMessage.setClientId(clientId); // 终端ID
@@ -261,6 +260,7 @@ public class OperationMessageService {
             }
             initAddOperationMessage(operationMessage);
         } catch (Exception e) {
+            log.error(msg);
             e.printStackTrace();
         }
     }
@@ -298,7 +298,7 @@ public class OperationMessageService {
         // 消息的过期时间
         entity.setTtl(new Date(System.currentTimeMillis() + ttlMill));
         boolean isTimeOut = entity.getIsTimeOut();
-        isTimeOut = true;
+        isTimeOut = false;
         if (entity.getEvent().equals("ping")) {
             Date da = new Date(System.currentTimeMillis() + 10 * 60 * 1000);
             entity.setTtl(da);
@@ -307,7 +307,7 @@ public class OperationMessageService {
             entity.setHandleMsg("超时不处理");
         }
         // TODO
-        if (isTimeOut) {
+        if (!isTimeOut) {
             // 未超时,处理消息
             handleOperationMessage(entity);
         } else {
@@ -470,6 +470,8 @@ public class OperationMessageService {
                     // 参数验证成功
                     org.springframework.util.StopWatch stopWatch = new org.springframework.util.StopWatch();
                     stopWatch.start();
+
+                    // 请求业务端处理
                     APIResponseModel apiResponseModel = apiRequestService.requestAPI(remoteUrl, data);
                     messageResult.setResultData(apiResponseModel);
                     if (apiResponseModel.isSuccess()) {
@@ -479,7 +481,6 @@ public class OperationMessageService {
                         if (ObjectUtils.isNotEmpty(content)) {
                             object = JSONUtil.parseObj(content);
                         }
-                        messageResult.setResultData(content);
                         messageResult.setReplayData(object);
                         isHandleSuccess = true;
                         if (iotMain.getIsReturnData() != null && iotMain.getIsReturnData()) {
@@ -498,13 +499,14 @@ public class OperationMessageService {
                     stopWatch.stop();
                     messageResult.setUseTime(stopWatch.getTotalTimeMillis());
                 } else {
+                    // 参数验证失败
                     isHandleSuccess = false;
                     isNeedReplay = Boolean.TRUE;
                     handleMsg = verifyContent.getMsg();
                 }
             }
 
-            if(!isHandleSuccess) {
+            if (!isHandleSuccess) {
                 handleError.setFailed(handleMsg);
                 JSONObject replayData = JSONUtil.parseObj(handleError);
                 messageResult.setReplayData(replayData);