Pub/Sub Messaging

Spring Data 为 Redis 提供专门的消息集成,其功能与 Spring 框架中 JMS 集成的功能和命名类似。

Spring Data provides dedicated messaging integration for Redis, similar in functionality and naming to the JMS integration in Spring Framework.

Redis 消息传递大致可以分为两个功能区域:

Redis messaging can be roughly divided into two areas of functionality:

  • Publication or production of messages

  • Subscription or consumption of messages

这是一个通常称为发布/订阅(简称 Pub/Sub)的模式的示例。RedisTemplate 类用于消息生成。对于类似于 Java EE 的消息驱动的 bean 样式的异步接收,Spring Data 提供了一个专门的消息侦听器容器,用于创建消息驱动的 POJO(MDP),对于同步接收,提供 RedisConnection 合约。

This is an example of the pattern often called Publish/Subscribe (Pub/Sub for short). The RedisTemplate class is used for message production. For asynchronous reception similar to Java EE’s message-driven bean style, Spring Data provides a dedicated message listener container that is used to create Message-Driven POJOs (MDPs) and, for synchronous reception, the RedisConnection contract.

org.springframework.data.redis.connectionorg.springframework.data.redis.listener 包为 Redis 消息传递提供核心功能。

The org.springframework.data.redis.connection and org.springframework.data.redis.listener packages provide the core functionality for Redis messaging.

Publishing (Sending Messages)

要发布消息,您可以像使用其他操作一样使用底层的 [Reactive]RedisConnection 或高级的 [Reactive]RedisOperations。这两个实体都提供 publish 方法,该方法接受消息和目标频道作为参数。虽然 RedisConnection 需要原始数据(字节数组),但 [Reactive]RedisOperations 允许将任意对象作为消息传递,如下面的示例所示:

To publish a message, you can use, as with the other operations, either the low-level [Reactive]RedisConnection or the high-level [Reactive]RedisOperations. Both entities offer the publish method, which accepts the message and the destination channel as arguments. While RedisConnection requires raw data (array of bytes), the [Reactive]RedisOperations lets arbitrary objects be passed in as messages, as shown in the following example:

  • Imperative

  • Reactive

// send message through connection
RedisConnection con = …
byte[] msg = …
byte[] channel = …
con.pubSubCommands().publish(msg, channel);

// send message through RedisOperations
RedisOperations operations = …
Long numberOfClients = operations.convertAndSend("hello!", "world");
// send message through connection
ReactiveRedisConnection con = …
ByteBuffer[] msg = …
ByteBuffer[] channel = …
con.pubSubCommands().publish(msg, channel);

// send message through ReactiveRedisOperations
ReactiveRedisOperations operations = …
Mono<Long> numberOfClients = operations.convertAndSend("hello!", "world");

Subscribing (Receiving Messages)

在接收端,可以通过直接命名通道或使用模式匹配订阅一个或多个通道。后一种方法非常有用,因为它不仅可以通过一个命令创建多个订阅,还可以监听在订阅时尚未创建的通道(只要它们与模式匹配)。

On the receiving side, one can subscribe to one or multiple channels either by naming them directly or by using pattern matching. The latter approach is quite useful, as it not only lets multiple subscriptions be created with one command but can also listen on channels not yet created at subscription time (as long as they match the pattern).

在低级别中,RedisConnection 提供 subscribepSubscribe 方法,用于分别映射订阅按通道或按模式的 Redis 命令。请注意,可以使用多个通道或模式作为参数。要更改连接的订阅或查询它是否正在监听,RedisConnection 提供 getSubscriptionisSubscribed 方法。

At the low-level, RedisConnection offers the subscribe and pSubscribe methods that map the Redis commands for subscribing by channel or by pattern, respectively. Note that multiple channels or patterns can be used as arguments. To change the subscription of a connection or query whether it is listening, RedisConnection provides the getSubscription and isSubscribed methods.

Spring Data Redis 中的订阅命令属于阻塞命令。也就是说,在连接上调用 subscribe 时,当前线程会被阻塞,因为它会开始等待消息。只有在订阅被取消时,该线程才会释放,而订阅取消发生在另一个线程在“ same”连接上调用“ unsubscribe”或“ pUnsubscribe”时。有关解决此问题的方案,请参阅(本文档的后续内容)“Message Listener Containers”。

