|
@@ -1,10 +1,17 @@
|
|
|
package com.zswl.dataservice.service.artemis;
|
|
package com.zswl.dataservice.service.artemis;
|
|
|
|
|
|
|
|
import com.zswl.dataservice.service.base.SuperService;
|
|
import com.zswl.dataservice.service.base.SuperService;
|
|
|
|
|
+import com.zswl.dataservice.service.mqtt.GateWayInfoService;
|
|
|
import com.zswl.dataservice.utils.mqtt.mqttConfig.client.MQClient;
|
|
import com.zswl.dataservice.utils.mqtt.mqttConfig.client.MQClient;
|
|
|
import com.zswl.dataservice.utils.mqtt.mqttConfig.constant.MQConstant;
|
|
import com.zswl.dataservice.utils.mqtt.mqttConfig.constant.MQConstant;
|
|
|
|
|
+import jakarta.jms.Destination;
|
|
|
import jakarta.jms.Message;
|
|
import jakarta.jms.Message;
|
|
|
|
|
+import jakarta.jms.Session;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
|
|
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
|
|
|
|
+import org.apache.activemq.artemis.jms.client.ActiveMQBytesMessage;
|
|
|
|
|
+import org.apache.activemq.artemis.jms.client.ActiveMQMessage;
|
|
|
|
|
+import org.apache.activemq.artemis.jms.client.ActiveMQTopic;
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
import org.springframework.jms.annotation.JmsListener;
|
|
import org.springframework.jms.annotation.JmsListener;
|
|
|
import org.springframework.stereotype.Service;
|
|
import org.springframework.stereotype.Service;
|
|
@@ -23,6 +30,9 @@ public class ArtemisListenerService {
|
|
|
@Autowired
|
|
@Autowired
|
|
|
OperationMessageService operationMessageService;
|
|
OperationMessageService operationMessageService;
|
|
|
|
|
|
|
|
|
|
+ @Autowired
|
|
|
|
|
+ GateWayInfoService gateWayInfoService;
|
|
|
|
|
+
|
|
|
// 网关来的消息
|
|
// 网关来的消息
|
|
|
@JmsListener(destination = "/v1/gateway/#", containerFactory = MQConstant.TopicListenerContainerFactory)
|
|
@JmsListener(destination = "/v1/gateway/#", containerFactory = MQConstant.TopicListenerContainerFactory)
|
|
|
@JmsListener(destination = ".v1.gateway.#", containerFactory = MQConstant.TopicListenerContainerFactory)
|
|
@JmsListener(destination = ".v1.gateway.#", containerFactory = MQConstant.TopicListenerContainerFactory)
|
|
@@ -30,4 +40,42 @@ public class ArtemisListenerService {
|
|
|
operationMessageService.handlerGateWayMessage(message);
|
|
operationMessageService.handlerGateWayMessage(message);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+ // 网关来的消息
|
|
|
|
|
+ @JmsListener(destination = "#", containerFactory = MQConstant.TopicListenerContainerFactory)
|
|
|
|
|
+ public void receiveOnLineChangeMessage(Message message, Session session) {
|
|
|
|
|
+ try {
|
|
|
|
|
+ ActiveMQTopic activeMQTopic = (ActiveMQTopic) message.getJMSDestination();
|
|
|
|
|
+ String topicName = activeMQTopic.getTopicName();
|
|
|
|
|
+ if (!topicName.equals("$sys.mqtt.sessions")) {
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+ log.info("==sezz {}", session.getTransacted());
|
|
|
|
|
+ String messageId = message.getJMSMessageID();
|
|
|
|
|
+ String clientId = message.getStringProperty("_AMQ_LVQ_NAME");
|
|
|
|
|
+ log.info("receiveMessage {} 消息监听clientId: {}", messageId, clientId);
|
|
|
|
|
+ log.info("Topic: {}", topicName);
|
|
|
|
|
+
|
|
|
|
|
+ if (message instanceof ActiveMQBuffer) {
|
|
|
|
|
+ log.info("---------------------");
|
|
|
|
|
+ }
|
|
|
|
|
+ Destination destination = message.getJMSDestination();
|
|
|
|
|
+
|
|
|
|
|
+ log.info("---------------------AA {} ", destination.toString());
|
|
|
|
|
+ ActiveMQMessage activeMQBytesMessage = (ActiveMQMessage) message;
|
|
|
|
|
+ ActiveMQBuffer buffer = activeMQBytesMessage.getCoreMessage().getBodyBuffer();
|
|
|
|
|
+
|
|
|
|
|
+ activeMQBytesMessage.checkBuffer();
|
|
|
|
|
+ log.info("bb: {}", buffer.readableBytes());
|
|
|
|
|
+ byte[] bytes = new byte[buffer.readableBytes()];
|
|
|
|
|
+ buffer.readBytes(bytes);
|
|
|
|
|
+
|
|
|
|
|
+ // 处理消息,解析出会话信息
|
|
|
|
|
+ String sessionInfo = new String(bytes, "gbk");
|
|
|
|
|
+ log.info("-------------------: {} {}", sessionInfo, sessionInfo.length());
|
|
|
|
|
+
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ e.printStackTrace();
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
}
|
|
}
|