Producing and Consuming Messages

你可以简单地编写函数并将其公开为 @Bean 来编写 Spring Cloud Stream 应用程序。还可以使用基于 Spring 集成注解的配置或基于 Spring Cloud Stream 注解的配置,尽管从 spring-cloud-stream 3.x 开始,我们建议使用函数实现。

Spring Cloud Function support

Overview

自 Spring Cloud Stream v2.1 以来,定义 stream handlerssources 的另一个方案是使用对 Spring Cloud Function 内置支持,其中它们可以表示为类型为 java.util.function.[Supplier/Function/Consumer] 的 bean。

若要指定将哪个函数 Bean 绑定到绑定公开的外部目标,必须提供 spring.cloud.function.definition 属性。

如果只有类型为 java.util.function.[Supplier/Function/Consumer] 的单例 bean,由于这样的函数 bean 会被自动发现,因此你可以跳过 spring.cloud.function.definition 属性。但是,为了避免混淆,使用此属性被认为是最佳实践。有时,此自动发现可能会妨碍进程,因为类型为 java.util.function.[Supplier/Function/Consumer] 的单例 bean 可能出于处理消息以外的目的而存在,但由于是单例,因此它会被自动发现并自动绑定。对于这些罕见情况,你可以通过提供 spring.cloud.stream.function.autodetect 属性(将值设置为 false)来禁用自动发现。

这是一个应用程序示例,它将消息处理程序公开为 java.util.function.Function,有效地支持 pass-thru 语义,作为数据消费者和生产者。

@SpringBootApplication
public class MyFunctionBootApp {

	public static void main(String[] args) {
		SpringApplication.run(MyFunctionBootApp.class);
	}

	@Bean
	public Function<String, String> toUpperCase() {
		return s -> s.toUpperCase();
	}
}

在前面的示例中,我们定义了一个类型为 java.util.function.Function 的 Bean,名为 toUpperCase,作为消息处理程序,其“输入”和“输出”必须绑定到提供目标绑定器公开的外部目标。默认情况下,“输入”和“输出”绑定名称将是 toUpperCase-in-0toUpperCase-out-0。请参阅 [函数绑定名称] 一节,了解用于建立绑定名称的命名约定详细信息。

以下是支持其他语义的简单函数应用程序示例:

这是一个公开为 java.util.function.Suppliersource 语义示例

@SpringBootApplication
public static class SourceFromSupplier {

	@Bean
	public Supplier<Date> date() {
		return () -> new Date(12345L);
	}
}

下面是公开为 java.util.function.Consumersink 语义 示例

@SpringBootApplication
public static class SinkFromConsumer {

	@Bean
	public Consumer<String> sink() {
		return System.out::println;
	}
}

Suppliers (Sources)

说到如何触发调用,FunctionConsumer 相当直接。它们是根据发送到其绑定的目标数据(事件)触发的。换言之,它们是经典事件驱动组件。

然而,Supplier 在触发方面属于自己的类别。由于它根据定义是数据的来源(原点),因此不会订阅任何入站目标,因此必须由其他一些机制触发。Supplier 实现也存在问题,此实现可以是 命令式的反应式的,并且与此类提供程序的触发直接相关。

考虑以下示例:

@SpringBootApplication
public static class SupplierConfiguration {

	@Bean
	public Supplier<String> stringSupplier() {
		return () -> "Hello from Supplier";
	}
}

前面的 Supplier Bean 每当其 get() 方法被调用时就会生成一个字符串。然而,谁调用了此方法以及多久调用一次?该框架提供了一种默认轮询机制(回答“谁?”的问题),它将触发对提供程序的调用,默认情况下它将每秒执行一次(回答“多久?”的问题)。换言之,上述配置每秒生成一条消息,并且每条消息都发送到由绑定器公开的 output 目标。若要了解如何定制轮询机制,请参阅 Polling Configuration Properties 一节。

考虑另一个示例:

@SpringBootApplication
public static class SupplierConfiguration {

    @Bean
    public Supplier<Flux<String>> stringSupplier() {
        return () -> Flux.fromStream(Stream.generate(new Supplier<String>() {
            @Override
            public String get() {
                try {
                    Thread.sleep(1000);
                    return "Hello from Supplier";
                } catch (Exception e) {
                    // ignore
                }
            }
        })).subscribeOn(Schedulers.elastic()).share();
    }
}

前面的 Supplier Bean 采用了响应式编程风格。通常不同于命令式提供程序,它仅应触发一次,因为其 get() 方法的调用会生成(提供)消息的连续流,而不是单个消息。

该框架识别编程风格的差异,并保证此类提供程序仅触发一次。