Subscription commands in Spring Data Redis are blocking. That is, calling subscribe on a connection causes the current thread to block as it starts waiting for messages. The thread is released only if the subscription is canceled, which happens when another thread invokes unsubscribe or pUnsubscribe on the same connection. See “Message Listener Containers” (later in this document) for a solution to this problem.

如前所述,一旦订阅,连接便开始等待消息。只允许添加新订阅、修改现有订阅和取消现有订阅的命令。调用 subscribepSubscribeunsubscribepUnsubscribe 以外的任何内容都会引发异常。

As mentioned earlier, once subscribed, a connection starts waiting for messages. Only commands that add new subscriptions, modify existing subscriptions, and cancel existing subscriptions are allowed. Invoking anything other than subscribe, pSubscribe, unsubscribe, or pUnsubscribe throws an exception.

为了订阅消息,需要实现 MessageListener 回调。每次新消息到达时,都会调用回调,并且用户代码由 onMessage 方法运行。该接口不仅可以访问实际消息,还可以访问通过该消息接收到的通道和订阅用来匹配通道的模式(如果有)。此信息允许被调用者不仅通过内容区分各种消息,还可以检查其他详细信息。

In order to subscribe to messages, one needs to implement the MessageListener callback. Each time a new message arrives, the callback gets invoked and the user code gets run by the onMessage method. The interface gives access not only to the actual message but also to the channel it has been received through and the pattern (if any) used by the subscription to match the channel. This information lets the callee differentiate between various messages not just by content but also examining additional details.

Message Listener Containers

由于其阻塞性,低级订阅并不具有吸引力,因为它需要为每个侦听器连接和线程管理。为了缓解此问题,Spring Data 提供了 RedisMessageListenerContainer,它可以执行所有繁重的工作。如果你熟悉 EJB 和 JMS,你应该会觉得这些概念很熟悉,因为它设计得尽可能接近 Spring 框架中的支持及其消息驱动的 POJO(MDP)。

Due to its blocking nature, low-level subscription is not attractive, as it requires connection and thread management for every single listener. To alleviate this problem, Spring Data offers RedisMessageListenerContainer, which does all the heavy lifting. If you are familiar with EJB and JMS, you should find the concepts familiar, as it is designed to be as close as possible to the support in Spring Framework and its message-driven POJOs (MDPs).

RedisMessageListenerContainer 充当消息侦听器容器。它用于从 Redis 通道接收消息,并驱动注入其中的 MessageListener 实例。侦听器容器负责消息接收的所有线程化,并分派到侦听器中进行处理。消息侦听器容器是在 MDP 和消息传递提供程序之间的中介,并负责注册以接收消息、资源获取和释放、异常转换等。这使你可以作为应用程序开发人员编写与接收消息(并对其做出反应)相关的(可能很复杂的)业务逻辑,并将样板 Redis 基础设施问题委托给框架。

RedisMessageListenerContainer acts as a message listener container. It is used to receive messages from a Redis channel and drive the MessageListener instances that are injected into it. The listener container is responsible for all threading of message reception and dispatches into the listener for processing. A message listener container is the intermediary between an MDP and a messaging provider and takes care of registering to receive messages, resource acquisition and release, exception conversion, and the like. This lets you as an application developer write the (possibly complex) business logic associated with receiving a message (and reacting to it) and delegates boilerplate Redis infrastructure concerns to the framework.

MessageListener 还可以实现 SubscriptionListener 以在订阅/取消订阅确认后接收通知。同步调用时,监听订阅通知可能很有用。

A MessageListener can additionally implement SubscriptionListener to receive notifications upon subscription/unsubscribe confirmation. Listening to subscription notifications can be useful when synchronizing invocations.

此外,为了最大程度地减少应用程序占用空间,RedisMessageListenerContainer 允许一个连接和一个线程被多个侦听器共享,即使它们不共享订阅。因此,无论应用程序跟踪多少侦听器或通道,运行时开销在其整个生命周期内都保持不变。此外,容器允许运行时配置更改,以便你可以在应用程序运行时添加或删除侦听器,而无需重新启动。此外,容器使用延迟订阅方法,仅在需要时使用 RedisConnection。如果取消订阅所有侦听器,将自动执行清理,并释放线程。

