Operator gateway()

`gateway()`操作符在 `IntegrationFlow`定义中是一项特殊服务激活器实现,通过其输入通道调用其他端点或集成流,并等待答复。从技术层面看,它在 `<chain>`定义中扮演嵌套 `<gateway>`组件的相同角色(见 Calling a Chain from within a Chain),允许流变得更简洁、更直观。从商业角度或逻辑上来看,它是一个消息网关,允许在目标集成解决方案中不同部分之间分配并重复使用功能(见 Messaging Gateways)。此操作符有数个实现重载,以适应不同的目标:

  • gateway(String requestChannel) 根据其名称向某个终端节点输入通道发送消息;

  • gateway(MessageChannel requestChannel) 根据其直接注入向某个终端节点输入通道发送消息;

  • gateway(IntegrationFlow flow) 向提供的 IntegrationFlow 的输入通道发送消息。

所有这些都有一个带有第二个 Consumer<GatewayEndpointSpec> 参数的变体,用于配置目标 GatewayMessageHandler 和相应的 AbstractEndpoint。此外,基于 IntegrationFlow 的方法允许调用现有的 IntegrationFlow bean 或通过 IntegrationFlow 函数接口的内嵌 lambda 声明流程为子流程,或者将其解压缩为 private 方法,以获得更简洁的代码样式:

@Bean
IntegrationFlow someFlow() {
        return IntegrationFlow
                .from(...)
                .gateway(subFlow())
                .handle(...)
                .get();
}

private static IntegrationFlow subFlow() {
        return f -> f
                .scatterGather(s -> s.recipientFlow(...),
                        g -> g.outputProcessor(MessageGroup::getOne))
}

如果下游流不总返回响应,你应该将 requestTimeout 设置为 0,以防止无限期挂起调用线程。在这种情况下,流将在该点结束并且线程将被释放以执行进一步的工作。