然而,想象一下你想轮询某个数据源并返回表示结果集的有限数据流的用例。反应式编程样式是此类 Supplier 的完美机制。但鉴于生成流的有限性,此类 Supplier 仍需要定期调用。

考虑以下模拟此类用例的示例,它通过生成一个有限数据流来模拟此类用例:

@SpringBootApplication
public static class SupplierConfiguration {

	@PollableBean
	public Supplier<Flux<String>> stringSupplier() {
		return () -> Flux.just("hello", "bye");
	}
}

Bean 本身用 @PollableBean 注释(@Bean 的子集)进行注释,因此向框架发出一个信号,表示虽然此类 supplier 的实现是反应式的,但仍需要对其进行轮询。

PollableBean 中定义了一个 splittable 属性,该属性会向此注释的后处理程序发出信号,指示由注释的组件生成的结果必须拆分,并且默认设置为 true。这意味着框架会拆分返回值,并将其每个项作为一条消息分别发送出去。如果您不希望产生此行为,可以在此点上将其设置为 false。然后,此类提供程序将仅返回生成的 Flux,而不会将其拆分。

Supplier & threading

正如你现在所了解到的,与由事件触发的 FunctionConsumer 不同的,Supplier 没有输入,因此由不同的机制(poller)触发,该机制可能具有不可预测的线程机制。虽然大多数情况下线程机制的细节与函数的下游执行无关,但在某些情况下可能会出现问题,尤其是在具有特定线程关联预期集成的框架中。例如, Spring Cloud Sleuth 依赖于存储在本地线程中的跟踪数据。对于这些情况,我们通过 StreamBridge 提供了另一种机制,用户可以在其中更好地控制线程机制。可以在 Sending arbitrary data to an output (e.g. Foreign event-driven sources) 部分获取更多详细信息。

Consumer (Reactive)

反应式 Consumer 有点特殊,因为它的返回类型为 void,框架无权引用它进行订阅。你很可能不需要编写 Consumer<Flux<?>>,而可以作为 Function<Flux<?>, Mono<Void>> 编写它,将 then 运算符调用为流上的最后一个运算符。

例如:

public Function<Flux<?>, Mono<Void>> consumer() {
	return flux -> flux.map(..).filter(..).then();
}

但是,如果你确实需要编写明确的 Consumer<Flux<?>>,请记得订阅传入的 Flux。

此外,请记住,同样的规则适用于在混合反应式和命令式函数时的函数组合。Spring Cloud Function 确实支持将反应式函数与命令式函数组合,但是你必须知道某些限制。例如,假设你已经将反应式函数与命令式消费者组合。此类组合的结果是反应式 Consumer。然而,无法订阅此类消费者(如本节前面所述),因此此限制只能通过使你的消费者具有反应性并手动订阅(如前面所讨论的),或将你的函数更改为命令式来解决。

Polling Configuration Properties

Spring Cloud Stream 公开了以下属性,并以 spring.integration.poller. 为前缀:

fixedDelay

Fixed delay for default poller in milliseconds.

默认值:1000L。

maxMessagesPerPoll

Maximum messages for each polling event of the default poller.

默认值:1L。

cron

Cron expression value for the Cron Trigger.

默认值:无。

initialDelay

Initial delay for periodic triggers.

默认值:0。

timeUnit

The TimeUnit to apply to delay values.

默认值:MILLISECONDS。

例如,--spring.integration.poller.fixed-delay=2000 将轮询器间隔设置为每隔两秒轮询一次。

Per-binding polling configuration

上一节说明了如何配置将应用于所有绑定的单个默认轮询器。虽然它很适合 Spring Cloud Stream 设计的微服务的模型,其中每个微服务表示一个组件(例如,Supplier),因此默认轮询器配置就足够了,但有一些边缘情况,其中你可能有几个需要不同轮询配置的组件。

对于此类情况,请使用基于绑定的轮询器配置方式。例如,假设你有一个输出绑定 supply-out-0。在这种情况下,你可以使用前缀 spring.cloud.stream.bindings.supply-out-0.producer.poller.. 来为该绑定配置轮询器(例如,spring.cloud.stream.bindings.supply-out-0.producer.poller.fixed-delay=2000)。

Sending arbitrary data to an output (e.g. Foreign event-driven sources)

在某些情况下,数据实际来源可能来自不是 Binder 的外部(外来)系统。例如,数据来源可能是经典的 REST 端点。我们如何将此类来源与 Spring Cloud Stream 使用的功能机制连接起来?

Spring Cloud Stream 提供了两种机制,让我们详细了解一下它们。

在此,对于两个示例,我们将使用绑定到根 Web 上下文的名为 delegateToSupplier 的标准 MVC 端点方法,通过 StreamBridge 机制将传入请求委派给流。