Furthermore, to minimize the application footprint, RedisMessageListenerContainer lets one connection and one thread be shared by multiple listeners even though they do not share a subscription. Thus, no matter how many listeners or channels an application tracks, the runtime cost remains the same throughout its lifetime. Moreover, the container allows runtime configuration changes so that you can add or remove listeners while an application is running without the need for a restart. Additionally, the container uses a lazy subscription approach, using a RedisConnection only when needed. If all the listeners are unsubscribed, cleanup is automatically performed, and the thread is released.

为了帮助处理消息的异步特性,容器需要一个 java.util.concurrent.Executor(或 Spring 的 TaskExecutor)来调度消息。根据负载、侦听器数量或运行时环境,你应该更改或调整执行器以更好地满足你的需求。特别是,在受管环境(例如应用程序服务器)中,强烈建议选择一个合适的 TaskExecutor 以利用其运行时。

To help with the asynchronous nature of messages, the container requires a java.util.concurrent.Executor (or Spring’s TaskExecutor) for dispatching the messages. Depending on the load, the number of listeners, or the runtime environment, you should change or tweak the executor to better serve your needs. In particular, in managed environments (such as app servers), it is highly recommended to pick a proper TaskExecutor to take advantage of its runtime.

The MessageListenerAdapter

MessageListenerAdapter 类是 Spring 中异步消息传递支持的最终组件。简而言之,它允许你将几乎 任何 类公开为 MDP(尽管存在一些约束)。

The MessageListenerAdapter class is the final component in Spring’s asynchronous messaging support. In a nutshell, it lets you expose almost any class as a MDP (though there are some constraints).

考虑以下接口定义:

Consider the following interface definition:

public interface MessageDelegate {
  void handleMessage(String message);
  void handleMessage(Map message);
  void handleMessage(byte[] message);
  void handleMessage(Serializable message);
  // pass the channel/pattern as well
  void handleMessage(Serializable message, String channel);
 }

请注意,尽管该接口没有扩展 MessageListener 接口,但它仍然可以通过使用 MessageListenerAdapter 类用作 MDP。而且请注意,根据它们可以接收和处理的各种 Message 类型的 内容,各种消息处理方法是强类型化的。此外,邮件发送到的频道或模式可以作为第二个 String 类型参数传递给方法:

Notice that, although the interface does not extend the MessageListener interface, it can still be used as a MDP by using the MessageListenerAdapter class. Notice also how the various message handling methods are strongly typed according to the contents of the various Message types that they can receive and handle. In addition, the channel or pattern to which a message is sent can be passed in to the method as the second argument of type String:

public class DefaultMessageDelegate implements MessageDelegate {
  // implementation elided for clarity...
}

请注意,MessageDelegate 接口(上面的 DefaultMessageDelegate 类)的上述实现*没有*任何 Redis 依赖项。它确实是一个 POJO,我们使用以下配置使其成为 MDP:

Notice how the above implementation of the MessageDelegate interface (the above DefaultMessageDelegate class) has no Redis dependencies at all. It truly is a POJO that we make into an MDP with the following configuration:

  • Java

  • XML

@Configuration
class MyConfig {

  // …

  @Bean
  DefaultMessageDelegate listener() {
    return new DefaultMessageDelegate();
  }

  @Bean
  MessageListenerAdapter messageListenerAdapter(DefaultMessageDelegate listener) {
    return new MessageListenerAdapter(listener, "handleMessage");
  }

  @Bean
  RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory, MessageListenerAdapter listener) {

    RedisMessageListenerContainer container = new RedisMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.addMessageListener(listener, ChannelTopic.of("chatroom"));
    return container;
  }
}
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xmlns:redis="http://www.springframework.org/schema/redis"
   xsi:schemaLocation="http://www.springframework.org/schema/beans https://www.springframework.org/schema/beans/spring-beans.xsd
   http://www.springframework.org/schema/redis https://www.springframework.org/schema/redis/spring-redis.xsd">

<!-- the default ConnectionFactory -->
<redis:listener-container>
  <!-- the method attribute can be skipped as the default method name is "handleMessage" -->
  <redis:listener ref="listener" method="handleMessage" topic="chatroom" />
