Redis Streams

Redis Streams 以抽象方式对日志数据结构建模。通常,日志是仅附加数据结构,从头开始随机位置,或者通过流式传输新消息来消费的。

请访问“ Redis reference documentation”详细了解 Redis Streams。

Redis Streams 粗略可以分为两个功能区域:

  • Appending records

  • Consuming records

尽管此模式与 Pub/Sub 相似,但主要区别在于消息的持久性和它们被使用的方式。 Pub/Sub 依赖于瞬态消息的广播(即,如果您不倾听,则会错过一条消息),而 Redis Stream 使用持久且仅追加的数据类型,此数据类型保留消息直至修剪流。消费的另一个区别在于,Pub/Sub 注册服务端订阅。Redis 将到达的消息推送到客户端,而 Redis Streams 需要主动轮询。 org.springframework.data.redis.connectionorg.springframework.data.redis.stream 包为 Redis Streams 提供核心功能。

Appending

要发送记录,您可以与其他操作一样,使用底层的 RedisConnection 或高级的 StreamOperations。这两个实体都提供接受记录和目标流作为参数的 add (xAdd) 方法。RedisConnection 需要原始数据(字节数组),而 StreamOperations 允许将任意对象作为记录传入,如下例所示:

// append message through connection
RedisConnection con = …
byte[] stream = …
ByteRecord record = StreamRecords.rawBytes(…).withStreamKey(stream);
con.xAdd(record);

// append message through RedisTemplate
RedisTemplate template = …
StringRecord record = StreamRecords.string(…).withStreamKey("my-stream");
template.opsForStream().add(record);

流记录作为其有效负载携带 Map(键值元组)。将记录追加到流中会返回可作为进一步引用的 RecordId

Consuming

在消费端,可以消费一个或多个流。Redis Streams 提供读取命令,允许从已知流内容中的任意位置(随机访问)和流结尾之外的流中消费。

RedisConnection 在低层提供了 xReadxReadGroup 方法,分别映射用于在消费者组中读取和读取的 Redis 命令。请注意,多个流可以用作参数。

Redis 中的订阅命令可以是阻塞的。也就是说,在连接上调用“ xRead”时,当前线程会被阻塞,因为它会开始等待消息。只有在读取命令超时或收到消息时,该线程才会释放。

要消耗流消息,可以轮询应用程序代码中的消息,也可以使用两个 Asynchronous reception through Message Listener Containers 中的一个(命令式或响应式)中的一个。每次有新记录到达时,容器会通知应用程序代码。

Synchronous reception

虽然流消费通常与异步处理相关,但可以同步消费消息。重载的 StreamOperations.read(…) 方法提供此功能。在同步接收期间,调用线程可能会在消息可用时一直处于阻塞状态。StreamReadOptions.block 属性指定接收者放弃等待消息前应该等待多长时间。

// Read message through RedisTemplate
RedisTemplate template = …

List<MapRecord<K, HK, HV>> messages = template.opsForStream().read(StreamReadOptions.empty().count(2),
				StreamOffset.latest("my-stream"));

List<MapRecord<K, HK, HV>> messages = template.opsForStream().read(Consumer.from("my-group", "my-consumer"),
				StreamReadOptions.empty().count(2),
				StreamOffset.create("my-stream", ReadOffset.lastConsumed()))

Asynchronous reception through Message Listener Containers

由于其阻塞特性,低级别轮询不可取,因为它需要为每一个消费者都进行连接和线程管理。为了缓解此问题,Spring Data 提供了消息侦听器来完成所有繁重的工作。如果您熟悉 EJB 和 JMS,应该会发现这些概念很熟悉,因为它旨在尽可能接近 Spring Framework 中的支持及其消息驱动的 POJO (MDP) 。

Spring Data 提供了针对所使用的编程模型定制的两个实现:

  • `StreamMessageListenerContainer`用作命令式编程模型的消息侦听器容器。它用于从 Redis 流中消耗记录并将注入其中的 `StreamListener`实例驱动起来。

  • `StreamReceiver`提供消息侦听器的响应式变体。它用于将 Redis 流中的消息消耗为潜在的无限流并通过 `Flux`发布流消息。

StreamMessageListenerContainerStreamReceiver 负责消息接收的所有线程处理和分派到侦听器进行处理。消息侦听器容器/接收器是 MDP 和消息提供者之间的中间代理,它负责注册以接收消息、资源获取和释放、异常转换等。这让开发者可以编写与接收消息(并做出反应)相关联的(可能很复杂的)业务逻辑,并将样板 Redis 基础架构问题委派给框架。

这两个容器都允许运行时配置更改,这样您可以在应用程序运行时添加或删除订阅,而无需重启。此外,容器使用延迟订阅方法,仅在需要时才使用 RedisConnection。如果所有侦听器都取消订阅,它会自动执行清理,并释放线程。

