我正在尝试通过SCDF运行Spring批处理jar。我在读写时都使用了不同的数据源(两个Oracle DB)。我用来编写的dataSource是主要数据源。我使用自定义构建SCDF来包含oracle驱动程序依赖项。以下是自定义SCDF项目的位置。

dataflow-server-22x

在我的本地Spring批处理项目中,我实现了DefaultTaskConfigurer以提供主要数据源。因此,当我从IDE运行Batch项目时,该项目运行良好,并且它从辅助数据源读取记录并写入主数据源。但是,当我将批处理jar部署为自定义构建SCDF作为任务并启动它时,出现错误消息,

org.springframework.context.ApplicationContextException: Failed to start bean 'taskLifecycleListener'; nested exception is java.lang.IllegalArgumentException: Invalid TaskExecution, ID 3 not found


当我检查任务执行表(可以通过主数据源访问)时,该表中就有任务执行ID。但我仍然收到此错误。对于每次运行,都会将新任务ID插入Task_execution表中,但会收到上述错误消息以及新插入的task_execution ID。
以下是项目的具体信息:

Spring-boot-starter-parent : 2.2.5.RELEASE.
Spring-cloud-dataflow : 2.2.0.RELEASE.


我使用批处理作业类的实例从Boot的主类加载所有Batch_jobs,并且仅主类(启动所有作业)包含@EnableTask批注。下面是我的课程结构。

    @SpringBootApplication
    @EnableScheduling
    @EnableTask
    public class SpringBootMainApplication{
        @Autowired
        Job1Loader job1Loader;

        public static void main(String[] args) {
            SpringApplication.run(SpringBootMainApplication.class, args);
        }

        @Scheduled(cron = "0 */1 * * * ?")
        public void executeJob1Loader() throws Exception
        {
            JobParameters param = new JobParametersBuilder()
                                        .addString("JobID",
                                     String.valueOf(System.currentTimeMillis()))
                                        .toJobParameters();
            jobLauncher.run(job1Loader.loadJob1(), param);
        }
    }

    //Job Config
    @Configuration
    @EnableBatchProcessing
    public class Job1Loader {
    @Bean
        public Job loadJob1()
        {
            return jobBuilderFactory().get("JOb1Loader")
                .incrementer(new RunIdIncrementer())
                .flow(step01())
                .end()
                .build();;//return job
    }


我在Spring工作项目中使用了两个不同的数据源,它们都是oracle数据源(不同的服务器)。我将其中一个标记为主要,并在我的“ DefaultTaskConfigurer”自定义实现中使用了该数据源。

@Configuration
public class TaskConfig extends DefaultTaskConfigurer {
    @Autowired
    DatabaseConfig databaseConfig;
    @Override
    public DataSource getTaskDataSource() {
        return databaseConfig.dataSource();//dataSource() returns the
primary ds
    }
}


以下是我在SCDF定制服务程序和Spring Batch项目中使用的属性。

更新-1

**Spring batch Job :**
 spring.datasource.jdbc-url=jdbc:oracle:thin:@**MY_PRIMARY_DB**
 spring.datasource.username=db_user
 spring.datasource.password=db_pwd
 spring.datasource.driver-class-name=oracle.jdbc.OracleDriver

spring.datasource.jdbc-url=jdbc:oracle:thin:@**MY_SECONDARY_DB**
 spring.datasource.username=db_user
 spring.datasource.password=db_pwd
 spring.datasource.driver-class-name=oracle.jdbc.OracleDriver

**SCDF custom Server:**
 spring.datasource.url=jdbc:oracle:thin:@**MY_PRIMARY_DB**
 spring.datasource.username=db_user
 spring.datasource.password=db_pwd
 spring.datasource.driver-class-name=oracle.jdbc.OracleDriver


我的批处理应用程序使用两个数据库配置。一读一写。
因为源和目的地不同。
由于TASK_EXECUTION表是在MY_PRIMARY_DB数据库中创建的,因此我仅传递主db配置供SCDF读取和写入。因为读写发生在同一个数据库中。

我针对这个问题尝试了其他答案,但是没有一个有效。正如我之前所说,对此的任何投入都会有很大帮助。
谢谢。

最佳答案

我没有像上面所做的那样覆盖DefaultTaskConfigurer.getTaskDataSource()方法,而是更改了DefaultTaskConfigurer实现,如下所示。我还不确定为什么重写方法getTaskDataSource()会导致问题。以下是对我有用的解决方案。

@Configuration
public class TaskConfig extends DefaultTaskConfigurer
{

    Logger logger = LoggerFactory.getLogger(TaskConfig.class);

    Autowired
    public TaskConfig(@Qualifier("datasource1") DataSource dataSource) {
        super(dataSource); //"datasource1" is reference to the primary datasource.
    }
}

08-07 12:21