| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345 |
- package com.zswl.dataservice.service.mqtt;
- import com.zswl.dataservice.dao.mqtt.*;
- import com.zswl.dataservice.domain.mqtt.*;
- import com.zswl.dataservice.model.mqtt.*;
- import com.zswl.dataservice.service.base.SuperService;
- import com.zswl.dataservice.service.sync.DeviceSyncFullCardService;
- import com.zswl.dataservice.service.user.OperationLogsService;
- import com.zswl.dataservice.utils.DateUtils;
- import com.zswl.dataservice.utils.bean.BeanUtils;
- import com.zswl.dataservice.utils.mqtt.MqttTopicUtils;
- import com.zswl.dataservice.utils.mqtt.type.OnLineState;
- import com.zswl.dataservice.utils.page.PageEntityUtil;
- import com.zswl.dataservice.utils.result.ResultContent;
- import lombok.Cleanup;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl;
- import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder;
- import org.apache.commons.lang3.ObjectUtils;
- import org.apache.commons.lang3.StringUtils;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.context.ApplicationContext;
- import org.springframework.data.domain.Page;
- import org.springframework.data.domain.Pageable;
- import org.springframework.stereotype.Service;
- import javax.management.MBeanServerConnection;
- import javax.management.MBeanServerInvocationHandler;
- import javax.management.ObjectName;
- import javax.management.remote.JMXConnector;
- import javax.management.remote.JMXConnectorFactory;
- import javax.management.remote.JMXServiceURL;
- import java.util.ArrayList;
- import java.util.List;
- /**
- * @author TRX
- * @date 2024/5/17
- */
- @Slf4j
- @Service
- public class GateWayInfoService extends SuperService {
- @Autowired
- GateWayInfoDao gateWayInfoDao;
- @Autowired
- GateWayUserInfoDao gateWayUserInfoDao;
- @Autowired
- DeviceInfoDao deviceInfoDao;
- @Autowired
- GateWay2UserDao gateWay2UserDao;
- @Autowired
- DeviceInfoService deviceInfoService;
- @Autowired
- GateWay2DeviceDao gateWay2DeviceDao;
- @Autowired
- MqttInfoDao mqttInfoDao;
- @Autowired
- OperationLogsService operationLogsService;
- @Autowired
- ApplicationContext applicationContext;
- @Autowired
- DeviceSyncFullCardService deviceSyncFullCardService;
- @Autowired
- private ProjectInfoDao projectInfoDao;
- @Autowired
- ProjectInfoService projectInfoService;
- @Autowired
- DevicePingInfoDao devicePingInfoDao;
- /**
- * 添加网关
- *
- * @param param
- * @return
- */
- public ResultContent<GateWayInfo> addGateWayInfo(GateWayInfoAddParam param) {
- initDefaultUser(param);
- GateWayInfo gateWayInfo = gateWayInfoDao.findTopByGateWayId(param.getGateWayId());
- if (ObjectUtils.isEmpty(gateWayInfo)) {
- gateWayInfo = new GateWayInfo();
- gateWayInfo.setActivityTime(System.currentTimeMillis());
- }
- gateWayInfo.setOnLineState(OnLineState.OnLine);
- gateWayInfo.setLastOnlineTime(System.currentTimeMillis());
- BeanUtils.copyProperties(param, gateWayInfo, "id");
- gateWayInfoDao.save(gateWayInfo);
- if (StringUtils.isNotEmpty(param.getProjectInfoCode())) {
- ProjectInfo projectInfo = projectInfoDao.findTopByCode(param.getProjectInfoCode());
- gateWayInfo.setProjectInfo(projectInfo);
- }
- log.info("网关注册成功");
- // 通知同步
- deviceSyncFullCardService.noticeSyncGateWay(gateWayInfo);
- return ResultContent.buildSuccess(gateWayInfo);
- }
- /**
- * 注册网关
- *
- * @param param
- * @return
- */
- public ResultContent<MqttInfoReturnModel> registerGateWay(GateWayInfoAddParam param) {
- ProjectInfo projectInfo = projectInfoDao.findTopByCode(param.getProjectInfoCode());
- if (ObjectUtils.isEmpty(projectInfo)) {
- return ResultContent.buildFail(String.format("分组不存在:%s", param.getProjectInfoCode()));
- }
- // 添加网关信息
- ResultContent<GateWayInfo> gateWayInfo = addGateWayInfo(param);
- // 给网关分配个mqtt账号
- MqttInfoReturnModel mqttInfoSimpleModel = new MqttInfoReturnModel();
- mqttInfoSimpleModel.setBrokerAddress("tcp://162.14.78.247:61616");
- mqttInfoSimpleModel.setBrokerUsername("admin");
- mqttInfoSimpleModel.setBrokerPassword("admin123");
- return ResultContent.buildSuccess(mqttInfoSimpleModel);
- }
- /**
- * 网关绑定设备、连接账号
- *
- * @param param
- * @return
- */
- public ResultContent gateWayBindDevice(GateWayBindDeviceParam param) {
- // 网关信息
- GateWayInfo gateWayInfo = gateWayInfoDao.findTopByGateWayId(param.getGateWayId());
- if (ObjectUtils.isEmpty(gateWayInfo)) {
- log.error("网关未注册");
- return ResultContent.buildFail(String.format("网关未注册,清先注册网关:%s", param.getGateWayId()));
- }
- String projectInfoCode = param.getProjectInfoCode();
- if (ObjectUtils.isEmpty(projectInfoCode)) {
- projectInfoCode = gateWayInfo.getProjectInfoCode();
- }
- ProjectInfo projectInfo = projectInfoDao.findTopByCode(projectInfoCode);
- if (ObjectUtils.isEmpty(projectInfo)) {
- log.error("分组不存在");
- return ResultContent.buildFail(String.format("分组不存在:%s", projectInfoCode));
- }
- List<DeviceInfoAddParam> devices = param.getDevices();
- if (ObjectUtils.isNotEmpty(devices)) {
- // 检查设备数据合法性
- for (DeviceInfoAddParam device : devices) {
- if (StringUtils.isEmpty(device.getDeviceId())) {
- return ResultContent.buildFail("deviceId不能为空");
- }
- }
- }
- if (StringUtils.isEmpty(param.getUserName())) {
- param.setUserName("admin");
- }
- // 验证连接账号
- GateWayUserInfo gateWayUserInfo = gateWayUserInfoDao.findTopByUserName(param.getUserName());
- if (ObjectUtils.isEmpty(gateWayUserInfo)) {
- return ResultContent.buildFail(String.format("连接账号不存在:%s", param.getUserName()));
- }
- // 验证账号是否已绑定网关
- GateWay2User gateWay2User = gateWay2UserDao.findTopByGateWayUserInfo(gateWayUserInfo);
- if (ObjectUtils.isNotEmpty(gateWay2User)) {
- // 如果有绑定关系
- GateWayInfo oldGateWayInfo = gateWay2User.getGateWayInfo();
- if (!oldGateWayInfo.getGateWayId().equals(gateWayInfo.getGateWayId())) {
- // return ResultContent.buildFail(String.format("连接账号%s已使用", gateWayUserInfo.getUserName()));
- }
- }
- // 设备列表
- List<DeviceInfo> deviceInfos = new ArrayList<>();
- if (ObjectUtils.isNotEmpty(devices)) {
- // 绑定网关和设备的关系
- for (DeviceInfoAddParam device : devices) {
- device.setProjectInfoCode(projectInfoCode);
- // 保存设备信息
- ResultContent<DeviceInfo> resultContent = deviceInfoService.addDeviceInfo(device);
- DeviceInfo deviceInfo = resultContent.getContent();
- // 设备可以绑定到多个网关,一个网关只能绑定设备一次
- GateWay2Device gateWay2Device = gateWay2DeviceDao.findTopByGateWayInfoAndDeviceInfo(gateWayInfo, deviceInfo);
- if (ObjectUtils.isEmpty(gateWay2Device)) {
- gateWay2Device = new GateWay2Device();
- gateWay2Device.setState(OnLineState.OffLine);
- }
- gateWay2Device.setGateWayInfo(gateWayInfo);
- gateWay2Device.setDeviceInfo(deviceInfo);
- gateWay2Device.setBindTimeStr(DateUtils.paresTime(System.currentTimeMillis(), DateUtils.FORMAT_LONG));
- gateWay2DeviceDao.save(gateWay2Device);
- deviceInfos.add(deviceInfo);
- }
- }
- // 绑定网关和用户的关系
- if (ObjectUtils.isEmpty(gateWay2User)) {
- gateWay2User = new GateWay2User();
- }
- gateWay2User.setGateWayUserInfo(gateWayUserInfo);
- gateWay2User.setGateWayInfo(gateWayInfo);
- gateWay2User.setBindTime(System.currentTimeMillis());
- gateWay2User.setBindTimeStr(DateUtils.paresTime(System.currentTimeMillis(), DateUtils.FORMAT_LONG));
- gateWay2UserDao.save(gateWay2User);
- // 添加连接账号关于:网关、设备的 Topic权限
- bindUserGateWayPermissions(MqttTopicUtils.getUserRoleName(gateWayUserInfo), gateWayInfo.getGateWayId());
- // 绑定用户设备权限
- bindUserDevicesPermissions(MqttTopicUtils.getUserRoleName(gateWayUserInfo), deviceInfos);
- log.info("设备注册成功:{}", deviceInfos.size());
- // 同步设备
- deviceSyncFullCardService.noticeSyncDevice(deviceInfos);
- return ResultContent.buildSuccess();
- }
- /**
- * 删除网关
- *
- * @param gateWayId
- * @return
- */
- public ResultContent deleteGateWayInfo(String gateWayId) {
- GateWayInfo gateWayInfo = gateWayInfoDao.findTopByGateWayId(gateWayId);
- if (ObjectUtils.isEmpty(gateWayInfo)) {
- return ResultContent.buildFail(String.format("网关ID不存在:%s", gateWayId));
- }
- gateWayInfoDao.delete(gateWayInfo);
- return ResultContent.buildSuccess();
- }
- /**
- * 网关列表
- *
- * @param pageable
- * @param param
- * @return
- */
- public ResultContent<Page<GateWayInfoModel>> pageGateWay(Pageable pageable, GateWayInfoSearchParam param) {
- Page<GateWayInfo> page = gateWayInfoDao.page(pageable, param);
- return ResultContent.buildSuccess(PageEntityUtil.toPageModel(page, this::toModel));
- }
- /**
- * 查询网关详情
- *
- * @param gateWayId
- * @return
- */
- public ResultContent<GateWayInfoModel> getById(String gateWayId) {
- GateWayInfoModel model = null;
- GateWayInfo deviceInfo = gateWayInfoDao.findTopByGateWayId(gateWayId);
- if (ObjectUtils.isNotEmpty(deviceInfo)) {
- model = toModel(deviceInfo);
- }
- return ResultContent.buildSuccess(model);
- }
- public GateWayInfoModel toModel(GateWayInfo entity) {
- GateWayInfoModel model = new GateWayInfoModel();
- if (ObjectUtils.isNotEmpty(entity)) {
- BeanUtils.copyProperties(entity, model);
- model.setProjectInfo(projectInfoService.toModel(entity.getProjectInfo()));
- }
- return model;
- }
- /**
- * 绑定角色网关的权限
- *
- * @param roleName
- * @param gateWayId
- * @return
- */
- public ResultContent bindUserGateWayPermissions(String roleName, String gateWayId) {
- try {
- List<MqttInfo> list = mqttInfoDao.findAll();
- if (ObjectUtils.isNotEmpty(list)) {
- for (MqttInfo mqttInfo : list) {
- String urlStr = String.format("service:jmx:rmi:///jndi/rmi://%s:%s/jmxrmi", mqttInfo.getJmxHost(), mqttInfo.getJmxPort());
- JMXServiceURL url = new JMXServiceURL(urlStr);
- @Cleanup JMXConnector connector = JMXConnectorFactory.connect(url, null);
- connector.connect();
- log.info("JMX %s:%s 连接成功...", mqttInfo.getJmxHost(), mqttInfo.getJmxPort());
- MBeanServerConnection connection = connector.getMBeanServerConnection();
- ObjectName addressObjectName = ObjectNameBuilder.create("org.apache.activemq.artemis", mqttInfo.getBrokerName()).getActiveMQServerObjectName();
- ActiveMQServerControl addressControl = MBeanServerInvocationHandler.newProxyInstance(connection, addressObjectName, ActiveMQServerControl.class, false);
- addressControl.addSecuritySettings(MqttTopicUtils.buildGateWayAllTopic(gateWayId), roleName, roleName, roleName, roleName, roleName, roleName, roleName, roleName, roleName, roleName);
- }
- }
- } catch (Exception e) {
- e.printStackTrace();
- log.error("bindUserGateWayPermissions 出错:%s", e.getMessage());
- }
- return ResultContent.buildSuccess();
- }
- public ResultContent bindUserDevicesPermissions(String roleName, List<DeviceInfo> deviceInfos) {
- try {
- List<MqttInfo> list = mqttInfoDao.findAll();
- if (ObjectUtils.isNotEmpty(list)) {
- list.parallelStream().forEach(mqttInfo -> {
- try {
- String urlStr = String.format("service:jmx:rmi:///jndi/rmi://%s:%s/jmxrmi", mqttInfo.getJmxHost(), mqttInfo.getJmxPort());
- JMXServiceURL url = new JMXServiceURL(urlStr);
- @Cleanup JMXConnector connector = JMXConnectorFactory.connect(url, null);
- connector.connect();
- log.info("JMX %s:%s 连接成功...", mqttInfo.getJmxHost(), mqttInfo.getJmxPort());
- MBeanServerConnection connection = connector.getMBeanServerConnection();
- ObjectName addressObjectName = ObjectNameBuilder.create("org.apache.activemq.artemis", mqttInfo.getBrokerName()).getActiveMQServerObjectName();
- ActiveMQServerControl addressControl = MBeanServerInvocationHandler.newProxyInstance(connection, addressObjectName, ActiveMQServerControl.class, false);
- for (DeviceInfo deviceInfo : deviceInfos) {
- addressControl.addSecuritySettings(MqttTopicUtils.buildDeviceAllTopic(deviceInfo.getDeviceId()), roleName, roleName, roleName, roleName, roleName, roleName, roleName, roleName, roleName, roleName);
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- });
- }
- } catch (Exception e) {
- e.printStackTrace();
- log.error("bindUserDevicesPermissions 出错:%s", e.getMessage());
- }
- return ResultContent.buildSuccess();
- }
- }
|