StreamsBuilderFactoryBean configurer

通常需要自定义创建 KafkaStreams 对象的 StreamsBuilderFactoryBean。基于 Spring Kafka 提供的基础支持,粘合剂允许你自定义 StreamsBuilderFactoryBean。你可以使用 StreamsBuilderFactoryBeanConfigurer 来自定义 StreamsBuilderFactoryBean 本身。然后,一旦你通过此配置程序访问了 StreamsBuilderFactoryBean,你就可以使用 KafkaStreamsCustomzier 来自定义相应的 KafkaStreams。这两个自定义程序都是 Spring for Apache Kafka 项目的一部分。 这里有一个使用 StreamsBuilderFactoryBeanConfigurer 的示例。

@Bean
public StreamsBuilderFactoryBeanConfigurer streamsBuilderFactoryBeanConfigurer() {
    return sfb -> sfb.setStateListener((newState, oldState) -> {
         //Do some action here!
    });
}

上述内容展示了你可以执行的自定义 StreamsBuilderFactoryBean 操作。你可以调用 StreamsBuilderFactoryBean 中的任何可用的突变操作来自定义它。该自定义程序将在工厂 bean 启动之前由粘合剂进行调用。 一旦你访问了 StreamsBuilderFactoryBean,你还可以自定义底层的 KafkaStreams 对象。以下是可以这么做的蓝图。

@Bean
public StreamsBuilderFactoryBeanConfigurer streamsBuilderFactoryBeanConfigurer() {
    return factoryBean -> {
        factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
            @Override
            public void customize(KafkaStreams kafkaStreams) {
                kafkaStreams.setUncaughtExceptionHandler((t, e) -> {

                });
            }
        });
    };
}

KafkaStreamsCustomizer 将在底层的 KafkaStreams 启动之前由 StreamsBuilderFactoryBeabn 进行调用。 在整个应用程序中只能有一个 StreamsBuilderFactoryBeanConfigurer。那么我们如何解释多个 Kafka Streams 处理器,因为每个处理器都由单独的 StreamsBuilderFactoryBean 对象提供支持?在这种情况下,如果那些处理器的定制需要有所不同,那么应用程序需要基于应用程序 ID 应用一些过滤器。 例如:

@Bean
public StreamsBuilderFactoryBeanConfigurer streamsBuilderFactoryBeanConfigurer() {
    return factoryBean -> {
        if (factoryBean.getStreamsConfiguration().getProperty(StreamsConfig.APPLICATION_ID_CONFIG)
                .equals("processor1-application-id")) {
            factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
                @Override
                public void customize(KafkaStreams kafkaStreams) {
                    kafkaStreams.setUncaughtExceptionHandler((t, e) -> {

                    });
                }
            });
        }
    };

Using StreamsBuilderFactoryBeanConfigurer to register a global state store

如上所述,粘合剂不提供一种一流的方式来注册全局状态商店作为一项功能。为此,你需要使用自定义程序。以下是如何做到这一点。

@Bean
public StreamsBuilderFactoryBeanConfigurer customizer() {
    return fb -> {
        try {
            final StreamsBuilder streamsBuilder = fb.getObject();
            streamsBuilder.addGlobalStore(...);
        }
        catch (Exception e) {

        }
    };
}

同样,如果你有多个处理器,你希望通过使用应用程序 ID 过滤掉其他 StreamsBuilderFactoryBean 对象将全局状态存储区附加到正确的 StreamsBuilder

Using StreamsBuilderFactoryBeanConfigurer to register a production exception handler

在错误处理部分,我们指出粘合剂不提供一流的方式来处理生产异常。尽管事实如此,你仍然可以使用 StreamsBuilderFacotryBean 自定义程序来注册生产异常处理程序。见下文。

@Bean
public StreamsBuilderFactoryBeanConfigurer configurer() {
    return fb -> {
        fb.getStreamsConfiguration().put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
                            CustomProductionExceptionHandler.class);
    };
}

再次声明,如果你有多个处理器,你可能希望针对正确的 StreamsBuilderFactoryBean 来适当设置它。你还可以使用配置属性添加这样的生产异常处理程序(有关该内容的更多信息,请参见下文),但如果你选择采用编程方法,这是一个不错的选择。