소스 검색

机构类型

TRX 1 년 전
부모
커밋
bcb1509067

+ 1 - 63
src/main/java/com/zswl/dataservice/service/artemis/ArtemisService.java

@@ -52,69 +52,7 @@ public class ArtemisService extends SuperService {
     @JmsListener(destination = "/v1/gateway/#", containerFactory = MQConstant.TopicListenerContainerFactory)
     @JmsListener(destination = ".v1.gateway.#", containerFactory = MQConstant.TopicListenerContainerFactory)
     public void receiveMessage(Message message) {
-        // 处理接收到的消息
-        try {
-            ActiveMQTopic activeMQTopic = (ActiveMQTopic) message.getJMSDestination();
-            String topicName = activeMQTopic.getTopicName();
-            if (topicName.equals("activemq.notifications")) {
-                return;
-            }
-            OperationMessage operationMessage = new OperationMessage();
-
-            String messageId = message.getJMSMessageID();
-            String clientId = message.getStringProperty("__AMQ_CID");
-            log.info("receiveMessage {} 消息监听clientId: {}", messageId, clientId);
-            log.info("Topic: {}", topicName);
-            if (StringUtils.isNotEmpty(topicName) && topicName.endsWith("reply")) {
-                // 这是响应的数据,不处理
-                return;
-            }
-
-            operationMessage.setMessageId(messageId);
-            operationMessage.setClientId(clientId);
-            operationMessage.setTopic(topicName);
-            operationMessage.setOperationType(OperationType.Sub);
-
-            String messageClass = "";
-            String msg = "";
-            if (message instanceof ActiveMQBytesMessage) {
-                ActiveMQBytesMessage activeMQBytesMessage = (ActiveMQBytesMessage) message;
-                messageClass = ActiveMQBytesMessage.class.getSimpleName();
-                byte[] messageBody = new byte[(int) activeMQBytesMessage.getBodyLength()];
-                // 读取消息内容到字节数组
-                activeMQBytesMessage.readBytes(messageBody);
-                msg = new String(messageBody);
-            }
-            if (message instanceof TextMessage) {
-                messageClass = TextMessage.class.getSimpleName();
-                TextMessage textMessage = (TextMessage) message;
-                msg = textMessage.getText();
-            }
-            cn.hutool.json.JSONObject jsonObject = JSONUtil.parseObj(msg);
-            String id = jsonObject.getStr("id");
-            Long time = jsonObject.getLong("time");
-            Long ttl = jsonObject.getLong("ttl");
-            String event = jsonObject.getStr("event");
-            boolean isTimeOut = false;
-            if (System.currentTimeMillis() > (time + ttl)) {
-                isTimeOut = true;
-            }
-            log.info("textMessage: {}", msg);
-            String timeStr = DateUtils.paresTime(time, DateUtils.patternyyyySSS);
-            jsonObject.set("timeStr", timeStr);
-            // --------------------处理业务 start------------------
-            operationMessage.setMessageClass(messageClass);
-            operationMessage.setData(jsonObject);
-            operationMessage.setTtlTime(ttl);
-            operationMessage.setSendTime(time);
-            operationMessage.setDataId(id);
-            operationMessage.setIsTimeOut(isTimeOut);
-            operationMessage.setEvent(event);
-
-            operationMessageService.addOperationMessage(operationMessage);
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
+        operationMessageService.handlerGateWayMessage(message);
     }
 
     /**

+ 79 - 0
src/main/java/com/zswl/dataservice/service/artemis/OperationMessageService.java

@@ -16,19 +16,26 @@ import com.zswl.dataservice.service.base.SuperService;
 import com.zswl.dataservice.service.mqtt.DeviceInfoService;
 import com.zswl.dataservice.service.mqtt.GateWayInfoService;
 import com.zswl.dataservice.service.payment.HxzService;
+import com.zswl.dataservice.type.OperationType;
 import com.zswl.dataservice.utils.DateUtils;
 import com.zswl.dataservice.utils.bean.BeanUtils;
 import com.zswl.dataservice.utils.mqtt.MqttTopicUtils;
 import com.zswl.dataservice.utils.mqtt.mqttConfig.client.MQClient;
 import com.zswl.dataservice.utils.page.PageEntityUtil;
 import com.zswl.dataservice.utils.result.ResultContent;
+import jakarta.jms.Message;
+import jakarta.jms.TextMessage;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.activemq.artemis.jms.client.ActiveMQBytesMessage;
+import org.apache.activemq.artemis.jms.client.ActiveMQTopic;
 import org.apache.commons.lang3.ObjectUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.context.ApplicationContext;
 import org.springframework.data.domain.Page;
 import org.springframework.data.domain.Pageable;
+import org.springframework.scheduling.annotation.Async;
 import org.springframework.stereotype.Service;
 
 import java.lang.reflect.Method;
@@ -124,6 +131,78 @@ public class OperationMessageService {
         return ResultContent.buildSuccess();
     }
 
+    /**
+     * 处理网关MQTT信息
+     *
+     * @param message
+     */
+    @Async
+    public void handlerGateWayMessage(Message message) {
+        // 处理接收到的消息
+        try {
+            ActiveMQTopic activeMQTopic = (ActiveMQTopic) message.getJMSDestination();
+            String topicName = activeMQTopic.getTopicName();
+            if (topicName.equals("activemq.notifications")) {
+                return;
+            }
+            OperationMessage operationMessage = new OperationMessage();
+
+            String messageId = message.getJMSMessageID();
+            String clientId = message.getStringProperty("__AMQ_CID");
+            log.info("receiveMessage {} 消息监听clientId: {}", messageId, clientId);
+            log.info("Topic: {}", topicName);
+            if (StringUtils.isNotEmpty(topicName) && topicName.endsWith("reply")) {
+                // 这是响应的数据,不处理
+                return;
+            }
+
+            operationMessage.setMessageId(messageId);
+            operationMessage.setClientId(clientId);
+            operationMessage.setTopic(topicName);
+            operationMessage.setOperationType(OperationType.Sub);
+
+            String messageClass = "";
+            String msg = "";
+            if (message instanceof ActiveMQBytesMessage) {
+                ActiveMQBytesMessage activeMQBytesMessage = (ActiveMQBytesMessage) message;
+                messageClass = ActiveMQBytesMessage.class.getSimpleName();
+                byte[] messageBody = new byte[(int) activeMQBytesMessage.getBodyLength()];
+                // 读取消息内容到字节数组
+                activeMQBytesMessage.readBytes(messageBody);
+                msg = new String(messageBody);
+            }
+            if (message instanceof TextMessage) {
+                messageClass = TextMessage.class.getSimpleName();
+                TextMessage textMessage = (TextMessage) message;
+                msg = textMessage.getText();
+            }
+            cn.hutool.json.JSONObject jsonObject = JSONUtil.parseObj(msg);
+            String id = jsonObject.getStr("id");
+            Long time = jsonObject.getLong("time");
+            Long ttl = jsonObject.getLong("ttl");
+            String event = jsonObject.getStr("event");
+            boolean isTimeOut = false;
+            if (System.currentTimeMillis() > (time + ttl)) {
+                isTimeOut = true;
+            }
+            log.info("textMessage: {}", msg);
+            String timeStr = DateUtils.paresTime(time, DateUtils.patternyyyySSS);
+            jsonObject.set("timeStr", timeStr);
+            // --------------------处理业务 start------------------
+            operationMessage.setMessageClass(messageClass);
+            operationMessage.setData(jsonObject);
+            operationMessage.setTtlTime(ttl);
+            operationMessage.setSendTime(time);
+            operationMessage.setDataId(id);
+            operationMessage.setIsTimeOut(isTimeOut);
+            operationMessage.setEvent(event);
+
+            addOperationMessage(operationMessage);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
     /**
      * 添加
      *