Working with Application Events

为了使应用程序模块尽可能相互解耦,它们的主要交互方式应该是事件发布和使用。这避免了原始模块了解所有潜在的利益相关者,这是实现应用程序模块集成测试(参见 Integration Testing Application Modules)的关键方面。 我们经常会发现像这样定义的应用程序组件:

  • Java

  • Kotlin

@Service
@RequiredArgsConstructor
public class OrderManagement {

  private final InventoryManagement inventory;

  @Transactional
  public void complete(Order order) {

    // State transition on the order aggregate go here

    // Invoke related functionality
    inventory.updateStockFor(order);
  }
}
@Service
class OrderManagement(val inventory: InventoryManagement) {

  @Transactional
  fun complete(order: Order) {
    inventory.updateStockFor(order)
  }
}

complete(…) 方法创建功能重力,因为它会吸引相关功能,从而与其他应用程序模块中定义的 Spring Bean 进行交互。这尤其使得组件更难测试,因为我们需要的那些依赖于 Bean 的实例来创建 OrderManagement 的实例(参见 Dealing with Efferent Dependencies)。这也意味着,无论何时希望将进一步的功能与业务事件订单完成相集成的时候,都必须接触该类。 我们可以按照如下方式更改应用程序模块交互:

Publishing an application event via Spring’s ApplicationEventPublisher
  • Java

  • Kotlin

@Service
@RequiredArgsConstructor
public class OrderManagement {

  private final ApplicationEventPublisher events;
  private final OrderInternal dependency;

  @Transactional
  public void complete(Order order) {

    // State transition on the order aggregate go here

    events.publishEvent(new OrderCompleted(order.getId()));
  }
}
@Service
class OrderManagement(val events: ApplicationEventPublisher, val dependency: OrderInternal) {

  @Transactional
  fun complete(order: Order) {
    events.publishEvent(OrderCompleted(order.id))
  }
}

请注意,我们并非依赖其他应用程序模块的 Spring Bean,而是使用 Spring 的 ApplicationEventPublisher 来发布一个域事件,一旦在主聚合上完成了状态转换。有关事件发布的更多聚合驱动方法,请参阅 Spring Data’s application event publication mechanism 以获取详细信息。由于事件发布默认情况下是同步发生的,因此总体安排的事务语义与上述示例中保持相同。无论是好处,因为我们可以获得非常简单的一致性模型(订单 and 的状态更改或库存更新都成功,否则都不成功),还是坏处,因为更多触发的相关功能将扩大事务边界,并可能导致整个事务失败,即使是导致错误的功能并不是必要的。 另一种方法是将事件消费移到事务提交时的异步处理,并将辅助功能完全当作辅助功能:

An async, transactional event listener
  • Java

  • Kotlin

@Component
class InventoryManagement {

  @Async
  @TransactionalEventListener
  void on(OrderCompleted event) { /* … */ }
}
@Component
class InventoryManagement {

  @Async
  @TransactionalEventListener
  fun on(event: OrderCompleted) { /* … */ }
}

这现在有效地将原始事务与侦听器的执行解耦。虽然这避免了原始业务事务的扩展,但也产生了一个风险:如果侦听器由于某种原因而失败,则事件发布将丢失,除非每个侦听器实际实现了自己的安全机制。更糟的是,这甚至不能完全奏效,因为系统甚至可能在调用该方法之前就发生故障。

Application Module Listener

要在事务本身中运行事务事件监听器,后用 @Transactional 注解它。

An async, transactional event listener running in a transaction itself
  • Java

  • Kotlin

@Component
class InventoryManagement {

  @Async
  @Transactional(propagation = Propagation.REQUIRES_NEW)
  @TransactionalEventListener
  void on(OrderCompleted event) { /* … */ }
}
@Component
class InventoryManagement {

  @Async
  @Transactional(propagation = Propagation.REQUIRES_NEW)
  @TransactionalEventListener
  fun on(event: OrderCompleted) { /* … */ }
}

为了简化声明如何通过事件集成模块的默认方式,Spring Modulith 提供 @ApplicationModuleListener,以便快捷声明

An application module listener
  • Java

  • Kotlin

@Component
class InventoryManagement {

  @ApplicationModuleListener
  void on(OrderCompleted event) { /* … */ }
}
@Component
class InventoryManagement {

