Testing Applications

  • 嵌入式 Kafka 代理,可以通过 EmbeddedKafkaBroker 管理。

  • KafkaTestUtils 提供了静态方法来使用记录和检索各种记录偏移。

  • JUnit 规则为 EmbeddedKafkaZKBroker 提供了一个封装,用于创建 Kafka 和 Zookeeper 服务器。

  • @EmbeddedKafka 注解简化了 EmbeddedKafkaBroker 的使用,允许自动注入 broker 地址列表。

  • Hamcrest 和 AssertJ 匹配器和条件提供了对消费者记录的断言。

  • 框架提供 MockConsumerFactory 和 MockProducerFactory,允许在不使用实际代理服务器的情况下使用 MockConsumer 和 MockProducer。

spring-kafka-test jar 包含一些有用的实用程序,以帮助测试您的应用程序。

The spring-kafka-test jar contains some useful utilities to assist with testing your applications.

Embedded Kafka Broker

提供了两种实现:

Two implementations are provided:

  • EmbeddedKafkaZKBroker - legacy implementation which starts an embedded Zookeeper instance.

  • EmbeddedKafkaKraftBroker - (default) uses Kraft instead of Zookeeper in combined controller and broker modes (since 3.1).

有几种配置代理的技术,如下文所述。

There are several techniques to configure the broker as discussed in the following sections.

KafkaTestUtils

`org.springframework.kafka.test.utils.KafkaTestUtils`提供了一些静态帮助程序方法来使用记录、检索各种记录偏移量等。有关完整详细信息,请参阅其 Javadocs

org.springframework.kafka.test.utils.KafkaTestUtils provides a number of static helper methods to consume records, retrieve various record offsets, and others. Refer to its Javadocs for complete details.

JUnit

org.springframework.kafka.test.utils.KafkaTestUtils 还提供了一些设置生产者和消费者属性的静态方法。以下列表显示了这些方法签名:

org.springframework.kafka.test.utils.KafkaTestUtils also provides some static methods to set up producer and consumer properties. The following listing shows those method signatures:

/**
 * Set up test properties for an {@code <Integer, String>} consumer.
 * @param group the group id.
 * @param autoCommit the auto commit.
 * @param embeddedKafka a {@link EmbeddedKafkaBroker} instance.
 * @return the properties.
 */
public static Map<String, Object> consumerProps(String group, String autoCommit,
                                       EmbeddedKafkaBroker embeddedKafka) { ... }

/**
 * Set up test properties for an {@code <Integer, String>} producer.
 * @param embeddedKafka a {@link EmbeddedKafkaBroker} instance.
 * @return the properties.
 */
public static Map<String, Object> producerProps(EmbeddedKafkaBroker embeddedKafka) { ... }

从版本 2.5 开始,consumerProps 方法将 ConsumerConfig.AUTO_OFFSET_RESET_CONFIG 设置为 earliest。这是因为在大多数情况下,您希望消费者使用测试用例中发送的任何消息。ConsumerConfig 默认值为 latest,这意味着消费者启动之前测试已经发送的消息不会接收到那些记录。要恢复到以前的行为,请在调用该方法后将该属性设置为 latest

Starting with version 2.5, the consumerProps method sets the ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to earliest. This is because, in most cases, you want the consumer to consume any messages sent in a test case. The ConsumerConfig default is latest which means that messages already sent by a test, before the consumer starts, will not receive those records. To revert to the previous behavior, set the property to latest after calling the method.

使用嵌入式代理时,通常最佳做法是为每个测试使用不同的主题,以防止串扰。如果由于某种原因这是不可行的,请注意 consumeFromEmbeddedTopics 方法的默认行为是在分配后将分配的分区寻找到开头。由于它无权访问消费者属性,因此您必须使用获取 seekToEnd 布尔参数的重载方法来寻找到结尾而不是开头。

When using the embedded broker, it is generally best practice using a different topic for each test, to prevent cross-talk. If this is not possible for some reason, note that the consumeFromEmbeddedTopics method’s default behavior is to seek the assigned partitions to the beginning after assignment. Since it does not have access to the consumer properties, you must use the overloaded method that takes a seekToEnd boolean parameter to seek to the end instead of the beginning.

提供了一个适用于 `EmbeddedKafkaZKBroker`的 JUnit 4 `@Rule`包装器,用于创建嵌入式 Kafka 和嵌入式 Zookeeper 服务器。(有关将 `@EmbeddedKafka`与 JUnit 5 配合使用的信息,请参阅 @EmbeddedKafka Annotation)。以下列表展示了这些方法的签名:

A JUnit 4 @Rule wrapper for the EmbeddedKafkaZKBroker is provided to create an embedded Kafka and an embedded Zookeeper server. (See @EmbeddedKafka Annotation for information about using @EmbeddedKafka with JUnit 5). The following listing shows the signatures of those methods:

