wy 1 tahun lalu
induk
melakukan
bf21d7a843

+ 6 - 0
SpringBatchServiceClient/pom.xml

@@ -52,5 +52,11 @@
             <artifactId>hutool-core</artifactId>
             <version>5.8.18</version>
         </dependency>
+
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-batch</artifactId>
+            <version>2.7.3</version>
+        </dependency>
     </dependencies>
 </project>

+ 17 - 0
SpringBatchServiceClient/src/main/java/com/zswl/cloud/springBatch/cline/enhance/EnhanceJobBuilderFactory.java

@@ -0,0 +1,17 @@
+package com.zswl.cloud.springBatch.cline.enhance;
+
+import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
+import org.springframework.beans.factory.BeanFactory;
+import org.springframework.cloud.sleuth.instrument.batch.TraceJobBuilderFactory;
+import org.springframework.stereotype.Component;
+
+/**
+ * @author wy
+ */
+@Component
+public class EnhanceJobBuilderFactory extends TraceJobBuilderFactory {
+
+    public EnhanceJobBuilderFactory(BeanFactory beanFactory, JobBuilderFactory delegate) {
+        super(beanFactory, delegate);
+    }
+}

+ 17 - 0
SpringBatchServiceClient/src/main/java/com/zswl/cloud/springBatch/cline/enhance/EnhanceStepBuilderFactory.java

@@ -0,0 +1,17 @@
+package com.zswl.cloud.springBatch.cline.enhance;
+
+import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
+import org.springframework.beans.factory.BeanFactory;
+import org.springframework.cloud.sleuth.instrument.batch.TraceStepBuilderFactory;
+import org.springframework.stereotype.Component;
+
+/**
+ * @author wy
+ */
+@Component
+public class EnhanceStepBuilderFactory extends TraceStepBuilderFactory {
+
+    public EnhanceStepBuilderFactory(BeanFactory beanFactory, StepBuilderFactory delegate) {
+        super(beanFactory, delegate);
+    }
+}

+ 6 - 0
SpringBatchServiceServer/pom.xml

@@ -75,6 +75,12 @@
             <version>2.7.3</version>
         </dependency>
 
+        <dependency>
+            <groupId>com.zswl.cloud.SpringBatchService</groupId>
+            <artifactId>SpringBatchServiceClient</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
     </dependencies>
 
     <build>

+ 1 - 1
SpringBatchServiceServer/src/main/java/com/zswl/cloud/springBtach/server/boot/SpringBatchApplication.java

@@ -11,7 +11,7 @@ import org.springframework.context.annotation.ComponentScan;
  */
 @SpringBootApplication
 @EnableApplicationClient
-@ComponentScan("com.zswl.cloud.springBtach.server.core")
+@ComponentScan({"com.zswl.cloud.springBtach.server.core","com.zswl.cloud.springBatch.cline.enhance"})
 public class SpringBatchApplication extends ApplicationBootSuper {
 
     public static void main(String[] args) {

+ 80 - 0
SpringBatchServiceServer/src/main/java/com/zswl/cloud/springBtach/server/core/config/ChannelOrderBatchConfig.java

@@ -0,0 +1,80 @@
+package com.zswl.cloud.springBtach.server.core.config;
+
+import com.zswl.cloud.springBatch.cline.enhance.EnhanceStepBuilderFactory;
+import org.springframework.batch.core.Step;
+import org.springframework.batch.core.StepContribution;
+import org.springframework.batch.core.scope.context.ChunkContext;
+import org.springframework.batch.core.step.tasklet.Tasklet;
+import org.springframework.batch.repeat.RepeatStatus;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import javax.annotation.Resource;
+
+/**
+ * @author Wy
+ */
+@Configuration
+public class ChannelOrderBatchConfig {
+
+    @Resource
+    private EnhanceStepBuilderFactory stepBuilderFactory;
+
+    @Bean
+    public Tasklet channelOrderTaskletA(){
+        return new Tasklet() {
+            @Override
+            public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
+                System.out.println("----------------channelOrderTaskletA---------------" + chunkContext.getStepContext().getJobParameters().get("name"));
+                return RepeatStatus.FINISHED;
+            }
+        };
+    }
+
+    @Bean
+    public Tasklet channelOrderTaskletB(){
+        return new Tasklet() {
+            @Override
+            public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
+                System.out.println("------------channelOrderTaskletB-------------------");
+                return RepeatStatus.FINISHED;
+            }
+        };
+    }
+
+    @Bean
+    public Tasklet channelOrderTaskletC(){
+        return new Tasklet() {
+            @Override
+            public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
+                System.out.println("----------------channelOrderTaskletC---------------");
+                return RepeatStatus.FINISHED;
+            }
+        };
+    }
+
+    @Bean
+    public Step channelOrderStepA(){
+        //tasklet 执行step逻辑, 类似 Thread()--->可以执行runable接口
+        return stepBuilderFactory.get("channelOrderStepA")
+                .tasklet(channelOrderTaskletA())
+                .build();
+    }
+
+    @Bean
+    public Step channelOrderStepB(){
+        //tasklet 执行step逻辑, 类似 Thread()--->可以执行runable接口
+        return stepBuilderFactory.get("channelOrderStepB")
+                .tasklet(channelOrderTaskletB())
+                .build();
+    }
+
+    @Bean
+    public Step channelOrderStepC(){
+        //tasklet 执行step逻辑, 类似 Thread()--->可以执行runable接口
+        return stepBuilderFactory.get("channelOrderStepC")
+                .tasklet(channelOrderTaskletC())
+                .build();
+    }
+
+}

