Browse Source

更新!

TRX 1 year ago
parent
commit
0a700820b4

+ 2 - 0
OneCardIotServer/src/main/java/com/zhongshu/iot/server/core/dao/mqtt/GateWay2DeviceDao.java

@@ -17,5 +17,7 @@ public interface GateWay2DeviceDao extends org.springframework.data.mongodb.repo
     // 根据设备查询绑定关系
     List<GateWay2Device> findByDeviceInfo(DeviceInfo deviceInfo);
 
+    List<GateWay2Device> findByGateWayInfo(GateWayInfo gateWayInfo);
+
     GateWay2Device findTopByDeviceInfoOrderByUpdateTimeDesc(DeviceInfo deviceInfo);
 }

+ 21 - 0
OneCardIotServer/src/main/java/com/zhongshu/iot/server/core/dao/mqtt/GateWayMqttSecurityDao.java

@@ -0,0 +1,21 @@
+package com.zhongshu.iot.server.core.dao.mqtt;
+
+import com.zhongshu.iot.server.core.domain.iot.mqtt.GateWayInfo;
+import com.zhongshu.iot.server.core.domain.iot.mqtt.GateWayMqttSecurity;
+import com.zhongshu.iot.server.core.domain.iot.mqtt.MqttInfo;
+
+import java.util.List;
+
+/**
+ * @author TRX
+ * @date 2024/3/21
+ */
+public interface GateWayMqttSecurityDao extends org.springframework.data.mongodb.repository.MongoRepository<GateWayMqttSecurity, String> {
+
+    GateWayMqttSecurity findTopById(String id);
+
+    List<GateWayMqttSecurity> findByMqttInfoAndGateWayInfo(MqttInfo mqttInfo, GateWayInfo gateWayInfo);
+
+    void deleteByMqttInfoAndGateWayInfo(MqttInfo mqttInfo, GateWayInfo gateWayInfo);
+
+}

+ 52 - 0
OneCardIotServer/src/main/java/com/zhongshu/iot/server/core/domain/iot/mqtt/GateWayMqttSecurity.java

@@ -0,0 +1,52 @@
+package com.zhongshu.iot.server.core.domain.iot.mqtt;
+
+import com.zhongshu.iot.server.core.domain.base.SuperEntity;
+import io.swagger.v3.oas.annotations.media.Schema;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.springframework.data.mongodb.core.mapping.DBRef;
+import org.springframework.data.mongodb.core.mapping.Document;
+
+/**
+ * @author TRX
+ * @date 2025/1/2
+ */
+@Data
+@Document
+@NoArgsConstructor
+@AllArgsConstructor
+public class GateWayMqttSecurity extends SuperEntity {
+
+    @Schema(description = "连接信息")
+    @DBRef(lazy = true)
+    private MqttInfo mqttInfo;
+
+    @Schema(description = "连接名称")
+    private String mqttName;
+
+    @Schema(description = "网关信息信息")
+    @DBRef(lazy = true)
+    private GateWayInfo gateWayInfo;
+
+    @Schema(description = "网关ID")
+    private String gateWayId;
+
+    @Schema(description = "地址,如:/v1/device/0144572908/#")
+    private String addressMatch;
+
+    @Schema(description = "连接账号")
+    private String userName;
+
+    @Schema(description = "角色名称")
+    private String roleName;
+
+    @Schema(description = "是否同步成功")
+    private Boolean isSync = Boolean.FALSE;
+
+    @Schema(description = "同步消息")
+    private String msg = "";
+
+    @Schema(description = "时间可读字符串")
+    private String timeStr;
+}

+ 14 - 33
OneCardIotServer/src/main/java/com/zhongshu/iot/server/core/service/mqtt/GateWayInfoService.java

@@ -5,6 +5,7 @@ import com.github.microservice.net.ResultContent;
 import com.zhongshu.iot.client.model.mqtt.*;
 import com.zhongshu.iot.server.core.dao.mqtt.*;
 import com.zhongshu.iot.server.core.domain.iot.mqtt.*;
+import com.zhongshu.iot.server.core.service.base.AkSignService;
 import com.zhongshu.iot.server.core.service.base.SuperService;
 import com.zhongshu.iot.server.core.service.iot.IotServiceImpl;
 import com.zhongshu.iot.server.core.service.sync.DeviceSyncFullCardService;
