redis分布式锁(手写)–尚硅谷分布式锁教程笔记
-
前期学的是不可重入锁的版本:
加锁用 redisTemplate.opsForValue().setIfAbsent("lock", uuid);
解锁用 redisTemplate.opsForValue().delete("lock");
问题一:加锁后就宕机导致死锁
解决一:给锁加一个过期时间
问题二:业务还在执行但锁已经过期了,前面业务可能释放后面的锁
解决二:解锁之前要判断是不是自己的锁
问题三:判断锁是否是自己的和解锁非原子操作
解决三:用Lua脚本将两步合为一步,保证原子性
问题四:不可重入
解决四:放弃这种原始的加锁解锁方式
问题五:锁过期导致线程并发
解决五:用Timer定时器实现自动续期
-
后期学的是可重入锁的版本:
用 Lua脚本和redis的hash结构,基本结构为lock:uuid+Thread.currentThread().getId() :count
1)加锁和解锁都用Lua脚本,脚本中判断各种可能的逻辑(有过期时间的设置,判断是不是自己的锁,是否是重入等),也保证了判断和加锁的原子性 解决上面讲的问题一二三四
2)Timer定时器 + lua脚本:实现锁的自动续期
判断锁是否自己的锁(hexists == 1),如果是自己的锁则执行expire重置过期时间 解决了上面的问题五
总结:所谓的分布式锁,就是用一个第三方的数据库,所有线程(包括同一个进程中的和不同进程中的)都访问的是这个数据库(这对于所有线程就是唯一的),使用逻辑锁就可以使各个线程之间不会出现并发问题
同一个进程用线程id区分,不同进程用uuid区分
最终版本
工厂方法,便于后期拓展
@Component
public class DistributedLockClient {
@Autowired
private StringRedisTemplate redisTemplate;
private String uuid;
public DistributedLockClient() {
this.uuid = UUID.randomUUID().toString();
// 这个UUID在spring容器初始化时就已经创建好了,保证了一个服务器的uuid都是一样的
// 同一个服务中用ThreadId区分
// 不同服务中首先uuid就不同了,就区分开了
}
public DistributedRedisLock getRedisLock(String lockName){
return new DistributedRedisLock(redisTemplate, lockName, uuid);
}
}
public class DistributedRedisLock implements Lock {
private StringRedisTemplate redisTemplate;
private String lockName;
private String uuid;
private long expire = 30; // 默认过期时间30ms
public DistributedRedisLock(StringRedisTemplate redisTemplate, String lockName, String uuid) {
this.redisTemplate = redisTemplate;
this.lockName = lockName;
this.uuid = uuid + ":" + Thread.currentThread().getId(); // 这步关键
}
@Override
public void lock() {
this.tryLock();
}
@Override
public boolean tryLock() {
try {
return this.tryLock(-1L, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
return false;
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
if (time != -1){
this.expire = unit.toSeconds(time);
}
// 还未上锁,已经上锁且是自己的
String script = "if redis.call('exists', KEYS[1]) == 0 or redis.call('hexists', KEYS[1], ARGV[1]) == 1 " +
"then " +
" redis.call('hincrby', KEYS[1], ARGV[1], 1) " +
" redis.call('expire', KEYS[1], ARGV[2]) " +
" return 1 " +
"else " +
" return 0 " +
"end";
while (!this.redisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList(lockName), uuid, String.valueOf(expire))){
Thread.sleep(50);
}
// 加锁成功,返回之前,开启定时器自动续期
this.renewExpire();
return true;
}
@Override
public void unlock() {
String script = "if redis.call('hexists', KEYS[1], ARGV[1]) == 0 " +
"then " +
" return nil " +
"elseif redis.call('hincrby', KEYS[1], ARGV[1], -1) == 0 " +
"then " +
" return redis.call('del', KEYS[1]) " +
"else " +
" return 0 " +
"end";
Long flag = this.redisTemplate.execute(new DefaultRedisScript<>(script, Long.class), Arrays.asList(lockName), uuid);
if (flag == null){
throw new IllegalMonitorStateException("this lock doesn't belong to you!");
}
}
private void renewExpire(){
String script = "if redis.call('hexists', KEYS[1], ARGV[1]) == 1 " +
"then " +
" return redis.call('expire', KEYS[1], ARGV[2]) " +
"else " +
" return 0 " +
"end";
new Timer().schedule(new TimerTask() {
@Override
public void run() { // 开启的定时器是一个子线程,不能用上面的getId方法,而是直接在主线程中拼好uuid(该uuid是生成的uuid+线程数),然后这里用
if (redisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList(lockName), uuid, String.valueOf(expire))) {
renewExpire(); // 再次开启一次性任务,直到上面续期失败了就直接结束了
} // 另一种实现方式是用周期任务,在unlock中取消定时任务
}
}, this.expire * 1000 / 3);// 设置成只有延迟时间,是一次性的任务,就不需要思考结束任务的方式
}
}
版权声明:本文为qq_60313016原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
THE END