@SpringBootApplication
@Controller
public class WebSourceApplication {

	public static void main(String[] args) {
		SpringApplication.run(WebSourceApplication.class, "--spring.cloud.stream.output-bindings=toStream");
	}

	@Autowired
	private StreamBridge streamBridge;

	@RequestMapping
	@ResponseStatus(HttpStatus.ACCEPTED)
	public void delegateToSupplier(@RequestBody String body) {
		System.out.println("Sending " + body);
		streamBridge.send("toStream", body);
	}
}

此处,我们自动装配一个 StreamBridge Bean,它允许我们将数据发送到输出绑定,从而有效地将非流应用程序与 Spring Cloud Stream 连接起来。请注意,前面的示例没有定义任何源函数(例如,Supplier Bean),从而使框架无法提前创建源绑定,这对于配置包含函数 Bean 的情况是典型的。而且,这很好,因为 StreamBridge 将在首次调用其 send(..) 操作时为不存在的绑定启动输出绑定的创建(如果需要,还包括目标自动配置),并将其缓存以供后续重用(有关更多详细信息,请参阅 StreamBridge and Dynamic Destinations)。

但是,如果你想在初始化(启动)时预先创建输出绑定,你可以受益于 spring.cloud.stream.output-bindings 属性,你可以在其中声明源的名称。提供的名称将用作创建源绑定的触发器。你可以使用“;”表示多个源(多个输出绑定)(例如,--spring.cloud.stream.output-bindings=foo;bar)。

此外,请注意,streamBridge.send(..) 方法对数据使用 Object。这意味着您可以向其发送 POJO 或 Message,并且在发送输出时,它将遵循与从任何函数或提供相同级别一致性(如函数)的功能发送输出时相同的例程。这意味着输出类型转换、分区等都得到尊重,如同它是函数产生的输出一样。

StreamBridge with async send

StreamBridge 使用 Spring Integration 框架提供的发送机制,这是 Spring Cloud Stream 的核心。默认情况下,此机制使用发送者的线程。换句话说,发送处于阻塞状态。虽然在很多情况下这都可以,但在某些情况下,您希望该发送是异步的。要做到这一点,请在调用发送方法之一之前使用 StreamBridgesetAsync(true) 方法。

Observability Context propagation with asynchronous send

在使用框架提供的可观测性支持以及对 Spring 框架提供支持时,突破线程边界会影响可观测性 上下文的 一致性,从而影响您的跟踪历史记录。要避免这种情况,您只需添加 Micrometer 的 context-propagation 依赖即可(请参见下文)

<dependency>
    <groupId>io.micrometer</groupId>
    <artifactId>context-propagation</artifactId>
    <version>1.1.0</version>
</dependency>

StreamBridge and Dynamic Destinations

当输出目的地在事先未知的情况下,也可以使用 StreamBridge,类似于 [Routing FROM Consumer] 部分中描述的用例。

我们来看一个示例

@SpringBootApplication
@Controller
public class WebSourceApplication {

	public static void main(String[] args) {
		SpringApplication.run(WebSourceApplication.class, args);
	}

	@Autowired
	private StreamBridge streamBridge;

	@RequestMapping
	@ResponseStatus(HttpStatus.ACCEPTED)
	public void delegateToSupplier(@RequestBody String body) {
		System.out.println("Sending " + body);
		streamBridge.send("myDestination", body);
	}
}

正如您所看到的,前面的示例与上一个示例非常相似,不同之处在于通过 spring.cloud.stream.output-bindings 属性(未提供)提供了明确的绑定指令。在这里,我们正在向不存在的 myDestination 名称发送数据作为绑定。因此,此名称将被视为动态目的地,如 [Routing FROM Consumer] 部分中所述。

在前面的示例中,我们使用 ApplicationRunner 作为 foreign source 来馈送流。

一个更实际的示例,其中外部源是 REST 端点。

@SpringBootApplication
@Controller
public class WebSourceApplication {

	public static void main(String[] args) {
		SpringApplication.run(WebSourceApplication.class);
	}

	@Autowired
	private StreamBridge streamBridge;

	@RequestMapping
	@ResponseStatus(HttpStatus.ACCEPTED)
	public void delegateToSupplier(@RequestBody String body) {
		streamBridge.send("myBinding", body);
	}
}

正如您在 delegateToSupplier 方法中所看到的,我们使用 StreamBridge 将数据发送到 myBinding 绑定。在这里,您还受益于 StreamBridge 的动态特性,其中如果 myBinding 不存在,它将自动创建并缓存,否则将使用现有绑定。

