TRX 1 vuosi sitten
vanhempi
commit
d9ebca9dfc

+ 2 - 0
src/main/java/com/zswl/dataservice/dao/mqtt/OperationMessageDao.java

@@ -16,6 +16,8 @@ public interface OperationMessageDao extends MongoDao<OperationMessage>, Operati
 
     OperationMessage findTopById(String id);
 
+    OperationMessage findTopByDataId(String dataId);
+
     // 业务数据是否存在
     boolean existsByDataId(String dataId);
 }

+ 7 - 0
src/main/java/com/zswl/dataservice/dao/mqtt/extend/OperationMessageDaoExtend.java

@@ -12,4 +12,11 @@ import org.springframework.data.domain.Pageable;
  */
 public interface OperationMessageDaoExtend {
     Page<OperationMessage> page(Pageable pageable, OperationMessageSearchParam param);
+
+    OperationMessage init(String dataId, String token);
+
+    String acquire(String dataId, Long expiration);
+
+    boolean release(String dataId);
+
 }

+ 61 - 0
src/main/java/com/zswl/dataservice/dao/mqtt/impl/OperationMessageDaoImpl.java

@@ -6,14 +6,19 @@ import com.zswl.dataservice.domain.mqtt.OperationMessage;
 import com.zswl.dataservice.helper.DBHelper;
 import com.zswl.dataservice.model.artemis.OperationMessageSearchParam;
 import com.zswl.dataservice.utils.CommonUtil;
+import com.zswl.dataservice.utils.TokenUtil;
 import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.data.domain.Page;
 import org.springframework.data.domain.Pageable;
 import org.springframework.data.domain.Sort;
+import org.springframework.data.mongodb.core.FindAndModifyOptions;
 import org.springframework.data.mongodb.core.MongoTemplate;
 import org.springframework.data.mongodb.core.query.Criteria;
 import org.springframework.data.mongodb.core.query.Query;
+import org.springframework.data.mongodb.core.query.Update;
 import org.springframework.util.CollectionUtils;
 
 import java.util.ArrayList;
@@ -27,6 +32,7 @@ import java.util.regex.Pattern;
  */
 public class OperationMessageDaoImpl extends BaseImpl implements OperationMessageDaoExtend {
 
+    private static final Logger log = LoggerFactory.getLogger(OperationMessageDaoImpl.class);
     @Autowired
     private MongoTemplate mongoTemplate;
 
@@ -75,4 +81,59 @@ public class OperationMessageDaoImpl extends BaseImpl implements OperationMessag
         return dbHelper.pages(query, pageable, OperationMessage.class);
     }
 
+    public OperationMessage init(String dataId, String token) {
+        OperationMessage doc = null;
+        try {
+            Query query = Query.query(Criteria.where("dataId").is(dataId).and("token").isNull());
+            Update update = new Update()
+                    .set("token", token)
+                ;
+            FindAndModifyOptions options = new FindAndModifyOptions().upsert(true)
+                    .returnNew(true);
+            doc = mongoTemplate.findAndModify(query, update, options,
+                    OperationMessage.class);
+        } catch (Exception e) {
+            log.error("init {}", e.getMessage());
+        }
+        return doc;
+    }
+
+    public String acquire(String dataId, Long expiration) {
+        Query query = Query.query(Criteria.where("dataId").is(dataId).and("token").isNull());
+        String token = TokenUtil.create();
+        Update update = new Update()
+                .set("expireAt", System.currentTimeMillis() + expiration)
+                .set("token", token);
+
+        FindAndModifyOptions options = new FindAndModifyOptions().upsert(false)
+                .returnNew(true);
+
+        OperationMessage doc = mongoTemplate.findAndModify(query, update, options,
+                OperationMessage.class);
+
+        if (doc == null) {
+            OperationMessage lockObj = mongoTemplate.findOne(Query.query(Criteria.where("dataId").is(dataId)), OperationMessage.class);
+            doc = lockObj;
+        }
+        boolean locked = doc != null && doc.getToken() != null && doc.getToken().equals(token);
+        // 如果已过期
+        if (!locked && doc != null && doc.getExpireAt() < System.currentTimeMillis()) {
+            release(dataId);
+            // 成功释放锁, 再次尝试获取锁
+            return this.acquire(dataId, expiration);
+        }
+        return locked ? token : null;
+    }
+
+    public boolean release(String dataId) {
+        Query releaseQuery = Query.query(Criteria.where("dataId").is(dataId));
+        Update releaseUpdate = new Update().set("expireAt", null).set("token", null);
+
+        FindAndModifyOptions releaseOptions = new FindAndModifyOptions().upsert(true)
+                .returnNew(true);
+        OperationMessage flowDisposition = mongoTemplate.findAndModify(releaseQuery, releaseUpdate, releaseOptions,
+                OperationMessage.class);
+        return StringUtils.isEmpty(flowDisposition.getToken());
+    }
+
 }

+ 5 - 0
src/main/java/com/zswl/dataservice/domain/mqtt/OperationMessage.java

