Spring Cloud 简明教程

Spring Cloud - Streams with Apache Kafka

Introduction

在分布式环境中,服务需要互相通信。通信可以同步或异步发生。在本节中,我们将研究服务如何使用 message brokers 异步通信。

执行异步通信的两个主要好处:

  1. Producer and Consumer speed can differ - 如果数据的消费者速度较慢或较快,它不会影响生产者处理,反之亦然。两者都可以以自己独立的速度工作,而不会相互影响。

  2. Producer does not need to handle requests from various consumers - 可能有多个消费者想要从生产者读取相同的数据集。有了中间的消息代理,生产者就不必处理这些消费者产生的负载。此外,生产者级别的任何中断都不会阻止消费者读取较旧的生产者数据,因为这些数据在消息代理中可用。

Apache KafkaRabbitMQ 是用于实现异步通信的两个著名的消息中间件。本教程中,我们将使用 Apache Kafka。

Kafka – Dependency Setting

让我们使用之前一直使用过的“Restaurant”示例。因此,让我们假设将客户服务和餐厅服务通过异步通信进行通信。为此,我们将使用 Apache Kafka。并且我们需要在两个服务中使用它,即客户服务和餐厅服务。

为了使用 Apache Kafka,我们将更新两个服务的 POM,并添加以下依赖项。

<dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>

我们还需要运行 Kafka 实例。可以通过多种方式实现,但我们更愿意使用 Docker 容器启动 Kafka。以下是我们可能考虑使用的几个映像 −

无论使用哪个映像,这里需要注意的重要一点是,一旦映像启动并运行,请确保能够在 localhost:9092 访问 Kafka 集群。

既然我们在映像中运行了 Kafka 集群,让我们进入核心示例。

Binding & Binders

在 Spring Cloud 流中,有三个重要的概念 −

  1. External Messaging System − 这是由外部管理的组件,负责存储应用程序产生的事件/消息,这些事件/消息可由其订阅者/使用者读取。请注意,这不在应用程序/Spring 中管理。一些示例包括 Apache Kafka、RabbitMQ

  2. Binders − 这是提供与消息传递系统集成的组件,例如,包括消息传递系统的 IP 地址、身份验证等。

  3. Bindings − 组件使用 Binder 向消息传递系统发送消息或从特定主题/队列使用该消息。

所有上述属性都在 application properties file 中定义。

Example

让我们使用之前一直使用过的“Restaurant”示例。因此,让我们假设每当向客户服务添加新服务时,我们都希望将客户信息通知到附近的餐厅。

为此,首先让我们更新我们的客户服务,以包含和使用 Kafka。请注意,我们将使用客户服务作为数据的生产者。也就是说,当我们通过 API 添加客户时,它也会添加到 Kafka 中。

spring:
   application:
      name: customer-service
   cloud:
      stream:
         source: customerBinding-out-0
         kafka:
            binder:
            brokers: localhost:9092
            replicationFactor: 1
      bindings:
         customerBinding-out-0:
            destination: customer
            producer:
               partitionCount: 3
server:
   port: ${app_port}
eureka:
   client:
      serviceURL:
         defaultZone: http://localhost:8900/eureka

Points to note -

  1. 我们已经使用本地 Kafka 实例的地址定义了一个绑定。

  2. 我们还定义了绑定“customerBinding-out-0”,它使用“customer”主题来输出消息。

  3. 我们还已经在 stream.source 中提到了我们的绑定,以便我们可以在代码中强制使用它。

完成后,让我们通过添加一个新的“addCustomer”方法来更新我们的控制器,该方法负责提供 POST 请求。然后,从 post 请求,我们将数据发送到 Kafka Broker。

package com.tutorialspoint;
import java.util.HashMap;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

@RestController
class RestaurantCustomerInstancesController {
   @Autowired
   private StreamBridge streamBridge;
   static HashMap<Long, Customer> mockCustomerData = new HashMap();
   static{
      mockCustomerData.put(1L, new Customer(1, "Jane", "DC"));
      mockCustomerData.put(2L, new Customer(2, "John", "SFO"));
      mockCustomerData.put(3L, new Customer(3, "Kate", "NY"));
   }
   @RequestMapping("/customer/{id}")
   public Customer getCustomerInfo(@PathVariable("id") Long id) {
      System.out.println("Querying customer for id with: " + id);
      return mockCustomerData.get(id);
   }
   @RequestMapping(path = "/customer/{id}", method = RequestMethod.POST)
   public Customer addCustomer(@PathVariable("id") Long id) {
      // add default name
      Customer defaultCustomer = new Customer(id, "Dwayne", "NY");
      streamBridge.send("customerBinding-out-0", defaultCustomer);
      return defaultCustomer;
   }
}

Points to note

  1. 我们正在自动连接 StreamBridge,这将是我们用于发送消息的内容。

  2. 我们用于 send 方法的参数同时指定我们要用于将数据发送到的绑定。

