Spring Cloud - Streams with Apache Kafka
Introduction
In a distributed environment, services need to communicate with each other. The communication can happen either synchronously or asynchronously. In this section, we will look at how services can communicate asynchronously using message brokers.
Two major benefits of performing asynchronous communication −
- Producer and Consumer speed can differ − Whether the consumer of the data is slow or fast, it does not affect the producer's processing, and vice versa. Both can work at their own individual speeds without affecting each other.
- Producer does not need to handle requests from various consumers − There may be multiple consumers who want to read the same set of data from the producer. With a message broker in between, the producer does not need to care about the load these consumers generate. Plus, an outage at the producer level would not block consumers from reading older producer data, as this data remains available in the message broker.
Apache Kafka and RabbitMQ are two well-known message brokers used for asynchronous communication. In this tutorial, we will use Apache Kafka.
Kafka – Dependency Setting
Let us continue with the Restaurant example we have been using earlier. So, let us say we have our Customer Service and Restaurant Service communicating asynchronously. To do that, we will use Apache Kafka, and we will need it in both services, i.e., the Customer Service and the Restaurant Service.
To use Apache Kafka, we will update the POM of both services and add the following dependency −
<dependency>
   <groupId>org.springframework.cloud</groupId>
   <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
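Note that the starter above does not carry a version; it is typically managed by the Spring Cloud BOM. If the POM does not already import the BOM, a sketch along these lines would be needed (the version property name here is our placeholder) −

<dependencyManagement>
   <dependencies>
      <dependency>
         <groupId>org.springframework.cloud</groupId>
         <artifactId>spring-cloud-dependencies</artifactId>
         <version>${spring-cloud.version}</version>
         <type>pom</type>
         <scope>import</scope>
      </dependency>
   </dependencies>
</dependencyManagement>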
We also need to have Kafka instances running. There are multiple ways to do this, but we prefer to start Kafka using a Docker container, picking one of the Kafka images publicly available on Docker Hub.
Whichever image we use, the important thing to note is that once the image is up and running, the Kafka cluster must be accessible at localhost:9092.
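For instance, a single-node broker reachable at localhost:9092 can be started with the official apache/kafka image; the image and tag are our choice here, not mandated by this tutorial −

docker run -d --name kafka -p 9092:9092 apache/kafka:3.7.0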
Now that we have the Kafka cluster running in a container, let's move to the core example.
Binding & Binders
There are three important concepts when it comes to Spring Cloud streams −
- External Messaging System − This is the component which is managed externally and is responsible for storing the events/messages produced by the application so that they can be read by their subscribers/consumers. Note that this is not managed within the app/Spring. A few examples are Apache Kafka and RabbitMQ.
- Binders − This is the component which provides integration with the messaging system, for example, holding the address of the messaging system, authentication details, etc.
- Bindings − This component uses the Binders to produce messages to the messaging system or to consume messages from a specific topic/queue.
All the above properties are defined in the application properties file.
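To make the mapping concrete, here is a minimal sketch of how these concepts typically appear in the application YAML; the binding and topic names below are placeholders, and the working configuration for our services follows in the next section. Note the binding naming convention <beanName>-<in|out>-<index> used by the functional binding model −

spring:
   cloud:
      stream:
         kafka:
            binder:                     # Binder: how to reach the external messaging system
               brokers: localhost:9092
         bindings:
            someBinding-out-0:          # Binding: where this app produces to
               destination: some-topic  # topic/queue on the external messaging system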
Example
Let us use the Restaurant example that we have been using earlier. So, let us suppose that whenever a new customer is added to the Customer Service, we want to notify the nearby restaurants with the customer's info.
For this purpose, let us first update our Customer Service to include and use Kafka. Note that we will use the Customer Service as a producer of the data. That is, whenever we add a customer via the API, it will also be sent to Kafka.
spring:
   application:
      name: customer-service
   cloud:
      stream:
         source: customerBinding-out-0
         kafka:
            binder:
               brokers: localhost:9092
               replicationFactor: 1
         bindings:
            customerBinding-out-0:
               destination: customer
               producer:
                  partitionCount: 3
server:
   port: ${app_port}
eureka:
   client:
      serviceURL:
         defaultZone: http://localhost:8900/eureka
Points to note −
- We have defined a binder with the address of our local Kafka instance.
- We have also defined the binding 'customerBinding-out-0', which uses the 'customer' topic to output messages.
- We have also mentioned our binding in stream.source so that we can use it imperatively in our code.
Once this is done, let us update our controller by adding a new method 'addCustomer' which serves the POST request. Then, from the POST request, we send the data to the Kafka broker.
package com.tutorialspoint;

import java.util.HashMap;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

@RestController
class RestaurantCustomerInstancesController {
   @Autowired
   private StreamBridge streamBridge;

   static HashMap<Long, Customer> mockCustomerData = new HashMap<>();
   static {
      mockCustomerData.put(1L, new Customer(1, "Jane", "DC"));
      mockCustomerData.put(2L, new Customer(2, "John", "SFO"));
      mockCustomerData.put(3L, new Customer(3, "Kate", "NY"));
   }

   @RequestMapping("/customer/{id}")
   public Customer getCustomerInfo(@PathVariable("id") Long id) {
      System.out.println("Querying customer for id with: " + id);
      return mockCustomerData.get(id);
   }

   @RequestMapping(path = "/customer/{id}", method = RequestMethod.POST)
   public Customer addCustomer(@PathVariable("id") Long id) {
      // Create a customer with default details for the given id
      Customer defaultCustomer = new Customer(id, "Dwayne", "NY");
      // Publish the customer to the 'customerBinding-out-0' binding, i.e., the 'customer' topic
      streamBridge.send("customerBinding-out-0", defaultCustomer);
      return defaultCustomer;
   }
}
Points to note −
- We are autowiring StreamBridge, which is what we will use to send the messages.
- The parameters we pass to the 'send' method specify the binding we want to send the data to.
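The Customer class referenced above is not shown in this section; the following is a minimal sketch inferred from the constructor calls above and the toString() output in the logs we will see later. The actual class used by the tutorial may differ −

package com.tutorialspoint;

// Minimal POJO inferred from the calls in this section
public class Customer {
   private long id;
   private String name;
   private String city;

   public Customer() {
      // default constructor needed for JSON (de)serialization
   }

   public Customer(long id, String name, String city) {
      this.id = id;
      this.name = name;
      this.city = city;
   }

   public long getId() { return id; }
   public void setId(long id) { this.id = id; }
   public String getName() { return name; }
   public void setName(String name) { this.name = name; }
   public String getCity() { return city; }
   public void setCity(String city) { this.city = city; }

   @Override
   public String toString() {
      return "Customer [id=" + id + ", name=" + name + ", city=" + city + "]";
   }
}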
Now, let us update our Restaurant Service to include and subscribe to the 'customer' topic. Note that we will use the Restaurant Service as a consumer of the data. That is, whenever we add a customer via the API, the Restaurant Service will come to know about it via Kafka.
First, let us update the application properties file.
spring:
   application:
      name: restaurant-service
   cloud:
      function:
         definition: customerBinding
      stream:
         kafka:
            binder:
               brokers: localhost:9092
               replicationFactor: 1
         bindings:
            customerBinding-in-0:
               destination: customer
server:
   port: ${app_port}
eureka:
   client:
      serviceURL:
         defaultZone: http://localhost:8900/eureka
Once this is done, let us update our controller by adding a new bean 'customerBinding', which provides a function that is invoked for each incoming message and prints it along with its metadata details.
package com.tutorialspoint;

import java.util.HashMap;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collectors;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
class RestaurantController {
   @Autowired
   CustomerService customerService;
   @Autowired
   private StreamBridge streamBridge;

   static HashMap<Long, Restaurant> mockRestaurantData = new HashMap<>();
   static {
      mockRestaurantData.put(1L, new Restaurant(1, "Pandas", "DC"));
      mockRestaurantData.put(2L, new Restaurant(2, "Indies", "SFO"));
      mockRestaurantData.put(3L, new Restaurant(3, "Little Italy", "DC"));
      mockRestaurantData.put(4L, new Restaurant(4, "Pizzeria", "NY"));
   }

   @RequestMapping("/restaurant/customer/{id}")
   public List<Restaurant> getRestaurantForCustomer(@PathVariable("id") Long id) {
      System.out.println("Got request for customer with id: " + id);
      String customerCity = customerService.getCustomerById(id).getCity();
      // Return all restaurants located in the customer's city
      return mockRestaurantData.entrySet().stream()
         .filter(entry -> entry.getValue().getCity().equals(customerCity))
         .map(entry -> entry.getValue())
         .collect(Collectors.toList());
   }

   @RequestMapping("/restaurant/cust/{id}")
   public void getRestaurantForCust(@PathVariable("id") Long id) {
      streamBridge.send("ordersBinding-out-0", id);
   }

   // Consumer bound to 'customerBinding-in-0', i.e., the 'customer' topic
   @Bean
   public Consumer<Message<Customer>> customerBinding() {
      return msg -> {
         System.out.println(msg);
      };
   }
}
Points to note −
- We are defining 'customerBinding', which supplies the function that will be called when a message arrives on this binding.
- The name that we use for this function/bean also needs to be used in the YAML file while creating the binding and specifying the topic.
Now, let us execute the above code as always: start the Eureka Server. Note that this is not a hard requirement and is present here for the sake of completeness.
Then, let us compile and start the updated Customer Service using the following command −
mvn clean install ; java -Dapp_port=8083 -jar .\target\spring-cloud-eurekaclient-1.0.jar --spring.config.location=classpath:application-kafka.yml
Then, let us compile and start the updated Restaurant Service using the following command −
mvn clean install; java -Dapp_port=8082 -jar .\target\spring-cloud-feign-client-1.0.jar --spring.config.location=classpath:application-kafka.yml
And we are set. Let us now test our code by hitting the API −
curl -X POST http://localhost:8083/customer/1
Here is the output that we will get for this API −
{
   "id": 1,
   "name": "Dwayne",
   "city": "NY"
}
And now, let us check the logs of the Restaurant Service −
GenericMessage [payload=Customer [id=1, name=Dwayne, city=NY],
headers={kafka_offset=1,...
So, effectively, you can see that, using the Kafka broker, the Restaurant Service was notified about the newly added customer.
Partitions & Consumer Groups
Partitions and Consumer Groups are two important concepts that you should be aware of while using Spring Cloud streams.
Partitions − They are used to partition the data so that we can divide the work between multiple consumers.
Let us see how to partition the data in Spring Cloud. Say we want to partition the data based on the Customer ID, so let us update our Customer Service accordingly. For that, we need to tell Spring Cloud Stream which key to use for partitioning. Let us update our Customer Service application properties to specify the key for our data.
spring:
   application:
      name: customer-service
   cloud:
      stream:
         source: customerBinding-out-0
         kafka:
            binder:
               brokers: localhost:9092
               replicationFactor: 1
         bindings:
            customerBinding-out-0:
               destination: customer
               producer:
                  partitionKeyExpression: 'getPayload().getId()'
                  partitionCount: 3
server:
   port: ${app_port}
eureka:
   client:
      serviceURL:
         defaultZone: http://localhost:8900/eureka
For specifying the key, i.e., 'partitionKeyExpression', we provide a Spring Expression Language (SpEL) expression. The expression is evaluated against a GenericMessage<Customer>, since we are sending the Customer data in the message. Note that GenericMessage is the Spring Framework class used for wrapping the payload and the headers in a single object. So, we get the payload from this message, which is of type Customer, and then we call the getId() method on the customer.
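Equivalently, SpEL's property syntax can express the same key more compactly; this is an alternative we note here, not part of the configuration above −

producer:
   partitionKeyExpression: payload.id   # same key as 'getPayload().getId()'
   partitionCount: 3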
Now, let us also update our consumer, i.e., the Restaurant Service, to log more info while consuming the messages.
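The updated consumer is not shown in the tutorial text; a sketch along the following lines would produce the log format shown below. It assumes the KafkaHeaders constants from spring-kafka, which the Kafka binder populates as message headers −

// Inside RestaurantController, replacing the earlier customerBinding bean.
// Requires: import org.springframework.kafka.support.KafkaHeaders;
@Bean
public Consumer<Message<Customer>> customerBinding() {
   return msg -> {
      // Kafka metadata travels with the message as headers
      System.out.println("Consumer: " + msg.getHeaders().get(KafkaHeaders.CONSUMER));
      System.out.println("Consumer Group: " + msg.getHeaders().get(KafkaHeaders.GROUP_ID));
      System.out.println("Partition Id: " + msg.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID));
      System.out.println("Customer: " + msg.getPayload());
   };
}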
Now, let us execute the above code as always: start the Eureka Server. Note that this is not a hard requirement and is present here for the sake of completeness.
Then, let us compile and start the updated Customer Service using the following command −
mvn clean install ; java -Dapp_port=8083 -jar .\target\spring-cloud-eurekaclient-1.0.jar --spring.config.location=classpath:application-kafka.yml
Then, let us compile and start the updated Restaurant Service using the following command −
mvn clean install; java -Dapp_port=8082 -jar .\target\spring-cloud-feign-client-1.0.jar --spring.config.location=classpath:application-kafka.yml
And we are set. Let us now test our code. As part of testing, here is what we will do −
- Insert a customer with Id 1: curl -X POST http://localhost:8083/customer/1
- Insert a customer with Id 1: curl -X POST http://localhost:8083/customer/1
- Insert a customer with Id 5: curl -X POST http://localhost:8083/customer/5
- Insert a customer with Id 3: curl -X POST http://localhost:8083/customer/3
- Insert a customer with Id 1: curl -X POST http://localhost:8083/customer/1
We do not care much about the output of the API. Rather, we care about the partition to which the data is sent. Since we are using the customer ID as the key, we expect customers with the same ID to always end up in the same partition; by default, the target partition is computed as the hash of the key modulo the partition count.
And now, let us check the logs of the Restaurant Service −
Consumer: org.apache.kafka.clients.consumer.KafkaConsumer@7d6d8400
Consumer Group: anonymous.9108d02a-b1ee-4a7a-8707-7760581fa323
Partition Id: 1
Customer: Customer [id=1, name=Dwayne, city=NY]
Consumer: org.apache.kafka.clients.consumer.KafkaConsumer@7d6d8400
Consumer Group: anonymous.9108d02a-b1ee-4a7a-8707-7760581fa323
Partition Id: 1
Customer: Customer [id=1, name=Dwayne, city=NY]
Consumer: org.apache.kafka.clients.consumer.KafkaConsumer@7d6d8400
Consumer Group: anonymous.9108d02a-b1ee-4a7a-8707-7760581fa323
Partition Id: 2
Customer: Customer [id=5, name=Dwayne, city=NY]
Consumer: org.apache.kafka.clients.consumer.KafkaConsumer@7d6d8400
Consumer Group: anonymous.9108d02a-b1ee-4a7a-8707-7760581fa323
Partition Id: 0
Customer: Customer [id=3, name=Dwayne, city=NY]
Consumer: org.apache.kafka.clients.consumer.KafkaConsumer@7d6d8400
Consumer Group: anonymous.9108d02a-b1ee-4a7a-8707-7760581fa323
Partition Id: 1
Customer: Customer [id=1, name=Dwayne, city=NY]
So, as we can see, the customer with Id 1 ended up in the same partition every time, i.e., partition 1.
Consumer Group − A consumer group is a logical grouping of consumers reading the same topic for the same purpose. Data in a topic is partitioned between the consumers in a consumer group so that only one consumer from a given consumer group can read a given partition of a topic.
To define a consumer group, all we need to do is define a group in the binding where we use the Kafka topic name. For example, let us define the consumer group name in the application file for our controller.
spring:
   application:
      name: restaurant-service
   cloud:
      function:
         definition: customerBinding
      stream:
         kafka:
            binder:
               brokers: localhost:9092
               replicationFactor: 1
         bindings:
            customerBinding-in-0:
               destination: customer
               group: restController
server:
   port: ${app_port}
eureka:
   client:
      serviceURL:
         defaultZone: http://localhost:8900/eureka
Let us recompile and restart the Restaurant Service. Now, let us generate an event by hitting the POST API on the Customer Service −
Insert a customer with Id 1: curl -X POST http://localhost:8083/customer/1
Now, if we check the logs of our Restaurant Service, we will see the following −
Consumer: org.apache.kafka.clients.consumer.KafkaConsumer@7d6d8400
Consumer Group: restController
Partition Id: 1
Customer: Customer [id=1, name=Dwayne, city=NY]
So, as we see from the output, we have a consumer group called 'restController', whose consumers are responsible for reading the topic. In the above case, we had just a single instance of the service running, so all the partitions of the 'customer' topic were assigned to that instance. But if we ran multiple instances of the service in the same consumer group, the partitions would be distributed among them.
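For example, starting a second instance of the Restaurant Service on another port (a hypothetical follow-up, reusing the same jar and configuration) would make Kafka rebalance the three partitions of the 'customer' topic between the two instances −

java -Dapp_port=8084 -jar .\target\spring-cloud-feign-client-1.0.jar --spring.config.location=classpath:application-kafka.yml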