Bladeren bron

更新!

TRX 1 jaar geleden
bovenliggende
commit
9112190569

+ 1 - 0
OneCardIotServer/src/main/java/com/zhongshu/iot/server/core/service/mqtt/GateWayInfoService.java

@@ -139,6 +139,7 @@ public class GateWayInfoService extends SuperService {
 
         // 给网关分配个mqtt账号
         MqttInfoReturnModel mqttInfoSimpleModel = mqttInfoService.getCommonMqttInfo(gateWayInfo.getContent());
+
         return ResultContent.buildSuccess(mqttInfoSimpleModel);
     }
 

+ 104 - 4
OneCardIotServer/src/main/java/com/zhongshu/iot/server/core/service/mqtt/GateWayUserInfoService.java

@@ -278,7 +278,7 @@ public class GateWayUserInfoService {
                 JMXServiceURL url = new JMXServiceURL(urlStr);
                 @Cleanup JMXConnector connector = JMXConnectorFactory.connect(url, null);
                 connector.connect();
-                log.info("JMX {}:{} 连接成功...", mqttInfo.getJmxHost(), mqttInfo.getJmxPort());
+                log.info("JMX syncMqttUsers {}:{} 连接成功...", mqttInfo.getJmxHost(), mqttInfo.getJmxPort());
                 MBeanServerConnection connection = connector.getMBeanServerConnection();
                 ObjectName addressObjectName = ObjectNameBuilder.create("org.apache.activemq.artemis", mqttInfo.getBrokerName()).getActiveMQServerObjectName();
                 ActiveMQServerControl addressControl = MBeanServerInvocationHandler.newProxyInstance(connection, addressObjectName, ActiveMQServerControl.class, false);
@@ -331,7 +331,7 @@ public class GateWayUserInfoService {
                 JMXServiceURL url = new JMXServiceURL(urlStr);
                 @Cleanup JMXConnector connector = JMXConnectorFactory.connect(url, null);
                 connector.connect();
-                log.info("JMX {}:{} 连接成功...", mqttInfo.getJmxHost(), mqttInfo.getJmxPort());
+                log.info("JMX deleteMqttUsers {}:{} 连接成功...", mqttInfo.getJmxHost(), mqttInfo.getJmxPort());
                 MBeanServerConnection connection = connector.getMBeanServerConnection();
                 ObjectName addressObjectName = ObjectNameBuilder.create("org.apache.activemq.artemis", mqttInfo.getBrokerName()).getActiveMQServerObjectName();
                 ActiveMQServerControl addressControl = MBeanServerInvocationHandler.newProxyInstance(connection, addressObjectName, ActiveMQServerControl.class, false);
@@ -369,6 +369,18 @@ public class GateWayUserInfoService {
         return ResultContent.buildSuccess();
     }
 
+    public ResultContent syncSecurityGateToMQTTService(GateWayInfo gateWayInfo) {
+        //todo同步权限
+        List<MqttInfo> list = mqttInfoDao.findAll();
+        log.info("syncSecurityGateToMQTTService {}", list.size());
+        if (ObjectUtils.isNotEmpty(list)) {
+            for (MqttInfo mqttInfo : list) {
+                syncMqttSecurityGateSettings(mqttInfo, List.of(gateWayInfo));
+            }
+        }
+        return ResultContent.buildSuccess();
+    }
+
     public ResultContent syncSecurityToMQTTService(List<GateWayInfo> gateWayInfos) {
         //todo同步权限
         List<MqttInfo> list = mqttInfoDao.findAll();
@@ -395,11 +407,19 @@ public class GateWayUserInfoService {
                 JMXServiceURL url = new JMXServiceURL(urlStr);
                 @Cleanup JMXConnector connector = JMXConnectorFactory.connect(url, null);
                 connector.connect();
-                log.info("JMX {}:{} 连接成功...", mqttInfo.getJmxHost(), mqttInfo.getJmxPort());
+                log.info("syncMqttSecuritySettings JMX {}:{} 连接成功...", mqttInfo.getJmxHost(), mqttInfo.getJmxPort());
                 MBeanServerConnection connection = connector.getMBeanServerConnection();
                 ObjectName addressObjectName = ObjectNameBuilder.create("org.apache.activemq.artemis", mqttInfo.getBrokerName()).getActiveMQServerObjectName();
                 ActiveMQServerControl addressControl = MBeanServerInvocationHandler.newProxyInstance(connection, addressObjectName, ActiveMQServerControl.class, false);
 
+                final String _roleName = MqttConfig.adminRoleName;
+                addressControl.addSecuritySettings("#", _roleName, _roleName, _roleName, _roleName,
+                        _roleName, _roleName, _roleName, _roleName,
+                        _roleName, _roleName);
+//
+//                addressControl.addSecuritySettings("/v1/gateway/#", _roleName, _roleName, _roleName, _roleName,
+//                        _roleName, _roleName, _roleName, _roleName, _roleName, _roleName);
+
                 for (GateWayInfo gateWayInfo : gateWayInfos) {
                     // 以前的权限
                     List<GateWayMqttSecurity> list = gateWayMqttSecurityDao.findByMqttInfoAndGateWayInfo(mqttInfo, gateWayInfo);
@@ -428,7 +448,84 @@ public class GateWayUserInfoService {
                         String roleName = gateWayUserInfo.getRoleName();
 
                         List<GateWayMqttSecurity> securities = new ArrayList<>();
-                        addressMatchs.parallelStream().forEach(addressMatch -> {
+                        addressMatchs.stream().forEach(addressMatch -> {
+                            GateWayMqttSecurity gateWayMqttSecurity = new GateWayMqttSecurity();
+                            gateWayMqttSecurity.setUserName(userName);
+                            gateWayMqttSecurity.setRoleName(roleName);
+                            gateWayMqttSecurity.setGateWayInfo(gateWayInfo);
+                            gateWayMqttSecurity.setGateWayId(gateWayInfo.getGateWayId());
+                            gateWayMqttSecurity.setMqttInfo(mqttInfo);
+                            gateWayMqttSecurity.setMqttName(mqttInfo.getName());
+                            gateWayMqttSecurity.setAddressMatch(addressMatch);
+                            try {
+                                if (!roleName.equals(MqttConfig.adminRoleName)) {
+                                    addressControl.addSecuritySettings(addressMatch, roleName, roleName, roleName, roleName, roleName, roleName, roleName, roleName, roleName, roleName);
+                                }
+                                addressControl.addSecuritySettings(addressMatch, _roleName, _roleName, _roleName, _roleName,
+                                        _roleName, _roleName, _roleName, _roleName,
+                                        _roleName, _roleName);
+
+                                gateWayMqttSecurity.setIsSync(Boolean.TRUE);
+                                gateWayMqttSecurity.setMsg("同步成功");
+                            } catch (Exception e) {
+                                e.printStackTrace();
+                                gateWayMqttSecurity.setIsSync(Boolean.FALSE);
+                                gateWayMqttSecurity.setMsg(String.format("同步失败:%s", e.getMessage()));
+                            }
+                            gateWayMqttSecurity.setTimeStr(DateUtils.paresTime(System.currentTimeMillis(), DateUtils.FORMAT_LONG));
+                            securities.add(gateWayMqttSecurity);
+                        });
+                        gateWayMqttSecurityDao.saveAll(securities);
+                    }
+                }
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    /**
+     * 只同步网关的权限
+     *
+     * @param mqttInfo
+     * @param gateWayInfos
+     */
+    public void syncMqttSecurityGateSettings(MqttInfo mqttInfo, List<GateWayInfo> gateWayInfos) {
+        if (ObjectUtils.isNotEmpty(mqttInfo) && ObjectUtils.isNotEmpty(gateWayInfos)) {
+            try {
+                String urlStr = JMXUtil.buildServiceURL(mqttInfo);
+                JMXServiceURL url = new JMXServiceURL(urlStr);
+                @Cleanup JMXConnector connector = JMXConnectorFactory.connect(url, null);
+                connector.connect();
+                log.info("syncMqttSecurityGateSettings JMX {}:{} 连接成功...", mqttInfo.getJmxHost(), mqttInfo.getJmxPort());
+                MBeanServerConnection connection = connector.getMBeanServerConnection();
+                ObjectName addressObjectName = ObjectNameBuilder.create("org.apache.activemq.artemis", mqttInfo.getBrokerName()).getActiveMQServerObjectName();
+                ActiveMQServerControl addressControl = MBeanServerInvocationHandler.newProxyInstance(connection, addressObjectName, ActiveMQServerControl.class, false);
+
+                final String _roleName = MqttConfig.adminRoleName;
+                addressControl.addSecuritySettings("#", _roleName, _roleName, _roleName, _roleName,
+                        _roleName, _roleName, _roleName, _roleName,
+                        _roleName, _roleName);
+
+                for (GateWayInfo gateWayInfo : gateWayInfos) {
+                    // 查询网关关联的用户
+                    GateWay2User gateWay2User = gateWay2UserDao.findTopByGateWayInfo(gateWayInfo);
+                    if (ObjectUtils.isEmpty(gateWay2User)) {
+                        log.error("网关对应用户信息为不存在");
+                        continue;
+                    }
+                    GateWayUserInfo gateWayUserInfo = gateWay2User.getGateWayUserInfo();
+                    if (ObjectUtils.isEmpty(gateWayUserInfo)) {
+                        log.error("网关对应用户信息为空");
+                        continue;
+                    }
+                    List<String> addressMatchs = JMXUtil.buildGateWayAddressMatch(gateWayInfo);
+                    if (ObjectUtils.isNotEmpty(addressMatchs)) {
+                        String userName = gateWayUserInfo.getUserName();
+                        String roleName = gateWayUserInfo.getRoleName();
+
+                        List<GateWayMqttSecurity> securities = new ArrayList<>();
+                        addressMatchs.stream().forEach(addressMatch -> {
                             GateWayMqttSecurity gateWayMqttSecurity = new GateWayMqttSecurity();
                             gateWayMqttSecurity.setUserName(userName);
                             gateWayMqttSecurity.setRoleName(roleName);
@@ -441,6 +538,9 @@ public class GateWayUserInfoService {
                                 if (!roleName.equals(MqttConfig.adminRoleName)) {
                                     addressControl.addSecuritySettings(addressMatch, roleName, roleName, roleName, roleName, roleName, roleName, roleName, roleName, roleName, roleName);
                                 }
+                                addressControl.addSecuritySettings(addressMatch, _roleName, _roleName, _roleName, _roleName,
+                                        _roleName, _roleName, _roleName, _roleName,
+                                        _roleName, _roleName);
                                 gateWayMqttSecurity.setIsSync(Boolean.TRUE);
                                 gateWayMqttSecurity.setMsg("同步成功");
                             } catch (Exception e) {

+ 7 - 2
OneCardIotServer/src/main/java/com/zhongshu/iot/server/core/service/mqtt/MqttInfoService.java

@@ -68,16 +68,21 @@ public class MqttInfoService {
      * @return
      */
     public MqttInfoReturnModel getCommonMqttInfo(GateWayInfo gatewayInfo) {
+        // 返回信息
         MqttInfoReturnModel mqttInfoSimpleModel = new MqttInfoReturnModel();
+
+        // 查询一个通用的连接信息
         MqttInfo mqttInfo = mqttInfoDao.findTopByAddress(AddressType.Common);
 
         // 查询和绑定分配网关的mqtt账号
         GateWayUserInfo gateWayUserInfo = gateWayUserInfoService.gateWayBindMqttUser(gatewayInfo);
         if (ObjectUtils.isNotEmpty(mqttInfo)) {
             mqttInfoSimpleModel.setBrokerAddress(String.format("%s:%s", mqttInfo.getBrokerHost(), mqttInfo.getBrokerPort()));
-            mqttInfoSimpleModel.setBrokerUsername(mqttInfo.getUserName());
-            mqttInfoSimpleModel.setBrokerPassword(mqttInfo.getPassword());
+            mqttInfoSimpleModel.setBrokerUsername(gateWayUserInfo.getUserName());
+            mqttInfoSimpleModel.setBrokerPassword(gateWayUserInfo.getPassWord());
         }
+        // 只同步网关的 权限
+        gateWayUserInfoService.syncSecurityGateToMQTTService(gatewayInfo);
         return mqttInfoSimpleModel;
     }
 

+ 9 - 1
OneCardIotServer/src/main/java/com/zhongshu/iot/server/core/util/JMXUtil.java

@@ -59,7 +59,7 @@ public class JMXUtil {
     public static List<String> buildAddressMatch(GateWayInfo gateWayInfo, List<DeviceInfo> deviceInfoList) {
         List<String> list = new ArrayList<String>();
         if (ObjectUtils.isNotEmpty(gateWayInfo)) {
-            list.add(String.format("/v1/gateway/%s/#", gateWayInfo.getGateWayId()));
+            list.addAll(buildGateWayAddressMatch(gateWayInfo));
         }
         if (ObjectUtils.isNotEmpty(deviceInfoList)) {
             deviceInfoList.stream().forEach(deviceInfo -> {
@@ -69,4 +69,12 @@ public class JMXUtil {
         return list;
     }
 
+    public static List<String> buildGateWayAddressMatch(GateWayInfo gateWayInfo) {
+        List<String> list = new ArrayList<String>();
+        if (ObjectUtils.isNotEmpty(gateWayInfo)) {
+            list.add(String.format("/v1/gateway/%s/#", gateWayInfo.getGateWayId()));
+        }
+        return list;
+    }
+
 }