package com.zswl.dataservice.service.artemis; import cn.hutool.core.date.StopWatch; import cn.hutool.json.JSONObject; import cn.hutool.json.JSONUtil; import com.google.gson.JsonObject; import com.zswl.dataservice.dao.mqtt.DeviceInfoDao; import com.zswl.dataservice.dao.mqtt.OperationMessageDao; import com.zswl.dataservice.dao.other.ExecuteMethodInfoDao; import com.zswl.dataservice.domain.mqtt.DeviceInfo; import com.zswl.dataservice.domain.mqtt.OperationMessage; import com.zswl.dataservice.domain.other.ExecuteMethodInfo; import com.zswl.dataservice.model.artemis.OperationMessageModel; import com.zswl.dataservice.model.artemis.OperationMessageSearchParam; import com.zswl.dataservice.model.mqtt.SendMessageModel; import com.zswl.dataservice.service.base.SuperService; import com.zswl.dataservice.service.mqtt.DeviceInfoService; import com.zswl.dataservice.service.mqtt.GateWayInfoService; import com.zswl.dataservice.service.payment.HxzService; import com.zswl.dataservice.type.OperationType; import com.zswl.dataservice.utils.DateUtils; import com.zswl.dataservice.utils.TokenUtil; import com.zswl.dataservice.utils.bean.BeanUtils; import com.zswl.dataservice.utils.mqtt.MqttTopicUtils; import com.zswl.dataservice.utils.mqtt.mqttConfig.client.MQClient; import com.zswl.dataservice.utils.page.PageEntityUtil; import com.zswl.dataservice.utils.result.ResultContent; import jakarta.jms.Message; import jakarta.jms.TextMessage; import lombok.extern.slf4j.Slf4j; import org.apache.activemq.artemis.jms.client.ActiveMQBytesMessage; import org.apache.activemq.artemis.jms.client.ActiveMQTopic; import org.apache.commons.lang3.ObjectUtils; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.ApplicationContext; import org.springframework.data.domain.Page; import org.springframework.data.domain.Pageable; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import java.lang.reflect.Method; import java.util.Date; import java.util.UUID; /** * 指令数据管理 * * @author TRX * @date 2024/5/14 */ @Slf4j @Service public class OperationMessageService { @Autowired OperationMessageDao operationMessageDao; @Autowired DeviceInfoService deviceInfoService; @Autowired GateWayInfoService gateWayInfoService; @Autowired MQClient mqClient; @Autowired DeviceInfoDao deviceInfoDao; @Autowired HxzService hxzService; // 保存90天 @Value("${artemisstore.time}") private Long ttlMill = 90 * 24L * 60 * 60 * 1000L; @Autowired private ExecuteMethodInfoDao executeMethodInfoDao; @Autowired private ApplicationContext applicationContext; /** * 发送指令 * * @param param * @return */ public ResultContent sendMessage(SendMessageModel param) { String msg = "发送成功"; try { JSONObject jsonObject = new JSONObject(); jsonObject.set("id", UUID.randomUUID().toString()); jsonObject.set("data", param.getMessage()); jsonObject.set("timeStr", DateUtils.paresTime(System.currentTimeMillis(), DateUtils.patternyyyySSS)); jsonObject.set("time", System.currentTimeMillis()); jsonObject.set("ttl", 10 * 1000); mqClient.sendObject(param.getTopic(), jsonObject.toString()); log.info("mqtt msg 发送成功"); } catch (Exception e) { e.printStackTrace(); msg = "发送失败: " + e.getMessage(); } return ResultContent.buildSuccess(msg); } /** * 给设备下发指令 * * @param deviceId 设备ID * @param command 指令,如:on,off * @param data 指令数据 * @return */ public ResultContent sendMessage(String deviceId, String command, JSONObject data) { DeviceInfo deviceInfo = deviceInfoDao.findTopById(deviceId); if (ObjectUtils.isEmpty(deviceInfo)) { return ResultContent.buildFail(String.format("设备不存在:%s", deviceId)); } try { // 消息的TTL时间 Long ttl = 10 * 1000L; // 消息的ID String messageId = ""; OperationMessage message = new OperationMessage(); if (data.containsKey("messageId")) { messageId = data.getStr("messageId"); } else { messageId = UUID.randomUUID().toString(); data.append("messageId", messageId); } String topic = MqttTopicUtils.buildDeviceOperationInstructionsTopic(deviceId, command); message.setMessageId(messageId); message.setData(data); message.setTopic(topic); message.setDeviceId(deviceId); message.setDeviceInfo(deviceInfo); message.setIsReceive(Boolean.FALSE); message.setTtlTime(ttl); message.setTime(DateUtils.paresTime(System.currentTimeMillis(), DateUtils.FORMAT_LONG)); JsonObject jsonObject = new JsonObject(); jsonObject.addProperty("id", messageId); jsonObject.addProperty("data", data.toString()); jsonObject.addProperty("time", System.currentTimeMillis()); jsonObject.addProperty("ttl", ttl); mqClient.sendObject(topic, jsonObject.toString()); log.info("mqtt msg 发送成功"); operationMessageDao.save(message); } catch (Exception e) { e.printStackTrace(); } return ResultContent.buildSuccess(); } /** * /v1/gateway/# * 处理网关MQTT信息 * * @param message */ public void handlerGateWayMessage(Message message) { // 处理接收到的消息 try { ActiveMQTopic activeMQTopic = (ActiveMQTopic) message.getJMSDestination(); String topicName = activeMQTopic.getTopicName(); if (topicName.equals("activemq.notifications")) { return; } String messageId = message.getJMSMessageID(); String clientId = message.getStringProperty("__AMQ_CID"); // log.info("receiveMessage {} 消息监听clientId: {}", messageId, clientId); if (StringUtils.isNotEmpty(topicName) && topicName.endsWith("reply")) { // 这是响应的数据,不处理 // log.warn("回复消息不处理"); return; } log.info("Topic: {}", topicName); OperationMessage operationMessage = new OperationMessage(); operationMessage.setMessageId(messageId); operationMessage.setClientId(clientId); operationMessage.setTopic(topicName); operationMessage.setOperationType(OperationType.Sub); String messageClass = ""; String msg = ""; if (message instanceof ActiveMQBytesMessage) { ActiveMQBytesMessage activeMQBytesMessage = (ActiveMQBytesMessage) message; messageClass = ActiveMQBytesMessage.class.getSimpleName(); byte[] messageBody = new byte[(int) activeMQBytesMessage.getBodyLength()]; // 读取消息内容到字节数组 activeMQBytesMessage.readBytes(messageBody); msg = new String(messageBody); } if (message instanceof TextMessage) { messageClass = TextMessage.class.getSimpleName(); TextMessage textMessage = (TextMessage) message; msg = textMessage.getText(); } cn.hutool.json.JSONObject jsonObject = JSONUtil.parseObj(msg); String id = jsonObject.getStr("id"); Long time = jsonObject.getLong("time"); Long ttl = jsonObject.getLong("ttl"); String event = jsonObject.getStr("event"); String gateWayId = jsonObject.getStr("gatewayId"); String deviceId = jsonObject.getStr("deviceId"); boolean isTimeOut = false; if (System.currentTimeMillis() > (time + ttl)) { isTimeOut = true; } log.info("textMessage: {}", msg); String timeStr = DateUtils.paresTime(time, DateUtils.patternyyyySSS); jsonObject.set("timeStr", timeStr); // --------------------处理业务 start------------------ operationMessage.setMessageClass(messageClass); operationMessage.setData(jsonObject); operationMessage.setTtlTime(ttl); operationMessage.setSendTime(time); operationMessage.setDataId(id); operationMessage.setIsTimeOut(isTimeOut); operationMessage.setEvent(event); operationMessage.setGateWayId(gateWayId); if (ObjectUtils.isNotEmpty(deviceId)) { operationMessage.setDeviceId(deviceId); } initAddOperationMessage(operationMessage); } catch (Exception e) { e.printStackTrace(); } } private ResultContent initAddOperationMessage(OperationMessage entity) { // 判断DataId 是否存在,存在就不处理 String dataId = entity.getDataId(); try { String token = TokenUtil.create(); OperationMessage temp = operationMessageDao.init(dataId, token); if (ObjectUtils.isNotEmpty(temp)) { log.info("获取到执行锁,开始执行"); entity.setId(temp.getId()); entity.setToken(temp.getToken()); addOperationMessage(entity); } else { log.warn("未获取到执行锁,跳过执行"); } } catch (Exception e) { log.error("错误: {}", e.getMessage()); } return ResultContent.buildSuccess(); } /** * 添加 * * @param entity * @return */ public ResultContent addOperationMessage(OperationMessage entity) { entity.setTime(DateUtils.paresTime(System.currentTimeMillis(), DateUtils.patternyyyySSS)); entity.setTtl(new Date(System.currentTimeMillis() + ttlMill)); if (entity.getIsTimeOut()) { entity.setHandleMsg("超时不处理"); } boolean isTimeOut = entity.getIsTimeOut(); isTimeOut = true; // TODO if (isTimeOut) { // 未超时,处理消息 handleOperationMessage(entity); } else { // 超时 operationMessageDao.save(entity); } return ResultContent.buildSuccess(); } /** * 处理消息 * * @param entity * @return */ public ResultContent handleOperationMessage(OperationMessage entity) { String event = entity.getEvent(); JSONObject json = (JSONObject) entity.getData(); if (json.containsKey("data")) { Object result = null; StopWatch stopWatch = new StopWatch(); stopWatch.start(); boolean isHandleSuccess = true; String handleMsg = "处理成功"; JSONObject jsonObject1 = (JSONObject) json.get("data"); jsonObject1.put("mqttDataId", entity.getDataId()); jsonObject1.put("GateWayId", entity.getGateWayId()); String DeviceId = jsonObject1.getStr("DeviceId"); if (StringUtils.isEmpty(DeviceId)) { DeviceId = entity.getDeviceId(); jsonObject1.put("DeviceId", DeviceId); } entity.setDeviceId(DeviceId); try { ExecuteMethodInfo executeMethodInfo = executeMethodInfoDao.findTopByEvent(event); if (ObjectUtils.isNotEmpty(executeMethodInfo)) { String dataStr = JSONUtil.toJsonStr(jsonObject1); String beanName = executeMethodInfo.getBeanName(); String methodName = executeMethodInfo.getMethodName(); Class c = applicationContext.getBean(beanName).getClass(); SuperService t = (SuperService) applicationContext.getBean(beanName); Method method = c.getMethod(methodName, String.class); ResultContent resultContent = (ResultContent) method.invoke(t, dataStr); if (resultContent.isSuccess()) { result = resultContent.getContent(); } else { isHandleSuccess = false; handleMsg = resultContent.getMsg(); } entity.setBeanName(beanName); entity.setMethodName(methodName); } else { isHandleSuccess = false; handleMsg = "消息处理方法未找到"; } } catch (Exception e) { e.printStackTrace(); isHandleSuccess = false; handleMsg = String.format("业务处理出错:%S", e.getMessage()); } stopWatch.stop(); entity.setHandlerTime(stopWatch.getLastTaskTimeMillis()); // 返回结果 entity.setResultData(result); // 业务处理失败 entity.setIsHandleSuccess(isHandleSuccess); entity.setHandleMsg(handleMsg); log.info("消息处理结果: {} {}", isHandleSuccess, handleMsg); if (isHandleSuccess) { // 处理成功,返回响应 responseMessage(entity); } else { // 处理失败,记录数据 entity.setReTime(System.currentTimeMillis()); entity.setReTimeStr(DateUtils.paresTime(System.currentTimeMillis(), DateUtils.patternyyyySSS)); operationMessageDao.save(entity); return ResultContent.buildFail(handleMsg); } } return ResultContent.buildSuccess(); } /** * 响应数据 * * @param entity * @return */ public ResultContent responseMessage(OperationMessage entity) { com.alibaba.fastjson.JSONObject jsonObject = new com.alibaba.fastjson.JSONObject(); jsonObject.put("id", entity.getDataId()); Object data = entity.getResultData(); JSONObject object = new JSONObject(); if (ObjectUtils.isNotEmpty(data)) { String _str = JSONUtil.toJsonStr(data); object = JSONUtil.parseObj(_str); } jsonObject.put("data", object); jsonObject.put("time", System.currentTimeMillis()); jsonObject.put("ttl", entity.getTtlTime()); jsonObject.put("event", entity.getEvent()); String reTopic = String.format("%s/reply", entity.getTopic()); String reMsg = "响应成功"; Boolean reIsSuccess = Boolean.TRUE; try { mqClient.sendObject(reTopic, JSONUtil.toJsonStr(jsonObject), entity.getDataId()); } catch (Exception e) { e.printStackTrace(); reIsSuccess = Boolean.FALSE; reMsg = "mqtt响应出错:" + e.getMessage(); } entity.setIsResult(Boolean.TRUE); entity.setReIsSuccess(reIsSuccess); entity.setReMsg(reMsg); entity.setReTime(System.currentTimeMillis()); entity.setReTimeStr(DateUtils.paresTime(System.currentTimeMillis(), DateUtils.patternyyyySSS)); entity.setReTopic(reTopic); operationMessageDao.save(entity); return ResultContent.buildSuccess(); } /** * 指令列表 * * @param pageable * @param param * @return */ public ResultContent> page(Pageable pageable, OperationMessageSearchParam param) { Page page = operationMessageDao.page(pageable, param); return ResultContent.buildSuccess(PageEntityUtil.toPageModel(page, this::toModel)); } /** * 标记消息已收到 * * @param messageId * @return */ public ResultContent receiveMessage(String messageId) { OperationMessage operationMessage = operationMessageDao.findTopByMessageId(messageId); if (ObjectUtils.isEmpty(operationMessage)) { return ResultContent.buildFail(String.format("%s指令信息不存在", messageId)); } if (operationMessage.getIsReceive() != null && operationMessage.getIsReceive()) { return ResultContent.buildFail("该指令已响应"); } operationMessage.setIsReceive(Boolean.TRUE); operationMessage.setReceiveTime(System.currentTimeMillis()); operationMessage.setReceiveTimeStr(DateUtils.paresTime(System.currentTimeMillis(), DateUtils.FORMAT_LONG)); operationMessageDao.save(operationMessage); return ResultContent.buildSuccess(); } public OperationMessageModel toModel(OperationMessage entity) { OperationMessageModel model = new OperationMessageModel(); if (ObjectUtils.isNotEmpty(entity)) { BeanUtils.copyProperties(entity, model); model.setDeviceInfo(deviceInfoService.toModel(entity.getDeviceInfo())); model.setGateWayInfo(gateWayInfoService.toModel(entity.getGateWayInfo())); } return model; } }