Hazelcast Support

Spring Integration 提供通道适配器和其他实用组件来与内存数据网格 Hazelcast 交互。 你需要将此依赖项包含在你的项目中:

  • Maven

  • Gradle

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-hazelcast</artifactId>
    <version>{project-version}</version>
</dependency>
compile "org.springframework.integration:spring-integration-hazelcast:{project-version}"

Hazelcast 组件的 XML 命名空间和 schemaLocation 定义为:

xmlns:int-hazelcast="http://www.springframework.org/schema/integration/hazelcast"
xsi:schemaLocation="http://www.springframework.org/schema/integration/hazelcast
          https://www.springframework.org/schema/integration/hazelcast/spring-integration-hazelcast.xsd"

Hazelcast Event-driven Inbound Channel Adapter

Hazelcast 提供分布式数据结构,例如:

  • com.hazelcast.map.IMap

  • com.hazelcast.multimap.MultiMap

  • com.hazelcast.collection.IList

  • com.hazelcast.collection.ISet

  • com.hazelcast.collection.IQueue

  • com.hazelcast.topic.ITopic

  • com.hazelcast.replicatedmap.ReplicatedMap

它还提供事件侦听器,以便侦听对这些数据结构所做的修改。

  • com.hazelcast.core.EntryListener<K, V>

  • com.hazelcast.collection.ItemListener

  • com.hazelcast.topic.MessageListener

Hazelcast 事件驱动的入站通道适配器侦听相关的缓存事件,并将事件消息发送到定义的通道。它同时支持 XML 配置和 JavaConfig 驱动的配置。

XML Configuration :

<int-hazelcast:inbound-channel-adapter channel="mapChannel"
                      cache="map"
                      cache-events="UPDATED, REMOVED"
                      cache-listening-policy="SINGLE" />

Hazelcast 事件驱动的入站通道适配器需要以下属性:

  • channel:指定发送消息的通道;

  • cache:指定要侦听的分布式对象引用。这是一个强制属性;

  • cache-events:指定要侦听的缓存事件。这是一个可选属性,其默认值是 ADDED。其支持的值如下:

  • IMapMultiMap`所支持的缓存事件类型:`ADDEDREMOVEDUPDATEDEVICTEDEVICT_ALLCLEAR_ALL

  • ReplicatedMap 支持的缓存事件类型:ADDEDREMOVEDUPDATEDEVICTED

  • IListISetIQueue 支持的缓存事件类型: ADDEDREMOVEDITopic 没有缓存事件类型。

  • cache-listening-policy:指定缓存侦听策略为 SINGLE`或 `ALL。这是一个可选属性,其默认值是 SINGLE。每个 Hazelcast 内部通道适配器监听具有相同 cache-events 属性的相同缓存对象,可以接收单个事件消息或所有事件消息。如果是 ALL,所有 Hazelcast 内部通道适配器监听具有相同 cache-events 属性的相同缓存对象,都将接收所有事件消息。如果是 SINGLE,它们将接收唯一事件消息。

一些配置示例:

Distributed Map
<int:channel id="mapChannel"/>

<int-hazelcast:inbound-channel-adapter channel="mapChannel"
                              cache="map"
                              cache-events="UPDATED, REMOVED" />

<bean id="map" factory-bean="instance" factory-method="getMap">
    <constructor-arg value="map"/>
</bean>

<bean id="instance" class="com.hazelcast.core.Hazelcast"
            factory-method="newHazelcastInstance">
    <constructor-arg>
        <bean class="com.hazelcast.config.Config" />
    </constructor-arg>
</bean>
Distributed MultiMap
<int-hazelcast:inbound-channel-adapter channel="multiMapChannel"
                              cache="multiMap"
                              cache-events="ADDED, REMOVED, CLEAR_ALL" />

<bean id="multiMap" factory-bean="instance" factory-method="getMultiMap">
    <constructor-arg value="multiMap"/>
</bean>
Distributed List
<int-hazelcast:inbound-channel-adapter  channel="listChannel"
                               cache="list"
                               cache-events="ADDED, REMOVED"
                               cache-listening-policy="ALL" />

<bean id="list" factory-bean="instance" factory-method="getList">
    <constructor-arg value="list"/>
</bean>
Distributed Set
<int-hazelcast:inbound-channel-adapter channel="setChannel" cache="set" />

