MongoDb Support

Version 2.1 introduced support for MongoDB: a “high-performance, open source, document-oriented database”.

You need to include this dependency into your project:

Maven:

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mongodb</artifactId>
    <version>{project-version}</version>
</dependency>

Gradle:

compile "org.springframework.integration:spring-integration-mongodb:{project-version}"

To download, install, and run MongoDB, see the MongoDB documentation.

Connecting to MongoDb

Blocking or Reactive?

Beginning with version 5.3, Spring Integration provides support for reactive MongoDB drivers to enable non-blocking I/O when accessing MongoDB. To enable reactive support, add the MongoDB reactive streams driver to your dependencies:

Maven:

<dependency>
    <groupId>org.mongodb</groupId>
    <artifactId>mongodb-driver-reactivestreams</artifactId>
</dependency>

Gradle:

compile "org.mongodb:mongodb-driver-reactivestreams"

For the regular synchronous client, add the corresponding driver to your dependencies:

Maven:

<dependency>
    <groupId>org.mongodb</groupId>
    <artifactId>mongodb-driver-sync</artifactId>
</dependency>

Gradle:

compile "org.mongodb:mongodb-driver-sync"

Both drivers are optional dependencies of the framework, so you can choose the one that fits your application.

To begin interacting with MongoDB, you first need to connect to it. Spring Integration builds on the support provided by another Spring project, Spring Data MongoDB. It provides factory classes called MongoDatabaseFactory and ReactiveMongoDatabaseFactory, which simplify integration with the MongoDB Client API.

Spring Data provides the blocking MongoDB driver by default, but you may opt in for reactive usage by including the above dependency.

Using MongoDatabaseFactory

To connect to MongoDB you can use an implementation of the MongoDatabaseFactory interface.

The following example shows how to use SimpleMongoClientDatabaseFactory:

Java:

MongoDatabaseFactory mongoDbFactory =
        new SimpleMongoClientDatabaseFactory(com.mongodb.client.MongoClients.create(), "test");

XML:

<bean id="mongoDbFactory" class="o.s.data.mongodb.core.SimpleMongoClientDatabaseFactory">
    <constructor-arg>
        <bean class="com.mongodb.client.MongoClients" factory-method="create"/>
    </constructor-arg>
    <constructor-arg value="test"/>
</bean>

SimpleMongoClientDatabaseFactory takes two arguments: a MongoClient instance and a String that specifies the name of the database. If you need to configure properties such as host, port, and others, you can pass them through one of the create(...) factory methods of the underlying MongoClients class (for example, as a connection string or a MongoClientSettings instance). For more information on how to configure MongoDB, see the Spring Data MongoDB reference.
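
As a minimal sketch of passing host and port, the configuration below creates the client from a connection string and hands it to the factory. The class name, the bean names, and the `mongodb://localhost:27017` address are assumptions made for illustration, not values taken from this chapter.

```java
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.mongodb.MongoDatabaseFactory;
import org.springframework.data.mongodb.core.SimpleMongoClientDatabaseFactory;

@Configuration
public class MongoConnectionConfig {

    @Bean
    public MongoClient mongoClient() {
        // Assumed address: replace host and port with those of your own server.
        return MongoClients.create("mongodb://localhost:27017");
    }

    @Bean
    public MongoDatabaseFactory mongoDbFactory(MongoClient mongoClient) {
        // Second argument is the database name, as in the example above.
        return new SimpleMongoClientDatabaseFactory(mongoClient, "test");
    }
}
```

Declaring the client as its own bean lets the container close it on shutdown and lets other components share the same connection pool.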

Using ReactiveMongoDatabaseFactory

To connect to MongoDB with the reactive driver, you can use an implementation of the ReactiveMongoDatabaseFactory interface.

The following example shows how to use SimpleReactiveMongoDatabaseFactory:

Java:

ReactiveMongoDatabaseFactory mongoDbFactory =
        new SimpleReactiveMongoDatabaseFactory(com.mongodb.reactivestreams.client.MongoClients.create(), "test");

XML:

<bean id="mongoDbFactory" class="o.s.data.mongodb.core.SimpleReactiveMongoDatabaseFactory">
    <constructor-arg>
        <bean class="com.mongodb.reactivestreams.client.MongoClients" factory-method="create"/>
    </constructor-arg>
    <constructor-arg value="test"/>
</bean>

MongoDB Message Store

As described in the Enterprise Integration Patterns (EIP) book, a Message Store lets you persist messages. Doing so can be useful when reliability is a concern and you work with components that can buffer messages (such as QueueChannel, aggregator, and resequencer). In Spring Integration, the MessageStore strategy also provides the foundation for the claim check pattern, which is also described in EIP.

Spring Integration’s MongoDB module provides the MongoDbMessageStore, which is an implementation of both the MessageStore strategy (mainly used by the claim check pattern) and the MessageGroupStore strategy (mainly used by the aggregator and resequencer patterns).

The following example configures a MongoDbMessageStore to use a QueueChannel and an aggregator:

<bean id="mongoDbMessageStore" class="o.s.i.mongodb.store.MongoDbMessageStore">
    <constructor-arg ref="mongoDbFactory"/>
</bean>

<int:channel id="somePersistentQueueChannel">
    <int:queue message-store="mongoDbMessageStore"/>
</int:channel>

<int:aggregator input-channel="inputChannel" output-channel="outputChannel"
         message-store="mongoDbMessageStore"/>

The preceding example is a simple bean configuration, and it expects a MongoDatabaseFactory as a constructor argument.

The MongoDbMessageStore expands the Message as a Mongo document with all nested properties by using the Spring Data Mongo mapping mechanism. This is useful when you need access to the payload or headers of stored messages, for example for auditing or analytics.

The MongoDbMessageStore uses a custom MappingMongoConverter implementation to store Message instances as MongoDB documents, and there are some limitations for the properties (payload and header values) of the Message.

Starting with version 5.1.6, the MongoDbMessageStore can be configured with custom converters, which are propagated into the internal MappingMongoConverter implementation. See the MongoDbMessageStore.setCustomConverters(Object... customConverters) Javadoc for more information.

Spring Integration 3.0 introduced the ConfigurableMongoDbMessageStore. It implements both the MessageStore and MessageGroupStore interfaces. This class can receive a MongoTemplate as a constructor argument, with which you can, for example, configure a custom WriteConcern. Another constructor requires a MappingMongoConverter and a MongoDatabaseFactory, which lets you provide some custom conversions for Message instances and their properties. Note that, by default, the ConfigurableMongoDbMessageStore uses standard Java serialization to write and read Message instances to and from MongoDB (see MongoDbMessageBytesConverter) and relies on default values for other properties from MongoTemplate. It builds a MongoTemplate from the provided MongoDatabaseFactory and MappingMongoConverter. The default name of the collection used by the ConfigurableMongoDbMessageStore is configurableStoreMessages. We recommend using this implementation to create robust and flexible solutions when messages contain complex data types.

Starting with version 6.0.8, the AbstractConfigurableMongoDbMessageStore provides a setCreateIndexes(boolean) option (defaults to true), which can be used to disable automatic index creation. The following example shows how to declare a bean and disable automatic index creation:

@Bean
public MongoDbChannelMessageStore mongoDbChannelMessageStore(MongoDatabaseFactory databaseFactory) {
    MongoDbChannelMessageStore mongoDbChannelMessageStore = new MongoDbChannelMessageStore(databaseFactory);
    mongoDbChannelMessageStore.setCreateIndexes(false);
    return mongoDbChannelMessageStore;
}

MongoDB Channel Message Store

Version 4.0 introduced the new MongoDbChannelMessageStore. It is an optimized MessageGroupStore for use in QueueChannel instances. With priorityEnabled = true, you can use it in <int:priority-queue> instances to achieve priority-order polling for persisted messages. The priority MongoDB document field is populated from the IntegrationMessageHeaderAccessor.PRIORITY (priority) message header.

In addition, all MongoDB MessageStore instances now have a sequence field for MessageGroup documents. The sequence value is the result of an $inc operation for a simple sequence document from the same collection, which is created on demand. The sequence field is used in poll operations to provide first-in-first-out (FIFO) message order (within priority, if configured) when messages are stored within the same millisecond.
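
The ordering described above can be illustrated with a small in-memory sketch. It is not the store's implementation (the real ordering is applied by a sorted MongoDB query); the `StoredMessage` class and its field names are hypothetical stand-ins, and the exact sort keys used by the store are an assumption based on the description: priority first when enabled, then creation time, then the sequence as a tie-breaker.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class PollOrderSketch {

    /** Hypothetical stand-in for a stored message document. */
    public static final class StoredMessage {
        final String id;
        final int priority;
        final long createdTime;
        final long sequence;

        public StoredMessage(String id, int priority, long createdTime, long sequence) {
            this.id = id;
            this.priority = priority;
            this.createdTime = createdTime;
            this.sequence = sequence;
        }
    }

    /** Returns message ids in the order successive polls would return them. */
    public static List<String> pollOrder(List<StoredMessage> messages, boolean priorityEnabled) {
        // FIFO: oldest first; the sequence breaks ties within the same millisecond.
        Comparator<StoredMessage> fifo = Comparator
                .comparingLong((StoredMessage m) -> m.createdTime)
                .thenComparingLong(m -> m.sequence);
        // With priority enabled, higher priority wins and FIFO applies within a priority.
        Comparator<StoredMessage> order = priorityEnabled
                ? Comparator.comparingInt((StoredMessage m) -> m.priority).reversed().thenComparing(fifo)
                : fifo;
        List<StoredMessage> sorted = new ArrayList<>(messages);
        sorted.sort(order);
        List<String> ids = new ArrayList<>();
        for (StoredMessage m : sorted) {
            ids.add(m.id);
        }
        return ids;
    }

    public static void main(String[] args) {
        // All three messages were stored within the same millisecond.
        List<StoredMessage> stored = List.of(
                new StoredMessage("a", 0, 1000L, 2L),
                new StoredMessage("b", 5, 1000L, 3L),
                new StoredMessage("c", 0, 1000L, 1L));
        System.out.println(pollOrder(stored, false)); // [c, a, b]
        System.out.println(pollOrder(stored, true));  // [b, c, a]
    }
}
```

Without the sequence, the three messages above would be indistinguishable by timestamp and their relative order would be undefined.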

We do not recommend using the same MongoDbChannelMessageStore bean for both priority and non-priority channels, because the priorityEnabled option applies to the entire store. However, the same collection can be used for both MongoDbChannelMessageStore types, because message polling from the store is sorted and uses indexes. To configure that scenario, you can extend one message store bean from the other, as the following example shows:

<bean id="channelStore" class="o.s.i.mongodb.store.MongoDbChannelMessageStore">
    <constructor-arg name="mongoDbFactory" ref="mongoDbFactory"/>
</bean>

<int:channel id="queueChannel">
    <int:queue message-store="channelStore"/>
</int:channel>

<bean id="priorityStore" parent="channelStore">
    <property name="priorityEnabled" value="true"/>
</bean>

<int:channel id="priorityChannel">
    <int:priority-queue message-store="priorityStore"/>
</int:channel>

Using AbstractConfigurableMongoDbMessageStore with auto index creation disabled

Starting with version 6.0.8, the AbstractConfigurableMongoDbMessageStore implements a setCreateIndexes(boolean) method, which can be used to disable or enable (the default) automatic index creation. The following example shows how to declare a bean and disable automatic index creation:

@Bean
public AbstractConfigurableMongoDbMessageStore mongoDbChannelMessageStore(MongoDatabaseFactory databaseFactory)
{
    AbstractConfigurableMongoDbMessageStore mongoDbChannelMessageStore = new MongoDbChannelMessageStore(databaseFactory);
    mongoDbChannelMessageStore.setCreateIndexes(false);

    return mongoDbChannelMessageStore;
}

MongoDB Metadata Store

Spring Integration 4.2 introduced a new MongoDB-based MetadataStore (see Metadata Store) implementation. You can use the MongoDbMetadataStore to maintain metadata state across application restarts. You can use this new MetadataStore implementation with adapters that rely on a metadata store, such as the feed inbound channel adapter.

To instruct these adapters to use the new MongoDbMetadataStore, declare a Spring bean with a bean name of metadataStore. The feed inbound channel adapter automatically picks up and uses the declared MongoDbMetadataStore. The following example shows how to declare a bean with a name of metadataStore:

@Bean
public MetadataStore metadataStore(MongoDatabaseFactory factory) {
    return new MongoDbMetadataStore(factory, "integrationMetadataStore");
}

The MongoDbMetadataStore also implements ConcurrentMetadataStore, letting it be reliably shared across multiple application instances, where only one instance is allowed to store or modify a key’s value. All these operations are atomic, thanks to MongoDB guarantees.

MongoDB Inbound Channel Adapter

The MongoDB inbound channel adapter is a polling consumer that reads data from MongoDB and sends it as a Message payload. The following example shows how to configure a MongoDB inbound channel adapter:

<int-mongodb:inbound-channel-adapter id="mongoInboundAdapter"
       channel="replyChannel"
       query="{'name' : 'Bob'}"
       entity-class="java.lang.Object"
       auto-startup="false">
		<int:poller fixed-rate="100"/>
</int-mongodb:inbound-channel-adapter>

As the preceding configuration shows, you configure a MongoDb inbound channel adapter by using the inbound-channel-adapter element and providing values for various attributes, such as:

  • query: A JSON query (see MongoDB Querying)

  • query-expression: A SpEL expression that is evaluated to a JSON query string (as the query attribute above) or to an instance of o.s.data.mongodb.core.query.Query. Mutually exclusive with the query attribute.

  • entity-class: The type of the payload object. If not supplied, a com.mongodb.DBObject is returned.

  • collection-name or collection-name-expression: Identifies the name of the MongoDB collection to use.

  • mongodb-factory: Reference to an instance of o.s.data.mongodb.MongoDatabaseFactory.

  • mongo-template: Reference to an instance of o.s.data.mongodb.core.MongoTemplate.

  • Other attributes that are common across all other inbound adapters (such as 'channel').

You cannot set both mongo-template and mongodb-factory.

The preceding example is relatively simple and static, since it has a literal value for the query and uses the default name for a collection. Sometimes, you may need to change those values at runtime, based on some condition. To do so, use their -expression equivalents (query-expression and collection-name-expression), where the provided expression can be any valid SpEL expression.

Also, you may wish to do some post-processing on the successfully processed data that was read from MongoDB. For example, you may want to move or remove a document after it has been processed. You can do so by using the transaction synchronization feature added in Spring Integration 2.2, as the following example shows:

<int-mongodb:inbound-channel-adapter id="mongoInboundAdapter"
    channel="replyChannel"
    query-expression="new BasicQuery('{''name'' : ''Bob''}').limit(100)"
    entity-class="java.lang.Object"
    auto-startup="false">
        <int:poller fixed-rate="200" max-messages-per-poll="1">
            <int:transactional synchronization-factory="syncFactory"/>
        </int:poller>
</int-mongodb:inbound-channel-adapter>

<int:transaction-synchronization-factory id="syncFactory">
    <int:after-commit
        expression="@documentCleaner.remove(#mongoTemplate, payload, headers.mongo_collectionName)"
        channel="someChannel"/>
</int:transaction-synchronization-factory>

<bean id="documentCleaner" class="thing1.thing2.DocumentCleaner"/>

<bean id="transactionManager" class="o.s.i.transaction.PseudoTransactionManager"/>

The following example shows the DocumentCleaner referenced in the preceding example:

public class DocumentCleaner {
    public void remove(MongoOperations mongoOperations, Object target, String collectionName) {
        if (target instanceof List<?> documents){
            for (Object document : documents) {
                mongoOperations.remove(new BasicQuery(JSON.serialize(document)), collectionName);
            }
        }
    }
}

You can declare your poller to be transactional by using the transactional element. This element can reference a real transaction manager (for example, if some other part of your flow invokes JDBC). If you do not have a “real” transaction, you can use an instance of o.s.i.transaction.PseudoTransactionManager, which is an implementation of Spring’s PlatformTransactionManager and enables the use of the transaction synchronization features of the Mongo adapter when there is no actual transaction.

Doing so does not make MongoDB itself transactional. It only lets actions be synchronized to run before or after success (commit) or after failure (rollback).

Once your poller is transactional, you can set an instance of the o.s.i.transaction.TransactionSynchronizationFactory on the transactional element. A TransactionSynchronizationFactory creates an instance of the TransactionSynchronization. For your convenience, we have exposed a default SpEL-based TransactionSynchronizationFactory that lets you configure SpEL expressions, with their execution being coordinated (synchronized) with a transaction. Expressions for before-commit, after-commit, and after-rollback events are supported, together with a channel for each event where the evaluation result (if any) is sent. For each child element, you can specify expression and channel attributes. If only the channel attribute is present, the received message is sent there as part of the particular synchronization scenario. If only the expression attribute is present and the result of an expression is a non-null value, a message with the result as the payload is generated and sent to a default channel (NullChannel) and appears in the logs (on the DEBUG level). If you want the evaluation result to go to a specific channel, add a channel attribute. If the result of an expression is null or void, no message is generated.
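
For Java configuration, a roughly equivalent setup can be sketched with the ExpressionEvaluatingTransactionSynchronizationProcessor and DefaultTransactionSynchronizationFactory classes. This is an illustrative sketch rather than the canonical configuration: the class name, the bean names, the `payload` expression, and the `someChannel` bean are assumptions.

```java
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.integration.transaction.DefaultTransactionSynchronizationFactory;
import org.springframework.integration.transaction.ExpressionEvaluatingTransactionSynchronizationProcessor;
import org.springframework.integration.transaction.TransactionSynchronizationFactory;
import org.springframework.messaging.MessageChannel;

@Configuration
public class SyncFactoryConfig {

    // Declared as a bean so that the container can supply the bean factory
    // the processor needs to evaluate expressions.
    @Bean
    public ExpressionEvaluatingTransactionSynchronizationProcessor syncProcessor(
            @Qualifier("someChannel") MessageChannel someChannel) {
        ExpressionEvaluatingTransactionSynchronizationProcessor processor =
                new ExpressionEvaluatingTransactionSynchronizationProcessor();
        // Evaluated after commit; a non-null result becomes the payload of a new message.
        processor.setAfterCommitExpression(new SpelExpressionParser().parseExpression("payload"));
        // Without this channel, the result would go to the default NullChannel.
        processor.setAfterCommitChannel(someChannel);
        return processor;
    }

    @Bean
    public TransactionSynchronizationFactory syncFactory(
            ExpressionEvaluatingTransactionSynchronizationProcessor syncProcessor) {
        return new DefaultTransactionSynchronizationFactory(syncProcessor);
    }
}
```

The processor also offers matching before-commit and after-rollback setters, which correspond to the other two child elements described above.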

For more information about transaction synchronization, see Transaction Synchronization.

Starting with version 5.5, the MongoDbMessageSource can be configured with an updateExpression, which must evaluate to a String with the MongoDB update syntax or to an org.springframework.data.mongodb.core.query.Update instance. It can be used as an alternative to the post-processing procedure described above. It modifies the entities that were fetched from the collection so that they are not pulled from the collection again on the next polling cycle (assuming the update changes some value used in the query). When several instances of the MongoDbMessageSource for the same collection are used in a cluster, we still recommend using transactions to achieve execution isolation and data consistency.
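
A minimal sketch of such a source in Java configuration follows. The `processed` field, the channel name, and the polling interval are hypothetical; the point is only that the update changes a value the query filters on, so already-fetched documents do not match on the next poll. The sketch assumes that integration annotations are enabled in the application (for example, through @EnableIntegration).

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.mongodb.MongoDatabaseFactory;
import org.springframework.expression.common.LiteralExpression;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.mongodb.inbound.MongoDbMessageSource;

@Configuration
public class MongoUpdateSourceConfig {

    @Bean
    @InboundChannelAdapter(channel = "replyChannel", poller = @Poller(fixedDelay = "1000"))
    public MessageSource<Object> mongoMessageSource(MongoDatabaseFactory mongoDbFactory) {
        // Query: fetch only documents that have not been processed yet (hypothetical field).
        MongoDbMessageSource source = new MongoDbMessageSource(mongoDbFactory,
                new LiteralExpression("{'processed' : false}"));
        // Update: mark fetched documents so that the query no longer matches them.
        source.setUpdateExpression(new LiteralExpression("{'$set' : {'processed' : true}}"));
        return source;
    }
}
```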

MongoDB Change Stream Inbound Channel Adapter

Starting with version 5.3, the spring-integration-mongodb module provides the MongoDbChangeStreamMessageProducer, a reactive MessageProducerSupport implementation for the Spring Data ReactiveMongoOperations.changeStream(String, ChangeStreamOptions, Class) API. By default, this component produces a Flux of messages with the body of each ChangeStreamEvent as the payload and some change-stream-related headers (see MongoHeaders). We recommend combining this MongoDbChangeStreamMessageProducer with a FluxMessageChannel as the outputChannel for on-demand subscription and downstream event consumption.

The Java DSL configuration for this channel adapter may look like this:

@Bean
IntegrationFlow changeStreamFlow(ReactiveMongoOperations mongoTemplate) {
    return IntegrationFlow.from(
            MongoDb.changeStreamInboundChannelAdapter(mongoTemplate)
                    .domainType(Person.class)
                    .collection("person")
                    .extractBody(false))
            .channel(MessageChannels.flux())
            .get();
}

When the MongoDbChangeStreamMessageProducer is stopped, the subscription is cancelled downstream, or the MongoDb change stream produces an OperationType.INVALIDATE, the Publisher is completed. The channel adapter can be started again, in which case a new Publisher of source data is created and automatically subscribed to in MessageProducerSupport.subscribeToPublisher(Publisher<? extends Message<?>>). If you need to consume change stream events from other places, this channel adapter can be reconfigured with new options between starts.

See the Spring Data MongoDB documentation for more information about change stream support.

MongoDB Outbound Channel Adapter

The MongoDB outbound channel adapter lets you write the message payload to a MongoDB document store, as the following example shows:

<int-mongodb:outbound-channel-adapter id="fullConfigWithCollectionExpression"
	collection-name="myCollection"
	mongo-converter="mongoConverter"
	mongodb-factory="mongoDbFactory" />

As the preceding configuration shows, you can configure a MongoDB outbound channel adapter by using the outbound-channel-adapter element, providing values for various attributes, such as:

  • collection-name or collection-name-expression: Identifies the name of the MongoDb collection to use.

  • mongo-converter: Reference to an instance of o.s.data.mongodb.core.convert.MongoConverter that assists with converting a raw Java object to a JSON document representation.

  • mongodb-factory: Reference to an instance of o.s.data.mongodb.MongoDatabaseFactory.

  • mongo-template: Reference to an instance of o.s.data.mongodb.core.MongoTemplate. NOTE: You cannot set both mongo-template and mongodb-factory.

  • Other attributes that are common across all outbound adapters (such as 'channel').

The preceding example is relatively simple and static, since it has a literal value for the collection-name. Sometimes, you may need to change this value at runtime, based on some condition. To do that, use collection-name-expression, where the provided expression is any valid SpEL expression.
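
In Java configuration, the same idea can be sketched with MongoDbStoringMessageHandler and a SpEL expression evaluated against each request message. The `collection` header, the `mongoStoreChannel` channel name, and the fallback to a collection named data are assumptions chosen for illustration.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.mongodb.MongoDatabaseFactory;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.mongodb.outbound.MongoDbStoringMessageHandler;
import org.springframework.messaging.MessageHandler;

@Configuration
public class MongoStoringHandlerConfig {

    @Bean
    @ServiceActivator(inputChannel = "mongoStoreChannel")
    public MessageHandler mongoStoringHandler(MongoDatabaseFactory mongoDbFactory) {
        MongoDbStoringMessageHandler handler = new MongoDbStoringMessageHandler(mongoDbFactory);
        // Take the collection name from a (hypothetical) message header,
        // falling back to 'data' when the header is absent.
        handler.setCollectionNameExpression(
                new SpelExpressionParser().parseExpression("headers['collection'] ?: 'data'"));
        return handler;
    }
}
```

With this in place, a message carrying a `collection` header of `orders` would be written to the orders collection, while a message without the header would go to data.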

MongoDB Outbound Gateway

Version 5.0 introduced the MongoDB outbound gateway. It lets you query a database by sending a message to its request channel. The gateway then sends the response to the reply channel. You can use the message payload and headers to specify the query and the collection name, as the following example shows:

Java DSL:

@SpringBootApplication
public class MongoDbJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(MongoDbJavaApplication.class)
            .web(WebApplicationType.NONE)
            .run(args);
    }

    @Autowired
    private MongoDatabaseFactory mongoDbFactory;

    @Autowired
    private MongoConverter mongoConverter;


    @Bean
    public IntegrationFlow gatewaySingleQueryFlow() {
        return f -> f
                .handle(queryOutboundGateway())
                .channel(c -> c.queue("retrieveResults"));
    }

    private MongoDbOutboundGatewaySpec queryOutboundGateway() {
        return MongoDb.outboundGateway(this.mongoDbFactory, this.mongoConverter)
                .query("{name : 'Bob'}")
                .collectionNameFunction(m -> m.getHeaders().get("collection", String.class))
                .expectSingleResult(true)
                .entityClass(Person.class);
    }

}

Kotlin DSL:

class MongoDbKotlinApplication {

    fun main(args: Array<String>) = runApplication<MongoDbKotlinApplication>(*args)

    @Autowired
    lateinit var mongoDbFactory: MongoDatabaseFactory

    @Autowired
    lateinit var mongoConverter: MongoConverter

    @Bean
    fun gatewaySingleQueryFlow() =
    integrationFlow {
        handle(queryOutboundGateway())
        channel { queue("retrieveResults") }
    }

    private fun queryOutboundGateway(): MongoDbOutboundGatewaySpec {
        return MongoDb.outboundGateway(this.mongoDbFactory, this.mongoConverter)
            .query("{name : 'Bob'}")
            .collectionNameFunction<Any> { m -> m.headers["collection"] as String }
            .expectSingleResult(true)
            .entityClass(Person::class.java)
    }

}

Java:

@SpringBootApplication
public class MongoDbJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(MongoDbJavaApplication.class)
            .web(WebApplicationType.NONE)
            .run(args);
    }

    @Autowired
    private MongoDatabaseFactory mongoDbFactory;

    @Bean
    @ServiceActivator(inputChannel = "requestChannel")
    public MessageHandler mongoDbOutboundGateway() {
        MongoDbOutboundGateway gateway = new MongoDbOutboundGateway(this.mongoDbFactory);
        gateway.setCollectionNameExpressionString("'myCollection'");
        gateway.setQueryExpressionString("'{''name'' : ''Bob''}'");
        gateway.setExpectSingleResult(true);
        gateway.setEntityClass(Person.class);
        gateway.setOutputChannelName("replyChannel");
        return gateway;
    }

    @Bean
    @ServiceActivator(inputChannel = "replyChannel")
    public MessageHandler handler() {
        return message -> System.out.println(message.getPayload());
    }
}

XML:

<int-mongodb:outbound-gateway id="gatewayQuery"
    mongodb-factory="mongoDbFactory"
    mongo-converter="mongoConverter"
    query="{firstName: 'Bob'}"
    collection-name="myCollection"
    request-channel="in"
    reply-channel="out"
    entity-class="org.springframework.integration.mongodb.test.entity$Person"/>

You can use the following attributes with a MongoDB outbound Gateway:

  • collection-name or collection-name-expression: Identifies the name of the MongoDB collection to use.

  • mongo-converter: Reference to an instance of o.s.data.mongodb.core.convert.MongoConverter that assists with converting a raw Java object to a JSON document representation.

  • mongodb-factory: Reference to an instance of o.s.data.mongodb.MongoDatabaseFactory.

  • mongo-template: Reference to an instance of o.s.data.mongodb.core.MongoTemplate. NOTE: you can not set both mongo-template and mongodb-factory.

  • entity-class: The fully qualified name of the entity class to be passed to the find(..) and findOne(..) methods in MongoTemplate. If this attribute is not provided, the default value is org.bson.Document.

  • query or query-expression: Specifies the MongoDB query. See the MongoDB documentation for more query samples.

  • collection-callback: Reference to an instance of org.springframework.data.mongodb.core.CollectionCallback, preferably an instance of o.s.i.mongodb.outbound.MessageCollectionCallback (available since 5.0.11), which also provides the request message context. See its Javadoc for more information. NOTE: You cannot set both collection-callback and any of the query attributes.

As an alternative to the query and query-expression properties, you can specify other database operations by using the collectionCallback property as a reference to a MessageCollectionCallback functional interface implementation. The following example specifies a count operation:

private MongoDbOutboundGatewaySpec collectionCallbackOutboundGateway() {
    return MongoDb.outboundGateway(this.mongoDbFactory, this.mongoConverter)
            .collectionCallback((collection, requestMessage) -> collection.countDocuments())
            .collectionName("myCollection");
}

MongoDB Reactive Channel Adapters

Starting with version 5.3, the ReactiveMongoDbStoringMessageHandler and ReactiveMongoDbMessageSource implementations are provided. They are based on the ReactiveMongoOperations from Spring Data and require the org.mongodb:mongodb-driver-reactivestreams dependency.

The ReactiveMongoDbStoringMessageHandler is an implementation of the ReactiveMessageHandler, which the framework supports natively when reactive streams composition is involved in the integration flow definition. See ReactiveMessageHandler for more information.

From a configuration perspective, it does not differ from many other standard channel adapters. For example, with the Java DSL, such a channel adapter could be used as follows:

@Bean
public IntegrationFlow reactiveMongoDbFlow(ReactiveMongoDatabaseFactory mongoDbFactory) {
    return f -> f
            .channel(MessageChannels.flux())
            .handle(MongoDb.reactiveOutboundChannelAdapter(mongoDbFactory));
}

In this sample, we connect to MongoDB through the provided ReactiveMongoDatabaseFactory and store the data from the request message into the default collection, which is named data. The actual operation is performed on demand from the reactive stream composition in the internally created ReactiveStreamsConsumer.

The ReactiveMongoDbMessageSource is an AbstractMessageSource implementation based on the provided ReactiveMongoDatabaseFactory or ReactiveMongoOperations and a MongoDB query (or expression). It calls the find() or findOne() operation, according to the expectSingleResult option, with the expected entityClass type to convert the query result. Query execution and result evaluation are performed on demand, when the Publisher in the payload of the produced message (a Flux or a Mono, according to the expectSingleResult option) is subscribed to. The framework can subscribe to such a payload automatically (essentially a flatMap) when a splitter and a FluxMessageChannel are used downstream. Otherwise, it is the target application's responsibility to subscribe to the polled publishers in downstream endpoints.

With the Java DSL, such a channel adapter could be configured as follows:

@Bean
public IntegrationFlow reactiveMongoDbFlow(ReactiveMongoDatabaseFactory mongoDbFactory) {
    return IntegrationFlow
            .from(MongoDb.reactiveInboundChannelAdapter(mongoDbFactory, "{'name' : 'Name'}")
                            .entityClass(Person.class),
                    c -> c.poller(Pollers.fixedDelay(1000)))
            .split()
            .channel(c -> c.flux("output"))
            .get();
}

Starting with version 5.5, the ReactiveMongoDbMessageSource can be configured with an updateExpression. It has the same functionality as the blocking MongoDbMessageSource. See MongoDB Inbound Channel Adapter and AbstractMongoDbMessageSourceSpec JavaDocs for more information.