Apache Kafka Streams Support
Starting with version 1.1.4, Spring for Apache Kafka provides first-class support for Kafka Streams.
To use it from a Spring application, the kafka-streams jar must be present on the classpath.
It is an optional dependency of the Spring for Apache Kafka project and is not downloaded transitively.
Basics
The reference Apache Kafka Streams documentation suggests the following way of using the API:
// Use the builders to define the actual processing topology, e.g. to specify
// from which input topics to read, which stream operations (filter, map, etc.)
// should be called, and so on.
StreamsBuilder builder = ...; // when using the Kafka Streams DSL
// Use the configuration to tell your application where the Kafka cluster is,
// which serializers/deserializers to use by default, to specify security settings,
// and so on.
StreamsConfig config = ...;
KafkaStreams streams = new KafkaStreams(builder, config);
// Start the Kafka Streams instance
streams.start();
// Stop the Kafka Streams instance
streams.close();
So, we have two main components:
- StreamsBuilder: With an API to build KStream (or KTable) instances.
- KafkaStreams: To manage the lifecycle of those instances.
All KStream instances exposed to a KafkaStreams instance by a single StreamsBuilder are started and stopped at the same time, even if they have different logic. In other words, all streams defined by a StreamsBuilder are tied with a single lifecycle control.
Spring Management
To simplify using Kafka Streams from the Spring application context perspective and use the lifecycle management through a container, Spring for Apache Kafka introduces StreamsBuilderFactoryBean.
This is an AbstractFactoryBean implementation to expose a StreamsBuilder singleton instance as a bean.
The following example creates such a bean:
@Bean
public FactoryBean<StreamsBuilder> myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
return new StreamsBuilderFactoryBean(streamsConfig);
}
Starting with version 2.2, the stream configuration is now provided as a KafkaStreamsConfiguration object rather than a StreamsConfig.
The StreamsBuilderFactoryBean also implements SmartLifecycle to manage the lifecycle of an internal KafkaStreams instance.
Similar to the Kafka Streams API, you must define the KStream instances before you start the KafkaStreams.
That also applies for the Spring API for Kafka Streams.
Therefore, when you use default autoStartup = true on the StreamsBuilderFactoryBean, you must declare KStream instances on the StreamsBuilder before the application context is refreshed.
For example, a KStream can be a regular bean definition, while the Kafka Streams API is used without any impact.
The following example shows how to do so:
@Bean
public KStream<?, ?> kStream(StreamsBuilder kStreamBuilder) {
KStream<Integer, String> stream = kStreamBuilder.stream(STREAMING_TOPIC1);
// Fluent KStream API
return stream;
}
If you would like to control the lifecycle manually (for example, stopping and starting by some condition), you can reference the StreamsBuilderFactoryBean bean directly by using the factory bean (&) prefix.
Since StreamsBuilderFactoryBean uses its internal KafkaStreams instance, it is safe to stop and restart it again.
A new KafkaStreams is created on each start().
You might also consider using different StreamsBuilderFactoryBean instances, if you would like to control the lifecycles for KStream instances separately.
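For example, a minimal sketch of manual lifecycle control, assuming the myKStreamBuilder factory bean declared above (the pause/resume method names are illustrative):
@Autowired
private StreamsBuilderFactoryBean myKStreamBuilderFactoryBean;

public void pauseStreams() {
    // stops the KafkaStreams instance currently managed by the factory bean
    this.myKStreamBuilderFactoryBean.stop();
}

public void resumeStreams() {
    // a new KafkaStreams instance is created and started
    this.myKStreamBuilderFactoryBean.start();
}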
You can also specify KafkaStreams.StateListener, Thread.UncaughtExceptionHandler, and StateRestoreListener options on the StreamsBuilderFactoryBean, which are delegated to the internal KafkaStreams instance.
Also, apart from setting those options indirectly on StreamsBuilderFactoryBean, starting with version 2.1.5, you can use a KafkaStreamsCustomizer callback interface to configure an inner KafkaStreams instance.
Note that KafkaStreamsCustomizer overrides the options provided by StreamsBuilderFactoryBean.
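For example, a minimal sketch of a KafkaStreamsCustomizer that sets a state listener on the inner KafkaStreams instance (the bean name and listener body are purely illustrative):
@Bean
public StreamsBuilderFactoryBean customizedKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
    StreamsBuilderFactoryBean factoryBean = new StreamsBuilderFactoryBean(streamsConfig);
    // the customizer receives the inner KafkaStreams instance before it is started
    factoryBean.setKafkaStreamsCustomizer(kafkaStreams ->
            kafkaStreams.setStateListener((newState, oldState) ->
                    System.out.println("State transition from " + oldState + " to " + newState)));
    return factoryBean;
}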
If you need to perform some KafkaStreams operations directly, you can access that internal KafkaStreams instance by using StreamsBuilderFactoryBean.getKafkaStreams().
You can autowire the StreamsBuilderFactoryBean bean by type, but you should be sure to use the full type in the bean definition, as the following example shows:
@Bean
public StreamsBuilderFactoryBean myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
return new StreamsBuilderFactoryBean(streamsConfig);
}
...
@Autowired
private StreamsBuilderFactoryBean myKStreamBuilderFactoryBean;
Alternatively, you can add @Qualifier for injection by name if you use an interface bean definition.
The following example shows how to do so:
@Bean
public FactoryBean<StreamsBuilder> myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
return new StreamsBuilderFactoryBean(streamsConfig);
}
...
@Autowired
@Qualifier("&myKStreamBuilder")
private StreamsBuilderFactoryBean myKStreamBuilderFactoryBean;
Starting with version 2.4.1, the factory bean has a new property infrastructureCustomizer with type KafkaStreamsInfrastructureCustomizer; this allows customization of the StreamsBuilder (e.g. to add a state store) and/or the Topology before the stream is created.
public interface KafkaStreamsInfrastructureCustomizer {
void configureBuilder(StreamsBuilder builder);
void configureTopology(Topology topology);
}
Default no-op implementations are provided to avoid having to implement both methods if one is not required.
A CompositeKafkaStreamsInfrastructureCustomizer is provided, for when you need to apply multiple customizers.
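A minimal sketch of wiring a customizer into the factory bean might look like the following (the bean name, store name, and serdes are illustrative assumptions):
@Bean
public StreamsBuilderFactoryBean customizedStreamsBuilder(KafkaStreamsConfiguration streamsConfig) {
    StreamsBuilderFactoryBean factoryBean = new StreamsBuilderFactoryBean(streamsConfig);
    factoryBean.setInfrastructureCustomizer(new KafkaStreamsInfrastructureCustomizer() {

        @Override
        public void configureBuilder(StreamsBuilder builder) {
            // add a state store before the topology is built (hypothetical store name)
            builder.addStateStore(Stores.keyValueStoreBuilder(
                    Stores.inMemoryKeyValueStore("my-store"), Serdes.Integer(), Serdes.String()));
        }

    });
    return factoryBean;
}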
KafkaStreams Micrometer Support
Introduced in version 2.5.3, you can configure a KafkaStreamsMicrometerListener to automatically register micrometer meters for the KafkaStreams object managed by the factory bean:
streamsBuilderFactoryBean.addListener(new KafkaStreamsMicrometerListener(meterRegistry,
Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));
Streams JSON Serialization and Deserialization
For serializing and deserializing data when reading or writing to topics or state stores in JSON format, Spring for Apache Kafka provides a JsonSerde implementation that uses JSON, delegating to the JsonSerializer and JsonDeserializer described in Serialization, Deserialization, and Message Conversion.
The JsonSerde implementation provides the same configuration options through its constructor (target type or ObjectMapper).
In the following example, we use the JsonSerde to serialize and deserialize the Cat payload of a Kafka stream (the JsonSerde can be used in a similar fashion wherever an instance is required):
stream.through(Serdes.Integer(), new JsonSerde<>(Cat.class), "cats");
When constructing the serializer/deserializer programmatically for use in the producer/consumer factory, since version 2.3, you can use the fluent API, which simplifies configuration.
stream.through(
new JsonSerde<>(MyKeyType.class)
.forKeys()
.noTypeInfo(),
new JsonSerde<>(MyValueType.class)
.noTypeInfo(),
"myTypes");
Using KafkaStreamBrancher
The KafkaStreamBrancher class introduces a more convenient way to build conditional branches on top of KStream.
Consider the following example that does not use KafkaStreamBrancher:
KStream<String, String>[] branches = builder.stream("source").branch(
(key, value) -> value.contains("A"),
(key, value) -> value.contains("B"),
(key, value) -> true
);
branches[0].to("A");
branches[1].to("B");
branches[2].to("C");
The following example uses KafkaStreamBrancher:
new KafkaStreamBrancher<String, String>()
.branch((key, value) -> value.contains("A"), ks -> ks.to("A"))
.branch((key, value) -> value.contains("B"), ks -> ks.to("B"))
//the default branch does not have to be defined at the end of the chain!
.defaultBranch(ks -> ks.to("C"))
.onTopOf(builder.stream("source"));
//the onTopOf method returns the provided stream, so we can continue with method chaining
Configuration
To configure the Kafka Streams environment, the StreamsBuilderFactoryBean requires a KafkaStreamsConfiguration instance.
See the Apache Kafka documentation for all possible options.
Starting with version 2.2, the stream configuration is now provided as a KafkaStreamsConfiguration object, rather than as a StreamsConfig.
To avoid boilerplate code for most cases, especially when you develop microservices, Spring for Apache Kafka provides the @EnableKafkaStreams annotation, which you should place on a @Configuration class.
All you need is to declare a KafkaStreamsConfiguration bean named defaultKafkaStreamsConfig.
A StreamsBuilderFactoryBean bean, named defaultKafkaStreamsBuilder, is automatically declared in the application context.
You can declare and use any additional StreamsBuilderFactoryBean beans as well.
You can perform additional customization of that bean by providing a bean that implements StreamsBuilderFactoryBeanConfigurer.
If there are multiple such beans, they will be applied according to their Ordered.order property.
By default, when the factory bean is stopped, the KafkaStreams.cleanUp() method is called.
Starting with version 2.1.2, the factory bean has additional constructors, taking a CleanupConfig object that has properties to let you control whether the cleanUp() method is called during start() or stop() or neither.
Starting with version 2.7, the default is to never clean up local state.
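For example, a minimal sketch that cleans up local state on stop() only, assuming the CleanupConfig constructor flags (cleanup-on-start, cleanup-on-stop):
@Bean
public StreamsBuilderFactoryBean myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
    // do not clean up on start(), clean up on stop()
    return new StreamsBuilderFactoryBean(streamsConfig, new CleanupConfig(false, true));
}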
Header Enricher
Version 3.0 added the HeaderEnricherProcessor extension of ContextualProcessor, providing the same functionality as the deprecated HeaderEnricher, which implemented the deprecated Transformer interface.
This can be used to add headers within the stream processing; the header values are SpEL expressions; the root object of the expression evaluation has four properties:
- record - the org.apache.kafka.streams.processor.api.Record (key, value, timestamp, headers)
- key - the key of the current record
- value - the value of the current record
- context - the ProcessorContext, allowing access to the current record metadata
The expressions must return a byte[] or a String (which will be converted to byte[] using UTF-8).
To use the enricher within a stream:
.process(() -> new HeaderEnricherProcessor(expressions))
The processor does not change the key or value; it simply adds headers.
You need a new instance for each record.
.process(() -> new HeaderEnricherProcessor<..., ...>(expressionMap))
Here is a simple example, adding one literal header and one variable:
Map<String, Expression> headers = new HashMap<>();
headers.put("header1", new LiteralExpression("value1"));
SpelExpressionParser parser = new SpelExpressionParser();
headers.put("header2", parser.parseExpression("record.timestamp() + ' @' + record.offset()"));
ProcessorSupplier<String, String, String, String> supplier = () -> new HeaderEnricherProcessor<String, String>(headers);
KStream<String, String> stream = builder.stream(INPUT);
stream
        .process(supplier)
        .to(OUTPUT);
MessagingProcessor
Version 3.0 added the MessagingProcessor extension of ContextualProcessor, providing the same functionality as the deprecated MessagingTransformer, which implemented the deprecated Transformer interface.
This allows a Kafka Streams topology to interact with a Spring Messaging component, such as a Spring Integration flow.
The processor requires an implementation of MessagingFunction.
@FunctionalInterface
public interface MessagingFunction {
Message<?> exchange(Message<?> message);
}
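For example, a minimal sketch of a MessagingFunction implemented as a lambda (the upper-casing logic is purely illustrative):
// a hypothetical function: upper-case the payload and keep the headers
MessagingFunction upperCaseFunction = message -> MessageBuilder
        .withPayload(message.getPayload().toString().toUpperCase())
        .copyHeaders(message.getHeaders())
        .build();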
Spring Integration automatically provides an implementation using its GatewayProxyFactoryBean.
It also requires a MessagingMessageConverter to convert the key, value, and metadata (including headers) to/from a Spring Messaging Message<?>.
See Calling a Spring Integration Flow from a KStream (https://docs.spring.io/spring-integration/docs/current/reference/html/kafka.html#streams-integration) for more information.
Recovery from Deserialization Exceptions
Version 2.3 introduced the RecoveringDeserializationExceptionHandler, which can take some action when a deserialization exception occurs.
Refer to the Kafka documentation about DeserializationExceptionHandler, of which the RecoveringDeserializationExceptionHandler is an implementation.
The RecoveringDeserializationExceptionHandler is configured with a ConsumerRecordRecoverer implementation.
The framework provides the DeadLetterPublishingRecoverer, which sends the failed record to a dead-letter topic.
See Publishing Dead-letter Records for more information about this recoverer.
To configure the recoverer, add the following properties to your streams configuration:
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
Map<String, Object> props = new HashMap<>();
...
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
RecoveringDeserializationExceptionHandler.class);
props.put(RecoveringDeserializationExceptionHandler.KSTREAM_DESERIALIZATION_RECOVERER, recoverer());
...
return new KafkaStreamsConfiguration(props);
}
@Bean
public DeadLetterPublishingRecoverer recoverer() {
return new DeadLetterPublishingRecoverer(kafkaTemplate(),
(record, ex) -> new TopicPartition("recovererDLQ", -1));
}
Of course, the recoverer() bean can be your own implementation of ConsumerRecordRecoverer.
Interactive Query Support
Starting with version 3.2, Spring for Apache Kafka provides basic facilities required for interactive queries in Kafka Streams.
Interactive queries are useful in stateful Kafka Streams applications since they provide a way to constantly query the stateful stores in the application.
Thus, if an application wants to materialize the current view of the system under consideration, interactive queries provide a way to do that.
To learn more about interactive queries, see this article.
The support in Spring for Apache Kafka is centered around an API called KafkaStreamsInteractiveQueryService, which is a facade around the interactive query APIs in the Kafka Streams library.
An application can create an instance of this service as a bean and then later use it to retrieve the state store by its name.
The following code snippet shows an example.
@Bean
public KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService(StreamsBuilderFactoryBean streamsBuilderFactoryBean) {
final KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService =
new KafkaStreamsInteractiveQueryService(streamsBuilderFactoryBean);
return kafkaStreamsInteractiveQueryService;
}
Assuming that a Kafka Streams application has a state store called app-store, that store can be retrieved via the KafkaStreamsInteractiveQueryService API, as shown below.
@Autowired
private KafkaStreamsInteractiveQueryService interactiveQueryService;
ReadOnlyKeyValueStore<Object, Object> appStore = interactiveQueryService.retrieveQueryableStore("app-store", QueryableStoreTypes.keyValueStore());
Once an application gains access to the state store, it can query it for key-value information.
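For example, a minimal sketch of a point lookup against the store retrieved above (the key 12345 is illustrative):
// look up a single key in the read-only key-value store (hypothetical key)
Object value = appStore.get(12345);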
In this case, the state store that the application uses is a read-only key value store.
There are other types of state stores that a Kafka Streams application can use.
For instance, if an application prefers to query a window-based store, it can build that store in the Kafka Streams application business logic and then retrieve it later.
For this reason, the API to retrieve the queryable store in KafkaStreamsInteractiveQueryService has a generic store type signature, so that the end user can assign the proper type.
Here is the type signature from the API.
public <T> T retrieveQueryableStore(String storeName, QueryableStoreType<T> storeType)
When calling this method, the user can specifically ask for the proper state store type, as we have done in the above example.
Retrying State Store Retrieval
When trying to retrieve the state store using the KafkaStreamsInteractiveQueryService, there is a chance that the state store might not be found for various reasons.
If those reasons are transitory, KafkaStreamsInteractiveQueryService provides an option to retry the retrieval of the state store by allowing you to inject a custom RetryTemplate.
By default, the RetryTemplate that is used in KafkaStreamsInteractiveQueryService uses a maximum of three attempts with a fixed backoff of one second.
Here is how you can inject a custom RetryTemplate into KafkaStreamsInteractiveQueryService with a maximum of ten attempts.
@Bean
public KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService(StreamsBuilderFactoryBean streamsBuilderFactoryBean) {
final KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService =
new KafkaStreamsInteractiveQueryService(streamsBuilderFactoryBean);
RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setBackOffPolicy(new FixedBackOffPolicy());
RetryPolicy retryPolicy = new SimpleRetryPolicy(10);
retryTemplate.setRetryPolicy(retryPolicy);
kafkaStreamsInteractiveQueryService.setRetryTemplate(retryTemplate);
return kafkaStreamsInteractiveQueryService;
}
Querying Remote State Stores
The API shown above for retrieving the state store, retrieveQueryableStore, is intended for locally available key-value state stores.
In production settings, Kafka Streams applications are most likely distributed based on the number of partitions.
If a topic has four partitions and there are four instances of the same Kafka Streams processor running, then each instance may be responsible for processing a single partition from the topic.
In this scenario, calling retrieveQueryableStore may not give the correct result that an instance is looking for, although it might return a valid store.
Let's assume that the topic with four partitions has data about various keys and a single partition is always responsible for a specific key.
If the instance that is calling retrieveQueryableStore is looking for information about a key that this instance does not host, then it will not receive any data.
This is because the current Kafka Streams instance does not know anything about this key.
To fix this, the calling instance first needs to make sure that it has the host information for the Kafka Streams processor instance where the particular key is hosted.
This can be retrieved from any Kafka Streams instance under the same application.id, as shown below.
@Autowired
private KafkaStreamsInteractiveQueryService interactiveQueryService;
HostInfo kafkaStreamsApplicationHostInfo = this.interactiveQueryService.getKafkaStreamsApplicationHostInfo("app-store", 12345, new IntegerSerializer());
In the example code above, the calling instance is querying for a particular key, 12345, from the state store named app-store.
The API also needs a corresponding key serializer, which in this case is the IntegerSerializer.
Kafka Streams looks through all its instances under the same application.id and tries to find which instance hosts this particular key.
Once found, it returns that host information as a HostInfo object.
This is what the API looks like:
public <K> HostInfo getKafkaStreamsApplicationHostInfo(String store, K key, Serializer<K> serializer)
When using multiple instances of the Kafka Streams processors of the same application.id in a distributed way like this, the application is supposed to provide an RPC layer where the state stores can be queried over an RPC endpoint, such as a REST one.
See this article for more details on this.
When using Spring for Apache Kafka, it is very easy to add a Spring-based REST endpoint by using the spring-web technologies.
Once there is a REST endpoint, it can be used to query the state stores from any Kafka Streams instance, given that the HostInfo where the key is hosted is known to the instance.
If the instance hosting the key is the current instance, then the application does not need to call the RPC mechanism and can make an in-JVM call instead.
However, the trouble is that an application may not know that the instance making the call is where the key is hosted, because a particular server may lose a partition due to a consumer rebalance.
To fix this issue, KafkaStreamsInteractiveQueryService provides a convenient API for querying the current host information via the method getCurrentKafkaStreamsApplicationHostInfo(), which returns the current HostInfo.
The idea is that the application can first acquire information about where the key is held and then compare that HostInfo with the one for the current instance.
If the HostInfo data matches, then it can proceed with a simple JVM call via retrieveQueryableStore; otherwise, it can go with the RPC option.
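For example, a minimal sketch of that decision, reusing the app-store store and the key 12345 from the earlier examples (the RPC branch is left to the application):
HostInfo keyHostInfo = this.interactiveQueryService.getKafkaStreamsApplicationHostInfo("app-store", 12345, new IntegerSerializer());
HostInfo currentHostInfo = this.interactiveQueryService.getCurrentKafkaStreamsApplicationHostInfo();

if (currentHostInfo.equals(keyHostInfo)) {
    // the key is hosted locally -- query the store with a plain in-JVM call
    ReadOnlyKeyValueStore<Object, Object> appStore =
            this.interactiveQueryService.retrieveQueryableStore("app-store", QueryableStoreTypes.keyValueStore());
    Object value = appStore.get(12345);
}
else {
    // the key is hosted elsewhere -- query that instance over your own RPC layer
    // (for example, a REST endpoint you expose); this part is application-specific
}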
Kafka Streams Example
The following example combines the various topics we have covered in this chapter:
@Configuration
@EnableKafka
@EnableKafkaStreams
public class KafkaStreamsConfig {
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testStreams");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
return new KafkaStreamsConfiguration(props);
}
@Bean
public StreamsBuilderFactoryBeanConfigurer configurer() {
return fb -> fb.setStateListener((newState, oldState) -> {
System.out.println("State transition from " + oldState + " to " + newState);
});
}
@Bean
public KStream<Integer, String> kStream(StreamsBuilder kStreamBuilder) {
KStream<Integer, String> stream = kStreamBuilder.stream("streamingTopic1");
stream
.mapValues((ValueMapper<String, String>) String::toUpperCase)
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofMillis(1_000)))
.reduce((String value1, String value2) -> value1 + value2,
Named.as("windowStore"))
.toStream()
.map((windowedId, value) -> new KeyValue<>(windowedId.key(), value))
.filter((i, s) -> s.length() > 40)
.to("streamingTopic2");
stream.print(Printed.toSysOut());
return stream;
}
}