Using Protocol Adapters

到目前为止显示的所有示例都说明了 DSL 如何通过使用 Spring Integration 编程模型支持消息传递架构。但是,我们尚未进行任何真正的集成。要做到这一点,需要通过 HTTP、JMS、AMQP、TCP、JDBC、FTP、SMTP 等访问远程资源或者访问本地文件系统。Spring Integration 支持所有这些功能,并且支持更多功能。理想情况下,DSL 应为所有这些功能提供一流的支持,但实现所有这些功能并持续关注添加到 Spring Integration 的新适配器是一项艰巨的任务。因此,期望是 DSL 不断追赶 Spring Integration。

All the examples shown so far illustrate how the DSL supports a messaging architecture by using the Spring Integration programming model. However, we have yet to do any real integration. Doing so requires access to remote resources over HTTP, JMS, AMQP, TCP, JDBC, FTP, SMTP, and so on or access to the local file system. Spring Integration supports all of these and more. Ideally, the DSL should offer first class support for all of them, but it is a daunting task to implement all of these and keep up as new adapters are added to Spring Integration. So the expectation is that the DSL is continually catching up with Spring Integration.

因此,我们提供了高级 API 来无缝定义特定于协议的消息传递。我们使用工厂和建造器模式以及 lambda 来实现此目的。你可以将工厂类视为"`命名空间工厂`”,因为它们与来自具体协议特定 Spring Integration 模块的组件的 XML 命名空间的作用相同。目前,Spring Integration Java DSL 支持 AmqpFeedJmsFiles(S)FtpHttpJPAMongoDbTCP/UDPMailWebFluxScripts 命名空间工厂。以下示例显示了如何使用其中三个(Amqp、Jms 和 Mail):

Consequently, we provide the high-level API to seamlessly define protocol-specific messaging. We do so with the factory and builder patterns and with lambdas. You can think of the factory classes as “Namespace Factories”, because they play the same role as the XML namespace for components from the concrete protocol-specific Spring Integration modules. Currently, Spring Integration Java DSL supports the Amqp, Feed, Jms, Files, (S)Ftp, Http, JPA, MongoDb, TCP/UDP, Mail, WebFlux, and Scripts namespace factories. The following example shows how to use three of them (Amqp, Jms, and Mail):

@Bean
public IntegrationFlow amqpFlow() {
    return IntegrationFlow.from(Amqp.inboundGateway(this.rabbitConnectionFactory, queue()))
            .transform("hello "::concat)
            .transform(String.class, String::toUpperCase)
            .get();
}

@Bean
public IntegrationFlow jmsOutboundGatewayFlow() {
    return IntegrationFlow.from("jmsOutboundGatewayChannel")
            .handle(Jms.outboundGateway(this.jmsConnectionFactory)
                        .replyContainer(c ->
                                    c.concurrentConsumers(3)
                                            .sessionTransacted(true))
                        .requestDestination("jmsPipelineTest"))
            .get();
}

@Bean
public IntegrationFlow sendMailFlow() {
    return IntegrationFlow.from("sendMailChannel")
            .handle(Mail.outboundAdapter("localhost")
                            .port(smtpPort)
                            .credentials("user", "pw")
                            .protocol("smtp")
                            .javaMailProperties(p -> p.put("mail.debug", "true")),
                    e -> e.id("sendMailEndpoint"))
            .get();
}

前面的示例展示了如何将“命名空间工厂”用作内联适配器声明。但是,你可以从 @Bean 定义中使用它们来让 IntegrationFlow 方法链更具有可读性。

The preceding example shows how to use the “namespace factories” as inline adapters declarations. However, you can use them from @Bean definitions to make the IntegrationFlow method chain more readable.

在花费精力处理其他事项之前,我们正在征求社区对这些命名空间工厂的意见。我们也感谢对我们接下来应该支持哪些适配器和网关进行优先级排序的任何输入。

We are soliciting community feedback on these namespace factories before we spend effort on others. We also appreciate any input into prioritization for which adapters and gateways we should support next.

你可以在此参考手册中特定于协议的章节中找到更多 Java DSL 示例。

You can find more Java DSL samples in the protocol-specific chapters throughout this reference manual.

所有其他协议通道适配器都可以配置为通用 bean 并连接到 IntegrationFlow,如下例所示:

All other protocol channel adapters may be configured as generic beans and wired to the IntegrationFlow, as the following examples show:

@Bean
public QueueChannelSpec wrongMessagesChannel() {
    return MessageChannels
            .queue()
            .wireTap("wrongMessagesWireTapChannel");
}

@Bean
public IntegrationFlow xpathFlow(MessageChannel wrongMessagesChannel) {
    return IntegrationFlow.from("inputChannel")
            .filter(new StringValueTestXPathMessageSelector("namespace-uri(/*)", "my:namespace"),
                    e -> e.discardChannel(wrongMessagesChannel))
            .log(LoggingHandler.Level.ERROR, "test.category", m -> m.getHeaders().getId())
            .route(xpathRouter(wrongMessagesChannel))
            .get();
}

@Bean
public AbstractMappingMessageRouter xpathRouter(MessageChannel wrongMessagesChannel) {
    XPathRouter router = new XPathRouter("local-name(/*)");
    router.setEvaluateAsString(true);
    router.setResolutionRequired(false);
    router.setDefaultOutputChannel(wrongMessagesChannel);
    router.setChannelMapping("Tags", "splittingChannel");
    router.setChannelMapping("Tag", "receivedChannel");
    return router;
}