TRX hace 1 año
padre
commit
b3ad6b639f

+ 3 - 0
OneCardIotClient/src/main/java/com/zhongshu/iot/client/model/mqtt/GateWayUserInfoAddParam.java

@@ -15,4 +15,7 @@ public class GateWayUserInfoAddParam extends SuperParam {
 
     @Schema(description = "mqtt连接密码", required = true)
     private String passWord;
+
+    @Schema(description = "mqtt角色信息")
+    private String roleName;
 }

+ 1 - 0
OneCardIotClient/src/main/java/com/zhongshu/iot/client/model/mqtt/MqttInfoReturnModel.java

@@ -9,6 +9,7 @@ import lombok.Data;
  */
 @Data
 public class MqttInfoReturnModel {
+
     @Schema(description = "MQTT地址")
     private String brokerAddress;
 

+ 4 - 0
OneCardIotServer/src/main/java/com/zhongshu/iot/server/core/dao/mqtt/GateWay2UserDao.java

@@ -1,6 +1,7 @@
 package com.zhongshu.iot.server.core.dao.mqtt;
 
 import com.zhongshu.iot.server.core.domain.iot.mqtt.GateWay2User;
+import com.zhongshu.iot.server.core.domain.iot.mqtt.GateWayInfo;
 import com.zhongshu.iot.server.core.domain.iot.mqtt.GateWayUserInfo;
 
 /**
@@ -10,4 +11,7 @@ import com.zhongshu.iot.server.core.domain.iot.mqtt.GateWayUserInfo;
 public interface GateWay2UserDao extends org.springframework.data.mongodb.repository.MongoRepository<GateWay2User, String> {
 
     GateWay2User findTopByGateWayUserInfo(GateWayUserInfo gateWayUserInfo);
+
+    GateWay2User findTopByGateWayInfo(GateWayInfo gateWayInfo);
+
 }

+ 9 - 0
OneCardIotServer/src/main/java/com/zhongshu/iot/server/core/domain/docker/AkSkConfig.java

@@ -2,10 +2,12 @@ package com.zhongshu.iot.server.core.domain.docker;
 
 import com.zhongshu.iot.client.type.DataState;
 import com.zhongshu.iot.server.core.domain.base.SuperEntity;
+import com.zhongshu.iot.server.core.domain.iot.mqtt.GateWayInfo;
 import io.swagger.v3.oas.annotations.media.Schema;
 import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.NoArgsConstructor;
+import org.springframework.data.mongodb.core.mapping.DBRef;
 import org.springframework.data.mongodb.core.mapping.Document;
 
 /**
@@ -20,10 +22,17 @@ import org.springframework.data.mongodb.core.mapping.Document;
 @AllArgsConstructor
 public class AkSkConfig extends SuperEntity {
 
+    @Schema(description = "ak")
     private String ak;
 
+    @Schema(description = "sk")
     private String sk;
 
     @Schema(description = "是否可用")
     private DataState state = DataState.Enable;
+
+    @Schema(description = "关联的网关信息")
+    @DBRef(lazy = true)
+    private GateWayInfo gateWayInfo;
+
 }

+ 6 - 0
OneCardIotServer/src/main/java/com/zhongshu/iot/server/core/domain/iot/mqtt/GateWay2User.java

@@ -24,10 +24,16 @@ public class GateWay2User extends SuperEntity {
     @Schema(description = "硬件设备网关信息")
     private GateWayInfo gateWayInfo;
 
+    @Schema(description = "网关ID")
+    private String gateWayId;
+
     @DBRef(lazy = true)
     @Schema(description = "用户信息")
     private GateWayUserInfo gateWayUserInfo;
 
+    @Schema(description = "mqtt账号")
+    private String userName;
+
     @Schema(description = "绑定的时间")
     private Long bindTime;
 

+ 3 - 0
OneCardIotServer/src/main/java/com/zhongshu/iot/server/core/domain/iot/mqtt/GateWayInfo.java

@@ -45,4 +45,7 @@ public class GateWayInfo extends SuperEntity {
 
     @Schema(description = "最上线时间")
     private Long lastOnlineTime;
+
+    @Schema(description = "最大可连接失败数量")
+    private Long maxConnectDevices = 10L;
 }

+ 7 - 0
OneCardIotServer/src/main/java/com/zhongshu/iot/server/core/domain/iot/mqtt/Mqtt2User.java

@@ -10,6 +10,7 @@ import org.springframework.data.mongodb.core.mapping.Document;
 
 /**
  * mqtt服务和账号关系信息 (是否在mqtt服务商添加账号)
+ *
  * @author TRX
  * @date 2024/5/14
  */
@@ -23,10 +24,16 @@ public class Mqtt2User extends SuperEntity {
     @DBRef(lazy = true)
     private MqttInfo mqttInfo;
 
+    @Schema(description = "mqtt连接名称")
+    private String mqttInfoName;
+
     @Schema(description = "网关用户信息")
     @DBRef(lazy = true)
     private GateWayUserInfo gateWayUserInfo;
 
+    @Schema(description = "连接账号")
+    private String userName;
+
     @Schema(description = "是否已同步")
     private Boolean isSync = Boolean.FALSE;
 

+ 4 - 0
OneCardIotServer/src/main/java/com/zhongshu/iot/server/core/service/base/AkSignService.java

@@ -4,6 +4,7 @@ import com.github.microservice.net.ResultContent;
 import com.zhongshu.iot.client.type.DataState;
 import com.zhongshu.iot.server.core.dao.docker.AkSkConfigDao;
 import com.zhongshu.iot.server.core.domain.docker.AkSkConfig;
+import com.zhongshu.iot.server.core.util.CommonUtil;
 import com.zhongshu.iot.server.core.util.SecurityUtil;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.ObjectUtils;
@@ -60,6 +61,9 @@ public class AkSignService {
      */
     public ResultContent createAkSk() {
         AkSkConfig akSkConfig = new AkSkConfig();
+        akSkConfig.setAk(CommonUtil.generateRandomString(16));
+        akSkConfig.setSk(CommonUtil.generateRandomString(32));
+        akSkConfig.setState(DataState.Enable);
 
         akSkConfigDao.save(akSkConfig);
         return ResultContent.buildSuccess();

+ 2 - 2
OneCardIotServer/src/main/java/com/zhongshu/iot/server/core/service/mqtt/GateWayInfoService.java

@@ -1,6 +1,7 @@
 package com.zhongshu.iot.server.core.service.mqtt;
 
 import com.github.microservice.models.type.OnLineState;
+import com.github.microservice.net.ResultContent;
 import com.zhongshu.iot.client.model.mqtt.*;
 import com.zhongshu.iot.server.core.dao.mqtt.*;
 import com.zhongshu.iot.server.core.domain.iot.mqtt.*;
@@ -12,7 +13,6 @@ import com.zhongshu.iot.server.core.util.DateUtils;
 import com.zhongshu.iot.server.core.util.bean.BeanUtils;
 import com.zhongshu.iot.server.core.util.mqtt.MqttTopicUtils;
 import com.zhongshu.iot.server.core.util.page.PageEntityUtil;
-import com.github.microservice.net.ResultContent;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl;
@@ -130,7 +130,7 @@ public class GateWayInfoService extends SuperService {
         ResultContent<GateWayInfo> gateWayInfo = addGateWayInfo(param);
 
         // 给网关分配个mqtt账号
-        MqttInfoReturnModel mqttInfoSimpleModel = mqttInfoService.getCommonMqttInfo();
+        MqttInfoReturnModel mqttInfoSimpleModel = mqttInfoService.getCommonMqttInfo(gateWayInfo.getContent());
         return ResultContent.buildSuccess(mqttInfoSimpleModel);
     }
 

+ 58 - 11
OneCardIotServer/src/main/java/com/zhongshu/iot/server/core/service/mqtt/GateWayUserInfoService.java

@@ -1,20 +1,20 @@
 package com.zhongshu.iot.server.core.service.mqtt;
 
+import com.github.microservice.net.ResultContent;
 import com.zhongshu.iot.client.model.mqtt.GateWayUserInfoAddParam;
 import com.zhongshu.iot.client.model.mqtt.GateWayUserInfoModel;
 import com.zhongshu.iot.client.model.mqtt.GateWayUserInfoNameParam;
 import com.zhongshu.iot.client.model.mqtt.GateWayUserInfoSearchParam;
 import com.zhongshu.iot.client.type.type.MqttUserState;
+import com.zhongshu.iot.server.core.dao.mqtt.GateWay2UserDao;
 import com.zhongshu.iot.server.core.dao.mqtt.GateWayUserInfoDao;
 import com.zhongshu.iot.server.core.dao.mqtt.Mqtt2UserDao;
 import com.zhongshu.iot.server.core.dao.mqtt.MqttInfoDao;
-import com.zhongshu.iot.server.core.domain.iot.mqtt.GateWayUserInfo;
-import com.zhongshu.iot.server.core.domain.iot.mqtt.Mqtt2User;
-import com.zhongshu.iot.server.core.domain.iot.mqtt.MqttInfo;
+import com.zhongshu.iot.server.core.domain.iot.mqtt.*;
+import com.zhongshu.iot.server.core.util.CommonUtil;
 import com.zhongshu.iot.server.core.util.DateUtils;
 import com.zhongshu.iot.server.core.util.bean.BeanUtils;
 import com.zhongshu.iot.server.core.util.page.PageEntityUtil;
-import com.github.microservice.net.ResultContent;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl;
@@ -43,16 +43,20 @@ import java.util.List;
 public class GateWayUserInfoService {
 
     @Autowired
-    GateWayUserInfoDao gateWayUserInfoDao;
+    private GateWayUserInfoDao gateWayUserInfoDao;
 
     @Autowired
-    MqttInfoDao mqttInfoDao;
+    private MqttInfoDao mqttInfoDao;
 
     @Autowired
-    Mqtt2UserDao mqtt2UserDao;
+    private Mqtt2UserDao mqtt2UserDao;
+
+    @Autowired
+    private GateWay2UserDao gateWay2UserDao;
 
     /**
      * 初始mqtt连接账号信息
+     *
      * @return
      */
     public ResultContent initDefaultUser() {
@@ -64,19 +68,56 @@ public class GateWayUserInfoService {
             param.setPassWord("admin123");
             addGateWayUser(param);
             log.info(String.format("gateWayUser [%s] 初始化成功", userName));
-        }else {
+        } else {
             log.info(String.format("gateWayUser [%s] 已初始化", userName));
         }
         return ResultContent.buildSuccess();
     }
 
+    public GateWayUserInfo gateWayBindMqttUser(GateWayInfo gateWayInfo) {
+        GateWayUserInfo gateWayUserInfo = null;
+        if (ObjectUtils.isNotEmpty(gateWayInfo)) {
+            GateWay2User gateWay2User = gateWay2UserDao.findTopByGateWayInfo(gateWayInfo);
+            if (ObjectUtils.isEmpty(gateWay2User)) {
+                GateWayUserInfoAddParam userParam = new GateWayUserInfoAddParam();
+                String userName = CommonUtil.generateRandomString(8);
+                String roleName = userName;
+                userParam.setUserName(userName);
+                userParam.setPassWord(CommonUtil.generateRandomString(16));
+                userParam.setRoleName(roleName);
+
+                // 创建 并同步
+                ResultContent<GateWayUserInfo> resultContent = addGateWayUser(userParam);
+                if (resultContent.isSuccess()) {
+                    gateWayUserInfo = resultContent.getContent();
+                    gateWay2User = new GateWay2User();
+                    gateWay2User.setGateWayInfo(gateWayInfo);
+                    gateWay2User.setGateWayId(gateWayInfo.getGateWayId());
+                    gateWay2User.setGateWayUserInfo(gateWayUserInfo);
+                    gateWay2User.setUserName(userName);
+
+                    gateWay2User.setBindTime(System.currentTimeMillis());
+                    gateWay2User.setBindTimeStr(DateUtils.paresTime(System.currentTimeMillis(), DateUtils.FORMAT_LONG));
+                    gateWay2UserDao.save(gateWay2User);
+                }
+            } else {
+                gateWayUserInfo = gateWay2User.getGateWayUserInfo();
+            }
+        }
+        syncUserToMQTTService(gateWayUserInfo);
+        return gateWayUserInfo;
+    }
+
     /**
      * 添加用户
      *
      * @param param
      * @return
      */
-    public ResultContent addGateWayUser(GateWayUserInfoAddParam param) {
+    public ResultContent<GateWayUserInfo> addGateWayUser(GateWayUserInfoAddParam param) {
+        if (StringUtils.isEmpty(param.getRoleName())) {
+            return ResultContent.buildFail("roleName不能为空");
+        }
         GateWayUserInfo entity = gateWayUserInfoDao.findTopByUserName(param.getUserName());
         if (ObjectUtils.isNotEmpty(entity)) {
             return ResultContent.buildFail(String.format("%s 用户已存在", param.getUserName()));
@@ -84,11 +125,10 @@ public class GateWayUserInfoService {
         entity = new GateWayUserInfo();
         BeanUtils.copyProperties(param, entity);
         entity.setState(MqttUserState.Enable);
-        entity.setRoleName(param.getUserName());
         gateWayUserInfoDao.save(entity);
         // 同步用户
         syncUserToMQTTService(entity);
-        return ResultContent.buildSuccess();
+        return ResultContent.buildSuccess(entity);
     }
 
     /**
@@ -178,6 +218,7 @@ public class GateWayUserInfoService {
     public ResultContent syncUserToMQTTService(GateWayUserInfo gateWayUserInfo) {
         //todo同步用户
         List<MqttInfo> list = mqttInfoDao.findAll();
+        log.info("syncUserToMQTTService {}", list.size());
         if (ObjectUtils.isNotEmpty(list)) {
             for (MqttInfo mqttInfo : list) {
                 try {
@@ -189,6 +230,8 @@ public class GateWayUserInfoService {
                     MBeanServerConnection connection = connector.getMBeanServerConnection();
                     ObjectName addressObjectName = ObjectNameBuilder.create("org.apache.activemq.artemis", mqttInfo.getBrokerName()).getActiveMQServerObjectName();
                     ActiveMQServerControl addressControl = MBeanServerInvocationHandler.newProxyInstance(connection, addressObjectName, ActiveMQServerControl.class, false);
+
+                    // 添加账号
                     addressControl.addUser(gateWayUserInfo.getUserName(), gateWayUserInfo.getPassWord(), gateWayUserInfo.getRoleName(), false);
 
                     Mqtt2User mqtt2User = mqtt2UserDao.findTopByMqttInfoAndGateWayUserInfo(mqttInfo, gateWayUserInfo);
@@ -196,7 +239,11 @@ public class GateWayUserInfoService {
                         mqtt2User = new Mqtt2User();
                     }
                     mqtt2User.setGateWayUserInfo(gateWayUserInfo);
+                    mqtt2User.setUserName(gateWayUserInfo.getUserName());
+
                     mqtt2User.setMqttInfo(mqttInfo);
+                    mqtt2User.setMqttInfoName(mqttInfo.getName());
+
                     mqtt2User.setIsSync(Boolean.TRUE);
                     mqtt2User.setSyncTime(System.currentTimeMillis());
                     mqtt2User.setSyncTimeStr(DateUtils.paresTime(System.currentTimeMillis(), DateUtils.FORMAT_LONG));

+ 13 - 2
OneCardIotServer/src/main/java/com/zhongshu/iot/server/core/service/mqtt/MqttInfoService.java

@@ -1,6 +1,7 @@
 package com.zhongshu.iot.server.core.service.mqtt;
 
 import com.github.microservice.models.type.CommonState;
+import com.github.microservice.net.ResultContent;
 import com.zhongshu.iot.client.model.mqtt.MqttInfoAddParam;
 import com.zhongshu.iot.client.model.mqtt.MqttInfoModel;
 import com.zhongshu.iot.client.model.mqtt.MqttInfoReturnModel;
@@ -8,9 +9,10 @@ import com.zhongshu.iot.client.model.mqtt.MqttInfoSimpleModel;
 import com.zhongshu.iot.client.type.type.AddressType;
 import com.zhongshu.iot.server.core.dao.mqtt.Mqtt2UserDao;
 import com.zhongshu.iot.server.core.dao.mqtt.MqttInfoDao;
+import com.zhongshu.iot.server.core.domain.iot.mqtt.GateWayInfo;
+import com.zhongshu.iot.server.core.domain.iot.mqtt.GateWayUserInfo;
 import com.zhongshu.iot.server.core.domain.iot.mqtt.MqttInfo;
 import com.zhongshu.iot.server.core.util.bean.BeanUtils;
-import com.github.microservice.net.ResultContent;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.ObjectUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -36,6 +38,9 @@ public class MqttInfoService {
     @Autowired
     private Mqtt2UserDao mqtt2UserDao;
 
+    @Autowired
+    private GateWayUserInfoService gateWayUserInfoService;
+
     /**
      * 初始数据
      */
@@ -53,6 +58,8 @@ public class MqttInfoService {
             mqttInfo.setAddress(AddressType.Common);
             mqttInfo.setState(CommonState.Enable);
             mqttInfo.setBrokerName("broker1");
+            mqttInfo.setJmxHost("172.24.50.53");
+            mqttInfo.setJmxPort("1098");
             mqttInfoDao.save(mqttInfo);
         }
     }
@@ -62,9 +69,13 @@ public class MqttInfoService {
      *
      * @return
      */
-    public MqttInfoReturnModel getCommonMqttInfo() {
+    public MqttInfoReturnModel getCommonMqttInfo(GateWayInfo gatewayInfo) {
         MqttInfoReturnModel mqttInfoSimpleModel = new MqttInfoReturnModel();
         MqttInfo mqttInfo = mqttInfoDao.findTopByAddress(AddressType.Common);
+
+        // 查询和绑定分配网关的mqtt账号
+        GateWayUserInfo gateWayUserInfo = gateWayUserInfoService.gateWayBindMqttUser(gatewayInfo);
+
         if (ObjectUtils.isNotEmpty(mqttInfo)) {
             mqttInfoSimpleModel.setBrokerAddress(String.format("%s:%s", mqttInfo.getBrokerHost(), mqttInfo.getBrokerPort()));
             mqttInfoSimpleModel.setBrokerUsername(mqttInfo.getUserName());

+ 87 - 0
OneCardIotServer/src/main/java/com/zhongshu/iot/server/core/util/test/Test153AddUser.java

@@ -0,0 +1,87 @@
+package com.zhongshu.iot.server.core.util.test;
+
+import com.zhongshu.iot.server.core.util.mqtt.MqttTopicUtils;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl;
+import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder;
+import org.apache.commons.lang3.StringUtils;
+import org.springframework.util.StopWatch;
+
+import javax.management.MBeanServerConnection;
+import javax.management.MBeanServerInvocationHandler;
+import javax.management.ObjectName;
+import javax.management.remote.JMXConnector;
+import javax.management.remote.JMXConnectorFactory;
+import javax.management.remote.JMXServiceURL;
+
+/**
+ * @author TRX
+ * @date 2024/4/24
+ */
+@Slf4j
+public class Test153AddUser {
+
+    public static void main(String[] args) {
+        try {
+            log.info("-------------------- 开始 --------------------------");
+            JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://172.24.50.53:1098/jmxrmi");
+            @Cleanup JMXConnector connector = JMXConnectorFactory.connect(url, null);
+            connector.connect();
+            System.out.println("连接成功..............");
+            MBeanServerConnection connection = connector.getMBeanServerConnection();
+
+            ObjectName addressObjectName = ObjectNameBuilder.create("org.apache.activemq.artemis", "broke1").getActiveMQServerObjectName();
+
+            ActiveMQServerControl addressControl = MBeanServerInvocationHandler.newProxyInstance(connection, addressObjectName, ActiveMQServerControl.class, false);
+
+
+            // 查询用户 可以筛选特定的名称
+            String userListStr = addressControl.listUser("device2");
+            log.info("userListStr: {}", userListStr);
+            if (StringUtils.isNotEmpty(userListStr)) {
+                // 如果不存在用户会报错
+//                addressControl.removeUser("device1");
+            }
+            StopWatch stopWatch = new StopWatch();
+
+            stopWatch.start("开始添加用户");
+            for (int i = 0; i < 10000; i++) {
+                String name = "mqttUser" + i;
+                try {
+                    addressControl.removeUser(name);
+                } catch (Exception e) {
+                }
+//                String passWord = CommonUtil.generateRandomString(16);
+                String roleName = name;
+
+                String passWord = name;
+//                String roleName = "amq";
+                addressControl.addUser(name, passWord, roleName, true);
+
+                String addressMatch = MqttTopicUtils.buildDeviceAllTopic(name);
+//                addressControl.removeSecuritySettings(MqttTopicUtils.buildDeviceAllTopic(name));
+                addressControl.removeSecuritySettings(addressMatch);
+
+                log.info("addUser: {} {} ", name, addressMatch);
+                addressControl.addSecuritySettings(
+                        addressMatch,
+                        roleName, roleName, roleName, roleName, roleName, roleName, roleName,
+                        roleName, roleName, roleName);
+            }
+            stopWatch.stop();
+            log.info("耗时:{}", stopWatch.prettyPrint());
+
+//            // 添加用户 如果存在会报错
+//            addressControl.addUser("device2", "trx", "trx", false);
+//            // 删除用户 如果不存在会报错
+////            addressControl.removeUser("device1");
+//            log.info("添加用户成功1....");
+//            log.info("users: {}", addressControl.listUser(null));
+            log.info("-------------------- 结束 --------------------------");
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+
+    }
+}