现在,让我们更新我们的餐厅服务以包含和订阅“客户”主题。请注意,我们将使用餐厅服务作为数据的消费者。也就是说,每当我们通过 API 添加客户时,餐厅服务都将通过 Kafka 得知此情况。

首先,让我们更新 application properties 文件。

spring:
   application:
      name: restaurant-service
   cloud:
      function:
         definition: customerBinding
      stream:
         kafka:
            binder:
               brokers: localhost:9092
               replicationFactor: 1
            bindings:
               customerBinding-in-0:
               destination: customer
server:
   port: ${app_port}
eureka:
   client:
      serviceURL:
         defaultZone: http://localhost:8900/eureka

完成后,让我们通过添加一个新的 customerBinding 方法来更新我们的控制器,该方法负责获取请求并提供一个将打印请求及其元数据详细信息的函数。

package com.tutorialspoint;
import java.util.HashMap;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
class RestaurantController {
   @Autowired
   CustomerService customerService;
   @Autowired
   private StreamBridge streamBridge;
   static HashMap<Long, Restaurant> mockRestaurantData = new HashMap();
   static{
      mockRestaurantData.put(1L, new Restaurant(1, "Pandas", "DC"));
      mockRestaurantData.put(2L, new Restaurant(2, "Indies", "SFO"));
      mockRestaurantData.put(3L, new Restaurant(3, "Little Italy", "DC"));
      mockRestaurantData.put(4L, new Restaurant(4, "Pizeeria", "NY"));
   }
   @RequestMapping("/restaurant/customer/{id}")
   public List<Restaurant> getRestaurantForCustomer(@PathVariable("id") Long id) {
      System.out.println("Got request for customer with id: " + id);
      String customerCity = customerService.getCustomerById(id).getCity();
      return mockRestaurantData.entrySet().stream().filter(
entry -> entry.getValue().getCity().equals(customerCity))
.map(entry -> entry.getValue())
.collect(Collectors.toList());
   }
   @RequestMapping("/restaurant/cust/{id}")
   public void getRestaurantForCust(@PathVariable("id") Long id) {
      streamBridge.send("ordersBinding-out-0", id);
   }
   @Bean
   public Consumer<Message<Customer>> customerBinding() {
      return msg -> {
         System.out.println(msg);
      };
   }
}

Points to note -

  1. 我们使用 customerBinding,它应传递在为该绑定收到消息时被调用的函数。

  2. 我们在创建捆绑和指定主题时,也需要在 YAML 文件中使用我们为此函数/bean 使用的名称。

现在,让我们像往常一样执行上述代码,启动 Eureka 服务器。请注意,这不是硬性要求,只是为了完整性而存在的。

然后,让我们使用以下命令编译并开始更新客户服务 -

mvn clean install ; java -Dapp_port=8083 -jar .\target\spring-cloud-eurekaclient-
1.0.jar --spring.config.location=classpath:application-kafka.yml

然后,让我们使用以下命令编译并开始更新餐厅服务 -

mvn clean install; java -Dapp_port=8082 -jar .\target\spring-cloud-feign-client-
1.0.jar --spring.config.location=classpath:application-kafka.yml

我们已经设置好了,现在让我们通过点击 API 来测试我们的代码块 -

curl -X POST http://localhost:8083/customer/1

这是我们将为该 API 获得的输出 -

{
   "id": 1,
   "name": "Dwayne",
   "city": "NY"
}

现在,让我们检查餐厅服务的日志 -

