Event Routing

  1. Routing TO:通过 RoutingFunction 将事件路由到特定订阅者,使用消息头或应用程序属性中的指令。

  2. Routing FROM:通过 spring.cloud.stream.sendto.destination 标头或 NewDestinationBindingCallback,动态解析事件订阅者的目的地,允许在运行时确定目标。

在 Spring Cloud Stream 的上下文中,事件路由指的是_a) 将事件路由到特定事件订阅者_或_b) 将事件订阅者产生的事件路由到特定目的地_的能力。我们在这里将其称为路由“TO”和路由“FROM”。

Event Routing, in the context of Spring Cloud Stream, is the ability to either a) route events to a particular event subscriber or b) route events produced by an event subscriber to a particular destination. Here we’ll refer to it as route ‘TO’ and route ‘FROM’.

Routing TO Consumer

可以通过依赖 Spring Cloud Function 3.0 中提供的 RoutingFunction 来实现路由。您需要做的就是通过 --spring.cloud.stream.function.routing.enabled=true 应用程序属性启用它,或提供 spring.cloud.function.routing-expression 属性。一旦启用,RoutingFunction 将绑定到接收所有消息并根据提供的指令将它们路由到其他函数的输入目的地。

Routing can be achieved by relying on RoutingFunction available in Spring Cloud Function 3.0. All you need to do is enable it via --spring.cloud.stream.function.routing.enabled=true application property or provide spring.cloud.function.routing-expression property. Once enabled RoutingFunction will be bound to input destination receiving all the messages and route them to other functions based on the provided instruction.

为了绑定路由目的地的名称是 functionRouter-in-0(请参阅 RoutingFunction.FUNCTION_NAME 和绑定命名约定 [Functional binding names])。

For the purposes of binding the name of the routing destination is functionRouter-in-0 (see RoutingFunction.FUNCTION_NAME and binding naming convention [Functional binding names]).

指令可以随单个消息以及应用程序属性一起提供。

Instruction could be provided with individual messages as well as application properties.

以下是一些示例:

Here are couple of samples:

Using message headers

@SpringBootApplication
public class SampleApplication {

	public static void main(String[] args) {
		SpringApplication.run(SampleApplication.class,
                       "--spring.cloud.stream.function.routing.enabled=true");
	}

	@Bean
	public Consumer<String> even() {
		return value -> {
			System.out.println("EVEN: " + value);
		};
	}

	@Bean
	public Consumer<String> odd() {
		return value -> {
			System.out.println("ODD: " + value);
		};
    }
}

通过向由 Binder(即 rabbit、kafka)公开的 functionRouter-in-0 目的地发送消息,此消息将被路由到适当的(“even”或“odd”)使用者。

By sending a message to the functionRouter-in-0 destination exposed by the binder (i.e., rabbit, kafka), such message will be routed to the appropriate (‘even’ or ‘odd’) Consumer.

默认情况下,RoutingFunction 将查找 spring.cloud.function.definitionspring.cloud.function.routing-expression(对于使用 SpEL 的更动态场景)标头,如果找到,它的值将被视为路由指令。

By default RoutingFunction will look for a spring.cloud.function.definition or spring.cloud.function.routing-expression (for more dynamic scenarios with SpEL) header and if it is found, its value will be treated as the routing instruction.

例如,将 spring.cloud.function.routing-expression 标头设置值为 T(java.lang.System).currentTimeMillis() % 2 == 0 ? 'even' : 'odd' 将最终以半随机的方式将请求路由到 oddeven 函数。此外,对于 SpEL,求值上下文的 root objectMessage,因此您也可以对各个标头(或消息)进行求值 …​.routing-expression=headers['type']

For example, setting spring.cloud.function.routing-expression header to value T(java.lang.System).currentTimeMillis() % 2 == 0 ? 'even' : 'odd' will end up semi-randomly routing request to either odd or even functions. Also, for SpEL, the root object of the evaluation context is Message so you can do evaluation on individual headers (or message) as well …​.routing-expression=headers['type']

Using application properties

spring.cloud.function.routing-expression 和/或 spring.cloud.function.definition 可以作为应用程序属性传递(例如 spring.cloud.function.routing-expression=headers['type'])。