/**
 * Create embedded Kafka brokers.
 * @param count the number of brokers.
 * @param controlledShutdown passed into TestUtils.createBrokerConfig.
 * @param topics the topics to create (2 partitions per).
 */
public EmbeddedKafkaRule(int count, boolean controlledShutdown, String... topics) { ... }

/**
 *
 * Create embedded Kafka brokers.
 * @param count the number of brokers.
 * @param controlledShutdown passed into TestUtils.createBrokerConfig.
 * @param partitions partitions per topic.
 * @param topics the topics to create.
 */
public EmbeddedKafkaRule(int count, boolean controlledShutdown, int partitions, String... topics) { ... }

EmbeddedKafkaKraftBroker 不支持 JUnit4。

The EmbeddedKafkaKraftBroker is not supported with JUnit4.

EmbeddedKafkaBroker 类有一个实用方法,允许您使用它创建的所有主题。以下示例显示了如何使用它:

The EmbeddedKafkaBroker class has a utility method that lets you consume for all the topics it created. The following example shows how to use it:

Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testT", "false", embeddedKafka);
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
Consumer<Integer, String> consumer = cf.createConsumer();
embeddedKafka.consumeFromAllEmbeddedTopics(consumer);

KafkaTestUtils 有一些实用方法可以从消费者获取结果。以下列表显示了这些方法签名:

The KafkaTestUtils has some utility methods to fetch results from the consumer. The following listing shows those method signatures:

/**
 * Poll the consumer, expecting a single record for the specified topic.
 * @param consumer the consumer.
 * @param topic the topic.
 * @return the record.
 * @throws org.junit.ComparisonFailure if exactly one record is not received.
 */
public static <K, V> ConsumerRecord<K, V> getSingleRecord(Consumer<K, V> consumer, String topic) { ... }

/**
 * Poll the consumer for records.
 * @param consumer the consumer.
 * @return the records.
 */
public static <K, V> ConsumerRecords<K, V> getRecords(Consumer<K, V> consumer) { ... }

以下示例显示如何使用 KafkaTestUtils

The following example shows how to use KafkaTestUtils:

...
template.sendDefault(0, 2, "bar");
ConsumerRecord<Integer, String> received = KafkaTestUtils.getSingleRecord(consumer, "topic");
...

EmbeddedKafkaBroker 启动嵌入式 Kafka 和嵌入式 Zookeeper 服务器时,一个名为 spring.embedded.kafka.brokers 的系统属性被设置为 Kafka 代理的地址,一个名为 spring.embedded.zookeeper.connect 的系统属性被设置为 Zookeeper 的地址。为此属性提供了方便的常量(EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERSEmbeddedKafkaBroker.SPRING_EMBEDDED_ZOOKEEPER_CONNECT)。

When the embedded Kafka and embedded Zookeeper server are started by the EmbeddedKafkaBroker, a system property named spring.embedded.kafka.brokers is set to the address of the Kafka brokers and a system property named spring.embedded.zookeeper.connect is set to the address of Zookeeper. Convenient constants (EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS and EmbeddedKafkaBroker.SPRING_EMBEDDED_ZOOKEEPER_CONNECT) are provided for this property.

与默认 spring.embedded.kafka.brokers 系统属性不同,Kafka 代理的地址可以显示为任意和便捷的属性。为此,可以在启动嵌入式 Kafka 之前设置 spring.embedded.kafka.brokers.propertyEmbeddedKafkaBroker.BROKER_LIST_PROPERTY)系统属性。例如,对于 Spring Boot,预计会设置 spring.kafka.bootstrap-servers 配置属性以自动配置 Kafka 客户端。因此,在使用随机端口运行带有嵌入式 Kafka 的测试之前,我们可以将 spring.embedded.kafka.brokers.property=spring.kafka.bootstrap-servers 设置为一个系统属性 - 并且 EmbeddedKafkaBroker 将使用它公开其代理地址。这现在是此属性的默认值(从版本 3.0.10 开始)。

Instead of default spring.embedded.kafka.brokers system property, the address of the Kafka brokers can be exposed to any arbitrary and convenient property. For this purpose a spring.embedded.kafka.brokers.property (EmbeddedKafkaBroker.BROKER_LIST_PROPERTY) system property can be set before starting an embedded Kafka. For example, with Spring Boot a spring.kafka.bootstrap-servers configuration property is expected to be set for auto-configuring Kafka client, respectively. So, before running tests with an embedded Kafka on random ports, we can set spring.embedded.kafka.brokers.property=spring.kafka.bootstrap-servers as a system property - and the EmbeddedKafkaBroker will use it to expose its broker addresses. This is now the default value for this property (starting with version 3.0.10).

