|
|
@@ -13,6 +13,7 @@ import com.zhongshu.iot.server.core.dao.mqtt.MqttInfoDao;
|
|
|
import com.zhongshu.iot.server.core.domain.iot.mqtt.*;
|
|
|
import com.zhongshu.iot.server.core.util.CommonUtil;
|
|
|
import com.zhongshu.iot.server.core.util.DateUtils;
|
|
|
+import com.zhongshu.iot.server.core.util.JMXUtil;
|
|
|
import com.zhongshu.iot.server.core.util.bean.BeanUtils;
|
|
|
import com.zhongshu.iot.server.core.util.page.PageEntityUtil;
|
|
|
import lombok.Cleanup;
|
|
|
@@ -109,7 +110,7 @@ public class GateWayUserInfoService {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * 添加用户
|
|
|
+ * 添加用户 (从网关注册开始)
|
|
|
*
|
|
|
* @param param
|
|
|
* @return
|
|
|
@@ -209,6 +210,8 @@ public class GateWayUserInfoService {
|
|
|
return ResultContent.buildSuccess(model);
|
|
|
}
|
|
|
|
|
|
+ //-----------------------------同步权限数据 start--------------------
|
|
|
+
|
|
|
/**
|
|
|
* 把用户同步到MQTT服务中
|
|
|
*
|
|
|
@@ -221,41 +224,106 @@ public class GateWayUserInfoService {
|
|
|
log.info("syncUserToMQTTService {}", list.size());
|
|
|
if (ObjectUtils.isNotEmpty(list)) {
|
|
|
for (MqttInfo mqttInfo : list) {
|
|
|
- try {
|
|
|
- String urlStr = String.format("service:jmx:rmi:///jndi/rmi://%s:%s/jmxrmi", mqttInfo.getJmxHost(), mqttInfo.getJmxPort());
|
|
|
- JMXServiceURL url = new JMXServiceURL(urlStr);
|
|
|
- @Cleanup JMXConnector connector = JMXConnectorFactory.connect(url, null);
|
|
|
- connector.connect();
|
|
|
- log.info("JMX %s:%s 连接成功...", mqttInfo.getJmxHost(), mqttInfo.getJmxPort());
|
|
|
- MBeanServerConnection connection = connector.getMBeanServerConnection();
|
|
|
- ObjectName addressObjectName = ObjectNameBuilder.create("org.apache.activemq.artemis", mqttInfo.getBrokerName()).getActiveMQServerObjectName();
|
|
|
- ActiveMQServerControl addressControl = MBeanServerInvocationHandler.newProxyInstance(connection, addressObjectName, ActiveMQServerControl.class, false);
|
|
|
+ syncMqttUsers(mqttInfo, List.of(gateWayUserInfo));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return ResultContent.buildSuccess();
|
|
|
+ }
|
|
|
|
|
|
- // 添加账号
|
|
|
- addressControl.addUser(gateWayUserInfo.getUserName(), gateWayUserInfo.getPassWord(), gateWayUserInfo.getRoleName(), false);
|
|
|
+ /**
|
|
|
+ * 同步多个用户
|
|
|
+ *
|
|
|
+ * @param mqttInfo
|
|
|
+ * @param gateWayUserInfoList
|
|
|
+ */
|
|
|
+ private void syncMqttUsers(MqttInfo mqttInfo, List<GateWayUserInfo> gateWayUserInfoList) {
|
|
|
+ if (ObjectUtils.isNotEmpty(mqttInfo) && ObjectUtils.isNotEmpty(gateWayUserInfoList)) {
|
|
|
+ try {
|
|
|
+ String urlStr = JMXUtil.buildServiceURL(mqttInfo);
|
|
|
+ JMXServiceURL url = new JMXServiceURL(urlStr);
|
|
|
+ @Cleanup JMXConnector connector = JMXConnectorFactory.connect(url, null);
|
|
|
+ connector.connect();
|
|
|
+ log.info("JMX {}:{} 连接成功...", mqttInfo.getJmxHost(), mqttInfo.getJmxPort());
|
|
|
+ MBeanServerConnection connection = connector.getMBeanServerConnection();
|
|
|
+ ObjectName addressObjectName = ObjectNameBuilder.create("org.apache.activemq.artemis", mqttInfo.getBrokerName()).getActiveMQServerObjectName();
|
|
|
+ ActiveMQServerControl addressControl = MBeanServerInvocationHandler.newProxyInstance(connection, addressObjectName, ActiveMQServerControl.class, false);
|
|
|
|
|
|
+ for (GateWayUserInfo gateWayUserInfo : gateWayUserInfoList) {
|
|
|
Mqtt2User mqtt2User = mqtt2UserDao.findTopByMqttInfoAndGateWayUserInfo(mqttInfo, gateWayUserInfo);
|
|
|
if (ObjectUtils.isEmpty(mqtt2User)) {
|
|
|
mqtt2User = new Mqtt2User();
|
|
|
}
|
|
|
- mqtt2User.setGateWayUserInfo(gateWayUserInfo);
|
|
|
- mqtt2User.setUserName(gateWayUserInfo.getUserName());
|
|
|
+ try {
|
|
|
+ String oldUserStr = addressControl.listUser(gateWayUserInfo.getUserName());
|
|
|
+ if (JMXUtil.mqttUserIsExit(oldUserStr)) {
|
|
|
+ addressControl.removeUser(gateWayUserInfo.getUserName());
|
|
|
+ }
|
|
|
+ addressControl.addUser(gateWayUserInfo.getUserName(), gateWayUserInfo.getPassWord(), gateWayUserInfo.getRoleName(), false);
|
|
|
+ mqtt2User.setGateWayUserInfo(gateWayUserInfo);
|
|
|
+ mqtt2User.setUserName(gateWayUserInfo.getUserName());
|
|
|
|
|
|
- mqtt2User.setMqttInfo(mqttInfo);
|
|
|
- mqtt2User.setMqttInfoName(mqttInfo.getName());
|
|
|
+ mqtt2User.setMqttInfo(mqttInfo);
|
|
|
+ mqtt2User.setMqttInfoName(mqttInfo.getName());
|
|
|
|
|
|
- mqtt2User.setIsSync(Boolean.TRUE);
|
|
|
- mqtt2User.setSyncTime(System.currentTimeMillis());
|
|
|
- mqtt2User.setSyncTimeStr(DateUtils.paresTime(System.currentTimeMillis(), DateUtils.FORMAT_LONG));
|
|
|
+ mqtt2User.setIsSync(Boolean.TRUE);
|
|
|
+ mqtt2User.setSyncTime(System.currentTimeMillis());
|
|
|
+ mqtt2User.setSyncTimeStr(DateUtils.paresTime(System.currentTimeMillis(), DateUtils.FORMAT_LONG));
|
|
|
+ mqtt2User.setMsg("同步成功");
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("syncMqttUsers: {}", e.getMessage());
|
|
|
+ mqtt2User.setIsSync(Boolean.FALSE);
|
|
|
+ mqtt2User.setMsg(String.format("同步失败:%s", e.getMessage()));
|
|
|
+ }
|
|
|
mqtt2UserDao.save(mqtt2User);
|
|
|
- } catch (Exception e) {
|
|
|
- e.printStackTrace();
|
|
|
}
|
|
|
+ } catch (Exception e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ return;
|
|
|
}
|
|
|
}
|
|
|
- return ResultContent.buildSuccess();
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * 删除mqtt用户
|
|
|
+ *
|
|
|
+ * @param mqttInfo
|
|
|
+ * @param gateWayUserInfoList
|
|
|
+ */
|
|
|
+ private void deleteMqttUsers(MqttInfo mqttInfo, List<GateWayUserInfo> gateWayUserInfoList) {
|
|
|
+ if (ObjectUtils.isNotEmpty(mqttInfo) && ObjectUtils.isNotEmpty(gateWayUserInfoList)) {
|
|
|
+ try {
|
|
|
+ String urlStr = JMXUtil.buildServiceURL(mqttInfo);
|
|
|
+ JMXServiceURL url = new JMXServiceURL(urlStr);
|
|
|
+ @Cleanup JMXConnector connector = JMXConnectorFactory.connect(url, null);
|
|
|
+ connector.connect();
|
|
|
+ log.info("JMX {}:{} 连接成功...", mqttInfo.getJmxHost(), mqttInfo.getJmxPort());
|
|
|
+ MBeanServerConnection connection = connector.getMBeanServerConnection();
|
|
|
+ ObjectName addressObjectName = ObjectNameBuilder.create("org.apache.activemq.artemis", mqttInfo.getBrokerName()).getActiveMQServerObjectName();
|
|
|
+ ActiveMQServerControl addressControl = MBeanServerInvocationHandler.newProxyInstance(connection, addressObjectName, ActiveMQServerControl.class, false);
|
|
|
+
|
|
|
+ for (GateWayUserInfo gateWayUserInfo : gateWayUserInfoList) {
|
|
|
+ Mqtt2User mqtt2User = mqtt2UserDao.findTopByMqttInfoAndGateWayUserInfo(mqttInfo, gateWayUserInfo);
|
|
|
+ try {
|
|
|
+ String oldUserStr = addressControl.listUser(gateWayUserInfo.getUserName());
|
|
|
+ if (JMXUtil.mqttUserIsExit(oldUserStr)) {
|
|
|
+ addressControl.removeUser(gateWayUserInfo.getUserName());
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("deleteMqttUsers: {}", e.getMessage());
|
|
|
+ }
|
|
|
+ if (ObjectUtils.isNotEmpty(mqtt2User)) {
|
|
|
+ mqtt2UserDao.delete(mqtt2User);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ //-----------------------------同步权限数据 end----------------------
|
|
|
+
|
|
|
/**
|
|
|
* 删除用户 从MQTT服务
|
|
|
*
|