@@ -87,6 +88,12 @@ public class GateWayInfoService extends SuperService {
     @Autowired
     private MqttInfoService mqttInfoService;
 
+    @Autowired
+    private GateWayUserInfoService gateWayUserInfoService;
+
+    @Autowired
+    private AkSignService akSignService;
+
     /**
      * 添加网关
      *
@@ -112,6 +119,7 @@ public class GateWayInfoService extends SuperService {
         log.info("网关注册成功");
         // 通知同步
         deviceSyncFullCardService.noticeSyncGateWay(gateWayInfo);
+        akSignService.createAkSk();
         return ResultContent.buildSuccess(gateWayInfo);
     }
 
@@ -169,22 +177,9 @@ public class GateWayInfoService extends SuperService {
             }
         }
 
-        if (StringUtils.isEmpty(param.getUserName())) {
-            param.setUserName("admin");
-        }
-        // 验证连接账号
-        GateWayUserInfo gateWayUserInfo = gateWayUserInfoDao.findTopByUserName(param.getUserName());
-        if (ObjectUtils.isEmpty(gateWayUserInfo)) {
-            return ResultContent.buildFail(String.format("连接账号不存在:%s", param.getUserName()));
-        }
-        // 验证账号是否已绑定网关
-        GateWay2User gateWay2User = gateWay2UserDao.findTopByGateWayUserInfo(gateWayUserInfo);
-        if (ObjectUtils.isNotEmpty(gateWay2User)) {
-            // 如果有绑定关系
-            GateWayInfo oldGateWayInfo = gateWay2User.getGateWayInfo();
-            if (!oldGateWayInfo.getGateWayId().equals(gateWayInfo.getGateWayId())) {
-//                return ResultContent.buildFail(String.format("连接账号%s已使用", gateWayUserInfo.getUserName()));
-            }
+        GateWay2User gateWay2User = gateWay2UserDao.findTopByGateWayInfo(gateWayInfo);
+        if (ObjectUtils.isEmpty(gateWay2User)) {
+            return ResultContent.buildFail(String.format("网关为分配连接账户: %S", gateWay2User.getGateWayId()));
         }
 
         // 设备列表
@@ -217,26 +212,12 @@ public class GateWayInfoService extends SuperService {
             }
         }
 
-        // 绑定网关和用户的关系
-        if (ObjectUtils.isEmpty(gateWay2User)) {
-            gateWay2User = new GateWay2User();
-        }
-        gateWay2User.setGateWayUserInfo(gateWayUserInfo);
-        gateWay2User.setGateWayInfo(gateWayInfo);
-        gateWay2User.setBindTime(System.currentTimeMillis());
-        gateWay2User.setBindTimeStr(DateUtils.paresTime(System.currentTimeMillis(), DateUtils.FORMAT_LONG));
-        gateWay2UserDao.save(gateWay2User);
-
-        // 添加连接账号关于:网关、设备的 Topic权限
-        bindUserGateWayPermissions(MqttTopicUtils.getUserRoleName(gateWayUserInfo), gateWayInfo.getGateWayId());
-
-        // 绑定用户设备权限
-        bindUserDevicesPermissions(MqttTopicUtils.getUserRoleName(gateWayUserInfo), deviceInfos);
-
+        // 更新网关的权限
+        gateWayUserInfoService.syncSecurityToMQTTService(gateWayInfo);
         log.info("设备注册成功:{}", deviceInfos.size());
+
         // 同步设备
         deviceSyncFullCardService.noticeSyncDevice(deviceInfos);
-
         return ResultContent.buildSuccess();
     }
 

+ 99 - 4
OneCardIotServer/src/main/java/com/zhongshu/iot/server/core/service/mqtt/GateWayUserInfoService.java

@@ -6,10 +6,7 @@ import com.zhongshu.iot.client.model.mqtt.GateWayUserInfoModel;
 import com.zhongshu.iot.client.model.mqtt.GateWayUserInfoNameParam;
 import com.zhongshu.iot.client.model.mqtt.GateWayUserInfoSearchParam;
 import com.zhongshu.iot.client.type.type.MqttUserState;
-import com.zhongshu.iot.server.core.dao.mqtt.GateWay2UserDao;
-import com.zhongshu.iot.server.core.dao.mqtt.GateWayUserInfoDao;
-import com.zhongshu.iot.server.core.dao.mqtt.Mqtt2UserDao;
-import com.zhongshu.iot.server.core.dao.mqtt.MqttInfoDao;
+import com.zhongshu.iot.server.core.dao.mqtt.*;
 import com.zhongshu.iot.server.core.domain.iot.mqtt.*;
 import com.zhongshu.iot.server.core.util.CommonUtil;
 import com.zhongshu.iot.server.core.util.DateUtils;
@@ -33,7 +30,9 @@ import javax.management.ObjectName;
 import javax.management.remote.JMXConnector;
 import javax.management.remote.JMXConnectorFactory;
 import javax.management.remote.JMXServiceURL;
+import java.util.ArrayList;
 import java.util.List;
+import java.util.stream.Collectors;
 
 /**
  * @author TRX
@@ -55,6 +54,11 @@ public class GateWayUserInfoService {
     @Autowired
     private GateWay2UserDao gateWay2UserDao;
 
+    @Autowired
+    private GateWayMqttSecurityDao gateWayMqttSecurityDao;
+    @Autowired
+    private GateWay2DeviceDao gateWay2DeviceDao;
+
     /**
      * 初始mqtt连接账号信息
      *
@@ -322,6 +326,97 @@ public class GateWayUserInfoService {
         }
     }
 
+    public ResultContent syncSecurityToMQTTService(GateWayInfo gateWayInfo) {
+        //todo同步权限
+        List<MqttInfo> list = mqttInfoDao.findAll();
+        log.info("syncSecurityToMQTTService {}", list.size());
+        if (ObjectUtils.isNotEmpty(list)) {
+            for (MqttInfo mqttInfo : list) {
+                syncMqttSecuritySettings(mqttInfo, List.of(gateWayInfo));
+            }
+        }
+        return ResultContent.buildSuccess();
+    }
+
+    /**
+     * 同步网关可监听的权限
+     *
+     * @param mqttInfo
+     * @param gateWayInfos
+     * @return
+     */
+    public void syncMqttSecuritySettings(MqttInfo mqttInfo, List<GateWayInfo> gateWayInfos) {
+        if (ObjectUtils.isNotEmpty(mqttInfo) && ObjectUtils.isNotEmpty(gateWayInfos)) {
+            try {
+                String urlStr = JMXUtil.buildServiceURL(mqttInfo);
+                JMXServiceURL url = new JMXServiceURL(urlStr);
+                @Cleanup JMXConnector connector = JMXConnectorFactory.connect(url, null);
+                connector.connect();
+                log.info("JMX {}:{} 连接成功...", mqttInfo.getJmxHost(), mqttInfo.getJmxPort());
+                MBeanServerConnection connection = connector.getMBeanServerConnection();
+                ObjectName addressObjectName = ObjectNameBuilder.create("org.apache.activemq.artemis", mqttInfo.getBrokerName()).getActiveMQServerObjectName();
+                ActiveMQServerControl addressControl = MBeanServerInvocationHandler.newProxyInstance(connection, addressObjectName, ActiveMQServerControl.class, false);
+
+                for (GateWayInfo gateWayInfo : gateWayInfos) {
+                    // 以前的权限
+                    List<GateWayMqttSecurity> list = gateWayMqttSecurityDao.findByMqttInfoAndGateWayInfo(mqttInfo, gateWayInfo);
+                    gateWayMqttSecurityDao.deleteByMqttInfoAndGateWayInfo(mqttInfo, gateWayInfo);
+
+                    // 查询网关关联的用户
+                    GateWay2User gateWay2User = gateWay2UserDao.findTopByGateWayInfo(gateWayInfo);
+                    if (ObjectUtils.isEmpty(gateWay2User)) {
+                        log.error("网关对应用户信息为不存在");
+                        continue;
+                    }
+                    GateWayUserInfo gateWayUserInfo = gateWay2User.getGateWayUserInfo();
+                    if (ObjectUtils.isEmpty(gateWayUserInfo)) {
+                        log.error("网关对应用户信息为空");
+                        continue;
+                    }
+                    // 查询网关对应的设备
+                    List<GateWay2Device> gateWay2Devices = gateWay2DeviceDao.findByGateWayInfo(gateWayInfo);
+                    List<DeviceInfo> deviceInfos = new ArrayList<>();
+                    if (ObjectUtils.isNotEmpty(gateWay2Devices)) {
+                        deviceInfos = gateWay2Devices.stream().map(it -> it.getDeviceInfo()).collect(Collectors.toList());
+                    }
+                    List<String> addressMatchs = JMXUtil.buildAddressMatch(gateWayInfo, deviceInfos);
+                    if (ObjectUtils.isNotEmpty(addressMatchs)) {
+                        String userName = gateWayUserInfo.getUserName();
+                        String roleName = gateWayUserInfo.getRoleName();
+
+                        List<GateWayMqttSecurity> securities = new ArrayList<>();
+                        addressMatchs.parallelStream().forEach(addressMatch -> {
+                            GateWayMqttSecurity gateWayMqttSecurity = new GateWayMqttSecurity();
+                            gateWayMqttSecurity.setUserName(userName);
+                            gateWayMqttSecurity.setRoleName(roleName);
+                            gateWayMqttSecurity.setGateWayInfo(gateWayInfo);
+                            gateWayMqttSecurity.setGateWayId(gateWayInfo.getGateWayId());
+                            gateWayMqttSecurity.setMqttInfo(mqttInfo);
+                            gateWayMqttSecurity.setMqttName(mqttInfo.getName());
+                            try {
+                                addressControl.addSecuritySettings(
+                                        addressMatch,
+                                        roleName, roleName, roleName, roleName, roleName, roleName, roleName,
+                                        roleName, roleName, roleName);
+                                gateWayMqttSecurity.setIsSync(Boolean.TRUE);
+                                gateWayMqttSecurity.setMsg("同步成功");
+                            } catch (Exception e) {
+                                e.printStackTrace();
+                                gateWayMqttSecurity.setIsSync(Boolean.FALSE);
+                                gateWayMqttSecurity.setMsg(String.format("同步失败:%s", e.getMessage()));
+                            }
+                            gateWayMqttSecurity.setTimeStr(DateUtils.paresTime(System.currentTimeMillis(), DateUtils.FORMAT_LONG));
+                            securities.add(gateWayMqttSecurity);
+                        });
+                        gateWayMqttSecurityDao.saveAll(securities);
+                    }
+                }
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
     //-----------------------------同步权限数据 end----------------------
 
     /**

+ 2 - 3
OneCardIotServer/src/main/java/com/zhongshu/iot/server/core/service/mqtt/MqttInfoService.java

@@ -75,11 +75,10 @@ public class MqttInfoService {
 
         // 查询和绑定分配网关的mqtt账号
         GateWayUserInfo gateWayUserInfo = gateWayUserInfoService.gateWayBindMqttUser(gatewayInfo);
-
         if (ObjectUtils.isNotEmpty(mqttInfo)) {
             mqttInfoSimpleModel.setBrokerAddress(String.format("%s:%s", mqttInfo.getBrokerHost(), mqttInfo.getBrokerPort()));
-            mqttInfoSimpleModel.setBrokerUsername(mqttInfo.getUserName());
-            mqttInfoSimpleModel.setBrokerPassword(mqttInfo.getPassword());
+            mqttInfoSimpleModel.setBrokerUsername(gateWayUserInfo.getUserName());
+            mqttInfoSimpleModel.setBrokerPassword(gateWayUserInfo.getPassWord());
         }
         return mqttInfoSimpleModel;
     }

+ 26 - 1
OneCardIotServer/src/main/java/com/zhongshu/iot/server/core/util/JMXUtil.java

@@ -2,8 +2,14 @@ package com.zhongshu.iot.server.core.util;
 
 import cn.hutool.json.JSONArray;
 import cn.hutool.json.JSONObject;
+import com.zhongshu.iot.server.core.domain.iot.mqtt.DeviceInfo;
+import com.zhongshu.iot.server.core.domain.iot.mqtt.GateWayInfo;
 import com.zhongshu.iot.server.core.domain.iot.mqtt.MqttInfo;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.ObjectUtils;
+
+import java.util.ArrayList;
+import java.util.List;
 
 /**
  * @author TRX
@@ -19,7 +25,6 @@ public class JMXUtil {
      * @return
      */
     public static boolean mqttUserIsExit(String userStr) {
-        log.info("userStr: {}", userStr);
         // [{"username":"bkjhpahp","roles":["bkjhpahp"]}]
         try {
             JSONArray array = new JSONArray(userStr);
@@ -44,4 +49,24 @@ public class JMXUtil {
         return String.format("service:jmx:rmi:///jndi/rmi://%s:%s/jmxrmi", mqttInfo.getJmxHost(), mqttInfo.getJmxPort());
     }
 
+    /**
+     * 创建监听 地址集合
+     *
+     * @param gateWayInfo
+     * @param deviceInfoList
+     * @return
+     */
+    public static List<String> buildAddressMatch(GateWayInfo gateWayInfo, List<DeviceInfo> deviceInfoList) {
+        List<String> list = new ArrayList<String>();
+        if (ObjectUtils.isNotEmpty(gateWayInfo)) {
+            list.add(String.format("/v1/gateway/%s/#", gateWayInfo.getGateWayId()));
+        }
+        if (ObjectUtils.isNotEmpty(deviceInfoList)) {
+            deviceInfoList.stream().forEach(deviceInfo -> {
+                list.add(String.format("/v1/device/%s/#", deviceInfo.getDeviceId()));
+            });
+        }
+        return list;
+    }
+
 }