使用 EmbeddedKafkaBroker.brokerProperties(Map<String, String>),您可以为 Kafka 服务器提供其他属性。有关可能的代理属性的详细信息,请参阅 Kafka Config

With the EmbeddedKafkaBroker.brokerProperties(Map<String, String>), you can provide additional properties for the Kafka servers. See Kafka Config for more information about possible broker properties.

Configuring Topics

以下示例配置创建名为 cathat 的主题,它们有 5 个分区,创建一个名为 thing1 的主题,它有 10 个分区,并创建一个名为 thing2 的主题,它有 15 个分区:

The following example configuration creates topics called cat and hat with five partitions, a topic called thing1 with 10 partitions, and a topic called thing2 with 15 partitions:

public class MyTests {

    @ClassRule
    private static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, false, 5, "cat", "hat");

    @Test
    public void test() {
        embeddedKafkaRule.getEmbeddedKafka()
              .addTopics(new NewTopic("thing1", 10, (short) 1), new NewTopic("thing2", 15, (short) 1));
        ...
    }

}

默认情况下,当出现问题(例如,添加已存在的主题)时,addTopics 会引发一个异常。2.6 版本添加了此方法的新版本,该版本返回一个 Map<String, Exception>;键是主题名称,成功时值为 null,失败时值为 Exception

By default, addTopics will throw an exception when problems arise (such as adding a topic that already exists). Version 2.6 added a new version of that method that returns a Map<String, Exception>; the key is the topic name and the value is null for success, or an Exception for a failure.

Using the Same Broker(s) for Multiple Test Classes

使用类似于以下内容,可对多个 test 类使用同一个 Broker:

You can use the same broker for multiple test classes with something similar to the following:

public final class EmbeddedKafkaHolder {

    private static EmbeddedKafkaBroker embeddedKafka = new EmbeddedKafkaZKBroker(1, false)
            .brokerListProperty("spring.kafka.bootstrap-servers");

    private static boolean started;

    public static EmbeddedKafkaBroker getEmbeddedKafka() {
        if (!started) {
            try {
                embeddedKafka.afterPropertiesSet();
            }
            catch (Exception e) {
                throw new KafkaException("Embedded broker failed to start", e);
            }
            started = true;
        }
        return embeddedKafka;
    }

    private EmbeddedKafkaHolder() {
        super();
    }

}

这假定 Spring Boot 环境,并且嵌入式 Broker 替换引导服务器属性。

This assumes a Spring Boot environment and the embedded broker replaces the bootstrap servers property.

然后,在每个 test 类中,可以使用类似于以下的内容:

Then, in each test class, you can use something similar to the following:

static {
    EmbeddedKafkaHolder.getEmbeddedKafka().addTopics("topic1", "topic2");
}

private static final EmbeddedKafkaBroker broker = EmbeddedKafkaHolder.getEmbeddedKafka();

如果你不使用 Spring Boot,则可以使用 broker.getBrokersAsString() 获取引导服务器。

If you are not using Spring Boot, you can obtain the bootstrap servers using broker.getBrokersAsString().

前述示例未提供一种机制,用于在所有测试完成后关闭代理。如果(比如说)您在 Gradle 守护程序中运行测试,这可能存在问题。您不应在这样的情况下使用此项技术,或者您应使用某种技术来在测试完成后对 EmbeddedKafkaBroker 上的 destroy() 发起调用。

The preceding example provides no mechanism for shutting down the broker(s) when all tests are complete. This could be a problem if, say, you run your tests in a Gradle daemon. You should not use this technique in such a situation, or you should use something to call destroy() on the EmbeddedKafkaBroker when your tests are complete.

从 3.0 版本开始,该框架为 JUnit Platform 公开了 GlobalEmbeddedKafkaTestExecutionListener;它在默认情况下处于禁用状态。这需要 JUnit Platform 1.8 或更高版本。此侦听器的目的是为整个 test 计划启动一个全局 EmbeddedKafkaBroker,并在计划结束时停止它。为启用此侦听器,并因此为项目中的所有 test 提供一个全局嵌入式 Kafka 集群,必须通过系统属性或 JUnit Platform 配置将 spring.kafka.global.embedded.enabled 属性设置为 true。此外,可提供以下属性:

