JDBC Message Store
Spring 集成提供了两个特定于 JDBC 的消息存储实现。JdbcMessageStore
适用于合计器和索赔检查模式。JdbcChannelMessageStore
实现提供了专门用于消息通道、有针对性和更具可扩展性的实现。
Spring Integration provides two JDBC specific message store implementations.
The JdbcMessageStore
is suitable for use with aggregators and the claim check pattern.
The JdbcChannelMessageStore
implementation provides a more targeted and scalable implementation specifically for message channel.
请注意,您可以使用 JdbcMessageStore
作为消息通道的后台,JdbcChannelMessageStore
已针对该目的进行了优化。
Note that you can use a JdbcMessageStore
to back a message channel, JdbcChannelMessageStore
is optimized for that purpose.
从 5.0.11、5.1.2 版本开始,`JdbcChannelMessageStore`的索引已得到优化。如果您在这样的存储中具有大型消息组,您可能希望更改索引。此外,`PriorityChannel`的索引已注释掉,因为除非您使用由 JDBC 支持的此类通道,否则不需要它。
Starting with versions 5.0.11, 5.1.2, the indexes for the JdbcChannelMessageStore
have been optimized.
If you have large message groups in such a store, you may wish to alter the indexes.
Furthermore, the index for PriorityChannel
is commented out because it is not needed unless you are using such channels backed by JDBC.
使用`OracleChannelMessageStoreQueryProvider`时,优先级通道索引*must*将被添加,因为它包含在查询的提示中。 |
When using the |
Initializing the Database
在开始使用 JDBC 消息存储组件之前,您应该使用适当的对象来提供一个目标数据库。
Before starting to use JDBC message store components, you should provision a target database with the appropriate objects.
Spring 集成附带了一些示例脚本,可用于初始化数据库。在 spring-integration-jdbc
JAR 文件中,可以在 org.springframework.integration.jdbc
包中找到脚本。它针对一系列常见的数据库平台提供了一个示例创建脚本和一个示例删除脚本。使用这些脚本的一种常用方法是引用 Spring JDBC data source initializer 中的这些脚本。请注意,这些脚本作为示例和所需表名和列名的规范提供。你可能会发现你需要对它们进行增强以用于生产(例如,通过添加索引声明)。
Spring Integration ships with some sample scripts that can be used to initialize a database.
In the spring-integration-jdbc
JAR file, you can find scripts in the org.springframework.integration.jdbc
package.
It provides an example create and an example drop script for a range of common database platforms.
A common way to use these scripts is to reference them in a Spring JDBC data source initializer.
Note that the scripts are provided as samples and as specifications of the required table and column names.
You may find that you need to enhance them for production use (for, example, by adding index declarations).
从第 6.2 版开始,JdbcMessageStore
、JdbcChannelMessageStore
、JdbcMetadataStore
和 DefaultLockRepository
实现 SmartLifecycle
,并在 start()
方法中针对各自的表执行 SELECT COUNT
查询,以确保在目标数据库中存在所需的表(根据提供的前缀)。如果所需的表不存在,则应用程序上下文将无法启动。可以通过 setCheckDatabaseOnStart(false)
禁用此项检查。
Starting with version 6.2, the JdbcMessageStore
, JdbcChannelMessageStore
, JdbcMetadataStore
, and DefaultLockRepository
implement SmartLifecycle
and perform a`SELECT COUNT` query, on their respective tables, in the start()
method to ensure that the required table (according to the provided prefix) is present in the target database.
If the required table does not exist, the application context fails to start.
The check can be disabled via setCheckDatabaseOnStart(false)
.
The Generic JDBC Message Store
JDBC 模块提供了由数据库支持的 Spring Integration MessageStore
(在索取检查模式中很重要)和 MessageGroupStore
(在诸如聚合器等有状态模式中很重要)的实现。这两个接口都由 JdbcMessageStore
实现,并且支持以 XML 配置存储实例,如下面的示例所示:
The JDBC module provides an implementation of the Spring Integration MessageStore
(important in the claim check pattern) and MessageGroupStore
(important in stateful patterns such as an aggregator) backed by a database.
Both interfaces are implemented by the JdbcMessageStore
, and there is support for configuring store instances in XML, as the following example shows:
<int-jdbc:message-store id="messageStore" data-source="dataSource"/>
您可以指定 JdbcTemplate
而不是 DataSource
。
You can specify a JdbcTemplate
instead of a DataSource
.
以下示例显示了一些其他可选属性:
The following example shows some other optional attributes:
<int-jdbc:message-store id="messageStore" data-source="dataSource"
lob-handler="lobHandler" table-prefix="MY_INT_"/>
在前述示例中,我们指定了一个 LobHandler
来将消息作为大对象进行处理(这对于 Oracle 来说通常非常必要),并指定了存储中查询生成的表名的前缀。表名前缀默认为 INT_
。
In the preceding example, we have specified a LobHandler
for dealing with messages as large objects (which is often necessary for Oracle) and a prefix for the table names in the queries generated by the store.
The table name prefix defaults to INT_
.
Backing Message Channels
如果您打算使用 JDBC 支持消息通道,我们建议使用 JdbcChannelMessageStore
实现。它只能与消息通道配合使用。
If you intend to back message channels with JDBC, we recommend using the JdbcChannelMessageStore
implementation.
It works only in conjunction with Message Channels.
Supported Databases
JdbcChannelMessageStore
使用特定于数据库的 SQL 查询从数据库中检索消息。因此,您必须在 JdbcChannelMessageStore
上设置 ChannelMessageStoreQueryProvider
属性。此 channelMessageStoreQueryProvider
提供针对您指定的特定数据库的 SQL 查询。Spring Integration 为以下关系型数据库提供支持:
The JdbcChannelMessageStore
uses database-specific SQL queries to retrieve messages from the database.
Therefore, you must set the ChannelMessageStoreQueryProvider
property on the JdbcChannelMessageStore
.
This channelMessageStoreQueryProvider
provides the SQL queries for the particular database you specify.
Spring Integration provides support for the following relational databases:
-
PostgreSQL
-
HSQLDB
-
MySQL
-
Oracle
-
Derby
-
H2
-
SqlServer
-
Sybase
-
DB2
如果没有列出您的数据库,您可以实现 ChannelMessageStoreQueryProvider
接口并提供您自己的自定义查询。
If your database is not listed, you can implement the ChannelMessageStoreQueryProvider
interface and provide your own custom queries.
版本 4.0 向表中添加了 MESSAGE_SEQUENCE
列,以确保即使消息存储在同一毫秒内,也能按照先进先出 (FIFO) 排列顺序队列。
Version 4.0 added the MESSAGE_SEQUENCE
column to the table to ensure first-in-first-out (FIFO) queueing even when messages are stored in the same millisecond.
从第 6.2 版开始,ChannelMessageStoreQueryProvider
公开一个 isSingleStatementForPoll
标志,其中 PostgresChannelMessageStoreQueryProvider
返回 true
,且其用于轮询的查询现基于一条 DELETE…RETURNING
语句。JdbcChannelMessageStore
咨询 isSingleStatementForPoll
选项,如果只支持单轮询语句,则跳过单独的 DELETE
语句。
Starting with version 6.2, ChannelMessageStoreQueryProvider
exposes a isSingleStatementForPoll
flag, where the PostgresChannelMessageStoreQueryProvider
returns true
and its queries for polls are now based on a single DELETE…RETURNING
statement.
The JdbcChannelMessageStore
consults with the isSingleStatementForPoll
option and skips a separate DELETE
statement if only single poll statement is supported.
Custom Message Insertion
自版本 5.0 起,通过重载 ChannelMessageStorePreparedStatementSetter
类,你可以在 JdbcChannelMessageStore
中提供一条自定义实现用于消息插入。你可以使用它来设置不同的列或更改表结构或序列化策略。例如,你可以将其结构作为 JSON 字符串存储,而不是默认序列化为 byte[]
。
Since version 5.0, by overloading the ChannelMessageStorePreparedStatementSetter
class, you can provide a custom implementation for message insertion in the JdbcChannelMessageStore
.
You can use it to set different columns or change the table structure or serialization strategy.
For example, instead of default serialization to byte[]
, you can store its structure as a JSON string.
以下示例使用 setValues
的默认实现来存储公共列,并覆盖存储消息负载为 varchar
的行为:
The following example uses the default implementation of setValues
to store common columns and overrides the behavior to store the message payload as a varchar
:
public class JsonPreparedStatementSetter extends ChannelMessageStorePreparedStatementSetter {
@Override
public void setValues(PreparedStatement preparedStatement, Message<?> requestMessage,
Object groupId, String region, boolean priorityEnabled) throws SQLException {
// Populate common columns
super.setValues(preparedStatement, requestMessage, groupId, region, priorityEnabled);
// Store message payload as varchar
preparedStatement.setString(6, requestMessage.getPayload().toString());
}
}
通常,我们不建议使用关系数据库进行排队。如果您可能的话,可考虑使用 JMS 或 AMQP 支持的通道。有关更多参考,请参见以下资源:
Generally, we do not recommend using a relational database for queuing. Instead, if possible, consider using either JMS- or AMQP-backed channels instead. For further reference, see the following resource:
如果您仍计划将数据库用作队列,请考虑使用 PostgreSQL 及其通知机制,其在 {in a subsequent section} 中进行了描述。
If you are still planning to use your database as a queue, consider using PostgreSQL and its notification mechanism which is described in a subsequent section.
Concurrent Polling
轮询消息通道时,您可以选择使用 TaskExecutor
引用配置关联的 Poller
。
When polling a message channel, you have the option to configure the associated Poller
with a TaskExecutor
reference.
但请记住,如果你使用 JDBC 支持的消息通道,并且你计划轮询该通道,并因此以事务方式使用多线程的消息存储,则应确保使用支持 Multiversion Concurrency Control (MVCC)的关系数据库。否则,锁定可能是一个问题,在使用多线程时,性能可能无法达到预期。例如,Apache Derby 在这方面存在问题。
Keep in mind, though, that if you use a JDBC backed message channel, and you plan to poll the channel and consequently the message store transactional with multiple threads, you should ensure that you use a relational database that supports Multiversion Concurrency Control (MVCC). Otherwise, locking may be an issue and the performance, when using multiple threads, may not materialize as expected. For example, Apache Derby is problematic in that regard.
要实现更好的 JDBC 队列吞吐量,并在不同线程可能从队列轮询同一 Message
时避免问题,在使用不支持 MVCC 的数据库时,将 JdbcChannelMessageStore
的 usingIdCache
属性设置为 true
非常重要。以下示例显示了如何进行此项设置:
To achieve better JDBC queue throughput and avoid issues when different threads may poll the same Message
from the queue, it is important to set the usingIdCache
property of JdbcChannelMessageStore
to true
when using databases that do not support MVCC.
The following example shows how to do so:
<bean id="queryProvider"
class="o.s.i.jdbc.store.channel.PostgresChannelMessageStoreQueryProvider"/>
<int:transaction-synchronization-factory id="syncFactory">
<int:after-commit expression="@store.removeFromIdCache(headers.id.toString())" />
<int:after-rollback expression="@store.removeFromIdCache(headers.id.toString())"/>
</int:transaction-synchronization-factory>
<task:executor id="pool" pool-size="10"
queue-capacity="10" rejection-policy="CALLER_RUNS" />
<bean id="store" class="o.s.i.jdbc.store.JdbcChannelMessageStore">
<property name="dataSource" ref="dataSource"/>
<property name="channelMessageStoreQueryProvider" ref="queryProvider"/>
<property name="region" value="TX_TIMEOUT"/>
<property name="usingIdCache" value="true"/>
</bean>
<int:channel id="inputChannel">
<int:queue message-store="store"/>
</int:channel>
<int:bridge input-channel="inputChannel" output-channel="outputChannel">
<int:poller fixed-delay="500" receive-timeout="500"
max-messages-per-poll="1" task-executor="pool">
<int:transactional propagation="REQUIRED" synchronization-factory="syncFactory"
isolation="READ_COMMITTED" transaction-manager="transactionManager" />
</int:poller>
</int:bridge>
<int:channel id="outputChannel" />
Priority Channel
从版本 4.0 开始,JdbcChannelMessageStore
实现 PriorityCapableChannelMessageStore
,并提供 priorityEnabled
选项,允许将其用作 priority-queue
实例的 message-store
引用。为此,INT_CHANNEL_MESSAGE
表有一个 MESSAGE_PRIORITY
列来存储 PRIORITY
消息标头的值。此外,新的 MESSAGE_SEQUENCE
列使我们能够实现稳定的先进先出 (FIFO) 轮询机制,即使在同一毫秒内存储了具有相同优先级的多条消息时也是如此。按照 order by MESSAGE_PRIORITY DESC NULLS LAST, CREATED_DATE, MESSAGE_SEQUENCE
从数据库中轮询(选择)消息。
Starting with version 4.0, JdbcChannelMessageStore
implements PriorityCapableChannelMessageStore
and provides the priorityEnabled
option, letting it be used as a message-store
reference for priority-queue
instances.
For this purpose, the INT_CHANNEL_MESSAGE
table has a MESSAGE_PRIORITY
column to store the value of PRIORITY
message headers.
In addition, a new MESSAGE_SEQUENCE
column lets us achieve a robust first-in-first-out (FIFO) polling mechanism, even when multiple messages are stored with the same priority in the same millisecond.
Messages are polled (selected) from the database with order by MESSAGE_PRIORITY DESC NULLS LAST, CREATED_DATE, MESSAGE_SEQUENCE
.
我们不建议对优先级和非优先级队列通道使用相同的`JdbcChannelMessageStore` bean,因为 |
We do not recommend using the same |
<bean id="channelStore" class="o.s.i.jdbc.store.JdbcChannelMessageStore">
<property name="dataSource" ref="dataSource"/>
<property name="channelMessageStoreQueryProvider" ref="queryProvider"/>
</bean>
<int:channel id="queueChannel">
<int:queue message-store="channelStore"/>
</int:channel>
<bean id="priorityStore" parent="channelStore">
<property name="priorityEnabled" value="true"/>
</bean>
<int:channel id="priorityChannel">
<int:priority-queue message-store="priorityStore"/>
</int:channel>
Partitioning a Message Store
通常将 {JdbcMessageStore
} 作为同一应用程序中应用程序或节点组的全局存储。为了提供一些针对名称冲突的保护并控制数据库元数据配置,消息存储允许将表格以两种方式进行分区。一种方法是通过更改前缀(如 {described earlier})来使用单独的表格名称。另一种方法是指定 {region
} 名称来对单一表格内的分区数据进行分区。使用第二种方法的一个重要用例是,当 {MessageStore
} 正在管理作为 Spring Integration 消息通道的后备的持久队列时。持久通道的消息数据在存储中以通道名称为键。因此,如果通道名称并非全局唯一,通道将获取不针对它们的数据。为了避免这种危险,您可以使用消息存储 {region
} 来为拥有相同逻辑名称的不同物理通道保留单独的数据。
It is common to use a JdbcMessageStore
as a global store for a group of applications or nodes in the same application.
To provide some protection against name clashes and to give control over the database meta-data configuration, the message store lets the tables be partitioned in two ways.
One way is to use separate table names, by changing the prefix (as described earlier).
The other way is to specify a region
name for partitioning data within a single table.
An important use case for the second approach is when the MessageStore
is managing persistent queues that back a Spring Integration Message Channel.
The message data for a persistent channel is keyed in the store on the channel name.
Consequently, if the channel names are not globally unique, the channels can pick up data that is not intended for them.
To avoid this danger, you can use the message store region
to keep data separate for different physical channels that have the same logical name.
PostgreSQL: Receiving Push Notifications
PostgreSQL 提供了一个侦听和通知框架,用于在对数据库表进行操作时接收推送通知。Spring Integration 利用此机制(从版本 6.0 开始)允许在向 JdbcChannelMessageStore
添加新消息时接收推送通知。在使用此功能时,必须定义一个数据库触发器,它可以在包含在 Spring Integration JDBC 模块中的 schema-postgresql.sql
文件的注释部分找到。
PostgreSQL offers a listen and notification framework for receiving push notifications upon database table manipulations.
Spring Integration leverages this mechanism (starting with version 6.0) to allow for receiving push notifications when new messages are added to a JdbcChannelMessageStore
.
When using this feature, a database trigger must be defined, which can be found as part of the comments of the schema-postgresql.sql
file which is included in the JDBC module of Spring Integration.
推送通知通过 PostgresChannelMessageTableSubscriber
类接收,该类允许其订阅者在为任何给定的 region
和 groupId
接收新消息时接收回调。即使在不同的 JVM 上追加消息,但对于同一个数据库,也会收到这些通知。PostgresSubscribableChannel
实现使用 PostgresChannelMessageTableSubscriber.Subscription
契约,以响应上述 PostgresChannelMessageTableSubscriber
的通知从存储区提取消息。
Push notifications are received via the PostgresChannelMessageTableSubscriber
class which allows its subscribers to receive a callback upon the arrival of new messages for any given region
and groupId
.
These notifications are received even if a message was appended on a different JVM, but to the same database.
The PostgresSubscribableChannel
implementation uses a PostgresChannelMessageTableSubscriber.Subscription
contract to pull messages from the store as a reaction for notification from the mentioned PostgresChannelMessageTableSubscriber
notifications.
例如,可以按如下方式接收 some group
的推送通知:
For example, push notifications for some group
can be received as follows:
@Bean
public JdbcChannelMessageStore messageStore(DataSource dataSource) {
JdbcChannelMessageStore messageStore = new JdbcChannelMessageStore(dataSource);
messageStore.setChannelMessageStoreQueryProvider(new PostgresChannelMessageStoreQueryProvider());
return messageStore;
}
@Bean
public PostgresChannelMessageTableSubscriber subscriber(
@Value("${spring.datasource.url}") String url,
@Value("${spring.datasource.username}") String username,
@Value("${spring.datasource.password}") String password) {
return new PostgresChannelMessageTableSubscriber(() ->
DriverManager.getConnection(url, username, password).unwrap(PgConnection.class));
}
@Bean
public PostgresSubscribableChannel channel(
PostgresChannelMessageTableSubscriber subscriber,
JdbcChannelMessageStore messageStore) {
return new PostgresSubscribableChannel(messageStore, "some group", subscriber);
}
交易支持
Transaction support
从 6.0.5 版开始,在 PostgresSubscribableChannel
上指定 PlatformTransactionManager
将在事务中通知订阅者。订阅器中的异常将导致事务回滚并消息将被放回消息存储区。默认情况下,事务支持未激活。
Starting with version 6.0.5, specifying a PlatformTransactionManager
on a PostgresSubscribableChannel
will notify subscribers in a transaction.
An exception in a subscriber will cause the transaction to be rolled back and the message to be put back in the message store.
Transactional support is not activated by default.
重试
Retries
从 6.0.5 版开始,可以通过向 PostgresSubscribableChannel
提供 RetryTemplate
来指定重试策略。默认情况下,不执行任何重试。
Starting with version 6.0.5, a retry policy can be specified by providing a RetryTemplate
to the PostgresSubscribableChannel
.
By default, no retries are performed.
任何 active PostgresChannelMessageTableSubscriber
在其活动生命周期内占用 exclusive JDBC Connection
。因此,此连接不能来自池化 DataSource
非常重要。此类连接池通常希望在预定义的超时窗口内关闭已发出的连接。
Any active PostgresChannelMessageTableSubscriber
occupies an exclusive JDBC Connection
for the duration of its active life cycle.
It is therefore important that this connection does not originate from a pooling DataSource
.
Such connection pools do normally expect that issued connections are closed within a predefined timeout window.
对于此 exclusive 连接的需求,还建议一个 JVM 仅运行一个 PostgresChannelMessageTableSubscriber
,该 subscriber 可用来注册任意数量的订阅项。
For this need of an exclusive connection, it is also recommended that a JVM only runs a single PostgresChannelMessageTableSubscriber
which can be used to register any number of subscriptions.