JDBC Message Store

Spring 集成提供了两个特定于 JDBC 的消息存储实现。JdbcMessageStore 适用于合计器和索赔检查模式。JdbcChannelMessageStore 实现提供了专门用于消息通道、有针对性和更具可扩展性的实现。

Spring Integration provides two JDBC specific message store implementations. The JdbcMessageStore is suitable for use with aggregators and the claim check pattern. The JdbcChannelMessageStore implementation provides a more targeted and scalable implementation specifically for message channel.

请注意,您可以使用 JdbcMessageStore 作为消息通道的后台,JdbcChannelMessageStore 已针对该目的进行了优化。

Note that you can use a JdbcMessageStore to back a message channel, JdbcChannelMessageStore is optimized for that purpose.

从 5.0.11、5.1.2 版本开始,`JdbcChannelMessageStore`的索引已得到优化。如果您在这样的存储中具有大型消息组,您可能希望更改索引。此外,`PriorityChannel`的索引已注释掉,因为除非您使用由 JDBC 支持的此类通道,否则不需要它。

Starting with versions 5.0.11, 5.1.2, the indexes for the JdbcChannelMessageStore have been optimized. If you have large message groups in such a store, you may wish to alter the indexes. Furthermore, the index for PriorityChannel is commented out because it is not needed unless you are using such channels backed by JDBC.

使用`OracleChannelMessageStoreQueryProvider`时,优先级通道索引*must*将被添加,因为它包含在查询的提示中。

When using the OracleChannelMessageStoreQueryProvider, the priority channel index must be added because it is included in a hint in the query.

Initializing the Database

在开始使用 JDBC 消息存储组件之前,您应该使用适当的对象来提供一个目标数据库。

Before starting to use JDBC message store components, you should provision a target database with the appropriate objects.

Spring 集成附带了一些示例脚本,可用于初始化数据库。在 spring-integration-jdbc JAR 文件中,可以在 org.springframework.integration.jdbc 包中找到脚本。它针对一系列常见的数据库平台提供了一个示例创建脚本和一个示例删除脚本。使用这些脚本的一种常用方法是引用 Spring JDBC data source initializer 中的这些脚本。请注意,这些脚本作为示例和所需表名和列名的规范提供。你可能会发现你需要对它们进行增强以用于生产(例如,通过添加索引声明)。

Spring Integration ships with some sample scripts that can be used to initialize a database. In the spring-integration-jdbc JAR file, you can find scripts in the org.springframework.integration.jdbc package. It provides an example create and an example drop script for a range of common database platforms. A common way to use these scripts is to reference them in a Spring JDBC data source initializer. Note that the scripts are provided as samples and as specifications of the required table and column names. You may find that you need to enhance them for production use (for, example, by adding index declarations).

从第 6.2 版开始,JdbcMessageStoreJdbcChannelMessageStoreJdbcMetadataStoreDefaultLockRepository 实现 SmartLifecycle,并在 start() 方法中针对各自的表执行 SELECT COUNT 查询,以确保在目标数据库中存在所需的表(根据提供的前缀)。如果所需的表不存在,则应用程序上下文将无法启动。可以通过 setCheckDatabaseOnStart(false) 禁用此项检查。

Starting with version 6.2, the JdbcMessageStore, JdbcChannelMessageStore, JdbcMetadataStore, and DefaultLockRepository implement SmartLifecycle and perform a`SELECT COUNT` query, on their respective tables, in the start() method to ensure that the required table (according to the provided prefix) is present in the target database. If the required table does not exist, the application context fails to start. The check can be disabled via setCheckDatabaseOnStart(false).

The Generic JDBC Message Store

JDBC 模块提供了由数据库支持的 Spring Integration MessageStore(在索取检查模式中很重要)和 MessageGroupStore(在诸如聚合器等有状态模式中很重要)的实现。这两个接口都由 JdbcMessageStore 实现,并且支持以 XML 配置存储实例,如下面的示例所示:

The JDBC module provides an implementation of the Spring Integration MessageStore (important in the claim check pattern) and MessageGroupStore (important in stateful patterns such as an aggregator) backed by a database. Both interfaces are implemented by the JdbcMessageStore, and there is support for configuring store instances in XML, as the following example shows:

