Event Routing

  1. Routing TO:通过 RoutingFunction 将事件路由到特定订阅者,使用消息头或应用程序属性中的指令。

  2. Routing FROM:通过 spring.cloud.stream.sendto.destination 标头或 NewDestinationBindingCallback,动态解析事件订阅者的目的地,允许在运行时确定目标。

在 Spring Cloud Stream 的上下文中,事件路由指的是_a) 将事件路由到特定事件订阅者_或_b) 将事件订阅者产生的事件路由到特定目的地_的能力。我们在这里将其称为路由“TO”和路由“FROM”。

Routing TO Consumer

可以通过依赖 Spring Cloud Function 3.0 中提供的 RoutingFunction 来实现路由。您需要做的就是通过 --spring.cloud.stream.function.routing.enabled=true 应用程序属性启用它,或提供 spring.cloud.function.routing-expression 属性。一旦启用,RoutingFunction 将绑定到接收所有消息并根据提供的指令将它们路由到其他函数的输入目的地。

为了绑定路由目的地的名称是 functionRouter-in-0(请参阅 RoutingFunction.FUNCTION_NAME 和绑定命名约定 [Functional binding names])。

指令可以随单个消息以及应用程序属性一起提供。

以下是一些示例:

Using message headers

@SpringBootApplication
public class SampleApplication {

	public static void main(String[] args) {
		SpringApplication.run(SampleApplication.class,
                       "--spring.cloud.stream.function.routing.enabled=true");
	}

	@Bean
	public Consumer<String> even() {
		return value -> {
			System.out.println("EVEN: " + value);
		};
	}

	@Bean
	public Consumer<String> odd() {
		return value -> {
			System.out.println("ODD: " + value);
		};
    }
}

通过向由 Binder(即 rabbit、kafka)公开的 functionRouter-in-0 目的地发送消息,此消息将被路由到适当的(“even”或“odd”)使用者。

默认情况下,RoutingFunction 将查找 spring.cloud.function.definitionspring.cloud.function.routing-expression(对于使用 SpEL 的更动态场景)标头,如果找到,它的值将被视为路由指令。

例如,将 spring.cloud.function.routing-expression 标头设置值为 T(java.lang.System).currentTimeMillis() % 2 == 0 ? 'even' : 'odd' 将最终以半随机的方式将请求路由到 oddeven 函数。此外,对于 SpEL,求值上下文的 root objectMessage,因此您也可以对各个标头(或消息)进行求值 …​.routing-expression=headers['type']

Using application properties

spring.cloud.function.routing-expression 和/或 spring.cloud.function.definition 可以作为应用程序属性传递(例如 spring.cloud.function.routing-expression=headers['type'])。

@SpringBootApplication
public class RoutingStreamApplication {

  public static void main(String[] args) {
      SpringApplication.run(RoutingStreamApplication.class,
	  "--spring.cloud.function.routing-expression="
	  + "T(java.lang.System).nanoTime() % 2 == 0 ? 'even' : 'odd'");
  }
  @Bean
  public Consumer<Integer> even() {
    return value -> System.out.println("EVEN: " + value);
  }

  @Bean
  public Consumer<Integer> odd() {
    return value -> System.out.println("ODD: " + value);
  }
}

通过应用程序属性传递指令对于响应式函数尤其重要,因为响应式函数仅被调用一次来传递发布者,因此对各个项目的访问受到限制。

Routing Function and output binding

RoutingFunction 是一个 Function,因此与任何其他函数没有什么不同。好吧…​ 几乎没有。

RoutingFunction 路由到另一个 Function 时,其输出将被发送到 RoutingFunction 的输出绑定,该绑定按预期为 functionRouter-in-0。但是,如果 RoutingFunction 路由到 Consumer 时会怎样?换句话说,调用 RoutingFunction 的结果可能不会产生任何可发送到输出绑定的结果,因此没有必要产生任何结果。因此,在创建绑定时,我们对 RoutingFunction 的处理方式略有不同。即使对于您作为用户来说是透明的(实际上您不需要做任何事情),但了解其中的一些机制将有助于您理解其内部工作原理。

因此,规则是:我们从不为 RoutingFunction 创建输出绑定,只创建输入绑定。因此,当您路由到 Consumer 时,RoutingFunction 通过不具有任何输出绑定而有效地变成 Consumer。但是,如果 RoutingFunction 碰巧路由到另一个产生输出的 Function,则 RoutingFunction 的输出绑定将会动态创建,此时 RoutingFunction 将在绑定方面充当常规 Function(同时具有输入和输出绑定)。

Routing FROM Consumer

除了静态目的地外,Spring Cloud Stream 允许应用程序向动态绑定的目的地发送消息。例如,当需要在运行时确定目标目的地时,此功能非常有用。应用程序可以通过以下两种方式之一进行此操作。

spring.cloud.stream.sendto.destination

您还可以通过指定 spring.cloud.stream.sendto.destination 标头(该标头设置为要解析的目的地名称)来委派框架动态解析输出目的地。

请考虑以下示例:

@SpringBootApplication
@Controller
public class SourceWithDynamicDestination {

    @Bean
	public Function<String, Message<String>> destinationAsPayload() {
		return value -> {
			return MessageBuilder.withPayload(value)
				.setHeader("spring.cloud.stream.sendto.destination", value).build();};
	}
}

虽然微不足道,但您可以在此示例中清楚地看到,我们的输出是一个消息,其 spring.cloud.stream.sendto.destination 标头设置为输入参数的值。框架将查询此标头,并尝试创建或发现具有该名称的目的地,并将输出发送到该目的地。

如果目的地名称是预先已知的,则可以将生产者属性配置为与任何其他目的地一样。或者,如果您注册了一个 NewDestinationBindingCallback<> Bean,则在创建绑定之前调用该 Bean。该回调采用绑定程序使用的扩展生产者属性的泛型类型。它有一个方法:

void configure(String destinationName, MessageChannel channel, ProducerProperties producerProperties,
        T extendedProducerProperties);

以下示例演示如何使用 RabbitMQ 绑定程序:

@Bean
public NewDestinationBindingCallback<RabbitProducerProperties> dynamicConfigurer() {
    return (name, channel, props, extended) -> {
        props.setRequiredGroups("bindThisQueue");
        extended.setQueueNameGroupOnly(true);
        extended.setAutoBindDlq(true);
        extended.setDeadLetterQueueName("myDLQ");
    };
}

如果您需要使用多种绑定器类型支持动态目的地,请使用 Object 作为通用类型并将 extended 参数按需要强制转换为适当的类型。

另请参阅 [Using StreamBridge] 部分,以了解如何利用其他选项 (StreamBridge) 用于类似的情况。