Testing Applications
-
PulsarConsumerTestUtil: 用于从 Pulsar 主题消费消息的类型安全、流畅 API,支持指定待满足条件以在返回消息之前等待。
-
PulsarTestContainerSupport: 自动启动 Pulsar Testcontainer,以便在测试应用程序中使用 Pulsar 集成,只需在测试类中实现此接口即可。
spring-pulsar-test
依赖项在测试应用程序时包括一些有用的实用工具。
PulsarConsumerTestUtil
org.springframework.pulsar.test.support.PulsarConsumerTestUtil
为从测试中的 Pulsar 主题消费消息提供了一个类型安全、流畅的 API。
以下示例显示如何从主题消费消息 5 秒:
List<Message<String>> messages = PulsarConsumerTestUtil.consumeMessages(consumerFactory)
.fromTopic("my-topic")
.withSchema(Schema.STRING)
.awaitAtMost(Duration.ofSeconds(5))
.get();
还可以使用 until
方法让你指定在返回消息之前必须满足的条件。以下示例使用条件从主题消费 5 条消息。
List<Message<String>> messages = PulsarConsumerTestUtil.consumeMessages(consumerFactory)
.fromTopic("my-topic")
.withSchema(Schema.STRING)
.awaitAtMost(Duration.ofSeconds(5))
.until(messages -> messages.size() == 5)
.get();
一套常用的条件在 org.springframework.pulsar.test.support.ConsumedMessagesConditions
中提供。以下示例使用工厂提供的 atLeastOneMessageMatches
条件在其中一条消息的值为 "boom"
时返回消费的消息。
List<Message<String>> messages = PulsarConsumerTestUtil.consumeMessages(consumerFactory)
.fromTopic("my-topic")
.withSchema(Schema.STRING)
.awaitAtMost(Duration.ofSeconds(5))
.until(ConsumedMessagesConditions.atLeastOneMessageMatches("boom"))
.get();
PulsarTestContainerSupport
org.springframework.pulsar.test.support.PulsarTestContainerSupport
接口提供了一个静态 Pulsar Testcontainer。在使用 Junit Jupiter 时,容器通过 @BeforeAll
批注为每个测试类自动启动一次。
以下示例演示如何在 @SpringBootTest
中结合前面提到的 PulsarConsumerTestUtil
使用容器支持。
@SpringBootTest
class MyApplicationTests implements PulsarTestContainerSupport {
@DynamicPropertySource
static void pulsarProperties(DynamicPropertyRegistry registry) {
registry.add("spring.pulsar.client.service-url", PULSAR_CONTAINER::getPulsarBrokerUrl);
registry.add("spring.pulsar.admin.service-url", PULSAR_CONTAINER::getHttpServiceUrl);
}
@Test
void sendAndReceiveWorksAsExpected(
@Autowired PulsarTemplate<String> template,
@Autowired PulsarConsumerFactory<String> consumerFactory) {
var topic = "some-topic";
var msg = "foo-5150";
template.send(topic, msg);
var matchedUsers = PulsarConsumerTestUtil.consumeMessages(consumerFactory)
.fromTopic(topic)
.withSchema(Schema.STRING)
.awaitAtMost(Duration.ofSeconds(2))
.until(ConsumedMessagesConditions.atLeastOneMessageMatches(msg))
.get();
assertThat(matchedUsers).hasSize(1);
}
}