The spring.cloud.function.routing-expression and/or spring.cloud.function.definition can be passed as application properties (e.g., spring.cloud.function.routing-expression=headers['type'].

@SpringBootApplication
public class RoutingStreamApplication {

  public static void main(String[] args) {
      SpringApplication.run(RoutingStreamApplication.class,
	  "--spring.cloud.function.routing-expression="
	  + "T(java.lang.System).nanoTime() % 2 == 0 ? 'even' : 'odd'");
  }
  @Bean
  public Consumer<Integer> even() {
    return value -> System.out.println("EVEN: " + value);
  }

  @Bean
  public Consumer<Integer> odd() {
    return value -> System.out.println("ODD: " + value);
  }
}

通过应用程序属性传递指令对于响应式函数尤其重要,因为响应式函数仅被调用一次来传递发布者,因此对各个项目的访问受到限制。

Passing instructions via application properties is especially important for reactive functions given that a reactive function is only invoked once to pass the Publisher, so access to the individual items is limited.

Routing Function and output binding

RoutingFunction 是一个 Function,因此与任何其他函数没有什么不同。好吧…​ 几乎没有。

RoutingFunction is a Function and as such treated no differently than any other function. Well. . . almost.

RoutingFunction 路由到另一个 Function 时,其输出将被发送到 RoutingFunction 的输出绑定,该绑定按预期为 functionRouter-in-0。但是,如果 RoutingFunction 路由到 Consumer 时会怎样?换句话说,调用 RoutingFunction 的结果可能不会产生任何可发送到输出绑定的结果,因此没有必要产生任何结果。因此,在创建绑定时,我们对 RoutingFunction 的处理方式略有不同。即使对于您作为用户来说是透明的(实际上您不需要做任何事情),但了解其中的一些机制将有助于您理解其内部工作原理。

When RoutingFunction routes to another Function, its output is sent to the output binding of the RoutingFunction which is functionRouter-in-0 as expected. But what if RoutingFunction routes to a Consumer? In other words the result of invocation of the RoutingFunction may not produce anything to be sent to the output binding, thus making it necessary to even have one. So, we do treat RoutingFunction a little bit differently when we create bindings. And even though it is transparent to you as a user (there is really nothing for you to do), being aware of some of the mechanics would help you understand its inner workings.

因此,规则是:我们从不为 RoutingFunction 创建输出绑定,只创建输入绑定。因此,当您路由到 Consumer 时,RoutingFunction 通过不具有任何输出绑定而有效地变成 Consumer。但是,如果 RoutingFunction 碰巧路由到另一个产生输出的 Function,则 RoutingFunction 的输出绑定将会动态创建,此时 RoutingFunction 将在绑定方面充当常规 Function(同时具有输入和输出绑定)。

So, the rule is; We never create output binding for the RoutingFunction, only input. So when you routing to Consumer, the RoutingFunction effectively becomes as a Consumer by not having any output bindings. However, if RoutingFunction happen to route to another Function which produces the output, the output binding for the RoutingFunction will be create dynamically at which point RoutingFunction will act as a regular Function with regards to bindings (having both input and output bindings).

Routing FROM Consumer

除了静态目的地外,Spring Cloud Stream 允许应用程序向动态绑定的目的地发送消息。例如,当需要在运行时确定目标目的地时,此功能非常有用。应用程序可以通过以下两种方式之一进行此操作。

Aside from static destinations, Spring Cloud Stream lets applications send messages to dynamically bound destinations. This is useful, for example, when the target destination needs to be determined at runtime. Applications can do so in one of two ways.

spring.cloud.stream.sendto.destination

您还可以通过指定 spring.cloud.stream.sendto.destination 标头(该标头设置为要解析的目的地名称)来委派框架动态解析输出目的地。

You can also delegate to the framework to dynamically resolve the output destination by specifying spring.cloud.stream.sendto.destination header set to the name of the destination to be resolved.

请考虑以下示例:

Consider the following example:

@SpringBootApplication
@Controller
public class SourceWithDynamicDestination {

    @Bean
	public Function<String, Message<String>> destinationAsPayload() {
		return value -> {
			return MessageBuilder.withPayload(value)
				.setHeader("spring.cloud.stream.sendto.destination", value).build();};
	}
}

虽然微不足道,但您可以在此示例中清楚地看到,我们的输出是一个消息,其 spring.cloud.stream.sendto.destination 标头设置为输入参数的值。框架将查询此标头,并尝试创建或发现具有该名称的目的地,并将输出发送到该目的地。

Albeit trivial you can clearly see in this example, our output is a Message with spring.cloud.stream.sendto.destination header set to the value of he input argument. The framework will consult this header and will attempt to create or discover a destination with that name and send output to it.

如果目的地名称是预先已知的,则可以将生产者属性配置为与任何其他目的地一样。或者,如果您注册了一个 NewDestinationBindingCallback<> Bean,则在创建绑定之前调用该 Bean。该回调采用绑定程序使用的扩展生产者属性的泛型类型。它有一个方法:

If destination names are known in advance, you can configure the producer properties as with any other destination. Alternatively, if you register a NewDestinationBindingCallback<> bean, it is invoked just before the binding is created. The callback takes the generic type of the extended producer properties used by the binder. It has one method:

void configure(String destinationName, MessageChannel channel, ProducerProperties producerProperties,
        T extendedProducerProperties);

以下示例演示如何使用 RabbitMQ 绑定程序:

The following example shows how to use the RabbitMQ binder:

@Bean
public NewDestinationBindingCallback<RabbitProducerProperties> dynamicConfigurer() {
    return (name, channel, props, extended) -> {
        props.setRequiredGroups("bindThisQueue");
        extended.setQueueNameGroupOnly(true);
        extended.setAutoBindDlq(true);
        extended.setDeadLetterQueueName("myDLQ");
    };
}

如果您需要使用多种绑定器类型支持动态目的地,请使用 Object 作为通用类型并将 extended 参数按需要强制转换为适当的类型。

If you need to support dynamic destinations with multiple binder types, use Object for the generic type and cast the extended argument as needed.

另请参阅 [Using StreamBridge] 部分,以了解如何利用其他选项 (StreamBridge) 用于类似的情况。

Also, please see [Using StreamBridge] section to see how yet another option (StreamBridge) can be utilized for similar cases.