Testing Applications

  • 嵌入式 Kafka 代理,可以通过 EmbeddedKafkaBroker 管理。

  • KafkaTestUtils 提供了静态方法来使用记录和检索各种记录偏移。

  • JUnit 规则为 EmbeddedKafkaZKBroker 提供了一个封装,用于创建 Kafka 和 Zookeeper 服务器。

  • @EmbeddedKafka 注解简化了 EmbeddedKafkaBroker 的使用,允许自动注入 broker 地址列表。

  • Hamcrest 和 AssertJ 匹配器和条件提供了对消费者记录的断言。

  • 框架提供 MockConsumerFactory 和 MockProducerFactory,允许在不使用实际代理服务器的情况下使用 MockConsumer 和 MockProducer。

spring-kafka-test jar 包含一些有用的实用程序,以帮助测试您的应用程序。

Embedded Kafka Broker

提供了两种实现:

  • EmbeddedKafkaZKBroker - 旧实现,用于启动嵌入式 Zookeeper 实例。

  • EmbeddedKafkaKraftBroker -(默认)在组合的控制器和代理模式中使用 Kraft 代替 Zookeeper(自 3.1 起)。

有几种配置代理的技术,如下文所述。

KafkaTestUtils

`org.springframework.kafka.test.utils.KafkaTestUtils`提供了一些静态帮助程序方法来使用记录、检索各种记录偏移量等。有关完整详细信息,请参阅其 Javadocs

JUnit

org.springframework.kafka.test.utils.KafkaTestUtils 还提供了一些设置生产者和消费者属性的静态方法。以下列表显示了这些方法签名:

/**
 * Set up test properties for an {@code <Integer, String>} consumer.
 * @param group the group id.
 * @param autoCommit the auto commit.
 * @param embeddedKafka a {@link EmbeddedKafkaBroker} instance.
 * @return the properties.
 */
public static Map<String, Object> consumerProps(String group, String autoCommit,
                                       EmbeddedKafkaBroker embeddedKafka) { ... }

/**
 * Set up test properties for an {@code <Integer, String>} producer.
 * @param embeddedKafka a {@link EmbeddedKafkaBroker} instance.
 * @return the properties.
 */
public static Map<String, Object> producerProps(EmbeddedKafkaBroker embeddedKafka) { ... }

从版本 2.5 开始,consumerProps 方法将 ConsumerConfig.AUTO_OFFSET_RESET_CONFIG 设置为 earliest。这是因为在大多数情况下,您希望消费者使用测试用例中发送的任何消息。ConsumerConfig 默认值为 latest,这意味着消费者启动之前测试已经发送的消息不会接收到那些记录。要恢复到以前的行为,请在调用该方法后将该属性设置为 latest。 使用嵌入式代理时,通常最佳做法是为每个测试使用不同的主题,以防止串扰。如果由于某种原因这是不可行的,请注意 consumeFromEmbeddedTopics 方法的默认行为是在分配后将分配的分区寻找到开头。由于它无权访问消费者属性,因此您必须使用获取 seekToEnd 布尔参数的重载方法来寻找到结尾而不是开头。

提供了一个适用于 `EmbeddedKafkaZKBroker`的 JUnit 4 `@Rule`包装器,用于创建嵌入式 Kafka 和嵌入式 Zookeeper 服务器。(有关将 `@EmbeddedKafka`与 JUnit 5 配合使用的信息,请参阅 @EmbeddedKafka Annotation)。以下列表展示了这些方法的签名:

/**
 * Create embedded Kafka brokers.
 * @param count the number of brokers.
 * @param controlledShutdown passed into TestUtils.createBrokerConfig.
 * @param topics the topics to create (2 partitions per).
 */
public EmbeddedKafkaRule(int count, boolean controlledShutdown, String... topics) { ... }

/**
 *
 * Create embedded Kafka brokers.
 * @param count the number of brokers.
 * @param controlledShutdown passed into TestUtils.createBrokerConfig.
 * @param partitions partitions per topic.
 * @param topics the topics to create.
 */
