Apache Camel Support

Spring Integration 提供了 API 和配置,用于与在同一应用程序上下文中声明的 Apache Camel端点进行通信。 你需要将此依赖项包含在你的项目中:

  • Maven

  • Gradle

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-camel</artifactId>
    <version>{project-version}</version>
</dependency>
compile "org.springframework.integration:spring-integration-camel:{project-version}"

Spring Integration 和 Apache Camel 实施企业集成模式,并提供组合它们的一种便捷方式,但这些项目对其 API 和抽象实现使用不同的方法。Spring Integration 完全依赖于 Spring 核心中的依赖项注入容器。它使用许多其他 Spring 项目(Spring Data、Spring AMQP、Spring for Apache Kafka 等)来实现其通道适配器。它还将 MessageChannel`抽象用作开发人员在组合其集成流时需要了解的一等公民。另一方面,Apache Camel 不提供消息通道的一等公民抽象,并建议通过内部交换(对 API 隐藏)来组合其路由。此外,在 Spring 应用程序中使用它还需要一些额外的 dependencies and configurations。 即使最终的企业集成解决方案的实现方式并不重要,也会考虑开发人员体验和高生产力。因此,开发人员可能会出于许多原因选择某个框架,或者如果某些目标系统的支持存在差距,则会同时选择两个框架。Spring Integration 和 Apache Camel 应用程序可以通过它们实现信道适配器的许多外部协议进行交互。例如,Spring Integration 流可能将记录发布到由消费者端的 Apache Camel 端点使用的 Apache Kafka 主题。或者,Apache Camel 路由可能将数据写入 SFTP 文件目录,该目录由 Spring Integration 的 SFTP 入站信道适配器轮询。或者,它们可以通过 `ApplicationEvent abstraction在同一 Spring 应用程序上下文中进行通信。 为了简化开发流程并避免不必要的网络跳跃,Apache Camel 提供了一个 module,用于通过消息通道与 Spring Integration 进行通信。只需引用应用程序上下文中 MessageChannel,即可发送或使用消息。当 Apache Camel 路由是消息流的启动者,而 Spring Integration 仅作为解决方案的一部分发挥辅助作用时,这种方式效果很好。 为了获得类似的开发人员体验,Spring 集成现在提供一个用于调用 Apache Camel 端点并选择性等待回复的通道适配器。没有入站通道适配器,因为对于 Apache Camel 消息,从 Spring 集成 API 和抽象角度来说,订阅 MessageChannel 已足够。

Outbound Channel Adapter for Apache Camel

CamelMessageHandlerAbstractReplyProducingMessageHandler 实现,可以在单向(默认)和请求-回复模式下工作。它使用 org.apache.camel.ProducerTemplate 来向 org.apache.camel.Endpoint 发送(或发送并接收)。可以通过 ExchangePattern 选项(可以针对请求消息在运行时通过 SpEL 表达式求值)来控制交互模式。目标 Apache Camel 端点可以显式配置或作为在运行时求值的 SpEL 表达式。否则,它会回退到 ProducerTemplate 上提供的 defaultEndpoint。除了指定端点,还可以提供内联显式 LambdaRouteBuilder,例如,用于调用 Spring 集成中没有通道适配器支持的 Apache Camel 组件。

此外,可以提供 HeaderMapper<org.apache.camel.Message>CamelHeaderMapper 是一个默认实现),以确定在 Spring 集成和 Apache Camel 消息之间映射哪些标头。默认情况下,所有标头都被映射。

CamelMessageHandler 支持 async 模式,调用 ProducerTemplate.asyncSend() 并生成一个 CompletableFuture 以进行回复处理(如果有的)。

exchangeProperties 可以通过一个必须求值为 Map 的 SpEL 表达式来定制。

如果没有提供 ProducerTemplate,它将通过从应用程序上下文中解析得出的 CamelContext bean 创建。

@Bean
@ServiceActivator(inputChannel = "sendToCamel")
CamelMessageHandler camelService(ProducerTemplate producerTemplate) {
    CamelHeaderMapper headerMapper = new CamelHeaderMapper();
    headerMapper.setOutboundHeaderNames("");
    headerMapper.setInboundHeaderNames("testHeader");

    CamelMessageHandler camelMessageHandler = new CamelMessageHandler(producerTemplate);
    camelMessageHandler.setEndpointUri("direct:simple");
    camelMessageHandler.setExchangePatternExpression(spelExpressionParser.parseExpression("headers.exchangePattern"));
    camelMessageHandler.setHeaderMapper(headerMapper);
    return camelMessageHandler;
}

对于 Java DSL 流定义,可以使用 Camel 工厂提供的几个变种来配置此通道适配器:

@Bean
IntegrationFlow camelFlow() {
    return f -> f
            .handle(Camel.gateway().endpointUri("direct:simple"))
            .handle(Camel.route(this::camelRoute))
            .handle(Camel.handler().endpointUri("log:com.mycompany.order?level=WARN"));
}

private void camelRoute(RouteBuilder routeBuilder) {
    routeBuilder.from("direct:inbound").transform(routeBuilder.simple("${body.toUpperCase()}"));
}