Starting with version 3.0, the framework exposes a GlobalEmbeddedKafkaTestExecutionListener for the JUnit Platform; it is disabled by default. This requires JUnit Platform 1.8 or greater. The purpose of this listener is to start one global EmbeddedKafkaBroker for the whole test plan and stop it at the end of the plan. To enable this listener, and therefore have a single global embedded Kafka cluster for all the tests in the project, the spring.kafka.global.embedded.enabled property must be set to true via system properties or JUnit Platform configuration. In addition, these properties can be provided:

  • spring.kafka.embedded.count - the number of Kafka brokers to manage;

  • spring.kafka.embedded.ports - ports (comma-separated value) for every Kafka broker to start, 0 if random port is preferred; the number of values must be equal to the count mentioned above;

  • spring.kafka.embedded.topics - topics (comma-separated value) to create in the started Kafka cluster;

  • spring.kafka.embedded.partitions - number of partitions to provision for the created topics;

  • spring.kafka.embedded.broker.properties.location - the location of the file for additional Kafka broker configuration properties; the value of this property must follow the Spring resource abstraction pattern;

  • spring.kafka.embedded.kraft - when false, use an EmbeddedKafkaZKBroker instead of an EmbeddedKafkaKraftBroker.

从本质上说,这些属性模仿一些 @EmbeddedKafka 属性。

Essentially these properties mimic some of the @EmbeddedKafka attributes.

查看有关配置属性及其如何提供给 JUnit 5 User Guide 的更多信息。例如,可以将 spring.embedded.kafka.brokers.property=my.bootstrap-servers 项添加到测试类路径中的 junit-platform.properties 文件中。从版本 3.0.10 开始,对于使用 Spring Boot 应用程序进行测试,代理程序会自动将此默认设置为 spring.kafka.bootstrap-servers

See more information about configuration properties and how to provide them in the JUnit 5 User Guide. For example, a spring.embedded.kafka.brokers.property=my.bootstrap-servers entry can be added into a junit-platform.properties file in the testing classpath. Starting with version 3.0.10, the broker automatically sets this to spring.kafka.bootstrap-servers, by default, for testing with Spring Boot applications.

不建议在单个测试套件中组合全局嵌入式 Kafka 和每次测试类。这两个套件共享相同的系统属性,因此极有可能导致意外行为。

It is recommended to not combine a global embedded Kafka and per-test class in a single test suite. Both of them share the same system properties, so it is very likely going to lead to unexpected behavior.

spring-kafka-testjunit-jupiter-apijunit-platform-launcher(后者用于支持全局嵌入式代理)具有传递依赖项(如果您希望使用嵌入式代理并且不使用 JUnit,您可能希望排除这些依赖项)。

spring-kafka-test has transitive dependencies on junit-jupiter-api and junit-platform-launcher (the latter to support the global embedded broker). If you wish to use the embedded broker and are NOT using JUnit, you may wish to exclude these dependencies.

@EmbeddedKafka Annotation

我们通常建议你将规则用作 @ClassRule,以避免在 test 之间启动和停止 Broker(并为每个 test 使用不同的主题)。从 2.0 版本开始,如果你使用 Spring 的 test 应用程序上下文缓存,还可声明一个 EmbeddedKafkaBroker bean,以便可以在多个 test 类中使用一个 Broker。为了方便起见,我们提供了一个名为 @EmbeddedKafka 的 test 类级注释,以注册 EmbeddedKafkaBroker bean。以下示例显示了如何使用它:

We generally recommend that you use the rule as a @ClassRule to avoid starting and stopping the broker between tests (and use a different topic for each test). Starting with version 2.0, if you use Spring’s test application context caching, you can also declare a EmbeddedKafkaBroker bean, so a single broker can be used across multiple test classes. For convenience, we provide a test class-level annotation called @EmbeddedKafka to register the EmbeddedKafkaBroker bean. The following example shows how to use it:

@RunWith(SpringRunner.class)
@DirtiesContext
@EmbeddedKafka(partitions = 1,
         topics = {
                 KafkaStreamsTests.STREAMING_TOPIC1,
                 KafkaStreamsTests.STREAMING_TOPIC2 })
public class KafkaStreamsTests {

    @Autowired
    private EmbeddedKafkaBroker embeddedKafka;

    @Test
    public void someTest() {
        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testGroup", "true", this.embeddedKafka);
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        ConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
        Consumer<Integer, String> consumer = cf.createConsumer();
        this.embeddedKafka.consumeFromAnEmbeddedTopic(consumer, KafkaStreamsTests.STREAMING_TOPIC2);
        ConsumerRecords<Integer, String> replies = KafkaTestUtils.getRecords(consumer);
        assertThat(replies.count()).isGreaterThanOrEqualTo(1);
    }

    @Configuration
    @EnableKafkaStreams
    public static class KafkaStreamsConfiguration {

        @Value("${" + EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS + "}")
        private String brokerAddresses;

        @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
        public KafkaStreamsConfiguration kStreamsConfigs() {
            Map<String, Object> props = new HashMap<>();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testStreams");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddresses);
            return new KafkaStreamsConfiguration(props);
        }

    }

}

从 2.2.4 版本开始,还可以使用 @EmbeddedKafka 注释来指定 Kafka 端口属性。

