
Implementing a Distributed Lock with Redis in Spring Boot 2.x

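We use distributed locks a lot at work, so here are some notes on them. The Spring Cloud documentation says it implements Global Locks, yet no related documentation can be found. The reason is that the code was moved to Spring Integration long ago.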

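For readers unfamiliar with Spring Integration, a short introduction: officially, it is an implementation of Enterprise Integration Patterns; informally, it is positioned as a lightweight ESB, although it does many things an ESB does not. Incidentally, Spring Cloud Stream is also built on top of Spring Integration.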

The global locks in Spring Integration currently have implementations for several backing stores, including Redis, JDBC, and Zookeeper.

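They all share the same API abstraction, which is exactly what Spring does best. Whichever store you use, the coding experience is the same; if you later want to switch implementations, you only change the dependencies and configuration, not the code.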
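To make the "same API, swappable store" point concrete, here is a minimal sketch using only the JDK. `SimpleLockRegistry`, `InMemoryLockRegistry`, and `OrderService` are hypothetical names invented for this illustration; the interface merely mirrors the shape of Spring Integration's `LockRegistry`, and it is not the library's code.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical business class: it only sees the registry interface and java.util.concurrent.locks.Lock.
final class OrderService {
    private final SimpleLockRegistry registry;
    private int processed;

    OrderService(SimpleLockRegistry registry) {
        this.registry = registry;
    }

    // Increments a counter under the lock for the given order and returns the new count.
    int process(String orderId) {
        Lock lock = registry.obtain("ORDER:" + orderId);
        lock.lock();
        try {
            return ++processed;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        // Swapping the store would mean passing a different registry here; process() stays unchanged.
        OrderService service = new OrderService(new InMemoryLockRegistry());
        System.out.println(service.process("42"));
    }
}

// Hypothetical stand-in with the same shape as a lock registry: one method that returns a Lock for a key.
interface SimpleLockRegistry {
    Lock obtain(Object lockKey);
}

// Hypothetical in-memory implementation; a store-backed registry would plug in the same way.
final class InMemoryLockRegistry implements SimpleLockRegistry {
    private final Map<Object, Lock> locks = new ConcurrentHashMap<>();

    @Override
    public Lock obtain(Object lockKey) {
        return locks.computeIfAbsent(lockKey, k -> new ReentrantLock());
    }
}
```

The calling code depends only on the `obtain` method and the standard `Lock` interface, which is why replacing the backing store does not touch it.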

The Redis-based distributed lock implements the Lock interface and, like ReentrantLock, supports re-entrancy and blocking acquisition.

Usage

Dependencies

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-redis</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

Configuration:

Note: a Redis cluster or a Sentinel setup can also be used here.

spring:
  redis:
    port: 6379
    host: localhost

Initialization

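import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.integration.redis.util.RedisLockRegistry;

@Configuration
public class RedisLockConfiguration {

    @Bean
    public RedisLockRegistry redisLockRegistry(RedisConnectionFactory redisConnectionFactory) {
        // "spring-cloud" is the registry key, used as the prefix of every lock key.
        return new RedisLockRegistry(redisConnectionFactory, "spring-cloud");
    }
}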

Example

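import java.util.Date;
import java.util.concurrent.locks.Lock;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.redis.util.RedisLockRegistry;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class RedisController {

    @Autowired
    private RedisLockRegistry redisLockRegistry;

    @RequestMapping("/redisLock")
    public String redisLock(Integer id) {
        // Redis key segments are joined with a colon.
        // registryKey and lockKey are joined automatically, so with the registry key
        // "spring-cloud" the final key is spring-cloud:USER_ID:1; the value is a UUID.
        Lock lock = redisLockRegistry.obtain("USER_ID:" + id);
        for (int i = 0; i < 3; i++) {
            new Thread(() -> {
                lock.lock();
                try {
                    System.out.println(Thread.currentThread().getName() + " begin " + new Date());
                    // Your own business logic goes here.
                    System.out.println(Thread.currentThread().getName() + " end " + new Date());
                }
                finally {
                    // Always release the lock, even if the business logic throws.
                    lock.unlock();
                }
            }).start();
        }

        return "ok";
    }

}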
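Because the registry hands back a standard `java.util.concurrent.locks.Lock`, the timed `tryLock(long, TimeUnit)` method of that interface can be used when blocking indefinitely is not acceptable. The following is a minimal sketch under stated assumptions: `TryLockSketch` and `runGuarded` are hypothetical names, and a plain `ReentrantLock` stands in for the lock that `redisLockRegistry.obtain(...)` would return, so the example runs without Redis.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

final class TryLockSketch {

    // Runs the task only if the lock is acquired within the timeout; returns whether the task ran.
    static boolean runGuarded(Lock lock, long timeoutMillis, Runnable task) throws InterruptedException {
        if (!lock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS)) {
            return false; // someone else holds the lock; give up instead of waiting forever
        }
        try {
            task.run();
            return true;
        } finally {
            lock.unlock(); // release even if the task throws
        }
    }

    public static void main(String[] args) throws Exception {
        Lock lock = new ReentrantLock(); // assumption: stand-in for the registry-provided lock
        boolean ran = runGuarded(lock, 200, () -> System.out.println("working"));
        System.out.println("task ran: " + ran);
    }
}
```

Returning a boolean lets the caller decide what to do when the lock is busy, for example rejecting the request instead of piling up waiting threads.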

Source code analysis

The ExpirableLockRegistry interface adds one method, which removes expired locks that are no longer in use.

public interface ExpirableLockRegistry extends LockRegistry {

    /**
     * Remove locks last acquired more than 'age' ago that are not currently locked.
     * @param age the time since the lock was last obtained.
     * @throws IllegalStateException if the registry configuration does not support this feature.
     */
    void expireUnusedOlderThan(long age);

}
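What "remove locks last acquired more than 'age' ago that are not currently locked" means can be sketched with an in-memory cache. This is not the library's implementation: `ExpiringRegistrySketch` and its `Entry` class are hypothetical, and the current time is passed in as a parameter (an assumption made purely to keep the example deterministic).

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

final class ExpiringRegistrySketch {

    // Hypothetical cached entry: a local lock plus the time it was last acquired.
    static final class Entry {
        final ReentrantLock lock = new ReentrantLock();
        volatile long lockedAt;
    }

    private final Map<String, Entry> locks = new ConcurrentHashMap<>();

    // Acquires the lock for the key and records the acquisition time.
    void lock(String key, long now) {
        Entry entry = locks.computeIfAbsent(key, k -> new Entry());
        entry.lock.lock();
        entry.lockedAt = now;
    }

    // Releases the lock for the key; the entry stays cached until it is expired.
    void unlock(String key) {
        locks.get(key).lock.unlock();
    }

    // Drops entries last acquired more than 'age' ms before 'now' that nobody currently holds.
    void expireUnusedOlderThan(long age, long now) {
        locks.entrySet().removeIf(e ->
                now - e.getValue().lockedAt > age && !e.getValue().lock.isLocked());
    }

    int size() {
        return locks.size();
    }

    public static void main(String[] args) {
        ExpiringRegistrySketch registry = new ExpiringRegistrySketch();
        registry.lock("a", 0);
        registry.unlock("a");   // "a" is old and unused
        registry.lock("b", 0);  // "b" is old but still held
        registry.expireUnusedOlderThan(1000, 5000);
        System.out.println(registry.size()); // prints 1: only "b" remains
    }
}
```

Without some form of expiry, a cache keyed by lock name would grow for as long as new keys keep arriving.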

The LockRegistry interface has a single method, which obtains a lock.

@FunctionalInterface
public interface LockRegistry {

    /**
     * Obtains the lock associated with the parameter object.
     * @param lockKey The object with which the lock is associated.
     * @return The associated lock.
     */
    Lock obtain(Object lockKey);

}

RedisLockRegistry constructors

private static final long DEFAULT_EXPIRE_AFTER = 60000L;

private final String registryKey;

private final StringRedisTemplate redisTemplate;

private final RedisScript<Boolean> obtainLockScript;

private final long expireAfter;

private static final String OBTAIN_LOCK_SCRIPT =
        "local lockClientId = redis.call('GET', KEYS[1])\n" +
        "if lockClientId == ARGV[1] then\n" +
        "  redis.call('PEXPIRE', KEYS[1], ARGV[2])\n" +
        "  return true\n" +
        "elseif not lockClientId then\n" +
        "  redis.call('SET', KEYS[1], ARGV[1], 'PX', ARGV[2])\n" +
        "  return true\n" +
        "end\n" +
        "return false";

/**
 * Constructs a lock registry with the default (60 second) lock expiration.
 * @param connectionFactory The connection factory.
 * @param registryKey The key prefix for locks.
 */
public RedisLockRegistry(RedisConnectionFactory connectionFactory, String registryKey) {
    this(connectionFactory, registryKey, DEFAULT_EXPIRE_AFTER);
}

/**
 * Constructs a lock registry with the supplied lock expiration.
 * @param connectionFactory The connection factory.
 * @param registryKey The key prefix for locks.
 * @param expireAfter The expiration in milliseconds.
 */
public RedisLockRegistry(RedisConnectionFactory connectionFactory, String registryKey, long expireAfter) {
    Assert.notNull(connectionFactory, "'connectionFactory' cannot be null");
    Assert.notNull(registryKey, "'registryKey' cannot be null");
    this.redisTemplate = new StringRedisTemplate(connectionFactory);
    this.obtainLockScript = new DefaultRedisScript<>(OBTAIN_LOCK_SCRIPT, Boolean.class);
    this.registryKey = registryKey;
    this.expireAfter = expireAfter;
}

Obtaining a lock

private final Map<String, RedisLock> locks = new ConcurrentHashMap<>();

@Override
public Lock obtain(Object lockKey) {
    Assert.isInstanceOf(String.class, lockKey);
    String path = (String) lockKey;
    return this.locks.computeIfAbsent(path, RedisLock::new);
}

Map

default V computeIfAbsent(K key,
        Function<? super K, ? extends V> mappingFunction) {
    Objects.requireNonNull(mappingFunction);
    V v;
    if ((v = get(key)) == null) {
        V newValue;
        if ((newValue = mappingFunction.apply(key)) != null) {
            put(key, newValue);
            return newValue;
        }
    }

    return v;
}

default V putIfAbsent(K key, V value) {
    V v = get(key);
    if (v == null) {
        v = put(key, value);
    }

    return v;
}

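One lock is created per lockKey and then cached.
The difference between computeIfAbsent and putIfAbsent is that the former takes a function (a functional interface) that creates the cached value only when the key is absent, while the latter takes an already constructed value. Note that the code above is the default implementation in Map; locks is a ConcurrentHashMap, which overrides computeIfAbsent so that the check and the insert happen atomically.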
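The difference in when the value gets constructed can be shown with a small JDK-only sketch. `ComputeVsPutSketch`, `newValue`, and `countCreations` are hypothetical names for this illustration; the counter simply records how many value objects are built.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

final class ComputeVsPutSketch {

    // Counts how many value objects get constructed.
    static final AtomicInteger CREATED = new AtomicInteger();

    static Object newValue() {
        CREATED.incrementAndGet();
        return new Object();
    }

    // Returns {objects built via computeIfAbsent, objects built via putIfAbsent}
    // when the same key is requested 'calls' times.
    static int[] countCreations(int calls) {
        Map<String, Object> cache = new ConcurrentHashMap<>();

        CREATED.set(0);
        for (int i = 0; i < calls; i++) {
            // The mapping function runs only when the key is absent.
            cache.computeIfAbsent("k", key -> newValue());
        }
        int viaCompute = CREATED.get();

        cache.clear();
        CREATED.set(0);
        for (int i = 0; i < calls; i++) {
            // The argument is evaluated before the call, so a value is built every time.
            cache.putIfAbsent("k", newValue());
        }
        int viaPut = CREATED.get();

        return new int[] {viaCompute, viaPut};
    }

    public static void main(String[] args) {
        int[] result = countCreations(3);
        System.out.println(result[0] + " vs " + result[1]); // prints "1 vs 3"
    }
}
```

For a lock cache this matters because each call to obtain would otherwise allocate a new lock object just to throw it away.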

Locking

@Override
public void lock() {
    this.localLock.lock();
    while (true) {
        try {
            while (!obtainLock()) {
                Thread.sleep(100); //NOSONAR
            }
            break;
        }
        catch (InterruptedException e) {
            /*
             * This method must be uninterruptible so catch and ignore
             * interrupts and only break out of the while loop when
             * we get the lock.
             */
        }
        catch (Exception e) {
            this.localLock.unlock();
            rethrowAsLockException(e);
        }
    }
}

private final String clientId = UUID.randomUUID().toString();

private boolean obtainLock() {
    boolean success = RedisLockRegistry.this.redisTemplate.execute(RedisLockRegistry.this.obtainLockScript,
            Collections.singletonList(this.lockKey), RedisLockRegistry.this.clientId,
            String.valueOf(RedisLockRegistry.this.expireAfter));
    if (success) {
        this.lockedAt = System.currentTimeMillis();
    }
    return success;
}

The method first acquires the local ReentrantLock and then runs the Lua script against Redis.

private static final String OBTAIN_LOCK_SCRIPT =
        "local lockClientId = redis.call('GET', KEYS[1])\n" +
        "if lockClientId == ARGV[1] then\n" +
        "  redis.call('PEXPIRE', KEYS[1], ARGV[2])\n" +
        "  return true\n" +
        "elseif not lockClientId then\n" +
        "  redis.call('SET', KEYS[1], ARGV[1], 'PX', ARGV[2])\n" +
        "  return true\n" +
        "end\n" +
        "return false";

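If lockKey has no value, the script sets it to the client ID with an expiry (60 seconds by default). If the stored value already equals this registry's client ID, the same client is re-acquiring the lock, so the script only refreshes the expiry. Otherwise another client holds the lock and the script returns false.
Once the Redis lock has been acquired, the lock object records the acquisition time in lockedAt.

If acquisition fails, lock() retries every 100 milliseconds and keeps looping until it gets the lock, so the call is blocking. Re-entrancy comes from the local ReentrantLock together with the client ID check in the script, not from the retry loop.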
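The three branches of the Lua script can be mirrored in plain Java to make the outcomes easier to follow. This is only a simulation under stated assumptions: `ObtainLockScriptSketch` is a hypothetical class, two in-memory maps stand in for Redis, and the current time is passed as a parameter so that expiry can be exercised deterministically.

```java
import java.util.HashMap;
import java.util.Map;

final class ObtainLockScriptSketch {

    // In-memory stand-in for Redis: key -> owner, and key -> absolute expiry time in ms.
    private final Map<String, String> owners = new HashMap<>();
    private final Map<String, Long> expiresAt = new HashMap<>();

    // Mirrors the branches of the obtain-lock script; returns true if the caller now holds the key.
    synchronized boolean obtain(String key, String clientId, long expireAfter, long now) {
        Long expiry = expiresAt.get(key);
        if (expiry != null && expiry <= now) {
            // Emulate Redis dropping a key whose TTL has elapsed.
            owners.remove(key);
            expiresAt.remove(key);
        }
        String current = owners.get(key);
        if (clientId.equals(current)) {
            // Same client: only refresh the TTL (the PEXPIRE branch).
            expiresAt.put(key, now + expireAfter);
            return true;
        }
        if (current == null) {
            // Key is free: store the client ID with a TTL (the SET ... PX branch).
            owners.put(key, clientId);
            expiresAt.put(key, now + expireAfter);
            return true;
        }
        // Held by a different client.
        return false;
    }

    public static void main(String[] args) {
        ObtainLockScriptSketch redis = new ObtainLockScriptSketch();
        System.out.println(redis.obtain("k", "A", 60000, 0));     // true: key was free
        System.out.println(redis.obtain("k", "B", 60000, 1000));  // false: held by A
        System.out.println(redis.obtain("k", "A", 60000, 1000));  // true: same client, TTL refreshed
        System.out.println(redis.obtain("k", "B", 60000, 61000)); // true: A's TTL has elapsed
    }
}
```

In the real registry the script runs inside Redis, so the read and the write happen as one atomic step; the `synchronized` keyword plays that role in this sketch.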

Releasing the lock

RedisLockRegistry.RedisLock

@Override
public void unlock() {
    if (!this.localLock.isHeldByCurrentThread()) {
        throw new IllegalStateException("You do not own lock at " + this.lockKey);
    }
    if (this.localLock.getHoldCount() > 1) {
        this.localLock.unlock();
        return;
    }
    try {
        if (Thread.currentThread().isInterrupted()) {
            RedisLockRegistry.this.executor.execute(this::removeLockKey);
        }
        else {
            removeLockKey();
        }

        if (logger.isDebugEnabled()) {
            logger.debug("Released lock; " + this);
        }
    }
    catch (Exception e) {
        ReflectionUtils.rethrowRuntimeException(e);
    }
    finally {
        this.localLock.unlock();
    }
}

private void removeLockKey() {
    if (RedisUtils.isUnlinkAvailable(RedisLockRegistry.this.redisTemplate)) {
        RedisLockRegistry.this.redisTemplate.unlink(this.lockKey);
    }
    else {
        RedisLockRegistry.this.redisTemplate.delete(this.lockKey);
    }
}

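ReentrantLock records the owning thread and that thread's re-entry count. If this unlock belongs to a re-entrant acquisition (hold count greater than 1), only the counter is decremented, that is, the AQS state is reduced by one. Otherwise the key is deleted from Redis and then the ReentrantLock is released.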
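The hold-count logic can be tried out without Redis. In the sketch below, `HoldCountUnlockSketch` is a hypothetical class and the `remoteDeletes` counter is an assumption standing in for "delete the key in Redis"; only the `ReentrantLock` calls are real JDK API.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

final class HoldCountUnlockSketch {

    private final ReentrantLock localLock = new ReentrantLock();

    // Hypothetical counter standing in for deleting the lock key in Redis.
    final AtomicInteger remoteDeletes = new AtomicInteger();

    void lock() {
        localLock.lock();
    }

    void unlock() {
        if (!localLock.isHeldByCurrentThread()) {
            throw new IllegalStateException("You do not own the lock");
        }
        if (localLock.getHoldCount() > 1) {
            // Re-entrant release: only decrement the local hold count.
            localLock.unlock();
            return;
        }
        try {
            // Final release: this is where the remote key would be removed.
            remoteDeletes.incrementAndGet();
        } finally {
            localLock.unlock();
        }
    }

    int holdCount() {
        return localLock.getHoldCount();
    }

    public static void main(String[] args) {
        HoldCountUnlockSketch lock = new HoldCountUnlockSketch();
        lock.lock();
        lock.lock();   // re-entrant acquisition by the same thread
        lock.unlock();
        System.out.println(lock.remoteDeletes.get()); // prints 0: still held once
        lock.unlock();
        System.out.println(lock.remoteDeletes.get()); // prints 1: key removed on the last release
    }
}
```

Deferring the remote delete to the last release keeps the key in Redis for as long as any nested section of the same thread still relies on it.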
