Message Channels

除了带有 EIP 方法的 IntegrationFlowBuilder 之外,Java DSL 还提供了一个流畅的 API 来配置 MessageChannel 实例。为此,提供了 MessageChannels 生成器工厂。以下示例显示了如何使用它:

@Bean
public PriorityChannelSpec priorityChannel() {
    return MessageChannels.priority(this.mongoDbChannelMessageStore, "priorityGroup")
                        .interceptor(wireTap());
}

相同的 MessageChannels`生成器工厂可用于 `channel()`EIP 方法从 `IntegrationFlowBuilder`布线端点,类似于在 XML 配置中布线 `input-channel/output-channel`对。默认情况下,端点使用 `DirectChannel`实例进行布线,其中 bean 名称基于以下模式:[IntegrationFlow.beanName].channel#。该规则也适用于通过内联 `MessageChannels`生成器工厂用​​法生成的未命名通道。但是,所有 `MessageChannels`方法都有一个变体,它知道您可以用来设置 `MessageChannel`实例的 bean 名称的 `channelId。`MessageChannel`引用和 `beanName`可用作 bean 方法调用。以下示例显示了使用 `channel()`EIP 方法的可能方式:

@Bean
public QueueChannelSpec queueChannel() {
    return MessageChannels.queue();
}

@Bean
public PublishSubscribeChannelSpec<?> publishSubscribe() {
    return MessageChannels.publishSubscribe();
}

@Bean
public IntegrationFlow channelFlow() {
    return IntegrationFlow.from("input")
                .fixedSubscriberChannel()
                .channel("queueChannel")
                .channel(publishSubscribe())
                .channel(MessageChannels.executor("executorChannel", this.taskExecutor))
                .channel("output")
                .get();
}
  • from("input") 表示:“找到并使用 ID 为“input”的 MessageChannel,或者创建一个”。

  • fixedSubscriberChannel() 生成 FixedSubscriberChannel 实例并将它使用名为 channelFlow.channel#0 注册。

  • channel("queueChannel") 以相同方式工作,但使用已有的 queueChannel bean。

  • channel(publishSubscribe()) 是 bean-method 引用。

  • channel(MessageChannels.executor("executorChannel", this.taskExecutor))IntegrationFlowBuilder,它向 ExecutorChannel 暴露 IntegrationComponentSpec,并将其注册为 executorChannel

  • channel("output")DirectChannel bean 注册为 output,作为其名称,只要不存在名称为该名称的任何 bean。

注意:前面的 IntegrationFlow 定义有效,并且其所有通道都使用 BridgeHandler 实例应用到端点。

请务必通过 MessageChannels 工厂从不同的 IntegrationFlow 实例使用相同的内联通道定义。即使 DSL 解析器将不存在的对象注册为 Bean,它也无法从不同的 IntegrationFlow 容器确定相同对象 (MessageChannel)。以下示例有误:

@Bean
public IntegrationFlow startFlow() {
    return IntegrationFlow.from("input")
                .transform(...)
                .channel(MessageChannels.queue("queueChannel"))
                .get();
}

@Bean
public IntegrationFlow endFlow() {
    return IntegrationFlow.from(MessageChannels.queue("queueChannel"))
                .handle(...)
                .get();
}

该错误示例的结果是以下异常:

Caused by: java.lang.IllegalStateException:
Could not register object [queueChannel] under bean name 'queueChannel':
     there is already object [queueChannel] bound
	    at o.s.b.f.s.DefaultSingletonBeanRegistry.registerSingleton(DefaultSingletonBeanRegistry.java:129)

为了让它正常工作,您需要为该通道声明 @Bean,并从不同的 IntegrationFlow 实例使用其 bean 方法。