@@ -32,8 +32,13 @@ public class OperationMessage extends SuperEntity {
     private String clientId;
 
     @Schema(description = "业务生成的ID")
+    @Indexed(unique = true, sparse = true)
     private String dataId;
 
+    private String token;
+
+    private Long expireAt;
+
     @Schema(description = "消息内容")
     private Object data;
 

+ 24 - 6
src/main/java/com/zswl/dataservice/service/artemis/OperationMessageService.java

@@ -19,6 +19,7 @@ import com.zswl.dataservice.service.mqtt.GateWayInfoService;
 import com.zswl.dataservice.service.payment.HxzService;
 import com.zswl.dataservice.type.OperationType;
 import com.zswl.dataservice.utils.DateUtils;
+import com.zswl.dataservice.utils.TokenUtil;
 import com.zswl.dataservice.utils.bean.BeanUtils;
 import com.zswl.dataservice.utils.mqtt.MqttTopicUtils;
 import com.zswl.dataservice.utils.mqtt.mqttConfig.client.MQClient;
@@ -179,7 +180,6 @@ public class OperationMessageService {
                 log.warn("回复消息不处理");
                 return;
             }
-
             OperationMessage operationMessage = new OperationMessage();
             operationMessage.setMessageId(messageId);
             operationMessage.setClientId(clientId);
@@ -224,12 +224,32 @@ public class OperationMessageService {
             operationMessage.setIsTimeOut(isTimeOut);
             operationMessage.setEvent(event);
             operationMessage.setGateWayId(gateWayId);
-            addOperationMessage(operationMessage);
+            initAddOperationMessage(operationMessage);
         } catch (Exception e) {
             e.printStackTrace();
         }
     }
 
+    private ResultContent initAddOperationMessage(OperationMessage entity) {
+        // 判断DataId 是否存在,存在就不处理
+        String dataId = entity.getDataId();
+        try {
+            String token = TokenUtil.create();
+            OperationMessage temp = operationMessageDao.init(dataId, token);
+            if (ObjectUtils.isNotEmpty(temp)) {
+                log.info("获取到执行锁,开始执行");
+                entity.setId(temp.getId());
+                entity.setToken(temp.getToken());
+                addOperationMessage(entity);
+            } else {
+                log.warn("未获取到执行锁,跳过执行");
+            }
+        } catch (Exception e) {
+            log.error("错误: {}", e.getMessage());
+        }
+        return ResultContent.buildSuccess();
+    }
+
     /**
      * 添加
      *
@@ -244,9 +264,6 @@ public class OperationMessageService {
         }
         boolean isTimeOut = entity.getIsTimeOut();
         isTimeOut = true;
-        // 判断业务数据ID是否存在,不要重复处理
-        boolean isExit = operationMessageDao.existsByDataId(entity.getDataId());
-
         // TODO
         if (isTimeOut) {
             // 未超时,处理消息
@@ -274,12 +291,13 @@ public class OperationMessageService {
             boolean isHandleSuccess = true;
             String handleMsg = "处理成功";
             JSONObject jsonObject1 = (JSONObject) json.get("data");
+            jsonObject1.put("mqttDataId", entity.getDataId());
             String DeviceId = jsonObject1.getStr("DeviceId");
             entity.setDeviceId(DeviceId);
             try {
                 ExecuteMethodInfo executeMethodInfo = executeMethodInfoDao.findTopByEvent(event);
                 if (ObjectUtils.isNotEmpty(executeMethodInfo)) {
-                    String dataStr = json.getStr("data");
+                    String dataStr = JSONUtil.toJsonStr(jsonObject1);
 
                     String beanName = executeMethodInfo.getBeanName();
                     String methodName = executeMethodInfo.getMethodName();

+ 57 - 0
src/main/java/com/zswl/dataservice/utils/TokenUtil.java

@@ -0,0 +1,57 @@
+package com.zswl.dataservice.utils;
+
+import com.zswl.dataservice.utils.encode.HashUtil;
+
+import java.util.UUID;
+
+public class TokenUtil {
+
+    /**
+     * 创建令牌
+     *
+     * @return
+     */
+    public static String create() {
+        return UUID.randomUUID().toString().replaceAll("-", "");
+    }
+
+
+    /**
+     * 创建令牌并将UUID的值Hash
+     *
+     * @return
+     */
+    public static String createAndHash() {
+        return HashUtil.hash(create());
+    }
+
+
+    /**
+     * 创建令牌,长度
+     *
+     * @param size
+     * @return
+     */
+    public static String create(int size) {
+        String ret = createAndHash();
+        ret = limitMaxText(ret, size);
+        while (ret.length() < size) {
+            ret += createAndHash();
+        }
+        return limitMaxText(ret, size);
+    }
+
+
+    /**
+     * 限制最大文本
+     *
+     * @param text
+     * @param maxCount
+     * @return
+     */
+    private static String limitMaxText(String text, int maxCount) {
+        return text.length() > maxCount ? text.substring(0, maxCount) : text;
+    }
+
+
+}

+ 22 - 0
src/main/java/com/zswl/dataservice/utils/encode/HashUtil.java

@@ -0,0 +1,22 @@
+package com.zswl.dataservice.utils.encode;
+
+import org.springframework.util.DigestUtils;
+
+public class HashUtil {
+
+    /**
+     * 多文本hash工具
+     *
+     * @param texts
+     * @return
+     */
+    public static String hash(String... texts) {
+        StringBuilder sb = new StringBuilder();
+        for (String text : texts) {
+            sb.append(text + "_");
+        }
+        return DigestUtils.md5DigestAsHex(sb.toString().getBytes());
+    }
+
+
+}