赞
踩
锁是什么?
编程中的锁是控制不同线程之间访问共享资源的一种实现,需要实现互斥,来防止彼此干扰,来保证数据一致性。
门 和 锁的比喻
人是不同的线程,卫生间是共享资源
你在上洗手间的时候肯定要把门锁上吧,这就是加锁,只要你在里面,这个卫生间就被锁了,只有你出来之后别人才能用。想象一下如果卫生间的门没有锁会是什么样?
应用场景
使用分布式锁的目的,无外乎就是保证同一时间只有一个客户端可以对共享资源进行操作。
根据锁的用途还可以细分为以下两类
这种情况下,对共享资源的操作一定是幂等性操作,无论你操作多少次都不会出现不同结果。在这里使用锁,无外乎就是为了避免重复操作共享资源从而提高效率。
这种情况下,对共享资源的操作一般是非幂等性操作。在这种情况下,如果出现多个客户端操作共享资源,就可能意味着数据不一致,数据丢失。
怎么实现?
不要着急,下面我们开始的实现它,以下我们会使用redis实现分布式锁。
继续看下去吧~
redis中执行命令setnx key val
setnx 意思是
SET if Not eXists
当且仅当key不存在时,set一个key为val的字符串,返回1;若key存在,则什么都不做,返回0
那么意味者,系统认定成功更改值的线程持有该锁,有没问题(没有问题)?
redis中执行命令 del key
删除redis中这个值,意味着释放了这把锁,线程就可以重新争抢这把锁
现在就已经实现里加锁和释放锁,是不是已经完成了?
线程获得锁后程序出错/系统宕机还没来得及释放造成死锁?
- 使用try…finally 确保获取锁结束后必然会释放锁
- 为每个锁添加一个过期时间
如何设置超时时间?
redis中执行命令
expire key timeout
为key设置一个超时时间,单位为second,超过这个时间锁会自动释放,避免死锁
由于setnx和expire是两部操作,如果在setnx时还未进行expire程序宕机也会出现死锁问题?
解决方案:
set key val NX EX 10
原子性的操作同时给key设置值和加过期时间
释放锁的设计
当线程a准备释放锁,此时线程a的锁已经自然过期,线程b重新占用的了锁,那么线程a准备释放的锁就是线程刚刚获得锁。
只允许释放自己上加的锁
解决方案: 加锁设置的值不再是1 ,换成一个线程的唯一标识。
set lock uuid NX EX 10
uuid是一个唯一标识
解决这个问题同时引入一个新的问题?
释放锁之前,要先查询下锁的这个值是不是自己加上的,然后执行del操作,由于查询和删除不是原子操作,(例如:查询时发现是自己上的锁要准备释放锁,这是锁已经自然过期,在极短时间内另一个线程获取了这个锁,这时候准备释放锁的操作会错误的释放掉这个刚刚获得的锁)
解决方案:那就需要保证查询和删除是原子性。
由于redis的命令不支持 查询和删除是原子操作
那么我们通过发送lua脚本给redis执行保证原子性
书接上文:
如何设置合理的超时时间?
可能遇到网络问题,慢查询超过了锁过期时间,业务逻辑还未执行完成,锁自然释放了,这显然不是我们想看到的。
设置时间过长,一旦发生宕机重启,就意味着 1 小时内,分布式锁的服务全部节点不可用,让运维手动删除这个锁么?这也增加了运维成本,也不是我们想要的。
有一个动态的过期时间,能在我程序未执行完成,不断给我增加时间,这是不是能解决我们的问题呢?
让获得锁的线程开启一个守护线程,用来给快要过期的锁「续航」
- 如果快要过期,但是业务逻辑还没执行完成,自动对这个锁进行续期,重新设置过期时间。
这个守护线程我们一般也把它叫做「看门狗」线程
后面我们用python语言实现这样一个看门狗线程。
所以这就完成所有redis锁的设计了吗?
我们还可以优化锁的实现
可重入锁指的是在一个线程中可以多次获取同一把锁
a,b方法使用同一个锁的情况:
假设 X 线程在 a 方法获取锁之后,a方法调用 b 方法,如果此时不可重入,线程就必须等待锁释放,自己等待自己释放显然是不会有结果的。
实现起来也很简单,锁的value值改为hash类型对应uid的值是数值,重入一次增加1,释放则减少1.
下面我们用python代码来实现。
# -*- coding: utf-8 -*- # @DateTime : 2022/8/25 17:38 # @Author : charlesxie import threading import uuid import weakref import redis import time LOCK_SCRIPT = b""" if (redis.call('exists', KEYS[1]) == 0) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('expire', KEYS[1], ARGV[1]); return 1; end ; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('expire', KEYS[1], ARGV[1]); return 1; end ; return 0; """ UNLOCK_SCRIPT = b""" if (redis.call('hexists', KEYS[1], ARGV[1]) == 0) then return nil; end ; local counter = redis.call('hincrby', KEYS[1], ARGV[1], -1); if (counter > 0) then return 0; else redis.call('del', KEYS[1]); return 1; end ; return nil; """ RENEW_SCRIPT = b""" if redis.call("exists", KEYS[1]) == 0 then return 1 elseif redis.call("ttl", KEYS[1]) < 0 then return 2 else redis.call("expire", KEYS[1], ARGV[1]) return 0 end """ class RedisLock: """ redis实现互斥锁,支持重入和续锁 """ def __init__(self, conn, lock_name, expire=30, uid=None, is_renew=True): self.conn = conn self.lock_script = None self.unlock_script = None self.renew_script = None self.register_script() self._name = f"lock:{lock_name}" self._expire = int(expire) self._uid = uid or str(uuid.uuid4()) self._lock_renew_interval = self._expire * 2 / 3 self._lock_renew_threading = None self.is_renew = is_renew self.is_acquired = None self.is_released = None @property def id(self): return self._uid @property def expire(self): return self._expire def acquire(self): result = self.lock_script(keys=(self._name,), args=(self._expire, self._uid)) if self.is_renew: self._start_renew_threading() self.is_acquired = True if result else False print(f"争抢锁:{self._uid}-{self.is_acquired}\n") return self.is_acquired def release(self): if self.is_renew: self._stop_renew_threading() result = self.unlock_script(keys=(self._name,), args=(self._uid,)) self.is_released = True if result else False print(f"释放锁{self.is_released}") return self.is_released def register_script(self): self.lock_script = self.conn.register_script(LOCK_SCRIPT) self.unlock_script = self.conn.register_script(UNLOCK_SCRIPT) self.renew_script = self.conn.register_script(RENEW_SCRIPT) def renew(self, renew_expire=30): result = self.renew_script(keys=(self._name,), args=(renew_expire,)) if result == 1: raise Exception(f"{self._name} 没有获得锁或锁过期!") elif result == 2: raise Exception(f"{self._name} 未设置过期时间") elif result: raise Exception(f"未知错误码: {result}") print("续命一波", result) @staticmethod def _renew_scheduler(weak_self, interval, lock_event): while not lock_event.wait(timeout=interval): lock = weak_self() if lock is None: break lock.renew(renew_expire=lock.expire) del lock def _start_renew_threading(self): self.lock_event = threading.Event() self._lock_renew_threading = threading.Thread(target=self._renew_scheduler, kwargs={ "weak_self": weakref.ref(self), "interval": self._lock_renew_interval, "lock_event": self.lock_event }) self._lock_renew_threading.demon = True self._lock_renew_threading.start() def _stop_renew_threading(self): if self._lock_renew_threading is None or not self._lock_renew_threading.is_alive(): return self.lock_event.set() # join 作用是确保thread子线程执行完毕后才能执行下一个线程 self._lock_renew_threading.join() self._lock_renew_threading = None def __enter__(self): self.acquire() return self def __exit__(self, exc_type=None, exc_val=None, exc_tb=None): self.release() def run_work(my_user_id): with RedisLock(redis_client, "test", uid=my_user_id, expire=5) as r: if r.is_acquired: print(f"just do it,{my_user_id}") time.sleep(20) else: print(f"quit, {my_user_id}") if __name__ == '__main__': redis_client = redis.Redis(host="localhost", port=6379, db=2) a1 = threading.Thread(target=run_work, args=("charles",)) a2 = threading.Thread(target=run_work, args=("xie",)) a1.start() a2.start()
争抢锁:charles-True
争抢锁:xie-False
quit, xie
just do it,charles
释放锁False
续命一波 0
续命一波 0
续命一波 0
续命一波 0
续命一波 0
释放锁True
进程已结束,退出代码0
讲的是 redis通过主从模式哨兵模式集群模式部署成为一个redis集群,而我们在使用 Redis 时,一般会采用主从集群 + 哨兵的模式部署,这样做的好处在于,当主库异常宕机时,哨兵可以实现「故障自动切换」,把从库提升为主库,继续提供服务,以此保证可用性。
同样分布式也会引入一些问题,
如果你对某个redis master实例,写入锁数据,此时会异步复制给对应的master slave实例。
但是这个过程中一旦发生redis master宕机,主备切换,redis slave变为了redis master。
接着就会导致,客户端2来尝试加锁的时候,在新的redis master上完成了加锁
此时就会存在2个客户端同时拥有这把锁。
这样的业务系统一定会出问题,导致各种脏数据的产生和数据不一致。
为了应对这个情形, redis的作者antirez提出了RedLock算法,
RedLock算法思想:
- 不能只在一个redis实例上创建锁,应该是在多个redis实例上创建锁
- 必须在大多数(n/2+1)redis节点上都成功创建锁,才能算这个整体的RedLock加锁成功
Redlock 的方案基于 2 个前提:
你可看作是把5把小锁合并成一把大锁使用
那么开始我们的实现过程:
class RedLock: def __init__(self, masters, lock_name, expire=30, uid=None, is_renew=True): self.masters = masters self.lock_script = None self.unlock_script = None self.renew_script = None self.register_script() self._name = f"lock:{lock_name}" self._expire = int(expire) self._uid = uid or str(uuid.uuid4()) self._lock_renew_interval = self._expire * 2 / 3 self._lock_renew_threading = None # 是否开启续锁 self.is_renew = is_renew self.is_acquired = None self.is_released = None @property def id(self): return self._uid @property def expire(self): return self._expire def __drift(self) -> float: return self._expire * 0.01 + .002 def register_script(self): master = next(iter(self.masters)) # type: ignore self.lock_script = master.register_script(LOCK_SCRIPT) self.unlock_script = master.register_script(UNLOCK_SCRIPT) self.renew_script = master.register_script(RENEW_SCRIPT) def __acquire(self, master): result = self.lock_script(keys=(self._name,), args=(self._expire, self._uid), client=master) return result def acquire(self): futures = [] success_acquired_num = 0 start_time = timeit.default_timer() executor = ThreadPoolExecutor() for master in self.masters: futures.append(executor.submit(self.__acquire, master)) executor.shutdown(True) for future in as_completed(futures): try: success_acquired_num += future.result() except Exception as e: print(f"{e}") else: if success_acquired_num > len(self.masters) // 2: end_time = timeit.default_timer() if end_time - start_time - self.__drift() < self._expire: self.is_acquired = True if self.is_renew and self.is_acquired: self._start_renew_threading() print(f"{self._uid}-争抢锁:{success_acquired_num}-{self.is_acquired}\n") return self.is_acquired def __release(self, master): result = self.unlock_script(keys=(self._name,), args=(self._uid,), client=master) return result def release(self): if self.is_renew and self.is_acquired: self._stop_renew_threading() futures = [] success = [] executor = ThreadPoolExecutor() for master in self.masters: futures.append(executor.submit(self.__release, master)) executor.shutdown(True) for future in as_completed(futures): result = future.result() if result == 1: success.append(result) self.is_released = True if len(success) > len(self.masters) // 2 else False print(f"释放锁{success}-{self.is_released}") return self.is_released def __renew(self, master, renew_expire): result = self.renew_script(keys=(self._name,), args=(renew_expire,), client=master) return result def renew(self, renew_expire=30): futures = [] success = [] executor = ThreadPoolExecutor() for master in self.masters: futures.append(executor.submit(self.__renew, master, renew_expire)) executor.shutdown(True) for future in as_completed(futures): result = future.result() if result == 0: success.append(result) print("续命一波", success) def __enter__(self): self.acquire() return self def __exit__(self, exc_type=None, exc_val=None, exc_tb=None): self.release() def _stop_renew_threading(self): if self._lock_renew_threading is None or not self._lock_renew_threading.is_alive(): return self.lock_event.set() # join 作用是确保thread子线程执行完毕后才能执行下一个线程 self._lock_renew_threading.join() self._lock_renew_threading = None def _start_renew_threading(self): self.lock_event = threading.Event() self._lock_renew_threading = threading.Thread(target=self._renew_scheduler, kwargs={ "weak_self": weakref.ref(self), "interval": self._lock_renew_interval, "lock_event": self.lock_event }) self._lock_renew_threading.demon = True self._lock_renew_threading.start() @staticmethod def _renew_scheduler(weak_self, interval, lock_event): while not lock_event.wait(timeout=interval): lock = weak_self() if lock is None: break lock.renew(renew_expire=lock.expire) del lock
if __name__ == '__main__': redis_client = redis.Redis(host="localhost", port=6379, db=2) redis_client2 = redis.Redis(host="localhost", port=6379, db=1) redis_client3 = redis.Redis(host="localhost", port=6379, db=3) redis_client4 = redis.Redis(host="localhost", port=6379, db=4) # 单机锁 # a1 = threading.Thread(target=run_work, args=("charles",)) # a2 = threading.Thread(target=run_work, args=("xie",)) # # a1.start() # a2.start() # 分布式锁 masters = [redis_client, redis_client2, redis_client3, redis_client4] pool = ThreadPool(5) pool.starmap(run_work_distributed, [("u1",), ("u2",), ("u3",), ("u4",), ("u5",)]) pool.close() pool.join()
官方推荐的轮子:
包含不同语言的不同设计,能满足大部分生产上的需求。
开箱即用,效果杠杠的。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。