Browse Source

feat(app): 引入RabbitMQ延迟消息处理订单超时- 添加RabbitMQ配置及依赖,支持延迟消息处理- 实现订单15分钟未支付自动取消功能
- 替换原有的Redisson延迟队列实现
- 新增临时约课字段及处理逻辑
-优化用户核验记录去重逻辑
- 开放利润分润接口访问权限

wzq 1 week ago
parent
commit
b88276d5d7
14 changed files with 262 additions and 29 deletions
  1. 1 0
      national-motion-base-core/src/main/java/org/jeecg/config/shiro/ShiroConfig.java
  2. 6 0
      national-motion-module-system/national-motion-system-biz/pom.xml
  3. 5 13
      national-motion-module-system/national-motion-system-biz/src/main/java/org/jeecg/modules/app/controller/commercial/CommercialController.java
  4. 12 9
      national-motion-module-system/national-motion-system-biz/src/main/java/org/jeecg/modules/app/service/impl/OrderServiceImpl.java
  5. 8 0
      national-motion-module-system/national-motion-system-biz/src/main/java/org/jeecg/modules/pay/paytest/payController.java
  6. 71 0
      national-motion-module-system/national-motion-system-biz/src/main/java/org/jeecg/modules/rabbitmq/DelayedMessageListener.java
  7. 53 0
      national-motion-module-system/national-motion-system-biz/src/main/java/org/jeecg/modules/rabbitmq/DelayedMessageService.java
  8. 20 0
      national-motion-module-system/national-motion-system-biz/src/main/java/org/jeecg/modules/rabbitmq/RabbitListenerConfig.java
  9. 41 0
      national-motion-module-system/national-motion-system-biz/src/main/java/org/jeecg/modules/rabbitmq/RabbitMQConfig.java
  10. 4 4
      national-motion-module-system/national-motion-system-biz/src/main/java/org/jeecg/modules/redission/RedissonDelayQueue.java
  11. 6 0
      national-motion-module-system/national-motion-system-biz/src/main/java/org/jeecg/modules/system/app/entity/AppCoursesVerificationRecord.java
  12. 19 3
      national-motion-module-system/national-motion-system-biz/src/main/java/org/jeecg/modules/system/app/service/impl/AppCoureseServiceImpl.java
  13. 10 0
      national-motion-module-system/national-motion-system-start/src/main/resources/application-dev.yml
  14. 6 0
      pom.xml

+ 1 - 0
national-motion-base-core/src/main/java/org/jeecg/config/shiro/ShiroConfig.java

