ChargingJob.java 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149
  1. package com.zsElectric.boot.charging.quartz;
  2. import com.fasterxml.jackson.core.JsonProcessingException;
  3. import com.zsElectric.boot.charging.entity.ThirdPartyConnectorInfo;
  4. import com.zsElectric.boot.charging.mapper.ThirdPartyConnectorInfoMapper;
  5. import com.zsElectric.boot.charging.service.ChargingBusinessService;
  6. import com.zsElectric.boot.charging.vo.ChargingPricePolicyVO;
  7. import com.zsElectric.boot.charging.vo.QueryStationsInfoVO;
  8. import lombok.RequiredArgsConstructor;
  9. import lombok.extern.slf4j.Slf4j;
  10. import org.springframework.stereotype.Component;
  11. import org.springframework.util.CollectionUtils;
  12. import java.time.LocalDateTime;
  13. import java.time.format.DateTimeFormatter;
  14. import java.util.List;
  15. /**
  16. * 充电站信息同步定时任务
  17. *
  18. * @author system
  19. * @since 2025-12-11
  20. */
  21. @Slf4j
  22. @Component
  23. @RequiredArgsConstructor
  24. public class ChargingJob {
  25. private final ChargingBusinessService chargingBusinessService;
  26. private final ThirdPartyConnectorInfoMapper connectorInfoMapper;
  27. // 任务执行标记,防止并发执行
  28. private volatile boolean isPricePolicySyncRunning = false;
  29. private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
  30. /**
  31. * 同步充电站信息
  32. * 每5分钟执行一次,从第三方接口获取充电站信息并存储到数据库
  33. */
  34. // @Scheduled(cron = "0 0/15 * * * ?")
  35. public void syncStationsInfo() {
  36. log.info("开始执行充电站信息同步定时任务");
  37. try {
  38. // 获取当前时间作为查询时间
  39. String lastQueryTime = LocalDateTime.now().minusMinutes(30).format(FORMATTER);
  40. // 分页查询,每次查询100条
  41. Integer pageNo = 1;
  42. Integer pageSize = 100;
  43. log.info("查询参数 - LastQueryTime: {}, PageNo: {}, PageSize: {}", lastQueryTime, pageNo, pageSize);
  44. // 调用业务服务查询充电站信息(会自动保存到数据库)
  45. QueryStationsInfoVO result = chargingBusinessService.queryStationsInfo(lastQueryTime, pageNo, pageSize);
  46. if (result != null && result.getStationInfos() != null) {
  47. log.info("充电站信息同步完成,共同步 {} 个充电站", result.getStationInfos().size());
  48. } else {
  49. log.warn("充电站信息同步结果为空");
  50. }
  51. } catch (JsonProcessingException e) {
  52. log.error("充电站信息同步失败 - JSON处理异常", e);
  53. } catch (Exception e) {
  54. log.error("充电站信息同步失败 - 系统异常", e);
  55. }
  56. log.info("充电站信息同步定时任务执行结束");
  57. }
  58. /**
  59. * 定时同步设备价格策略信息
  60. * 每10分钟执行一次,查询所有充电桩的价格策略并存储到数据库
  61. * cron表达式: 0 10 * * * ? 表示每10分钟执行
  62. */
  63. // @Scheduled(cron = "0 */10 * * * ?")
  64. public void syncEquipmentPricePolicy() {
  65. // 检查任务是否正在执行,防止并发
  66. if (isPricePolicySyncRunning) {
  67. log.warn("价格策略同步任务正在执行中,跳过本次调度");
  68. return;
  69. }
  70. isPricePolicySyncRunning = true;
  71. log.info("开始执行设备价格策略同步定时任务");
  72. try {
  73. // 查询所有充电桩接口信息
  74. List<ThirdPartyConnectorInfo> connectorList = connectorInfoMapper.selectList(null);
  75. if (CollectionUtils.isEmpty(connectorList)) {
  76. log.warn("未查询到充电桩接口信息,跳过价格策略同步");
  77. return;
  78. }
  79. log.info("开始同步价格策略,总共 {} 个充电桩接口", connectorList.size());
  80. int successCount = 0;
  81. int failCount = 0;
  82. // 遍历每个充电桩接口,查询价格策略
  83. for (ThirdPartyConnectorInfo connector : connectorList) {
  84. try {
  85. String connectorId = connector.getConnectorId();
  86. String equipmentId = connector.getEquipmentId();
  87. log.info("查询价格策略 - connectorId: {}, equipmentId: {}", connectorId, equipmentId);
  88. // 调用业务服务查询价格策略(会自动保存到数据库)
  89. ChargingPricePolicyVO result = chargingBusinessService.queryEquipBusinessPolicy(equipmentId, connectorId);
  90. // 立即休眠10秒,避免触发第三方接口保护机制
  91. // 第三方接口可能有频率限制,需要足够的间隔时间
  92. // Thread.sleep(18000); // 10秒间隔
  93. if (result != null && result.getSuccStat() != null && result.getSuccStat() == 0) {
  94. successCount++;
  95. log.info("价格策略同步成功 - connectorId: {}, 时段数: {}", connectorId, result.getSumPeriod());
  96. } else {
  97. failCount++;
  98. log.warn("价格策略同步失败 - connectorId: {}, 失败原因: {}, 跳过该接口继续处理下一个", connectorId,
  99. result != null ? result.getFailReason() : "返回结果为空");
  100. continue; // 跳过失败的接口,继续处理下一个
  101. }
  102. // } catch (InterruptedException e) {
  103. // Thread.currentThread().interrupt();
  104. // log.error("线程休眠被中断 - connectorId: {}", connector.getConnectorId(), e);
  105. // failCount++;
  106. // break; // 中断后退出循环
  107. } catch (Exception e) {
  108. failCount++;
  109. log.error("同步价格策略失败 - connectorId: {}", connector.getConnectorId(), e);
  110. }
  111. }
  112. log.info("设备价格策略同步完成 - 成功: {}, 失败: {}, 总数: {}", successCount, failCount, connectorList.size());
  113. } catch (Exception e) {
  114. log.error("设备价格策略同步定时任务执行异常", e);
  115. } finally {
  116. // 确保任务执行完毕后释放锁
  117. isPricePolicySyncRunning = false;
  118. log.info("设备价格策略同步定时任务执行结束");
  119. }
  120. }
  121. }