Starting with version 2.2.4, you can also use the @EmbeddedKafka annotation to specify the Kafka ports property.

从 3.1 版本开始,将 kraft 属性设置为 false,以使用 EmbeddedKafkaZKBroker 而不是 EmbeddedKafkaKraftBroker

Starting with version 3.1, set the kraft property to false to use an EmbeddedKafkaZKBroker instead of an EmbeddedKafkaKraftBroker.

以下示例设置了 @EmbeddedKafka 支持属性值解析的 topicsbrokerPropertiesbrokerPropertiesLocation 属性:

The following example sets the topics, brokerProperties, and brokerPropertiesLocation attributes of @EmbeddedKafka support property placeholder resolutions:

@TestPropertySource(locations = "classpath:/test.properties")
@EmbeddedKafka(topics = { "any-topic", "${kafka.topics.another-topic}" },
        brokerProperties = { "log.dir=${kafka.broker.logs-dir}",
                            "listeners=PLAINTEXT://localhost:${kafka.broker.port}",
                            "auto.create.topics.enable=${kafka.broker.topics-enable:true}" },
        brokerPropertiesLocation = "classpath:/broker.properties")

在前一个示例中,属性占位符 ${kafka.topics.another-topic}${kafka.broker.logs-dir}${kafka.broker.port} 从 Spring Environment 解析。此外,还从 brokerPropertiesLocation 指定的 broker.properties 类路径资源中加载了 Broker 属性。属性占位符对于 brokerPropertiesLocation URL 以及在资源中找到的任何属性占位符解析。由 brokerProperties 定义的属性将覆盖在 brokerPropertiesLocation 中找到的属性。

In the preceding example, the property placeholders ${kafka.topics.another-topic}, ${kafka.broker.logs-dir}, and ${kafka.broker.port} are resolved from the Spring Environment. In addition, the broker properties are loaded from the broker.properties classpath resource specified by the brokerPropertiesLocation. Property placeholders are resolved for the brokerPropertiesLocation URL and for any property placeholders found in the resource. Properties defined by brokerProperties override properties found in brokerPropertiesLocation.

可以使用 JUnit 4 或 JUnit 5 使用 @EmbeddedKafka 注释。

You can use the @EmbeddedKafka annotation with JUnit 4 or JUnit 5.

@EmbeddedKafka Annotation with JUnit5

从 2.3 版本开始,有两种方法可以用 @EmbeddedKafka 注释和 JUnit5 一起使用。当与 @SpringJunitConfig 注释一起使用时,嵌入式 Broker 将被添加到 test 应用程序上下文中。可以在类或方法级别将 Broker 自动注入 test 中,以获取 Broker 地址列表。

Starting with version 2.3, there are two ways to use the @EmbeddedKafka annotation with JUnit5. When used with the @SpringJunitConfig annotation, the embedded broker is added to the test application context. You can auto wire the broker into your test, at the class or method level, to get the broker address list.

当*不*使用 Spring test 上下文时,EmbdeddedKafkaCondition 创建一个 Broker;此条件包含一个参数解析器,以便你可以在 test 方法中访问 Broker。

When not using the spring test context, the EmbdeddedKafkaCondition creates a broker; the condition includes a parameter resolver so you can access the broker in your test method.

@EmbeddedKafka
public class EmbeddedKafkaConditionTests {

    @Test
    public void test(EmbeddedKafkaBroker broker) {
        String brokerList = broker.getBrokersAsString();
        ...
    }

}

除非用 ExtendWith(SpringExtension.class)(或元注释)注释的用 @EmbeddedKafka 注释的类,否则将创建一个独立的 Broker(位于 Spring 的 TestContext 之外)。@SpringJunitConfig@SpringBootTest 是这样的元注释,并且当同时出现这两种注释中的任何一种时,将使用基于上下文的 Broker。

A standalone broker (outside the Spring’s TestContext) will be created unless a class annotated @EmbeddedKafka is also annotated (or meta-annotated) with ExtendWith(SpringExtension.class). @SpringJunitConfig and @SpringBootTest are so meta-annotated and the context-based broker will be used when either of those annotations are also present.

当存在 Spring 测试应用程序上下文时,主题和代理属性可以包含属性占位符,只要属性在某处有定义,这些属性就会得到解决。如果不存在 Spring 上下文,这些占位符不会得到解决。

When there is a Spring test application context available, the topics and broker properties can contain property placeholders, which will be resolved as long as the property is defined somewhere. If there is no Spring context available, these placeholders won’t be resolved.

Embedded Broker in @SpringBootTest Annotations

Spring Initializr 现在会自动将 spring-kafka-test 依赖项以测试范围添加到项目配置中。

Spring Initializr now automatically adds the spring-kafka-test dependency in test scope to the project configuration.

