Event type based routing in Kafka Streams applications

Routing functions available in regular message-channel-based binders are not supported in the Kafka Streams binder. However, the Kafka Streams binder still provides routing capabilities through the event type record header on the inbound records.

To enable routing based on event types, the application must provide the following property.

spring.cloud.stream.kafka.streams.bindings.<binding-name>.consumer.eventTypes

This can be a comma-separated list of values.

For example, let's assume we have this function:

@Bean
public Function<KStream<Integer, Foo>, KStream<Integer, Foo>> process() {
    // Pass-through topology; the real business logic would go here.
    return input -> input;
}

Let us also assume that we only want the business logic in this function to be executed if the incoming record has an event type of foo or bar. That can be expressed as follows using the eventTypes property on the binding.

spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.eventTypes=foo,bar

Now, when the application runs, the binder checks each incoming record for the event_type header and sees whether its value is set to foo or bar. If it finds neither, the function execution is skipped.
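
The skip-or-process decision can be sketched in plain Java as follows. This is a simplified illustration, not the binder's actual implementation; `shouldProcess` and its arguments are hypothetical names, and the behavior when the header is absent is an assumption of this sketch.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

public class EventTypeRouting {

    // Sketch of the check: process the record only if the value of the
    // event-type header matches one of the configured event types.
    static boolean shouldProcess(byte[] headerValue, String eventTypesProperty) {
        if (headerValue == null) {
            return false; // assumption: no event-type header means the function is skipped
        }
        String eventType = new String(headerValue, StandardCharsets.UTF_8);
        List<String> allowed = Arrays.asList(eventTypesProperty.split(","));
        return allowed.stream().map(String::trim).anyMatch(eventType::equals);
    }

    public static void main(String[] args) {
        String eventTypes = "foo,bar"; // value of the eventTypes binding property
        System.out.println(shouldProcess("foo".getBytes(StandardCharsets.UTF_8), eventTypes)); // true
        System.out.println(shouldProcess("baz".getBytes(StandardCharsets.UTF_8), eventTypes)); // false
    }
}
```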

By default, the binder expects the record header key to be event_type, but that can be changed per binding. For instance, to change the header key on this binding from the default to my_event, set the following property.

spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.eventTypeHeaderKey=my_event
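
For reference, the two routing-related settings for the same binding then look like this together (the values foo, bar, and my_event are the illustrative ones from this example):

```
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.eventTypes=foo,bar
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.eventTypeHeaderKey=my_event
```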

When the event routing feature is used, the Kafka Streams binder deserializes all incoming records with the byte array Serde. Only when a record's header matches one of the configured event types does the binder apply the actual Serde, configured or inferred, to perform the proper deserialization. This introduces issues if you set a deserialization exception handler on the binding, because the expected deserialization only happens further down the stack, causing unexpected errors. To address this issue, you can set the following property on the binding to force the binder to use the configured or inferred Serde instead of the byte array Serde.

spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.useConfiguredSerdeWhenRoutingEvents=true

This way, the application can detect deserialization issues right away when using the event routing feature and make appropriate handling decisions.