A Brief History of Spring’s Data Integration Journey

Spring’s journey on Data Integration started with Spring Integration. With its programming model, it provided a consistent developer experience for building applications that can embrace Enterprise Integration Patterns to connect with external systems such as databases, message brokers, and others.

Fast forward to the cloud era, where microservices have become prominent in the enterprise setting. Spring Boot transformed the way developers built applications. With Spring’s programming model and the runtime responsibilities handled by Spring Boot, developing stand-alone, production-grade Spring-based microservices became seamless.

To extend this to Data Integration workloads, Spring Integration and Spring Boot were put together into a new project. Spring Cloud Stream was born.

With Spring Cloud Stream, developers can:

  • Build, test and deploy data-centric applications in isolation.

  • Apply modern microservices architecture patterns, including composition through messaging.

  • Decouple application responsibilities with event-centric thinking. An event can represent something that has happened in time, to which the downstream consumer applications can react without knowing where it originated or the producer’s identity.

  • Port the business logic onto message brokers (such as RabbitMQ, Apache Kafka, Amazon Kinesis).

  • Rely on the framework’s automatic content-type support for common use-cases. Extending to different data conversion types is possible.

  • And much more.

Quick Start

You can try Spring Cloud Stream in less than five minutes, even before you jump into any details, by following this three-step guide.

We show you how to create a Spring Cloud Stream application that receives messages coming from the messaging middleware of your choice (more on this later) and logs received messages to the console. We call it LoggingConsumer. While not very practical, it provides a good introduction to some of the main concepts and abstractions, making it easier to digest the rest of this user guide.

The three steps are as follows:

Creating a Sample Application by Using Spring Initializr

To get started, visit the Spring Initializr. From there, you can generate our LoggingConsumer application. To do so:

  1. In the Dependencies section, start typing stream. When the “Cloud Stream” option appears, select it.

  2. Start typing either 'kafka' or 'rabbit'.

  3. Select “Kafka” or “RabbitMQ”.

Basically, you choose the messaging middleware to which your application binds. We recommend using the one you have already installed or feel more comfortable installing and running. Also, as you can see from the Initializr screen, there are a few other options you can choose. For example, you can choose Gradle as your build tool instead of Maven (the default).

  4. In the Artifact field, type 'logging-consumer'.

The value of the Artifact field becomes the application name. If you chose RabbitMQ for the middleware, your Spring Initializr should now be as follows:

[Image: Spring Initializr]

  5. Click the Generate Project button.

Doing so downloads the zipped version of the generated project to your hard drive.

  6. Unzip the file into the folder you want to use as your project directory.

We encourage you to explore the many possibilities available in the Spring Initializr. It lets you create many different kinds of Spring applications.

Importing the Project into Your IDE

Now you can import the project into your IDE. Keep in mind that, depending on the IDE and on how the project was generated (Maven or Gradle), you may need to follow a specific import procedure (for example, in Eclipse or STS, you need to use File → Import → Maven → Existing Maven Project).

Once imported, the project must have no errors of any kind. Also, src/main/java should contain com.example.loggingconsumer.LoggingConsumerApplication.

Technically, at this point, you can run the application’s main class. It is already a valid Spring Boot application. However, it does not do anything, so we want to add some code.

Adding a Message Handler, Building, and Running

Modify the com.example.loggingconsumer.LoggingConsumerApplication class to look as follows:

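package com.example.loggingconsumer;

import java.util.function.Consumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class LoggingConsumerApplication {

	public static void main(String[] args) {
		SpringApplication.run(LoggingConsumerApplication.class, args);
	}

	// Spring Cloud Stream binds this Consumer bean to an input destination
	// and invokes it once for every incoming message.
	@Bean
	public Consumer<Person> log() {
		return person -> System.out.println("Received: " + person);
	}

	// Target type for the automatic payload conversion (for example, from JSON).
	public static class Person {
		private String name;
		public String getName() {
			return name;
		}
		public void setName(String name) {
			this.name = name;
		}
		@Override
		public String toString() {
			return this.name;
		}
	}
}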

As you can see from the preceding listing:

  • We are using the functional programming model (see [Spring Cloud Function support]) to define a single message handler as a Consumer.

  • We are relying on framework conventions to bind this handler to the input destination binding exposed by the binder.

Doing so also lets you see one of the core features of the framework: It tries to automatically convert incoming message payloads to type Person.
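To make the conversion step concrete, here is a plain-Java sketch of what happens conceptually: the raw byte[] payload of a message is turned into a Person and then handed to the Consumer. This is not the framework's code. Spring Cloud Stream delegates the real conversion to its configured message converters; the class ConversionSketch, its methods toPerson and dispatch, and the regex-based extractor are hypothetical stand-ins that only understand the flat {"name":"..."} shape used in this guide.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ConversionSketch {

    // Same shape as the Person class in LoggingConsumerApplication.
    public static class Person {
        private String name;
        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
        @Override
        public String toString() { return this.name; }
    }

    // Hypothetical stand-in for a JSON message converter: matches only "name":"...".
    private static final Pattern NAME = Pattern.compile("\"name\"\\s*:\\s*\"([^\"]*)\"");

    // Converts the raw byte[] payload into a Person instance.
    public static Person toPerson(byte[] payload) {
        String json = new String(payload, StandardCharsets.UTF_8);
        Matcher matcher = NAME.matcher(json);
        Person person = new Person();
        if (matcher.find()) {
            person.setName(matcher.group(1));
        }
        return person;
    }

    // Hypothetical dispatch step: convert first, then invoke the user's Consumer.
    public static void dispatch(byte[] payload, Consumer<Person> handler) {
        handler.accept(toPerson(payload));
    }

    public static void main(String[] args) {
        Consumer<Person> log = person -> System.out.println("Received: " + person);
        byte[] payload = "{\"name\":\"Sam Spade\"}".getBytes(StandardCharsets.UTF_8);
        dispatch(payload, log); // prints "Received: Sam Spade"
    }
}
```

The point of the sketch is the order of operations: the handler never sees bytes, only the converted Person, which is why its toString() output appears in the console.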

You now have a fully functional Spring Cloud Stream application that listens for messages. From here, for simplicity, we assume you selected RabbitMQ in step one. Assuming you have RabbitMQ installed and running, you can start the application by running its main method in your IDE.

You should see the following output:

	--- [ main] c.s.b.r.p.RabbitExchangeQueueProvisioner : declaring queue for inbound: input.anonymous.CbMIwdkJSBO1ZoPDOtHtCg, bound to: input
	--- [ main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [localhost:5672]
	--- [ main] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#2a3a299:0/SimpleConnection@66c83fc8. . .
	. . .
	--- [ main] o.s.i.a.i.AmqpInboundChannelAdapter      : started inbound.input.anonymous.CbMIwdkJSBO1ZoPDOtHtCg
	. . .
	--- [ main] c.e.l.LoggingConsumerApplication         : Started LoggingConsumerApplication in 2.531 seconds (JVM running for 2.897)

Go to the RabbitMQ management console or any other RabbitMQ client and send a message to input.anonymous.CbMIwdkJSBO1ZoPDOtHtCg. The anonymous.CbMIwdkJSBO1ZoPDOtHtCg part represents the group name and is generated, so it is bound to be different in your environment. For something more predictable, you can use an explicit group name by setting spring.cloud.stream.bindings.input.group=hello (or whatever name you like).
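The queue name in the log output follows a destination.group pattern, with a generated anonymous.&lt;suffix&gt; group when none is configured. The plain-Java sketch below reproduces that pattern so you can predict the queue name for a given group. It is modeled on the output above, not taken from the binder: the class QueueNames and its methods are hypothetical, and producing the suffix as an unpadded Base64url encoding of a random UUID is an assumption that matches the 22-character shape of CbMIwdkJSBO1ZoPDOtHtCg but may differ from the binder's exact strategy. Also note that, depending on your Spring Cloud Stream version, the functional model typically names the binding for a Consumer bean called log as log-in-0, in which case that name replaces input in both the destination and the property key.

```java
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.UUID;

public class QueueNames {

    // Hypothetical helper: builds "<destination>.<group>", generating an
    // "anonymous.<suffix>" group when no explicit group is configured.
    public static String queueName(String destination, String group) {
        String effectiveGroup = (group == null || group.isEmpty())
                ? "anonymous." + randomSuffix()
                : group;
        return destination + "." + effectiveGroup;
    }

    // Assumed suffix format: the 16 bytes of a random UUID,
    // Base64url-encoded without padding (always 22 characters).
    static String randomSuffix() {
        UUID uuid = UUID.randomUUID();
        ByteBuffer buffer = ByteBuffer.allocate(16);
        buffer.putLong(uuid.getMostSignificantBits());
        buffer.putLong(uuid.getLeastSignificantBits());
        return Base64.getUrlEncoder().withoutPadding().encodeToString(buffer.array());
    }

    public static void main(String[] args) {
        System.out.println(queueName("input", "hello")); // prints "input.hello"
        System.out.println(queueName("input", null));    // input.anonymous. followed by 22 random characters
    }
}
```

With an explicit group such as hello, the name no longer changes between runs, so you can target it from the management console before starting the application.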

The contents of the message should be a JSON representation of the Person class, as follows:

{"name":"Sam Spade"}

Then, in your console, you should see:

Received: Sam Spade

You can also build and package your application into a Spring Boot JAR (by using ./mvnw clean install) and run the built JAR by using the java -jar command.

Now you have a working (albeit very basic) Spring Cloud Stream application.

Spring Expression Language (SpEL) in the context of Streaming data

Throughout this reference manual you will encounter many features and examples where you can utilize Spring Expression Language (SpEL). It is important to understand certain limitations when it comes to using it.

SpEL gives you access to the current Message as well as the Application Context you are running in. However, it is important to understand what type of data SpEL can see, especially in the context of the incoming Message. From the broker, the message arrives in the form of a byte[]. The binders then transform it into a Message<byte[]>, in which the payload keeps its raw form. The headers of the message are <String, Object>, where values are typically another primitive or a collection/array of primitives, hence Object. That is because the binder does not know the required input type, as it has no access to the user code (function). So, effectively, the binder delivers an envelope with the payload and some readable metadata in the form of message headers, just like a letter delivered by mail. This means that, while accessing the payload of the message is possible, you only have access to it as raw data (that is, byte[]). And while developers commonly ask for the ability to have SpEL access fields of a payload object as a concrete type (for example, Foo, Bar, and so on), you can see how difficult, or even impossible, that would be to achieve.

Here is one example that demonstrates the problem: imagine you have a routing expression that routes to different functions based on payload type. This requirement would imply converting the payload from byte[] to a specific type and then applying the SpEL expression. However, to perform such a conversion, we would need to know the actual type to pass to the converter, and that type comes from the signature of a function we do not yet know, since choosing the function is the very purpose of the routing. A better approach is to pass the type information as a message header (for example, application/json;type=foo.bar.Baz). You get a clear, readable String value that can be accessed and evaluated with a simple, easy-to-read SpEL expression.
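The header-based approach can be sketched in plain Java. Everything named here is hypothetical: the SimpleMessage class stands in for Message<byte[]>, the routing check is ordinary Java standing in for a SpEL expression, and fooFunction and defaultFunction are made-up function names. Only the header value format (application/json;type=foo.bar.Baz) comes from the text above. Note that the router never reads the byte[] payload; it works entirely from the envelope.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

public class HeaderRouting {

    // Hypothetical envelope: a raw payload plus readable headers, like Message<byte[]>.
    public static final class SimpleMessage {
        final byte[] payload;
        final Map<String, Object> headers;

        public SimpleMessage(byte[] payload, Map<String, Object> headers) {
            this.payload = payload;
            this.headers = headers;
        }
    }

    // Extracts the "type" parameter from a header such as "application/json;type=foo.bar.Baz".
    public static String typeOf(SimpleMessage message) {
        Object contentType = message.headers.get("contentType");
        if (contentType == null) {
            return null;
        }
        for (String part : contentType.toString().split(";")) {
            String trimmed = part.trim();
            if (trimmed.startsWith("type=")) {
                return trimmed.substring("type=".length());
            }
        }
        return null;
    }

    // Routing decision based only on headers; the payload stays unread.
    public static String route(SimpleMessage message) {
        return "foo.bar.Baz".equals(typeOf(message)) ? "fooFunction" : "defaultFunction";
    }

    public static void main(String[] args) {
        // Producer side: the type information is attached as a header when the message is created.
        SimpleMessage message = new SimpleMessage(
                "{\"name\":\"Sam Spade\"}".getBytes(StandardCharsets.UTF_8),
                Map.of("contentType", "application/json;type=foo.bar.Baz"));
        System.out.println(route(message)); // prints "fooFunction"
    }
}
```

Because the decision depends only on a String header, no converter and no knowledge of the target function's signature are needed; in a real application, the equivalent check would be a SpEL expression that reads the header (for example, via headers['contentType']).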

Additionally, it is considered very bad practice to use the payload for routing decisions, since the payload is considered to be privileged data: data to be read only by its final recipient. Again, using the mail delivery analogy, you would not want the mail carrier to open your envelope and read the letter to make delivery decisions. The same concept applies here, especially when it is relatively easy to include such information in headers when generating a Message. This enforces a certain level of discipline in the design of the data transmitted over the network, and in deciding which pieces of that data can be considered public and which are privileged.