public EmbeddedKafkaRule(int count, boolean controlledShutdown, int partitions, String... topics) { ... }

EmbeddedKafkaKraftBroker 不支持 JUnit4。

EmbeddedKafkaBroker 类有一个实用方法,允许您使用它创建的所有主题。以下示例显示了如何使用它:

Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testT", "false", embeddedKafka);
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
Consumer<Integer, String> consumer = cf.createConsumer();
embeddedKafka.consumeFromAllEmbeddedTopics(consumer);

KafkaTestUtils 有一些实用方法可以从消费者获取结果。以下列表显示了这些方法签名:

/**
 * Poll the consumer, expecting a single record for the specified topic.
 * @param consumer the consumer.
 * @param topic the topic.
 * @return the record.
 * @throws org.junit.ComparisonFailure if exactly one record is not received.
 */
public static <K, V> ConsumerRecord<K, V> getSingleRecord(Consumer<K, V> consumer, String topic) { ... }

/**
 * Poll the consumer for records.
 * @param consumer the consumer.
 * @return the records.
 */
public static <K, V> ConsumerRecords<K, V> getRecords(Consumer<K, V> consumer) { ... }

以下示例显示如何使用 KafkaTestUtils

...
template.sendDefault(0, 2, "bar");
ConsumerRecord<Integer, String> received = KafkaTestUtils.getSingleRecord(consumer, "topic");
...

EmbeddedKafkaBroker 启动嵌入式 Kafka 和嵌入式 Zookeeper 服务器时,一个名为 spring.embedded.kafka.brokers 的系统属性被设置为 Kafka 代理的地址,一个名为 spring.embedded.zookeeper.connect 的系统属性被设置为 Zookeeper 的地址。为此属性提供了方便的常量(EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERSEmbeddedKafkaBroker.SPRING_EMBEDDED_ZOOKEEPER_CONNECT)。

与默认 spring.embedded.kafka.brokers 系统属性不同,Kafka 代理的地址可以显示为任意和便捷的属性。为此,可以在启动嵌入式 Kafka 之前设置 spring.embedded.kafka.brokers.propertyEmbeddedKafkaBroker.BROKER_LIST_PROPERTY)系统属性。例如,对于 Spring Boot,预计会设置 spring.kafka.bootstrap-servers 配置属性以自动配置 Kafka 客户端。因此,在使用随机端口运行带有嵌入式 Kafka 的测试之前,我们可以将 spring.embedded.kafka.brokers.property=spring.kafka.bootstrap-servers 设置为一个系统属性 - 并且 EmbeddedKafkaBroker 将使用它公开其代理地址。这现在是此属性的默认值(从版本 3.0.10 开始)。

使用 EmbeddedKafkaBroker.brokerProperties(Map<String, String>),您可以为 Kafka 服务器提供其他属性。有关可能的代理属性的详细信息,请参阅 Kafka Config

Configuring Topics

以下示例配置创建名为 cathat 的主题,它们有 5 个分区,创建一个名为 thing1 的主题,它有 10 个分区,并创建一个名为 thing2 的主题,它有 15 个分区:

public class MyTests {

    @ClassRule
    private static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, false, 5, "cat", "hat");

    @Test
    public void test() {
        embeddedKafkaRule.getEmbeddedKafka()
              .addTopics(new NewTopic("thing1", 10, (short) 1), new NewTopic("thing2", 15, (short) 1));
        ...
    }

}

默认情况下,当出现问题(例如,添加已存在的主题)时,addTopics 会引发一个异常。2.6 版本添加了此方法的新版本,该版本返回一个 Map<String, Exception>;键是主题名称,成功时值为 null,失败时值为 Exception

Using the Same Broker(s) for Multiple Test Classes

使用类似于以下内容,可对多个 test 类使用同一个 Broker:

public final class EmbeddedKafkaHolder {

    private static EmbeddedKafkaBroker embeddedKafka = new EmbeddedKafkaZKBroker(1, false)
            .brokerListProperty("spring.kafka.bootstrap-servers");

