TRX 1 tahun lalu
induk
melakukan
c949348bda

+ 1 - 1
OneCardIotServer/src/main/java/com/zhongshu/iot/server/core/config/MongoConfig.java

@@ -8,7 +8,7 @@ import org.springframework.data.mongodb.repository.config.EnableMongoRepositorie
 @Configuration
 @Import(MongoConfiguration.class)
 @EnableMongoRepositories(
-        value = "com.zhongshu.iot.server.core.dao"
+        value = "com.zhongshu.iot.server.core"
 )
 public class MongoConfig {
 

+ 22 - 0
OneCardIotServer/src/main/java/com/zhongshu/iot/server/core/config/RedisConfiguration.java

@@ -0,0 +1,22 @@
+package com.zhongshu.iot.server.core.config;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.data.redis.connection.RedisConnectionFactory;
+import org.springframework.data.redis.listener.RedisMessageListenerContainer;
+
+/**
+ * @author TRX
+ * @date 2024/12/23
+ */
+@Configuration
+public class RedisConfiguration {
+
+    @Bean
+    RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory) {
+        final RedisMessageListenerContainer container = new RedisMessageListenerContainer();
+        container.setConnectionFactory(connectionFactory);
+        return container;
+    }
+
+}

+ 19 - 0
OneCardIotServer/src/main/java/com/zhongshu/iot/server/core/dataConfig/CommonTTLTimeConfig.java

@@ -0,0 +1,19 @@
+package com.zhongshu.iot.server.core.dataConfig;
+
+/**
+ * 通用的TTL时间配置
+ *
+ * @author TRX
+ * @date 2024/12/23
+ */
+public class CommonTTLTimeConfig {
+
+    // ping记录保存时间 1天
+    public static final Long pingTTl = 1 * 24L * 60 * 60 * 1000L;
+
+    // 最大的心跳时间间隔 5分钟
+    public static final Long maxUnPingOnLineTime = 5 * 60 * 1000L;
+
+    public static final Long maxDeviceRedisOnLineTime = 60 * 1000L;
+
+}

+ 7 - 3
OneCardIotServer/src/main/java/com/zhongshu/iot/server/core/listener/MongodbAutoEvent.java

@@ -1,5 +1,6 @@
 package com.zhongshu.iot.server.core.listener;
 
+import com.zhongshu.iot.server.core.domain.iot.mqtt.DeviceOnLineInfo;
 import com.zhongshu.iot.server.core.service.mqtt.DevicePingInfoService;
 import lombok.extern.slf4j.Slf4j;
 import org.bson.Document;
