|
|
@@ -0,0 +1,136 @@
|
|
|
+package com.zswl.dataservice.service.artemis;
|
|
|
+
|
|
|
+import com.alibaba.fastjson.JSONObject;
|
|
|
+import com.google.gson.JsonObject;
|
|
|
+import com.zswl.dataservice.domain.mqtt.OperationMessage;
|
|
|
+import com.zswl.dataservice.httpRequest.ApiRequestService;
|
|
|
+import com.zswl.dataservice.model.mqtt.SendMessageModel;
|
|
|
+import com.zswl.dataservice.service.base.RedisService;
|
|
|
+import com.zswl.dataservice.service.base.SuperService;
|
|
|
+import com.zswl.dataservice.service.payment.HxzService;
|
|
|
+import com.zswl.dataservice.type.OperationType;
|
|
|
+import com.zswl.dataservice.utils.DateUtils;
|
|
|
+import com.zswl.dataservice.utils.mqtt.mqttConfig.client.MQClient;
|
|
|
+import com.zswl.dataservice.utils.mqtt.mqttConfig.constant.MQConstant;
|
|
|
+import com.zswl.dataservice.utils.result.ResultContent;
|
|
|
+import jakarta.jms.Message;
|
|
|
+import jakarta.jms.TextMessage;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.apache.activemq.artemis.jms.client.ActiveMQBytesMessage;
|
|
|
+import org.apache.activemq.artemis.jms.client.ActiveMQTopic;
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.jms.annotation.JmsListener;
|
|
|
+import org.springframework.stereotype.Service;
|
|
|
+
|
|
|
+import java.util.UUID;
|
|
|
+
|
|
|
+/**
|
|
|
+ * @author TRX
|
|
|
+ * @date 2024/6/27
|
|
|
+ */
|
|
|
+@Slf4j
|
|
|
+@Service
|
|
|
+public class ArtemisService extends SuperService {
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ RedisService redisService;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ MQClient mqClient;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ ApiRequestService apiRequestService;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ OperationMessageService operationMessageService;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ HxzService hxzService;
|
|
|
+
|
|
|
+ // 网关来的消息
|
|
|
+ @JmsListener(destination = "/v1/gateway/#", containerFactory = MQConstant.TopicListenerContainerFactory)
|
|
|
+ @JmsListener(destination = ".v1.gateway.#", containerFactory = MQConstant.TopicListenerContainerFactory)
|
|
|
+ public void receiveMessage(Message message) {
|
|
|
+ // 处理接收到的消息
|
|
|
+ try {
|
|
|
+ ActiveMQTopic activeMQTopic = (ActiveMQTopic) message.getJMSDestination();
|
|
|
+ String topicName = activeMQTopic.getTopicName();
|
|
|
+ if (topicName.equals("activemq.notifications")) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ OperationMessage operationMessage = new OperationMessage();
|
|
|
+
|
|
|
+ String messageId = message.getJMSMessageID();
|
|
|
+ String clientId = message.getStringProperty("__AMQ_CID");
|
|
|
+ log.info("receiveMessage {} 消息监听clientId: {}", messageId, clientId);
|
|
|
+ log.info("Topic: {}", topicName);
|
|
|
+
|
|
|
+ operationMessage.setMessageId(messageId);
|
|
|
+ operationMessage.setClientId(clientId);
|
|
|
+ operationMessage.setTopic(topicName);
|
|
|
+ operationMessage.setOperationType(OperationType.Sub);
|
|
|
+
|
|
|
+ String messageClass = "";
|
|
|
+ String msg = "";
|
|
|
+ if (message instanceof ActiveMQBytesMessage) {
|
|
|
+ ActiveMQBytesMessage activeMQBytesMessage = (ActiveMQBytesMessage) message;
|
|
|
+ messageClass = ActiveMQBytesMessage.class.getSimpleName();
|
|
|
+ byte[] messageBody = new byte[(int) activeMQBytesMessage.getBodyLength()];
|
|
|
+ // 读取消息内容到字节数组
|
|
|
+ activeMQBytesMessage.readBytes(messageBody);
|
|
|
+ msg = new String(messageBody);
|
|
|
+ }
|
|
|
+ if (message instanceof TextMessage) {
|
|
|
+ messageClass = TextMessage.class.getSimpleName();
|
|
|
+ TextMessage textMessage = (TextMessage) message;
|
|
|
+ msg = textMessage.getText();
|
|
|
+ }
|
|
|
+ JSONObject json = JSONObject.parseObject(msg);
|
|
|
+ String id = json.getString("id");
|
|
|
+ Long time = json.getLong("time");
|
|
|
+ Long ttl = json.getLong("ttl");
|
|
|
+ boolean isTimeOut = false;
|
|
|
+ if (System.currentTimeMillis() > (time + ttl)) {
|
|
|
+ isTimeOut = true;
|
|
|
+ }
|
|
|
+ log.info("textMessage: {}", msg);
|
|
|
+ // --------------------处理业务 start------------------
|
|
|
+ operationMessage.setMessageClass(messageClass);
|
|
|
+ operationMessage.setData(msg);
|
|
|
+ operationMessage.setTtlTime(ttl);
|
|
|
+ operationMessage.setSendTime(time);
|
|
|
+ operationMessage.setDataId(id);
|
|
|
+ operationMessage.setIsTimeOut(isTimeOut);
|
|
|
+ if (!isTimeOut) {
|
|
|
+
|
|
|
+ }
|
|
|
+ operationMessageService.addOperationMessage(operationMessage);
|
|
|
+ } catch (Exception e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 发送指令
|
|
|
+ *
|
|
|
+ * @param param
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ public ResultContent sendMessage(SendMessageModel param) {
|
|
|
+ String msg = "发送成功";
|
|
|
+ try {
|
|
|
+ JsonObject jsonObject = new JsonObject();
|
|
|
+ jsonObject.addProperty("id", UUID.randomUUID().toString());
|
|
|
+ jsonObject.addProperty("data", param.getMessage());
|
|
|
+ jsonObject.addProperty("time", DateUtils.paresTime(System.currentTimeMillis(), DateUtils.patternyyyySSS));
|
|
|
+ jsonObject.addProperty("ttl", 10 * 1000);
|
|
|
+// mqClient.sendObject(param.getTopic(), jsonObject.toString());
|
|
|
+ log.info("mqtt msg 发送成功");
|
|
|
+ } catch (Exception e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ msg = "发送失败: " + e.getMessage();
|
|
|
+ }
|
|
|
+ return ResultContent.buildSuccess(msg);
|
|
|
+ }
|
|
|
+
|
|
|
+}
|