<int-jdbc:message-store id="messageStore" data-source="dataSource"/>

您可以指定 JdbcTemplate 而不是 DataSource

You can specify a JdbcTemplate instead of a DataSource.

以下示例显示了一些其他可选属性:

The following example shows some other optional attributes:

<int-jdbc:message-store id="messageStore" data-source="dataSource"
    lob-handler="lobHandler" table-prefix="MY_INT_"/>

在前述示例中,我们指定了一个 LobHandler 来将消息作为大对象进行处理(这对于 Oracle 来说通常非常必要),并指定了存储中查询生成的表名的前缀。表名前缀默认为 INT_

In the preceding example, we have specified a LobHandler for dealing with messages as large objects (which is often necessary for Oracle) and a prefix for the table names in the queries generated by the store. The table name prefix defaults to INT_.

Backing Message Channels

如果您打算使用 JDBC 支持消息通道,我们建议使用 JdbcChannelMessageStore 实现。它只能与消息通道配合使用。

If you intend to back message channels with JDBC, we recommend using the JdbcChannelMessageStore implementation. It works only in conjunction with Message Channels.

Supported Databases

JdbcChannelMessageStore 使用特定于数据库的 SQL 查询从数据库中检索消息。因此,您必须在 JdbcChannelMessageStore 上设置 ChannelMessageStoreQueryProvider 属性。此 channelMessageStoreQueryProvider 提供针对您指定的特定数据库的 SQL 查询。Spring Integration 为以下关系型数据库提供支持:

The JdbcChannelMessageStore uses database-specific SQL queries to retrieve messages from the database. Therefore, you must set the ChannelMessageStoreQueryProvider property on the JdbcChannelMessageStore. This channelMessageStoreQueryProvider provides the SQL queries for the particular database you specify. Spring Integration provides support for the following relational databases:

  • PostgreSQL

  • HSQLDB

  • MySQL

  • Oracle

  • Derby

  • H2

  • SqlServer

  • Sybase

  • DB2

如果没有列出您的数据库,您可以实现 ChannelMessageStoreQueryProvider 接口并提供您自己的自定义查询。

If your database is not listed, you can implement the ChannelMessageStoreQueryProvider interface and provide your own custom queries.

版本 4.0 向表中添加了 MESSAGE_SEQUENCE 列,以确保即使消息存储在同一毫秒内,也能按照先进先出 (FIFO) 排列顺序队列。

Version 4.0 added the MESSAGE_SEQUENCE column to the table to ensure first-in-first-out (FIFO) queueing even when messages are stored in the same millisecond.

从第 6.2 版开始,ChannelMessageStoreQueryProvider 公开一个 isSingleStatementForPoll 标志,其中 PostgresChannelMessageStoreQueryProvider 返回 true,且其用于轮询的查询现基于一条 DELETE…​RETURNING 语句。JdbcChannelMessageStore 咨询 isSingleStatementForPoll 选项,如果只支持单轮询语句,则跳过单独的 DELETE 语句。

Starting with version 6.2, ChannelMessageStoreQueryProvider exposes a isSingleStatementForPoll flag, where the PostgresChannelMessageStoreQueryProvider returns true and its queries for polls are now based on a single DELETE…​RETURNING statement. The JdbcChannelMessageStore consults with the isSingleStatementForPoll option and skips a separate DELETE statement if only single poll statement is supported.

Custom Message Insertion

自版本 5.0 起,通过重载 ChannelMessageStorePreparedStatementSetter 类,你可以在 JdbcChannelMessageStore 中提供一条自定义实现用于消息插入。你可以使用它来设置不同的列或更改表结构或序列化策略。例如,你可以将其结构作为 JSON 字符串存储,而不是默认序列化为 byte[]

Since version 5.0, by overloading the ChannelMessageStorePreparedStatementSetter class, you can provide a custom implementation for message insertion in the JdbcChannelMessageStore. You can use it to set different columns or change the table structure or serialization strategy. For example, instead of default serialization to byte[], you can store its structure as a JSON string.

以下示例使用 setValues 的默认实现来存储公共列,并覆盖存储消息负载为 varchar 的行为:

The following example uses the default implementation of setValues to store common columns and overrides the behavior to store the message payload as a varchar:

