Dynamic and Runtime Integration Flows

可以在运行时注册 IntegrationFlow 及其所有依赖组件。在 5.0 版之前,我们使用 BeanFactory.registerSingleton() 钩子。从 Spring Framework 5.0 开始,我们使用 instanceSupplier 钩子进行程序化的 BeanDefinition 注册。以下示例展示了如何以编程方式注册一个 bean:

BeanDefinition beanDefinition =
         BeanDefinitionBuilder.genericBeanDefinition((Class<Object>) bean.getClass(), () -> bean)
               .getRawBeanDefinition();

((BeanDefinitionRegistry) this.beanFactory).registerBeanDefinition(beanName, beanDefinition);

请注意,在上面的示例中,instanceSupplier 钩子是 genericBeanDefinition 方法的最后一个参数,在此示例中由一个 lambda 提供。

将自动完成所有必需的 bean 初始化和生命周期,这与标准上下文字符串 bean 定义一样。

为了简化开发体验,Spring Integration 引入了 IntegrationFlowContext 以在运行时注册和管理 IntegrationFlow 实例,如下例所示:

@Autowired
private AbstractServerConnectionFactory server1;

@Autowired
private IntegrationFlowContext flowContext;

...

@Test
public void testTcpGateways() {
    TestingUtilities.waitListening(this.server1, null);

    IntegrationFlow flow = f -> f
            .handle(Tcp.outboundGateway(Tcp.netClient("localhost", this.server1.getPort())
                    .serializer(TcpCodecs.crlf())
                    .deserializer(TcpCodecs.lengthHeader1())
                    .id("client1"))
                .remoteTimeout(m -> 5000))
            .transform(Transformers.objectToString());

    IntegrationFlowRegistration theFlow = this.flowContext.registration(flow).register();
    assertThat(theFlow.getMessagingTemplate().convertSendAndReceive("foo", String.class), equalTo("FOO"));
}

当我们有多个配置选项并且必须创建类似流的多个实例时,此功能非常有用。为此,我们可以迭代我们的选项并在循环内创建和注册 IntegrationFlow 实例。另一个变量是当我们的数据源不是基于 Spring 时,我们必须立即创建它。此类示例就是 Reactive Streams 事件源,如下例所示:

Flux<Message<?>> messageFlux =
    Flux.just("1,2,3,4")
        .map(v -> v.split(","))
        .flatMapIterable(Arrays::asList)
        .map(Integer::parseInt)
        .map(GenericMessage<Integer>::new);

QueueChannel resultChannel = new QueueChannel();

IntegrationFlow integrationFlow =
    IntegrationFlow.from(messageFlux)
        .<Integer, Integer>transform(p -> p * 2)
        .channel(resultChannel)
        .get();

this.integrationFlowContext.registration(integrationFlow)
            .register();

(由 IntegrationFlowContext.registration() 产生的)IntegrationFlowRegistrationBuilder 可用于指定要注册的 IntegrationFlow 的 bean 名称,以控制其 autoStartup 以及注册非 Spring Integration 的 bean。通常情况下,这些额外的 bean 是连接器工厂(AMQP、JMS、(S)FTP、TCP/UDP 等)、序列化器和反序列化器或任何其他必需的支持组件。

可以使用 IntegrationFlowRegistration.destroy() 回调,在你不再需要一个动态注册的 IntegrationFlow 及其所有依赖的 bean 时,将它们删除。更多信息,请参见 link:https://docs.spring.io/spring-integration/api/org/springframework/integration/dsl/context/IntegrationFlowContext.html[IntegrationFlowContext Javadoc。

从版本 5.0.6 开始,IntegrationFlow 定义中生成的所有 Bean 名称前都会加上流程 ID 作为前缀。我们建议始终指定一个显式流程 ID。否则,IntegrationFlowContext 中会启动一个同步屏障,以便为 IntegrationFlow 生成 Bean 名称并注册其 Bean。我们对这两个操作进行同步,以避免在相同的生成 Bean 名称可能用于不同的 IntegrationFlow 实例时出现的争用情况。

此外,从版本 5.0.6 开始,注册 builder API 有了一个新方法:useFlowIdAsPrefix()。如果希望声明相同流的多个实例,并且当流中的组件具有相同的 ID 时避免 bean 名称冲突,此功能非常有用,如下例所示:

private void registerFlows() {
    IntegrationFlowRegistration flow1 =
              this.flowContext.registration(buildFlow(1234))
                    .id("tcp1")
                    .useFlowIdAsPrefix()
                    .register();

    IntegrationFlowRegistration flow2 =
              this.flowContext.registration(buildFlow(1235))
                    .id("tcp2")
                    .useFlowIdAsPrefix()
                    .register();
}

private IntegrationFlow buildFlow(int port) {
    return f -> f
            .handle(Tcp.outboundGateway(Tcp.netClient("localhost", port)
                    .serializer(TcpCodecs.crlf())
                    .deserializer(TcpCodecs.lengthHeader1())
                    .id("client"))
                .remoteTimeout(m -> 5000))
            .transform(Transformers.objectToString());
}

在这种情况下,可以使用 bean 名称 tcp1.client.handler 引用第一个流的消息处理器。

当使用 useFlowIdAsPrefix() 时,需要一个 id 属性。