<bean id="set" factory-bean="instance" factory-method="getSet">
    <constructor-arg value="set"/>
</bean>
Distributed Queue
<int-hazelcast:inbound-channel-adapter  channel="queueChannel"
                               cache="queue"
                               cache-events="REMOVED"
                               cache-listening-policy="ALL" />

<bean id="queue" factory-bean="instance" factory-method="getQueue">
    <constructor-arg value="queue"/>
</bean>
Distributed Topic
<int-hazelcast:inbound-channel-adapter channel="topicChannel" cache="topic" />

<bean id="topic" factory-bean="instance" factory-method="getTopic">
    <constructor-arg value="topic"/>
</bean>
Replicated Map
<int-hazelcast:inbound-channel-adapter channel="replicatedMapChannel"
                              cache="replicatedMap"
                              cache-events="ADDED, UPDATED, REMOVED"
                              cache-listening-policy="SINGLE"  />

<bean id="replicatedMap" factory-bean="instance" factory-method="getReplicatedMap">
    <constructor-arg value="replicatedMap"/>
</bean>

Java Configuration Sample:

以下示例显示 DistributedMap 配置。相同配置可用于其他分布式数据结构(IMapMultiMapReplicatedMapIListISetIQueueITopic):

@Bean
public PollableChannel distributedMapChannel() {
    return new QueueChannel();
}

@Bean
public IMap<Integer, String> distributedMap() {
    return hazelcastInstance().getMap("Distributed_Map");
}

@Bean
public HazelcastInstance hazelcastInstance() {
    return Hazelcast.newHazelcastInstance();
}

@Bean
public HazelcastEventDrivenMessageProducer hazelcastEventDrivenMessageProducer() {
    final HazelcastEventDrivenMessageProducer producer = new HazelcastEventDrivenMessageProducer(distributedMap());
    producer.setOutputChannel(distributedMapChannel());
    producer.setCacheEventTypes("ADDED,REMOVED,UPDATED,CLEAR_ALL");
    producer.setCacheListeningPolicy(CacheListeningPolicyType.SINGLE);

    return producer;
}

Hazelcast Continuous Query Inbound Channel Adapter

Hazelcast 连续查询允许对特定映射条目所做的修改进行侦听。Hazelcast 连续查询入站通道适配器是事件驱动的通道适配器,它根据已定义的谓语侦听相关的分布式映射事件。

  • Java

  • XML

@Bean
public PollableChannel cqDistributedMapChannel() {
    return new QueueChannel();
}

@Bean
public IMap<Integer, String> cqDistributedMap() {
    return hazelcastInstance().getMap("CQ_Distributed_Map");
}

@Bean
public HazelcastInstance hazelcastInstance() {
    return Hazelcast.newHazelcastInstance();
}

@Bean
public HazelcastContinuousQueryMessageProducer hazelcastContinuousQueryMessageProducer() {
    final HazelcastContinuousQueryMessageProducer producer =
        new HazelcastContinuousQueryMessageProducer(cqDistributedMap(), "surname=TestSurname");
    producer.setOutputChannel(cqDistributedMapChannel());
    producer.setCacheEventTypes("UPDATED");
    producer.setIncludeValue(false);

    return producer;
}
<int:channel id="cqMapChannel"/>

<int-hazelcast:cq-inbound-channel-adapter
                channel="cqMapChannel"
                cache="cqMap"
                cache-events="UPDATED, REMOVED"
                predicate="name=TestName AND surname=TestSurname"
                include-value="true"
                cache-listening-policy="SINGLE"/>

<bean id="cqMap" factory-bean="instance" factory-method="getMap">
    <constructor-arg value="cqMap"/>
</bean>

<bean id="instance" class="com.hazelcast.core.Hazelcast"
            factory-method="newHazelcastInstance">
    <constructor-arg>
        <bean class="com.hazelcast.config.Config" />
    </constructor-arg>
</bean>

它支持六个属性,如下所示:

  • channel:指定发送消息的通道;

  • cache:指定要侦听的分布式映射参考。强制的;

  • cache-events:指定要侦听的缓存事件。可选属性,ADDED 是其默认值。支持的值为:ADDEDREMOVEDUPDATEDEVICTEDEVICT_ALLCLEAR_ALL

  • predicate:指定一个谓词来侦听对特定映射项所做的修改。强制的;

  • include-value:指定在连续查询结果中包含值和旧值。可选,true 是其默认值;

  • cache-listening-policy:指定缓存监听策略为 SINGLE`或 `ALL。可选项,默认值为 SINGLE。每个侦听具有相同 cache-events 属性的相同缓存对象的 Hazelcast CQ 入站通道适配器,可以接收单个事件消息或所有事件消息。如果为 ALL,则所有侦听具有相同 cache-events 属性的相同缓存对象的 Hazelcast CQ 入站通道适配器将接收所有事件消息。如果为 SINGLE,则它们将接收唯一的事件消息。

