我必须在Spring Batch作业中实现以下用例:


通过StoredProcedureItemReader阅读提供商列表
遍历列表,并为步骤1中找到的每个提供程序(作为输入参数)调用另一个StoredProcedureItemReader
第二个SP的输出将被写入CSV。


我想出了以下策略:


步骤1开始
SP ItemReader返回提供程序列表。
在ItemWriter中,将提供程序保存到ExecutionContext
步骤1结束
步骤2开始
另一个SP ItemReader从ExecutionContext访问提供程序
另一个ItemWriter使用FlatFileItemWriter将响应写入CSV


我在理解第二个SP ItemReader如何访问第一个SP ItemReader返回的提供程序列表时遇到了麻烦。分区对您有帮助吗?另外,是否有更好的策略来实现这一目标?

编辑1:

这是ItemProcessor的原始实现(作为扩展):

@Scope("step")
public class FetchReportFromProviderProcessor implements
        ItemProcessor<RiscProvider, List<SogReportRecord>>, ItemStream {

    StoredProcedureItemReader<SogReportRecord> reader = new StoredProcedureItemReader<SogReportRecord>();
    private DataSource dataSource;

    @Value("#{jobParameters['date']}")
    private String date;

    @Override
    public List<SogReportRecord> process(final RiscProvider item) throws Exception {

        SogReportRecord record = null;
        List<SogReportRecord> records = new ArrayList<SogReportRecord>();

        SqlParameter[] sqlParameters = new SqlParameter[] {new SqlParameter(OracleTypes.CURSOR)};

        reader.setParameters(sqlParameters);
        reader.setPreparedStatementSetter(new PreparedStatementSetter() {

            @Override
            public void setValues(PreparedStatement ps) throws SQLException {
                ps.setString(0, item.getPrefix());
                ps.setString(1, date);
            }
        });

        while( (record = reader.read()) != null ) {
            records.add(record);
        }

        return records;
    }

    public DataSource getDataSource() {
        return dataSource;
    }

    public void setDataSource(DataSource dataSource) {
        this.dataSource = dataSource;
    }

    @Override
    public void open(ExecutionContext executionContext)
            throws ItemStreamException {
        reader.setDataSource(dataSource);
        reader.setProcedureName("RISC_GET_DAYMOVEINOUT");
        reader.open(executionContext);
    }

    @Override
    public void update(ExecutionContext executionContext)
            throws ItemStreamException {
        reader.update(executionContext);
    }

    @Override
    public void close() throws ItemStreamException {
        reader.close();
    }

}


和XML部分:

<batch:job id="SOG_MOVEINOUT_REPORT_GENERATOR">
    <batch:step id="GET_REPORTS">
        <batch:tasklet>
            <batch:chunk reader="getProviders"
                processor="fetchRecordsFromProvider"
                writer="sogReportWriter" commit-interval="500" />
        </batch:tasklet>
    </batch:step>
</batch:job>

<!-- Reader to fetch list of providers -->
<bean id="getProviders" class="org.springframework.batch.item.database.StoredProcedureItemReader">
    <property name="dataSource" ref="dataSource" />
    <property name="procedureName" value="RISC_GET_PROVIDER" />
    <property name="parameters">
        <list>
            <bean class="org.springframework.jdbc.core.SqlOutParameter">
                <constructor-arg index="0" value="providers" />
                <constructor-arg index="1">
                    <util:constant static-field="oracle.jdbc.OracleTypes.CURSOR" />
                </constructor-arg>
            </bean>
        </list>
    </property>
    <property name="refCursorPosition" value="1" />
    <property name="rowMapper">
        <bean class="com.kpn.risc.ProviderRowMapper" />
    </property>
</bean>

<bean id="fetchRecordsFromProvider" class="com.kpn.risc.FetchReportFromProviderProcessor">
    <property name="dataSource" ref="dataSource" />
</bean>

<bean id="sogReportWriter" class="org.springframework.batch.item.file.FlatFileItemWriter" scope="step">
    <property name="resource" value="file:///${batch.job.report.dir}/report-#{stepExecutionContext['provider']}.csv" />
    <property name="lineAggregator">
        <bean class="org.springframework.batch.item.file.transform.DelimitedLineAggregator">
            <property name="fieldExtractor">
                <bean class="org.springframework.batch.item.file.transform.PassThroughFieldExtractor" />
            </property>
        </bean>
    </property>
</bean>


在上面的代码中,处理器将其工作委托给SP ItemReader。但是,在调用read()方法之前,读者无法正确初始化。是否可以或建议在ItemReader内部调用ItemProcessor

最佳答案

您所描述的内容直接属于批处理的“驱动查询”模式。本质上,查询定义了另一个查询用来驱动其查询的ID(或您的案例提供者)。通常,ItemReader读取ID,并将每个ID传递给ItemProcessor进行充实(“其他”查询)。然后将结果传递到ItemWriter

您可以在此处的Spring Batch文档的Common Batch Patterns部分中阅读有关Driving Query模式的更多信息:http://docs.spring.io/spring-batch/trunk/reference/html/patterns.html

10-06 06:25