Pub/Sub Messaging

Spring Data 为 Redis 提供专门的消息集成,其功能与 Spring 框架中 JMS 集成的功能和命名类似。 Redis 消息传递大致可以分为两个功能区域:

  • 消息的发布或生成

  • 消息的订阅或消费

这是一个通常称为发布/订阅(简称 Pub/Sub)的模式的示例。RedisTemplate 类用于消息生成。对于类似于 Java EE 的消息驱动的 bean 样式的异步接收,Spring Data 提供了一个专门的消息侦听器容器,用于创建消息驱动的 POJO(MDP),对于同步接收,提供 RedisConnection 合约。 org.springframework.data.redis.connectionorg.springframework.data.redis.listener 包为 Redis 消息传递提供核心功能。

Publishing (Sending Messages)

要发布消息,您可以像使用其他操作一样使用底层的 [Reactive]RedisConnection 或高级的 [Reactive]RedisOperations。这两个实体都提供 publish 方法,该方法接受消息和目标频道作为参数。虽然 RedisConnection 需要原始数据(字节数组),但 [Reactive]RedisOperations 允许将任意对象作为消息传递,如下面的示例所示:

  • Imperative

  • Reactive

// send message through connection
RedisConnection con = …
byte[] msg = …
byte[] channel = …
con.pubSubCommands().publish(msg, channel);

// send message through RedisOperations
RedisOperations operations = …
Long numberOfClients = operations.convertAndSend("hello!", "world");
// send message through connection
ReactiveRedisConnection con = …
ByteBuffer[] msg = …
ByteBuffer[] channel = …
con.pubSubCommands().publish(msg, channel);

// send message through ReactiveRedisOperations
ReactiveRedisOperations operations = …
Mono<Long> numberOfClients = operations.convertAndSend("hello!", "world");

Subscribing (Receiving Messages)

在接收端,可以通过直接命名通道或使用模式匹配订阅一个或多个通道。后一种方法非常有用,因为它不仅可以通过一个命令创建多个订阅,还可以监听在订阅时尚未创建的通道(只要它们与模式匹配)。

在低级别中,RedisConnection 提供 subscribepSubscribe 方法,用于分别映射订阅按通道或按模式的 Redis 命令。请注意,可以使用多个通道或模式作为参数。要更改连接的订阅或查询它是否正在监听,RedisConnection 提供 getSubscriptionisSubscribed 方法。

Spring Data Redis 中的订阅命令属于阻塞命令。也就是说,在连接上调用 subscribe 时,当前线程会被阻塞,因为它会开始等待消息。只有在订阅被取消时,该线程才会释放,而订阅取消发生在另一个线程在“ same”连接上调用“ unsubscribe”或“ pUnsubscribe”时。有关解决此问题的方案,请参阅(本文档的后续内容)“Message Listener Containers”。

如前所述,一旦订阅,连接便开始等待消息。只允许添加新订阅、修改现有订阅和取消现有订阅的命令。调用 subscribepSubscribeunsubscribepUnsubscribe 以外的任何内容都会引发异常。

为了订阅消息,需要实现 MessageListener 回调。每次新消息到达时,都会调用回调,并且用户代码由 onMessage 方法运行。该接口不仅可以访问实际消息,还可以访问通过该消息接收到的通道和订阅用来匹配通道的模式(如果有)。此信息允许被调用者不仅通过内容区分各种消息,还可以检查其他详细信息。

Message Listener Containers

由于其阻塞性,低级订阅并不具有吸引力,因为它需要为每个侦听器连接和线程管理。为了缓解此问题,Spring Data 提供了 RedisMessageListenerContainer,它可以执行所有繁重的工作。如果你熟悉 EJB 和 JMS,你应该会觉得这些概念很熟悉,因为它设计得尽可能接近 Spring 框架中的支持及其消息驱动的 POJO(MDP)。

RedisMessageListenerContainer 充当消息侦听器容器。它用于从 Redis 通道接收消息,并驱动注入其中的 MessageListener 实例。侦听器容器负责消息接收的所有线程化,并分派到侦听器中进行处理。消息侦听器容器是在 MDP 和消息传递提供程序之间的中介,并负责注册以接收消息、资源获取和释放、异常转换等。这使你可以作为应用程序开发人员编写与接收消息(并对其做出反应)相关的(可能很复杂的)业务逻辑,并将样板 Redis 基础设施问题委托给框架。

MessageListener 还可以实现 SubscriptionListener 以在订阅/取消订阅确认后接收通知。同步调用时,监听订阅通知可能很有用。

此外,为了最大程度地减少应用程序占用空间,RedisMessageListenerContainer 允许一个连接和一个线程被多个侦听器共享,即使它们不共享订阅。因此,无论应用程序跟踪多少侦听器或通道,运行时开销在其整个生命周期内都保持不变。此外,容器允许运行时配置更改,以便你可以在应用程序运行时添加或删除侦听器,而无需重新启动。此外,容器使用延迟订阅方法,仅在需要时使用 RedisConnection。如果取消订阅所有侦听器,将自动执行清理,并释放线程。

为了帮助处理消息的异步特性,容器需要一个 java.util.concurrent.Executor(或 Spring 的 TaskExecutor)来调度消息。根据负载、侦听器数量或运行时环境,你应该更改或调整执行器以更好地满足你的需求。特别是,在受管环境(例如应用程序服务器)中,强烈建议选择一个合适的 TaskExecutor 以利用其运行时。

