Debezium Support
Debezium Engine,变更数据捕获 (CDC) 入站通道适配器。DebeziumMessageProducer
允许捕获数据库变更事件,将它们转换为消息,然后流传输到出站通道。
您需要将 Spring Integration Debezium 依赖项包含到您的项目:
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-debezium</artifactId>
<version>{project-version}</version>
</dependency>
compile "org.springframework.integration:spring-integration-debezium:{project-version}"
你还需要为输入数据库包含 debezium connector 依赖项。例如,要将 Debezium 与 PostgreSQL 配合使用,你需要 postgres debezium 连接器:
-
Maven
-
Gradle
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-postgres</artifactId>
<version>${debezium-version}</version>
</dependency>
compile "io.debezium:debezium-connector-postgres:{debezium-version}"
用与正在使用的 |
Inbound Debezium Channel Adapter
Debezium 适配器需要预先配置好的 DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>>
实例。
debezium-supplier 提供了一个开箱即用的 |
Debezium Java DSL 可以从提供的 |
此外,DebeziumMessageProducer
可以使用以下配置属性进行调整:
-
contentType
- 允许处理JSON
(默认)、AVRO
和PROTOBUF
消息内容。contentTypemust
可以与为提供的DebeziumEngine.Builder
所配置的SerializationFormat
对齐。 -
enableBatch
- 当设置为false
(默认) 时,Debezium 适配器将为从源数据库接收到的每个ChangeEvent
数据变更事件发送新的Message
。如果设置为true
,则该适配器会向下游发送单个Message
,用于从 Debezium 引擎接收的每批ChangeEvent
。此类有效内容不可序列化,需要自定义序列化/反序列化实现。 -
enableEmptyPayload
- 启用对墓碑(又名删除)消息的支持。在数据库行删除时,Debezium 可以发送一个墓碑变更事件,该事件具有与已删除行相同的键和Optional.empty
的值。默认为false
。 -
headerMapper
- 自定义HeaderMapper
实现,该实现允许选择并将ChangeEvent
标头转换为Message
标头。默认DefaultDebeziumHeaderMapper
实现为setHeaderNamesToMap
提供了一个 setter。默认情况下,会映射所有标头。 -
taskExecutor
- 为 Debezium 引擎设置自定义TaskExecutor
。
下面的代码片段演示此通道适配器的各种配置:
Configuring with Java Configuration
以下 Spring Boot 应用展示了如何通过 Java 配置来配置入站适配器的一个示例:
@SpringBootApplication
public class DebeziumJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(DebeziumJavaApplication.class)
.web(WebApplicationType.NONE)
.run(args);
}
@Bean
public MessageChannel debeziumInputChannel() {
return new DirectChannel();
}
@Bean
public MessageProducer debeziumMessageProducer(
DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>> debeziumEngineBuilder,
MessageChannel debeziumInputChannel) {
DebeziumMessageProducer debeziumMessageProducer =
new DebeziumMessageProducer(debeziumEngineBuilder);
debeziumMessageProducer.setOutputChannel(debeziumInputChannel);
return debeziumMessageProducer;
}
@ServiceActivator(inputChannel = "debeziumInputChannel")
public void handler(Message<?> message) {
Object destination = message.getHeaders().get(DebeziumHeaders.DESTINATION); (1)
String key = new String((byte[]) message.getHeaders().get(DebeziumHeaders.KEY)); (2)
String payload = new String((byte[]) message.getPayload()); (3)
System.out.println("KEY: " + key + ", DESTINATION: " + destination + ", PAYLOAD: " + payload);
}
}
1 | 该事件预期的逻辑目标的名称。目标通常由 topic.prefix 配置选项、数据库名称和表名称组成。例如:my-topic.inventory.orders 。 |
2 | 包含已更改表的键架构和已更改行的实际键。在连接器创建事件时,键架构及其相应的键负载都包含已更改表(或唯一约束)中每一列的字段。 |
3 | 与键类似,负载有一个架构部分和一个负载值部分。架构部分包含描述负载值部分的消息封送结构的架构,包括其嵌套字段。创建、更新或删除数据的操作的变更事件都有一个具有消息封送结构的值负载。 |
|
类似地,我们可以配置 DebeziumMessageProducer
以批量处理传入的更改事件:
@Bean
public MessageProducer debeziumMessageProducer(
DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>> debeziumEngineBuilder,
MessageChannel debeziumInputChannel) {
DebeziumMessageProducer debeziumMessageProducer = new DebeziumMessageProducer(debeziumEngineBuilder);
debeziumMessageProducer.setEnableBatch(true);
debeziumMessageProducer.setOutputChannel(debeziumInputChannel);
return debeziumMessageProducer;
}
@ServiceActivator(inputChannel = "debeziumInputChannel")
public void handler(List<ChangeEvent<Object, Object>> payload) {
System.out.println(payload);
}
Debezium Java DSL Support
spring-integration-debezium
通过 Debezium
工厂和 DebeziumMessageProducerSpec
实现提供方便的 Java DSL 流畅 API。
适用于 Debezium Java DSL 的入站通道适配器是:
DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>> debeziumEngineBuilder = ...
IntegrationFlow.from(
Debezium.inboundChannelAdapter(debeziumEngineBuilder)
.headerNames("special*")
.contentType("application/json")
.enableBatch(false))
.handle(m -> System.out.println(new String((byte[]) m.getPayload())))
或者通过本机 debezium 配置属性创建 DebeziumMessageProducerSpec
实例,并默认为 JSON
序列化格式。
Properties debeziumConfig = ...
IntegrationFlow
.from(Debezium.inboundChannelAdapter(debeziumConfig))
.handle(m -> System.out.println(new String((byte[]) m.getPayload())))
下面的 Spring Boot 应用程序提供使用 Java DSL 配置入站适配器的示例:
@SpringBootApplication
public class DebeziumJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(DebeziumJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public IntegrationFlow debeziumInbound(
DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>> debeziumEngineBuilder) {
return IntegrationFlow
.from(Debezium
.inboundChannelAdapter(debeziumEngineBuilder)
.headerNames("special*")
.contentType("application/json")
.enableBatch(false))
.handle(m -> System.out.println(new String((byte[]) m.getPayload())))
.get();
}
}