缓存动态目的地(绑定)可能会在动态目的地过多时导致内存泄漏。为了获得一些控制级别,我们提供了一种针对输出绑定的自驱逐缓存机制,其默认缓存大小为 10。这意味着如果您的动态目的地大小超过该数字,则可能会有一个现有绑定被驱逐,因此需要重新创建,这会造成轻微的性能下降。您可以通过 spring.cloud.stream.dynamic-destination-cache-size 属性来增加缓存大小,将其设置为所需的值。

curl -H "Content-Type: text/plain" -X POST -d "hello from the other side" http://localhost:8080/

通过展示两个示例,我们要强调这种方法适用于任何类型的外部源。

如果你使用 Solace PubSub+ Binder,Spring Cloud Stream 保留了 scst_targetDestination 标头(可通过 BinderHeaders.TARGET_DESTINATION 检索),该标头允许将消息从其绑定配置的目标重定向到此标头指定的 target 目标。这允许 Binder 管理发布到动态目标所需的资源,从而减轻了框架的负担,并避免了前一个注释中提到的缓存问题。更多信息 here

Output Content Type with StreamBridge

您还可以使用以下方法签名提供特定内容类型 public boolean send(String bindingName, Object data, MimeType outputContentType)。或者,如果您将数据作为 Message 发送,那么它的内容类型会得到尊重。

Using specific binder type with StreamBridge

Spring Cloud Stream 支持多个绑定程序场景。例如,您可能从 Kafka 接收数据并将其发送至 RabbitMQ。

有关多绑定程序场景的更多信息,请参见 [Binders] 部分,具体请参见 [Multiple Binders on the Classpath]

如果您计划使用 StreamBridge,并且在应用程序中配置了多个绑定程序,您还必须告诉 StreamBridge 使用哪个绑定程序。为此,send 方法还有两种变体:

public boolean send(String bindingName, @Nullable String binderType, Object data)

public boolean send(String bindingName, @Nullable String binderType, Object data, MimeType outputContentType)

如您所见,您可以提供一个附加的参数 - binderType,告诉 BindingService 在创建动态绑定时使用哪个绑定程序。

在使用 spring.cloud.stream.output-bindings 属性或在不同的绑定下已创建绑定后,binderType 参数将不再起作用。

Using channel interceptors with StreamBridge

由于 StreamBridge 使用 MessageChannel 来建立输出绑定,因此可以通过 StreamBridge 发送数据时激活通道拦截器。由应用程序决定在 StreamBridge 上应用哪些通道拦截器。除非 Spring Cloud Stream 用 @GlobalChannelInterceptor(patterns = "*") 对其进行注释,否则它不会将检测到的所有通道拦截器注入到 StreamBridge 中。

让我们假设您在应用程序中有以下两个不同的 StreamBridge 绑定。

streamBridge.send("foo-out-0", message);

streamBridge.send("bar-out-0", message);

如果您希望将频道拦截应用于两个 StreamBridge 绑定,那么您可以声明以下的 GlobalChannelInterceptor bean。

@Bean
@GlobalChannelInterceptor(patterns = "*")
public ChannelInterceptor customInterceptor() {
    return new ChannelInterceptor() {
        @Override
        public Message<?> preSend(Message<?> message, MessageChannel channel) {
            ...
        }
    };
}

然而,如果您不喜欢上述全局方法并且希望为每个绑定拥有一个专门的拦截器,那么可以进行以下操作。

@Bean
@GlobalChannelInterceptor(patterns = "foo-*")
public ChannelInterceptor fooInterceptor() {
    return new ChannelInterceptor() {
        @Override
        public Message<?> preSend(Message<?> message, MessageChannel channel) {
            ...
        }
    };
}

@Bean
@GlobalChannelInterceptor(patterns = "bar-*")
public ChannelInterceptor barInterceptor() {
    return new ChannelInterceptor() {
        @Override
        public Message<?> preSend(Message<?> message, MessageChannel channel) {
            ...
        }
    };
}

您可以灵活地将模式变得更加严格或自定义,以满足您的业务需求。

使用此方法,应用程序可以决定注入到 StreamBridge 中的拦截器,而不是应用所有可用的拦截器。

StreamBridge 通过包含 StreamBridge 的所有 send 方法的 StreamOperations 接口提供了一份契约。因此,应用程序可以选择使用 StreamOperations 进行自动装配。当使用 StreamBridge 的单元测试代码时,通过为 StreamOperations 接口提供模拟或类似机制,这将非常有用。

Reactive Functions support

由于 Spring Cloud Function 是建立在 Project Reactor 基础上,因此在实现 SupplierFunctionConsumer 的同时,从响应式编程模型中受益无需做太多工作。

