当前位置:   article > 正文

ZooKeeper 实现分布式锁的简单示例

ZooKeeper 实现分布式锁的简单示例

一. 分布式锁概述

  1. 在分布式环境中,服务器集群,多个jvm运行,如果使用Lock或synchronized多个jvm是无法保证线程安全的
  2. 什么是分布式锁: 用来控制同一个任务的是执行,或设置任务顺序执行,保证同一时间内只有一个jvm执行某一个任务
  3. 分布式锁的实现方案: 基于数据库(效率较低),基于redis,基于ZooKeeper,SpringCloud全局锁
  4. redis 实现分布式锁可能产生的问题: 在redis主从架构中,通过setnx()向redis中存储锁,主节点接收数据,然后将数据同步到从节点,假设在同步未执行完毕时,主节点宕机,从节点升级为主节点,造成新的中节点上没有锁的问题
  5. 如何选择分布式锁: 根据实际的业务场景,假设当前项目并发要求较高可以使用redis,如果并发要求不高可以使用zookeeper

二. ZooKeeper 实现分布式锁

ZooKeeper 实现分布式锁原理

  1. ZooKeeper 可以充当一个文件系统,可以将数据以节点形式存入ZooKeeper中,并提供了增删改查,监听,通知等方法
  2. 当向ZooKeeper中发送请求时,集群环境下的ZooKeeper接收到请求首先会将请求转发给Leader,Leader在将请求广播给所有的Follwer,多Follwer同时处理请求,通过ZooKeeper对这个项目构架就可以找到一个唯一点
  3. ZooKeeper中存储数据的节点分为持久节点与临时节点,利用临时节点的特性,当连接断开,存储的临时节点数据消失
  4. ZooKeeper中不能创建两个相同的节点的特性

实现分析

假设在分布式集群环境中需要对某一段代码加锁,同一时间内只允许一个jvm的一个线程执行

  1. 在执行该代码以前先向ZooKeeper中创建一个临时节点例如"/Lock",该节点就可以看为锁
  2. 利用ZooKeeper中不允许创建相同节点的特性,如果创建失败,说明已有线程持有了锁
  3. 当获取锁失败时对该代表锁的节点设置监听,当前线程进行阻塞等待
  4. 当需要加锁执行的代码执行完毕后,关闭Zookeeper连接,代表锁的临时节点消失,通过ZooKeeper监听,回调方法执行唤醒其它获取锁失败阻塞等待的线程

代码示例

  1. 此处使用ZkClient 操作Zookeeper 项目中除了引入ZooKeeper的依赖以外还需要需要引入ZkClient 依赖, ZkClient 方法使用简介
	<!--zookeeper依赖-->
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.13</version>
        </dependency>
        <!--管理zookeeper需要用到的-->
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>4.0.1</version>
        </dependency>
        <!--zkclient-->
        <dependency>
            <groupId>com.101tec</groupId>
            <artifactId>zkclient</artifactId>
            <version>0.10</version>
        </dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  1. 注意使用 ZkClient 创建临时节点以后,需要通过ZkClient 调用close() 方法后,该临时节点才会消失
  2. 此处使用 CountDownLatch 信号量方式,来设置线程阻塞等待,与线程唤醒等
  3. 重点关注获取锁失败时的阻塞方法
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.junit.Test;
import java.util.concurrent.CountDownLatch;

public class ZooKeeperLockTest {
    //运行测试
    public static void main(String[] args) throws InterruptedException {
        ZooKeeperLockTest zooKeeperLock = new ZooKeeperLockTest();

        //使用分布式锁实现任务方法
        zooKeeperLock.runTest();
    }

    //1.获取ZooKeeper连接
    public ZkClient zkClient = new ZkClient("127.0.0.1:2181");

    //2.信号量,通过信号量来设置线程的阻塞与唤醒
    public CountDownLatch countDownLatch = null;

    //3.获取锁(也就是向Zookeeper上创建一个临时节点
    //创建临时节点不会返回数据,如果创建失败会报错
    //如果报错说明创建失败返回false)
    public boolean tryLock() {
        try {
            zkClient.createEphemeral("/LOCK");
            return true;
        } catch (Exception e) {
            return false;
        }
    }