    private static boolean started;

    public static EmbeddedKafkaBroker getEmbeddedKafka() {
        if (!started) {
            try {
                embeddedKafka.afterPropertiesSet();
            }
            catch (Exception e) {
                throw new KafkaException("Embedded broker failed to start", e);
            }
            started = true;
        }
        return embeddedKafka;
    }

    private EmbeddedKafkaHolder() {
        super();
    }

}

这假定 Spring Boot 环境,并且嵌入式 Broker 替换引导服务器属性。

然后,在每个 test 类中,可以使用类似于以下的内容:

static {
    EmbeddedKafkaHolder.getEmbeddedKafka().addTopics("topic1", "topic2");
}

private static final EmbeddedKafkaBroker broker = EmbeddedKafkaHolder.getEmbeddedKafka();

如果你不使用 Spring Boot,则可以使用 broker.getBrokersAsString() 获取引导服务器。

前述示例未提供一种机制,用于在所有测试完成后关闭代理。如果(比如说)您在 Gradle 守护程序中运行测试,这可能存在问题。您不应在这样的情况下使用此项技术,或者您应使用某种技术来在测试完成后对 EmbeddedKafkaBroker 上的 destroy() 发起调用。

从 3.0 版本开始,该框架为 JUnit Platform 公开了 GlobalEmbeddedKafkaTestExecutionListener;它在默认情况下处于禁用状态。这需要 JUnit Platform 1.8 或更高版本。此侦听器的目的是为整个 test 计划启动一个全局 EmbeddedKafkaBroker,并在计划结束时停止它。为启用此侦听器,并因此为项目中的所有 test 提供一个全局嵌入式 Kafka 集群,必须通过系统属性或 JUnit Platform 配置将 spring.kafka.global.embedded.enabled 属性设置为 true。此外,可提供以下属性:

  • spring.kafka.embedded.count - 要管理的 Kafka 代理数量;

  • spring.kafka.embedded.ports - 每个 Kafka 代理的端口(以逗号分隔),如果首选随机端口,则为 0;值的数量必须等于上述的 count

  • spring.kafka.embedded.topics - 要在已启动的 Kafka 集群中创建的主题(以逗号分隔);

  • spring.kafka.embedded.partitions - 为创建的主题提供的分区数量;

  • spring.kafka.embedded.broker.properties.location - 其他 Kafka 代理配置属性的文件位置;此属性的值必须遵循 Spring 资源抽象模式;

  • spring.kafka.embedded.kraft - 为 false 时,使用 EmbeddedKafkaZKBroker 而不是 EmbeddedKafkaKraftBroker

从本质上说,这些属性模仿一些 @EmbeddedKafka 属性。

查看有关配置属性及其如何提供给 JUnit 5 User Guide 的更多信息。例如,可以将 spring.embedded.kafka.brokers.property=my.bootstrap-servers 项添加到测试类路径中的 junit-platform.properties 文件中。从版本 3.0.10 开始,对于使用 Spring Boot 应用程序进行测试,代理程序会自动将此默认设置为 spring.kafka.bootstrap-servers

不建议在单个测试套件中组合全局嵌入式 Kafka 和每次测试类。这两个套件共享相同的系统属性,因此极有可能导致意外行为。

spring-kafka-testjunit-jupiter-apijunit-platform-launcher(后者用于支持全局嵌入式代理)具有传递依赖项(如果您希望使用嵌入式代理并且不使用 JUnit,您可能希望排除这些依赖项)。

@EmbeddedKafka Annotation

我们通常建议你将规则用作 @ClassRule,以避免在 test 之间启动和停止 Broker(并为每个 test 使用不同的主题)。从 2.0 版本开始,如果你使用 Spring 的 test 应用程序上下文缓存,还可声明一个 EmbeddedKafkaBroker bean,以便可以在多个 test 类中使用一个 Broker。为了方便起见,我们提供了一个名为 @EmbeddedKafka 的 test 类级注释,以注册 EmbeddedKafkaBroker bean。以下示例显示了如何使用它:

