TRX 1 год назад
Родитель
Сommit
e5753aef58

+ 24 - 0
src/main/java/com/zswl/dataservice/dao/mqtt/DevicePingInfoDao.java

@@ -0,0 +1,24 @@
+package com.zswl.dataservice.dao.mqtt;
+
+import com.zswl.dataservice.dao.MongoDao;
+import com.zswl.dataservice.dao.mqtt.extend.DeviceInfoDaoExtend;
+import com.zswl.dataservice.domain.mqtt.DeviceInfo;
+import com.zswl.dataservice.domain.mqtt.DevicePingInfo;
+
+import java.util.List;
+
+/**
+ * @author TRX
+ * @date 2024/3/21
+ */
+public interface DevicePingInfoDao extends MongoDao<DevicePingInfo> {
+
+    DevicePingInfo findTopById(String id);
+
+    DevicePingInfo findTopByDeviceIdAndGateWayId(String deviceId);
+
+    DevicePingInfo findTopByDeviceIdOrderByCreateTimeDesc(String deviceId);
+
+    List<DevicePingInfo> findTop5ByDeviceIdOrderByCreateTimeDesc(String deviceId);
+
+}

+ 40 - 0
src/main/java/com/zswl/dataservice/domain/mqtt/DevicePingInfo.java

@@ -0,0 +1,40 @@
+package com.zswl.dataservice.domain.mqtt;
+
+import com.zswl.dataservice.domain.base.SuperEntity;
+import io.swagger.v3.oas.annotations.media.Schema;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.springframework.data.mongodb.core.index.Indexed;
+import org.springframework.data.mongodb.core.mapping.Document;
+
+import java.util.Date;
+
+/**
+ * @author TRX
+ * @date 2024/7/3
+ */
+@Data
+@Document
+@NoArgsConstructor
+@AllArgsConstructor
+public class DevicePingInfo extends SuperEntity {
+
+    @Schema(description = "设备ID")
+    private String deviceId;
+
+    @Schema(description = "设备名称")
+    private String deviceName;
+
+    @Schema(description = "网关ID")
+    private String gateWayId;
+
+    @Schema(description = "项目code")
+    private String projectInfoCode;
+
+    @Schema(description = "时间")
+    private String timeStr;
+
+    @Indexed(expireAfterSeconds = 0)
+    private Date TTL;
+}

+ 4 - 1
src/main/java/com/zswl/dataservice/domain/mqtt/GateWayInfo.java