Hazelcast Cluster Monitor Inbound Channel Adapter

Hazelcast 集群监视器支持侦听对集群所做的修改。Hazelcast 集群监视器入站通道适配器是事件驱动的通道适配器,它侦听相关的成员资格、分布式对象、迁移、生命周期和客户端事件:

  • Java

  • XML

@Bean
public PollableChannel eventChannel() {
    return new QueueChannel();
}

@Bean
public HazelcastInstance hazelcastInstance() {
    return Hazelcast.newHazelcastInstance();
}

@Bean
public HazelcastClusterMonitorMessageProducer hazelcastClusterMonitorMessageProducer() {
    HazelcastClusterMonitorMessageProducer producer = new HazelcastClusterMonitorMessageProducer(hazelcastInstance());
    producer.setOutputChannel(eventChannel());
    producer.setMonitorEventTypes("DISTRIBUTED_OBJECT");

    return producer;
}
<int:channel id="monitorChannel"/>

<int-hazelcast:cm-inbound-channel-adapter
                 channel="monitorChannel"
                 hazelcast-instance="instance"
                 monitor-types="MEMBERSHIP, DISTRIBUTED_OBJECT" />

<bean id="instance" class="com.hazelcast.core.Hazelcast"
            factory-method="newHazelcastInstance">
    <constructor-arg>
        <bean class="com.hazelcast.config.Config" />
    </constructor-arg>
</bean>

它支持三个属性,如下所示:

  • channel:指定发送消息的通道;

  • hazelcast-instance:指定侦听集群事件的 Hazelcast 实例引用。这是必需属性;

  • monitor-types:指定侦听的监视器类型。这是可选属性,默认值为 MEMBERSHIP。支持的值为 MEMBERSHIP, DISTRIBUTED_OBJECT, MIGRATION, LIFECYCLE, CLIENT

Hazelcast Distributed SQL Inbound Channel Adapter

Hazelcast 允许在分布式映射上运行分布式查询。Hazelcast 分布式 SQL 入站通道适配器是一种轮询入站通道适配器。它运行已定义的分布式 SQL 命令,并根据迭代类型返回结果。

  • Java

  • XML

@Bean
public PollableChannel dsDistributedMapChannel() {
    return new QueueChannel();
}

@Bean
public IMap<Integer, String> dsDistributedMap() {
    return hazelcastInstance().getMap("DS_Distributed_Map");
}

@Bean
public HazelcastInstance hazelcastInstance() {
    return Hazelcast.newHazelcastInstance();
}

@Bean
@InboundChannelAdapter(value = "dsDistributedMapChannel", poller = @Poller(maxMessagesPerPoll = "1"))
public HazelcastDistributedSQLMessageSource hazelcastDistributedSQLMessageSource() {
    final HazelcastDistributedSQLMessageSource messageSource =
        new HazelcastDistributedSQLMessageSource(dsDistributedMap(),
            "name='TestName' AND surname='TestSurname'");
    messageSource.setIterationType(DistributedSQLIterationType.ENTRY);

    return messageSource;
}
<int:channel id="dsMapChannel"/>

<int-hazelcast:ds-inbound-channel-adapter
            channel="dsMapChannel"
            cache="dsMap"
            iteration-type="ENTRY"
            distributed-sql="active=false OR age >= 25 OR name = 'TestName'">
    <int:poller fixed-delay="100"/>
</int-hazelcast:ds-inbound-channel-adapter>

<bean id="dsMap" factory-bean="instance" factory-method="getMap">
    <constructor-arg value="dsMap"/>
</bean>

<bean id="instance" class="com.hazelcast.core.Hazelcast"
            factory-method="newHazelcastInstance">
    <constructor-arg>
        <bean class="com.hazelcast.config.Config" />
    </constructor-arg>
</bean>