@RunWith(SpringRunner.class)
@DirtiesContext
@EmbeddedKafka(partitions = 1,
         topics = {
                 KafkaStreamsTests.STREAMING_TOPIC1,
                 KafkaStreamsTests.STREAMING_TOPIC2 })
public class KafkaStreamsTests {

    @Autowired
    private EmbeddedKafkaBroker embeddedKafka;

    @Test
    public void someTest() {
        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testGroup", "true", this.embeddedKafka);
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        ConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
        Consumer<Integer, String> consumer = cf.createConsumer();
        this.embeddedKafka.consumeFromAnEmbeddedTopic(consumer, KafkaStreamsTests.STREAMING_TOPIC2);
        ConsumerRecords<Integer, String> replies = KafkaTestUtils.getRecords(consumer);
        assertThat(replies.count()).isGreaterThanOrEqualTo(1);
    }

    @Configuration
    @EnableKafkaStreams
    public static class KafkaStreamsConfiguration {

        @Value("${" + EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS + "}")
        private String brokerAddresses;

        @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
        public KafkaStreamsConfiguration kStreamsConfigs() {
            Map<String, Object> props = new HashMap<>();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testStreams");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddresses);
            return new KafkaStreamsConfiguration(props);
        }

    }

}

从 2.2.4 版本开始,还可以使用 @EmbeddedKafka 注释来指定 Kafka 端口属性。

从 3.1 版本开始,将 kraft 属性设置为 false,以使用 EmbeddedKafkaZKBroker 而不是 EmbeddedKafkaKraftBroker

以下示例设置了 @EmbeddedKafka 支持属性值解析的 topicsbrokerPropertiesbrokerPropertiesLocation 属性:

@TestPropertySource(locations = "classpath:/test.properties")
@EmbeddedKafka(topics = { "any-topic", "${kafka.topics.another-topic}" },
        brokerProperties = { "log.dir=${kafka.broker.logs-dir}",
                            "listeners=PLAINTEXT://localhost:${kafka.broker.port}",
                            "auto.create.topics.enable=${kafka.broker.topics-enable:true}" },
        brokerPropertiesLocation = "classpath:/broker.properties")

在前一个示例中,属性占位符 ${kafka.topics.another-topic}${kafka.broker.logs-dir}${kafka.broker.port} 从 Spring Environment 解析。此外,还从 brokerPropertiesLocation 指定的 broker.properties 类路径资源中加载了 Broker 属性。属性占位符对于 brokerPropertiesLocation URL 以及在资源中找到的任何属性占位符解析。由 brokerProperties 定义的属性将覆盖在 brokerPropertiesLocation 中找到的属性。

可以使用 JUnit 4 或 JUnit 5 使用 @EmbeddedKafka 注释。

@EmbeddedKafka Annotation with JUnit5

从 2.3 版本开始,有两种方法可以用 @EmbeddedKafka 注释和 JUnit5 一起使用。当与 @SpringJunitConfig 注释一起使用时,嵌入式 Broker 将被添加到 test 应用程序上下文中。可以在类或方法级别将 Broker 自动注入 test 中,以获取 Broker 地址列表。

当*不*使用 Spring test 上下文时,EmbdeddedKafkaCondition 创建一个 Broker;此条件包含一个参数解析器,以便你可以在 test 方法中访问 Broker。

@EmbeddedKafka
public class EmbeddedKafkaConditionTests {

    @Test
    public void test(EmbeddedKafkaBroker broker) {
        String brokerList = broker.getBrokersAsString();
        ...
    }

}

除非用 ExtendWith(SpringExtension.class)(或元注释)注释的用 @EmbeddedKafka 注释的类,否则将创建一个独立的 Broker(位于 Spring 的 TestContext 之外)。@SpringJunitConfig@SpringBootTest 是这样的元注释,并且当同时出现这两种注释中的任何一种时,将使用基于上下文的 Broker。

