A2.领域驱动设计(DDD)之领域事件与Spring Integration JDBC融合实践

前言

这是一个基于Java 21 的 六边形架构与领域驱动设计的一个通用项目,并且结合现有的最新版本技术架构实现了 领域驱动设计模式和六边形架构模式组件定义. 并且结合微服务,介绍了领域层,领域事件,资源库,分布式锁,序列化,安全认证,日志等,并提供了实现功能. 并且我会以日常发布文章和更新代码的形式来完善它.

开篇

Spring Integration 中文参考文档: https://www.iokays.com/spring-integration/index.html Spring Integration JDBC 中文参考文档: https://www.iokays.com/spring-integration/jdbc.html

领域事件

重要的事件肯定会在系统其它地方引起反应,因此理解为什么会有这些反应同样也很重要。 领域事件是一个领域模型中极其重要的部分,用来表示领域中发生的事件。忽略不相关的领域活动, 同时明确领域专家要跟踪或希望被通知的事情,或与其他模型对象中的状态更改相关联。

实现领域驱动设计这本书已经给出了几种事件的机制:

domain event

我们要处理的是使用 本地消息表 来实现领域事件的存储, 因为当业务数据和领域事件处在同一个数据库中,我们可以使用同一个数据库事务机制来保证业务数据和领域事件同时存储。 既做到了事件不会丢失,又做到了领域事件不会因为业务回滚导致出现脏事件。

Spring Data JPA

该系列我们已经讲了使用Spring Data JPA来实现领域层的值对象,实体和聚合根,并且知道聚合根管理了领域事件,由Spring Event进行监听,并自动发送领域事件。

package com.iokays.common.domain.jpa;

import com.iokays.common.core.domain.AggregateRoot;
import com.iokays.common.core.event.DomainEvent;
import jakarta.persistence.MappedSuperclass;
import org.springframework.data.annotation.Transient;
import org.springframework.data.domain.AfterDomainEventPublication;
import org.springframework.data.domain.DomainEvents;
import org.springframework.util.Assert;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

/**
 * 聚合根
 * 该聚合根实现了Spring Data的{@link DomainEvents}和{@link AfterDomainEventPublication}注解,
 * 复用了AbstractAggregateRoot 的代码
 *
 * @param <A>
 * @see org.springframework.data.domain.AbstractAggregateRoot
 */
@MappedSuperclass
public abstract class AbstractAggregateRoot<A extends AbstractAggregateRoot<A>> extends ConcurrencySafeEntity<A> implements AggregateRoot {

    @Transient
    private final transient List<DomainEvent> domainEvents = new ArrayList<>();

    /**
     * Registers the given event object for publication on a call to a Spring Data repository's save or delete methods.
     *
     * @param event must not be {@literal null}.
     * @return the event that has been added.
     * @see #andEvent(DomainEvent)
     */
    protected DomainEvent registerEvent(DomainEvent event) {

        Assert.notNull(event, "Domain event must not be null");

        this.domainEvents.add(event);
        return event;
    }

    /**
     * Clears all domain events currently held. Usually invoked by the infrastructure in place in Spring Data
     * repositories.
     */
    @AfterDomainEventPublication
    protected void clearDomainEvents() {
        this.domainEvents.clear();
    }

    /**
     * All domain events currently captured by the aggregate.
     */
    @DomainEvents
    protected Collection<DomainEvent> domainEvents() {
        return Collections.unmodifiableList(domainEvents);
    }

    /**
     * Adds all events contained in the given aggregate to the current one.
     *
     * @param aggregate must not be {@literal null}.
     * @return the aggregate
     */
    protected final A andEventsFrom(A aggregate) {

        Assert.notNull(aggregate, "Aggregate must not be null");

        this.domainEvents.addAll(aggregate.domainEvents());

        return (A) this;
    }

    /**
     * Adds the given event to the aggregate for later publication
     * when calling a Spring Data repository's save or delete method.
     * Does the same as {@link #registerEvent(DomainEvent)} but returns the aggregate instead of the event.
     *
     * @param event must not be {@literal null}.
     * @return the aggregate
     * @see #registerEvent(DomainEvent)
     */
    protected final A andEvent(DomainEvent event) {

        registerEvent(event);

        return (A) this;
    }
}

registerEvent(): 注册事件.

domainEvents(): 因为含有@DomainEvents注解,在Spring Data JPA 在执行(save,delete)操作时,会将事件自动发布出去.