+ 80 - 0
SpringBatchServiceServer/src/main/java/com/zswl/cloud/springBtach/server/core/config/OrderBatchConfig.java

@@ -0,0 +1,80 @@
+package com.zswl.cloud.springBtach.server.core.config;
+
+import com.zswl.cloud.springBatch.cline.enhance.EnhanceStepBuilderFactory;
+import org.springframework.batch.core.Step;
+import org.springframework.batch.core.StepContribution;
+import org.springframework.batch.core.scope.context.ChunkContext;
+import org.springframework.batch.core.step.tasklet.Tasklet;
+import org.springframework.batch.repeat.RepeatStatus;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import javax.annotation.Resource;
+
+/**
+ * @author wy
+ */
+@Configuration
+public class OrderBatchConfig {
+
+    @Resource
+    private EnhanceStepBuilderFactory stepBuilderFactory;
+
+    @Bean
+    public Tasklet orderTaskletA(){
+        return new Tasklet() {
+            @Override
+            public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
+                // todo 具体业务调用或代码
+                System.out.println("----------------OrderBatchConfig-taskletA---------------" + chunkContext.getStepContext().getJobParameters().get("name"));
+                return RepeatStatus.FINISHED;
+            }
+        };
+    }
+
+    @Bean
+    public Tasklet orderTaskletB(){
+        return new Tasklet() {
+            @Override
+            public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
+                // todo 具体业务调用或代码
+                System.out.println("------------OrderBatchConfig-taskletB-------------------");
+                return RepeatStatus.FINISHED;
+            }
+        };
+    }
+
+    @Bean
+    public Tasklet orderTaskletC(){
+        return new Tasklet() {
+            @Override
+            public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
+                // todo 具体业务调用或代码
+                System.out.println("----------------OrderBatchConfig-taskletC---------------");
+                return RepeatStatus.FINISHED;
+            }
+        };
+    }
+
+    @Bean
+    public Step orderStepA(){
+        return stepBuilderFactory.get("orderStepA")
+                .tasklet(orderTaskletA())
+                .build();
+    }
+
+    @Bean
+    public Step orderStepB() {
+        return stepBuilderFactory.get("orderStepB")
+                .tasklet(orderTaskletB())
+                .build();
+    }
+
+    @Bean
+    public Step orderStepC(){
+        return stepBuilderFactory.get("orderStepC")
+                .tasklet(orderTaskletC())
+                .build();
+    }
+
+}

+ 19 - 133
SpringBatchServiceServer/src/main/java/com/zswl/cloud/springBtach/server/core/config/SpringBatchConfig.java

@@ -1,162 +1,48 @@
 package com.zswl.cloud.springBtach.server.core.config;
 
+import com.zswl.cloud.springBatch.cline.enhance.EnhanceJobBuilderFactory;
 import org.springframework.batch.core.Job;
