Event type based routing in Kafka Streams applications
常规消息通道基础绑定器中提供的路由函数在 Kafka 流绑定器中不受支持。然而,Kafka 流绑定器仍然通过传入记录上的记录头事件类型提供路由功能。
Routing functions available in regular message channel based binders are not supported in Kafka Streams binder. However, Kafka Streams binder still provides routing capabilities through the event type record header on the inbound records.
为了根据事件类型启用路由,应用程序必须提供以下属性。
To enable routing based on event types, the application must provide the following property.
spring.cloud.stream.kafka.streams.bindings.<binding-name>.consumer.eventTypes
。
spring.cloud.stream.kafka.streams.bindings.<binding-name>.consumer.eventTypes
.
这可以是逗号分隔的值。
This can be a comma separated value.
例如,假设我们有以下函数:
For example, lets assume we have this function:
@Bean
public Function<KStream<Integer, Foo>, KStream<Integer, Foo>> process() {
return input -> input;
}
我们还假设我们只希望在传入记录具有 foo
或 bar
事件类型时才执行此函数中的业务逻辑。可以使用绑定上的 eventTypes
属性将其表示如下。
Let us also assume that we only want the business logic in this function to be executed, if the incoming record has event types as foo
or bar
.
That can be expressed as below using the eventTypes
property on the binding.
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.eventTypes=foo,bar
现在,当应用程序运行时,绑定器会检查每个传入记录的标头 event_type
,并查看是否将其值设置为 foo
或 bar
。如果它找不到其中任何一个,则会跳过函数执行。
Now, when the application runs, the binder checks each incoming records for the header event_type
and see if it has value set as foo
or bar
.
If it does not find either of them, then the function execution will be skipped.
默认情况下,绑定器期望记录头键为 event_type
,但可以根据绑定进行更改。例如,如果我们想将此绑定上的头键从默认值更改为 my_event
,则可以按如下方式进行更改。
By default, the binder expects the record header key to be event_type
, but that can be changed per binding.
For instance, if we want to change the header key on this binding to my_event
instead of the default, that can be changed as below.
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.eventTypeHeaderKey=my_event
。
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.eventTypeHeaderKey=my_event
.
在 Kafkfa 流绑定器中使用事件路由功能时,它使用字节数组 Serde
反序列化所有传入记录。如果记录头与事件类型匹配,则仅使用实际 Serde
使用配置的或推断的 Serde
执行正确反序列化。如果你在绑定上设置了反序列化异常处理程序,则会导致问题,因为预期的反序列化仅在堆栈中发生,从而导致意外错误。为了解决此问题,你可以在绑定上设置以下属性,以强制绑定器使用配置的或推断的 Serde
而不是字节数组 Serde
。
When using the event routing feature in Kafkfa Streams binder, it uses the byte array Serde
to deserialze all incoming records.
If the record headers match the event type, then only it uses the actual Serde
to do a proper deserialization using either the configured or the inferred Serde
.
This introduces issues if you set a deserialization exception handler on the binding as the expected deserialization only happens down the stack causing unexpected errors.
In order to address this issue, you can set the following property on the binding to force the binder to use the configured or inferred Serde
instead of byte array Serde
.
spring.cloud.stream.kafka.streams.bindings.<process-in-0>.consumer.useConfiguredSerdeWhenRoutingEvents
通过这种方式,应用程序在使用事件路由功能时可以立即检测到反序列化问题,并且可以做出适当的处理决策。
This way, the application can detect deserialization issues right away when using the event routing feature and can take appropriate handling decisions.