    //3.获取锁失败时的阻塞方法
    public void waitLock() {

        //1.创建IZkDataListener事件对象,需要重写handleDataDeleted()
        //与handleDataChange() 方法,在第一执行当前方法时并不是执行内部的
        //方法,只是先初始化IZkDataListener,直接进入了下面的第2步
        //当被监听的节点触发监听事件时,自动调用重写的方法
        IZkDataListener izkDataListener = new IZkDataListener() {
            //删除监听节点时回调执行的方法
            //当执行该方法时说明前面获取到锁的线程已经执行完毕释放了锁(也就是关闭
            //Zookeeper连接,删除了临时节点,触发了监听事件
            public void handleDataDeleted(String path) throws Exception {
                if (countDownLatch != null) {
                    //调用countDown()方法,将信号量累减为0(刚开始是1)
                    //唤醒后续获取锁失败等待的线程
                    countDownLatch.countDown();
                }
            }
            //修改监听节点时回调执行的方法
            public void handleDataChange(String path, Object data) throws Exception {
            }
        };


        //2.调用subscribeDataChanges() 方法,开启监听
        zkClient.subscribeDataChanges("/LOCK", izkDataListener);

        //3.判断其它线程是否释放了锁,也就是判断ZooKeeper上代表锁的节点是否还存在
        if (zkClient.exists("/LOCK")) {
            //如果存在初始化信号量对象并设置为1
            countDownLatch = new CountDownLatch(1);
            try {
                //信号量调用await()方法,判断信号量的值,如果不为0则阻塞等待
                //当信号量值变为0时,当前阻塞等待的线程会自动由此处开始继续向下执行
                countDownLatch.await();
                System.out.println("放开阻塞继续执行");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        //删除监听,(不删除会影响执行效率,注意该方法在if外部)
        //监听是在第2步骤开启的,此处删除并不影响后续锁的获取与释放
        //在由线程进来如果获取锁失败,在第2步骤开启监听后,第3步骤就阻塞了
        zkClient.unsubscribeDataChanges("/LOCK", izkDataListener);
    }

    //4.释放锁
    public void unLock() {
        if (zkClient != null) {
            zkClient.close();
            System.out.println("释放锁资源...");
        }
    }


    //5.逻辑代码整合在调用逻辑代码前首先获取锁,如果获取成功执行
    //需要加锁的逻辑代码,失败则阻塞等待
    public void runTest() throws InterruptedException {
        //1.获取锁
        boolean b = tryLock();

        //2.判断锁释放获取成功,成功则执行加锁逻辑代码
        //代码执行完毕,释放锁
        if (b) {
            System.out.println("获取锁成功,需要加锁执行的逻辑代码执行----->");
            Thread.sleep(10000);
            System.out.println("加锁逻辑代码执行完毕,开始释放锁----->");
            //3.释放锁
            unLock();
        } else {
            //4.获取锁失败,在此处进行阻塞等待
            waitLock();

            //当上一个获取锁的线程执行完毕后,释放锁
            //此处获取锁失败阻塞的线程被唤醒继续执行(是在监听事件中唤醒的)
            //递归调用当前的 runTest()方法,再次尝试获取执行
            runTest();
        }
    }

    //test方法测试开启第二个线程去获取锁执行业务
    @Test
    public void test() throws InterruptedException {
        ZooKeeperLockTest zooKeeperLock = new ZooKeeperLockTest();
        zooKeeperLock.runTest();
    }

    @Test
    public void test2() throws InterruptedException {
        zkClient.delete("/LOCK");
    }

    @Test
    public void test3() {
        zkClient.close();
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130

ZooKeeper 实现分布式锁需要考虑的问题

  1. 羊群效应: 基于zk的临时节点实现分布式锁,当获取锁的线程执行完毕释放锁后,会唤醒等待的所有线程再次去争抢锁资源,某个线程获取锁成功后其它线程再次阻塞等待,这个唤醒所有线程,就是羊群效应
  2. 通过zk临时序列节点实现分布式锁,解决羊群效应
  • 利用zk临时序列节点的特性,创建同名节点zk会根据创建时间自动在节点nam上添加序列后缀例如"/lock/name0001"
  • 多线程获取锁时在zk指定/lock节点下创建临时序列节点,创建成功后,获取"/lock"节点下的所有子节点,判断当前线程创建的节点是否是第一个,如果是获取锁成功放行
  • 如果不是第一个子节点,则说明锁被其它线程获取,获取该节点的上一个节点,对该节点的上一个节点开启监听,当前线程countDownLatch.await()阻塞等待
  • 当获取到锁的线程执行完毕后,删除该线程在zk上创建的临时节点,由于只有一个线程对当前节点开启了监听,在唤醒线程时也只会唤醒对当前节点开启监听的一个线程
  1. zk 脑裂问题: 由于网络原因,zk集群环境判断当Leader节点宕机,选举Follower节点升级为Leader,在极端情况下,当前选举出的Follower节点中被没有存储当前正在执的线程获取到锁的节点,此时再有其它线程执行,也会获取锁成功,此时就有两个线程同步执行,解决这个问题,在存储临时节点时附加一个唯一序号或id,其它线程再创建临时节点时与上一个节点序号解析比对,如果累计加1等于当前节点的则说明没有出现以上问题,如果不是则说明出现了脑裂
    Zookeeper3.4.6的选举算法是FastLeaderElection,该算法的规则是投票超过半数的服务器才能当选为Leader。这个算法能够保证leader的唯一性

三. Curator

  1. Curator是Netflix公司开源的一套zookeeper客户端框架,解决了很多Zookeeper客户端非常底层的细节开发工作,包括连接重连、反复注册Watcher和NodeExistsException异常等等(参考大神的博客)

添加链接描述
添加链接描述
添加链接描述
添加链接描述
添加链接描述

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/965741
推荐阅读
相关标签
  

闽ICP备14008679号