GateWayInfoService.java 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315
  1. package com.zswl.dataservice.service.mqtt;
  2. import com.zswl.dataservice.dao.mqtt.*;
  3. import com.zswl.dataservice.domain.mqtt.*;
  4. import com.zswl.dataservice.model.mqtt.*;
  5. import com.zswl.dataservice.service.base.SuperService;
  6. import com.zswl.dataservice.service.sync.DeviceSyncFullCardService;
  7. import com.zswl.dataservice.utils.DateUtils;
  8. import com.zswl.dataservice.utils.bean.BeanUtils;
  9. import com.zswl.dataservice.utils.mqtt.MqttTopicUtils;
  10. import com.zswl.dataservice.utils.mqtt.type.OnLineState;
  11. import com.zswl.dataservice.utils.page.PageEntityUtil;
  12. import com.zswl.dataservice.utils.result.ResultContent;
  13. import lombok.Cleanup;
  14. import lombok.extern.slf4j.Slf4j;
  15. import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl;
  16. import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder;
  17. import org.apache.commons.lang3.ObjectUtils;
  18. import org.apache.commons.lang3.StringUtils;
  19. import org.springframework.beans.factory.annotation.Autowired;
  20. import org.springframework.context.ApplicationContext;
  21. import org.springframework.data.domain.Page;
  22. import org.springframework.data.domain.Pageable;
  23. import org.springframework.stereotype.Service;
  24. import javax.management.MBeanServerConnection;
  25. import javax.management.MBeanServerInvocationHandler;
  26. import javax.management.ObjectName;
  27. import javax.management.remote.JMXConnector;
  28. import javax.management.remote.JMXConnectorFactory;
  29. import javax.management.remote.JMXServiceURL;
  30. import java.util.ArrayList;
  31. import java.util.List;
  32. /**
  33. * @author TRX
  34. * @date 2024/5/17
  35. */
  36. @Slf4j
  37. @Service
  38. public class GateWayInfoService extends SuperService {
  39. @Autowired
  40. GateWayInfoDao gateWayInfoDao;
  41. @Autowired
  42. GateWayUserInfoDao gateWayUserInfoDao;
  43. @Autowired
  44. DeviceInfoDao deviceInfoDao;
  45. @Autowired
  46. GateWay2UserDao gateWay2UserDao;
  47. @Autowired
  48. DeviceInfoService deviceInfoService;
  49. @Autowired
  50. GateWay2DeviceDao gateWay2DeviceDao;
  51. @Autowired
  52. MqttInfoDao mqttInfoDao;
  53. @Autowired
  54. OperationLogsService operationLogsService;
  55. @Autowired
  56. ApplicationContext applicationContext;
  57. @Autowired
  58. DeviceSyncFullCardService deviceSyncFullCardService;
  59. @Autowired
  60. private ProjectInfoDao projectInfoDao;
  61. @Autowired
  62. ProjectInfoService projectInfoService;
  63. /**
  64. * 添加网关
  65. *
  66. * @param param
  67. * @return
  68. */
  69. public ResultContent addGateWayInfo(GateWayInfoAddParam param) {
  70. initDefaultUser(param);
  71. GateWayInfo gateWayInfo = gateWayInfoDao.findTopByGateWayId(param.getGateWayId());
  72. if (ObjectUtils.isEmpty(gateWayInfo)) {
  73. gateWayInfo = new GateWayInfo();
  74. }
  75. BeanUtils.copyProperties(param, gateWayInfo, "id");
  76. if (param.getState() == null) {
  77. gateWayInfo.setState(OnLineState.OffLine);
  78. }
  79. gateWayInfoDao.save(gateWayInfo);
  80. if (StringUtils.isNotEmpty(param.getProjectInfoCode())) {
  81. ProjectInfo projectInfo = projectInfoDao.findTopByCode(param.getProjectInfoCode());
  82. gateWayInfo.setProjectInfo(projectInfo);
  83. }
  84. // 通知同步
  85. deviceSyncFullCardService.noticeSyncGateWay(gateWayInfo);
  86. return ResultContent.buildSuccess();
  87. }
  88. /**
  89. * 注册网关
  90. *
  91. * @param param
  92. * @return
  93. */
  94. public ResultContent registerGateWay(GateWayInfoAddParam param) {
  95. addGateWayInfo(param);
  96. return ResultContent.buildSuccess();
  97. }
  98. /**
  99. * 网关绑定设备、连接账号
  100. *
  101. * @param param
  102. * @return
  103. */
  104. public ResultContent gateWayBindDevice(GateWayBindDeviceParam param) {
  105. // 网关信息
  106. GateWayInfo gateWayInfo = gateWayInfoDao.findTopByGateWayId(param.getGateWayId());
  107. if (ObjectUtils.isEmpty(gateWayInfo)) {
  108. return ResultContent.buildFail(String.format("网关未注册:%s", param.getGateWayId()));
  109. }
  110. List<DeviceInfoAddParam> devices = param.getDevices();
  111. if (ObjectUtils.isNotEmpty(devices)) {
  112. // 检查设备数据合法性
  113. for (DeviceInfoAddParam device : devices) {
  114. if (StringUtils.isEmpty(device.getDeviceId())) {
  115. return ResultContent.buildFail("deviceId不能为空");
  116. }
  117. }
  118. }
  119. if (StringUtils.isEmpty(param.getUserName())) {
  120. param.setUserName("admin");
  121. }
  122. // 验证连接账号
  123. GateWayUserInfo gateWayUserInfo = gateWayUserInfoDao.findTopByUserName(param.getUserName());
  124. if (ObjectUtils.isEmpty(gateWayUserInfo)) {
  125. return ResultContent.buildFail(String.format("连接账号不存在:%s", param.getUserName()));
  126. }
  127. // 验证账号是否已绑定网关
  128. GateWay2User gateWay2User = gateWay2UserDao.findTopByGateWayUserInfo(gateWayUserInfo);
  129. if (ObjectUtils.isNotEmpty(gateWay2User)) {
  130. // 如果有绑定关系
  131. GateWayInfo oldGateWayInfo = gateWay2User.getGateWayInfo();
  132. if (!oldGateWayInfo.getGateWayId().equals(gateWayInfo.getGateWayId())) {
  133. // return ResultContent.buildFail(String.format("连接账号%s已使用", gateWayUserInfo.getUserName()));
  134. }
  135. }
  136. String projectInfoCode = param.getProjectInfoCode();
  137. // 设备列表
  138. List<DeviceInfo> deviceInfos = new ArrayList<>();
  139. if (ObjectUtils.isNotEmpty(devices)) {
  140. // 绑定网关和设备的关系
  141. for (DeviceInfoAddParam device : devices) {
  142. device.setProjectInfoCode(projectInfoCode);
  143. // 保存设备信息
  144. ResultContent<DeviceInfo> resultContent = deviceInfoService.addDeviceInfo(device);
  145. DeviceInfo deviceInfo = resultContent.getContent();
  146. GateWay2Device gateWay2Device = gateWay2DeviceDao.findTopByGateWayInfoAndDeviceInfo(gateWayInfo, deviceInfo);
  147. if (ObjectUtils.isEmpty(gateWay2Device)) {
  148. gateWay2Device = new GateWay2Device();
  149. gateWay2Device.setState(OnLineState.OffLine);
  150. }
  151. gateWay2Device.setGateWayInfo(gateWayInfo);
  152. gateWay2Device.setDeviceInfo(deviceInfo);
  153. gateWay2Device.setBindTimeStr(DateUtils.paresTime(System.currentTimeMillis(), DateUtils.FORMAT_LONG));
  154. gateWay2DeviceDao.save(gateWay2Device);
  155. deviceInfos.add(deviceInfo);
  156. }
  157. }
  158. // 绑定网关和用户的关系
  159. if (ObjectUtils.isEmpty(gateWay2User)) {
  160. gateWay2User = new GateWay2User();
  161. }
  162. gateWay2User.setGateWayUserInfo(gateWayUserInfo);
  163. gateWay2User.setGateWayInfo(gateWayInfo);
  164. gateWay2User.setBindTime(System.currentTimeMillis());
  165. gateWay2User.setBindTimeStr(DateUtils.paresTime(System.currentTimeMillis(), DateUtils.FORMAT_LONG));
  166. gateWay2UserDao.save(gateWay2User);
  167. // 添加连接账号关于:网关、设备的 Topic权限
  168. bindUserGateWayPermissions(MqttTopicUtils.getUserRoleName(gateWayUserInfo), gateWayInfo.getGateWayId());
  169. // 绑定用户设备权限
  170. bindUserDevicesPermissions(MqttTopicUtils.getUserRoleName(gateWayUserInfo), deviceInfos);
  171. // 同步设备
  172. deviceSyncFullCardService.noticeSyncDevice(deviceInfos);
  173. return ResultContent.buildSuccess();
  174. }
  175. /**
  176. * 删除网关
  177. *
  178. * @param gateWayId
  179. * @return
  180. */
  181. public ResultContent deleteGateWayInfo(String gateWayId) {
  182. GateWayInfo gateWayInfo = gateWayInfoDao.findTopByGateWayId(gateWayId);
  183. if (ObjectUtils.isEmpty(gateWayInfo)) {
  184. return ResultContent.buildFail(String.format("网关ID不存在:%s", gateWayId));
  185. }
  186. gateWayInfoDao.delete(gateWayInfo);
  187. return ResultContent.buildSuccess();
  188. }
  189. /**
  190. * 网关列表
  191. *
  192. * @param pageable
  193. * @param param
  194. * @return
  195. */
  196. public ResultContent<Page<GateWayInfoModel>> pageGateWay(Pageable pageable, GateWayInfoSearchParam param) {
  197. Page<GateWayInfo> page = gateWayInfoDao.page(pageable, param);
  198. return ResultContent.buildSuccess(PageEntityUtil.toPageModel(page, this::toModel));
  199. }
  200. /**
  201. * 查询网关详情
  202. *
  203. * @param gateWayId
  204. * @return
  205. */
  206. public ResultContent<GateWayInfoModel> getById(String gateWayId) {
  207. GateWayInfoModel model = null;
  208. GateWayInfo deviceInfo = gateWayInfoDao.findTopByGateWayId(gateWayId);
  209. if (ObjectUtils.isNotEmpty(deviceInfo)) {
  210. model = toModel(deviceInfo);
  211. }
  212. return ResultContent.buildSuccess(model);
  213. }
  214. public GateWayInfoModel toModel(GateWayInfo entity) {
  215. GateWayInfoModel model = new GateWayInfoModel();
  216. if (ObjectUtils.isNotEmpty(entity)) {
  217. BeanUtils.copyProperties(entity, model);
  218. model.setProjectInfo(projectInfoService.toModel(entity.getProjectInfo()));
  219. }
  220. return model;
  221. }
  222. /**
  223. * 绑定角色网关的权限
  224. *
  225. * @param roleName
  226. * @param gateWayId
  227. * @return
  228. */
  229. public ResultContent bindUserGateWayPermissions(String roleName, String gateWayId) {
  230. try {
  231. List<MqttInfo> list = mqttInfoDao.findAll();
  232. if (ObjectUtils.isNotEmpty(list)) {
  233. for (MqttInfo mqttInfo : list) {
  234. String urlStr = String.format("service:jmx:rmi:///jndi/rmi://%s:%s/jmxrmi", mqttInfo.getJmxHost(), mqttInfo.getJmxPort());
  235. JMXServiceURL url = new JMXServiceURL(urlStr);
  236. @Cleanup JMXConnector connector = JMXConnectorFactory.connect(url, null);
  237. connector.connect();
  238. log.info("JMX %s:%s 连接成功...", mqttInfo.getJmxHost(), mqttInfo.getJmxPort());
  239. MBeanServerConnection connection = connector.getMBeanServerConnection();
  240. ObjectName addressObjectName = ObjectNameBuilder.create("org.apache.activemq.artemis", mqttInfo.getBrokerName()).getActiveMQServerObjectName();
  241. ActiveMQServerControl addressControl = MBeanServerInvocationHandler.newProxyInstance(connection, addressObjectName, ActiveMQServerControl.class, false);
  242. addressControl.addSecuritySettings(MqttTopicUtils.buildGateWayAllTopic(gateWayId), roleName, roleName, roleName, roleName, roleName, roleName, roleName, roleName, roleName, roleName);
  243. }
  244. }
  245. } catch (Exception e) {
  246. e.printStackTrace();
  247. log.error("bindUserGateWayPermissions 出错:%s", e.getMessage());
  248. }
  249. return ResultContent.buildSuccess();
  250. }
  251. public ResultContent bindUserDevicesPermissions(String roleName, List<DeviceInfo> deviceInfos) {
  252. try {
  253. List<MqttInfo> list = mqttInfoDao.findAll();
  254. if (ObjectUtils.isNotEmpty(list)) {
  255. list.parallelStream().forEach(mqttInfo -> {
  256. try {
  257. String urlStr = String.format("service:jmx:rmi:///jndi/rmi://%s:%s/jmxrmi", mqttInfo.getJmxHost(), mqttInfo.getJmxPort());
  258. JMXServiceURL url = new JMXServiceURL(urlStr);
  259. @Cleanup JMXConnector connector = JMXConnectorFactory.connect(url, null);
  260. connector.connect();
  261. log.info("JMX %s:%s 连接成功...", mqttInfo.getJmxHost(), mqttInfo.getJmxPort());
  262. MBeanServerConnection connection = connector.getMBeanServerConnection();
  263. ObjectName addressObjectName = ObjectNameBuilder.create("org.apache.activemq.artemis", mqttInfo.getBrokerName()).getActiveMQServerObjectName();
  264. ActiveMQServerControl addressControl = MBeanServerInvocationHandler.newProxyInstance(connection, addressObjectName, ActiveMQServerControl.class, false);
  265. for (DeviceInfo deviceInfo : deviceInfos) {
  266. addressControl.addSecuritySettings(MqttTopicUtils.buildDeviceAllTopic(deviceInfo.getDeviceId()), roleName, roleName, roleName, roleName, roleName, roleName, roleName, roleName, roleName, roleName);
  267. }
  268. } catch (Exception e) {
  269. e.printStackTrace();
  270. }
  271. });
  272. }
  273. } catch (Exception e) {
  274. e.printStackTrace();
  275. log.error("bindUserDevicesPermissions 出错:%s", e.getMessage());
  276. }
  277. return ResultContent.buildSuccess();
  278. }
  279. }