Redis Support

Spring Integration 2.1 引入了对 Redis的支持:“an open source advanced key-value store”。此支持以 Redis 为基础的 MessageStore`以及 Redis 通过其 `PUBLISH, SUBSCRIBE, and UNSUBSCRIBE命令支持的发布/订阅消息传递适配器的形式提供。 你需要将此依赖项包含在你的项目中:

  • Maven

  • Gradle

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-redis</artifactId>
    <version>{project-version}</version>
</dependency>
compile "org.springframework.integration:spring-integration-redis:{project-version}"

你还需要包括 Redis 客户端依赖项,例如 Lettuce。 如需下载、安装和运行 Redis,请参见 Redis documentation

Connecting to Redis

要开始与Redis交互,首先需要连接到它。Spring Integration使用了由另一个Spring项目 Spring Data Redis提供的支持,该项目提供了典型的Spring构造:ConnectionFactory`和`Template。这些抽象简化了与多个Redis客户端Java API的集成。目前,Spring Data Redis支持 JedisLettuce

Using RedisConnectionFactory

若要连接到 Redis,您可以使用 RedisConnectionFactory 接口的一种实现。以下清单显示了接口定义:

public interface RedisConnectionFactory extends PersistenceExceptionTranslator {

    /**
     * Provides a suitable connection for interacting with Redis.
     * @return connection for interacting with Redis.
     */
    RedisConnection getConnection();
}

以下示例演示如何在 Java 中创建 LettuceConnectionFactory

LettuceConnectionFactory cf = new LettuceConnectionFactory();
cf.afterPropertiesSet();

以下示例演示如何在 Spring 的 XML 配置中创建 LettuceConnectionFactory

<bean id="redisConnectionFactory"
    class="o.s.data.redis.connection.lettuce.LettuceConnectionFactory">
    <property name="port" value="7379" />
</bean>

RedisConnectionFactory 的实现提供一组属性,例如端口和主机,可以在需要时设置这些属性。一旦有了 RedisConnectionFactory 实例,就可以创建一个 RedisTemplate 实例并用 RedisConnectionFactory 注入它。

Using RedisTemplate

与Spring中的其他模板类(例如`JdbcTemplate`和`JmsTemplate`)一样,RedisTemplate`是一个助手类,简化了Redis数据访问代码。更多有关`RedisTemplate`及其变体(例如`StringRedisTemplate)的信息,请参见 Spring Data Redis documentation

以下示例演示如何在 Java 中创建 RedisTemplate 实例:

RedisTemplate rt = new RedisTemplate<String, Object>();
rt.setConnectionFactory(redisConnectionFactory);

以下示例演示如何在 Spring 的 XML 配置中创建 RedisTemplate 实例:

<bean id="redisTemplate"
         class="org.springframework.data.redis.core.RedisTemplate">
    <property name="connectionFactory" ref="redisConnectionFactory"/>
</bean>

Messaging with Redis

如“@34”中所述,Redis 通过其“@31”、“@32”和“@33”命令为发布-订阅消息传递提供支持。与 JMS 和 AMQP 一样,Spring Integration 提供消息通道和适配器,用于通过 Redis 发送和接收消息。

Redis Publish/Subscribe channel

与 JMS 类似,在某些情况下,生产者和消费者都应成为同一应用程序的一部分,在同一进程中运行。您可以通过使用一对入站和出站通道适配器来实现此目的。但是,与 Spring Integration 的 JMS 支持一样,有一种更简单的方法来解决此用例。您可以创建一个发布-订阅通道,如下例所示:

<int-redis:publish-subscribe-channel id="redisChannel" topic-name="si.test.topic"/>

publish-subscribe-channel 的行为与 main Spring Integration 命名空间中的普通 <publish-subscribe-channel/> 元素非常相似。任何端点的 input-channeloutput-channel 属性都可以引用它。不同之处在于此通道由 Redis 主题名称支持: String 值由 topic-name 属性指定。但是,与 JMS 不同,不必预先创建此主题或由 Redis 自动创建它。在 Redis 中,主题是作为地址发挥作用的简单 String 值。生产者和消费者可以使用相同的 String 值作为其主题名称进行通信。简单订阅此通道意味着在生产端点和消费端点之间可以进行异步发布-订阅消息传递。但是,与通过在简单 Spring Integration <channel/> 元素内添加 <queue/> 元素创建的异步消息通道不同,这些消息不会存储在内存队列中。相反,这些消息通过 Redis 传递,这使您可以依赖它对持久性和集群的支持以及它与其他非 Java 平台的互操作性。

Redis Inbound Channel Adapter

Redis 入站通道适配器 (RedisInboundChannelAdapter) 以与其他入站适配器相同的方式将传入的 Redis 消息调整为 Spring 消息。它接收特定于平台的消息(在这种情况下为 Redis),并使用 MessageConverter 策略将它们转换为 Spring 消息。以下示例演示如何配置 Redis 入站通道适配器:

<int-redis:inbound-channel-adapter id="redisAdapter"
       topics="thing1, thing2"
       channel="receiveChannel"
       error-channel="testErrorChannel"
       message-converter="testConverter" />

<bean id="redisConnectionFactory"
    class="o.s.data.redis.connection.lettuce.LettuceConnectionFactory">
    <property name="port" value="7379" />
</bean>

<bean id="testConverter" class="things.something.SampleMessageConverter" />

