Message Channels
In addition to the IntegrationFlowBuilder with EIP methods, the Java DSL provides a fluent API to configure MessageChannel instances. For this purpose, the MessageChannels builder factory is provided. The following example shows how to use it:
@Bean
public PriorityChannelSpec priorityChannel() {
    return MessageChannels.priority(this.mongoDbChannelMessageStore, "priorityGroup")
            .interceptor(wireTap());
}
The same MessageChannels builder factory can be used in the channel() EIP method from IntegrationFlowBuilder to wire endpoints, similar to wiring an input-channel/output-channel pair in the XML configuration. By default, endpoints are wired with DirectChannel instances where the bean name is based on the following pattern: [IntegrationFlow.beanName].channel#[channelNameIndex]. This rule is also applied for unnamed channels produced by inline MessageChannels builder factory usage. However, all MessageChannels methods have a variant that accepts a channelId, which you can use to set the bean names for MessageChannel instances. A channel can be supplied to channel() either as a MessageChannel reference (for example, a bean-method invocation) or by its beanName. The following example shows the possible ways to use the channel() EIP method:
@Bean
public QueueChannelSpec queueChannel() {
    return MessageChannels.queue();
}

@Bean
public PublishSubscribeChannelSpec<?> publishSubscribe() {
    return MessageChannels.publishSubscribe();
}

@Bean
public IntegrationFlow channelFlow() {
    return IntegrationFlow.from("input")
            .fixedSubscriberChannel()
            .channel("queueChannel")
            .channel(publishSubscribe())
            .channel(MessageChannels.executor("executorChannel", this.taskExecutor))
            .channel("output")
            .get();
}
- from("input") means "find and use the MessageChannel with the "input" id, or create one".
- fixedSubscriberChannel() produces an instance of FixedSubscriberChannel and registers it with a name of channelFlow.channel#0.
- channel("queueChannel") works the same way but uses an existing queueChannel bean.
- channel(publishSubscribe()) is the bean-method reference.
- channel(MessageChannels.executor("executorChannel", this.taskExecutor)) hands an inline IntegrationComponentSpec to the IntegrationFlowBuilder, which resolves it to an ExecutorChannel and registers it as executorChannel.
- channel("output") registers the DirectChannel bean with output as its name, as long as no beans with this name already exist.
Note: The preceding IntegrationFlow definition is valid, and all of its channels are applied to endpoints with BridgeHandler instances.
Be careful not to use the same inline channel definition through the MessageChannels factory from different IntegrationFlow instances. Even though the DSL parser registers non-existent objects as beans, it cannot tell that the same object (MessageChannel) is meant when it appears in different IntegrationFlow containers. The following example is wrong:
@Bean
public IntegrationFlow startFlow() {
    return IntegrationFlow.from("input")
            .transform(...)
            .channel(MessageChannels.queue("queueChannel"))
            .get();
}

@Bean
public IntegrationFlow endFlow() {
    return IntegrationFlow.from(MessageChannels.queue("queueChannel"))
            .handle(...)
            .get();
}
The result of that bad example is the following exception:
Caused by: java.lang.IllegalStateException:
Could not register object [queueChannel] under bean name 'queueChannel':
    there is already object [queueChannel] bound
        at o.s.b.f.s.DefaultSingletonBeanRegistry.registerSingleton(DefaultSingletonBeanRegistry.java:129)
To make it work, you need to declare a @Bean for that channel and use its bean method from the different IntegrationFlow instances.
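
A minimal sketch of that fix, assuming Spring Integration 6.x is on the classpath. The class name SharedChannelConfiguration, the upper-casing transformer, the printing handler, and the default poller bean are illustrative placeholders that do not come from the original example; the poller is included only on the assumption that the endpoint consuming from the queue needs one.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.integration.dsl.PollerSpec;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.dsl.QueueChannelSpec;
import org.springframework.integration.scheduling.PollerMetadata;

// Hypothetical configuration class; the name is illustrative only.
@Configuration
@EnableIntegration
public class SharedChannelConfiguration {

    // The channel is declared once as a bean, so both flows refer to the same definition.
    @Bean
    public QueueChannelSpec queueChannel() {
        return MessageChannels.queue();
    }

    // Assumption: a default poller for the endpoint that reads from the queue channel.
    @Bean(name = PollerMetadata.DEFAULT_POLLER)
    public PollerSpec poller() {
        return Pollers.fixedDelay(100);
    }

    @Bean
    public IntegrationFlow startFlow() {
        return IntegrationFlow.from("input")
                // Placeholder for the elided transformer in the original example.
                .<String, String>transform(String::toUpperCase)
                // Bean-method reference instead of an inline MessageChannels.queue("queueChannel").
                .channel(queueChannel())
                .get();
    }

    @Bean
    public IntegrationFlow endFlow() {
        // The same bean method is used here, so no second registration is attempted.
        return IntegrationFlow.from(queueChannel())
                // Placeholder for the elided handler in the original example.
                .handle(System.out::println)
                .get();
    }
}
```

Referring to the bean by name, as channel("queueChannel") does in the earlier channelFlow example, should serve the same purpose, because the channel is still defined in exactly one place.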