赞
踩
点击下方名片,设为星标!
回复“1024”获取2TB学习资源!
前面介绍了 Zookeeper 集群 ZAB 协议、配置中心、注册中心、数据与存储、会话与事务管理等相关的知识点,今天我将详细的为大家介绍 zookeeper 分布式锁相关知识,希望大家能够从中收获多多!如有帮助,请点在看、转发支持一波!!!
在平时我们对锁的使用,在针对单个服务,我们可以用 Java 自带的一些锁来实现,资源的顺序访问,但是随着业务的发展,现在基本上公司的服务都是多个,单纯的 Lock或者 Synchronize 只能解决单个JVM线程的问题,那么针对于单个服务的 Java 的锁是无法满足我们业务的需要的,为了解决多个服务跨服务访问共享资源,于是就有了分布锁,分布式锁产生的原因就是集群。在分布式系统中,多个进程或节点可能需要同时访问共享资源。为了确保数据一致性和并发控制,需要使用分布式锁来协调这些进程或节点之间的访问。分布式锁可以让每个进程或节点按照特定的规则访问共享资源,从而避免冲突和竞争条件的发生。
下图就是一个分布式锁的常见应用案例。
分布式锁的实现方式主要以(ZooKeeper、Reids、Mysql)这三种为主。
今天我们主要讲解的是使用 ZooKeeper来实现分布式锁,ZooKeeper的应用场景主要包含这几个方面:
1.服务注册与订阅(共用节点)
2.分布式通知(监听ZNode)
3.服务命令(ZNode特性)
4.数据订阅、发布(Watcher)
5.分布式锁(临时节点)
ZooKeeper实现分布式锁,主要是得益于ZooKeeper 保证了数据的强一致性,锁的服务可以分为两大类:
保持独占所有试图来获取当前锁的客户端,最终有且只有一个能够成功得到当前锁的钥匙,通常我们会把 ZooKeeper 上的节点(ZNode)看做一把锁,通过 create 临时节点的方式来实现,当多个客户端都去创建一把锁的时候,那么只有成功创建了那个客户端才能拥有这把锁。
控制时序所有试图获取锁的客户端,都是被顺序执行,只是会有一个序号(zxid),我们会有一个节点,例如:/testLock,所有临时节点都在这个下面去创建,ZK的父节点(/testLock) 维持了一个序号,这个是ZK自带的属性,他保证了子节点创建的时序性,从而也形成了每个客户端的一个全局时序。
ZooKeeper的分布式锁是基于ZooKeeper提供的有序节点(Sequential Nodes)和 Watch 机制实现的。具体实现步骤如下:
1.每个进程或节点在ZooKeeper的某个节点上创建一个有序节点,节点名称可以是一个递增的数字,也可以是其他可以排序的字符串。
2.进程或节点根据节点名称的顺序来竞争获取锁,获取到锁的进程或节点可以访问共享资源,其他进程或节点需要等待。
3.当有一个进程或节点释放锁时,ZooKeeper会通知等待队列中的第一个进程或节点,让其继续竞争获取锁。
因为ZooKeeper的有序节点是按照创建的顺序排序的,所以可以通过监听前一个节点的变化来实现获取锁。当一个进程或节点需要获取锁时,它会在ZooKeeper上创建一个有序节点,并获取所有有序节点中的最小值。如果当前节点是最小值,则表示该进程或节点已经获取到锁;否则,该进程或节点需要监听前一个节点的变化,等待前一个节点释放锁后再次尝试获取锁。
ZooKeeper的分布式锁具有以下优点:
可以确保分布式环境下的并发控制和数据一致性。
可以避免死锁和竞争条件的发生。
可以提供较高的性能和可靠性。
但是,ZooKeeper的分布式锁也存在一些局限性:
1.由于需要频繁地在ZooKeeper上进行节点的创建、删除和监听操作,因此会产生较高的网络和性能开销。
2.当锁被持有时,其他进程或节点需要等待前一个节点释放锁才能继续尝试获取锁,因此锁的竞争情况就会相对平均,不会出现某一个进程或节点一直占用锁的情况。
使用ZooKeeper实现分布式锁的基本流程如下:
1.创建一个ZooKeeper客户端,并连接到ZooKeeper服务器。
2.在ZooKeeper的一个目录下创建一个锁节点,例如/locks/lock_node。
3.当需要获取锁时,调用create()方法在/locks目录下创建一个临时有序节点,例如/locks/lock_node/lock_000000001
,同时设置watcher事件,监控它的前一个节点。
4.调用getChildren()方法获取/locks目录下所有的子节点,判断自己创建的节点是否为序号最小的节点。
5.如果是序号最小的节点,则表示获取到了锁,可以执行临界区代码;否则调用exists()方法监控自己前面的一个节点。
6.当前面的一个节点被删除时,触发watcher事件,重复第4和第5步,直到获取到锁为止。
7.释放锁时,调用delete()方法删除自己创建的节点,其他等待锁的进程或节点就可以获取到锁。
需要注意的是,分布式锁的实现还需要处理以下问题:
1.临时节点的创建和删除必须是原子性的,否则会出现多个节点同时创建或删除的情况,导致锁的失效。
2.如果一个进程或节点创建了临时节点但没有及时删除,就会造成死锁,因为其他进程或节点永远也无法获取到锁。
3.如果一个进程或节点获取到锁后因为某些原因没有及时释放锁,就会导致其他进程或节点一直等待,降低了系统的性能。
因此,在实现分布式锁时,需要考虑锁的可靠性
、高效性
和容错性
,并对异常情况进行处理,以确保锁的正确性和系统的稳定性。
另外 ZooKeeper 还提供了一种基于临时节点的分布式锁机制,这种锁被称为“短暂节点锁”。使用短暂节点锁时,每个客户端进程会在 ZooKeeper 上创建一个临时节点,并在该节点上注册一个 Watcher 来监听该节点。当客户端进程需要获取锁时,它会在指定的 ZooKeeper 节点下创建一个短暂节点。如果该节点的序号是当前所有节点中最小的,则该客户端进程获得锁;否则,该进程需要等待,直到 Watcher 监听到节点被删除为止。
短暂节点锁的优点是它不会出现羊群效应,而且当进程失去锁时,它所创建的短暂节点会被自动删除,这可以有效减少ZooKeeper上的数据量。不过,它的缺点是每个客户端都需要创建一个短暂节点,如果客户端数量很多,ZooKeeper上的节点数量可能会很快增加,从而导致性能下降。
ZooKeeper的分布式锁机制可以通过不同的实现方式来满足不同的需求。开发者需要根据实际情况选择适合自己的锁实现方式,以实现高效、可靠的分布式系统。其中包含了分布式锁的实现。使用ZooKeeper实现分布式锁可以避免多个节点同时操作共享资源的问题,确保数据的一致性和可靠性。
在ZooKeeper中,分布式锁的实现基于临时节点和Watch机制,同时需要实现两个基本操作:加锁
和释放锁
。 具体实现方法有两种:
一种是使用顺序节点实现锁的竞争,
另一种是使用锁路径中的版本号实现锁的控制。
无论哪种实现方法,都需要处理锁竞争的情况和节点异常退出的情况,以确保锁的正确性和可靠性。分布式锁的实现需要考虑多个因素,包括锁的粒度、锁的持有时间、锁的竞争方式等,需要根据具体应用场景进行调整和优化。
多个请求同时添加一个相同的临时节点,只有一个可以添加成功。添加成功的获取到锁
执行业务逻辑
完成业务流程后,删除节点释放锁。
由于zookeeper获取链接是一个耗时过程,这里可以在项目启动时,初始化链接,并且只初始化一次。借助于spring特性,代码实现如下:
- @Component
- public class zkClient {
- private static final String connectString = "192.168.107.135";
-
- private static final String ROOT_PATH = "/distributed";
-
- private ZooKeeper zooKeeper;
-
- @PostConstruct
- public void init() throws IOException {
- this.zooKeeper = new ZooKeeper(connectString, 30000, new Watcher() {
- @Override
- public void process(WatchedEvent watchedEvent) {
- System.out.println("zookeeper 获取链接成功");
- }
- });
- //创建分布式锁根节点
- try {
- if (this.zooKeeper.exists(ROOT_PATH, false) == null) {
- this.zooKeeper.create(ROOT_PATH, null,
- ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- }
- } catch (KeeperException e) {
- e.printStackTrace();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
-
- @PreDestroy
- public void destroy() {
- if (zooKeeper != null) {
- try {
- zooKeeper.close();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- /**
- * 初始化分布式对象方法
- */
- public ZkDistributedLock getZkDistributedLock(String lockname){
- return new ZkDistributedLock(zooKeeper,lockname);
- }
- }

代码落地
- public class ZkDistributedLock {
- public static final String ROOT_PATH = "/distribute";
- private String path;
- private ZooKeeper zooKeeper;
-
-
- public ZkDistributedLock(ZooKeeper zooKeeper, String lockname) {
- this.zooKeeper = zooKeeper;
- this.path = ROOT_PATH + "/" + lockname;
- }
-
- public void lock() {
- try {
- zooKeeper.create(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
- } catch (KeeperException e) {
- e.printStackTrace();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- try {
- Thread.sleep(200);
- lock();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
-
- }
-
- public void unlock(){
- try {
- this.zooKeeper.delete(path,0);
- } catch (InterruptedException e) {
- e.printStackTrace();
- } catch (KeeperException e) {
- e.printStackTrace();
- }
-
- }
- }

改造StockService的checkAndLock方法:
- @Autowired
- private zkClient client;
-
- public void checkAndLock() {
- // 加锁,获取锁失败重试
- ZkDistributedLock lock = this.client.getZkDistributedLock("lock");
- lock.lock();
- // 先查询库存是否充足
- Stock stock = this.stockMapper.selectById(1L);
- // 再减库存
- if (stock != null && stock.getCount() > 0) {
- stock.setCount(stock.getCount() - 1);
- this.stockMapper.updateById(stock);
- }
- lock.unlock();
- }

性能一般,mysql数据库的库存余量为0(注意:所有测试之前都要先修改库存量为5000)。
性能一般(比mysql略好)
不可重入
接下来首先来提高性能。
基本实现中由于无限自旋影响性能:试想:每个请求要想正常的执行完成,最终都是要创建节点,如果能够避免争抢必然可以提高性能。这里借助于zk的临时序列化节点,实现分布式锁:
- public class ZkDistributedLock {
- public static final String ROOT_PATH = "/distribute";
- private String path;
- private ZooKeeper zooKeeper;
-
-
- public ZkDistributedLock(ZooKeeper zooKeeper, String lockname) {
- this.zooKeeper = zooKeeper;
- try {
- this.path = zooKeeper.create(ROOT_PATH + "/" + lockname + "_",
- null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
- } catch (KeeperException e) {
- e.printStackTrace();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
-
- public void lock() {
- String preNode = getpreNode(path);
- //如果该节点没有前一个节点,说明该节点是最小的节点
- if (StringUtils.isEmpty(preNode)) {
- return;
- }
- //重新检查是否获取到锁
- try {
- Thread.sleep(20);
- lock();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
-
- /**
- * 获取指定节点的前节点
- *
- * @param path
- * @return
- */
- private String getpreNode(String path) {
- //获取当前节点的序列化序号
- Long curSerial = Long.valueOf(StringUtil.substringAfter(path, '_'));
- //获取根路径下的所有序列化子节点
- try {
- List<String> nodes = this.zooKeeper.getChildren(ROOT_PATH, false);
- //判空处理
- if (CollectionUtils.isEmpty(nodes)) {
- return null;
- }
- //获取前一个节点
- Long flag = 0L;
- String preNode = null;
- for (String node : nodes) {
- //获取每个节点的序列化号
- Long serial = Long.valueOf(StringUtil.substringAfter(path, '_'));
- if (serial < curSerial && serial > flag) {
- flag = serial;
- preNode = node;
- }
- }
- return preNode;
- } catch (KeeperException e) {
- e.printStackTrace();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- return null;
- }
-
- public void unlock() {
- try {
- this.zooKeeper.delete(path, 0);
- } catch (InterruptedException e) {
- e.printStackTrace();
- } catch (KeeperException e) {
- e.printStackTrace();
- }
-
- }
- }

主要修改了构造方法和lock方法:并添加了getPreNode获取前置节点的方法。
测试结果如下:性能反而更弱了。
原因:虽然不用反复争抢创建节点了,但是会自选判断自己是最小的节点,这个判断逻辑反而更复杂更 耗时。
对于这个算法有个极大的优化点:假如当前有1000个节点在等待锁,如果获得锁的客户端释放锁时,这1000个客户端都会被唤醒,这种情况称为“羊群效应”;在这种羊群效应中,zookeeper需要通知1000个 客户端,这会阻塞其他的操作,最好的情况应该只唤醒新的最小节点对应的客户端。应该怎么做呢?在 设置事件监听时,每个客户端应该对刚好在它之前的子节点设置事件监听,例如子节点列表 为/lock/lock-0000000000、/lock/lock-0000000001、/lock/lock-0000000002,序号为1的客户端监听 序号为0的子节点删除消息,序号为2的监听序号为1的子节点删除消息。
客户端连接zookeeper,并在/lock下创建临时的且有序的子节点,第一个客户端对应的子节点 为/lock/lock-0000000000,第二个为/lock/lock-0000000001,以此类推;
客户端获取/lock下的子节点列表,判断自己创建的子节点是否为当前子节点列表中序号最小的子 节点,如果是则认为获得锁,否则监听刚好在自己之前一位的子节点删除消息,获得子节点变更通 知后重复此步骤直至获得锁;
执行业务代码;
完成业务流程后,删除对应的子节点释放锁。
- public void lock() {
- String preNode = getpreNode(path);
- //如果该节点没有前一个节点,说明该节点是最小的节点
- if (StringUtils.isEmpty(preNode)) {
- return;
- } else {
- CountDownLatch countDownLatch = new CountDownLatch(1);
- try {
- if (this.zooKeeper.exists(ROOT_PATH + "/" + preNode, watchedEvent -> {
- countDownLatch.countDown();
- }) == null) {
- return;
- }
- countDownLatch.await();
- return;
-
- } catch (KeeperException e) {
- e.printStackTrace();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- try {
- Thread.sleep(200);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- lock();
- }
- }

压力测试效果如下:由此可见性能提高不少仅次于redis的分布式锁。
引入ThreadLocal线程局部变量保证zk分布式锁的可重入性。
在对应的线程的存储数据。
- public class ZkDistributedLock {
- public static final String ROOT_PATH = "/distribute";
- private String path;
- private ZooKeeper zooKeeper;
- private static final ThreadLocal<Integer> THREAD_LOCAL = new ThreadLocal<>();
-
-
- public ZkDistributedLock(ZooKeeper zooKeeper, String lockname) {
- this.zooKeeper = zooKeeper;
- try {
- this.path = zooKeeper.create(ROOT_PATH + "/" + lockname + "_",
- null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
- } catch (KeeperException e) {
- e.printStackTrace();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
-
- public void lock() {
- Integer flag = THREAD_LOCAL.get();
- if (flag != null && flag > 0) {
- THREAD_LOCAL.set(flag + 1);
- return;
- }
- String preNode = getpreNode(path);
- //如果该节点没有前一个节点,说明该节点是最小的节点
- if (StringUtils.isEmpty(preNode)) {
- return;
- } else {
- CountDownLatch countDownLatch = new CountDownLatch(1);
- try {
- if (this.zooKeeper.exists(ROOT_PATH + "/" + preNode, watchedEvent -> {
- countDownLatch.countDown();
- }) == null) {
- return;
- }
- countDownLatch.await();
- THREAD_LOCAL.set(1);
- return;
-
- } catch (KeeperException e) {
- e.printStackTrace();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- try {
- Thread.sleep(200);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- lock();
- }
- }
-
- /**
- * 获取指定节点的前节点
- *
- * @param path
- * @return
- */
- private String getpreNode(String path) {
- //获取当前节点的序列化序号
- Long curSerial = Long.valueOf(StringUtil.substringAfter(path, '_'));
- //获取根路径下的所有序列化子节点
- try {
- List<String> nodes = this.zooKeeper.getChildren(ROOT_PATH, false);
- //判空处理
- if (CollectionUtils.isEmpty(nodes)) {
- return null;
- }
- //获取前一个节点
- Long flag = 0L;
- String preNode = null;
- for (String node : nodes) {
- //获取每个节点的序列化号
- Long serial = Long.valueOf(StringUtil.substringAfter(path, '_'));
- if (serial < curSerial && serial > flag) {
- flag = serial;
- preNode = node;
- }
- }
- return preNode;
- } catch (KeeperException e) {
- e.printStackTrace();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- return null;
- }
-
- public void unlock() {
- try {
- THREAD_LOCAL.set(THREAD_LOCAL.get() - 1);
- if (THREAD_LOCAL.get() == 0) {
- this.zooKeeper.delete(path, 0);
- THREAD_LOCAL.remove();
- }
-
- } catch (InterruptedException e) {
- e.printStackTrace();
- } catch (KeeperException e) {
- e.printStackTrace();
- }
-
- }
- }

互斥 排他:zk节点的不可重复性,以及序列化节点的有序性。
防死锁:
可自动释放锁:临时节点。
可重入锁:借助于ThreadLocal。
防误删:临时节点。
加锁/解锁要具备原子性。
单点问题:使用Zookeeper可以有效的解决单点问题,ZK一般是集群部署的。
集群问题:zookeeper集群是强一致性的,只要集群中有半数以上的机器存活,就可以对外提供服务。
公平锁:有序性节点。
我们知道了 Redis 主要是通过 setnx 命令实现分布式锁,Zookeeper 采用临时节点和事件监听机制可以实现分布式锁。那么这两种方式有哪些关键的区别呢?
Redis分布式锁,获取不到锁时,需要不断轮询去尝试获取锁,比较消耗性能;ZooKeeper分布式锁,获取不到锁时,注册监听器即可,不需要不断主动尝试获取锁,性能开销较小;
锁未释放时服务器宕机。Redis只能等超时时间到将锁释放。ZooKeeper的临时节点检测不到服务器的心跳,节点移除,锁自动被释放;
这样看好像ZooKeeper比Redis更胜一筹,但Redis提供的API和库更加丰富,在很大程度上能够减少开发的工作量。而且如果是小规模的项目,已经部署了Redis,可能没太大必要去再部署一套ZooKeeper集群去实现分布式锁,大家根据场景自行选择。
参考文章:https://blog.csdn.net/polsnet/article/
details/130444403 https://blog.csdn.net/m0_62436868
/article/details/130465612
读者专属技术群
构建高质量的技术交流社群,欢迎从事后端开发、运维技术进群(备注岗位,已在技术交流群的请勿重复添加)。主要以技术交流、内推、行业探讨为主,请文明发言。广告人士勿入,切勿轻信私聊,防止被骗。
扫码加我好友,拉你进群
推荐阅读 点击标题可跳转
PS:因为公众号平台更改了推送规则,如果不想错过内容,记得读完点一下“在看”,加个“星标”,这样每次新文章推送才会第一时间出现在你的订阅列表里。点“在看”支持我们吧!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。