package com.zswl.dataservice.service.mqtt; import com.zswl.dataservice.dao.mqtt.*; import com.zswl.dataservice.domain.mqtt.*; import com.zswl.dataservice.model.mqtt.*; import com.zswl.dataservice.service.base.SuperService; import com.zswl.dataservice.service.sync.DeviceSyncFullCardService; import com.zswl.dataservice.service.user.OperationLogsService; import com.zswl.dataservice.utils.DateUtils; import com.zswl.dataservice.utils.bean.BeanUtils; import com.zswl.dataservice.utils.mqtt.MqttTopicUtils; import com.zswl.dataservice.utils.mqtt.type.OnLineState; import com.zswl.dataservice.utils.page.PageEntityUtil; import com.zswl.dataservice.utils.result.ResultContent; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl; import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder; import org.apache.commons.lang3.ObjectUtils; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationContext; import org.springframework.data.domain.Page; import org.springframework.data.domain.Pageable; import org.springframework.stereotype.Service; import javax.management.MBeanServerConnection; import javax.management.MBeanServerInvocationHandler; import javax.management.ObjectName; import javax.management.remote.JMXConnector; import javax.management.remote.JMXConnectorFactory; import javax.management.remote.JMXServiceURL; import java.util.ArrayList; import java.util.List; /** * @author TRX * @date 2024/5/17 */ @Slf4j @Service public class GateWayInfoService extends SuperService { @Autowired GateWayInfoDao gateWayInfoDao; @Autowired GateWayUserInfoDao gateWayUserInfoDao; @Autowired DeviceInfoDao deviceInfoDao; @Autowired GateWay2UserDao gateWay2UserDao; @Autowired DeviceInfoService deviceInfoService; @Autowired GateWay2DeviceDao gateWay2DeviceDao; @Autowired MqttInfoDao mqttInfoDao; @Autowired OperationLogsService operationLogsService; @Autowired ApplicationContext applicationContext; @Autowired DeviceSyncFullCardService deviceSyncFullCardService; @Autowired private ProjectInfoDao projectInfoDao; @Autowired ProjectInfoService projectInfoService; @Autowired DevicePingInfoDao devicePingInfoDao; /** * 添加网关 * * @param param * @return */ public ResultContent addGateWayInfo(GateWayInfoAddParam param) { initDefaultUser(param); GateWayInfo gateWayInfo = gateWayInfoDao.findTopByGateWayId(param.getGateWayId()); if (ObjectUtils.isEmpty(gateWayInfo)) { gateWayInfo = new GateWayInfo(); gateWayInfo.setActivityTime(System.currentTimeMillis()); } gateWayInfo.setOnLineState(OnLineState.OnLine); gateWayInfo.setLastOnlineTime(System.currentTimeMillis()); BeanUtils.copyProperties(param, gateWayInfo, "id"); gateWayInfoDao.save(gateWayInfo); if (StringUtils.isNotEmpty(param.getProjectInfoCode())) { ProjectInfo projectInfo = projectInfoDao.findTopByCode(param.getProjectInfoCode()); gateWayInfo.setProjectInfo(projectInfo); } log.info("网关注册成功"); // 通知同步 deviceSyncFullCardService.noticeSyncGateWay(gateWayInfo); return ResultContent.buildSuccess(gateWayInfo); } /** * 注册网关 * * @param param * @return */ public ResultContent registerGateWay(GateWayInfoAddParam param) { ProjectInfo projectInfo = projectInfoDao.findTopByCode(param.getProjectInfoCode()); if (ObjectUtils.isEmpty(projectInfo)) { return ResultContent.buildFail(String.format("分组不存在:%s", param.getProjectInfoCode())); } // 添加网关信息 ResultContent gateWayInfo = addGateWayInfo(param); // 给网关分配个mqtt账号 MqttInfoReturnModel mqttInfoSimpleModel = new MqttInfoReturnModel(); mqttInfoSimpleModel.setBrokerAddress("tcp://162.14.78.247:61616"); mqttInfoSimpleModel.setBrokerUsername("admin"); mqttInfoSimpleModel.setBrokerPassword("admin123"); return ResultContent.buildSuccess(mqttInfoSimpleModel); } /** * 网关绑定设备、连接账号 * * @param param * @return */ public ResultContent gateWayBindDevice(GateWayBindDeviceParam param) { // 网关信息 GateWayInfo gateWayInfo = gateWayInfoDao.findTopByGateWayId(param.getGateWayId()); if (ObjectUtils.isEmpty(gateWayInfo)) { log.error("网关未注册"); return ResultContent.buildFail(String.format("网关未注册,清先注册网关:%s", param.getGateWayId())); } String projectInfoCode = param.getProjectInfoCode(); if (ObjectUtils.isEmpty(projectInfoCode)) { projectInfoCode = gateWayInfo.getProjectInfoCode(); } ProjectInfo projectInfo = projectInfoDao.findTopByCode(projectInfoCode); if (ObjectUtils.isEmpty(projectInfo)) { log.error("分组不存在"); return ResultContent.buildFail(String.format("分组不存在:%s", projectInfoCode)); } List devices = param.getDevices(); if (ObjectUtils.isNotEmpty(devices)) { // 检查设备数据合法性 for (DeviceInfoAddParam device : devices) { if (StringUtils.isEmpty(device.getDeviceId())) { return ResultContent.buildFail("deviceId不能为空"); } } } if (StringUtils.isEmpty(param.getUserName())) { param.setUserName("admin"); } // 验证连接账号 GateWayUserInfo gateWayUserInfo = gateWayUserInfoDao.findTopByUserName(param.getUserName()); if (ObjectUtils.isEmpty(gateWayUserInfo)) { return ResultContent.buildFail(String.format("连接账号不存在:%s", param.getUserName())); } // 验证账号是否已绑定网关 GateWay2User gateWay2User = gateWay2UserDao.findTopByGateWayUserInfo(gateWayUserInfo); if (ObjectUtils.isNotEmpty(gateWay2User)) { // 如果有绑定关系 GateWayInfo oldGateWayInfo = gateWay2User.getGateWayInfo(); if (!oldGateWayInfo.getGateWayId().equals(gateWayInfo.getGateWayId())) { // return ResultContent.buildFail(String.format("连接账号%s已使用", gateWayUserInfo.getUserName())); } } // 设备列表 List deviceInfos = new ArrayList<>(); if (ObjectUtils.isNotEmpty(devices)) { // 绑定网关和设备的关系 for (DeviceInfoAddParam device : devices) { device.setProjectInfoCode(projectInfoCode); // 保存设备信息 ResultContent resultContent = deviceInfoService.addDeviceInfo(device); DeviceInfo deviceInfo = resultContent.getContent(); // 设备可以绑定到多个网关,一个网关只能绑定设备一次 GateWay2Device gateWay2Device = gateWay2DeviceDao.findTopByGateWayInfoAndDeviceInfo(gateWayInfo, deviceInfo); if (ObjectUtils.isEmpty(gateWay2Device)) { gateWay2Device = new GateWay2Device(); gateWay2Device.setState(OnLineState.OffLine); } gateWay2Device.setGateWayInfo(gateWayInfo); gateWay2Device.setDeviceInfo(deviceInfo); gateWay2Device.setBindTimeStr(DateUtils.paresTime(System.currentTimeMillis(), DateUtils.FORMAT_LONG)); gateWay2DeviceDao.save(gateWay2Device); deviceInfos.add(deviceInfo); } } // 绑定网关和用户的关系 if (ObjectUtils.isEmpty(gateWay2User)) { gateWay2User = new GateWay2User(); } gateWay2User.setGateWayUserInfo(gateWayUserInfo); gateWay2User.setGateWayInfo(gateWayInfo); gateWay2User.setBindTime(System.currentTimeMillis()); gateWay2User.setBindTimeStr(DateUtils.paresTime(System.currentTimeMillis(), DateUtils.FORMAT_LONG)); gateWay2UserDao.save(gateWay2User); // 添加连接账号关于:网关、设备的 Topic权限 bindUserGateWayPermissions(MqttTopicUtils.getUserRoleName(gateWayUserInfo), gateWayInfo.getGateWayId()); // 绑定用户设备权限 bindUserDevicesPermissions(MqttTopicUtils.getUserRoleName(gateWayUserInfo), deviceInfos); log.info("设备注册成功:{}", deviceInfos.size()); // 同步设备 deviceSyncFullCardService.noticeSyncDevice(deviceInfos); return ResultContent.buildSuccess(); } /** * 删除网关 * * @param gateWayId * @return */ public ResultContent deleteGateWayInfo(String gateWayId) { GateWayInfo gateWayInfo = gateWayInfoDao.findTopByGateWayId(gateWayId); if (ObjectUtils.isEmpty(gateWayInfo)) { return ResultContent.buildFail(String.format("网关ID不存在:%s", gateWayId)); } gateWayInfoDao.delete(gateWayInfo); return ResultContent.buildSuccess(); } /** * 网关列表 * * @param pageable * @param param * @return */ public ResultContent> pageGateWay(Pageable pageable, GateWayInfoSearchParam param) { Page page = gateWayInfoDao.page(pageable, param); return ResultContent.buildSuccess(PageEntityUtil.toPageModel(page, this::toModel)); } /** * 查询网关详情 * * @param gateWayId * @return */ public ResultContent getById(String gateWayId) { GateWayInfoModel model = null; GateWayInfo deviceInfo = gateWayInfoDao.findTopByGateWayId(gateWayId); if (ObjectUtils.isNotEmpty(deviceInfo)) { model = toModel(deviceInfo); } return ResultContent.buildSuccess(model); } public GateWayInfoModel toModel(GateWayInfo entity) { GateWayInfoModel model = new GateWayInfoModel(); if (ObjectUtils.isNotEmpty(entity)) { BeanUtils.copyProperties(entity, model); model.setProjectInfo(projectInfoService.toModel(entity.getProjectInfo())); } return model; } /** * 绑定角色网关的权限 * * @param roleName * @param gateWayId * @return */ public ResultContent bindUserGateWayPermissions(String roleName, String gateWayId) { try { List list = mqttInfoDao.findAll(); if (ObjectUtils.isNotEmpty(list)) { for (MqttInfo mqttInfo : list) { String urlStr = String.format("service:jmx:rmi:///jndi/rmi://%s:%s/jmxrmi", mqttInfo.getJmxHost(), mqttInfo.getJmxPort()); JMXServiceURL url = new JMXServiceURL(urlStr); @Cleanup JMXConnector connector = JMXConnectorFactory.connect(url, null); connector.connect(); log.info("JMX %s:%s 连接成功...", mqttInfo.getJmxHost(), mqttInfo.getJmxPort()); MBeanServerConnection connection = connector.getMBeanServerConnection(); ObjectName addressObjectName = ObjectNameBuilder.create("org.apache.activemq.artemis", mqttInfo.getBrokerName()).getActiveMQServerObjectName(); ActiveMQServerControl addressControl = MBeanServerInvocationHandler.newProxyInstance(connection, addressObjectName, ActiveMQServerControl.class, false); addressControl.addSecuritySettings(MqttTopicUtils.buildGateWayAllTopic(gateWayId), roleName, roleName, roleName, roleName, roleName, roleName, roleName, roleName, roleName, roleName); } } } catch (Exception e) { e.printStackTrace(); log.error("bindUserGateWayPermissions 出错:%s", e.getMessage()); } return ResultContent.buildSuccess(); } public ResultContent bindUserDevicesPermissions(String roleName, List deviceInfos) { try { List list = mqttInfoDao.findAll(); if (ObjectUtils.isNotEmpty(list)) { list.parallelStream().forEach(mqttInfo -> { try { String urlStr = String.format("service:jmx:rmi:///jndi/rmi://%s:%s/jmxrmi", mqttInfo.getJmxHost(), mqttInfo.getJmxPort()); JMXServiceURL url = new JMXServiceURL(urlStr); @Cleanup JMXConnector connector = JMXConnectorFactory.connect(url, null); connector.connect(); log.info("JMX %s:%s 连接成功...", mqttInfo.getJmxHost(), mqttInfo.getJmxPort()); MBeanServerConnection connection = connector.getMBeanServerConnection(); ObjectName addressObjectName = ObjectNameBuilder.create("org.apache.activemq.artemis", mqttInfo.getBrokerName()).getActiveMQServerObjectName(); ActiveMQServerControl addressControl = MBeanServerInvocationHandler.newProxyInstance(connection, addressObjectName, ActiveMQServerControl.class, false); for (DeviceInfo deviceInfo : deviceInfos) { addressControl.addSecuritySettings(MqttTopicUtils.buildDeviceAllTopic(deviceInfo.getDeviceId()), roleName, roleName, roleName, roleName, roleName, roleName, roleName, roleName, roleName, roleName); } } catch (Exception e) { e.printStackTrace(); } }); } } catch (Exception e) { e.printStackTrace(); log.error("bindUserDevicesPermissions 出错:%s", e.getMessage()); } return ResultContent.buildSuccess(); } }