MongoDb Support
2.1 版本引入了对 MongoDB的支持:一个 “high-performance, open source, document-oriented database”。 你需要将此依赖项包含在你的项目中:
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mongodb</artifactId>
<version>{project-version}</version>
</dependency>
compile "org.springframework.integration:spring-integration-mongodb:{project-version}"
有关下载、安装和运行 MongoDB 的信息,请参阅 MongoDB documentation。
Connecting to MongoDb
Blocking or Reactive?
从 5.3 版开始,Spring Integration 为反应式 MongoDB 驱动程序提供支持,以便在访问 MongoDB 时启用非阻塞 I/O。要启用反应式支持,请将 MongoDB 反应式流驱动程序添加到您的依赖项中:
-
Maven
-
Gradle
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver-reactivestreams</artifactId>
</dependency>
compile "org.mongodb:mongodb-driver-reactivestreams"
对于常规同步客户端,您需要将相应的驱动程序添加到依赖项中:
-
Maven
-
Gradle
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver-sync</artifactId>
</dependency>
compile "org.mongodb:mongodb-driver-sync"
这两个都是框架中的“可选”部分,以便更好地支持最终用户选择。
要开始与 MongoDB 交互,首先需要连接到 MongoDB。Spring Integration 基于另一个 Spring 项目提供的支持,即 Spring Data MongoDB。它提供称为 `MongoDatabaseFactory`和 `ReactiveMongoDatabaseFactory`的工厂类,它们简化了与 MongoDB 客户端 API 的集成。
Spring Data 默认提供了阻塞式 MongoDB 驱动,但您可以选择使用上述依赖项选择响应式用法。 |
Using MongoDatabaseFactory
要连接到 MongoDB,您可以使用 MongoDatabaseFactory
接口的实现。
以下示例展示了如何使用 SimpleMongoClientDatabaseFactory
:
-
Java
-
XML
MongoDatabaseFactory mongoDbFactory =
new SimpleMongoClientDatabaseFactory(com.mongodb.client.MongoClients.create(), "test");
<bean id="mongoDbFactory" class="o.s.data.mongodb.core.SimpleMongoClientDatabaseFactory">
<constructor-arg>
<bean class="com.mongodb.client.MongoClients" factory-method="create"/>
</constructor-arg>
<constructor-arg value="test"/>
</bean>
SimpleMongoClientDatabaseFactory`接受两个参数:一个 `MongoClient`实例和一个指定数据库名称的 `String
。如果您需要配置 host
、`port`等属性,则可以使用底层 `MongoClients`类提供的构造函数之一来传递它们。有关如何配置 MongoDB 的更多信息,请参阅 Spring-Data-MongoDB参考。
Using ReactiveMongoDatabaseFactory
要使用反应式驱动程序连接到 MongoDB,您可以使用 ReactiveMongoDatabaseFactory
接口的实现。
以下示例展示了如何使用 SimpleReactiveMongoDatabaseFactory
:
-
Java
-
XML
ReactiveMongoDatabaseFactory mongoDbFactory =
new SimpleReactiveMongoDatabaseFactory(com.mongodb.reactivestreams.client.MongoClients.create(), "test");
<bean id="mongoDbFactory" class="o.s.data.mongodb.core.SimpleReactiveMongoDatabaseFactory">
<constructor-arg>
<bean class="com.mongodb.reactivestreams.client.MongoClients" factory-method="create"/>
</constructor-arg>
<constructor-arg value="test"/>
</bean>
MongoDB Message Store
正如 Enterprise Integration Patterns(EIP)书中所述, Message Store可以让您持久化消息。在处理能够缓冲消息的组件时,这样做非常有用(例如 QueueChannel
、aggregator
、`resequencer`等),如果可靠性是一个问题。在 Spring Integration 中,`MessageStore`策略也为 claim check模式提供了基础,该模式也在 EIP 中进行了描述。
Spring Integration 的 MongoDB 模块提供了 MongoDbMessageStore
,它是 MessageStore
策略(主要用于索取凭证模式)和 MessageGroupStore
策略(主要用于聚合器和序列重建模式)的实现。
以下示例配置 MongoDbMessageStore
以使用 QueueChannel
和 aggregator
:
<bean id="mongoDbMessageStore" class="o.s.i.mongodb.store.MongoDbMessageStore">
<constructor-arg ref="mongoDbFactory"/>
</bean>
<int:channel id="somePersistentQueueChannel">
<int:queue message-store="mongoDbMessageStore"/>
<int:channel>
<int:aggregator input-channel="inputChannel" output-channel="outputChannel"
message-store="mongoDbMessageStore"/>
前面的示例是一个简单的 bean 配置,并且需要 MongoDbFactory
作为构造器参数。
MongoDbMessageStore
使用 Spring Data Mongo 映射机制,将 Message
作为 Mongo 文档扩展,其中包含所有嵌套属性。当你需要访问 payload
或 headers
以进行审计或分析时,它很有用——例如,针对存储的消息。
MongoDbMessageStore
使用自定义 MappingMongoConverter
实现来将 Message
实例存储为 MongoDB 文档,并且 Message
的属性(payload
和 header
值)存在一些限制。
从版本 5.1.6 开始,MongoDbMessageStore
可以使用自定义转换器配置,这些转换器被传播到内部的 MappingMongoConverter
实现中。有关更多信息,请参阅 MongoDbMessageStore.setCustomConverters(Object… customConverters)
JavaDocs。
Spring Integration 3.0 引入了 ConfigurableMongoDbMessageStore
。它同时实现了 MessageStore
和 MessageGroupStore
接口。此类可以作为构造器参数接收 MongoTemplate
,例如,你可以使用它配置自定义 WriteConcern
。另一个构造器需要 MappingMongoConverter
和 MongoDbFactory
,它允许你为 Message
实例及其属性提供一些自定义转换。请注意,默认情况下,ConfigurableMongoDbMessageStore
使用标准 Java 序列化将 Message
实例写入和读出 MongoDB(请参阅 MongoDbMessageBytesConverter
),并依赖于 MongoTemplate
中其他属性的默认值。它从提供的 MongoDbFactory
和 MappingMongoConverter
构建一个 MongoTemplate
。ConfigurableMongoDbMessageStore
存储的集合的默认名称为 configurableStoreMessages
。我们建议使用此实现创建健壮且灵活的解决方案,当消息包含复杂数据类型时。
从版本 6.0.8 开始,AbstractConfigurableMongoDbMessageStore
提供了 setCreateIndexes(boolean)
(默认为 true
)选项,可用于禁用自动索引创建。以下示例演示如何声明 bean 并禁用自动索引创建:
@Bean
public MongoDbChannelMessageStore mongoDbChannelMessageStore(MongoDatabaseFactory databaseFactory) {
MongoDbChannelMessageStore mongoDbChannelMessageStore = new MongoDbChannelMessageStore(databaseFactory);
mongoDbChannelMessageStore.setCreateIndexes(false);
return mongoDbChannelMessageStore;
}
MongoDB Channel Message Store
版本 4.0 引入了新的 MongoDbChannelMessageStore
。它是一个经过优化的 MessageGroupStore
,用于 QueueChannel
实例中。使用 priorityEnabled = true
,你可以在 <int:priority-queue>
实例中使用它,为持久化消息实现优先顺序轮询。优先级 MongoDB 文档字段从 IntegrationMessageHeaderAccessor.PRIORITY
(priority
)消息头填充。
此外,所有 MongoDB MessageStore
实例现在都有一个用于 MessageGroup
文档的 sequence
字段。sequence
值是同一集合中一个简单 sequence
文档的 $inc
操作的结果,该集合按需创建。sequence
字段用于 poll
操作,以便在消息存储在同一毫秒内时提供先进先出 (FIFO) 消息顺序(如果已配置,则按优先级)。
我们不建议将相同的 |
<bean id="channelStore" class="o.s.i.mongodb.store.MongoDbChannelMessageStore">
<constructor-arg name="mongoDbFactory" ref="mongoDbFactory"/>
</bean>
<int:channel id="queueChannel">
<int:queue message-store="store"/>
</int:channel>
<bean id="priorityStore" parent="channelStore">
<property name="priorityEnabled" value="true"/>
</bean>
<int:channel id="priorityChannel">
<int:priority-queue message-store="priorityStore"/>
</int:channel>
Using AbstractConfigurableMongoDbMessageStore with auto index creation disable
从版本 6.0.8 开始,AbstractConfigurableMongoDbMessageStore
实现了一个 setCreateIndex(boolean)
,它可用于禁用或启用(默认)自动索引创建。以下示例演示如何声明 bean 并禁用自动索引创建:
@Bean
public AbstractConfigurableMongoDbMessageStore mongoDbChannelMessageStore(MongoDatabaseFactory databaseFactory)
{
AbstractConfigurableMongoDbMessageStore mongoDbChannelMessageStore = new MongoDbChannelMessageStore(databaseFactory);
mongoDbChannelMessageStore.setCreateIndex(false);
return mongoDbChannelMessageStore;
}
MongoDB Metadata Store
Spring Integration 4.2 引入了基于 MongoDB 的新 MetadataStore
(请参阅 Metadata Store)实现。您可以使用 MongoDbMetadataStore
在应用程序重新启动期间维护元数据状态。您可以结合使用此新的 MetadataStore
实现和下列适配器等:
要指示这些适配器使用新的 MongoDbMetadataStore
,请声明一个 bean 名称为 metadataStore
的 Spring bean。Feed 入站通道适配器自动拾取并使用声明的 MongoDbMetadataStore
。以下示例演示如何声明名称为 metadataStore
的 bean:
@Bean
public MetadataStore metadataStore(MongoDbFactory factory) {
return new MongoDbMetadataStore(factory, "integrationMetadataStore");
}
MongoDbMetadataStore
还实现了 ConcurrentMetadataStore
,使其可以在多个应用程序实例之间可靠地共享,其中只允许一个实例存储或修改键的值。由于 MongoDB 保证,所有这些操作都是原子的。
MongoDB Inbound Channel Adapter
MongoDB 入站通道适配器是一个轮询消费者,它从 MongoDB 读取数据并将其作为 Message
有效负载发送。以下示例演示如何配置 MongoDB 入站通道适配器:
<int-mongodb:inbound-channel-adapter id="mongoInboundAdapter"
channel="replyChannel"
query="{'name' : 'Bob'}"
entity-class="java.lang.Object"
auto-startup="false">
<int:poller fixed-rate="100"/>
</int-mongodb:inbound-channel-adapter>
如前所述的配置所示,你可以使用 inbound-channel-adapter
元素并为各种属性提供值来配置 MongoDb 入站通道适配器,例如:
-
query
:JSON 查询(参见 MongoDB Querying) -
query-expression
:评估为 JSON 查询字符串(例如上面的query
属性)或o.s.data.mongodb.core.query.Query
实例的 SpEL 表达式。与query
属性互斥。 -
entity-class
:有效负载对象的类型。如果不提供,则返回com.mongodb.DBObject
。 -
collection-name
或collection-name-expression
:标识要使用的 MongoDB 集合的名称。 -
mongodb-factory
:引用o.s.data.mongodb.MongoDbFactory
的实例 -
mongo-template
:引用o.s.data.mongodb.core.MongoTemplate
的实例 -
所有其他入站适配器中常见的其他属性(例如“channel”)。
不能同时设置 |
前面的示例相对简单且静态,因为它有 query
的文本值并使用集合的默认名称。有时,你可能需要根据某些条件在运行时更改这些值。为此,请使用它们的 -expression
等效项(query-expression
和 collection-name-expression
),其中提供的表达式可以是任何有效的 SpEL 表达式。
此外,你可能希望对从 MongoDB 读取的成功处理的数据执行一些后处理。例如;你可能想在处理完文档后将其移动或删除。你可以使用 Spring Integration 2.2 添加的事务同步功能来实现此目的,如下例所示:
<int-mongodb:inbound-channel-adapter id="mongoInboundAdapter"
channel="replyChannel"
query-expression="new BasicQuery('{''name'' : ''Bob''}').limit(100)"
entity-class="java.lang.Object"
auto-startup="false">
<int:poller fixed-rate="200" max-messages-per-poll="1">
<int:transactional synchronization-factory="syncFactory"/>
</int:poller>
</int-mongodb:inbound-channel-adapter>
<int:transaction-synchronization-factory id="syncFactory">
<int:after-commit
expression="@documentCleaner.remove(#mongoTemplate, payload, headers.mongo_collectionName)"
channel="someChannel"/>
</int:transaction-synchronization-factory>
<bean id="documentCleaner" class="thing1.thing2.DocumentCleaner"/>
<bean id="transactionManager" class="o.s.i.transaction.PseudoTransactionManager"/>
以下示例显示了前面示例中引用的 DocumentCleaner
:
public class DocumentCleaner {
public void remove(MongoOperations mongoOperations, Object target, String collectionName) {
if (target instanceof List<?> documents){
for (Object document : documents) {
mongoOperations.remove(new BasicQuery(JSON.serialize(document)), collectionName);
}
}
}
}
可以通过使用 transactional
元素声明你的轮询器为事务性的。此元素可以引用一个真正的交易管理器(例如,如果你的流程的其他部分调用 JDBC)。如果没有 “real” 交易,则可以使用 o.s.i.transaction.PseudoTransactionManager
的实例,该实例实现了 Spring 的 PlatformTransactionManager
,并且当没有实际交易时,可以使用 Mongo 适配器的事务同步功能。
这样做并不会使 MongoDB 本身具有事务性。它能够在成功(提交)之前或之后,或者在失败(回滚)之后执行操作的同步。
一旦你的轮询器变为事务性,你可以在 transactional
元素上设置 o.s.i.transaction.TransactionSynchronizationFactory
的一个实例。TransactionSynchronizationFactory
会创建一个 TransactionSynchronization
实例。为了你的方便,我们公开了一个默认的基于 SpEL 的 TransactionSynchronizationFactory
,它允许你配置 SpEL 表达式,其执行与交易同步(协调)。支持提交前、提交后和回滚后事件的表达式,以及每个事件的通道,其中发送评估结果(如果存在的话)。对于每个子元素,你可以指定 expression
和 channel
属性。如果仅存在 channel
属性,则接收到的消息作为特定同步方案的一部分发送到那里。如果仅存在 expression
属性,并且表达式的结果是一个非空值,则生成一条消息,其结果作为有效负载,并发送到默认通道(NullChannel
),并显示在日志中(在 DEBUG
级别)。如果你希望评估结果转到特定通道,则添加一个 channel
属性。如果表达式的结果为空或无效,则不生成任何消息。
有关事务同步的更多信息,请参阅 Transaction Synchronization。
从版本 5.5 开始,MongoDbMessageSource
可以通过 updateExpression
配置,该表达式必须求值为具有 MongoDb update
语法的 String
或 org.springframework.data.mongodb.core.query.Update
实例。它可以用作上述后处理过程的替代方法,并且它会修改从集合中提取的那些实体,以便在下一个轮询周期中不会再次从集合中提取它们(假设更新更改了查询中使用的一些值)。当集群中针对同一集合使用了多个 MongoDbMessageSource
实例时,仍然建议使用事务来实现执行隔离和数据一致性。
MongoDB Change Stream Inbound Channel Adapter
从版本 5.3 开始,spring-integration-mongodb
模块引入了 MongoDbChangeStreamMessageProducer
- Spring Data ReactiveMongoOperations.changeStream(String, ChangeStreamOptions, Class)
API 的响应式 MessageProducerSupport
实现。此组件会产生 body
为 ChangeStreamEvent
且默认情况下有负载和一些更改流相关头(参见 MongoHeaders
)的 Flux
消息。建议将此 MongoDbChangeStreamMessageProducer
与 FluxMessageChannel
结合用作 outputChannel
,以便按需订阅并消耗下游事件。
此通道适配器的 Java DSL 配置可能如下所示:
@Bean
IntegrationFlow changeStreamFlow(ReactiveMongoOperations mongoTemplate) {
return IntegrationFlow.from(
MongoDb.changeStreamInboundChannelAdapter(mongoTemplate)
.domainType(Person.class)
.collection("person")
.extractBody(false))
.channel(MessageChannels.flux())
.get();
}
当 MongoDbChangeStreamMessageProducer
停止,或下游订阅被取消,或 MongoDb 更改流产生 OperationType.INVALIDATE
时,Publisher
完成。通道适配器可以重新启动,并且创建源数据的 Publisher
,并在 MessageProducerSupport.subscribeToPublisher(Publisher<? extends Message<?>>)
中自动订阅。如果需要从其他地方使用更改流事件,在开始之间,此通道适配器可以针对新选项重新配置。
请参阅 Spring Data MongoDb documentation中关于更改流支持的更多信息。
MongoDB Outbound Channel Adapter
MongoDB 出站通道适配器允许您将消息有效负载写入 MongoDB 文档存储,如下例所示:
<int-mongodb:outbound-channel-adapter id="fullConfigWithCollectionExpression"
collection-name="myCollection"
mongo-converter="mongoConverter"
mongodb-factory="mongoDbFactory" />
如前面的配置所示,您可以使用 outbound-channel-adapter
元素配置 MongoDB 出站通道适配器,并为各种属性提供值,例如:
-
collection-name
或collection-name-expression
:标识要使用的 MongoDb 集合的名称。 -
mongo-converter
:引用可帮助将原始 Java 对象转换为 JSON 文档表示形式的o.s.data.mongodb.core.convert.MongoConverter
实例。 -
mongodb-factory
:引用o.s.data.mongodb.MongoDbFactory
的实例。 -
mongo-template
:引用o.s.data.mongodb.core.MongoTemplate
的实例。注意:您不能同时设置 mongo-template 和 mongodb-factory。 -
所有入站适配器中常见的其他属性(例如“channel”)。
前面的示例相对简单且静态,因为它为 collection-name
具有文字值。有时,您可能需要根据某些条件在运行时更改此值。为此,请使用 collection-name-expression
,其中提供的表达式是任何有效的 SpEL 表达式。
MongoDB Outbound Gateway
5.0 版引入了 MongoDB 出站网关。它允许您通过向请求通道发送消息来查询数据库。然后,网关会将响应发送到回复通道。您可以使用消息有效负载和头来指定查询和集合名称,如下例所示:
-
Java DSL
-
Kotlin DSL
-
Java
-
XML
@SpringBootApplication
public class MongoDbJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(MongoDbJavaApplication.class)
.web(false)
.run(args);
}
@Autowired
private MongoDbFactory;
@Autowired
private MongoConverter;
@Bean
public IntegrationFlow gatewaySingleQueryFlow() {
return f -> f
.handle(queryOutboundGateway())
.channel(c -> c.queue("retrieveResults"));
}
private MongoDbOutboundGatewaySpec queryOutboundGateway() {
return MongoDb.outboundGateway(this.mongoDbFactory, this.mongoConverter)
.query("{name : 'Bob'}")
.collectionNameFunction(m -> m.getHeaders().get("collection"))
.expectSingleResult(true)
.entityClass(Person.class);
}
}
class MongoDbKotlinApplication {
fun main(args: Array<String>) = runApplication<MongoDbKotlinApplication>(*args)
@Autowired
lateinit var mongoDbFactory: MongoDatabaseFactory
@Autowired
lateinit var mongoConverter: MongoConverter
@Bean
fun gatewaySingleQueryFlow() =
integrationFlow {
handle(queryOutboundGateway())
channel { queue("retrieveResults") }
}
private fun queryOutboundGateway(): MongoDbOutboundGatewaySpec {
return MongoDb.outboundGateway(this.mongoDbFactory, this.mongoConverter)
.query("{name : 'Bob'}")
.collectionNameFunction<Any> { m -> m.headers["collection"] as String }
.expectSingleResult(true)
.entityClass(Person::class.java)
}
}
@SpringBootApplication
public class MongoDbJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(MongoDbJavaApplication.class)
.web(false)
.run(args);
}
@Autowired
private MongoDbFactory mongoDbFactory;
@Bean
@ServiceActivator(inputChannel = "requestChannel")
public MessageHandler mongoDbOutboundGateway() {
MongoDbOutboundGateway gateway = new MongoDbOutboundGateway(this.mongoDbFactory);
gateway.setCollectionNameExpressionString("'myCollection'");
gateway.setQueryExpressionString("'{''name'' : ''Bob''}'");
gateway.setExpectSingleResult(true);
gateway.setEntityClass(Person.class);
gateway.setOutputChannelName("replyChannel");
return gateway;
}
@Bean
@ServiceActivator(inputChannel = "replyChannel")
public MessageHandler handler() {
return message -> System.out.println(message.getPayload());
}
}
<int-mongodb:outbound-gateway id="gatewayQuery"
mongodb-factory="mongoDbFactory"
mongo-converter="mongoConverter"
query="{firstName: 'Bob'}"
collection-name="myCollection"
request-channel="in"
reply-channel="out"
entity-class="org.springframework.integration.mongodb.test.entity$Person"/>
您可以在 MongoDB 出站网关中使用以下属性:
-
collection-name
或collection-name-expression
:标识要使用的 MongoDB 集合的名称。 -
mongo-converter
:引用可帮助将原始 Java 对象转换为 JSON 文档表示形式的o.s.data.mongodb.core.convert.MongoConverter
实例。 -
mongodb-factory
:引用o.s.data.mongodb.MongoDbFactory
的实例。 -
mongo-template
: 引用o.s.data.mongodb.core.MongoTemplate
的实例。注意:你不能同时设置mongo-template
和mongodb-factory
。 -
entity-class
: 在 MongoTemplate 中传递给find(..)
和findOne(..)
方法的实体类的完全限定名称。如果未提供此属性,则默认值为org.bson.Document
。 -
query
或query-expression
: 指定 MongoDB 查询。有关更多查询示例,请参见 MongoDB documentation。 -
collection-callback
: 引用org.springframework.data.mongodb.core.CollectionCallback
的实例。自 5.0.11 起,优先使用o.s.i.mongodb.outbound.MessageCollectionCallback
的实例,它带有请求消息上下文。有关更多信息,请参阅其 Javadoc。注意:你不能同时拥有collection-callback
和任何查询属性。
作为 query
和 query-expression
属性的替代方案,您可以使用 collectionCallback
属性来指定其他数据库操作,作为 MessageCollectionCallback
函数式接口实现的引用。以下示例指定了计数操作:
private MongoDbOutboundGatewaySpec collectionCallbackOutboundGateway() {
return MongoDb.outboundGateway(this.mongoDbFactory, this.mongoConverter)
.collectionCallback((collection, requestMessage) -> collection.count())
.collectionName("myCollection");
}
MongoDB Reactive Channel Adapters
从版本 5.3 开始,提供了 ReactiveMongoDbStoringMessageHandler
和 ReactiveMongoDbMessageSource
实现。它们基于 Spring Data 中的 ReactiveMongoOperations
,需要 org.mongodb:mongodb-driver-reactivestreams
依赖项。
`ReactiveMongoDbStoringMessageHandler`是 `ReactiveMessageHandler`的实现,当反应流组合涉及集成流定义时,该实现原生支持框架。在 ReactiveMessageHandler中查看更多信息。
从配置的角度来看,与许多其他标准通道适配器没有区别。例如,对于 Java DSL,这种通道适配器可以使用如下所示:
@Bean
public IntegrationFlow reactiveMongoDbFlow(ReactiveMongoDatabaseFactory mongoDbFactory) {
return f -> f
.channel(MessageChannels.flux())
.handle(MongoDb.reactiveOutboundChannelAdapter(mongoDbFactory));
}
在此示例中,我们将通过提供的 ReactiveMongoDatabaseFactory
连接到 MongoDb,并将请求消息中的数据存储到具有 data
名称的默认集合中。实际操作将根据内部创建的 ReactiveStreamsConsumer
中的响应流复合按需执行。
ReactiveMongoDbMessageSource
是基于提供的 ReactiveMongoDatabaseFactory
或 ReactiveMongoOperations
和 MongoDb 查询(或表达式)的 AbstractMessageSource
实现,根据 expectSingleResult
选项调用 find()
或 findOne()
操作,该选项具有一个 entityClass
类型,以转换查询结果。当对产生的消息有效负载中 Publisher
(根据 expectSingleResult
选项为 Flux
或 Mono
)进行订阅时,会按需执行查询和结果评估。当下游使用 splitter 和 FluxMessageChannel
时,框架可以自动订阅此类有效负载(本质上是 flatMap
)。否则,由目标应用程序负责订阅下游端点中的投票发布者。
对于 Java DSL,这种通道适配器可以如下配置:
@Bean
public IntegrationFlow reactiveMongoDbFlow(ReactiveMongoDatabaseFactory mongoDbFactory) {
return IntegrationFlow
.from(MongoDb.reactiveInboundChannelAdapter(mongoDbFactory, "{'name' : 'Name'}")
.entityClass(Person.class),
c -> c.poller(Pollers.fixedDelay(1000)))
.split()
.channel(c -> c.flux("output"))
.get();
}
从 5.5 版本开始,`ReactiveMongoDbMessageSource`可以使用 `updateExpression`进行配置。它具有与阻塞 `MongoDbMessageSource`相同的功能。有关更多信息,请参阅 MongoDB Inbound Channel Adapter和 `AbstractMongoDbMessageSourceSpec`JavaDocs。