  @ApplicationModuleListener
  fun on(event: OrderCompleted) { /* … */ }
}

The Event Publication Registry

Spring Modulith 随带事件发布注册表,可插入 Spring Framework 的核心事件发布机制。事件发布时,它会发现将获取事件的 TRANSACTIONAL 事件监听器,并将每个事件的条目(深蓝色)写入事件发布日志,作为原始业务事务的一部分。

event publication registry start
Figure 1. The transactional event listener arrangement before execution

每个事务事件监听器都包装到一个切面中,如果监听器的执行成功,该切面将标记日志条目已完成。倘若监听器失败,日志条目将保持不变,以便可以根据应用程序的需求部署重试机制。默认情况下,所有未完成的事件发布都在应用程序启动时重新提交。

event publication registry end
Figure 2. The transactional event listener arrangement after execution

Spring Boot Event Registry Starters

使用事务事件发布日志需要将一组工件添加到您的应用程序中。为了简化此任务,Spring Modulith 提供了以 persistence technology 为中心的 Starter POM,并默认为基于 Jackson 的 EventSerializer 实现。以下 Starter 可用:

Persistence Technology Artifact Description

JPA

spring-modulith-starter-jpa

使用 JPA 作为持久性技术。

JDBC

spring-modulith-starter-jdbc

使用 JDBC 作为持久性技术。它也适用于基于 JPA 的应用程序,但会绕过 JPA 提供程序来进行实际事件持久化。

MongoDB

spring-modulith-starter-mongodb

使用 JDBC 作为持久性技术。此外,它还启用 MongoDB 事务,并要求服务器的副本集设置与其交互。可以通过将 spring.modulith.events.mongobd.transaction-management.enabled 属性设置为 false 来禁用事务自动配置。

Neo4j

spring-modulith-starter-neo4j

在 Spring Data Neo4j 之后使用 Neo4j。

Managing Event Publications

在应用程序运行期间可能需要以各种方式管理事件发布。可能需要在给定的时间段后向相应的侦听器重新提交未完成的发布。另一方面,可能必须从数据库中清除已完成的发布或将其移至存档存储。由于对这类清理的需求因应用程序而异,因此 Spring Modulith 提供了处理两种发布的 API。该 API 可通过 spring-modulith-events-api 工件获得,您可以将其添加到您的应用程序中:

Using Spring Modulith Events API artifact
  • Maven

  • Gradle

<dependency>
  <groupId>org.springframework.modulith</groupId>
  <artifactId>spring-modulith-events-api</artifactId>
  <version>{projectVersion}</version>
</dependency>
dependencies {
  implementation 'org.springframework.modulith:spring-modulith-events-api:{projectVersion}'
}

此工件包含两个主要的抽象,可以用作 Spring Bean 提供给应用程序代码:

  • CompletedEventPublications ——该界面允许访问所有已完成的事件发布,并提供 API 来立即从数据库中清除所有已完成的事件,或清除超过既定持续时间(例如 1 分钟)的已完成发布。

  • IncompleteEventPublications——该界面允许访问所有未完成的事件发布,以重新提交与给定谓词匹配的发布或相对原始发布日期早于给定 Duration 的发布。

Event Publication Repositories

为了实际编写事件发布日志,Spring Modulith 公开了 EventPublicationRepository SPI 和实现,用于支持事务的流行持久性技术,如 JPA、JDBC 和 MongoDB。你可以通过将相应的 JAR 添加到你的 Spring Modulith 应用程序中来选择要使用的持久性技术。我们准备了专门的 starters 来简化此任务。

当将相应配置属性 (spring.modulith.events.jdbc-schema-initialization.enabled) 设置为 true 时,基于 JDBC 的实现可以为事件发布日志创建专用表。有关详细信息,请参阅附录中的 schema overview

Event Serializer

每个日志条目都以序列化格式包含原始事件。spring-modulith-events-core 中包含的 EventSerializer 抽象允许插入不同的策略,以将事件实例转换成适合数据存储的格式。Spring Modulith 通过 spring-modulith-events-jackson 工件提供了一个基于 Jackson 的 JSON 实现,该实现通过标准 Spring Boot 自动配置默认注册一个使用 ObjectMapperJacksonEventSerializer

