|
|
@@ -1,15 +1,15 @@
|
|
|
package com.zhongshu.iot.server.core.service.iot;
|
|
|
|
|
|
import cn.hutool.json.JSONObject;
|
|
|
+import cn.hutool.json.JSONUtil;
|
|
|
+import com.github.microservice.models.iot.IotSendGateWayParam;
|
|
|
import com.github.microservice.models.iot.IotSendParam;
|
|
|
import com.github.microservice.types.FunctionType;
|
|
|
+import com.zhongshu.iot.client.model.iot.IotMainSearch;
|
|
|
import com.zhongshu.iot.client.type.IotDataType;
|
|
|
import com.zhongshu.iot.client.type.OperationType;
|
|
|
import com.zhongshu.iot.server.core.dao.iot.IotMainDao;
|
|
|
-import com.zhongshu.iot.server.core.dao.mqtt.DeviceInfoDao;
|
|
|
-import com.zhongshu.iot.server.core.dao.mqtt.GateWay2DeviceDao;
|
|
|
-import com.zhongshu.iot.server.core.dao.mqtt.OperationMessageDao;
|
|
|
-import com.zhongshu.iot.server.core.dao.mqtt.OperationMessageResultDao;
|
|
|
+import com.zhongshu.iot.server.core.dao.mqtt.*;
|
|
|
import com.zhongshu.iot.server.core.domain.iot.IotMain;
|
|
|
import com.zhongshu.iot.server.core.domain.iot.mqtt.*;
|
|
|
import com.zhongshu.iot.server.core.service.base.SuperService;
|
|
|
@@ -22,13 +22,16 @@ import com.zhongshu.iot.server.core.util.result.ResultContent;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
import org.apache.commons.lang3.ObjectUtils;
|
|
|
import org.apache.commons.lang3.StringUtils;
|
|
|
+import org.springframework.beans.BeanUtils;
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
import org.springframework.beans.factory.annotation.Value;
|
|
|
+import org.springframework.data.domain.Page;
|
|
|
+import org.springframework.data.domain.PageRequest;
|
|
|
+import org.springframework.data.domain.Pageable;
|
|
|
import org.springframework.stereotype.Service;
|
|
|
|
|
|
-import java.util.ArrayList;
|
|
|
-import java.util.Date;
|
|
|
-import java.util.List;
|
|
|
+import java.util.*;
|
|
|
+import java.util.stream.Collectors;
|
|
|
|
|
|
/**
|
|
|
* @author TRX
|
|
|
@@ -60,6 +63,9 @@ public class IotSendMessageService extends SuperService {
|
|
|
@Value("${artemisstore.time}")
|
|
|
public Long ttlMill = 30 * 24L * 60 * 60 * 1000L;
|
|
|
|
|
|
+ @Autowired
|
|
|
+ private GateWayInfoDao gateWayInfoDao;
|
|
|
+
|
|
|
/**
|
|
|
* 发送信息
|
|
|
*
|
|
|
@@ -67,34 +73,34 @@ public class IotSendMessageService extends SuperService {
|
|
|
* @return
|
|
|
*/
|
|
|
public ResultContent sendIotMessage(IotSendParam param) {
|
|
|
- // 验证数据
|
|
|
- DeviceInfo deviceInfo = deviceInfoDao.findTopByDeviceId(param.getDeviceId());
|
|
|
- if (ObjectUtils.isEmpty(deviceInfo)) {
|
|
|
- return ResultContent.buildFail(String.format("设备不存在:%S", param.getDeviceId()));
|
|
|
- }
|
|
|
- FunctionType functionType = param.getFunctionType();
|
|
|
- List<IotMain> list = new ArrayList<>();
|
|
|
- if (functionType != null) {
|
|
|
- list = iotMainDao.findByDeviceIdAndIdentifierAndIotDataTypeAndFunctionType(
|
|
|
- param.getDeviceId(), param.getIdentifier(), IotDataType.Device, functionType);
|
|
|
- } else {
|
|
|
- list = iotMainDao.findByDeviceIdAndIdentifierAndIotDataType(
|
|
|
- param.getDeviceId(), param.getIdentifier(), IotDataType.Device);
|
|
|
- }
|
|
|
+ IotMainSearch search = new IotMainSearch();
|
|
|
+ BeanUtils.copyProperties(param, search);
|
|
|
+ Pageable pageable = PageRequest.of(0, Integer.MAX_VALUE);
|
|
|
+ Page<IotMain> page = iotMainDao.page(pageable, search);
|
|
|
|
|
|
+ List<IotMain> list = page.getContent();
|
|
|
if (ObjectUtils.isEmpty(list)) {
|
|
|
- log.error("设备没有对应的物模型 {} {}", param.getDeviceId(), param.getIdentifier());
|
|
|
+ log.error("没有对应的物模型 {}", param.getIdentifier());
|
|
|
return ResultContent.buildFail("没有对应的设备物模型");
|
|
|
}
|
|
|
+ // list 去重
|
|
|
+ list= list.stream().collect(Collectors.collectingAndThen(Collectors.toCollection(
|
|
|
+ () -> new TreeSet<>(Comparator.comparing(IotMain::getRealIotTopic))), ArrayList::new));
|
|
|
+
|
|
|
+ DeviceInfo deviceInfo = null;
|
|
|
GateWayInfo gateWayInfo = null;
|
|
|
- GateWay2Device gateWay2Device = gateWay2DeviceDao.findTopByDeviceInfoOrderByUpdateTimeDesc(deviceInfo);
|
|
|
- if (ObjectUtils.isEmpty(gateWay2Device)) {
|
|
|
- gateWayInfo = gateWay2Device.getGateWayInfo();
|
|
|
+ if (StringUtils.isNotEmpty(param.getGateWayId())) {
|
|
|
+ gateWayInfo = gateWayInfoDao.findTopByGateWayId(param.getGateWayId());
|
|
|
+ }
|
|
|
+ if (StringUtils.isNotEmpty(param.getDeviceId())) {
|
|
|
+ deviceInfo = deviceInfoDao.findTopByDeviceId(param.getDeviceId());
|
|
|
}
|
|
|
+
|
|
|
String dataId = param.getDataId();
|
|
|
if (StringUtils.isEmpty(dataId)) {
|
|
|
dataId = CommonUtil.UUID();
|
|
|
}
|
|
|
+
|
|
|
// 组装要发送的数据
|
|
|
JSONObject jsonObject = new JSONObject();
|
|
|
jsonObject.set("id", dataId);
|
|
|
@@ -103,6 +109,8 @@ public class IotSendMessageService extends SuperService {
|
|
|
jsonObject.set("time", System.currentTimeMillis());
|
|
|
jsonObject.set("ttl", param.getTtl());
|
|
|
jsonObject.set("imitate", param.getIsImitate());
|
|
|
+ jsonObject.set("event", param.getEvent());
|
|
|
+ jsonObject.set("issued", Boolean.TRUE);
|
|
|
|
|
|
String token = TokenUtil.create();
|
|
|
OperationMessage entity = operationMessageDao.init(dataId, token);
|
|
|
@@ -118,12 +126,15 @@ public class IotSendMessageService extends SuperService {
|
|
|
entity.setDataId(dataId);
|
|
|
entity.setIsTimeOut(false);
|
|
|
entity.setEvent("");
|
|
|
- entity.setGateWayInfo(gateWayInfo);
|
|
|
if (ObjectUtils.isNotEmpty(gateWayInfo)) {
|
|
|
+ entity.setGateWayInfo(gateWayInfo);
|
|
|
entity.setGateWayId(gateWayInfo.getGateWayId());
|
|
|
}
|
|
|
- entity.setDeviceId(deviceInfo.getDeviceId());
|
|
|
- entity.setDeviceInfo(deviceInfo);
|
|
|
+ if (ObjectUtils.isNotEmpty(deviceInfo)) {
|
|
|
+ entity.setDeviceId(deviceInfo.getDeviceId());
|
|
|
+ entity.setDeviceInfo(deviceInfo);
|
|
|
+ }
|
|
|
+
|
|
|
entity.setTtl(new Date(System.currentTimeMillis() + ttlMill));
|
|
|
entity.setReceiveTime(System.currentTimeMillis());
|
|
|
entity.setReceiveTimeStr(DateUtils.paresTime(System.currentTimeMillis(), DateUtils.FORMAT_LONG));
|
|
|
@@ -131,9 +142,12 @@ public class IotSendMessageService extends SuperService {
|
|
|
|
|
|
for (IotMain iotMain : list) {
|
|
|
// %s/issued 下发数据 Topic
|
|
|
- String topic = MqttTopicUtils.buildIssuedTopic(iotMain.getRealIotTopic());
|
|
|
+ String topic = iotMain.getRealIotTopic();
|
|
|
|
|
|
+ String messageId = CommonUtil.UUID();
|
|
|
OperationMessageResult messageResult = new OperationMessageResult();
|
|
|
+ messageResult.setMessageId(messageId);
|
|
|
+ messageResult.setDataId(entity.getDataId());
|
|
|
messageResult.setOperationMessage(entity);
|
|
|
// 物模型信息
|
|
|
messageResult.setIotMain(iotMain);
|
|
|
@@ -145,9 +159,14 @@ public class IotSendMessageService extends SuperService {
|
|
|
messageResult.setGateWayId(entity.getGateWayId());
|
|
|
messageResult.setRealIotTopic(topic);
|
|
|
messageResult.setData(jsonObject);
|
|
|
- messageResult.setGateWayId(entity.getGateWayId());
|
|
|
messageResult.setOperationType(entity.getOperationType());
|
|
|
|
|
|
+ jsonObject.set("id", messageId);
|
|
|
+ try {
|
|
|
+ mqClient.sendObject(topic, JSONUtil.toJsonStr(jsonObject), messageId);
|
|
|
+ } catch (Exception e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ }
|
|
|
operationMessageResultDao.save(messageResult);
|
|
|
}
|
|
|
} else {
|
|
|
@@ -156,4 +175,31 @@ public class IotSendMessageService extends SuperService {
|
|
|
return ResultContent.buildSuccess();
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * 给网关发生信息
|
|
|
+ *
|
|
|
+ * @param param
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ public ResultContent sendIotGateWayMessage(IotSendGateWayParam param) {
|
|
|
+
|
|
|
+ String dataId = CommonUtil.UUID();
|
|
|
+ JSONObject jsonObject = new JSONObject();
|
|
|
+ jsonObject.set("id", dataId);
|
|
|
+ jsonObject.set("data", param.getData());
|
|
|
+ jsonObject.set("timeStr", DateUtils.paresTime(System.currentTimeMillis(), DateUtils.patternyyyySSS));
|
|
|
+ jsonObject.set("time", System.currentTimeMillis());
|
|
|
+ jsonObject.set("ttl", param.getTtl());
|
|
|
+ jsonObject.set("imitate", param.getIsImitate());
|
|
|
+ jsonObject.set("gateWayId", param.getGateWayId());
|
|
|
+ jsonObject.set("event", "permissionNotice");
|
|
|
+
|
|
|
+ String topIc = "/v1/gateway/" + param.getGateWayId() + "/reply";
|
|
|
+ try {
|
|
|
+ mqClient.sendObject(topIc, JSONUtil.toJsonStr(jsonObject), dataId);
|
|
|
+ } catch (Exception e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ }
|
|
|
+ return ResultContent.buildSuccess();
|
|
|
+ }
|
|
|
}
|