A Brief History of Spring’s Data Integration Journey

Spring 在数据集成上的探索始于 Spring Integration 。凭借其编程模型,它提供了一致的开发人员体验,用于构建应用程序,这些应用程序可以包含 Enterprise Integration Patterns 以连接到外部系统,例如数据库、消息代理和其他系统。 快进到云时代,微服务已在企业环境中占有突出地位。 Spring Boot 改变了开发人员构建应用程序的方式。利用 Spring 的编程模型和由 Spring Boot 处理的运行时职责,开发独立的、生产级基于 Spring 的微服务变得很无缝。 为了将此扩展到数据集成工作负载,将 Spring Integration 和 Spring Boot 汇总到一个新项目中。Spring Cloud Stream 应运而生。 借助 Spring Cloud Stream,开发人员可以:

  • 以独立的方式构建、测试和部署以数据为中心应用程序。

  • 应用现代微服务架构模式,包括通过消息传送实现的合成。

  • 用以事件为中心的思维方式分离应用程序责任。事件可以表示时间内发生的一个事件,下游用户应用程序无需知道事件的原始位置或生产者的身份即可对其作出反应。

  • 将业务逻辑移植到消息代理上(例如 RabbitMQ、Apache Kafka、Amazon Kinesis)。

  • 依赖于框架对常见用例的自动内容类型支持。可以扩展到不同的数据转换类型。

  • 等等. . .

Quick Start

按照此三步指南,您甚至可以在深入了解任何详细信息之前,在不到 5 分钟的时间内试用 Spring Cloud Stream。

我们向您展示如何创建一个 Spring Cloud Stream 应用程序,该应用程序接收来自您选择的信道中间件(稍后详细介绍)的消息并将接收到的消息记录到控制台。我们称其为“LoggingConsumer”。虽然不太实用,但它很好地介绍了部分主要概念和抽象,让您更容易理解本用户指南的其余部分。

三个步骤如下:

Creating a Sample Application by Using Spring Initializr

首先,访问 Spring Initializr。在那里,您可以生成我们的 LoggingConsumer 应用程序。执行下列操作:

  1. Dependencies 部分中,开始键入 stream。当 “Cloud Stream” 选项出现时,选择该选项。

  2. 开始键入“kafka”或“rabbit”。

  3. Select “Kafka” or “RabbitMQ”.基本上,您可以选择要与之绑定的信道中间件。我们建议您使用已安装的或感觉更方便安装和运行的中间件。另外,正如您从 Initilaizer 屏幕上看到的那样,还有几个其他选项供您选择。例如,您可以选择 Gradle 作为构建工具来代替 Maven(默认设置)。

  4. Artifact 字段中,键入“logging-consumer”。*工件*字段的值变为应用程序名称。如果您为中间件选择了 RabbitMQ,那么您的 Spring Initializr 现在应如下所示:

spring initializr
  1. 单击 Generate Project 按钮。这样做会在您的硬盘上下载已压缩的生成项目版本。

  2. 将文件解压到你想用作项目目录的文件夹中。

我们鼓励您探索 Spring Initializr 中的许多可用选项。它允许您创建多种不同类型的 Spring 应用程序。

Importing the Project into Your IDE

现在,您可以将项目导入到您的 IDE 中。请记住,根据 IDE,您可能需要按照特定的导入程序进行操作。例如,根据项目如何生成(Maven 或 Gradle),您可能需要按照特定的导入程序进行操作(例如,在 Eclipse 或 STS 中,您需要使用文件 → 导入 → Maven → 现有 Maven 项目)。

导入后,项目不得有任何类型错误。同样,src/main/java 应包含 com.example.loggingconsumer.LoggingConsumerApplication

在技术层面上,现阶段您可以运行应用程序的主类。它已经是有效的 Spring Boot 应用程序。但是,它不执行任何操作,所以我们要添加一些代码。

Adding a Message Handler, Building, and Running

修改 com.example.loggingconsumer.LoggingConsumerApplication 类,使其如下所示:

@SpringBootApplication
public class LoggingConsumerApplication {

	public static void main(String[] args) {
		SpringApplication.run(LoggingConsumerApplication.class, args);
	}

	@Bean
	public Consumer<Person> log() {
	    return person -> {
	        System.out.println("Received: " + person);
	    };
	}

	public static class Person {
		private String name;
		public String getName() {
			return name;
		}
		public void setName(String name) {
			this.name = name;
		}
		public String toString() {
			return this.name;
		}
	}
}

