package com.zswl.dataservice.service.mqtt; import com.zswl.dataservice.dao.mqtt.*; import com.zswl.dataservice.domain.mqtt.*; import com.zswl.dataservice.model.mqtt.*; import com.zswl.dataservice.utils.DateUtils; import com.zswl.dataservice.utils.bean.BeanUtils; import com.zswl.dataservice.utils.mqtt.MqttTopicUtils; import com.zswl.dataservice.utils.mqtt.type.OnLineState; import com.zswl.dataservice.utils.page.PageEntityUtil; import com.zswl.dataservice.utils.result.ResultContent; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl; import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder; import org.apache.commons.lang3.ObjectUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.domain.Page; import org.springframework.data.domain.Pageable; import org.springframework.stereotype.Service; import javax.management.MBeanServerConnection; import javax.management.MBeanServerInvocationHandler; import javax.management.ObjectName; import javax.management.remote.JMXConnector; import javax.management.remote.JMXConnectorFactory; import javax.management.remote.JMXServiceURL; import java.util.ArrayList; import java.util.List; /** * @author TRX * @date 2024/5/17 */ @Slf4j @Service public class GateWayInfoService { @Autowired GateWayInfoDao gateWayInfoDao; @Autowired GateWayUserInfoDao gateWayUserInfoDao; @Autowired DeviceInfoDao deviceInfoDao; @Autowired GateWay2UserDao gateWay2UserDao; @Autowired DeviceInfoService deviceInfoService; @Autowired GateWay2DeviceDao gateWay2DeviceDao; @Autowired MqttInfoDao mqttInfoDao; @Autowired OperationLogsService operationLogsService; /** * 添加网关 * * @param param * @return */ public ResultContent addGateWayInfo(GateWayInfoAddParam param) { GateWayInfo gateWayInfo = gateWayInfoDao.findTopByGateWayId(param.getGateWayId()); if (ObjectUtils.isNotEmpty(gateWayInfo)) { return ResultContent.buildFail(String.format("网关ID已存在:%s", param.getGateWayId())); } gateWayInfo = new GateWayInfo(); BeanUtils.copyProperties(param, gateWayInfo); if (param.getState() == null) { gateWayInfo.setState(OnLineState.OffLine); } gateWayInfoDao.save(gateWayInfo); return ResultContent.buildSuccess(); } /** * 网关绑定设备、连接账号 * * @param param * @return */ public ResultContent gateWayBindDevice(GateWayBindDeviceParam param) { GateWayInfo gateWayInfo = gateWayInfoDao.findTopByGateWayId(param.getGateWayId()); if (ObjectUtils.isEmpty(gateWayInfo)) { // 如果网关不存在,则初始化 GateWayInfoAddParam addParam = new GateWayInfoAddParam(); addParam.setGateWayId(param.getGateWayId()); addParam.setGateWayName(param.getGateWayName()); } gateWayInfo = gateWayInfoDao.findTopByGateWayId(param.getGateWayId()); // 验证连接账号 GateWayUserInfo gateWayUserInfo = gateWayUserInfoDao.findTopByUserName(param.getUserName()); if (ObjectUtils.isEmpty(gateWayUserInfo)) { return ResultContent.buildFail(String.format("连接账号不存在:%s", param.getUserName())); } // 验证账号是否已绑定网关 GateWay2User gateWay2User = gateWay2UserDao.findTopByGateWayUserInfo(gateWayUserInfo); if (ObjectUtils.isNotEmpty(gateWay2User)) { // 如果有绑定关系 GateWayInfo oldGateWayInfo = gateWay2User.getGateWayInfo(); if (!oldGateWayInfo.getGateWayId().equals(gateWayInfo.getGateWayId())) { return ResultContent.buildFail(String.format("连接账号%s已使用", gateWayUserInfo.getUserName())); } } // 设备列表 List deviceInfos = new ArrayList<>(); List devices = new ArrayList<>(); if (ObjectUtils.isNotEmpty(devices)) { // 绑定网关和设备的关系 for (DeviceInfoAddParam device : devices) { // 如果设备不存在,,则初始化 ResultContent resultContent = deviceInfoService.initDeviceInfo(device); DeviceInfo deviceInfo = resultContent.getContent(); GateWay2Device gateWay2Device = gateWay2DeviceDao.findTopByGateWayInfoAndDeviceInfo(gateWayInfo, deviceInfo); if (ObjectUtils.isEmpty(gateWay2Device)) { gateWay2Device = new GateWay2Device(); gateWay2Device.setState(OnLineState.OffLine); } gateWay2Device.setGateWayInfo(gateWayInfo); gateWay2Device.setDeviceInfo(deviceInfo); gateWay2Device.setBindTimeStr(DateUtils.paresTime(System.currentTimeMillis(), DateUtils.FORMAT_LONG)); gateWay2DeviceDao.save(gateWay2Device); deviceInfos.add(deviceInfo); } } // 绑定网关和用户的关系 if (ObjectUtils.isEmpty(gateWay2User)) { gateWay2User = new GateWay2User(); } gateWay2User.setGateWayUserInfo(gateWayUserInfo); gateWay2User.setGateWayInfo(gateWayInfo); gateWay2User.setBindTime(System.currentTimeMillis()); gateWay2User.setBindTimeStr(DateUtils.paresTime(System.currentTimeMillis(), DateUtils.FORMAT_LONG)); gateWay2UserDao.save(gateWay2User); // 添加连接账号关于:网关、设备的 Topic权限 bindUserGateWayPermissions(MqttTopicUtils.getUserRoleName(gateWayUserInfo), gateWayInfo.getGateWayId()); // 绑定用户设备权限 bindUserDevicesPermissions(MqttTopicUtils.getUserRoleName(gateWayUserInfo), deviceInfos); return ResultContent.buildSuccess(); } /** * 删除网关 * * @param gateWayId * @return */ public ResultContent deleteGateWayInfo(String gateWayId) { GateWayInfo gateWayInfo = gateWayInfoDao.findTopByGateWayId(gateWayId); if (ObjectUtils.isEmpty(gateWayInfo)) { return ResultContent.buildFail(String.format("网关ID不存在:%s", gateWayId)); } gateWayInfoDao.delete(gateWayInfo); return ResultContent.buildSuccess(); } /** * 网关列表 * * @param pageable * @param param * @return */ public ResultContent> pageGateWay(Pageable pageable, GateWayInfoSearchParam param) { Page page = gateWayInfoDao.page(pageable, param); return ResultContent.buildSuccess(PageEntityUtil.toPageModel(page, this::toModel)); } /** * 查询网关详情 * * @param gateWayId * @return */ public ResultContent getById(String gateWayId) { GateWayInfoModel model = null; GateWayInfo deviceInfo = gateWayInfoDao.findTopByGateWayId(gateWayId); if (ObjectUtils.isNotEmpty(deviceInfo)) { model = toModel(deviceInfo); } return ResultContent.buildSuccess(model); } public GateWayInfoModel toModel(GateWayInfo entity) { GateWayInfoModel model = new GateWayInfoModel(); if (ObjectUtils.isNotEmpty(entity)) { BeanUtils.copyProperties(entity, model); } return model; } /** * 绑定角色网关的权限 * * @param roleName * @param gateWayId * @return */ public ResultContent bindUserGateWayPermissions(String roleName, String gateWayId) { try { List list = mqttInfoDao.findAll(); if (ObjectUtils.isNotEmpty(list)) { for (MqttInfo mqttInfo : list) { String urlStr = String.format("service:jmx:rmi:///jndi/rmi://%s:%s/jmxrmi", mqttInfo.getJmxHost(), mqttInfo.getJmxPort()); JMXServiceURL url = new JMXServiceURL(urlStr); @Cleanup JMXConnector connector = JMXConnectorFactory.connect(url, null); connector.connect(); log.info("JMX %s:%s 连接成功...", mqttInfo.getJmxHost(), mqttInfo.getJmxPort()); MBeanServerConnection connection = connector.getMBeanServerConnection(); ObjectName addressObjectName = ObjectNameBuilder.create("org.apache.activemq.artemis", mqttInfo.getBrokerName()).getActiveMQServerObjectName(); ActiveMQServerControl addressControl = MBeanServerInvocationHandler.newProxyInstance(connection, addressObjectName, ActiveMQServerControl.class, false); addressControl.addSecuritySettings(MqttTopicUtils.buildGateWayAllTopic(gateWayId), roleName, roleName, roleName, roleName, roleName, roleName, roleName, roleName, roleName, roleName); } } } catch (Exception e) { e.printStackTrace(); log.error("bindUserGateWayPermissions 出错:%s", e.getMessage()); } return ResultContent.buildSuccess(); } public ResultContent bindUserDevicesPermissions(String roleName, List deviceInfos) { try { List list = mqttInfoDao.findAll(); if (ObjectUtils.isNotEmpty(list)) { list.parallelStream().forEach(mqttInfo -> { try { String urlStr = String.format("service:jmx:rmi:///jndi/rmi://%s:%s/jmxrmi", mqttInfo.getJmxHost(), mqttInfo.getJmxPort()); JMXServiceURL url = new JMXServiceURL(urlStr); @Cleanup JMXConnector connector = JMXConnectorFactory.connect(url, null); connector.connect(); log.info("JMX %s:%s 连接成功...", mqttInfo.getJmxHost(), mqttInfo.getJmxPort()); MBeanServerConnection connection = connector.getMBeanServerConnection(); ObjectName addressObjectName = ObjectNameBuilder.create("org.apache.activemq.artemis", mqttInfo.getBrokerName()).getActiveMQServerObjectName(); ActiveMQServerControl addressControl = MBeanServerInvocationHandler.newProxyInstance(connection, addressObjectName, ActiveMQServerControl.class, false); for (DeviceInfo deviceInfo : deviceInfos) { addressControl.addSecuritySettings(MqttTopicUtils.buildDeviceAllTopic(deviceInfo.getDeviceId()), roleName, roleName, roleName, roleName, roleName, roleName, roleName, roleName, roleName, roleName); } } catch (Exception e) { e.printStackTrace(); } }); } } catch (Exception e) { e.printStackTrace(); log.error("bindUserDevicesPermissions 出错:%s", e.getMessage()); } return ResultContent.buildSuccess(); } }