前面的示例显示了 Redis 入站通道适配器的简单但完整的配置。请注意,前面的配置依赖于自动发现某些 bean 的熟悉的 Spring 范例。在这种情况下,redisConnectionFactory 会隐式注入到适配器中。您可以改为使用 connection-factory 属性显式指定它。

此外,还要注意,前面的配置会用自定义 MessageConverter 注入适配器。该方法类似于 JMS,在其中 MessageConverter 实例用于在 Redis 消息和 Spring Integration 消息有效负载之间进行转换。默认值为 SimpleMessageConverter

入站适配器可以订阅多个主题名称,因此 topics 属性中会出现逗号分隔的值集。

自3.0版本开始,入站适配器现在除了现有的`topics`属性之外,还具有`topic-patterns`属性。此属性包含一个以逗号分隔的Redis主题模式集。更多有关Redis发布-订阅的信息,请参见 Redis Pub/Sub

入站适配器可以使用 RedisSerializer`反序列化 Redis 消息的主体。<int-redis:inbound-channel-adapter>`的 `serializer`属性可以设置为一个空字符串,这会生成 `RedisSerializer`属性的 `null`值。在这种情况下,Redis 消息的原始 `byte[]`主体将作为消息有效负载提供。

从 5.0 版开始,您可以通过使用 <int-redis:inbound-channel-adapter>task-executor 属性向入站适配器提供 Executor 实例。此外,接收的 Spring Integration 消息现在具有 RedisHeaders.MESSAGE_SOURCE 头,以指示已发布消息的来源:主题或模式。您可以将其用于下游路由逻辑。

Redis Outbound Channel Adapter

Redis 出站通道适配器以与其他出站适配器相同的方式将传出的 Spring Integration 消息调整为 Redis 消息。它接收 Spring Integration 消息并使用 MessageConverter 策略将它们转换为特定于平台的消息(在这种情况下为 Redis)。以下示例演示如何配置 Redis 出站通道适配器:

<int-redis:outbound-channel-adapter id="outboundAdapter"
    channel="sendChannel"
    topic="thing1"
    message-converter="testConverter"/>

<bean id="redisConnectionFactory"
    class="o.s.data.redis.connection.lettuce.LettuceConnectionFactory">
    <property name="port" value="7379"/>
</bean>

<bean id="testConverter" class="things.something.SampleMessageConverter" />

此配置与 Redis 入站通道适配器类似。适配器隐式注入 RedisConnectionFactory,其通过 redisConnectionFactory 作为 Bean 名称进行定义。此示例还包括可选的(和自定义的)MessageConvertertestConverter Bean)。

自 Spring Integration 3.0 起,<int-redis:outbound-channel-adapter> 针对 topic 属性提供了一种替代方法:您可以在运行时使用 topic-expression 属性来确定消息的 Redis 主题。这些属性是互斥的。

Redis Queue Inbound Channel Adapter

Spring Integration 3.0 引入了一个队列入站通道适配器来从 Redis 列表中“弹出”消息。默认情况下,它使用“右侧弹出”,但您可以将其配置为使用“左侧弹出”。适配器是消息驱动的。它使用一个内部侦听器线程,而不使用轮询器。

以下清单显示了 queue-inbound-channel-adapter 的所有可用属性:

<int-redis:queue-inbound-channel-adapter id=""  1
                    channel=""  2
                    auto-startup=""  3
                    phase=""  4
                    connection-factory=""  5
                    queue=""  6
                    error-channel=""  7
                    serializer=""  8
                    receive-timeout=""  9
                    recovery-interval=""  10
                    expect-message=""  11
                    task-executor=""  12
                    right-pop=""/>  13
1 组件 bean 名称。如果您未提供 channel 属性,则使用此 id 属性在应用程序上下文中创建并注册 DirectChannel,将其作为 bean 名称。在这种情况下,端点本身以 bean 名称 id.adapter 注册。(如果 bean 名称是 thing1,则端点注册为 thing1.adapter。)
2 MessageChannel,用于从此端点发送 Message 实例。
3 一个 SmartLifecycle 属性,用于指定此端点是否应在应用程序上下文启动后自动启动。其默认为 true
4 一个 SmartLifecycle 属性,用于指定此端点启动的阶段。其默认为 0
5 RedisConnectionFactory bean 的引用。其默认为 redisConnectionFactory
6 在基于队列的“pop”操作执行的 Redis 列表名称,用于获取 Redis 消息。
7 MessageChannel,在侦听端点任务中收到异常时,向其发送 ErrorMessage`实例。默认情况下,底层的 `MessagePublishingErrorHandler`使用应用程序上下文的默认 `errorChannel
8 RedisSerializer`bean 引用。它可以是空字符串,表示“无序列化程序”。在这种情况下,将来自入站 Redis 消息的原始 `byte[]`作为 `Message`有效负载发送给 `channel。默认情况下,它是一个 JdkSerializationRedisSerializer
9 “pop”操作等待队列中 Redis 消息的超时时间(以毫秒为单位)。默认值为 1 秒。
10 侦听器任务在“pop”操作出现异常后应休眠的时间(以毫秒为单位),然后再重新启动侦听器任务。
11 指定此端点是否期望 Redis 队列中的数据包含完整的 Message`实例。如果将此属性设置为 `true,则 serializer`不能为一个空字符串,因为消息需要某种形式的反序列化(默认情况下为 JDK 序列化)。其默认值为 `false
12 Spring TaskExecutor(或标准 JDK 1.5+ Executor)bean 的引用。它用于底层的侦听任务。默认为 SimpleAsyncTaskExecutor
13 指定此端点是否应该使用 “right pop”(当 true)或 “left pop”(当 false)从 Redis 列表中读取消息。如果 true,则在与默认 Redis 队列出站通道适配器一起使用时,Redis 列表将充当一个 FIFO`队列。将其设置为 `false`以与使用 "`right push`"写入列表的软件一起使用,或实现类似堆栈的消息顺序。自 4.3 版以来,它的默认值为 `true