它需要一个轮询器,并支持四个属性:

  • channel:指定发送消息到的通道。这是必需属性;

  • cache:指定已查询的分布式 `IMap`引用。这是必需属性;

  • iteration-type:指定结果类型。分布式 SQL 可运行在 EntrySet, KeySet, LocalKeySet`或 `Values`中。这是可选属性,默认值为 `VALUE。支持的值为 ENTRY, `KEY, LOCAL_KEY`和 `VALUE

  • distributed-sql:指定 SQL 语句的 where 子句。这是必需属性。

Hazelcast Outbound Channel Adapter

Hazelcast 出站通道适配器侦听其定义的通道,并将传入的消息写入到相关的分布式缓存。它期望其中一个:cachecache-expressionHazelcastHeaders.CACHE_NAME 用于分布式对象定义。支持的分布式对象是:IMapMultiMapReplicatedMapIListISetIQueueITopic

  • Java

  • XML

@Bean
public MessageChannel distributedMapChannel() {
    return new DirectChannel();
}

@Bean
public IMap<Integer, String> distributedMap() {
    return hzInstance().getMap("Distributed_Map");
}

@Bean
public HazelcastInstance hzInstance() {
    return Hazelcast.newHazelcastInstance();
}

@Bean
@ServiceActivator(inputChannel = "distributedMapChannel")
public HazelcastCacheWritingMessageHandler hazelcastCacheWritingMessageHandler() {
    HazelcastCacheWritingMessageHandler handler = new HazelcastCacheWritingMessageHandler();
    handler.setDistributedObject(distributedMap());
    handler.setKeyExpression(new SpelExpressionParser().parseExpression("payload.id"));
    handler.setExtractPayload(true);
    return handler;
}
<int-hazelcast:outbound-channel-adapter channel="mapChannel"
                    cache-expression="headers['CACHE_HEADER']"
                    key-expression="payload.key"
                    extract-payload="true"/>

它需要以下属性:

  • channel:指定发送消息的通道;

  • cache:指定分布式对象引用。可选项;

  • cache-expression:通过 Spring 表达式语言 (SpEL) 指定分布式对象。可选项;

  • key-expression:通过 Spring 表达式语言 (SpEL) 指定键值对的键。仅对于 IMap, `MultiMap`和 `ReplicatedMap`分布式数据结构而言是可选项和必需。

  • extract-payload:指定是否发送整个消息或仅发送有效负载。可选项,默认值为 true。如果为 true,则仅有效负载将被写入分布式对象。否则,通过转换消息头和有效负载,将写入整个消息。

通过在头中设置分布式对象名称,可以经由同一信道将消息写入不同的分布式对象。如果未定义 cachecache-expression 特性,必须在请求 Message 中设置 HazelcastHeaders.CACHE_NAME 头。

Hazelcast Leader Election

如果需要领导人选举(例如,对于仅一个节点应该接收消息的高可用消息使用者而言),可以使用基于 Hazelcast 的 LeaderInitiator

@Bean
public HazelcastInstance hazelcastInstance() {
    return Hazelcast.newHazelcastInstance();
}

@Bean
public LeaderInitiator initiator() {
    return new LeaderInitiator(hazelcastInstance());
}

当一个节点被选为领导人时,它将向所有应用程序侦听器发送 OnGrantedEvent

Hazelcast Message Store

对于分布式消息状态管理(例如,对于持久性 QueueChannel 或跟踪 Aggregator 消息组),提供了 HazelcastMessageStore 实现:

@Bean
public HazelcastInstance hazelcastInstance() {
    return Hazelcast.newHazelcastInstance();
}

@Bean
public MessageGroupStore messageStore() {
    return new HazelcastMessageStore(hazelcastInstance());
}

默认情况下,SPRING_INTEGRATION_MESSAGE_STORE IMap 用于将消息和组存储为键/值。任何自定义 IMap 都可以提供给 HazelcastMessageStore

Hazelcast Metadata Store

实现一个 ListenableMetadataStore 可用,它使用后备 Hazelcast IMap。默认情况下,将使用名称为 SPRING_INTEGRATION_METADATA_STORE 的映射,该名称可以自定义。

@Bean
public HazelcastInstance hazelcastInstance() {
    return Hazelcast.newHazelcastInstance();
}

@Bean
public MetadataStore metadataStore() {
    return new HazelcastMetadataStore(hazelcastInstance());
}

HazelcastMetadataStore 实现 ListenableMetadataStore,允许你注册你自己的 MetadataStoreListener 类型的侦听器,以通过 addListener(MetadataStoreListener 回调) 侦听事件。

Hazelcast Lock Registry

可以使用后备 Hazelcast 分布式 ILock 支持实现一个 LockRegistry

@Bean
public HazelcastInstance hazelcastInstance() {
    return Hazelcast.newHazelcastInstance();
}

@Bean
public LockRegistry lockRegistry() {
    return new HazelcastLockRegistry(hazelcastInstance());
}

与共享 MessageGroupStore(例如,Aggregator 存储管理)配合使用时,HazelcastLockRegistry 可用于跨多个应用程序实例提供此功能,以便一次只允许一个实例处理组。

对于所有分布式操作,必须在 HazelcastInstance 上启用 CP 子系统。

Message Channels with Hazelcast

Hazelcast IQueueITopic 分布式对象本质上是消息原语,可以与 Spring Integration 核心组件配合使用,而无需在此 Hazelcast 模块中实现额外的实现。

QueueChannel 可由任何 java.util.Queue 提供,包括已提到的 Hazelcast 分布式 IQueue

@Bean
PollableChannel hazelcastQueueChannel(HazelcastInstance hazelcastInstance) {
    return new QueueChannel(hazelcastInstance.Message<?>>getQueue("springIntegrationQueue"));
}

将此配置放置在应用程序的 Hazelcast 集群中的多个节点上,将使 QueueChannel 得到分布,并且只有一个节点将能够从该 IQueue 轮询单个 Message。其工作方式类似于 PollableJmsChannelPollableKafkaChannelPollableAmqpChannel

如果生产者端不是 Spring 集成应用程序,则无法配置 QueueChannel,因此使用普通 Hazelcast IQueue API 产生数据。在这种情况下,QueueChannel 方法在消费者端是错误的:必须改用 Inbound Channel Adapter 解决方案:

@Bean
public IQueue<String> myStringHzQueue(HazelcastInstance hazelcastInstance) {
    return hazelcastInstance.getQueue("springIntegrationQueue");
}

@Bean
@InboundChannelAdapter(channel = "stringValuesFromHzQueueChannel")
Supplier<String> fromHzIQueueSource(IQueue<String> myStringHzQueue) {
    return myStringHzQueue::poll;
}

Hazelcast 中的 ITopic 抽象与 JMS 中的 Topic 具有相似的语义:所有订户都收到发布的消息。使用一对简单的 MessageChannel bean,此机制作为开箱即用的功能得到支持:

@Bean
public ITopic<Message<?>> springIntegrationTopic(HazelcastInstance hazelcastInstance,
        MessageChannel fromHazelcastTopicChannel) {

    ITopic<Message<?>> topic = hazelcastInstance.getTopic("springIntegrationTopic");
	topic.addMessageListener(m -> fromHazelcastTopicChannel.send(m.getMessageObject()));
	return topic;
}

@Bean
public MessageChannel publishToHazelcastTopicChannel(ITopic<Message<?>> springIntegrationTopic) {
    return new FixedSubscriberChannel(springIntegrationTopic::publish);
}

@Bean
public MessageChannel fromHazelcastTopicChannel() {
    return new DirectChannel();
}

FixedSubscriberChannelDirectChannel 的优化变体,在初始化时需要一个 MessageHandler。由于 MessageHandler 是一个函数式接口,因此可以为 handleMessage 方法提供一个简单的 lambda。当消息发送到 publishToHazelcastTopicChannel 时,它只是发布到 Hazelcast ITopiccom.hazelcast.topic.MessageListener 也是一个函数式接口,因此可以为 ITopic#addMessageListener 提供一个 lambda。因此,fromHazelcastTopicChannel 订阅者将使用所有发送到所述 ITopic 的消息。

可以将 ExecutorChannelIExecutorService 一起提供。例如,通过相应的配置,可以实现集群范围内单例:

@Bean
public HazelcastInstance hazelcastInstance() {
    return Hazelcast.newHazelcastInstance(
                new Config()
                    .addExecutorConfig(new ExecutorConfig()
                         .setName("singletonExecutor")
                         .setPoolSize(1)));
}

@Bean
public MessageChannel hazelcastSingletonExecutorChannel(HazelcastInstance hazelcastInstance) {
    return new ExecutorChannel(hazelcastInstance.getExecutorService("singletonExecutor"));
}