OperationMessageService.java 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433
  1. package com.zswl.dataservice.service.artemis;
  2. import cn.hutool.core.date.StopWatch;
  3. import cn.hutool.json.JSONObject;
  4. import cn.hutool.json.JSONUtil;
  5. import com.google.gson.JsonObject;
  6. import com.zswl.dataservice.dao.mqtt.DeviceInfoDao;
  7. import com.zswl.dataservice.dao.mqtt.OperationMessageDao;
  8. import com.zswl.dataservice.dao.other.ExecuteMethodInfoDao;
  9. import com.zswl.dataservice.domain.mqtt.DeviceInfo;
  10. import com.zswl.dataservice.domain.mqtt.OperationMessage;
  11. import com.zswl.dataservice.domain.other.ExecuteMethodInfo;
  12. import com.zswl.dataservice.model.artemis.OperationMessageModel;
  13. import com.zswl.dataservice.model.artemis.OperationMessageSearchParam;
  14. import com.zswl.dataservice.model.mqtt.SendMessageModel;
  15. import com.zswl.dataservice.service.base.SuperService;
  16. import com.zswl.dataservice.service.mqtt.DeviceInfoService;
  17. import com.zswl.dataservice.service.mqtt.GateWayInfoService;
  18. import com.zswl.dataservice.service.payment.HxzService;
  19. import com.zswl.dataservice.type.OperationType;
  20. import com.zswl.dataservice.utils.DateUtils;
  21. import com.zswl.dataservice.utils.TokenUtil;
  22. import com.zswl.dataservice.utils.bean.BeanUtils;
  23. import com.zswl.dataservice.utils.mqtt.MqttTopicUtils;
  24. import com.zswl.dataservice.utils.mqtt.mqttConfig.client.MQClient;
  25. import com.zswl.dataservice.utils.page.PageEntityUtil;
  26. import com.zswl.dataservice.utils.result.ResultContent;
  27. import jakarta.jms.Message;
  28. import jakarta.jms.TextMessage;
  29. import lombok.extern.slf4j.Slf4j;
  30. import org.apache.activemq.artemis.jms.client.ActiveMQBytesMessage;
  31. import org.apache.activemq.artemis.jms.client.ActiveMQTopic;
  32. import org.apache.commons.lang3.ObjectUtils;
  33. import org.apache.commons.lang3.StringUtils;
  34. import org.springframework.beans.factory.annotation.Autowired;
  35. import org.springframework.beans.factory.annotation.Value;
  36. import org.springframework.context.ApplicationContext;
  37. import org.springframework.data.domain.Page;
  38. import org.springframework.data.domain.Pageable;
  39. import org.springframework.scheduling.annotation.Async;
  40. import org.springframework.stereotype.Service;
  41. import java.lang.reflect.Method;
  42. import java.util.Date;
  43. import java.util.UUID;
  44. /**
  45. * 指令数据管理
  46. *
  47. * @author TRX
  48. * @date 2024/5/14
  49. */
  50. @Slf4j
  51. @Service
  52. public class OperationMessageService {
  53. @Autowired
  54. OperationMessageDao operationMessageDao;
  55. @Autowired
  56. DeviceInfoService deviceInfoService;
  57. @Autowired
  58. GateWayInfoService gateWayInfoService;
  59. @Autowired
  60. MQClient mqClient;
  61. @Autowired
  62. DeviceInfoDao deviceInfoDao;
  63. @Autowired
  64. HxzService hxzService;
  65. // 保存90天
  66. @Value("${artemisstore.time}")
  67. private Long ttlMill = 90 * 24L * 60 * 60 * 1000L;
  68. @Autowired
  69. private ExecuteMethodInfoDao executeMethodInfoDao;
  70. @Autowired
  71. private ApplicationContext applicationContext;
  72. /**
  73. * 发送指令
  74. *
  75. * @param param
  76. * @return
  77. */
  78. public ResultContent sendMessage(SendMessageModel param) {
  79. String msg = "发送成功";
  80. try {
  81. JSONObject jsonObject = new JSONObject();
  82. jsonObject.set("id", UUID.randomUUID().toString());
  83. jsonObject.set("data", param.getMessage());
  84. jsonObject.set("timeStr", DateUtils.paresTime(System.currentTimeMillis(), DateUtils.patternyyyySSS));
  85. jsonObject.set("time", System.currentTimeMillis());
  86. jsonObject.set("ttl", 10 * 1000);
  87. mqClient.sendObject(param.getTopic(), jsonObject.toString());
  88. log.info("mqtt msg 发送成功");
  89. } catch (Exception e) {
  90. e.printStackTrace();
  91. msg = "发送失败: " + e.getMessage();
  92. }
  93. return ResultContent.buildSuccess(msg);
  94. }
  95. /**
  96. * 给设备下发指令
  97. *
  98. * @param deviceId 设备ID
  99. * @param command 指令,如:on,off
  100. * @param data 指令数据
  101. * @return
  102. */
  103. public ResultContent sendMessage(String deviceId, String command, JSONObject data) {
  104. DeviceInfo deviceInfo = deviceInfoDao.findTopById(deviceId);
  105. if (ObjectUtils.isEmpty(deviceInfo)) {
  106. return ResultContent.buildFail(String.format("设备不存在:%s", deviceId));
  107. }
  108. try {
  109. // 消息的TTL时间
  110. Long ttl = 10 * 1000L;
  111. // 消息的ID
  112. String messageId = "";
  113. OperationMessage message = new OperationMessage();
  114. if (data.containsKey("messageId")) {
  115. messageId = data.getStr("messageId");
  116. } else {
  117. messageId = UUID.randomUUID().toString();
  118. data.append("messageId", messageId);
  119. }
  120. String topic = MqttTopicUtils.buildDeviceOperationInstructionsTopic(deviceId, command);
  121. message.setMessageId(messageId);
  122. message.setData(data);
  123. message.setTopic(topic);
  124. message.setDeviceId(deviceId);
  125. message.setDeviceInfo(deviceInfo);
  126. message.setIsReceive(Boolean.FALSE);
  127. message.setTtlTime(ttl);
  128. message.setTime(DateUtils.paresTime(System.currentTimeMillis(), DateUtils.FORMAT_LONG));
  129. JsonObject jsonObject = new JsonObject();
  130. jsonObject.addProperty("id", messageId);
  131. jsonObject.addProperty("data", data.toString());
  132. jsonObject.addProperty("time", System.currentTimeMillis());
  133. jsonObject.addProperty("ttl", ttl);
  134. mqClient.sendObject(topic, jsonObject.toString());
  135. log.info("mqtt msg 发送成功");
  136. operationMessageDao.save(message);
  137. } catch (Exception e) {
  138. e.printStackTrace();
  139. }
  140. return ResultContent.buildSuccess();
  141. }
  142. /**
  143. * /v1/gateway/#
  144. * 处理网关MQTT信息
  145. *
  146. * @param message
  147. */
  148. public void handlerGateWayMessage(Message message) {
  149. // 处理接收到的消息
  150. try {
  151. ActiveMQTopic activeMQTopic = (ActiveMQTopic) message.getJMSDestination();
  152. String topicName = activeMQTopic.getTopicName();
  153. if (topicName.equals("activemq.notifications")) {
  154. return;
  155. }
  156. String messageId = message.getJMSMessageID();
  157. String clientId = message.getStringProperty("__AMQ_CID");
  158. // log.info("receiveMessage {} 消息监听clientId: {}", messageId, clientId);
  159. if (StringUtils.isNotEmpty(topicName) && topicName.endsWith("reply")) {
  160. // 这是响应的数据,不处理
  161. // log.warn("回复消息不处理");
  162. return;
  163. }
  164. log.info("Topic: {}", topicName);
  165. OperationMessage operationMessage = new OperationMessage();
  166. operationMessage.setMessageId(messageId);
  167. operationMessage.setClientId(clientId);
  168. operationMessage.setTopic(topicName);
  169. operationMessage.setOperationType(OperationType.Sub);
  170. String messageClass = "";
  171. String msg = "";
  172. if (message instanceof ActiveMQBytesMessage) {
  173. ActiveMQBytesMessage activeMQBytesMessage = (ActiveMQBytesMessage) message;
  174. messageClass = ActiveMQBytesMessage.class.getSimpleName();
  175. byte[] messageBody = new byte[(int) activeMQBytesMessage.getBodyLength()];
  176. // 读取消息内容到字节数组
  177. activeMQBytesMessage.readBytes(messageBody);
  178. msg = new String(messageBody);
  179. }
  180. if (message instanceof TextMessage) {
  181. messageClass = TextMessage.class.getSimpleName();
  182. TextMessage textMessage = (TextMessage) message;
  183. msg = textMessage.getText();
  184. }
  185. cn.hutool.json.JSONObject jsonObject = JSONUtil.parseObj(msg);
  186. String id = jsonObject.getStr("id");
  187. Long time = jsonObject.getLong("time");
  188. Long ttl = jsonObject.getLong("ttl");
  189. String event = jsonObject.getStr("event");
  190. String gateWayId = jsonObject.getStr("gatewayId");
  191. boolean isTimeOut = false;
  192. if (System.currentTimeMillis() > (time + ttl)) {
  193. isTimeOut = true;
  194. }
  195. // log.info("textMessage: {}", msg);
  196. String timeStr = DateUtils.paresTime(time, DateUtils.patternyyyySSS);
  197. jsonObject.set("timeStr", timeStr);
  198. // --------------------处理业务 start------------------
  199. operationMessage.setMessageClass(messageClass);
  200. operationMessage.setData(jsonObject);
  201. operationMessage.setTtlTime(ttl);
  202. operationMessage.setSendTime(time);
  203. operationMessage.setDataId(id);
  204. operationMessage.setIsTimeOut(isTimeOut);
  205. operationMessage.setEvent(event);
  206. operationMessage.setGateWayId(gateWayId);
  207. initAddOperationMessage(operationMessage);
  208. } catch (Exception e) {
  209. e.printStackTrace();
  210. }
  211. }
  212. private ResultContent initAddOperationMessage(OperationMessage entity) {
  213. // 判断DataId 是否存在,存在就不处理
  214. String dataId = entity.getDataId();
  215. try {
  216. String token = TokenUtil.create();
  217. OperationMessage temp = operationMessageDao.init(dataId, token);
  218. if (ObjectUtils.isNotEmpty(temp)) {
  219. log.info("获取到执行锁,开始执行");
  220. entity.setId(temp.getId());
  221. entity.setToken(temp.getToken());
  222. addOperationMessage(entity);
  223. } else {
  224. log.warn("未获取到执行锁,跳过执行");
  225. }
  226. } catch (Exception e) {
  227. log.error("错误: {}", e.getMessage());
  228. }
  229. return ResultContent.buildSuccess();
  230. }
  231. /**
  232. * 添加
  233. *
  234. * @param entity
  235. * @return
  236. */
  237. public ResultContent addOperationMessage(OperationMessage entity) {
  238. entity.setTime(DateUtils.paresTime(System.currentTimeMillis(), DateUtils.patternyyyySSS));
  239. entity.setTtl(new Date(System.currentTimeMillis() + ttlMill));
  240. if (entity.getIsTimeOut()) {
  241. entity.setHandleMsg("超时不处理");
  242. }
  243. boolean isTimeOut = entity.getIsTimeOut();
  244. isTimeOut = true;
  245. // TODO
  246. if (isTimeOut) {
  247. // 未超时,处理消息
  248. handleOperationMessage(entity);
  249. } else {
  250. // 超时
  251. operationMessageDao.save(entity);
  252. }
  253. return ResultContent.buildSuccess();
  254. }
  255. /**
  256. * 处理消息
  257. *
  258. * @param entity
  259. * @return
  260. */
  261. public ResultContent handleOperationMessage(OperationMessage entity) {
  262. String event = entity.getEvent();
  263. JSONObject json = (JSONObject) entity.getData();
  264. if (json.containsKey("data")) {
  265. Object result = null;
  266. StopWatch stopWatch = new StopWatch();
  267. stopWatch.start();
  268. boolean isHandleSuccess = true;
  269. String handleMsg = "处理成功";
  270. JSONObject jsonObject1 = (JSONObject) json.get("data");
  271. jsonObject1.put("mqttDataId", entity.getDataId());
  272. jsonObject1.put("GateWayId", entity.getGateWayId());
  273. String DeviceId = jsonObject1.getStr("DeviceId");
  274. entity.setDeviceId(DeviceId);
  275. try {
  276. ExecuteMethodInfo executeMethodInfo = executeMethodInfoDao.findTopByEvent(event);
  277. if (ObjectUtils.isNotEmpty(executeMethodInfo)) {
  278. String dataStr = JSONUtil.toJsonStr(jsonObject1);
  279. String beanName = executeMethodInfo.getBeanName();
  280. String methodName = executeMethodInfo.getMethodName();
  281. Class c = applicationContext.getBean(beanName).getClass();
  282. SuperService t = (SuperService) applicationContext.getBean(beanName);
  283. Method method = c.getMethod(methodName, String.class);
  284. ResultContent<Object> resultContent = (ResultContent<Object>) method.invoke(t, dataStr);
  285. if (resultContent.isSuccess()) {
  286. result = resultContent.getContent();
  287. } else {
  288. isHandleSuccess = false;
  289. handleMsg = resultContent.getMsg();
  290. }
  291. entity.setBeanName(beanName);
  292. entity.setMethodName(methodName);
  293. } else {
  294. isHandleSuccess = false;
  295. handleMsg = "消息处理方法未找到";
  296. }
  297. } catch (Exception e) {
  298. e.printStackTrace();
  299. isHandleSuccess = false;
  300. handleMsg = String.format("业务处理出错:%S", e.getMessage());
  301. }
  302. stopWatch.stop();
  303. entity.setHandlerTime(stopWatch.getLastTaskTimeMillis());
  304. // 返回结果
  305. entity.setResultData(result);
  306. // 业务处理失败
  307. entity.setIsHandleSuccess(isHandleSuccess);
  308. entity.setHandleMsg(handleMsg);
  309. log.info("消息处理结果: {} {}", isHandleSuccess, handleMsg);
  310. if (isHandleSuccess) {
  311. // 处理成功,返回响应
  312. responseMessage(entity);
  313. } else {
  314. // 处理失败,记录数据
  315. entity.setReTime(System.currentTimeMillis());
  316. entity.setReTimeStr(DateUtils.paresTime(System.currentTimeMillis(), DateUtils.patternyyyySSS));
  317. operationMessageDao.save(entity);
  318. return ResultContent.buildFail(handleMsg);
  319. }
  320. }
  321. return ResultContent.buildSuccess();
  322. }
  323. /**
  324. * 响应数据
  325. *
  326. * @param entity
  327. * @return
  328. */
  329. public ResultContent responseMessage(OperationMessage entity) {
  330. com.alibaba.fastjson.JSONObject jsonObject = new com.alibaba.fastjson.JSONObject();
  331. jsonObject.put("id", entity.getDataId());
  332. Object data = entity.getResultData();
  333. JSONObject object = new JSONObject();
  334. if (ObjectUtils.isNotEmpty(data)) {
  335. String _str = JSONUtil.toJsonStr(data);
  336. object = JSONUtil.parseObj(_str);
  337. }
  338. jsonObject.put("data", object);
  339. jsonObject.put("time", System.currentTimeMillis());
  340. jsonObject.put("ttl", entity.getTtlTime());
  341. jsonObject.put("event", entity.getEvent());
  342. String reTopic = String.format("%s/reply", entity.getTopic());
  343. String reMsg = "响应成功";
  344. Boolean reIsSuccess = Boolean.TRUE;
  345. try {
  346. mqClient.sendObject(reTopic, JSONUtil.toJsonStr(jsonObject), entity.getDataId());
  347. } catch (Exception e) {
  348. e.printStackTrace();
  349. reIsSuccess = Boolean.FALSE;
  350. reMsg = "mqtt响应出错:" + e.getMessage();
  351. }
  352. entity.setIsResult(Boolean.TRUE);
  353. entity.setReIsSuccess(reIsSuccess);
  354. entity.setReMsg(reMsg);
  355. entity.setReTime(System.currentTimeMillis());
  356. entity.setReTimeStr(DateUtils.paresTime(System.currentTimeMillis(), DateUtils.patternyyyySSS));
  357. entity.setReTopic(reTopic);
  358. operationMessageDao.save(entity);
  359. return ResultContent.buildSuccess();
  360. }
  361. /**
  362. * 指令列表
  363. *
  364. * @param pageable
  365. * @param param
  366. * @return
  367. */
  368. public ResultContent<Page<OperationMessageModel>> page(Pageable pageable, OperationMessageSearchParam param) {
  369. Page<OperationMessage> page = operationMessageDao.page(pageable, param);
  370. return ResultContent.buildSuccess(PageEntityUtil.toPageModel(page, this::toModel));
  371. }
  372. /**
  373. * 标记消息已收到
  374. *
  375. * @param messageId
  376. * @return
  377. */
  378. public ResultContent receiveMessage(String messageId) {
  379. OperationMessage operationMessage = operationMessageDao.findTopByMessageId(messageId);
  380. if (ObjectUtils.isEmpty(operationMessage)) {
  381. return ResultContent.buildFail(String.format("%s指令信息不存在", messageId));
  382. }
  383. if (operationMessage.getIsReceive() != null && operationMessage.getIsReceive()) {
  384. return ResultContent.buildFail("该指令已响应");
  385. }
  386. operationMessage.setIsReceive(Boolean.TRUE);
  387. operationMessage.setReceiveTime(System.currentTimeMillis());
  388. operationMessage.setReceiveTimeStr(DateUtils.paresTime(System.currentTimeMillis(), DateUtils.FORMAT_LONG));
  389. operationMessageDao.save(operationMessage);
  390. return ResultContent.buildSuccess();
  391. }
  392. public OperationMessageModel toModel(OperationMessage entity) {
  393. OperationMessageModel model = new OperationMessageModel();
  394. if (ObjectUtils.isNotEmpty(entity)) {
  395. BeanUtils.copyProperties(entity, model);
  396. model.setDeviceInfo(deviceInfoService.toModel(entity.getDeviceInfo()));
  397. model.setGateWayInfo(gateWayInfoService.toModel(entity.getGateWayInfo()));
  398. }
  399. return model;
  400. }
  401. }