Customizing the Event Publication Date

默认情况下,事件发布注册表将使用 Clock.systemUTC() 返回的日期作为事件发布日期。如果您想自定义此功能,请用应用程序上下文中注册类型为 clock 的 Bean:

@Configuration
class MyConfiguration {

  @Bean
  Clock myCustomClock() {
    return … // Your custom Clock instance created here.
  }
}

Externalizing Events

应用程序模块之间交换的某些事件可能对外部系统很有趣。Spring Modulith 允许将选定的事件发布到各种消息代理。为了使用该支持,您需要执行以下步骤:

  1. broker-specific Spring Modulith artifact 添加到项目中。

  2. 通过使用 Spring Modulith 或 jMolecules 的 @Externalized 注解对事件类型进行注释,选择要外接的事件类型。

  3. 在注释的值中指定特定的代理目标路由。

要了解如何使用其他方式选择要对外化的事件,或自定义它们在代理中的路由,请查看 Fundamentals of Event Externalization

Supported Infrastructure

Broker Artifact Description

Kafka

spring-modulith-events-kafka

使用 Spring Kafka 与代理交互。逻辑路由密钥将用作

AMQP

spring-modulith-events-amqp

使用 Spring AMQP 与任何兼容代理交互。例如,需要明确的 Spring Rabbit 依赖项声明。逻辑路由密钥将用作 AMQP 路由密钥。

JMS

spring-modulith-events-jms

使用 Spring 的核心 JMS 支持。不支持路由键。

SQS

spring-modulith-events-aws-sqs

使用 Spring Cloud AWS SQS 支持。逻辑路由键将用作 SQS 消息组 ID。当设置路由键时,需要将 SQS 队列配置为 FIFO 队列。

SNS

spring-modulith-events-aws-sns

使用 Spring Cloud AWS SNS 支持。逻辑路由键将用作 SNS 消息组 ID。当设置路由键时,需要将 SNS 配置为 FIFO 主题,并启用基于内容的重复数据删除。

Fundamentals of Event Externalization

事件外化对每个已发布的应用程序事件执行三个步骤。

  1. Determining whether the event is supposed to be externalized——我们将其称为 “event selection”。默认情况下,仅位于 Spring Boot 自动配置包中且使用受支持的 @Externalized 注解进行注释的事件类型才会被选择用于外接。

  2. Mapping the event (optional) ——默认情况下,事件使用应用程序中存在的 Jackson ObjectMapper 序列化为 JSON 并按原样发布。映射步骤允许开发人员自定义该表示,甚至完全用一个适合外部方的表示替换原始事件。请注意,映射步骤在待发布对象的实际序列化之前。

  3. Determining a routing target ——消息代理客户端需要一个逻辑目标来发布消息。该目标通常识别物理基础架构(主题、交换机或队列,具体取决于代理),并且通常从事件类型中静态派生。除非在 @Externalized 注解中明确定义,否则 Spring Modulith 将应用程序本地类型名称用作目标。换句话说,在基本包为 com.acme.app 的 Spring Boot 应用程序中,事件类型 com.acme.app.sample.SampleEvent 将被发布到 sample.SampleEvent。某些代理还允许定义动态路由键,该键在实际目标中用于不同的目的。默认情况下,不使用任何路由键。

Annotation-based Event Externalization Configuration

要通过 @Externalized 注解定义自定义路由键,可以使用 $target::$key 模式的每个特定注释中提供的 target/value 属性。键可以是 SpEL 表达式,它获取配置为根对象的事件实例。

Defining a dynamic routing key via SpEL expression
  • Java

  • Kotlin

@Externalized("customer-created::#{#this.getLastname()}") (2)
class CustomerCreated {

  String getLastname() { (1)
    // …
  }
}
@Externalized("customer-created::#{#this.getLastname()}") (2)
class CustomerCreated {
  fun getLastname(): String { (1)
    // …
  }
}

CustomerCreated 事件通过存取器方法显示客户的姓氏。然后通过 #this.getLastname() 表达式在 :: 分隔符之后的 target 声明中使用该方法。

如果密钥计算变得更复杂,建议将其委托给一个将事件作为参数的 Spring Bean:

Invoking a Spring bean to calculate a routing key
  • Java

  • Kotlin

