Spring Cloud Stream Schema Registry

Introduction

当组织具有基于消息传递的 pub/sub 架构,并且多个生产者和消费者微服务彼此通信时,通常需要所有这些微服务就基于模式的合同达成一致。当此类模式需要进行演变以适应新的业务要求时,仍然需要现有组件继续工作。Spring Cloud Stream 为一个独立的模式注册表服务器提供支持,可以使用该服务器注册上述模式并由应用程序使用。Spring Cloud Stream 模式注册表支持还为基于 Avro 的模式注册表客户端提供支持,这些客户端本质上提供消息转换器,该消息转换器与模式注册表通信以在消息转换期间协调模式。Spring Cloud Stream 提供的模式演变支持既适用于上述独立模式注册表,也适用于专门适用于 Apache Kafka 的 Confluent 提供的模式注册表。

Spring Cloud Stream Schema Registry overview

Spring Cloud Stream Schema Registry 为模式演变提供支持,以便数据可以随着时间的推移进行演变,并且仍然可以与较早或较新的生产者和消费者结合使用,反之亦然。大多数序列化模型(特别是那些旨在跨不同平台和语言实现可移植性的模型),依赖于描述数据如何在二进制有效负载中序列化的模式。为了序列化数据然后进行解释,发送方和接收方都必须能够访问描述二进制格式的模式。在某些情况下,模式可以从序列化时的有效负载类型或从反序列化时的目标类型推断出来。然而,许多应用程序受益于访问描述二进制数据格式的显式模式。模式注册表允许你将模式信息存储为文本格式(通常是 JSON),并使各种需要它以二进制格式接收和发送数据的应用程序可以访问该信息。模式可以作为以下形式的元组进行引用:

  • 一个作为模式逻辑名称的主题

  • The schema version

  • 模式格式,描述数据二进制格式

Spring Cloud Stream Schema Registry 提供以下组件:

  • Standalone Schema Registry ServerBy default, it is using an H2 database, but server can be used with PostgreSQL or MySQL by providing appropriate datasource configuration.

  • 能够通过与模式注册表通信来消息传递的模式注册表客户端。Currently, the client can communicate to the standalone schema registry or the Confluent Schema Registry.

Schema Registry Client

与模式注册表服务器交互的客户端端抽象是 SchemaRegistryClient 接口,它具有以下结构:

public interface SchemaRegistryClient {

    SchemaRegistrationResponse register(String subject, String format, String schema);

    String fetch(SchemaReference schemaReference);

    String fetch(Integer id);

}

Spring Cloud Stream 为与自己的模式服务器交互和与 Confluent Schema Registry 交互提供开箱即用的实现。

可以使用 @EnableSchemaRegistryClient 为 Spring Cloud Stream 模式注册表客户端进行配置,如下所示:

@SpringBootApplication
@EnableSchemaRegistryClient
public class ConsumerApplication {

}

默认转换器经过优化,不仅可以缓存远程服务器的架构,还可以缓存 parse()toString() 方法,这些方法相当昂贵。正因为如此,它使用了一个不缓存响应的 DefaultSchemaRegistryClient 。如果您打算更改默认行为,则可以在代码中直接使用客户端并将其覆盖到所需的输出。为此,您必须将 spring.cloud.stream.schemaRegistryClient.cached=true 属性添加到您的应用程序属性中。

Schema Registry Client Properties

模式注册表客户端支持以下属性:

spring.cloud.stream.schemaRegistryClient.endpoint

The location of the schema-server. When setting this, use a full URL, including protocol (http or https) , port, and context path.

Default

http://localhost:8990/

spring.cloud.stream.schemaRegistryClient.cached

Whether the client should cache schema server responses. Normally set to false, as the caching happens in the message converter. Clients using the schema registry client should set this to true.

Default

false

Avro Schema Registry Client Message Converters