例如:

@SpringBootApplication
public static class SinkFromConsumer {

	@Bean
	public Function<Flux<String>, Flux<String>> reactiveUpperCase() {
		return flux -> flux.map(val -> val.toUpperCase());
	}
}

在选择响应式或命令式编程模型时必须理解一些重要的事项。 完全响应式还是仅限 API? 使用响应式 API 并不一定意味着您可以从其所有响应式特性中受益。换句话说,只有在与兼容系统(如 Reactive Kafka 绑定器)配合使用时,诸如背压之类的功能以及其他高级功能才有效。如果您使用常规 Kafka、Rabbit 或任何其他非响应式绑定器,那么您只能从中体会到响应式 API 本身的便利性,而不能享受到其高级功能,因为流的实际源或目标并非响应式的。 错误处理和重试 在整个手册中,您将查阅到与基于框架的错误处理、重试和其他功能相关的参考,以及与之关联的配置属性。理解它们仅会影响命令式函数这一点很重要,而且对于响应式函数,您不要有相同预期。原因如下:响应式和命令式函数之间存在根本区别。命令式函数是一个_消息处理程序_,它由框架在收到的每个消息上调用。因此,对于 N 个消息,会调用该函数 N 次,并且因为该原因,我们可以包装此函数并添加额外的功能,例如错误处理、重试等。响应式函数是一个_初始化函数_。它仅被调用一次,以便获取用户提供的 Flux/Mono 的引用,以便与框架提供的 Flux/Mono 建立连接。之后,我们(框架)绝对无法了解或控制流。因此,对于响应式函数,当涉及到错误处理和重试时,您必须依赖响应式 API 的丰富性(例如 doOnError().onError*() 等)。

Functional Composition

使用函数编程模型,您还可以从函数组合中受益,在函数组合中,您可以根据一组简单函数动态组合复杂处理程序。作为一个示例,让我们将以下函数 bean 添加到上面定义的应用程序中

@Bean
public Function<String, String> wrapInQuotes() {
	return s -> "\"" + s + "\"";
}

并修改 spring.cloud.function.definition 属性,以反映您将“toUpperCase”和“wrapInQuotes”组合为新函数的意图。为此,Spring Cloud Function 依赖于 |(管道)符号。因此,要完成我们的示例,我们的属性现在看起来像这样:

--spring.cloud.function.definition=toUpperCase|wrapInQuotes

Spring Cloud Function 提供的功能组合支持的一项重大好处是您可以组合 reactiveimperative 函数。

组合的结果是一个单独的函数,如您可能猜到的那样,它可能有一个很长且相当晦涩的名称(例如 foo|bar|baz|xyz. . .),在处理其他配置属性时会造成很大不便。这就是 [Functional binding names] 部分中描述的_描述性绑定名称_特性可以提供帮助的地方。

例如,如果我们希望为 toUpperCase|wrapInQuotes 提供更具描述性的名称,我们可以使用以下属性 spring.cloud.stream.function.bindings.toUpperCase|wrapInQuotes-in-0=quotedUpperCaseInput,允许其他配置属性引用该绑定名称(例如 spring.cloud.stream.bindings.quotedUpperCaseInput.destination=myDestination)。

Functional Composition and Cross-cutting Concerns

函数组合实际上可以让您通过将其分解为一组简单且可单独管理/测试的组件来解决复杂性,这些组件在运行时仍然可以表示为一个组件。但这并不是唯一的益处。

您还可以使用组合来解决某些交叉非功能性问题,例如内容丰富。例如,假定您有一个可能缺少某些标题的传入消息,或者某些标题未处于您的业务功能所期望的确切状态中。现在,您可以实现解决这些问题的单独函数,然后将其与主要业务函数组合。

我们来看一个示例

@SpringBootApplication
public class DemoStreamApplication {

	public static void main(String[] args) {
		SpringApplication.run(DemoStreamApplication.class,
				"--spring.cloud.function.definition=enrich|echo",
				"--spring.cloud.stream.function.bindings.enrich|echo-in-0=input",
				"--spring.cloud.stream.bindings.input.destination=myDestination",
				"--spring.cloud.stream.bindings.input.group=myGroup");

	}

	@Bean
	public Function<Message<String>, Message<String>> enrich() {
		return message -> {
			Assert.isTrue(!message.getHeaders().containsKey("foo"), "Should NOT contain 'foo' header");
			return MessageBuilder.fromMessage(message).setHeader("foo", "bar").build();
		};
	}

	@Bean
	public Function<Message<String>, Message<String>> echo() {
		return message -> {
			Assert.isTrue(message.getHeaders().containsKey("foo"), "Should contain 'foo' header");
			System.out.println("Incoming message " + message);
			return message;
		};
	}
}

