Using Protocol Adapters

到目前为止显示的所有示例都说明了 DSL 如何通过使用 Spring Integration 编程模型支持消息传递架构。但是,我们尚未进行任何真正的集成。要做到这一点,需要通过 HTTP、JMS、AMQP、TCP、JDBC、FTP、SMTP 等访问远程资源或者访问本地文件系统。Spring Integration 支持所有这些功能,并且支持更多功能。理想情况下,DSL 应为所有这些功能提供一流的支持,但实现所有这些功能并持续关注添加到 Spring Integration 的新适配器是一项艰巨的任务。因此,期望是 DSL 不断追赶 Spring Integration。

因此,我们提供了高级 API 来无缝定义特定于协议的消息传递。我们使用工厂和建造器模式以及 lambda 来实现此目的。你可以将工厂类视为"`命名空间工厂`”,因为它们与来自具体协议特定 Spring Integration 模块的组件的 XML 命名空间的作用相同。目前,Spring Integration Java DSL 支持 AmqpFeedJmsFiles(S)FtpHttpJPAMongoDbTCP/UDPMailWebFluxScripts 命名空间工厂。以下示例显示了如何使用其中三个(Amqp、Jms 和 Mail):

@Bean
public IntegrationFlow amqpFlow() {
    return IntegrationFlow.from(Amqp.inboundGateway(this.rabbitConnectionFactory, queue()))
            .transform("hello "::concat)
            .transform(String.class, String::toUpperCase)
            .get();
}

@Bean
public IntegrationFlow jmsOutboundGatewayFlow() {
    return IntegrationFlow.from("jmsOutboundGatewayChannel")
            .handle(Jms.outboundGateway(this.jmsConnectionFactory)
                        .replyContainer(c ->
                                    c.concurrentConsumers(3)
                                            .sessionTransacted(true))
                        .requestDestination("jmsPipelineTest"))
            .get();
}

@Bean
public IntegrationFlow sendMailFlow() {
    return IntegrationFlow.from("sendMailChannel")
            .handle(Mail.outboundAdapter("localhost")
                            .port(smtpPort)
                            .credentials("user", "pw")
                            .protocol("smtp")
                            .javaMailProperties(p -> p.put("mail.debug", "true")),
                    e -> e.id("sendMailEndpoint"))
            .get();
}

前面的示例展示了如何将“命名空间工厂”用作内联适配器声明。但是,你可以从 @Bean 定义中使用它们来让 IntegrationFlow 方法链更具有可读性。

在花费精力处理其他事项之前,我们正在征求社区对这些命名空间工厂的意见。我们也感谢对我们接下来应该支持哪些适配器和网关进行优先级排序的任何输入。

你可以在此参考手册中特定于协议的章节中找到更多 Java DSL 示例。

所有其他协议通道适配器都可以配置为通用 bean 并连接到 IntegrationFlow,如下例所示:

@Bean
public QueueChannelSpec wrongMessagesChannel() {
    return MessageChannels
            .queue()
            .wireTap("wrongMessagesWireTapChannel");
}

@Bean
public IntegrationFlow xpathFlow(MessageChannel wrongMessagesChannel) {
    return IntegrationFlow.from("inputChannel")
            .filter(new StringValueTestXPathMessageSelector("namespace-uri(/*)", "my:namespace"),
                    e -> e.discardChannel(wrongMessagesChannel))
            .log(LoggingHandler.Level.ERROR, "test.category", m -> m.getHeaders().getId())
            .route(xpathRouter(wrongMessagesChannel))
            .get();
}

@Bean
public AbstractMappingMessageRouter xpathRouter(MessageChannel wrongMessagesChannel) {
    XPathRouter router = new XPathRouter("local-name(/*)");
    router.setEvaluateAsString(true);
    router.setResolutionRequired(false);
    router.setDefaultOutputChannel(wrongMessagesChannel);
    router.setChannelMapping("Tags", "splittingChannel");
    router.setChannelMapping("Tag", "receivedChannel");
    return router;
}