</redis:listener-container>

<bean id="listener" class="redisexample.DefaultMessageDelegate"/>
 ...
</beans>

侦听器主题可以是频道(例如,“ topic="chatroom"”)或模式(例如,“ topic="*room"”)。

The listener topic can be either a channel (for example, topic="chatroom") or a pattern (for example, topic="*room")

前面的示例使用 Redis 命名空间来声明消息侦听器容器并自动将 POJO 注册为侦听器。完整的功能 bean 定义如下:

The preceding example uses the Redis namespace to declare the message listener container and automatically register the POJOs as listeners. The full-blown beans definition follows:

<bean id="messageListener" class="org.springframework.data.redis.listener.adapter.MessageListenerAdapter">
  <constructor-arg>
    <bean class="redisexample.DefaultMessageDelegate"/>
  </constructor-arg>
</bean>

<bean id="redisContainer" class="org.springframework.data.redis.listener.RedisMessageListenerContainer">
  <property name="connectionFactory" ref="connectionFactory"/>
  <property name="messageListeners">
    <map>
      <entry key-ref="messageListener">
        <bean class="org.springframework.data.redis.listener.ChannelTopic">
          <constructor-arg value="chatroom"/>
        </bean>
      </entry>
    </map>
  </property>
</bean>

每次收到消息时,适配器都会自动且透明地执行低级格式和所需对象类型之间转换(使用配置的 RedisSerializer)。方法调用引起的任何异常都将被容器捕获和处理(默认情况下,异常将记录下来)。

Each time a message is received, the adapter automatically and transparently performs translation (using the configured RedisSerializer) between the low-level format and the required object type. Any exception caused by the method invocation is caught and handled by the container (by default, exceptions get logged).

Reactive Message Listener Container

Spring Data 提供 ReactiveRedisMessageListenerContainer,它会代表用户完成所有转换和订阅状态管理的繁重工作。

Spring Data offers ReactiveRedisMessageListenerContainer which does all the heavy lifting of conversion and subscription state management on behalf of the user.

消息监听容器本身不需要外部线程资源。它使用驱动程序线程发布消息。

The message listener container itself does not require external threading resources. It uses the driver threads to publish messages.

ReactiveRedisConnectionFactory factory = …
ReactiveRedisMessageListenerContainer container = new ReactiveRedisMessageListenerContainer(factory);

Flux<ChannelMessage<String, String>> stream = container.receive(ChannelTopic.of("my-channel"));

为等待并确保正确的订阅,可以使用返回 Mono<Flux<ChannelMessage>>receiveLater 方法。生成的 Mono 在完成对给定主题的订阅后,使用内部发布器完成。通过拦截 onNext 信号,可以同步服务器端订阅。

To await and ensure proper subscription, you can use the receiveLater method that returns a Mono<Flux<ChannelMessage>>. The resulting Mono completes with an inner publisher as a result of completing the subscription to the given topics. By intercepting onNext signals, you can synchronize server-side subscriptions.

ReactiveRedisConnectionFactory factory = …
ReactiveRedisMessageListenerContainer container = new ReactiveRedisMessageListenerContainer(factory);

Mono<Flux<ChannelMessage<String, String>>> stream = container.receiveLater(ChannelTopic.of("my-channel"));

stream.doOnNext(inner -> // notification hook when Redis subscriptions are synchronized with the server)
    .flatMapMany(Function.identity())
    .…;

Subscribing via template API

如上所述,可以直接使用 ReactiveRedisTemplate 订阅频道/模式。此方法提供了一个直接且有限的解决方案,因为这样不能在初始订阅后添加订阅。然而,仍然可以使用 take(Duration) 等方式通过返回的 Flux 来控制消息流。当完成读取、出现错误或取消时,所有绑定资源都会再次释放。

As mentioned above you can directly use ReactiveRedisTemplate to subscribe to channels / patterns. This approach offers a straight forward, though limited solution as you lose the option to add subscriptions after the initial ones. Nevertheless you still can control the message stream via the returned Flux using eg. take(Duration). When done reading, on error or cancellation all bound resources are freed again.

redisTemplate.listenToChannel("channel1", "channel2").doOnNext(msg -> {
    // message processing ...
}).subscribe();