Testing Applications

  • PulsarConsumerTestUtil: 用于从 Pulsar 主题消费消息的类型安全、流畅 API,支持指定待满足条件以在返回消息之前等待。

  • PulsarTestContainerSupport: 自动启动 Pulsar Testcontainer,以便在测试应用程序中使用 Pulsar 集成,只需在测试类中实现此接口即可。

spring-pulsar-test 依赖项在测试应用程序时包括一些有用的实用工具。

PulsarConsumerTestUtil

org.springframework.pulsar.test.support.PulsarConsumerTestUtil 为从测试中的 Pulsar 主题消费消息提供了一个类型安全、流畅的 API。

以下示例显示如何从主题消费消息 5 秒:

List<Message<String>> messages = PulsarConsumerTestUtil.consumeMessages(consumerFactory)
            .fromTopic("my-topic")
            .withSchema(Schema.STRING)
            .awaitAtMost(Duration.ofSeconds(5))
            .get();

还可以使用 until 方法让你指定在返回消息之前必须满足的条件。以下示例使用条件从主题消费 5 条消息。

List<Message<String>> messages = PulsarConsumerTestUtil.consumeMessages(consumerFactory)
            .fromTopic("my-topic")
            .withSchema(Schema.STRING)
            .awaitAtMost(Duration.ofSeconds(5))
            .until(messages -> messages.size() == 5)
            .get();

一套常用的条件在 org.springframework.pulsar.test.support.ConsumedMessagesConditions 中提供。以下示例使用工厂提供的 atLeastOneMessageMatches 条件在其中一条消息的值为 "boom" 时返回消费的消息。

List<Message<String>> messages = PulsarConsumerTestUtil.consumeMessages(consumerFactory)
            .fromTopic("my-topic")
            .withSchema(Schema.STRING)
            .awaitAtMost(Duration.ofSeconds(5))
            .until(ConsumedMessagesConditions.atLeastOneMessageMatches("boom"))
            .get();

PulsarTestContainerSupport

org.springframework.pulsar.test.support.PulsarTestContainerSupport 接口提供了一个静态 Pulsar Testcontainer。在使用 Junit Jupiter 时,容器通过 @BeforeAll 批注为每个测试类自动启动一次。

以下示例演示如何在 @SpringBootTest 中结合前面提到的 PulsarConsumerTestUtil 使用容器支持。

@SpringBootTest
class MyApplicationTests implements PulsarTestContainerSupport {

	@DynamicPropertySource
	static void pulsarProperties(DynamicPropertyRegistry registry) {
		registry.add("spring.pulsar.client.service-url", PULSAR_CONTAINER::getPulsarBrokerUrl);
		registry.add("spring.pulsar.admin.service-url", PULSAR_CONTAINER::getHttpServiceUrl);
	}

	@Test
	void sendAndReceiveWorksAsExpected(
			@Autowired PulsarTemplate<String> template,
			@Autowired PulsarConsumerFactory<String> consumerFactory) {
		var topic = "some-topic";
		var msg = "foo-5150";
		template.send(topic, msg);
		var matchedUsers = PulsarConsumerTestUtil.consumeMessages(consumerFactory)
				.fromTopic(topic)
				.withSchema(Schema.STRING)
				.awaitAtMost(Duration.ofSeconds(2))
				.until(ConsumedMessagesConditions.atLeastOneMessageMatches(msg))
				.get();
		assertThat(matchedUsers).hasSize(1);
	}
}