Retrying critical business logic
在一些情况下,你可能希望重试对应用程序至关重要的业务逻辑部分。也许是对关系数据库的外部调用,或从 Kafka Streams 处理器调用 REST 端点。这些调用可能由于网络问题或远程服务不可用等各种原因而失败。大多数时候,如果你能重试,这些失败可能会自行解决。默认情况下,Kafka Streams 绑定器会为所有输入绑定创建 RetryTemplate
bean。
There are scenarios in which you might want to retry parts of your business logic that are critical to the application.
There maybe an external call to a relational database or invoking a REST endpoint from the Kafka Streams processor.
These calls can fail for various reasons such as network issues or remote service unavailability.
More often, these failures may self resolve if you can try them again.
By default, Kafka Streams binder creates RetryTemplate
beans for all the input bindings.
如果函数具有以下签名,
If the function has the following signature,
@Bean
public java.util.function.Consumer<KStream<Object, String>> process()
并且具有默认绑定名称,则 RetryTemplate
将注册为 process-in-0-RetryTemplate
。这是遵循绑定名称(process-in-0
)加上文字“-RetryTemplate
”。在存在多个输入绑定时,将为每个绑定提供单独的 RetryTemplate
bean。如果应用程序中有自定义的 RetryTemplate
bean,并通过 spring.cloud.stream.bindings.<binding-name>.consumer.retryTemplateName
提供,则它优先于任何输入绑定级别重试模板配置属性。
and with default binding name, the RetryTemplate
will be registered as process-in-0-RetryTemplate
.
This is following the convention of binding name (process-in-0
) followed by the literal -RetryTemplate
.
In the case of multiple input bindings, there will be a separate RetryTemplate
bean available per binding.
If there is a custom RetryTemplate
bean available in the application and provided through spring.cloud.stream.bindings.<binding-name>.consumer.retryTemplateName
, then that takes precedence over any input binding level retry template configuration properties.
将绑定中的 RetryTemplate
注入到应用程序后,就可以使用它来重试应用程序的任何关键部分。这里有一个例子:
Once the RetryTemplate
from the binding is injected into the application, it can be used to retry any critical sections of the application.
Here is an example:
@Bean
public java.util.function.Consumer<KStream<Object, String>> process(@Lazy @Qualifier("process-in-0-RetryTemplate") RetryTemplate retryTemplate) {
return input -> input
.process(() -> new Processor<Object, String>() {
@Override
public void init(ProcessorContext processorContext) {
}
@Override
public void process(Object o, String s) {
retryTemplate.execute(context -> {
//Critical business logic goes here.
});
}
@Override
public void close() {
}
});
}
或者,你可以使用自定义 RetryTemplate
,如下所示。
Or you can use a custom RetryTemplate
as below.
@EnableAutoConfiguration
public static class CustomRetryTemplateApp {
@Bean
@StreamRetryTemplate
RetryTemplate fooRetryTemplate() {
RetryTemplate retryTemplate = new RetryTemplate();
RetryPolicy retryPolicy = new SimpleRetryPolicy(4);
FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
backOffPolicy.setBackOffPeriod(1);
retryTemplate.setBackOffPolicy(backOffPolicy);
retryTemplate.setRetryPolicy(retryPolicy);
return retryTemplate;
}
@Bean
public java.util.function.Consumer<KStream<Object, String>> process() {
return input -> input
.process(() -> new Processor<Object, String>() {
@Override
public void init(ProcessorContext processorContext) {
}
@Override
public void process(Object o, String s) {
fooRetryTemplate().execute(context -> {
//Critical business logic goes here.
});
}
@Override
public void close() {
}
});
}
}
请注意,在重试用尽后,默认情况下将抛出最后一个异常,导致处理器终止。如果你希望处理异常并继续处理,则可以向 execute
方法添加 RecoveryCallback:这里是一个示例。
Note that when retries are exhausted, by default, the last exception will be thrown, causing the processor to terminate.
If you wish to handle the exception and continue processing, you can add a RecoveryCallback to the execute
method:
Here is an example.
retryTemplate.execute(context -> {
//Critical business logic goes here.
}, context -> {
//Recovery logic goes here.
return null;
));
有关重试模板、重试策略、退避策略等更多信息,请参阅 Spring Retry 项目。
Refer to the Spring Retry project for more information about the RetryTemplate, retry policies, backoff policies and more.