OperationMessageService.java 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407
  1. package com.zswl.dataservice.service.artemis;
  2. import cn.hutool.core.date.StopWatch;
  3. import cn.hutool.json.JSONObject;
  4. import cn.hutool.json.JSONUtil;
  5. import com.google.gson.JsonObject;
  6. import com.zswl.dataservice.dao.mqtt.DeviceInfoDao;
  7. import com.zswl.dataservice.dao.mqtt.OperationMessageDao;
  8. import com.zswl.dataservice.dao.other.ExecuteMethodInfoDao;
  9. import com.zswl.dataservice.domain.mqtt.DeviceInfo;
  10. import com.zswl.dataservice.domain.mqtt.OperationMessage;
  11. import com.zswl.dataservice.domain.other.ExecuteMethodInfo;
  12. import com.zswl.dataservice.model.artemis.OperationMessageModel;
  13. import com.zswl.dataservice.model.artemis.OperationMessageSearchParam;
  14. import com.zswl.dataservice.model.mqtt.SendMessageModel;
  15. import com.zswl.dataservice.service.base.SuperService;
  16. import com.zswl.dataservice.service.mqtt.DeviceInfoService;
  17. import com.zswl.dataservice.service.mqtt.GateWayInfoService;
  18. import com.zswl.dataservice.service.payment.HxzService;
  19. import com.zswl.dataservice.type.OperationType;
  20. import com.zswl.dataservice.utils.DateUtils;
  21. import com.zswl.dataservice.utils.bean.BeanUtils;
  22. import com.zswl.dataservice.utils.mqtt.MqttTopicUtils;
  23. import com.zswl.dataservice.utils.mqtt.mqttConfig.client.MQClient;
  24. import com.zswl.dataservice.utils.page.PageEntityUtil;
  25. import com.zswl.dataservice.utils.result.ResultContent;
  26. import jakarta.jms.Message;
  27. import jakarta.jms.TextMessage;
  28. import lombok.extern.slf4j.Slf4j;
  29. import org.apache.activemq.artemis.jms.client.ActiveMQBytesMessage;
  30. import org.apache.activemq.artemis.jms.client.ActiveMQTopic;
  31. import org.apache.commons.lang3.ObjectUtils;
  32. import org.apache.commons.lang3.StringUtils;
  33. import org.springframework.beans.factory.annotation.Autowired;
  34. import org.springframework.beans.factory.annotation.Value;
  35. import org.springframework.context.ApplicationContext;
  36. import org.springframework.data.domain.Page;
  37. import org.springframework.data.domain.Pageable;
  38. import org.springframework.scheduling.annotation.Async;
  39. import org.springframework.stereotype.Service;
  40. import java.lang.reflect.Method;
  41. import java.util.Date;
  42. import java.util.UUID;
  43. /**
  44. * 指令数据管理
  45. *
  46. * @author TRX
  47. * @date 2024/5/14
  48. */
  49. @Slf4j
  50. @Service
  51. public class OperationMessageService {
  52. @Autowired
  53. OperationMessageDao operationMessageDao;
  54. @Autowired
  55. DeviceInfoService deviceInfoService;
  56. @Autowired
  57. GateWayInfoService gateWayInfoService;
  58. @Autowired
  59. MQClient mqClient;
  60. @Autowired
  61. DeviceInfoDao deviceInfoDao;
  62. @Autowired
  63. HxzService hxzService;
  64. // 保存90天
  65. @Value("${artemisstore.time}")
  66. private Long ttlMill = 90 * 24L * 60 * 60 * 1000L;
  67. @Autowired
  68. private ExecuteMethodInfoDao executeMethodInfoDao;
  69. @Autowired
  70. private ApplicationContext applicationContext;
  71. /**
  72. * 发送指令
  73. *
  74. * @param param
  75. * @return
  76. */
  77. public ResultContent sendMessage(SendMessageModel param) {
  78. String msg = "发送成功";
  79. try {
  80. JSONObject jsonObject = new JSONObject();
  81. jsonObject.set("id", UUID.randomUUID().toString());
  82. jsonObject.set("data", param.getMessage());
  83. jsonObject.set("timeStr", DateUtils.paresTime(System.currentTimeMillis(), DateUtils.patternyyyySSS));
  84. jsonObject.set("time", System.currentTimeMillis());
  85. jsonObject.set("ttl", 10 * 1000);
  86. mqClient.sendObject(param.getTopic(), jsonObject.toString());
  87. log.info("mqtt msg 发送成功");
  88. } catch (Exception e) {
  89. e.printStackTrace();
  90. msg = "发送失败: " + e.getMessage();
  91. }
  92. return ResultContent.buildSuccess(msg);
  93. }
  94. /**
  95. * 给设备下发指令
  96. *
  97. * @param deviceId 设备ID
  98. * @param command 指令,如:on,off
  99. * @param data 指令数据
  100. * @return
  101. */
  102. public ResultContent sendMessage(String deviceId, String command, JSONObject data) {
  103. DeviceInfo deviceInfo = deviceInfoDao.findTopById(deviceId);
  104. if (ObjectUtils.isEmpty(deviceInfo)) {
  105. return ResultContent.buildFail(String.format("设备不存在:%s", deviceId));
  106. }
  107. try {
  108. // 消息的TTL时间
  109. Long ttl = 10 * 1000L;
  110. // 消息的ID
  111. String messageId = "";
  112. OperationMessage message = new OperationMessage();
  113. if (data.containsKey("messageId")) {
  114. messageId = data.getStr("messageId");
  115. } else {
  116. messageId = UUID.randomUUID().toString();
  117. data.append("messageId", messageId);
  118. }
  119. String topic = MqttTopicUtils.buildDeviceOperationInstructionsTopic(deviceId, command);
  120. message.setMessageId(messageId);
  121. message.setData(data);
  122. message.setTopic(topic);
  123. message.setDeviceId(deviceId);
  124. message.setDeviceInfo(deviceInfo);
  125. message.setIsReceive(Boolean.FALSE);
  126. message.setTtlTime(ttl);
  127. message.setTime(DateUtils.paresTime(System.currentTimeMillis(), DateUtils.FORMAT_LONG));
  128. JsonObject jsonObject = new JsonObject();
  129. jsonObject.addProperty("id", messageId);
  130. jsonObject.addProperty("data", data.toString());
  131. jsonObject.addProperty("time", System.currentTimeMillis());
  132. jsonObject.addProperty("ttl", ttl);
  133. mqClient.sendObject(topic, jsonObject.toString());
  134. log.info("mqtt msg 发送成功");
  135. operationMessageDao.save(message);
  136. } catch (Exception e) {
  137. e.printStackTrace();
  138. }
  139. return ResultContent.buildSuccess();
  140. }
  141. /**
  142. * 处理网关MQTT信息
  143. *
  144. * @param message
  145. */
  146. public void handlerGateWayMessage(Message message) {
  147. // 处理接收到的消息
  148. try {
  149. ActiveMQTopic activeMQTopic = (ActiveMQTopic) message.getJMSDestination();
  150. String topicName = activeMQTopic.getTopicName();
  151. if (topicName.equals("activemq.notifications")) {
  152. return;
  153. }
  154. String messageId = message.getJMSMessageID();
  155. String clientId = message.getStringProperty("__AMQ_CID");
  156. log.info("receiveMessage {} 消息监听clientId: {}", messageId, clientId);
  157. log.info("Topic: {}", topicName);
  158. if (StringUtils.isNotEmpty(topicName) && topicName.endsWith("reply")) {
  159. // 这是响应的数据,不处理
  160. log.warn("回复消息不处理");
  161. return;
  162. }
  163. OperationMessage operationMessage = new OperationMessage();
  164. operationMessage.setMessageId(messageId);
  165. operationMessage.setClientId(clientId);
  166. operationMessage.setTopic(topicName);
  167. operationMessage.setOperationType(OperationType.Sub);
  168. String messageClass = "";
  169. String msg = "";
  170. if (message instanceof ActiveMQBytesMessage) {
  171. ActiveMQBytesMessage activeMQBytesMessage = (ActiveMQBytesMessage) message;
  172. messageClass = ActiveMQBytesMessage.class.getSimpleName();
  173. byte[] messageBody = new byte[(int) activeMQBytesMessage.getBodyLength()];
  174. // 读取消息内容到字节数组
  175. activeMQBytesMessage.readBytes(messageBody);
  176. msg = new String(messageBody);
  177. }
  178. if (message instanceof TextMessage) {
  179. messageClass = TextMessage.class.getSimpleName();
  180. TextMessage textMessage = (TextMessage) message;
  181. msg = textMessage.getText();
  182. }
  183. cn.hutool.json.JSONObject jsonObject = JSONUtil.parseObj(msg);
  184. String id = jsonObject.getStr("id");
  185. Long time = jsonObject.getLong("time");
  186. Long ttl = jsonObject.getLong("ttl");
  187. String event = jsonObject.getStr("event");
  188. boolean isTimeOut = false;
  189. if (System.currentTimeMillis() > (time + ttl)) {
  190. isTimeOut = true;
  191. }
  192. log.info("textMessage: {}", msg);
  193. String timeStr = DateUtils.paresTime(time, DateUtils.patternyyyySSS);
  194. jsonObject.set("timeStr", timeStr);
  195. // --------------------处理业务 start------------------
  196. operationMessage.setMessageClass(messageClass);
  197. operationMessage.setData(jsonObject);
  198. operationMessage.setTtlTime(ttl);
  199. operationMessage.setSendTime(time);
  200. operationMessage.setDataId(id);
  201. operationMessage.setIsTimeOut(isTimeOut);
  202. operationMessage.setEvent(event);
  203. addOperationMessage(operationMessage);
  204. } catch (Exception e) {
  205. e.printStackTrace();
  206. }
  207. }
  208. /**
  209. * 添加
  210. *
  211. * @param entity
  212. * @return
  213. */
  214. public ResultContent addOperationMessage(OperationMessage entity) {
  215. entity.setTime(DateUtils.paresTime(System.currentTimeMillis(), DateUtils.patternyyyySSS));
  216. entity.setTtl(new Date(System.currentTimeMillis() + ttlMill));
  217. if (entity.getIsTimeOut()) {
  218. entity.setHandleMsg("超时不处理");
  219. }
  220. boolean isTimeOut = entity.getIsTimeOut();
  221. isTimeOut = true;
  222. // 判断业务数据ID是否存在,不要重复处理
  223. boolean isExit = operationMessageDao.existsByDataId(entity.getDataId());
  224. // TODO
  225. if (isTimeOut) {
  226. // 未超时,处理消息
  227. handleOperationMessage(entity);
  228. } else {
  229. // 超时
  230. operationMessageDao.save(entity);
  231. }
  232. return ResultContent.buildSuccess();
  233. }
  234. /**
  235. * 处理消息
  236. *
  237. * @param entity
  238. * @return
  239. */
  240. public ResultContent handleOperationMessage(OperationMessage entity) {
  241. String event = entity.getEvent();
  242. JSONObject json = (JSONObject) entity.getData();
  243. if (json.containsKey("data")) {
  244. Object result = null;
  245. StopWatch stopWatch = new StopWatch();
  246. stopWatch.start();
  247. boolean isHandleSuccess = true;
  248. String handleMsg = "处理成功";
  249. try {
  250. ExecuteMethodInfo executeMethodInfo = executeMethodInfoDao.findTopByEvent(event);
  251. if (ObjectUtils.isNotEmpty(executeMethodInfo)) {
  252. String dataStr = json.getStr("data");
  253. String beanName = executeMethodInfo.getBeanName();
  254. String methodName = executeMethodInfo.getMethodName();
  255. Class c = applicationContext.getBean(beanName).getClass();
  256. SuperService t = (SuperService) applicationContext.getBean(beanName);
  257. Method method = c.getMethod(methodName, String.class);
  258. ResultContent<Object> resultContent = (ResultContent<Object>) method.invoke(t, dataStr);
  259. if (resultContent.isSuccess()) {
  260. result = resultContent.getContent();
  261. } else {
  262. isHandleSuccess = false;
  263. handleMsg = resultContent.getMsg();
  264. }
  265. entity.setBeanName(beanName);
  266. entity.setMethodName(methodName);
  267. } else {
  268. isHandleSuccess = false;
  269. handleMsg = "消息处理方法未找到";
  270. }
  271. } catch (Exception e) {
  272. e.printStackTrace();
  273. isHandleSuccess = false;
  274. handleMsg = String.format("业务处理出错:%S", e.getMessage());
  275. }
  276. stopWatch.stop();
  277. entity.setHandlerTime(stopWatch.getLastTaskTimeMillis());
  278. // 返回结果
  279. entity.setResultData(result);
  280. // 业务处理失败
  281. entity.setIsHandleSuccess(isHandleSuccess);
  282. entity.setHandleMsg(handleMsg);
  283. if (isHandleSuccess) {
  284. // 处理成功,返回响应
  285. responseMessage(entity);
  286. } else {
  287. // 处理失败,记录数据
  288. entity.setReTime(System.currentTimeMillis());
  289. entity.setReTimeStr(DateUtils.paresTime(System.currentTimeMillis(), DateUtils.patternyyyySSS));
  290. operationMessageDao.save(entity);
  291. return ResultContent.buildFail(handleMsg);
  292. }
  293. }
  294. return ResultContent.buildSuccess();
  295. }
  296. /**
  297. * 响应数据
  298. *
  299. * @param entity
  300. * @return
  301. */
  302. public ResultContent responseMessage(OperationMessage entity) {
  303. com.alibaba.fastjson.JSONObject jsonObject = new com.alibaba.fastjson.JSONObject();
  304. jsonObject.put("id", entity.getDataId());
  305. Object data = entity.getResultData();
  306. JSONObject object = new JSONObject();
  307. if (ObjectUtils.isNotEmpty(data)) {
  308. String _str = JSONUtil.toJsonStr(data);
  309. object = JSONUtil.parseObj(_str);
  310. }
  311. jsonObject.put("data", object);
  312. jsonObject.put("time", System.currentTimeMillis());
  313. jsonObject.put("ttl", entity.getTtlTime());
  314. jsonObject.put("event", entity.getEvent());
  315. String reTopic = String.format("%s/reply", entity.getTopic());
  316. String reMsg = "响应成功";
  317. Boolean reIsSuccess = Boolean.TRUE;
  318. try {
  319. mqClient.sendObject(reTopic, JSONUtil.toJsonStr(jsonObject));
  320. } catch (Exception e) {
  321. e.printStackTrace();
  322. reIsSuccess = Boolean.FALSE;
  323. reMsg = "mqtt响应出错:" + e.getMessage();
  324. }
  325. entity.setIsResult(Boolean.TRUE);
  326. entity.setReIsSuccess(reIsSuccess);
  327. entity.setReMsg(reMsg);
  328. entity.setReTime(System.currentTimeMillis());
  329. entity.setReTimeStr(DateUtils.paresTime(System.currentTimeMillis(), DateUtils.patternyyyySSS));
  330. entity.setReTopic(reTopic);
  331. operationMessageDao.save(entity);
  332. return ResultContent.buildSuccess();
  333. }
  334. /**
  335. * 指令列表
  336. *
  337. * @param pageable
  338. * @param param
  339. * @return
  340. */
  341. public ResultContent<Page<OperationMessageModel>> page(Pageable pageable, OperationMessageSearchParam param) {
  342. Page<OperationMessage> page = operationMessageDao.page(pageable, param);
  343. return ResultContent.buildSuccess(PageEntityUtil.toPageModel(page, this::toModel));
  344. }
  345. /**
  346. * 标记消息已收到
  347. *
  348. * @param messageId
  349. * @return
  350. */
  351. public ResultContent receiveMessage(String messageId) {
  352. OperationMessage operationMessage = operationMessageDao.findTopByMessageId(messageId);
  353. if (ObjectUtils.isEmpty(operationMessage)) {
  354. return ResultContent.buildFail(String.format("%s指令信息不存在", messageId));
  355. }
  356. if (operationMessage.getIsReceive() != null && operationMessage.getIsReceive()) {
  357. return ResultContent.buildFail("该指令已响应");
  358. }
  359. operationMessage.setIsReceive(Boolean.TRUE);
  360. operationMessage.setReceiveTime(System.currentTimeMillis());
  361. operationMessage.setReceiveTimeStr(DateUtils.paresTime(System.currentTimeMillis(), DateUtils.FORMAT_LONG));
  362. operationMessageDao.save(operationMessage);
  363. return ResultContent.buildSuccess();
  364. }
  365. public OperationMessageModel toModel(OperationMessage entity) {
  366. OperationMessageModel model = new OperationMessageModel();
  367. if (ObjectUtils.isNotEmpty(entity)) {
  368. BeanUtils.copyProperties(entity, model);
  369. model.setDeviceInfo(deviceInfoService.toModel(entity.getDeviceInfo()));
  370. model.setGateWayInfo(gateWayInfoService.toModel(entity.getGateWayInfo()));
  371. }
  372. return model;
  373. }
  374. }