TRX há 1 ano atrás
pai
commit
1893061518

+ 10 - 1
src/main/java/com/zswl/dataservice/service/artemis/OperationMessageService.java

@@ -207,11 +207,12 @@ public class OperationMessageService {
             Long ttl = jsonObject.getLong("ttl");
             String event = jsonObject.getStr("event");
             String gateWayId = jsonObject.getStr("gatewayId");
+            String deviceId = jsonObject.getStr("deviceId");
             boolean isTimeOut = false;
             if (System.currentTimeMillis() > (time + ttl)) {
                 isTimeOut = true;
             }
-//            log.info("textMessage: {}", msg);
+            log.info("textMessage: {}", msg);
 
             String timeStr = DateUtils.paresTime(time, DateUtils.patternyyyySSS);
             jsonObject.set("timeStr", timeStr);
@@ -225,6 +226,9 @@ public class OperationMessageService {
             operationMessage.setIsTimeOut(isTimeOut);
             operationMessage.setEvent(event);
             operationMessage.setGateWayId(gateWayId);
+            if (ObjectUtils.isNotEmpty(deviceId)) {
+                operationMessage.setDeviceId(deviceId);
+            }
             initAddOperationMessage(operationMessage);
         } catch (Exception e) {
             e.printStackTrace();
@@ -294,7 +298,12 @@ public class OperationMessageService {
             JSONObject jsonObject1 = (JSONObject) json.get("data");
             jsonObject1.put("mqttDataId", entity.getDataId());
             jsonObject1.put("GateWayId", entity.getGateWayId());
+
             String DeviceId = jsonObject1.getStr("DeviceId");
+            if (StringUtils.isEmpty(DeviceId)) {
+                DeviceId = entity.getDeviceId();
+                jsonObject1.put("DeviceId", DeviceId);
+            }
             entity.setDeviceId(DeviceId);
             try {
                 ExecuteMethodInfo executeMethodInfo = executeMethodInfoDao.findTopByEvent(event);

+ 36 - 23
src/main/java/com/zswl/dataservice/service/mqtt/DevicePingInfoService.java

@@ -24,6 +24,7 @@ import com.zswl.dataservice.utils.mqtt.type.OnLineState;
 import com.zswl.dataservice.utils.result.ResultContent;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.ObjectUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
@@ -56,6 +57,11 @@ public class DevicePingInfoService extends SuperService {
     @Autowired
     GateWayInfoDao gateWayInfoDao;
 
+    private Long pingTTl = 1 * 24L * 60 * 60 * 1000L;
+
+    // 5分钟
+    private Long maxPingTime = 5 * 60 * 1000L;
+
     @ExecuteAnnotationServiceMethod(value = "devicePing", remark = "设备心跳")
     public ResultContent<Object> devicePing(String dataStr) {
         DevicePingInfoParam param = JSONUtil.toBean(dataStr, DevicePingInfoParam.class);
@@ -76,10 +82,9 @@ public class DevicePingInfoService extends SuperService {
             devicePingInfo.setDeviceName(deviceInfo.getDeviceName());
             devicePingInfo.setGateWayId(param.getGateWayId());
             devicePingInfo.setProjectInfoCode(deviceInfo.getProjectInfoCode());
-            devicePingInfo.setTTL(new Date(System.currentTimeMillis() + 7 * 24L * 60 * 60 * 1000L));
+            devicePingInfo.setTTL(new Date(System.currentTimeMillis() + pingTTl));
             devicePingInfo.setTimeStr(DateUtils.paresTime(System.currentTimeMillis(), DateUtils.FORMAT_LONG));
             devicePingInfoDao.save(devicePingInfo);
-
             // 通知设备
             deviceSyncFullCardService.noticeSyncDeviceOnlineTimeChange(deviceId);
         } else {
@@ -92,23 +97,27 @@ public class DevicePingInfoService extends SuperService {
     public ResultContent<Object> ping(String dataStr) {
         PingResult pingResult = new PingResult();
         GateWayPingInfoParam param = JSONUtil.toBean(dataStr, GateWayPingInfoParam.class);
-        GateWayInfo gateWayInfo = gateWayInfoDao.findTopByGateWayId(param.getGateWayId());
-        if (ObjectUtils.isNotEmpty(gateWayInfo)) {
-            Map<String, Object> where = new HashMap<>();
-            where.put("gateWayId", gateWayInfo.getGateWayId());
-            Map<String, Object> standardData = new HashMap<>();
-            standardData.put("onLineState", OnLineState.OnLine);
-            standardData.put("lastOnlineTime", System.currentTimeMillis());
-            commonService.updateData(where, standardData, GateWayInfo.class.getSimpleName());
-
-            // ping记录
-            DevicePingInfo devicePingInfo = new DevicePingInfo();
-            devicePingInfo.setPingType(DeviceType.GateWay.name());
-            devicePingInfo.setGateWayId(param.getGateWayId());
-            devicePingInfo.setProjectInfoCode(gateWayInfo.getProjectInfoCode());
-            devicePingInfo.setTTL(new Date(System.currentTimeMillis() + 1 * 24L * 60 * 60 * 1000L));
-            devicePingInfo.setTimeStr(DateUtils.paresTime(System.currentTimeMillis(), DateUtils.FORMAT_LONG));
-            devicePingInfoDao.save(devicePingInfo);
+        if (StringUtils.isNotEmpty(param.getDeviceId())) {
+            devicePing(dataStr);
+        } else {
+            GateWayInfo gateWayInfo = gateWayInfoDao.findTopByGateWayId(param.getGateWayId());
+            if (ObjectUtils.isNotEmpty(gateWayInfo)) {
+                Map<String, Object> where = new HashMap<>();
+                where.put("gateWayId", gateWayInfo.getGateWayId());
+                Map<String, Object> standardData = new HashMap<>();
+                standardData.put("onLineState", OnLineState.OnLine);
+                standardData.put("lastOnlineTime", System.currentTimeMillis());
+                commonService.updateData(where, standardData, GateWayInfo.class.getSimpleName());
+
+                // ping记录
+                DevicePingInfo devicePingInfo = new DevicePingInfo();
+                devicePingInfo.setPingType(DeviceType.GateWay.name());
+                devicePingInfo.setGateWayId(param.getGateWayId());
+                devicePingInfo.setProjectInfoCode(gateWayInfo.getProjectInfoCode());
+                devicePingInfo.setTTL(new Date(System.currentTimeMillis() + pingTTl));
+                devicePingInfo.setTimeStr(DateUtils.paresTime(System.currentTimeMillis(), DateUtils.FORMAT_LONG));
+                devicePingInfoDao.save(devicePingInfo);
+            }
         }
         pingResult.setSuccess();
         pingResult.setTime(System.currentTimeMillis());
@@ -124,13 +133,18 @@ public class DevicePingInfoService extends SuperService {
         List<DeviceInfo> list = deviceInfoDao.findAll();
         if (ObjectUtils.isNotEmpty(list)) {
             long time = System.currentTimeMillis();
-            for (DeviceInfo deviceInfo : list) {
+            list.parallelStream().forEach(deviceInfo -> {
                 OnLineState onLineState = OnLineState.OnLine;
                 List<DevicePingInfo> _list = devicePingInfoDao.findTop5ByDeviceIdOrderByCreateTimeDesc(deviceInfo.getDeviceId());
                 if (_list != null && _list.size() > 2) {
                     Long firstTime = _list.get(0).getCreateTime();
                     Long lastTime = _list.get(_list.size() - 1).getCreateTime();
-                    long avgTime = (firstTime - lastTime) / _list.size();
+                    // 2 次心跳间隔时间
+                    long avgTime = ((firstTime - lastTime) / (_list.size() - 1)) * 2;
+                    if (avgTime > maxPingTime) {
+                        avgTime = maxPingTime;
+                    }
+                    // 根据ping数据判断是否在线
                     if ((time - firstTime) > avgTime) {
                         onLineState = OnLineState.OffLine;
                     }
@@ -144,8 +158,7 @@ public class DevicePingInfoService extends SuperService {
                     commonService.updateData(standardData, DeviceInfo.class.getSimpleName());
                     deviceSyncFullCardService.noticeSyncDeviceOnlineStateChange(deviceInfo.getDeviceId());
                 }
-
-            }
+            });
         }
     }