Apache Kafka Support

Overview

Spring Integration for Apache Kafka is based on the Spring for Apache Kafka project.

You need to include this dependency in your project:

Maven:

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-kafka</artifactId>
    <version>{project-version}</version>
</dependency>

Gradle:

compile "org.springframework.integration:spring-integration-kafka:{project-version}"

It provides the following components:

Outbound Channel Adapter

The Outbound channel adapter is used to publish messages from a Spring Integration channel to Apache Kafka topics. The channel is defined in the application context and then wired into the application that sends messages to Apache Kafka. Sender applications can publish to Apache Kafka by using Spring Integration messages, which are internally converted to Kafka records by the outbound channel adapter, as follows:

  • The payload of the Spring Integration message is used to populate the payload of the Kafka record.

  • By default, the kafka_messageKey header of the Spring Integration message is used to populate the key of the Kafka record.

You can customize the target topic and partition for publishing the message through the kafka_topic and kafka_partitionId headers, respectively.

In addition, the <int-kafka:outbound-channel-adapter> provides the ability to extract the key, target topic, and target partition by applying SpEL expressions on the outbound message. To that end, it supports three mutually exclusive pairs of attributes:

  • topic and topic-expression

  • message-key and message-key-expression

  • partition-id and partition-id-expression

These let you specify topic, message-key, and partition-id, respectively, as static values on the adapter, or have them evaluated dynamically at runtime against the request message.

The KafkaHeaders interface (provided by spring-kafka) contains constants used for interacting with headers. The messageKey and topic default headers now require a kafka_ prefix. When migrating from an earlier version that used the old headers, you need to specify message-key-expression="headers['messageKey']" and topic-expression="headers['topic']" on the <int-kafka:outbound-channel-adapter>. Alternatively, you can change the headers upstream to the new headers from KafkaHeaders by using a <header-enricher> or a MessageBuilder. If you use constant values, you can also configure them on the adapter by using topic and message-key.
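
If you take the header-enricher route, a minimal sketch (the channel names are hypothetical) might look like the following:

@Bean
public IntegrationFlow legacyHeaderAdaptingFlow() {
    return IntegrationFlow.from("legacyInput") // hypothetical input channel
            .enrichHeaders(h -> h
                    .headerExpression(KafkaHeaders.TOPIC, "headers['topic']")
                    .headerExpression(KafkaHeaders.KEY, "headers['messageKey']"))
            .channel("toKafkaAdapter") // channel consumed by the outbound adapter
            .get();
}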

NOTE : If the adapter is configured with a topic or message key (either with a constant or expression), those are used and the corresponding header is ignored. If you wish the header to override the configuration, you need to configure it in an expression, such as the following:

topic-expression="headers['topic'] != null ? headers['topic'] : 'myTopic'"

The adapter requires a KafkaTemplate, which, in turn, requires a suitably configured ProducerFactory.

If a send-failure-channel (sendFailureChannel) is provided and a send() failure (sync or async) is received, an ErrorMessage is sent to the channel. The payload is a KafkaSendFailureException with failedMessage, record (the ProducerRecord) and cause properties. You can override the DefaultErrorMessageStrategy by setting the error-message-strategy property.
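
For example, you might consume that channel with something like the following sketch (the channel name matches the XML example later in this section; the handler body is illustrative only):

@ServiceActivator(inputChannel = "failures")
public void onSendFailure(KafkaSendFailureException exception) {
    Message<?> failed = exception.getFailedMessage();    // the original Spring Integration message
    ProducerRecord<?, ?> record = exception.getRecord(); // the record that could not be sent
    System.err.println("Send to topic " + record.topic() + " failed: " + exception.getCause());
}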

If a send-success-channel (sendSuccessChannel) is provided, a message with a payload of type org.apache.kafka.clients.producer.RecordMetadata is sent after a successful send.

If your application uses transactions and the same channel adapter is used to publish messages where the transaction is started by a listener container, as well as publishing where there is no existing transaction, you must configure a transactionIdPrefix on the KafkaTemplate to override the prefix used by the container or transaction manager. The prefix used by container-initiated transactions (the producer factory or transaction manager property) must be the same on all application instances. The prefix used for producer-only transactions must be unique on all application instances.
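
The following sketch shows one way to do that for producer-only transactions; the prefix value is only a placeholder and, as noted above, must be unique on each application instance:

@Bean
public KafkaTemplate<String, String> producerOnlyTemplate(ProducerFactory<String, String> pf) {
    KafkaTemplate<String, String> template = new KafkaTemplate<>(pf);
    // placeholder prefix; must differ from the container/transaction-manager prefix
    template.setTransactionIdPrefix("producerOnly.tx-" + UUID.randomUUID());
    return template;
}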

You can configure a flushExpression, which must resolve to a boolean value. Flushing after sending several messages might be useful if you are using the linger.ms and batch.size Kafka producer properties; the expression should evaluate to Boolean.TRUE on the last message, and an incomplete batch is then sent immediately. By default, the expression looks for a Boolean value in the KafkaIntegrationHeaders.FLUSH header (kafka_flush). The flush occurs if the value is true and does not if it is false or the header is absent.
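
For example, you might request a flush on the final message of a burst with something like the following sketch (the channel name is hypothetical):

Message<String> lastMessage = MessageBuilder.withPayload("last-in-batch")
        .setHeader(KafkaIntegrationHeaders.FLUSH, Boolean.TRUE) // kafka_flush
        .build();
toKafkaChannel.send(lastMessage); // hypothetical channel wired to the adapter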

The KafkaProducerMessageHandler.sendTimeoutExpression default has changed from 10 seconds to the delivery.timeout.ms Kafka producer property + 5000 so that the actual Kafka error after a timeout is propagated to the application, instead of a timeout generated by this framework. This has been changed for consistency, because you may otherwise get unexpected behavior (Spring may time out the send, while it is actually, eventually, successful). IMPORTANT: That timeout is 120 seconds by default, so you may wish to reduce it to get more timely failures.
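
For example, the following sketch (the values are illustrative) shortens the producer timeouts so that failures surface sooner:

Map<String, Object> producerProps = new HashMap<>();
// delivery.timeout.ms must be at least linger.ms + request.timeout.ms
producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 30_000);
producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 10_000);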

Configuration

The following example shows how to configure the outbound channel adapter for Apache Kafka:

Java DSL:

@Bean
public ProducerFactory<Integer, String> producerFactory() {
    return new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(embeddedKafka));
}

@Bean
public IntegrationFlow sendToKafkaFlow() {
    return f -> f
            .splitWith(s -> s.<String>function(p -> Stream.generate(() -> p).limit(101).iterator()))
            .publishSubscribeChannel(c -> c
                    .subscribe(sf -> sf.handle(
                            kafkaMessageHandler(producerFactory(), TEST_TOPIC1)
                                    .timestampExpression("T(Long).valueOf('1487694048633')"),
                            e -> e.id("kafkaProducer1")))
                    .subscribe(sf -> sf.handle(
                            kafkaMessageHandler(producerFactory(), TEST_TOPIC2)
                                   .timestamp(m -> 1487694048644L),
                            e -> e.id("kafkaProducer2")))
            );
}

@Bean
public DefaultKafkaHeaderMapper mapper() {
    return new DefaultKafkaHeaderMapper();
}

private KafkaProducerMessageHandlerSpec<Integer, String, ?> kafkaMessageHandler(
        ProducerFactory<Integer, String> producerFactory, String topic) {
    return Kafka
            .outboundChannelAdapter(producerFactory)
            .messageKey(m -> m
                    .getHeaders()
                    .get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER))
            .headerMapper(mapper())
            .partitionId(m -> 10)
            .topicExpression("headers[kafka_topic] ?: '" + topic + "'")
            .configureKafkaTemplate(t -> t.id("kafkaTemplate:" + topic));
}

Java:

@Bean
@ServiceActivator(inputChannel = "toKafka")
public MessageHandler handler() throws Exception {
    KafkaProducerMessageHandler<String, String> handler =
            new KafkaProducerMessageHandler<>(kafkaTemplate());
    handler.setTopicExpression(new LiteralExpression("someTopic"));
    handler.setMessageKeyExpression(new LiteralExpression("someKey"));
    handler.setSuccessChannel(successes());
    handler.setFailureChannel(failures());
    return handler;
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}

@Bean
public ProducerFactory<String, String> producerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress);
    // set more properties
    return new DefaultKafkaProducerFactory<>(props);
}

XML:

<int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter"
                                    kafka-template="template"
                                    auto-startup="false"
                                    channel="inputToKafka"
                                    topic="foo"
                                    sync="false"
                                    message-key-expression="'bar'"
                                    send-failure-channel="failures"
                                    send-success-channel="successes"
                                    error-message-strategy="ems"
                                    partition-id-expression="2">
</int-kafka:outbound-channel-adapter>

<bean id="template" class="org.springframework.kafka.core.KafkaTemplate">
    <constructor-arg>
        <bean class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
            <constructor-arg>
                <map>
                    <entry key="bootstrap.servers" value="localhost:9092" />
                    ... <!-- more producer properties -->
                </map>
            </constructor-arg>
        </bean>
    </constructor-arg>
</bean>

Message-driven Channel Adapter

The KafkaMessageDrivenChannelAdapter (<int-kafka:message-driven-channel-adapter>) uses a spring-kafka KafkaMessageListenerContainer or ConcurrentMessageListenerContainer.

Also, the mode attribute is available. It can accept values of record or batch (default: record). For record mode, each message payload is converted from a single ConsumerRecord. For batch mode, the payload is a list of objects that are converted from all the ConsumerRecord instances returned by the consumer poll. As with the batched @KafkaListener, the KafkaHeaders.RECEIVED_KEY, KafkaHeaders.RECEIVED_PARTITION, KafkaHeaders.RECEIVED_TOPIC, and KafkaHeaders.OFFSET headers are also lists, with positions corresponding to the position in the payload.
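
For example, a batch-mode listener might correlate those parallel lists as in the following sketch (the channel name is hypothetical):

@ServiceActivator(inputChannel = "fromKafkaBatch") // hypothetical channel
public void listen(List<String> payloads,
        @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
        @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
    for (int i = 0; i < payloads.size(); i++) {
        // index i of each list refers to the same ConsumerRecord
        System.out.println(topics.get(i) + "@" + offsets.get(i) + ": " + payloads.get(i));
    }
}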

Received messages have certain headers populated. See the KafkaHeaders class for more information.

The Consumer object (in the kafka_consumer header) is not thread-safe. You must invoke its methods only on the thread that calls the listener within the adapter. If you hand off the message to another thread, you must not call its methods.

When a retry-template is provided, delivery failures are retried according to its retry policy. If an error-channel is also supplied, a default ErrorMessageSendingRecoverer will be used as the recovery callback after retries are exhausted. You can also use the recovery-callback to specify some other action to take in that case, or set it to null to throw the final exception to the listener container so it is handled there.

When building an ErrorMessage (for use in the error-channel or recovery-callback), you can customize the error message by setting the error-message-strategy property. By default, a RawRecordHeaderErrorMessageStrategy is used, to provide access to the converted message as well as the raw ConsumerRecord.

This form of retry is blocking and could cause a rebalance if the aggregate retry delays across all polled records exceed the max.poll.interval.ms consumer property. Instead, consider adding a DefaultErrorHandler to the listener container, configured with a KafkaErrorSendingMessageRecoverer.
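
The following sketch outlines that alternative (the bean wiring is an assumption, not prescribed):

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> containerFactory(
        ConsumerFactory<String, String> consumerFactory, MessageChannel errorChannel) {

    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    // retry a few times, 1 second apart, then publish an ErrorMessage to the channel
    factory.setCommonErrorHandler(new DefaultErrorHandler(
            new KafkaErrorSendingMessageRecoverer(errorChannel), new FixedBackOff(1000L, 3)));
    return factory;
}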

Configuration

The following example shows how to configure a message-driven channel adapter:

Java DSL:

@Bean
public IntegrationFlow topic1ListenerFromKafkaFlow() {
    return IntegrationFlow
            .from(Kafka.messageDrivenChannelAdapter(consumerFactory(),
                    KafkaMessageDrivenChannelAdapter.ListenerMode.record, TEST_TOPIC1)
                    .configureListenerContainer(c ->
                            c.ackMode(ContainerProperties.AckMode.MANUAL)
                                    .id("topic1ListenerContainer"))
                    .recoveryCallback(new ErrorMessageSendingRecoverer(errorChannel(),
                            new RawRecordHeaderErrorMessageStrategy()))
                    .retryTemplate(new RetryTemplate())
                    .filterInRetry(true))
            .filter(Message.class, m ->
                            m.getHeaders().get(KafkaHeaders.RECEIVED_KEY, Integer.class) < 101,
                    f -> f.throwExceptionOnRejection(true))
            .<String, String>transform(String::toUpperCase)
            .channel(c -> c.queue("listeningFromKafkaResults1"))
            .get();
}

Java:

@Bean
public KafkaMessageDrivenChannelAdapter<String, String>
            adapter(KafkaMessageListenerContainer<String, String> container) {
    KafkaMessageDrivenChannelAdapter<String, String> kafkaMessageDrivenChannelAdapter =
            new KafkaMessageDrivenChannelAdapter<>(container, ListenerMode.record);
    kafkaMessageDrivenChannelAdapter.setOutputChannel(received());
    return kafkaMessageDrivenChannelAdapter;
}

@Bean
public KafkaMessageListenerContainer<String, String> container() throws Exception {
    ContainerProperties properties = new ContainerProperties(this.topic);
    // set more properties
    return new KafkaMessageListenerContainer<>(consumerFactory(), properties);
}

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress);
    // set more properties
    return new DefaultKafkaConsumerFactory<>(props);
}

XML:

<int-kafka:message-driven-channel-adapter
        id="kafkaListener"
        listener-container="container1"
        auto-startup="false"
        phase="100"
        send-timeout="5000"
        mode="record"
        retry-template="template"
        recovery-callback="callback"
        error-message-strategy="ems"
        channel="someChannel"
        error-channel="errorChannel" />

<bean id="container1" class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
    <constructor-arg>
        <bean class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
            <constructor-arg>
                <map>
                <entry key="bootstrap.servers" value="localhost:9092" />
                ...
                </map>
            </constructor-arg>
        </bean>
    </constructor-arg>
    <constructor-arg>
        <bean class="org.springframework.kafka.listener.config.ContainerProperties">
            <constructor-arg name="topics" value="foo" />
        </bean>
    </constructor-arg>

</bean>

You can also use the container factory that is used for @KafkaListener annotations to create ConcurrentMessageListenerContainer instances for other purposes. See the Spring for Apache Kafka documentation for an example.

With the Java DSL, the container does not have to be configured as a @Bean, because the DSL registers the container as a bean. The following example shows how to do so:

@Bean
public IntegrationFlow topic2ListenerFromKafkaFlow() {
    return IntegrationFlow
            .from(Kafka.messageDrivenChannelAdapter(kafkaListenerContainerFactory().createContainer(TEST_TOPIC2),
            KafkaMessageDrivenChannelAdapter.ListenerMode.record)
                .id("topic2Adapter"))
            ...
            .get();
}

Notice that, in this case, the adapter is given an id (topic2Adapter). The container is registered in the application context with a name of topic2Adapter.container. If the adapter does not have an id property, the container’s bean name is the container’s fully qualified class name plus #n, where n is incremented for each container.

Inbound Channel Adapter

The KafkaMessageSource provides a pollable channel adapter implementation.

Configuration

Java DSL:

@Bean
public IntegrationFlow flow(ConsumerFactory<String, String> cf)  {
    return IntegrationFlow.from(Kafka.inboundChannelAdapter(cf, new ConsumerProperties("myTopic")),
                          e -> e.poller(Pollers.fixedDelay(5000)))
            .handle(System.out::println)
            .get();
}

Kotlin:

@Bean
fun sourceFlow(cf: ConsumerFactory<String, String>) =
    integrationFlow(Kafka.inboundChannelAdapter(cf,
        ConsumerProperties(TEST_TOPIC3).also {
            it.groupId = "kotlinMessageSourceGroup"
        }),
        { poller(Pollers.fixedDelay(100)) }) {
        handle { m ->

        }
    }

Java:

@InboundChannelAdapter(channel = "fromKafka", poller = @Poller(fixedDelay = "5000"))
@Bean
public KafkaMessageSource<String, String> source(ConsumerFactory<String, String> cf)  {
    ConsumerProperties consumerProperties = new ConsumerProperties("myTopic");
    consumerProperties.setGroupId("myGroupId");
    consumerProperties.setClientId("myClientId");
    return new KafkaMessageSource<>(cf, consumerProperties);
}

XML:

<int-kafka:inbound-channel-adapter
        id="adapter1"
        consumer-factory="consumerFactory"
        consumer-properties="consumerProperties1"
        ack-factory="ackFactory"
        channel="inbound"
        message-converter="converter"
        payload-type="java.lang.String"
        raw-header="true"
        auto-startup="false">
    <int:poller fixed-delay="5000"/>
</int-kafka:inbound-channel-adapter>

<bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
    <constructor-arg>
        <map>
            <entry key="max.poll.records" value="1"/>
        </map>
    </constructor-arg>
</bean>

<bean id="consumerProperties1" class="org.springframework.kafka.listener.ConsumerProperties">
    <constructor-arg name="topics" value="topic1"/>
    <property name="groupId" value="group"/>
    <property name="clientId" value="client"/>
</bean>

Refer to the javadocs for available properties.

By default, max.poll.records must be either explicitly set in the consumer factory, or it will be forced to 1 if the consumer factory is a DefaultKafkaConsumerFactory. You can set the property allowMultiFetch to true to override this behavior.
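
The following sketch assumes the KafkaMessageSource constructor overload that takes the allowMultiFetch flag:

@Bean
public KafkaMessageSource<String, String> multiFetchSource(ConsumerFactory<String, String> cf) {
    ConsumerProperties consumerProperties = new ConsumerProperties("myTopic");
    consumerProperties.setGroupId("myGroupId");
    return new KafkaMessageSource<>(cf, consumerProperties, true); // allowMultiFetch = true
}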

You must poll the consumer within max.poll.interval.ms to avoid a rebalance. If you set allowMultiFetch to true you must process all the retrieved records, and poll again, within max.poll.interval.ms.

Messages emitted by this adapter contain a header kafka_remainingRecords with a count of records remaining from the previous poll.

Starting with version 6.2, the KafkaMessageSource supports an ErrorHandlingDeserializer provided in the consumer properties. A DeserializationException is extracted from the record headers and thrown to the caller. With a SourcePollingChannelAdapter, this exception is wrapped into an ErrorMessage and published to its errorChannel. See the ErrorHandlingDeserializer documentation for more information.
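
For example, the consumer properties might delegate value deserialization through the error-handling wrapper, as in the following sketch:

Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
// the delegate does the real work; its failures become DeserializationExceptions
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, StringDeserializer.class);
ConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(props);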

Outbound Gateway

The outbound gateway is for request/reply operations. It differs from most Spring Integration gateways in that the sending thread does not block in the gateway, and the reply is processed on the reply listener container thread. If your code invokes the gateway behind a synchronous Messaging Gateway, the user thread blocks there until the reply is received (or a timeout occurs).

The gateway does not accept requests until the reply container has been assigned its topics and partitions. It is suggested that you add a ConsumerRebalanceListener to the template’s reply container properties and wait for the onPartitionsAssigned call before sending messages to the gateway.
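
The following sketch shows one way to wait; containerProperties stands for the reply container's ContainerProperties, and the latch handling is illustrative:

CountDownLatch assigned = new CountDownLatch(1);
containerProperties.setConsumerRebalanceListener(new ConsumerRebalanceListener() {

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        assigned.countDown(); // reply container is ready
    }

});
// before the first send to the gateway (throws InterruptedException):
assigned.await(30, TimeUnit.SECONDS);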

The KafkaProducerMessageHandler sendTimeoutExpression default is the delivery.timeout.ms Kafka producer property + 5000 so that the actual Kafka error after a timeout is propagated to the application, instead of a timeout generated by this framework. This has been changed for consistency, because you may get unexpected behavior (Spring may time out the send(), while it is actually, eventually, successful). IMPORTANT: That timeout is 120 seconds by default, so you may wish to reduce it to get more timely failures.

Configuration

The following example shows how to configure a gateway:

Java DSL:

@Bean
public IntegrationFlow outboundGateFlow(
        ReplyingKafkaTemplate<String, String, String> kafkaTemplate) {

    return IntegrationFlow.from("kafkaRequests")
            .handle(Kafka.outboundGateway(kafkaTemplate))
            .channel("kafkaReplies")
            .get();
}

Java:

@Bean
@ServiceActivator(inputChannel = "kafkaRequests", outputChannel = "kafkaReplies")
public KafkaProducerMessageHandler<String, String> outGateway(
        ReplyingKafkaTemplate<String, String, String> kafkaTemplate) {
    return new KafkaProducerMessageHandler<>(kafkaTemplate);
}

XML:

<int-kafka:outbound-gateway
    id="allProps"
    error-message-strategy="ems"
    kafka-template="template"
    message-key-expression="'key'"
    order="23"
    partition-id-expression="2"
    reply-channel="replies"
    reply-timeout="43"
    request-channel="requests"
    requires-reply="false"
    send-success-channel="successes"
    send-failure-channel="failures"
    send-timeout-expression="44"
    sync="true"
    timestamp-expression="T(System).currentTimeMillis()"
    topic-expression="'topic'"/>

Refer to the javadocs for available properties.

Notice that the same class as the outbound channel adapter is used, the only difference being that the KafkaTemplate passed into the constructor is a ReplyingKafkaTemplate. See the Spring for Apache Kafka documentation for more information.

The outbound topic, partition, key, and so on are determined in the same way as for the outbound adapter. The reply topic is determined as follows:

  1. A message header named KafkaHeaders.REPLY_TOPIC (if present, it must have a String or byte[] value) is validated against the template’s reply container’s subscribed topics.

  2. If the template’s replyContainer is subscribed to only one topic, it is used.

You can also specify a KafkaHeaders.REPLY_PARTITION header to determine a specific partition to be used for replies. Again, this is validated against the template’s reply container’s subscriptions.
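
For example, the following sketch (topic and channel names are hypothetical) sets the reply topic explicitly on the request:

@Bean
public IntegrationFlow gatewayFlowWithReplyTopic(
        ReplyingKafkaTemplate<String, String, String> kafkaTemplate) {

    return IntegrationFlow.from("kafkaRequests")
            .enrichHeaders(h -> h.header(KafkaHeaders.REPLY_TOPIC, "myReplies"))
            .handle(Kafka.outboundGateway(kafkaTemplate))
            .channel("kafkaReplies")
            .get();
}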

Alternatively, you can use a configuration similar to the following bean:

@Bean
public IntegrationFlow outboundGateFlow() {
    return IntegrationFlow.from("kafkaRequests")
            .handle(Kafka.outboundGateway(producerFactory(), replyContainer())
                .configureKafkaTemplate(t -> t.replyTimeout(30_000)))
            .channel("kafkaReplies")
            .get();
}

Inbound Gateway

The inbound gateway is for request/reply operations.

Configuration

The following example shows how to configure an inbound gateway:

Java DSL:

@Bean
public IntegrationFlow serverGateway(
        ConcurrentMessageListenerContainer<Integer, String> container,
        KafkaTemplate<Integer, String> replyTemplate) {
    return IntegrationFlow
            .from(Kafka.inboundGateway(container, replyTemplate)
                .replyTimeout(30_000))
            .<String, String>transform(String::toUpperCase)
            .get();
}

Java:

@Bean
public KafkaInboundGateway<Integer, String, String> inboundGateway(
        AbstractMessageListenerContainer<Integer, String> container,
        KafkaTemplate<Integer, String> replyTemplate) {

    KafkaInboundGateway<Integer, String, String> gateway =
        new KafkaInboundGateway<>(container, replyTemplate);
    gateway.setRequestChannel(requests);
    gateway.setReplyChannel(replies);
    gateway.setReplyTimeout(30_000);
    return gateway;
}

XML:

<int-kafka:inbound-gateway
        id="gateway1"
        listener-container="container1"
        kafka-template="template"
        auto-startup="false"
        phase="100"
        request-timeout="5000"
        request-channel="nullChannel"
        reply-channel="errorChannel"
        reply-timeout="43"
        message-converter="messageConverter"
        payload-type="java.lang.String"
        error-message-strategy="ems"
        retry-template="retryTemplate"
        recovery-callback="recoveryCallback"/>

Refer to the javadocs for available properties.

When a RetryTemplate is provided, delivery failures are retried according to its retry policy. If an error-channel is also supplied, a default ErrorMessageSendingRecoverer will be used as the recovery callback after retries are exhausted. You can also use the recovery-callback to specify some other action to take in that case, or set it to null to throw the final exception to the listener container so it is handled there.

When building an ErrorMessage (for use in the error-channel or recovery-callback), you can customize the error message by setting the error-message-strategy property. By default, a RawRecordHeaderErrorMessageStrategy is used, to provide access to the converted message as well as the raw ConsumerRecord.

This form of retry is blocking and could cause a rebalance if the aggregate retry delays across all polled records exceed the max.poll.interval.ms consumer property. Instead, consider adding a DefaultErrorHandler to the listener container, configured with a KafkaErrorSendingMessageRecoverer.

The Java DSL example shown earlier configures a simple upper-case converter. Alternatively, you can configure an upper-case converter by using code similar to the following:

@Bean
public IntegrationFlow serverGateway() {
    return IntegrationFlow
            .from(Kafka.inboundGateway(consumerFactory(), containerProperties(),
                    producerFactory())
                .replyTimeout(30_000))
            .<String, String>transform(String::toUpperCase)
            .get();
}

You can also use the container factory that is used for @KafkaListener annotations to create ConcurrentMessageListenerContainer instances for other purposes. See the Spring for Apache Kafka documentation and Message-driven Channel Adapter for examples.

Channels Backed by Apache Kafka Topics

Spring Integration has MessageChannel implementations backed by an Apache Kafka topic for persistence.

Each channel requires a KafkaTemplate for the sending side and either a listener container factory (for subscribable channels) or a KafkaMessageSource for a pollable channel.

Java DSL Configuration

Java DSL:

@Bean
public IntegrationFlow flowWithSubscribable(KafkaTemplate<Integer, String> template,
        ConcurrentKafkaListenerContainerFactory<Integer, String> containerFactory) {

    return IntegrationFlow.from(...)
            ...
            .channel(Kafka.channel(template, containerFactory, "someTopic1").groupId("group1"))
            ...
            .get();
}

@Bean
public IntegrationFlow flowWithPubSub(KafkaTemplate<Integer, String> template,
        ConcurrentKafkaListenerContainerFactory<Integer, String> containerFactory) {

    return IntegrationFlow.from(...)
            ...
            .publishSubscribeChannel(pubSub(template, containerFactory),
                pubsub -> pubsub
                            .subscribe(subflow -> ...)
                            .subscribe(subflow -> ...))
            .get();
}

@Bean
public BroadcastCapableChannel pubSub(KafkaTemplate<Integer, String> template,
        ConcurrentKafkaListenerContainerFactory<Integer, String> containerFactory) {

    return Kafka.publishSubscribeChannel(template, containerFactory, "someTopic2")
            .groupId("group2")
            .get();
}

@Bean
public IntegrationFlow flowWithPollable(KafkaTemplate<Integer, String> template,
        KafkaMessageSource<Integer, String> source) {

    return IntegrationFlow.from(...)
            ...
            .channel(Kafka.pollableChannel(template, source, "someTopic3").groupId("group3"))
            .handle(...,  e -> e.poller(...))
            ...
            .get();
}

Java:

/**
 * Channel for a single subscriber.
 **/
@Bean
SubscribableKafkaChannel pointToPoint(KafkaTemplate<String, String> template,
    KafkaListenerContainerFactory<String, String> factory) {

    SubscribableKafkaChannel channel =
        new SubscribableKafkaChannel(template, factory, "topicA");
    channel.setGroupId("group1");
    return channel;
}

/**
 * Channel for multiple subscribers.
 **/
@Bean
SubscribableKafkaChannel pubsub(KafkaTemplate<String, String> template,
    KafkaListenerContainerFactory<String, String> factory) {

    SubscribableKafkaChannel channel =
        new SubscribableKafkaChannel(template, factory, "topicB", true);
    channel.setGroupId("group2");
    return channel;
}

/**
 * Pollable channel (topic is configured on the source)
 **/
@Bean
PollableKafkaChannel pollable(KafkaTemplate<String, String> template,
    KafkaMessageSource<String, String> source) {

    PollableKafkaChannel channel =
        new PollableKafkaChannel(template, source);
    channel.setGroupId("group3");
    return channel;
}

XML:

<int-kafka:channel kafka-template="template" id="ptp" topic="ptpTopic" group-id="ptpGroup"
    container-factory="containerFactory" />

<int-kafka:pollable-channel kafka-template="template" id="pollable" message-source="source"
    group-id = "pollableGroup"/>

<int-kafka:publish-subscribe-channel kafka-template="template" id="pubSub" topic="pubSubTopic"
    group-id="pubSubGroup" container-factory="containerFactory" />

Message Conversion

A StringJsonMessageConverter is provided. See the Spring for Apache Kafka documentation for more information.

When using this converter with a message-driven channel adapter, you can specify the type to which you want the incoming payload to be converted. This is achieved by setting the payload-type attribute (payloadType property) on the adapter. The following example shows how to do so in XML configuration:

<int-kafka:message-driven-channel-adapter
        id="kafkaListener"
        listener-container="container1"
        auto-startup="false"
        phase="100"
        send-timeout="5000"
        channel="nullChannel"
        message-converter="messageConverter"
        payload-type="com.example.Thing"
        error-channel="errorChannel" />

<bean id="messageConverter"
    class="org.springframework.kafka.support.converter.MessagingMessageConverter"/>

The following example shows how to set the payload-type attribute (payloadType property) on the adapter in Java configuration:

@Bean
public KafkaMessageDrivenChannelAdapter<String, String>
            adapter(KafkaMessageListenerContainer<String, String> container) {
    KafkaMessageDrivenChannelAdapter<String, String> kafkaMessageDrivenChannelAdapter =
            new KafkaMessageDrivenChannelAdapter<>(container, ListenerMode.record);
    kafkaMessageDrivenChannelAdapter.setOutputChannel(received());
    kafkaMessageDrivenChannelAdapter.setMessageConverter(converter());
    kafkaMessageDrivenChannelAdapter.setPayloadType(Thing.class);
    return kafkaMessageDrivenChannelAdapter;
}

Null Payloads and Log Compaction 'Tombstone' Records

Spring Messaging Message<?> objects cannot have null payloads. When you use the endpoints for Apache Kafka, null payloads (also known as tombstone records) are represented by a payload of type KafkaNull. See the Spring for Apache Kafka documentation for more information.
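
Conversely, to publish a tombstone record, you can send a KafkaNull payload, as in the following sketch (the channel name is hypothetical):

Message<KafkaNull> tombstone = MessageBuilder.withPayload(KafkaNull.INSTANCE)
        .setHeader(KafkaHeaders.KEY, "customer-42") // key of the entry to compact away
        .build();
toKafkaChannel.send(tombstone); // hypothetical channel wired to an outbound endpoint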

The POJO methods for Spring Integration endpoints can use a true null value instead of KafkaNull. To do so, mark the parameter with @Payload(required = false). The following example shows how to do so:

@ServiceActivator(inputChannel = "fromSomeKafkaInboundEndpoint")
public void in(@Header(KafkaHeaders.RECEIVED_KEY) String key,
               @Payload(required = false) Customer customer) {
    // customer is null if a tombstone record
    ...
}

Calling a Spring Integration flow from a KStream

You can use a MessagingTransformer to invoke an integration flow from a KStream:

@Bean
public KStream<byte[], byte[]> kStream(StreamsBuilder kStreamBuilder,
        MessagingTransformer<byte[], byte[], byte[]> transformer) {
    KStream<byte[], byte[]> stream = kStreamBuilder.stream(STREAMING_TOPIC1);
    stream.mapValues((ValueMapper<byte[], byte[]>) String::toUpperCase)
            ...
            .transform(() -> transformer)
            .to(streamingTopic2);

    stream.print(Printed.toSysOut());

    return stream;
}

@Bean
@DependsOn("flow")
public MessagingTransformer<byte[], byte[], String> transformer(
        MessagingFunction function) {

    MessagingMessageConverter converter = new MessagingMessageConverter();
    converter.setHeaderMapper(new SimpleKafkaHeaderMapper("*"));
    return new MessagingTransformer<>(function, converter);
}

@Bean
public IntegrationFlow flow() {
    return IntegrationFlow.from(MessagingFunction.class)
        ...
        .get();
}

When an integration flow starts with an interface, the proxy that is created has the name of the flow bean, appended with ".gateway", so this bean name can be used as a @Qualifier if needed.
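
For example, with the flow() bean above, an injection point might look like the following sketch:

@Autowired
@Qualifier("flow.gateway")
MessagingFunction function;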

Performance Considerations for read/process/write Scenarios

Many applications consume from a topic, perform some processing, and write to another topic. In most cases, if the write fails, the application would want to throw an exception so that the incoming request can be retried and/or sent to a dead-letter topic. This functionality is supported by the underlying message listener container, together with a suitably configured error handler. However, in order to support this, we need to block the listener thread until the success (or failure) of the write operation so that any exceptions can be thrown to the container. When consuming single records, this is achieved by setting the sync property on the outbound adapter. However, when consuming batches, using sync causes a significant performance degradation, because the application would wait for the result of each send before sending the next message. You can also perform multiple sends and then wait for the results of those sends afterwards. This is achieved by adding a futuresChannel to the message handler. To enable the feature, add KafkaIntegrationHeaders.FUTURE_TOKEN to the outbound messages; this can then be used to correlate a Future to a particular sent message. Here is an example of how you might use this feature:

@SpringBootApplication
public class FuturesChannelApplication {

    public static void main(String[] args) {
        SpringApplication.run(FuturesChannelApplication.class, args);
    }

    @Bean
    IntegrationFlow inbound(ConsumerFactory<String, String> consumerFactory, Handler handler) {
        return IntegrationFlow.from(Kafka.messageDrivenChannelAdapter(consumerFactory,
                    ListenerMode.batch, "inTopic"))
                .handle(handler)
                .get();
    }

    @Bean
    IntegrationFlow outbound(KafkaTemplate<String, String> kafkaTemplate) {
        return IntegrationFlow.from(Gate.class)
                .enrichHeaders(h -> h
                        .header(KafkaHeaders.TOPIC, "outTopic")
                        .headerExpression(KafkaIntegrationHeaders.FUTURE_TOKEN, "headers[id]"))
                .handle(Kafka.outboundChannelAdapter(kafkaTemplate)
                        .futuresChannel("futures"))
                .get();
    }

    @Bean
    PollableChannel futures() {
        return new QueueChannel();
    }

}

@Component
@DependsOn("outbound")
class Handler {

    @Autowired
    Gate gate;

    @Autowired
    PollableChannel futures;

    public void handle(List<String> input) throws Exception {
        System.out.println(input);
        input.forEach(str -> this.gate.send(str.toUpperCase()));
        for (int i = 0; i < input.size(); i++) {
            Message<?> future = this.futures.receive(10000);
            ((Future<?>) future.getPayload()).get(10, TimeUnit.SECONDS);
        }
    }

}

interface Gate {

    void send(String out);

}