Imperative StreamMessageListenerContainer

以类似于 EJB 世界中的消息驱动 Bean (MDB) 的方式,流驱动 POJO (SDP) 充当 Stream 消息的接收器。SDP 的一个限制是它必须实现 org.springframework.data.redis.stream.StreamListener 接口。还需要注意的是,在您的 POJO 在多个线程上接收消息的情况下,重要的是确保您的实现是线程安全的。

class ExampleStreamListener implements StreamListener<String, MapRecord<String, String, String>> {

	@Override
	public void onMessage(MapRecord<String, String, String> message) {

		System.out.println("MessageId: " + message.getId());
		System.out.println("Stream: " + message.getStream());
		System.out.println("Body: " + message.getValue());
	}
}

StreamListener 表示一个函数式接口,因此可以使用其 Lambda 形式重写实现:

message -> {

    System.out.println("MessageId: " + message.getId());
    System.out.println("Stream: " + message.getStream());
    System.out.println("Body: " + message.getValue());
};

一旦实现了 StreamListener,便可以创建一个消息侦听器容器并注册一个订阅:

RedisConnectionFactory connectionFactory = …
StreamListener<String, MapRecord<String, String, String>> streamListener = …

StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> containerOptions = StreamMessageListenerContainerOptions
			.builder().pollTimeout(Duration.ofMillis(100)).build();

StreamMessageListenerContainer<String, MapRecord<String, String, String>> container = StreamMessageListenerContainer.create(connectionFactory,
				containerOptions);

Subscription subscription = container.receive(StreamOffset.fromStart("my-stream"), streamListener);

请参考各种消息侦听器容器的 Javadoc,以获取每个实现支持的全部功能的完整说明。

Reactive StreamReceiver

流数据源的反应性消费通常通过事件或消息的“Flux”发生。通过“StreamReceiver”及其重载的“receive(…​)”消息提供反应性接收器实现。与“StreamMessageListenerContainer”相比,反应性方法需要较少的线程等基础设施资源,因为它利用驱动程序提供的线程资源。接收流是“StreamMessage”的需求驱动发布者:

Flux<MapRecord<String, String, String>> messages = …

return messages.doOnNext(it -> {
    System.out.println("MessageId: " + message.getId());
    System.out.println("Stream: " + message.getStream());
    System.out.println("Body: " + message.getValue());
});

现在,我们需要创建“StreamReceiver”并注册一个订阅以使用流消息:

ReactiveRedisConnectionFactory connectionFactory = …

StreamReceiverOptions<String, MapRecord<String, String, String>> options = StreamReceiverOptions.builder().pollTimeout(Duration.ofMillis(100))
				.build();
StreamReceiver<String, MapRecord<String, String, String>> receiver = StreamReceiver.create(connectionFactory, options);

Flux<MapRecord<String, String, String>> messages = receiver.receive(StreamOffset.fromStart("my-stream"));

请参考各种消息侦听器容器的 Javadoc,以获取每个实现支持的全部功能的完整说明。

受需求驱动的消耗使用反压信号激活和停用轮询。StreamReceiver 当需求被满足时,订阅项将暂停轮询,直到订阅者发出进一步的需求。根据 ReadOffset 策略,这可能会导致跳过消息。

Acknowledge strategies

当您通过“Consumer Group”读取消息时,服务器会记住已传递给定消息并将其添加到待处理条目列表(PEL)中。已传递但尚未确认的消息列表。必须通过“StreamOperations.acknowledge”确认消息,以从待处理的条目列表中删除它,如下面的代码段所示。

StreamMessageListenerContainer<String, MapRecord<String, String, String>> container = ...

container.receive(Consumer.from("my-group", "my-consumer"), 1
	StreamOffset.create("my-stream", ReadOffset.lastConsumed()),
    msg -> {

	    // ...
	    redisTemplate.opsForStream().acknowledge("my-group", msg); 2
    });
1 从组 _my-group_中以 _my-consumer_读取。收到的消息不会被确认。
2 在处理后确认消息。

如需在接收时自动确认消息,使用 receiveAutoAck 而非 receive

ReadOffset strategies

流读取操作接受读取偏移量规范,以从此偏移量开始使用消息。“ReadOffset”表示读取偏移量规范。Redis 根据您是以独立方式使用流还是在消费者组中使用流,支持 3 种类型的偏移量:

  • ReadOffset.latest()– 读取最新消息。

  • ReadOffset.from(…)– 在特定消息 Id 后读取。

  • ReadOffset.lastConsumed()– 在最后消耗的消息 Id 之后读取(仅消费者组)。

在基于容器的消息消费的上下文中,我们需在使用消息时提升(或递增)读取偏移量。提升取决于请求的“ReadOffset”和消费模式(带有或不带有消费者组)。下表解释了容器如何提升“ReadOffset”:

Table 1. ReadOffset Advancing
Read offset Standalone Consumer Group

