package com.zswl.dataservice.service.artemis; import cn.hutool.json.JSONObject; import cn.hutool.json.JSONUtil; import com.google.gson.JsonObject; import com.zswl.dataservice.dao.mqtt.DeviceInfoDao; import com.zswl.dataservice.dao.mqtt.OperationMessageDao; import com.zswl.dataservice.domain.mqtt.DeviceInfo; import com.zswl.dataservice.domain.mqtt.OperationMessage; import com.zswl.dataservice.model.hxz.ConsumTransactionsModel; import com.zswl.dataservice.model.mqtt.*; import com.zswl.dataservice.service.mqtt.DeviceInfoService; import com.zswl.dataservice.service.mqtt.GateWayInfoService; import com.zswl.dataservice.service.payment.HxzService; import com.zswl.dataservice.utils.DateUtils; import com.zswl.dataservice.utils.bean.BeanUtils; import com.zswl.dataservice.utils.mqtt.MqttTopicUtils; import com.zswl.dataservice.utils.mqtt.mqttConfig.client.MQClient; import com.zswl.dataservice.utils.page.PageEntityUtil; import com.zswl.dataservice.utils.result.ResultContent; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.ObjectUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.domain.Page; import org.springframework.data.domain.Pageable; import org.springframework.stereotype.Service; import java.util.Date; import java.util.UUID; /** * 指令数据管理 * * @author TRX * @date 2024/5/14 */ @Slf4j @Service public class OperationMessageService { @Autowired OperationMessageDao operationMessageDao; @Autowired DeviceInfoService deviceInfoService; @Autowired GateWayInfoService gateWayInfoService; @Autowired MQClient mqClient; @Autowired DeviceInfoDao deviceInfoDao; @Autowired HxzService hxzService; // 保存90天 private Long ttlMill = 90 * 24L * 60 * 60 * 1000L; /** * 给设备下发指令 * * @param deviceId 设备ID * @param command 指令,如:on,off * @param data 指令数据 * @return */ public ResultContent sendMessage(String deviceId, String command, JSONObject data) { DeviceInfo deviceInfo = deviceInfoDao.findTopById(deviceId); if (ObjectUtils.isEmpty(deviceInfo)) { return ResultContent.buildFail(String.format("设备不存在:%s", deviceId)); } try { // 消息的TTL时间 Long ttl = 10 * 1000L; // 消息的ID String messageId = ""; OperationMessage message = new OperationMessage(); if (data.containsKey("messageId")) { messageId = data.getStr("messageId"); } else { messageId = UUID.randomUUID().toString(); data.append("messageId", messageId); } String topic = MqttTopicUtils.buildDeviceOperationInstructionsTopic(deviceId, command); message.setMessageId(messageId); message.setData(data); message.setTopic(topic); message.setDeviceId(deviceId); message.setDeviceInfo(deviceInfo); message.setIsReceive(Boolean.FALSE); message.setTtlTime(ttl); message.setTime(DateUtils.paresTime(System.currentTimeMillis(), DateUtils.FORMAT_LONG)); JsonObject jsonObject = new JsonObject(); jsonObject.addProperty("id", messageId); jsonObject.addProperty("data", data.toString()); jsonObject.addProperty("time", System.currentTimeMillis()); jsonObject.addProperty("ttl", ttl); mqClient.sendObject(topic, jsonObject.toString()); log.info("mqtt msg 发送成功"); operationMessageDao.save(message); } catch (Exception e) { e.printStackTrace(); } return ResultContent.buildSuccess(); } /** * 添加 * * @param entity * @return */ public ResultContent addOperationMessage(OperationMessage entity) { entity.setTime(DateUtils.paresTime(System.currentTimeMillis(), DateUtils.patternyyyySSS)); entity.setTtl(new Date(System.currentTimeMillis() + ttlMill)); if(entity.getIsTimeOut()) { entity.setHandleMsg("超时不处理"); } operationMessageDao.save(entity); boolean isTimeOut = entity.getIsTimeOut(); if(!isTimeOut) { // 处理消息 handleOperationMessage(entity); } return ResultContent.buildSuccess(); } /** * 处理消息 * * @param entity * @return */ public ResultContent handleOperationMessage(OperationMessage entity) { String event = entity.getEvent(); JSONObject json = (JSONObject) entity.getData(); if (json.containsKey("data")) { Object result = null; String dataStr = json.getStr("data"); boolean isHandleSuccess = true; String handleMsg = "处理成功"; try { // 判断那个业务处理 if (event.equals("consum")) { ConsumTransactionsModel model = JSONUtil.toBean(dataStr, ConsumTransactionsModel.class); ResultContent resultContent = hxzService.consumTransactions(model); if (resultContent.isSuccess()) { result = resultContent.getContent(); }else { isHandleSuccess = false; handleMsg = resultContent.getMsg(); } } } catch (Exception e) { e.printStackTrace(); isHandleSuccess = false; handleMsg = String.format("业务处理出错:%S", e.getMessage()); } // 返回结果 entity.setResultData(result); // 业务处理失败 entity.setIsHandleSuccess(isHandleSuccess); entity.setHandleMsg(handleMsg); if (isHandleSuccess) { // 处理成功,返回响应 responseMessage(entity); }else { entity.setReTime(System.currentTimeMillis()); entity.setReTimeStr(DateUtils.paresTime(System.currentTimeMillis(), DateUtils.patternyyyySSS)); operationMessageDao.save(entity); return ResultContent.buildFail(handleMsg); } } return ResultContent.buildSuccess(); } /** * 响应数据 * * @param entity * @return */ public ResultContent responseMessage(OperationMessage entity) { com.alibaba.fastjson.JSONObject jsonObject = new com.alibaba.fastjson.JSONObject(); jsonObject.put("id", entity.getDataId()); Object data = entity.getResultData(); JSONObject object = new JSONObject(); if (ObjectUtils.isNotEmpty(data)) { String _str = JSONUtil.toJsonStr(data); object = JSONUtil.parseObj(_str); } jsonObject.put("data", object); jsonObject.put("time", System.currentTimeMillis()); jsonObject.put("ttl", entity.getTtlTime()); jsonObject.put("event", entity.getEvent()); String reTopic = String.format("%s/reply", entity.getTopic()); String reMsg = "响应成功"; Boolean reIsSuccess = Boolean.TRUE; try { mqClient.sendObject(reTopic, JSONUtil.toJsonStr(jsonObject)); } catch (Exception e) { e.printStackTrace(); reIsSuccess = Boolean.FALSE; reMsg = "mqtt响应出错:" + e.getMessage(); } entity.setIsResult(Boolean.TRUE); entity.setReIsSuccess(reIsSuccess); entity.setReMsg(reMsg); entity.setReTime(System.currentTimeMillis()); entity.setReTimeStr(DateUtils.paresTime(System.currentTimeMillis(), DateUtils.patternyyyySSS)); entity.setReTopic(reTopic); operationMessageDao.save(entity); return ResultContent.buildSuccess(); } /** * 指令列表 * * @param pageable * @param param * @return */ public ResultContent> page(Pageable pageable, OperationMessageSearchParam param) { Page page = operationMessageDao.page(pageable, param); return ResultContent.buildSuccess(PageEntityUtil.toPageModel(page, this::toModel)); } /** * 标记消息已收到 * * @param messageId * @return */ public ResultContent receiveMessage(String messageId) { OperationMessage operationMessage = operationMessageDao.findTopByMessageId(messageId); if (ObjectUtils.isEmpty(operationMessage)) { return ResultContent.buildFail(String.format("%s指令信息不存在", messageId)); } if (operationMessage.getIsReceive() != null && operationMessage.getIsReceive()) { return ResultContent.buildFail("该指令已响应"); } operationMessage.setIsReceive(Boolean.TRUE); operationMessage.setReceiveTime(System.currentTimeMillis()); operationMessage.setReceiveTimeStr(DateUtils.paresTime(System.currentTimeMillis(), DateUtils.FORMAT_LONG)); operationMessageDao.save(operationMessage); return ResultContent.buildSuccess(); } public OperationMessageModel toModel(OperationMessage entity) { OperationMessageModel model = new OperationMessageModel(); if (ObjectUtils.isNotEmpty(entity)) { BeanUtils.copyProperties(entity, model); model.setDeviceInfo(deviceInfoService.toModel(entity.getDeviceInfo())); model.setGateWayInfo(gateWayInfoService.toModel(entity.getGateWayInfo())); } return model; } }