当存在 Spring 测试应用程序上下文时,主题和代理属性可以包含属性占位符,只要属性在某处有定义,这些属性就会得到解决。如果不存在 Spring 上下文,这些占位符不会得到解决。

Embedded Broker in @SpringBootTest Annotations

Spring Initializr 现在会自动将 spring-kafka-test 依赖项以测试范围添加到项目配置中。

如果你的应用程序在 spring-cloud-stream 中使用 Kafka 绑定器,并且希望对 test 使用嵌入式 Broker,则必须删除 spring-cloud-stream-test-support 依赖项,因为它会在 test 案例中用 test 绑定器替换实际绑定器。如果你希望某些 test 使用 test 绑定器而另一些 test 使用嵌入式 Broker,则使用实际绑定器的 test 需要通过在 test 类中排除绑定器来自动配置来禁用 test 绑定器。以下示例显示了如何执行此操作:

@RunWith(SpringRunner.class)
@SpringBootTest(properties = "spring.autoconfigure.exclude="
    + "org.springframework.cloud.stream.test.binder.TestSupportBinderAutoConfiguration")
public class MyApplicationTests {
    ...
}

Spring Boot 应用程序测试中使用嵌入式代理服务器有多种方法。

包括:

JUnit4 Class Rule

以下示例演示如何使用 JUnit4 类规则创建嵌入式代理服务器:

@RunWith(SpringRunner.class)
@SpringBootTest
public class MyApplicationTests {

    @ClassRule
    public static EmbeddedKafkaRule broker = new EmbeddedKafkaRule(1, false, "someTopic")
            .brokerListProperty("spring.kafka.bootstrap-servers");

    @Autowired
    private KafkaTemplate<String, String> template;

    @Test
    public void test() {
        ...
    }

}

请注意,由于这是一个 Spring Boot 应用程序,因此我们覆盖代理服务器列表属性以设置 Spring Boot 的属性。

@EmbeddedKafka Annotation or EmbeddedKafkaBroker Bean

以下示例演示如何使用 @EmbeddedKafka 注解创建嵌入式代理服务器:

@RunWith(SpringRunner.class)
@EmbeddedKafka(topics = "someTopic",
        bootstrapServersProperty = "spring.kafka.bootstrap-servers") // this is now the default
public class MyApplicationTests {

    @Autowired
    private KafkaTemplate<String, String> template;

    @Test
    public void test() {
        ...
    }

}

从版本 3.0.10 开始,bootstrapServersProperty 会自动设置为 spring.kafka.bootstrap-servers

Hamcrest Matchers

org.springframework.kafka.test.hamcrest.KafkaMatchers 提供以下匹配器:

/**
 * @param key the key
 * @param <K> the type.
 * @return a Matcher that matches the key in a consumer record.
 */
public static <K> Matcher<ConsumerRecord<K, ?>> hasKey(K key) { ... }

/**
 * @param value the value.
 * @param <V> the type.
 * @return a Matcher that matches the value in a consumer record.
 */
public static <V> Matcher<ConsumerRecord<?, V>> hasValue(V value) { ... }

/**
 * @param partition the partition.
 * @return a Matcher that matches the partition in a consumer record.
 */
public static Matcher<ConsumerRecord<?, ?>> hasPartition(int partition) { ... }

/**
 * Matcher testing the timestamp of a {@link ConsumerRecord} assuming the topic has been set with
 * {@link org.apache.kafka.common.record.TimestampType#CREATE_TIME CreateTime}.
 *
 * @param ts timestamp of the consumer record.
 * @return a Matcher that matches the timestamp in a consumer record.
 */
public static Matcher<ConsumerRecord<?, ?>> hasTimestamp(long ts) {
  return hasTimestamp(TimestampType.CREATE_TIME, ts);
}

/**
 * Matcher testing the timestamp of a {@link ConsumerRecord}
 * @param type timestamp type of the record
 * @param ts timestamp of the consumer record.
 * @return a Matcher that matches the timestamp in a consumer record.
 */
