Working with Application Events
为了使应用程序模块尽可能相互解耦,它们的主要交互方式应该是事件发布和使用。这避免了原始模块了解所有潜在的利益相关者,这是实现应用程序模块集成测试(参见 Integration Testing Application Modules)的关键方面。 我们经常会发现像这样定义的应用程序组件:
-
Java
-
Kotlin
@Service
@RequiredArgsConstructor
public class OrderManagement {
private final InventoryManagement inventory;
@Transactional
public void complete(Order order) {
// State transition on the order aggregate go here
// Invoke related functionality
inventory.updateStockFor(order);
}
}
@Service
class OrderManagement(val inventory: InventoryManagement) {
@Transactional
fun complete(order: Order) {
inventory.updateStockFor(order)
}
}
complete(…)
方法创建功能重力,因为它会吸引相关功能,从而与其他应用程序模块中定义的 Spring Bean 进行交互。这尤其使得组件更难测试,因为我们需要的那些依赖于 Bean 的实例来创建 OrderManagement
的实例(参见 Dealing with Efferent Dependencies)。这也意味着,无论何时希望将进一步的功能与业务事件订单完成相集成的时候,都必须接触该类。
我们可以按照如下方式更改应用程序模块交互:
ApplicationEventPublisher
-
Java
-
Kotlin
@Service
@RequiredArgsConstructor
public class OrderManagement {
private final ApplicationEventPublisher events;
private final OrderInternal dependency;
@Transactional
public void complete(Order order) {
// State transition on the order aggregate go here
events.publishEvent(new OrderCompleted(order.getId()));
}
}
@Service
class OrderManagement(val events: ApplicationEventPublisher, val dependency: OrderInternal) {
@Transactional
fun complete(order: Order) {
events.publishEvent(OrderCompleted(order.id))
}
}
请注意,我们并非依赖其他应用程序模块的 Spring Bean,而是使用 Spring 的 ApplicationEventPublisher
来发布一个域事件,一旦在主聚合上完成了状态转换。有关事件发布的更多聚合驱动方法,请参阅 Spring Data’s application event publication mechanism 以获取详细信息。由于事件发布默认情况下是同步发生的,因此总体安排的事务语义与上述示例中保持相同。无论是好处,因为我们可以获得非常简单的一致性模型(订单 and 的状态更改或库存更新都成功,否则都不成功),还是坏处,因为更多触发的相关功能将扩大事务边界,并可能导致整个事务失败,即使是导致错误的功能并不是必要的。
另一种方法是将事件消费移到事务提交时的异步处理,并将辅助功能完全当作辅助功能:
-
Java
-
Kotlin
@Component
class InventoryManagement {
@Async
@TransactionalEventListener
void on(OrderCompleted event) { /* … */ }
}
@Component
class InventoryManagement {
@Async
@TransactionalEventListener
fun on(event: OrderCompleted) { /* … */ }
}
这现在有效地将原始事务与侦听器的执行解耦。虽然这避免了原始业务事务的扩展,但也产生了一个风险:如果侦听器由于某种原因而失败,则事件发布将丢失,除非每个侦听器实际实现了自己的安全机制。更糟的是,这甚至不能完全奏效,因为系统甚至可能在调用该方法之前就发生故障。
Application Module Listener
要在事务本身中运行事务事件监听器,后用 @Transactional
注解它。
-
Java
-
Kotlin
@Component
class InventoryManagement {
@Async
@Transactional(propagation = Propagation.REQUIRES_NEW)
@TransactionalEventListener
void on(OrderCompleted event) { /* … */ }
}
@Component
class InventoryManagement {
@Async
@Transactional(propagation = Propagation.REQUIRES_NEW)
@TransactionalEventListener
fun on(event: OrderCompleted) { /* … */ }
}
为了简化声明如何通过事件集成模块的默认方式,Spring Modulith 提供 @ApplicationModuleListener
,以便快捷声明
-
Java
-
Kotlin
@Component
class InventoryManagement {
@ApplicationModuleListener
void on(OrderCompleted event) { /* … */ }
}
@Component
class InventoryManagement {
@ApplicationModuleListener
fun on(event: OrderCompleted) { /* … */ }
}
The Event Publication Registry
Spring Modulith 随带事件发布注册表,可插入 Spring Framework 的核心事件发布机制。事件发布时,它会发现将获取事件的 TRANSACTIONAL 事件监听器,并将每个事件的条目(深蓝色)写入事件发布日志,作为原始业务事务的一部分。
每个事务事件监听器都包装到一个切面中,如果监听器的执行成功,该切面将标记日志条目已完成。倘若监听器失败,日志条目将保持不变,以便可以根据应用程序的需求部署重试机制。默认情况下,所有未完成的事件发布都在应用程序启动时重新提交。
Spring Boot Event Registry Starters
使用事务事件发布日志需要将一组工件添加到您的应用程序中。为了简化此任务,Spring Modulith 提供了以 persistence technology 为中心的 Starter POM,并默认为基于 Jackson 的 EventSerializer 实现。以下 Starter 可用:
Persistence Technology | Artifact | Description |
---|---|---|
JPA |
|
使用 JPA 作为持久性技术。 |
JDBC |
|
使用 JDBC 作为持久性技术。它也适用于基于 JPA 的应用程序,但会绕过 JPA 提供程序来进行实际事件持久化。 |
MongoDB |
|
使用 JDBC 作为持久性技术。此外,它还启用 MongoDB 事务,并要求服务器的副本集设置与其交互。可以通过将 |
Neo4j |
|
在 Spring Data Neo4j 之后使用 Neo4j。 |
Managing Event Publications
在应用程序运行期间可能需要以各种方式管理事件发布。可能需要在给定的时间段后向相应的侦听器重新提交未完成的发布。另一方面,可能必须从数据库中清除已完成的发布或将其移至存档存储。由于对这类清理的需求因应用程序而异,因此 Spring Modulith 提供了处理两种发布的 API。该 API 可通过 spring-modulith-events-api
工件获得,您可以将其添加到您的应用程序中:
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.modulith</groupId>
<artifactId>spring-modulith-events-api</artifactId>
<version>{projectVersion}</version>
</dependency>
dependencies {
implementation 'org.springframework.modulith:spring-modulith-events-api:{projectVersion}'
}
此工件包含两个主要的抽象,可以用作 Spring Bean 提供给应用程序代码:
-
CompletedEventPublications
——该界面允许访问所有已完成的事件发布,并提供 API 来立即从数据库中清除所有已完成的事件,或清除超过既定持续时间(例如 1 分钟)的已完成发布。 -
IncompleteEventPublications
——该界面允许访问所有未完成的事件发布,以重新提交与给定谓词匹配的发布或相对原始发布日期早于给定Duration
的发布。
Event Publication Repositories
为了实际编写事件发布日志,Spring Modulith 公开了 EventPublicationRepository
SPI 和实现,用于支持事务的流行持久性技术,如 JPA、JDBC 和 MongoDB。你可以通过将相应的 JAR 添加到你的 Spring Modulith 应用程序中来选择要使用的持久性技术。我们准备了专门的 starters 来简化此任务。
当将相应配置属性 (spring.modulith.events.jdbc-schema-initialization.enabled
) 设置为 true
时,基于 JDBC 的实现可以为事件发布日志创建专用表。有关详细信息,请参阅附录中的 schema overview。
Externalizing Events
应用程序模块之间交换的某些事件可能对外部系统很有趣。Spring Modulith 允许将选定的事件发布到各种消息代理。为了使用该支持,您需要执行以下步骤:
-
将 broker-specific Spring Modulith artifact 添加到项目中。
-
通过使用 Spring Modulith 或 jMolecules 的
@Externalized
注解对事件类型进行注释,选择要外接的事件类型。 -
在注释的值中指定特定的代理目标路由。
要了解如何使用其他方式选择要对外化的事件,或自定义它们在代理中的路由,请查看 Fundamentals of Event Externalization。
Supported Infrastructure
Broker | Artifact | Description |
---|---|---|
Kafka |
|
使用 Spring Kafka 与代理交互。逻辑路由密钥将用作 |
AMQP |
|
使用 Spring AMQP 与任何兼容代理交互。例如,需要明确的 Spring Rabbit 依赖项声明。逻辑路由密钥将用作 AMQP 路由密钥。 |
JMS |
|
使用 Spring 的核心 JMS 支持。不支持路由键。 |
SQS |
|
使用 Spring Cloud AWS SQS 支持。逻辑路由键将用作 SQS 消息组 ID。当设置路由键时,需要将 SQS 队列配置为 FIFO 队列。 |
SNS |
|
使用 Spring Cloud AWS SNS 支持。逻辑路由键将用作 SNS 消息组 ID。当设置路由键时,需要将 SNS 配置为 FIFO 主题,并启用基于内容的重复数据删除。 |
Fundamentals of Event Externalization
事件外化对每个已发布的应用程序事件执行三个步骤。
-
Determining whether the event is supposed to be externalized——我们将其称为 “event selection”。默认情况下,仅位于 Spring Boot 自动配置包中且使用受支持的
@Externalized
注解进行注释的事件类型才会被选择用于外接。 -
Mapping the event (optional) ——默认情况下,事件使用应用程序中存在的 Jackson
ObjectMapper
序列化为 JSON 并按原样发布。映射步骤允许开发人员自定义该表示,甚至完全用一个适合外部方的表示替换原始事件。请注意,映射步骤在待发布对象的实际序列化之前。 -
Determining a routing target ——消息代理客户端需要一个逻辑目标来发布消息。该目标通常识别物理基础架构(主题、交换机或队列,具体取决于代理),并且通常从事件类型中静态派生。除非在
@Externalized
注解中明确定义,否则 Spring Modulith 将应用程序本地类型名称用作目标。换句话说,在基本包为com.acme.app
的 Spring Boot 应用程序中,事件类型com.acme.app.sample.SampleEvent
将被发布到sample.SampleEvent
。某些代理还允许定义动态路由键,该键在实际目标中用于不同的目的。默认情况下,不使用任何路由键。
Annotation-based Event Externalization Configuration
要通过 @Externalized
注解定义自定义路由键,可以使用 $target::$key
模式的每个特定注释中提供的 target/value 属性。键可以是 SpEL 表达式,它获取配置为根对象的事件实例。
-
Java
-
Kotlin
@Externalized("customer-created::#{#this.getLastname()}") (2)
class CustomerCreated {
String getLastname() { (1)
// …
}
}
@Externalized("customer-created::#{#this.getLastname()}") (2)
class CustomerCreated {
fun getLastname(): String { (1)
// …
}
}
CustomerCreated
事件通过存取器方法显示客户的姓氏。然后通过 #this.getLastname()
表达式在 ::
分隔符之后的 target 声明中使用该方法。
如果密钥计算变得更复杂,建议将其委托给一个将事件作为参数的 Spring Bean:
-
Java
-
Kotlin
@Externalized("…::#{@beanName.someMethod(#this)}")
@Externalized("…::#{@beanName.someMethod(#this)}")
Programmatic Event Externalization Configuration
spring-modulith-events-api
工件包含 EventExternalizationConfiguration
,允许开发人员自定义以上提到的所有步骤。
-
Java
-
Kotlin
@Configuration
class ExternalizationConfiguration {
@Bean
EventExternalizationConfiguration eventExternalizationConfiguration() {
return EventExternalizationConfiguration.externalizing() (1)
.select(EventExternalizationConfiguration.annotatedAsExternalized()) (2)
.mapping(SomeEvent.class, it -> …) (3)
.routeKey(WithKeyProperty.class, WithKeyProperty::getKey) (4)
.build();
}
}
@Configuration
class ExternalizationConfiguration {
@Bean
fun eventExternalizationConfiguration(): EventExternalizationConfiguration {
EventExternalizationConfiguration.externalizing() (1)
.select(EventExternalizationConfiguration.annotatedAsExternalized()) (2)
.mapping(SomeEvent::class, it -> …) (3)
.routeKey(WithKeyProperty::class, WithKeyProperty::getKey) (4)
.build()
}
}
1 | 我们首先创建一个 EventExternalizationConfiguration 的默认实例。 |
2 | 通过对前一次调用返回的 Selector 实例调用 select(…) 方法之一,自定义事件选择。此步骤从根本上禁用应用程序基本包过滤器,因为我们现在只查找该注释。现在存在方便的方法,可以按类型、包、包和注释轻松选择事件。此外,还存在一种快捷方式,可以在一步内定义选择和路由。 |
3 | 我们为 SomeEvent 实例定义一个映射步骤。请注意,路由仍将由原始事件实例确定,除非您还对路由器调用 ….routeMapped() 。 |
4 | 我们最终通过定义一个方法句柄来提取事件实例的值,由此确定一个路由键。或者,可以使用前一调用返回的 Router 实例上的通用 route(…) 方法为各个事件生成一个完整的 RoutingKey 。 |
Testing published events
以下部分描述了仅关注跟踪 Spring 应用程序事件的测试方法。有关使用 |
Spring Modulith 的 @ApplicationModuleTest
使能够将 PublishedEvents
实例注入到测试方法中,以验证在测试的业务操作过程中已发布特定的事件集。
-
Java
-
Kotlin
@ApplicationModuleTest
class OrderIntegrationTests {
@Test
void someTestMethod(*PublishedEvents events*) {
// …
var matchingMapped = events.ofType(OrderCompleted.class)
.matching(OrderCompleted::getOrderId, reference.getId());
assertThat(matchingMapped).hasSize(1);
}
}
@ApplicationModuleTest
class OrderIntegrationTests {
@Test
fun someTestMethod(events: PublishedEvents events) {
// …
var matchingMapped = events.ofType(OrderCompleted::class)
.matching(OrderCompleted::getOrderId, reference.getId())
assertThat(matchingMapped).hasSize(1)
}
}
请注意,PublishedEvents`如何公开 API 来选择匹配特定条件的事件。验证通过 AssertJ 断言完成,该断言验证预期元素的数量。如果您对这些断言使用 AssertJ,则还可以将 `AssertablePublishedEvents
用作测试方法参数类型,并使用它提供的流利断言 API。
AssertablePublishedEvents
to verify event publications-
Java
-
Kotlin
@ApplicationModuleTest
class OrderIntegrationTests {
@Test
void someTestMethod(*AssertablePublishedEvents events*) {
// …
assertThat(events)
.contains(OrderCompleted.class)
.matching(OrderCompleted::getOrderId, reference.getId());
}
}
@ApplicationModuleTest
class OrderIntegrationTests {
@Test
fun someTestMethod(events: AssertablePublishedEvents) {
// …
assertThat(events)
.contains(OrderCompleted::class)
.matching(OrderCompleted::getOrderId, reference.getId())
}
}
请注意,assertThat(…)
表达式返回的类型如何允许直接定义已发布事件上的约束。