@@ -24,18 +25,17 @@ public class MongodbAutoEvent extends AbstractMongoEventListener<Object> {
 
     @Override
     public void onAfterSave(AfterSaveEvent<Object> event) {
-        log.info("onAfterSave");
+//        log.info("onAfterSave");
     }
 
     @Override
     public void onBeforeDelete(BeforeDeleteEvent<Object> event) {
-        log.info("onBeforeDelete");
+//        log.info("onBeforeDelete");
         super.onBeforeDelete(event);
     }
 
     @Override
     public void onAfterDelete(AfterDeleteEvent<Object> event) {
-        log.info("onAfterDelete--- ");
         super.onAfterDelete(event);
         Object source = event.getSource();
         try {
@@ -43,6 +43,10 @@ public class MongodbAutoEvent extends AbstractMongoEventListener<Object> {
             log.info("onAfterDelete 删除了数据... {}", event.getType().getName());
             Document document = event.getDocument();
             if (document != null) {
+                String id = document.getObjectId("_id").toString();
+                if (className.equals(DeviceOnLineInfo.class.getName())) {
+                    devicePingInfoService.deviceTTLUnLine(id);
+                }
             }
         } catch (Exception e) {
             e.printStackTrace();

+ 33 - 0
OneCardIotServer/src/main/java/com/zhongshu/iot/server/core/listener/RedisKeyExpirationListener.java

@@ -0,0 +1,33 @@
+package com.zhongshu.iot.server.core.listener;
+
+import com.zhongshu.iot.server.core.service.mqtt.DevicePingInfoService;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.redis.connection.Message;
+import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;
+import org.springframework.data.redis.listener.RedisMessageListenerContainer;
+import org.springframework.stereotype.Component;
+
+/**
+ * @author TRX
+ * @date 2024/12/23
+ */
+@Component
+@Slf4j
+public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {
+
+    @Autowired
+    private DevicePingInfoService devicePingInfoService;
+
+    public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
+        super(listenerContainer);
+    }
+
+    @Override
+    public void onMessage(Message message, byte[] pattern) {
+        // 获取到过期的键
+        String expiredKey = message.toString();
+        log.info("redis expired: {}", expiredKey);
+        devicePingInfoService.redisExpire(expiredKey);
+    }
+}

+ 126 - 106
OneCardIotServer/src/main/java/com/zhongshu/iot/server/core/service/mqtt/DevicePingInfoService.java

@@ -12,10 +12,12 @@ import com.zhongshu.iot.server.core.dao.mqtt.DeviceInfoDao;
 import com.zhongshu.iot.server.core.dao.mqtt.DeviceOnLineInfoDao;
 import com.zhongshu.iot.server.core.dao.mqtt.DevicePingInfoDao;
 import com.zhongshu.iot.server.core.dao.mqtt.GateWayInfoDao;
+import com.zhongshu.iot.server.core.dataConfig.CommonTTLTimeConfig;
 import com.zhongshu.iot.server.core.domain.ExecuteAnnotationService;
 import com.zhongshu.iot.server.core.domain.ExecuteAnnotationServiceMethod;
 import com.zhongshu.iot.server.core.domain.iot.mqtt.*;
 import com.zhongshu.iot.server.core.service.base.CommonService;
+import com.zhongshu.iot.server.core.service.base.RedisService;
 import com.zhongshu.iot.server.core.service.base.SuperService;
 import com.zhongshu.iot.server.core.service.sync.DeviceSyncFullCardService;
 import com.zhongshu.iot.server.core.util.DateUtils;
@@ -57,11 +59,8 @@ public class DevicePingInfoService extends SuperService {
     @Autowired
     private DeviceOnLineInfoDao deviceOnLineInfoDao;
 
-    // ping记录保存时间 1天
-    private Long pingTTl = 1 * 24L * 60 * 60 * 1000L;
-
-    //最大的心跳时间间隔 5分钟
-    private Long maxPingTime = 5 * 60 * 1000L;
+    @Autowired
+    private RedisService redisService;
 
     @ExecuteAnnotationServiceMethod(value = "devicePing", remark = "设备心跳")
     public ResultContent<Object> devicePing(String dataStr) {
@@ -91,7 +90,7 @@ public class DevicePingInfoService extends SuperService {
             devicePingInfo.setDeviceName(deviceInfo.getDeviceName());
             devicePingInfo.setGateWayId(param.getGateWayId());
             devicePingInfo.setProjectInfoCode(deviceInfo.getProjectInfoCode());
-            devicePingInfo.setTTL(new Date(System.currentTimeMillis() + pingTTl));
+            devicePingInfo.setTTL(new Date(System.currentTimeMillis() + CommonTTLTimeConfig.pingTTl));
             devicePingInfo.setTimeStr(DateUtils.paresTime(System.currentTimeMillis(), DateUtils.FORMAT_LONG));
             devicePingInfoDao.save(devicePingInfo);
 
@@ -133,12 +132,12 @@ public class DevicePingInfoService extends SuperService {
                 devicePingInfo.setPingType(DeviceType.GateWay.name());
                 devicePingInfo.setGateWayId(param.getGateWayId());
                 devicePingInfo.setProjectInfoCode(gateWayInfo.getProjectInfoCode());
-                devicePingInfo.setTTL(new Date(System.currentTimeMillis() + pingTTl));
+                devicePingInfo.setTTL(new Date(System.currentTimeMillis() + CommonTTLTimeConfig.pingTTl));
                 devicePingInfo.setTimeStr(DateUtils.paresTime(System.currentTimeMillis(), DateUtils.FORMAT_LONG));
                 devicePingInfoDao.save(devicePingInfo);
 
                 // 网关在线状态
-//                updateDeviceOnLine(gateWayInfo.getGateWayId(), gateWayInfo.getGateWayName(), OnLineDeviceType.GateWay);
+                updateDeviceOnLine(gateWayInfo.getGateWayId(), gateWayInfo.getGateWayName(), OnLineDeviceType.GateWay);
             }
         }
         pingResult.setSuccess();
@@ -151,12 +150,25 @@ public class DevicePingInfoService extends SuperService {
         if (ObjectUtils.isEmpty(entity)) {
             entity = new DeviceOnLineInfo();
         }
+        Long expireTime = CommonTTLTimeConfig.maxUnPingOnLineTime;
+
         entity.setDeviceId(deviceId);
         entity.setDeviceName(deviceName);
-        entity.setTTL(new Date(System.currentTimeMillis() + 5000));
+        entity.setTTL(new Date(System.currentTimeMillis() + expireTime + 1000));
         entity.setTimeStr(DateUtils.paresTime(System.currentTimeMillis(), DateUtils.FORMAT_LONG));
         entity.setOnLineDeviceType(onLineDeviceType);
         deviceOnLineInfoDao.save(entity);
+
+        String expiredKey = String.format("expiredKey_%s", deviceId);
+        redisService.setValue(expiredKey, deviceName, CommonTTLTimeConfig.maxDeviceRedisOnLineTime);
+    }
+
+    public void redisExpire(String expiredKey) {
+        if (StringUtils.isNotEmpty(expiredKey) && expiredKey.startsWith("expiredKey_")) {
+            // 设备在线状态
+            expiredKey = expiredKey.replace("expiredKey_", "");
+            deviceTTLUnLine(expiredKey);
+        }
     }
 
     /**
@@ -166,22 +178,17 @@ public class DevicePingInfoService extends SuperService {
      */
     public void deviceTTLUnLine(String dataId) {
         DeviceOnLineInfo entity = deviceOnLineInfoDao.findTopById(dataId);
+        if (ObjectUtils.isEmpty(entity)) {
+            entity = deviceOnLineInfoDao.findTopByDeviceId(dataId);
+        }
+
         if (ObjectUtils.isNotEmpty(entity)) {
             if (entity.getOnLineDeviceType() == OnLineDeviceType.GateWay) {
                 GateWayInfo gateWayInfo = gateWayInfoDao.findTopByGateWayId(entity.getDeviceId());
-                OnLineState onLineState = OnLineState.OffLine;
-                Map<String, Object> standardData = new HashMap<String, Object>();
-                standardData.put("id", gateWayInfo.getId());
-                standardData.put("onLineState", onLineState);
-                commonService.updateData(standardData, GateWayInfo.class.getSimpleName());
+                gateWayCheckOnLineState(gateWayInfo);
             } else if (entity.getOnLineDeviceType() == OnLineDeviceType.Device) {
                 DeviceInfo deviceInfo = deviceInfoDao.findTopByDeviceId(entity.getDeviceId());
-                OnLineState onLineState = OnLineState.OffLine;
-                Map<String, Object> standardData = new HashMap<String, Object>();
-                standardData.put("id", deviceInfo.getId());
-                standardData.put("onLineState", onLineState);
-                commonService.updateData(standardData, DeviceInfo.class.getSimpleName());
-                deviceSyncFullCardService.noticeSyncDeviceOnlineStateChange(deviceInfo.getDeviceId());
+                deviceCheckOnLineState(deviceInfo);
             }
         }
     }
@@ -190,49 +197,55 @@ public class DevicePingInfoService extends SuperService {
      * 检查设备的状态
      */
     public void checkDeviceState() {
-//        log.info("checkDeviceState: {}", DateUtils.paresTime(System.currentTimeMillis(), DateUtils.FORMAT_LONG));
         List<DeviceInfo> list = deviceInfoDao.findAll();
         if (ObjectUtils.isNotEmpty(list)) {
-            long time = System.currentTimeMillis();
             list.parallelStream().forEach(deviceInfo -> {
-                // 默认离线
-                OnLineState onLineState = OnLineState.OffLine;
-                List<DevicePingInfo> _list = devicePingInfoDao.findTop5ByDeviceIdOrderByCreateTimeDesc(deviceInfo.getDeviceId());
-                if (_list != null) {
-                    Long firstTime = null;
-                    long avgTime = 0;
-                    if (_list.size() == 1) {
-                        firstTime = _list.get(0).getCreateTime();
-                        avgTime = maxPingTime;
-                    } else if (_list.size() > 2) {
-                        firstTime = _list.get(0).getCreateTime();
-                        Long lastTime = _list.get(_list.size() - 1).getCreateTime();
-                        // 2 次心跳间隔时间
-                        avgTime = ((firstTime - lastTime) / (_list.size() - 1)) * 2;
-                        if (avgTime > maxPingTime) {
-                            avgTime = maxPingTime;
-                        }
-                    }
-                    if (firstTime != null) {
-                        // 根据ping数据判断是否在线
-                        if ((time - firstTime) > avgTime) {
-                            onLineState = OnLineState.OffLine;
-                        } else {
-                            onLineState = OnLineState.OnLine;
-                        }
-                    }
+                deviceCheckOnLineState(deviceInfo);
+            });
+        }
+    }
+
+    private void deviceCheckOnLineState(DeviceInfo deviceInfo) {
+        if (ObjectUtils.isEmpty(deviceInfo)) {
+            return;
+        }
+        long time = System.currentTimeMillis();
+        // 默认离线
+        OnLineState onLineState = OnLineState.OffLine;
+        List<DevicePingInfo> _list = devicePingInfoDao.findTop5ByDeviceIdOrderByCreateTimeDesc(deviceInfo.getDeviceId());
+        if (_list != null) {
+            Long firstTime = null;
+            long avgTime = 0;
+            if (_list.size() == 1) {
+                firstTime = _list.get(0).getCreateTime();
+                avgTime = CommonTTLTimeConfig.maxUnPingOnLineTime;
+            } else if (_list.size() > 2) {
+                firstTime = _list.get(0).getCreateTime();
+                Long lastTime = _list.get(_list.size() - 1).getCreateTime();
+                // 2 次心跳间隔时间
+                avgTime = ((firstTime - lastTime) / (_list.size() - 1)) * 2;
+                if (avgTime > CommonTTLTimeConfig.maxUnPingOnLineTime) {
+                    avgTime = CommonTTLTimeConfig.maxUnPingOnLineTime;
                 }
-//                log.info("{} {}", deviceInfo.getDeviceId(), onLineState);
-                if (onLineState != deviceInfo.getOnLineState()) {
-                    log.info("设备在线状态改变:{} {}", deviceInfo.getDeviceName(), onLineState);
-                    Map<String, Object> standardData = new HashMap<String, Object>();
-                    standardData.put("id", deviceInfo.getId());
-                    standardData.put("onLineState", onLineState);
-                    commonService.updateData(standardData, DeviceInfo.class.getSimpleName());
-                    deviceSyncFullCardService.noticeSyncDeviceOnlineStateChange(deviceInfo.getDeviceId());
+            }
+            if (firstTime != null) {
+                // 根据ping数据判断是否在线
+                if ((time - firstTime) > avgTime) {
+                    onLineState = OnLineState.OffLine;
+                } else {
+                    onLineState = OnLineState.OnLine;
                 }
-            });
+            }
         }
+        if (onLineState != deviceInfo.getOnLineState()) {
+            log.info("设备在线状态改变:{} {}", deviceInfo.getDeviceName(), onLineState);
+            Map<String, Object> standardData = new HashMap<String, Object>();
+            standardData.put("id", deviceInfo.getId());
+            standardData.put("onLineState", onLineState);
+            commonService.updateData(standardData, DeviceInfo.class.getSimpleName());
+            deviceSyncFullCardService.noticeSyncDeviceOnlineStateChange(deviceInfo.getDeviceId());
+        }
+
     }
 
     /**
@@ -241,61 +254,68 @@ public class DevicePingInfoService extends SuperService {
     public void checkGateWayState() {
         List<GateWayInfo> list = gateWayInfoDao.findAll();
         if (ObjectUtils.isNotEmpty(list)) {
-            long time = System.currentTimeMillis();
             list.parallelStream().forEach(gateWayInfo -> {
-                List<DevicePingInfo> _list = devicePingInfoDao.findTop5ByGateWayIdAndPingTypeOrderByCreateTimeDesc(gateWayInfo.getGateWayId(), DeviceType.GateWay.name());
-                // 默认离线
-                OnLineState onLineState = OnLineState.OffLine;
-                if (_list != null) {
-                    Long firstTime = null;
-                    long avgTime = 0;
-                    if (_list.size() == 1) {
-                        firstTime = _list.get(0).getCreateTime();
-                        avgTime = maxPingTime;
-                    } else if (_list.size() > 2) {
-                        firstTime = _list.get(0).getCreateTime();
-                        Long lastTime = _list.get(_list.size() - 1).getCreateTime();
-                        // 2 次心跳间隔时间
-                        avgTime = ((firstTime - lastTime) / (_list.size() - 1)) * 2;
-                        if (avgTime > maxPingTime) {
-                            avgTime = maxPingTime;
-                        }
-                    }
-                    if (firstTime != null) {
-                        // 根据ping数据判断是否在线
-                        if ((time - firstTime) > avgTime) {
-                            onLineState = OnLineState.OffLine;
-                        } else {
-                            onLineState = OnLineState.OnLine;
-                        }
-                    }
-                }
+                gateWayCheckOnLineState(gateWayInfo);
+            });
+        }
+    }
 
-                if (_list != null && _list.size() > 2) {
-                    Long firstTime = _list.get(0).getCreateTime();
-                    Long lastTime = _list.get(_list.size() - 1).getCreateTime();
-                    // 2 次心跳间隔时间
-                    long avgTime = ((firstTime - lastTime) / (_list.size() - 1)) * 2;
-                    if (avgTime > maxPingTime) {
-                        avgTime = maxPingTime;
-                    }
-                    // 根据ping数据判断是否在线
-                    if ((time - firstTime) > avgTime) {
-                        onLineState = OnLineState.OffLine;
-                    }
+    private void gateWayCheckOnLineState(GateWayInfo gateWayInfo) {
+        if (ObjectUtils.isEmpty(gateWayInfo)) {
+            return;
+        }
+        long time = System.currentTimeMillis();
+        List<DevicePingInfo> _list = devicePingInfoDao.findTop5ByGateWayIdAndPingTypeOrderByCreateTimeDesc(gateWayInfo.getGateWayId(), DeviceType.GateWay.name());
+        // 默认离线
+        OnLineState onLineState = OnLineState.OffLine;
+        if (_list != null) {
+            Long firstTime = null;
+            long avgTime = 0;
+            if (_list.size() == 1) {
+                firstTime = _list.get(0).getCreateTime();
+                avgTime = CommonTTLTimeConfig.maxUnPingOnLineTime;
+            } else if (_list.size() > 2) {
+                firstTime = _list.get(0).getCreateTime();
+                Long lastTime = _list.get(_list.size() - 1).getCreateTime();
+                // 2 次心跳间隔时间
+                avgTime = ((firstTime - lastTime) / (_list.size() - 1)) * 2;
+                if (avgTime > CommonTTLTimeConfig.maxUnPingOnLineTime) {
+                    avgTime = CommonTTLTimeConfig.maxUnPingOnLineTime;
                 }
-                if (ObjectUtils.isEmpty(_list)) {
+            }
+            if (firstTime != null) {
+                // 根据ping数据判断是否在线
+                if ((time - firstTime) > avgTime) {
                     onLineState = OnLineState.OffLine;
+                } else {
+                    onLineState = OnLineState.OnLine;
                 }
+            }
+        }
 
-                if (onLineState != gateWayInfo.getOnLineState()) {
-                    log.info("网关在线状态改变:{} {}", gateWayInfo.getGateWayName(), onLineState);
-                    Map<String, Object> standardData = new HashMap<String, Object>();
-                    standardData.put("id", gateWayInfo.getId());
-                    standardData.put("onLineState", onLineState);
-                    commonService.updateData(standardData, GateWayInfo.class.getSimpleName());
-                }
-            });
+        if (_list != null && _list.size() > 2) {
+            Long firstTime = _list.get(0).getCreateTime();
+            Long lastTime = _list.get(_list.size() - 1).getCreateTime();
+            // 2 次心跳间隔时间
+            long avgTime = ((firstTime - lastTime) / (_list.size() - 1)) * 2;
+            if (avgTime > CommonTTLTimeConfig.maxUnPingOnLineTime) {
+                avgTime = CommonTTLTimeConfig.maxUnPingOnLineTime;
+            }
+            // 根据ping数据判断是否在线
+            if ((time - firstTime) > avgTime) {
+                onLineState = OnLineState.OffLine;
+            }
+        }
+        if (ObjectUtils.isEmpty(_list)) {
+            onLineState = OnLineState.OffLine;
+        }
+
+        if (onLineState != gateWayInfo.getOnLineState()) {
+            log.info("网关在线状态改变:{} {}", gateWayInfo.getGateWayName(), onLineState);
+            Map<String, Object> standardData = new HashMap<String, Object>();
+            standardData.put("id", gateWayInfo.getId());
+            standardData.put("onLineState", onLineState);
+            commonService.updateData(standardData, GateWayInfo.class.getSimpleName());
         }
     }