如果你的应用程序在 spring-cloud-stream 中使用 Kafka 绑定器,并且希望对 test 使用嵌入式 Broker,则必须删除 spring-cloud-stream-test-support 依赖项,因为它会在 test 案例中用 test 绑定器替换实际绑定器。如果你希望某些 test 使用 test 绑定器而另一些 test 使用嵌入式 Broker,则使用实际绑定器的 test 需要通过在 test 类中排除绑定器来自动配置来禁用 test 绑定器。以下示例显示了如何执行此操作:

If your application uses the Kafka binder in spring-cloud-stream and if you want to use an embedded broker for tests, you must remove the spring-cloud-stream-test-support dependency, because it replaces the real binder with a test binder for test cases. If you wish some tests to use the test binder and some to use the embedded broker, tests that use the real binder need to disable the test binder by excluding the binder auto configuration in the test class. The following example shows how to do so:

@RunWith(SpringRunner.class)
@SpringBootTest(properties = "spring.autoconfigure.exclude="
    + "org.springframework.cloud.stream.test.binder.TestSupportBinderAutoConfiguration")
public class MyApplicationTests {
    ...
}

Spring Boot 应用程序测试中使用嵌入式代理服务器有多种方法。

There are several ways to use an embedded broker in a Spring Boot application test.

包括:

They include:

JUnit4 Class Rule

以下示例演示如何使用 JUnit4 类规则创建嵌入式代理服务器:

The following example shows how to use a JUnit4 class rule to create an embedded broker:

@RunWith(SpringRunner.class)
@SpringBootTest
public class MyApplicationTests {

    @ClassRule
    public static EmbeddedKafkaRule broker = new EmbeddedKafkaRule(1, false, "someTopic")
            .brokerListProperty("spring.kafka.bootstrap-servers");

    @Autowired
    private KafkaTemplate<String, String> template;

    @Test
    public void test() {
        ...
    }

}

请注意,由于这是一个 Spring Boot 应用程序,因此我们覆盖代理服务器列表属性以设置 Spring Boot 的属性。

Notice that, since this is a Spring Boot application, we override the broker list property to set Spring Boot’s property.

@EmbeddedKafka Annotation or EmbeddedKafkaBroker Bean

以下示例演示如何使用 @EmbeddedKafka 注解创建嵌入式代理服务器:

The following example shows how to use an @EmbeddedKafka Annotation to create an embedded broker:

@RunWith(SpringRunner.class)
@EmbeddedKafka(topics = "someTopic",
        bootstrapServersProperty = "spring.kafka.bootstrap-servers") // this is now the default
public class MyApplicationTests {

    @Autowired
    private KafkaTemplate<String, String> template;

    @Test
    public void test() {
        ...
    }

}

从版本 3.0.10 开始,bootstrapServersProperty 会自动设置为 spring.kafka.bootstrap-servers

The bootstrapServersProperty is automatically set to spring.kafka.bootstrap-servers by default, starting with version 3.0.10.

Hamcrest Matchers

org.springframework.kafka.test.hamcrest.KafkaMatchers 提供以下匹配器:

The org.springframework.kafka.test.hamcrest.KafkaMatchers provides the following matchers:

/**
 * @param key the key
 * @param <K> the type.
 * @return a Matcher that matches the key in a consumer record.
 */
public static <K> Matcher<ConsumerRecord<K, ?>> hasKey(K key) { ... }

/**
 * @param value the value.
 * @param <V> the type.
 * @return a Matcher that matches the value in a consumer record.
 */
public static <V> Matcher<ConsumerRecord<?, V>> hasValue(V value) { ... }

/**
 * @param partition the partition.
 * @return a Matcher that matches the partition in a consumer record.
 */
public static Matcher<ConsumerRecord<?, ?>> hasPartition(int partition) { ... }

/**
 * Matcher testing the timestamp of a {@link ConsumerRecord} assuming the topic has been set with
 * {@link org.apache.kafka.common.record.TimestampType#CREATE_TIME CreateTime}.
 *
 * @param ts timestamp of the consumer record.
 * @return a Matcher that matches the timestamp in a consumer record.
 */
public static Matcher<ConsumerRecord<?, ?>> hasTimestamp(long ts) {
  return hasTimestamp(TimestampType.CREATE_TIME, ts);
}

/**
 * Matcher testing the timestamp of a {@link ConsumerRecord}
 * @param type timestamp type of the record
 * @param ts timestamp of the consumer record.
 * @return a Matcher that matches the timestamp in a consumer record.
 */
public static Matcher<ConsumerRecord<?, ?>> hasTimestamp(TimestampType type, long ts) {
  return new ConsumerRecordTimestampMatcher(type, ts);
}

AssertJ Conditions