public class JsonPreparedStatementSetter extends ChannelMessageStorePreparedStatementSetter {

    @Override
    public void setValues(PreparedStatement preparedStatement, Message<?> requestMessage,
        Object groupId, String region, 	boolean priorityEnabled) throws SQLException {
        // Populate common columns
        super.setValues(preparedStatement, requestMessage, groupId, region, priorityEnabled);
        // Store message payload as varchar
        preparedStatement.setString(6, requestMessage.getPayload().toString());
    }
}

通常,我们不建议使用关系数据库进行排队。如果您可能的话,可考虑使用 JMS 或 AMQP 支持的通道。有关更多参考,请参见以下资源:

Generally, we do not recommend using a relational database for queuing. Instead, if possible, consider using either JMS- or AMQP-backed channels instead. For further reference, see the following resource:

如果您仍计划将数据库用作队列,请考虑使用 PostgreSQL 及其通知机制,其在 {in a subsequent section} 中进行了描述。

If you are still planning to use your database as a queue, consider using PostgreSQL and its notification mechanism which is described in a subsequent section.

Concurrent Polling

轮询消息通道时,您可以选择使用 TaskExecutor 引用配置关联的 Poller

When polling a message channel, you have the option to configure the associated Poller with a TaskExecutor reference.

但请记住,如果你使用 JDBC 支持的消息通道,并且你计划轮询该通道,并因此以事务方式使用多线程的消息存储,则应确保使用支持 Multiversion Concurrency Control (MVCC)的关系数据库。否则,锁定可能是一个问题,在使用多线程时,性能可能无法达到预期。例如,Apache Derby 在这方面存在问题。

Keep in mind, though, that if you use a JDBC backed message channel, and you plan to poll the channel and consequently the message store transactional with multiple threads, you should ensure that you use a relational database that supports Multiversion Concurrency Control (MVCC). Otherwise, locking may be an issue and the performance, when using multiple threads, may not materialize as expected. For example, Apache Derby is problematic in that regard.

要实现更好的 JDBC 队列吞吐量,并在不同线程可能从队列轮询同一 Message 时避免问题,在使用不支持 MVCC 的数据库时,将 JdbcChannelMessageStoreusingIdCache 属性设置为 true 非常重要。以下示例显示了如何进行此项设置:

To achieve better JDBC queue throughput and avoid issues when different threads may poll the same Message from the queue, it is important to set the usingIdCache property of JdbcChannelMessageStore to true when using databases that do not support MVCC. The following example shows how to do so:

<bean id="queryProvider"
    class="o.s.i.jdbc.store.channel.PostgresChannelMessageStoreQueryProvider"/>

<int:transaction-synchronization-factory id="syncFactory">
    <int:after-commit expression="@store.removeFromIdCache(headers.id.toString())" />
    <int:after-rollback expression="@store.removeFromIdCache(headers.id.toString())"/>
</int:transaction-synchronization-factory>

<task:executor id="pool" pool-size="10"
    queue-capacity="10" rejection-policy="CALLER_RUNS" />

<bean id="store" class="o.s.i.jdbc.store.JdbcChannelMessageStore">
    <property name="dataSource" ref="dataSource"/>
    <property name="channelMessageStoreQueryProvider" ref="queryProvider"/>
    <property name="region" value="TX_TIMEOUT"/>
    <property name="usingIdCache" value="true"/>
</bean>

<int:channel id="inputChannel">
    <int:queue message-store="store"/>
</int:channel>

<int:bridge input-channel="inputChannel" output-channel="outputChannel">
    <int:poller fixed-delay="500" receive-timeout="500"
        max-messages-per-poll="1" task-executor="pool">
        <int:transactional propagation="REQUIRED" synchronization-factory="syncFactory"
        isolation="READ_COMMITTED" transaction-manager="transactionManager" />
    </int:poller>
</int:bridge>

<int:channel id="outputChannel" />

Priority Channel

