| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407 |
- package com.zswl.dataservice.service.artemis;
- import cn.hutool.core.date.StopWatch;
- import cn.hutool.json.JSONObject;
- import cn.hutool.json.JSONUtil;
- import com.google.gson.JsonObject;
- import com.zswl.dataservice.dao.mqtt.DeviceInfoDao;
- import com.zswl.dataservice.dao.mqtt.OperationMessageDao;
- import com.zswl.dataservice.dao.other.ExecuteMethodInfoDao;
- import com.zswl.dataservice.domain.mqtt.DeviceInfo;
- import com.zswl.dataservice.domain.mqtt.OperationMessage;
- import com.zswl.dataservice.domain.other.ExecuteMethodInfo;
- import com.zswl.dataservice.model.artemis.OperationMessageModel;
- import com.zswl.dataservice.model.artemis.OperationMessageSearchParam;
- import com.zswl.dataservice.model.mqtt.SendMessageModel;
- import com.zswl.dataservice.service.base.SuperService;
- import com.zswl.dataservice.service.mqtt.DeviceInfoService;
- import com.zswl.dataservice.service.mqtt.GateWayInfoService;
- import com.zswl.dataservice.service.payment.HxzService;
- import com.zswl.dataservice.type.OperationType;
- import com.zswl.dataservice.utils.DateUtils;
- import com.zswl.dataservice.utils.bean.BeanUtils;
- import com.zswl.dataservice.utils.mqtt.MqttTopicUtils;
- import com.zswl.dataservice.utils.mqtt.mqttConfig.client.MQClient;
- import com.zswl.dataservice.utils.page.PageEntityUtil;
- import com.zswl.dataservice.utils.result.ResultContent;
- import jakarta.jms.Message;
- import jakarta.jms.TextMessage;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.activemq.artemis.jms.client.ActiveMQBytesMessage;
- import org.apache.activemq.artemis.jms.client.ActiveMQTopic;
- import org.apache.commons.lang3.ObjectUtils;
- import org.apache.commons.lang3.StringUtils;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.context.ApplicationContext;
- import org.springframework.data.domain.Page;
- import org.springframework.data.domain.Pageable;
- import org.springframework.scheduling.annotation.Async;
- import org.springframework.stereotype.Service;
- import java.lang.reflect.Method;
- import java.util.Date;
- import java.util.UUID;
- /**
- * 指令数据管理
- *
- * @author TRX
- * @date 2024/5/14
- */
- @Slf4j
- @Service
- public class OperationMessageService {
- @Autowired
- OperationMessageDao operationMessageDao;
- @Autowired
- DeviceInfoService deviceInfoService;
- @Autowired
- GateWayInfoService gateWayInfoService;
- @Autowired
- MQClient mqClient;
- @Autowired
- DeviceInfoDao deviceInfoDao;
- @Autowired
- HxzService hxzService;
- // 保存90天
- @Value("${artemisstore.time}")
- private Long ttlMill = 90 * 24L * 60 * 60 * 1000L;
- @Autowired
- private ExecuteMethodInfoDao executeMethodInfoDao;
- @Autowired
- private ApplicationContext applicationContext;
- /**
- * 发送指令
- *
- * @param param
- * @return
- */
- public ResultContent sendMessage(SendMessageModel param) {
- String msg = "发送成功";
- try {
- JSONObject jsonObject = new JSONObject();
- jsonObject.set("id", UUID.randomUUID().toString());
- jsonObject.set("data", param.getMessage());
- jsonObject.set("timeStr", DateUtils.paresTime(System.currentTimeMillis(), DateUtils.patternyyyySSS));
- jsonObject.set("time", System.currentTimeMillis());
- jsonObject.set("ttl", 10 * 1000);
- mqClient.sendObject(param.getTopic(), jsonObject.toString());
- log.info("mqtt msg 发送成功");
- } catch (Exception e) {
- e.printStackTrace();
- msg = "发送失败: " + e.getMessage();
- }
- return ResultContent.buildSuccess(msg);
- }
- /**
- * 给设备下发指令
- *
- * @param deviceId 设备ID
- * @param command 指令,如:on,off
- * @param data 指令数据
- * @return
- */
- public ResultContent sendMessage(String deviceId, String command, JSONObject data) {
- DeviceInfo deviceInfo = deviceInfoDao.findTopById(deviceId);
- if (ObjectUtils.isEmpty(deviceInfo)) {
- return ResultContent.buildFail(String.format("设备不存在:%s", deviceId));
- }
- try {
- // 消息的TTL时间
- Long ttl = 10 * 1000L;
- // 消息的ID
- String messageId = "";
- OperationMessage message = new OperationMessage();
- if (data.containsKey("messageId")) {
- messageId = data.getStr("messageId");
- } else {
- messageId = UUID.randomUUID().toString();
- data.append("messageId", messageId);
- }
- String topic = MqttTopicUtils.buildDeviceOperationInstructionsTopic(deviceId, command);
- message.setMessageId(messageId);
- message.setData(data);
- message.setTopic(topic);
- message.setDeviceId(deviceId);
- message.setDeviceInfo(deviceInfo);
- message.setIsReceive(Boolean.FALSE);
- message.setTtlTime(ttl);
- message.setTime(DateUtils.paresTime(System.currentTimeMillis(), DateUtils.FORMAT_LONG));
- JsonObject jsonObject = new JsonObject();
- jsonObject.addProperty("id", messageId);
- jsonObject.addProperty("data", data.toString());
- jsonObject.addProperty("time", System.currentTimeMillis());
- jsonObject.addProperty("ttl", ttl);
- mqClient.sendObject(topic, jsonObject.toString());
- log.info("mqtt msg 发送成功");
- operationMessageDao.save(message);
- } catch (Exception e) {
- e.printStackTrace();
- }
- return ResultContent.buildSuccess();
- }
- /**
- * 处理网关MQTT信息
- *
- * @param message
- */
- public void handlerGateWayMessage(Message message) {
- // 处理接收到的消息
- try {
- ActiveMQTopic activeMQTopic = (ActiveMQTopic) message.getJMSDestination();
- String topicName = activeMQTopic.getTopicName();
- if (topicName.equals("activemq.notifications")) {
- return;
- }
- String messageId = message.getJMSMessageID();
- String clientId = message.getStringProperty("__AMQ_CID");
- log.info("receiveMessage {} 消息监听clientId: {}", messageId, clientId);
- log.info("Topic: {}", topicName);
- if (StringUtils.isNotEmpty(topicName) && topicName.endsWith("reply")) {
- // 这是响应的数据,不处理
- log.warn("回复消息不处理");
- return;
- }
- OperationMessage operationMessage = new OperationMessage();
- operationMessage.setMessageId(messageId);
- operationMessage.setClientId(clientId);
- operationMessage.setTopic(topicName);
- operationMessage.setOperationType(OperationType.Sub);
- String messageClass = "";
- String msg = "";
- if (message instanceof ActiveMQBytesMessage) {
- ActiveMQBytesMessage activeMQBytesMessage = (ActiveMQBytesMessage) message;
- messageClass = ActiveMQBytesMessage.class.getSimpleName();
- byte[] messageBody = new byte[(int) activeMQBytesMessage.getBodyLength()];
- // 读取消息内容到字节数组
- activeMQBytesMessage.readBytes(messageBody);
- msg = new String(messageBody);
- }
- if (message instanceof TextMessage) {
- messageClass = TextMessage.class.getSimpleName();
- TextMessage textMessage = (TextMessage) message;
- msg = textMessage.getText();
- }
- cn.hutool.json.JSONObject jsonObject = JSONUtil.parseObj(msg);
- String id = jsonObject.getStr("id");
- Long time = jsonObject.getLong("time");
- Long ttl = jsonObject.getLong("ttl");
- String event = jsonObject.getStr("event");
- boolean isTimeOut = false;
- if (System.currentTimeMillis() > (time + ttl)) {
- isTimeOut = true;
- }
- log.info("textMessage: {}", msg);
- String timeStr = DateUtils.paresTime(time, DateUtils.patternyyyySSS);
- jsonObject.set("timeStr", timeStr);
- // --------------------处理业务 start------------------
- operationMessage.setMessageClass(messageClass);
- operationMessage.setData(jsonObject);
- operationMessage.setTtlTime(ttl);
- operationMessage.setSendTime(time);
- operationMessage.setDataId(id);
- operationMessage.setIsTimeOut(isTimeOut);
- operationMessage.setEvent(event);
- addOperationMessage(operationMessage);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- /**
- * 添加
- *
- * @param entity
- * @return
- */
- public ResultContent addOperationMessage(OperationMessage entity) {
- entity.setTime(DateUtils.paresTime(System.currentTimeMillis(), DateUtils.patternyyyySSS));
- entity.setTtl(new Date(System.currentTimeMillis() + ttlMill));
- if (entity.getIsTimeOut()) {
- entity.setHandleMsg("超时不处理");
- }
- boolean isTimeOut = entity.getIsTimeOut();
- isTimeOut = true;
- // 判断业务数据ID是否存在,不要重复处理
- boolean isExit = operationMessageDao.existsByDataId(entity.getDataId());
- // TODO
- if (isTimeOut) {
- // 未超时,处理消息
- handleOperationMessage(entity);
- } else {
- // 超时
- operationMessageDao.save(entity);
- }
- return ResultContent.buildSuccess();
- }
- /**
- * 处理消息
- *
- * @param entity
- * @return
- */
- public ResultContent handleOperationMessage(OperationMessage entity) {
- String event = entity.getEvent();
- JSONObject json = (JSONObject) entity.getData();
- if (json.containsKey("data")) {
- Object result = null;
- StopWatch stopWatch = new StopWatch();
- stopWatch.start();
- boolean isHandleSuccess = true;
- String handleMsg = "处理成功";
- try {
- ExecuteMethodInfo executeMethodInfo = executeMethodInfoDao.findTopByEvent(event);
- if (ObjectUtils.isNotEmpty(executeMethodInfo)) {
- String dataStr = json.getStr("data");
- String beanName = executeMethodInfo.getBeanName();
- String methodName = executeMethodInfo.getMethodName();
- Class c = applicationContext.getBean(beanName).getClass();
- SuperService t = (SuperService) applicationContext.getBean(beanName);
- Method method = c.getMethod(methodName, String.class);
- ResultContent<Object> resultContent = (ResultContent<Object>) method.invoke(t, dataStr);
- if (resultContent.isSuccess()) {
- result = resultContent.getContent();
- } else {
- isHandleSuccess = false;
- handleMsg = resultContent.getMsg();
- }
- entity.setBeanName(beanName);
- entity.setMethodName(methodName);
- } else {
- isHandleSuccess = false;
- handleMsg = "消息处理方法未找到";
- }
- } catch (Exception e) {
- e.printStackTrace();
- isHandleSuccess = false;
- handleMsg = String.format("业务处理出错:%S", e.getMessage());
- }
- stopWatch.stop();
- entity.setHandlerTime(stopWatch.getLastTaskTimeMillis());
- // 返回结果
- entity.setResultData(result);
- // 业务处理失败
- entity.setIsHandleSuccess(isHandleSuccess);
- entity.setHandleMsg(handleMsg);
- if (isHandleSuccess) {
- // 处理成功,返回响应
- responseMessage(entity);
- } else {
- // 处理失败,记录数据
- entity.setReTime(System.currentTimeMillis());
- entity.setReTimeStr(DateUtils.paresTime(System.currentTimeMillis(), DateUtils.patternyyyySSS));
- operationMessageDao.save(entity);
- return ResultContent.buildFail(handleMsg);
- }
- }
- return ResultContent.buildSuccess();
- }
- /**
- * 响应数据
- *
- * @param entity
- * @return
- */
- public ResultContent responseMessage(OperationMessage entity) {
- com.alibaba.fastjson.JSONObject jsonObject = new com.alibaba.fastjson.JSONObject();
- jsonObject.put("id", entity.getDataId());
- Object data = entity.getResultData();
- JSONObject object = new JSONObject();
- if (ObjectUtils.isNotEmpty(data)) {
- String _str = JSONUtil.toJsonStr(data);
- object = JSONUtil.parseObj(_str);
- }
- jsonObject.put("data", object);
- jsonObject.put("time", System.currentTimeMillis());
- jsonObject.put("ttl", entity.getTtlTime());
- jsonObject.put("event", entity.getEvent());
- String reTopic = String.format("%s/reply", entity.getTopic());
- String reMsg = "响应成功";
- Boolean reIsSuccess = Boolean.TRUE;
- try {
- mqClient.sendObject(reTopic, JSONUtil.toJsonStr(jsonObject));
- } catch (Exception e) {
- e.printStackTrace();
- reIsSuccess = Boolean.FALSE;
- reMsg = "mqtt响应出错:" + e.getMessage();
- }
- entity.setIsResult(Boolean.TRUE);
- entity.setReIsSuccess(reIsSuccess);
- entity.setReMsg(reMsg);
- entity.setReTime(System.currentTimeMillis());
- entity.setReTimeStr(DateUtils.paresTime(System.currentTimeMillis(), DateUtils.patternyyyySSS));
- entity.setReTopic(reTopic);
- operationMessageDao.save(entity);
- return ResultContent.buildSuccess();
- }
- /**
- * 指令列表
- *
- * @param pageable
- * @param param
- * @return
- */
- public ResultContent<Page<OperationMessageModel>> page(Pageable pageable, OperationMessageSearchParam param) {
- Page<OperationMessage> page = operationMessageDao.page(pageable, param);
- return ResultContent.buildSuccess(PageEntityUtil.toPageModel(page, this::toModel));
- }
- /**
- * 标记消息已收到
- *
- * @param messageId
- * @return
- */
- public ResultContent receiveMessage(String messageId) {
- OperationMessage operationMessage = operationMessageDao.findTopByMessageId(messageId);
- if (ObjectUtils.isEmpty(operationMessage)) {
- return ResultContent.buildFail(String.format("%s指令信息不存在", messageId));
- }
- if (operationMessage.getIsReceive() != null && operationMessage.getIsReceive()) {
- return ResultContent.buildFail("该指令已响应");
- }
- operationMessage.setIsReceive(Boolean.TRUE);
- operationMessage.setReceiveTime(System.currentTimeMillis());
- operationMessage.setReceiveTimeStr(DateUtils.paresTime(System.currentTimeMillis(), DateUtils.FORMAT_LONG));
- operationMessageDao.save(operationMessage);
- return ResultContent.buildSuccess();
- }
- public OperationMessageModel toModel(OperationMessage entity) {
- OperationMessageModel model = new OperationMessageModel();
- if (ObjectUtils.isNotEmpty(entity)) {
- BeanUtils.copyProperties(entity, model);
- model.setDeviceInfo(deviceInfoService.toModel(entity.getDeviceInfo()));
- model.setGateWayInfo(gateWayInfoService.toModel(entity.getGateWayInfo()));
- }
- return model;
- }
- }
|