TRX 1 éve
szülő
commit
4bcef9fc79

+ 2 - 0
src/main/java/com/zswl/dataservice/dao/mqtt/DevicePingInfoDao.java

@@ -21,4 +21,6 @@ public interface DevicePingInfoDao extends MongoDao<DevicePingInfo> {
 
     List<DevicePingInfo> findTop5ByDeviceIdOrderByCreateTimeDesc(String deviceId);
 
+    List<DevicePingInfo> findTop5ByGateWayIdAndPingTypeOrderByCreateTimeDesc(String gateWayId, String pingType);
+
 }

+ 1 - 1
src/main/java/com/zswl/dataservice/domain/mqtt/DevicePingInfo.java

@@ -29,7 +29,7 @@ public class DevicePingInfo extends SuperEntity {
     @Schema(description = "网关ID")
     private String gateWayId;
 
-    @Schema(description = "ping类型")
+    @Schema(description = "ping类型 DeviceType Consumer:消费机 GateWay:网关")
     private String pingType;
 
     @Schema(description = "项目code")

+ 13 - 9
src/main/java/com/zswl/dataservice/service/base/CommonService.java

@@ -3,6 +3,7 @@ package com.zswl.dataservice.service.base;
 import com.mongodb.client.result.UpdateResult;
 import com.zswl.dataservice.utils.CommonUtil;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.data.mongodb.core.MongoTemplate;
 import org.springframework.data.mongodb.core.query.Criteria;
@@ -33,15 +34,18 @@ public class CommonService {
     public Object updateData(Map<String, Object> standardData, String collectionName) {
         collectionName = CommonUtil.getCollectionName(collectionName);
         Object id = standardData.get("id");
-        Query query = new Query(Criteria.where("_id").is(id));
-        Update update = new Update();
-        standardData.forEach((key, value) -> {
-            if (!"id".equals(key)) {
-                update.set(key, value);
-            }
-        });
-        UpdateResult updateResult = mongoTemplate.upsert(query, update, collectionName);
-        return updateResult.getUpsertedId();
+        if (id != null) {
+            Query query = new Query(Criteria.where("_id").is(id));
+            Update update = new Update();
+            standardData.forEach((key, value) -> {
+                if (!"id".equals(key)) {
+                    update.set(key, value);
+                }
+            });
+            UpdateResult updateResult = mongoTemplate.upsert(query, update, collectionName);
+            return updateResult.getUpsertedId();
+        }
+        return null;
     }
 
     public Object updateData(Map<String, Object> where, Map<String, Object> standardData, String collectionName) {

+ 36 - 1
src/main/java/com/zswl/dataservice/service/mqtt/DevicePingInfoService.java

@@ -57,9 +57,10 @@ public class DevicePingInfoService extends SuperService {
     @Autowired
     GateWayInfoDao gateWayInfoDao;
 
+    // ping记录保存时间
     private Long pingTTl = 1 * 24L * 60 * 60 * 1000L;
 
-    // 5分钟
+    //最大的心跳时间间隔 5分钟
     private Long maxPingTime = 5 * 60 * 1000L;
 
     @ExecuteAnnotationServiceMethod(value = "devicePing", remark = "设备心跳")
@@ -162,4 +163,38 @@ public class DevicePingInfoService extends SuperService {
         }
     }
 
+    public void checkGateWayState() {
+        List<GateWayInfo> list = gateWayInfoDao.findAll();
+        if (ObjectUtils.isNotEmpty(list)) {
+            long time = System.currentTimeMillis();
+            list.parallelStream().forEach(gateWayInfo -> {
+                OnLineState onLineState = OnLineState.OnLine;
+                List<DevicePingInfo> _list = devicePingInfoDao.findTop5ByGateWayIdAndPingTypeOrderByCreateTimeDesc(
+                        gateWayInfo.getGateWayId(), DeviceType.GateWay.name()
+                );
+                if (_list != null && _list.size() > 2) {
+                    Long firstTime = _list.get(0).getCreateTime();
+                    Long lastTime = _list.get(_list.size() - 1).getCreateTime();
+                    // 2 次心跳间隔时间
+                    long avgTime = ((firstTime - lastTime) / (_list.size() - 1)) * 2;
+                    if (avgTime > maxPingTime) {
+                        avgTime = maxPingTime;
+                    }
+                    // 根据ping数据判断是否在线
+                    if ((time - firstTime) > avgTime) {
+                        onLineState = OnLineState.OffLine;
+                    }
+                }
+
+                if (onLineState != gateWayInfo.getOnLineState()) {
+                    log.info("网关在线状态改变:{} {}", gateWayInfo.getGateWayName(), onLineState);
+                    Map<String, Object> standardData = new HashMap<String, Object>();
+                    standardData.put("id", gateWayInfo.getId());
+                    standardData.put("onLineState", onLineState);
+                    commonService.updateData(standardData, GateWayInfo.class.getSimpleName());
+                }
+            });
+        }
+    }
+
 }

+ 6 - 10
src/main/java/com/zswl/dataservice/service/mqtt/GateWayInfoService.java

@@ -5,6 +5,7 @@ import com.zswl.dataservice.domain.mqtt.*;
 import com.zswl.dataservice.model.mqtt.*;
 import com.zswl.dataservice.service.base.SuperService;
 import com.zswl.dataservice.service.sync.DeviceSyncFullCardService;
+import com.zswl.dataservice.type.DeviceType;
 import com.zswl.dataservice.utils.DateUtils;
 import com.zswl.dataservice.utils.bean.BeanUtils;
 import com.zswl.dataservice.utils.mqtt.MqttTopicUtils;
@@ -30,7 +31,9 @@ import javax.management.remote.JMXConnector;
 import javax.management.remote.JMXConnectorFactory;
 import javax.management.remote.JMXServiceURL;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 /**
  * @author TRX
@@ -76,6 +79,9 @@ public class GateWayInfoService extends SuperService {
     @Autowired
     ProjectInfoService projectInfoService;
 
+    @Autowired
+    DevicePingInfoDao devicePingInfoDao;
+
     /**
      * 添加网关
      *
@@ -330,14 +336,4 @@ public class GateWayInfoService extends SuperService {
         return ResultContent.buildSuccess();
     }
 
-    /**
-     * 在线状态改变
-     *
-     * @param gateWayId
-     * @param onLineState
-     */
-    public void stateChange(String gateWayId, OnLineState onLineState) {
-
-    }
-
 }

+ 2 - 0
src/main/java/com/zswl/dataservice/work/CheckDeviceStateWork.java

@@ -1,6 +1,7 @@
 package com.zswl.dataservice.work;
 
 import com.zswl.dataservice.service.mqtt.DevicePingInfoService;
+import com.zswl.dataservice.service.mqtt.GateWayInfoService;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.scheduling.annotation.EnableScheduling;
@@ -24,5 +25,6 @@ public class CheckDeviceStateWork {
     @Scheduled(fixedRate = 1000 * 10)
     public void checkDeviceState() {
         devicePingInfoService.checkDeviceState();
+        devicePingInfoService.checkGateWayState();
     }
 }