最后我们可以使用Spring Event来进行监听,这部分我们自己利用本地消息表的机制来存储事件。也可以使用下面将要介绍的 Spring Integration JDBC

@Transactional(propagation = Propagation.MANDATORY, isolation = Isolation.REPEATABLE_READ)
class DomainEventBusMessagePublisher {

    @EventListener
    public void handle(final DomainEvent<?> evt) {
        //消息发布[入库], 分布式事务本地消息表的持久化的位置
        println(STR. "消息发布[入库]: 客户已注册事件: \{ evt }" );
    }
}

Spring Integration JDBC

Spring Integration 是一套强大的事件驱动消息框架,可以在应用(系统)间进行消息的传递。

同时提供了大量的通道适配器。一个将消息通道连接到其他系统或传送方式的端点。通道适配器可以是入站还是出站的。通常,通道适配器会在消息与从其他系统接收或发送至其他系统的 任何对象或资源(文件、HTTP 请求、JMS 消息等)之间进行一些映射。

spring integration message channel

通道适配器提供了对Spring Event的支持,支持SpringEvent与Spring Integration消息的自动映射。

spring integration source endpoint

最左侧部分表示SpringEvent, 将SpringEvent映射为 Message

解决了用SpringEvent实现的领域事件自动发送,怎么将消息保存起来,Spring Integration 提供了基于数据库存储的适配器,满足我们的需求。我们使用的是 Spring Integration JDBC, 同时Spring 也对Mysql, Pgsql提供了默认的实现。

首先我们使用出站适配器将消息 Message 转为数据库存储格式。 因为出站适配器的操作和业务是在同一个事务中执行的,所以保证了领域事件入库的准确性。

spring integration target endpoint

然后使用入站适配器将数据库存储格式转为Message,

spring integration source endpoint

最后将Message发送到指定的MQ队列中,并标记消息已发送, 整体的映射流程:

SpringEvent(入站) → Message → 写入数据库Message格式(出站) → 读取数据库Message格式(入站) → Message → 发送.

代码部分

首先我们先看看 Spring Integration JDBC 最简单的应用, 改代码部分在 sample-spring-integration-jdbc 模块。 因为Spring已经使用了一套默认的JDBC实现,我们可以先简单的调用测试下。

第一步: 我们需要在 application.properties 中配置数据库连接信息和默认库表的生成。

spring.datasource.url=jdbc:h2:D:/Data/h2/test-sample-spring-integration-jdbc.mv.db;AUTO_SERVER=TRUE
spring.datasource.driver-class-name=org.h2.Driver
spring.integration.jdbc.platform=h2
# create tables
spring.integration.jdbc.initialize-schema=always

第二步: 我们直接添加Spring Integration JDBC的配置类即可:

package com.iokays.sample;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.jdbc.store.JdbcChannelMessageStore;
import org.springframework.integration.jdbc.store.channel.H2ChannelMessageStoreQueryProvider;

import javax.sql.DataSource;

/**
 * Spring Integration,SpringEvent配置
 */
@Configuration
@EnableIntegration
public class MyDomainEventStoreIntegrationConfiguration {

    @Bean
    public JdbcChannelMessageStore domainEventMessageStore(final DataSource dataSource) {
        final var result = new JdbcChannelMessageStore(dataSource);

        //提供默认的SQL
        result.setChannelMessageStoreQueryProvider(new H2ChannelMessageStoreQueryProvider());

        //将Spring Integration 的Message默认序列化成JSON序列化方式
        result.setSerializer(DefaultJacksonMessageMapper::toJson);
        result.setDeserializer(DefaultJacksonMessageMapper::fromJsonStream);

        result.setPriorityEnabled(true);
        return result;
    }

}

我们可以查看下 Message实体,序列化的简单实现。

package com.iokays.sample;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.springframework.integration.support.MutableMessage;

import java.util.Map;

/**
 * 替代 Spring Integration 的 Message 默认序列化的实体类
 */
public class DefaultJacksonMessage extends MutableMessage<Object> {

    @JsonCreator
    public DefaultJacksonMessage(@JsonProperty("payload") final Object payload,
                                 @JsonProperty("headers") final Map<String, Object> headers) {
        super(payload, headers);
    }
}
package com.iokays.sample;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import io.vavr.control.Try;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.InputStream;
import java.io.OutputStream;

/**
 * DefaultJacksonMessage 序列化工具
 */
public class DefaultJacksonMessageMapper {

