A3.微服务分布式锁与Spring Integration 融合实践

前言

这是一个基于Java 21 的 六边形架构与领域驱动设计的一个通用项目,并且结合现有的最新版本技术架构实现了 领域驱动设计模式和六边形架构模式组件定义. 并且结合微服务,介绍了领域层,领域事件,资源库,分布式锁,序列化,安全认证,日志等,并提供了实现功能. 并且我会以日常发布文章和更新代码的形式来完善它.

简介

在多实例及其微服务的分布式环境下,单一基于线程的锁可能无法满足我们的需求,因此我们需要一种分布式锁。加锁,释放锁是一个技术方案,所以我们希望在业务对锁的操作是无感知的,因此我们利用注解的形式来实现。

package com.iokays.common.core.lock;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.time.temporal.ChronoUnit;

@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
public @interface DistributedLock {

    /**
     * 锁实例的名称
     *
     * @return 指定锁名称
     */
    String value() default "";

    /**
     * 用于动态计算key的Spring表达式
     *
     * @return Spring Expression Language (SpEL) expression for computing the key dynamically.
     */
    String key() default "";

    /**
     * 等待锁的最大时间
     *
     * @return the maximum time to wait for the lock
     */
    long time() default 0;

    /**
     * 时间单位
     *
     * @return he time unit of the time argument
     */
    ChronoUnit unit() default ChronoUnit.SECONDS;

}

这个基于方法的注解表明了,我们需要实现两个技术点:

  1. 动态获取分布式注解上方法的参数得到锁的key,对锁进行操作。 我们需要通过Spring Expression Language (SpEL) 来动态获取注解的参数,然后基于SpEL来获取锁的key。

  2. 锁的实现。

首先第一个技术点: 我们需要通过Spring Expression Language (SpEL) 来动态获取注解的参数,然后基于SpEL来获取锁的key。该方式的实现可以借鉴Spring Cache。 第二个技术点: 同时在实现锁的方式上,我们可以直接基于Redis自己写一套,但是,我们更希望基于Spring Integration来实现,并且它已经实现了基于Redis,JDBC, 默认线程级的3种分布式锁的实现,我们在使用是直接调用就可以了。

Spring Expression Language (SpEL)

因为分布式的key,我们用到这项技术,你需要掌握理解下图的一些类。并且我这边提供了一些测试类帮助我们来理解:

cached expression evaluator about class

下面是对SpEL的测试用例:更多资料可以查看: https://www.iokays.com/spring-framework/core/expressions.html

package com.iokays.common.spel;

import com.iokays.common.spel.entity.Inventor;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.springframework.expression.Expression;
import org.springframework.expression.spel.standard.SpelExpressionParser;

import java.util.ArrayList;
import java.util.Calendar;
import java.util.GregorianCalendar;
import java.util.List;

import static org.junit.jupiter.api.Assertions.*;

public class SpelSampleTest {

    /**
     * SpelExpressionParser:
     * 可以将字符串形式的 SpEL 表达式解析为 SpelExpression 对象。
     * 解析后的 SpelExpression 对象可以用于在运行时评估表达式的结果。
     */
    @Test
    void testValue() {
        final var parser = new SpelExpressionParser();
        //字符串
        var expression = parser.parseExpression("'Hello World'");
        String msgStr = (String) expression.getValue();
        assertEquals("Hello World", msgStr);

        //调用方法
        expression = parser.parseExpression("'Hello World'.concat('!')");
        msgStr = (String) expression.getValue();
        assertEquals("Hello World!", msgStr);

        //bytes
        expression = parser.parseExpression("'Hello World'.bytes");
        byte[] bytes = (byte[]) expression.getValue();
        assertArrayEquals(bytes, "Hello World".getBytes());

        //嵌套访问:火车头
        expression = parser.parseExpression("'Hello World'.bytes.length");
        Integer length = (Integer) expression.getValue();
        Assertions.assertEquals(length, "Hello World".getBytes().length);

        //构造函数 toUpperCase可以不用加()
        expression = parser.parseExpression("new String('Hello World').toUpperCase()");
        msgStr = (String) expression.getValue();
        Assertions.assertEquals(msgStr, "HELLO WORLD");
    }

