赞
踩
可以和 黑马程序员Redis 这门课配合观看
分布式锁:满足「分布式系统」或「集群模式」下多进程可见并且互斥的锁。
分布式锁的核心思想就是让大家都使用同一把锁,只要大家使用的是同一把锁,那么我们就能锁住线程,不让线程进行,让程序串行执行,这就是分布式锁的核心思路。
那么分布式锁应该满足一些什么样的条件呢?
常见的分布式锁有三种:
实现分布式锁时需要实现的两个基本方法:
核心思路:我们利用 redis 的 setnx,当有多个线程进入时,我们就利用改方法,第一个线程进入时,redis 中就有这个 key 了,返回了1,如果结果是1,则表示抢到了锁,然后去执行任务,在然后去释放锁,推出锁逻辑。如果没有抢到锁,等待一定时间重试即可。
锁的基本接口:
public interface ILock {
/**
* 尝试获取锁
*
* @param timeoutSec 锁只有的超时时间,过期后自动释放锁
* @return true表示获取成功,否则失败
*/
boolean tryLock(long timeoutSec);
/**
* 释放锁
*/
void unlock();
}
实现类:SimpleRedisLock
利用setnx方法进行加锁,同时增加过期时间,防止死锁,此方法可以保证加锁和增加过期时间具有原子性
package com.hmdp.utils; import cn.hutool.core.lang.UUID; import org.springframework.core.io.ClassPathResource; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.core.script.DefaultRedisScript; import java.util.Collections; import java.util.concurrent.TimeUnit; public class SimpleRedisLock implements ILock { // key private String name; private StringRedisTemplate stringRedisTemplate; private static final String KEY_PREFIX = "lock:"; // 不同的JVM会有不同的UUID,这样不同的服务UUID不同 // 在拼接上线程id,不同的线程 线程id不同 private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-"; public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) { this.name = name; this.stringRedisTemplate = stringRedisTemplate; } // 获取锁 @Override public boolean tryLock(long timeoutSec) { String threadId = ID_PREFIX + String.valueOf(Thread.currentThread().getId()); // 获取锁 Boolean ok = stringRedisTemplate.opsForValue() .setIfAbsent(KEY_PREFIX + this.name, threadId, timeoutSec, TimeUnit.SECONDS); return Boolean.TRUE.equals(ok); } // 释放锁 @Override public void unlock() { stringRedisTemplate.delete(KEY_PREFIX + this.name); } }
修改业务代码:
@Override public Result seckillVoucher(Long voucherId) { // 1.查询优惠券 SeckillVoucher voucher = seckillVoucherService.getById(voucherId); // 2.判断秒杀是否开始 if (voucher.getBeginTime().isAfter(LocalDateTime.now())) { // 尚未开始 return Result.fail("秒杀尚未开始!"); } // 3.判断秒杀是否已经结束 if (voucher.getEndTime().isBefore(LocalDateTime.now())) { // 尚未开始 return Result.fail("秒杀已经结束!"); } // 4.判断库存是否充足 if (voucher.getStock() < 1) { // 库存不足 return Result.fail("库存不足!"); } Long userId = UserHolder.getUser().getId(); //创建锁对象(新增代码) SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate); //获取锁对象 boolean isLock = lock.tryLock(1200); //加锁失败 if (!isLock) { return Result.fail("不允许重复下单"); } try { //获取代理对象(事务) IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy(); return proxy.createVoucherOrder(voucherId); } finally { //释放锁 lock.unlock(); } }
逻辑说明:持有锁的线程在锁的内部出现了阻塞,导致他的锁自动释放了,这时线程2来尝试获得锁,由于线程1已经释放,所以线程2就拿到了锁,然后线程2在持有锁的过程中,线程1反应过来,继续执行,而线程1执行过程中,走到了删除锁的逻辑,此时就会把本应该属于线程2的锁进行删除,这就是误删别人锁的情况。一旦把锁给误删了,那么线程3此时就可以获得到锁,这样就可能出现线程并发问题(一个人在数据库有两条记录)。
解决方案:既然删除的时候可能存在删除别人锁的问题,那我们可以在删的时候判断是不是自己的不就可以了吗,如果是自己的锁,那就锁,不是自己的锁就不能删。那上诉问题就解决了,线程1删除锁的时候发现不是自己的就不在进行删除操作。只有线程2才能删这把锁。
需求:修改之前的分布式锁实现,解决误删问题
满足:在获取锁的时存入线程标识(可以用UUID表示),在释放锁时先获取锁中的线程标识,判断是否与当前线程标识一致:
具体代码如下:
private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-"; @Override public boolean tryLock(long timeoutSec) { // 获取线程标示 String threadId = ID_PREFIX + Thread.currentThread().getId(); // 获取锁 Boolean success = stringRedisTemplate.opsForValue() .setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS); return Boolean.TRUE.equals(success); } @Override public void unlock() { // 获取线程标示 String threadId = ID_PREFIX + Thread.currentThread().getId(); // 获取锁中的标示 String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name); // 判断标示是否一致 if(threadId.equals(id)) { // 释放锁 stringRedisTemplate.delete(KEY_PREFIX + name); } }
更为极端的误删问题:
线程1现在持有锁之后,在执行业务逻辑过程中,他正准备删除锁,而且已经走到了条件判断的过程中,比如他已经判断当前这把锁确实属于他自己的,正准备删除锁,但是突然该系统卡顿了,过了一会锁过期了,那么此时线程2过来并获得到了锁,存的是自己的线程标识,但是此时线程1卡顿结束,然后直接删除了属于线程2的锁,相当于条件判断并没有起作用,这就是删除锁的原子性问题,之所以有这个问题,是因为线程1的获取锁、判断锁和删除锁是三个操作操作,并不是原子性的,所以我们要防止刚才的情况发生。
Redis提供了Lua脚本功能,在一个脚本中编写多条Redis命令,确保多条命令执行时的原子性。Lua是一 种编程语言,它的基本语法大家可以参考网站:https://www.runoob.com/lua/lua-tutorial.html,这里 重点介绍Redis提供的调用函数,我们可以使用lua去操作redis,又能保证他的原子性,这样就可以实现 拿锁比锁删锁是一个原子性动作了,作为Java程序员这一块并不作一个简单要求,并不需要大家过于精通,只需要知道他有什么作用即可。
这里重点介绍Redis提供的调用函数,语法如下:
redis.call('命令名称', 'key', '其它参数', ...)
例如,我们要执行set name jack,则脚本是这样:
# 执行 set name jack
redis.call('set', 'name', 'jack')
例如,我们要先执行set name Rose,再执行get name,则脚本如下:
# 先执行 set name jack
redis.call('set', 'name', 'Rose')
# 再执行 get name
local name = redis.call('get', 'name')
# 返回
return name
接下来我们回忆一下释放锁的逻辑:
如果用Lua脚本则是这样的:
-- 这里的 KEYS[1] 就是锁的key,这里的ARGV[1] 就是当前线程标示
-- 获取锁中的标示,判断是否与当前线程标示一致
if (redis.call('GET', KEYS[1]) == ARGV[1]) then
-- 一致,则删除锁
return redis.call('DEL', KEYS[1])
end
-- 不一致,则直接返回
return 0
ua脚本本身并不需要大家花费太多时间去研究,只需要知道如何调用,大致是什么意思即可,所以在笔记中并不会详细的去解释这些lua表达式的含义。
我们的RedisTemplate中,可以利用execute方法去执行lua脚本,参数对应关系就如下图:
Java代码
package com.hmdp.utils; import cn.hutool.core.lang.UUID; import org.springframework.core.io.ClassPathResource; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.core.script.DefaultRedisScript; import java.util.Collections; import java.util.concurrent.TimeUnit; public class SimpleRedisLock implements ILock { // key private String name; private StringRedisTemplate stringRedisTemplate; private static final String KEY_PREFIX = "lock:"; // 不同的JVM会有不同的UUID,这样不同的服务UUID不同 // 在拼接上线程id,不同的线程 线程id不同 private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-"; private static final DefaultRedisScript<Long> UNLOCK_SCRIPT; static { UNLOCK_SCRIPT = new DefaultRedisScript<>(); UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua")); UNLOCK_SCRIPT.setResultType(Long.class); } public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) { this.name = name; this.stringRedisTemplate = stringRedisTemplate; } @Override public boolean tryLock(long timeoutSec) { String threadId = ID_PREFIX + Thread.currentThread().getId(); // 获取锁 Boolean ok = stringRedisTemplate.opsForValue() .setIfAbsent(KEY_PREFIX + this.name, threadId, timeoutSec, TimeUnit.SECONDS); return Boolean.TRUE.equals(ok); } @Override public void unlock() { // 调用lua脚本 stringRedisTemplate.execute(UNLOCK_SCRIPT, Collections.singletonList(KEY_PREFIX + this.name), ID_PREFIX + Thread.currentThread().getId()); // // 获取线程表示 // String threadId = ID_PREFIX + String.valueOf(Thread.currentThread().getId()); // String id = stringRedisTemplate.opsForValue().get(threadId); // if (threadId.equals(id)) { // // 释放锁 // stringRedisTemplate.delete(KEY_PREFIX + this.name); // } } }
基于Redis的分布式锁实现思路:
我么一路走来,利用添加过期时间来防止死锁问题的发生(假如不设置过期时间,一个线程获取锁后系统崩了,那么这个锁永远不会被释放,所以需要添加过期时间),但有了过期时间之后,可能出现误删别人锁的问题,这个问题我们是通过在删锁之前判断是不是自己的锁来解决,但还存在原子性问题,也就是我们无法保证「拿锁、比锁、删锁」是一个原子性的动作,最后通过Lua表达式来解决这个问题。
但是目前还剩下一个问题:「锁不住」,什么是锁不住呢?就是当锁的过期时间到了之后,我还想继续拥有锁,那我可不可以续期呢? 就好像在网吧上网,网费到了之后,然后可以续费一样。这样的话锁就可以一直拥有,就不会在有什么误删锁的问题。那要怎么解决呢,这就要以来接下来要学习的「redission」了。
基于 「set nx」 实现的分布式锁存在下面的问题:
重入问题:重入问题是指 获取锁的线程可以再次进入到相同的代码块中,可重入锁的意义在于防止死锁,比如 HashTable 这样的代码中,他的方法都是使用 synchronized 修饰的,假如他在方法1内,调用方法2,那么此时如果是不可重入的,会导致死锁,因为方法2会等待方法1释放锁,但方法1需要等方法2执行完后才会释放锁,这样就发生了死锁,死锁存在的四个条件:互斥、持有并等待、不可剥夺、环路等待。所以可重入锁的主要意义就是防止死锁,我们的 synchronized 和 Lock 锁都是可重入的。
不可重试:是指当前的分布式只能尝试一次,我们认为合理的情况:当线程获得锁是啊比后,他应该再次尝试获得锁。
超时释放:我们在加锁时增加了过期时间,这样我们可以防止死锁,但是如果卡顿的时间超厂,虽然我们采用了lua表达式防止删锁的时候,误删别人的锁,但是毕竟没有锁住,有安全隐患。
**主从一致性:**如果Redis提供了主从集群,当我们向集群中写数据时,主机需要异步的将数据同步给从机,而万一在同步过去之前,主机宕机了,就会出现死锁问题。
那么什么是Redission呢
Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务,其中就包含了各种分布式锁的实现。
Redission提供了分布式锁的多种多样的功能
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.13.6</version>
</dependency>
@Resource private RedissionClient redissonClient; @Test void testRedisson() throws Exception{ //获取锁(可重入),指定锁的名称 RLock lock = redissonClient.getLock("anyLock"); //尝试获取锁,参数分别是:获取锁的最大等待时间(期间会重试),锁自动释放时间,时间单位 boolean isLock = lock.tryLock(1,10,TimeUnit.SECONDS); //判断获取锁成功 if(isLock){ try{ System.out.println("执行业务"); }finally{ //释放锁 lock.unlock(); } } }
@Resource private RedissonClient redissonClient; @Override public Result seckillVoucher(Long voucherId) { // 1.查询优惠券 SeckillVoucher voucher = seckillVoucherService.getById(voucherId); // 2.判断秒杀是否开始 if (voucher.getBeginTime().isAfter(LocalDateTime.now())) { // 尚未开始 return Result.fail("秒杀尚未开始!"); } // 3.判断秒杀是否已经结束 if (voucher.getEndTime().isBefore(LocalDateTime.now())) { // 尚未开始 return Result.fail("秒杀已经结束!"); } // 4.判断库存是否充足 if (voucher.getStock() < 1) { // 库存不足 return Result.fail("库存不足!"); } Long userId = UserHolder.getUser().getId(); //创建锁对象 这个代码不用了,因为我们现在要使用分布式锁 //SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate); RLock lock = redissonClient.getLock("lock:order:" + userId); //获取锁对象 boolean isLock = lock.tryLock(); //加锁失败 if (!isLock) { return Result.fail("不允许重复下单"); } try { //获取代理对象(事务) IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy(); return proxy.createVoucherOrder(voucherId); } finally { //释放锁 lock.unlock(); } }
我们之前使用Redis的 set nx
来加锁,但是由于 set nx
是互斥的,它是通过判断这个 key 是否存在来判断能不能加锁的,一旦加了锁,就不可能在被加第二次,所以是不可重入的。
通过下面这段代码我们来分析一下:
private RLock lock = null; @BeforeEach void setUp() { lock = redissonClient.getLock("lock"); } @Test void method01() { boolean ok = lock.tryLock(); if (!ok) { System.out.println("获取锁失败:1"); return; } try { System.out.println("获取锁成功:1"); method02(); } finally { System.out.println("释放锁:1"); lock.unlock(); } } void method02() { boolean ok = lock.tryLock(); if (!ok) { System.out.println("获取锁失败:2"); return; } try { System.out.println("获取锁成功:2"); method02(); } finally { System.out.println("释放锁:2"); lock.unlock(); } }
在一个线程中,连续连续获取锁,就表示锁的重入了。
在Lock锁中,它是借助于底层的一个 voaltile 的一个 state 变量来记录重入的状态的,比如当前没有人持有这把锁,那么 state=0,加入有人持有这把锁,那么 state=1,如果持有这把锁的人再次持有这把锁,那么state就会+1,如果是对于synchronized而言,他在c语言代码中会有一个count,原理和state类似,也是重入一次就加一,释放一次就-1 ,直到减少成0 时,表示当前这把锁没有被人持有。
在 redisson 中,我们也可以支持可重入锁,在是怎么实现的吗?
首先我们的目的是想在一个锁上加一个计数器,来记录这把锁已经被重入多少次,那redis中有什么数据结构可以直接拿来用吗,嘿嘿,我们肯定想到了 Hash,Hash就完美实现了这个问题。
在分布式锁中,他采用hash结构用来存储锁,其中大key表示表示这把锁是否存在,用小key表示当前这把锁被哪个线程持有。
此时,我们每加锁一次就会让value值+1,每释放锁一次就会让 value 值 -1。这样我们就是实现了可重入锁。当value值等于0时,表面这把锁已经没有人拥有,所以就可以删除掉这把锁。
由于加锁和删锁有多个操作,所以我们需要使用 Lua 脚本来保证原子性。
加锁:
local key = KEY[1] -- 锁的key local threadId = ARGV[1] -- 线程唯一标识 local releaseTime = ARGV[2] -- 锁的自动释放时间 -- 判断锁是否存在 if (redis.call('exists', key) == 0) then -- 不存在,获取锁,并设置 value 为 1 redis.call('hset', key, threadId, 1) -- 设置有效期 redis.call('expire', key, releaseTime) -- 返回结果 return 1 end -- 锁已经存在,判断threadId是否是自己 if (redis.call('hexists', key, threadId) == 1) then -- 是自己,重入次数+1releaseTime redis.call('hincrby', key, threadId, '1') -- 设置有效期 redis.call('expire', key, releaseTime) -- 返回结果 return 1 end -- 当前锁不是自己的,获取锁失败 return 0
删锁:
local key = KEY[1] -- 锁的key local threadId = ARGV[1] -- 线程唯一标识 local releaseTime = ARGV[2] -- 锁的自动释放时间 -- 判断当前锁是否是自己的 if (redis.call('hexists', key, threadId) == 0) then -- 如果锁不是自己的,则直接返回 return 0 end -- 是自己的,则重入次数-1 local count = redis.call('hincrby', key, threadId, '-1') -- 判断重入次数是否为0 if (count > 0) then -- 大于0说明不能释放锁,重置有效期后返回 redis.call('expire', key, releaseTime) return nil else -- 等于0说明可以释放锁,直接删除 redis.call('del', key) end
我们来看一下源码:
// 方法名翻译:异步尝试获取锁 private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) { if (leaseTime != -1) { // 当超时时间不等于-1时,直接去获取锁 return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN); } // 超时时间等于-1时,走的时看门狗机制 // 这里默认超时时间是三十秒,但是会不断重新设置超时时间,怎么做到的呢,看下面的代码 // 这里也是异步调用的 RFuture<Boolean> ttlRemainingFuture = tryLockInnerAsync(waitTime, commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN); // 当异步调用结束之后,我们去看获取锁是否成功 // ttlRemaining 表示剩余有效期 // e 表示发生的异常 ttlRemainingFuture.onComplete((ttlRemaining, e) -> { if (e != null) { return; } // 获取锁成功 // lock acquired if (ttlRemaining) { // 自动续约过期时间 scheduleExpirationRenewal(threadId); } }); return ttlRemainingFuture; }
获取锁的代码:
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { internalLockLeaseTime = unit.toMillis(leaseTime); return evalWriteAsync(getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);", // 这里返回当前锁的剩余时间,后面会用到这里 Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); }
释放锁的代码
protected RFuture<Boolean> unlockInnerAsync(long threadId) { return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + "return nil;" + "end; " + "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + "if (counter > 0) then " + "redis.call('pexpire', KEYS[1], ARGV[2]); " + "return 0; " + "else " + "redis.call('del', KEYS[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " + // 发布一个订阅,表示这把锁已经被删除,后面会用到这里 "return 1; " + "end; " + "return nil;", Arrays.asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId)); }
尝试获取锁的代码:
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { long time = unit.toMillis(waitTime); // 等待时间 long current = System.currentTimeMillis(); // 当前系统时间 long threadId = Thread.currentThread().getId(); // 当前线程id Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId); // 尝试获取锁 // lock acquired if (ttl == null) { // 当ttl==null表示获取锁成功,返回true return true; } // 此时ttl != null ,说明当有这把锁存在但是线程id不是自己,那么就需要等待别人释放这把锁 time -= System.currentTimeMillis() - current; // 用等待时间减去获取锁所花费的时间就是剩余等待的时间 // 如果当前时间已经用完,说明等待期限到了,就没办法获取锁了 if (time <= 0) { acquireFailed(waitTime, unit, threadId); return false; } // 从这里开始就是 锁重试 的代码 // 记录当前时间 current = System.currentTimeMillis(); // 这里因为我们刚刚获取锁失败,所以不会再次去获取锁,因为此时大概率锁的持有者还在 // 执行业务,所以这里再去抢锁没有意义,浪费使劲、浪费资源。 // 为了不让CPU一直去忙着枪锁,我们可以发布一个订阅,订阅该线程的锁是否释放 // 如果释放我再去抢锁,这样就不会耽误大量时间了。 // 还记得上面删除锁里有一段发布订阅的通知吗,我们这里就是订阅那个通知。 RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId); // 尝试等待订阅的通知 // 这里的时间是获取锁的等待时间 // 如果在剩余时间内订阅完成返回true,否则返回false if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) { // 走到这里就表示订阅失败 // 那就可以取消订阅了 if (!subscribeFuture.cancel(false)) { subscribeFuture.onComplete((res, e) -> { if (e == null) { // 取消订阅 unsubscribe(subscribeFuture, threadId); } }); } acquireFailed(waitTime, unit, threadId); // 返回失败结果 return false; } // 走到这里就表示订阅通知成功,锁已经被释放了,那么我们就可以来获取锁了。 try { // 等待时间减去订阅通知所消耗的时间 time -= System.currentTimeMillis() - current; // 时间不够了,那么就返回失败 if (time <= 0) { acquireFailed(waitTime, unit, threadId); return false; } // 这里就开始尝试多次获取锁,所以用了一个死循环 while (true) { // 获取当前时间 long currentTime = System.currentTimeMillis(); // 尝试去获取锁 ttl = tryAcquire(waitTime, leaseTime, unit, threadId); // lock acquired // 锁已经获得,返回true if (ttl == null) { return true; } // 减去尝试获取锁的时间 time -= System.currentTimeMillis() - currentTime; if (time <= 0) { acquireFailed(waitTime, unit, threadId); return false; } // 这里因为又没有获取到锁,说明锁已经被其他线程抢到了,所以需要再次订阅通知 // waiting for message currentTime = System.currentTimeMillis(); // 如果那把锁还有剩余存活时间,并且小于我当前的等待时间 // 表示我只需要使用那把锁的剩余存活时间就可以收到通知 if (ttl >= 0 && ttl < time) { subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } else { // 否则表示我还只能用我剩下的等待时间去尝试订阅通知 subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS); } time -= System.currentTimeMillis() - currentTime; if (time <= 0) { acquireFailed(waitTime, unit, threadId); return false; } // 走到这里说明等待时间还有没用完,那就再次尝试获取锁,再一次循环。 } } finally { // 取消订阅 unsubscribe(subscribeFuture, threadId); } // return get(tryLockAsync(waitTime, leaseTime, unit)); }
private void scheduleExpirationRenewal(long threadId) { ExpirationEntry entry = new ExpirationEntry(); // getEntryName():获取当前锁的名称 // entry :当前锁的过期节点 // EXPIRATION_RENEWAL_MAP:ConcurrentMap,用来存放所有的锁 // putIfAbsent 如果这把锁不存在,才能放进去,这样就能保证同一把锁只有一个过期节点 ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry); if (oldEntry != null) { // 这里表示这把锁已经存在了,那么这里就是重入锁,然后就把当前线程id加入到旧锁中去 // 那就不需要再次启动续期任务 oldEntry.addThreadId(threadId); } else { // 这里表示是一个新锁 // 那就需要开启一个续期任务 entry.addThreadId(threadId); renewExpiration(); } }
private void renewExpiration() { // 从集合中获取这把锁的过期集合 ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ee == null) { return; } // Timeout :定时任务 // new TimerTask():任务 // delay:internalLockLeaseTime / 3,多长时间单位执行一次,这里给的就是我们之前默认时间30/3=10秒 // 这里就是表示我们每10秒钟执行一次任务。 Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ent == null) { return; } // 得到线程id Long threadId = ent.getFirstThreadId(); if (threadId == null) { return; } RFuture<Boolean> future = renewExpirationAsync(threadId); // 执行完后查看结果 future.onComplete((res, e) -> { // 如果有异常 if (e != null) { log.error("Can't update lock " + getName() + " expiration", e); return; } // 重置有效期成功 if (res) { // 再次续约,递归当前函数 // 那么就会一直执行下去,无穷无尽,直到锁被释放取消掉该定时任务或者服务宕机 // reschedule itself renewExpiration(); } }); } }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS); // 将定时任务存入当前过期节点中 ee.setTimeout(task); }
重置有效期:
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
// 如果是当前线程id拥有的锁,那就去更新有效期
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.singletonList(getName()),
internalLockLeaseTime, getLockName(threadId));
}
释放锁
public RFuture<Void> unlockAsync(long threadId) { RPromise<Void> result = new RedissonPromise<Void>(); // 去释放锁 RFuture<Boolean> future = unlockInnerAsync(threadId); future.onComplete((opStatus, e) -> { // 释放完之后,去删掉续约中的集合中的当前线程的过期节点 cancelExpirationRenewal(threadId); if (e != null) { result.tryFailure(e); return; } if (opStatus == null) { IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + id + " thread-id: " + threadId); result.tryFailure(cause); return; } result.trySuccess(null); }); return result; }
void cancelExpirationRenewal(Long threadId) { // 从集合中得到这个锁的过期节点 ExpirationEntry task = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (task == null) { return; } if (threadId != null) { // 从集合中删掉这把锁 task.removeThreadId(threadId); } if (threadId == null || task.hasNoThreads()) { // 同时也要把这把锁里面的定时任务删除 Timeout timeout = task.getTimeout(); if (timeout != null) { timeout.cancel(); } // 从集合中删除这个锁的过期节点 EXPIRATION_RENEWAL_MAP.remove(getEntryName()); } }
public static class ExpirationEntry { // Map 用来存这把锁的对应的线程id的重入次数 // key:线程id // value:重入次数 // 这里集合只会存一个线程id,但我不知道为啥要用一个Map来存,不可以用两个变量吗? // 可能这样更方便? private final Map<Long, Integer> threadIds = new LinkedHashMap<>(); // 存这把锁的定时任务(看门狗机制) private volatile Timeout timeout; public ExpirationEntry() { super(); } public synchronized void addThreadId(long threadId) { // 更新当前锁的重入次数 // 如果是第一次获取锁,那么重入次数=1 // 否则 重入次数+1 Integer counter = threadIds.get(threadId); if (counter == null) { counter = 1; } else { counter++; } threadIds.put(threadId, counter); } public synchronized boolean hasNoThreads() { return threadIds.isEmpty(); } public synchronized Long getFirstThreadId() { if (threadIds.isEmpty()) { return null; } // 获取当前锁的线程id return threadIds.keySet().iterator().next(); } public synchronized void removeThreadId(long threadId) { // 锁的重入次数-1 // 如果为0,则从集合中删掉这把锁 Integer counter = threadIds.get(threadId); if (counter == null) { return; } counter--; if (counter == 0) { threadIds.remove(threadId); } else { threadIds.put(threadId, counter); } } public void setTimeout(Timeout timeout) { this.timeout = timeout; } // 获取定时任务 public Timeout getTimeout() { return timeout; } }
流程图:
我们来分析一下上诉流程:
如果是无参的 tryLock() 方法,那么就没有重试机制,只有看门狗机制
如果是有参的 tryLock(waitTime,TimeUnit),既有重试机制,又有看门狗机制
如果是有参的 tryLocak(long waitTime, long leaseTime, TimeUnit unit),只有重试机制,没有看门狗机制。
重试机制:
总的来说,就是在waitTime时间返回内多次尝试获取锁,获取失败也不会傻等,而是会去订阅这把锁的通知,一旦那把锁释放,就会发布一个通知,当收到这个通知后我在去获取锁,这样就实现了重试也提高了性能。
看门狗机制:如果是没有传入时间,则此时也会进行抢锁, 而且抢锁时间是默认看门狗时间30s,ttlRemainingFuture.onComplete((ttlRemaining, e)
这句话相当于对以上抢锁进行了监听,也就是说当上边抢锁完毕后,此方法会被调用,具体调用的逻辑就是去后台开启一个线程,进行续约逻辑,也就是看门狗线程。
看门狗线程会开启一个定时任务:每10s会行重置锁的有效期,时间一到,这个定时任务就触发了,它就会去续约,把当前这把锁续约成30s,如果操作成功,那么此时就会递归调用自己,在重新设置一个定时任务,于是在过10s又会去续约,完成不停的续约,这样锁就不会过期了。
直到什么时候这个定时任务会停止呢,当这把锁被完全释放的时候就会被删除。或者服务宕机了。
主从一致性问题:如果Redis提够了主从集群,主从同步存在延迟,当主服务宕机时,如果从并没有同步主中的数据,则会出现锁失效。
如果我们只有一个Redis主机,那么如果这个Redis主机发生了故障,那么所以需要Redis服务的都会发生问题,也包括分布式锁。
所以为了解决单主机的问题,我们需要搭建一个Redis集群。主从集群就是有一些主机器用来做写操作,从机器用来做读操作,主机器一旦有命令进来,那么从机器就会去同步主机器上的数据,来保证主从一致性问题。
假设我们现在有三个机器,一个是主,两个从。此时我们去写命令,写在主机上,主机会将数据同步给从机,但是假设还没有来得及把数据写入到从机去的时候,此时主机宕机,哨兵会发现主机宕机,并让一个从变成主,而此时新的主实际上并没有锁信息,此时锁信息就已经丢掉了。
为了解决这个问题,redission提出来了MutiLock锁,使用这把锁咱们就不使用主从了,每个节点的地位都是一样的, 这把锁 加锁的逻辑需要写入到每一个主从节点上,只有所有的服务器都写入成功,此时才是加锁成功,假设现在某个节点挂了,那么他去获得锁的时候,只要有一个节点拿不到,都不能算是加锁成功,就保证了加锁的可靠性。因为有的节点上已经有锁了,有的因为挂了从节点就拿不到锁就不能够成功。
源码解析:
lock = redissonClient.getMultiLock(lock);
lock.lock();
调用几次之后走进这个方法:
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException { // 基础等待时间 :redis节点个数 * 1500 // 表示每个锁都有1500毫秒的获取锁时间 long baseWaitTime = locks.size() * 1500; long waitTime = -1; // 这里就是一些优化锁的等待时间的逻辑 // 计算最合理的等待时间 if (leaseTime == -1) { waitTime = baseWaitTime; } else { leaseTime = unit.toMillis(leaseTime); waitTime = leaseTime; if (waitTime <= 2000) { waitTime = 2000; } else if (waitTime <= baseWaitTime) { waitTime = ThreadLocalRandom.current().nextLong(waitTime/2, waitTime); } else { waitTime = ThreadLocalRandom.current().nextLong(baseWaitTime, waitTime); } } while (true) { // 死循环 尝试获取锁,不停的尝试所有的锁 // 只要没有全部获取到或者 没有超过等待时间就会一直尝试 if (tryLock(waitTime, leaseTime, TimeUnit.MILLISECONDS)) { return; } } }
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { // try { // return tryLockAsync(waitTime, leaseTime, unit).get(); // } catch (ExecutionException e) { // throw new IllegalStateException(e); // } // 设置新的过期时间 long newLeaseTime = -1; // 如果传了过期时间 if (leaseTime != -1) { // 如果没传等待时间 if (waitTime == -1) { // 每传等待时间那就说明只一次获取锁,过期时间就不会发生变化 newLeaseTime = unit.toMillis(leaseTime); } else { // 如果传了等待时间,那么过期时间就等于等待时间的二倍,可以让锁的过期时间多一些 // 因为获取锁的时间就很长,如果过期时间太短,那么可能最后一把锁获取到的时候第一把所却过期了 // 为了安全考虑 newLeaseTime = unit.toMillis(waitTime)*2; } } long time = System.currentTimeMillis(); // 剩余的等待时间 long remainTime = -1; if (waitTime != -1) { remainTime = unit.toMillis(waitTime); } // 上锁的等待时间 long lockWaitTime = calcLockWaitTime(remainTime); // 允许获取失败锁的个数:0 int failedLocksLimit = failedLocksLimit(); // 获取成功的锁的集合 List<RLock> acquiredLocks = new ArrayList<>(locks.size()); // 通过迭代器遍历的方式获取每一把锁 for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) { // 当前这把锁 RLock lock = iterator.next(); boolean lockAcquired; try { if (waitTime == -1 && leaseTime == -1) { lockAcquired = lock.tryLock(); } else { long awaitTime = Math.min(lockWaitTime, remainTime); lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS); } } catch (RedisResponseTimeoutException e) { unlockInner(Arrays.asList(lock)); lockAcquired = false; } catch (Exception e) { lockAcquired = false; } // 如果获取锁成功那么就加到获取成功锁的集合中 if (lockAcquired) { acquiredLocks.add(lock); } else { // 如果 if (locks.size() - acquiredLocks.size() == failedLocksLimit()) { break; } // 获取锁失败 if (failedLocksLimit == 0) { // 释放所有已经获取成功的锁 unlockInner(acquiredLocks); // 如果是一次尝试获取锁,直接返回false if (waitTime == -1) { return false; } // 如果不是一次获取获取锁的话,那就重新开始获取所有的锁 failedLocksLimit = failedLocksLimit(); // 将已经获取到的锁清空 acquiredLocks.clear(); // reset iterator // 将迭代器退回到初始位置 while (iterator.hasPrevious()) { iterator.previous(); } } else { // 允许获取失败锁的个数:-1 // 不明白是为什么要-1 failedLocksLimit--; } } if (remainTime != -1) { remainTime -= System.currentTimeMillis() - time; time = System.currentTimeMillis(); // 如果获取锁花费的时间已经超过剩余的等待时间,那么就停止 if (remainTime <= 0) { unlockInner(acquiredLocks); return false; } } } // 走到这里说明全部锁都获取成功 // 如果过期时间等于-1,那上面获取锁的时候会走看门狗机制一直去续期 // 如果不等于-1,那么就需要给没把所都重置有效期,因为在上面获取锁的过程中前几个获取到锁的 // 有效期已经消耗了很长时间,所以这里做的目的就是为了让每把锁的有效期设置为一样的。 if (leaseTime != -1) { List<RFuture<Boolean>> futures = new ArrayList<>(acquiredLocks.size()); for (RLock rLock : acquiredLocks) { RFuture<Boolean> future = ((RedissonLock) rLock).expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS); futures.add(future); } for (RFuture<Boolean> rFuture : futures) { rFuture.syncUninterruptibly(); } } return true; }
当我们去设置了多个锁时,redission会将多个锁添加到一个集合中,然后用while循环去不停去尝试拿锁,但是会有一个总共的加锁时间,这个时间是用需要加锁的个数 * 1500ms ,假设有3个锁,那么时间就是4500ms,假设在这4500ms内,所有的锁都加锁成功, 那么此时才算是加锁成功,如果在4500ms有线程加锁失败,则会再次去进行重试.
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。