The MessageListenerAdapter

MessageListenerAdapter 类是 Spring 中异步消息传递支持的最终组件。简而言之,它允许你将几乎 任何 类公开为 MDP(尽管存在一些约束)。

考虑以下接口定义:

public interface MessageDelegate {
  void handleMessage(String message);
  void handleMessage(Map message);
  void handleMessage(byte[] message);
  void handleMessage(Serializable message);
  // pass the channel/pattern as well
  void handleMessage(Serializable message, String channel);
 }

请注意,尽管该接口没有扩展 MessageListener 接口,但它仍然可以通过使用 MessageListenerAdapter 类用作 MDP。而且请注意,根据它们可以接收和处理的各种 Message 类型的 内容,各种消息处理方法是强类型化的。此外,邮件发送到的频道或模式可以作为第二个 String 类型参数传递给方法:

public class DefaultMessageDelegate implements MessageDelegate {
  // implementation elided for clarity...
}

请注意,MessageDelegate 接口(上面的 DefaultMessageDelegate 类)的上述实现*没有*任何 Redis 依赖项。它确实是一个 POJO,我们使用以下配置使其成为 MDP:

  • Java

  • XML

@Configuration
class MyConfig {

  // …

  @Bean
  DefaultMessageDelegate listener() {
    return new DefaultMessageDelegate();
  }

  @Bean
  MessageListenerAdapter messageListenerAdapter(DefaultMessageDelegate listener) {
    return new MessageListenerAdapter(listener, "handleMessage");
  }

  @Bean
  RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory, MessageListenerAdapter listener) {

    RedisMessageListenerContainer container = new RedisMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.addMessageListener(listener, ChannelTopic.of("chatroom"));
    return container;
  }
}
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xmlns:redis="http://www.springframework.org/schema/redis"
   xsi:schemaLocation="http://www.springframework.org/schema/beans https://www.springframework.org/schema/beans/spring-beans.xsd
   http://www.springframework.org/schema/redis https://www.springframework.org/schema/redis/spring-redis.xsd">

<!-- the default ConnectionFactory -->
<redis:listener-container>
  <!-- the method attribute can be skipped as the default method name is "handleMessage" -->
  <redis:listener ref="listener" method="handleMessage" topic="chatroom" />
</redis:listener-container>

<bean id="listener" class="redisexample.DefaultMessageDelegate"/>
 ...
</beans>

侦听器主题可以是频道(例如,“ topic="chatroom"”)或模式(例如,“ topic="*room"”)。

前面的示例使用 Redis 命名空间来声明消息侦听器容器并自动将 POJO 注册为侦听器。完整的功能 bean 定义如下:

<bean id="messageListener" class="org.springframework.data.redis.listener.adapter.MessageListenerAdapter">
  <constructor-arg>
    <bean class="redisexample.DefaultMessageDelegate"/>
  </constructor-arg>
</bean>

<bean id="redisContainer" class="org.springframework.data.redis.listener.RedisMessageListenerContainer">
  <property name="connectionFactory" ref="connectionFactory"/>
  <property name="messageListeners">
    <map>
      <entry key-ref="messageListener">
        <bean class="org.springframework.data.redis.listener.ChannelTopic">
          <constructor-arg value="chatroom"/>
        </bean>
      </entry>
    </map>
  </property>
</bean>

每次收到消息时,适配器都会自动且透明地执行低级格式和所需对象类型之间转换(使用配置的 RedisSerializer)。方法调用引起的任何异常都将被容器捕获和处理(默认情况下,异常将记录下来)。

Reactive Message Listener Container

Spring Data 提供 ReactiveRedisMessageListenerContainer,它会代表用户完成所有转换和订阅状态管理的繁重工作。

消息监听容器本身不需要外部线程资源。它使用驱动程序线程发布消息。

ReactiveRedisConnectionFactory factory = …
ReactiveRedisMessageListenerContainer container = new ReactiveRedisMessageListenerContainer(factory);

Flux<ChannelMessage<String, String>> stream = container.receive(ChannelTopic.of("my-channel"));

为等待并确保正确的订阅,可以使用返回 Mono<Flux<ChannelMessage>>receiveLater 方法。生成的 Mono 在完成对给定主题的订阅后,使用内部发布器完成。通过拦截 onNext 信号,可以同步服务器端订阅。

ReactiveRedisConnectionFactory factory = …
ReactiveRedisMessageListenerContainer container = new ReactiveRedisMessageListenerContainer(factory);

Mono<Flux<ChannelMessage<String, String>>> stream = container.receiveLater(ChannelTopic.of("my-channel"));

stream.doOnNext(inner -> // notification hook when Redis subscriptions are synchronized with the server)
    .flatMapMany(Function.identity())
    .…;

Subscribing via template API

如上所述,可以直接使用 ReactiveRedisTemplate 订阅频道/模式。此方法提供了一个直接且有限的解决方案,因为这样不能在初始订阅后添加订阅。然而,仍然可以使用 take(Duration) 等方式通过返回的 Flux 来控制消息流。当完成读取、出现错误或取消时,所有绑定资源都会再次释放。

redisTemplate.listenToChannel("channel1", "channel2").doOnNext(msg -> {
    // message processing ...
}).subscribe();