GenericMessage [payload=Customer [id=1, name=Dwayne, city=NY],
headers={kafka_offset=1,...

因此,实际上,您看到使用 Kafka Broker,餐厅服务已收到有关新添加的客户的通知。

Partitions & Consumer Groups

分区和消费者组是您在使用 Spring Cloud Stream 时应该了解的两个重要概念。

Partitions - 它们用于对数据进行分区,以便我们可以在多个使用者之间划分工作。

让我们看看如何使用 Spring Cloud 对数据进行分区。比如说,我们想根据客户 ID 对数据进行分区。因此,让我们更新我们的客户服务以使其相同。为此,我们需要说明

让我们更新我们的客户服务应用程序属性,以指定我们数据的键。

spring:
   application:
      name: customer-service
   cloud:
      function:
         definition: ordersBinding
      stream:
         source: customerBinding-out-0
         kafka:
            binder:
               brokers: localhost:9092
               replicationFactor: 1
         bindings:
            customerBinding-out-0:
               destination: customer
               producer:
                  partitionKeyExpression: 'getPayload().getId()'
                  partitionCount: 3
server:
   port: ${app_port}
eureka:
   client:
      serviceURL:
         defaultZone: http://localhost:8900/eureka

对于指定键,即“partitionKeyExpression”,我们提供 Spring Expression Language。该表达式将类型假定为 GenericMessage<Customer>,因为我们正在消息中发送 Customer 数据。请注意,GenericMessage 是 Spring Framework 类,用于在一个对象中包装有效负载和头信息。因此,我们从该消息中获取有效负载,该有效负载的类型为 Customer,然后我们在客户上调用 getId() 方法。

现在,让我们还更新我们的消费者(即餐厅服务)以在使用请求时记录更多信息。

现在,我们按惯例执行上面的代码,启动 Eureka 服务端。注意,这不是一项硬性要求,在此出现是为了完整性。

然后,让我们使用以下命令编译并开始更新客户服务 -

mvn clean install ; java -Dapp_port=8083 -jar .\target\spring-cloud-eurekaclient-
1.0.jar --spring.config.location=classpath:application-kafka.yml

然后,让我们使用以下命令编译并开始更新餐厅服务 -

mvn clean install; java -Dapp_port=8082 -jar .\target\spring-cloud-feign-client-
1.0.jar --spring.config.location=classpath:application-kafka.yml

我们已做好设置,现在开始测试我们的代码部分。下面是我们在测试中将要执行的操作:

  1. 使用 Id 1 插入一名顾客:curl -X POST [role="bare"] [role="bare"]http://localhost:8083/customer/1

  2. 使用 Id 1 插入一名顾客:curl -X POST [role="bare"] [role="bare"]http://localhost:8083/customer/1

  3. 使用 Id 1 插入一名顾客:curl -X POST [role="bare"] [role="bare"]http://localhost:8083/customer/5

  4. 使用 Id 1 插入一名顾客:curl -X POST [role="bare"] [role="bare"]http://localhost:8083/customer/3

  5. 使用 Id 1 插入一名顾客:curl -X POST [role="bare"] [role="bare"]http://localhost:8083/customer/1

我们不太关心 API 的输出。相反,我们更关心将数据发送到的分区。由于我们使用顾客 ID 作为关键信息,我们希望具有相同 ID 的顾客将最终进入相同的分区。

现在,让我们检查餐厅服务的日志 -

Consumer: org.apache.kafka.clients.consumer.KafkaConsumer@7d6d8400
Consumer Group: anonymous.9108d02a-b1ee-4a7a-8707-7760581fa323
Partition Id: 1
Customer: Customer [id=1, name=Dwayne, city=NY]
Consumer: org.apache.kafka.clients.consumer.KafkaConsumer@7d6d8400
Consumer Group: anonymous.9108d02a-b1ee-4a7a-8707-7760581fa323
Partition Id: 1
Customer: Customer [id=1, name=Dwayne, city=NY]
Consumer: org.apache.kafka.clients.consumer.KafkaConsumer@7d6d8400
Consumer Group: anonymous.9108d02a-b1ee-4a7a-8707-7760581fa323
Partition Id: 2
Customer: Customer [id=5, name=Dwayne, city=NY]
Consumer: org.apache.kafka.clients.consumer.KafkaConsumer@7d6d8400
Consumer Group: anonymous.9108d02a-b1ee-4a7a-8707-7760581fa323
Partition Id: 0
Customer: Customer [id=3, name=Dwayne, city=NY]
Consumer Group: anonymous.9108d02a-b1ee-4a7a-8707-7760581fa323
Partition Id: 1
Customer: Customer [id=1, name=Dwayne, city=NY]

因此,正如我们所见,具有 Id 1 的顾客每次都最终进入相同的分区,即第 1 分区。

Consumer Group − 消费群组是为相同目的读取相同话题的消费者的逻辑分组。话题中的数据在消费群组中的消费者之间进行分区,因此,特定消费群组中只有一个消费者可以读取话题的一个分区。

要定义一个消费群组,我们只需要在使用 Kafka 话题名称的绑定中定义一个群组。例如,让我们在用于控制器的应用程序文件中定义消费群组名称。

spring:
   application:
      name: restaurant-service
   cloud:
      function:
         definition: customerBinding
      stream:
         kafka:
            binder:
               brokers: localhost:9092
               replicationFactor: 1
            bindings:
               customerBinding-in-0:
               destination: customer
               group: restController
server:
   port: ${app_port}
eureka:
   client:
      serviceURL:
         defaultZone: http://localhost:8900/eureka

让我们重新编译并启动 Restaurant 服务。现在,让我们通过对 Customer 服务上的 POST API 执行操作来生成事件:

使用 Id 1 插入一名顾客:curl -X POST [role="bare"] [role="bare"]http://localhost:8083/customer/1

现在,如果我们检查 Restaurant 服务的日志,我们会看到以下内容:

Consumer: org.apache.kafka.clients.consumer.KafkaConsumer@7d6d8400
Consumer Group: restContoller
Partition Id: 1
Customer: Customer [id=1, name=Dwayne, city=NY]

因此,从输出中可以看到,我们创建了一个名为“rest-contoller”的消费群组,其消费者负责读取这些话题。在上述情况下,我们仅仅运行了该服务的单个实例,因此“customer”话题的所有分区都分配给了同一个实例。但是,如果我们有多个分区,那么分区将分布在工作进程之间。