-import org.springframework.batch.core.Step;
-import org.springframework.batch.core.StepContribution;
 import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
-import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
-import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
-import org.springframework.batch.core.job.builder.FlowBuilder;
-import org.springframework.batch.core.job.flow.Flow;
 import org.springframework.batch.core.launch.support.RunIdIncrementer;
-import org.springframework.batch.core.scope.context.ChunkContext;
-import org.springframework.batch.core.step.tasklet.Tasklet;
-import org.springframework.batch.repeat.RepeatStatus;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 
 import javax.annotation.Resource;
 
 /**
- * @author Wy
+ * @author wy
  */
 @EnableBatchProcessing
 @Configuration
 public class SpringBatchConfig {
 
-    //job构造工厂---用于构建job对象
     @Resource
-    private JobBuilderFactory jobBuilderFactory;
+    private EnhanceJobBuilderFactory jobBuilderFactory;
 
-    //step 构造工厂--用于构造step对象
     @Resource
-    private StepBuilderFactory stepBuilderFactory;
+    private OrderBatchConfig orderBatchConfig;
 
-    //构造一个step对象执行的任务(逻辑对象)
-    @Bean
-    public Tasklet taskletA(){
-        return new Tasklet() {
-            @Override
-            public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
-                System.out.println("----------------taskletA---------------" + chunkContext.getStepContext().getJobParameters().get("name"));
-                return RepeatStatus.FINISHED;
-            }
-        };
-    }
-
-    @Bean
-    public Tasklet taskletB1(){
-        return new Tasklet() {
-            @Override
-            public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
-                System.out.println("------------stepB----taskletB1---------------");
-                return RepeatStatus.FINISHED;
-            }
-        };
-    }
-
-    @Bean
-    public Tasklet taskletB2(){
-        return new Tasklet() {
-            @Override
-            public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
-                System.out.println("------------stepB----taskletB2---------------");
-                return RepeatStatus.FINISHED;
-            }
-        };
-    }
-    @Bean
-    public Tasklet taskletB3(){
-        return new Tasklet() {
-            @Override
-            public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
-                System.out.println("------------stepB----taskletB3---------------");
-                return RepeatStatus.FINISHED;
-            }
-        };
-    }
-
-    @Bean
-    public Tasklet taskletC(){
-        return new Tasklet() {
-            @Override
-            public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
-                System.out.println("----------------taskletC---------------");
-                return RepeatStatus.FINISHED;
-            }
-        };
-    }
-
-    //构造一个step对象
-    @Bean
-    public Step stepA(){
-        //tasklet 执行step逻辑, 类似 Thread()--->可以执行runable接口
-        return stepBuilderFactory.get("stepA")
-                .tasklet(taskletA())
-                .build();
-    }
-
-    @Bean
-    public Step stepB1(){
-        //tasklet 执行step逻辑, 类似 Thread()--->可以执行runable接口
-        return stepBuilderFactory.get("stepB1")
-                .tasklet(taskletB1())
-                .build();
-    }
-
-    @Bean
-    public Step stepB2(){
-        //tasklet 执行step逻辑, 类似 Thread()--->可以执行runable接口
-        return stepBuilderFactory.get("stepB2")
-                .tasklet(taskletB2())
-                .build();
-    }
-
-    @Bean
-    public Step stepB3(){
-        //tasklet 执行step逻辑, 类似 Thread()--->可以执行runable接口
-        return stepBuilderFactory.get("stepB3")
-                .tasklet(taskletB3())
-                .build();
-    }
-
-    //构造一个流式步骤
-    @Bean
-    public Flow flowB(){
-        return new FlowBuilder<Flow>("flowB")
-                .start(stepB1())
-                .next(stepB2())
-                .next(stepB3())
-                .build();
-    }
-
-    //job 没有现有的flowStep步骤操作方法, 必须使用step进行封装之后再执行
-    @Bean
-    public Step stepB(){
-        //tasklet 执行step逻辑, 类似 Thread()--->可以执行runable接口
-        return stepBuilderFactory
-                .get("stepB")
-                .flow(flowB())
-                .build();
-    }
+    @Resource
+    private ChannelOrderBatchConfig channelOrderBatchConfig;
 
