|
|
@@ -0,0 +1,98 @@
|
|
|
+package com.zhongshu.card.server.core.service.mqtt;
|
|
|
+
|
|
|
+import cn.hutool.json.JSONObject;
|
|
|
+import cn.hutool.json.JSONUtil;
|
|
|
+import com.google.gson.JsonObject;
|
|
|
+import com.zhongshu.card.client.model.mqtt.SendMessageModel;
|
|
|
+import com.zhongshu.card.client.ret.ResultContent;
|
|
|
+import com.zhongshu.card.server.core.dao.org.UserCountDao;
|
|
|
+import com.zhongshu.card.server.core.service.base.RedisService;
|
|
|
+import com.zhongshu.card.server.core.service.base.SuperService;
|
|
|
+import com.zhongshu.card.server.core.service.mqtt.mqttConfig.client.MQClient;
|
|
|
+import com.zhongshu.card.server.core.service.mqtt.mqttConfig.constant.MQConstant;
|
|
|
+import com.zhongshu.card.server.core.util.DateUtils;
|
|
|
+import jakarta.jms.Message;
|
|
|
+import jakarta.jms.TextMessage;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.apache.activemq.artemis.jms.client.ActiveMQBytesMessage;
|
|
|
+import org.apache.activemq.artemis.jms.client.ActiveMQTopic;
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.jms.annotation.JmsListener;
|
|
|
+import org.springframework.stereotype.Service;
|
|
|
+
|
|
|
+import java.util.UUID;
|
|
|
+
|
|
|
+/**
|
|
|
+ * @author TRX
|
|
|
+ * @date 2024/3/20
|
|
|
+ */
|
|
|
+@Slf4j
|
|
|
+@Service
|
|
|
+public class MqttServiceImpl extends SuperService {
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ UserCountDao userDao;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ RedisService redisService;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ MQClient mqClient;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 发送指令
|
|
|
+ *
|
|
|
+ * @param param
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ public ResultContent sendMessage(SendMessageModel param) {
|
|
|
+ String msg = "发送成功";
|
|
|
+ try {
|
|
|
+ JsonObject jsonObject = new JsonObject();
|
|
|
+ jsonObject.addProperty("id", UUID.randomUUID().toString());
|
|
|
+ jsonObject.addProperty("data", param.getMessage());
|
|
|
+ jsonObject.addProperty("time", DateUtils.paresTime(System.currentTimeMillis(), DateUtils.patternyyyySSS));
|
|
|
+ jsonObject.addProperty("ttl", 10 * 1000);
|
|
|
+ mqClient.sendObject(param.getTopic(), jsonObject.toString());
|
|
|
+ log.info("mqtt msg 发送成功");
|
|
|
+ } catch (Exception e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ msg = "发送失败: " + e.getMessage();
|
|
|
+ }
|
|
|
+ return ResultContent.buildSuccess(msg);
|
|
|
+ }
|
|
|
+
|
|
|
+ // @JmsListener(destination = "#", containerFactory = MQConstant.TopicListenerContainerFactory)
|
|
|
+ @JmsListener(destination = "v1/#", containerFactory = MQConstant.TopicListenerContainerFactory)
|
|
|
+ @JmsListener(destination = "v1.#", containerFactory = MQConstant.TopicListenerContainerFactory)
|
|
|
+ public void receiveMessage(Message message) {
|
|
|
+ // 处理接收到的消息
|
|
|
+ try {
|
|
|
+ ActiveMQTopic activeMQTopic = (ActiveMQTopic) message.getJMSDestination();
|
|
|
+ String topicName = activeMQTopic.getTopicName();
|
|
|
+ if (topicName.equals("activemq.notifications")) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ log.info("receiveMessage {} 消息监听clientId: {}", message.getJMSMessageID(), message.getStringProperty("__AMQ_CID"));
|
|
|
+ log.info("Topic: {}", topicName);
|
|
|
+
|
|
|
+ if (message instanceof ActiveMQBytesMessage) {
|
|
|
+ ActiveMQBytesMessage activeMQBytesMessage = (ActiveMQBytesMessage) message;
|
|
|
+
|
|
|
+ byte[] messageBody = new byte[(int) activeMQBytesMessage.getBodyLength()];
|
|
|
+ // 读取消息内容到字节数组
|
|
|
+ activeMQBytesMessage.readBytes(messageBody);
|
|
|
+ String msg = new String(messageBody);
|
|
|
+ JSONObject json = JSONUtil.parseObj(msg);
|
|
|
+ log.info("消息内容:{}", json.get("msg"));
|
|
|
+ }
|
|
|
+ if (message instanceof TextMessage) {
|
|
|
+ TextMessage textMessage = (TextMessage) message;
|
|
|
+ log.info("textMessage: {}", textMessage.getText());
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+}
|