对于在应用程序上下文中注册了 SchemaRegistryClient bean 的应用程序,Spring Cloud Stream 会自动配置一个用于模式管理的 Apache Avro 消息转换器。这简化了模式演变,因为接收消息的应用程序可以轻松获取可以与其自己的读取器模式协调的写入器模式。

对于出站消息,如果绑定的内容类型设置为 application/*+avro,则会激活 MessageConverter,如下面的示例所示:

spring.cloud.stream.stream.bindings.<output-binding-name>.contentType=application/*+avro

在出站转换期间,消息转换器会尝试推断每个出站消息的模式(基于其类型),并使用 SchemaRegistryClient 根据主题类型将其注册到一个主题。如果已找到相同的模式,那么会检索它的一个引用。如果没有,则会注册模式,并提供一个新的版本号。消息会使用以下格式通过 contentType 标题发送: application/[prefix].[subject].v[version]+avro,其中 prefix 是可配置的,而 subject 根据主题类型推断出来。

例如,类型为 User 的消息可以作为二进制有效负载发送,其内容类型为 application/vnd.user.v2+avro,其中 user 是主题,2 是版本号。

在接收消息时,转换器从传入消息的标头推断模式引用并尝试检索它。该模式在反序列化过程中用作写入器模式。

Avro Schema Registry Message Converter Properties

如果你通过设置 spring.cloud.stream.stream.bindings.<输出绑定名称>.contentType=application/*+avro 启用了基于 Avro 的架构注册表客户端,你可以通过设置下列属性来自定义注册行为。

spring.cloud.stream.schema.avro.dynamicSchemaGenerationEnabled

Enable if you want the converter to use reflection to infer a Schema from a POJO.

默认值:false

spring.cloud.stream.schema.avro.readerSchema

Avro compares schema versions by looking at a writer schema (origin payload) and a reader schema (your application payload). See the Avro documentation for more information. If set, this overrides any lookups at the schema server and uses the local schema as the reader schema. Default: null

spring.cloud.stream.schema.avro.schemaLocations

Registers any .avsc files listed in this property with the Schema Server.

默认值:empty

spring.cloud.stream.schema.avro.prefix

The prefix to be used on the Content-Type header.

默认值:vnd

spring.cloud.stream.schema.avro.subjectNamingStrategy

Determines the subject name used to register the Avro schema in the schema registry. Two implementations are available, org.springframework.cloud.stream.schema.avro.DefaultSubjectNamingStrategy, where the subject is the schema name, and org.springframework.cloud.stream.schema.avro.QualifiedSubjectNamingStrategy, which returns a fully qualified subject using the Avro schema namespace and name. Custom strategies can be created by implementing org.springframework.cloud.stream.schema.avro.SubjectNamingStrategy.

默认值:org.springframework.cloud.stream.schema.avro.DefaultSubjectNamingStrategy

spring.cloud.stream.schema.avro.ignoreSchemaRegistryServer

Ignore any schema registry communication. Useful for testing purposes so that when running a unit test, it does not unnecessarily try to connect to a Schema Registry server.

默认值:false

Apache Avro Message Converters

Spring Cloud Stream 通过其 spring-cloud-stream-schema-registry-client 模块提供对基于架构的消息转换器的支持。当前,基于架构的消息转换器唯一支持的序列化格式是 Apache Avro,未来版本中会增加更多格式。

spring-cloud-stream-schema-registry-client 模块包含两种可用于 Apache Avro 序列化的消息转换器类型:

  • 使用序列化或反序列化的对象的类信息或在启动时已知位置的模式的转换器。

  • 使用模式注册表的转换器。他们在运行时定位模式,并在领域对象演变时动态注册新模式。

Converters with Schema Support

AvroSchemaMessageConverter 支持通过使用预定义的架构或使用类中可用的架构信息(反射方式或包含在 SpecificRecord 中)对消息进行序列化和反序列化。如果你提供自定义转换器,那么默认的 AvroSchemaMessageConverter bean 不会创建。以下示例展示了自定义转换器:

要使用自定义转换器,你可以简单地将它添加到应用程序上下文中,也可以选择指定一个或多个与其关联的 MimeTypes。默认的 MimeTypeapplication/avro

如果转换的目标类型是 GenericRecord,则必须设置架构。

以下示例展示了如何在接收器应用程序中配置转换器,方法是在没有预定义架构的情况下注册 Apache Avro MessageConverter。在此示例中,请注意 MIME 类型值是 avro/bytes,而不是默认的 application/avro

@SpringBootApplication
public static class SinkApplication {

  //...

  @Bean
  public MessageConverter userMessageConverter() {
      return new AvroSchemaMessageConverter(MimeType.valueOf("avro/bytes"));
  }
}

相反,以下应用程序注册了具有预定义架构(在类路径中找到)的转换器:

@SpringBootApplication
public static class SinkApplication {

  //...

  @Bean
  public MessageConverter userMessageConverter() {
      AvroSchemaMessageConverter converter = new AvroSchemaMessageConverter(MimeType.valueOf("avro/bytes"));
      converter.setSchemaLocation(new ClassPathResource("schemas/User.avro"));
      return converter;
  }
}

Schema Registry Server

Spring Cloud Stream 提供了架构注册表服务器实现。要使用它,你可以下载最新的 spring-cloud-stream-schema-registry-server 版本并将其作为独立应用程序运行:

wget https://repo1.maven.org/maven2/org/springframework/cloud/spring-cloud-stream-schema-registry-server/4.0.3/spring-cloud-stream-schema-registry-server-4.0.3.jar
java -jar ./spring-cloud-stream-schema-registry-server-4.0.3.jar

你可以在现有 Spring Boot Web 应用程序中嵌入架构注册表。要实现这一点,向项目添加 spring-cloud-stream-schema-registry-core 工件并使用 @EnableSchemaRegistryServer 注释,它将架构注册表服务器 REST 控制器添加到你的应用程序。以下示例展示了一个启用了架构注册表的 Spring Boot 应用程序:

@SpringBootApplication
@EnableSchemaRegistryServer
public class SchemaRegistryServerApplication {
public static void main(String[] args) {
SpringApplication.run(SchemaRegistryServerApplication.class, args);
}
}

spring.cloud.stream.schema.server.path 属性可用于控制架构服务器的根路径(特别当它嵌入在其他应用程序中时)。布尔属性 spring.cloud.stream.schema.server.allowSchemaDeletion 启用架构删除。默认情况下,此功能处于禁用状态。

模式注册表服务器使用关系数据库存储模式。默认情况下,它使用嵌入式数据库。您可以使用 Spring Boot SQL database and JDBC configuration options 来定制模式存储。

Schema Registry Server API

架构注册表服务器 API 包含以下操作:

Registering a New Schema

要注册新架构,向 / 端点发送 POST 请求。

/ 接收具有以下字段的 JSON 有效负载:

  • subject: The schema subject

  • format: The schema format

  • definition: The schema definition

其响应是一个具有以下字段的 JSON 中的架构对象:

  • id: The schema ID

  • subject: The schema subject

  • format: The schema format

  • version: The schema version

  • definition: The schema definition

Retrieving an Existing Schema by Subject, Format, and Version

要按主题、格式和版本检索现有的模式,请向“{subject}/{format}/{version}”端点发送“GET”请求。

其响应是一个具有以下字段的 JSON 中的架构对象:

  • id: The schema ID

  • subject: The schema subject

  • format: The schema format

  • version: The schema version

  • definition: The schema definition

Retrieving an Existing Schema by Subject and Format

要按主题和格式检索现有的模式,请向“/subject/format”端点发送“GET”请求。

它的响应是一个包含每个模式对象的 JSON 模式列表,其中包含以下字段:

  • id: The schema ID

  • subject: The schema subject

  • format: The schema format

  • version: The schema version

  • definition: The schema definition

Retrieving an Existing Schema by ID

要按其 ID 检索模式,请向“/schemas/{id}”端点发送“GET”请求。

其响应是一个具有以下字段的 JSON 中的架构对象:

  • id: The schema ID

  • subject: The schema subject

  • format: The schema format

  • version: The schema version

  • definition: The schema definition

Deleting a Schema by Subject, Format, and Version

要删除由其主题、格式和版本标识的模式,请向“{subject}/{format}/{version}”端点发送“DELETE”请求。

Deleting a Schema by ID

要按其 ID 删除模式,请向“/schemas/{id}”端点发送“DELETE”请求。

Deleting a Schema by Subject

“DELETE /{subject}”

按主题删除现有模式。

此备注仅适用于 Spring Cloud Stream 1.1.0.RELEASE 用户。Spring Cloud Stream 1.1.0.RELEASE 用于存储 Schema 对象的表名称为 schemaSchema 是多个数据库实现中的关键字。为了避免将来出现任何冲突,从 1.1.1.RELEASE 开始,我们选择将存储表命名为 SCHEMA_REPOSITORY。所有升级 Spring Cloud Stream 1.1.0.RELEASE 的用户都应在升级之前将现有架构迁移到新表。

Using Confluent’s Schema Registry

默认配置创建一个 DefaultSchemaRegistryClient bean。如果你要使用 Confluent Schema 注册表,你需要创建一个 ConfluentSchemaRegistryClient 类型的 bean,该 bean 替代了框架默认配置的 bean。以下示例演示了如何创建一个这样的 bean:

@Bean
public SchemaRegistryClient schemaRegistryClient(@Value("${spring.cloud.stream.schemaRegistryClient.endpoint}") String endpoint){
  ConfluentSchemaRegistryClient client = new ConfluentSchemaRegistryClient();
  client.setEndpoint(endpoint);
  return client;
}

ConfluentSchemaRegistryClient 已针对 Confluent 平台版本 4.0.0 进行了测试。

Schema Registration and Resolution

为了更好地理解 Spring Cloud Stream 如何注册和解析新模式及其如何使用 Avro 模式比较功能,我们提供了两个单独的小节:

Schema Registration Process (Serialization)

注册过程的第一部分是从通过通道发送的有效负载中提取模式。Avro 类型(如 SpecificRecordGenericRecord)已经包含一个模式,可以立即从实例中检索该模式。对于 POJO,如果将 spring.cloud.stream.schema.avro.dynamicSchemaGenerationEnabled 属性设置为“true”(默认),则会推断出一个模式。

一旦获取到模式,转换器会从远程服务器加载其元数据(版本)。首先,它查询本地缓存。如果没有找到结果,它会将数据提交到服务器,服务器将回复版本信息。转换器始终会缓存结果,以避免为需要序列化的每条新消息查询模式服务器而产生的开销。

有了模式版本信息,转换器就会把消息的 contentType 标题设置为携带版本信息,例如:application/vnd.user.v1+avro

Schema Resolution Process (Deserialization)

在读取包含版本信息(即带有类似 Schema Registration Process (Serialization) 中描述的模式的 contentType 标头)的消息时,转换器会查询模式服务器以获取消息的编写者模式。一旦找到了传入消息的正确模式,它就会检索读取者模式,并通过使用 Avro 的模式解析支持,将其读入读者定义(设置默认值和所有缺失属性)。

你应该了解写入器模式(撰写消息的应用程序)和读取器模式(接收应用程序)之间的区别。我们建议花点时间阅读 the Avro terminology 并了解此过程。Spring Cloud Stream 始终获取写入器模式以确定如何读取消息。如果要让 Avro 的模式演进支持发挥作用,则需要确保为应用程序正确设置了 readerSchema