Sub-flows support
一些 if…else
和 publish-subscribe
组件提供了使用子流程指定其逻辑或映射的功能。最简单的示例是 .publishSubscribeChannel()
, 如下例所示:
@Bean
public IntegrationFlow subscribersFlow() {
return flow -> flow
.publishSubscribeChannel(Executors.newCachedThreadPool(), s -> s
.subscribe(f -> f
.<Integer>handle((p, h) -> p / 2)
.channel(c -> c.queue("subscriber1Results")))
.subscribe(f -> f
.<Integer>handle((p, h) -> p * 2)
.channel(c -> c.queue("subscriber2Results"))))
.<Integer>handle((p, h) -> p * 3)
.channel(c -> c.queue("subscriber3Results"));
}
可以使用单独的 IntegrationFlow
@Bean
定义实现相同的结果,但我们希望你发现这种子流程逻辑组合样式很有用。我们发现它会导致代码更短(因此更具可读性)。
从 5.3 版本开始,提供了基于 BroadcastCapableChannel
的 publishSubscribeChannel()
实现,以在代理支持的消息通道上配置子流程订阅者。例如,我们现在可以将多个订阅者配置为 Jms.publishSubscribeChannel()
上的子流程:
@Bean
public JmsPublishSubscribeMessageChannelSpec jmsPublishSubscribeChannel() {
return Jms.publishSubscribeChannel(jmsConnectionFactory())
.destination("pubsub");
}
@Bean
public IntegrationFlow pubSubFlow(BroadcastCapableChannel jmsPublishSubscribeChannel) {
return f -> f
.publishSubscribeChannel(jmsPublishSubscribeChannel,
pubsub -> pubsub
.subscribe(subFlow -> subFlow
.channel(c -> c.queue("jmsPubSubBridgeChannel1")))
.subscribe(subFlow -> subFlow
.channel(c -> c.queue("jmsPubSubBridgeChannel2"))));
}
类似的 publish-subscribe
子流程组合提供了 .routeToRecipients()
方法。
另一个示例是在 .filter()
方法中使用 .discardFlow()
而不是 .discardChannel()
。
.route()
值得特别关注。考虑以下示例:
@Bean
public IntegrationFlow routeFlow() {
return f -> f
.<Integer, Boolean>route(p -> p % 2 == 0,
m -> m.channelMapping("true", "evenChannel")
.subFlowMapping("false", sf ->
sf.<Integer>handle((p, h) -> p * 3)))
.transform(Object::toString)
.channel(c -> c.queue("oddChannel"));
}
.channelMapping()
继续像在常规 Router
映射中一样工作,但 .subFlowMapping()
将该子流程绑定到主流程。换言之,任何路由器的子流程都将在 .route()
之后返回到主流程。
有时,你需要从 .subFlowMapping()
引用现有的 IntegrationFlow
@Bean
。以下示例演示了如何执行此操作:
@Bean
public IntegrationFlow splitRouteAggregate() {
return f -> f
.split()
.<Integer, Boolean>route(o -> o % 2 == 0,
m -> m
.subFlowMapping(true, oddFlow())
.subFlowMapping(false, sf -> sf.gateway(evenFlow())))
.aggregate();
}
@Bean
public IntegrationFlow oddFlow() {
return f -> f.handle(m -> System.out.println("odd"));
}
@Bean
public IntegrationFlow evenFlow() {
return f -> f.handle((p, h) -> "even");
}
在这种情况下,当你需要从该子流程接收回复并继续主流程时,此 IntegrationFlow
Bean 引用(或其输入通道)必须用 .gateway()
包装,如前一个示例所示。前一个示例中的 oddFlow()
引用没有包装到 .gateway()
。因此,我们不希望从此路由分支获得回复。否则,你最终会遇到类似于以下内容的异常:
Caused by: org.springframework.beans.factory.BeanCreationException:
The 'currentComponent' (org.springframework.integration.router.MethodInvokingRouter@7965a51c)
is a one-way 'MessageHandler' and it isn’t appropriate to configure 'outputChannel'.
This is the end of the integration flow.
当你将子流程配置为 lambda 时,框架将处理与子流程的请求-回复交互,并且不需要网关。
子流程可以嵌套到任何深度,但我们不建议这样做。事实上,即使在路由器的情况下,在流程中添加复杂的子流程也会很快看起来像一盘意大利面,并且对于人类来说很难解析。
在 DSL 支持子流程配置的情况下,当组件配置通常需要通道并且该子流程以
框架在内部创建一个 |