Latest

Read latest message

Read latest message

Specific Message Id

使用最后看到的邮件作为下一个 MessageId

使用最后看到的邮件作为下一个 MessageId

Last Consumed

使用最后看到的邮件作为下一个 MessageId

消费者组根据最后消费的邮件

从特定消息 ID 和最后使用消息中读取内容可被认为是安全的操作,用于确保使用添加到流中的所有消息。对读取使用最新消息可以跳过在轮询操作处于死时间的状态时添加到流中的消息。轮询会引入死时间,其中消息可以在各个轮询命令之间到达。流使用不是线性连续读取,而是拆分为重复的“XREAD”调用。

Serialization

发送到流中的任何记录都需要序列化为其二进制格式。由于流非常接近哈希数据结构,因此流密钥、字段名称和值使用配置在“RedisTemplate”上的相应序列化程序。

Table 2. Stream Serialization
Stream Property Serializer Description

key

keySerializer

used for Record#getStream()

field

hashKeySerializer

用于有效负载中每个地图密钥

value

hashValueSerializer

用于有效负载中每个地图值

请确保检查正在使用的“RedisSerializer”,并注意,如果您决定不使用任何序列化程序,则您需要确保这些值已是二进制格式的。

Object Mapping

Simple Values

“StreamOperations”允许直接将简单值通过“ObjectRecord”附加到流中,而不必将这些值放入“Map”结构中。该值将随后分配给“payload”字段,并且可以在读回该值时提取它。

ObjectRecord<String, String> record = StreamRecords.newRecord()
    .in("my-stream")
    .ofObject("my-value");

redisTemplate()
    .opsForStream()
    .add(record); 1

List<ObjectRecord<String, String>> records = redisTemplate()
    .opsForStream()
    .read(String.class, StreamOffset.fromStart("my-stream"));
1 XADD my-stream * "_class" "java.lang.String" "_raw" "my-value"

“ObjectRecord”通过与所有其他记录完全相同的序列化过程,因此还可以使用返回“MapRecord”的未类型化读取操作来获取记录。

Complex Values

可以通过 3 种方式将复杂值添加到流中:

  • 使用简单的值进行转换,例如字符串 JSON 表示形式。

  • 使用合适的 `RedisSerializer`序列化该值。

  • 使用 HashMapper将值转换为适用于序列化的 Map

第一个方法是最直接的一个,但是忽略了流结构提供的字段值功能,但是流中的值对于其他消费者来说仍然可读。第二个选项具有与第一个选项相同的好处,但是由于所有消费者必须实现完全相同的序列化机制,因此可能导致非常特殊的消费者限制。“HashMapper”方法稍微复杂一些,使用了流哈希结构,但是扁平化了源。如果选择合适的序列化程序组合,则其他消费者仍然能够读取记录。

HashMappers”使用特定类型将有效负载转换为“ Map”。确保使用能够(反)序列化哈希的哈希键和哈希值序列化器。

ObjectRecord<String, User> record = StreamRecords.newRecord()
    .in("user-logon")
    .ofObject(new User("night", "angel"));

redisTemplate()
    .opsForStream()
    .add(record); 1

List<ObjectRecord<String, User>> records = redisTemplate()
    .opsForStream()
    .read(User.class, StreamOffset.fromStart("user-logon"));
1 XADD user-logon * "_class" "com.example.User" "firstname" "night" "lastname" "angel"

StreamOperations 默认情况下使用 ObjectHashMapper。获取 StreamOperations 时,您可以提供适合您要求的 HashMapper

redisTemplate()
    .opsForStream(new Jackson2HashMapper(true))
    .add(record); 1
1 XADD user-logon * "firstname" "night" "@class" "com.example.User" "lastname" "angel"

一个“StreamMessageListenerContainer”可能不知道域类型上使用的任何“@TypeAlias”,因为那些需要通过“MappingContext”解析。确保使用“initialEntitySet”初始化“RedisMappingContext”。

@Bean
RedisMappingContext redisMappingContext() {
    RedisMappingContext ctx = new RedisMappingContext();
    ctx.setInitialEntitySet(Collections.singleton(Person.class));
    return ctx;
}

@Bean
RedisConverter redisConverter(RedisMappingContext mappingContext) {
    return new MappingRedisConverter(mappingContext);
}

@Bean
ObjectHashMapper hashMapper(RedisConverter converter) {
    return new ObjectHashMapper(converter);
}

@Bean
StreamMessageListenerContainer streamMessageListenerContainer(RedisConnectionFactory connectionFactory, ObjectHashMapper hashMapper) {
    StreamMessageListenerContainerOptions<String, ObjectRecord<String, Object>> options = StreamMessageListenerContainerOptions.builder()
            .objectMapper(hashMapper)
            .build();

    return StreamMessageListenerContainer.create(connectionFactory, options);
}