Apache Cassandra Support

Spring Integration provides channel adapters (starting with version 6.0) for performing database operations against an Apache Cassandra cluster. The support is built entirely on the Spring Data for Apache Cassandra project.

You need to include this dependency in your project:

Maven:

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-cassandra</artifactId>
    <version>{project-version}</version>
</dependency>

Gradle:

implementation "org.springframework.integration:spring-integration-cassandra:{project-version}"

Cassandra Outbound Components

The CassandraMessageHandler is an AbstractReplyProducingMessageHandler implementation that can work in both one-way (default) and request-reply (the producesReply option) modes. It is asynchronous by default (call setAsync(false) to switch to synchronous execution) and performs reactive INSERT, UPDATE, DELETE, or STATEMENT operations against the provided ReactiveCassandraOperations. The operation type can be configured through the CassandraMessageHandler.Type option. Setting ingestQuery switches the mode to INSERT; setting query, statementExpression, or statementProcessor switches it to STATEMENT.
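The operation type can also be fixed when the handler is created. The following configuration fragment is a minimal sketch, assuming the two-argument CassandraMessageHandler constructor that accepts a CassandraMessageHandler.Type; the class name, the bean name, and the cassandraUpdateChannel channel are hypothetical and chosen only for illustration:

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.cassandra.core.ReactiveCassandraOperations;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.cassandra.outbound.CassandraMessageHandler;
import org.springframework.messaging.MessageHandler;

@Configuration
public class CassandraUpdateConfig {

    // Hypothetical one-way handler: every entity payload arriving on
    // "cassandraUpdateChannel" is written with an UPDATE operation.
    @Bean
    @ServiceActivator(inputChannel = "cassandraUpdateChannel")
    public MessageHandler cassandraUpdateHandler(ReactiveCassandraOperations cassandraOperations) {
        CassandraMessageHandler handler =
                new CassandraMessageHandler(cassandraOperations, CassandraMessageHandler.Type.UPDATE);
        // Optional: leave the default asynchronous mode and execute synchronously.
        handler.setAsync(false);
        return handler;
    }
}
```

Because producesReply is left at its default, this handler works in one-way mode and does not send a reply message.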

The following code snippets demonstrate various configurations for this channel adapter or gateway:

Java DSL:

@Bean
IntegrationFlow cassandraSelectFlow(ReactiveCassandraOperations cassandraOperations) {
    return flow -> flow
            .handle(Cassandra.outboundGateway(cassandraOperations)
                    .query("SELECT * FROM book WHERE author = :author limit :size")
                    // Named parameters are resolved against each request message
                    .parameter("author", "payload")
                    .parameter("size", m -> m.getHeaders().get("limit")))
            .channel(c -> c.flux("resultChannel"));
}

Kotlin DSL:

@Bean
fun outboundReactive(cassandraOperations: ReactiveCassandraOperations) =
    integrationFlow {
        handle(
            Cassandra.outboundChannelAdapter(cassandraOperations)
                .statementExpression("T(QueryBuilder).truncate('book').build()")
        ) { async(false) }
    }

Java:

@ServiceActivator(inputChannel = "cassandraSelectChannel")
@Bean
public MessageHandler cassandraMessageHandler() {
    // this.template is the ReactiveCassandraOperations instance available in the configuration class
    CassandraMessageHandler cassandraMessageHandler = new CassandraMessageHandler(this.template);
    cassandraMessageHandler.setQuery("SELECT * FROM book WHERE author = :author limit :size");

    // PARSER is a shared SpEL ExpressionParser, for example a SpelExpressionParser constant
    Map<String, Expression> params = new HashMap<>();
    params.put("author", PARSER.parseExpression("payload"));
    params.put("size", PARSER.parseExpression("headers.limit"));

    cassandraMessageHandler.setParameterExpressions(params);

    cassandraMessageHandler.setOutputChannel(resultChannel());
    cassandraMessageHandler.setProducesReply(true);
    return cassandraMessageHandler;
}

XML:

<int-cassandra:outbound-channel-adapter id="outboundAdapter"
                                        cassandra-template="cassandraTemplate"
                                        write-options="writeOptions"
                                        auto-startup="false"
                                        async="false"/>

<int-cassandra:outbound-gateway id="outgateway"
                                request-channel="input"
                                cassandra-template="cassandraTemplate"
                                mode="STATEMENT"
                                write-options="writeOptions"
                                query="SELECT * FROM book WHERE author = :author limit :size"
                                reply-channel="resultChannel"
                                auto-startup="true">
    <int-cassandra:parameter-expression name="author" expression="payload"/>
    <int-cassandra:parameter-expression name="size" expression="headers.limit"/>
</int-cassandra:outbound-gateway>

If a CassandraMessageHandler is used as a gateway in the default async mode, it produces a Mono<WriteResult>, which is handled according to the provided MessageChannel implementation. For true reactive processing, a FluxMessageChannel is recommended as the output channel. In sync mode, Mono.block() is called to obtain the reply value.

If an INSERT, UPDATE, or DELETE operation is performed, an entity (a class annotated with org.springframework.data.cassandra.core.mapping.Table) is expected in the request message payload. If the payload is a list of entities, the respective batch operation is performed.

The ingestQuery mode expects the payload to be a matrix of values to insert, in the form of a List<List<?>>. For example, suppose the entity looks like this:

@Table("book")
public record Book(@PrimaryKey String isbn,
                   String title,
                   @Indexed String author,
                   int pages,
                   LocalDate saleDate,
                   boolean isInStock) {

}

And the channel adapter has this configuration:

@Bean
public MessageHandler cassandraMessageHandler3() {
    CassandraMessageHandler cassandraMessageHandler = new CassandraMessageHandler(this.template);
    String cqlIngest = "insert into book (isbn, title, author, pages, saleDate, isInStock) values (?, ?, ?, ?, ?, ?)";
    cassandraMessageHandler.setIngestQuery(cqlIngest);
    cassandraMessageHandler.setAsync(false);
    return cassandraMessageHandler;
}

The request message payload must then be converted as follows, with the values of each row in the same order as the columns of the ingest query:

List<List<Object>> ingestBooks =
    payload.stream()
            .map(book ->
                    List.<Object>of(
                            book.isbn(),
                            book.title(),
                            book.author(),
                            book.pages(),
                            book.saleDate(),
                            book.isInStock()))
            .toList();
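The conversion above can be tried in isolation. The following is a minimal, JDK-only sketch: the Book record here is a plain stand-in for the mapped entity (the mapping annotations are omitted so that it compiles without Spring Data on the classpath), and the class name, the toIngestRows helper, and the sample data are hypothetical:

```java
import java.time.LocalDate;
import java.util.List;

public class IngestPayloadExample {

    // Plain stand-in for the mapped Book entity; annotations omitted on purpose.
    record Book(String isbn, String title, String author, int pages,
                LocalDate saleDate, boolean isInStock) { }

    // Builds one inner list per row; the values follow the column order of the
    // ingest query: isbn, title, author, pages, saleDate, isInStock.
    static List<List<Object>> toIngestRows(List<Book> books) {
        return books.stream()
                .map(book -> List.<Object>of(
                        book.isbn(), book.title(), book.author(),
                        book.pages(), book.saleDate(), book.isInStock()))
                .toList();
    }

    public static void main(String[] args) {
        List<Book> books = List.of(
                new Book("123", "Spring Integration", "Jane Doe", 300, LocalDate.of(2023, 1, 15), true),
                new Book("456", "Cassandra Basics", "John Roe", 150, LocalDate.of(2022, 6, 1), false));
        List<List<Object>> rows = toIngestRows(books);
        // prints "2 rows, 6 columns each"
        System.out.println(rows.size() + " rows, " + rows.get(0).size() + " columns each");
    }
}
```

Note that List.of rejects null elements, so a row that may contain null column values would need a different list factory, such as Arrays.asList.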

For more sophisticated use cases, the payload can be an instance of com.datastax.oss.driver.api.core.cql.Statement. The com.datastax.oss.driver.api.querybuilder.QueryBuilder API is recommended for building the various statements to execute against Apache Cassandra. For example, to remove all the data from the book table, a message with the following payload can be sent to the CassandraMessageHandler: QueryBuilder.truncate("book").build(). Alternatively, for logic based on the request message, a statementExpression or statementProcessor can be provided to the CassandraMessageHandler to build a Statement from that message. For convenience, the com.datastax.oss.driver.api.querybuilder package is registered as an import in the SpEL evaluation context, so a target expression can be as simple as this:

statement-expression="T(QueryBuilder).selectFrom('book').all().build()"
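The statementProcessor option covers the same ground in plain Java. The following configuration fragment is a sketch only, assuming that setStatementProcessor accepts a lambda receiving the request message and returning a driver Statement, and that the payload carries the ISBN of the book to delete; the class name, the bean name, and the deleteBookChannel channel are hypothetical:

```java
import com.datastax.oss.driver.api.querybuilder.QueryBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.cassandra.core.ReactiveCassandraOperations;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.cassandra.outbound.CassandraMessageHandler;
import org.springframework.messaging.MessageHandler;

@Configuration
public class CassandraDeleteByIsbnConfig {

    @Bean
    @ServiceActivator(inputChannel = "deleteBookChannel")
    public MessageHandler deleteBookHandler(ReactiveCassandraOperations cassandraOperations) {
        CassandraMessageHandler handler = new CassandraMessageHandler(cassandraOperations);
        // Build a DELETE statement per message; the payload is assumed to be the ISBN.
        handler.setStatementProcessor(message ->
                QueryBuilder.deleteFrom("book")
                        .whereColumn("isbn")
                        .isEqualTo(QueryBuilder.literal(message.getPayload()))
                        .build());
        return handler;
    }
}
```

A processor like this is a reasonable choice when the statement depends on several parts of the message and a single SpEL expression would become hard to read.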

The setParameterExpressions(Map<String, Expression> parameterExpressions) option defines the bindable named query parameters and is used only together with the setQuery(String query) option. See the Java and XML samples above.