您可以使用以下 AssertJ 条件:

You can use the following AssertJ conditions:

/**
 * @param key the key
 * @param <K> the type.
 * @return a Condition that matches the key in a consumer record.
 */
public static <K> Condition<ConsumerRecord<K, ?>> key(K key) { ... }

/**
 * @param value the value.
 * @param <V> the type.
 * @return a Condition that matches the value in a consumer record.
 */
public static <V> Condition<ConsumerRecord<?, V>> value(V value) { ... }

/**
 * @param key the key.
 * @param value the value.
 * @param <K> the key type.
 * @param <V> the value type.
 * @return a Condition that matches the key in a consumer record.
 * @since 2.2.12
 */
public static <K, V> Condition<ConsumerRecord<K, V>> keyValue(K key, V value) { ... }

/**
 * @param partition the partition.
 * @return a Condition that matches the partition in a consumer record.
 */
public static Condition<ConsumerRecord<?, ?>> partition(int partition) { ... }

/**
 * @param value the timestamp.
 * @return a Condition that matches the timestamp value in a consumer record.
 */
public static Condition<ConsumerRecord<?, ?>> timestamp(long value) {
  return new ConsumerRecordTimestampCondition(TimestampType.CREATE_TIME, value);
}

/**
 * @param type the type of timestamp
 * @param value the timestamp.
 * @return a Condition that matches the timestamp value in a consumer record.
 */
public static Condition<ConsumerRecord<?, ?>> timestamp(TimestampType type, long value) {
  return new ConsumerRecordTimestampCondition(type, value);
}

Example

以下示例汇集了本章中涵盖的大多数主题:

The following example brings together most of the topics covered in this chapter:

public class KafkaTemplateTests {

    private static final String TEMPLATE_TOPIC = "templateTopic";

    @ClassRule
    public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, TEMPLATE_TOPIC);

    @Test
    public void testTemplate() throws Exception {
        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testT", "false",
            embeddedKafka.getEmbeddedKafka());
        DefaultKafkaConsumerFactory<Integer, String> cf =
                            new DefaultKafkaConsumerFactory<>(consumerProps);
        ContainerProperties containerProperties = new ContainerProperties(TEMPLATE_TOPIC);
        KafkaMessageListenerContainer<Integer, String> container =
                            new KafkaMessageListenerContainer<>(cf, containerProperties);
        final BlockingQueue<ConsumerRecord<Integer, String>> records = new LinkedBlockingQueue<>();
        container.setupMessageListener(new MessageListener<Integer, String>() {

            @Override
            public void onMessage(ConsumerRecord<Integer, String> record) {
                System.out.println(record);
                records.add(record);
            }

        });
        container.setBeanName("templateTests");
        container.start();
        ContainerTestUtils.waitForAssignment(container,
                            embeddedKafka.getEmbeddedKafka().getPartitionsPerTopic());
        Map<String, Object> producerProps =
                            KafkaTestUtils.producerProps(embeddedKafka.getEmbeddedKafka());
        ProducerFactory<Integer, String> pf =
                            new DefaultKafkaProducerFactory<>(producerProps);
        KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
        template.setDefaultTopic(TEMPLATE_TOPIC);
        template.sendDefault("foo");
        assertThat(records.poll(10, TimeUnit.SECONDS), hasValue("foo"));
        template.sendDefault(0, 2, "bar");
        ConsumerRecord<Integer, String> received = records.poll(10, TimeUnit.SECONDS);
        assertThat(received, hasKey(2));
        assertThat(received, hasPartition(0));
        assertThat(received, hasValue("bar"));
        template.send(TEMPLATE_TOPIC, 0, 2, "baz");
        received = records.poll(10, TimeUnit.SECONDS);
        assertThat(received, hasKey(2));
        assertThat(received, hasPartition(0));
        assertThat(received, hasValue("baz"));
    }

}

前一个示例使用 Hamcrest 匹配器。使用 AssertJ,最终部分看起来如下代码:

The preceding example uses the Hamcrest matchers. With AssertJ, the final part looks like the following code:

assertThat(records.poll(10, TimeUnit.SECONDS)).has(value("foo"));
template.sendDefault(0, 2, "bar");
ConsumerRecord<Integer, String> received = records.poll(10, TimeUnit.SECONDS);
// using individual assertions
assertThat(received).has(key(2));
assertThat(received).has(value("bar"));
assertThat(received).has(partition(0));
template.send(TEMPLATE_TOPIC, 0, 2, "baz");
received = records.poll(10, TimeUnit.SECONDS);
// using allOf()
assertThat(received).has(allOf(keyValue(2, "baz"), partition(0)));

Mock Consumer and Producer

kafka-clients 库提供 MockConsumerMockProducer 类用于测试目的。

