我试图设置一个Spring批处理ItemWriter来调用PostgreSQL中的函数来插入提供的对象。我最初尝试使用JdbcBatchItemWriter并在配置中指定SQL,但当失败时,我切换到自定义ItemWriter类以尝试进一步调试,但我得到了相同的错误。以下是我为ItemWriter设置的组件(我将要求大家假设ItemReader工作正常,因为我对它没有问题,所以我不会提供该实现的详细信息):
-源数据库表和数据:

CREATE TABLE test_user
(
    test_user_sys_id numeric NOT NULL,
    ssn character varying(9) NOT NULL,
    create_user_id character varying(30) NOT NULL,
    create_ts timestamp(6) without time zone NOT NULL DEFAULT now()
)
WITH (
OIDS=FALSE
)
DISTRIBUTED BY (ssn);

INSERT INTO test_user (test_user_sys_id, ssn, create_user_id)  VALUES (1,'111111111','DataAdmin');
INSERT INTO test_user (test_user_sys_id, ssn, create_user_id)  VALUES (2,'222222222','DataAdmin');
INSERT INTO test_user (test_user_sys_id, ssn, create_user_id)  VALUES (3,'333333333','DataAdmin');

-目标数据库表:
CREATE TABLE test_user_result
(
  test_user_result_sys_id numeric NOT NULL,
  ssn character varying(9) NOT NULL,
  job_id numeric NOT NULL
)
WITH (
  OIDS=FALSE
)
DISTRIBUTED BY (ssn);

-保存结果的数据库函数:
CREATE OR REPLACE FUNCTION test_user_result_insert_func(
p_id NUMERIC,
p_ssn CHARACTER VARYING(9),
p_job_id NUMERIC
)
  RETURNS VOID AS
$BODY$

BEGIN
INSERT INTO test_user_result (test_user_result_sys_id,ssn,job_id)
VALUES (p_id,p_ssn,p_job_id);
END;
$BODY$
  LANGUAGE plpgsql VOLATILE;

-localLaunchContext.xml(环境特定信息):
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="
               http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd">

    <!-- Import the Spring Batch config files -->
    <import resource="springBatchConfig.xml" />

    <!-- Define the PostgreSQL source -->
    <bean id="postgresql_dataSource"
        class="org.apache.commons.dbcp.BasicDataSource">
        <property name="driverClassName" value="org.postgresql.Driver"/>
        <property name="url" value="jdbc:postgresql://host:port/schema"/>
        <property name="username" value="user"/>
        <property name="password" value="password"/>
        <property name="defaultAutoCommit" value="false"/>
    </bean>

    <!-- Define a resourceless transaction manager for the in-memory job repository -->
    <bean id="repositoryTransactionManager"
    class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" />

    <!-- Define a transaction manager for the data source -->
    <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
        <property name="dataSource" ref="postgresql_dataSource"/>
    </bean>

    <!--  Define the writer JDBC template -->
      <bean id="outputJdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate">
          <property name="dataSource" ref="postgresql_dataSource"/>
       </bean>

    <!-- Define in-memory job repository  -->
    <bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
        <property name="transactionManager" ref="repositoryTransactionManager"/>
    </bean>

    <!-- Define the synchronous job launcher -->
    <bean id="syncJobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
        <property name="jobRepository" ref="jobRepository" />
    </bean>

</beans>

