Producing and Consuming Messages

You can write a Spring Cloud Stream application by simply writing functions and exposing them as @Bean definitions. You can also use Spring Integration annotation-based configuration or Spring Cloud Stream annotation-based configuration, although, starting with spring-cloud-stream 3.x, we recommend using functional implementations.

Spring Cloud Function support

Overview

Since Spring Cloud Stream v2.1, another alternative for defining stream handlers and sources is to use the built-in support for Spring Cloud Function, where they can be expressed as beans of type java.util.function.[Supplier/Function/Consumer].

To specify which functional bean to bind to the external destination(s) exposed by the bindings, you must provide the spring.cloud.function.definition property.

In the event you only have a single bean of type java.util.function.[Supplier/Function/Consumer], you can skip the spring.cloud.function.definition property, since such a functional bean is auto-discovered. However, it is considered best practice to use this property to avoid any confusion. Sometimes this auto-discovery can get in the way, since a single bean of type java.util.function.[Supplier/Function/Consumer] may exist for purposes other than handling messages, yet, being the only one, it is auto-discovered and auto-bound. For these rare scenarios, you can disable auto-discovery by providing the spring.cloud.stream.function.autodetect property with its value set to false.
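
For instance, a configuration that makes the function selection explicit and opts out of auto-discovery might look like the following (the function name is illustrative):

```properties
# Explicitly select the function to bind (recommended even when it is the only candidate)
spring.cloud.function.definition=toUpperCase
# Rare case: prevent a functional bean from being auto-discovered and auto-bound
spring.cloud.stream.function.autodetect=false
```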

Here is an example of an application that exposes a message handler as a java.util.function.Function, effectively supporting pass-thru semantics by acting as both a consumer and a producer of data:

@SpringBootApplication
public class MyFunctionBootApp {

	public static void main(String[] args) {
		SpringApplication.run(MyFunctionBootApp.class);
	}

	@Bean
	public Function<String, String> toUpperCase() {
		return s -> s.toUpperCase();
	}
}

In the preceding example, we define a bean of type java.util.function.Function called toUpperCase that acts as a message handler whose 'input' and 'output' must be bound to the external destinations exposed by the provided destination binder. By default, the 'input' and 'output' binding names are toUpperCase-in-0 and toUpperCase-out-0. See the [Functional binding names] section for details on the naming convention used to establish binding names.
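
Given that naming convention, mapping those generated binding names to concrete destinations could look like the following (the destination names here are hypothetical):

```properties
spring.cloud.stream.bindings.toUpperCase-in-0.destination=uppercaseInputTopic
spring.cloud.stream.bindings.toUpperCase-out-0.destination=uppercaseOutputTopic
```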

Below are examples of simple functional applications supporting other semantics:

Here is an example of source semantics exposed as a java.util.function.Supplier:

@SpringBootApplication
public class SourceFromSupplier {

	@Bean
	public Supplier<Date> date() {
		return () -> new Date(12345L);
	}
}

Here is an example of sink semantics exposed as a java.util.function.Consumer:

@SpringBootApplication
public class SinkFromConsumer {

	@Bean
	public Consumer<String> sink() {
		return System.out::println;
	}
}

Suppliers (Sources)

Function and Consumer are pretty straightforward when it comes to how their invocation is triggered. They are triggered based on data (events) sent to the destination they are bound to. In other words, they are classic event-driven components.

However, Supplier is in its own category when it comes to triggering. Since it is, by definition, the source (the origin) of the data, it does not subscribe to any in-bound destination and, therefore, has to be triggered by some other mechanism(s). There is also a question of Supplier implementation, which could be imperative or reactive and which directly relates to the triggering of such suppliers.

Consider the following sample:

@SpringBootApplication
public class SupplierConfiguration {

	@Bean
	public Supplier<String> stringSupplier() {
		return () -> "Hello from Supplier";
	}
}

The preceding Supplier bean produces a string whenever its get() method is invoked. However, who invokes this method, and how often? The framework provides a default polling mechanism (answering the question of "who?") that triggers the invocation of the supplier and, by default, does so every second (answering the question of "how often?"). In other words, the above configuration produces a single message every second, and each message is sent to an output destination exposed by the binder. To learn how to customize the polling mechanism, see the Polling Configuration Properties section.
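
The polling behavior can be sketched in plain Java; the loop below is a hypothetical stand-in for the framework's default poller, not framework code:

```java
import java.util.function.Supplier;

public class PollingSketch {

    public static void main(String[] args) {
        Supplier<String> stringSupplier = () -> "Hello from Supplier";

        // Stand-in for the default poller: one get() call per polling cycle,
        // each call producing exactly one message for the output destination.
        for (int cycle = 0; cycle < 3; cycle++) {
            System.out.println(stringSupplier.get());
        }
    }
}
```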

Consider a different example:

@SpringBootApplication
public class SupplierConfiguration {

    @Bean
    public Supplier<Flux<String>> stringSupplier() {
        return () -> Flux.fromStream(Stream.generate(new Supplier<String>() {
            @Override
            public String get() {
                try {
                    Thread.sleep(1000); // pace the stream: one element per second
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                return "Hello from Supplier";
            }
        })).subscribeOn(Schedulers.boundedElastic()).share();
    }
}

The preceding Supplier bean adopts the reactive programming style. Typically, and unlike the imperative supplier, it should be triggered only once, given that the invocation of its get() method produces (supplies) a continuous stream of messages rather than an individual message.

The framework recognizes the difference in the programming style and guarantees that such a supplier is triggered only once.

However, imagine a use case where you want to poll some data source and return a finite stream of data representing the result set. The reactive programming style is a perfect mechanism for such a Supplier. However, given the finite nature of the produced stream, such a Supplier still needs to be invoked periodically.

Consider the following sample, which emulates such a use case by producing a finite stream of data:

@SpringBootApplication
public class SupplierConfiguration {

	@PollableBean
	public Supplier<Flux<String>> stringSupplier() {
		return () -> Flux.just("hello", "bye");
	}
}

The bean itself is annotated with the @PollableBean annotation (a sub-set of @Bean), thus signaling to the framework that, although the implementation of such a supplier is reactive, it still needs to be polled.

PollableBean 中定义了一个 splittable 属性,该属性会向此注释的后处理程序发出信号,指示由注释的组件生成的结果必须拆分,并且默认设置为 true。这意味着框架会拆分返回值,并将其每个项作为一条消息分别发送出去。如果您不希望产生此行为,可以在此点上将其设置为 false。然后,此类提供程序将仅返回生成的 Flux,而不会将其拆分。

There is a splittable attribute defined on @PollableBean which signals to the post-processors of this annotation that the result produced by the annotated component has to be split; it is set to true by default. This means that the framework splits the returned Flux, sending out each item as an individual message. If this is not the desired behavior, you can set it to false, at which point such a supplier simply returns the produced Flux without splitting it.
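
The effect of splittable can be sketched in plain Java, with a List standing in for the Flux; the flag and the "message:" printing are purely illustrative, not framework API:

```java
import java.util.List;

public class SplittableSketch {

    public static void main(String[] args) {
        List<String> produced = List.of("hello", "bye");
        boolean splittable = true; // the default for @PollableBean

        if (splittable) {
            // Split: each item is sent downstream as an individual message
            produced.forEach(item -> System.out.println("message: " + item));
        } else {
            // No split: the produced stream is passed along as-is
            System.out.println("message: " + produced);
        }
    }
}
```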

Supplier & threading

As you have learned by now, unlike Function and Consumer, which are triggered by an event (they have input data), Supplier does not have any input and is thus triggered by a different mechanism - a poller, which may have an unpredictable threading model. While the details of the threading model are, most of the time, not relevant to the downstream execution of the function, they may present an issue in certain cases, especially with integrated frameworks that have certain expectations of thread affinity. For example, Spring Cloud Sleuth relies on tracing data stored in a thread local. For those cases, we have another mechanism via StreamBridge, where the user has more control over the threading model. You can get more details in the Sending arbitrary data to an output (e.g. Foreign event-driven sources) section.

Consumer (Reactive)

A reactive Consumer is a little bit special because it has a void return type, leaving the framework with no reference to subscribe to. Most likely you will not need to write a Consumer<Flux<?>>; instead, you can write it as a Function<Flux<?>, Mono<Void>>, invoking the then operator as the last operator in your stream.

For example:

public Function<Flux<?>, Mono<Void>> consumer() {
	return flux -> flux.map(..).filter(..).then();
}

But if you do need to write an explicit Consumer<Flux<?>>, remember to subscribe to the incoming Flux.

Also, keep in mind that the same rule applies to function composition when mixing reactive and imperative functions. Spring Cloud Function indeed supports composing reactive functions with imperative ones; however, you must be aware of certain limitations. For example, assume you have composed a reactive function with an imperative consumer. The result of such a composition is a reactive Consumer. However, there is no way to subscribe to such a consumer, as discussed earlier in this section, so this limitation can only be addressed by either making your consumer reactive and subscribing manually (as discussed earlier) or changing your function to be imperative.

Polling Configuration Properties

The following properties are exposed by Spring Cloud Stream and are prefixed with spring.integration.poller.:

fixedDelay

Fixed delay for default poller in milliseconds.

Default: 1000L.

maxMessagesPerPoll

Maximum messages for each polling event of the default poller.

Default: 1L.

cron

Cron expression value for the Cron Trigger.

Default: none.

initialDelay

Initial delay for periodic triggers.

Default: 0.

timeUnit

The TimeUnit to apply to delay values.

Default: MILLISECONDS.

For example, --spring.integration.poller.fixed-delay=2000 sets the poller interval to poll every two seconds.
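
Several of these properties can, of course, be combined; for example (the values here are illustrative):

```properties
# Poll every two seconds, starting five seconds after startup,
# and emit at most five messages per polling cycle
spring.integration.poller.fixed-delay=2000
spring.integration.poller.initial-delay=5000
spring.integration.poller.max-messages-per-poll=5
```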

Per-binding polling configuration

The previous section shows how to configure a single default poller that is applied to all bindings. While this fits well with the microservice model spring-cloud-stream was designed for, where each microservice represents a single component (e.g., a Supplier) and the default poller configuration is therefore enough, there are edge cases where you may have several components that require different polling configurations.

For such cases, use the per-binding way of configuring the poller. For example, assume you have an output binding supply-out-0. In this case, you can configure the poller for that binding using the spring.cloud.stream.bindings.supply-out-0.producer.poller. prefix (e.g., spring.cloud.stream.bindings.supply-out-0.producer.poller.fixed-delay=2000).

Sending arbitrary data to an output (e.g. Foreign event-driven sources)

There are cases where the actual source of data may be coming from an external (foreign) system that is not a binder. For example, the source of the data may be a classic REST endpoint. How do we bridge such a source with the functional mechanism used by spring-cloud-stream?

Spring Cloud Stream provides two mechanisms, so let’s look at them in more detail.

Here, for both samples, we’ll use a standard MVC endpoint method called delegateToSupplier, bound to the root web context, delegating incoming requests to a stream via the StreamBridge mechanism.

@SpringBootApplication
@Controller
public class WebSourceApplication {

	public static void main(String[] args) {
		SpringApplication.run(WebSourceApplication.class, "--spring.cloud.stream.output-bindings=toStream");
	}

	@Autowired
	private StreamBridge streamBridge;

	@RequestMapping
	@ResponseStatus(HttpStatus.ACCEPTED)
	public void delegateToSupplier(@RequestBody String body) {
		System.out.println("Sending " + body);
		streamBridge.send("toStream", body);
	}
}

Here we autowire a StreamBridge bean, which allows us to send data to an output binding, effectively bridging a non-stream application with spring-cloud-stream. Note that the preceding example does not define any source functions (e.g., a Supplier bean), leaving the framework with no trigger to create source bindings in advance, as would be typical for cases where the configuration contains function beans. That is fine, since StreamBridge initiates the creation of an output binding (as well as destination auto-provisioning, if necessary) for a non-existing binding on the first call to its send(..) operation and caches it for subsequent reuse (see StreamBridge and Dynamic Destinations for more details).

However, if you want to pre-create output bindings at initialization (startup) time, you can benefit from the spring.cloud.stream.output-bindings property, where you can declare the names of your sources. The provided names are used as triggers to create source bindings. You can use ; to signify multiple sources (multiple output bindings) (e.g., --spring.cloud.stream.output-bindings=foo;bar).
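
In properties form, pre-creating the binding used by the earlier controller plus a second one could look like this (the second binding name is hypothetical):

```properties
# Pre-create two output bindings at startup; ';' separates the names
spring.cloud.stream.output-bindings=toStream;toAuditStream
```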

Also, note that the streamBridge.send(..) method takes an Object for data. This means you can send a POJO or a Message to it, and it goes through the same routine when sending output as if it came from any Function or Supplier, providing the same level of consistency as with functions. This means that output type conversion, partitioning, and so on are honored as if the output were produced by a function.

StreamBridge with async send

StreamBridge uses the sending mechanism provided by the Spring Integration framework, which is at the core of Spring Cloud Stream. By default, this mechanism uses the sender’s thread. In other words, the send is blocking. While this is fine for many cases, there are cases when you want such a send to be async. To do that, call the setAsync(true) method of StreamBridge before invoking one of the send methods.

Observability Context propagation with asynchronous send

When using the Observability support provided by the framework, as well as the supporting Spring frameworks, breaking thread boundaries will affect the consistency of the Observability context and thus your tracing history. To avoid that, all you need to do is add the context-propagation dependency from Micrometer (see below):

<dependency>
    <groupId>io.micrometer</groupId>
    <artifactId>context-propagation</artifactId>
    <version>1.1.0</version>
</dependency>

StreamBridge and Dynamic Destinations

StreamBridge can also be used for cases when the output destination(s) are not known ahead of time, similar to the use cases described in the [Routing FROM Consumer] section.

Let’s look at an example:

@SpringBootApplication
@Controller
public class WebSourceApplication {

	public static void main(String[] args) {
		SpringApplication.run(WebSourceApplication.class, args);
	}

	@Autowired
	private StreamBridge streamBridge;

	@RequestMapping
	@ResponseStatus(HttpStatus.ACCEPTED)
	public void delegateToSupplier(@RequestBody String body) {
		System.out.println("Sending " + body);
		streamBridge.send("myDestination", body);
	}
}

As you can see, the preceding example is very similar to the previous one, except that no explicit binding instruction is provided via the spring.cloud.stream.output-bindings property. Here we’re sending data to the myDestination name, which does not exist as a binding. Therefore, such a name is treated as a dynamic destination, as described in the [Routing FROM Consumer] section.

Here is another example, where the foreign source is again a REST endpoint:

@SpringBootApplication
@Controller
public class WebSourceApplication {

	public static void main(String[] args) {
		SpringApplication.run(WebSourceApplication.class);
	}

	@Autowired
	private StreamBridge streamBridge;

	@RequestMapping
	@ResponseStatus(HttpStatus.ACCEPTED)
	public void delegateToSupplier(@RequestBody String body) {
		streamBridge.send("myBinding", body);
	}
}

As you can see, inside the delegateToSupplier method, we’re using StreamBridge to send data to the myBinding binding. Here you also benefit from the dynamic features of StreamBridge: if myBinding doesn’t exist, it is created automatically and cached; otherwise, the existing binding is used.

Caching dynamic destinations (bindings) could result in memory leaks if there are many dynamic destinations. To provide some level of control, we offer a self-evicting caching mechanism for output bindings, with a default cache size of 10. This means that if the number of dynamic destinations exceeds that number, an existing binding may be evicted and thus need to be recreated, which can cause minor performance degradation. You can increase the cache size via the spring.cloud.stream.dynamic-destination-cache-size property, setting it to the desired value.

curl -H "Content-Type: text/plain" -X POST -d "hello from the other side" http://localhost:8080/

By showing two examples, we want to emphasize that the approach works with any type of foreign source.

If you are using the Solace PubSub+ binder, Spring Cloud Stream has reserved the scst_targetDestination header (retrievable via BinderHeaders.TARGET_DESTINATION), which allows for messages to be redirected from their bindings' configured destination to the target destination specified by this header. This allows for the binder to manage the resources necessary to publish to dynamic destinations, relieving the framework from having to do so, and avoids the caching issues mentioned in the previous Note. More info here.

Output Content Type with StreamBridge

You can also provide a specific content type, if necessary, with the following method signature: public boolean send(String bindingName, Object data, MimeType outputContentType). Alternatively, if you send data as a Message, its content type is honored.

Using specific binder type with StreamBridge

Spring Cloud Stream supports multiple binder scenarios. For example, you may be receiving data from Kafka and sending it to RabbitMQ.

For more information on multiple binder scenarios, please see the [Binders] section, specifically [Multiple Binders on the Classpath].

In the event you are planning to use StreamBridge and have more than one binder configured in your application, you must also tell StreamBridge which binder to use. For that, there are two more variations of the send method:

public boolean send(String bindingName, @Nullable String binderType, Object data)

public boolean send(String bindingName, @Nullable String binderType, Object data, MimeType outputContentType)

As you can see, there is one additional argument that you can provide - binderType - telling BindingService which binder to use when creating a dynamic binding.

For cases where the spring.cloud.stream.output-bindings property is used, or the binding was already created under a different binder, the binderType argument has no effect.

Using channel interceptors with StreamBridge

Since StreamBridge uses a MessageChannel to establish the output binding, you can activate channel interceptors when sending data through StreamBridge. It is up to the application to decide which channel interceptors to apply to StreamBridge. Spring Cloud Stream does not inject all the detected channel interceptors into StreamBridge unless they are annotated with @GlobalChannelInterceptor(patterns = "*").

Let us assume that you have the following two different StreamBridge bindings in the application.

streamBridge.send("foo-out-0", message);

and

streamBridge.send("bar-out-0", message);

Now, if you want a channel interceptor applied to both StreamBridge bindings, you can declare the following GlobalChannelInterceptor bean.

@Bean
@GlobalChannelInterceptor(patterns = "*")
public ChannelInterceptor customInterceptor() {
    return new ChannelInterceptor() {
        @Override
        public Message<?> preSend(Message<?> message, MessageChannel channel) {
            ...
        }
    };
}

However, if you don’t like the global approach above and want a dedicated interceptor for each binding, you can do the following.

@Bean
@GlobalChannelInterceptor(patterns = "foo-*")
public ChannelInterceptor fooInterceptor() {
    return new ChannelInterceptor() {
        @Override
        public Message<?> preSend(Message<?> message, MessageChannel channel) {
            ...
        }
    };
}

and

@Bean
@GlobalChannelInterceptor(patterns = "bar-*")
public ChannelInterceptor barInterceptor() {
    return new ChannelInterceptor() {
        @Override
        public Message<?> preSend(Message<?> message, MessageChannel channel) {
            ...
        }
    };
}

You have the flexibility to make the patterns more strict or customized to your business needs.

With this approach, the application gets to decide which interceptors to inject into StreamBridge, rather than applying all the available interceptors.

StreamBridge provides a contract through the StreamOperations interface, which contains all the send methods of StreamBridge. Therefore, applications may choose to autowire StreamOperations instead. This is handy for unit testing code that uses StreamBridge, by providing a mock or similar mechanism for the StreamOperations interface.

Reactive Functions support

Since Spring Cloud Function is built on top of Project Reactor, there isn’t much you need to do to benefit from the reactive programming model while implementing a Supplier, Function, or Consumer.

For example:

@SpringBootApplication
public class SinkFromConsumer {

	@Bean
	public Function<Flux<String>, Flux<String>> reactiveUpperCase() {
		return flux -> flux.map(val -> val.toUpperCase());
	}
}

A few important things must be understood when choosing between the reactive and imperative programming models.

Fully reactive or just API?

Using a reactive API does not necessarily imply that you can benefit from all of the reactive features of that API. In other words, features such as back-pressure and other advanced capabilities only work with a compatible system, such as the Reactive Kafka binder. If you are using regular Kafka, Rabbit, or any other non-reactive binder, you can only benefit from the conveniences of the reactive API itself, not from its advanced features, since the actual sources or targets of the stream are not reactive.

Error handling and retries

Throughout this manual, you will see several references to framework-based error handling, retries, and other features, as well as configuration properties associated with them. It is important to understand that they only affect imperative functions, and you should NOT have the same expectations for reactive functions. Here is why: there is a fundamental difference between reactive and imperative functions. An imperative function is a message handler that is invoked by the framework on each message it receives. So, for N messages, there will be N invocations of the function, and because of that we can wrap the function and add additional functionality, such as error handling and retries. A reactive function is an initialization function. It is invoked only once, to get a reference to the Flux/Mono provided by the user so that it can be connected with the one provided by the framework. After that, we (the framework) have absolutely no visibility into, nor control over, the stream. Therefore, with reactive functions you must rely on the richness of the reactive API when it comes to error handling and retries (i.e., doOnError(), .onError*(), etc.).
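
The "wrap each invocation" idea that makes per-message retries possible for imperative functions can be sketched in plain Java; the decorator below is hypothetical illustration code, not the framework's actual retry machinery:

```java
import java.util.function.Function;

public class RetrySketch {

    // Hypothetical decorator: because the framework controls each invocation of
    // an imperative function, it can retry the function per message like this.
    static <T, R> Function<T, R> withRetry(Function<T, R> delegate, int attempts) {
        return input -> {
            RuntimeException last = null;
            for (int i = 0; i < attempts; i++) {
                try {
                    return delegate.apply(input);
                } catch (RuntimeException e) {
                    last = e; // remember the failure and try again
                }
            }
            throw last;
        };
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // A flaky imperative "message handler": fails on the first message delivery
        Function<String, String> flaky = s -> {
            if (calls[0]++ == 0) {
                throw new IllegalStateException("transient failure");
            }
            return s.toUpperCase();
        };

        System.out.println(withRetry(flaky, 3).apply("hello"));
    }
}
```

No such wrapping is possible for a reactive function, since it is invoked only once at initialization.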

Functional Composition

Using the functional programming model, you can also benefit from functional composition, where you can dynamically compose complex handlers from a set of simple functions. As an example, let’s add the following function bean to the application defined above:

@Bean
public Function<String, String> wrapInQuotes() {
	return s -> "\"" + s + "\"";
}

and modify the spring.cloud.function.definition property to reflect your intention to compose a new function from both ‘toUpperCase’ and ‘wrapInQuotes’. To do so, Spring Cloud Function relies on the | (pipe) symbol. So, to finish our example, our property now looks like this:

--spring.cloud.function.definition=toUpperCase|wrapInQuotes

One of the great benefits of functional composition support provided by Spring Cloud Function is the fact that you can compose reactive and imperative functions.
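
Conceptually, the pipe composition behaves like java.util.function.Function.andThen; here is a plain-Java sketch of the semantics, outside of Spring (the composed variable is illustrative):

```java
import java.util.function.Function;

public class CompositionSketch {

    public static void main(String[] args) {
        Function<String, String> toUpperCase = s -> s.toUpperCase();
        Function<String, String> wrapInQuotes = s -> "\"" + s + "\"";

        // Equivalent in spirit to spring.cloud.function.definition=toUpperCase|wrapInQuotes
        Function<String, String> composed = toUpperCase.andThen(wrapInQuotes);

        System.out.println(composed.apply("hello"));
    }
}
```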

The result of a composition is a single function which, as you may guess, could have a very long and rather cryptic name (e.g., foo|bar|baz|xyz. . .), presenting a great deal of inconvenience when it comes to other configuration properties. This is where the descriptive binding names feature described in the [Functional binding names] section can help.

For example, if we want to give our toUpperCase|wrapInQuotes a more descriptive name we can do so with the following property spring.cloud.stream.function.bindings.toUpperCase|wrapInQuotes-in-0=quotedUpperCaseInput allowing other configuration properties to refer to that binding name (e.g., spring.cloud.stream.bindings.quotedUpperCaseInput.destination=myDestination).

Functional Composition and Cross-cutting Concerns

Function composition effectively lets you address complexity by breaking it down into a set of simple, individually manageable and testable components that can still be represented as one at runtime. But that is not the only benefit.

You can also use composition to address certain cross-cutting non-functional concerns, such as content enrichment. For example, assume you have an incoming message that may be lacking certain headers, or some headers are not in the exact state your business function expects. You can now implement a separate function that addresses those concerns and then compose it with the main business function.

Let's look at an example:

@SpringBootApplication
public class DemoStreamApplication {

	public static void main(String[] args) {
		SpringApplication.run(DemoStreamApplication.class,
				"--spring.cloud.function.definition=enrich|echo",
				"--spring.cloud.stream.function.bindings.enrich|echo-in-0=input",
				"--spring.cloud.stream.bindings.input.destination=myDestination",
				"--spring.cloud.stream.bindings.input.group=myGroup");

	}

	@Bean
	public Function<Message<String>, Message<String>> enrich() {
		return message -> {
			Assert.isTrue(!message.getHeaders().containsKey("foo"), "Should NOT contain 'foo' header");
			return MessageBuilder.fromMessage(message).setHeader("foo", "bar").build();
		};
	}

	@Bean
	public Function<Message<String>, Message<String>> echo() {
		return message -> {
			Assert.isTrue(message.getHeaders().containsKey("foo"), "Should contain 'foo' header");
			System.out.println("Incoming message " + message);
			return message;
		};
	}
}

While trivial, this example demonstrates how one function enriches the incoming message with additional headers (a non-functional concern), so that the other function (echo) can benefit from it. The echo function stays clean and focused on business logic only. You can also see the use of the spring.cloud.stream.function.bindings property to simplify the composed binding name.

Functions with multiple input and output arguments

Starting with version 3.0, Spring Cloud Stream provides support for functions that have multiple inputs and/or multiple outputs (return values). What does this actually mean, and what type of use cases does it target?

  • Big Data: Imagine the source of data you're dealing with is highly unorganized and contains various types of data elements (e.g., orders, transactions, and so on), and you effectively need to sort it out.

  • Data aggregation: Another use case may require you to merge data elements from two or more incoming streams.

The above describes just a few use cases where you may need a single function to accept and/or produce multiple streams of data, and that is the type of use case we are targeting here.

Also, note the slightly different emphasis on the concept of streams here. The assumption is that such functions are only valuable if they are given access to the actual streams of data (not the individual elements). For that we rely on the abstractions provided by Project Reactor (i.e., Flux and Mono), which is already available on the classpath as part of the dependencies brought in by spring-cloud-function.

Another important aspect is the representation of multiple inputs and outputs. While Java provides a variety of abstractions to represent multiple of something, those abstractions are a) unbounded, b) lack arity, and c) lack type information, all of which matter in this context. As an example, consider a Collection or an array, which only lets us describe multiple of a single type or forces us to up-cast everything to Object, affecting the transparent type conversion feature of Spring Cloud Stream, and so on.

So, to accommodate all these requirements, the initial support relies on signatures that utilize another abstraction provided by Project Reactor: Tuples. However, we are working on allowing more flexible signatures.
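To illustrate why arity and per-position type information matter, here is a plain-Java sketch; the Pair record below is a hypothetical stand-in for Reactor's Tuple2, used only for illustration:

```java
import java.util.List;

public class ArityDemo {

    // Hypothetical stand-in for Reactor's Tuple2: fixed arity, typed per position.
    record Pair<A, B>(A t1, B t2) {}

    public static void main(String[] args) {
        // A List can hold "multiple of something", but it is unbounded and
        // collapses the per-position types: both elements become Object.
        List<Object> untyped = List.of("text", 42);
        Object first = untyped.get(0); // type information lost

        // A fixed-arity pair preserves both arity and per-position types,
        // which is what the framework needs for transparent type conversion.
        Pair<String, Integer> typed = new Pair<>("text", 42);
        String s = typed.t1();  // no cast needed
        Integer i = typed.t2();

        System.out.println(first + " / " + s + ":" + i); // prints text / text:42
    }
}
```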

Please refer to the [Binding and Binding names] section to understand the naming convention used to establish the binding names used by such applications.

Let's look at a few samples:

@SpringBootApplication
public class SampleApplication {

	@Bean
	public Function<Tuple2<Flux<String>, Flux<Integer>>, Flux<String>> gather() {
		return tuple -> {
			Flux<String> stringStream = tuple.getT1();
			Flux<String> intStream = tuple.getT2().map(i -> String.valueOf(i));
			return Flux.merge(stringStream, intStream);
		};
	}
}

The above example demonstrates a function that takes two inputs (the first of type String and the second of type Integer) and produces a single output of type String.

So, for the above example, the two input bindings are gather-in-0 and gather-in-1, and for consistency the output binding also follows the same convention and is named gather-out-0.

Knowing that allows you to set binding-specific properties. For example, the following overrides the content-type for the gather-in-0 binding:

--spring.cloud.stream.bindings.gather-in-0.content-type=text/plain

@SpringBootApplication
public class SampleApplication {

	@Bean
	public static Function<Flux<Integer>, Tuple2<Flux<String>, Flux<String>>> scatter() {
		return flux -> {
			Flux<Integer> connectedFlux = flux.publish().autoConnect(2);
			UnicastProcessor<String> even = UnicastProcessor.create();
			UnicastProcessor<String> odd = UnicastProcessor.create();
			Flux<Integer> evenFlux = connectedFlux.filter(number -> number % 2 == 0).doOnNext(number -> even.onNext("EVEN: " + number));
			Flux<Integer> oddFlux = connectedFlux.filter(number -> number % 2 != 0).doOnNext(number -> odd.onNext("ODD: " + number));

			return Tuples.of(Flux.from(even).doOnSubscribe(x -> evenFlux.subscribe()), Flux.from(odd).doOnSubscribe(x -> oddFlux.subscribe()));
		};
	}
}

The above example is somewhat the opposite of the previous sample and demonstrates a function that takes a single input of type Integer and produces two outputs (both of type String).

So, for the above example, the input binding is scatter-in-0 and the output bindings are scatter-out-0 and scatter-out-1.

You can test it with the following code:

@Test
public void testSingleInputMultiOutput() {
	try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
			TestChannelBinderConfiguration.getCompleteConfiguration(
					SampleApplication.class))
							.run("--spring.cloud.function.definition=scatter")) {

		InputDestination inputDestination = context.getBean(InputDestination.class);
		OutputDestination outputDestination = context.getBean(OutputDestination.class);

		for (int i = 0; i < 10; i++) {
			inputDestination.send(MessageBuilder.withPayload(String.valueOf(i).getBytes()).build());
		}

		int counter = 0;
		for (int i = 0; i < 5; i++) {
			Message<byte[]> even = outputDestination.receive(0, 0);
			assertThat(even.getPayload()).isEqualTo(("EVEN: " + String.valueOf(counter++)).getBytes());
			Message<byte[]> odd = outputDestination.receive(0, 1);
			assertThat(odd.getPayload()).isEqualTo(("ODD: " + String.valueOf(counter++)).getBytes());
		}
	}
}

Multiple functions in a single application

There may also be a need to group several message handlers in a single application. You can do so by defining several functions.

@SpringBootApplication
public class SampleApplication {

	@Bean
	public Function<String, String> uppercase() {
		return value -> value.toUpperCase();
	}

	@Bean
	public Function<String, String> reverse() {
		return value -> new StringBuilder(value).reverse().toString();
	}
}

In the above example, we have configuration that defines two functions, uppercase and reverse. So first, as mentioned before, we need to notice that there is a conflict (more than one function), and therefore we need to resolve it by providing the spring.cloud.function.definition property pointing to the actual function we want to bind. Except here we use the ; delimiter to point to both functions (see the test case below).

As with functions with multiple inputs/outputs, please refer to the [Binding and Binding names] section to understand the naming convention used to establish the binding names used by such applications.

You can test it with the following code:

@Test
public void testMultipleFunctions() {
	try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
			TestChannelBinderConfiguration.getCompleteConfiguration(
					SampleApplication.class))
							.run("--spring.cloud.function.definition=uppercase;reverse")) {

		InputDestination inputDestination = context.getBean(InputDestination.class);
		OutputDestination outputDestination = context.getBean(OutputDestination.class);

		Message<byte[]> inputMessage = MessageBuilder.withPayload("Hello".getBytes()).build();
		inputDestination.send(inputMessage, "uppercase-in-0");
		inputDestination.send(inputMessage, "reverse-in-0");

		Message<byte[]> outputMessage = outputDestination.receive(0, "uppercase-out-0");
		assertThat(outputMessage.getPayload()).isEqualTo("HELLO".getBytes());

		outputMessage = outputDestination.receive(0, "reverse-out-0");
		assertThat(outputMessage.getPayload()).isEqualTo("olleH".getBytes());
	}
}

Batch Consumers

When using a MessageChannelBinder that supports batch listeners, and the feature is enabled for the consumer binding, you can set spring.cloud.stream.bindings.<binding-name>.consumer.batch-mode to true to have the entire batch of messages passed to the function in a List.

@Bean
public Function<List<Person>, Person> findFirstPerson() {
    return persons -> persons.get(0);
}

Batch Producers

You can also use the concept of batching on the producer side by returning a collection of Messages, which effectively provides the inverse effect: each message in the collection is sent individually by the binder.

Consider the following function:

@Bean
public Function<String, List<Message<String>>> batch() {
	return p -> {
		List<Message<String>> list = new ArrayList<>();
		list.add(MessageBuilder.withPayload(p + ":1").build());
		list.add(MessageBuilder.withPayload(p + ":2").build());
		list.add(MessageBuilder.withPayload(p + ":3").build());
		list.add(MessageBuilder.withPayload(p + ":4").build());
		return list;
	};
}

Each message in the returned list is sent individually, resulting in four messages sent to the output destination.
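As an illustration only (the binder internals are not shown here), the "inverse" batching behavior amounts to iterating over the returned collection and sending each element individually. In this sketch, sentToDestination is a hypothetical stand-in for the output destination:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class BatchProducerDemo {

    // The same shape as the batch() bean above, minus the Message wrapper.
    static Function<String, List<String>> batch() {
        return p -> List.of(p + ":1", p + ":2", p + ":3", p + ":4");
    }

    public static void main(String[] args) {
        // Hypothetical stand-in for the output destination; the real binder
        // sends each element of the returned collection as a separate message.
        List<String> sentToDestination = new ArrayList<>();
        for (String payload : batch().apply("hello")) {
            sentToDestination.add(payload);
        }
        System.out.println(sentToDestination.size()); // prints 4
    }
}
```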

Spring Integration flow as functions

When you implement a function, you may have complex requirements that fit the category of Enterprise Integration Patterns (EIP). These are best handled by using a framework such as Spring Integration (SI), which is a reference implementation of EIP.

Thankfully, SI already provides support for exposing integration flows as functions via Integration flow as gateway. Consider the following sample:

@SpringBootApplication
public class FunctionSampleSpringIntegrationApplication {

	public static void main(String[] args) {
		SpringApplication.run(FunctionSampleSpringIntegrationApplication.class, args);
	}

	@Bean
	public IntegrationFlow uppercaseFlow() {
		return IntegrationFlows.from(MessageFunction.class, "uppercase")
				.<String, String>transform(String::toUpperCase)
				.logAndReply(LoggingHandler.Level.WARN);
	}

	public interface MessageFunction extends Function<Message<String>, Message<String>> {

	}
}

For those familiar with SI, you can see that we define a bean of type IntegrationFlow in which we declare an integration flow (using the SI DSL) that we want to expose as a Function<String, String> called uppercase. The MessageFunction interface lets us explicitly declare the types of the inputs and outputs for proper type conversion. See the [Content Type Negotiation] section for more on type conversion.

To receive raw input, you can use from(Function.class, …​).

The resulting function is bound to the input and output destinations exposed by the target binder.

Please refer to the [Binding and Binding names] section to understand the naming convention used to establish the binding names used by such applications.

For more details on the interoperability of Spring Integration and Spring Cloud Stream, specifically around the functional programming model, you may find this post very interesting, as it dives deeper into the various patterns you can apply by merging the best of Spring Integration and Spring Cloud Stream/Functions.

Using Polled Consumers

Overview

When using polled consumers, you poll the PollableMessageSource on demand. To define a binding for a polled consumer, you need to provide the spring.cloud.stream.pollable-source property.

Consider the following example of a polled consumer binding:

--spring.cloud.stream.pollable-source=myDestination

The pollable-source name myDestination in the preceding example results in a binding name of myDestination-in-0, to stay consistent with the functional programming model.

Given the polled consumer in the preceding example, you might use it as follows:

@Bean
public ApplicationRunner poller(PollableMessageSource destIn, MessageChannel destOut) {
    return args -> {
        while (someCondition()) {
            try {
                if (!destIn.poll(m -> {
                    String newPayload = ((String) m.getPayload()).toUpperCase();
                    destOut.send(new GenericMessage<>(newPayload));
                })) {
                    Thread.sleep(1000);
                }
            }
            catch (Exception e) {
                // handle failure
            }
        }
    };
}

A less manual and more Spring-like alternative is to configure a scheduled task bean. For example:

@Scheduled(fixedDelay = 5_000)
public void poll() {
	System.out.println("Polling...");
	this.source.poll(m -> {
		System.out.println(m.getPayload());

	}, new ParameterizedTypeReference<Foo>() { });
}

The PollableMessageSource.poll() method takes a MessageHandler argument (often a lambda expression, as shown here). It returns true if a message was received and successfully processed.
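The poll() contract described above can be sketched in plain Java. SimplePollableSource below is a hypothetical stand-in used only to illustrate the return-value semantics, not the actual Spring implementation:

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;

public class PollSketch {

    // Hypothetical stand-in for PollableMessageSource, illustrating only the
    // return-value contract: true when a message was received and handled.
    static class SimplePollableSource {
        private final Queue<String> queue = new ArrayDeque<>();

        void offer(String payload) {
            queue.add(payload);
        }

        boolean poll(Consumer<String> handler) {
            String payload = queue.poll();
            if (payload == null) {
                return false; // nothing available; the caller may sleep and retry
            }
            handler.accept(payload);
            return true;
        }
    }

    public static void main(String[] args) {
        SimplePollableSource source = new SimplePollableSource();
        source.offer("hello");
        System.out.println(source.poll(p -> System.out.println(p.toUpperCase()))); // prints HELLO, then true
        System.out.println(source.poll(p -> { }));                                 // prints false: queue is empty
    }
}
```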

As with message-driven consumers, if the MessageHandler throws an exception, messages are published to error channels, as discussed in Error Handling.

Normally, the poll() method acknowledges the message when the MessageHandler exits. If the method exits abnormally, the message is rejected (not re-queued), but see Handling Errors. You can override that behavior by taking responsibility for the acknowledgment, as shown in the following example:

@Bean
public ApplicationRunner poller(PollableMessageSource dest1In, MessageChannel dest2Out) {
    return args -> {
        while (someCondition()) {
            if (!dest1In.poll(m -> {
                StaticMessageHeaderAccessor.getAcknowledgmentCallback(m).noAutoAck();
                // e.g. hand off to another thread which can perform the ack
                // or acknowledge(Status.REQUEUE)

            })) {
                Thread.sleep(1000);
            }
        }
    };
}

You must ack (or nack) the message at some point, to avoid resource leaks.

Some messaging systems (such as Apache Kafka) maintain a simple offset in a log. If a delivery fails and is re-queued with StaticMessageHeaderAccessor.getAcknowledgmentCallback(m).acknowledge(Status.REQUEUE);, any later successfully ack'd messages are redelivered.

There is also an overloaded poll method, with the following definition:

poll(MessageHandler handler, ParameterizedTypeReference<?> type)

The type is a conversion hint that allows the incoming message payload to be converted, as shown in the following example:

boolean result = pollableSource.poll(received -> {
	Map<String, Foo> payload = (Map<String, Foo>) received.getPayload();
	// ...
}, new ParameterizedTypeReference<Map<String, Foo>>() {});

Handling Errors

By default, an error channel is configured for the pollable source; if the callback throws an exception, an ErrorMessage is sent to the error channel (<destination>.<group>.errors); this error channel is also bridged to the global Spring Integration errorChannel.

You can subscribe to either error channel with a @ServiceActivator to handle errors; without a subscription, the error is simply logged and the message is acknowledged as successful. If the error channel service activator throws an exception, the message is rejected (by default) and is not redelivered. If the service activator throws a RequeueCurrentMessageException, the message is requeued at the broker and is retrieved again on a subsequent poll.

If the listener throws a RequeueCurrentMessageException directly, the message is requeued, as discussed above, and is not sent to the error channels.