    private static final ObjectMapper objectMapper = new ObjectMapper();

    static {
        objectMapper.configure(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN, true);
        objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        objectMapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
        objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
    }

    /**
     * 将对象序列化为JSON字节流 [序列化]
     *
     * @param object
     * @param outputStream
     */
    public static void toJson(Object object, OutputStream outputStream) {
        Try.run(() -> objectMapper.writeValue(outputStream, object));
    }

    /**
     * 将JSON字节流转换为对象 [反序列化]
     *
     * @param inputStream
     * @return
     */
    public static DefaultJacksonMessage fromJsonStream(InputStream inputStream) {
        return Try.of(() -> objectMapper.readValue(inputStream, DefaultJacksonMessage.class)).get();
    }
}

最后,我们在测试类中直接调用即可。

package com.iokays.sample;

import jakarta.annotation.Resource;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.integration.jdbc.store.JdbcChannelMessageStore;
import org.springframework.messaging.Message;
import org.springframework.transaction.annotation.Transactional;

@SpringBootTest
class SampleTest {

    @Resource
    private JdbcChannelMessageStore jdbcChannelMessageStore;

    @Test
    @Transactional
    void test() {
        final var groupId = "order";
        jdbcChannelMessageStore.addMessageToGroup(groupId, new DefaultJacksonMessage("1", null));
        jdbcChannelMessageStore.addMessageToGroup(groupId, new DefaultJacksonMessage("2", null));

        //断言消息的个数为2
        Assertions.assertEquals(2, jdbcChannelMessageStore.messageGroupSize(groupId));

        final Message<?> message = jdbcChannelMessageStore.pollMessageFromGroup(groupId);

        //断言消息内容为1
        Assertions.assertEquals("1", message.getPayload());

        //断言消息的个数为1, 因为调用DEL语句删除了。
        Assertions.assertEquals(1, jdbcChannelMessageStore.messageGroupSize(groupId));

    }

}

执行的一些SQL

SELECT INT_CHANNEL_MESSAGE.MESSAGE_ID, INT_CHANNEL_MESSAGE.MESSAGE_BYTES
from INT_CHANNEL_MESSAGE
where INT_CHANNEL_MESSAGE.GROUP_KEY = ? and INT_CHANNEL_MESSAGE.REGION = ?
order by MESSAGE_PRIORITY DESC NULLS LAST, CREATED_DATE, MESSAGE_SEQUENCE LIMIT 1

DELETE from INT_CHANNEL_MESSAGE where MESSAGE_ID=? and GROUP_KEY=? and REGION=?

我们看到默认的查询,只能每次查询一条记录,并且消息的处理方式是物理删除。这样远远不能满足我们的需求,我们需要批量的查询多条数据,并且消息需要永久持久化。 那么我们利用本地消息表的机制和Spring Integration JDBC 提供的管道适配器来解决我们的问题。 该代码部分在 common-domain-event-store-with-spring-integration-jdbc common-domain-event-send-with-spring-integration-jdbc 两个模块。 分成两个模块的原因是事件的存储和发送可能不在同一套应用或系统上。 存储侧只需要保证业务数据和领域事件同时持久化即可, 发送侧只需要保证消息按顺序发送即可。 整个配置过程,我们只需要处理出站,入站适配器就可以了。

第一步:建立本地消息表.

CREATE TABLE LOCAL_MESSAGE  (
	MESSAGE_ID CHAR(36) NOT NULL,
	MESSAGE_SEQUENCE BIGINT NOT NULL AUTO_INCREMENT UNIQUE,
	MESSAGE_BYTES BLOB NOT NULL,
	MESSAGE_STATE INT NOT NULL,
	CREATED_DATE DATETIME(6) NOT NULL,
	UPDATED_DATE DATETIME(6) NOT NULL,
	constraint INT_MESSAGE_PK primary key (MESSAGE_ID)
) ENGINE=InnoDB;

第二步:建立消息出站适配器, 并于SpringEvent监听的领域事件进行绑定。

package com.iokays.common.domain.event.store;

import com.iokays.common.core.event.DomainEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.event.inbound.ApplicationEventListeningMessageProducer;
import org.springframework.integration.jdbc.JdbcMessageHandler;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.support.GenericMessage;

import javax.sql.DataSource;
import java.sql.Date;
import java.util.Objects;

/**
 * Spring Integration,SpringEvent配置
 */
@Configuration
@EnableIntegration
public class MyDomainEventStoreIntegrationConfiguration {