必须使用多个线程配置 task-executor 来进行处理;否则,当 RedisQueueMessageDrivenEndpoint 尝试在出现错误后重新启动侦听器任务时,可能会产生死锁。errorChannel 可用于处理这些错误,以避免重新启动,但最好不要将您的应用程序暴露于可能的死锁情况。请参阅 Spring Framework Reference Manual 以了解可能的 TaskExecutor 实现。

Redis Queue Outbound Channel Adapter

Spring Integration 3.0 引入了一个队列出站通道适配器来从 Spring Integration 消息中“推送”到 Redis 列表。默认情况下,它使用“左侧推送”,但您可以将其配置为使用“右侧推送”。以下清单显示了 Redis queue-outbound-channel-adapter 的所有可用属性:

<int-redis:queue-outbound-channel-adapter id=""  1
                    channel=""  2
                    connection-factory=""  3
                    queue=""  4
                    queue-expression=""  5
                    serializer=""  6
                    extract-payload=""  7
                    left-push=""/>  8
1 组件 bean 名称。如果您没有提供 channel`属性,则会使用此 `id`属性作为 bean 名称,在应用程序上下文中创建和注册一个 `DirectChannel。在这种情况下,端点将注册,bean 名称为 id`加上 `.adapter。(如果 bean 名称是 thing1,则会将端点注册为 thing1.adapter。)
2 此端点从中接收 Message`实例的 `MessageChannel
3 RedisConnectionFactory bean 的引用。其默认为 redisConnectionFactory
4 基于队列的“push”操作执行的 Redis 列表名称,用于发送 Redis 消息。此属性与 `queue-expression`互斥。
5 SpEL Expression,用于确定 Redis 列表名称。它会在运行时使用传入的 `Message`作为 `#root`变量。此属性与 `queue`互斥。
6 RedisSerializer`bean 引用。默认为 `JdkSerializationRedisSerializer。但是,对于 String`有效负载,如果未提供 `serializer`引用,则会使用 `StringRedisSerializer
7 指定此端点是否应该只向 Redis 队列发送有效负载或整个 Message。默认为 true
8 指定此端点是否应该使用 “left push”(当 true)或 “right push”(当 false)向 Redis 列表写入消息。如果 true,则在与默认 Redis 队列入站通道适配器一起使用时,Redis 列表将充当一个 FIFO`队列。将其设置为 `false`以与使用 "`left pop`"从列表中读取消息的软件一起使用,或实现类似堆栈的消息顺序。自 4.3 版以来,它的默认值为 `true

Redis Application Events

自 Spring Integration 3.0 起,Redis 模块提供了 IntegrationEvent 的一个实现,后者又是一个 org.springframework.context.ApplicationEventRedisExceptionEvent 封装了 Redis 操作中的异常(其中端点是事件的“来源”)。例如,<int-redis:queue-inbound-channel-adapter/> 在从 BoundListOperations.rightPop 操作捕获异常后发出这些事件。异常可能是任何泛 org.springframework.data.redis.RedisSystemExceptionorg.springframework.data.redis.RedisConnectionFailureException。使用 <int-event:inbound-channel-adapter/> 处理这些事件对于确定后台 Redis 任务的问题和采取管理操作很有用。

Redis Message Store

如_Enterprise Integration Patterns_(EIP)书中所述, message store可让你持久化消息。在处理能够缓冲消息(聚合器、重新排序器和其它)的组件时,当可靠性是一个问题时,这一点可能很有用。在Spring Integration中,`MessageStore`策略还为 claim check模式提供了基础,EIP中也介绍了此模式。

Spring Integration 的 Redis 模块提供了 RedisMessageStore。以下示例说明如何将其与聚合器一起使用:

<bean id="redisMessageStore" class="o.s.i.redis.store.RedisMessageStore">
    <constructor-arg ref="redisConnectionFactory"/>
</bean>

<int:aggregator input-channel="inputChannel" output-channel="outputChannel"
         message-store="redisMessageStore"/>

前面的示例是一个 Bean 配置,它期望一个 RedisConnectionFactory 作为构造函数参数。

默认情况下,RedisMessageStore 使用 Java 序列化来序列化消息。但是,如果您想要使用其他序列化技术(例如 JSON),您可以通过设置 RedisMessageStorevalueSerializer 属性来提供自己的序列化程序。

从 4.3.10 版本开始,该框架为 Message 实例和 MessageHeaders 实例提供了 Jackson 序列化程序和反序列化程序实现——分别为 MessageJacksonDeserializerMessageHeadersJacksonSerializer。它们必须针对 ObjectMapper 配置有 SimpleModule 选项。此外,您应该在 ObjectMapper 上设置 enableDefaultTyping 以添加每个序列化的复杂对象(如果您信任来源)的类型信息。此类型信息然后在反序列化过程中使用。该框架提供了一个名为 JacksonJsonUtils.messagingAwareMapper() 的实用方法,它已经提供了所有前面提到的属性和序列化程序。此实用方法带有 trustedPackages 参数,可限制反序列化的 Java 包以避免安全漏洞。默认可信包:java.utiljava.langorg.springframework.messaging.supportorg.springframework.integration.supportorg.springframework.integration.messageorg.springframework.integration.store。要管理 RedisMessageStore 中的 JSON 序列化,您必须以类似于以下示例的方式对其进行配置:

