GateWayInfoService.java 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299
  1. package com.zswl.dataservice.service.mqtt;
  2. import com.zswl.dataservice.dao.mqtt.*;
  3. import com.zswl.dataservice.domain.mqtt.*;
  4. import com.zswl.dataservice.event.GateWaySyncEvent;
  5. import com.zswl.dataservice.model.mqtt.*;
  6. import com.zswl.dataservice.service.base.SuperService;
  7. import com.zswl.dataservice.utils.DateUtils;
  8. import com.zswl.dataservice.utils.bean.BeanUtils;
  9. import com.zswl.dataservice.utils.mqtt.MqttTopicUtils;
  10. import com.zswl.dataservice.utils.mqtt.type.OnLineState;
  11. import com.zswl.dataservice.utils.page.PageEntityUtil;
  12. import com.zswl.dataservice.utils.result.ResultContent;
  13. import lombok.Cleanup;
  14. import lombok.extern.slf4j.Slf4j;
  15. import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl;
  16. import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder;
  17. import org.apache.commons.lang3.ObjectUtils;
  18. import org.apache.commons.lang3.StringUtils;
  19. import org.springframework.beans.factory.annotation.Autowired;
  20. import org.springframework.context.ApplicationContext;
  21. import org.springframework.data.domain.Page;
  22. import org.springframework.data.domain.Pageable;
  23. import org.springframework.stereotype.Service;
  24. import javax.management.MBeanServerConnection;
  25. import javax.management.MBeanServerInvocationHandler;
  26. import javax.management.ObjectName;
  27. import javax.management.remote.JMXConnector;
  28. import javax.management.remote.JMXConnectorFactory;
  29. import javax.management.remote.JMXServiceURL;
  30. import java.util.ArrayList;
  31. import java.util.List;
  32. /**
  33. * @author TRX
  34. * @date 2024/5/17
  35. */
  36. @Slf4j
  37. @Service
  38. public class GateWayInfoService extends SuperService {
  39. @Autowired
  40. GateWayInfoDao gateWayInfoDao;
  41. @Autowired
  42. GateWayUserInfoDao gateWayUserInfoDao;
  43. @Autowired
  44. DeviceInfoDao deviceInfoDao;
  45. @Autowired
  46. GateWay2UserDao gateWay2UserDao;
  47. @Autowired
  48. DeviceInfoService deviceInfoService;
  49. @Autowired
  50. GateWay2DeviceDao gateWay2DeviceDao;
  51. @Autowired
  52. MqttInfoDao mqttInfoDao;
  53. @Autowired
  54. OperationLogsService operationLogsService;
  55. @Autowired
  56. ApplicationContext applicationContext;
  57. @Autowired
  58. DeviceSyncFullCardService deviceSyncFullCardService;
  59. /**
  60. * 添加网关
  61. *
  62. * @param param
  63. * @return
  64. */
  65. public ResultContent addGateWayInfo(GateWayInfoAddParam param) {
  66. initDefaultUser(param);
  67. GateWayInfo gateWayInfo = gateWayInfoDao.findTopByGateWayId(param.getGateWayId());
  68. if (ObjectUtils.isEmpty(gateWayInfo)) {
  69. gateWayInfo = new GateWayInfo();
  70. }
  71. BeanUtils.copyProperties(param, gateWayInfo, "id");
  72. if (param.getState() == null) {
  73. gateWayInfo.setState(OnLineState.OffLine);
  74. }
  75. gateWayInfoDao.save(gateWayInfo);
  76. deviceSyncFullCardService.noticeSyncGateWay(gateWayInfo);
  77. return ResultContent.buildSuccess();
  78. }
  79. /**
  80. * 注册网关
  81. *
  82. * @param param
  83. * @return
  84. */
  85. public ResultContent registerGateWay(GateWayInfoAddParam param) {
  86. addGateWayInfo(param);
  87. return ResultContent.buildSuccess();
  88. }
  89. /**
  90. * 网关绑定设备、连接账号
  91. *
  92. * @param param
  93. * @return
  94. */
  95. public ResultContent gateWayBindDevice(GateWayBindDeviceParam param) {
  96. // 网关信息
  97. GateWayInfo gateWayInfo = gateWayInfoDao.findTopByGateWayId(param.getGateWayId());
  98. if (ObjectUtils.isEmpty(gateWayInfo)) {
  99. return ResultContent.buildFail(String.format("网关未注册:%s", param.getGateWayId()));
  100. }
  101. List<DeviceInfoAddParam> devices = param.getDevices();
  102. if (ObjectUtils.isNotEmpty(devices)) {
  103. // 检查设备数据合法性
  104. for (DeviceInfoAddParam device : devices) {
  105. if (StringUtils.isEmpty(device.getDeviceId())) {
  106. return ResultContent.buildFail("deviceId不能为空");
  107. }
  108. }
  109. }
  110. if (StringUtils.isEmpty(param.getUserName())) {
  111. param.setUserName("admin");
  112. }
  113. // 验证连接账号
  114. GateWayUserInfo gateWayUserInfo = gateWayUserInfoDao.findTopByUserName(param.getUserName());
  115. if (ObjectUtils.isEmpty(gateWayUserInfo)) {
  116. return ResultContent.buildFail(String.format("连接账号不存在:%s", param.getUserName()));
  117. }
  118. // 验证账号是否已绑定网关
  119. GateWay2User gateWay2User = gateWay2UserDao.findTopByGateWayUserInfo(gateWayUserInfo);
  120. if (ObjectUtils.isNotEmpty(gateWay2User)) {
  121. // 如果有绑定关系
  122. GateWayInfo oldGateWayInfo = gateWay2User.getGateWayInfo();
  123. if (!oldGateWayInfo.getGateWayId().equals(gateWayInfo.getGateWayId())) {
  124. // return ResultContent.buildFail(String.format("连接账号%s已使用", gateWayUserInfo.getUserName()));
  125. }
  126. }
  127. // 设备列表
  128. List<DeviceInfo> deviceInfos = new ArrayList<>();
  129. if (ObjectUtils.isNotEmpty(devices)) {
  130. // 绑定网关和设备的关系
  131. for (DeviceInfoAddParam device : devices) {
  132. device.setProjectInfoCode(param.getProjectInfoCode());
  133. // 保存设备信息
  134. ResultContent<DeviceInfo> resultContent = deviceInfoService.addDeviceInfo(device);
  135. DeviceInfo deviceInfo = resultContent.getContent();
  136. GateWay2Device gateWay2Device = gateWay2DeviceDao.findTopByGateWayInfoAndDeviceInfo(gateWayInfo, deviceInfo);
  137. if (ObjectUtils.isEmpty(gateWay2Device)) {
  138. gateWay2Device = new GateWay2Device();
  139. gateWay2Device.setState(OnLineState.OffLine);
  140. }
  141. gateWay2Device.setGateWayInfo(gateWayInfo);
  142. gateWay2Device.setDeviceInfo(deviceInfo);
  143. gateWay2Device.setBindTimeStr(DateUtils.paresTime(System.currentTimeMillis(), DateUtils.FORMAT_LONG));
  144. gateWay2DeviceDao.save(gateWay2Device);
  145. deviceInfos.add(deviceInfo);
  146. }
  147. }
  148. // 绑定网关和用户的关系
  149. if (ObjectUtils.isEmpty(gateWay2User)) {
  150. gateWay2User = new GateWay2User();
  151. }
  152. gateWay2User.setGateWayUserInfo(gateWayUserInfo);
  153. gateWay2User.setGateWayInfo(gateWayInfo);
  154. gateWay2User.setBindTime(System.currentTimeMillis());
  155. gateWay2User.setBindTimeStr(DateUtils.paresTime(System.currentTimeMillis(), DateUtils.FORMAT_LONG));
  156. gateWay2UserDao.save(gateWay2User);
  157. // 添加连接账号关于:网关、设备的 Topic权限
  158. bindUserGateWayPermissions(MqttTopicUtils.getUserRoleName(gateWayUserInfo), gateWayInfo.getGateWayId());
  159. // 绑定用户设备权限
  160. bindUserDevicesPermissions(MqttTopicUtils.getUserRoleName(gateWayUserInfo), deviceInfos);
  161. deviceSyncFullCardService.noticeSyncDevice(deviceInfos);
  162. return ResultContent.buildSuccess();
  163. }
  164. /**
  165. * 删除网关
  166. *
  167. * @param gateWayId
  168. * @return
  169. */
  170. public ResultContent deleteGateWayInfo(String gateWayId) {
  171. GateWayInfo gateWayInfo = gateWayInfoDao.findTopByGateWayId(gateWayId);
  172. if (ObjectUtils.isEmpty(gateWayInfo)) {
  173. return ResultContent.buildFail(String.format("网关ID不存在:%s", gateWayId));
  174. }
  175. gateWayInfoDao.delete(gateWayInfo);
  176. return ResultContent.buildSuccess();
  177. }
  178. /**
  179. * 网关列表
  180. *
  181. * @param pageable
  182. * @param param
  183. * @return
  184. */
  185. public ResultContent<Page<GateWayInfoModel>> pageGateWay(Pageable pageable, GateWayInfoSearchParam param) {
  186. Page<GateWayInfo> page = gateWayInfoDao.page(pageable, param);
  187. return ResultContent.buildSuccess(PageEntityUtil.toPageModel(page, this::toModel));
  188. }
  189. /**
  190. * 查询网关详情
  191. *
  192. * @param gateWayId
  193. * @return
  194. */
  195. public ResultContent<GateWayInfoModel> getById(String gateWayId) {
  196. GateWayInfoModel model = null;
  197. GateWayInfo deviceInfo = gateWayInfoDao.findTopByGateWayId(gateWayId);
  198. if (ObjectUtils.isNotEmpty(deviceInfo)) {
  199. model = toModel(deviceInfo);
  200. }
  201. return ResultContent.buildSuccess(model);
  202. }
  203. public GateWayInfoModel toModel(GateWayInfo entity) {
  204. GateWayInfoModel model = new GateWayInfoModel();
  205. if (ObjectUtils.isNotEmpty(entity)) {
  206. BeanUtils.copyProperties(entity, model);
  207. }
  208. return model;
  209. }
  210. /**
  211. * 绑定角色网关的权限
  212. *
  213. * @param roleName
  214. * @param gateWayId
  215. * @return
  216. */
  217. public ResultContent bindUserGateWayPermissions(String roleName, String gateWayId) {
  218. try {
  219. List<MqttInfo> list = mqttInfoDao.findAll();
  220. if (ObjectUtils.isNotEmpty(list)) {
  221. for (MqttInfo mqttInfo : list) {
  222. String urlStr = String.format("service:jmx:rmi:///jndi/rmi://%s:%s/jmxrmi", mqttInfo.getJmxHost(), mqttInfo.getJmxPort());
  223. JMXServiceURL url = new JMXServiceURL(urlStr);
  224. @Cleanup JMXConnector connector = JMXConnectorFactory.connect(url, null);
  225. connector.connect();
  226. log.info("JMX %s:%s 连接成功...", mqttInfo.getJmxHost(), mqttInfo.getJmxPort());
  227. MBeanServerConnection connection = connector.getMBeanServerConnection();
  228. ObjectName addressObjectName = ObjectNameBuilder.create("org.apache.activemq.artemis", mqttInfo.getBrokerName()).getActiveMQServerObjectName();
  229. ActiveMQServerControl addressControl = MBeanServerInvocationHandler.newProxyInstance(connection, addressObjectName, ActiveMQServerControl.class, false);
  230. addressControl.addSecuritySettings(MqttTopicUtils.buildGateWayAllTopic(gateWayId), roleName, roleName, roleName, roleName, roleName, roleName, roleName, roleName, roleName, roleName);
  231. }
  232. }
  233. } catch (Exception e) {
  234. e.printStackTrace();
  235. log.error("bindUserGateWayPermissions 出错:%s", e.getMessage());
  236. }
  237. return ResultContent.buildSuccess();
  238. }
  239. public ResultContent bindUserDevicesPermissions(String roleName, List<DeviceInfo> deviceInfos) {
  240. try {
  241. List<MqttInfo> list = mqttInfoDao.findAll();
  242. if (ObjectUtils.isNotEmpty(list)) {
  243. list.parallelStream().forEach(mqttInfo -> {
  244. try {
  245. String urlStr = String.format("service:jmx:rmi:///jndi/rmi://%s:%s/jmxrmi", mqttInfo.getJmxHost(), mqttInfo.getJmxPort());
  246. JMXServiceURL url = new JMXServiceURL(urlStr);
  247. @Cleanup JMXConnector connector = JMXConnectorFactory.connect(url, null);
  248. connector.connect();
  249. log.info("JMX %s:%s 连接成功...", mqttInfo.getJmxHost(), mqttInfo.getJmxPort());
  250. MBeanServerConnection connection = connector.getMBeanServerConnection();
  251. ObjectName addressObjectName = ObjectNameBuilder.create("org.apache.activemq.artemis", mqttInfo.getBrokerName()).getActiveMQServerObjectName();
  252. ActiveMQServerControl addressControl = MBeanServerInvocationHandler.newProxyInstance(connection, addressObjectName, ActiveMQServerControl.class, false);
  253. for (DeviceInfo deviceInfo : deviceInfos) {
  254. addressControl.addSecuritySettings(MqttTopicUtils.buildDeviceAllTopic(deviceInfo.getDeviceId()), roleName, roleName, roleName, roleName, roleName, roleName, roleName, roleName, roleName, roleName);
  255. }
  256. } catch (Exception e) {
  257. e.printStackTrace();
  258. }
  259. });
  260. }
  261. } catch (Exception e) {
  262. e.printStackTrace();
  263. log.error("bindUserDevicesPermissions 出错:%s", e.getMessage());
  264. }
  265. return ResultContent.buildSuccess();
  266. }
  267. }