-    //构造一个step对象
     @Bean
-    public Step stepC(){
-        //tasklet 执行step逻辑, 类似 Thread()--->可以执行runable接口
-        return stepBuilderFactory.get("stepC")
-                .tasklet(taskletC())
+    public Job orderJob(){
+        return  jobBuilderFactory
+                .get("orderJob")
+                .start(orderBatchConfig.orderStepA())
+                .next(orderBatchConfig.orderStepB())
+                .next(orderBatchConfig.orderStepC())
+                .incrementer(new RunIdIncrementer())
                 .build();
     }
 
-    //如果firstStep 执行成功:下一步执行successStep 否则是failStep
     @Bean
-    public Job job(){
-        return jobBuilderFactory
-                .get("flow-step-job")
-                .start(stepA())
-                .next(stepB())
-                .next(stepC())
+    public Job chanelOrderJob(){
+        return  jobBuilderFactory
+                .get("chanelOrderJob")
+                .start(channelOrderBatchConfig.channelOrderStepA())
+                .next(channelOrderBatchConfig.channelOrderStepB())
+                .next(channelOrderBatchConfig.channelOrderStepC())
                 .incrementer(new RunIdIncrementer())
                 .build();
     }

+ 55 - 16
SpringBatchServiceServer/src/main/java/com/zswl/cloud/springBtach/server/core/controller/TestSpringBatch.java

@@ -1,12 +1,20 @@
 package com.zswl.cloud.springBtach.server.core.controller;
 
-import org.springframework.batch.core.*;
-import org.springframework.batch.core.explore.JobExplorer;
-import org.springframework.batch.core.launch.JobLauncher;
+import com.zswl.cloud.springBatch.cline.enhance.EnhanceStepBuilderFactory;
+import com.zswl.cloud.springBtach.server.core.service.JobService;
+import org.springframework.batch.core.ExitStatus;
+import org.springframework.batch.core.Step;
+import org.springframework.batch.core.StepContribution;
+import org.springframework.batch.core.job.SimpleJob;
+import org.springframework.batch.core.scope.context.ChunkContext;
+import org.springframework.batch.core.step.tasklet.Tasklet;
+import org.springframework.batch.repeat.RepeatStatus;
+import org.springframework.context.annotation.Bean;
 import org.springframework.web.bind.annotation.GetMapping;
 import org.springframework.web.bind.annotation.RestController;
 
 import javax.annotation.Resource;
+import java.util.List;
 
 /**
  * @author wy
@@ -15,24 +23,55 @@ import javax.annotation.Resource;
 public class TestSpringBatch {
 
     @Resource
-    private JobLauncher launcher;
+    private JobService jobService;
 
     @Resource
-    private Job job;
+    private EnhanceStepBuilderFactory stepBuilderFactory;
 
-    @Resource
-    private JobExplorer jobExplorer;
+    @GetMapping("/job/order")
+    public ExitStatus startJob1(String name) throws Exception {
+        return jobService.orderBatch(name);
+    }
+
+    @GetMapping("/job/channelOrder")
+    public ExitStatus startJob2(String name) throws Exception {
+        return jobService.channelOrderBatch(name);
+    }
 
-    @GetMapping("/job/start")
-    public ExitStatus startJob(String name) throws Exception {
+    @Bean
+    public Tasklet tasklet() {
+        return new Tasklet() {
+            @Override
+            public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
+                System.out.println("----------------taskletA---------------" + chunkContext.getStepContext().getJobParameters().get("name"));
+                return RepeatStatus.FINISHED;
+            }
+        };
+    }
 
-        //启动job作业
-        JobParameters parameters = new JobParametersBuilder(jobExplorer)
-                .getNextJobParameters(job)
-                .addString("name", name)
-                .toJobParameters();
+    public Step stepA(){
+        //tasklet 执行step逻辑, 类似 Thread()--->可以执行runable接口
+        return stepBuilderFactory.get("stepA")
+                .tasklet(new Tasklet() {
+                    @Override
+                    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
+                        System.out.println("----------------hahahaha---------------");
+                        return RepeatStatus.FINISHED;
+                    }
+                })
+                .build();
+    }
 
-        JobExecution jobExet = launcher.run(job, parameters);
-        return jobExet.getExitStatus();
+    @GetMapping("/job/restart")
+    public ExitStatus restart(String name) throws Exception {
+        List<Step> stepsFromJobs = jobService.getStepsFromJob(name);
+        SimpleJob job = jobService.getJobByName(name);
+        for (Step item : stepsFromJobs) {
+            // todo 根据条件判断需不需要对默认job进行step的修改
+            if (item.getName().contains("B")) {
+                job.addStep(stepA());
+            }
+        }
+        return jobService.restJob(job, name);
     }
 }

+ 24 - 0
SpringBatchServiceServer/src/main/java/com/zswl/cloud/springBtach/server/core/service/JobRegisterService.java

@@ -0,0 +1,24 @@
+package com.zswl.cloud.springBtach.server.core.service;
+
+import org.springframework.batch.core.Job;
+import org.springframework.stereotype.Service;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * @author wy
+ */
+@Service
+public class JobRegisterService {
+
+    private final Map<String, Job> jobMap = new ConcurrentHashMap<>();
+
+    public void registerJob(String jobName, Job job) {
+        jobMap.put(jobName, job);
+    }
+
+    public Job getJob(String jobName) {
+        return jobMap.get(jobName);
+    }
+}

+ 86 - 0
SpringBatchServiceServer/src/main/java/com/zswl/cloud/springBtach/server/core/service/JobService.java

@@ -0,0 +1,86 @@
+package com.zswl.cloud.springBtach.server.core.service;
+
+import cn.hutool.core.collection.CollectionUtil;
+import org.springframework.batch.core.*;
+import org.springframework.batch.core.explore.JobExplorer;
+import org.springframework.batch.core.job.SimpleJob;
+import org.springframework.batch.core.launch.JobLauncher;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.Resource;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * @author wy
+ */
+@Service
+public class JobService {
+
+    @Resource
+    private JobLauncher jobLauncher;
+
+    @Resource
+    private Job orderJob;
+
+    @Resource
+    private Job chanelOrderJob;
+
+    @Resource
+    private JobExplorer jobExplorer;
+
+    @Resource
+    private JobRegisterService jobRegisterService;
+
+//    @Autowired
+//    public void jobLauncherService(JobLauncher jobLauncher, Job orderJob, Job chanelOrderJob) {
+//        this.jobLauncher = jobLauncher;
+//        this.orderJob = orderJob;
+//        this.chanelOrderJob = chanelOrderJob;
+//    }
+
+    public ExitStatus orderBatch(String name) throws Exception {
+        JobParameters parameters = new JobParametersBuilder(jobExplorer)
+                .getNextJobParameters(orderJob)
+                .addString("name", name)
+                .toJobParameters();
+        jobRegisterService.registerJob("orderBatch", orderJob);
+        return jobLauncher.run(orderJob, parameters).getExitStatus();
+    }
+
+    public ExitStatus channelOrderBatch(String name) throws Exception {
+        JobParameters parameters = new JobParametersBuilder(jobExplorer)
+                .getNextJobParameters(chanelOrderJob)
+                .addString("name", name)
+                .toJobParameters();
+        jobRegisterService.registerJob("channelOrderBatch", chanelOrderJob);
+        return jobLauncher.run(chanelOrderJob, parameters).getExitStatus();
+    }
+
+    public ExitStatus restJob(Job job, String name) throws Exception {
+        JobParameters parameters = new JobParametersBuilder(jobExplorer)
+                .getNextJobParameters(job)
+                .addString("name", name)
+                .toJobParameters();
+        jobRegisterService.registerJob("restJob", job);
+        return jobLauncher.run(job, parameters).getExitStatus();
+    }
+
+    public SimpleJob getJobByName(String name) {
+        return (SimpleJob) jobRegisterService.getJob(name);
+    }
+
+    public List<Step> getStepsFromJob(String name) {
+        List<Step> steps = new ArrayList<>();
+        SimpleJob simpleJob = getJobByName(name);
+        Collection<String> stepNames = simpleJob.getStepNames();
+        if (CollectionUtil.isNotEmpty(stepNames)) {
+            for (String item : stepNames) {
+                steps.add(simpleJob.getStep(item));
+            }
+        }
+        return steps;
+    }
+
+}