Using Redshift as the Spring Batch Job Repository and an Alternative to SEQUENCE in Redshift

This article describes how to use Redshift as the Spring Batch job repository and what to use in place of SEQUENCE, which Redshift does not support.

Problem Description

One of the requirements in my project is to place the Spring Batch schema on an Amazon Redshift DB.
I am planning to start from schema-postgresql.sql as the baseline, since Redshift is based on Postgres.

Looking at the Spring Batch source code, it looks like you need to do a few things to make this work:

  • Extend JobRepositoryFactoryBean and DefaultDataFieldMaxValueIncrementerFactory.
  • Add my own RedshiftMaxValueIncrementer that extends AbstractSequenceMaxValueIncrementer.

Looking at the Redshift data types, it does not look like I will have any issues converting the schema script, aside from the sequences used to create the job, execution, and step execution IDs.

What is the best workaround for the missing sequences?

  1. Specify those columns as IDENTITY columns. This looks like the easiest way from the Redshift point of view, but it can be problematic: DataFieldMaxValueIncrementer.nextLongValue() returns long and not Long, whereas we would need to return null and let IDENTITY do the work for us (see the sketch after this list).
  2. An implementation based on something like select max(STEP_EXECUTION_ID) from BATCH_STEP_EXECUTION, doing something similar to MySQLMaxValueIncrementer, which extends AbstractColumnMaxValueIncrementer.
  3. Creating the sequences in Java code only, using tools similar to the ones Hibernate uses.
  4. An approach not mentioned above.
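
For illustration, here is a minimal sketch of what option 1 could look like, assuming the standard Spring Batch column layout for BATCH_JOB_INSTANCE; the helper class is hypothetical, and the IDENTITY(seed, step) syntax is Redshift-specific:

import javax.sql.DataSource;

import org.springframework.jdbc.core.JdbcTemplate;

// Hypothetical helper: creates BATCH_JOB_INSTANCE with a Redshift IDENTITY column
// in place of a sequence. Column layout follows the standard Spring Batch schema.
public class IdentityColumnSchemaExample
{
    public static void createJobInstanceTable(DataSource dataSource)
    {
        new JdbcTemplate(dataSource).execute(
            "CREATE TABLE BATCH_JOB_INSTANCE (" +
            "  JOB_INSTANCE_ID BIGINT IDENTITY(0,1) NOT NULL PRIMARY KEY," +
            "  VERSION BIGINT," +
            "  JOB_NAME VARCHAR(100) NOT NULL," +
            "  JOB_KEY VARCHAR(32) NOT NULL" +
            ")");
    }
}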

Recommended Answer

Here's how I got at least that part to (apparently) work:

In my subclass of DefaultBatchConfigurer, I added this code:

@Override
protected JobRepository createJobRepository() throws Exception
{
    JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
    factory.setDataSource(dataSource);
    factory.setTransactionManager(getTransactionManager());
    factory.setIncrementerFactory(new RedshiftIncrementerFactory(dataSource));
    factory.afterPropertiesSet();
    return factory.getObject();
}
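
For context, a minimal wiring sketch of the DefaultBatchConfigurer subclass that hosts the override above might look like the following. This is an assumption, not part of the original answer: the class name is hypothetical, and @EnableBatchProcessing is assumed to be declared on another configuration class.

import javax.sql.DataSource;

import org.springframework.batch.core.configuration.annotation.DefaultBatchConfigurer;
import org.springframework.stereotype.Component;

// Hypothetical subclass registered as a bean so that @EnableBatchProcessing
// (declared elsewhere) picks up the overridden createJobRepository().
@Component
public class RedshiftBatchConfigurer extends DefaultBatchConfigurer
{
    // Referenced by the createJobRepository() override shown above.
    private final DataSource dataSource;

    public RedshiftBatchConfigurer(DataSource dataSource)
    {
        super(dataSource);
        this.dataSource = dataSource;
    }

    // The createJobRepository() override from the snippet above goes here.
}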

The factory object looks like this:

// POSTGRES below refers to the statically imported
// org.springframework.batch.support.DatabaseType.POSTGRES constant.
public class RedshiftIncrementerFactory implements DataFieldMaxValueIncrementerFactory
{
    private DataSource dataSource;

    public RedshiftIncrementerFactory(DataSource ds)
    {
        this.dataSource = ds;
    }

    @Override
    public DataFieldMaxValueIncrementer getIncrementer(String databaseType, String incrementerName)
    {
        return new RedshiftIncrementer(dataSource, incrementerName);
    }

    @Override
    public boolean isSupportedIncrementerType(String databaseType)
    {
        return POSTGRES.toString().equals(databaseType);
    }

    @Override
    public String[] getSupportedIncrementerTypes()
    {
        return new String[]{POSTGRES.toString()};
    }

}

And then the incrementer itself:

public class RedshiftIncrementer extends AbstractSequenceMaxValueIncrementer
{
    public RedshiftIncrementer(DataSource dataSource, String incrementorName)
    {
        super(dataSource, incrementorName);
    }

    // I need to run two queries here, since Redshift doesn't support sequences
    @Override
    protected long getNextKey() throws DataAccessException {
        Connection con = DataSourceUtils.getConnection(getDataSource());
        Statement stmt = null;
        ResultSet rs = null;
        try {
            stmt = con.createStatement();
            DataSourceUtils.applyTransactionTimeout(stmt, getDataSource());
            String table = getIncrementerName();
            stmt.executeUpdate("UPDATE " + table + " SET ID = ID + 1");
            rs = stmt.executeQuery("SELECT ID FROM " + table + " WHERE UNIQUE_KEY='0'");
            if (rs.next()) {
                return rs.getLong(1);
            }
            else {
                throw new DataAccessResourceFailureException("Sequence query did not return a result");
            }
        }
        catch (SQLException ex) {
            throw new DataAccessResourceFailureException("Could not obtain sequence value", ex);
        }
        finally {
            JdbcUtils.closeResultSet(rs);
            JdbcUtils.closeStatement(stmt);
            DataSourceUtils.releaseConnection(con, getDataSource());
        }
    }

    @Override
    protected String getSequenceQuery()
    {
        // No longer used
        return null;
    }
}
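
Note that this incrementer assumes each Spring Batch "sequence" is backed by a single-row table with ID and UNIQUE_KEY columns, mirroring the sequence tables in Spring Batch's schema-mysql.sql. Below is a sketch, assuming the default Spring Batch table names, of how those tables could be created and seeded on Redshift; the initializer class itself is hypothetical.

import javax.sql.DataSource;

import org.springframework.jdbc.core.JdbcTemplate;

// Hypothetical one-off initializer: one single-row table per Spring Batch sequence.
// RedshiftIncrementer bumps ID with an UPDATE and reads it back via UNIQUE_KEY = '0'.
public class RedshiftSequenceTableInitializer
{
    private final JdbcTemplate jdbcTemplate;

    public RedshiftSequenceTableInitializer(DataSource dataSource)
    {
        this.jdbcTemplate = new JdbcTemplate(dataSource);
    }

    public void createSequenceTables()
    {
        String[] tables = {"BATCH_JOB_SEQ", "BATCH_JOB_EXECUTION_SEQ", "BATCH_STEP_EXECUTION_SEQ"};
        for (String table : tables) {
            jdbcTemplate.execute(
                "CREATE TABLE " + table + " (ID BIGINT NOT NULL, UNIQUE_KEY CHAR(1) NOT NULL)");
            // Seed the single row that the incrementer updates and reads back.
            jdbcTemplate.update("INSERT INTO " + table + " (ID, UNIQUE_KEY) VALUES (0, '0')");
        }
    }
}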

This at least allows the job to start. However, there are other problems with Redshift that I will detail elsewhere.