RedisMessageStore store = new RedisMessageStore(redisConnectionFactory);
ObjectMapper mapper = JacksonJsonUtils.messagingAwareMapper();
RedisSerializer<Object> serializer = new GenericJackson2JsonRedisSerializer(mapper);
store.setValueSerializer(serializer);

从 4.3.12 版本开始,RedisMessageStore 支持 prefix 选项,以允许区分同一 Redis 服务器上的存储实例。

Redis Channel Message Stores

RedisMessageStore`shown earlier将每个组作为单个键(组ID)下的值进行维护。虽然你可以使用它为`QueueChannel`提供持久性支持,但出于此目的已提供了专门的`RedisChannelMessageStore(自4.0版本开始)。此存储针对每个通道使用`LIST`,在发送消息时使用`LPUSH`,在接收消息时使用`RPOP`。默认情况下,此存储还使用JDK序列化,但你可以修改值序列化器,就像described earlier一样。

我们建议使用此支持通道的存储库,而不是使用通用 RedisMessageStore。以下示例定义了一个 Redis 消息存储库,并将其用于带有一个队列的通道:

<bean id="redisMessageStore" class="o.s.i.redis.store.RedisChannelMessageStore">
	<constructor-arg ref="redisConnectionFactory"/>
</bean>

<int:channel id="somePersistentQueueChannel">
    <int:queue message-store="redisMessageStore"/>
<int:channel>

用于存储数据的键具有以下形式:<storeBeanName>:<channelId>(在前一个示例中,为 redisMessageStore:somePersistentQueueChannel)。

此外,还提供了 RedisChannelPriorityMessageStore 子类。当您将其与 QueueChannel 一起使用时,消息以(FIFO)优先级顺序收到。它使用标准 IntegrationMessageHeaderAccessor.PRIORITY 头文件,并支持优先级值(“0 - 9”)。具有其他优先级(和没有优先级)的消息在任何具有优先级的消息之后以 FIFO 顺序检索。

这些存储器只实现了 BasicMessageGroupStore,而没有实现 MessageGroupStore。它们只能用于作为 QueueChannel 的后备等情况。

Redis Metadata Store

Spring Integration 3.0引入了基于Redis的新 MetadataStore(请参见Metadata Store)实现。你可以使用`RedisMetadataStore`跨应用程序重启维护`MetadataStore`的状态。你可以将此新`MetadataStore`实现与以下适配器配合使用:

为了指示这些适配器使用新的 RedisMetadataStore,请声明一个名为 metadataStore 的 Spring Bean。Feed 入站通道适配器和 Feed 入站通道适配器都会自动获取并使用声明的 RedisMetadataStore。以下示例显示如何声明此类 Bean:

<bean name="metadataStore" class="o.s.i.redis.store.metadata.RedisMetadataStore">
    <constructor-arg name="connectionFactory" ref="redisConnectionFactory"/>
</bean>

RedisMetadataStore 由 link:https://docs.spring.io/spring-data/data-redis/docs/current/api/org/springframework/data/redis/support/collections/RedisProperties.html[RedisProperties 支持。与其交互使用 link:https://docs.spring.io/spring-data/data-redis/docs/current/api/org/springframework/data/redis/core/BoundHashOperations.html[BoundHashOperations,而这反过来又要求整个 Properties 存储使用 key。对于 MetadataStore,此 key 扮演了区域角色,当多个应用程序使用同一个 Redis 服务器时,这在分布式环境中很有用。默认情况下,此 key 的值为 MetaData

从版本 4.0 开始,此存储库实现了 ConcurrentMetadataStore,使其能够在多个应用程序实例之间可靠地共享,其中只有一个实例被允许存储或修改密钥值。

您不能在 Redis 集群中使用 RedisMetadataStore.replace()(例如在 AbstractPersistentAcceptOnceFileListFilter 中),因为目前不支持原子性的 WATCH 命令。

Redis Store Inbound Channel Adapter

Redis 存储库入站通道适配器是一个轮询消费器,它从 Redis 集合中读取数据,并将其作为 Message 有效负载发送。以下示例显示了如何配置 Redis 存储库入站通道适配器:

<int-redis:store-inbound-channel-adapter id="listAdapter"
    connection-factory="redisConnectionFactory"
    key="myCollection"
    channel="redisChannel"
    collection-type="LIST" >
    <int:poller fixed-rate="2000" max-messages-per-poll="10"/>
</int-redis:store-inbound-channel-adapter>

上一个示例通过使用 `store-inbound-channel-adapter`元素,显示了如何配置 Redis 存储库入站通道适配器,为各个属性提供数值,例如:

  • key`或 `key-expression:正在使用的集合的键名称。

  • collection-type:此适配器支持的集合类型的枚举。支持的集合为 LISTSETZSETPROPERTIES`和 `MAP

  • connection-factory:对 `o.s.data.redis.connection.RedisConnectionFactory`实例的引用。

  • redis-template:对 `o.s.data.redis.core.RedisTemplate`实例的引用。

  • 其他所有其他入站适配器(如“channel”)共同存在的属性。

无法同时设置 redis-templateconnection-factory