@@ -124,6 +124,7 @@ public class ShiroConfig {
 //        filterChainDefinitionMap.put("/app/detail/getCourseInfo", "anon");
         filterChainDefinitionMap.put("/app/stadium/getPlaceInfo", "anon");
         filterChainDefinitionMap.put("/test/**", "anon");//测试
+        filterChainDefinitionMap.put("/profitSharing/**", "anon");//测试
 
 
 //        filterChainDefinitionMap.put("/app/user/**", "anon");//小程序相关

+ 6 - 0
national-motion-module-system/national-motion-system-biz/pom.xml

@@ -71,6 +71,12 @@
             <artifactId>bcprov-jdk18on</artifactId>
             <version>1.82</version>
         </dependency>
+
+        <!-- rabbitMQ -->
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-amqp</artifactId>
+        </dependency>
 	</dependencies>
 	
 </project>

+ 5 - 13
national-motion-module-system/national-motion-system-biz/src/main/java/org/jeecg/modules/app/controller/commercial/CommercialController.java

@@ -12,6 +12,7 @@ import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shiro.SecurityUtils;
 import org.jeecg.common.api.vo.Result;
+import org.jeecg.common.constant.CommonConstant;
 import org.jeecg.common.system.vo.LoginUser;
 import org.jeecg.modules.app.form.ClassPostponeForm;
 import org.jeecg.modules.app.form.CourseQueryUsersForm;
@@ -110,6 +111,9 @@ public class CommercialController {
     public Result<List<AppCoursesVerificationRecord>> courseQueryUsers(@RequestBody CourseQueryUsersForm form) {
         List<AppCoursesVerificationRecord> verificationRecords =
                 appCoursesVerificationRecordService.courseQueryUsersList( form.getCoursePriceRulesId(),form.getOrPostpone(), form.getVerifyStatus());
+        List<AppCoursesVerificationRecord> temporaryCourseList =
+                appCoursesVerificationRecordService.list(Wrappers.lambdaQuery(AppCoursesVerificationRecord.class).eq(AppCoursesVerificationRecord::getOrTemporaryCourse, CommonConstant.NUMBER_1));
+        verificationRecords.addAll(temporaryCourseList);
         List<AppCoursesVerificationRecord> list = verificationRecords.stream()
                 .collect(Collectors.collectingAndThen(
                         Collectors.toCollection(() -> new TreeSet<>(Comparator.comparing(AppCoursesVerificationRecord::getUseUserId))),
@@ -190,7 +194,7 @@ public class CommercialController {
     }
 
     /**
-     * 扫码核销(查询订单详情)
+     * 扫码核销确认
      *
      * @param orderProInfoIds
      * @return
@@ -251,16 +255,4 @@ public class CommercialController {
     public Result<String> repealVerifyRecord(@Schema(description = "appIsinId") @RequestParam(name = "appIsinId") String appIsinId) throws ParseException {
         return Result.OK(appIsinService.repealVerifyRecord(appIsinId));
     }
-
-//    private final RedissonDelayQueue redissonDelayQueue;
-    /**
-     * 测试超时未支付延时任务
-     */
-    @GetMapping("/orderTimeOutTask")
-    public void orderTimeOutTask() {
-        log.info("测试超时未支付延时任务-开始");
-        String task = "OrderTimeOutTask_1953753939752968194";
-//        redissonDelayQueue.offerTask(task, 30);
-        log.info("测试超时未支付延时任务-结束");
-    }
 }

+ 12 - 9
national-motion-module-system/national-motion-system-biz/src/main/java/org/jeecg/modules/app/service/impl/OrderServiceImpl.java

@@ -37,6 +37,7 @@ import org.jeecg.modules.app.vo.QueryOrderVerifyRecordsVO;
 import org.jeecg.modules.pay.config.WechatConstants;
 import org.jeecg.modules.pay.config.WechatPayV3Utils;
 import org.jeecg.modules.pay.config.WechatUrlConstants;
+import org.jeecg.modules.rabbitmq.DelayedMessageService;
 import org.jeecg.modules.redission.RedissonDelayQueue;
 import org.jeecg.modules.system.app.dto.receiptPaymentDetails.ReceiptPaymentDetailsInfoVo;
 import org.jeecg.modules.system.app.entity.*;
@@ -114,22 +115,24 @@ public class OrderServiceImpl extends ServiceImpl<AppOrderMapper, AppOrder> impl
     private AppCoursesVerificationRecordMapper appCoursesVerificationRecordMapper;
     @Resource
     private AppContractSignMapper appContractSignMapper;
-    @Resource
-    private RedissonDelayQueue redissonDelayQueue;
+//    @Resource
+//    private RedissonDelayQueue redissonDelayQueue;
     @Resource
     private AppDeviceMapper appDeviceMapper;
     @Resource
     private WeChatPayService weChatPayService;
     @Resource
     private RedisTemplate<String, Object> redisTemplate;
-    @Resource
-    private WechatPayV3Utils wechatPayV3Utils;
+//    @Resource
+//    private WechatPayV3Utils wechatPayV3Utils;
     @Resource
     private EvaluateMapper evaluateMapper;
     @Resource
     private SysDepartMapper sysDepartMapper;
     @Resource
     private SeparateAccountsMapper separateAccountsMapper;
+    @Resource
+    private DelayedMessageService delayedMessageService;
 
 
     @Override
@@ -1213,12 +1216,12 @@ public class OrderServiceImpl extends ServiceImpl<AppOrderMapper, AppOrder> impl
             payForm.setParams(result);
 
             //发布任务到redission延迟队列(16分钟)
-            String task = CommonConstant.ORDER_TIME_OUT_TASK_PREFIX + appOrder.getId();
-            redissonDelayQueue.offerTask(task, 60 * 16);
+//            String task = CommonConstant.ORDER_TIME_OUT_TASK_PREFIX + appOrder.getId();
+//            redissonDelayQueue.offerTask(task, 60 * 16);
+
+            //发送延迟消息
+            delayedMessageService.sendOrderMessage(appOrder.getId());
 
-            //发布任务到redission延迟队列(16分钟)
-            String task2 = CommonConstant.ORDER_TIME_TASK_PREFIX + appOrder.getId();
-            redissonDelayQueue.offerTask(task2, 60 * 16);
         }
         return payForm;
     }

+ 8 - 0
national-motion-module-system/national-motion-system-biz/src/main/java/org/jeecg/modules/pay/paytest/payController.java

@@ -20,10 +20,12 @@ import org.jeecg.modules.pay.config.WeChatProfitSharingService;
 import org.jeecg.modules.pay.serverPay.HttpClientUtil;
 import org.jeecg.modules.pay.serverPay.PayKit;
 import org.jeecg.modules.pay.serverPay.RsaKit;
+import org.jeecg.modules.rabbitmq.DelayedMessageService;
 import org.jeecg.modules.system.app.entity.AppOrder;
 import org.jeecg.modules.system.app.service.IAppOrderService;
 import org.springframework.web.bind.annotation.*;
 
+import javax.annotation.Resource;
 import java.math.BigDecimal;
 import java.math.RoundingMode;
 import java.security.PrivateKey;
@@ -264,6 +266,12 @@ public class payController {
         }
         return null;
     }
+    private final DelayedMessageService delayedMessageService;
+    @GetMapping(value = "/test/rabbitTest")
+    public void rabbitTest(@RequestParam("msg") String msg) {
+        delayedMessageService.send25DayMessage( msg);
+        log.info("接口调用,发送消息成功");
+    }
 
     @GetMapping(value = "/test2")
     public JSONObject CertificateDownloaderTest() throws Exception {

+ 71 - 0
national-motion-module-system/national-motion-system-biz/src/main/java/org/jeecg/modules/rabbitmq/DelayedMessageListener.java

@@ -0,0 +1,71 @@
+package org.jeecg.modules.rabbitmq;
+
+
+import cn.hutool.core.collection.CollUtil;
+import cn.hutool.core.util.ObjectUtil;
+import com.baomidou.mybatisplus.core.toolkit.Wrappers;
+import com.rabbitmq.client.Channel;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.jeecg.common.constant.CommonConstant;
+import org.jeecg.modules.system.app.entity.AppOrder;
+import org.jeecg.modules.system.app.entity.AppOrderProInfo;
+import org.jeecg.modules.system.app.service.IAppOrderProInfoService;
+import org.jeecg.modules.system.app.service.IAppOrderService;
+import org.springframework.amqp.core.Message;
+import org.springframework.amqp.rabbit.annotation.RabbitListener;
+import org.springframework.stereotype.Component;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Objects;
+
+@Slf4j
+@Component
+@AllArgsConstructor
+public class DelayedMessageListener {
+
+    private final IAppOrderService appOrderService;
+    private final IAppOrderProInfoService appOrderProInfoService;
+
+    @RabbitListener(queues = RabbitMQConfig.DELAY_QUEUE)
+    public void handleMessage(Message message, Channel channel) throws IOException {
+        try {
+            String orderId = new String(message.getBody());
+            log.info("收到延迟消息,订单ID,{}:",orderId);
+
+            // 业务逻辑处理
+            orderProcessMessage(orderId);
+
+            // 手动确认成功
+            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
+        } catch (Exception e) {
+            // 处理失败,拒绝消息并重新入队(或进入死信队列)
+            log.error("处理延迟消息失败:{}", e.getMessage());
+            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
+        }
+    }
+
+    private void orderProcessMessage(String orderId) {
+        // 业务处理
+        log.info("处理订单消息:{}",orderId);
+
+        AppOrder appOrder = appOrderService.getById(orderId);
+        if(ObjectUtil.isNotEmpty(appOrder)){
+            if (Objects.equals(appOrder.getOrderStatus(), CommonConstant.ORDER_STATUS_0) && appOrder.getRevision() == 0) {
+                log.info("修改订单:{},支付状态为已取消", orderId);
+                appOrder.setOrderStatus(CommonConstant.ORDER_STATUS_4);
+                appOrderService.updateById(appOrder);
+                //修改子订单状态
+                List<AppOrderProInfo> appOrderProInfoList = appOrderProInfoService.list(Wrappers.<AppOrderProInfo>lambdaQuery().eq(AppOrderProInfo::getOrderId, orderId));
+                if (CollUtil.isNotEmpty(appOrderProInfoList)){
+                    for (AppOrderProInfo appOrderProInfo : appOrderProInfoList) {
+                        appOrderProInfo.setOrderStatus(CommonConstant.ORDER_STATUS_4);
+                        appOrderProInfoService.updateById(appOrderProInfo);
+                    }
+                }
+            }
+
+        }
+    }
+}

+ 53 - 0
national-motion-module-system/national-motion-system-biz/src/main/java/org/jeecg/modules/rabbitmq/DelayedMessageService.java

@@ -0,0 +1,53 @@
+package org.jeecg.modules.rabbitmq;
+
+import org.springframework.amqp.core.MessageDeliveryMode;
+import org.springframework.amqp.core.MessagePostProcessor;
+import org.springframework.amqp.rabbit.core.RabbitTemplate;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+@Service
+public class DelayedMessageService {
+
+    @Autowired
+    private RabbitTemplate rabbitTemplate;
+
+    public void send25DayMessage(String message) {
+        MessagePostProcessor processor = msg -> {
+            // 设置延迟时间(25天毫秒数)
+            msg.getMessageProperties().setDelay(60 * 1000);
+            // 强制持久化消息
+            msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
+            return msg;
+        };
+
+        rabbitTemplate.convertAndSend(
+                RabbitMQConfig.DELAY_EXCHANGE,
+                RabbitMQConfig.DELAY_ROUTING_KEY,
+                message,
+                processor
+        );
+    }
+
+    /**
+     * 发送延迟消息:订单15分钟超时未支付取消
+     *
+     * @param message
+     */
+    public void sendOrderMessage(String message) {
+        MessagePostProcessor processor = msg -> {
+            // 设置延迟时间(15分钟毫秒数)
+            msg.getMessageProperties().setDelay(15 * 60 * 1000);
+            // 强制持久化消息
+            msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
+            return msg;
+        };
+
+        rabbitTemplate.convertAndSend(
+                RabbitMQConfig.DELAY_EXCHANGE,
+                RabbitMQConfig.DELAY_ROUTING_KEY,
+                message,
+                processor
+        );
+    }
+}

+ 20 - 0
national-motion-module-system/national-motion-system-biz/src/main/java/org/jeecg/modules/rabbitmq/RabbitListenerConfig.java

@@ -0,0 +1,20 @@
+package org.jeecg.modules.rabbitmq;
+
+import org.springframework.amqp.core.AcknowledgeMode;
+import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
+import org.springframework.amqp.rabbit.connection.ConnectionFactory;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+public class RabbitListenerConfig {
+
+    @Bean
+    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
+        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
+        factory.setConnectionFactory(connectionFactory);
+        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); // 手动确认
+        factory.setPrefetchCount(1000); // 预取消息数
+        return factory;
+    }
+}

