赞
踩
假设在分布式集群环境中需要对某一段代码加锁,同一时间内只允许一个jvm的一个线程执行
<!--zookeeper依赖--> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.13</version> </dependency> <!--管理zookeeper需要用到的--> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>4.0.1</version> </dependency> <!--zkclient--> <dependency> <groupId>com.101tec</groupId> <artifactId>zkclient</artifactId> <version>0.10</version> </dependency>
import org.I0Itec.zkclient.IZkDataListener; import org.I0Itec.zkclient.ZkClient; import org.junit.Test; import java.util.concurrent.CountDownLatch; public class ZooKeeperLockTest { //运行测试 public static void main(String[] args) throws InterruptedException { ZooKeeperLockTest zooKeeperLock = new ZooKeeperLockTest(); //使用分布式锁实现任务方法 zooKeeperLock.runTest(); } //1.获取ZooKeeper连接 public ZkClient zkClient = new ZkClient("127.0.0.1:2181"); //2.信号量,通过信号量来设置线程的阻塞与唤醒 public CountDownLatch countDownLatch = null; //3.获取锁(也就是向Zookeeper上创建一个临时节点 //创建临时节点不会返回数据,如果创建失败会报错 //如果报错说明创建失败返回false) public boolean tryLock() { try { zkClient.createEphemeral("/LOCK"); return true; } catch (Exception e) { return false; } } //3.获取锁失败时的阻塞方法 public void waitLock() { //1.创建IZkDataListener事件对象,需要重写handleDataDeleted() //与handleDataChange() 方法,在第一执行当前方法时并不是执行内部的 //方法,只是先初始化IZkDataListener,直接进入了下面的第2步 //当被监听的节点触发监听事件时,自动调用重写的方法 IZkDataListener izkDataListener = new IZkDataListener() { //删除监听节点时回调执行的方法 //当执行该方法时说明前面获取到锁的线程已经执行完毕释放了锁(也就是关闭 //Zookeeper连接,删除了临时节点,触发了监听事件 public void handleDataDeleted(String path) throws Exception { if (countDownLatch != null) { //调用countDown()方法,将信号量累减为0(刚开始是1) //唤醒后续获取锁失败等待的线程 countDownLatch.countDown(); } } //修改监听节点时回调执行的方法 public void handleDataChange(String path, Object data) throws Exception { } }; //2.调用subscribeDataChanges() 方法,开启监听 zkClient.subscribeDataChanges("/LOCK", izkDataListener); //3.判断其它线程是否释放了锁,也就是判断ZooKeeper上代表锁的节点是否还存在 if (zkClient.exists("/LOCK")) { //如果存在初始化信号量对象并设置为1 countDownLatch = new CountDownLatch(1); try { //信号量调用await()方法,判断信号量的值,如果不为0则阻塞等待 //当信号量值变为0时,当前阻塞等待的线程会自动由此处开始继续向下执行 countDownLatch.await(); System.out.println("放开阻塞继续执行"); } catch (Exception e) { e.printStackTrace(); } } //删除监听,(不删除会影响执行效率,注意该方法在if外部) //监听是在第2步骤开启的,此处删除并不影响后续锁的获取与释放 //在由线程进来如果获取锁失败,在第2步骤开启监听后,第3步骤就阻塞了 zkClient.unsubscribeDataChanges("/LOCK", izkDataListener); } //4.释放锁 public void unLock() { if (zkClient != null) { zkClient.close(); System.out.println("释放锁资源..."); } } //5.逻辑代码整合在调用逻辑代码前首先获取锁,如果获取成功执行 //需要加锁的逻辑代码,失败则阻塞等待 public void runTest() throws InterruptedException { //1.获取锁 boolean b = tryLock(); //2.判断锁释放获取成功,成功则执行加锁逻辑代码 //代码执行完毕,释放锁 if (b) { System.out.println("获取锁成功,需要加锁执行的逻辑代码执行----->"); Thread.sleep(10000); System.out.println("加锁逻辑代码执行完毕,开始释放锁----->"); //3.释放锁 unLock(); } else { //4.获取锁失败,在此处进行阻塞等待 waitLock(); //当上一个获取锁的线程执行完毕后,释放锁 //此处获取锁失败阻塞的线程被唤醒继续执行(是在监听事件中唤醒的) //递归调用当前的 runTest()方法,再次尝试获取执行 runTest(); } } //test方法测试开启第二个线程去获取锁执行业务 @Test public void test() throws InterruptedException { ZooKeeperLockTest zooKeeperLock = new ZooKeeperLockTest(); zooKeeperLock.runTest(); } @Test public void test2() throws InterruptedException { zkClient.delete("/LOCK"); } @Test public void test3() { zkClient.close(); } }
- 利用zk临时序列节点的特性,创建同名节点zk会根据创建时间自动在节点nam上添加序列后缀例如"/lock/name0001"
- 多线程获取锁时在zk指定/lock节点下创建临时序列节点,创建成功后,获取"/lock"节点下的所有子节点,判断当前线程创建的节点是否是第一个,如果是获取锁成功放行
- 如果不是第一个子节点,则说明锁被其它线程获取,获取该节点的上一个节点,对该节点的上一个节点开启监听,当前线程countDownLatch.await()阻塞等待
- 当获取到锁的线程执行完毕后,删除该线程在zk上创建的临时节点,由于只有一个线程对当前节点开启了监听,在唤醒线程时也只会唤醒对当前节点开启监听的一个线程
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。