    //logger
    private static final Logger log = LoggerFactory.getLogger(MyDomainEventStoreIntegrationConfiguration.class);

    /**
     * 存储领域事件的出站适配器
     *
     * @param dataSource
     * @return
     */
    @Bean
    @ServiceActivator(inputChannel = "storeDomainEventChannel")
    public MessageHandler jdbcMessageHandler(DataSource dataSource) {
        final var jdbcMessageHandler = new JdbcMessageHandler(dataSource,
                "INSERT INTO LOCAL_MESSAGE (MESSAGE_ID, MESSAGE_BYTES, MESSAGE_STATE, CREATED_DATE, UPDATED_DATE) VALUES (?, ?, 0, ?, ?)");
        jdbcMessageHandler.setPreparedStatementSetter((ps, m) -> {
            if (m instanceof GenericMessage<?> gm) {
                ps.setString(1, Objects.requireNonNull(gm.getHeaders().getId()).toString());
                ps.setBytes(2, DomainEventInputMessageMapper.toBytes(gm.getPayload()));
                final var time = new Date(Objects.requireNonNull(gm.getHeaders().getTimestamp()));
                ps.setDate(3, time);
                ps.setDate(4, time);
            } else {
                throw new IllegalArgumentException("message type not support");
            }
            log.info("message: {}", m);
        });
        return jdbcMessageHandler;
    }

    /**
     * 监听领域事件, 将事件发送到storeDomainEventChannel
     *
     * @return
     */
    @Bean
    public ApplicationEventListeningMessageProducer domainEventMessageProducer() {
        final var result = new ApplicationEventListeningMessageProducer();
        result.setEventTypes(DomainEvent.class);
        result.setOutputChannelName("storeDomainEventChannel");
        return result;
    }

}

事件的存储就完成,接下来我们在 common-domain-event-send-with-spring-integration-jdbc 处理领域事件从数据库到入站适配器发送的过程。

package com.iokays.common.domain.event.send;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.jdbc.JdbcPollingChannelAdapter;
import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;

import javax.sql.DataSource;
import java.util.List;
import java.util.Map;

/**
 * 基于数据库的入站适配器
 * 读取数据库中的事件,并且更新数据库
 */
@Configuration
@EnableIntegration
public class MyDomainEventSendIntegrationConfiguration {

    //logger
    private static final Logger log = LoggerFactory.getLogger(MyDomainEventSendIntegrationConfiguration.class);

    /**
     * 从数据库中读取未发送的事件,标记为已发送【1】
     *
     * @param dataSource
     * @return
     */
    @Bean
    public MessageSource<Object> jdbcMessageSource(DataSource dataSource) {
        final var messageSource = new JdbcPollingChannelAdapter(dataSource, "SELECT * FROM LOCAL_MESSAGE WHERE MESSAGE_STATE = 0 ORDER BY MESSAGE_SEQUENCE LIMIT 1024");
        messageSource.setUpdateSql("UPDATE LOCAL_MESSAGE SET MESSAGE_STATE = 1 WHERE MESSAGE_ID in (:ID)");
        messageSource.setUpdateSqlParameterSourceFactory(v -> {
            final List<Map<String, Object>> list = (List<Map<String, Object>>) v;
            //获取查询出来的全部 MESSAGE_ID,并更新为 1
            final var ids = list.stream().map(v1 -> v1.get("MESSAGE_ID")).map(Object::toString).toList();
            return new MapSqlParameterSource().addValue("ID", ids);
        });
        return messageSource;
    }

    /**
     * 轮询读取数据库中的事件
     *
     * @param jdbcMessageSource
     * @return
     */
    @Bean
    public IntegrationFlow integrationFlow(final MessageSource<Object> jdbcMessageSource) {
        return IntegrationFlow
                .from(jdbcMessageSource, e -> e.poller(Pollers.fixedRate(100).transactional()))
                .handle(v -> {
                    //将这一步改为 发送到MQ, 改为MQ的出站通道适配器
                    log.info("message: {}", v);
                })
                .get();
    }

}

一个简单完整的流程就处理完了, 当领域层域事件发送到消息队列(已在相同事务处理)后,再异步将消息发送到MQ,各大业务监听MQ的消息,处理业务。

未完待续…​

基于Spring Integration JDBC对领域事件的处理完了, 下篇将会介绍微服务自定义注解分布式锁与Spring Integration Redis融合实践。