The kafka-clients library provides MockConsumer and MockProducer classes for testing purposes.

如果您希望在使用侦听器容器或 KafkaTemplate 的某些测试中使用这些类,则从 3.0.7 起,框架现在提供 MockConsumerFactoryMockProducerFactory 实现。

If you wish to use these classes in some of your tests with listener containers or KafkaTemplate respectively, starting with version 3.0.7, the framework now provides MockConsumerFactory and MockProducerFactory implementations.

这些工厂可以在侦听器容器和模板中使用,而不是默认工厂,后者需要正在运行(或嵌入式)代理服务器。

These factories can be used in the listener container and template instead of the default factories, which require a running (or embedded) broker.

这是一个返回单个使用者的简单实现的示例:

Here is an example of a simple implementation returning a single consumer:

@Bean
ConsumerFactory<String, String> consumerFactory() {
    MockConsumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
    TopicPartition topicPartition0 = new TopicPartition("topic", 0);
    List<TopicPartition> topicPartitions = Collections.singletonList(topicPartition0);
    Map<TopicPartition, Long> beginningOffsets = topicPartitions.stream().collect(Collectors
            .toMap(Function.identity(), tp -> 0L));
    consumer.updateBeginningOffsets(beginningOffsets);
    consumer.schedulePollTask(() -> {
        consumer.addRecord(
                new ConsumerRecord<>("topic", 0, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "test1",
                        new RecordHeaders(), Optional.empty()));
        consumer.addRecord(
                new ConsumerRecord<>("topic", 0, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "test2",
                        new RecordHeaders(), Optional.empty()));
    });
    return new MockConsumerFactory(() -> consumer);
}

如果您希望使用并发进行测试,则工厂的构造函数中的 Supplier lambda 每次都需要创建一个新实例。

If you wish to test with concurrency, the Supplier lambda in the factory’s constructor would need to create a new instance each time.

使用 MockProducerFactory 时,有两个构造函数;一个是创建简单工厂,另一个是创建支持事务的工厂。

With the MockProducerFactory, there are two constructors; one to create a simple factory, and one to create factory that supports transactions.

这里有一些示例:

Here are examples:

@Bean
ProducerFactory<String, String> nonTransFactory() {
    return new MockProducerFactory<>(() ->
            new MockProducer<>(true, new StringSerializer(), new StringSerializer()));
}

@Bean
ProducerFactory<String, String> transFactory() {
    MockProducer<String, String> mockProducer =
            new MockProducer<>(true, new StringSerializer(), new StringSerializer());
    mockProducer.initTransactions();
    return new MockProducerFactory<String, String>((tx, id) -> mockProducer, "defaultTxId");
}

请注意,在第二种情况下,lambda 是一个 BiFunction<Boolean, String>,其中第一个参数为 true 表示调用者需要事务性制作者;可选的第二个参数包含事务性 ID。这可以是默认值(如构造函数中提供的),或者可以通过 KafkaTransactionManager(或 KafkaTemplate 用于本地事务)覆盖,如果已配置。如果您希望基于此值使用其他 MockProducer,则提供事务性 ID。

Notice in the second case, the lambda is a BiFunction<Boolean, String> where the first parameter is true if the caller wants a transactional producer; the optional second parameter contains the transactional id. This can be the default (as provided in the constructor), or can be overridden by the KafkaTransactionManager (or KafkaTemplate for local transactions), if so configured. The transactional id is provided in case you wish to use a different MockProducer based on this value.

如果您在多线程环境中使用制作者,则 BiFunction 应返回多个制作者(可能使用 ThreadLocal 线程绑定)。

If you are using producers in a multi-threaded environment, the BiFunction should return multiple producers (perhaps thread-bound using a ThreadLocal).

事务 MockProducer`s must be initialized for transactions by calling `initTransaction()

Transactional MockProducer`s must be initialized for transactions by calling `initTransaction().

使用 MockProducer 时,如果您不想在每次发送后关闭制作者,则可以提供自定义 MockProducer 实现,它覆盖 close 方法,该方法不调用父类的 close 方法。这对于在同一制作者上验证多次发布而不关闭它时测试非常方便。

When using the MockProducer, if you do not want to close the producer after each send, then you can provide a custom MockProducer implementation that overrides the close method that does not call the close method from the super class. This is convenient for testing, when verifying multiple publishing on the same producer without closing it.

这是一个示例:

Here is an example:

@Bean
MockProducer<String, String> mockProducer() {
    return new MockProducer<>(false, new StringSerializer(), new StringSerializer()) {
        @Override
        public void close() {

        }
    };
}

@Bean
ProducerFactory<String, String> mockProducerFactory(MockProducer<String, String> mockProducer) {
    return new MockProducerFactory<>(() -> mockProducer);
}