Working with Application Events

To keep application modules as decoupled from each other as possible, their primary means of interaction should be event publication and consumption. This spares the originating module from having to know about all potentially interested parties, which is key to enabling application module integration testing (see Integration Testing Application Modules).

Often we will find application components defined like this:

  • Java

@Service
@RequiredArgsConstructor
public class OrderManagement {

  private final InventoryManagement inventory;

  @Transactional
  public void complete(Order order) {

    // State transitions on the order aggregate go here

    // Invoke related functionality
    inventory.updateStockFor(order);
  }
}

  • Kotlin

@Service
class OrderManagement(val inventory: InventoryManagement) {

  @Transactional
  fun complete(order: Order) {
    inventory.updateStockFor(order)
  }
}

The complete(…) method creates functional gravity in the sense that it attracts related functionality and thus interaction with Spring beans defined in other application modules. In particular, this makes the component harder to test, as we need instances of those depended-on beans just to create an instance of OrderManagement (see Dealing with Efferent Dependencies). It also means that we have to touch the class whenever we want to integrate further functionality with the business event of order completion.

We can change the application module interaction as follows:

Publishing an application event via Spring’s ApplicationEventPublisher
  • Java

@Service
@RequiredArgsConstructor
public class OrderManagement {

  private final ApplicationEventPublisher events;
  private final OrderInternal dependency;

  @Transactional
  public void complete(Order order) {

    // State transitions on the order aggregate go here

    events.publishEvent(new OrderCompleted(order.getId()));
  }
}

  • Kotlin

@Service
class OrderManagement(val events: ApplicationEventPublisher, val dependency: OrderInternal) {

  @Transactional
  fun complete(order: Order) {
    events.publishEvent(OrderCompleted(order.id))
  }
}

Note how, instead of depending on the other application module’s Spring bean, we use Spring’s ApplicationEventPublisher to publish a domain event once we have completed the state transitions on the primary aggregate. For a more aggregate-driven approach to event publication, see Spring Data’s application event publication mechanism. As event publication happens synchronously by default, the transactional semantics of the overall arrangement stay the same as in the example above. This is good, as we get a very simple consistency model (either both the status change of the order and the inventory update succeed, or neither does). It is also bad, as every additional piece of triggered functionality widens the transaction boundary and can cause the entire transaction to fail, even if the functionality causing the error is not crucial.
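To make the trade-off concrete, the following plain-Java sketch mimics synchronous delivery without Spring. SyncPublisher and SyncPublicationDemo are hypothetical stand-ins introduced for illustration, not Spring’s implementation. The point is only that an exception thrown by any listener surfaces in the publishing call, which in a transactional method would roll back the whole unit of work.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a synchronous event publisher (not Spring's implementation).
class SyncPublisher {

  private final List<Consumer<Object>> listeners = new ArrayList<>();

  void register(Consumer<Object> listener) {
    listeners.add(listener);
  }

  // Invokes every listener on the calling thread. A listener exception
  // propagates to the caller, as it does with synchronous publication.
  void publishEvent(Object event) {
    for (Consumer<Object> listener : listeners) {
      listener.accept(event);
    }
  }
}

class SyncPublicationDemo {

  // Returns true if the "business operation" completed,
  // false if a listener failure aborted it.
  static boolean completeOrder(SyncPublisher events) {
    try {
      events.publishEvent("OrderCompleted");
      return true;
    } catch (RuntimeException e) {
      // In a real transaction, this is where the rollback would happen.
      return false;
    }
  }
}
```

A failing listener registered for a non-crucial concern is enough to make completeOrder(…) report failure, which is the widening effect described above.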

A different way of approaching this is to move the event consumption to asynchronous handling at transaction commit and to treat secondary functionality as exactly that:

An async, transactional event listener
  • Java

@Component
class InventoryManagement {

  @Async
  @TransactionalEventListener
  void on(OrderCompleted event) { /* … */ }
}

  • Kotlin

@Component
class InventoryManagement {

  @Async
  @TransactionalEventListener
  fun on(event: OrderCompleted) { /* … */ }
}

This now effectively decouples the original transaction from the execution of the listener. While this avoids expanding the original business transaction, it also creates a risk: if the listener fails for whatever reason, the event publication is lost unless each listener implements its own safety net. Even worse, that does not fully work either, as the system might fail before the method is even invoked.

Application Module Listener

To run a transactional event listener in a transaction of its own, it needs to be annotated with @Transactional in turn.

An async, transactional event listener running in a transaction itself
  • Java

@Component
class InventoryManagement {

  @Async
  @Transactional(propagation = Propagation.REQUIRES_NEW)
  @TransactionalEventListener
  void on(OrderCompleted event) { /* … */ }
}

  • Kotlin

@Component
class InventoryManagement {

  @Async
  @Transactional(propagation = Propagation.REQUIRES_NEW)
  @TransactionalEventListener
  fun on(event: OrderCompleted) { /* … */ }
}

To ease the declaration of what is supposed to be the default way of integrating modules via events, Spring Modulith provides @ApplicationModuleListener as a shortcut for the declaration above:

An application module listener
  • Java

@Component
class InventoryManagement {

  @ApplicationModuleListener
  void on(OrderCompleted event) { /* … */ }
}

  • Kotlin

@Component
class InventoryManagement {

  @ApplicationModuleListener
  fun on(event: OrderCompleted) { /* … */ }
}

The Event Publication Registry

Spring Modulith ships with an event publication registry that hooks into the core event publication mechanism of Spring Framework. On event publication, it finds out about the transactional event listeners that will get the event delivered and writes an entry for each of them (dark blue) into an event publication log as part of the original business transaction.

Figure 1. The transactional event listener arrangement before execution

Each transactional event listener is wrapped into an aspect that marks the log entry as completed if the execution of the listener succeeds. If the listener fails, the log entry stays untouched so that retry mechanisms can be deployed depending on the application’s needs. By default, all incomplete event publications are resubmitted at application startup.

Figure 2. The transactional event listener arrangement after execution
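The bookkeeping described above can be pictured with a small in-memory model. This is a simplified sketch in plain Java, not the registry’s actual implementation. PublicationLogSketch, Entry, and all method names are hypothetical, and persistence and transactions are left out entirely. It shows one entry per listener, completion on success, and a retry of whatever is still incomplete.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Simplified, in-memory illustration of the idea; all names are hypothetical.
class PublicationLogSketch {

  // One entry per (event, listener) pair.
  static final class Entry {
    final Object event;
    final Consumer<Object> listener;
    boolean completed;

    Entry(Object event, Consumer<Object> listener) {
      this.event = event;
      this.listener = listener;
    }
  }

  private final List<Entry> entries = new ArrayList<>();

  // Writes a log entry for each listener first, then attempts delivery.
  void publish(Object event, List<Consumer<Object>> listeners) {
    List<Entry> created = new ArrayList<>();
    for (Consumer<Object> listener : listeners) {
      Entry entry = new Entry(event, listener);
      entries.add(entry);
      created.add(entry);
    }
    created.forEach(this::deliver);
  }

  // Retries every entry that has not been marked as completed yet.
  void resubmitIncomplete() {
    new ArrayList<>(entries).stream()
        .filter(entry -> !entry.completed)
        .forEach(this::deliver);
  }

  long incompleteCount() {
    return entries.stream().filter(entry -> !entry.completed).count();
  }

  // Marks the entry as completed only if the listener returns normally.
  private void deliver(Entry entry) {
    try {
      entry.listener.accept(entry.event);
      entry.completed = true;
    } catch (RuntimeException e) {
      // The entry stays incomplete so that it can be retried later.
    }
  }
}
```

A listener that fails on first delivery leaves exactly one incomplete entry behind; calling resubmitIncomplete() once the listener works again completes it.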

Spring Boot Event Registry Starters

Using the transactional event publication log requires a combination of artifacts added to your application. To ease that task, Spring Modulith provides starter POMs that are centered around the persistence technology to be used and default to the Jackson-based EventSerializer implementation. The following starters are available:

| Persistence Technology | Artifact | Description |
|---|---|---|
| JPA | spring-modulith-starter-jpa | Using JPA as persistence technology. |
| JDBC | spring-modulith-starter-jdbc | Using JDBC as persistence technology. Also works in JPA-based applications but bypasses your JPA provider for actual event persistence. |
| MongoDB | spring-modulith-starter-mongodb | Using MongoDB as persistence technology. Also enables MongoDB transactions and requires a replica set setup of the server to interact with. The transaction auto-configuration can be disabled by setting the spring.modulith.events.mongodb.transaction-management.enabled property to false. |
| Neo4j | spring-modulith-starter-neo4j | Using Neo4j behind Spring Data Neo4j. |

Managing Event Publications

Event publications may need to be managed in a variety of ways during the runtime of an application. Incomplete publications might have to be resubmitted to the corresponding listeners after a given amount of time. Completed publications, on the other hand, will likely have to be purged from the database or moved into an archive store. As the needs for that kind of housekeeping vary strongly from application to application, Spring Modulith offers an API to deal with both kinds of publications. That API is available through the spring-modulith-events-api artifact, which you can add to your application:

Using Spring Modulith Events API artifact
  • Maven

<dependency>
  <groupId>org.springframework.modulith</groupId>
  <artifactId>spring-modulith-events-api</artifactId>
  <version>{projectVersion}</version>
</dependency>

  • Gradle

dependencies {
  implementation 'org.springframework.modulith:spring-modulith-events-api:{projectVersion}'
}

This artifact contains two primary abstractions that are available to application code as Spring beans:

  • CompletedEventPublications: This interface allows accessing all completed event publications and provides an API to immediately purge all of them from the database, or only the completed publications older than a given duration (for example, 1 minute).

  • IncompleteEventPublications: This interface allows accessing all incomplete event publications to resubmit either the ones matching a given predicate or the ones older than a given Duration relative to the original publishing date.
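The “older than a given duration” criterion can be pictured as a comparison of the publication date with a cutoff instant. The sketch below is plain Java and independent of Spring Modulith. The Publication record and the olderThan helper are hypothetical names introduced for illustration only and do not mirror the signatures of the two interfaces.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.stream.Collectors;

class PublicationHousekeepingSketch {

  // Hypothetical, minimal view of a logged publication.
  record Publication(String id, Instant publicationDate, boolean completed) {}

  // Selects publications in the given completion state whose publication
  // date lies before (now - age).
  static List<Publication> olderThan(List<Publication> all, boolean completed,
      Duration age, Instant now) {

    Instant cutoff = now.minus(age);

    return all.stream()
        .filter(p -> p.completed() == completed)
        .filter(p -> p.publicationDate().isBefore(cutoff))
        .collect(Collectors.toList());
  }
}
```

Calling olderThan(all, true, Duration.ofMinutes(1), now) would yield the candidates for purging, while passing false yields the candidates for resubmission.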

Event Publication Repositories

To actually write the event publication log, Spring Modulith exposes an EventPublicationRepository SPI and implementations for popular persistence technologies that support transactions, like JPA, JDBC and MongoDB. You select the persistence technology to be used by adding the corresponding JAR to your Spring Modulith application. We have prepared dedicated starters to ease that task.

The JDBC-based implementation can create a dedicated table for the event publication log when the respective configuration property (spring.modulith.events.jdbc-schema-initialization.enabled) is set to true. For details, please consult the schema overview in the appendix.

Event Serializer

Each log entry contains the original event in serialized form. The EventSerializer abstraction contained in spring-modulith-events-core allows plugging different strategies for how to turn the event instances into a format suitable for the datastore. Spring Modulith provides a Jackson-based JSON implementation through the spring-modulith-events-jackson artifact, which registers a JacksonEventSerializer consuming an ObjectMapper through standard Spring Boot auto-configuration by default.

Customizing the Event Publication Date

By default, the Event Publication Registry uses the date returned by Clock.systemUTC() as the event publication date. If you want to customize this, register a bean of type Clock with the application context:

@Configuration
class MyConfiguration {

  @Bean
  Clock myCustomClock() {
    return … // Your custom Clock instance created here.
  }
}
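For tests or otherwise reproducible publication dates, a fixed Clock from java.time is one option. The sketch below only uses the JDK. The class name FixedClockSketch and the chosen instant are arbitrary assumptions, and in a Spring application the result of fixedClock() is what a @Bean method like the one above could return.

```java
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneOffset;

class FixedClockSketch {

  // A Clock that always reports the same instant; the instant itself is arbitrary.
  static Clock fixedClock() {
    return Clock.fixed(Instant.parse("2024-01-01T00:00:00Z"), ZoneOffset.UTC);
  }

  // Any code that derives "now" from the Clock sees the fixed instant.
  static Instant publicationDate(Clock clock) {
    return clock.instant();
  }
}
```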

Externalizing Events

Some of the events exchanged between application modules might be interesting to external systems. Spring Modulith allows publishing selected events to a variety of message brokers. To use that support you need to take the following steps:

  1. Add the broker-specific artifact (see Supported Infrastructure) to your project.

  2. Select event types to be externalized by annotating them with either Spring Modulith’s or jMolecules' @Externalized annotation.

  3. Specify the broker-specific routing target in the annotation’s value.

To find out how to use other ways of selecting events for externalization, or customize their routing within the broker, check out Fundamentals of Event Externalization.

Supported Infrastructure

| Broker | Artifact | Description |
|---|---|---|
| Kafka | spring-modulith-events-kafka | Uses Spring Kafka for the interaction with the broker. The logical routing key will be used as |
| AMQP | spring-modulith-events-amqp | Uses Spring AMQP for the interaction with any compatible broker. Requires an explicit dependency declaration, for example for Spring Rabbit. The logical routing key will be used as AMQP routing key. |
| JMS | spring-modulith-events-jms | Uses Spring’s core JMS support. Does not support routing keys. |
| SQS | spring-modulith-events-aws-sqs | Uses Spring Cloud AWS SQS support. The logical routing key will be used as SQS message group id. When a routing key is set, requires the SQS queue to be configured as a FIFO queue. |
| SNS | spring-modulith-events-aws-sns | Uses Spring Cloud AWS SNS support. The logical routing key will be used as SNS message group id. When a routing key is set, requires SNS to be configured as a FIFO topic with content-based deduplication enabled. |

Fundamentals of Event Externalization

Event externalization performs three steps for each application event published.

  1. Determining whether the event is supposed to be externalized: We refer to this as “event selection”. By default, only event types located within a Spring Boot auto-configuration package and annotated with one of the supported @Externalized annotations are selected for externalization.

  2. Mapping the event (optional): By default, the event is serialized to JSON using the Jackson ObjectMapper present in the application and published as is. The mapping step allows developers to either customize the representation or even completely replace the original event with a representation suitable for external parties. Note that the mapping step precedes the actual serialization of the object to be published.

  3. Determining a routing target: Message broker clients need a logical target to publish the message to. The target usually identifies physical infrastructure (a topic, exchange, or queue, depending on the broker) and is often statically derived from the event type. Unless defined in the @Externalized annotation specifically, Spring Modulith uses the application-local type name as the target. In other words, in a Spring Boot application with a base package of com.acme.app, an event type com.acme.app.sample.SampleEvent would get published to sample.SampleEvent.

Some brokers also allow defining a rather dynamic routing key that is used for different purposes within the actual target. By default, no routing key is used.
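The default target derivation from step 3 amounts to stripping the application base package from the fully qualified type name. The following plain-Java sketch illustrates that rule only. RoutingTargetSketch and applicationLocalName are hypothetical names, and the way Spring Modulith actually resolves the base package is not shown here.

```java
class RoutingTargetSketch {

  // Strips the application base package (plus the separating dot) from a
  // fully qualified type name; names outside the base package are returned as is.
  static String applicationLocalName(String basePackage, String fullyQualifiedTypeName) {

    String prefix = basePackage + ".";

    return fullyQualifiedTypeName.startsWith(prefix)
        ? fullyQualifiedTypeName.substring(prefix.length())
        : fullyQualifiedTypeName;
  }
}
```

With the values from the text, applicationLocalName("com.acme.app", "com.acme.app.sample.SampleEvent") yields sample.SampleEvent.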

Annotation-based Event Externalization Configuration

To define a custom routing key via the @Externalized annotations, a pattern of $target::$key can be used for the target/value attribute available in each of the particular annotations. The key can be a SpEL expression that gets the event instance configured as the root object.

Defining a dynamic routing key via SpEL expression
  • Java

@Externalized("customer-created::#{#this.getLastname()}")
class CustomerCreated {

  String getLastname() {
    // …
  }
}

  • Kotlin

@Externalized("customer-created::#{#this.getLastname()}")
class CustomerCreated {
  fun getLastname(): String {
    // …
  }
}

The CustomerCreated event exposes the last name of the customer via an accessor method. That method is then used via the #this.getLastname() expression in the key expression following the :: delimiter of the target declaration.
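How such a declaration breaks down into its two parts can be sketched with a simple split on the :: delimiter. This is an illustrative simplification in plain Java, not Spring Modulith’s parser. RoutingDeclarationSketch and Parsed are hypothetical names, and evaluating the SpEL expression itself is out of scope.

```java
class RoutingDeclarationSketch {

  // Hypothetical value holder; keyExpression is null if no "::" delimiter is present.
  record Parsed(String target, String keyExpression) {}

  // Splits "$target::$key" at the first "::" into target and key expression.
  static Parsed parse(String declaration) {
    String[] parts = declaration.split("::", 2);
    return new Parsed(parts[0], parts.length == 2 ? parts[1] : null);
  }
}
```

For the declaration used above, the target part is customer-created and the key expression part is #{#this.getLastname()}.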

If the key calculation becomes more involved, it is advisable to delegate it to a Spring bean that takes the event as an argument:

Invoking a Spring bean to calculate a routing key
  • Java

@Externalized("…::#{@beanName.someMethod(#this)}")

  • Kotlin

@Externalized("…::#{@beanName.someMethod(#this)}")

Programmatic Event Externalization Configuration

The spring-modulith-events-api artifact contains EventExternalizationConfiguration, which allows developers to customize all of the above-mentioned steps.

Programmatically configuring event externalization
  • Java

@Configuration
class ExternalizationConfiguration {

  @Bean
  EventExternalizationConfiguration eventExternalizationConfiguration() {

    return EventExternalizationConfiguration.externalizing()                 // (1)
      .select(EventExternalizationConfiguration.annotatedAsExternalized())   // (2)
      .mapping(SomeEvent.class, it -> …)                                     // (3)
      .routeKey(WithKeyProperty.class, WithKeyProperty::getKey)              // (4)
      .build();
  }
}

  • Kotlin

@Configuration
class ExternalizationConfiguration {

  @Bean
  fun eventExternalizationConfiguration(): EventExternalizationConfiguration {

    return EventExternalizationConfiguration.externalizing()                  // (1)
      .select(EventExternalizationConfiguration.annotatedAsExternalized())    // (2)
      .mapping(SomeEvent::class.java) { it -> … }                             // (3)
      .routeKey(WithKeyProperty::class.java, WithKeyProperty::getKey)         // (4)
      .build()
  }
}

(1) We start by creating a default instance of EventExternalizationConfiguration.
(2) We customize the event selection by calling one of the select(…) methods on the Selector instance returned by the previous call. This step fundamentally disables the application base package filter, as we only look for the annotation now. Convenience methods exist to easily select events by type, by packages, or by packages and annotation, as well as a shortcut to define selection and routing in one step.
(3) We define a mapping step for SomeEvent instances. Note that the routing will still be determined by the original event instance, unless you additionally call ….routeMapped() on the router.
(4) We finally determine a routing key by defining a method handle to extract a value from the event instance. Alternatively, a full RoutingKey can be produced for individual events by using the general route(…) method on the Router instance returned from the previous call.

Testing published events

The following section describes a testing approach solely focused on tracking Spring application events. For a more holistic approach to testing modules that use @ApplicationModuleListener, please check out the Scenario API.

Spring Modulith’s @ApplicationModuleTest allows a PublishedEvents instance to be injected into the test method to verify that a particular set of events has been published during the course of the business operation under test.

Event-based integration testing of the application module arrangement
  • Java

@ApplicationModuleTest
class OrderIntegrationTests {

  @Test
  void someTestMethod(PublishedEvents events) {

    // …
    var matchingMapped = events.ofType(OrderCompleted.class)
      .matching(OrderCompleted::getOrderId, reference.getId());

    assertThat(matchingMapped).hasSize(1);
  }
}

  • Kotlin

@ApplicationModuleTest
class OrderIntegrationTests {

  @Test
  fun someTestMethod(events: PublishedEvents) {

    // …
    val matchingMapped = events.ofType(OrderCompleted::class.java)
      .matching(OrderCompleted::getOrderId, reference.getId())

    assertThat(matchingMapped).hasSize(1)
  }
}

Note how PublishedEvents exposes an API to select events matching certain criteria. The verification is concluded by an AssertJ assertion that verifies the number of elements expected. If you are using AssertJ for those assertions anyway, you can also use AssertablePublishedEvents as the test method parameter type and use the fluent assertion APIs provided through that.

Using AssertablePublishedEvents to verify event publications
  • Java

@ApplicationModuleTest
class OrderIntegrationTests {

  @Test
  void someTestMethod(AssertablePublishedEvents events) {

    // …
    assertThat(events)
      .contains(OrderCompleted.class)
      .matching(OrderCompleted::getOrderId, reference.getId());
  }
}

  • Kotlin

@ApplicationModuleTest
class OrderIntegrationTests {

  @Test
  fun someTestMethod(events: AssertablePublishedEvents) {

    // …
    assertThat(events)
      .contains(OrderCompleted::class.java)
      .matching(OrderCompleted::getOrderId, reference.getId())
  }
}

Note how the type returned by the assertThat(…) expression allows defining constraints on the published events directly.