我试图设置一个Spring批处理ItemWriter来调用PostgreSQL中的函数来插入提供的对象。我最初尝试使用JdbcBatchItemWriter并在配置中指定SQL,但当失败时,我切换到自定义ItemWriter类以尝试进一步调试,但我得到了相同的错误。以下是我为ItemWriter设置的组件(我将要求大家假设ItemReader工作正常,因为我对它没有问题,所以我不会提供该实现的详细信息):
-源数据库表和数据:
CREATE TABLE test_user
(
test_user_sys_id numeric NOT NULL,
ssn character varying(9) NOT NULL,
create_user_id character varying(30) NOT NULL,
create_ts timestamp(6) without time zone NOT NULL DEFAULT now()
)
WITH (
OIDS=FALSE
)
DISTRIBUTED BY (ssn);
INSERT INTO test_user (test_user_sys_id, ssn, create_user_id) VALUES (1,'111111111','DataAdmin');
INSERT INTO test_user (test_user_sys_id, ssn, create_user_id) VALUES (2,'222222222','DataAdmin');
INSERT INTO test_user (test_user_sys_id, ssn, create_user_id) VALUES (3,'333333333','DataAdmin');
-目标数据库表:
CREATE TABLE test_user_result
(
test_user_result_sys_id numeric NOT NULL,
ssn character varying(9) NOT NULL,
job_id numeric NOT NULL
)
WITH (
OIDS=FALSE
)
DISTRIBUTED BY (ssn);
-保存结果的数据库函数:
CREATE OR REPLACE FUNCTION test_user_result_insert_func(
p_id NUMERIC,
p_ssn CHARACTER VARYING(9),
p_job_id NUMERIC
)
RETURNS VOID AS
$BODY$
BEGIN
INSERT INTO test_user_result (test_user_result_sys_id,ssn,job_id)
VALUES (p_id,p_ssn,p_job_id);
END;
$BODY$
LANGUAGE plpgsql VOLATILE;
-localLaunchContext.xml(环境特定信息):
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd">
<!-- Import the Spring Batch config files -->
<import resource="springBatchConfig.xml" />
<!-- Define the PostgreSQL source -->
<bean id="postgresql_dataSource"
class="org.apache.commons.dbcp.BasicDataSource">
<property name="driverClassName" value="org.postgresql.Driver"/>
<property name="url" value="jdbc:postgresql://host:port/schema"/>
<property name="username" value="user"/>
<property name="password" value="password"/>
<property name="defaultAutoCommit" value="false"/>
</bean>
<!-- Define a resourceless transaction manager for the in-memory job repository -->
<bean id="repositoryTransactionManager"
class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" />
<!-- Define a transaction manager for the data source -->
<bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
<property name="dataSource" ref="postgresql_dataSource"/>
</bean>
<!-- Define the writer JDBC template -->
<bean id="outputJdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate">
<property name="dataSource" ref="postgresql_dataSource"/>
</bean>
<!-- Define in-memory job repository -->
<bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
<property name="transactionManager" ref="repositoryTransactionManager"/>
</bean>
<!-- Define the synchronous job launcher -->
<bean id="syncJobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
<property name="jobRepository" ref="jobRepository" />
</bean>
</beans>
-springBatchConfig.xml(作业信息):
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:batch="http://www.springframework.org/schema/batch"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:util="http://www.springframework.org/schema/util"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/batch
http://www.springframework.org/schema/batch/spring-batch-2.1.xsd
http://www.springframework.org/schema/util
http://www.springframework.org/schema/util/spring-util-3.0.xsd">
<!-- ================================================== -->
<!-- Components for TestUser Job -->
<!-- ================================================== -->
<!-- Test User Stored Procedure ItemReader -->
<bean id="testUserItemReader"
class="org.springframework.batch.item.database.StoredProcedureItemReader"
scope="step">
<property name="dataSource" ref="postgresql_dataSource" />
<property name="procedureName" value="get_user_func_no_arg" />
<property name="parameters">
<list>
<bean class="org.springframework.jdbc.core.SqlParameter">
<constructor-arg index="0" value="p_id_min" />
<constructor-arg index="1">
<util:constant static-field="java.sql.Types.NUMERIC" />
</constructor-arg>
</bean>
<bean class="org.springframework.jdbc.core.SqlOutParameter">
<constructor-arg index="0" value="resultCursor" />
<constructor-arg index="1">
<util:constant static-field="java.sql.Types.OTHER" />
</constructor-arg>
</bean>
</list>
</property>
<property name="refCursorPosition" value="2" />
<property name="rowMapper">
<bean class="dao.mapper.TestUserRowMapper" />
</property>
<property name="preparedStatementSetter" ref="preparedStatementSetter" />
</bean>
<!-- ItemProcessor is not needed, since the stored procedure provides the results -->
<!-- TestUser ItemWriter -->
<bean id="testUserItemWriter" class="dao.writer.TestUserDbItemWriter" scope="step">
<property name="jdbcTemplate" ref="outputJdbcTemplate"/>
<property name="jobId" value="#{stepExecution.jobExecution.id}"/>
</bean>
<!-- TestUser Job definition -->
<batch:job id="TestUserJob" incrementer="jobParametersIncrementer">
<batch:step id="TestUser_step1">
<batch:tasklet>
<batch:chunk reader="testUserItemReader" reader-transactional-queue="true"
writer="testUserItemWriter" commit-interval="1" />
</batch:tasklet>
</batch:step>
</batch:job>
<!-- ================================================== -->
<!-- Common Beans that are used in multiple scenarios -->
<!-- ================================================== -->
<!-- Increments the Job ID -->
<bean id="jobParametersIncrementer" class="org.springframework.batch.core.launch.support.RunIdIncrementer" />
<!-- Prepared statement setter to provide Tax Year for the stored procedures. -->
<bean id="preparedStatementSetter" class="dao.setter.TestUserPreparedStatementSetter"
scope="step">
<property name="minId">
<value>#{jobParameters['minId']}</value>
</property>
</bean>
</beans>
-TestUser.java(域数据对象):
package domain;
// Data object to support a user
public class TestUser {
private int id;
private String ssn;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getSsn() {
return ssn;
}
public void setSsn(String ssn) {
this.ssn = ssn;
}
@Override
public String toString() {
return "TestUser [id=" + id + ", ssn=" + ssn + "]";
}
}
-TestUserDbItemWriter.java(自定义ItemWriter实现):
package dao.writer;
import java.util.List;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import org.springframework.batch.item.ItemWriter;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.BatchPreparedStatementSetter;
import domain.TestUser;
public class TestUserDbItemWriter implements ItemWriter<TestUser>
{
private JdbcTemplate jdbcTemplate;
private int jobId;
@Override
public void write(final List<? extends TestUser> chunk) throws Exception {
String sql = "select test_user_result_insert_func(?, ?, ?);";
try
{
getJdbcTemplate().setSkipResultsProcessing(true);
getJdbcTemplate().setSkipUndeclaredResults(true);
getJdbcTemplate().batchUpdate(sql,
new BatchPreparedStatementSetter() {
@Override
public void setValues(PreparedStatement ps, int i) throws SQLException {
TestUser testUser = chunk.get(i);
ps.setInt(1, testUser.getId());
ps.setString(2, testUser.getSsn());
ps.setInt(3, getJobId());
}
@Override
public int getBatchSize() {
return chunk.size();
}
});
}
catch(org.springframework.dao.DataIntegrityViolationException ex)
{
System.out.println("data integrity ex="+ex.getMessage());
Throwable innerex = ex.getMostSpecificCause();
if(innerex instanceof java.sql.BatchUpdateException)
{
java.sql.BatchUpdateException batchex = (java.sql.BatchUpdateException) innerex ;
SQLException current = batchex;
int count=1;
do {
System.out.println("inner ex " + count + " =" + current.getMessage());
count++;
} while ((current = current.getNextException()) != null);
}
throw ex;
}
catch(Exception ex)
{
System.out.println("ex="+ex.getMessage());
throw ex;
}
}
public JdbcTemplate getJdbcTemplate() {
return jdbcTemplate;
}
public void setJdbcTemplate(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
public int getJobId() {
return jobId;
}
public void setJobId(int jobId) {
this.jobId = jobId;
}
}
-调用命令:
java -classpath ".;lib\*;bin" org.springframework.batch.core.launch.support.CommandLineJobRunner localLaunchContext.xml TestUserJob minId=1
运行此命令时,我从自定义ItemWriter中的println语句获得以下结果:
data integrity ex=PreparedStatementCallback; SQL [select test_user_result_insert_func(?, ?, ?);]; Batch entry 0 select test_user_result_insert_func(3, '333333333', 0) was aborted. Call getNextException to see the cause.; nested exception is java.sql.BatchUpdateException: Batch entry 0 select test_user_result_insert_func(3, '333333333', 0) was aborted. Call getNextException to see the cause.
inner ex 1 =Batch entry 0 select test_user_result_insert_func(3, '333333333', 0) was aborted. Call getNextException to see the cause.
inner ex 2 =A result was returned when none was expected.
data integrity ex=PreparedStatementCallback; SQL [select test_user_result_insert_func(?, ?, ?);]; Batch entry 0 select test_user_result_insert_func(2, '222222222', 0) was aborted. Call getNextException to see the cause.; nested exception is java.sql.BatchUpdateException: Batch entry 0 select test_user_result_insert_func(2, '222222222', 0) was aborted. Call getNextException to see the cause.
inner ex 1 =Batch entry 0 select test_user_result_insert_func(2, '222222222', 0) was aborted. Call getNextException to see the cause.
inner ex 2 =A result was returned when none was expected.
使用JDK 1.7、PostgreSQL JDBC驱动程序JAR PostgreSQL-9.3-1102.jdbc4和PostgreSQL 8.2.15(在Greenplum 4.2.8.1 build 2下)会发生这种情况
我在网上做了很多调查和搜索,到目前为止发现了以下几点:
表示这是由于batchUpdate()链中的某些内容不喜欢使用“select”,因为它不需要结果。PostgreSQL没有存储过程和函数(就像Oracle和其他数据库一样),所以我知道函数是唯一的选项
正如当前的TestUserDbItemWriter.write()方法一样,我已经在JdbcTemplate上尝试了跳过结果和未声明结果的设置,但是在行为上没有看到任何变化。
我找到了将ItemWriter使用的SQL字符串更改为使用“call”而不是“select”的建议。每当我尝试这样做时,就会得到一个SQL语法错误。
虽然这是一个简单的例子,在这个例子中,一个函数可能会被认为是多余的,而不是仅仅使用SQL来进行插入,但是我的实际应用程序将有更复杂的数据要跨多个表保存,我希望能够使用一个函数从Java代码中提取这些细节。如有任何建议,将不胜感激。
提前谢谢
最佳答案
请尝试使用getJdbcTemplate().execute(sql),而不是调用getJdbcTemplate().batchUpdate;
这对我来说至少在一个类似的要求。