默认情况下,适配器使用 StringRedisTemplate。这使用 StringRedisSerializer 实例作为密钥、值、哈希密钥和哈希值。如果你的 Redis 存储库包含使用其他技术序列化的对象,你必须提供配置了适当的序列化程序的 RedisTemplate。例如,如果存储库是使用具有 extract-payload-elements 设置为 false`的 Redis 存储库出站适配器进行写入的,你必须提供如下配置的 `RedisTemplate

<bean id="redisTemplate" class="org.springframework.data.redis.core.RedisTemplate">
    <property name="connectionFactory" ref="redisConnectionFactory"/>
    <property name="keySerializer">
        <bean class="org.springframework.data.redis.serializer.StringRedisSerializer"/>
    </property>
    <property name="hashKeySerializer">
        <bean class="org.springframework.data.redis.serializer.StringRedisSerializer"/>
    </property>
</bean>

RedisTemplateString 序列化程序用于密钥和哈希密钥,将默认 JDK 序列化程序用于值和哈希值。

由于它具有 key 的文字值,上一个示例相对简单且静态。有时,你可能需要根据某些条件在运行时更改密钥值。要实现此目的,请改用 key-expression,提供的表达式可以是任何有效的 SpEL 表达式。

此外,你可能需要对从 Redis 集合中读取成功处理的数据执行一些后期处理。例如,你可能需要在处理后移动或移除值。你可以使用 Spring Integration 2.2 添加的事务同步功能来实现此目的。以下示例使用 key-expression 和事务同步:

<int-redis:store-inbound-channel-adapter id="zsetAdapterWithSingleScoreAndSynchronization"
        connection-factory="redisConnectionFactory"
        key-expression="'presidents'"
        channel="otherRedisChannel"
        auto-startup="false"
        collection-type="ZSET">
            <int:poller fixed-rate="1000" max-messages-per-poll="2">
                <int:transactional synchronization-factory="syncFactory"/>
            </int:poller>
</int-redis:store-inbound-channel-adapter>

<int:transaction-synchronization-factory id="syncFactory">
	<int:after-commit expression="payload.removeByScore(18, 18)"/>
</int:transaction-synchronization-factory>

<bean id="transactionManager" class="o.s.i.transaction.PseudoTransactionManager"/>

你可以通过使用 transactional 元素声明你的轮询器作为事务性的。此元素可以引用一个真正的交易管理器(例如,如果你的流程的某个其他部分调用 JDBC)。如果没有一个“真正的”事务,你能够使用一个 o.s.i.transaction.PseudoTransactionManager,这是 Spring 的 PlatformTransactionManager 的实现,且当没有实际事务时,能够使用 Redis 适配器的事务同步功能。

这不会使 Redis 活动本身具有事务性。它允许在成功(提交)之前或之后以及失败(回滚)之后执行操作的同步。

一旦你的轮询器是事务性的,你能够在 transactional 元素上设置 o.s.i.transaction.TransactionSynchronizationFactory 的实例。TransactionSynchronizationFactory 创建 TransactionSynchronization 的实例。为了你的方便,我们展示了一个基于 SpEL 的默认 TransactionSynchronizationFactory,它允许你配置 SpEL 表达式,其执行与事务协调(同步)。支持事前提交、事后提交和事后回滚的表达式,以及发送评估结果(如果存在)的通道(每个事件类型一个)。对于每个子元素,你能指定 expressionchannel 属性。如果仅存在 channel 属性,则接收到的消息将作为特定同步方案的一部分发送到那里。如果仅存在 expression 属性且表达式的结果为非空值,则生成一条消息,其中结果作为有效负载,并将其发送到默认通道 (NullChannel) 且显示在日志中(以 DEBUG 级别)。如果你想让评估结果转到特定的通道,请添加 channel 属性。如果表达式的结果为空或无效,则不生成消息。

RedisStoreMessageSource 添加一个 store 属性,其中有一个 RedisStore 实例绑定至事务 IntegrationResourceHolder,它能够从 TransactionSynchronizationProcessor 实现中进行访问。

有关事务同步的更多信息,请参阅 Transaction Synchronization

RedisStore Outbound Channel Adapter

Redis 存储库出站通道适配器允许你将消息有效负载写入 Redis 集合,如以下示例所示:

<int-redis:store-outbound-channel-adapter id="redisListAdapter"
          collection-type="LIST"
          channel="requestChannel"
          key="myCollection" />

上述配置了一个 Redis 存储库出站通道适配器,使用 store-inbound-channel-adapter 元素。它为各个属性提供数值,例如:

  • key`或 `key-expression:正在使用的集合的键名称。

  • extract-payload-elements:如果设置为 true(默认设置),并且有效负载是 "`multi-value`"对象的一个实例(即 Collection`或 `Map),它将使用 "`addAll`"和 "`putAll`"语义进行存储。否则,如果设置为 false,则将有效负载存储为单个条目,无论其类型如何。如果有效负载不是 "`multi-value`"对象的实例,则此属性的值将被忽略,有效负载始终存储为单个条目。

  • collection-type:此适配器支持的 Collection`类型的枚举。支持的集合为 `LISTSETZSETPROPERTIES`和 `MAP

  • map-key-expression:返回正在存储的条目的键名称的 SpEL 表达式。仅当 collection-type`为 `MAP`或 `PROPERTIES,并且“提取有效负载元素”为 false 时才有效。

  • connection-factory:对 `o.s.data.redis.connection.RedisConnectionFactory`实例的引用。

  • redis-template:对 `o.s.data.redis.core.RedisTemplate`实例的引用。

  • 其他所有其他入站适配器(如“channel”)共同存在的属性。