如前一列表所示:

  • 我们使用函数编程模型(见 [Spring Cloud Function support])来定义单个消息处理器,为 Consumer

  • 我们依赖框架约定来将这样的处理器绑定到由粘合剂暴露的输入目的地绑定上。

这样做还能让您看到该框架的核心功能之一:它尝试将传入的消息有效内容自动转换为 Person 类型。

现在,您拥有一个功能齐全的 Spring Cloud Stream 应用程序,它监听消息。为了简单起见,我们假设您在 step one 中选择了 RabbitMQ。假设您已安装并运行了 RabbitMQ,您可以在 IDE 中运行其 main 方法来启动应用程序。

您应看到以下输出:

	--- [ main] c.s.b.r.p.RabbitExchangeQueueProvisioner : declaring queue for inbound: input.anonymous.CbMIwdkJSBO1ZoPDOtHtCg, bound to: input
	--- [ main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [localhost:5672]
	--- [ main] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#2a3a299:0/SimpleConnection@66c83fc8. . .
	. . .
	--- [ main] o.s.i.a.i.AmqpInboundChannelAdapter      : started inbound.input.anonymous.CbMIwdkJSBO1ZoPDOtHtCg
	. . .
	--- [ main] c.e.l.LoggingConsumerApplication         : Started LoggingConsumerApplication in 2.531 seconds (JVM running for 2.897)

转到 RabbitMQ 管理控制台或任何其他 RabbitMQ 客户端,将消息发送给 input.anonymous.CbMIwdkJSBO1ZoPDOtHtCganonymous.CbMIwdkJSBO1ZoPDOtHtCg 部分表示组名称并已生成,因此它必定与您环境中的名称不同。为了一些更具可预测性的内容,您可以通过设置 spring.cloud.stream.bindings.input.group=hello(或您喜欢的任何名称)来使用显式组名称。

该消息的内容应是 Person 类的 JSON 表示形式,如下所示:

{"name":"Sam Spade"}

然后在您的控制台中应该看到:

Received: Sam Spade

您还可以使用 ./mvnw clean install 将您的应用程序构建并打包到启动 JAR 中,并使用 java -jar 命令运行构建的 JAR。

现在您有一个正在运行的(尽管非常基本的)Spring Cloud 流应用程序。

Spring Expression Language (SpEL) in the context of Streaming data

在整个参考手册中,您将遇到许多可以利用 Spring 表达式语言(SpEL)的功能和示例。在使用它时了解某些限制非常重要。

SpEL 让您可以访问当前消息以及您正在运行的应用程序上下文。然而,了解 SpEL 可以看到什么类型的数据非常重要,尤其是在传入消息的上下文中。消息从代理到达时以 byte[] 的形式出现。然后,它被 Binder 转换为 Message<byte[]>,而您可以看到消息的有效负载保留其原始形式。消息的标头是 <String, Object>,其中值通常是另一种基元或基元的集合/数组,因此为 Object。这是因为 Binder 不知道所需输入类型,因为它无法访问用户代码(函数)。因此,Binder 实际上传递了一个信封,信封中有有效负载和某些可读元数据,形式为消息标头,就像通过邮件传递的信件一样。这意味着虽然可以访问消息的有效负载,但您只能以原始数据(即 byte[])的形式访问它。虽然开发人员经常要求能够通过 SpEL 访问具体类型(例如 Foo、Bar 等)的有效负载对象字段,但您可以看到实现起来有多么困难甚至不可能。这里有一个示例来演示此问题;假设您有一个路由表达式,用于根据有效负载类型路由到不同的函数。此要求意味着有效负载从 byte[] 转换为特定类型,然后应用 SpEL。然而,为了执行此类转换,我们需要知道传递给转换器的实际类型,而这是我们不知道的函数签名。解决此要求的一种更好的方法是将类型信息作为消息标头传递(例如,application/json;type=foo.bar.Baz)。您将获得一个清晰可读的字符串值,可以在一年中访问和评估,并且易于阅读的 SpEL 表达式。

此外,使用有效负载作为路由决策被认为是非常糟糕的做法,因为有效负载被视为特殊数据 - 只能由最终收件人读取的数据。同样,使用邮件传递类比,您不希望邮递员打开您的信封并阅读信的内容来做出一些传递决定。相同的概念也适用于这里,尤其是在生成消息时包含此类信息相对容易时。它强制执行与通过网络传输的数据的设计相关的特定级别的规范,以及哪些数据可以被视为公共数据,哪些数据是有特权的。