|
|
@@ -0,0 +1,139 @@
|
|
|
+package com.zhongshu.iot.server.core.dao.mqtt.impl;
|
|
|
+
|
|
|
+import com.github.microservice.components.data.mongo.mongo.helper.DBHelper;
|
|
|
+import com.zhongshu.iot.client.model.artemis.OperationMessageSearchParam;
|
|
|
+import com.zhongshu.iot.server.core.dao.base.BaseImpl;
|
|
|
+import com.zhongshu.iot.server.core.dao.mqtt.extend.OperationMessageDaoExtend;
|
|
|
+import com.zhongshu.iot.server.core.domain.iot.mqtt.OperationMessage;
|
|
|
+import com.zhongshu.iot.server.core.util.CommonUtil;
|
|
|
+import com.zhongshu.iot.server.core.util.TokenUtil;
|
|
|
+import org.apache.commons.lang3.StringUtils;
|
|
|
+import org.slf4j.Logger;
|
|
|
+import org.slf4j.LoggerFactory;
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.data.domain.Page;
|
|
|
+import org.springframework.data.domain.Pageable;
|
|
|
+import org.springframework.data.domain.Sort;
|
|
|
+import org.springframework.data.mongodb.core.FindAndModifyOptions;
|
|
|
+import org.springframework.data.mongodb.core.MongoTemplate;
|
|
|
+import org.springframework.data.mongodb.core.query.Criteria;
|
|
|
+import org.springframework.data.mongodb.core.query.Query;
|
|
|
+import org.springframework.data.mongodb.core.query.Update;
|
|
|
+import org.springframework.util.CollectionUtils;
|
|
|
+
|
|
|
+import java.util.ArrayList;
|
|
|
+import java.util.List;
|
|
|
+import java.util.regex.Pattern;
|
|
|
+
|
|
|
+/**
|
|
|
+ * @Author TRX
|
|
|
+ * @CreateDate: 2023/4/12
|
|
|
+ * @Version: 1.0
|
|
|
+ */
|
|
|
+public class OperationMessageDaoImpl extends BaseImpl implements OperationMessageDaoExtend {
|
|
|
+
|
|
|
+ private static final Logger log = LoggerFactory.getLogger(OperationMessageDaoImpl.class);
|
|
|
+ @Autowired
|
|
|
+ private MongoTemplate mongoTemplate;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private DBHelper dbHelper;
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Page<OperationMessage> page(Pageable pageable, OperationMessageSearchParam param) {
|
|
|
+ Criteria criteria = new Criteria();
|
|
|
+
|
|
|
+ if (StringUtils.isNotEmpty(param.getDeviceId())) {
|
|
|
+ criteria.and("deviceId").is(param.getDeviceId());
|
|
|
+ }
|
|
|
+
|
|
|
+ if (StringUtils.isNotEmpty(param.getEpId())) {
|
|
|
+ criteria.and("epId").is(param.getEpId());
|
|
|
+ }
|
|
|
+
|
|
|
+ if (StringUtils.isNotEmpty(param.getGateWayId())) {
|
|
|
+ criteria.and("gateWayId").is(param.getGateWayId());
|
|
|
+ }
|
|
|
+
|
|
|
+ if (param.getIsReceive() != null && param.getIsReceive()) {
|
|
|
+ criteria.and("isReceive").is(param.getIsReceive());
|
|
|
+ }
|
|
|
+
|
|
|
+ // 时间范围
|
|
|
+ if (!CommonUtil.longIsEmpty(param.getStartTime()) && !CommonUtil.longIsEmpty(param.getEndTime())) {
|
|
|
+ criteria.and("createTime").gte(param.getStartTime()).lte(param.getEndTime());
|
|
|
+ }
|
|
|
+
|
|
|
+ // 模糊搜索
|
|
|
+ List<Criteria> criterias = new ArrayList<>();
|
|
|
+ if (StringUtils.isNotEmpty(param.getTopic())) {
|
|
|
+ Pattern pattern = Pattern.compile("^.*" + param.getTopic() + ".*$");
|
|
|
+ criterias.add(Criteria.where("topic").is(pattern));
|
|
|
+ }
|
|
|
+ if (!CollectionUtils.isEmpty(criterias)) {
|
|
|
+ criteria.andOperator(criterias.toArray(new Criteria[]{}));
|
|
|
+ }
|
|
|
+ criteria.and("isDelete").is(Boolean.FALSE);
|
|
|
+
|
|
|
+ Sort sort = buildSort(param);
|
|
|
+ Query query = Query.query(criteria);
|
|
|
+ query.with(sort);
|
|
|
+ return dbHelper.pages(query, pageable, OperationMessage.class);
|
|
|
+ }
|
|
|
+
|
|
|
+ public OperationMessage init(String dataId, String token) {
|
|
|
+ OperationMessage doc = null;
|
|
|
+ try {
|
|
|
+ Query query = Query.query(Criteria.where("dataId").is(dataId).and("token").isNull());
|
|
|
+ Update update = new Update()
|
|
|
+ .set("token", token)
|
|
|
+ .set("createTime", System.currentTimeMillis());
|
|
|
+ FindAndModifyOptions options = new FindAndModifyOptions().upsert(true)
|
|
|
+ .returnNew(true);
|
|
|
+ doc = mongoTemplate.findAndModify(query, update, options,
|
|
|
+ OperationMessage.class);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("init {}", e.getMessage());
|
|
|
+ }
|
|
|
+ return doc;
|
|
|
+ }
|
|
|
+
|
|
|
+ public String acquire(String dataId, Long expiration) {
|
|
|
+ Query query = Query.query(Criteria.where("dataId").is(dataId).and("token").isNull());
|
|
|
+ String token = TokenUtil.create();
|
|
|
+ Update update = new Update()
|
|
|
+ .set("expireAt", System.currentTimeMillis() + expiration)
|
|
|
+ .set("token", token);
|
|
|
+
|
|
|
+ FindAndModifyOptions options = new FindAndModifyOptions().upsert(false)
|
|
|
+ .returnNew(true);
|
|
|
+
|
|
|
+ OperationMessage doc = mongoTemplate.findAndModify(query, update, options,
|
|
|
+ OperationMessage.class);
|
|
|
+
|
|
|
+ if (doc == null) {
|
|
|
+ OperationMessage lockObj = mongoTemplate.findOne(Query.query(Criteria.where("dataId").is(dataId)), OperationMessage.class);
|
|
|
+ doc = lockObj;
|
|
|
+ }
|
|
|
+ boolean locked = doc != null && doc.getToken() != null && doc.getToken().equals(token);
|
|
|
+ // 如果已过期
|
|
|
+ if (!locked && doc != null && doc.getExpireAt() < System.currentTimeMillis()) {
|
|
|
+ release(dataId);
|
|
|
+ // 成功释放锁, 再次尝试获取锁
|
|
|
+ return this.acquire(dataId, expiration);
|
|
|
+ }
|
|
|
+ return locked ? token : null;
|
|
|
+ }
|
|
|
+
|
|
|
+ public boolean release(String dataId) {
|
|
|
+ Query releaseQuery = Query.query(Criteria.where("dataId").is(dataId));
|
|
|
+ Update releaseUpdate = new Update().set("expireAt", null).set("token", null);
|
|
|
+
|
|
|
+ FindAndModifyOptions releaseOptions = new FindAndModifyOptions().upsert(true)
|
|
|
+ .returnNew(true);
|
|
|
+ OperationMessage flowDisposition = mongoTemplate.findAndModify(releaseQuery, releaseUpdate, releaseOptions,
|
|
|
+ OperationMessage.class);
|
|
|
+ return StringUtils.isEmpty(flowDisposition.getToken());
|
|
|
+ }
|
|
|
+
|
|
|
+}
|