无法同时设置 redis-templateconnection-factory

默认情况下,适配器使用 StringRedisTemplate。这使用 StringRedisSerializer 实例作为键、值、哈希键和哈希值。但是,如果 extract-payload-elements 设置为 false,将使用具有 StringRedisSerializer 实例作为键和哈希键以及 JdkSerializationRedisSerializer 实例作为值和哈希值 RedisTemplate。使用 JDK 序列化程序时,重要的是要知道对所有值使用 Java 序列化,无论该值实际上是否为集合。如果你需要更多地控制值序列化,请考虑提供你自己的 RedisTemplate,而不依赖于这些默认值。

由于它具有 key 和其他属性的文字值,上一个示例相对简单且静态。有时,你可能需要在运行时根据某些条件动态更改值。要实现此目的,请使用它们的 -expression 等效项(key-expressionmap-key-expression 等),提供的表达式可以是任何有效的 SpEL 表达式。

Redis Outbound Command Gateway

Spring Integration 4.0 引入了 Redis 命令网关,允许你通过使用通用 RedisConnection#execute 方法执行任何标准 Redis 命令。下表显示了 Redis 出站网关的可用属性:

<int-redis:outbound-gateway
        request-channel=""  1
        reply-channel=""  2
        requires-reply=""  3
        reply-timeout=""  4
        connection-factory=""  5
        redis-template=""  6
        arguments-serializer=""  7
        command-expression=""  8
        argument-expressions=""  9
        use-command-variable=""  10
        arguments-strategy="" /> 11
1 此端点从中接收 Message`实例的 `MessageChannel
2 此端点向其发送回复 Message`实例的 `MessageChannel
3 在此出站网关必须返回一个非 null 值的指定条件。默认为 true。当 Redis 返回 null`值时,将抛出 `ReplyRequiredException
4 等待发送回复消息的超时时间(以毫秒为单位)。通常适用于基于队列的有限回复通道。
5 RedisConnectionFactory`bean 的引用。默认为 `redisConnectionFactory。它与“redis 模板”属性互斥。
6 一个 RedisTemplate bean 的引用。它与“连接工厂”属性互斥。
7 org.springframework.data.redis.serializer.RedisSerializer 实例的引用。如果需要,它用于将每个命令参数序列化为 byte[]。
8 返回命令键的 SpEL 表达式。它默认为 redis_command 消息头。它不能计算为 null
9 用作命令参数计算的逗号分隔的 SpEL 表达式。与 arguments-strategy 属性互斥。如果您不提供任何属性,则 payload 将用作命令参数。参数表达式可以计算为“null”以支持可变数量的参数。
10 一个 boolean 标志,指定已计算的 Redis 命令字符串是否在 o.s.i.redis.outbound.ExpressionArgumentsStrategy 中的 argument-expressions 配置时的表达式计算上下文中作为 #cmd 变量而提供。否则,将忽略此属性。
11 o.s.i.redis.outbound.ArgumentsStrategy 实例的引用。它与 argument-expressions 属性互斥。如果您不提供任何属性,则 payload 将用作命令参数。

你能使用 <int-redis:outbound-gateway> 作为通用组件来执行任何所需的 Redis 操作。以下示例显示了如何从 Redis 原子计数器获取递增值:

<int-redis:outbound-gateway request-channel="requestChannel"
    reply-channel="replyChannel"
    command-expression="'INCR'"/>

Message 有效负载应具有 redisCounter 的名称,此名称可能由 org.springframework.data.redis.support.atomic.RedisAtomicInteger bean 定义提供。

RedisConnection#execute`方法的返回类型具有通用的`Object。实际结果取决于命令类型。例如,MGET`返回`List<byte[]>。有关命令、其参数和结果类型,更多信息请参见 Redis Specification

Redis Queue Outbound Gateway

Spring Integration 引入了 Redis 队列出站网关来执行请求和答复场景。它将对话 UUID 推送到提供的 queue,将带有该 UUID 作为其密钥的值推送到 Redis 列表中,并等待来自具有 UUID.reply 密钥的 Redis 列表的答复。每个交互使用不同的 UUID。下表显示了 Redis 出站网关的可用属性:

<int-redis:queue-outbound-gateway
        request-channel=""  1
        reply-channel=""  2
        requires-reply=""  3
        reply-timeout=""  4
        connection-factory=""  5
        queue=""  6
        order=""  7
        serializer=""  8
        extract-payload=""/>  9
1 此端点从中接收 Message`实例的 `MessageChannel
2 此端点向其发送回复 Message`实例的 `MessageChannel
3 指定此出站网关是否必须返回非空值。此值默认值为 false。否则,当 Redis 返回 null 值时,将抛出一个 ReplyRequiredException
4 等待发送回复消息的超时时间(以毫秒为单位)。通常适用于基于队列的有限回复通道。
5 RedisConnectionFactory bean 的引用。它默认为 redisConnectionFactory。它与“redis-template”属性互斥。
6 出站网关向其发送会话 UUID 的 Redis 列表的名称。
7 当注册多个网关时,此出站网关的顺序。
8 RedisSerializer bean 引用。它可以是空字符串,表示 “no serializer”。在这种情况下,来自入站 Redis 消息的原始 byte[] 将作为 Message 负载发送到 channel。默认情况下,它是一个 JdkSerializationRedisSerializer
9 指定此端点是否期望来自 Redis 队列的数据包含整个 Message 实例。如果此属性设置为 true,则 serializer 不能是空字符串,因为消息需要某种形式的反序列化(默认为 JDK 序列化)。

