Spring Integration Interaction
The Spring Integration framework extends the Spring programming model to support the well-known Enterprise Integration Patterns.
It enables lightweight messaging within Spring-based applications and supports integration with external systems through declarative adapters.
It also provides a high-level DSL for composing various operations (endpoints) into a logical integration flow.
Because this DSL is configured in a lambda style, Spring Integration already makes broad use of the `java.util.function` interfaces.
A `@MessagingGateway` proxy interface can also be declared as a `Function` or `Consumer`, which, in a Spring Cloud Function environment, can be registered in the function catalog.
See the Spring Integration Reference Manual for more information about its support for functions.
On the other hand, starting with version 4.0.3, Spring Cloud Function provides a `spring-cloud-function-integration` module, which offers a deeper, more cloud-specific, auto-configuration-based API for interacting with a `FunctionCatalog` from the Spring Integration DSL.
The `FunctionFlowBuilder` is auto-configured, autowired with a `FunctionCatalog`, and serves as the entry point of the function-specific DSL for building a target `IntegrationFlow` instance.
In addition to the standard `IntegrationFlow.from()` factories (provided for convenience), the `FunctionFlowBuilder` exposes a `fromSupplier(String supplierDefinition)` factory that looks up the target `Supplier` in the provided `FunctionCatalog`.
The `FunctionFlowBuilder` then leads to a `FunctionFlowDefinition`.
The `FunctionFlowDefinition` is an implementation of `IntegrationFlowExtension` and exposes the `apply(String functionDefinition)` and `accept(String consumerDefinition)` operators, which look up a `Function` or a `Consumer`, respectively, in the `FunctionCatalog`.
See their Javadocs for more information.
The following example shows the `FunctionFlowBuilder` in action, together with the rest of the `IntegrationFlow` API:
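```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

import org.springframework.cloud.function.integration.dsl.FunctionFlowBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.handler.LoggingHandler;

@Configuration
public class IntegrationConfiguration {

    // Source of the flow: supplies the payload as a byte array.
    @Bean
    Supplier<byte[]> simpleByteArraySupplier() {
        return "simple test data"::getBytes;
    }

    @Bean
    Function<String, String> upperCaseFunction() {
        return String::toUpperCase;
    }

    @Bean
    BlockingQueue<String> results() {
        return new LinkedBlockingQueue<>();
    }

    // End of the flow: collects the processed payloads into the results queue.
    @Bean
    Consumer<String> simpleStringConsumer(BlockingQueue<String> results) {
        return results::add;
    }

    @Bean
    QueueChannel wireTapChannel() {
        return new QueueChannel();
    }

    // Each string argument below is a function definition looked up in the FunctionCatalog.
    @Bean
    IntegrationFlow someFunctionFlow(FunctionFlowBuilder functionFlowBuilder) {
        return functionFlowBuilder
                .fromSupplier("simpleByteArraySupplier")
                .wireTap("wireTapChannel")
                .apply("upperCaseFunction")
                .log(LoggingHandler.Level.WARN)
                .accept("simpleStringConsumer");
    }

}
```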
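To make the data path of the configuration above easier to follow, here is a minimal plain-JDK sketch of the same idea: each step is resolved by name and one payload is pushed through supplier, wire tap, function, and consumer. The `NamedFlowSketch` class, its `runOnce` and `demo` methods, and the `Map`-based lookup are hypothetical stand-ins for illustration only; they are not part of Spring Cloud Function or Spring Integration, and the explicit `byte[]`-to-`String` conversion is done by hand here as an assumption of the sketch.

```java
import java.nio.charset.StandardCharsets;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical plain-JDK model of the flow above; not Spring Cloud Function code.
public class NamedFlowSketch {

    // Resolves each step by name and pushes one payload through:
    // supplier -> wire tap -> function -> consumer.
    static void runOnce(Map<String, Supplier<byte[]>> suppliers,
                        Map<String, Function<String, String>> functions,
                        Map<String, Consumer<String>> consumers,
                        Consumer<byte[]> wireTap) {
        byte[] raw = suppliers.get("simpleByteArraySupplier").get();
        wireTap.accept(raw); // the tap sees the payload before it is transformed
        // Assumption of this sketch: the byte[] -> String conversion is done by hand.
        String text = new String(raw, StandardCharsets.UTF_8);
        String upper = functions.get("upperCaseFunction").apply(text);
        consumers.get("simpleStringConsumer").accept(upper);
    }

    // Wires stand-ins for the three functional beans and returns
    // { what the wire tap saw, what the consumer received }.
    static String[] demo() {
        BlockingQueue<String> results = new LinkedBlockingQueue<>();
        BlockingQueue<byte[]> tapped = new LinkedBlockingQueue<>();

        Supplier<byte[]> supplier = () -> "simple test data".getBytes(StandardCharsets.UTF_8);
        // Locale.ROOT keeps the result independent of the default locale.
        Function<String, String> upperCase = s -> s.toUpperCase(Locale.ROOT);
        Consumer<String> consumer = results::add;
        Consumer<byte[]> wireTap = tapped::add;

        runOnce(Map.of("simpleByteArraySupplier", supplier),
                Map.of("upperCaseFunction", upperCase),
                Map.of("simpleStringConsumer", consumer),
                wireTap);

        return new String[] {
            new String(tapped.poll(), StandardCharsets.UTF_8),
            results.poll()
        };
    }

    public static void main(String[] args) {
        String[] out = demo();
        System.out.println("wire tap saw: " + out[0]);
        System.out.println("consumer got: " + out[1]);
    }
}
```

In the real flow, the name resolution is the job of the `FunctionCatalog` and the message passing is the job of Spring Integration channels; the sketch only mirrors the order of the steps.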
Because the `FunctionCatalog.lookup()` functionality is not limited to simple function names, function composition can also be used in the `apply()` and `accept()` operators mentioned above:
```java
@Bean
IntegrationFlow functionCompositionFlow(FunctionFlowBuilder functionFlowBuilder) {
    return functionFlowBuilder
            .from("functionCompositionInput")
            .accept("upperCaseFunction|simpleStringConsumer");
}
```
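As a rough mental model of what a composed definition such as `upperCaseFunction|simpleStringConsumer` means, the following plain-JDK sketch splits a definition on `|`, chains the named functions, and ends with the named consumer. The `CompositionSketch` class and its hand-rolled parsing are assumptions made for illustration; they do not reproduce how `FunctionCatalog` resolves or composes functions internally.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

// Illustrative only: a hand-rolled reading of "a|b" definitions, not the FunctionCatalog implementation.
public class CompositionSketch {

    // Builds a Consumer from a definition of the form "function1|...|functionN|consumer".
    static Consumer<String> compose(String definition,
                                    Map<String, Function<String, String>> functions,
                                    Map<String, Consumer<String>> consumers) {
        String[] names = definition.split("\\|");
        Function<String, String> chain = Function.identity();
        // Every name except the last one is expected to be a function.
        for (int i = 0; i < names.length - 1; i++) {
            Function<String, String> next = functions.get(names[i]);
            if (next == null) {
                throw new IllegalArgumentException("Unknown function: " + names[i]);
            }
            chain = chain.andThen(next);
        }
        // The last name is expected to be the terminal consumer.
        Consumer<String> terminal = consumers.get(names[names.length - 1]);
        if (terminal == null) {
            throw new IllegalArgumentException("Unknown consumer: " + names[names.length - 1]);
        }
        Function<String, String> finalChain = chain;
        return input -> terminal.accept(finalChain.apply(input));
    }

    // Composes an upper-casing function with a collecting consumer and feeds it one value.
    static List<String> demo() {
        List<String> results = new ArrayList<>();
        Function<String, String> upperCase = s -> s.toUpperCase(Locale.ROOT);
        Consumer<String> collect = results::add;

        Consumer<String> composed = compose("upperCaseFunction|simpleStringConsumer",
                Map.of("upperCaseFunction", upperCase),
                Map.of("simpleStringConsumer", collect));
        composed.accept("hello");
        return results;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The point of the sketch is that a function followed by a consumer is itself a consumer, which is why a composed definition fits the `accept()` operator.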
This API becomes even more relevant when we add auto-configuration dependencies for predefined functions to our Spring Cloud applications.
For example, the Stream Applications project provides, in addition to application images, artifacts with functions for various integration use cases, such as `debezium-supplier`, `elasticsearch-consumer`, and `aggregator-function`.
The following configuration is based on the `http-supplier`, `spel-function`, and `file-consumer` artifacts, respectively:
```java
@Bean
IntegrationFlow someFunctionFlow(FunctionFlowBuilder functionFlowBuilder) {
    return functionFlowBuilder
            .fromSupplier("httpSupplier", e -> e.poller(Pollers.trigger(new OnlyOnceTrigger())))
            .<Flux<?>>handle((fluxPayload, headers) -> fluxPayload, e -> e.async(true))
            .channel(c -> c.flux())
            .apply("spelFunction")
            .<String, String>transform(String::toUpperCase)
            .accept("fileConsumer");
}
```
All that remains is to add their configuration to `application.properties` (if necessary):
```properties
http.path-pattern=/testPath
spel.function.expression=new String(payload)
file.consumer.name=test-data.txt
```
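To illustrate what these settings are meant to achieve for a single payload, the following plain-JDK sketch applies the same transformation by hand: the bytes are turned into a `String` (the effect of the `new String(payload)` expression), upper-cased, and written to a file named `test-data.txt`. It assumes that the request body arrives as a `byte[]` and it writes into a temporary directory; the `PayloadPipelineSketch` class and its methods are hypothetical and do not involve the actual `http-supplier`, `spel-function`, or `file-consumer` code.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Locale;

// Hypothetical sketch of the per-payload transformation; no Spring code is involved.
public class PayloadPipelineSketch {

    // Mirrors `new String(payload)` followed by the upper-case transform.
    // UTF-8 and Locale.ROOT are chosen here so the result is deterministic.
    static String transform(byte[] payload) {
        String text = new String(payload, StandardCharsets.UTF_8);
        return text.toUpperCase(Locale.ROOT);
    }

    // Writes the content into a file with the given name inside the given directory.
    static Path writeTo(Path directory, String fileName, String content) throws IOException {
        Path target = directory.resolve(fileName);
        Files.writeString(target, content);
        return target;
    }

    // Runs the whole sketch once and returns what ended up in the file.
    static String demo() {
        try {
            // Assumption: a temporary directory stands in for the consumer's target directory.
            Path directory = Files.createTempDirectory("file-consumer-sketch");
            String content = transform("hello from http".getBytes(StandardCharsets.UTF_8));
            Path file = writeTo(directory, "test-data.txt", content);
            return Files.readString(file);
        }
        catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```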