Sub-flows support

一些 if…​elsepublish-subscribe 组件提供了使用子流程指定其逻辑或映射的功能。最简单的示例是 .publishSubscribeChannel(), 如下例所示:

Some of if…​else and publish-subscribe components provide the ability to specify their logic or mapping by using sub-flows. The simplest sample is .publishSubscribeChannel(), as the following example shows:

@Bean
public IntegrationFlow subscribersFlow() {
    return flow -> flow
            .publishSubscribeChannel(Executors.newCachedThreadPool(), s -> s
                    .subscribe(f -> f
                            .<Integer>handle((p, h) -> p / 2)
                            .channel(c -> c.queue("subscriber1Results")))
                    .subscribe(f -> f
                            .<Integer>handle((p, h) -> p * 2)
                            .channel(c -> c.queue("subscriber2Results"))))
            .<Integer>handle((p, h) -> p * 3)
            .channel(c -> c.queue("subscriber3Results"));
}

可以使用单独的 IntegrationFlow @Bean 定义实现相同的结果,但我们希望你发现这种子流程逻辑组合样式很有用。我们发现它会导致代码更短(因此更具可读性)。

You can achieve the same result with separate IntegrationFlow @Bean definitions, but we hope you find the sub-flow style of logic composition useful. We find that it results in shorter (and so more readable) code.

从 5.3 版本开始,提供了基于 BroadcastCapableChannelpublishSubscribeChannel() 实现,以在代理支持的消息通道上配置子流程订阅者。例如,我们现在可以将多个订阅者配置为 Jms.publishSubscribeChannel() 上的子流程:

Starting with version 5.3, a BroadcastCapableChannel-based publishSubscribeChannel() implementation is provided to configure sub-flow subscribers on broker-backed message channels. For example, we now can configure several subscribers as sub-flows on the Jms.publishSubscribeChannel():

@Bean
public JmsPublishSubscribeMessageChannelSpec jmsPublishSubscribeChannel() {
    return Jms.publishSubscribeChannel(jmsConnectionFactory())
                .destination("pubsub");
}

@Bean
public IntegrationFlow pubSubFlow(BroadcastCapableChannel jmsPublishSubscribeChannel) {
    return f -> f
            .publishSubscribeChannel(jmsPublishSubscribeChannel,
                    pubsub -> pubsub
                            .subscribe(subFlow -> subFlow
                                .channel(c -> c.queue("jmsPubSubBridgeChannel1")))
                            .subscribe(subFlow -> subFlow
                                .channel(c -> c.queue("jmsPubSubBridgeChannel2"))));
}

类似的 publish-subscribe 子流程组合提供了 .routeToRecipients() 方法。

A similar publish-subscribe sub-flow composition provides the .routeToRecipients() method.

另一个示例是在 .filter() 方法中使用 .discardFlow() 而不是 .discardChannel()

Another example is using .discardFlow() instead of .discardChannel() on the .filter() method.

.route() 值得特别关注。考虑以下示例:

The .route() deserves special attention. Consider the following example:

@Bean
public IntegrationFlow routeFlow() {
    return f -> f
            .<Integer, Boolean>route(p -> p % 2 == 0,
                    m -> m.channelMapping("true", "evenChannel")
                            .subFlowMapping("false", sf ->
                                    sf.<Integer>handle((p, h) -> p * 3)))
            .transform(Object::toString)
            .channel(c -> c.queue("oddChannel"));
}

.channelMapping() 继续像在常规 Router 映射中一样工作,但 .subFlowMapping() 将该子流程绑定到主流程。换言之,任何路由器的子流程都将在 .route() 之后返回到主流程。

The .channelMapping() continues to work as it does in regular Router mapping, but the .subFlowMapping() tied that sub-flow to the main flow. In other words, any router’s sub-flow returns to the main flow after .route().

有时,你需要从 .subFlowMapping() 引用现有的 IntegrationFlow @Bean。以下示例演示了如何执行此操作:

Sometimes, you need to refer to an existing IntegrationFlow @Bean from the .subFlowMapping(). The following example shows how to do so:

@Bean
public IntegrationFlow splitRouteAggregate() {
    return f -> f
            .split()
            .<Integer, Boolean>route(o -> o % 2 == 0,
                    m -> m
                            .subFlowMapping(true, oddFlow())
                            .subFlowMapping(false, sf -> sf.gateway(evenFlow())))
            .aggregate();
}

@Bean
public IntegrationFlow oddFlow() {
    return f -> f.handle(m -> System.out.println("odd"));
}

@Bean
public IntegrationFlow evenFlow() {
    return f -> f.handle((p, h) -> "even");
}

在这种情况下,当你需要从该子流程接收回复并继续主流程时,此 IntegrationFlow Bean 引用(或其输入通道)必须用 .gateway() 包装,如前一个示例所示。前一个示例中的 oddFlow() 引用没有包装到 .gateway()。因此,我们不希望从此路由分支获得回复。否则,你最终会遇到类似于以下内容的异常:

In this case, when you need to receive a reply from such a sub-flow and continue the main flow, this IntegrationFlow bean reference (or its input channel) has to be wrapped with a .gateway() as shown in the preceding example. The oddFlow() reference in the preceding example is not wrapped to the .gateway(). Therefore, we do not expect a reply from this routing branch. Otherwise, you end up with an exception similar to the following: Caused by: org.springframework.beans.factory.BeanCreationException: The 'currentComponent' (org.springframework.integration.router.MethodInvokingRouter@7965a51c) is a one-way 'MessageHandler' and it isn’t appropriate to configure 'outputChannel'. This is the end of the integration flow.

当你将子流程配置为 lambda 时,框架将处理与子流程的请求-回复交互,并且不需要网关。

When you configure a sub-flow as a lambda, the framework handles the request-reply interaction with the sub-flow and a gateway is not needed.

子流程可以嵌套到任何深度,但我们不建议这样做。事实上,即使在路由器的情况下,在流程中添加复杂的子流程也会很快看起来像一盘意大利面,并且对于人类来说很难解析。

Sub-flows can be nested to any depth, but we do not recommend doing so. In fact, even in the router case, adding complex sub-flows within a flow would quickly begin to look like a plate of spaghetti and be difficult for a human to parse.

在 DSL 支持子流程配置的情况下,当组件配置通常需要通道并且该子流程以 channel() 元素开头时,框架会在组件输出通道和流程的输入通道之间隐式放置一个 bridge()。例如,在此 filter 定义中:

In cases where the DSL supports a subflow configuration, when a channel is normally needed for the component being configured, and that subflow starts with a channel() element, the framework implicitly places a bridge() between the component output channel and the flow’s input channel. For example, in this filter definition:

.filter(p -> p instanceof String, e -> e
	.discardFlow(df -> df
                         .channel(MessageChannels.queue())
                         ...)

框架在内部创建一个 DirectChannel Bean 以注入到 MessageFilter.discardChannel 中。然后它将子流程包装到从这个隐式通道开始的 IntegrationFlow 中,以便订阅,并将一个 bridge 放在流程中指定的 channel() 之前。当现有的 IntegrationFlow Bean 被用作子流程引用(而不是内联子流程,例如 lambda)时,不需要这样的桥,因为框架可以从流程 Bean 中解析第一个通道。对于内联子流程,输入通道尚未可用。

the Framework internally creates a DirectChannel bean for injecting into the MessageFilter.discardChannel. Then it wraps the subflow into an IntegrationFlow starting with this implicit channel for the subscription and places a bridge before the channel() specified in the flow. When an existing IntegrationFlow bean is used as a subflow reference (instead of an inline subflow, e.g. a lambda), there is no such bridge required because the framework can resolve the first channel from the flow bean. With an inline subflow, the input channel is not yet available.