    /**
     * 测试根对象
     */
    @Test
    void testRootObject() {
        final var c = new GregorianCalendar();
        c.set(1956, Calendar.AUGUST, 9);

        final var tesla = new Inventor("Nikola Tesla", c.getTime(), "Serbian");
        final var parser = new SpelExpressionParser();

        // test value
        Expression exp = parser.parseExpression("name"); // Parse name as an expression
        String name = (String) exp.getValue(tesla);
        Assertions.assertEquals(name, "Nikola Tesla");

        //在SpEL中,== 运算符对于引用类型的比较实际上是调用了 equals 方法。
        // 这对于大多数场景来说是非常有用的,因为它允许你进行更灵活的值比较。
        exp = parser.parseExpression("name == new String('Nikola Tesla')");
        Boolean result = exp.getValue(tesla, Boolean.class);
        assertEquals(Boolean.TRUE, result);
        assertSame(name, "Nikola Tesla");
        assertNotSame(name, new String("Nikola Tesla"));
    }

    class Simple {
        public List<Boolean> booleanList = new ArrayList<>();
    }
}

上面的用例简单的测试了怎么获取值和调用方法的一些简单的操作,如果要获取到方法的参数,我们需要了解:MethodBasedEvaluationContext. 同时我也提供了一些测试方法, 并在测试类上有清晰的注释来解释这个类。

package com.iokays.common.spel;


import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.springframework.context.expression.MethodBasedEvaluationContext;
import org.springframework.core.DefaultParameterNameDiscoverer;
import org.springframework.expression.Expression;
import org.springframework.expression.ExpressionParser;
import org.springframework.expression.spel.standard.SpelExpressionParser;

import java.lang.reflect.Proxy;
import java.util.Objects;

interface Samer {
    boolean isSame(String name, Integer age);
}

/**
 * 来源: https://www.cnblogs.com/wt20/p/17659683.html
 * <p>
 * MethodBasedEvaluationContext是StandardEvaluationContext的一个子类。
 * 它主要是把方法参数也加到了变量中,使得用户可以直接通过#+参数名来获取值。常常用于解析注解中的SpEL表达式。
 * <p>
 * java编译后通过反射是拿不到真实的方法参数名称的,需要带上-parameters参数编译才行,不过Spring还另外基于ASM的方式解析字节码文件,
 * 获取字节码的本地方法表来获取方法真实参数。DefaultParameterNameDiscoverer实现类同时使用上面所说的两种方式来获取方法参数名。
 */
public class MethodBasedEvaluationContextTest {

    @Test
    public void test() {
        ExpressionParser expressionParser = new SpelExpressionParser();

        // 代理对象
        Samer samer = (Samer) Proxy.newProxyInstance(
                MethodBasedEvaluationContextTest.class.getClassLoader(),
                new Class[]{Samer.class},
                (proxy, method, args) -> {
                    //root 对象, 当表达式使用: #root, #this 需要
                    final var root = new Object();

                    final var evaluationContext = new MethodBasedEvaluationContext(root, method, args, new DefaultParameterNameDiscoverer());

                    // 通过#+方法参数名
                    Expression expression = expressionParser.parseExpression("#name");
                    Assertions.assertEquals("andy", expression.getValue(evaluationContext));

                    // 通过# + 内置的变量名+下标a0
                    expression = expressionParser.parseExpression("#a0");
                    Assertions.assertEquals("andy", expression.getValue(evaluationContext));

                    // 通过# + 内置的变量名+下标p0
                    expression = expressionParser.parseExpression("#p0");
                    Assertions.assertEquals("andy", expression.getValue(evaluationContext));

                    // 通过# + 内置的变量名+下标p0
                    expression = expressionParser.parseExpression("#p1");
                    Assertions.assertEquals(18, expression.getValue(evaluationContext));

                    return Objects.equals("andy", args[0]) && Objects.equals(18, args[1]);
                }
        );

        samer.isSame("andy", 18);
    }
}

Spring Integration Redis

Spring Integration Redis 引入了 DefaultLockRegistry 。使用从 LockRegistry 实例获得的锁来确保一次只有一个线程操作一个组。 具体的实现,我可以引用包的的形式来确定自己分布式锁的方案。

