Event type based routing in Kafka Streams applications

常规消息通道基础绑定器中提供的路由函数在 Kafka 流绑定器中不受支持。然而,Kafka 流绑定器仍然通过传入记录上的记录头事件类型提供路由功能。

为了根据事件类型启用路由,应用程序必须提供以下属性。

spring.cloud.stream.kafka.streams.bindings.<binding-name>.consumer.eventTypes

这可以是逗号分隔的值。

例如,假设我们有以下函数:

@Bean
public Function<KStream<Integer, Foo>, KStream<Integer, Foo>> process() {
    return input -> input;
}

我们还假设我们只希望在传入记录具有 foobar 事件类型时才执行此函数中的业务逻辑。可以使用绑定上的 eventTypes 属性将其表示如下。

spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.eventTypes=foo,bar

现在,当应用程序运行时,绑定器会检查每个传入记录的标头 event_type,并查看是否将其值设置为 foobar。如果它找不到其中任何一个,则会跳过函数执行。

默认情况下,绑定器期望记录头键为 event_type,但可以根据绑定进行更改。例如,如果我们想将此绑定上的头键从默认值更改为 my_event,则可以按如下方式进行更改。

spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.eventTypeHeaderKey=my_event

在 Kafkfa 流绑定器中使用事件路由功能时,它使用字节数组 Serde 反序列化所有传入记录。如果记录头与事件类型匹配,则仅使用实际 Serde 使用配置的或推断的 Serde 执行正确反序列化。如果你在绑定上设置了反序列化异常处理程序,则会导致问题,因为预期的反序列化仅在堆栈中发生,从而导致意外错误。为了解决此问题,你可以在绑定上设置以下属性,以强制绑定器使用配置的或推断的 Serde 而不是字节数组 Serde

spring.cloud.stream.kafka.streams.bindings.<process-in-0>.consumer.useConfiguredSerdeWhenRoutingEvents

通过这种方式,应用程序在使用事件路由功能时可以立即检测到反序列化问题,并且可以做出适当的处理决策。