GateWayInfoService.java 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267
  1. package com.zswl.dataservice.service.mqtt;
  2. import com.zswl.dataservice.dao.mqtt.*;
  3. import com.zswl.dataservice.domain.mqtt.*;
  4. import com.zswl.dataservice.model.mqtt.*;
  5. import com.zswl.dataservice.utils.DateUtils;
  6. import com.zswl.dataservice.utils.bean.BeanUtils;
  7. import com.zswl.dataservice.utils.mqtt.MqttTopicUtils;
  8. import com.zswl.dataservice.utils.mqtt.type.OnLineState;
  9. import com.zswl.dataservice.utils.page.PageEntityUtil;
  10. import com.zswl.dataservice.utils.result.ResultContent;
  11. import lombok.Cleanup;
  12. import lombok.extern.slf4j.Slf4j;
  13. import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl;
  14. import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder;
  15. import org.apache.commons.lang3.ObjectUtils;
  16. import org.springframework.beans.factory.annotation.Autowired;
  17. import org.springframework.data.domain.Page;
  18. import org.springframework.data.domain.Pageable;
  19. import org.springframework.stereotype.Service;
  20. import javax.management.MBeanServerConnection;
  21. import javax.management.MBeanServerInvocationHandler;
  22. import javax.management.ObjectName;
  23. import javax.management.remote.JMXConnector;
  24. import javax.management.remote.JMXConnectorFactory;
  25. import javax.management.remote.JMXServiceURL;
  26. import java.util.ArrayList;
  27. import java.util.List;
  28. /**
  29. * @author TRX
  30. * @date 2024/5/17
  31. */
  32. @Slf4j
  33. @Service
  34. public class GateWayInfoService {
  35. @Autowired
  36. GateWayInfoDao gateWayInfoDao;
  37. @Autowired
  38. GateWayUserInfoDao gateWayUserInfoDao;
  39. @Autowired
  40. DeviceInfoDao deviceInfoDao;
  41. @Autowired
  42. GateWay2UserDao gateWay2UserDao;
  43. @Autowired
  44. DeviceInfoService deviceInfoService;
  45. @Autowired
  46. GateWay2DeviceDao gateWay2DeviceDao;
  47. @Autowired
  48. MqttInfoDao mqttInfoDao;
  49. @Autowired
  50. OperationLogsService operationLogsService;
  51. /**
  52. * 添加网关
  53. *
  54. * @param param
  55. * @return
  56. */
  57. public ResultContent addGateWayInfo(GateWayInfoAddParam param) {
  58. GateWayInfo gateWayInfo = gateWayInfoDao.findTopByGateWayId(param.getGateWayId());
  59. if (ObjectUtils.isNotEmpty(gateWayInfo)) {
  60. return ResultContent.buildFail(String.format("网关ID已存在:%s", param.getGateWayId()));
  61. }
  62. gateWayInfo = new GateWayInfo();
  63. BeanUtils.copyProperties(param, gateWayInfo);
  64. if (param.getState() == null) {
  65. gateWayInfo.setState(OnLineState.OffLine);
  66. }
  67. gateWayInfoDao.save(gateWayInfo);
  68. return ResultContent.buildSuccess();
  69. }
  70. /**
  71. * 网关绑定设备、连接账号
  72. *
  73. * @param param
  74. * @return
  75. */
  76. public ResultContent gateWayBindDevice(GateWayBindDeviceParam param) {
  77. GateWayInfo gateWayInfo = gateWayInfoDao.findTopByGateWayId(param.getGateWayId());
  78. if (ObjectUtils.isEmpty(gateWayInfo)) {
  79. // 如果网关不存在,则初始化
  80. GateWayInfoAddParam addParam = new GateWayInfoAddParam();
  81. addParam.setGateWayId(param.getGateWayId());
  82. addParam.setGateWayName(param.getGateWayName());
  83. }
  84. gateWayInfo = gateWayInfoDao.findTopByGateWayId(param.getGateWayId());
  85. // 验证连接账号
  86. GateWayUserInfo gateWayUserInfo = gateWayUserInfoDao.findTopByUserName(param.getUserName());
  87. if (ObjectUtils.isEmpty(gateWayUserInfo)) {
  88. return ResultContent.buildFail(String.format("连接账号不存在:%s", param.getUserName()));
  89. }
  90. // 验证账号是否已绑定网关
  91. GateWay2User gateWay2User = gateWay2UserDao.findTopByGateWayUserInfo(gateWayUserInfo);
  92. if (ObjectUtils.isNotEmpty(gateWay2User)) {
  93. // 如果有绑定关系
  94. GateWayInfo oldGateWayInfo = gateWay2User.getGateWayInfo();
  95. if (!oldGateWayInfo.getGateWayId().equals(gateWayInfo.getGateWayId())) {
  96. return ResultContent.buildFail(String.format("连接账号%s已使用", gateWayUserInfo.getUserName()));
  97. }
  98. }
  99. // 设备列表
  100. List<DeviceInfo> deviceInfos = new ArrayList<>();
  101. List<DeviceInfoAddParam> devices = new ArrayList<>();
  102. if (ObjectUtils.isNotEmpty(devices)) {
  103. // 绑定网关和设备的关系
  104. for (DeviceInfoAddParam device : devices) {
  105. // 如果设备不存在,,则初始化
  106. ResultContent<DeviceInfo> resultContent = deviceInfoService.initDeviceInfo(device);
  107. DeviceInfo deviceInfo = resultContent.getContent();
  108. GateWay2Device gateWay2Device = gateWay2DeviceDao.findTopByGateWayInfoAndDeviceInfo(gateWayInfo, deviceInfo);
  109. if (ObjectUtils.isEmpty(gateWay2Device)) {
  110. gateWay2Device = new GateWay2Device();
  111. gateWay2Device.setState(OnLineState.OffLine);
  112. }
  113. gateWay2Device.setGateWayInfo(gateWayInfo);
  114. gateWay2Device.setDeviceInfo(deviceInfo);
  115. gateWay2Device.setBindTimeStr(DateUtils.paresTime(System.currentTimeMillis(), DateUtils.FORMAT_LONG));
  116. gateWay2DeviceDao.save(gateWay2Device);
  117. deviceInfos.add(deviceInfo);
  118. }
  119. }
  120. // 绑定网关和用户的关系
  121. if (ObjectUtils.isEmpty(gateWay2User)) {
  122. gateWay2User = new GateWay2User();
  123. }
  124. gateWay2User.setGateWayUserInfo(gateWayUserInfo);
  125. gateWay2User.setGateWayInfo(gateWayInfo);
  126. gateWay2User.setBindTime(System.currentTimeMillis());
  127. gateWay2User.setBindTimeStr(DateUtils.paresTime(System.currentTimeMillis(), DateUtils.FORMAT_LONG));
  128. gateWay2UserDao.save(gateWay2User);
  129. // 添加连接账号关于:网关、设备的 Topic权限
  130. bindUserGateWayPermissions(MqttTopicUtils.getUserRoleName(gateWayUserInfo), gateWayInfo.getGateWayId());
  131. // 绑定用户设备权限
  132. bindUserDevicesPermissions(MqttTopicUtils.getUserRoleName(gateWayUserInfo), deviceInfos);
  133. return ResultContent.buildSuccess();
  134. }
  135. /**
  136. * 删除网关
  137. *
  138. * @param gateWayId
  139. * @return
  140. */
  141. public ResultContent deleteGateWayInfo(String gateWayId) {
  142. GateWayInfo gateWayInfo = gateWayInfoDao.findTopByGateWayId(gateWayId);
  143. if (ObjectUtils.isEmpty(gateWayInfo)) {
  144. return ResultContent.buildFail(String.format("网关ID不存在:%s", gateWayId));
  145. }
  146. gateWayInfoDao.delete(gateWayInfo);
  147. return ResultContent.buildSuccess();
  148. }
  149. /**
  150. * 网关列表
  151. *
  152. * @param pageable
  153. * @param param
  154. * @return
  155. */
  156. public ResultContent<Page<GateWayInfoModel>> pageGateWay(Pageable pageable, GateWayInfoSearchParam param) {
  157. Page<GateWayInfo> page = gateWayInfoDao.page(pageable, param);
  158. return ResultContent.buildSuccess(PageEntityUtil.toPageModel(page, this::toModel));
  159. }
  160. /**
  161. * 查询网关详情
  162. *
  163. * @param gateWayId
  164. * @return
  165. */
  166. public ResultContent<GateWayInfoModel> getById(String gateWayId) {
  167. GateWayInfoModel model = null;
  168. GateWayInfo deviceInfo = gateWayInfoDao.findTopByGateWayId(gateWayId);
  169. if (ObjectUtils.isNotEmpty(deviceInfo)) {
  170. model = toModel(deviceInfo);
  171. }
  172. return ResultContent.buildSuccess(model);
  173. }
  174. public GateWayInfoModel toModel(GateWayInfo entity) {
  175. GateWayInfoModel model = new GateWayInfoModel();
  176. if (ObjectUtils.isNotEmpty(entity)) {
  177. BeanUtils.copyProperties(entity, model);
  178. }
  179. return model;
  180. }
  181. /**
  182. * 绑定角色网关的权限
  183. *
  184. * @param roleName
  185. * @param gateWayId
  186. * @return
  187. */
  188. public ResultContent bindUserGateWayPermissions(String roleName, String gateWayId) {
  189. try {
  190. List<MqttInfo> list = mqttInfoDao.findAll();
  191. if (ObjectUtils.isNotEmpty(list)) {
  192. for (MqttInfo mqttInfo : list) {
  193. String urlStr = String.format("service:jmx:rmi:///jndi/rmi://%s:%s/jmxrmi", mqttInfo.getJmxHost(), mqttInfo.getJmxPort());
  194. JMXServiceURL url = new JMXServiceURL(urlStr);
  195. @Cleanup JMXConnector connector = JMXConnectorFactory.connect(url, null);
  196. connector.connect();
  197. log.info("JMX %s:%s 连接成功...", mqttInfo.getJmxHost(), mqttInfo.getJmxPort());
  198. MBeanServerConnection connection = connector.getMBeanServerConnection();
  199. ObjectName addressObjectName = ObjectNameBuilder.create("org.apache.activemq.artemis", mqttInfo.getBrokerName()).getActiveMQServerObjectName();
  200. ActiveMQServerControl addressControl = MBeanServerInvocationHandler.newProxyInstance(connection, addressObjectName, ActiveMQServerControl.class, false);
  201. addressControl.addSecuritySettings(MqttTopicUtils.buildGateWayAllTopic(gateWayId), roleName, roleName, roleName, roleName, roleName, roleName, roleName, roleName, roleName, roleName);
  202. }
  203. }
  204. } catch (Exception e) {
  205. e.printStackTrace();
  206. log.error("bindUserGateWayPermissions 出错:%s", e.getMessage());
  207. }
  208. return ResultContent.buildSuccess();
  209. }
  210. public ResultContent bindUserDevicesPermissions(String roleName, List<DeviceInfo> deviceInfos) {
  211. try {
  212. List<MqttInfo> list = mqttInfoDao.findAll();
  213. if (ObjectUtils.isNotEmpty(list)) {
  214. list.parallelStream().forEach(mqttInfo -> {
  215. try {
  216. String urlStr = String.format("service:jmx:rmi:///jndi/rmi://%s:%s/jmxrmi", mqttInfo.getJmxHost(), mqttInfo.getJmxPort());
  217. JMXServiceURL url = new JMXServiceURL(urlStr);
  218. @Cleanup JMXConnector connector = JMXConnectorFactory.connect(url, null);
  219. connector.connect();
  220. log.info("JMX %s:%s 连接成功...", mqttInfo.getJmxHost(), mqttInfo.getJmxPort());
  221. MBeanServerConnection connection = connector.getMBeanServerConnection();
  222. ObjectName addressObjectName = ObjectNameBuilder.create("org.apache.activemq.artemis", mqttInfo.getBrokerName()).getActiveMQServerObjectName();
  223. ActiveMQServerControl addressControl = MBeanServerInvocationHandler.newProxyInstance(connection, addressObjectName, ActiveMQServerControl.class, false);
  224. for (DeviceInfo deviceInfo : deviceInfos) {
  225. addressControl.addSecuritySettings(MqttTopicUtils.buildDeviceAllTopic(deviceInfo.getDeviceId()), roleName, roleName, roleName, roleName, roleName, roleName, roleName, roleName, roleName, roleName);
  226. }
  227. } catch (Exception e) {
  228. e.printStackTrace();
  229. }
  230. });
  231. }
  232. } catch (Exception e) {
  233. e.printStackTrace();
  234. log.error("bindUserDevicesPermissions 出错:%s", e.getMessage());
  235. }
  236. return ResultContent.buildSuccess();
  237. }
  238. }