Skip to content

Latest commit

 

History

History
243 lines (189 loc) · 7.08 KB

分布式锁.md

File metadata and controls

243 lines (189 loc) · 7.08 KB

分布式锁

常见的实现方式

  • 基于数据库
  • 基于缓存(redis,memcached)
  • 基于Zookeeper

期望:

这把锁要是一把可重入锁(避免死锁)
这把锁最好是一把阻塞锁(根据业务需求考虑要不要这条)
有高可用的获取锁和释放锁功能
获取锁和释放锁的性能要好

Redis 实现分布式锁

setnx(key, value):SET if Not eXists,若该key不存在,则设置该key的值并返回1;若该key已存在,则不做任何操作并返回0。
get(key):获得key对应的value值,若不存在则返回nil。
getset(key, value):先获取key对应的value值,若不存在则返回nil,然后将旧的value更新为新的value。
expire(key, seconds):设置key-value的有效期为seconds秒。
set(key, value, options)
options:

  • EX seconds -- Set the specified expire time, in seconds.
  • PX milliseconds -- Set the specified expire time, in milliseconds.
  • NX -- Only set the key if it does not already exist.
  • XX -- Only set the key if it already exists.

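Redis 原生命令 setnx 可以实现加锁,再配合 expire 可实现带超时的锁;但 setnx 与 expire 是两条独立的命令,不是原子操作(setnx 成功后若进程在执行 expire 之前崩溃,锁将永不过期)。因此更推荐使用 set + options(NX 加 EX/PX),一条命令即可原子地完成加锁并设置过期时间。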

在 Spring 的实现

redisTemplate.opsForValue().setIfAbsent(key,value,timeout,timeUnit);

对应 Redis 命令 SET key value EX/PX timeout NX

redisTemplate.opsForValue().setIfAbsent(key, value);

对应 Redis 命令 SETNX key value

实现代码 参考

Spring Data Redis + Spring AOP + Java annotation

RedisCacheStore.java

/**
 * {@link CacheStore} implementation for string keys and values that delegates
 * every operation to a {@link StringRedisTemplate}.
 */
@Component
public class RedisCacheStore implements CacheStore<String, String> {

    @Autowired
    private StringRedisTemplate redisTemplate;

    /**
     * Looks up the value stored under the given key.
     *
     * @param key cache key
     * @return the stored value, or an empty {@link Optional} if the template returns {@code null}
     */
    @Override
    public Optional<String> get(String key) {
        // ofNullable, not of: the template yields null for an absent key and
        // Optional.of(null) would throw NullPointerException.
        return Optional.ofNullable(redisTemplate.opsForValue().get(key));
    }

    /**
     * Stores the value under the key with an expiry.
     *
     * @param key      cache key
     * @param value    value to store
     * @param timeout  expiry amount
     * @param timeUnit unit of {@code timeout}
     */
    @Override
    public void put(String key, String value, long timeout, TimeUnit timeUnit) {
        redisTemplate.opsForValue().set(key, value, timeout, timeUnit);
    }

    /**
     * Stores the value under the key with an expiry, only if the key is absent.
     *
     * @param key      cache key
     * @param value    value to store
     * @param timeout  expiry amount
     * @param timeUnit unit of {@code timeout}
     * @return the result reported by the template's {@code setIfAbsent}; callers in this
     *         file treat {@code true} as "stored", {@code false} as "key already present",
     *         and also guard against {@code null}
     */
    @Override
    public Boolean putIfAbsent(String key, String value, long timeout, TimeUnit timeUnit) {
        return redisTemplate.opsForValue().setIfAbsent(key, value, timeout, timeUnit);
    }

    /**
     * Stores the value under the key without an expiry.
     *
     * @param key   cache key
     * @param value value to store
     */
    @Override
    public void put(String key, String value) {
        redisTemplate.opsForValue().set(key, value);
    }

    /**
     * Removes the key.
     *
     * @param key cache key to delete
     */
    @Override
    public void delete(String key) {
        redisTemplate.delete(key);
    }
}
/**
 * Aspect that guards methods annotated with {@link CacheLock} by a key stored through
 * {@link RedisCacheStore}: the key is written with "put if absent" before the method
 * runs, and the call is rejected when the key is already present.
 */
@Slf4j
@Aspect
@Configuration
public class CacheLockInterceptor {

    private static final String CACHE_LOCK_PREFIX = "cache_lock_";

    private static final String CACHE_LOCK_VALUE = "locked";

    @Autowired
    private RedisCacheStore cacheStore;


    /**
     * Acquires the lock key, proceeds with the intercepted method, and releases the key
     * afterwards when {@link CacheLock#autoDelete()} is set.
     *
     * @param joinPoint the intercepted invocation
     * @return whatever the intercepted method returns
     * @throws ServiceException        if the store reports {@code null} for the acquire attempt
     * @throws FrequentAccessException if the lock key is already present
     * @throws Throwable               anything thrown by the intercepted method
     */
    @Around("@annotation(pro.ifuture.lbt.cache.lock.CacheLock)")
    public Object interceptCacheLock(ProceedingJoinPoint joinPoint) throws Throwable {
        // Get method signature
        MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature();

        log.debug("Starting locking: [{}]", methodSignature.toString());

        // Get cache lock
        CacheLock cacheLock = methodSignature.getMethod().getAnnotation(CacheLock.class);

        // Build cache lock key
        String cacheLockKey = buildCacheLockKey(cacheLock, joinPoint);

        log.debug("Built lock key: [{}]", cacheLockKey);

        // Acquire OUTSIDE the try/finally: a caller that failed to get the lock must
        // never reach the release code, otherwise it would delete the holder's key.
        Boolean cacheResult = cacheStore.putIfAbsent(cacheLockKey, CACHE_LOCK_VALUE, cacheLock.expired(), cacheLock.timeUnit());

        if (cacheResult == null) {
            throw new ServiceException("Unknown reason of cache " + cacheLockKey).setErrorData(cacheLockKey);
        }

        if (!cacheResult) {
            throw new FrequentAccessException("Visit too often, please try again later!").setErrorData(cacheLockKey);
        }

        try {
            // Proceed the method
            return joinPoint.proceed();
        } finally {
            // Release only the lock this invocation acquired.
            // NOTE(review): if the method runs longer than the expiry, the key may by now
            // belong to another caller and this delete would remove it. Fixing that needs
            // a per-owner value plus an atomic compare-and-delete, which the store API
            // visible here does not offer.
            if (cacheLock.autoDelete()) {
                cacheStore.delete(cacheLockKey);
                log.debug("Deleted the cache lock: [{}]", cacheLockKey);
            }
        }
    }

    /**
     * Builds the lock key as: fixed prefix, then the annotation prefix (or the method's
     * string form when no prefix is given), then each {@link CacheParam}-annotated
     * argument, then optionally the request IP, the latter parts joined by the
     * annotation's delimiter.
     *
     * @param cacheLock the annotation found on the intercepted method, must not be null
     * @param joinPoint the intercepted invocation, must not be null
     * @return the lock key
     */
    private String buildCacheLockKey(@NonNull CacheLock cacheLock, @NonNull ProceedingJoinPoint joinPoint) {
        Assert.notNull(cacheLock, "Cache lock must not be null");
        Assert.notNull(joinPoint, "Proceeding join point must not be null");

        // Get the method
        MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature();

        // Build the cache lock key
        StringBuilder cacheKeyBuilder = new StringBuilder(CACHE_LOCK_PREFIX);

        String delimiter = cacheLock.delimiter();

        // The annotation is read with plain reflection, which does not resolve the
        // prefix/value alias pair, so fall back to value() explicitly.
        String lockPrefix = StringUtils.isNotBlank(cacheLock.prefix()) ? cacheLock.prefix() : cacheLock.value();

        if (StringUtils.isNotBlank(lockPrefix)) {
            cacheKeyBuilder.append(lockPrefix);
        } else {
            cacheKeyBuilder.append(methodSignature.getMethod().toString());
        }

        // Handle cache lock key building
        Annotation[][] parameterAnnotations = methodSignature.getMethod().getParameterAnnotations();
        Object[] args = joinPoint.getArgs();

        for (int i = 0; i < parameterAnnotations.length; i++) {
            log.debug("Parameter annotation[{}] = {}", i, parameterAnnotations[i]);

            for (int j = 0; j < parameterAnnotations[i].length; j++) {
                Annotation annotation = parameterAnnotations[i][j];
                log.debug("Parameter annotation[{}][{}]: {}", i, j, annotation);
                if (annotation instanceof CacheParam) {
                    // Get current argument
                    Object arg = args[i];
                    log.debug("Cache param args: [{}]", arg);

                    // Append to the cache key; append(Object) renders a null argument
                    // as "null" instead of throwing NullPointerException.
                    cacheKeyBuilder.append(delimiter).append(arg);
                }
            }
        }

        if (cacheLock.traceRequest()) {
            // Append http request info
            cacheKeyBuilder.append(delimiter).append(ServletUtils.getRequestIp());
        }

        return cacheKeyBuilder.toString();
    }
}
/**
 * Marks a method whose invocations should be guarded by a cache-backed lock key.
 * The attributes below configure how the key is built, how long it lives, and
 * whether it is removed after the method returns.
 */
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
public @interface CacheLock {

    /**
     * Prefix placed in the lock key, default is "". When blank, the interceptor in
     * this file uses the annotated method's string form instead.
     *
     * @return cache prefix
     */
    @AliasFor("value")
    String prefix() default "";

    /**
     * Alias of {@link #prefix()}, default is "".
     * NOTE(review): the alias is only resolved when the annotation is read through
     * Spring's merged-annotation support; confirm how the consumer reads it.
     *
     * @return alias of prefix
     */
    @AliasFor("prefix")
    String value() default "";

    /**
     * Expiry of the lock key, expressed in {@link #timeUnit()}; default is 5.
     *
     * @return expired time
     */
    long expired() default 5;

    /**
     * Unit of {@link #expired()}, default is TimeUnit.SECONDS.
     *
     * @return time unit
     */
    TimeUnit timeUnit() default TimeUnit.SECONDS;

    /**
     * Delimiter inserted before each appended key part (annotated arguments and
     * request info), default is ':'
     *
     * @return delimiter
     */
    String delimiter() default ":";

    /**
     * Whether to delete the lock key after method invocation instead of leaving it
     * to expire; default is true.
     *
     * @return true if delete cache after method invocation; false otherwise
     */
    boolean autoDelete() default true;

    /**
     * Whether to include request info (the request IP, in the interceptor in this
     * file) in the lock key; default is false.
     *
     * @return true if trace the request info; false otherwise
     */
    boolean traceRequest() default false;
}