+ 41 - 0
national-motion-module-system/national-motion-system-biz/src/main/java/org/jeecg/modules/rabbitmq/RabbitMQConfig.java

@@ -0,0 +1,41 @@
+package org.jeecg.modules.rabbitmq;
+
+import org.springframework.amqp.core.*;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import java.util.HashMap;
+import java.util.Map;
+
+@Configuration
+public class RabbitMQConfig {
+
+    /**
+     * ---------------------------------------------订单超时未支付取消
+     */
+    public static final String DELAY_EXCHANGE = "order_delayed_exchange";
+    public static final String DELAY_QUEUE = "order_delayed_queue";
+    public static final String DELAY_ROUTING_KEY = "order_delayed_key";
+
+    // 声明延迟交换机(持久化)
+    @Bean
+    public CustomExchange delayExchange() {
+        Map<String, Object> args = new HashMap<>();
+        args.put("x-delayed-type", "direct"); // 延迟交换机类型
+        return new CustomExchange(DELAY_EXCHANGE, "x-delayed-message", true, false, args);
+    }
+
+    // 声明持久化队列
+    @Bean
+    public Queue delayQueue() {
+        return QueueBuilder.durable(DELAY_QUEUE)
+                .withArgument("x-dead-letter-exchange", "dlx_exchange") // 死信交换机(可选)
+                .build();
+    }
+
+    // 绑定队列到交换机
+    @Bean
+    public Binding delayBinding(Queue delayQueue, CustomExchange delayExchange) {
+        return BindingBuilder.bind(delayQueue).to(delayExchange).with(DELAY_ROUTING_KEY).noargs();
+    }
+}