虽然简单,但此示例演示了如何将附加标题(非功能性问题)丰富传入消息,以便其他函数(echo)从该标题中受益。echo 函数保持简洁,仅关注业务逻辑。您还可以看到 spring.cloud.stream.function.bindings 属性的使用,以简化组合的绑定名称。

Functions with multiple input and output arguments

从版本 3.0 开始,Spring Cloud Stream 为同时具有多重输入和/或多重输出(返回值)的函数提供支持。这实际上意味着什么?它针对的是哪种类型的用例?

  • Big Data: Imagine the source of data you&#8217;re dealing with is highly un-organized and contains various types of data elements (e.g., orders, transactions etc) and you effectively need to sort it out.

  • Data aggregation: Another use case may require you to merge data elements from 2+ incoming _streams.

以上仅描述了几种您可能需要使用单个函数来接受和/或产生多重数据_流_ 的用例。这就是我们在此处针对的用例类型。

另外,请注意,这里对 streams 概念的强调稍有不同。假设只有允许访问实际数据流(不是各个元素)时,此类函数才有价值。为此,我们依赖于 Project Reactor(即 FluxMono)提供的抽象,该抽象在 spring-cloud-functions 带来的依赖作为 classpath 的一部分已经用在了 classpath 中。

另一个重要的方面是多输入和多输出的表示。虽然 Java 为表示 multiple of something 提供了各种不同的抽象,但这些抽象是 a) unboundedb) lack arityc) lack type information,它们在此上下文中都很重要。举例来说,让我们看一下 Collection 或数组,它只允许我们描述 multiple 单一类型或将所有内容向上转换为 Object,从而影响 Spring Cloud Stream 的透明类型转换特性等等。

因此,为了满足所有这些要求,最初的支持依赖于利用 Project Reactor - 元组提供的另一个抽象的签名。不过,我们正在允许更灵活的签名。

请参阅 [Binding and Binding names] 部分以了解用于建立此类应用程序使用的 binding names 的命名约定。

我们来看几个样例:

@SpringBootApplication
public class SampleApplication {

	@Bean
	public Function<Tuple2<Flux<String>, Flux<Integer>>, Flux<String>> gather() {
		return tuple -> {
			Flux<String> stringStream = tuple.getT1();
			Flux<String> intStream = tuple.getT2().map(i -> String.valueOf(i));
			return Flux.merge(stringStream, intStream);
		};
	}
}

上面的示例演示了获取两个输入(第一个为 String 类型,第二个为 Integer 类型)并生成单个 String 类型输出的函数。

因此,对于上面的示例,两个输入绑定将是 gather-in-0gather-in-1,为了一致性,输出绑定也遵循相同的约定并命名为 gather-out-0

了解这一点将允许你设置特定于绑定的属性。例如,以下内容将覆盖 gather-in-0 绑定的 content-type:

--spring.cloud.stream.bindings.gather-in-0.content-type=text/plain
@SpringBootApplication
public class SampleApplication {

	@Bean
	public static Function<Flux<Integer>, Tuple2<Flux<String>, Flux<String>>> scatter() {
		return flux -> {
			Flux<Integer> connectedFlux = flux.publish().autoConnect(2);
			UnicastProcessor even = UnicastProcessor.create();
			UnicastProcessor odd = UnicastProcessor.create();
			Flux<Integer> evenFlux = connectedFlux.filter(number -> number % 2 == 0).doOnNext(number -> even.onNext("EVEN: " + number));
			Flux<Integer> oddFlux = connectedFlux.filter(number -> number % 2 != 0).doOnNext(number -> odd.onNext("ODD: " + number));

			return Tuples.of(Flux.from(even).doOnSubscribe(x -> evenFlux.subscribe()), Flux.from(odd).doOnSubscribe(x -> oddFlux.subscribe()));
		};
	}
}

上面的示例与前面的样例有点相反,它演示了获取单个 Integer 类型输入并生成两个输出(都为 String 类型)的函数。

因此,对于上面的示例,输入绑定是 scatter-in-0,输出绑定是 scatter-out-0scatter-out-1

你可以使用以下代码对其进行测试:

@Test
public void testSingleInputMultiOutput() {
	try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
			TestChannelBinderConfiguration.getCompleteConfiguration(
					SampleApplication.class))
							.run("--spring.cloud.function.definition=scatter")) {

		InputDestination inputDestination = context.getBean(InputDestination.class);
		OutputDestination outputDestination = context.getBean(OutputDestination.class);

		for (int i = 0; i < 10; i++) {
			inputDestination.send(MessageBuilder.withPayload(String.valueOf(i).getBytes()).build());
		}

		int counter = 0;
		for (int i = 0; i < 5; i++) {
			Message<byte[]> even = outputDestination.receive(0, 0);
			assertThat(even.getPayload()).isEqualTo(("EVEN: " + String.valueOf(counter++)).getBytes());
			Message<byte[]> odd = outputDestination.receive(0, 1);
			assertThat(odd.getPayload()).isEqualTo(("ODD: " + String.valueOf(counter++)).getBytes());
		}
	}
}

Multiple functions in a single application

可能还需要在单个应用程序中对几个消息处理器进行分组。你可以通过声明几个函数来做到这一点。

@SpringBootApplication
public class SampleApplication {

	@Bean
	public Function<String, String> uppercase() {
		return value -> value.toUpperCase();
	}

	@Bean
	public Function<String, String> reverse() {
		return value -> new StringBuilder(value).reverse().toString();
	}
}

在上面的示例中,我们有定义两个函数 uppercasereverse 的配置。因此,首先,如前所述,我们需要注意到存在冲突(多个函数),因此我们需要通过提供 spring.cloud.function.definition 属性来解决冲突,该属性指向我们要绑定的实际函数。除了这里我们将使用 ; 分隔符来指向两个函数(见下面的测试用例)。

与具有多个输入/输出的函数一样,请参阅 [Binding and Binding names] 部分以了解用于建立此类应用程序使用的 binding names 的命名约定。

你可以使用以下代码对其进行测试:

@Test
public void testMultipleFunctions() {
	try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
			TestChannelBinderConfiguration.getCompleteConfiguration(
					ReactiveFunctionConfiguration.class))
							.run("--spring.cloud.function.definition=uppercase;reverse")) {

		InputDestination inputDestination = context.getBean(InputDestination.class);
		OutputDestination outputDestination = context.getBean(OutputDestination.class);

		Message<byte[]> inputMessage = MessageBuilder.withPayload("Hello".getBytes()).build();
		inputDestination.send(inputMessage, "uppercase-in-0");
		inputDestination.send(inputMessage, "reverse-in-0");

		Message<byte[]> outputMessage = outputDestination.receive(0, "uppercase-out-0");
		assertThat(outputMessage.getPayload()).isEqualTo("HELLO".getBytes());

		outputMessage = outputDestination.receive(0, "reverse-out-1");
		assertThat(outputMessage.getPayload()).isEqualTo("olleH".getBytes());
	}
}

Batch Consumers

当使用支持批量侦听器的 MessageChannelBinder,并且为使用者绑定启用了该特性时,你可以将 spring.cloud.stream.bindings.<binding-name>.consumer.batch-mode 设置为 true 以允许将整个消息批处理传递给 List 中的函数。

@Bean
public Function<List<Person>, Person> findFirstPerson() {
    return persons -> persons.get(0);
}

Batch Producers

你还可以通过返回一堆消息来使用生产者端的批量处理概念,这些消息可有效地提供相反的效果,其中集合中的每条消息都将由绑定程序单独发送。

考虑以下函数:

@Bean
public Function<String, List<Message<String>>> batch() {
	return p -> {
		List<Message<String>> list = new ArrayList<>();
		list.add(MessageBuilder.withPayload(p + ":1").build());
		list.add(MessageBuilder.withPayload(p + ":2").build());
		list.add(MessageBuilder.withPayload(p + ":3").build());
		list.add(MessageBuilder.withPayload(p + ":4").build());
		return list;
	};
}

返回列表中的每条消息都将被单独发送,从而向输出目标发送四条消息。

Spring Integration flow as functions

实现函数时,您可能遇到的复杂要求适合 Enterprise Integration Patterns(EIP)类别。这些要求最适合使用 Spring Integration(SI) 这样的框架来处理, Spring Integration 是 EIP 的参考实现。

谢天谢地,SI 已经提供了通过 Integration flow as gateway 将集成流作为函数来显示的支持。考虑以下示例:

@SpringBootApplication
public class FunctionSampleSpringIntegrationApplication {

	public static void main(String[] args) {
		SpringApplication.run(FunctionSampleSpringIntegrationApplication.class, args);
	}

	@Bean
	public IntegrationFlow uppercaseFlow() {
		return IntegrationFlows.from(MessageFunction.class, "uppercase")
				.<String, String>transform(String::toUpperCase)
				.logAndReply(LoggingHandler.Level.WARN);
	}

	public interface MessageFunction extends Function<Message<String>, Message<String>> {

	}
}

