OperationMessageService.java 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295
  1. package com.zswl.dataservice.service.artemis;
  2. import cn.hutool.json.JSONObject;
  3. import cn.hutool.json.JSONUtil;
  4. import com.google.gson.JsonObject;
  5. import com.zswl.dataservice.dao.mqtt.DeviceInfoDao;
  6. import com.zswl.dataservice.dao.mqtt.OperationMessageDao;
  7. import com.zswl.dataservice.dao.other.ExecuteMethodInfoDao;
  8. import com.zswl.dataservice.domain.mqtt.DeviceInfo;
  9. import com.zswl.dataservice.domain.mqtt.OperationMessage;
  10. import com.zswl.dataservice.domain.other.ExecuteMethodInfo;
  11. import com.zswl.dataservice.helper.ApplicationContextHolder;
  12. import com.zswl.dataservice.model.hxz.ConsumTransactionsModel;
  13. import com.zswl.dataservice.model.mqtt.*;
  14. import com.zswl.dataservice.service.base.SuperService;
  15. import com.zswl.dataservice.service.mqtt.DeviceInfoService;
  16. import com.zswl.dataservice.service.mqtt.GateWayInfoService;
  17. import com.zswl.dataservice.service.payment.HxzService;
  18. import com.zswl.dataservice.utils.DateUtils;
  19. import com.zswl.dataservice.utils.bean.BeanUtils;
  20. import com.zswl.dataservice.utils.mqtt.MqttTopicUtils;
  21. import com.zswl.dataservice.utils.mqtt.mqttConfig.client.MQClient;
  22. import com.zswl.dataservice.utils.page.PageEntityUtil;
  23. import com.zswl.dataservice.utils.result.ResultContent;
  24. import lombok.extern.slf4j.Slf4j;
  25. import org.apache.commons.lang3.ObjectUtils;
  26. import org.springframework.beans.factory.annotation.Autowired;
  27. import org.springframework.context.ApplicationContext;
  28. import org.springframework.data.domain.Page;
  29. import org.springframework.data.domain.Pageable;
  30. import org.springframework.stereotype.Service;
  31. import java.lang.reflect.Method;
  32. import java.util.Date;
  33. import java.util.UUID;
  34. /**
  35. * 指令数据管理
  36. *
  37. * @author TRX
  38. * @date 2024/5/14
  39. */
  40. @Slf4j
  41. @Service
  42. public class OperationMessageService {
  43. @Autowired
  44. OperationMessageDao operationMessageDao;
  45. @Autowired
  46. DeviceInfoService deviceInfoService;
  47. @Autowired
  48. GateWayInfoService gateWayInfoService;
  49. @Autowired
  50. MQClient mqClient;
  51. @Autowired
  52. DeviceInfoDao deviceInfoDao;
  53. @Autowired
  54. HxzService hxzService;
  55. // 保存90天
  56. private Long ttlMill = 90 * 24L * 60 * 60 * 1000L;
  57. @Autowired
  58. private ExecuteMethodInfoDao executeMethodInfoDao;
  59. @Autowired
  60. private ApplicationContext applicationContext;
  61. /**
  62. * 给设备下发指令
  63. *
  64. * @param deviceId 设备ID
  65. * @param command 指令,如:on,off
  66. * @param data 指令数据
  67. * @return
  68. */
  69. public ResultContent sendMessage(String deviceId, String command, JSONObject data) {
  70. DeviceInfo deviceInfo = deviceInfoDao.findTopById(deviceId);
  71. if (ObjectUtils.isEmpty(deviceInfo)) {
  72. return ResultContent.buildFail(String.format("设备不存在:%s", deviceId));
  73. }
  74. try {
  75. // 消息的TTL时间
  76. Long ttl = 10 * 1000L;
  77. // 消息的ID
  78. String messageId = "";
  79. OperationMessage message = new OperationMessage();
  80. if (data.containsKey("messageId")) {
  81. messageId = data.getStr("messageId");
  82. } else {
  83. messageId = UUID.randomUUID().toString();
  84. data.append("messageId", messageId);
  85. }
  86. String topic = MqttTopicUtils.buildDeviceOperationInstructionsTopic(deviceId, command);
  87. message.setMessageId(messageId);
  88. message.setData(data);
  89. message.setTopic(topic);
  90. message.setDeviceId(deviceId);
  91. message.setDeviceInfo(deviceInfo);
  92. message.setIsReceive(Boolean.FALSE);
  93. message.setTtlTime(ttl);
  94. message.setTime(DateUtils.paresTime(System.currentTimeMillis(), DateUtils.FORMAT_LONG));
  95. JsonObject jsonObject = new JsonObject();
  96. jsonObject.addProperty("id", messageId);
  97. jsonObject.addProperty("data", data.toString());
  98. jsonObject.addProperty("time", System.currentTimeMillis());
  99. jsonObject.addProperty("ttl", ttl);
  100. mqClient.sendObject(topic, jsonObject.toString());
  101. log.info("mqtt msg 发送成功");
  102. operationMessageDao.save(message);
  103. } catch (Exception e) {
  104. e.printStackTrace();
  105. }
  106. return ResultContent.buildSuccess();
  107. }
  108. /**
  109. * 添加
  110. *
  111. * @param entity
  112. * @return
  113. */
  114. public ResultContent addOperationMessage(OperationMessage entity) {
  115. entity.setTime(DateUtils.paresTime(System.currentTimeMillis(), DateUtils.patternyyyySSS));
  116. entity.setTtl(new Date(System.currentTimeMillis() + ttlMill));
  117. if (entity.getIsTimeOut()) {
  118. entity.setHandleMsg("超时不处理");
  119. }
  120. boolean isTimeOut = entity.getIsTimeOut();
  121. // TODO
  122. if (isTimeOut) {
  123. // 未超时,处理消息
  124. handleOperationMessage(entity);
  125. } else {
  126. // 超时
  127. operationMessageDao.save(entity);
  128. }
  129. return ResultContent.buildSuccess();
  130. }
  131. /**
  132. * 处理消息
  133. *
  134. * @param entity
  135. * @return
  136. */
  137. public ResultContent handleOperationMessage(OperationMessage entity) {
  138. String event = entity.getEvent();
  139. JSONObject json = (JSONObject) entity.getData();
  140. if (json.containsKey("data")) {
  141. Object result = null;
  142. boolean isHandleSuccess = true;
  143. String handleMsg = "处理成功";
  144. try {
  145. ExecuteMethodInfo executeMethodInfo = executeMethodInfoDao.findTopByEvent(event);
  146. if (ObjectUtils.isNotEmpty(executeMethodInfo)) {
  147. String dataStr = json.getStr("data");
  148. String beanName = executeMethodInfo.getBeanName();
  149. String methodName = executeMethodInfo.getMethodName();
  150. Class c = applicationContext.getBean(beanName).getClass();
  151. SuperService t = (SuperService) applicationContext.getBean(beanName);
  152. Method method = c.getMethod(methodName, String.class);
  153. ResultContent<Object> resultContent = (ResultContent<Object>) method.invoke(t, dataStr);
  154. if (resultContent.isSuccess()) {
  155. result = resultContent.getContent();
  156. } else {
  157. isHandleSuccess = false;
  158. handleMsg = resultContent.getMsg();
  159. }
  160. entity.setBeanName(beanName);
  161. entity.setMethodName(methodName);
  162. } else {
  163. isHandleSuccess = false;
  164. handleMsg = "消息处理方法未找到";
  165. }
  166. } catch (Exception e) {
  167. e.printStackTrace();
  168. isHandleSuccess = false;
  169. handleMsg = String.format("业务处理出错:%S", e.getMessage());
  170. }
  171. // 返回结果
  172. entity.setResultData(result);
  173. // 业务处理失败
  174. entity.setIsHandleSuccess(isHandleSuccess);
  175. entity.setHandleMsg(handleMsg);
  176. if (isHandleSuccess) {
  177. // 处理成功,返回响应
  178. responseMessage(entity);
  179. } else {
  180. // 处理失败,记录数据
  181. entity.setReTime(System.currentTimeMillis());
  182. entity.setReTimeStr(DateUtils.paresTime(System.currentTimeMillis(), DateUtils.patternyyyySSS));
  183. operationMessageDao.save(entity);
  184. return ResultContent.buildFail(handleMsg);
  185. }
  186. }
  187. return ResultContent.buildSuccess();
  188. }
  189. /**
  190. * 响应数据
  191. *
  192. * @param entity
  193. * @return
  194. */
  195. public ResultContent responseMessage(OperationMessage entity) {
  196. com.alibaba.fastjson.JSONObject jsonObject = new com.alibaba.fastjson.JSONObject();
  197. jsonObject.put("id", entity.getDataId());
  198. Object data = entity.getResultData();
  199. JSONObject object = new JSONObject();
  200. if (ObjectUtils.isNotEmpty(data)) {
  201. String _str = JSONUtil.toJsonStr(data);
  202. object = JSONUtil.parseObj(_str);
  203. }
  204. jsonObject.put("data", object);
  205. jsonObject.put("time", System.currentTimeMillis());
  206. jsonObject.put("ttl", entity.getTtlTime());
  207. jsonObject.put("event", entity.getEvent());
  208. String reTopic = String.format("%s/reply", entity.getTopic());
  209. String reMsg = "响应成功";
  210. Boolean reIsSuccess = Boolean.TRUE;
  211. try {
  212. mqClient.sendObject(reTopic, JSONUtil.toJsonStr(jsonObject));
  213. } catch (Exception e) {
  214. e.printStackTrace();
  215. reIsSuccess = Boolean.FALSE;
  216. reMsg = "mqtt响应出错:" + e.getMessage();
  217. }
  218. entity.setIsResult(Boolean.TRUE);
  219. entity.setReIsSuccess(reIsSuccess);
  220. entity.setReMsg(reMsg);
  221. entity.setReTime(System.currentTimeMillis());
  222. entity.setReTimeStr(DateUtils.paresTime(System.currentTimeMillis(), DateUtils.patternyyyySSS));
  223. entity.setReTopic(reTopic);
  224. operationMessageDao.save(entity);
  225. return ResultContent.buildSuccess();
  226. }
  227. /**
  228. * 指令列表
  229. *
  230. * @param pageable
  231. * @param param
  232. * @return
  233. */
  234. public ResultContent<Page<OperationMessageModel>> page(Pageable pageable, OperationMessageSearchParam param) {
  235. Page<OperationMessage> page = operationMessageDao.page(pageable, param);
  236. return ResultContent.buildSuccess(PageEntityUtil.toPageModel(page, this::toModel));
  237. }
  238. /**
  239. * 标记消息已收到
  240. *
  241. * @param messageId
  242. * @return
  243. */
  244. public ResultContent receiveMessage(String messageId) {
  245. OperationMessage operationMessage = operationMessageDao.findTopByMessageId(messageId);
  246. if (ObjectUtils.isEmpty(operationMessage)) {
  247. return ResultContent.buildFail(String.format("%s指令信息不存在", messageId));
  248. }
  249. if (operationMessage.getIsReceive() != null && operationMessage.getIsReceive()) {
  250. return ResultContent.buildFail("该指令已响应");
  251. }
  252. operationMessage.setIsReceive(Boolean.TRUE);
  253. operationMessage.setReceiveTime(System.currentTimeMillis());
  254. operationMessage.setReceiveTimeStr(DateUtils.paresTime(System.currentTimeMillis(), DateUtils.FORMAT_LONG));
  255. operationMessageDao.save(operationMessage);
  256. return ResultContent.buildSuccess();
  257. }
  258. public OperationMessageModel toModel(OperationMessage entity) {
  259. OperationMessageModel model = new OperationMessageModel();
  260. if (ObjectUtils.isNotEmpty(entity)) {
  261. BeanUtils.copyProperties(entity, model);
  262. model.setDeviceInfo(deviceInfoService.toModel(entity.getDeviceInfo()));
  263. model.setGateWayInfo(gateWayInfoService.toModel(entity.getGateWayInfo()));
  264. }
  265. return model;
  266. }
  267. }