Redis Queue Inbound Gateway

Spring Integration 4.1 引入了 Redis 队列入站网关来执行请求和答复场景。它从提供的 queue 中弹出对话 UUID,从 Redis 列表中弹出具有该 UUID 作为其密钥的值,并将答复推送到具有 UUID.reply 密钥的 Redis 列表中。下表显示了 Redis 队列入站网关的可用属性:

<int-redis:queue-inbound-gateway
        request-channel=""  1
        reply-channel=""  2
        executor=""  3
        reply-timeout=""  4
        connection-factory=""  5
        queue=""  6
        order=""  7
        serializer=""  8
        receive-timeout=""  9
        expect-message=""  10
        recovery-interval=""/>  11
1 这个端点从中发送从 Redis 数据创建的 Message 实例的 MessageChannel
2 此端点从中等待答复 Message 实例的 MessageChannel。可选 - 仍然使用 replyChannel 头。
3 对 Spring TaskExecutor (或标准 JDK Executor)bean 的引用。它用于底层监听任务。它默认为 SimpleAsyncTaskExecutor
4 等待发送回复消息的超时时间(以毫秒为单位)。通常适用于基于队列的有限回复通道。
5 RedisConnectionFactory`bean 的引用。默认为 `redisConnectionFactory。它与“redis 模板”属性互斥。
6 用于会话 UUID 的 Redis 列表的名称。
7 当注册多个网关时,此入站网关的顺序。
8 RedisSerializer bean 引用。它可以是空字符串,表示 “no serializer”。在这种情况下,来自入站 Redis 消息的原始 byte[] 将作为 Message 负载发送到 channel。它默认为 JdkSerializationRedisSerializer。(请注意,在 4.3 版之前的版本中,它默认是 StringRedisSerializer。要恢复该行为,请提供对 StringRedisSerializer 的引用)。
9 等待获取接收消息之前等待的超时时间(以毫秒为单位)。它通常适用于基于队列的有限请求通道。
10 指定此端点是否期望来自 Redis 队列的数据包含整个 Message 实例。如果此属性设置为 true,则 serializer 不能是空字符串,因为消息需要某种形式的反序列化(默认为 JDK 序列化)。
11 在 “right pop” 操作发生异常后,侦听任务在重新启动侦听任务之前应休眠的时间(以毫秒为单位)。

必须使用多个线程配置 task-executor 来进行处理;否则,当 RedisQueueMessageDrivenEndpoint 尝试在出现错误后重新启动侦听器任务时,可能会产生死锁。errorChannel 可用于处理这些错误,以避免重新启动,但最好不要将您的应用程序暴露于可能的死锁情况。请参阅 Spring Framework Reference Manual 以了解可能的 TaskExecutor 实现。

Redis Stream Outbound Channel Adapter

Spring Integration 5.4 引入了反应式 Redis Stream 输出通道适配器,将消息负载写入 Redis 流中。输出通道适配器使用 ReactiveStreamOperations.add(…​) 将一个 Record 添加到流中。以下示例展示了如何使用 Java 配置和服务类来使用 Redis Stream 输出通道适配器。

@Bean
@ServiceActivator(inputChannel = "messageChannel")
public ReactiveRedisStreamMessageHandler reactiveValidatorMessageHandler(
        ReactiveRedisConnectionFactory reactiveRedisConnectionFactory) {
    ReactiveRedisStreamMessageHandler reactiveStreamMessageHandler =
        new ReactiveRedisStreamMessageHandler(reactiveRedisConnectionFactory, "myStreamKey"); 1
    reactiveStreamMessageHandler.setSerializationContext(serializationContext); 2
    reactiveStreamMessageHandler.setHashMapper(hashMapper); 3
    reactiveStreamMessageHandler.setExtractPayload(true); 4
    return reactiveStreamMessageHandler;
}
1 使用 ReactiveRedisConnectionFactory 和流名称构造 ReactiveRedisStreamMessageHandler 的一个实例以添加记录。另一个构造函数变体基于 SpEL 表达式,以针对请求消息计算流键。
2 设置用于在添加到流之前序列化记录键和值的 RedisSerializationContext
3 设置 HashMapper,它提供了 Java 类型与 Redis 哈希/映射之间的协定。
4 如果为“true”,通道适配器将从请求消息中提取要添加到流中的记录的有效负载。或者使用整个消息作为一个值。它默认为 true

Redis Stream Inbound Channel Adapter

Spring Integration 5.4 引入了反应式流输入通道适配器,用于从 Redis Stream 中读取消息。输入通道适配器根据自动确认标志使用 StreamReceiver.receive(…​)StreamReceiver.receiveAutoAck() 从 Redis 流中读取记录。以下示例展示了如何对 Redis Stream 输入通道适配器使用 Java 配置。

