Dynamic and Runtime Integration Flows
可以在运行时注册 IntegrationFlow
及其所有依赖组件。在 5.0 版之前,我们使用 BeanFactory.registerSingleton()
钩子。从 Spring Framework 5.0
开始,我们使用 instanceSupplier
钩子进行程序化的 BeanDefinition
注册。以下示例展示了如何以编程方式注册一个 bean:
IntegrationFlow
and all its dependent components can be registered at runtime.
Before version 5.0, we used the BeanFactory.registerSingleton()
hook.
Starting in the Spring Framework 5.0
, we use the instanceSupplier
hook for programmatic BeanDefinition
registration.
The following example shows how to programmatically register a bean:
BeanDefinition beanDefinition =
BeanDefinitionBuilder.genericBeanDefinition((Class<Object>) bean.getClass(), () -> bean)
.getRawBeanDefinition();
((BeanDefinitionRegistry) this.beanFactory).registerBeanDefinition(beanName, beanDefinition);
请注意,在上面的示例中,instanceSupplier
钩子是 genericBeanDefinition
方法的最后一个参数,在此示例中由一个 lambda 提供。
Note that, in the preceding example, the instanceSupplier
hook is the last parameter to the genericBeanDefinition
method, provided by a lambda in this case.
将自动完成所有必需的 bean 初始化和生命周期,这与标准上下文字符串 bean 定义一样。
All the necessary bean initialization and lifecycle is done automatically, as it is with the standard context configuration bean definitions.
为了简化开发体验,Spring Integration 引入了 IntegrationFlowContext
以在运行时注册和管理 IntegrationFlow
实例,如下例所示:
To simplify the development experience, Spring Integration introduced IntegrationFlowContext
to register and manage IntegrationFlow
instances at runtime, as the following example shows:
@Autowired
private AbstractServerConnectionFactory server1;
@Autowired
private IntegrationFlowContext flowContext;
...
@Test
public void testTcpGateways() {
TestingUtilities.waitListening(this.server1, null);
IntegrationFlow flow = f -> f
.handle(Tcp.outboundGateway(Tcp.netClient("localhost", this.server1.getPort())
.serializer(TcpCodecs.crlf())
.deserializer(TcpCodecs.lengthHeader1())
.id("client1"))
.remoteTimeout(m -> 5000))
.transform(Transformers.objectToString());
IntegrationFlowRegistration theFlow = this.flowContext.registration(flow).register();
assertThat(theFlow.getMessagingTemplate().convertSendAndReceive("foo", String.class), equalTo("FOO"));
}
当我们有多个配置选项并且必须创建类似流的多个实例时,此功能非常有用。为此,我们可以迭代我们的选项并在循环内创建和注册 IntegrationFlow
实例。另一个变量是当我们的数据源不是基于 Spring 时,我们必须立即创建它。此类示例就是 Reactive Streams 事件源,如下例所示:
This is useful when we have multiple configuration options and have to create several instances of similar flows.
To do so, we can iterate our options and create and register IntegrationFlow
instances within a loop.
Another variant is when our source of data is not Spring-based, so we must create it on the fly.
Such a sample is Reactive Streams event source, as the following example shows:
Flux<Message<?>> messageFlux =
Flux.just("1,2,3,4")
.map(v -> v.split(","))
.flatMapIterable(Arrays::asList)
.map(Integer::parseInt)
.map(GenericMessage<Integer>::new);
QueueChannel resultChannel = new QueueChannel();
IntegrationFlow integrationFlow =
IntegrationFlow.from(messageFlux)
.<Integer, Integer>transform(p -> p * 2)
.channel(resultChannel)
.get();
this.integrationFlowContext.registration(integrationFlow)
.register();
(由 IntegrationFlowContext.registration()
产生的)IntegrationFlowRegistrationBuilder
可用于指定要注册的 IntegrationFlow
的 bean 名称,以控制其 autoStartup
以及注册非 Spring Integration 的 bean。通常情况下,这些额外的 bean 是连接器工厂(AMQP、JMS、(S)FTP、TCP/UDP 等)、序列化器和反序列化器或任何其他必需的支持组件。
The IntegrationFlowRegistrationBuilder
(as a result of the IntegrationFlowContext.registration()
) can be used to specify a bean name for the IntegrationFlow
to register, to control its autoStartup
, and to register, non-Spring Integration beans.
Usually, those additional beans are connection factories (AMQP, JMS, (S)FTP, TCP/UDP, and others.), serializers and deserializers, or any other required support components.
可以使用 IntegrationFlowRegistration.destroy()
回调,在你不再需要一个动态注册的 IntegrationFlow
及其所有依赖的 bean 时,将它们删除。更多信息,请参见 link:https://docs.spring.io/spring-integration/api/org/springframework/integration/dsl/context/IntegrationFlowContext.html[IntegrationFlowContext
Javadoc。
You can use the IntegrationFlowRegistration.destroy()
callback to remove a dynamically registered IntegrationFlow
and all its dependent beans when you no longer need them.
See the IntegrationFlowContext
Javadoc for more information.
从版本 5.0.6 开始, |
Starting with version 5.0.6, all generated bean names in an |
此外,从版本 5.0.6 开始,注册 builder API 有了一个新方法:useFlowIdAsPrefix()
。如果希望声明相同流的多个实例,并且当流中的组件具有相同的 ID 时避免 bean 名称冲突,此功能非常有用,如下例所示:
Also, starting with version 5.0.6, the registration builder API has a new method: useFlowIdAsPrefix()
.
This is useful if you wish to declare multiple instances of the same flow and avoid bean name collisions when components in the flows have the same ID, as the following example shows:
private void registerFlows() {
IntegrationFlowRegistration flow1 =
this.flowContext.registration(buildFlow(1234))
.id("tcp1")
.useFlowIdAsPrefix()
.register();
IntegrationFlowRegistration flow2 =
this.flowContext.registration(buildFlow(1235))
.id("tcp2")
.useFlowIdAsPrefix()
.register();
}
private IntegrationFlow buildFlow(int port) {
return f -> f
.handle(Tcp.outboundGateway(Tcp.netClient("localhost", port)
.serializer(TcpCodecs.crlf())
.deserializer(TcpCodecs.lengthHeader1())
.id("client"))
.remoteTimeout(m -> 5000))
.transform(Transformers.objectToString());
}
在这种情况下,可以使用 bean 名称 tcp1.client.handler
引用第一个流的消息处理器。
In this case, the message handler for the first flow can be referenced with bean a name of tcp1.client.handler
.
当使用 |
An |