-springBatchConfig.xml(作业信息):
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:batch="http://www.springframework.org/schema/batch"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:util="http://www.springframework.org/schema/util"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
                    http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
                    http://www.springframework.org/schema/batch
                    http://www.springframework.org/schema/batch/spring-batch-2.1.xsd
                    http://www.springframework.org/schema/util
                    http://www.springframework.org/schema/util/spring-util-3.0.xsd">

    <!-- ================================================== -->
    <!-- Components for TestUser Job           -->
    <!-- ================================================== -->

    <!-- Test User Stored Procedure ItemReader -->
    <bean id="testUserItemReader"
        class="org.springframework.batch.item.database.StoredProcedureItemReader"
        scope="step">
        <property name="dataSource" ref="postgresql_dataSource" />
        <property name="procedureName" value="get_user_func_no_arg" />
        <property name="parameters">
            <list>
                <bean class="org.springframework.jdbc.core.SqlParameter">
                    <constructor-arg index="0" value="p_id_min" />
                    <constructor-arg index="1">
                        <util:constant static-field="java.sql.Types.NUMERIC" />
                    </constructor-arg>
                </bean>
                <bean class="org.springframework.jdbc.core.SqlOutParameter">
                    <constructor-arg index="0" value="resultCursor" />
                    <constructor-arg index="1">
                        <util:constant static-field="java.sql.Types.OTHER" />
                    </constructor-arg>
                </bean>
            </list>
        </property>
        <property name="refCursorPosition" value="2" />
        <property name="rowMapper">
            <bean class="dao.mapper.TestUserRowMapper" />
        </property>
        <property name="preparedStatementSetter" ref="preparedStatementSetter" />
    </bean>

    <!-- ItemProcessor is not needed, since the stored procedure provides the results -->

    <!-- TestUser ItemWriter -->
      <bean id="testUserItemWriter" class="dao.writer.TestUserDbItemWriter" scope="step">
        <property name="jdbcTemplate" ref="outputJdbcTemplate"/>
        <property name="jobId" value="#{stepExecution.jobExecution.id}"/>
    </bean>

    <!-- TestUser Job definition -->
    <batch:job id="TestUserJob" incrementer="jobParametersIncrementer">
        <batch:step id="TestUser_step1">
            <batch:tasklet>
                <batch:chunk reader="testUserItemReader" reader-transactional-queue="true"
                    writer="testUserItemWriter" commit-interval="1" />
            </batch:tasklet>
        </batch:step>
    </batch:job>

    <!-- ================================================== -->
    <!-- Common Beans that are used in multiple scenarios -->
    <!-- ================================================== -->

    <!-- Increments the Job ID -->
    <bean id="jobParametersIncrementer" class="org.springframework.batch.core.launch.support.RunIdIncrementer" />

    <!-- Prepared statement setter to provide Tax Year for the stored procedures. -->
    <bean id="preparedStatementSetter" class="dao.setter.TestUserPreparedStatementSetter"
        scope="step">
        <property name="minId">
            <value>#{jobParameters['minId']}</value>
        </property>
    </bean>
</beans>

-TestUser.java(域数据对象):
package domain;

// Data object to support a user
public class TestUser {

    private int id;
    private String ssn;
    public int getId() {
        return id;
    }
    public void setId(int id) {
        this.id = id;
    }
    public String getSsn() {
        return ssn;
    }
    public void setSsn(String ssn) {
        this.ssn = ssn;
    }

    @Override
    public String toString() {
        return "TestUser [id=" + id + ", ssn=" + ssn + "]";
    }
}

-TestUserDbItemWriter.java(自定义ItemWriter实现):
package dao.writer;

import java.util.List;
import java.sql.PreparedStatement;
import java.sql.SQLException;

import org.springframework.batch.item.ItemWriter;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.BatchPreparedStatementSetter;

import domain.TestUser;

public class TestUserDbItemWriter  implements ItemWriter<TestUser>
{
    private JdbcTemplate jdbcTemplate;
    private int jobId;