public static Matcher<ConsumerRecord<?, ?>> hasTimestamp(TimestampType type, long ts) {
  return new ConsumerRecordTimestampMatcher(type, ts);
}

AssertJ Conditions

您可以使用以下 AssertJ 条件:

/**
 * @param key the key
 * @param <K> the type.
 * @return a Condition that matches the key in a consumer record.
 */
public static <K> Condition<ConsumerRecord<K, ?>> key(K key) { ... }

/**
 * @param value the value.
 * @param <V> the type.
 * @return a Condition that matches the value in a consumer record.
 */
public static <V> Condition<ConsumerRecord<?, V>> value(V value) { ... }

/**
 * @param key the key.
 * @param value the value.
 * @param <K> the key type.
 * @param <V> the value type.
 * @return a Condition that matches the key in a consumer record.
 * @since 2.2.12
 */
public static <K, V> Condition<ConsumerRecord<K, V>> keyValue(K key, V value) { ... }

/**
 * @param partition the partition.
 * @return a Condition that matches the partition in a consumer record.
 */
public static Condition<ConsumerRecord<?, ?>> partition(int partition) { ... }

/**
 * @param value the timestamp.
 * @return a Condition that matches the timestamp value in a consumer record.
 */
public static Condition<ConsumerRecord<?, ?>> timestamp(long value) {
  return new ConsumerRecordTimestampCondition(TimestampType.CREATE_TIME, value);
}

/**
 * @param type the type of timestamp
 * @param value the timestamp.
 * @return a Condition that matches the timestamp value in a consumer record.
 */
public static Condition<ConsumerRecord<?, ?>> timestamp(TimestampType type, long value) {
  return new ConsumerRecordTimestampCondition(type, value);
}

Example

以下示例汇集了本章中涵盖的大多数主题:

public class KafkaTemplateTests {

    private static final String TEMPLATE_TOPIC = "templateTopic";

    @ClassRule
    public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, TEMPLATE_TOPIC);

    @Test
    public void testTemplate() throws Exception {
        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testT", "false",
            embeddedKafka.getEmbeddedKafka());
        DefaultKafkaConsumerFactory<Integer, String> cf =
                            new DefaultKafkaConsumerFactory<>(consumerProps);
        ContainerProperties containerProperties = new ContainerProperties(TEMPLATE_TOPIC);
        KafkaMessageListenerContainer<Integer, String> container =
                            new KafkaMessageListenerContainer<>(cf, containerProperties);
        final BlockingQueue<ConsumerRecord<Integer, String>> records = new LinkedBlockingQueue<>();
        container.setupMessageListener(new MessageListener<Integer, String>() {

            @Override
            public void onMessage(ConsumerRecord<Integer, String> record) {
                System.out.println(record);
                records.add(record);
            }

        });
        container.setBeanName("templateTests");
        container.start();
        ContainerTestUtils.waitForAssignment(container,
                            embeddedKafka.getEmbeddedKafka().getPartitionsPerTopic());
        Map<String, Object> producerProps =
                            KafkaTestUtils.producerProps(embeddedKafka.getEmbeddedKafka());
        ProducerFactory<Integer, String> pf =
                            new DefaultKafkaProducerFactory<>(producerProps);
        KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
        template.setDefaultTopic(TEMPLATE_TOPIC);
        template.sendDefault("foo");
        assertThat(records.poll(10, TimeUnit.SECONDS), hasValue("foo"));
        template.sendDefault(0, 2, "bar");
        ConsumerRecord<Integer, String> received = records.poll(10, TimeUnit.SECONDS);
        assertThat(received, hasKey(2));
        assertThat(received, hasPartition(0));
        assertThat(received, hasValue("bar"));
        template.send(TEMPLATE_TOPIC, 0, 2, "baz");
        received = records.poll(10, TimeUnit.SECONDS);
        assertThat(received, hasKey(2));
        assertThat(received, hasPartition(0));
        assertThat(received, hasValue("baz"));
    }

}

前一个示例使用 Hamcrest 匹配器。使用 AssertJ,最终部分看起来如下代码:

assertThat(records.poll(10, TimeUnit.SECONDS)).has(value("foo"));
template.sendDefault(0, 2, "bar");
ConsumerRecord<Integer, String> received = records.poll(10, TimeUnit.SECONDS);
// using individual assertions
assertThat(received).has(key(2));
assertThat(received).has(value("bar"));
assertThat(received).has(partition(0));
template.send(TEMPLATE_TOPIC, 0, 2, "baz");
received = records.poll(10, TimeUnit.SECONDS);
// using allOf()
assertThat(received).has(allOf(keyValue(2, "baz"), partition(0)));

Mock Consumer and Producer

kafka-clients 库提供 MockConsumerMockProducer 类用于测试目的。

如果您希望在使用侦听器容器或 KafkaTemplate 的某些测试中使用这些类,则从 3.0.7 起,框架现在提供 MockConsumerFactoryMockProducerFactory 实现。

这些工厂可以在侦听器容器和模板中使用,而不是默认工厂,后者需要正在运行(或嵌入式)代理服务器。

这是一个返回单个使用者的简单实现的示例:

@Bean
ConsumerFactory<String, String> consumerFactory() {
    MockConsumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
    TopicPartition topicPartition0 = new TopicPartition("topic", 0);
    List<TopicPartition> topicPartitions = Collections.singletonList(topicPartition0);
    Map<TopicPartition, Long> beginningOffsets = topicPartitions.stream().collect(Collectors
            .toMap(Function.identity(), tp -> 0L));
    consumer.updateBeginningOffsets(beginningOffsets);
    consumer.schedulePollTask(() -> {
        consumer.addRecord(
                new ConsumerRecord<>("topic", 0, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "test1",
                        new RecordHeaders(), Optional.empty()));
        consumer.addRecord(
                new ConsumerRecord<>("topic", 0, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "test2",
                        new RecordHeaders(), Optional.empty()));
    });
    return new MockConsumerFactory(() -> consumer);
}

如果您希望使用并发进行测试,则工厂的构造函数中的 Supplier lambda 每次都需要创建一个新实例。

使用 MockProducerFactory 时,有两个构造函数;一个是创建简单工厂,另一个是创建支持事务的工厂。

这里有一些示例:

@Bean
ProducerFactory<String, String> nonTransFactory() {
    return new MockProducerFactory<>(() ->
            new MockProducer<>(true, new StringSerializer(), new StringSerializer()));
}

@Bean
ProducerFactory<String, String> transFactory() {
    MockProducer<String, String> mockProducer =
            new MockProducer<>(true, new StringSerializer(), new StringSerializer());
    mockProducer.initTransactions();
    return new MockProducerFactory<String, String>((tx, id) -> mockProducer, "defaultTxId");
}

请注意,在第二种情况下,lambda 是一个 BiFunction<Boolean, String>,其中第一个参数为 true 表示调用者需要事务性制作者;可选的第二个参数包含事务性 ID。这可以是默认值(如构造函数中提供的),或者可以通过 KafkaTransactionManager(或 KafkaTemplate 用于本地事务)覆盖,如果已配置。如果您希望基于此值使用其他 MockProducer,则提供事务性 ID。

如果您在多线程环境中使用制作者,则 BiFunction 应返回多个制作者(可能使用 ThreadLocal 线程绑定)。

事务 MockProducer`s must be initialized for transactions by calling `initTransaction()

使用 MockProducer 时,如果您不想在每次发送后关闭制作者,则可以提供自定义 MockProducer 实现,它覆盖 close 方法,该方法不调用父类的 close 方法。这对于在同一制作者上验证多次发布而不关闭它时测试非常方便。

这是一个示例:

@Bean
MockProducer<String, String> mockProducer() {
    return new MockProducer<>(false, new StringSerializer(), new StringSerializer()) {
        @Override
        public void close() {

        }
    };
}

@Bean
ProducerFactory<String, String> mockProducerFactory(MockProducer<String, String> mockProducer) {
    return new MockProducerFactory<>(() -> mockProducer);
}