|
|
@@ -409,6 +409,107 @@ public class JMXSyncService {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * 同步设备权限
|
|
|
+ *
|
|
|
+ * @param deviceInfos
|
|
|
+ */
|
|
|
+ public void syncDevicesSecurity(List<DeviceInfo> deviceInfos) {
|
|
|
+ if (ObjectUtils.isNotEmpty(deviceInfos)) {
|
|
|
+ List<MqttInfo> list = mqttInfoDao.findAll();
|
|
|
+ log.info("syncDevicesSecurity {}", list.size());
|
|
|
+ if (ObjectUtils.isNotEmpty(list)) {
|
|
|
+ list.parallelStream().forEach(mqttInfo -> {
|
|
|
+ syncMqttSecuritySettingsByDevices(mqttInfo, deviceInfos);
|
|
|
+ });
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public void syncMqttSecuritySettingsByDevices(MqttInfo mqttInfo, List<DeviceInfo> deviceInfos) {
|
|
|
+ if (ObjectUtils.isNotEmpty(mqttInfo) && ObjectUtils.isNotEmpty(deviceInfos)) {
|
|
|
+ try {
|
|
|
+ String urlStr = JMXUtil.buildServiceURL(mqttInfo);
|
|
|
+ JMXServiceURL url = new JMXServiceURL(urlStr);
|
|
|
+ @Cleanup JMXConnector connector = JMXConnectorFactory.connect(url, null);
|
|
|
+ connector.connect();
|
|
|
+ log.info("syncMqttSecuritySettings JMX {}:{} 连接成功...", mqttInfo.getJmxHost(), mqttInfo.getJmxPort());
|
|
|
+ MBeanServerConnection connection = connector.getMBeanServerConnection();
|
|
|
+ ObjectName addressObjectName = ObjectNameBuilder.create("org.apache.activemq.artemis", mqttInfo.getBrokerName()).getActiveMQServerObjectName();
|
|
|
+ ActiveMQServerControl addressControl = MBeanServerInvocationHandler.newProxyInstance(connection, addressObjectName, ActiveMQServerControl.class, false);
|
|
|
+
|
|
|
+ final String _roleName = MqttConfig.adminRoleName;
|
|
|
+ for (DeviceInfo deviceInfo : deviceInfos) {
|
|
|
+ GateWayUserInfo gateWayUserInfo = gateWayUserInfoDao.findTopByUserName(deviceInfo.getMqttUserName());
|
|
|
+ if (ObjectUtils.isEmpty(gateWayUserInfo)) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+// // 以前的权限
|
|
|
+// List<GateWayMqttSecurity> list = gateWayMqttSecurityDao.findByMqttInfoAndGateWayInfo(mqttInfo, gateWayInfo);
|
|
|
+// gateWayMqttSecurityDao.deleteByMqttInfoAndGateWayInfo(mqttInfo, gateWayInfo);
|
|
|
+//
|
|
|
+// // 查询网关关联的用户
|
|
|
+// GateWay2User gateWay2User = gateWay2UserDao.findTopByGateWayInfo(gateWayInfo);
|
|
|
+// if (ObjectUtils.isEmpty(gateWay2User)) {
|
|
|
+// log.error("网关对应用户信息为不存在");
|
|
|
+// continue;
|
|
|
+// }
|
|
|
+// GateWayUserInfo gateWayUserInfo = gateWay2User.getGateWayUserInfo();
|
|
|
+// if (ObjectUtils.isEmpty(gateWayUserInfo)) {
|
|
|
+// log.error("网关对应用户信息为空");
|
|
|
+// continue;
|
|
|
+// }
|
|
|
+// // 查询网关对应的设备
|
|
|
+// List<GateWay2Device> gateWay2Devices = gateWay2DeviceDao.findByGateWayInfo(gateWayInfo);
|
|
|
+// List<DeviceInfo> deviceInfos = new ArrayList<>();
|
|
|
+// if (ObjectUtils.isNotEmpty(gateWay2Devices)) {
|
|
|
+// deviceInfos = gateWay2Devices.stream().map(it -> it.getDeviceInfo()).collect(Collectors.toList());
|
|
|
+// }
|
|
|
+//
|
|
|
+// List<String> addressMatchs = JMXUtil.buildAddressMatch(gateWayInfo, deviceInfos);
|
|
|
+// if (ObjectUtils.isNotEmpty(addressMatchs)) {
|
|
|
+// String userName = gateWayUserInfo.getUserName();
|
|
|
+// String roleName = gateWayUserInfo.getRoleName();
|
|
|
+//
|
|
|
+// List<String> roleNames = new ArrayList<>();
|
|
|
+// roleNames.add(roleName);
|
|
|
+// roleNames.add(_roleName);
|
|
|
+//
|
|
|
+// roleName = String.join(",", roleNames);
|
|
|
+// List<GateWayMqttSecurity> securities = new ArrayList<>();
|
|
|
+// for (String addressMatch : addressMatchs) {
|
|
|
+// GateWayMqttSecurity gateWayMqttSecurity = new GateWayMqttSecurity();
|
|
|
+// gateWayMqttSecurity.setUserName(userName);
|
|
|
+// gateWayMqttSecurity.setRoleName(roleName);
|
|
|
+// gateWayMqttSecurity.setGateWayInfo(gateWayInfo);
|
|
|
+// gateWayMqttSecurity.setGateWayId(gateWayInfo.getGateWayId());
|
|
|
+// gateWayMqttSecurity.setMqttInfo(mqttInfo);
|
|
|
+// gateWayMqttSecurity.setMqttName(mqttInfo.getName());
|
|
|
+// gateWayMqttSecurity.setAddressMatch(addressMatch);
|
|
|
+// try {
|
|
|
+// addressControl.addSecuritySettings(addressMatch, roleName, roleName, roleName, roleName, roleName, roleName, roleName, roleName, roleName, roleName);
|
|
|
+//
|
|
|
+// gateWayMqttSecurity.setIsSync(Boolean.TRUE);
|
|
|
+// gateWayMqttSecurity.setMsg("同步成功");
|
|
|
+// } catch (Exception e) {
|
|
|
+// e.printStackTrace();
|
|
|
+// gateWayMqttSecurity.setIsSync(Boolean.FALSE);
|
|
|
+// gateWayMqttSecurity.setMsg(String.format("同步失败:%s", e.getMessage()));
|
|
|
+// }
|
|
|
+// gateWayMqttSecurity.setTimeStr(DateUtils.paresTime(System.currentTimeMillis(), DateUtils.FORMAT_LONG));
|
|
|
+// securities.add(gateWayMqttSecurity);
|
|
|
+// }
|
|
|
+// gateWayMqttSecurityDao.saveAll(securities);
|
|
|
+// }
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* 同步所有的用户
|
|
|
*
|