@Externalized("…::#{@beanName.someMethod(#this)}")
@Externalized("…::#{@beanName.someMethod(#this)}")

Programmatic Event Externalization Configuration

spring-modulith-events-api 工件包含 EventExternalizationConfiguration,允许开发人员自定义以上提到的所有步骤。

Programmatically configuring event externalization
  • Java

  • Kotlin

@Configuration
class ExternalizationConfiguration {

  @Bean
  EventExternalizationConfiguration eventExternalizationConfiguration() {

    return EventExternalizationConfiguration.externalizing()                 (1)
      .select(EventExternalizationConfiguration.annotatedAsExternalized())   (2)
      .mapping(SomeEvent.class, it -> …)                                     (3)
      .routeKey(WithKeyProperty.class, WithKeyProperty::getKey)              (4)
      .build();
  }
}
@Configuration
class ExternalizationConfiguration {

  @Bean
  fun eventExternalizationConfiguration(): EventExternalizationConfiguration {

    EventExternalizationConfiguration.externalizing()                         (1)
      .select(EventExternalizationConfiguration.annotatedAsExternalized())    (2)
      .mapping(SomeEvent::class, it -> …)                                     (3)
      .routeKey(WithKeyProperty::class, WithKeyProperty::getKey)              (4)
      .build()
  }
}
1 我们首先创建一个 EventExternalizationConfiguration 的默认实例。
2 通过对前一次调用返回的 Selector 实例调用 select(…) 方法之一,自定义事件选择。此步骤从根本上禁用应用程序基本包过滤器,因为我们现在只查找该注释。现在存在方便的方法,可以按类型、包、包和注释轻松选择事件。此外,还存在一种快捷方式,可以在一步内定义选择和路由。
3 我们为 SomeEvent 实例定义一个映射步骤。请注意,路由仍将由原始事件实例确定,除非您还对路由器调用 ….routeMapped()
4 我们最终通过定义一个方法句柄来提取事件实例的值,由此确定一个路由键。或者,可以使用前一调用返回的 Router 实例上的通用 route(…) 方法为各个事件生成一个完整的 RoutingKey

Testing published events

以下部分描述了仅关注跟踪 Spring 应用程序事件的测试方法。有关使用 @ApplicationModuleListener 的测试模块的更全面方法,请查看 Scenario API

Spring Modulith 的 @ApplicationModuleTest 使能够将 PublishedEvents 实例注入到测试方法中,以验证在测试的业务操作过程中已发布特定的事件集。

Event-based integration testing of the application module arrangement
  • Java

  • Kotlin

@ApplicationModuleTest
class OrderIntegrationTests {

  @Test
  void someTestMethod(*PublishedEvents events*) {

    // …
    var matchingMapped = events.ofType(OrderCompleted.class)
      .matching(OrderCompleted::getOrderId, reference.getId());

    assertThat(matchingMapped).hasSize(1);
  }
}
@ApplicationModuleTest
class OrderIntegrationTests {

  @Test
  fun someTestMethod(events: PublishedEvents events) {

    // …
    var matchingMapped = events.ofType(OrderCompleted::class)
      .matching(OrderCompleted::getOrderId, reference.getId())

    assertThat(matchingMapped).hasSize(1)
  }
}

请注意,PublishedEvents`如何公开 API 来选择匹配特定条件的事件。验证通过 AssertJ 断言完成,该断言验证预期元素的数量。如果您对这些断言使用 AssertJ,则还可以将 `AssertablePublishedEvents 用作测试方法参数类型,并使用它提供的流利断言 API。

Using AssertablePublishedEvents to verify event publications
  • Java

  • Kotlin

@ApplicationModuleTest
class OrderIntegrationTests {

  @Test
  void someTestMethod(*AssertablePublishedEvents events*) {

    // …
    assertThat(events)
      .contains(OrderCompleted.class)
      .matching(OrderCompleted::getOrderId, reference.getId());
  }
}
@ApplicationModuleTest
class OrderIntegrationTests {

  @Test
  fun someTestMethod(events: AssertablePublishedEvents) {

    // …
    assertThat(events)
      .contains(OrderCompleted::class)
      .matching(OrderCompleted::getOrderId, reference.getId())
  }
}

请注意,assertThat(…) 表达式返回的类型如何允许直接定义已发布事件上的约束。