Apache Cassandra Support
Spring Integration 提供了通道适配器(从 6.0 版开始),用于对 Apache Cassandra 集群执行数据库操作。它完全基于 { Spring Data for Apache Cassandra} 项目。
Spring Integration provides channel adapters (starting with version 6.0) for performing database operations against an Apache Cassandra cluster. It is fully based on the Spring Data for Apache Cassandra project.
你需要将此依赖项包含在你的项目中:
You need to include this dependency into your project:
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-cassandra</artifactId>
<version>{project-version}</version>
</dependency>
compile "org.springframework.integration:spring-integration-cassandra:{project-version}"
Cassandra Outbound Components
CassandraMessageHandler
是一个 AbstractReplyProducingMessageHandler
实现,可以在单向(默认)和请求-回复模式(producesReply
选项)中工作。它在默认情况下是异步的(setAsync(false)
可重置),并针对提供的 ReactiveCassandraOperations
执行响应式的 INSERT
、UPDATE
、DELETE
或 STATEMENT
操作。操作类型可以通过 CassandraMessageHandler.Type
选项配置。ingestQuery
将模式设置为 INSERT
;query
或 statementExpression
或 statementProcessor
将模式设置为 STATEMENT
。
The CassandraMessageHandler
is an AbstractReplyProducingMessageHandler
implementation and can work in both one-way (default) and request-reply modes (a producesReply
option).
It is asynchronous by default (setAsync(false)
to reset) and performs reactive INSERT
, UPDATE
, DELETE
or STATEMENT
operations against the provided ReactiveCassandraOperations
.
The type of operation can be configured via the CassandraMessageHandler.Type
option.
The ingestQuery
sets the mode into an INSERT
; the query
or statementExpression
, or statementProcessor
sets the mode into a STATEMENT
.
以下代码段演示了此通道适配器或网关的各种配置:
The following code snippets demonstrate various configuration for this channel adapter or gateway:
-
Java DSL
-
Kotlin DSL
-
Java
-
XML
@Bean
IntegrationFlow cassandraSelectFlow(ReactiveCassandraOperations cassandraOperations) {
return flow -> flow
.handle(Cassandra.outboundGateway(cassandraOperations)
.query("SELECT * FROM book WHERE author = :author limit :size")
.parameter("author", "payload")
.parameter("size", m -> m.getHeaders().get("limit")))
.channel(c -> c.flux("resultChannel"));
}
@Bean
fun outboundReactive(cassandraOperations: ReactiveCassandraOperations) =
integrationFlow {
handle(
Cassandra.outboundChannelAdapter(cassandraOperations)
.statementExpression("T(QueryBuilder).truncate('book').build()")
) { async(false) }
}
@ServiceActivator(inputChannel = "cassandraSelectChannel")
@Bean
public MessageHandler cassandraMessageHandler() {
CassandraMessageHandler cassandraMessageHandler = new CassandraMessageHandler(this.template);
cassandraMessageHandler.setQuery("SELECT * FROM book WHERE author = :author limit :size");
Map<String, Expression> params = new HashMap<>();
params.put("author", PARSER.parseExpression("payload"));
params.put("size", PARSER.parseExpression("headers.limit"));
cassandraMessageHandler.setParameterExpressions(params);
cassandraMessageHandler.setOutputChannel(resultChannel());
cassandraMessageHandler.setProducesReply(true);
return cassandraMessageHandler;
}
<int-cassandra:outbound-channel-adapter id="outboundAdapter"
cassandra-template="cassandraTemplate"
write-options="writeOptions"
auto-startup="false"
async="false"/>
<int-cassandra:outbound-gateway id="outgateway"
request-channel="input"
cassandra-template="cassandraTemplate"
mode="STATEMENT"
write-options="writeOptions"
query="SELECT * FROM book limit :size"
reply-channel="resultChannel"
auto-startup="true">
<int-cassandra:parameter-expression name="author" expression="payload"/>
<int-cassandra:parameter-expression name="size" expression="headers.limit"/>
</int-cassandra:outbound-gateway>
如果 CassandraMessageHandler
在默认异步模式下用作网关,则会产生一个 Mono<WriteResult>
,该结果会根据提供的 MessageChannel
实现进行处理。对于真正的响应式处理,建议将 FluxMessageChannel
用于输出通道配置。在同步模式下,调用 Mono.block()
以获取回复值。
If a CassandraMessageHandler
is used as a gateway in the default async mode, a Mono<WriteResult>
is produced, which is handled according to the provided MessageChannel
implementation.
For true reactive processing a FluxMessageChannel
is recommended for the output channel configuration.
In sync mode Mono.block()
is called to obtain the reply value.
如果执行 INSERT
、UPDATE
或 DELETE
操作,则在请求消息有效负载中预期一个实体(标记为 org.springframework.data.cassandra.core.mapping.Table
)。如果有效负载是实体列表,则执行相应的批处理操作。
If INSERT
, UPDATE
or DELETE
operations are performed, an entity (marked org.springframework.data.cassandra.core.mapping.Table
) is expected in the request message payload.
If the payload is a list of entities, then the respective batch operation is performed.
ingestQuery
模式期望有效负载作为要插入的值矩阵存在 - List<List<?>>
。例如,如果实体如下所示:
The ingestQuery
mode expects the payload to be present as a matrix of values to insert - List<List<?>>
.
For example, if the entity is like this:
@Table("book")
public record Book(@PrimaryKey String isbn,
String title,
@Indexed String author,
int pages,
LocalDate saleDate,
boolean isInStock) {
}
并且通道适配器具有此配置:
And channel adapter has this configuration:
@Bean
public MessageHandler cassandraMessageHandler3() {
CassandraMessageHandler cassandraMessageHandler = new CassandraMessageHandler(this.template);
String cqlIngest = "insert into book (isbn, title, author, pages, saleDate, isInStock) values (?, ?, ?, ?, ?, ?)";
cassandraMessageHandler.setIngestQuery(cqlIngest);
cassandraMessageHandler.setAsync(false);
return cassandraMessageHandler;
}
请求消息有效负载必须按如下所示转换:
The request message payload must be converted like this:
List<List<Object>> ingestBooks =
payload.stream()
.map(book ->
List.<Object>of(
book.isbn(),
book.title(),
book.author(),
book.pages(),
book.saleDate(),
book.isInStock()))
.toList();
对于更复杂的用例,有效负载可以作为 com.datastax.oss.driver.api.core.cql.Statement
实例。建议使用 com.datastax.oss.driver.api.querybuilder.QueryBuilder
API 构建各种语句以针对 Apache Cassandra 执行。例如,若要删除 Book
表中的所有数据,可以向 CassandraMessageHandler
发送一个有效负载如下所示的消息:QueryBuilder.truncate("book").build()
。或者,对于基于请求消息的逻辑,可以为 CassandraMessageHandler
提供 statementExpression
或 statementProcessor
以基于该消息构建一个 Statement
。为了方便起见,将 com.datastax.oss.driver.api.querybuilder
注册为 SpEL 评估上下文中的 import
,因此目标表达式可以像这样简单:
For more sophisticated use-cases, the payload can be as an instance of com.datastax.oss.driver.api.core.cql.Statement
.
The com.datastax.oss.driver.api.querybuilder.QueryBuilder
API is recommended to build various statements to execute against Apache Cassandra.
For example, to remove all the data from the Book
table, a message with a payload like this can be sent to the CassandraMessageHandler
: QueryBuilder.truncate("book").build()
.
Alternatively, for logic based on a request message, a statementExpression
or statementProcessor
can be provided for the CassandraMessageHandler
to build a Statement
based on that message.
For convenience, a com.datastax.oss.driver.api.querybuilder
is registered as an import
into a SpEL evaluation context, so a target expression can be as simple as this:
statement-expression="T(QueryBuilder).selectFrom("book").all()"
setParameterExpressions(Map<String, Expression> parameterExpressions)
表示可绑定的命名查询参数,仅与 setQuery(String query)
选项一起使用。请参阅上面提到的 Java 和 XML 示例。
The setParameterExpressions(Map<String, Expression> parameterExpressions)
represents bindable named query parameters and is used only with a setQuery(String query)
option.
See Java and XML samples mentioned above.