Inbound Channel Adapters
Typically, a message flow starts from an inbound channel adapter (such as <int-jdbc:inbound-channel-adapter>). The adapter is configured with a <poller>, which periodically asks a MessageSource<?> to produce messages. The Java DSL also lets you start an IntegrationFlow from a MessageSource<?>. For this purpose, the IntegrationFlow fluent API provides an overloaded IntegrationFlow.from(MessageSource<?> messageSource) method. You can configure the MessageSource<?> as a bean and pass it as the argument to that method. The second parameter of IntegrationFlow.from() is a Consumer<SourcePollingChannelAdapterSpec> lambda that lets you provide options (such as PollerMetadata or SmartLifecycle) for the SourcePollingChannelAdapter. The following example shows how to use the fluent API and a lambda to create an IntegrationFlow:
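// Assumes this configuration class has an injected javax.sql.DataSource field named dataSource.
@Bean
public MessageSource<Object> jdbcMessageSource() {
    // Each poll runs the query and returns the selected rows as the message payload.
    return new JdbcPollingChannelAdapter(this.dataSource, "SELECT * FROM something");
}

@Bean
public IntegrationFlow pollingFlow() {
    return IntegrationFlow.from(jdbcMessageSource(),
                    // Poll every 100 ms and emit at most one message per poll.
                    c -> c.poller(Pollers.fixedRate(100).maxMessagesPerPoll(1)))
            .transform(Transformers.toJson())
            .channel("furtherProcessChannel")
            .get();
}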
When you do not need to build Message objects directly, you can use the IntegrationFlow.fromSupplier() variant, which is based on java.util.function.Supplier. The result of Supplier.get() is automatically wrapped in a Message (if it is not already a Message).
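The wrapping rule described above can be illustrated with a small plain-Java sketch. It does not use Spring Integration and is not the framework's implementation: SimpleMessage and poll() are hypothetical names introduced only for this illustration, and treating a null result as "no message for this poll" is an assumption of the sketch.

```java
import java.util.function.Supplier;

public class SupplierWrappingSketch {

    // Hypothetical stand-in for a message type; it only carries a payload.
    static final class SimpleMessage<T> {
        private final T payload;

        SimpleMessage(T payload) {
            this.payload = payload;
        }

        T getPayload() {
            return payload;
        }
    }

    // Models one poll: call the supplier and wrap the result unless it is already a message.
    static SimpleMessage<?> poll(Supplier<?> supplier) {
        Object result = supplier.get();
        if (result == null) {
            // Assumption of this sketch: a null result means nothing to send on this poll.
            return null;
        }
        if (result instanceof SimpleMessage) {
            // Already a message: pass it through unchanged.
            return (SimpleMessage<?>) result;
        }
        // Plain value: wrap it so that it becomes the payload.
        return new SimpleMessage<Object>(result);
    }

    public static void main(String[] args) {
        SimpleMessage<?> wrapped = poll(() -> "tick");
        System.out.println(wrapped.getPayload());

        SimpleMessage<String> existing = new SimpleMessage<>("ready");
        System.out.println(poll(() -> existing) == existing);
    }
}
```

In a flow definition, the supplier passed to IntegrationFlow.fromSupplier() plays the role of the lambda given to poll() here, so it can return either a plain payload or a fully built Message.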