|
|
@@ -0,0 +1,681 @@
|
|
|
+package com.zhongshu.iot.server.core.service.artemis;
|
|
|
+
|
|
|
+import cn.hutool.core.date.StopWatch;
|
|
|
+import cn.hutool.json.JSONObject;
|
|
|
+import cn.hutool.json.JSONUtil;
|
|
|
+import com.github.microservice.models.hxz.base.HxzBaseResult;
|
|
|
+import com.google.gson.JsonObject;
|
|
|
+import com.zhongshu.iot.client.model.artemis.OperationMessageModel;
|
|
|
+import com.zhongshu.iot.client.model.artemis.OperationMessageResultModel;
|
|
|
+import com.zhongshu.iot.client.model.artemis.OperationMessageResultSearch;
|
|
|
+import com.zhongshu.iot.client.model.artemis.OperationMessageSearchParam;
|
|
|
+import com.zhongshu.iot.client.model.mqtt.SendMessageModel;
|
|
|
+import com.zhongshu.iot.client.type.FunctionType;
|
|
|
+import com.zhongshu.iot.client.type.OperationType;
|
|
|
+import com.zhongshu.iot.server.core.dao.iot.IotMainDao;
|
|
|
+import com.zhongshu.iot.server.core.dao.mqtt.DeviceInfoDao;
|
|
|
+import com.zhongshu.iot.server.core.dao.mqtt.OperationMessageDao;
|
|
|
+import com.zhongshu.iot.server.core.dao.mqtt.OperationMessageResultDao;
|
|
|
+import com.zhongshu.iot.server.core.dao.other.ExecuteMethodInfoDao;
|
|
|
+import com.zhongshu.iot.server.core.domain.iot.IotMain;
|
|
|
+import com.zhongshu.iot.server.core.domain.iot.mqtt.DeviceInfo;
|
|
|
+import com.zhongshu.iot.server.core.domain.iot.mqtt.OperationMessage;
|
|
|
+import com.zhongshu.iot.server.core.domain.iot.mqtt.OperationMessageResult;
|
|
|
+import com.zhongshu.iot.server.core.domain.other.ExecuteMethodInfo;
|
|
|
+import com.zhongshu.iot.server.core.httpRequest.ApiRequestService;
|
|
|
+import com.zhongshu.iot.server.core.httpRequest.apiConf.APIResponseModel;
|
|
|
+import com.zhongshu.iot.server.core.service.base.SuperService;
|
|
|
+import com.zhongshu.iot.server.core.service.iot.IotDataVerifyService;
|
|
|
+import com.zhongshu.iot.server.core.service.mqtt.DeviceInfoService;
|
|
|
+import com.zhongshu.iot.server.core.service.mqtt.GateWayInfoService;
|
|
|
+import com.zhongshu.iot.server.core.util.CommonUtil;
|
|
|
+import com.zhongshu.iot.server.core.util.DateUtils;
|
|
|
+import com.zhongshu.iot.server.core.util.TokenUtil;
|
|
|
+import com.zhongshu.iot.server.core.util.bean.BeanUtils;
|
|
|
+import com.zhongshu.iot.server.core.util.mqtt.MqttTopicUtils;
|
|
|
+import com.zhongshu.iot.server.core.util.mqtt.mqttConfig.client.MQClient;
|
|
|
+import com.zhongshu.iot.server.core.util.page.PageEntityUtil;
|
|
|
+import com.zhongshu.iot.server.core.util.result.ResultContent;
|
|
|
+import jakarta.jms.Message;
|
|
|
+import jakarta.jms.TextMessage;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.apache.activemq.artemis.jms.client.ActiveMQBytesMessage;
|
|
|
+import org.apache.activemq.artemis.jms.client.ActiveMQTopic;
|
|
|
+import org.apache.commons.lang3.ObjectUtils;
|
|
|
+import org.apache.commons.lang3.StringUtils;
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.beans.factory.annotation.Value;
|
|
|
+import org.springframework.context.ApplicationContext;
|
|
|
+import org.springframework.data.domain.Page;
|
|
|
+import org.springframework.data.domain.Pageable;
|
|
|
+import org.springframework.stereotype.Service;
|
|
|
+
|
|
|
+import java.lang.reflect.Method;
|
|
|
+import java.util.Date;
|
|
|
+import java.util.List;
|
|
|
+import java.util.UUID;
|
|
|
+import java.util.concurrent.CompletableFuture;
|
|
|
+
|
|
|
+/**
|
|
|
+ * 指令数据管理
|
|
|
+ *
|
|
|
+ * @author TRX
|
|
|
+ * @date 2024/5/14
|
|
|
+ */
|
|
|
+@Slf4j
|
|
|
+@Service
|
|
|
+public class OperationMessageService {
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private OperationMessageDao operationMessageDao;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private DeviceInfoService deviceInfoService;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private GateWayInfoService gateWayInfoService;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private MQClient mqClient;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private DeviceInfoDao deviceInfoDao;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private ExecuteMethodInfoDao executeMethodInfoDao;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private ApplicationContext applicationContext;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private IotMainDao iotMainDao;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private OperationMessageResultDao operationMessageResultDao;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private ApiRequestService apiRequestService;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private IotDataVerifyService iotDataVerifyService;
|
|
|
+
|
|
|
+ @Value("${artemisstore.time}")
|
|
|
+ public Long ttlMill = 30 * 24L * 60 * 60 * 1000L;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 发送指令
|
|
|
+ *
|
|
|
+ * @param param
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ public ResultContent sendMessage(SendMessageModel param) {
|
|
|
+ String msg = "发送成功";
|
|
|
+ try {
|
|
|
+ JSONObject jsonObject = new JSONObject();
|
|
|
+ jsonObject.set("id", UUID.randomUUID().toString());
|
|
|
+ jsonObject.set("data", param.getMessage());
|
|
|
+ jsonObject.set("timeStr", DateUtils.paresTime(System.currentTimeMillis(), DateUtils.patternyyyySSS));
|
|
|
+ jsonObject.set("time", System.currentTimeMillis());
|
|
|
+ jsonObject.set("ttl", 10 * 1000);
|
|
|
+ mqClient.sendObject(param.getTopic(), jsonObject.toString());
|
|
|
+ log.info("mqtt msg 发送成功");
|
|
|
+ } catch (Exception e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ msg = "发送失败: " + e.getMessage();
|
|
|
+ }
|
|
|
+ return ResultContent.buildSuccess(msg);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 给设备下发指令
|
|
|
+ *
|
|
|
+ * @param deviceId 设备ID
|
|
|
+ * @param command 指令,如:on,off
|
|
|
+ * @param data 指令数据
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ public ResultContent sendMessage(String deviceId, String command, JSONObject data) {
|
|
|
+ DeviceInfo deviceInfo = deviceInfoDao.findTopById(deviceId);
|
|
|
+ if (ObjectUtils.isEmpty(deviceInfo)) {
|
|
|
+ return ResultContent.buildFail(String.format("设备不存在:%s", deviceId));
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ // 消息的TTL时间
|
|
|
+ Long ttl = 10 * 1000L;
|
|
|
+ // 消息的ID
|
|
|
+ String messageId = "";
|
|
|
+ OperationMessage message = new OperationMessage();
|
|
|
+ if (data.containsKey("messageId")) {
|
|
|
+ messageId = data.getStr("messageId");
|
|
|
+ } else {
|
|
|
+ messageId = UUID.randomUUID().toString();
|
|
|
+ data.append("messageId", messageId);
|
|
|
+ }
|
|
|
+ String topic = MqttTopicUtils.buildDeviceOperationInstructionsTopic(deviceId, command);
|
|
|
+
|
|
|
+ message.setMessageId(messageId);
|
|
|
+ message.setData(data);
|
|
|
+ message.setTopic(topic);
|
|
|
+ message.setDeviceId(deviceId);
|
|
|
+ message.setDeviceInfo(deviceInfo);
|
|
|
+ message.setIsReceive(Boolean.FALSE);
|
|
|
+ message.setTtlTime(ttl);
|
|
|
+ message.setTime(DateUtils.paresTime(System.currentTimeMillis(), DateUtils.FORMAT_LONG));
|
|
|
+
|
|
|
+ JsonObject jsonObject = new JsonObject();
|
|
|
+ jsonObject.addProperty("id", messageId);
|
|
|
+ jsonObject.addProperty("data", data.toString());
|
|
|
+ jsonObject.addProperty("time", System.currentTimeMillis());
|
|
|
+ jsonObject.addProperty("ttl", ttl);
|
|
|
+ mqClient.sendObject(topic, jsonObject.toString());
|
|
|
+ log.info("mqtt msg 发送成功");
|
|
|
+
|
|
|
+ operationMessageDao.save(message);
|
|
|
+ } catch (Exception e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ }
|
|
|
+ return ResultContent.buildSuccess();
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * /v1/gateway/#
|
|
|
+ * 处理网关MQTT信息
|
|
|
+ *
|
|
|
+ * @param message
|
|
|
+ */
|
|
|
+ public void handlerGateWayMessage(Message message) {
|
|
|
+ // 处理接收到的消息
|
|
|
+ String msg = "";
|
|
|
+ try {
|
|
|
+ ActiveMQTopic activeMQTopic = (ActiveMQTopic) message.getJMSDestination();
|
|
|
+ String topicName = activeMQTopic.getTopicName();
|
|
|
+ if (topicName.equals("activemq.notifications")) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ String messageId = message.getJMSMessageID();
|
|
|
+ String clientId = message.getStringProperty("__AMQ_CID");
|
|
|
+// log.info("receiveMessage {} 消息监听clientId: {}", messageId, clientId);
|
|
|
+ if (StringUtils.isNotEmpty(topicName) && topicName.endsWith("reply")) {
|
|
|
+ // 这是响应的数据,不处理
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ String messageClass = "";
|
|
|
+ if (message instanceof ActiveMQBytesMessage) {
|
|
|
+ ActiveMQBytesMessage activeMQBytesMessage = (ActiveMQBytesMessage) message;
|
|
|
+ messageClass = ActiveMQBytesMessage.class.getSimpleName();
|
|
|
+ byte[] messageBody = new byte[(int) activeMQBytesMessage.getBodyLength()];
|
|
|
+ // 读取消息内容到字节数组
|
|
|
+ activeMQBytesMessage.readBytes(messageBody);
|
|
|
+ msg = new String(messageBody);
|
|
|
+ }
|
|
|
+ if (message instanceof TextMessage) {
|
|
|
+ messageClass = TextMessage.class.getSimpleName();
|
|
|
+ TextMessage textMessage = (TextMessage) message;
|
|
|
+ msg = textMessage.getText();
|
|
|
+ }
|
|
|
+ // 提取内容里面的数据
|
|
|
+ JSONObject jsonObject = JSONUtil.parseObj(msg);
|
|
|
+ String id = jsonObject.getStr("id");
|
|
|
+ Long time = jsonObject.getLong("time");
|
|
|
+ Long ttl = jsonObject.getLong("ttl");
|
|
|
+ String event = jsonObject.getStr("event");
|
|
|
+ if (ObjectUtils.isEmpty(event)) {
|
|
|
+ String[] arr = topicName.split("/");
|
|
|
+ event = arr[arr.length - 1];
|
|
|
+ jsonObject.put("event", event);
|
|
|
+ }
|
|
|
+ // 是否是测试
|
|
|
+ boolean isTest = true;
|
|
|
+ // ping不执行
|
|
|
+ if (isTest && event.equals("ping")) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ if (isTest) {
|
|
|
+ id = CommonUtil.UUID();
|
|
|
+ }
|
|
|
+ log.info("Topic: {} {}", event, topicName);
|
|
|
+
|
|
|
+ String gateWayId = jsonObject.getStr("gatewayId");
|
|
|
+ String deviceId = jsonObject.getStr("deviceId");
|
|
|
+ boolean isTimeOut = true;
|
|
|
+ if (time == null) {
|
|
|
+ time = System.currentTimeMillis();
|
|
|
+ }
|
|
|
+ if (System.currentTimeMillis() > (time + ttl)) {
|
|
|
+ isTimeOut = true;
|
|
|
+ }
|
|
|
+ log.info("消息内容: {}", msg);
|
|
|
+
|
|
|
+ String timeStr = DateUtils.paresTime(time, DateUtils.patternyyyySSS);
|
|
|
+ jsonObject.set("timeStr", timeStr);
|
|
|
+
|
|
|
+ // --------------------消息处理 start------------------
|
|
|
+ OperationMessage operationMessage = new OperationMessage();
|
|
|
+ operationMessage.setMessageId(messageId); // 消息ID
|
|
|
+ operationMessage.setClientId(clientId); // 终端ID
|
|
|
+ operationMessage.setTopic(topicName); // topic名称
|
|
|
+ operationMessage.setOperationType(OperationType.Sub);
|
|
|
+ operationMessage.setMessageClass(messageClass);
|
|
|
+ operationMessage.setData(jsonObject);
|
|
|
+ operationMessage.setTtlTime(ttl);
|
|
|
+ operationMessage.setSendTime(time);
|
|
|
+ operationMessage.setDataId(id);
|
|
|
+ operationMessage.setIsTimeOut(isTimeOut);
|
|
|
+ operationMessage.setEvent(event);
|
|
|
+ operationMessage.setGateWayId(gateWayId);
|
|
|
+ if (ObjectUtils.isNotEmpty(deviceId)) {
|
|
|
+ operationMessage.setDeviceId(deviceId);
|
|
|
+ }
|
|
|
+ initAddOperationMessage(operationMessage);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("消息出错了: {}", msg);
|
|
|
+ e.printStackTrace();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private ResultContent initAddOperationMessage(OperationMessage entity) {
|
|
|
+ // 判断DataId 是否存在,存在就不处理
|
|
|
+ String dataId = entity.getDataId();
|
|
|
+ try {
|
|
|
+ String token = TokenUtil.create();
|
|
|
+ OperationMessage temp = operationMessageDao.init(dataId, token);
|
|
|
+ if (ObjectUtils.isNotEmpty(temp)) {
|
|
|
+ log.info("获取到执行锁,开始执行");
|
|
|
+ entity.setId(temp.getId());
|
|
|
+ entity.setToken(temp.getToken());
|
|
|
+ entity.setCreateTime(System.currentTimeMillis());
|
|
|
+ addOperationMessage(entity);
|
|
|
+ } else {
|
|
|
+ log.warn("未获取到执行锁,跳过执行");
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("错误: {}", e.getMessage());
|
|
|
+ }
|
|
|
+ return ResultContent.buildSuccess();
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 添加
|
|
|
+ *
|
|
|
+ * @param entity
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ public ResultContent addOperationMessage(OperationMessage entity) {
|
|
|
+ // 服务器接收消息的时间
|
|
|
+ entity.setTime(DateUtils.paresTime(System.currentTimeMillis(), DateUtils.patternyyyySSS));
|
|
|
+ // 消息的过期时间
|
|
|
+ entity.setTtl(new Date(System.currentTimeMillis() + ttlMill));
|
|
|
+ boolean isTimeOut = entity.getIsTimeOut();
|
|
|
+ isTimeOut = false;
|
|
|
+ if (entity.getEvent().equals("ping")) {
|
|
|
+ Date da = new Date(System.currentTimeMillis() + 10 * 60 * 1000);
|
|
|
+ entity.setTtl(da);
|
|
|
+ }
|
|
|
+ if (entity.getIsTimeOut()) {
|
|
|
+ entity.setHandleMsg("超时不处理");
|
|
|
+ }
|
|
|
+ // TODO
|
|
|
+ if (!isTimeOut) {
|
|
|
+ // 未超时,处理消息
|
|
|
+ handleOperationMessage(entity);
|
|
|
+ } else {
|
|
|
+ // 超时
|
|
|
+ operationMessageDao.save(entity);
|
|
|
+ }
|
|
|
+ return ResultContent.buildSuccess();
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 处理消息
|
|
|
+ *
|
|
|
+ * @param entity
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ public ResultContent handleOperationMessage(OperationMessage entity) {
|
|
|
+ String event = entity.getEvent();
|
|
|
+ JSONObject json = (JSONObject) entity.getData();
|
|
|
+ if (json.containsKey("data")) {
|
|
|
+ StopWatch stopWatch = new StopWatch();
|
|
|
+ stopWatch.start();
|
|
|
+
|
|
|
+ boolean isHandleSuccess = true;
|
|
|
+ String handleMsg = "处理成功";
|
|
|
+ // 业务处理的消息内容
|
|
|
+ JSONObject requestData = (JSONObject) json.get("data");
|
|
|
+ requestData.put("mqttDataId", entity.getDataId());
|
|
|
+ requestData.put("GateWayId", entity.getGateWayId());
|
|
|
+ String DeviceId = requestData.getStr("DeviceId");
|
|
|
+ if (StringUtils.isEmpty(DeviceId)) {
|
|
|
+ DeviceId = entity.getDeviceId();
|
|
|
+ requestData.put("DeviceId", DeviceId);
|
|
|
+ }
|
|
|
+ entity.setDeviceId(DeviceId);
|
|
|
+
|
|
|
+ // 查询有对应事件的设备
|
|
|
+ List<IotMain> events = iotMainDao.findByRealIotTopicAndFunctionTypeOrderByCreateTimeAsc(entity.getTopic(), FunctionType.Event);
|
|
|
+ if (ObjectUtils.isNotEmpty(events)) {
|
|
|
+ for (IotMain iotMain : events) {
|
|
|
+ executeOperationMessage(entity, requestData, iotMain);
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ isHandleSuccess = false;
|
|
|
+ handleMsg = "无对应物模型事件处理";
|
|
|
+ }
|
|
|
+ // 如果是ping,则平台也会处理
|
|
|
+ if (event != null && event.equals("ping")) {
|
|
|
+ pingHandler(entity, requestData);
|
|
|
+ } else if (event != null && event.equals("ServerTime")) {
|
|
|
+ pingHandler(entity, requestData);
|
|
|
+ } else {
|
|
|
+ // 业务处理失败
|
|
|
+ entity.setIsHandleSuccess(isHandleSuccess);
|
|
|
+ entity.setHandleMsg(handleMsg);
|
|
|
+ entity.setReTime(System.currentTimeMillis());
|
|
|
+ entity.setReTimeStr(DateUtils.paresTime(System.currentTimeMillis(), DateUtils.patternyyyySSS));
|
|
|
+ operationMessageDao.save(entity);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return ResultContent.buildSuccess();
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 处理ping消息
|
|
|
+ *
|
|
|
+ * @param entity
|
|
|
+ * @param requestData
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ public ResultContent pingHandler(OperationMessage entity, JSONObject requestData) {
|
|
|
+ String event = entity.getEvent();
|
|
|
+ Object result = null;
|
|
|
+ StopWatch stopWatch = new StopWatch();
|
|
|
+ stopWatch.start();
|
|
|
+
|
|
|
+ boolean isHandleSuccess = true;
|
|
|
+ String handleMsg = "处理成功";
|
|
|
+ // 查询业务处理端信息
|
|
|
+ try {
|
|
|
+ ExecuteMethodInfo executeMethodInfo = executeMethodInfoDao.findTopByEvent(event);
|
|
|
+ if (ObjectUtils.isNotEmpty(executeMethodInfo)) {
|
|
|
+ String dataStr = JSONUtil.toJsonStr(requestData);
|
|
|
+
|
|
|
+ String beanName = executeMethodInfo.getBeanName();
|
|
|
+ String methodName = executeMethodInfo.getMethodName();
|
|
|
+ Class c = applicationContext.getBean(beanName).getClass();
|
|
|
+ SuperService t = (SuperService) applicationContext.getBean(beanName);
|
|
|
+ Method method = c.getMethod(methodName, String.class);
|
|
|
+ ResultContent<Object> resultContent = (ResultContent<Object>) method.invoke(t, dataStr);
|
|
|
+ if (resultContent.isSuccess()) {
|
|
|
+ isHandleSuccess = true;
|
|
|
+ handleMsg = "处理成功";
|
|
|
+ result = resultContent.getContent();
|
|
|
+ } else {
|
|
|
+ isHandleSuccess = false;
|
|
|
+ handleMsg = resultContent.getMsg();
|
|
|
+ }
|
|
|
+ entity.setBeanName(beanName);
|
|
|
+ entity.setMethodName(methodName);
|
|
|
+ } else {
|
|
|
+ isHandleSuccess = false;
|
|
|
+ handleMsg = "消息处理方法未找到";
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ isHandleSuccess = false;
|
|
|
+ handleMsg = String.format("业务处理出错:%S", e.getMessage());
|
|
|
+ }
|
|
|
+ stopWatch.stop();
|
|
|
+ entity.setHandlerTime(stopWatch.getLastTaskTimeMillis());
|
|
|
+ // 返回结果
|
|
|
+ entity.setResultData(result);
|
|
|
+ // 业务处理失败
|
|
|
+ entity.setIsHandleSuccess(isHandleSuccess);
|
|
|
+ entity.setHandleMsg(handleMsg);
|
|
|
+ log.info("消息处理结果: {} {}", isHandleSuccess, handleMsg);
|
|
|
+ if (isHandleSuccess) {
|
|
|
+ // 处理成功,返回响应
|
|
|
+ return responseMessage(entity);
|
|
|
+ } else {
|
|
|
+ // 处理失败,记录数据
|
|
|
+ entity.setReTime(System.currentTimeMillis());
|
|
|
+ entity.setReTimeStr(DateUtils.paresTime(System.currentTimeMillis(), DateUtils.patternyyyySSS));
|
|
|
+ operationMessageDao.save(entity);
|
|
|
+ return ResultContent.buildFail(handleMsg);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 执行
|
|
|
+ *
|
|
|
+ * @param entity
|
|
|
+ * @param data
|
|
|
+ * @param iotMain
|
|
|
+ */
|
|
|
+ public void executeOperationMessage(OperationMessage entity, JSONObject data, IotMain iotMain) {
|
|
|
+ CompletableFuture.runAsync(() -> {
|
|
|
+ OperationMessageResult messageResult = new OperationMessageResult();
|
|
|
+ messageResult.setOperationMessage(entity);
|
|
|
+ messageResult.setIotMain(iotMain);
|
|
|
+ // 设备ID
|
|
|
+ messageResult.setDeviceId(iotMain.getDeviceId());
|
|
|
+ // 分组code
|
|
|
+ messageResult.setProjectCode(iotMain.getProjectCode());
|
|
|
+ messageResult.setGateWayId(entity.getGateWayId());
|
|
|
+ messageResult.setRealIotTopic(iotMain.getRealIotTopic());
|
|
|
+
|
|
|
+ boolean isHandleSuccess = true;
|
|
|
+ String handleMsg = "处理成功";
|
|
|
+
|
|
|
+ Long handlerTime = System.currentTimeMillis();
|
|
|
+ HxzBaseResult handleError = new HxzBaseResult();
|
|
|
+ // 是否需要返回
|
|
|
+ Boolean isNeedReplay = Boolean.TRUE;
|
|
|
+ String remoteUrl = iotMain.getRemoteUrl();
|
|
|
+ if (StringUtils.isEmpty(remoteUrl)) {
|
|
|
+ isHandleSuccess = false;
|
|
|
+ isNeedReplay = Boolean.TRUE;
|
|
|
+ handleMsg = "业务处理地址为空";
|
|
|
+ } else {
|
|
|
+ // 验证参数
|
|
|
+ ResultContent verifyContent = iotDataVerifyService.verifyIotParam(iotMain, data);
|
|
|
+ if (verifyContent.isSuccess()) {
|
|
|
+ // 参数验证成功
|
|
|
+ org.springframework.util.StopWatch stopWatch = new org.springframework.util.StopWatch();
|
|
|
+ stopWatch.start();
|
|
|
+
|
|
|
+ // 请求业务端处理
|
|
|
+ APIResponseModel apiResponseModel = apiRequestService.requestAPI(remoteUrl, data);
|
|
|
+ messageResult.setResultData(apiResponseModel);
|
|
|
+ if (apiResponseModel.isSuccess()) {
|
|
|
+ // 处理成功
|
|
|
+ String content = apiResponseModel.getContent();
|
|
|
+ JSONObject object = new JSONObject();
|
|
|
+ if (ObjectUtils.isNotEmpty(content)) {
|
|
|
+ object = JSONUtil.parseObj(content);
|
|
|
+ }
|
|
|
+ messageResult.setReplayData(object);
|
|
|
+ isHandleSuccess = true;
|
|
|
+ if (iotMain.getIsReturnData() != null && iotMain.getIsReturnData()) {
|
|
|
+ isNeedReplay = true;
|
|
|
+ } else {
|
|
|
+ isNeedReplay = false;
|
|
|
+ messageResult.setIsResult(Boolean.TRUE);
|
|
|
+ messageResult.setReIsSuccess(Boolean.FALSE);
|
|
|
+ messageResult.setReMsg("无需返回");
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ // 业务响应失败
|
|
|
+ isHandleSuccess = false;
|
|
|
+ handleMsg = apiResponseModel.getMsg();
|
|
|
+ }
|
|
|
+ stopWatch.stop();
|
|
|
+ messageResult.setUseTime(stopWatch.getTotalTimeMillis());
|
|
|
+ } else {
|
|
|
+ // 参数验证失败
|
|
|
+ isHandleSuccess = false;
|
|
|
+ isNeedReplay = Boolean.TRUE;
|
|
|
+ handleMsg = verifyContent.getMsg();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if (!isHandleSuccess) {
|
|
|
+ handleError.setFailed(handleMsg);
|
|
|
+ JSONObject replayData = JSONUtil.parseObj(handleError);
|
|
|
+ messageResult.setReplayData(replayData);
|
|
|
+ }
|
|
|
+
|
|
|
+ if (isNeedReplay) {
|
|
|
+ // 返回
|
|
|
+ JSONObject jsonObject = new JSONObject();
|
|
|
+ jsonObject.put("id", entity.getDataId());
|
|
|
+ jsonObject.put("data", messageResult.getReplayData());
|
|
|
+ jsonObject.put("time", System.currentTimeMillis());
|
|
|
+ jsonObject.put("ttl", entity.getTtlTime());
|
|
|
+ jsonObject.put("event", entity.getEvent());
|
|
|
+ jsonObject.put("deviceId", entity.getDeviceId());
|
|
|
+ jsonObject.put("gateWayId", entity.getGateWayId());
|
|
|
+ String reTopic = String.format("%s/reply", entity.getTopic());
|
|
|
+ String reMsg = "响应成功";
|
|
|
+ Boolean reIsSuccess = Boolean.TRUE;
|
|
|
+ try {
|
|
|
+ mqClient.sendObject(reTopic, JSONUtil.toJsonStr(jsonObject), entity.getDataId());
|
|
|
+ } catch (Exception e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ reIsSuccess = Boolean.FALSE;
|
|
|
+ reMsg = "mqtt响应出错:" + e.getMessage();
|
|
|
+ }
|
|
|
+ messageResult.setIsResult(Boolean.TRUE);
|
|
|
+ messageResult.setReIsSuccess(reIsSuccess);
|
|
|
+ messageResult.setReMsg(reMsg);
|
|
|
+ messageResult.setReTime(System.currentTimeMillis());
|
|
|
+ messageResult.setReTimeStr(DateUtils.paresTime(System.currentTimeMillis(), DateUtils.patternyyyySSS));
|
|
|
+ messageResult.setReTopic(reTopic);
|
|
|
+ }
|
|
|
+ messageResult.setIsHandleSuccess(isHandleSuccess);
|
|
|
+ messageResult.setHandleMsg(handleMsg);
|
|
|
+ messageResult.setHandlerTime(handlerTime);
|
|
|
+ operationMessageResultDao.save(messageResult);
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 响应数据
|
|
|
+ *
|
|
|
+ * @param entity
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ public ResultContent responseMessage(OperationMessage entity) {
|
|
|
+ JSONObject jsonObject = new JSONObject();
|
|
|
+ jsonObject.put("id", entity.getDataId());
|
|
|
+ Object data = entity.getResultData();
|
|
|
+ JSONObject object = new JSONObject();
|
|
|
+ if (ObjectUtils.isNotEmpty(data)) {
|
|
|
+ String _str = JSONUtil.toJsonStr(data);
|
|
|
+ object = JSONUtil.parseObj(_str);
|
|
|
+ }
|
|
|
+ jsonObject.put("data", object);
|
|
|
+ jsonObject.put("time", System.currentTimeMillis());
|
|
|
+ jsonObject.put("ttl", entity.getTtlTime());
|
|
|
+ jsonObject.put("event", entity.getEvent());
|
|
|
+ String reTopic = String.format("%s/reply", entity.getTopic());
|
|
|
+
|
|
|
+ String reMsg = "响应成功";
|
|
|
+ Boolean reIsSuccess = Boolean.TRUE;
|
|
|
+ try {
|
|
|
+ mqClient.sendObject(reTopic, JSONUtil.toJsonStr(jsonObject), entity.getDataId());
|
|
|
+ } catch (Exception e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ reIsSuccess = Boolean.FALSE;
|
|
|
+ reMsg = "mqtt响应出错:" + e.getMessage();
|
|
|
+ }
|
|
|
+ entity.setIsResult(Boolean.TRUE);
|
|
|
+ entity.setReIsSuccess(reIsSuccess);
|
|
|
+ entity.setReMsg(reMsg);
|
|
|
+ entity.setReTime(System.currentTimeMillis());
|
|
|
+ entity.setReTimeStr(DateUtils.paresTime(System.currentTimeMillis(), DateUtils.patternyyyySSS));
|
|
|
+ entity.setReTopic(reTopic);
|
|
|
+ operationMessageDao.save(entity);
|
|
|
+ return ResultContent.buildSuccess();
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 指令列表
|
|
|
+ *
|
|
|
+ * @param pageable
|
|
|
+ * @param param
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ public ResultContent<Page<OperationMessageModel>> page(Pageable pageable, OperationMessageSearchParam param) {
|
|
|
+ Page<OperationMessage> page = operationMessageDao.page(pageable, param);
|
|
|
+ return ResultContent.buildSuccess(PageEntityUtil.toPageModel(page, this::toModel));
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 设备消息列表日志
|
|
|
+ *
|
|
|
+ * @param pageable
|
|
|
+ * @param param
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ public ResultContent<Page<OperationMessageResultModel>> pageMessage(Pageable pageable, OperationMessageResultSearch param) {
|
|
|
+ Page<OperationMessageResult> page = operationMessageResultDao.page(pageable, param);
|
|
|
+ return ResultContent.buildSuccess(PageEntityUtil.toPageModel(page, this::toModel));
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 根据数据ID查询数据详情
|
|
|
+ *
|
|
|
+ * @param id
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ public ResultContent<OperationMessageResultModel> getMessageById(String id) {
|
|
|
+ OperationMessageResult messageResult = operationMessageResultDao.findTopById(id);
|
|
|
+ if (ObjectUtils.isEmpty(messageResult)) {
|
|
|
+ return ResultContent.buildFail(String.format("数据ID不存在:%s", id));
|
|
|
+ }
|
|
|
+ OperationMessageResultModel model = toModel(messageResult);
|
|
|
+ return ResultContent.buildSuccess(model);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 标记消息已收到
|
|
|
+ *
|
|
|
+ * @param messageId
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ public ResultContent receiveMessage(String messageId) {
|
|
|
+ OperationMessage operationMessage = operationMessageDao.findTopByMessageId(messageId);
|
|
|
+ if (ObjectUtils.isEmpty(operationMessage)) {
|
|
|
+ return ResultContent.buildFail(String.format("%s指令信息不存在", messageId));
|
|
|
+ }
|
|
|
+ if (operationMessage.getIsReceive() != null && operationMessage.getIsReceive()) {
|
|
|
+ return ResultContent.buildFail("该指令已响应");
|
|
|
+ }
|
|
|
+ operationMessage.setIsReceive(Boolean.TRUE);
|
|
|
+ operationMessage.setReceiveTime(System.currentTimeMillis());
|
|
|
+ operationMessage.setReceiveTimeStr(DateUtils.paresTime(System.currentTimeMillis(), DateUtils.FORMAT_LONG));
|
|
|
+
|
|
|
+ operationMessageDao.save(operationMessage);
|
|
|
+ return ResultContent.buildSuccess();
|
|
|
+ }
|
|
|
+
|
|
|
+ public OperationMessageModel toModel(OperationMessage entity) {
|
|
|
+ OperationMessageModel model = new OperationMessageModel();
|
|
|
+ if (ObjectUtils.isNotEmpty(entity)) {
|
|
|
+ BeanUtils.copyProperties(entity, model);
|
|
|
+ model.setDeviceInfo(deviceInfoService.toModel(entity.getDeviceInfo()));
|
|
|
+ model.setGateWayInfo(gateWayInfoService.toModel(entity.getGateWayInfo()));
|
|
|
+ }
|
|
|
+ return model;
|
|
|
+ }
|
|
|
+
|
|
|
+ public OperationMessageResultModel toModel(OperationMessageResult entity) {
|
|
|
+ OperationMessageResultModel model = null;
|
|
|
+ if (ObjectUtils.isNotEmpty(entity)) {
|
|
|
+ model = new OperationMessageResultModel();
|
|
|
+
|
|
|
+ }
|
|
|
+ return model;
|
|
|
+ }
|
|
|
+
|
|
|
+}
|