Debezium Support
Spring Integration provides a Change Data Capture (CDC) inbound channel adapter based on the Debezium Engine.
The DebeziumMessageProducer captures database change events, converts them into messages, and streams them to the output channel.
You need to include the Spring Integration Debezium dependency in your project:
Maven
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-debezium</artifactId>
    <version>{project-version}</version>
</dependency>
Gradle
implementation "org.springframework.integration:spring-integration-debezium:{project-version}"
You also need to include a Debezium connector dependency for your input database. For example, to use Debezium with PostgreSQL you need the PostgreSQL Debezium connector:
Maven
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-connector-postgres</artifactId>
    <version>{debezium-version}</version>
</dependency>
Gradle
implementation "io.debezium:debezium-connector-postgres:{debezium-version}"
Replace {debezium-version} with the Debezium version compatible with the spring-integration-debezium version being used.
Inbound Debezium Channel Adapter
The Debezium adapter expects a pre-configured DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>> instance.
The debezium-supplier provides an out-of-the-box DebeziumEngine.Builder Spring Boot auto-configuration.
The Debezium Java DSL can create a DebeziumMessageProducerSpec from a provided DebeziumEngine.Builder or from native Debezium configuration properties.
Additionally, the DebeziumMessageProducer can be tuned with the following configuration properties:
- contentType - allows handling of JSON (default), AVRO and PROTOBUF message contents. The contentType must be aligned with the SerializationFormat configured for the provided DebeziumEngine.Builder.
- enableBatch - when set to false (default), the Debezium adapter sends a new Message for every ChangeEvent data change event received from the source database. If set to true, the adapter sends downstream a single Message for each batch of ChangeEvent instances received from the Debezium engine. Such a payload is not serializable and would require a custom serialization/deserialization implementation.
- enableEmptyPayload - enables support for tombstone (aka delete) messages. On a database row delete, Debezium can send a tombstone change event that has the same key as the deleted row and a value of Optional.empty. Defaults to false.
- headerMapper - custom HeaderMapper implementation that allows for selecting and converting the ChangeEvent headers into Message headers. The default DefaultDebeziumHeaderMapper implementation provides the setHeaderNamesToMap setter. By default, all headers are mapped.
- taskExecutor - sets a custom TaskExecutor for the Debezium engine.
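To make the effect of enableEmptyPayload on downstream code more concrete, here is a minimal sketch of a payload check, assuming the option is enabled so that a handler may see Optional.empty next to the usual byte[] payload. The TombstoneAwarePayloads class and its describe method are hypothetical helpers, not part of spring-integration-debezium, and the sketch assumes the bytes are UTF-8 encoded text.

```java
import java.nio.charset.StandardCharsets;
import java.util.Optional;

// Hypothetical helper for illustration; not part of spring-integration-debezium.
final class TombstoneAwarePayloads {

    private TombstoneAwarePayloads() {
    }

    // Renders a message payload as text and reports an empty Optional as a tombstone.
    static String describe(Object payload) {
        if (payload instanceof Optional && !((Optional<?>) payload).isPresent()) {
            return "tombstone";
        }
        if (payload instanceof byte[]) {
            // Assumption: the serialized change event is UTF-8 text (for example JSON).
            return new String((byte[]) payload, StandardCharsets.UTF_8);
        }
        return String.valueOf(payload);
    }

    public static void main(String[] args) {
        byte[] json = "{\"id\":1}".getBytes(StandardCharsets.UTF_8);
        System.out.println(describe(json));
        System.out.println(describe(Optional.empty()));
    }
}
```

A service activator could call such a helper on message.getPayload() before deciding whether to treat the event as a delete.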
The following code snippets demonstrate various configurations for this channel adapter:
Configuring with Java Configuration
The following Spring Boot application shows an example of how to configure the inbound adapter with Java configuration:
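import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.debezium.inbound.DebeziumMessageProducer;
import org.springframework.integration.debezium.support.DebeziumHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;

@SpringBootApplication
public class DebeziumJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(DebeziumJavaApplication.class)
            .web(WebApplicationType.NONE)
            .run(args);
    }

    @Bean
    public MessageChannel debeziumInputChannel() {
        return new DirectChannel();
    }

    // Wraps the pre-configured engine builder and publishes change events to the channel.
    @Bean
    public MessageProducer debeziumMessageProducer(
            DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>> debeziumEngineBuilder,
            MessageChannel debeziumInputChannel) {
        DebeziumMessageProducer debeziumMessageProducer =
            new DebeziumMessageProducer(debeziumEngineBuilder);
        debeziumMessageProducer.setOutputChannel(debeziumInputChannel);
        return debeziumMessageProducer;
    }

    @ServiceActivator(inputChannel = "debeziumInputChannel")
    public void handler(Message<?> message) {
        Object destination = message.getHeaders().get(DebeziumHeaders.DESTINATION); // (1)
        String key = new String((byte[]) message.getHeaders().get(DebeziumHeaders.KEY)); // (2)
        String payload = new String((byte[]) message.getPayload()); // (3)
        System.out.println("KEY: " + key + ", DESTINATION: " + destination + ", PAYLOAD: " + payload);
    }
}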
(1) The name of the logical destination for which the event is intended. Usually the destination is composed of the topic.prefix configuration option, the database name and the table name. For example: my-topic.inventory.orders.
(2) Contains the schema for the changed table's key and the changed row's actual key. Both the key schema and its corresponding key payload contain a field for each column in the changed table's PRIMARY KEY (or unique constraint) at the time the connector created the event.
(3) Like the key, the payload has a schema section and a payload value section. The schema section contains the schema that describes the Envelope structure of the payload value section, including its nested fields. Change events for operations that create, update or delete data all have a value payload with an envelope structure.
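The destination layout described in the first callout can be taken apart with plain string handling. The following is a minimal sketch, assuming the usual three-part <prefix>.<database>.<table> layout; DestinationNames and split are hypothetical names introduced here, and destinations that deviate from this layout are rejected rather than guessed at.

```java
// Hypothetical helper for illustration; not part of spring-integration-debezium.
final class DestinationNames {

    private DestinationNames() {
    }

    // Splits "<prefix>.<database>.<table>" into three parts.
    // The prefix may itself contain dots, so the string is split from the right.
    static String[] split(String destination) {
        int tableDot = destination.lastIndexOf('.');
        int databaseDot = destination.lastIndexOf('.', tableDot - 1);
        if (tableDot < 0 || databaseDot < 0) {
            throw new IllegalArgumentException(
                    "Expected <prefix>.<database>.<table> but got: " + destination);
        }
        return new String[] {
            destination.substring(0, databaseDot),
            destination.substring(databaseDot + 1, tableDot),
            destination.substring(tableDot + 1)
        };
    }

    public static void main(String[] args) {
        String[] parts = split("my-topic.inventory.orders");
        System.out.println("prefix=" + parts[0] + ", database=" + parts[1] + ", table=" + parts[2]);
    }
}
```

In a handler, the value of the DebeziumHeaders.DESTINATION header would be converted to a String before being passed to such a method.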
Similarly, we can configure the DebeziumMessageProducer to process the incoming change events in batches:
@Bean
public MessageProducer debeziumMessageProducer(
        DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>> debeziumEngineBuilder,
        MessageChannel debeziumInputChannel) {
    DebeziumMessageProducer debeziumMessageProducer = new DebeziumMessageProducer(debeziumEngineBuilder);
    debeziumMessageProducer.setEnableBatch(true);
    debeziumMessageProducer.setOutputChannel(debeziumInputChannel);
    return debeziumMessageProducer;
}

@ServiceActivator(inputChannel = "debeziumInputChannel")
public void handler(List<ChangeEvent<Object, Object>> payload) {
    System.out.println(payload);
}
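A batch handler usually does more than print the list. As a rough illustration, the sketch below counts the events of one batch per destination. To stay runnable with the JDK alone it uses a hypothetical Event interface as a stand-in for io.debezium.engine.ChangeEvent, reduced to its destination() accessor; BatchSummary and countByDestination are likewise made-up names.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration; not part of spring-integration-debezium.
final class BatchSummary {

    private BatchSummary() {
    }

    // Stand-in for io.debezium.engine.ChangeEvent, reduced to destination().
    interface Event {
        String destination();
    }

    // Counts the events in one batch per destination, keeping first-seen order.
    static Map<String, Integer> countByDestination(List<? extends Event> batch) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (Event event : batch) {
            counts.merge(event.destination(), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Event> batch = List.of(
                () -> "my-topic.inventory.orders",
                () -> "my-topic.inventory.customers",
                () -> "my-topic.inventory.orders");
        System.out.println(countByDestination(batch));
    }
}
```

With the real adapter, the same loop would iterate over the List<ChangeEvent<Object, Object>> payload received by the handler.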
Debezium Java DSL Support
The spring-integration-debezium module provides a convenient Java DSL fluent API via the Debezium factory and the DebeziumMessageProducerSpec implementation.
With the Java DSL, the inbound channel adapter is configured as follows:
DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>> debeziumEngineBuilder = ...
IntegrationFlow.from(
        Debezium.inboundChannelAdapter(debeziumEngineBuilder)
            .headerNames("special*")
            .contentType("application/json")
            .enableBatch(false))
    .handle(m -> System.out.println(new String((byte[]) m.getPayload())))
Alternatively, create a DebeziumMessageProducerSpec instance from native Debezium configuration properties; it defaults to the JSON serialization format.
Properties debeziumConfig = ...
IntegrationFlow
    .from(Debezium.inboundChannelAdapter(debeziumConfig))
    .handle(m -> System.out.println(new String((byte[]) m.getPayload())))
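The debeziumConfig object in the snippet above is left open. As a minimal sketch of what it might contain for a PostgreSQL source, the class below fills a java.util.Properties instance with commonly used Debezium engine and connector keys. DebeziumConfigSketch and postgresConfig are hypothetical names, and every value (engine name, host, port, credentials, database, prefix) is a placeholder assumption to be replaced with your own settings.

```java
import java.util.Properties;

// Hypothetical helper for illustration; all values below are placeholders.
final class DebeziumConfigSketch {

    private DebeziumConfigSketch() {
    }

    // Builds native Debezium properties for a PostgreSQL source database.
    static Properties postgresConfig() {
        Properties config = new Properties();
        config.setProperty("name", "inventory-engine");
        config.setProperty("connector.class", "io.debezium.connector.postgresql.PostgresConnector");
        // In-memory offsets are lost on restart; suitable for a demo only.
        config.setProperty("offset.storage", "org.apache.kafka.connect.storage.MemoryOffsetBackingStore");
        // The prefix becomes the first part of the event destination name.
        config.setProperty("topic.prefix", "my-topic");
        config.setProperty("database.hostname", "localhost");
        config.setProperty("database.port", "5432");
        config.setProperty("database.user", "postgres");
        config.setProperty("database.password", "postgres");
        config.setProperty("database.dbname", "postgres");
        return config;
    }

    public static void main(String[] args) {
        postgresConfig().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

The returned Properties object is the kind of argument that Debezium.inboundChannelAdapter(debeziumConfig) takes in the snippet above.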
The following Spring Boot application provides an example of configuring the inbound adapter with the Java DSL:
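import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.debezium.dsl.Debezium;
import org.springframework.integration.dsl.IntegrationFlow;

@SpringBootApplication
public class DebeziumJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(DebeziumJavaApplication.class)
            .web(WebApplicationType.NONE)
            .run(args);
    }

    // Maps only headers matching "special*" and prints each JSON payload.
    @Bean
    public IntegrationFlow debeziumInbound(
            DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>> debeziumEngineBuilder) {
        return IntegrationFlow
            .from(Debezium
                .inboundChannelAdapter(debeziumEngineBuilder)
                .headerNames("special*")
                .contentType("application/json")
                .enableBatch(false))
            .handle(m -> System.out.println(new String((byte[]) m.getPayload())))
            .get();
    }
}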