|
|
@@ -0,0 +1,144 @@
|
|
|
+package com.zswl.dataservice.service.iot;
|
|
|
+
|
|
|
+import cn.hutool.json.JSONObject;
|
|
|
+import com.github.microservice.models.iot.IotSendParam;
|
|
|
+import com.zswl.dataservice.dao.iot.IotMainDao;
|
|
|
+import com.zswl.dataservice.dao.iot.IotTopicDao;
|
|
|
+import com.zswl.dataservice.dao.mqtt.DeviceInfoDao;
|
|
|
+import com.zswl.dataservice.dao.mqtt.GateWay2DeviceDao;
|
|
|
+import com.zswl.dataservice.dao.mqtt.OperationMessageDao;
|
|
|
+import com.zswl.dataservice.dao.mqtt.OperationMessageResultDao;
|
|
|
+import com.zswl.dataservice.domain.iot.IotMain;
|
|
|
+import com.zswl.dataservice.domain.mqtt.*;
|
|
|
+import com.zswl.dataservice.service.artemis.OperationMessageService;
|
|
|
+import com.zswl.dataservice.service.base.SuperService;
|
|
|
+import com.zswl.dataservice.type.IotDataType;
|
|
|
+import com.zswl.dataservice.type.OperationType;
|
|
|
+import com.zswl.dataservice.utils.CommonUtil;
|
|
|
+import com.zswl.dataservice.utils.DateUtils;
|
|
|
+import com.zswl.dataservice.utils.TokenUtil;
|
|
|
+import com.zswl.dataservice.utils.mqtt.MqttTopicUtils;
|
|
|
+import com.zswl.dataservice.utils.mqtt.mqttConfig.client.MQClient;
|
|
|
+import com.zswl.dataservice.utils.result.ResultContent;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.apache.commons.lang3.ObjectUtils;
|
|
|
+import org.apache.commons.lang3.StringUtils;
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.stereotype.Service;
|
|
|
+
|
|
|
+import java.util.Date;
|
|
|
+import java.util.List;
|
|
|
+import java.util.UUID;
|
|
|
+
|
|
|
+/**
|
|
|
+ * @author TRX
|
|
|
+ * @date 2024/8/9
|
|
|
+ */
|
|
|
+@Slf4j
|
|
|
+@Service
|
|
|
+public class IotSendMessageService extends SuperService {
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private DeviceInfoDao deviceInfoDao;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private IotMainDao iotMainDao;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ OperationMessageDao operationMessageDao;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private OperationMessageResultDao operationMessageResultDao;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private MQClient mqClient;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private GateWay2DeviceDao gateWay2DeviceDao;
|
|
|
+
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 发送信息
|
|
|
+ *
|
|
|
+ * @param param
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ public ResultContent sendIotMessage(IotSendParam param) {
|
|
|
+ // 验证数据
|
|
|
+ DeviceInfo deviceInfo = deviceInfoDao.findTopByDeviceId(param.getDeviceId());
|
|
|
+ if (ObjectUtils.isEmpty(deviceInfo)) {
|
|
|
+ return ResultContent.buildFail(String.format("设备不存在:%S", param.getDeviceId()));
|
|
|
+ }
|
|
|
+ List<IotMain> list = iotMainDao.findByDeviceIdAndIdentifierAndIotDataType(
|
|
|
+ param.getDeviceId(), param.getIdentifier(), IotDataType.Device
|
|
|
+ );
|
|
|
+ if (ObjectUtils.isEmpty(list)) {
|
|
|
+ return ResultContent.buildFail("没有对应的设备物模型");
|
|
|
+ }
|
|
|
+ GateWayInfo gateWayInfo = null;
|
|
|
+ GateWay2Device gateWay2Device = gateWay2DeviceDao.findTopByDeviceInfoOrderByUpdateTimeDesc(deviceInfo);
|
|
|
+ if (ObjectUtils.isEmpty(gateWay2Device)) {
|
|
|
+ gateWayInfo = gateWay2Device.getGateWayInfo();
|
|
|
+ }
|
|
|
+ String dataId = param.getDataId();
|
|
|
+ if (StringUtils.isEmpty(dataId)) {
|
|
|
+ dataId = CommonUtil.UUID();
|
|
|
+ }
|
|
|
+ // 组装要发送的数据
|
|
|
+ JSONObject jsonObject = new JSONObject();
|
|
|
+ jsonObject.set("id", dataId);
|
|
|
+ jsonObject.set("data", param.getData());
|
|
|
+ jsonObject.set("timeStr", DateUtils.paresTime(System.currentTimeMillis(), DateUtils.patternyyyySSS));
|
|
|
+ jsonObject.set("time", System.currentTimeMillis());
|
|
|
+ jsonObject.set("ttl", param.getTtl());
|
|
|
+ jsonObject.set("imitate", param.getIsImitate());
|
|
|
+
|
|
|
+ String token = TokenUtil.create();
|
|
|
+ OperationMessage entity = operationMessageDao.init(dataId, token);
|
|
|
+ if (ObjectUtils.isNotEmpty(entity)) {
|
|
|
+ entity.setMessageId(CommonUtil.UUID()); // 消息ID
|
|
|
+ entity.setClientId(""); // 终端ID
|
|
|
+ entity.setTopic(""); // topic名称
|
|
|
+ entity.setOperationType(OperationType.Push);
|
|
|
+ entity.setMessageClass("");
|
|
|
+ entity.setData(jsonObject);
|
|
|
+ entity.setTtlTime(param.getTtl());
|
|
|
+ entity.setSendTime(System.currentTimeMillis());
|
|
|
+ entity.setDataId(dataId);
|
|
|
+ entity.setIsTimeOut(false);
|
|
|
+ entity.setEvent("");
|
|
|
+ entity.setGateWayInfo(gateWayInfo);
|
|
|
+ if (ObjectUtils.isNotEmpty(gateWayInfo)) {
|
|
|
+ entity.setGateWayId(gateWayInfo.getGateWayId());
|
|
|
+ }
|
|
|
+ entity.setDeviceId(deviceInfo.getDeviceId());
|
|
|
+ entity.setDeviceInfo(deviceInfo);
|
|
|
+ entity.setTtl(new Date(System.currentTimeMillis() + OperationMessageService.ttlMill));
|
|
|
+ entity.setReceiveTime(System.currentTimeMillis());
|
|
|
+ entity.setReceiveTimeStr(DateUtils.paresTime(System.currentTimeMillis(), DateUtils.FORMAT_LONG));
|
|
|
+ operationMessageDao.save(entity);
|
|
|
+
|
|
|
+ for (IotMain iotMain : list) {
|
|
|
+ // %s/issued 下发数据 Topic
|
|
|
+ String topic = MqttTopicUtils.buildIssuedTopic(iotMain.getRealIotTopic());
|
|
|
+
|
|
|
+ OperationMessageResult messageResult = new OperationMessageResult();
|
|
|
+ messageResult.setOperationMessage(entity);
|
|
|
+ messageResult.setIotMain(iotMain);
|
|
|
+ // 设备ID
|
|
|
+ messageResult.setDeviceId(iotMain.getDeviceId());
|
|
|
+ // 分组code
|
|
|
+ messageResult.setProjectCode(iotMain.getProjectCode());
|
|
|
+ messageResult.setGateWayId(entity.getGateWayId());
|
|
|
+ messageResult.setRealIotTopic(topic);
|
|
|
+ messageResult.setData(jsonObject);
|
|
|
+ messageResult.setGateWayId(entity.getGateWayId());
|
|
|
+ messageResult.setOperationType(entity.getOperationType());
|
|
|
+
|
|
|
+ operationMessageResultDao.save(messageResult);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return ResultContent.buildSuccess();
|
|
|
+ }
|
|
|
+
|
|
|
+}
|