Distributed Locks

在许多情况下,必须以排他方式对某个上下文(甚至单个消息)执行操作。一个示例是聚合器组件,其中我们必须检查当前消息的消息组状态,以确定我们是否可以释放组或仅添加该消息以便将来考虑。出于此目的,Java 提供了一个带有 java.util.concurrent.locks.Lock 实现的 API。但是,当应用程序分布式和/或在集群中运行时,问题会变得更加复杂。在这种情况下,锁定具有挑战性,需要一些共享状态及其特定方法来实现排他性要求。

In many situations the action against some context (or even single message) has to be performed in an exclusive manner. One example is an aggregator component where we have to check the message group state for the current message to determine whether we can release the group or just add that message for future consideration. For this purpose Java provides an API with java.util.concurrent.locks.Lock implementations. However, the problem becomes more complex when an application is distributed and/or run in the cluster. The locking in this case is challenging and requires some shared state and its specific approach to achieve the exclusivity requirement.

Spring Integration 提供了一个带有基于 ReentrantLock API 的内存中 DefaultLockRegistry 实现的 LockRegistrty 抽象。LockRegistrtyobtain(Object) 方法需要一个用于特定上下文的“锁定键”。例如,聚合器使用 correlationKey 来锁定对其组的操作。这样可以同时使用不同的锁。此 obtain(Object) 方法返回一个 java.util.concurrent.locks.Lock 实例(取决于 LockRegistry 实现),因此其余逻辑与标准 Java 并发算法相同。

Spring Integration provides a LockRegistrty abstraction with an in-memory DefaultLockRegistry implementation based on the ReentrantLock API. The obtain(Object) method of the LockRegistrty requires a lock key for specific context. For example, an aggregator uses a correlationKey to lock operations around its group. This way different locks can be used concurrently. This obtain(Object) method returns a java.util.concurrent.locks.Lock instance (depending on the LockRegistry implementation), therefore the rest of the logic is the same as standard Java Concurrency algorithm.

从版本 6.2 开始,LockRegistry 提供了一个 executeLocked() API(此界面中的“默认”方法)来执行一些锁定任务。此 API 的行为类似于众所周知的 JdbcTemplateJmsTemplateRestTemplate。以下示例演示了此 API 的用法:

Starting with version 6.2, the LockRegistry provides an executeLocked() API (default methods in this interface) to perform some task while locked. The behavior of this API is similar to well-known JdbcTemplate, JmsTemplate or RestTemplate. The following example demonstrates the usage of this API:

LockRegistry registry = new DefaultLockRegistry();
...
registry.executeLocked("someLockKey", () -> someExclusiveResourceCall());

该方法从任务调用中重新抛出异常,如果 Lock 中断,将抛出 InterruptedException。此外,当 lock.tryLock() 返回 false 时,一个使用 Duration 的变体将抛出一个 java.util.concurrent.TimeoutException

The method rethrows an exception from the task call, throws an InterruptedException if Lock is interrupted. In addition, a variant with Duration throws a java.util.concurrent.TimeoutException when lock.tryLock() returns false.

Spring Integration 为分布式锁提供了这些 LockRegistrty 实现:

Spring Integration provides these LockRegistrty implementations for distributed locks:

Spring Integration AWS扩展还实现了 DynamoDbLockRegistry

Spring Integration AWS extension also implements a DynamoDbLockRegistry.