从版本 4.0 开始,JdbcChannelMessageStore 实现 PriorityCapableChannelMessageStore,并提供 priorityEnabled 选项,允许将其用作 priority-queue 实例的 message-store 引用。为此,INT_CHANNEL_MESSAGE 表有一个 MESSAGE_PRIORITY 列来存储 PRIORITY 消息标头的值。此外,新的 MESSAGE_SEQUENCE 列使我们能够实现稳定的先进先出 (FIFO) 轮询机制,即使在同一毫秒内存储了具有相同优先级的多条消息时也是如此。按照 order by MESSAGE_PRIORITY DESC NULLS LAST, CREATED_DATE, MESSAGE_SEQUENCE 从数据库中轮询(选择)消息。

Starting with version 4.0, JdbcChannelMessageStore implements PriorityCapableChannelMessageStore and provides the priorityEnabled option, letting it be used as a message-store reference for priority-queue instances. For this purpose, the INT_CHANNEL_MESSAGE table has a MESSAGE_PRIORITY column to store the value of PRIORITY message headers. In addition, a new MESSAGE_SEQUENCE column lets us achieve a robust first-in-first-out (FIFO) polling mechanism, even when multiple messages are stored with the same priority in the same millisecond. Messages are polled (selected) from the database with order by MESSAGE_PRIORITY DESC NULLS LAST, CREATED_DATE, MESSAGE_SEQUENCE.

我们不建议对优先级和非优先级队列通道使用相同的`JdbcChannelMessageStore` bean,因为 priorityEnabled 选项适用于整个存储,并且不会为队列通道保留正确的 FIFO 队列语义。但是,同一`INT_CHANNEL_MESSAGE`表(甚至是`region`)都可以用于两种`JdbcChannelMessageStore`类型。要配置该场景,您可以从另一个消息存储 bean 扩展一个,如下面的示例所示:

We do not recommend using the same JdbcChannelMessageStore bean for priority and non-priority queue channels, because the priorityEnabled option applies to the entire store and proper FIFO queue semantics are not retained for the queue channel. However, the same INT_CHANNEL_MESSAGE table (and even region) can be used for both JdbcChannelMessageStore types. To configure that scenario, you can extend one message store bean from the other, as the following example shows:

<bean id="channelStore" class="o.s.i.jdbc.store.JdbcChannelMessageStore">
    <property name="dataSource" ref="dataSource"/>
    <property name="channelMessageStoreQueryProvider" ref="queryProvider"/>
</bean>

<int:channel id="queueChannel">
    <int:queue message-store="channelStore"/>
</int:channel>

<bean id="priorityStore" parent="channelStore">
    <property name="priorityEnabled" value="true"/>
</bean>

<int:channel id="priorityChannel">
    <int:priority-queue message-store="priorityStore"/>
</int:channel>

Partitioning a Message Store

通常将 {JdbcMessageStore} 作为同一应用程序中应用程序或节点组的全局存储。为了提供一些针对名称冲突的保护并控制数据库元数据配置,消息存储允许将表格以两种方式进行分区。一种方法是通过更改前缀(如 {described earlier})来使用单独的表格名称。另一种方法是指定 {region} 名称来对单一表格内的分区数据进行分区。使用第二种方法的一个重要用例是,当 {MessageStore} 正在管理作为 Spring Integration 消息通道的后备的持久队列时。持久通道的消息数据在存储中以通道名称为键。因此,如果通道名称并非全局唯一,通道将获取不针对它们的数据。为了避免这种危险,您可以使用消息存储 {region} 来为拥有相同逻辑名称的不同物理通道保留单独的数据。

It is common to use a JdbcMessageStore as a global store for a group of applications or nodes in the same application. To provide some protection against name clashes and to give control over the database meta-data configuration, the message store lets the tables be partitioned in two ways. One way is to use separate table names, by changing the prefix (as described earlier). The other way is to specify a region name for partitioning data within a single table. An important use case for the second approach is when the MessageStore is managing persistent queues that back a Spring Integration Message Channel. The message data for a persistent channel is keyed in the store on the channel name. Consequently, if the channel names are not globally unique, the channels can pick up data that is not intended for them. To avoid this danger, you can use the message store region to keep data separate for different physical channels that have the same logical name.

PostgreSQL: Receiving Push Notifications