第一步我们需要提供一个锁提供者的Bean, Redis:RedisLockRegistry; JDBC: JdbcLockRegistry.

    @Bean("sampleLock")
    public LockRegistry lockRegistry() {
        return new DefaultLockRegistry();
    }
package com.iokays.common.distributed.lock;


import jakarta.annotation.Resource;
import org.apache.commons.lang3.Validate;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.integration.support.locks.LockRegistry;

import java.util.concurrent.TimeUnit;

@SpringBootTest
public class LockRegistryTest {

    @Resource
    private LockRegistry lockRegistry;

    /**
     * 简单锁测试
     */
    @Test
    public void testLock() {
        final var orderLock = lockRegistry.obtain("ORDER_NO_00001");
        orderLock.lock();
        System.out.println("执行操作");
        orderLock.unlock();
    }

    /**
     * 带有时间的加锁测试
     *
     * @throws InterruptedException
     */
    @Test
    public void testTryLock() throws InterruptedException {
        final var orderLock = lockRegistry.obtain("ORDER_NO_00002");

        // 1秒内尝试获取锁,失败返回false
        final var mark = orderLock.tryLock(1, TimeUnit.SECONDS);
        Validate.isTrue(mark, "获取锁失败");

        System.out.println("执行操作");
        orderLock.unlock();

    }
}

这是一个最简单的使用方式,大家可以利用多线程的场景对锁进行的测试。

分布式锁的实现

当我们了解这两种技术点后,在实现分布式锁就会简单,也只需要两步:

第一:提供基于分布式锁表达式计算方式,上面我们已经知道MethodBasedEvaluationContext, 直接继承:CachedExpressionEvaluator,并实现 createMethodBasedEvaluationContext这个方法即可。

package com.iokays.common.distributed.lock;

import com.google.common.collect.Maps;
import org.springframework.context.expression.AnnotatedElementKey;
import org.springframework.context.expression.CachedExpressionEvaluator;
import org.springframework.context.expression.MethodBasedEvaluationContext;
import org.springframework.expression.EvaluationContext;
import org.springframework.expression.Expression;

import java.lang.reflect.Method;
import java.util.Map;

/**
 * Distributed Expression Evaluator
 */
public class DistributedLockExpressionEvaluator extends CachedExpressionEvaluator {

    private final Map<ExpressionKey, Expression> cache = Maps.newHashMap();

    /**
     * Create MethodBasedEvaluationContext
     * MethodBasedEvaluationContext是StandardEvaluationContext的一个子类。它主要是把方法参数也加到了变量中,使得用户可以直接通过#+参数名来获取值。常常用于解析注解中的SpEL表达式。
     * <p>
     * java编译后通过反射是拿不到真实的方法参数名称的,需要带上-parameters参数编译才行,不过Spring还另外基于ASM的方式解析字节码文件,
     * 获取字节码的本地方法表来获取方法真实参数。DefaultParameterNameDiscoverer实现类同时使用上面所说的两种方式来获取方法参数名。
     *
     * @param method
     * @param args
     * @return
     */
    public EvaluationContext createMethodBasedEvaluationContext(Method method, Object[] args) {
        //当你的应用场景用 #root, 取决于你是否根据 root对象可以用来生成key 是需要rootObject的。
        final var rootObject = new DistributedLockExpressionRootObject(method, args);
        return new MethodBasedEvaluationContext(rootObject, method, args, this.getParameterNameDiscoverer());
    }

    public Object eval(String expression, AnnotatedElementKey elementKey, EvaluationContext evalContext) {
        return getExpression(cache, elementKey, expression).getValue(evalContext);
    }

}

第二:利用Spring AOP的环绕通知来实现分布式的加锁,释放锁操作。

package com.iokays.common.distributed.lock;

import com.iokays.common.core.lock.DistributedLock;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.Validate;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.framework.AopProxyUtils;
import org.springframework.aop.support.AopUtils;
import org.springframework.context.ApplicationContext;
import org.springframework.context.expression.AnnotatedElementKey;
import org.springframework.core.BridgeMethodResolver;
import org.springframework.integration.support.locks.LockRegistry;
import org.springframework.stereotype.Component;

