OperationMessageService.java 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269
  1. package com.zswl.dataservice.service.artemis;
  2. import cn.hutool.json.JSONObject;
  3. import cn.hutool.json.JSONUtil;
  4. import com.google.gson.JsonObject;
  5. import com.zswl.dataservice.dao.mqtt.DeviceInfoDao;
  6. import com.zswl.dataservice.dao.mqtt.OperationMessageDao;
  7. import com.zswl.dataservice.domain.mqtt.DeviceInfo;
  8. import com.zswl.dataservice.domain.mqtt.OperationMessage;
  9. import com.zswl.dataservice.model.hxz.ConsumTransactionsModel;
  10. import com.zswl.dataservice.model.mqtt.*;
  11. import com.zswl.dataservice.service.mqtt.DeviceInfoService;
  12. import com.zswl.dataservice.service.mqtt.GateWayInfoService;
  13. import com.zswl.dataservice.service.payment.HxzService;
  14. import com.zswl.dataservice.utils.DateUtils;
  15. import com.zswl.dataservice.utils.bean.BeanUtils;
  16. import com.zswl.dataservice.utils.mqtt.MqttTopicUtils;
  17. import com.zswl.dataservice.utils.mqtt.mqttConfig.client.MQClient;
  18. import com.zswl.dataservice.utils.page.PageEntityUtil;
  19. import com.zswl.dataservice.utils.result.ResultContent;
  20. import lombok.extern.slf4j.Slf4j;
  21. import org.apache.commons.lang3.ObjectUtils;
  22. import org.springframework.beans.factory.annotation.Autowired;
  23. import org.springframework.data.domain.Page;
  24. import org.springframework.data.domain.Pageable;
  25. import org.springframework.stereotype.Service;
  26. import java.util.Date;
  27. import java.util.UUID;
  28. /**
  29. * 指令数据管理
  30. *
  31. * @author TRX
  32. * @date 2024/5/14
  33. */
  34. @Slf4j
  35. @Service
  36. public class OperationMessageService {
  37. @Autowired
  38. OperationMessageDao operationMessageDao;
  39. @Autowired
  40. DeviceInfoService deviceInfoService;
  41. @Autowired
  42. GateWayInfoService gateWayInfoService;
  43. @Autowired
  44. MQClient mqClient;
  45. @Autowired
  46. DeviceInfoDao deviceInfoDao;
  47. @Autowired
  48. HxzService hxzService;
  49. // 保存90天
  50. private Long ttlMill = 90 * 24L * 60 * 60 * 1000L;
  51. /**
  52. * 给设备下发指令
  53. *
  54. * @param deviceId 设备ID
  55. * @param command 指令,如:on,off
  56. * @param data 指令数据
  57. * @return
  58. */
  59. public ResultContent sendMessage(String deviceId, String command, JSONObject data) {
  60. DeviceInfo deviceInfo = deviceInfoDao.findTopById(deviceId);
  61. if (ObjectUtils.isEmpty(deviceInfo)) {
  62. return ResultContent.buildFail(String.format("设备不存在:%s", deviceId));
  63. }
  64. try {
  65. // 消息的TTL时间
  66. Long ttl = 10 * 1000L;
  67. // 消息的ID
  68. String messageId = "";
  69. OperationMessage message = new OperationMessage();
  70. if (data.containsKey("messageId")) {
  71. messageId = data.getStr("messageId");
  72. } else {
  73. messageId = UUID.randomUUID().toString();
  74. data.append("messageId", messageId);
  75. }
  76. String topic = MqttTopicUtils.buildDeviceOperationInstructionsTopic(deviceId, command);
  77. message.setMessageId(messageId);
  78. message.setData(data);
  79. message.setTopic(topic);
  80. message.setDeviceId(deviceId);
  81. message.setDeviceInfo(deviceInfo);
  82. message.setIsReceive(Boolean.FALSE);
  83. message.setTtlTime(ttl);
  84. message.setTime(DateUtils.paresTime(System.currentTimeMillis(), DateUtils.FORMAT_LONG));
  85. JsonObject jsonObject = new JsonObject();
  86. jsonObject.addProperty("id", messageId);
  87. jsonObject.addProperty("data", data.toString());
  88. jsonObject.addProperty("time", System.currentTimeMillis());
  89. jsonObject.addProperty("ttl", ttl);
  90. mqClient.sendObject(topic, jsonObject.toString());
  91. log.info("mqtt msg 发送成功");
  92. operationMessageDao.save(message);
  93. } catch (Exception e) {
  94. e.printStackTrace();
  95. }
  96. return ResultContent.buildSuccess();
  97. }
  98. /**
  99. * 添加
  100. *
  101. * @param entity
  102. * @return
  103. */
  104. public ResultContent addOperationMessage(OperationMessage entity) {
  105. entity.setTime(DateUtils.paresTime(System.currentTimeMillis(), DateUtils.patternyyyySSS));
  106. entity.setTtl(new Date(System.currentTimeMillis() + ttlMill));
  107. if(entity.getIsTimeOut()) {
  108. entity.setHandleMsg("超时不处理");
  109. }
  110. operationMessageDao.save(entity);
  111. boolean isTimeOut = entity.getIsTimeOut();
  112. if(!isTimeOut) {
  113. // 处理消息
  114. handleOperationMessage(entity);
  115. }
  116. return ResultContent.buildSuccess();
  117. }
  118. /**
  119. * 处理消息
  120. *
  121. * @param entity
  122. * @return
  123. */
  124. public ResultContent handleOperationMessage(OperationMessage entity) {
  125. String event = entity.getEvent();
  126. JSONObject json = (JSONObject) entity.getData();
  127. if (json.containsKey("data")) {
  128. Object result = null;
  129. String dataStr = json.getStr("data");
  130. boolean isHandleSuccess = true;
  131. String handleMsg = "处理成功";
  132. try {
  133. // 判断那个业务处理
  134. if (event.equals("consum")) {
  135. ConsumTransactionsModel model = JSONUtil.toBean(dataStr, ConsumTransactionsModel.class);
  136. ResultContent<Object> resultContent = hxzService.consumTransactions(model);
  137. if (resultContent.isSuccess()) {
  138. result = resultContent.getContent();
  139. }else {
  140. isHandleSuccess = false;
  141. handleMsg = resultContent.getMsg();
  142. }
  143. }
  144. } catch (Exception e) {
  145. e.printStackTrace();
  146. isHandleSuccess = false;
  147. handleMsg = String.format("业务处理出错:%S", e.getMessage());
  148. }
  149. // 返回结果
  150. entity.setResultData(result);
  151. // 业务处理失败
  152. entity.setIsHandleSuccess(isHandleSuccess);
  153. entity.setHandleMsg(handleMsg);
  154. if (isHandleSuccess) {
  155. // 处理成功,返回响应
  156. responseMessage(entity);
  157. }else {
  158. entity.setReTime(System.currentTimeMillis());
  159. entity.setReTimeStr(DateUtils.paresTime(System.currentTimeMillis(), DateUtils.patternyyyySSS));
  160. operationMessageDao.save(entity);
  161. return ResultContent.buildFail(handleMsg);
  162. }
  163. }
  164. return ResultContent.buildSuccess();
  165. }
  166. /**
  167. * 响应数据
  168. *
  169. * @param entity
  170. * @return
  171. */
  172. public ResultContent responseMessage(OperationMessage entity) {
  173. com.alibaba.fastjson.JSONObject jsonObject = new com.alibaba.fastjson.JSONObject();
  174. jsonObject.put("id", entity.getDataId());
  175. Object data = entity.getResultData();
  176. JSONObject object = new JSONObject();
  177. if (ObjectUtils.isNotEmpty(data)) {
  178. String _str = JSONUtil.toJsonStr(data);
  179. object = JSONUtil.parseObj(_str);
  180. }
  181. jsonObject.put("data", object);
  182. jsonObject.put("time", System.currentTimeMillis());
  183. jsonObject.put("ttl", entity.getTtlTime());
  184. jsonObject.put("event", entity.getEvent());
  185. String reTopic = String.format("%s/reply", entity.getTopic());
  186. String reMsg = "响应成功";
  187. Boolean reIsSuccess = Boolean.TRUE;
  188. try {
  189. mqClient.sendObject(reTopic, JSONUtil.toJsonStr(jsonObject));
  190. } catch (Exception e) {
  191. e.printStackTrace();
  192. reIsSuccess = Boolean.FALSE;
  193. reMsg = "mqtt响应出错:" + e.getMessage();
  194. }
  195. entity.setIsResult(Boolean.TRUE);
  196. entity.setReIsSuccess(reIsSuccess);
  197. entity.setReMsg(reMsg);
  198. entity.setReTime(System.currentTimeMillis());
  199. entity.setReTimeStr(DateUtils.paresTime(System.currentTimeMillis(), DateUtils.patternyyyySSS));
  200. entity.setReTopic(reTopic);
  201. operationMessageDao.save(entity);
  202. return ResultContent.buildSuccess();
  203. }
  204. /**
  205. * 指令列表
  206. *
  207. * @param pageable
  208. * @param param
  209. * @return
  210. */
  211. public ResultContent<Page<OperationMessageModel>> page(Pageable pageable, OperationMessageSearchParam param) {
  212. Page<OperationMessage> page = operationMessageDao.page(pageable, param);
  213. return ResultContent.buildSuccess(PageEntityUtil.toPageModel(page, this::toModel));
  214. }
  215. /**
  216. * 标记消息已收到
  217. *
  218. * @param messageId
  219. * @return
  220. */
  221. public ResultContent receiveMessage(String messageId) {
  222. OperationMessage operationMessage = operationMessageDao.findTopByMessageId(messageId);
  223. if (ObjectUtils.isEmpty(operationMessage)) {
  224. return ResultContent.buildFail(String.format("%s指令信息不存在", messageId));
  225. }
  226. if (operationMessage.getIsReceive() != null && operationMessage.getIsReceive()) {
  227. return ResultContent.buildFail("该指令已响应");
  228. }
  229. operationMessage.setIsReceive(Boolean.TRUE);
  230. operationMessage.setReceiveTime(System.currentTimeMillis());
  231. operationMessage.setReceiveTimeStr(DateUtils.paresTime(System.currentTimeMillis(), DateUtils.FORMAT_LONG));
  232. operationMessageDao.save(operationMessage);
  233. return ResultContent.buildSuccess();
  234. }
  235. public OperationMessageModel toModel(OperationMessage entity) {
  236. OperationMessageModel model = new OperationMessageModel();
  237. if (ObjectUtils.isNotEmpty(entity)) {
  238. BeanUtils.copyProperties(entity, model);
  239. model.setDeviceInfo(deviceInfoService.toModel(entity.getDeviceInfo()));
  240. model.setGateWayInfo(gateWayInfoService.toModel(entity.getGateWayInfo()));
  241. }
  242. return model;
  243. }
  244. }