    @Override
    public void write(final List<? extends TestUser> chunk) throws Exception {

        String sql = "select test_user_result_insert_func(?, ?, ?);";
        try
        {
            getJdbcTemplate().setSkipResultsProcessing(true);
            getJdbcTemplate().setSkipUndeclaredResults(true);
            getJdbcTemplate().batchUpdate(sql,
                new BatchPreparedStatementSetter() {
                    @Override
                    public void setValues(PreparedStatement ps, int i) throws SQLException {
                        TestUser testUser = chunk.get(i);
                        ps.setInt(1, testUser.getId());
                        ps.setString(2, testUser.getSsn());
                        ps.setInt(3, getJobId());
                    }
                    @Override
                    public int getBatchSize() {
                        return chunk.size();
                    }
                });
        }
        catch(org.springframework.dao.DataIntegrityViolationException  ex)
        {
            System.out.println("data integrity ex="+ex.getMessage());
            Throwable innerex = ex.getMostSpecificCause();
            if(innerex instanceof java.sql.BatchUpdateException)
            {
                java.sql.BatchUpdateException batchex = (java.sql.BatchUpdateException) innerex ;
                SQLException current = batchex;
                int count=1;
                   do {

                       System.out.println("inner ex " + count + " =" + current.getMessage());
                       count++;

                   } while ((current = current.getNextException()) != null);
            }

            throw ex;
        }
        catch(Exception  ex)
        {
            System.out.println("ex="+ex.getMessage());
            throw ex;
        }
    }

    public JdbcTemplate getJdbcTemplate() {
        return jdbcTemplate;
    }

    public void setJdbcTemplate(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    public int getJobId() {
        return jobId;
    }

    public void setJobId(int jobId) {
        this.jobId = jobId;
    }
}

-调用命令:
java -classpath ".;lib\*;bin" org.springframework.batch.core.launch.support.CommandLineJobRunner localLaunchContext.xml TestUserJob minId=1

运行此命令时,我从自定义ItemWriter中的println语句获得以下结果:
data integrity ex=PreparedStatementCallback; SQL [select test_user_result_insert_func(?, ?, ?);]; Batch entry 0 select test_user_result_insert_func(3, '333333333', 0) was aborted.  Call getNextException to see the cause.; nested exception is java.sql.BatchUpdateException: Batch entry 0 select test_user_result_insert_func(3, '333333333', 0) was aborted.  Call getNextException to see the cause.
inner ex 1 =Batch entry 0 select test_user_result_insert_func(3, '333333333', 0) was aborted.  Call getNextException to see the cause.
inner ex 2 =A result was returned when none was expected.
data integrity ex=PreparedStatementCallback; SQL [select test_user_result_insert_func(?, ?, ?);]; Batch entry 0 select test_user_result_insert_func(2, '222222222', 0) was aborted.  Call getNextException to see the cause.; nested exception is java.sql.BatchUpdateException: Batch entry 0 select test_user_result_insert_func(2, '222222222', 0) was aborted.  Call getNextException to see the cause.
inner ex 1 =Batch entry 0 select test_user_result_insert_func(2, '222222222', 0) was aborted.  Call getNextException to see the cause.
inner ex 2 =A result was returned when none was expected.

使用JDK 1.7、PostgreSQL JDBC驱动程序JAR PostgreSQL-9.3-1102.jdbc4和PostgreSQL 8.2.15(在Greenplum 4.2.8.1 build 2下)会发生这种情况
我在网上做了很多调查和搜索,到目前为止发现了以下几点:
表示这是由于batchUpdate()链中的某些内容不喜欢使用“select”,因为它不需要结果。PostgreSQL没有存储过程和函数(就像Oracle和其他数据库一样),所以我知道函数是唯一的选项
正如当前的TestUserDbItemWriter.write()方法一样,我已经在JdbcTemplate上尝试了跳过结果和未声明结果的设置,但是在行为上没有看到任何变化。
我找到了将ItemWriter使用的SQL字符串更改为使用“call”而不是“select”的建议。每当我尝试这样做时,就会得到一个SQL语法错误。
虽然这是一个简单的例子,在这个例子中,一个函数可能会被认为是多余的,而不是仅仅使用SQL来进行插入,但是我的实际应用程序将有更复杂的数据要跨多个表保存,我希望能够使用一个函数从Java代码中提取这些细节。如有任何建议,将不胜感激。
提前谢谢

最佳答案

请尝试使用getJdbcTemplate().execute(sql),而不是调用getJdbcTemplate().batchUpdate;
这对我来说至少在一个类似的要求。

10-08 14:28