+ 4 - 4
national-motion-module-system/national-motion-system-biz/src/main/java/org/jeecg/modules/redission/RedissonDelayQueue.java

@@ -27,7 +27,7 @@ import java.util.stream.Collectors;
  * @Date 2025/8/15
  * @Desc 延迟队列实现订单超时取消
  */
-@Component
+//@Component
 @Slf4j
 public class RedissonDelayQueue {
 
@@ -46,11 +46,11 @@ public class RedissonDelayQueue {
     private RDelayedQueue<String> expireDelayQueue;
     private RBlockingQueue<String> expireBlockingQueue;
 
-    @PostConstruct
+//    @PostConstruct
     public void init() {
         initDelayQueue();
-        startDelayQueueConsumer();
-        startExpireDelayQueueConsumer();
+//        startDelayQueueConsumer();
+//        startExpireDelayQueueConsumer();
     }
 
     private void initDelayQueue() {

+ 6 - 0
national-motion-module-system/national-motion-system-biz/src/main/java/org/jeecg/modules/system/app/entity/AppCoursesVerificationRecord.java

@@ -122,6 +122,12 @@ public class AppCoursesVerificationRecord implements Serializable {
     @Excel(name = "是否延课(0-未延课 1-已延课)", width = 15)
     @Schema(description = "是否延课(0-未延课 1-已延课)")
     private Integer orPostpone;
+    /**
+     * 是否临时约课(0-否 1-是)
+     */
+    @Excel(name = "是否临时约课(0-否 1-是)", width = 15)
+    @Schema(description = "是否临时约课(0-否 1-是)")
+    private Integer orTemporaryCourse;
     /**
      * 延课原因
      */

+ 19 - 3
national-motion-module-system/national-motion-system-biz/src/main/java/org/jeecg/modules/system/app/service/impl/AppCoureseServiceImpl.java

@@ -38,6 +38,9 @@ import java.time.LocalDateTime;
 import java.time.ZoneId;
 import java.time.format.DateTimeFormatter;
 import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
 import static org.jeecg.common.constant.CommonConstant.SC_INTERNAL_SERVER_ERROR_500;
@@ -301,6 +304,17 @@ public class AppCoureseServiceImpl extends ServiceImpl<AppCoursesMapper, AppCour
         return dtoList;
     }
 
+    /** 自定义去重工具方法
+     * 通过 Predicate结合 ConcurrentHashMap记录已出现的字段值,动态过滤重复项
+     * @param keyExtractor
+     * @return
+     * @param <T>
+     */
+    private static <T> Predicate<T> distinctByKey(Function<? super T, ?> keyExtractor) {
+        Set<Object> seen = ConcurrentHashMap.newKeySet();
+        return t -> seen.add(keyExtractor.apply(t));
+    }
+
     @Override
     public List<CoursesPriceRulesVO> getCourseInfo(String courseId) {
 
@@ -330,13 +344,14 @@ public class AppCoureseServiceImpl extends ServiceImpl<AppCoursesMapper, AppCour
             //共计
             coursesPriceRulesVO.setTotalNum(verificationRecords.size());
             //延期
-            long postponeNum = verificationRecords.stream().filter(e -> e.getOrPostpone() == 1).count();
+            long postponeNum = verificationRecords.stream().filter(e -> e.getOrPostpone() == 1).filter(distinctByKey(AppCoursesVerificationRecord::getUseUserId)).count();
             coursesPriceRulesVO.setPostponeNum((int) postponeNum);
             //已核验
-            long writtenOffNum = verificationRecords.stream().filter(e -> e.getVerifyStatus() == 1).count();
+            long writtenOffNum = verificationRecords.stream().filter(e -> e.getVerifyStatus() == 1).filter(distinctByKey(AppCoursesVerificationRecord::getUseUserId)).count();
             coursesPriceRulesVO.setWrittenOffNum((int) writtenOffNum);
             //未核验
-            long unwrittenOffNum = verificationRecords.stream().filter(e -> e.getVerifyStatus() == 0 && e.getOrPostpone() == 0).count();
+            long unwrittenOffNum =
+                    verificationRecords.stream().filter(e -> e.getVerifyStatus() == 0 && e.getOrPostpone() == 0).filter(distinctByKey(AppCoursesVerificationRecord::getUseUserId)) .count();
             coursesPriceRulesVO.setUnwrittenOffNum((int) unwrittenOffNum);
 
             //判断是否为当日数据
@@ -398,6 +413,7 @@ public class AppCoureseServiceImpl extends ServiceImpl<AppCoursesMapper, AppCour
             appCoursesVerificationRecord.setUseUserName(familyMembers.getFullName());
             appCoursesVerificationRecord.setVerifyStatus(0);
             appCoursesVerificationRecord.setOrPostpone(0);
+            appCoursesVerificationRecord.setOrTemporaryCourse(1);
             appCoursesVerificationRecord.setCoursesType(0);
 
             int insert = appCoursesVerificationRecordMapper.insert(appCoursesVerificationRecord);

+ 10 - 0
national-motion-module-system/national-motion-system-start/src/main/resources/application-dev.yml

@@ -46,6 +46,16 @@ spring:
       mail.smtp.auth: true
       smtp.ssl.enable: true
       mail.debug: true  # 启用调试模式(查看详细日志)
+  rabbitmq:
+    host: localhost
+    port: 5672
+    username: admin
+    password: admin123
+    listener:
+      simple:
+        acknowledge-mode: manual  # 手动确认
+        concurrency: 5
+        prefetch: 1000
   ## quartz定时任务,采用数据库方式
   quartz:
     job-store-type: jdbc

+ 6 - 0
pom.xml

@@ -131,6 +131,12 @@
 			<artifactId>spring-boot-starter-test</artifactId>
 			<scope>test</scope>
 		</dependency>
+
+        <!-- rabbitMQ -->
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-amqp</artifactId>
+        </dependency>
 		<!-- Lombok -->
 		<dependency>
 			<groupId>org.projectlombok</groupId>