|
|
@@ -5,16 +5,14 @@ import com.zhongshu.iot.client.model.mqtt.*;
|
|
|
import com.zhongshu.iot.client.type.type.MqttUserState;
|
|
|
import com.zhongshu.iot.server.core.dao.mqtt.*;
|
|
|
import com.zhongshu.iot.server.core.dataConfig.MqttConfig;
|
|
|
-import com.zhongshu.iot.server.core.domain.iot.mqtt.*;
|
|
|
+import com.zhongshu.iot.server.core.domain.iot.mqtt.GateWay2User;
|
|
|
+import com.zhongshu.iot.server.core.domain.iot.mqtt.GateWayInfo;
|
|
|
+import com.zhongshu.iot.server.core.domain.iot.mqtt.GateWayUserInfo;
|
|
|
import com.zhongshu.iot.server.core.util.CommonUtil;
|
|
|
import com.zhongshu.iot.server.core.util.DateUtils;
|
|
|
-import com.zhongshu.iot.server.core.util.JMXUtil;
|
|
|
import com.zhongshu.iot.server.core.util.bean.BeanUtils;
|
|
|
import com.zhongshu.iot.server.core.util.page.PageEntityUtil;
|
|
|
-import lombok.Cleanup;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
-import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl;
|
|
|
-import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder;
|
|
|
import org.apache.commons.lang3.ObjectUtils;
|
|
|
import org.apache.commons.lang3.StringUtils;
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
@@ -22,16 +20,6 @@ import org.springframework.data.domain.Page;
|
|
|
import org.springframework.data.domain.Pageable;
|
|
|
import org.springframework.stereotype.Service;
|
|
|
|
|
|
-import javax.management.MBeanServerConnection;
|
|
|
-import javax.management.MBeanServerInvocationHandler;
|
|
|
-import javax.management.ObjectName;
|
|
|
-import javax.management.remote.JMXConnector;
|
|
|
-import javax.management.remote.JMXConnectorFactory;
|
|
|
-import javax.management.remote.JMXServiceURL;
|
|
|
-import java.util.ArrayList;
|
|
|
-import java.util.List;
|
|
|
-import java.util.stream.Collectors;
|
|
|
-
|
|
|
/**
|
|
|
* 账号管理
|
|
|
*
|
|
|
@@ -63,15 +51,19 @@ public class GateWayUserInfoService {
|
|
|
@Autowired
|
|
|
private GateWayInfoDao gateWayInfoDao;
|
|
|
|
|
|
+ @Autowired
|
|
|
+ private JMXSyncService jmxSyncService;
|
|
|
+
|
|
|
/**
|
|
|
* 刷新系统所有的用户 和 用户权限数据
|
|
|
*/
|
|
|
public void initData() {
|
|
|
log.info("GateWayUserInfoService initData");
|
|
|
+ // 初始默认用户密码
|
|
|
initDefaultUser();
|
|
|
|
|
|
- refreshAllGateWayUser();
|
|
|
- refreshAllGateWaySecurity();
|
|
|
+ jmxSyncService.refreshAllGateWayUser();
|
|
|
+ jmxSyncService.refreshAllGateWaySecurity();
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
@@ -128,7 +120,7 @@ public class GateWayUserInfoService {
|
|
|
gateWayUserInfo = gateWay2User.getGateWayUserInfo();
|
|
|
}
|
|
|
}
|
|
|
- syncUserToMQTTService(gateWayUserInfo);
|
|
|
+ jmxSyncService.syncUserToMQTTService(gateWayUserInfo);
|
|
|
return gateWayUserInfo;
|
|
|
}
|
|
|
|
|
|
@@ -195,7 +187,7 @@ public class GateWayUserInfoService {
|
|
|
gateWayUserInfoDao.save(entity);
|
|
|
|
|
|
// 同步用户
|
|
|
- syncUserToMQTTService(entity);
|
|
|
+ jmxSyncService.syncUserToMQTTService(entity);
|
|
|
return ResultContent.buildSuccess(entity);
|
|
|
}
|
|
|
|
|
|
@@ -225,10 +217,10 @@ public class GateWayUserInfoService {
|
|
|
// 同步
|
|
|
if (entity.getState() == MqttUserState.Disable) {
|
|
|
// 不过不可用,则删除QMTT服务上的用户信息
|
|
|
- removeUserToMQTTService(entity);
|
|
|
+ jmxSyncService.removeUserToMQTTService(entity);
|
|
|
} else {
|
|
|
// 添加用户
|
|
|
- syncUserToMQTTService(entity);
|
|
|
+ jmxSyncService.syncUserToMQTTService(entity);
|
|
|
}
|
|
|
return ResultContent.buildSuccess();
|
|
|
}
|
|
|
@@ -260,7 +252,7 @@ public class GateWayUserInfoService {
|
|
|
gateWayUserInfoDao.delete(entity);
|
|
|
|
|
|
// 删除服务文件上的用户
|
|
|
- removeUserToMQTTService(entity);
|
|
|
+ jmxSyncService.removeUserToMQTTService(entity);
|
|
|
return ResultContent.buildSuccess();
|
|
|
}
|
|
|
|
|
|
@@ -291,441 +283,6 @@ public class GateWayUserInfoService {
|
|
|
return ResultContent.buildSuccess(model);
|
|
|
}
|
|
|
|
|
|
- //-----------------------------同步权限数据 start--------------------
|
|
|
-
|
|
|
- /**
|
|
|
- * 把用户同步到MQTT服务中
|
|
|
- *
|
|
|
- * @param gateWayUserInfo
|
|
|
- * @return
|
|
|
- */
|
|
|
- public ResultContent syncUserToMQTTService(GateWayUserInfo gateWayUserInfo) {
|
|
|
- //todo同步用户
|
|
|
- List<MqttInfo> list = mqttInfoDao.findAll();
|
|
|
- log.info("syncUserToMQTTService {}", list.size());
|
|
|
- if (ObjectUtils.isNotEmpty(list)) {
|
|
|
- for (MqttInfo mqttInfo : list) {
|
|
|
- syncMqttUsers(mqttInfo, List.of(gateWayUserInfo));
|
|
|
- }
|
|
|
- }
|
|
|
- return ResultContent.buildSuccess();
|
|
|
- }
|
|
|
-
|
|
|
- public ResultContent syncUserToMQTTService(List<GateWayUserInfo> gateWayUserInfoList) {
|
|
|
- //todo同步用户
|
|
|
- List<MqttInfo> list = mqttInfoDao.findAll();
|
|
|
- log.info("syncUserToMQTTService {}", list.size());
|
|
|
- if (ObjectUtils.isNotEmpty(list)) {
|
|
|
- list.parallelStream().forEach(mqttInfo -> {
|
|
|
- syncMqttUsers(mqttInfo, gateWayUserInfoList);
|
|
|
- });
|
|
|
- }
|
|
|
- return ResultContent.buildSuccess();
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 同步多个用户
|
|
|
- *
|
|
|
- * @param mqttInfo
|
|
|
- * @param gateWayUserInfoList
|
|
|
- */
|
|
|
- private void syncMqttUsers(MqttInfo mqttInfo, List<GateWayUserInfo> gateWayUserInfoList) {
|
|
|
- if (ObjectUtils.isNotEmpty(mqttInfo) && ObjectUtils.isNotEmpty(gateWayUserInfoList)) {
|
|
|
- try {
|
|
|
- String urlStr = JMXUtil.buildServiceURL(mqttInfo);
|
|
|
- JMXServiceURL url = new JMXServiceURL(urlStr);
|
|
|
- @Cleanup JMXConnector connector = JMXConnectorFactory.connect(url, null);
|
|
|
- connector.connect();
|
|
|
- log.info("JMX syncMqttUsers {}:{} 连接成功...", mqttInfo.getJmxHost(), mqttInfo.getJmxPort());
|
|
|
- MBeanServerConnection connection = connector.getMBeanServerConnection();
|
|
|
- ObjectName addressObjectName = ObjectNameBuilder.create("org.apache.activemq.artemis", mqttInfo.getBrokerName()).getActiveMQServerObjectName();
|
|
|
- ActiveMQServerControl addressControl = MBeanServerInvocationHandler.newProxyInstance(connection, addressObjectName, ActiveMQServerControl.class, false);
|
|
|
-
|
|
|
- for (GateWayUserInfo gateWayUserInfo : gateWayUserInfoList) {
|
|
|
- Mqtt2User mqtt2User = mqtt2UserDao.findTopByMqttInfoAndGateWayUserInfo(mqttInfo, gateWayUserInfo);
|
|
|
- if (ObjectUtils.isEmpty(mqtt2User)) {
|
|
|
- mqtt2User = new Mqtt2User();
|
|
|
- }
|
|
|
- try {
|
|
|
- String oldUserStr = addressControl.listUser(gateWayUserInfo.getUserName());
|
|
|
- if (JMXUtil.mqttUserIsExit(oldUserStr)) {
|
|
|
- addressControl.removeUser(gateWayUserInfo.getUserName());
|
|
|
- }
|
|
|
- // 添加用户
|
|
|
- addressControl.addUser(gateWayUserInfo.getUserName(), gateWayUserInfo.getPassWord(), gateWayUserInfo.getRoleName(), false);
|
|
|
-
|
|
|
- // 添加用户的默认权限
|
|
|
- List<String> security = JMXUtil.buildGateWayUserDefault(gateWayUserInfo);
|
|
|
- if (ObjectUtils.isNotEmpty(security)) {
|
|
|
- String roleName = gateWayUserInfo.getRoleName();
|
|
|
- for (String addressMatch : security) {
|
|
|
- addressControl.addSecuritySettings(addressMatch, roleName, roleName, roleName, roleName, roleName, roleName, roleName, roleName, roleName, roleName);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- mqtt2User.setGateWayUserInfo(gateWayUserInfo);
|
|
|
- mqtt2User.setUserName(gateWayUserInfo.getUserName());
|
|
|
-
|
|
|
- mqtt2User.setMqttInfo(mqttInfo);
|
|
|
- mqtt2User.setMqttInfoName(mqttInfo.getName());
|
|
|
-
|
|
|
- mqtt2User.setIsSync(Boolean.TRUE);
|
|
|
- mqtt2User.setSyncTime(System.currentTimeMillis());
|
|
|
- mqtt2User.setSyncTimeStr(DateUtils.paresTime(System.currentTimeMillis(), DateUtils.FORMAT_LONG));
|
|
|
- mqtt2User.setMsg("同步成功");
|
|
|
- } catch (Exception e) {
|
|
|
- log.error("syncMqttUsers: {}", e.getMessage());
|
|
|
- mqtt2User.setIsSync(Boolean.FALSE);
|
|
|
- mqtt2User.setMsg(String.format("同步失败:%s", e.getMessage()));
|
|
|
- }
|
|
|
- mqtt2UserDao.save(mqtt2User);
|
|
|
- }
|
|
|
- } catch (Exception e) {
|
|
|
- e.printStackTrace();
|
|
|
- return;
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- public ResultContent removeUserToMQTTService(GateWayUserInfo gateWayUserInfo) {
|
|
|
- //todo删除用户
|
|
|
- List<MqttInfo> list = mqttInfoDao.findAll();
|
|
|
- log.info("removeUserToMQTTService {}", list.size());
|
|
|
- if (ObjectUtils.isNotEmpty(list)) {
|
|
|
- for (MqttInfo mqttInfo : list) {
|
|
|
- removeMqttUsers(mqttInfo, List.of(gateWayUserInfo));
|
|
|
- }
|
|
|
- }
|
|
|
- return ResultContent.buildSuccess();
|
|
|
- }
|
|
|
-
|
|
|
- private void removeMqttUsers(MqttInfo mqttInfo, List<GateWayUserInfo> gateWayUserInfoList) {
|
|
|
- if (ObjectUtils.isNotEmpty(mqttInfo) && ObjectUtils.isNotEmpty(gateWayUserInfoList)) {
|
|
|
- try {
|
|
|
- String urlStr = JMXUtil.buildServiceURL(mqttInfo);
|
|
|
- JMXServiceURL url = new JMXServiceURL(urlStr);
|
|
|
- @Cleanup JMXConnector connector = JMXConnectorFactory.connect(url, null);
|
|
|
- connector.connect();
|
|
|
- log.info("JMX removeMqttUsers {}:{} 连接成功...", mqttInfo.getJmxHost(), mqttInfo.getJmxPort());
|
|
|
- MBeanServerConnection connection = connector.getMBeanServerConnection();
|
|
|
- ObjectName addressObjectName = ObjectNameBuilder.create("org.apache.activemq.artemis", mqttInfo.getBrokerName()).getActiveMQServerObjectName();
|
|
|
- ActiveMQServerControl addressControl = MBeanServerInvocationHandler.newProxyInstance(connection, addressObjectName, ActiveMQServerControl.class, false);
|
|
|
-
|
|
|
- for (GateWayUserInfo gateWayUserInfo : gateWayUserInfoList) {
|
|
|
- try {
|
|
|
- String oldUserStr = addressControl.listUser(gateWayUserInfo.getUserName());
|
|
|
- if (JMXUtil.mqttUserIsExit(oldUserStr)) {
|
|
|
- addressControl.removeUser(gateWayUserInfo.getUserName());
|
|
|
- }
|
|
|
- } catch (Exception e) {
|
|
|
- log.error("removeMqttUsers: {}", e.getMessage());
|
|
|
- }
|
|
|
- }
|
|
|
- } catch (Exception e) {
|
|
|
- e.printStackTrace();
|
|
|
- return;
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 删除mqtt用户
|
|
|
- *
|
|
|
- * @param mqttInfo
|
|
|
- * @param gateWayUserInfoList
|
|
|
- */
|
|
|
- private void deleteMqttUsers(MqttInfo mqttInfo, List<GateWayUserInfo> gateWayUserInfoList) {
|
|
|
- if (ObjectUtils.isNotEmpty(mqttInfo) && ObjectUtils.isNotEmpty(gateWayUserInfoList)) {
|
|
|
- try {
|
|
|
- String urlStr = JMXUtil.buildServiceURL(mqttInfo);
|
|
|
- JMXServiceURL url = new JMXServiceURL(urlStr);
|
|
|
- @Cleanup JMXConnector connector = JMXConnectorFactory.connect(url, null);
|
|
|
- connector.connect();
|
|
|
- log.info("JMX deleteMqttUsers {}:{} 连接成功...", mqttInfo.getJmxHost(), mqttInfo.getJmxPort());
|
|
|
- MBeanServerConnection connection = connector.getMBeanServerConnection();
|
|
|
- ObjectName addressObjectName = ObjectNameBuilder.create("org.apache.activemq.artemis", mqttInfo.getBrokerName()).getActiveMQServerObjectName();
|
|
|
- ActiveMQServerControl addressControl = MBeanServerInvocationHandler.newProxyInstance(connection, addressObjectName, ActiveMQServerControl.class, false);
|
|
|
-
|
|
|
- for (GateWayUserInfo gateWayUserInfo : gateWayUserInfoList) {
|
|
|
- Mqtt2User mqtt2User = mqtt2UserDao.findTopByMqttInfoAndGateWayUserInfo(mqttInfo, gateWayUserInfo);
|
|
|
- try {
|
|
|
- String oldUserStr = addressControl.listUser(gateWayUserInfo.getUserName());
|
|
|
- if (JMXUtil.mqttUserIsExit(oldUserStr)) {
|
|
|
- addressControl.removeUser(gateWayUserInfo.getUserName());
|
|
|
- }
|
|
|
- } catch (Exception e) {
|
|
|
- log.error("deleteMqttUsers: {}", e.getMessage());
|
|
|
- }
|
|
|
- if (ObjectUtils.isNotEmpty(mqtt2User)) {
|
|
|
- mqtt2UserDao.delete(mqtt2User);
|
|
|
- }
|
|
|
- }
|
|
|
- } catch (Exception e) {
|
|
|
- e.printStackTrace();
|
|
|
- return;
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- public ResultContent syncSecurityToMQTTService(GateWayInfo gateWayInfo) {
|
|
|
- //todo同步权限
|
|
|
- List<MqttInfo> list = mqttInfoDao.findAll();
|
|
|
- log.info("syncSecurityToMQTTService {}", list.size());
|
|
|
- if (ObjectUtils.isNotEmpty(list)) {
|
|
|
- for (MqttInfo mqttInfo : list) {
|
|
|
- syncMqttSecuritySettings(mqttInfo, List.of(gateWayInfo));
|
|
|
- }
|
|
|
- }
|
|
|
- return ResultContent.buildSuccess();
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 同步网关的权限
|
|
|
- *
|
|
|
- * @param gateWayInfo
|
|
|
- * @return
|
|
|
- */
|
|
|
- public ResultContent syncSecurityGateToMQTTService(GateWayInfo gateWayInfo) {
|
|
|
- //todo同步权限
|
|
|
- List<MqttInfo> list = mqttInfoDao.findAll();
|
|
|
- log.info("syncSecurityGateToMQTTService {}", list.size());
|
|
|
- if (ObjectUtils.isNotEmpty(list)) {
|
|
|
- for (MqttInfo mqttInfo : list) {
|
|
|
- syncMqttSecurityGateSettings(mqttInfo, List.of(gateWayInfo));
|
|
|
- }
|
|
|
- }
|
|
|
- return ResultContent.buildSuccess();
|
|
|
- }
|
|
|
-
|
|
|
- public ResultContent syncSecurityToMQTTService(List<GateWayInfo> gateWayInfos) {
|
|
|
- //todo同步权限
|
|
|
- List<MqttInfo> list = mqttInfoDao.findAll();
|
|
|
- log.info("syncSecurityToMQTTService {}", list.size());
|
|
|
- if (ObjectUtils.isNotEmpty(list)) {
|
|
|
- list.parallelStream().forEach(mqttInfo -> {
|
|
|
- syncMqttSecuritySettings(mqttInfo, gateWayInfos);
|
|
|
- });
|
|
|
- }
|
|
|
- return ResultContent.buildSuccess();
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 同步网关可监听的权限
|
|
|
- *
|
|
|
- * @param mqttInfo
|
|
|
- * @param gateWayInfos
|
|
|
- * @return
|
|
|
- */
|
|
|
- public void syncMqttSecuritySettings(MqttInfo mqttInfo, List<GateWayInfo> gateWayInfos) {
|
|
|
- if (ObjectUtils.isNotEmpty(mqttInfo) && ObjectUtils.isNotEmpty(gateWayInfos)) {
|
|
|
- try {
|
|
|
- String urlStr = JMXUtil.buildServiceURL(mqttInfo);
|
|
|
- JMXServiceURL url = new JMXServiceURL(urlStr);
|
|
|
- @Cleanup JMXConnector connector = JMXConnectorFactory.connect(url, null);
|
|
|
- connector.connect();
|
|
|
- log.info("syncMqttSecuritySettings JMX {}:{} 连接成功...", mqttInfo.getJmxHost(), mqttInfo.getJmxPort());
|
|
|
- MBeanServerConnection connection = connector.getMBeanServerConnection();
|
|
|
- ObjectName addressObjectName = ObjectNameBuilder.create("org.apache.activemq.artemis", mqttInfo.getBrokerName()).getActiveMQServerObjectName();
|
|
|
- ActiveMQServerControl addressControl = MBeanServerInvocationHandler.newProxyInstance(connection, addressObjectName, ActiveMQServerControl.class, false);
|
|
|
-
|
|
|
- final String _roleName = MqttConfig.adminRoleName;
|
|
|
- for (GateWayInfo gateWayInfo : gateWayInfos) {
|
|
|
- // 以前的权限
|
|
|
- List<GateWayMqttSecurity> list = gateWayMqttSecurityDao.findByMqttInfoAndGateWayInfo(mqttInfo, gateWayInfo);
|
|
|
- gateWayMqttSecurityDao.deleteByMqttInfoAndGateWayInfo(mqttInfo, gateWayInfo);
|
|
|
-
|
|
|
- // 查询网关关联的用户
|
|
|
- GateWay2User gateWay2User = gateWay2UserDao.findTopByGateWayInfo(gateWayInfo);
|
|
|
- if (ObjectUtils.isEmpty(gateWay2User)) {
|
|
|
- log.error("网关对应用户信息为不存在");
|
|
|
- continue;
|
|
|
- }
|
|
|
- GateWayUserInfo gateWayUserInfo = gateWay2User.getGateWayUserInfo();
|
|
|
- if (ObjectUtils.isEmpty(gateWayUserInfo)) {
|
|
|
- log.error("网关对应用户信息为空");
|
|
|
- continue;
|
|
|
- }
|
|
|
- // 查询网关对应的设备
|
|
|
- List<GateWay2Device> gateWay2Devices = gateWay2DeviceDao.findByGateWayInfo(gateWayInfo);
|
|
|
- List<DeviceInfo> deviceInfos = new ArrayList<>();
|
|
|
- if (ObjectUtils.isNotEmpty(gateWay2Devices)) {
|
|
|
- deviceInfos = gateWay2Devices.stream().map(it -> it.getDeviceInfo()).collect(Collectors.toList());
|
|
|
- }
|
|
|
- List<String> addressMatchs = JMXUtil.buildAddressMatch(gateWayInfo, deviceInfos);
|
|
|
- if (ObjectUtils.isNotEmpty(addressMatchs)) {
|
|
|
- String userName = gateWayUserInfo.getUserName();
|
|
|
- String roleName = gateWayUserInfo.getRoleName();
|
|
|
-
|
|
|
- List<String> roleNames = new ArrayList<>();
|
|
|
- roleNames.add(roleName);
|
|
|
- roleNames.add(_roleName);
|
|
|
-
|
|
|
- roleName = String.join(",", roleNames);
|
|
|
- List<GateWayMqttSecurity> securities = new ArrayList<>();
|
|
|
- for (String addressMatch : addressMatchs) {
|
|
|
- GateWayMqttSecurity gateWayMqttSecurity = new GateWayMqttSecurity();
|
|
|
- gateWayMqttSecurity.setUserName(userName);
|
|
|
- gateWayMqttSecurity.setRoleName(roleName);
|
|
|
- gateWayMqttSecurity.setGateWayInfo(gateWayInfo);
|
|
|
- gateWayMqttSecurity.setGateWayId(gateWayInfo.getGateWayId());
|
|
|
- gateWayMqttSecurity.setMqttInfo(mqttInfo);
|
|
|
- gateWayMqttSecurity.setMqttName(mqttInfo.getName());
|
|
|
- gateWayMqttSecurity.setAddressMatch(addressMatch);
|
|
|
- try {
|
|
|
- addressControl.addSecuritySettings(addressMatch, roleName, roleName, roleName, roleName, roleName, roleName, roleName, roleName, roleName, roleName);
|
|
|
-
|
|
|
- gateWayMqttSecurity.setIsSync(Boolean.TRUE);
|
|
|
- gateWayMqttSecurity.setMsg("同步成功");
|
|
|
- } catch (Exception e) {
|
|
|
- e.printStackTrace();
|
|
|
- gateWayMqttSecurity.setIsSync(Boolean.FALSE);
|
|
|
- gateWayMqttSecurity.setMsg(String.format("同步失败:%s", e.getMessage()));
|
|
|
- }
|
|
|
- gateWayMqttSecurity.setTimeStr(DateUtils.paresTime(System.currentTimeMillis(), DateUtils.FORMAT_LONG));
|
|
|
- securities.add(gateWayMqttSecurity);
|
|
|
- }
|
|
|
- gateWayMqttSecurityDao.saveAll(securities);
|
|
|
- }
|
|
|
- }
|
|
|
- } catch (Exception e) {
|
|
|
- e.printStackTrace();
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 只同步网关的权限
|
|
|
- *
|
|
|
- * @param mqttInfo
|
|
|
- * @param gateWayInfos
|
|
|
- */
|
|
|
- public void syncMqttSecurityGateSettings(MqttInfo mqttInfo, List<GateWayInfo> gateWayInfos) {
|
|
|
- if (ObjectUtils.isNotEmpty(mqttInfo) && ObjectUtils.isNotEmpty(gateWayInfos)) {
|
|
|
- try {
|
|
|
- String urlStr = JMXUtil.buildServiceURL(mqttInfo);
|
|
|
- JMXServiceURL url = new JMXServiceURL(urlStr);
|
|
|
- @Cleanup JMXConnector connector = JMXConnectorFactory.connect(url, null);
|
|
|
- connector.connect();
|
|
|
- log.info("syncMqttSecurityGateSettings JMX {}:{} 连接成功...", mqttInfo.getJmxHost(), mqttInfo.getJmxPort());
|
|
|
- MBeanServerConnection connection = connector.getMBeanServerConnection();
|
|
|
- ObjectName addressObjectName = ObjectNameBuilder.create("org.apache.activemq.artemis", mqttInfo.getBrokerName()).getActiveMQServerObjectName();
|
|
|
- ActiveMQServerControl addressControl = MBeanServerInvocationHandler.newProxyInstance(connection, addressObjectName, ActiveMQServerControl.class, false);
|
|
|
-
|
|
|
- final String _roleName = MqttConfig.adminRoleName;
|
|
|
- for (GateWayInfo gateWayInfo : gateWayInfos) {
|
|
|
- // 查询网关关联的用户
|
|
|
- GateWay2User gateWay2User = gateWay2UserDao.findTopByGateWayInfo(gateWayInfo);
|
|
|
- if (ObjectUtils.isEmpty(gateWay2User)) {
|
|
|
- log.error("网关对应用户信息为不存在");
|
|
|
- continue;
|
|
|
- }
|
|
|
- GateWayUserInfo gateWayUserInfo = gateWay2User.getGateWayUserInfo();
|
|
|
- if (ObjectUtils.isEmpty(gateWayUserInfo)) {
|
|
|
- log.error("网关对应用户信息为空");
|
|
|
- continue;
|
|
|
- }
|
|
|
- List<String> addressMatchs = JMXUtil.buildGateWayAddressMatch(gateWayInfo);
|
|
|
- if (ObjectUtils.isNotEmpty(addressMatchs)) {
|
|
|
- String userName = gateWayUserInfo.getUserName();
|
|
|
- String tempRoleName = gateWayUserInfo.getRoleName();
|
|
|
-
|
|
|
- List<String> roleNames = new ArrayList<>();
|
|
|
- roleNames.add(tempRoleName);
|
|
|
- roleNames.add(_roleName);
|
|
|
-
|
|
|
- String roleName = String.join(",", roleNames);
|
|
|
- List<GateWayMqttSecurity> securities = new ArrayList<>();
|
|
|
- addressMatchs.stream().forEach(addressMatch -> {
|
|
|
- gateWayMqttSecurityDao.deleteByMqttInfoAndAddressMatch(mqttInfo, addressMatch);
|
|
|
-
|
|
|
- GateWayMqttSecurity gateWayMqttSecurity = new GateWayMqttSecurity();
|
|
|
- gateWayMqttSecurity.setUserName(userName);
|
|
|
- gateWayMqttSecurity.setRoleName(roleName);
|
|
|
- gateWayMqttSecurity.setGateWayInfo(gateWayInfo);
|
|
|
- gateWayMqttSecurity.setGateWayId(gateWayInfo.getGateWayId());
|
|
|
- gateWayMqttSecurity.setMqttInfo(mqttInfo);
|
|
|
- gateWayMqttSecurity.setMqttName(mqttInfo.getName());
|
|
|
- gateWayMqttSecurity.setAddressMatch(addressMatch);
|
|
|
- try {
|
|
|
- addressControl.addSecuritySettings(addressMatch, roleName, roleName, roleName, roleName, roleName, roleName, roleName, roleName, roleName, roleName);
|
|
|
- gateWayMqttSecurity.setIsSync(Boolean.TRUE);
|
|
|
- gateWayMqttSecurity.setMsg("同步成功");
|
|
|
- } catch (Exception e) {
|
|
|
- e.printStackTrace();
|
|
|
- gateWayMqttSecurity.setIsSync(Boolean.FALSE);
|
|
|
- gateWayMqttSecurity.setMsg(String.format("同步失败:%s", e.getMessage()));
|
|
|
- }
|
|
|
- gateWayMqttSecurity.setTimeStr(DateUtils.paresTime(System.currentTimeMillis(), DateUtils.FORMAT_LONG));
|
|
|
- securities.add(gateWayMqttSecurity);
|
|
|
- });
|
|
|
- gateWayMqttSecurityDao.saveAll(securities);
|
|
|
- }
|
|
|
- }
|
|
|
- } catch (Exception e) {
|
|
|
- e.printStackTrace();
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 同步所有的用户
|
|
|
- *
|
|
|
- * @return
|
|
|
- */
|
|
|
- public ResultContent refreshAllGateWayUser() {
|
|
|
- List<GateWayUserInfo> gateWayUserInfoList = gateWayUserInfoDao.findAll();
|
|
|
- if (ObjectUtils.isNotEmpty(gateWayUserInfoList)) {
|
|
|
- syncUserToMQTTService(gateWayUserInfoList);
|
|
|
- }
|
|
|
- return ResultContent.buildSuccess();
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 同步所有的数据
|
|
|
- *
|
|
|
- * @return
|
|
|
- */
|
|
|
- public ResultContent refreshAllGateWaySecurity() {
|
|
|
- List<GateWayInfo> gateWayInfos = gateWayInfoDao.findAll();
|
|
|
- if (ObjectUtils.isNotEmpty(gateWayInfos)) {
|
|
|
- syncSecurityToMQTTService(gateWayInfos);
|
|
|
- }
|
|
|
- return ResultContent.buildSuccess();
|
|
|
- }
|
|
|
-
|
|
|
- //-----------------------------同步权限数据 end----------------------
|
|
|
-
|
|
|
- /**
|
|
|
- * 删除用户 从MQTT服务
|
|
|
- *
|
|
|
- * @param gateWayUserInfo
|
|
|
- * @return
|
|
|
- */
|
|
|
- public ResultContent deleteGateWayUser(GateWayUserInfo gateWayUserInfo) {
|
|
|
- List<MqttInfo> list = mqttInfoDao.findAll();
|
|
|
- if (ObjectUtils.isNotEmpty(list)) {
|
|
|
- for (MqttInfo mqttInfo : list) {
|
|
|
- try {
|
|
|
- String urlStr = String.format("service:jmx:rmi:///jndi/rmi://%s:%s/jmxrmi", mqttInfo.getJmxHost(), mqttInfo.getJmxPort());
|
|
|
- JMXServiceURL url = new JMXServiceURL(urlStr);
|
|
|
- @Cleanup JMXConnector connector = JMXConnectorFactory.connect(url, null);
|
|
|
- connector.connect();
|
|
|
- log.info("JMX %s:%s 连接成功...", mqttInfo.getJmxHost(), mqttInfo.getJmxPort());
|
|
|
- MBeanServerConnection connection = connector.getMBeanServerConnection();
|
|
|
- ObjectName addressObjectName = ObjectNameBuilder.create("org.apache.activemq.artemis", mqttInfo.getBrokerName()).getActiveMQServerObjectName();
|
|
|
- ActiveMQServerControl addressControl = MBeanServerInvocationHandler.newProxyInstance(connection, addressObjectName, ActiveMQServerControl.class, false);
|
|
|
- addressControl.removeUser(gateWayUserInfo.getUserName());
|
|
|
- // 删除关系记录
|
|
|
- mqtt2UserDao.deleteByMqttInfoAndGateWayUserInfo(mqttInfo, gateWayUserInfo);
|
|
|
- } catch (Exception e) {
|
|
|
- e.printStackTrace();
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- return ResultContent.buildSuccess();
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
public GateWayUserInfoModel toModel(GateWayUserInfo entity) {
|
|
|
GateWayUserInfoModel model = new GateWayUserInfoModel();
|
|
|
if (ObjectUtils.isNotEmpty(entity)) {
|