Database
Cursor, Spring Batch, JdbcCursorItemReader, HibernateCursorItemReader, StoredProcedureItemReader, SqlPagingQueryProviderFactoryBean, JdbcPagingItemReader, JpaPagingItemReader, ItemWriter :description: 数据库批处理的独特之处在于,返回的庞大数据集需要在内存中保留的所有结果,而不是一次读取很多行。Spring Batch 通过游标和分页两种技术解决了这个问题。
游标方法:
-
使用数据库游标,允许逐行流式传输数据。
-
JdbcCursorItemReader、HibernateCursorItemReader 和 StoredProcedureItemReader 是用于此方法的 ItemReader 实现。
分页方法:
-
将结果集分成多个页面,每次查询仅获取一部分结果。
-
JdbcPagingItemReader 和 JpaPagingItemReader 是用于此方法的 ItemReader 实现。
ItemReader 的目的是以项目为单位返回数据,每个项目将在 ItemWriter 中写入。在数据库场景中,事务性写入是内置的,无需专门的 ItemWriter 实现。对于使用 Hibernate 的 ItemWriter,建议在每个项目后刷新,以避免批处理写入错误处理中的潜在问题。
与大多数企业应用样式一样,数据库是批量处理的中央存储机制。但是,由于系统必须处理的数据集非常庞大,因此批处理与其他应用样式有所不同。如果某个 SQL 语句返回 1 百万行,结果集可能在读取所有行之前一直将所有返回的结果保存在内存中。Spring Batch 为此问题提供了两种类型的解决方案:
Like most enterprise application styles, a database is the central storage mechanism for batch. However, batch differs from other application styles due to the sheer size of the datasets with which the system must work. If a SQL statement returns 1 million rows, the result set probably holds all returned results in memory until all rows have been read. Spring Batch provides two types of solutions for this problem:
Cursor-based ItemReader
Implementations
对于大多数批处理开发人员,使用数据库游标通常是默认方法,因为这是数据库解决“流式传输”关系数据问题的解决方案。Java ResultSet
类基本上是一种面向对象机制,用于处理游标。ResultSet
保留到数据的当前行的游标。对 ResultSet
调用 next
会将此游标移至下一行。基于游标的 Spring Batch ItemReader
实现会在初始化时打开游标,并每调用一次 read
便将游标向前移动一行,进而返回可用于处理的映射对象。然后会调用 close
方法,以确保释放所有资源。Spring 核心 JdbcTemplate
通过使用回调模式全面映射 ResultSet
中的所有行,并先于将控制权返回给方法调用方而解决了此问题。然而,在批处理中,必须等到该步骤完成。下图显示了基于游标的 ItemReader
的通用图表。请注意,尽管示例使用 SQL(因为 SQL 非常普及),但任何技术都可以实现基本方法。
Using a database cursor is generally the default approach of most batch developers,
because it is the database’s solution to the problem of 'streaming' relational data. The
Java ResultSet
class is essentially an object oriented mechanism for manipulating a
cursor. A ResultSet
maintains a cursor to the current row of data. Calling next
on a
ResultSet
moves this cursor to the next row. The Spring Batch cursor-based ItemReader
implementation opens a cursor on initialization and moves the cursor forward one row for
every call to read
, returning a mapped object that can be used for processing. The
close
method is then called to ensure all resources are freed up. The Spring core
JdbcTemplate
gets around this problem by using the callback pattern to completely map
all rows in a ResultSet
and close before returning control back to the method caller.
However, in batch, this must wait until the step is complete. The following image shows a
generic diagram of how a cursor-based ItemReader
works. Note that, while the example
uses SQL (because SQL is so widely known), any technology could implement the basic
approach.
此示例说明了基本模式。给定有 3 列 (ID
、NAME
和 BAR
) 的“FOO”表格,选择所有 ID 大于 1 但小于 7 的行。这会将游标的开始(第 1 行)放在 ID 2 上。此行的结果应当是完全映射的 Foo
对象。再次调用 read()
会将游标移动到下一行,即 ID 为 3 的 Foo
。每次 read
之后都会写出读取的结果,从而允许回收对象(假设没有实例变量保留到对象的引用)。
This example illustrates the basic pattern. Given a 'FOO' table, which has three columns:
ID
, NAME
, and BAR
, select all rows with an ID greater than 1 but less than 7. This
puts the beginning of the cursor (row 1) on ID 2. The result of this row should be a
completely mapped Foo
object. Calling read()
again moves the cursor to the next row,
which is the Foo
with an ID of 3. The results of these reads are written out after each
read
, allowing the objects to be garbage collected (assuming no instance variables are
maintaining references to them).
JdbcCursorItemReader
JdbcCursorItemReader
是基于游标的技术的 JDBC 实现。它直接使用 ResultSet
,并要求针对从 DataSource
获得的连接运行 SQL 语句。以下数据库架构用作示例:
JdbcCursorItemReader
is the JDBC implementation of the cursor-based technique. It works
directly with a ResultSet
and requires an SQL statement to run against a connection
obtained from a DataSource
. The following database schema is used as an example:
CREATE TABLE CUSTOMER (
ID BIGINT IDENTITY PRIMARY KEY,
NAME VARCHAR(45),
CREDIT FLOAT
);
许多人希望为每一行使用一个域对象,因此以下示例使用 RowMapper
接口的一个实现来映射 CustomerCredit
对象:
Many people prefer to use a domain object for each row, so the following example uses an
implementation of the RowMapper
interface to map a CustomerCredit
object:
public class CustomerCreditRowMapper implements RowMapper<CustomerCredit> {
public static final String ID_COLUMN = "id";
public static final String NAME_COLUMN = "name";
public static final String CREDIT_COLUMN = "credit";
public CustomerCredit mapRow(ResultSet rs, int rowNum) throws SQLException {
CustomerCredit customerCredit = new CustomerCredit();
customerCredit.setId(rs.getInt(ID_COLUMN));
customerCredit.setName(rs.getString(NAME_COLUMN));
customerCredit.setCredit(rs.getBigDecimal(CREDIT_COLUMN));
return customerCredit;
}
}
由于 JdbcCursorItemReader
与 JdbcTemplate
共享 key 接口,因此查看如何通过 JdbcTemplate
读入此数据以将其与 ItemReader
进行对照的示例十分有用。就本示例的目的而言,假设 CUSTOMER
数据库中有 1000 行。第一个示例使用 JdbcTemplate
:
Because JdbcCursorItemReader
shares key interfaces with JdbcTemplate
, it is useful to
see an example of how to read in this data with JdbcTemplate
, in order to contrast it
with the ItemReader
. For the purposes of this example, assume there are 1,000 rows in
the CUSTOMER
database. The first example uses JdbcTemplate
:
//For simplicity sake, assume a dataSource has already been obtained
JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
List customerCredits = jdbcTemplate.query("SELECT ID, NAME, CREDIT from CUSTOMER",
new CustomerCreditRowMapper());
运行上述代码片段之后,customerCredits
列表便包含 1000 个 CustomerCredit
对象。在查询方法中,从 DataSource
获得一个连接,针对连接运行提供的 SQL,然后为 ResultSet
中的每一行调用 mapRow
方法。将此方法与 JdbcCursorItemReader
的方法进行对照,后者在以下示例中展示:
After running the preceding code snippet, the customerCredits
list contains 1,000
CustomerCredit
objects. In the query method, a connection is obtained from the
DataSource
, the provided SQL is run against it, and the mapRow
method is called for
each row in the ResultSet
. Contrast this with the approach of the
JdbcCursorItemReader
, shown in the following example:
JdbcCursorItemReader itemReader = new JdbcCursorItemReader();
itemReader.setDataSource(dataSource);
itemReader.setSql("SELECT ID, NAME, CREDIT from CUSTOMER");
itemReader.setRowMapper(new CustomerCreditRowMapper());
int counter = 0;
ExecutionContext executionContext = new ExecutionContext();
itemReader.open(executionContext);
Object customerCredit = new Object();
while(customerCredit != null){
customerCredit = itemReader.read();
counter++;
}
itemReader.close();
运行上述代码片段之后,计数器等于 1000。如果上述代码已将返回的 customerCredit
放入列表,则结果与使用 JdbcTemplate
示例的结果完全相同。但是,ItemReader
的一大优势在于它允许对项目进行“流式传输”。read
方法可被调用一次,该项目可由 ItemWriter
写出,然后下一个项目可使用 read
获得。这允许分批和定期提交地对项目进行读取和写入,这是高性能批处理处理的精髓。而且,它可轻松配置成注入到 Spring Batch Step
中。
After running the preceding code snippet, the counter equals 1,000. If the code above had
put the returned customerCredit
into a list, the result would have been exactly the
same as with the JdbcTemplate
example. However, the big advantage of the ItemReader
is that it allows items to be 'streamed'. The read
method can be called once, the item
can be written out by an ItemWriter
, and then the next item can be obtained with
read
. This allows item reading and writing to be done in 'chunks' and committed
periodically, which is the essence of high performance batch processing. Furthermore, it
is easily configured for injection into a Spring Batch Step
.
- Java
-
以下示例展示如何在 Java 中将
ItemReader
注入到Step
中:
The following example shows how to inject an ItemReader
into a Step
in Java:
@Bean
public JdbcCursorItemReader<CustomerCredit> itemReader() {
return new JdbcCursorItemReaderBuilder<CustomerCredit>()
.dataSource(this.dataSource)
.name("creditReader")
.sql("select ID, NAME, CREDIT from CUSTOMER")
.rowMapper(new CustomerCreditRowMapper())
.build();
}
- XML
-
以下示例展示如何在 XML 中将
ItemReader
注入到Step
中:
The following example shows how to inject an ItemReader
into a Step
in XML:
<bean id="itemReader" class="org.spr...JdbcCursorItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="sql" value="select ID, NAME, CREDIT from CUSTOMER"/>
<property name="rowMapper">
<bean class="org.springframework.batch.samples.domain.CustomerCreditRowMapper"/>
</property>
</bean>
Additional Properties
由于有许多不同的 Java 打开游标选项,因此 JdbcCursorItemReader
上有许多可以设置的属性,如下表所述:
Because there are so many varying options for opening a cursor in Java, there are many
properties on the JdbcCursorItemReader
that can be set, as described in the following
table:
ignoreWarnings |
Determines whether or not SQLWarnings are logged or cause an exception.
The default is |
fetchSize |
Gives the JDBC driver a hint as to the number of rows that should be fetched
from the database when more rows are needed by the |
maxRows |
Sets the limit for the maximum number of rows the underlying |
queryTimeout |
Sets the number of seconds the driver waits for a |
verifyCursorPosition |
Because the same |
saveState |
Indicates whether or not the reader’s state should be saved in the
|
driverSupportsAbsolute |
Indicates whether the JDBC driver supports
setting the absolute row on a |
setUseSharedExtendedConnection |
Indicates whether the connection
used for the cursor should be used by all other processing, thus sharing the same
transaction. If this is set to |
HibernateCursorItemReader
正如普通 Spring 用户会对是否使用 ORMS 解决方案做出重要决定(此决定会影响他们是否使用 JdbcTemplate
或 HibernateTemplate
)一样,Spring Batch 用户也拥有相同的选择。HibernateCursorItemReader
是游标技术的一个 Hibernate 实现。Hibernate 在批处理中的使用存在相当大的争议。造成这种争议的很大一个原因是,Hibernate 起初只开发用于支持在线应用样式。但是,这不意味着它不能用于批处理。解决此问题的最简单的方法是使用 StatelessSession
,而非使用标准会话。这会移除 Hibernate 采用的所有缓存和脏检查,而这些内容可能会在批处理场景中导致问题。有关无状态 Hibernate 会话与正常 Hibernate 会话之间差异的更多信息,请参阅特定 Hibernate 发行的文档。HibernateCursorItemReader
允许声明 HQL 语句,并传递 SessionFactory
,后者会以与 JdbcCursorItemReader
基本相同的基础方式每调用一次 read
便返回该项目。以下示例配置使用与 JDBC 阅读器相同的“客户信用”示例:
Just as normal Spring users make important decisions about whether or not to use ORM
solutions, which affect whether or not they use a JdbcTemplate
or a
HibernateTemplate
, Spring Batch users have the same options.
HibernateCursorItemReader
is the Hibernate implementation of the cursor technique.
Hibernate’s usage in batch has been fairly controversial. This has largely been because
Hibernate was originally developed to support online application styles. However, that
does not mean it cannot be used for batch processing. The easiest approach for solving
this problem is to use a StatelessSession
rather than a standard session. This removes
all of the caching and dirty checking Hibernate employs and that can cause issues in a
batch scenario. For more information on the differences between stateless and normal
hibernate sessions, refer to the documentation of your specific hibernate release. The
HibernateCursorItemReader
lets you declare an HQL statement and pass in a
SessionFactory
, which will pass back one item per call to read in the same basic
fashion as the JdbcCursorItemReader
. The following example configuration uses the same
'customer credit' example as the JDBC reader:
HibernateCursorItemReader itemReader = new HibernateCursorItemReader();
itemReader.setQueryString("from CustomerCredit");
//For simplicity sake, assume sessionFactory already obtained.
itemReader.setSessionFactory(sessionFactory);
itemReader.setUseStatelessSession(true);
int counter = 0;
ExecutionContext executionContext = new ExecutionContext();
itemReader.open(executionContext);
Object customerCredit = new Object();
while(customerCredit != null){
customerCredit = itemReader.read();
counter++;
}
itemReader.close();
此已配置 ItemReader
以与 JdbcCursorItemReader
描述的完全相同的方式返回 CustomerCredit
对象,假设已为 Customer
表格正确创建了 Hibernate 映射文件。useStatelessSession
属性默认为 true,但在此添加是为了引起对打开或关闭此属性功能的注意。值得注意的是,底层游标的获取大小可使用 setFetchSize
属性进行设置。与 JdbcCursorItemReader
一样,配置十分简单。
This configured ItemReader
returns CustomerCredit
objects in the exact same manner
as described by the JdbcCursorItemReader
, assuming hibernate mapping files have been
created correctly for the Customer
table. The 'useStatelessSession' property defaults
to true but has been added here to draw attention to the ability to switch it on or off.
It is also worth noting that the fetch size of the underlying cursor can be set with the
setFetchSize
property. As with JdbcCursorItemReader
, configuration is
straightforward.
- Java
-
以下示例展示如何在 Java 中注入 Hibernate
ItemReader
:
The following example shows how to inject a Hibernate ItemReader
in Java:
@Bean
public HibernateCursorItemReader itemReader(SessionFactory sessionFactory) {
return new HibernateCursorItemReaderBuilder<CustomerCredit>()
.name("creditReader")
.sessionFactory(sessionFactory)
.queryString("from CustomerCredit")
.build();
}
- XML
-
以下示例展示如何在 XML 中注入 Hibernate
ItemReader
:
The following example shows how to inject a Hibernate ItemReader
in XML:
<bean id="itemReader"
class="org.springframework.batch.item.database.HibernateCursorItemReader">
<property name="sessionFactory" ref="sessionFactory" />
<property name="queryString" value="from CustomerCredit" />
</bean>
StoredProcedureItemReader
有时需要通过使用存储过程来获取游标数据。StoredProcedureItemReader
与 JdbcCursorItemReader
类似,不同之处在于,它不是通过运行查询来获取游标,而是通过运行返回游标的存储过程来获取游标。存储过程可以通过三种不同的方式返回游标:
Sometimes it is necessary to obtain the cursor data by using a stored procedure. The
StoredProcedureItemReader
works like the JdbcCursorItemReader
, except that, instead
of running a query to obtain a cursor, it runs a stored procedure that returns a cursor.
The stored procedure can return the cursor in three different ways:
-
As a returned
ResultSet
(used by SQL Server, Sybase, DB2, Derby, and MySQL). -
As a ref-cursor returned as an out parameter (used by Oracle and PostgreSQL).
-
As the return value of a stored function call.
- Java
-
以下 Java 示例配置使用与前面示例相同的“客户信用”示例:
The following Java example configuration uses the same 'customer credit' example as earlier examples:
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
StoredProcedureItemReader reader = new StoredProcedureItemReader();
reader.setDataSource(dataSource);
reader.setProcedureName("sp_customer_credit");
reader.setRowMapper(new CustomerCreditRowMapper());
return reader;
}
- XML
-
以下 XML 示例配置使用与前面示例相同的“客户信用”示例:
The following XML example configuration uses the same 'customer credit' example as earlier examples:
<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="procedureName" value="sp_customer_credit"/>
<property name="rowMapper">
<bean class="org.springframework.batch.samples.domain.CustomerCreditRowMapper"/>
</property>
</bean>
前面的示例依赖存储过程提供 ResultSet
作为返回结果(前面的选项 1)。
The preceding example relies on the stored procedure to provide a ResultSet
as a
returned result (option 1 from earlier).
如果存储过程返回 ref-cursor
(选项 2),那么我们需要提供返回的 ref-cursor
的 out 参数的位置。
If the stored procedure returned a ref-cursor
(option 2), then we would need to provide
the position of the out parameter that is the returned ref-cursor
.
- Java
-
以下示例显示如何在第一部分使用 ref-cursor:
The following example shows how to work with the first parameter being a ref-cursor in
- Java
-
Java Configuration
@Bean public StoredProcedureItemReader reader(DataSource dataSource) { StoredProcedureItemReader reader = new StoredProcedureItemReader(); reader.setDataSource(dataSource); reader.setProcedureName("sp_customer_credit"); reader.setRowMapper(new CustomerCreditRowMapper()); reader.setRefCursorPosition(1); return reader; }
- XML
-
以下示例显示如何在 XML 中使用作为 ref-cursor 的第一部分:
The following example shows how to work with the first parameter being a ref-cursor in XML:
<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="procedureName" value="sp_customer_credit"/>
<property name="refCursorPosition" value="1"/>
<property name="rowMapper">
<bean class="org.springframework.batch.samples.domain.CustomerCreditRowMapper"/>
</property>
</bean>
如果游标是通过存储函数返回的(选项 3),则需要将属性“[role="maroon"][.maroon]function”设置为 true
。它的默认值为 false
。
If the cursor was returned from a stored function (option 3), we would need to set the
property "[role="maroon"][.maroon]function" to true
. It defaults to false
.
- Java
-
以下示例显示如何在 Java 中将属性设置为
true
:
The following example shows property to true
in Java:
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
StoredProcedureItemReader reader = new StoredProcedureItemReader();
reader.setDataSource(dataSource);
reader.setProcedureName("sp_customer_credit");
reader.setRowMapper(new CustomerCreditRowMapper());
reader.setFunction(true);
return reader;
}
- XML
-
以下示例显示如何在 XML 中将属性设置为
true
:
The following example shows property to true
in XML:
<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="procedureName" value="sp_customer_credit"/>
<property name="function" value="true"/>
<property name="rowMapper">
<bean class="org.springframework.batch.samples.domain.CustomerCreditRowMapper"/>
</property>
</bean>
在所有情况下,我们需要定义 RowMapper
,以及 DataSource
和实际的步骤名称。
In all of these cases, we need to define a RowMapper
as well as a DataSource
and the
actual procedure name.
如果存储过程或函数接收参数,那么必须使用 parameters
属性对其进行声明和设置。以下示例针对 Oracle 声明三个参数。第一个是返回 ref-cursor 的 out
参数,第二个和第三个是接收类型为 INTEGER
值的 in 参数。
If the stored procedure or function takes in parameters, then they must be declared and
set by using the parameters
property. The following example, for Oracle, declares three
parameters. The first one is the out
parameter that returns the ref-cursor, and the
second and third are in parameters that takes a value of type INTEGER
.
- Java
-
以下示例显示如何在 Java 中使用参数:
The following example shows how to work with parameters in Java:
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
List<SqlParameter> parameters = new ArrayList<>();
parameters.add(new SqlOutParameter("newId", OracleTypes.CURSOR));
parameters.add(new SqlParameter("amount", Types.INTEGER);
parameters.add(new SqlParameter("custId", Types.INTEGER);
StoredProcedureItemReader reader = new StoredProcedureItemReader();
reader.setDataSource(dataSource);
reader.setProcedureName("spring.cursor_func");
reader.setParameters(parameters);
reader.setRefCursorPosition(1);
reader.setRowMapper(rowMapper());
reader.setPreparedStatementSetter(parameterSetter());
return reader;
}
- XML
-
以下示例显示如何在 XML 中使用参数:
The following example shows how to work with parameters in XML:
<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="procedureName" value="spring.cursor_func"/>
<property name="parameters">
<list>
<bean class="org.springframework.jdbc.core.SqlOutParameter">
<constructor-arg index="0" value="newid"/>
<constructor-arg index="1">
<util:constant static-field="oracle.jdbc.OracleTypes.CURSOR"/>
</constructor-arg>
</bean>
<bean class="org.springframework.jdbc.core.SqlParameter">
<constructor-arg index="0" value="amount"/>
<constructor-arg index="1">
<util:constant static-field="java.sql.Types.INTEGER"/>
</constructor-arg>
</bean>
<bean class="org.springframework.jdbc.core.SqlParameter">
<constructor-arg index="0" value="custid"/>
<constructor-arg index="1">
<util:constant static-field="java.sql.Types.INTEGER"/>
</constructor-arg>
</bean>
</list>
</property>
<property name="refCursorPosition" value="1"/>
<property name="rowMapper" ref="rowMapper"/>
<property name="preparedStatementSetter" ref="parameterSetter"/>
</bean>
除了参数声明之外,我们需要指定一个 PreparedStatementSetter`实现,用于设置调用的参数值。它的作用与上述 `JdbcCursorItemReader`相同。Additional Properties中列出的所有其他属性同样适用于 `StoredProcedureItemReader
。
In addition to the parameter declarations, we need to specify a PreparedStatementSetter
implementation that sets the parameter values for the call. This works the same as for
the JdbcCursorItemReader
above. All the additional properties listed in
Additional Properties apply to the StoredProcedureItemReader
as well.
Paging ItemReader
Implementations
使用数据库游标的一个替代方法是运行多个查询,其中每个查询都获取一部分结果。我们称这部分为一页。每个查询都必须指定开始行号和我们希望在页面中返回的行数。
An alternative to using a database cursor is running multiple queries where each query fetches a portion of the results. We refer to this portion as a page. Each query must specify the starting row number and the number of rows that we want returned in the page.
JdbcPagingItemReader
ItemReader
分页实现之一是 JdbcPagingItemReader
。JdbcPagingItemReader
需要一个 PagingQueryProvider
负责提供用于检索构成一页的行 SQL 查询。由于每个数据库都有自己的分页支持策略,因此我们需要为每个支持的数据库类型使用不同的 PagingQueryProvider
。还有 SqlPagingQueryProviderFactoryBean
可以自动检测正在使用的数据库并确定合适的 PagingQueryProvider
实现。这简化了配置,是推荐的最佳实践。
One implementation of a paging ItemReader
is the JdbcPagingItemReader
. The
JdbcPagingItemReader
needs a PagingQueryProvider
responsible for providing the SQL
queries used to retrieve the rows making up a page. Since each database has its own
strategy for providing paging support, we need to use a different PagingQueryProvider
for each supported database type. There is also the SqlPagingQueryProviderFactoryBean
that auto-detects the database that is being used and determine the appropriate
PagingQueryProvider
implementation. This simplifies the configuration and is the
recommended best practice.
SqlPagingQueryProviderFactoryBean
要求您指定 select
子句和 from
子句。您还可以提供可选的 WHERE 子句。这些子句和必需的 sortKey
用于生成 SQL 语句。
The SqlPagingQueryProviderFactoryBean
requires that you specify a select
clause and a
from
clause. You can also provide an optional where
clause. These clauses and the
required sortKey
are used to build an SQL statement.
重要的是在 |
It is important to have a unique key constraint on the |
在打开阅读器以后,它会像任何其他 ItemReader
一样以同样的基本方式在每次调用 read
时回传一个项目。当需要其他行时,分页会在后台进行。
After the reader has been opened, it passes back one item per call to read
in the same
basic fashion as any other ItemReader
. The paging happens behind the scenes when
additional rows are needed.
- Java
-
以下 Java 示例配置使用类似于前面所示基于游标的
ItemReaders
的“客户信用”示例:
The following Java example configuration uses a similar 'customer credit' example as the
cursor-based ItemReaders
shown previously:
@Bean
public JdbcPagingItemReader itemReader(DataSource dataSource, PagingQueryProvider queryProvider) {
Map<String, Object> parameterValues = new HashMap<>();
parameterValues.put("status", "NEW");
return new JdbcPagingItemReaderBuilder<CustomerCredit>()
.name("creditReader")
.dataSource(dataSource)
.queryProvider(queryProvider)
.parameterValues(parameterValues)
.rowMapper(customerCreditMapper())
.pageSize(1000)
.build();
}
@Bean
public SqlPagingQueryProviderFactoryBean queryProvider() {
SqlPagingQueryProviderFactoryBean provider = new SqlPagingQueryProviderFactoryBean();
provider.setSelectClause("select id, name, credit");
provider.setFromClause("from customer");
provider.setWhereClause("where status=:status");
provider.setSortKey("id");
return provider;
}
- XML
-
以下 XML 示例配置使用类似于前面所示基于游标的
ItemReaders
的“客户信用”示例:
The following XML example configuration uses a similar 'customer credit' example as the
cursor-based ItemReaders
shown previously:
<bean id="itemReader" class="org.spr...JdbcPagingItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="queryProvider">
<bean class="org.spr...SqlPagingQueryProviderFactoryBean">
<property name="selectClause" value="select id, name, credit"/>
<property name="fromClause" value="from customer"/>
<property name="whereClause" value="where status=:status"/>
<property name="sortKey" value="id"/>
</bean>
</property>
<property name="parameterValues">
<map>
<entry key="status" value="NEW"/>
</map>
</property>
<property name="pageSize" value="1000"/>
<property name="rowMapper" ref="customerMapper"/>
</bean>
此配置的 ItemReader
使用必须指定的 RowMapper
返回 CustomerCredit
对象。“pageSize”属性确定每次查询运行中从数据库中读取的实体数量。
This configured ItemReader
returns CustomerCredit
objects using the RowMapper
,
which must be specified. The 'pageSize' property determines the number of entities read
from the database for each query run.
“parameterValues”属性可用于指定用于查询的参数值的 Map
。如果您在 where
子句中使用命名参数,则每个条目的键应与命名参数的名称相匹配。如果您使用传统 “?”占位符,则每个条目的键应是占位符的编号,从 1 开始。
The 'parameterValues' property can be used to specify a Map
of parameter values for the
query. If you use named parameters in the where
clause, the key for each entry should
match the name of the named parameter. If you use a traditional '?' placeholder, then the
key for each entry should be the number of the placeholder, starting with 1.
JpaPagingItemReader
分页 ItemReader
的另一个实现是 JpaPagingItemReader
。JPA 没有类似于 Hibernate StatelessSession
的概念,因此我们必须使用 JPA 规范提供的不其他功能。由于 JPA 支持分页,因此在使用 JPA 进行批处理时这是自然的。在读取每一页后,实体将被分离,并清除持久化上下文,以便在处理该页后将实体进行垃圾回收。
Another implementation of a paging ItemReader
is the JpaPagingItemReader
. JPA does
not have a concept similar to the Hibernate StatelessSession
, so we have to use other
features provided by the JPA specification. Since JPA supports paging, this is a natural
choice when it comes to using JPA for batch processing. After each page is read, the
entities become detached and the persistence context is cleared, to allow the entities to
be garbage collected once the page is processed.
JpaPagingItemReader
允许您声明一个 JPQL 语句并传入一个 EntityManagerFactory
。然后它以与任何其他 ItemReader
相同的基本方式逐个项目传递回读取内容。当需要其他实体时,分页在后台发生。
The JpaPagingItemReader
lets you declare a JPQL statement and pass in a
EntityManagerFactory
. It then passes back one item per call to read in the same basic
fashion as any other ItemReader
. The paging happens behind the scenes when additional
entities are needed.
- Java
-
以下 Java 示例配置使用与之前所示 JDBC 读取器相同的“客户信用”示例:
The following Java example configuration uses the same 'customer credit' example as the JDBC reader shown previously:
@Bean
public JpaPagingItemReader itemReader() {
return new JpaPagingItemReaderBuilder<CustomerCredit>()
.name("creditReader")
.entityManagerFactory(entityManagerFactory())
.queryString("select c from CustomerCredit c")
.pageSize(1000)
.build();
}
- XML
-
以下 XML 示例配置使用与之前所示 JDBC 读取器相同的“客户信用”示例:
The following XML example configuration uses the same 'customer credit' example as the JDBC reader shown previously:
<bean id="itemReader" class="org.spr...JpaPagingItemReader">
<property name="entityManagerFactory" ref="entityManagerFactory"/>
<property name="queryString" value="select c from CustomerCredit c"/>
<property name="pageSize" value="1000"/>
</bean>
此配置的 ItemReader
以与上面为 JdbcPagingItemReader
描述的完全相同的方式返回 CustomerCredit
对象,假设 CustomerCredit
对象有正确的 JPA 注释或 ORM 映射文件。“pageSize”属性确定每次查询执行中从数据库中读取的实体数量。
This configured ItemReader
returns CustomerCredit
objects in the exact same manner as
described for the JdbcPagingItemReader
above, assuming the CustomerCredit
object has the
correct JPA annotations or ORM mapping file. The 'pageSize' property determines the
number of entities read from the database for each query execution.
Database ItemWriters
尽管平面文件和 XML 文件都有特定的 ItemWriter
实例,但在数据库世界中没有完全等效的实例。这是因为事务提供了所有必需的功能。ItemWriter
实现对于文件是必需的,因为它们必须表现得好像它们是事务性的,在适当的时间跟踪已写入的项目并刷新或清除。数据库不需要此功能,因为该写入已经包含在事务中。用户可以创建自己的实现 ItemWriter
接口的 DAO,或从为通用处理问题编写的自定义 ItemWriter
中使用一个。无论哪种方式,它们都应该毫无问题地工作。需要注意的一件事是批量输出所提供的性能和错误处理功能。在使用 Hibernate 作为 ItemWriter
时这种情况最常见,但在使用 JDBC 批处理模式时也可能遇到相同的问题。假设我们小心刷新并且数据中没有错误,则批量数据库输出没有任何固有的缺陷。但是,任何写入错误都可能导致混淆,因为没有办法知道是哪个单独的项目导致异常,甚至没有办法知道是否有任何单独的项目是该异常的根源,如下面的图像所示:
While both flat files and XML files have a specific ItemWriter
instance, there is no exact equivalent
in the database world. This is because transactions provide all the needed functionality.
ItemWriter
implementations are necessary for files because they must act as if they’re transactional,
keeping track of written items and flushing or clearing at the appropriate times.
Databases have no need for this functionality, since the write is already contained in a
transaction. Users can create their own DAOs that implement the ItemWriter
interface or
use one from a custom ItemWriter
that’s written for generic processing concerns. Either
way, they should work without any issues. One thing to look out for is the performance
and error handling capabilities that are provided by batching the outputs. This is most
common when using hibernate as an ItemWriter
but could have the same issues when using
JDBC batch mode. Batching database output does not have any inherent flaws, assuming we
are careful to flush and there are no errors in the data. However, any errors while
writing can cause confusion, because there is no way to know which individual item caused
an exception or even if any individual item was responsible, as illustrated in the
following image:
如果在写入之前对项目进行缓冲,则任何错误都不会被抛出,直到在提交之前刷新缓冲区。例如,假设每个块写入 20 个项目,并且第 15 个项目抛出 DataIntegrityViolationException
。就 Step
而言,所有 20 个项目都成功写入,因为在实际写入之前无法知道发生错误。一旦调用 Session#flush()
,缓冲区就被清空并且遇到了异常。此时,Step
无能为力。必须回滚事务。通常,此异常可能导致跳过该项目(取决于跳过/重试策略),然后不再写入它。但是,在批处理场景中,没有办法知道哪个项目导致了该问题。发生故障时,整个缓冲区正在被写入。解决此问题的唯一方法是在每个项目后刷新,如下面的图像所示:
If items are buffered before being written, any errors are not thrown until the buffer is
flushed just before a commit. For example, assume that 20 items are written per chunk,
and the 15th item throws a DataIntegrityViolationException
. As far as the Step
is concerned, all 20 item are written successfully, since there is no way to know that an
error occurs until they are actually written. Once Session#flush()
is called, the
buffer is emptied and the exception is hit. At this point, there is nothing the Step
can do. The transaction must be rolled back. Normally, this exception might cause the
item to be skipped (depending upon the skip/retry policies), and then it is not written
again. However, in the batched scenario, there is no way to know which item caused the
issue. The whole buffer was being written when the failure happened. The only way to
solve this issue is to flush after each item, as shown in the following image:
这是一个常见的用例,尤其是在使用 Hibernate 时,并且 ItemWriter
实现的简单指导原则是对 write()
的每个调用进行刷新。这样做允许可靠地跳过项目,Spring Batch 在错误后会内部处理对 ItemWriter
调用的粒度。
This is a common use case, especially when using Hibernate, and the simple guideline for
implementations of ItemWriter
is to flush on each call to write()
. Doing so allows
for items to be skipped reliably, with Spring Batch internally taking care of the
granularity of the calls to ItemWriter
after an error.