TRX 1 rok temu
rodzic
commit
e0cddc78ff
18 zmienionych plików z 577 dodań i 59 usunięć
  1. 3 0
      src/main/java/com/zswl/dataservice/auth/UserContextInterceptor.java
  2. 21 0
      src/main/java/com/zswl/dataservice/controller/hardware/OperationMessageController.java
  3. 1 0
      src/main/java/com/zswl/dataservice/dao/iot/IotMainDao.java
  4. 2 0
      src/main/java/com/zswl/dataservice/dao/mqtt/GateWay2DeviceDao.java
  5. 17 0
      src/main/java/com/zswl/dataservice/dao/mqtt/OperationMessageResultDao.java
  6. 18 0
      src/main/java/com/zswl/dataservice/dao/mqtt/extend/OperationMessageResultDaoExtend.java
  7. 82 0
      src/main/java/com/zswl/dataservice/dao/mqtt/impl/OperationMessageResultDaoImpl.java
  8. 3 0
      src/main/java/com/zswl/dataservice/domain/iot/IotMain.java
  9. 5 4
      src/main/java/com/zswl/dataservice/domain/mqtt/OperationMessage.java
  10. 19 7
      src/main/java/com/zswl/dataservice/domain/mqtt/OperationMessageResult.java
  11. 30 0
      src/main/java/com/zswl/dataservice/httpRequest/ApiRequestService.java
  12. 9 0
      src/main/java/com/zswl/dataservice/httpRequest/apiConf/APIResponseModel.java
  13. 79 0
      src/main/java/com/zswl/dataservice/model/artemis/OperationMessageResultModel.java
  14. 25 0
      src/main/java/com/zswl/dataservice/model/artemis/OperationMessageResultParam.java
  15. 29 0
      src/main/java/com/zswl/dataservice/model/artemis/OperationMessageResultSearch.java
  16. 220 46
      src/main/java/com/zswl/dataservice/service/artemis/OperationMessageService.java
  17. 13 2
      src/main/java/com/zswl/dataservice/service/iot/IotServiceImpl.java
  18. 1 0
      src/main/java/com/zswl/dataservice/service/mqtt/GateWayInfoService.java

+ 3 - 0
src/main/java/com/zswl/dataservice/auth/UserContextInterceptor.java