@Bean
public ReactiveRedisStreamMessageProducer reactiveRedisStreamProducer(
       ReactiveRedisConnectionFactory reactiveRedisConnectionFactory) {
ReactiveRedisStreamMessageProducer messageProducer =
            new ReactiveRedisStreamMessageProducer(reactiveRedisConnectionFactory, "myStreamKey"); 1
    messageProducer.setStreamReceiverOptions( 2
                StreamReceiver.StreamReceiverOptions.builder()
                      .pollTimeout(Duration.ofMillis(100))
                      .build());
    messageProducer.setAutoStartup(true); 3
    messageProducer.setAutoAck(false); 4
    messageProducer.setCreateConsumerGroup(true); 5
    messageProducer.setConsumerGroup("my-group"); 6
    messageProducer.setConsumerName("my-consumer"); 7
    messageProducer.setOutputChannel(fromRedisStreamChannel); 8
    messageProducer.setReadOffset(ReadOffset.latest()); 9
    messageProducer.extractPayload(true); 10
    return messageProducer;
}
1 使用 ReactiveRedisConnectionFactory 和流键构造 ReactiveRedisStreamMessageProducer 的一个实例以读取记录。
2 一个 StreamReceiver.StreamReceiverOptions 用于使用反应式基础设施消费 redis 流。
3 一个 SmartLifecycle 属性,用于指定此端点是否应在应用程序上下文启动后自动启动或不启动。它默认为 true。如果为 false,则应手动 messageProducer.start() 启动 RedisStreamMessageProducer
4 如果 false,接收的消息不会自动确认。消息的确认将延迟到客户端使用消息时。默认为 true
5 如果 true,将创建消费者组。创建消费者组流期间也会创建流(如果尚不存在)。消费者组跟踪消息传递并在使用者之间进行区分。默认为 false
6 设置消费者组名称。默认为定义的 bean 名称。
7 设置使用者名称。读取消息,作为 my-consumer,来自 `my-group`组。
8 从该端点发送消息到的消息通道。
9 定义读取消息的偏移。默认为 ReadOffset.latest()
10 如果为“true”,通道适配器将从 Record`中提取有效负载值。否则,整个 `Record`将用作有效负载。默认为 `true

如果 autoAck 设置为 false,Redis 流中的 Record 不会由 Redis 驱动程序自动确认,而是将一个 IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK 头添加到要使用一个 SimpleAcknowledgment 实例作为值的消息中。由目标集成流负责在其基于此记录的消息上完成业务逻辑时调用其 acknowledge() 回调。如果在反序列化的过程中发生异常并且配置了 errorChannel,则需要类似的逻辑。因此,目标错误处理程序必须决定确认或取消确认此类失败的消息。与 IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK 一起,ReactiveRedisStreamMessageProducer 还会将这些头填充到要生成的消息中:RedisHeaders.STREAM_KEYRedisHeaders.STREAM_MESSAGE_IDRedisHeaders.CONSUMER_GROUPRedisHeaders.CONSUMER

从 5.5 版开始,您可以在 ReactiveRedisStreamMessageProducer 上明确地配置 StreamReceiver.StreamReceiverOptionsBuilder 选项,包括新引入的 onErrorResume 函数,如果在反序列化错误发生时 Redis Stream 消费者应继续轮询,则需要此函数。默认函数会向错误通道(如果已提供)发送带有可能的对失败消息进行确认的消息,如上所述。所有这些 StreamReceiver.StreamReceiverOptionsBuilder 与外部提供的 StreamReceiver.StreamReceiverOptions 是互斥的。

Redis Lock Registry

Spring Integration 4.0 引入了 RedisLockRegistry。某些组件(例如,聚合器和重新排序器)使用从 LockRegistry 实例获得的锁来确保一次只有一个线程操作一个组。DefaultLockRegistry 在单个组件内执行此功能。您现在可以在这些组件上配置外部锁注册表。将它与一个共享的 MessageGroupStore 结合使用时,您可以使用 RedisLockRegistry 为多个应用程序实例提供这一功能,从而一次仅有一个实例可以操作该组。

当锁被本地线程释放时,另一个本地线程通常可以立即获取该锁。如果锁是由使用不同注册表实例的线程释放的,则获取该锁可能需要最多 100 毫秒。

为了避免 “挂起” 锁(当服务器发生故障时),此注册表中的锁会在默认 60 秒后过期,但您可以在注册表上配置此值。锁通常会保持更短的时间。

因为键可能会过期,尝试解锁已过期的锁会导致抛出异常。但是,受此类锁保护的资源可能已被破坏,因此应将此类异常视为严重异常。你应为过期设置一个足够大的值以防止这种情况发生,但也应设置一个足够低的值,以便在一段时间后服务器故障后可以恢复锁。

从 5.0 版开始,RedisLockRegistry 实现 ExpirableLockRegistry,它会移除最后获取时间早于 age 且当前未锁定的锁。

从 5.5.6 版开始,RedisLockRegistryRedisLockRegistry.locks 中支持自动清理 redisLock 的缓存,通过 RedisLockRegistry.setCacheCapacity()。有关更多信息,请参见其 JavaDocs。

从 5.5.13 版开始,RedisLockRegistry 公开了一个 setRedisLockType(RedisLockType) 选项,用于确定在何种模式下应发生 Redis 锁获取:

  • RedisLockType.SPIN_LOCK- 通过定期轮询(100ms)是否可以获取锁来获取锁。默认值。

  • RedisLockType.PUB_SUB_LOCK- 通过 redis 发布-订阅订阅获取锁。

pub-sub 是首选模式 - 客户端 Redis 服务器之间的网络流量更少,并且性能更好 - 当订阅收到另一个进程中的解锁通知后,锁将立即获取。然而,Redis 在主/从连接中不支持 pub-sub(例如在 AWS ElastiCache 环境中),因此,繁忙循环模式被选择为默认模式,以便使注册表在任何环境中都能工作。