A3.微服务分布式锁与Spring Integration 融合实践
前言
这是一个基于Java 21 的 六边形架构与领域驱动设计的一个通用项目,并且结合现有的最新版本技术架构实现了 领域驱动设计模式和六边形架构模式组件定义. 并且结合微服务,介绍了领域层,领域事件,资源库,分布式锁,序列化,安全认证,日志等,并提供了实现功能. 并且我会以日常发布文章和更新代码的形式来完善它.
简介
在多实例及其微服务的分布式环境下,单一基于线程的锁可能无法满足我们的需求,因此我们需要一种分布式锁。加锁,释放锁是一个技术方案,所以我们希望在业务对锁的操作是无感知的,因此我们利用注解的形式来实现。
package com.iokays.common.core.lock;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.time.temporal.ChronoUnit;
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
public @interface DistributedLock {
/**
* 锁实例的名称
*
* @return 指定锁名称
*/
String value() default "";
/**
* 用于动态计算key的Spring表达式
*
* @return Spring Expression Language (SpEL) expression for computing the key dynamically.
*/
String key() default "";
/**
* 等待锁的最大时间
*
* @return the maximum time to wait for the lock
*/
long time() default 0;
/**
* 时间单位
*
* @return he time unit of the time argument
*/
ChronoUnit unit() default ChronoUnit.SECONDS;
}
这个基于方法的注解表明了,我们需要实现两个技术点:
-
动态获取分布式注解上方法的参数得到锁的key,对锁进行操作。 我们需要通过Spring Expression Language (SpEL) 来动态获取注解的参数,然后基于SpEL来获取锁的key。
-
锁的实现。
首先第一个技术点: 我们需要通过Spring Expression Language (SpEL) 来动态获取注解的参数,然后基于SpEL来获取锁的key。该方式的实现可以借鉴Spring Cache。 第二个技术点: 同时在实现锁的方式上,我们可以直接基于Redis自己写一套,但是,我们更希望基于Spring Integration来实现,并且它已经实现了基于Redis,JDBC, 默认线程级的3种分布式锁的实现,我们在使用是直接调用就可以了。
Spring Expression Language (SpEL)
因为分布式的key,我们用到这项技术,你需要掌握理解下图的一些类。并且我这边提供了一些测试类帮助我们来理解:
下面是对SpEL的测试用例:更多资料可以查看: https://www.iokays.com/spring-framework/core/expressions.html
package com.iokays.common.spel;
import com.iokays.common.spel.entity.Inventor;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.springframework.expression.Expression;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.GregorianCalendar;
import java.util.List;
import static org.junit.jupiter.api.Assertions.*;
public class SpelSampleTest {
/**
* SpelExpressionParser:
* 可以将字符串形式的 SpEL 表达式解析为 SpelExpression 对象。
* 解析后的 SpelExpression 对象可以用于在运行时评估表达式的结果。
*/
@Test
void testValue() {
final var parser = new SpelExpressionParser();
//字符串
var expression = parser.parseExpression("'Hello World'");
String msgStr = (String) expression.getValue();
assertEquals("Hello World", msgStr);
//调用方法
expression = parser.parseExpression("'Hello World'.concat('!')");
msgStr = (String) expression.getValue();
assertEquals("Hello World!", msgStr);
//bytes
expression = parser.parseExpression("'Hello World'.bytes");
byte[] bytes = (byte[]) expression.getValue();
assertArrayEquals(bytes, "Hello World".getBytes());
//嵌套访问:火车头
expression = parser.parseExpression("'Hello World'.bytes.length");
Integer length = (Integer) expression.getValue();
Assertions.assertEquals(length, "Hello World".getBytes().length);
//构造函数 toUpperCase可以不用加()
expression = parser.parseExpression("new String('Hello World').toUpperCase()");
msgStr = (String) expression.getValue();
Assertions.assertEquals(msgStr, "HELLO WORLD");
}
/**
* 测试根对象
*/
@Test
void testRootObject() {
final var c = new GregorianCalendar();
c.set(1956, Calendar.AUGUST, 9);
final var tesla = new Inventor("Nikola Tesla", c.getTime(), "Serbian");
final var parser = new SpelExpressionParser();
// test value
Expression exp = parser.parseExpression("name"); // Parse name as an expression
String name = (String) exp.getValue(tesla);
Assertions.assertEquals(name, "Nikola Tesla");
//在SpEL中,== 运算符对于引用类型的比较实际上是调用了 equals 方法。
// 这对于大多数场景来说是非常有用的,因为它允许你进行更灵活的值比较。
exp = parser.parseExpression("name == new String('Nikola Tesla')");
Boolean result = exp.getValue(tesla, Boolean.class);
assertEquals(Boolean.TRUE, result);
assertSame(name, "Nikola Tesla");
assertNotSame(name, new String("Nikola Tesla"));
}
class Simple {
public List<Boolean> booleanList = new ArrayList<>();
}
}
上面的用例简单的测试了怎么获取值和调用方法的一些简单的操作,如果要获取到方法的参数,我们需要了解:MethodBasedEvaluationContext. 同时我也提供了一些测试方法, 并在测试类上有清晰的注释来解释这个类。
package com.iokays.common.spel;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.springframework.context.expression.MethodBasedEvaluationContext;
import org.springframework.core.DefaultParameterNameDiscoverer;
import org.springframework.expression.Expression;
import org.springframework.expression.ExpressionParser;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import java.lang.reflect.Proxy;
import java.util.Objects;
interface Samer {
boolean isSame(String name, Integer age);
}
/**
* 来源: https://www.cnblogs.com/wt20/p/17659683.html
* <p>
* MethodBasedEvaluationContext是StandardEvaluationContext的一个子类。
* 它主要是把方法参数也加到了变量中,使得用户可以直接通过#+参数名来获取值。常常用于解析注解中的SpEL表达式。
* <p>
* java编译后通过反射是拿不到真实的方法参数名称的,需要带上-parameters参数编译才行,不过Spring还另外基于ASM的方式解析字节码文件,
* 获取字节码的本地方法表来获取方法真实参数。DefaultParameterNameDiscoverer实现类同时使用上面所说的两种方式来获取方法参数名。
*/
public class MethodBasedEvaluationContextTest {
@Test
public void test() {
ExpressionParser expressionParser = new SpelExpressionParser();
// 代理对象
Samer samer = (Samer) Proxy.newProxyInstance(
MethodBasedEvaluationContextTest.class.getClassLoader(),
new Class[]{Samer.class},
(proxy, method, args) -> {
//root 对象, 当表达式使用: #root, #this 需要
final var root = new Object();
final var evaluationContext = new MethodBasedEvaluationContext(root, method, args, new DefaultParameterNameDiscoverer());
// 通过#+方法参数名
Expression expression = expressionParser.parseExpression("#name");
Assertions.assertEquals("andy", expression.getValue(evaluationContext));
// 通过# + 内置的变量名+下标a0
expression = expressionParser.parseExpression("#a0");
Assertions.assertEquals("andy", expression.getValue(evaluationContext));
// 通过# + 内置的变量名+下标p0
expression = expressionParser.parseExpression("#p0");
Assertions.assertEquals("andy", expression.getValue(evaluationContext));
// 通过# + 内置的变量名+下标p0
expression = expressionParser.parseExpression("#p1");
Assertions.assertEquals(18, expression.getValue(evaluationContext));
return Objects.equals("andy", args[0]) && Objects.equals(18, args[1]);
}
);
samer.isSame("andy", 18);
}
}
Spring Integration Redis
Spring Integration Redis 引入了 DefaultLockRegistry 。使用从 LockRegistry 实例获得的锁来确保一次只有一个线程操作一个组。 具体的实现,我可以引用包的的形式来确定自己分布式锁的方案。
Spring Integration Redis: https://www.iokays.com/spring-integration/redis.html#redis-lock-registry
Spring Integration JDBC: https://www.iokays.com/spring-integration/jdbc/lock-registry.html
第一步我们需要提供一个锁提供者的Bean, Redis:RedisLockRegistry; JDBC: JdbcLockRegistry.
@Bean("sampleLock")
public LockRegistry lockRegistry() {
return new DefaultLockRegistry();
}
package com.iokays.common.distributed.lock;
import jakarta.annotation.Resource;
import org.apache.commons.lang3.Validate;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.integration.support.locks.LockRegistry;
import java.util.concurrent.TimeUnit;
@SpringBootTest
public class LockRegistryTest {
@Resource
private LockRegistry lockRegistry;
/**
* 简单锁测试
*/
@Test
public void testLock() {
final var orderLock = lockRegistry.obtain("ORDER_NO_00001");
orderLock.lock();
System.out.println("执行操作");
orderLock.unlock();
}
/**
* 带有时间的加锁测试
*
* @throws InterruptedException
*/
@Test
public void testTryLock() throws InterruptedException {
final var orderLock = lockRegistry.obtain("ORDER_NO_00002");
// 1秒内尝试获取锁,失败返回false
final var mark = orderLock.tryLock(1, TimeUnit.SECONDS);
Validate.isTrue(mark, "获取锁失败");
System.out.println("执行操作");
orderLock.unlock();
}
}
这是一个最简单的使用方式,大家可以利用多线程的场景对锁进行的测试。
分布式锁的实现
当我们了解这两种技术点后,在实现分布式锁就会简单,也只需要两步:
第一:提供基于分布式锁表达式计算方式,上面我们已经知道MethodBasedEvaluationContext, 直接继承:CachedExpressionEvaluator,并实现 createMethodBasedEvaluationContext这个方法即可。
package com.iokays.common.distributed.lock;
import com.google.common.collect.Maps;
import org.springframework.context.expression.AnnotatedElementKey;
import org.springframework.context.expression.CachedExpressionEvaluator;
import org.springframework.context.expression.MethodBasedEvaluationContext;
import org.springframework.expression.EvaluationContext;
import org.springframework.expression.Expression;
import java.lang.reflect.Method;
import java.util.Map;
/**
* Distributed Expression Evaluator
*/
public class DistributedLockExpressionEvaluator extends CachedExpressionEvaluator {
private final Map<ExpressionKey, Expression> cache = Maps.newHashMap();
/**
* Create MethodBasedEvaluationContext
* MethodBasedEvaluationContext是StandardEvaluationContext的一个子类。它主要是把方法参数也加到了变量中,使得用户可以直接通过#+参数名来获取值。常常用于解析注解中的SpEL表达式。
* <p>
* java编译后通过反射是拿不到真实的方法参数名称的,需要带上-parameters参数编译才行,不过Spring还另外基于ASM的方式解析字节码文件,
* 获取字节码的本地方法表来获取方法真实参数。DefaultParameterNameDiscoverer实现类同时使用上面所说的两种方式来获取方法参数名。
*
* @param method
* @param args
* @return
*/
public EvaluationContext createMethodBasedEvaluationContext(Method method, Object[] args) {
//当你的应用场景用 #root, 取决于你是否根据 root对象可以用来生成key 是需要rootObject的。
final var rootObject = new DistributedLockExpressionRootObject(method, args);
return new MethodBasedEvaluationContext(rootObject, method, args, this.getParameterNameDiscoverer());
}
public Object eval(String expression, AnnotatedElementKey elementKey, EvaluationContext evalContext) {
return getExpression(cache, elementKey, expression).getValue(evalContext);
}
}
第二:利用Spring AOP的环绕通知来实现分布式的加锁,释放锁操作。
package com.iokays.common.distributed.lock;
import com.iokays.common.core.lock.DistributedLock;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.Validate;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.framework.AopProxyUtils;
import org.springframework.aop.support.AopUtils;
import org.springframework.context.ApplicationContext;
import org.springframework.context.expression.AnnotatedElementKey;
import org.springframework.core.BridgeMethodResolver;
import org.springframework.integration.support.locks.LockRegistry;
import org.springframework.stereotype.Component;
import java.lang.reflect.Proxy;
import java.time.Duration;
import java.util.Objects;
/**
* 分布式锁切面实现
* 基于切面的实现,是因为在释放锁需在提交事务(声明注解性事务)之后。
* 备注: 事务生效时机是在第一次读、写表(InnoDB)的操作开始。
*/
@Aspect
@Component
public class DistributedLockAspect {
private static final Logger log = LoggerFactory.getLogger(DistributedLockAspect.class);
/**
* Spring 集成的 LockRegistry
*/
private final ApplicationContext applicationContext;
private final DistributedLockExpressionEvaluator evaluator = new DistributedLockExpressionEvaluator();
public DistributedLockAspect(final ApplicationContext applicationContext) {
this.applicationContext = Objects.requireNonNull(applicationContext);
}
/**
* Spring 环绕通知 实现分布式锁调用
*/
@Around("@annotation(distributedLock)")
public Object around(final ProceedingJoinPoint pjp, final DistributedLock distributedLock) throws Throwable {
Validate.notNull(pjp, "pjp is null");
Validate.notNull(distributedLock, "distributedLock is null");
Validate.noNullElements(new Object[]{distributedLock.key(), distributedLock.time(), distributedLock.unit()});
final String key = getKey(distributedLock.key(), pjp);
log.info("distributedLock: {}, key: {}", distributedLock, key);
final var value = distributedLock.value();
final var lockRegistry = StringUtils.isNotBlank(value) ?
applicationContext.getBean(value, LockRegistry.class) : applicationContext.getBean(LockRegistry.class);
return distributedLock.time() == 0L ?
lockRegistry.executeLocked(key, () -> pjp.proceed()) :
lockRegistry.executeLocked(key, Duration.of(distributedLock.time(), distributedLock.unit()), () -> pjp.proceed());
}
private String getKey(final String key, final ProceedingJoinPoint pjp) {
final var signature = (MethodSignature) pjp.getSignature();
final var method = BridgeMethodResolver.findBridgedMethod(signature.getMethod());
final var target = pjp.getTarget();
final var targetClass = AopProxyUtils.ultimateTargetClass(target);
final var targetMethod = (!Proxy.isProxyClass(targetClass) ?
AopUtils.getMostSpecificMethod(method, targetClass) : method);
final var methodKey = new AnnotatedElementKey(targetMethod, targetClass);
final var evaluationContext = evaluator.createMethodBasedEvaluationContext(targetMethod, pjp.getArgs());
final Object result = evaluator.eval(key, methodKey, evaluationContext);
return Objects.requireNonNull(result).toString();
}
}
getKey方法:利用了AOP的一些方法,大家可以去了解下。
现在我直接使用测试类来演示:
package com.iokays.common.distributed.lock;
import com.iokays.common.core.lock.DistributedLock;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
@Service
public class SampleService {
private final AtomicInteger counter = new AtomicInteger(0);
@DistributedLock(value = "sampleLock", key = "#key", time = 1)
public void run(final String key) {
log.info("run: {}, counter: {}", key, counter.getAndIncrement());
}
}
package com.iokays.common.distributed.lock;
import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import java.util.stream.Stream;
@SpringBootTest
class DistributedLockAspectTest {
@Resource
private SampleService service;
@Test
void test() {
Stream.generate(() -> "key_1").limit(10).parallel().forEach(service::run);
}
}