Bladeren bron

机构类型

TRX 1 jaar geleden
bovenliggende
commit
7989cc6704

+ 3 - 0
src/main/java/com/zswl/dataservice/dao/mqtt/OperationMessageDao.java

@@ -15,4 +15,7 @@ public interface OperationMessageDao extends MongoDao<OperationMessage>, Operati
     OperationMessage findTopByMessageId(String messageId);
 
     OperationMessage findTopById(String id);
+
+    // 业务数据是否存在
+    boolean existsByDataId(String dataId);
 }

+ 5 - 3
src/main/java/com/zswl/dataservice/service/artemis/OperationMessageService.java

@@ -169,17 +169,17 @@ public class OperationMessageService {
             if (topicName.equals("activemq.notifications")) {
                 return;
             }
-            OperationMessage operationMessage = new OperationMessage();
-
             String messageId = message.getJMSMessageID();
             String clientId = message.getStringProperty("__AMQ_CID");
             log.info("receiveMessage {} 消息监听clientId: {}", messageId, clientId);
             log.info("Topic: {}", topicName);
             if (StringUtils.isNotEmpty(topicName) && topicName.endsWith("reply")) {
                 // 这是响应的数据,不处理
+                log.warn("回复消息不处理");
                 return;
             }
 
+            OperationMessage operationMessage = new OperationMessage();
             operationMessage.setMessageId(messageId);
             operationMessage.setClientId(clientId);
             operationMessage.setTopic(topicName);
@@ -212,6 +212,7 @@ public class OperationMessageService {
             log.info("textMessage: {}", msg);
             String timeStr = DateUtils.paresTime(time, DateUtils.patternyyyySSS);
             jsonObject.set("timeStr", timeStr);
+
             // --------------------处理业务 start------------------
             operationMessage.setMessageClass(messageClass);
             operationMessage.setData(jsonObject);
@@ -220,7 +221,6 @@ public class OperationMessageService {
             operationMessage.setDataId(id);
             operationMessage.setIsTimeOut(isTimeOut);
             operationMessage.setEvent(event);
-
             addOperationMessage(operationMessage);
         } catch (Exception e) {
             e.printStackTrace();
@@ -241,6 +241,8 @@ public class OperationMessageService {
         }
         boolean isTimeOut = entity.getIsTimeOut();
         isTimeOut = true;
+        // 判断业务数据ID是否存在,不要重复处理
+        boolean isExit = operationMessageDao.existsByDataId(entity.getDataId());
 
         // TODO
         if (isTimeOut) {

+ 0 - 1
src/main/java/com/zswl/dataservice/utils/mqtt/mqttConfig/config/ConsumerConfiguration.java

@@ -73,7 +73,6 @@ public class ConsumerConfiguration {
         jmsTemplate.setConnectionFactory(connectionFactory());
         //setting PuSubDomain to true configures JmsTemplate to work with topics instead of queues
         jmsTemplate.setPubSubDomain(true);
-        jmsTemplate.setDeliveryMode(1);
         return jmsTemplate;
     }