|
|
@@ -5,8 +5,8 @@ import com.zhongshu.iot.client.model.artemis.OperationMessageSearchParam;
|
|
|
import com.zhongshu.iot.server.core.dao.base.BaseImpl;
|
|
|
import com.zhongshu.iot.server.core.dao.mqtt.extend.OperationMessageDaoExtend;
|
|
|
import com.zhongshu.iot.server.core.domain.iot.device.OperationMessage;
|
|
|
-import com.zhongshu.iot.server.core.util.CommonUtil;
|
|
|
import com.zhongshu.iot.server.core.util.TokenUtil;
|
|
|
+import org.apache.commons.lang3.ObjectUtils;
|
|
|
import org.apache.commons.lang3.StringUtils;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
@@ -33,6 +33,7 @@ import java.util.regex.Pattern;
|
|
|
public class OperationMessageDaoImpl extends BaseImpl implements OperationMessageDaoExtend {
|
|
|
|
|
|
private static final Logger log = LoggerFactory.getLogger(OperationMessageDaoImpl.class);
|
|
|
+
|
|
|
@Autowired
|
|
|
private MongoTemplate mongoTemplate;
|
|
|
|
|
|
@@ -41,57 +42,27 @@ public class OperationMessageDaoImpl extends BaseImpl implements OperationMessag
|
|
|
|
|
|
@Override
|
|
|
public Page<OperationMessage> page(Pageable pageable, OperationMessageSearchParam param) {
|
|
|
- Criteria criteria = new Criteria();
|
|
|
-
|
|
|
- if (StringUtils.isNotEmpty(param.getDeviceId())) {
|
|
|
- criteria.and("deviceId").is(param.getDeviceId());
|
|
|
- }
|
|
|
-
|
|
|
- if (StringUtils.isNotEmpty(param.getEpId())) {
|
|
|
- criteria.and("epId").is(param.getEpId());
|
|
|
- }
|
|
|
-
|
|
|
- if (StringUtils.isNotEmpty(param.getGateWayId())) {
|
|
|
- criteria.and("gateWayId").is(param.getGateWayId());
|
|
|
- }
|
|
|
-
|
|
|
- if (param.getIsReceive() != null && param.getIsReceive()) {
|
|
|
- criteria.and("isReceive").is(param.getIsReceive());
|
|
|
- }
|
|
|
-
|
|
|
- // 时间范围
|
|
|
- if (!CommonUtil.longIsEmpty(param.getStartTime()) && !CommonUtil.longIsEmpty(param.getEndTime())) {
|
|
|
- criteria.and("createTime").gte(param.getStartTime()).lte(param.getEndTime());
|
|
|
- }
|
|
|
-
|
|
|
- // 模糊搜索
|
|
|
- List<Criteria> criterias = new ArrayList<>();
|
|
|
- if (StringUtils.isNotEmpty(param.getTopic())) {
|
|
|
- Pattern pattern = Pattern.compile("^.*" + param.getTopic() + ".*$");
|
|
|
- criterias.add(Criteria.where("topic").is(pattern));
|
|
|
- }
|
|
|
- if (!CollectionUtils.isEmpty(criterias)) {
|
|
|
- criteria.andOperator(criterias.toArray(new Criteria[]{}));
|
|
|
- }
|
|
|
- criteria.and("isDelete").is(Boolean.FALSE);
|
|
|
-
|
|
|
+ Criteria criteria = buildFilterCriteria(param);
|
|
|
Sort sort = buildSort(param);
|
|
|
Query query = Query.query(criteria);
|
|
|
query.with(sort);
|
|
|
return dbHelper.pages(query, pageable, OperationMessage.class);
|
|
|
}
|
|
|
|
|
|
+ @Override
|
|
|
+ public Long count(OperationMessageSearchParam param) {
|
|
|
+ Criteria criteria = buildFilterCriteria(param);
|
|
|
+ Query query = Query.query(criteria);
|
|
|
+ return mongoTemplate.count(query, OperationMessage.class);
|
|
|
+ }
|
|
|
+
|
|
|
public OperationMessage init(String dataId, String token) {
|
|
|
OperationMessage doc = null;
|
|
|
try {
|
|
|
Query query = Query.query(Criteria.where("dataId").is(dataId).and("token").isNull());
|
|
|
- Update update = new Update()
|
|
|
- .set("token", token)
|
|
|
- .set("createTime", System.currentTimeMillis());
|
|
|
- FindAndModifyOptions options = new FindAndModifyOptions().upsert(true)
|
|
|
- .returnNew(true);
|
|
|
- doc = mongoTemplate.findAndModify(query, update, options,
|
|
|
- OperationMessage.class);
|
|
|
+ Update update = new Update().set("token", token).set("createTime", System.currentTimeMillis());
|
|
|
+ FindAndModifyOptions options = new FindAndModifyOptions().upsert(true).returnNew(true);
|
|
|
+ doc = mongoTemplate.findAndModify(query, update, options, OperationMessage.class);
|
|
|
} catch (Exception e) {
|
|
|
log.error("init {}", e.getMessage());
|
|
|
}
|
|
|
@@ -101,15 +72,11 @@ public class OperationMessageDaoImpl extends BaseImpl implements OperationMessag
|
|
|
public String acquire(String dataId, Long expiration) {
|
|
|
Query query = Query.query(Criteria.where("dataId").is(dataId).and("token").isNull());
|
|
|
String token = TokenUtil.create();
|
|
|
- Update update = new Update()
|
|
|
- .set("expireAt", System.currentTimeMillis() + expiration)
|
|
|
- .set("token", token);
|
|
|
+ Update update = new Update().set("expireAt", System.currentTimeMillis() + expiration).set("token", token);
|
|
|
|
|
|
- FindAndModifyOptions options = new FindAndModifyOptions().upsert(false)
|
|
|
- .returnNew(true);
|
|
|
+ FindAndModifyOptions options = new FindAndModifyOptions().upsert(false).returnNew(true);
|
|
|
|
|
|
- OperationMessage doc = mongoTemplate.findAndModify(query, update, options,
|
|
|
- OperationMessage.class);
|
|
|
+ OperationMessage doc = mongoTemplate.findAndModify(query, update, options, OperationMessage.class);
|
|
|
|
|
|
if (doc == null) {
|
|
|
OperationMessage lockObj = mongoTemplate.findOne(Query.query(Criteria.where("dataId").is(dataId)), OperationMessage.class);
|
|
|
@@ -129,11 +96,55 @@ public class OperationMessageDaoImpl extends BaseImpl implements OperationMessag
|
|
|
Query releaseQuery = Query.query(Criteria.where("dataId").is(dataId));
|
|
|
Update releaseUpdate = new Update().set("expireAt", null).set("token", null);
|
|
|
|
|
|
- FindAndModifyOptions releaseOptions = new FindAndModifyOptions().upsert(true)
|
|
|
- .returnNew(true);
|
|
|
- OperationMessage flowDisposition = mongoTemplate.findAndModify(releaseQuery, releaseUpdate, releaseOptions,
|
|
|
- OperationMessage.class);
|
|
|
+ FindAndModifyOptions releaseOptions = new FindAndModifyOptions().upsert(true).returnNew(true);
|
|
|
+ OperationMessage flowDisposition = mongoTemplate.findAndModify(releaseQuery, releaseUpdate, releaseOptions, OperationMessage.class);
|
|
|
return StringUtils.isEmpty(flowDisposition.getToken());
|
|
|
}
|
|
|
|
|
|
+ public Criteria buildFilterCriteria(OperationMessageSearchParam param) {
|
|
|
+ Criteria criteria = buildCriteria(param);
|
|
|
+
|
|
|
+ if (StringUtils.isNotEmpty(param.getDeviceId())) {
|
|
|
+ criteria.and("deviceId").is(param.getDeviceId());
|
|
|
+ }
|
|
|
+
|
|
|
+ if (StringUtils.isNotEmpty(param.getEpId())) {
|
|
|
+ criteria.and("epId").is(param.getEpId());
|
|
|
+ }
|
|
|
+
|
|
|
+ if (StringUtils.isNotEmpty(param.getGateWayId())) {
|
|
|
+ criteria.and("gateWayId").is(param.getGateWayId());
|
|
|
+ }
|
|
|
+
|
|
|
+ if (param.getIsReceive() != null && param.getIsReceive()) {
|
|
|
+ criteria.and("isReceive").is(param.getIsReceive());
|
|
|
+ }
|
|
|
+
|
|
|
+ if (ObjectUtils.isNotEmpty(param.getEvents())) {
|
|
|
+ if (ObjectUtils.isNotEmpty(param.getNevents())) {
|
|
|
+ criteria.and("event").in(param.getEvents()).nin(param.getNevents());
|
|
|
+ } else {
|
|
|
+ criteria.and("event").in(param.getEvents());
|
|
|
+ }
|
|
|
+ } else if (ObjectUtils.isNotEmpty(param.getNevents())) {
|
|
|
+ criteria.and("event").nin(param.getNevents());
|
|
|
+ }
|
|
|
+
|
|
|
+ // 消息业务类型
|
|
|
+ if (param.getOperationBusType() != null) {
|
|
|
+ criteria.and("operationBusType").is(param.getOperationBusType());
|
|
|
+ }
|
|
|
+
|
|
|
+ // 模糊搜索
|
|
|
+ List<Criteria> criterias = new ArrayList<>();
|
|
|
+ if (StringUtils.isNotEmpty(param.getTopic())) {
|
|
|
+ Pattern pattern = Pattern.compile("^.*" + param.getTopic() + ".*$");
|
|
|
+ criterias.add(Criteria.where("topic").is(pattern));
|
|
|
+ }
|
|
|
+ if (!CollectionUtils.isEmpty(criterias)) {
|
|
|
+ criteria.andOperator(criterias.toArray(new Criteria[]{}));
|
|
|
+ }
|
|
|
+ return criteria;
|
|
|
+ }
|
|
|
+
|
|
|
}
|