@@ -54,6 +54,9 @@ public class UserContextInterceptor implements HandlerInterceptor {
                 JSONObject data = new JSONObject();
                 data.put("state", "Fail");
                 data.put("message", ue.getMessage());
+                JSONObject exception = new JSONObject();
+                exception.put("type", "AuthenticationCredentialsNotFoundException");
+                data.put("exception", exception);
                 response.getWriter().write(data.toString());
                 response.getWriter().flush();
                 response.getWriter().close();

+ 21 - 0
src/main/java/com/zswl/dataservice/controller/hardware/OperationMessageController.java

@@ -1,6 +1,8 @@
 package com.zswl.dataservice.controller.hardware;
 
 import com.zswl.dataservice.model.artemis.OperationMessageModel;
+import com.zswl.dataservice.model.artemis.OperationMessageResultModel;
+import com.zswl.dataservice.model.artemis.OperationMessageResultSearch;
 import com.zswl.dataservice.model.artemis.OperationMessageSearchParam;
 import com.zswl.dataservice.service.mqtt.DeviceInfoService;
 import com.zswl.dataservice.service.artemis.OperationMessageService;
@@ -15,6 +17,7 @@ import org.springframework.data.web.PageableDefault;
 import org.springframework.validation.annotation.Validated;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RequestMethod;
+import org.springframework.web.bind.annotation.RequestParam;
 import org.springframework.web.bind.annotation.RestController;
 
 /**
@@ -47,4 +50,22 @@ public class OperationMessageController {
     public ResultContent<Page<OperationMessageModel>> page(@Parameter(hidden = true) @PageableDefault(page = 0, size = 10) Pageable pageable, @Parameter(required = false) OperationMessageSearchParam param) {
         return operationMessageService.page(pageable, param);
     }
+
+    @Operation(summary = "日志列表-分页查询")
+    @RequestMapping(value = {"pageMessage"}, method = {RequestMethod.POST})
+    public ResultContent<Page<OperationMessageResultModel>> pageMessage(
+            @Parameter(hidden = true) @PageableDefault(page = 0, size = 10) Pageable pageable,
+            @Parameter(required = false) OperationMessageResultSearch param) {
+        return operationMessageService.pageMessage(pageable, param);
+    }
+
+    @Operation(summary = "得到设备日志详情")
+    @RequestMapping(value = "getMessageById", method = {RequestMethod.GET})
+    public ResultContent<OperationMessageResultModel> getMessageById(
+            @Parameter(name = "id", description = "数据ID", example = "")
+            @RequestParam("id") String id) {
+        return operationMessageService.getMessageById(id);
+    }
+
+
 }

+ 1 - 0
src/main/java/com/zswl/dataservice/dao/iot/IotMainDao.java

@@ -26,4 +26,5 @@ public interface IotMainDao extends MongoDao<IotMain>, IotMainDaoExtend {
 
     List<IotMain> findByIotTemplateAndFunctionTypeOrderByCreateTimeAsc(IotTemplate iotTemplate, FunctionType functionType);
 
+    List<IotMain> findByRealIotTopicAndFunctionTypeOrderByCreateTimeAsc(String realIotTopic, FunctionType functionType);
 }

+ 2 - 0
src/main/java/com/zswl/dataservice/dao/mqtt/GateWay2DeviceDao.java

@@ -15,4 +15,6 @@ public interface GateWay2DeviceDao extends MongoDao<GateWay2Device> {
 
     // 根据设备查询绑定关系
     List<GateWay2Device> findByDeviceInfo(DeviceInfo deviceInfo);
+
+    GateWay2Device findTopByDeviceInfoOrderByUpdateTimeDesc(DeviceInfo deviceInfo);
 }

+ 17 - 0
src/main/java/com/zswl/dataservice/dao/mqtt/OperationMessageResultDao.java

@@ -0,0 +1,17 @@
+package com.zswl.dataservice.dao.mqtt;
+
+import com.zswl.dataservice.dao.MongoDao;
+import com.zswl.dataservice.dao.mqtt.extend.OperationMessageDaoExtend;
+import com.zswl.dataservice.dao.mqtt.extend.OperationMessageResultDaoExtend;
+import com.zswl.dataservice.domain.mqtt.OperationMessage;
+import com.zswl.dataservice.domain.mqtt.OperationMessageResult;
+
+/**
+ * @author TRX
+ * @date 2024/3/21
+ */
+public interface OperationMessageResultDao extends MongoDao<OperationMessageResult>, OperationMessageResultDaoExtend {
+
+    OperationMessageResult findTopById(String id);
+
+}

+ 18 - 0
src/main/java/com/zswl/dataservice/dao/mqtt/extend/OperationMessageResultDaoExtend.java

@@ -0,0 +1,18 @@
+package com.zswl.dataservice.dao.mqtt.extend;
+
+import com.zswl.dataservice.domain.mqtt.OperationMessage;
+import com.zswl.dataservice.domain.mqtt.OperationMessageResult;
+import com.zswl.dataservice.model.artemis.OperationMessageResultSearch;
+import com.zswl.dataservice.model.artemis.OperationMessageSearchParam;
+import org.springframework.data.domain.Page;
+import org.springframework.data.domain.Pageable;
+
+/**
+ * @Author TRX
+ * @CreateDate: 2023/7/7
+ * @Version: 1.0
+ */
+public interface OperationMessageResultDaoExtend {
+    Page<OperationMessageResult> page(Pageable pageable, OperationMessageResultSearch param);
+
+}

+ 82 - 0
src/main/java/com/zswl/dataservice/dao/mqtt/impl/OperationMessageResultDaoImpl.java

@@ -0,0 +1,82 @@
+package com.zswl.dataservice.dao.mqtt.impl;
+
+import com.zswl.dataservice.dao.base.BaseImpl;
+import com.zswl.dataservice.dao.mqtt.extend.OperationMessageDaoExtend;
+import com.zswl.dataservice.dao.mqtt.extend.OperationMessageResultDaoExtend;
+import com.zswl.dataservice.domain.mqtt.OperationMessage;
+import com.zswl.dataservice.domain.mqtt.OperationMessageResult;
+import com.zswl.dataservice.helper.DBHelper;
+import com.zswl.dataservice.model.artemis.OperationMessageResultSearch;
+import com.zswl.dataservice.model.artemis.OperationMessageSearchParam;
+import com.zswl.dataservice.utils.CommonUtil;
+import com.zswl.dataservice.utils.TokenUtil;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.domain.Page;
+import org.springframework.data.domain.Pageable;
+import org.springframework.data.domain.Sort;
+import org.springframework.data.mongodb.core.FindAndModifyOptions;
+import org.springframework.data.mongodb.core.MongoTemplate;
+import org.springframework.data.mongodb.core.query.Criteria;
+import org.springframework.data.mongodb.core.query.Query;
+import org.springframework.data.mongodb.core.query.Update;
+import org.springframework.util.CollectionUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Pattern;
+
+/**
+ * @Author TRX
+ * @CreateDate: 2023/4/12
+ * @Version: 1.0
+ */
+public class OperationMessageResultDaoImpl extends BaseImpl implements OperationMessageResultDaoExtend {
+
+    private static final Logger log = LoggerFactory.getLogger(OperationMessageResultDaoImpl.class);
+    @Autowired
+    private MongoTemplate mongoTemplate;
+
+    @Autowired
+    private DBHelper dbHelper;
+
+    @Override
+    public Page<OperationMessageResult> page(Pageable pageable, OperationMessageResultSearch param) {
+        Criteria criteria = new Criteria();
+
+        if (StringUtils.isNotEmpty(param.getDeviceId())) {
+            criteria.and("deviceId").is(param.getDeviceId());
+        }
+
+        if (StringUtils.isNotEmpty(param.getEpId())) {
+            criteria.and("epId").is(param.getEpId());
+        }
+
+        if (StringUtils.isNotEmpty(param.getGateWayId())) {
+            criteria.and("gateWayId").is(param.getGateWayId());
+        }
+
+        // 时间范围
+        if (!CommonUtil.longIsEmpty(param.getStartTime()) && !CommonUtil.longIsEmpty(param.getEndTime())) {
+            criteria.and("createTime").gte(param.getStartTime()).lte(param.getEndTime());
+        }
+
+        // 模糊搜索
+        List<Criteria> criterias = new ArrayList<>();
+        if (StringUtils.isNotEmpty(param.getRealIotTopic())) {
+            Pattern pattern = Pattern.compile("^.*" + param.getRealIotTopic() + ".*$");
+            criterias.add(Criteria.where("realIotTopic").is(pattern));
+        }
+        if (!CollectionUtils.isEmpty(criterias)) {
+            criteria.andOperator(criterias.toArray(new Criteria[]{}));
+        }
+        criteria.and("isDelete").is(Boolean.FALSE);
+
+        Sort sort = buildSort(param);
+        Query query = Query.query(criteria);
+        query.with(sort);
+        return dbHelper.pages(query, pageable, OperationMessageResult.class);
+    }
+}

+ 3 - 0
src/main/java/com/zswl/dataservice/domain/iot/IotMain.java

@@ -46,6 +46,9 @@ public class IotMain extends SuperEntity {
     @DBRef(lazy = true)
     IotTemplate iotTemplate;
 
+    @Schema(description = "是否返回数据")
+    private Boolean isReturnData = Boolean.TRUE;
+
     //---------------------------------属性类型的  字段  start ------------------
     @Schema(description = "数据类型,number、Boolean、String 等")
     private DataType dataType;

+ 5 - 4
src/main/java/com/zswl/dataservice/domain/mqtt/OperationMessage.java

@@ -45,7 +45,7 @@ public class OperationMessage extends SuperEntity {
     @Schema(description = "ttl时间(毫秒数)")
     private Long ttlTime;
 
-    @Schema(description = "发送时间")
+    @Schema(description = "发送时间,消息里的时间")
     private Long sendTime;
 
     @Schema(description = "接收的消息是否超时")
@@ -71,12 +71,13 @@ public class OperationMessage extends SuperEntity {
     @Schema(description = "网关ID")
     private String gateWayId;
 
-    @Schema(description = "消息创建时间")
-    private String time;
-
     @Schema(description = "mqtt消息类型: 发送 接收")
     OperationType operationType;
 
+    @Schema(description = "消息创建时间,服务器的时间")
+    private String time;
+
+    @Schema(description = "消息的过期时间")
     @Indexed(expireAfterSeconds = 0)
     private Date ttl;
 

+ 19 - 7
src/main/java/com/zswl/dataservice/domain/mqtt/OperationMessageResult.java

@@ -19,33 +19,45 @@ import org.springframework.data.mongodb.core.mapping.Document;
 @AllArgsConstructor
 public class OperationMessageResult extends SuperEntity {
 
-    @Schema(description = "消息消息")
+    @Schema(description = "消息内容")
     @DBRef(lazy = true)
     private OperationMessage operationMessage;
 
     @Schema(description = "关联的事件")
     private IotMain iotMain;
 
+    @Schema(description = "设备ID")
+    private String deviceId;
+
+    @Schema(description = "分组ID")
+    private String projectCode;
+
+    @Schema(description = "网关ID")
+    private String gateWayId;
+
+    @Schema(description = "topic")
+    private String realIotTopic;
+
     //-------------------业务处理结果 start----------------
 
     @Schema(description = "是否处理成功")
     private Boolean isHandleSuccess;
 
+    @Schema(description = "处理的响应返回码")
+    private Integer resultCode = 200;
+
     @Schema(description = "业务结果数据")
     private Object resultData;
 
     @Schema(description = "处理结果")
     private String handleMsg;
 
-    @Schema(description = "业务处理的bean")
-    private String beanName;
-
-    @Schema(description = "业务处理的方法")
-    private String methodName;
-
     @Schema(description = "处理时间")
     private Long handlerTime;
 
+    @Schema(description = "请求处理时间,毫秒")
+    private Long useTime = 0L;
+
     //--------------------返回数据 start(就是把结果发送给响应网关)-------------
     @Schema(description = "是否响应成功")
     private Boolean isResult = Boolean.FALSE;

+ 30 - 0
src/main/java/com/zswl/dataservice/httpRequest/ApiRequestService.java

@@ -11,6 +11,7 @@ import com.zswl.dataservice.service.base.SuperService;
 import com.zswl.dataservice.service.other.RequestInfoService;
 import com.zswl.dataservice.utils.net.apache.HttpClientUtil;
 import com.zswl.dataservice.utils.net.apache.HttpModel;
+import com.zswl.dataservice.utils.net.apache.MethodType;
 import com.zswl.dataservice.utils.net.apache.ResponseModel;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.ObjectUtils;
@@ -78,4 +79,33 @@ public class ApiRequestService extends SuperService {
         return responseModel;
     }
 
+
+    public APIResponseModel requestAPI(String url, Object data) {
+        APIResponseModel responseModel = new APIResponseModel();
+        try {
+            ApiConfParam apiConfParam = new ApiConfParam();
+            apiConfParam.setApiName(url);
+            apiConfParam.setMethodType(MethodType.Json);
+            StopWatch stopWatch = new StopWatch();
+            stopWatch.start();
+            ResponseModel request = HttpClientUtil.request(HttpModel.builder()
+                    .url(url).method(apiConfParam.getMethodType()).charset("utf-8").body(data).build());
+            if (request.getCode() == 200) {
+                responseModel = BeanUtil.copyProperties(request.getBody(), APIResponseModel.class);
+            } else {
+                responseModel = BeanUtil.copyProperties(request.getBody(), APIResponseModel.class);
+            }
+            responseModel.setParam(apiConfParam);
+            stopWatch.stop();
+            responseModel.setMillis(stopWatch.getTotalTimeMillis());
+        } catch (Exception e) {
+            e.printStackTrace();
+            responseModel.setIsFailed(String.format("请求出错:%s", e.getMessage()));
+            return responseModel;
+        }
+        // 记录请求日志
+        requestInfoService.addRequestInfo(data, responseModel);
+        return responseModel;
+    }
+
 }

+ 9 - 0
src/main/java/com/zswl/dataservice/httpRequest/apiConf/APIResponseModel.java

@@ -16,12 +16,21 @@ import org.apache.commons.lang3.StringUtils;
 @AllArgsConstructor
 @NoArgsConstructor
 public class APIResponseModel {
+
+    @Schema(description = "是否错误")
     private boolean failed;
+
+    @Schema(description = "是否成功")
     private boolean success;
+
+    @Schema(description = "返回消息")
     private String msg;
+
     private String state;
+
     private String content;
 
+    @Schema(description = "访问的URL")
     private ApiConfParam param;
 
     private Object data;

+ 79 - 0
src/main/java/com/zswl/dataservice/model/artemis/OperationMessageResultModel.java

@@ -0,0 +1,79 @@
+package com.zswl.dataservice.model.artemis;
+
+import com.zswl.dataservice.domain.iot.IotMain;
+import com.zswl.dataservice.domain.mqtt.OperationMessage;
+import com.zswl.dataservice.model.baseParam.SuperModel;
+import io.swagger.v3.oas.annotations.media.Schema;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.springframework.data.mongodb.core.mapping.DBRef;
+
+/**
+ * @author TRX
+ * @date 2024/7/15
+ */
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+public class OperationMessageResultModel extends SuperModel {
+    @Schema(description = "设备ID")
+    private String deviceId;
+
+    @Schema(description = "分组ID")
+    private String projectCode;
+
+    @Schema(description = "网关ID")
+    private String gateWayId;
+
+    @Schema(description = "消息内容")
+    private OperationMessageModel operationMessage;
+
+    @Schema(description = "关联的事件")
+    private IotMain iotMain;
+
+    @Schema(description = "topic")
+    private String realIotTopic;
+
+    //-------------------业务处理结果 start----------------
+
+    @Schema(description = "是否处理成功")
+    private Boolean isHandleSuccess;
+
+    @Schema(description = "处理的响应返回码")
+    private Integer resultCode = 200;
+
+    @Schema(description = "业务结果数据")
+    private Object resultData;
+
+    @Schema(description = "处理结果")
+    private String handleMsg;
+
+    @Schema(description = "业务处理的bean")
+    private String beanName;
+
+    @Schema(description = "业务处理的方法")
+    private String methodName;
+
+    @Schema(description = "处理时间")
+    private Long handlerTime;
+
+    //--------------------返回数据 start(就是把结果发送给响应网关)-------------
+    @Schema(description = "是否响应成功")
+    private Boolean isResult = Boolean.FALSE;
+
+    @Schema(description = "返回的topic")
+    private String reTopic;
+
+    @Schema(description = "返回-处理时间")
+    private Long reTime;
+
+    @Schema(description = "返回-处理时间可阅读")
+    private String reTimeStr;
+
+    @Schema(description = "响应结果")
+    private String reMsg;
+
+    @Schema(description = "是否响应成功")
+    private Boolean reIsSuccess;
+}

+ 25 - 0
src/main/java/com/zswl/dataservice/model/artemis/OperationMessageResultParam.java

@@ -0,0 +1,25 @@
+package com.zswl.dataservice.model.artemis;
+
+import com.zswl.dataservice.model.baseParam.SuperParam;
+import io.swagger.v3.oas.annotations.media.Schema;
+import jakarta.validation.constraints.NotEmpty;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+/**
+ * @author TRX
+ * @date 2024/7/15
+ */
+@Data
+@AllArgsConstructor
+@NotEmpty
+public class OperationMessageResultParam extends SuperParam {
+    @Schema(description = "设备ID")
+    private String deviceId;
+
+    @Schema(description = "分组ID")
+    private String projectCode;
+
+    @Schema(description = "网关ID")
+    private String gateWayId;
+}

+ 29 - 0
src/main/java/com/zswl/dataservice/model/artemis/OperationMessageResultSearch.java

@@ -0,0 +1,29 @@
+package com.zswl.dataservice.model.artemis;
+
+import com.zswl.dataservice.model.baseParam.SuperSearchParam;
+import io.swagger.v3.oas.annotations.media.Schema;
+import jakarta.validation.constraints.NotEmpty;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * @author TRX
+ * @date 2024/7/15
+ */
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+public class OperationMessageResultSearch extends SuperSearchParam {
+    @Schema(description = "设备ID")
+    private String deviceId;
+
+    @Schema(description = "分组ID")
+    private String projectCode;
+
+    @Schema(description = "网关ID")
+    private String gateWayId;
+
+    @Schema(description = "topic")
+    private String realIotTopic;
+}

+ 220 - 46
src/main/java/com/zswl/dataservice/service/artemis/OperationMessageService.java

@@ -4,19 +4,28 @@ import cn.hutool.core.date.StopWatch;
 import cn.hutool.json.JSONObject;
 import cn.hutool.json.JSONUtil;
 import com.google.gson.JsonObject;
+import com.zswl.dataservice.dao.iot.IotMainDao;
 import com.zswl.dataservice.dao.mqtt.DeviceInfoDao;
 import com.zswl.dataservice.dao.mqtt.OperationMessageDao;
+import com.zswl.dataservice.dao.mqtt.OperationMessageResultDao;
 import com.zswl.dataservice.dao.other.ExecuteMethodInfoDao;
+import com.zswl.dataservice.domain.iot.IotMain;
 import com.zswl.dataservice.domain.mqtt.DeviceInfo;
 import com.zswl.dataservice.domain.mqtt.OperationMessage;
+import com.zswl.dataservice.domain.mqtt.OperationMessageResult;
 import com.zswl.dataservice.domain.other.ExecuteMethodInfo;
+import com.zswl.dataservice.httpRequest.ApiRequestService;
+import com.zswl.dataservice.httpRequest.apiConf.APIResponseModel;
 import com.zswl.dataservice.model.artemis.OperationMessageModel;
+import com.zswl.dataservice.model.artemis.OperationMessageResultModel;
+import com.zswl.dataservice.model.artemis.OperationMessageResultSearch;
 import com.zswl.dataservice.model.artemis.OperationMessageSearchParam;
 import com.zswl.dataservice.model.mqtt.SendMessageModel;
 import com.zswl.dataservice.service.base.SuperService;
 import com.zswl.dataservice.service.mqtt.DeviceInfoService;
 import com.zswl.dataservice.service.mqtt.GateWayInfoService;
 import com.zswl.dataservice.service.payment.HxzService;
+import com.zswl.dataservice.type.FunctionType;
 import com.zswl.dataservice.type.OperationType;
 import com.zswl.dataservice.utils.DateUtils;
 import com.zswl.dataservice.utils.TokenUtil;
@@ -42,6 +51,7 @@ import org.springframework.stereotype.Service;
 
 import java.lang.reflect.Method;
 import java.util.Date;
+import java.util.List;
 import java.util.UUID;
 
 /**
@@ -82,6 +92,15 @@ public class OperationMessageService {
     @Autowired
     private ApplicationContext applicationContext;
 
+    @Autowired
+    IotMainDao iotMainDao;
+
+    @Autowired
+    OperationMessageResultDao operationMessageResultDao;
+
+    @Autowired
+    ApiRequestService apiRequestService;
+
     /**
      * 发送指令
      *
@@ -179,13 +198,6 @@ public class OperationMessageService {
 //                log.warn("回复消息不处理");
                 return;
             }
-            log.info("Topic: {}", topicName);
-            OperationMessage operationMessage = new OperationMessage();
-            operationMessage.setMessageId(messageId);
-            operationMessage.setClientId(clientId);
-            operationMessage.setTopic(topicName);
-            operationMessage.setOperationType(OperationType.Sub);
-
             String messageClass = "";
             String msg = "";
             if (message instanceof ActiveMQBytesMessage) {
@@ -201,11 +213,16 @@ public class OperationMessageService {
                 TextMessage textMessage = (TextMessage) message;
                 msg = textMessage.getText();
             }
+            // 提取内容里面的数据
             cn.hutool.json.JSONObject jsonObject = JSONUtil.parseObj(msg);
             String id = jsonObject.getStr("id");
             Long time = jsonObject.getLong("time");
             Long ttl = jsonObject.getLong("ttl");
             String event = jsonObject.getStr("event");
+            if (event.equals("ping")) {
+                return;
+            }
+            log.info("Topic: {}", topicName);
             String gateWayId = jsonObject.getStr("gatewayId");
             String deviceId = jsonObject.getStr("deviceId");
             boolean isTimeOut = false;
@@ -218,6 +235,11 @@ public class OperationMessageService {
             jsonObject.set("timeStr", timeStr);
 
             // --------------------处理业务 start------------------
+            OperationMessage operationMessage = new OperationMessage();
+            operationMessage.setMessageId(messageId);
+            operationMessage.setClientId(clientId);
+            operationMessage.setTopic(topicName); // top名称
+            operationMessage.setOperationType(OperationType.Sub);
             operationMessage.setMessageClass(messageClass);
             operationMessage.setData(jsonObject);
             operationMessage.setTtlTime(ttl);
@@ -263,7 +285,9 @@ public class OperationMessageService {
      * @return
      */
     public ResultContent addOperationMessage(OperationMessage entity) {
+        // 服务器接收消息的时间
         entity.setTime(DateUtils.paresTime(System.currentTimeMillis(), DateUtils.patternyyyySSS));
+        // 消息的过期时间
         entity.setTtl(new Date(System.currentTimeMillis() + ttlMill));
         if (entity.getEvent().equals("ping")) {
             Date da = new Date(System.currentTimeMillis() + 10 * 60 * 1000);
@@ -298,8 +322,11 @@ public class OperationMessageService {
             Object result = null;
             StopWatch stopWatch = new StopWatch();
             stopWatch.start();
+
             boolean isHandleSuccess = true;
             String handleMsg = "处理成功";
+
+            // 业务处理的消息内容
             JSONObject jsonObject1 = (JSONObject) json.get("data");
             jsonObject1.put("mqttDataId", entity.getDataId());
             jsonObject1.put("GateWayId", entity.getGateWayId());
@@ -309,54 +336,165 @@ public class OperationMessageService {
                 jsonObject1.put("DeviceId", DeviceId);
             }
             entity.setDeviceId(DeviceId);
-            try {
-                ExecuteMethodInfo executeMethodInfo = executeMethodInfoDao.findTopByEvent(event);
-                if (ObjectUtils.isNotEmpty(executeMethodInfo)) {
-                    String dataStr = JSONUtil.toJsonStr(jsonObject1);
-
-                    String beanName = executeMethodInfo.getBeanName();
-                    String methodName = executeMethodInfo.getMethodName();
-                    Class c = applicationContext.getBean(beanName).getClass();
-                    SuperService t = (SuperService) applicationContext.getBean(beanName);
-                    Method method = c.getMethod(methodName, String.class);
-                    ResultContent<Object> resultContent = (ResultContent<Object>) method.invoke(t, dataStr);
-                    if (resultContent.isSuccess()) {
-                        result = resultContent.getContent();
+
+            // 查询有对应事件的设备
+            List<IotMain> events = iotMainDao.findByRealIotTopicAndFunctionTypeOrderByCreateTimeAsc(entity.getTopic(), FunctionType.Event);
+            if (ObjectUtils.isNotEmpty(events)) {
+                String dataStr = JSONUtil.toJsonStr(jsonObject1);
+                for (IotMain iotMain : events) {
+                    executeOperationMessage(entity, dataStr, iotMain);
+                }
+            } else {
+                isHandleSuccess = false;
+                handleMsg = "无对应物模型事件处理";
+                // 返回结果
+                entity.setResultData(result);
+                // 业务处理失败
+                entity.setIsHandleSuccess(isHandleSuccess);
+                entity.setHandleMsg(handleMsg);
+                entity.setReTime(System.currentTimeMillis());
+                entity.setReTimeStr(DateUtils.paresTime(System.currentTimeMillis(), DateUtils.patternyyyySSS));
+                operationMessageDao.save(entity);
+            }
+
+            if (event.equals("ping")) {
+                // 查询业务处理端信息
+                try {
+                    ExecuteMethodInfo executeMethodInfo = executeMethodInfoDao.findTopByEvent(event);
+                    if (ObjectUtils.isNotEmpty(executeMethodInfo)) {
+                        String dataStr = JSONUtil.toJsonStr(jsonObject1);
+
+                        String beanName = executeMethodInfo.getBeanName();
+                        String methodName = executeMethodInfo.getMethodName();
+                        Class c = applicationContext.getBean(beanName).getClass();
+                        SuperService t = (SuperService) applicationContext.getBean(beanName);
+                        Method method = c.getMethod(methodName, String.class);
+                        ResultContent<Object> resultContent = (ResultContent<Object>) method.invoke(t, dataStr);
+                        if (resultContent.isSuccess()) {
+                            isHandleSuccess = true;
+                            handleMsg = "处理成功";
+                            result = resultContent.getContent();
+                        } else {
+                            isHandleSuccess = false;
+                            handleMsg = resultContent.getMsg();
+                        }
+                        entity.setBeanName(beanName);
+                        entity.setMethodName(methodName);
                     } else {
                         isHandleSuccess = false;
-                        handleMsg = resultContent.getMsg();
+                        handleMsg = "消息处理方法未找到";
                     }
-                    entity.setBeanName(beanName);
-                    entity.setMethodName(methodName);
-                } else {
+                } catch (Exception e) {
+                    e.printStackTrace();
                     isHandleSuccess = false;
-                    handleMsg = "消息处理方法未找到";
+                    handleMsg = String.format("业务处理出错:%S", e.getMessage());
+                }
+                stopWatch.stop();
+                entity.setHandlerTime(stopWatch.getLastTaskTimeMillis());
+                // 返回结果
+                entity.setResultData(result);
+                // 业务处理失败
+                entity.setIsHandleSuccess(isHandleSuccess);
+                entity.setHandleMsg(handleMsg);
+                log.info("消息处理结果: {} {}", isHandleSuccess, handleMsg);
+                if (isHandleSuccess) {
+                    // 处理成功,返回响应
+                    responseMessage(entity);
+                } else {
+                    // 处理失败,记录数据
+                    entity.setReTime(System.currentTimeMillis());
+                    entity.setReTimeStr(DateUtils.paresTime(System.currentTimeMillis(), DateUtils.patternyyyySSS));
+                    operationMessageDao.save(entity);
+                    return ResultContent.buildFail(handleMsg);
                 }
-            } catch (Exception e) {
-                e.printStackTrace();
-                isHandleSuccess = false;
-                handleMsg = String.format("业务处理出错:%S", e.getMessage());
             }
-            stopWatch.stop();
-            entity.setHandlerTime(stopWatch.getLastTaskTimeMillis());
-            // 返回结果
-            entity.setResultData(result);
-            // 业务处理失败
-            entity.setIsHandleSuccess(isHandleSuccess);
-            entity.setHandleMsg(handleMsg);
-            log.info("消息处理结果: {} {}", isHandleSuccess, handleMsg);
-            if (isHandleSuccess) {
-                // 处理成功,返回响应
-                responseMessage(entity);
+        }
+        return ResultContent.buildSuccess();
+    }
+
+    /**
+     * 执行
+     *
+     * @param entity
+     * @param dataStr
+     * @param iotMain
+     */
+    public void executeOperationMessage(OperationMessage entity, String dataStr, IotMain iotMain) {
+        OperationMessageResult messageResult = new OperationMessageResult();
+        messageResult.setOperationMessage(entity);
+        messageResult.setIotMain(iotMain);
+        messageResult.setDeviceId(iotMain.getDeviceId());
+        messageResult.setProjectCode(iotMain.getProjectCode());
+        messageResult.setGateWayId(entity.getGateWayId());
+        messageResult.setRealIotTopic(iotMain.getRealIotTopic());
+
+        boolean isHandleSuccess = true;
+        String handleMsg = "";
+        Long handlerTime = System.currentTimeMillis();
+
+        String remoteUrl = iotMain.getRemoteUrl();
+        if (StringUtils.isEmpty(remoteUrl)) {
+            isHandleSuccess = false;
+            handleMsg = "";
+        } else {
+            org.springframework.util.StopWatch stopWatch = new org.springframework.util.StopWatch();
+            stopWatch.start();
+
+            APIResponseModel apiResponseModel = apiRequestService.requestAPI(remoteUrl, dataStr);
+            messageResult.setResultData(apiResponseModel);
+
+            if (apiResponseModel.isSuccess()) {
+                // 处理成功
+                String content = apiResponseModel.getContent();
+                entity.setResultData(content);
+                if (iotMain.getIsReturnData() != null && iotMain.getIsReturnData()) {
+                    // 返回
+                    JSONObject jsonObject = new JSONObject();
+                    jsonObject.put("id", entity.getDataId());
+
+                    JSONObject object = new JSONObject();
+                    if (ObjectUtils.isNotEmpty(content)) {
+                        object = JSONUtil.parseObj(content);
+                    }
+                    jsonObject.put("data", object);
+                    jsonObject.put("time", System.currentTimeMillis());
+                    jsonObject.put("ttl", entity.getTtlTime());
+                    jsonObject.put("event", entity.getEvent());
+                    jsonObject.put("deviceId", entity.getDeviceId());
+                    jsonObject.put("gateWayId", entity.getGateWayId());
+
+                    String reTopic = String.format("%s/reply", entity.getTopic());
+                    String reMsg = "响应成功";
+                    Boolean reIsSuccess = Boolean.TRUE;
+                    try {
+                        mqClient.sendObject(reTopic, JSONUtil.toJsonStr(jsonObject), entity.getDataId());
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                        reIsSuccess = Boolean.FALSE;
+                        reMsg = "mqtt响应出错:" + e.getMessage();
+                    }
+                    messageResult.setIsResult(Boolean.TRUE);
+                    messageResult.setReIsSuccess(reIsSuccess);
+                    messageResult.setReMsg(reMsg);
+                    messageResult.setReTime(System.currentTimeMillis());
+                    messageResult.setReTimeStr(DateUtils.paresTime(System.currentTimeMillis(), DateUtils.patternyyyySSS));
+                    messageResult.setReTopic(reTopic);
+                } else {
+                    messageResult.setIsResult(Boolean.TRUE);
+                    messageResult.setReIsSuccess(Boolean.FALSE);
+                    messageResult.setReMsg("无需返回");
+                }
             } else {
-                // 处理失败,记录数据
-                entity.setReTime(System.currentTimeMillis());
-                entity.setReTimeStr(DateUtils.paresTime(System.currentTimeMillis(), DateUtils.patternyyyySSS));
-                operationMessageDao.save(entity);
-                return ResultContent.buildFail(handleMsg);
+                isHandleSuccess = false;
+                handleMsg = apiResponseModel.getMsg();
             }
+            stopWatch.stop();
+            messageResult.setUseTime(stopWatch.getTotalTimeMillis());
         }
-        return ResultContent.buildSuccess();
+        messageResult.setIsHandleSuccess(isHandleSuccess);
+        messageResult.setHandleMsg(handleMsg);
+        messageResult.setHandlerTime(handlerTime);
+        operationMessageResultDao.save(messageResult);
     }
 
     /**
@@ -411,6 +549,33 @@ public class OperationMessageService {
         return ResultContent.buildSuccess(PageEntityUtil.toPageModel(page, this::toModel));
     }
 
+    /**
+     * 设备消息列表日志
+     *
+     * @param pageable
+     * @param param
+     * @return
+     */
+    public ResultContent<Page<OperationMessageResultModel>> pageMessage(Pageable pageable, OperationMessageResultSearch param) {
+        Page<OperationMessageResult> page = operationMessageResultDao.page(pageable, param);
+        return ResultContent.buildSuccess(PageEntityUtil.toPageModel(page, this::toModel));
+    }
+
+    /**
+     * 根据数据ID查询数据详情
+     *
+     * @param id
+     * @return
+     */
+    public ResultContent<OperationMessageResultModel> getMessageById(String id) {
+        OperationMessageResult messageResult = operationMessageResultDao.findTopById(id);
+        if (ObjectUtils.isEmpty(messageResult)) {
+            return ResultContent.buildFail(String.format("数据ID不存在:%s", id));
+        }
+        OperationMessageResultModel model = toModel(messageResult);
+        return ResultContent.buildSuccess(model);
+    }
+
     /**
      * 标记消息已收到
      *
@@ -443,4 +608,13 @@ public class OperationMessageService {
         return model;
     }
 
+    public OperationMessageResultModel toModel(OperationMessageResult entity) {
+        OperationMessageResultModel model = null;
+        if (ObjectUtils.isNotEmpty(entity)) {
+            model = new OperationMessageResultModel();
+
+        }
+        return model;
+    }
+
 }

+ 13 - 2
src/main/java/com/zswl/dataservice/service/iot/IotServiceImpl.java

@@ -6,11 +6,13 @@ import com.zswl.dataservice.dao.iot.IotMainDao;
 import com.zswl.dataservice.dao.iot.IotTemplateDao;
 import com.zswl.dataservice.dao.iot.IotTopicDao;
 import com.zswl.dataservice.dao.mqtt.DeviceInfoDao;
+import com.zswl.dataservice.dao.mqtt.GateWay2DeviceDao;
 import com.zswl.dataservice.dataConfig.ResultMessage;
 import com.zswl.dataservice.domain.iot.IotMain;
 import com.zswl.dataservice.domain.iot.IotTemplate;
 import com.zswl.dataservice.domain.iot.IotTopic;
 import com.zswl.dataservice.domain.mqtt.DeviceInfo;
+import com.zswl.dataservice.domain.mqtt.GateWay2Device;
 import com.zswl.dataservice.model.baseParam.NameModel;
 import com.zswl.dataservice.model.iot.*;
 import com.zswl.dataservice.model.mqtt.DeviceInfoModel;
@@ -60,6 +62,7 @@ public class IotServiceImpl extends SuperService {
 
     @Autowired
     DeviceInfoDao deviceInfoDao;
+    private GateWay2DeviceDao gateWay2DeviceDao;
 
     //----------------------------- 模版 start----------------------------
 
@@ -372,9 +375,17 @@ public class IotServiceImpl extends SuperService {
             // 把 topic有占位符的换为具体的值
             String deviceId = iotMain.getDeviceId();
             String iotTopic = iotMain.getIotTopic();
-            String realIotTopic = "";
+            String realIotTopic = iotTopic;
             if (StringUtils.isNotEmpty(iotTopic)) {
-                realIotTopic = iotTopic.replaceAll(Pattern.quote("${deviceId}"), deviceId);
+                realIotTopic = realIotTopic.replaceAll(Pattern.quote("${deviceId}"), deviceId);
+            }
+            DeviceInfo deviceInfo = deviceInfoDao.findTopByDeviceId(deviceId);
+            if (ObjectUtils.isNotEmpty(deviceInfo)) {
+                GateWay2Device gateWay2Device = gateWay2DeviceDao.findTopByDeviceInfoOrderByUpdateTimeDesc(deviceInfo);
+                if (ObjectUtils.isNotEmpty(gateWay2Device)) {
+                    String gateWayId = gateWay2Device.getGateWayInfo().getGateWayId();
+                    realIotTopic = realIotTopic.replaceAll(Pattern.quote("${gateWayId}"), gateWayId);
+                }
             }
             iotMain.setRealIotTopic(realIotTopic);
         }

+ 1 - 0
src/main/java/com/zswl/dataservice/service/mqtt/GateWayInfoService.java

@@ -188,6 +188,7 @@ public class GateWayInfoService extends SuperService {
                 ResultContent<DeviceInfo> resultContent = deviceInfoService.addDeviceInfo(device);
                 DeviceInfo deviceInfo = resultContent.getContent();
 
+                // 设备可以绑定到多个网关,一个网关只能绑定设备一次
                 GateWay2Device gateWay2Device = gateWay2DeviceDao.findTopByGateWayInfoAndDeviceInfo(gateWayInfo, deviceInfo);
                 if (ObjectUtils.isEmpty(gateWay2Device)) {
                     gateWay2Device = new GateWay2Device();