PostgreSQL 提供了一个侦听和通知框架,用于在对数据库表进行操作时接收推送通知。Spring Integration 利用此机制(从版本 6.0 开始)允许在向 JdbcChannelMessageStore 添加新消息时接收推送通知。在使用此功能时,必须定义一个数据库触发器,它可以在包含在 Spring Integration JDBC 模块中的 schema-postgresql.sql 文件的注释部分找到。

PostgreSQL offers a listen and notification framework for receiving push notifications upon database table manipulations. Spring Integration leverages this mechanism (starting with version 6.0) to allow for receiving push notifications when new messages are added to a JdbcChannelMessageStore. When using this feature, a database trigger must be defined, which can be found as part of the comments of the schema-postgresql.sql file which is included in the JDBC module of Spring Integration.

推送通知通过 PostgresChannelMessageTableSubscriber 类接收,该类允许其订阅者在为任何给定的 regiongroupId 接收新消息时接收回调。即使在不同的 JVM 上追加消息,但对于同一个数据库,也会收到这些通知。PostgresSubscribableChannel 实现使用 PostgresChannelMessageTableSubscriber.Subscription 契约,以响应上述 PostgresChannelMessageTableSubscriber 的通知从存储区提取消息。

Push notifications are received via the PostgresChannelMessageTableSubscriber class which allows its subscribers to receive a callback upon the arrival of new messages for any given region and groupId. These notifications are received even if a message was appended on a different JVM, but to the same database. The PostgresSubscribableChannel implementation uses a PostgresChannelMessageTableSubscriber.Subscription contract to pull messages from the store as a reaction for notification from the mentioned PostgresChannelMessageTableSubscriber notifications.

例如,可以按如下方式接收 some group 的推送通知:

For example, push notifications for some group can be received as follows:

@Bean
public JdbcChannelMessageStore messageStore(DataSource dataSource) {
    JdbcChannelMessageStore messageStore = new JdbcChannelMessageStore(dataSource);
    messageStore.setChannelMessageStoreQueryProvider(new PostgresChannelMessageStoreQueryProvider());
    return messageStore;
}

@Bean
public PostgresChannelMessageTableSubscriber subscriber(
      @Value("${spring.datasource.url}") String url,
      @Value("${spring.datasource.username}") String username,
      @Value("${spring.datasource.password}") String password) {
    return new PostgresChannelMessageTableSubscriber(() ->
        DriverManager.getConnection(url, username, password).unwrap(PgConnection.class));
}

@Bean
public PostgresSubscribableChannel channel(
    PostgresChannelMessageTableSubscriber subscriber,
    JdbcChannelMessageStore messageStore) {
  return new PostgresSubscribableChannel(messageStore, "some group", subscriber);
}

交易支持

Transaction support

从 6.0.5 版开始,在 PostgresSubscribableChannel 上指定 PlatformTransactionManager 将在事务中通知订阅者。订阅器中的异常将导致事务回滚并消息将被放回消息存储区。默认情况下,事务支持未激活。

Starting with version 6.0.5, specifying a PlatformTransactionManager on a PostgresSubscribableChannel will notify subscribers in a transaction. An exception in a subscriber will cause the transaction to be rolled back and the message to be put back in the message store. Transactional support is not activated by default.

重试

Retries

从 6.0.5 版开始,可以通过向 PostgresSubscribableChannel 提供 RetryTemplate 来指定重试策略。默认情况下,不执行任何重试。

Starting with version 6.0.5, a retry policy can be specified by providing a RetryTemplate to the PostgresSubscribableChannel. By default, no retries are performed.

任何 active PostgresChannelMessageTableSubscriber 在其活动生命周期内占用 exclusive JDBC Connection。因此,此连接不能来自池化 DataSource 非常重要。此类连接池通常希望在预定义的超时窗口内关闭已发出的连接。

Any active PostgresChannelMessageTableSubscriber occupies an exclusive JDBC Connection for the duration of its active life cycle. It is therefore important that this connection does not originate from a pooling DataSource. Such connection pools do normally expect that issued connections are closed within a predefined timeout window.

对于此 exclusive 连接的需求,还建议一个 JVM 仅运行一个 PostgresChannelMessageTableSubscriber,该 subscriber 可用来注册任意数量的订阅项。

For this need of an exclusive connection, it is also recommended that a JVM only runs a single PostgresChannelMessageTableSubscriber which can be used to register any number of subscriptions.