import java.lang.reflect.Proxy;
import java.time.Duration;
import java.util.Objects;

/**
 * 分布式锁切面实现
 * 基于切面的实现,是因为在释放锁需在提交事务(声明注解性事务)之后。
 * 备注: 事务生效时机是在第一次读、写表(InnoDB)的操作开始。
 */
@Aspect
@Component
public class DistributedLockAspect {

    private static final Logger log = LoggerFactory.getLogger(DistributedLockAspect.class);

    /**
     * Spring 集成的 LockRegistry
     */
    private final ApplicationContext applicationContext;

    private final DistributedLockExpressionEvaluator evaluator = new DistributedLockExpressionEvaluator();

    public DistributedLockAspect(final ApplicationContext applicationContext) {
        this.applicationContext = Objects.requireNonNull(applicationContext);
    }

    /**
     * Spring 环绕通知 实现分布式锁调用
     */
    @Around("@annotation(distributedLock)")
    public Object around(final ProceedingJoinPoint pjp, final DistributedLock distributedLock) throws Throwable {
        Validate.notNull(pjp, "pjp is null");
        Validate.notNull(distributedLock, "distributedLock is null");
        Validate.noNullElements(new Object[]{distributedLock.key(), distributedLock.time(), distributedLock.unit()});

        final String key = getKey(distributedLock.key(), pjp);
        log.info("distributedLock: {}, key: {}", distributedLock, key);

        final var value = distributedLock.value();

        final var lockRegistry = StringUtils.isNotBlank(value) ?
                applicationContext.getBean(value, LockRegistry.class) : applicationContext.getBean(LockRegistry.class);

        return distributedLock.time() == 0L ?
                lockRegistry.executeLocked(key, () -> pjp.proceed()) :
                lockRegistry.executeLocked(key, Duration.of(distributedLock.time(), distributedLock.unit()), () -> pjp.proceed());
    }

    private String getKey(final String key, final ProceedingJoinPoint pjp) {
        final var signature = (MethodSignature) pjp.getSignature();
        final var method = BridgeMethodResolver.findBridgedMethod(signature.getMethod());

        final var target = pjp.getTarget();
        final var targetClass = AopProxyUtils.ultimateTargetClass(target);

        final var targetMethod = (!Proxy.isProxyClass(targetClass) ?
                AopUtils.getMostSpecificMethod(method, targetClass) : method);

        final var methodKey = new AnnotatedElementKey(targetMethod, targetClass);

        final var evaluationContext = evaluator.createMethodBasedEvaluationContext(targetMethod, pjp.getArgs());
        final Object result = evaluator.eval(key, methodKey, evaluationContext);
        return Objects.requireNonNull(result).toString();
    }

}

getKey方法:利用了AOP的一些方法,大家可以去了解下。

现在我直接使用测试类来演示:

package com.iokays.common.distributed.lock;

import com.iokays.common.core.lock.DistributedLock;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import java.util.concurrent.atomic.AtomicInteger;

@Slf4j
@Service
public class SampleService {

    private final AtomicInteger counter = new AtomicInteger(0);

    @DistributedLock(value = "sampleLock", key = "#key", time = 1)
    public void run(final String key) {
        log.info("run: {}, counter: {}", key, counter.getAndIncrement());
    }

}
package com.iokays.common.distributed.lock;

import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

import java.util.stream.Stream;


@SpringBootTest
class DistributedLockAspectTest {

    @Resource
    private SampleService service;

    @Test
    void test() {
        Stream.generate(() -> "key_1").limit(10).parallel().forEach(service::run);
    }

}

注意项

在应用服务层,这个分布式锁的注解和事务的注解一般是同时存在的,就出现了一个先后的问题,因为事务的生效开始时间是第一次调用SQL产生的,所以不管怎么处理,应用程序都是先加锁,再添加事务。但是在方法提交后,事务会第一时间结束,所以锁的释放在后面。可能导致在处理同一个数据的另一个方法获取不到锁,所以在获取锁的时候,我们可以使用带有时间的加锁方法,来尽量避免这个问题。

未完待续…​

基于Spring Integration对分布式锁的处理完了, 下篇将会介绍领域对象怎么使用Jackson来自定义序列化和反序列化.