|
|
@@ -3,6 +3,7 @@ package com.zswl.dataservice.service.artemis;
|
|
|
import cn.hutool.core.date.StopWatch;
|
|
|
import cn.hutool.json.JSONObject;
|
|
|
import cn.hutool.json.JSONUtil;
|
|
|
+import com.github.microservice.models.hxz.base.HxzBaseResult;
|
|
|
import com.google.gson.JsonObject;
|
|
|
import com.zswl.dataservice.dao.iot.IotMainDao;
|
|
|
import com.zswl.dataservice.dao.mqtt.DeviceInfoDao;
|
|
|
@@ -22,6 +23,7 @@ import com.zswl.dataservice.model.artemis.OperationMessageResultSearch;
|
|
|
import com.zswl.dataservice.model.artemis.OperationMessageSearchParam;
|
|
|
import com.zswl.dataservice.model.mqtt.SendMessageModel;
|
|
|
import com.zswl.dataservice.service.base.SuperService;
|
|
|
+import com.zswl.dataservice.service.iot.IotDataVerifyService;
|
|
|
import com.zswl.dataservice.service.mqtt.DeviceInfoService;
|
|
|
import com.zswl.dataservice.service.mqtt.GateWayInfoService;
|
|
|
import com.zswl.dataservice.service.payment.HxzService;
|
|
|
@@ -102,6 +104,9 @@ public class OperationMessageService {
|
|
|
@Autowired
|
|
|
ApiRequestService apiRequestService;
|
|
|
|
|
|
+ @Autowired
|
|
|
+ IotDataVerifyService iotDataVerifyService;
|
|
|
+
|
|
|
/**
|
|
|
* 发送指令
|
|
|
*
|
|
|
@@ -292,6 +297,8 @@ public class OperationMessageService {
|
|
|
entity.setTime(DateUtils.paresTime(System.currentTimeMillis(), DateUtils.patternyyyySSS));
|
|
|
// 消息的过期时间
|
|
|
entity.setTtl(new Date(System.currentTimeMillis() + ttlMill));
|
|
|
+ boolean isTimeOut = entity.getIsTimeOut();
|
|
|
+ isTimeOut = true;
|
|
|
if (entity.getEvent().equals("ping")) {
|
|
|
Date da = new Date(System.currentTimeMillis() + 10 * 60 * 1000);
|
|
|
entity.setTtl(da);
|
|
|
@@ -299,8 +306,6 @@ public class OperationMessageService {
|
|
|
if (entity.getIsTimeOut()) {
|
|
|
entity.setHandleMsg("超时不处理");
|
|
|
}
|
|
|
- boolean isTimeOut = entity.getIsTimeOut();
|
|
|
- isTimeOut = true;
|
|
|
// TODO
|
|
|
if (isTimeOut) {
|
|
|
// 未超时,处理消息
|
|
|
@@ -322,13 +327,11 @@ public class OperationMessageService {
|
|
|
String event = entity.getEvent();
|
|
|
JSONObject json = (JSONObject) entity.getData();
|
|
|
if (json.containsKey("data")) {
|
|
|
- Object result = null;
|
|
|
StopWatch stopWatch = new StopWatch();
|
|
|
stopWatch.start();
|
|
|
|
|
|
boolean isHandleSuccess = true;
|
|
|
String handleMsg = "处理成功";
|
|
|
-
|
|
|
// 业务处理的消息内容
|
|
|
JSONObject requestData = (JSONObject) json.get("data");
|
|
|
requestData.put("mqttDataId", entity.getDataId());
|
|
|
@@ -349,8 +352,11 @@ public class OperationMessageService {
|
|
|
} else {
|
|
|
isHandleSuccess = false;
|
|
|
handleMsg = "无对应物模型事件处理";
|
|
|
- // 返回结果
|
|
|
- entity.setResultData(result);
|
|
|
+ }
|
|
|
+ // 如果是ping,则平台也会处理
|
|
|
+ if (event != null && event.equals("ping")) {
|
|
|
+ pingHandler(entity, requestData);
|
|
|
+ } else {
|
|
|
// 业务处理失败
|
|
|
entity.setIsHandleSuccess(isHandleSuccess);
|
|
|
entity.setHandleMsg(handleMsg);
|
|
|
@@ -358,10 +364,6 @@ public class OperationMessageService {
|
|
|
entity.setReTimeStr(DateUtils.paresTime(System.currentTimeMillis(), DateUtils.patternyyyySSS));
|
|
|
operationMessageDao.save(entity);
|
|
|
}
|
|
|
- // 如果是ping,则平台也会处理
|
|
|
- if (event != null && event.equals("ping")) {
|
|
|
- pingHandler(entity, requestData);
|
|
|
- }
|
|
|
}
|
|
|
return ResultContent.buildSuccess();
|
|
|
}
|
|
|
@@ -439,7 +441,7 @@ public class OperationMessageService {
|
|
|
* @param data
|
|
|
* @param iotMain
|
|
|
*/
|
|
|
- public void executeOperationMessage(OperationMessage entity, Object data, IotMain iotMain) {
|
|
|
+ public void executeOperationMessage(OperationMessage entity, JSONObject data, IotMain iotMain) {
|
|
|
CompletableFuture.runAsync(() -> {
|
|
|
OperationMessageResult messageResult = new OperationMessageResult();
|
|
|
messageResult.setOperationMessage(entity);
|
|
|
@@ -451,65 +453,89 @@ public class OperationMessageService {
|
|
|
|
|
|
boolean isHandleSuccess = true;
|
|
|
String handleMsg = "处理成功";
|
|
|
- Long handlerTime = System.currentTimeMillis();
|
|
|
|
|
|
+ Long handlerTime = System.currentTimeMillis();
|
|
|
+ HxzBaseResult handleError = new HxzBaseResult();
|
|
|
+ // 是否需要返回
|
|
|
+ Boolean isNeedReplay = Boolean.TRUE;
|
|
|
String remoteUrl = iotMain.getRemoteUrl();
|
|
|
if (StringUtils.isEmpty(remoteUrl)) {
|
|
|
isHandleSuccess = false;
|
|
|
+ isNeedReplay = Boolean.TRUE;
|
|
|
handleMsg = "业务处理地址为空";
|
|
|
} else {
|
|
|
- org.springframework.util.StopWatch stopWatch = new org.springframework.util.StopWatch();
|
|
|
- stopWatch.start();
|
|
|
-
|
|
|
- APIResponseModel apiResponseModel = apiRequestService.requestAPI(remoteUrl, data);
|
|
|
- messageResult.setResultData(apiResponseModel);
|
|
|
- if (apiResponseModel.isSuccess()) {
|
|
|
- // 处理成功
|
|
|
- String content = apiResponseModel.getContent();
|
|
|
- entity.setResultData(content);
|
|
|
- if (iotMain.getIsReturnData() != null && iotMain.getIsReturnData()) {
|
|
|
- // 返回
|
|
|
- JSONObject jsonObject = new JSONObject();
|
|
|
- jsonObject.put("id", entity.getDataId());
|
|
|
-
|
|
|
+ // 验证参数
|
|
|
+ ResultContent verifyContent = iotDataVerifyService.verifyIotParam(iotMain, data);
|
|
|
+ if (verifyContent.isSuccess()) {
|
|
|
+ // 参数验证成功
|
|
|
+ org.springframework.util.StopWatch stopWatch = new org.springframework.util.StopWatch();
|
|
|
+ stopWatch.start();
|
|
|
+ APIResponseModel apiResponseModel = apiRequestService.requestAPI(remoteUrl, data);
|
|
|
+ messageResult.setResultData(apiResponseModel);
|
|
|
+ if (apiResponseModel.isSuccess()) {
|
|
|
+ // 处理成功
|
|
|
+ String content = apiResponseModel.getContent();
|
|
|
JSONObject object = new JSONObject();
|
|
|
if (ObjectUtils.isNotEmpty(content)) {
|
|
|
object = JSONUtil.parseObj(content);
|
|
|
}
|
|
|
- jsonObject.put("data", object);
|
|
|
- jsonObject.put("time", System.currentTimeMillis());
|
|
|
- jsonObject.put("ttl", entity.getTtlTime());
|
|
|
- jsonObject.put("event", entity.getEvent());
|
|
|
- jsonObject.put("deviceId", entity.getDeviceId());
|
|
|
- jsonObject.put("gateWayId", entity.getGateWayId());
|
|
|
-
|
|
|
- String reTopic = String.format("%s/reply", entity.getTopic());
|
|
|
- String reMsg = "响应成功";
|
|
|
- Boolean reIsSuccess = Boolean.TRUE;
|
|
|
- try {
|
|
|
- mqClient.sendObject(reTopic, JSONUtil.toJsonStr(jsonObject), entity.getDataId());
|
|
|
- } catch (Exception e) {
|
|
|
- e.printStackTrace();
|
|
|
- reIsSuccess = Boolean.FALSE;
|
|
|
- reMsg = "mqtt响应出错:" + e.getMessage();
|
|
|
+ messageResult.setResultData(content);
|
|
|
+ messageResult.setReplayData(object);
|
|
|
+ isHandleSuccess = true;
|
|
|
+ if (iotMain.getIsReturnData() != null && iotMain.getIsReturnData()) {
|
|
|
+ isNeedReplay = true;
|
|
|
+ } else {
|
|
|
+ isNeedReplay = false;
|
|
|
+ messageResult.setIsResult(Boolean.TRUE);
|
|
|
+ messageResult.setReIsSuccess(Boolean.FALSE);
|
|
|
+ messageResult.setReMsg("无需返回");
|
|
|
}
|
|
|
- messageResult.setIsResult(Boolean.TRUE);
|
|
|
- messageResult.setReIsSuccess(reIsSuccess);
|
|
|
- messageResult.setReMsg(reMsg);
|
|
|
- messageResult.setReTime(System.currentTimeMillis());
|
|
|
- messageResult.setReTimeStr(DateUtils.paresTime(System.currentTimeMillis(), DateUtils.patternyyyySSS));
|
|
|
- messageResult.setReTopic(reTopic);
|
|
|
} else {
|
|
|
- messageResult.setIsResult(Boolean.TRUE);
|
|
|
- messageResult.setReIsSuccess(Boolean.FALSE);
|
|
|
- messageResult.setReMsg("无需返回");
|
|
|
+ // 业务响应失败
|
|
|
+ isHandleSuccess = false;
|
|
|
+ handleMsg = apiResponseModel.getMsg();
|
|
|
}
|
|
|
+ stopWatch.stop();
|
|
|
+ messageResult.setUseTime(stopWatch.getTotalTimeMillis());
|
|
|
} else {
|
|
|
isHandleSuccess = false;
|
|
|
- handleMsg = apiResponseModel.getMsg();
|
|
|
+ isNeedReplay = Boolean.TRUE;
|
|
|
+ handleMsg = verifyContent.getMsg();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if(!isHandleSuccess) {
|
|
|
+ handleError.setFailed(handleMsg);
|
|
|
+ JSONObject replayData = JSONUtil.parseObj(handleError);
|
|
|
+ messageResult.setReplayData(replayData);
|
|
|
+ }
|
|
|
+
|
|
|
+ if (isNeedReplay) {
|
|
|
+ // 返回
|
|
|
+ JSONObject jsonObject = new JSONObject();
|
|
|
+ jsonObject.put("id", entity.getDataId());
|
|
|
+ jsonObject.put("data", messageResult.getReplayData());
|
|
|
+ jsonObject.put("time", System.currentTimeMillis());
|
|
|
+ jsonObject.put("ttl", entity.getTtlTime());
|
|
|
+ jsonObject.put("event", entity.getEvent());
|
|
|
+ jsonObject.put("deviceId", entity.getDeviceId());
|
|
|
+ jsonObject.put("gateWayId", entity.getGateWayId());
|
|
|
+ String reTopic = String.format("%s/reply", entity.getTopic());
|
|
|
+ String reMsg = "响应成功";
|
|
|
+ Boolean reIsSuccess = Boolean.TRUE;
|
|
|
+ try {
|
|
|
+ mqClient.sendObject(reTopic, JSONUtil.toJsonStr(jsonObject), entity.getDataId());
|
|
|
+ } catch (Exception e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ reIsSuccess = Boolean.FALSE;
|
|
|
+ reMsg = "mqtt响应出错:" + e.getMessage();
|
|
|
}
|
|
|
- stopWatch.stop();
|
|
|
- messageResult.setUseTime(stopWatch.getTotalTimeMillis());
|
|
|
+ messageResult.setIsResult(Boolean.TRUE);
|
|
|
+ messageResult.setReIsSuccess(reIsSuccess);
|
|
|
+ messageResult.setReMsg(reMsg);
|
|
|
+ messageResult.setReTime(System.currentTimeMillis());
|
|
|
+ messageResult.setReTimeStr(DateUtils.paresTime(System.currentTimeMillis(), DateUtils.patternyyyySSS));
|
|
|
+ messageResult.setReTopic(reTopic);
|
|
|
}
|
|
|
messageResult.setIsHandleSuccess(isHandleSuccess);
|
|
|
messageResult.setHandleMsg(handleMsg);
|