@@ -31,7 +31,7 @@ public class GateWayInfo extends SuperEntity {
     private String ip;
 
     @Schema(description = "网关在线状态")
-    OnLineState state;
+    OnLineState onLineState;
 
     @Schema(description = "所属项目")
     @DBRef(lazy = true)
@@ -39,4 +39,7 @@ public class GateWayInfo extends SuperEntity {
 
     @Schema(description = "项目code")
     private String projectInfoCode;
+
+    @Schema(description = "最上线时间")
+    private Long lastOnlineTime;
 }

+ 21 - 0
src/main/java/com/zswl/dataservice/event/DeviceOnLineTimeChangeEvent.java

@@ -0,0 +1,21 @@
+package com.zswl.dataservice.event;
+
+import lombok.Getter;
+import org.springframework.context.ApplicationEvent;
+
+/**
+ * 设备状态改变
+ *
+ * @author TRX
+ * @date 2024/6/26
+ */
+public class DeviceOnLineTimeChangeEvent extends ApplicationEvent {
+
+    @Getter
+    private String deviceId;
+
+    public DeviceOnLineTimeChangeEvent(Object source, String deviceId) {
+        super(source);
+        this.deviceId = deviceId;
+    }
+}

+ 24 - 0
src/main/java/com/zswl/dataservice/event/DeviceStateChangeEvent.java

@@ -0,0 +1,24 @@
+package com.zswl.dataservice.event;
+
+import lombok.Getter;
+import org.springframework.context.ApplicationEvent;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * 设备状态改变
+ *
+ * @author TRX
+ * @date 2024/6/26
+ */
+public class DeviceStateChangeEvent extends ApplicationEvent {
+
+    @Getter
+    private String deviceId;
+
+    public DeviceStateChangeEvent(Object source, String deviceId) {
+        super(source);
+        this.deviceId = deviceId;
+    }
+}

+ 4 - 1
src/main/java/com/zswl/dataservice/httpRequest/conf/FullCardAPIConfig.java

@@ -33,6 +33,8 @@ public class FullCardAPIConfig {
     public static final String deviceSync = api + "/deviceSync/syncDevices";
     public static final String syncGateWays = api + "/deviceSync/syncGateWays";
 
+    public static final String syncDeviceOnLineState = api + "/deviceSync/syncDeviceOnLineState";
+    public static final String lastDeviceOnLineTime = api + "/deviceSync/lastDeviceOnLineTime";
 
     public static final HashMap<String, ApiConfParam> map = new HashMap<>();
 
@@ -55,7 +57,8 @@ public class FullCardAPIConfig {
         // ----------------------------全卡项目 start -----------------
         map.put(deviceSync, ApiConfParam.builder().apiName(deviceSync).methodType(MethodType.Json).build());
         map.put(syncGateWays, ApiConfParam.builder().apiName(syncGateWays).methodType(MethodType.Json).build());
-
+        map.put(syncDeviceOnLineState, ApiConfParam.builder().apiName(syncDeviceOnLineState).methodType(MethodType.Json).build());
+        map.put(lastDeviceOnLineTime, ApiConfParam.builder().apiName(lastDeviceOnLineTime).methodType(MethodType.Json).build());
     }
 
     public static ApiConfParam getApiConfParam(String apiName) {

+ 5 - 5
src/main/java/com/zswl/dataservice/model/mqtt/GateWayInfoModel.java

@@ -29,13 +29,13 @@ public class GateWayInfoModel extends SuperModel {
     private String projectInfoCode;
 
     @Schema(description = "设备在线状态")
-    OnLineState state;
+    private OnLineState onLineState;
 
-    String stateStr;
+    String onLineStateStr;
 
-    public String getStateStr() {
-        if (state != null) {
-            return state.getRemark();
+    public String getOnLineStateStr() {
+        if (onLineState != null) {
+            return onLineState.getRemark();
         }
         return "";
     }

+ 47 - 0
src/main/java/com/zswl/dataservice/service/base/CommonService.java

@@ -0,0 +1,47 @@
+package com.zswl.dataservice.service.base;
+
+import com.mongodb.client.result.UpdateResult;
+import com.zswl.dataservice.utils.CommonUtil;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.mongodb.core.MongoTemplate;
+import org.springframework.data.mongodb.core.query.Criteria;
+import org.springframework.data.mongodb.core.query.Query;
+import org.springframework.data.mongodb.core.query.Update;
+import org.springframework.stereotype.Service;
+
+import java.util.Map;
+
+/**
+ * @author TRX
+ * @date 2024/5/31
+ */
+@Slf4j
+@Service
+public class CommonService {
+
+    @Autowired
+    private MongoTemplate mongoTemplate;
+
+    /**
+     * 编辑数据
+     *
+     * @param standardData
+     * @param collectionName
+     * @return
+     */
+    public Object updateData(Map<String, Object> standardData, String collectionName) {
+        collectionName = CommonUtil.getCollectionName(collectionName);
+        Object id = standardData.get("id");
+        Query query = new Query(Criteria.where("_id").is(id));
+        Update update = new Update();
+        standardData.forEach((key, value) -> {
+            if (!"id".equals(key)) {
+                update.set(key, value);
+            }
+        });
+        UpdateResult updateResult = mongoTemplate.upsert(query, update, collectionName);
+        return updateResult.getUpsertedId();
+    }
+
+}

+ 5 - 1
src/main/java/com/zswl/dataservice/service/mqtt/DeviceInfoService.java

@@ -14,6 +14,7 @@ import com.zswl.dataservice.service.sync.DeviceSyncFullCardService;
 import com.zswl.dataservice.type.FunctionType;
 import com.zswl.dataservice.utils.bean.BeanUtils;
 import com.zswl.dataservice.utils.mqtt.type.LogsLevel;
+import com.zswl.dataservice.utils.mqtt.type.OnLineState;
 import com.zswl.dataservice.utils.page.PageEntityUtil;
 import com.zswl.dataservice.utils.result.ResultContent;
 import org.apache.commons.lang3.ObjectUtils;
@@ -65,9 +66,12 @@ public class DeviceInfoService {
         if (ObjectUtils.isNotEmpty(temp)) {
             deviceInfo = temp;
             param.setId(null);
+        } else {
+            deviceInfo.setActivityTime(System.currentTimeMillis());
         }
         BeanUtils.copyProperties(param, deviceInfo, "id");
-
+        deviceInfo.setLastOnlineTime(System.currentTimeMillis());
+        deviceInfo.setOnLineState(OnLineState.OnLine);
         // 项目
         if (StringUtils.isNotEmpty(param.getProjectInfoCode())) {
             ProjectInfo projectInfo = projectInfoDao.findTopByCode(param.getProjectInfoCode());

+ 111 - 0
src/main/java/com/zswl/dataservice/service/mqtt/DevicePingInfoService.java

@@ -0,0 +1,111 @@
+package com.zswl.dataservice.service.mqtt;
+
+import cn.hutool.json.JSONUtil;
+import com.github.microservice.models.hxz.ConsumTransactionsModel;
+import com.github.microservice.models.hxz.DevicePingInfoParam;
+import com.zswl.dataservice.dao.mqtt.DeviceInfoDao;
+import com.zswl.dataservice.dao.mqtt.DevicePingInfoDao;
+import com.zswl.dataservice.domain.ExecuteAnnotationService;
+import com.zswl.dataservice.domain.ExecuteAnnotationServiceMethod;
+import com.zswl.dataservice.domain.mqtt.DeviceInfo;
+import com.zswl.dataservice.domain.mqtt.DevicePingInfo;
+import com.zswl.dataservice.service.base.CommonService;
+import com.zswl.dataservice.service.base.SuperService;
+import com.zswl.dataservice.service.sync.DeviceSyncFullCardService;
+import com.zswl.dataservice.type.DataState;
+import com.zswl.dataservice.utils.DateUtils;
+import com.zswl.dataservice.utils.bean.BeanUtils;
+import com.zswl.dataservice.utils.mqtt.type.OnLineState;
+import com.zswl.dataservice.utils.result.ResultContent;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.ObjectUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @author TRX
+ * @date 2024/7/3
+ */
+@Slf4j
+@Service
+@ExecuteAnnotationService
+public class DevicePingInfoService extends SuperService {
+
+    @Autowired
+    DevicePingInfoDao devicePingInfoDao;
+
+    @Autowired
+    DeviceInfoDao deviceInfoDao;
+
+    @Autowired
+    CommonService commonService;
+
+    @Autowired
+    DeviceSyncFullCardService deviceSyncFullCardService;
+
+    @ExecuteAnnotationServiceMethod(value = "devicePing", remark = "设备心跳")
+    public ResultContent<Object> devicePing(String dataStr) {
+        DevicePingInfoParam param = JSONUtil.toBean(dataStr, DevicePingInfoParam.class);
+        String deviceId = param.getDeviceId();
+        DeviceInfo deviceInfo = deviceInfoDao.findTopByDeviceId(deviceId);
+        if (ObjectUtils.isNotEmpty(deviceInfo)) {
+            Long time = System.currentTimeMillis();
+            Map<String, Object> standardData = new HashMap<String, Object>();
+            standardData.put("id", deviceInfo.getId());
+            standardData.put("lastOnlineTime", time);
+            commonService.updateData(standardData, DeviceInfo.class.getSimpleName());
+
+            // ping记录
+            DevicePingInfo devicePingInfo = new DevicePingInfo();
+            devicePingInfo.setDeviceId(deviceId);
+            devicePingInfo.setDeviceName(deviceInfo.getDeviceName());
+            devicePingInfo.setGateWayId(param.getGateWayId());
+            devicePingInfo.setProjectInfoCode(deviceInfo.getProjectInfoCode());
+            devicePingInfo.setTTL(new Date(System.currentTimeMillis() + 7 * 24L * 60 * 60 * 1000L));
+            devicePingInfo.setTimeStr(DateUtils.paresTime(System.currentTimeMillis(), DateUtils.FORMAT_LONG));
+            devicePingInfoDao.save(devicePingInfo);
+            deviceSyncFullCardService.noticeSyncDeviceOnlineTimeChange(deviceId);
+        } else {
+            log.info("心跳设备未找到: {}", deviceId);
+        }
+        return ResultContent.buildSuccess();
+    }
+
+    /**
+     * 检查
+     */
+    public void checkDeviceState() {
+        log.info("checkDeviceState: {}", DateUtils.paresTime(System.currentTimeMillis(), DateUtils.FORMAT_LONG));
+        List<DeviceInfo> list = deviceInfoDao.findAll();
+        if (ObjectUtils.isNotEmpty(list)) {
+            long time = System.currentTimeMillis();
+            for (DeviceInfo deviceInfo : list) {
+                OnLineState onLineState = OnLineState.OnLine;
+                List<DevicePingInfo> _list = devicePingInfoDao.findTop5ByDeviceIdOrderByCreateTimeDesc(deviceInfo.getDeviceId());
+                if (_list != null && _list.size() > 2) {
+                    Long firstTime = _list.get(0).getCreateTime();
+                    Long lastTime = _list.get(_list.size() - 1).getCreateTime();
+                    long avgTime = (firstTime - lastTime) / _list.size();
+                    log.info("avgTime: {} {}", (firstTime - lastTime), avgTime);
+                    if ((time - firstTime) > avgTime) {
+                        onLineState = OnLineState.OffLine;
+                    }
+                }
+
+                if (onLineState != deviceInfo.getOnLineState()) {
+                    Map<String, Object> standardData = new HashMap<String, Object>();
+                    standardData.put("id", deviceInfo.getId());
+                    standardData.put("onLineState", onLineState);
+                    commonService.updateData(standardData, DeviceInfo.class.getSimpleName());
+                }
+                deviceSyncFullCardService.noticeSyncDeviceOnlineStateChange(deviceInfo.getDeviceId());
+            }
+        }
+    }
+
+}

+ 3 - 1
src/main/java/com/zswl/dataservice/service/mqtt/GateWayInfoService.java

@@ -88,9 +88,11 @@ public class GateWayInfoService extends SuperService {
         if (ObjectUtils.isEmpty(gateWayInfo)) {
             gateWayInfo = new GateWayInfo();
         }
+        gateWayInfo.setOnLineState(OnLineState.OnLine);
+        gateWayInfo.setLastOnlineTime(System.currentTimeMillis());
         BeanUtils.copyProperties(param, gateWayInfo, "id");
         if (param.getState() == null) {
-            gateWayInfo.setState(OnLineState.OffLine);
+            gateWayInfo.setOnLineState(OnLineState.OffLine);
         }
         gateWayInfoDao.save(gateWayInfo);
         if (StringUtils.isNotEmpty(param.getProjectInfoCode())) {

+ 60 - 0
src/main/java/com/zswl/dataservice/service/sync/DeviceSyncFullCardService.java

@@ -3,12 +3,15 @@ package com.zswl.dataservice.service.sync;
 import com.github.microservice.models.device.DeviceInfoSyncParam;
 import com.github.microservice.models.device.DeviceSyncListParam;
 import com.github.microservice.models.device.GateWaySyncParam;
+import com.github.microservice.models.hxz.DevicePingInfoParam;
 import com.zswl.dataservice.dao.mqtt.DeviceInfoDao;
 import com.zswl.dataservice.dao.mqtt.GateWay2DeviceDao;
 import com.zswl.dataservice.dao.mqtt.GateWayInfoDao;
 import com.zswl.dataservice.domain.mqtt.DeviceInfo;
 import com.zswl.dataservice.domain.mqtt.GateWay2Device;
 import com.zswl.dataservice.domain.mqtt.GateWayInfo;
+import com.zswl.dataservice.event.DeviceOnLineTimeChangeEvent;
+import com.zswl.dataservice.event.DeviceStateChangeEvent;
 import com.zswl.dataservice.event.DeviceSyncEvent;
 import com.zswl.dataservice.event.GateWaySyncEvent;
 import com.zswl.dataservice.httpRequest.ApiRequestService;
@@ -184,4 +187,61 @@ public class DeviceSyncFullCardService extends SuperService {
             log.info("同步网关情况:{} {}", api.isSuccess(), api.getMsg());
         }
     }
+
+    public ResultContent noticeSyncDeviceOnlineStateChange(String deviceId) {
+        if (ObjectUtils.isNotEmpty(deviceId)) {
+            DeviceStateChangeEvent event = new DeviceStateChangeEvent(this, deviceId);
+            applicationContext.publishEvent(event);
+        }
+        return ResultContent.buildSuccess();
+    }
+
+    @EventListener(classes = DeviceStateChangeEvent.class)
+    @Async
+    @SneakyThrows
+    public void syncDeviceOnlineStateChange(DeviceStateChangeEvent event) {
+        String deviceId = event.getDeviceId();
+        log.info("event syncDeviceOnlineStateChange: {}", deviceId);
+        DeviceInfo deviceInfo = deviceInfoDao.findTopByDeviceId(deviceId);
+        if (ObjectUtils.isNotEmpty(deviceInfo)) {
+            DevicePingInfoParam param = new DevicePingInfoParam();
+            param.setDeviceId(deviceInfo.getDeviceId());
+            param.setLastOnlineTime(deviceInfo.getLastOnlineTime());
+            param.setOnLineState(deviceInfo.getOnLineState().name());
+            param.setProjectInfoCode(deviceInfo.getProjectInfoCode());
+
+            APIResponseModel api = apiRequestService.sendFullCardAPI(FullCardAPIConfig.syncDeviceOnLineState, param);
+            log.info("同步设备在线情况:{} {}", api.isSuccess(), api.getMsg());
+
+        }
+    }
+
+    public ResultContent noticeSyncDeviceOnlineTimeChange(String deviceId) {
+        if (ObjectUtils.isNotEmpty(deviceId)) {
+            DeviceOnLineTimeChangeEvent event = new DeviceOnLineTimeChangeEvent(this, deviceId);
+            applicationContext.publishEvent(event);
+        }
+        return ResultContent.buildSuccess();
+    }
+
+    @EventListener(classes = DeviceOnLineTimeChangeEvent.class)
+    @Async
+    @SneakyThrows
+    public void syncDeviceOnlineTimeChange(DeviceOnLineTimeChangeEvent event) {
+        String deviceId = event.getDeviceId();
+        log.info("event syncDeviceOnlineTimeChange: {}", deviceId);
+        DeviceInfo deviceInfo = deviceInfoDao.findTopByDeviceId(deviceId);
+        if (ObjectUtils.isNotEmpty(deviceInfo)) {
+            DevicePingInfoParam param = new DevicePingInfoParam();
+            param.setDeviceId(deviceInfo.getDeviceId());
+            param.setLastOnlineTime(deviceInfo.getLastOnlineTime());
+            param.setOnLineState(deviceInfo.getOnLineState().name());
+            param.setProjectInfoCode(deviceInfo.getProjectInfoCode());
+
+            APIResponseModel api = apiRequestService.sendFullCardAPI(FullCardAPIConfig.lastDeviceOnLineTime, param);
+            log.info("同步设备在线时间:{} {}", api.isSuccess(), api.getMsg());
+
+        }
+    }
+
 }

+ 28 - 0
src/main/java/com/zswl/dataservice/work/CheckDeviceStateWork.java

@@ -0,0 +1,28 @@
+package com.zswl.dataservice.work;
+
+import com.zswl.dataservice.service.mqtt.DevicePingInfoService;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.scheduling.annotation.EnableScheduling;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+
+/**
+ * 定时检查设备在线状态
+ *
+ * @author TRX
+ * @date 2024/7/3
+ */
+@Slf4j
+@EnableScheduling
+@Component
+public class CheckDeviceStateWork {
+
+    @Autowired
+    private DevicePingInfoService devicePingInfoService;
+
+    @Scheduled(fixedRate = 1000 * 10)
+    public void checkDeviceState() {
+        devicePingInfoService.checkDeviceState();
+    }
+}