Database
Cursor, Spring Batch, JdbcCursorItemReader, HibernateCursorItemReader, StoredProcedureItemReader, SqlPagingQueryProviderFactoryBean, JdbcPagingItemReader, JpaPagingItemReader, ItemWriter :description: 数据库批处理的独特之处在于,返回的庞大数据集需要在内存中保留的所有结果,而不是一次读取很多行。Spring Batch 通过游标和分页两种技术解决了这个问题。
游标方法:
-
使用数据库游标,允许逐行流式传输数据。
-
JdbcCursorItemReader、HibernateCursorItemReader 和 StoredProcedureItemReader 是用于此方法的 ItemReader 实现。
分页方法:
-
将结果集分成多个页面,每次查询仅获取一部分结果。
-
JdbcPagingItemReader 和 JpaPagingItemReader 是用于此方法的 ItemReader 实现。
ItemReader 的目的是以项目为单位返回数据,每个项目将在 ItemWriter 中写入。在数据库场景中,事务性写入是内置的,无需专门的 ItemWriter 实现。对于使用 Hibernate 的 ItemWriter,建议在每个项目后刷新,以避免批处理写入错误处理中的潜在问题。
与大多数企业应用样式一样,数据库是批量处理的中央存储机制。但是,由于系统必须处理的数据集非常庞大,因此批处理与其他应用样式有所不同。如果某个 SQL 语句返回 1 百万行,结果集可能在读取所有行之前一直将所有返回的结果保存在内存中。Spring Batch 为此问题提供了两种类型的解决方案:
Cursor-based ItemReader
Implementations
对于大多数批处理开发人员,使用数据库游标通常是默认方法,因为这是数据库解决“流式传输”关系数据问题的解决方案。Java ResultSet
类基本上是一种面向对象机制,用于处理游标。ResultSet
保留到数据的当前行的游标。对 ResultSet
调用 next
会将此游标移至下一行。基于游标的 Spring Batch ItemReader
实现会在初始化时打开游标,并每调用一次 read
便将游标向前移动一行,进而返回可用于处理的映射对象。然后会调用 close
方法,以确保释放所有资源。Spring 核心 JdbcTemplate
通过使用回调模式全面映射 ResultSet
中的所有行,并先于将控制权返回给方法调用方而解决了此问题。然而,在批处理中,必须等到该步骤完成。下图显示了基于游标的 ItemReader
的通用图表。请注意,尽管示例使用 SQL(因为 SQL 非常普及),但任何技术都可以实现基本方法。
此示例说明了基本模式。给定有 3 列 (ID
、NAME
和 BAR
) 的“FOO”表格,选择所有 ID 大于 1 但小于 7 的行。这会将游标的开始(第 1 行)放在 ID 2 上。此行的结果应当是完全映射的 Foo
对象。再次调用 read()
会将游标移动到下一行,即 ID 为 3 的 Foo
。每次 read
之后都会写出读取的结果,从而允许回收对象(假设没有实例变量保留到对象的引用)。
JdbcCursorItemReader
JdbcCursorItemReader
是基于游标的技术的 JDBC 实现。它直接使用 ResultSet
,并要求针对从 DataSource
获得的连接运行 SQL 语句。以下数据库架构用作示例:
CREATE TABLE CUSTOMER (
ID BIGINT IDENTITY PRIMARY KEY,
NAME VARCHAR(45),
CREDIT FLOAT
);
许多人希望为每一行使用一个域对象,因此以下示例使用 RowMapper
接口的一个实现来映射 CustomerCredit
对象:
public class CustomerCreditRowMapper implements RowMapper<CustomerCredit> {
public static final String ID_COLUMN = "id";
public static final String NAME_COLUMN = "name";
public static final String CREDIT_COLUMN = "credit";
public CustomerCredit mapRow(ResultSet rs, int rowNum) throws SQLException {
CustomerCredit customerCredit = new CustomerCredit();
customerCredit.setId(rs.getInt(ID_COLUMN));
customerCredit.setName(rs.getString(NAME_COLUMN));
customerCredit.setCredit(rs.getBigDecimal(CREDIT_COLUMN));
return customerCredit;
}
}
由于 JdbcCursorItemReader
与 JdbcTemplate
共享 key 接口,因此查看如何通过 JdbcTemplate
读入此数据以将其与 ItemReader
进行对照的示例十分有用。就本示例的目的而言,假设 CUSTOMER
数据库中有 1000 行。第一个示例使用 JdbcTemplate
:
//For simplicity sake, assume a dataSource has already been obtained
JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
List customerCredits = jdbcTemplate.query("SELECT ID, NAME, CREDIT from CUSTOMER",
new CustomerCreditRowMapper());
运行上述代码片段之后,customerCredits
列表便包含 1000 个 CustomerCredit
对象。在查询方法中,从 DataSource
获得一个连接,针对连接运行提供的 SQL,然后为 ResultSet
中的每一行调用 mapRow
方法。将此方法与 JdbcCursorItemReader
的方法进行对照,后者在以下示例中展示:
JdbcCursorItemReader itemReader = new JdbcCursorItemReader();
itemReader.setDataSource(dataSource);
itemReader.setSql("SELECT ID, NAME, CREDIT from CUSTOMER");
itemReader.setRowMapper(new CustomerCreditRowMapper());
int counter = 0;
ExecutionContext executionContext = new ExecutionContext();
itemReader.open(executionContext);
Object customerCredit = new Object();
while(customerCredit != null){
customerCredit = itemReader.read();
counter++;
}
itemReader.close();
运行上述代码片段之后,计数器等于 1000。如果上述代码已将返回的 customerCredit
放入列表,则结果与使用 JdbcTemplate
示例的结果完全相同。但是,ItemReader
的一大优势在于它允许对项目进行“流式传输”。read
方法可被调用一次,该项目可由 ItemWriter
写出,然后下一个项目可使用 read
获得。这允许分批和定期提交地对项目进行读取和写入,这是高性能批处理处理的精髓。而且,它可轻松配置成注入到 Spring Batch Step
中。
- Java
-
以下示例展示如何在 Java 中将
ItemReader
注入到Step
中:
@Bean
public JdbcCursorItemReader<CustomerCredit> itemReader() {
return new JdbcCursorItemReaderBuilder<CustomerCredit>()
.dataSource(this.dataSource)
.name("creditReader")
.sql("select ID, NAME, CREDIT from CUSTOMER")
.rowMapper(new CustomerCreditRowMapper())
.build();
}
- XML
-
以下示例展示如何在 XML 中将
ItemReader
注入到Step
中:
<bean id="itemReader" class="org.spr...JdbcCursorItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="sql" value="select ID, NAME, CREDIT from CUSTOMER"/>
<property name="rowMapper">
<bean class="org.springframework.batch.samples.domain.CustomerCreditRowMapper"/>
</property>
</bean>
Additional Properties
由于有许多不同的 Java 打开游标选项,因此 JdbcCursorItemReader
上有许多可以设置的属性,如下表所述:
ignoreWarnings |
确定是记录 SQLWarnings,还是导致异常。默认值为 |
fetchSize |
当需要由 |
maxRows |
设置 |
queryTimeout |
设置驱动程序等待 |
verifyCursorPosition |
因为 |
saveState |
表明是否应将读取器的状态保存在 |
driverSupportsAbsolute |
表明 JDBC 驱动程序是否支持在 |
setUseSharedExtendedConnection |
表明是否应将用于光标的连接用于所有其他处理,从而共享相同的交易。如果将其设置为 |
HibernateCursorItemReader
正如普通 Spring 用户会对是否使用 ORMS 解决方案做出重要决定(此决定会影响他们是否使用 JdbcTemplate
或 HibernateTemplate
)一样,Spring Batch 用户也拥有相同的选择。HibernateCursorItemReader
是游标技术的一个 Hibernate 实现。Hibernate 在批处理中的使用存在相当大的争议。造成这种争议的很大一个原因是,Hibernate 起初只开发用于支持在线应用样式。但是,这不意味着它不能用于批处理。解决此问题的最简单的方法是使用 StatelessSession
,而非使用标准会话。这会移除 Hibernate 采用的所有缓存和脏检查,而这些内容可能会在批处理场景中导致问题。有关无状态 Hibernate 会话与正常 Hibernate 会话之间差异的更多信息,请参阅特定 Hibernate 发行的文档。HibernateCursorItemReader
允许声明 HQL 语句,并传递 SessionFactory
,后者会以与 JdbcCursorItemReader
基本相同的基础方式每调用一次 read
便返回该项目。以下示例配置使用与 JDBC 阅读器相同的“客户信用”示例:
HibernateCursorItemReader itemReader = new HibernateCursorItemReader();
itemReader.setQueryString("from CustomerCredit");
//For simplicity sake, assume sessionFactory already obtained.
itemReader.setSessionFactory(sessionFactory);
itemReader.setUseStatelessSession(true);
int counter = 0;
ExecutionContext executionContext = new ExecutionContext();
itemReader.open(executionContext);
Object customerCredit = new Object();
while(customerCredit != null){
customerCredit = itemReader.read();
counter++;
}
itemReader.close();
此已配置 ItemReader
以与 JdbcCursorItemReader
描述的完全相同的方式返回 CustomerCredit
对象,假设已为 Customer
表格正确创建了 Hibernate 映射文件。useStatelessSession
属性默认为 true,但在此添加是为了引起对打开或关闭此属性功能的注意。值得注意的是,底层游标的获取大小可使用 setFetchSize
属性进行设置。与 JdbcCursorItemReader
一样,配置十分简单。
- Java
-
以下示例展示如何在 Java 中注入 Hibernate
ItemReader
:
@Bean
public HibernateCursorItemReader itemReader(SessionFactory sessionFactory) {
return new HibernateCursorItemReaderBuilder<CustomerCredit>()
.name("creditReader")
.sessionFactory(sessionFactory)
.queryString("from CustomerCredit")
.build();
}
- XML
-
以下示例展示如何在 XML 中注入 Hibernate
ItemReader
:
<bean id="itemReader"
class="org.springframework.batch.item.database.HibernateCursorItemReader">
<property name="sessionFactory" ref="sessionFactory" />
<property name="queryString" value="from CustomerCredit" />
</bean>
StoredProcedureItemReader
有时需要通过使用存储过程来获取游标数据。StoredProcedureItemReader
与 JdbcCursorItemReader
类似,不同之处在于,它不是通过运行查询来获取游标,而是通过运行返回游标的存储过程来获取游标。存储过程可以通过三种不同的方式返回游标:
-
作为返回
ResultSet
(由 SQL Server、Sybase、DB2、Derby 和 MySQL 使用)。 -
作为以 out 参数返回的 ref-cursor(由 Oracle 和 PostgreSQL 使用)。
-
作为存储函数调用的返回值。
- Java
-
以下 Java 示例配置使用与前面示例相同的“客户信用”示例:
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
StoredProcedureItemReader reader = new StoredProcedureItemReader();
reader.setDataSource(dataSource);
reader.setProcedureName("sp_customer_credit");
reader.setRowMapper(new CustomerCreditRowMapper());
return reader;
}
- XML
-
以下 XML 示例配置使用与前面示例相同的“客户信用”示例:
<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="procedureName" value="sp_customer_credit"/>
<property name="rowMapper">
<bean class="org.springframework.batch.samples.domain.CustomerCreditRowMapper"/>
</property>
</bean>
前面的示例依赖存储过程提供 ResultSet
作为返回结果(前面的选项 1)。
如果存储过程返回 ref-cursor
(选项 2),那么我们需要提供返回的 ref-cursor
的 out 参数的位置。
- Java
-
以下示例显示如何在第一部分使用 ref-cursor:
- Java
-
Java Configuration
@Bean public StoredProcedureItemReader reader(DataSource dataSource) { StoredProcedureItemReader reader = new StoredProcedureItemReader(); reader.setDataSource(dataSource); reader.setProcedureName("sp_customer_credit"); reader.setRowMapper(new CustomerCreditRowMapper()); reader.setRefCursorPosition(1); return reader; }
- XML
-
以下示例显示如何在 XML 中使用作为 ref-cursor 的第一部分:
<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="procedureName" value="sp_customer_credit"/>
<property name="refCursorPosition" value="1"/>
<property name="rowMapper">
<bean class="org.springframework.batch.samples.domain.CustomerCreditRowMapper"/>
</property>
</bean>
如果游标是通过存储函数返回的(选项 3),则需要将属性“[role="maroon"][.maroon]function”设置为 true
。它的默认值为 false
。
- Java
-
以下示例显示如何在 Java 中将属性设置为
true
:
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
StoredProcedureItemReader reader = new StoredProcedureItemReader();
reader.setDataSource(dataSource);
reader.setProcedureName("sp_customer_credit");
reader.setRowMapper(new CustomerCreditRowMapper());
reader.setFunction(true);
return reader;
}
- XML
-
以下示例显示如何在 XML 中将属性设置为
true
:
<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="procedureName" value="sp_customer_credit"/>
<property name="function" value="true"/>
<property name="rowMapper">
<bean class="org.springframework.batch.samples.domain.CustomerCreditRowMapper"/>
</property>
</bean>
在所有情况下,我们需要定义 RowMapper
,以及 DataSource
和实际的步骤名称。
如果存储过程或函数接收参数,那么必须使用 parameters
属性对其进行声明和设置。以下示例针对 Oracle 声明三个参数。第一个是返回 ref-cursor 的 out
参数,第二个和第三个是接收类型为 INTEGER
值的 in 参数。
- Java
-
以下示例显示如何在 Java 中使用参数:
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
List<SqlParameter> parameters = new ArrayList<>();
parameters.add(new SqlOutParameter("newId", OracleTypes.CURSOR));
parameters.add(new SqlParameter("amount", Types.INTEGER);
parameters.add(new SqlParameter("custId", Types.INTEGER);
StoredProcedureItemReader reader = new StoredProcedureItemReader();
reader.setDataSource(dataSource);
reader.setProcedureName("spring.cursor_func");
reader.setParameters(parameters);
reader.setRefCursorPosition(1);
reader.setRowMapper(rowMapper());
reader.setPreparedStatementSetter(parameterSetter());
return reader;
}
- XML
-
以下示例显示如何在 XML 中使用参数:
<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="procedureName" value="spring.cursor_func"/>
<property name="parameters">
<list>
<bean class="org.springframework.jdbc.core.SqlOutParameter">
<constructor-arg index="0" value="newid"/>
<constructor-arg index="1">
<util:constant static-field="oracle.jdbc.OracleTypes.CURSOR"/>
</constructor-arg>
</bean>
<bean class="org.springframework.jdbc.core.SqlParameter">
<constructor-arg index="0" value="amount"/>
<constructor-arg index="1">
<util:constant static-field="java.sql.Types.INTEGER"/>
</constructor-arg>
</bean>
<bean class="org.springframework.jdbc.core.SqlParameter">
<constructor-arg index="0" value="custid"/>
<constructor-arg index="1">
<util:constant static-field="java.sql.Types.INTEGER"/>
</constructor-arg>
</bean>
</list>
</property>
<property name="refCursorPosition" value="1"/>
<property name="rowMapper" ref="rowMapper"/>
<property name="preparedStatementSetter" ref="parameterSetter"/>
</bean>
除了参数声明之外,我们需要指定一个 PreparedStatementSetter`实现,用于设置调用的参数值。它的作用与上述 `JdbcCursorItemReader`相同。Additional Properties中列出的所有其他属性同样适用于 `StoredProcedureItemReader
。
Paging ItemReader
Implementations
使用数据库游标的一个替代方法是运行多个查询,其中每个查询都获取一部分结果。我们称这部分为一页。每个查询都必须指定开始行号和我们希望在页面中返回的行数。
JdbcPagingItemReader
ItemReader
分页实现之一是 JdbcPagingItemReader
。JdbcPagingItemReader
需要一个 PagingQueryProvider
负责提供用于检索构成一页的行 SQL 查询。由于每个数据库都有自己的分页支持策略,因此我们需要为每个支持的数据库类型使用不同的 PagingQueryProvider
。还有 SqlPagingQueryProviderFactoryBean
可以自动检测正在使用的数据库并确定合适的 PagingQueryProvider
实现。这简化了配置,是推荐的最佳实践。
SqlPagingQueryProviderFactoryBean
要求您指定 select
子句和 from
子句。您还可以提供可选的 WHERE 子句。这些子句和必需的 sortKey
用于生成 SQL 语句。
重要的是在 |
在打开阅读器以后,它会像任何其他 ItemReader
一样以同样的基本方式在每次调用 read
时回传一个项目。当需要其他行时,分页会在后台进行。
- Java
-
以下 Java 示例配置使用类似于前面所示基于游标的
ItemReaders
的“客户信用”示例:
@Bean
public JdbcPagingItemReader itemReader(DataSource dataSource, PagingQueryProvider queryProvider) {
Map<String, Object> parameterValues = new HashMap<>();
parameterValues.put("status", "NEW");
return new JdbcPagingItemReaderBuilder<CustomerCredit>()
.name("creditReader")
.dataSource(dataSource)
.queryProvider(queryProvider)
.parameterValues(parameterValues)
.rowMapper(customerCreditMapper())
.pageSize(1000)
.build();
}
@Bean
public SqlPagingQueryProviderFactoryBean queryProvider() {
SqlPagingQueryProviderFactoryBean provider = new SqlPagingQueryProviderFactoryBean();
provider.setSelectClause("select id, name, credit");
provider.setFromClause("from customer");
provider.setWhereClause("where status=:status");
provider.setSortKey("id");
return provider;
}
- XML
-
以下 XML 示例配置使用类似于前面所示基于游标的
ItemReaders
的“客户信用”示例:
<bean id="itemReader" class="org.spr...JdbcPagingItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="queryProvider">
<bean class="org.spr...SqlPagingQueryProviderFactoryBean">
<property name="selectClause" value="select id, name, credit"/>
<property name="fromClause" value="from customer"/>
<property name="whereClause" value="where status=:status"/>
<property name="sortKey" value="id"/>
</bean>
</property>
<property name="parameterValues">
<map>
<entry key="status" value="NEW"/>
</map>
</property>
<property name="pageSize" value="1000"/>
<property name="rowMapper" ref="customerMapper"/>
</bean>
此配置的 ItemReader
使用必须指定的 RowMapper
返回 CustomerCredit
对象。“pageSize”属性确定每次查询运行中从数据库中读取的实体数量。
“parameterValues”属性可用于指定用于查询的参数值的 Map
。如果您在 where
子句中使用命名参数,则每个条目的键应与命名参数的名称相匹配。如果您使用传统 “?”占位符,则每个条目的键应是占位符的编号,从 1 开始。
JpaPagingItemReader
分页 ItemReader
的另一个实现是 JpaPagingItemReader
。JPA 没有类似于 Hibernate StatelessSession
的概念,因此我们必须使用 JPA 规范提供的不其他功能。由于 JPA 支持分页,因此在使用 JPA 进行批处理时这是自然的。在读取每一页后,实体将被分离,并清除持久化上下文,以便在处理该页后将实体进行垃圾回收。
JpaPagingItemReader
允许您声明一个 JPQL 语句并传入一个 EntityManagerFactory
。然后它以与任何其他 ItemReader
相同的基本方式逐个项目传递回读取内容。当需要其他实体时,分页在后台发生。
- Java
-
以下 Java 示例配置使用与之前所示 JDBC 读取器相同的“客户信用”示例:
@Bean
public JpaPagingItemReader itemReader() {
return new JpaPagingItemReaderBuilder<CustomerCredit>()
.name("creditReader")
.entityManagerFactory(entityManagerFactory())
.queryString("select c from CustomerCredit c")
.pageSize(1000)
.build();
}
- XML
-
以下 XML 示例配置使用与之前所示 JDBC 读取器相同的“客户信用”示例:
<bean id="itemReader" class="org.spr...JpaPagingItemReader">
<property name="entityManagerFactory" ref="entityManagerFactory"/>
<property name="queryString" value="select c from CustomerCredit c"/>
<property name="pageSize" value="1000"/>
</bean>
此配置的 ItemReader
以与上面为 JdbcPagingItemReader
描述的完全相同的方式返回 CustomerCredit
对象,假设 CustomerCredit
对象有正确的 JPA 注释或 ORM 映射文件。“pageSize”属性确定每次查询执行中从数据库中读取的实体数量。
Database ItemWriters
尽管平面文件和 XML 文件都有特定的 ItemWriter
实例,但在数据库世界中没有完全等效的实例。这是因为事务提供了所有必需的功能。ItemWriter
实现对于文件是必需的,因为它们必须表现得好像它们是事务性的,在适当的时间跟踪已写入的项目并刷新或清除。数据库不需要此功能,因为该写入已经包含在事务中。用户可以创建自己的实现 ItemWriter
接口的 DAO,或从为通用处理问题编写的自定义 ItemWriter
中使用一个。无论哪种方式,它们都应该毫无问题地工作。需要注意的一件事是批量输出所提供的性能和错误处理功能。在使用 Hibernate 作为 ItemWriter
时这种情况最常见,但在使用 JDBC 批处理模式时也可能遇到相同的问题。假设我们小心刷新并且数据中没有错误,则批量数据库输出没有任何固有的缺陷。但是,任何写入错误都可能导致混淆,因为没有办法知道是哪个单独的项目导致异常,甚至没有办法知道是否有任何单独的项目是该异常的根源,如下面的图像所示:
如果在写入之前对项目进行缓冲,则任何错误都不会被抛出,直到在提交之前刷新缓冲区。例如,假设每个块写入 20 个项目,并且第 15 个项目抛出 DataIntegrityViolationException
。就 Step
而言,所有 20 个项目都成功写入,因为在实际写入之前无法知道发生错误。一旦调用 Session#flush()
,缓冲区就被清空并且遇到了异常。此时,Step
无能为力。必须回滚事务。通常,此异常可能导致跳过该项目(取决于跳过/重试策略),然后不再写入它。但是,在批处理场景中,没有办法知道哪个项目导致了该问题。发生故障时,整个缓冲区正在被写入。解决此问题的唯一方法是在每个项目后刷新,如下面的图像所示:
这是一个常见的用例,尤其是在使用 Hibernate 时,并且 ItemWriter
实现的简单指导原则是对 write()
的每个调用进行刷新。这样做允许可靠地跳过项目,Spring Batch 在错误后会内部处理对 ItemWriter
调用的粒度。