对于熟悉 SI 的人,你可以看到我们定义了一个 IntegrationFlow 类型的 bean,在这里我们申明了我们要公开为 Function<String, String>(使用 SI DSL)的集成流,称为 uppercaseMessageFunction 接口让我们能显性地申明输入和输出的类型,以便正确转换类型。请参阅 [Content Type Negotiation] 部分以了解更多有关类型转换的信息。

若要接收原始输入,可以使用 from(Function.class, …​)

最终,函数绑定到目标绑定程序公开的输入和输出目标。

请参阅 [Binding and Binding names] 部分以了解用于建立此类应用程序使用的 binding names 的命名约定。

要详细了解 Spring Integration 和 Spring Cloud Stream 之间的互操作性,特别是围绕功能编程模型,您可能会发现 this post 非常有趣,因为它深入研究了通过整合 Spring Integration 和 Spring Cloud Stream/Functions 中的最佳部分可以应用的各种模式。

Using Polled Consumers

Overview

使用轮询使用者时,根据需要轮询 PollableMessageSource。要定义轮询使用者的绑定,需要提供 spring.cloud.stream.pollable-source 属性。

考虑轮询使用者绑定的以下示例:

--spring.cloud.stream.pollable-source=myDestination

前一个示例中的轮询源名称 myDestination 将生成 myDestination-in-0 绑定名称,以便与函数式编程模型保持一致。

给定前一个示例中的轮询使用者,可以使用它,如下所示:

@Bean
public ApplicationRunner poller(PollableMessageSource destIn, MessageChannel destOut) {
    return args -> {
        while (someCondition()) {
            try {
                if (!destIn.poll(m -> {
                    String newPayload = ((String) m.getPayload()).toUpperCase();
                    destOut.send(new GenericMessage<>(newPayload));
                })) {
                    Thread.sleep(1000);
                }
            }
            catch (Exception e) {
                // handle failure
            }
        }
    };
}

更简单、更像 Spring 的替代方法是配置一个计划任务 Bean。例如:

@Scheduled(fixedDelay = 5_000)
public void poll() {
	System.out.println("Polling...");
	this.source.poll(m -> {
		System.out.println(m.getPayload());

	}, new ParameterizedTypeReference<Foo>() { });
}

PollableMessageSource.poll() 方法采用一个 MessageHandler 参数(通常是 Lambda 表达式,如下所示)。如果收到并成功处理消息,则返回 true

与消息驱动的使用者一样,如果 MessageHandler 抛出异常,消息将发布到错误通道,如 Error Handling 中所述。

通常,当 MessageHandler 退出时,poll() 方法确认该消息。如果该方法异常退出,则将拒绝该消息(不会重新排队),但请参阅 Handling Errors 。您可以通过承担确认责任来覆盖该行为,如以下示例所示:

@Bean
public ApplicationRunner poller(PollableMessageSource dest1In, MessageChannel dest2Out) {
    return args -> {
        while (someCondition()) {
            if (!dest1In.poll(m -> {
                StaticMessageHeaderAccessor.getAcknowledgmentCallback(m).noAutoAck();
                // e.g. hand off to another thread which can perform the ack
                // or acknowledge(Status.REQUEUE)

            })) {
                Thread.sleep(1000);
            }
        }
    };
}

您必须在某个时刻 ack(或 nack)消息,以避免资源泄漏。

一些消息传递系统(例如 Apache Kafka)在日志中维护一个简单的偏移量。如果传递失败并且重新排入队列 StaticMessageHeaderAccessor.getAcknowledgmentCallback(m).acknowledge(Status.REQUEUE);,则任何后续成功 ack 的消息都会重新传递。

还有一个重载的 poll 方法,其定义如下:

poll(MessageHandler handler, ParameterizedTypeReference<?> type)

type 是一个转换提示,它允许转换传入的消息负载,如以下示例所示:

boolean result = pollableSource.poll(received -> {
			Map<String, Foo> payload = (Map<String, Foo>) received.getPayload();
            ...

		}, new ParameterizedTypeReference<Map<String, Foo>>() {});

Handling Errors

默认情况下,为可轮询源配置错误通道;如果回调抛出异常,ErrorMessage 将发送到错误通道 (<destination>.<group>.errors);此错误通道也桥接到全局 Spring Integration errorChannel

可以使用 @ServiceActivator 订阅任一错误通道以处理错误;如果没有订阅,错误将仅被记录,且消息将被确认成功。如果错误通道服务激活器抛出一个异常,消息将被拒绝(默认情况下)并且不会重新交付。如果服务激活器抛出一个 RequeueCurrentMessageException,消息将在代理处重新排队,并在随后的轮询中再次被检索。

如果侦听器直接抛出一个 RequeueCurrentMessageException,消息将重新排队,如上所述,并且不会发送到错误通道。