赞
踩
在zookeeper中,根据zookeeper的特性,我们有两种方式来实现分布式锁。
在zookeeper节点下面,让需要获得锁的程序,都去创建名为Lock的临时节点,因为多实例创建的节点名称相同,只会有一个实例创建节点成功,那么这个实例就可以获得锁,就可以去执行本身代码逻辑,其他没有创建成功的实例,就去注册一个watch去监听Lock节点,当抢到锁的实例执行完本身程序后,就会去删除自己创建的Lock节点,那么就会触发watch机制,监听这个节点的其他实例就会再次去尝试创建这个节点,从而实现分布式锁。
其实现分布式锁的原理就是:zookeeper中节点不重名 + watch机制。流程图如下:
代码实现如下:
- public class ZKDistributeLock implements Lock {
-
- private String lockPath;
- private ZkClient client;
- // 锁重入计数
- private ThreadLocal<Integer> reentrantCount = new ThreadLocal<>();
-
- public ZKDistributeLock(String lockPath) {
- super();
- this.lockPath = lockPath;
-
- client = new ZkClient("localhost:2181");
- client.setZkSerializer(new MyZkSerializer());
- }
-
- @Override
- public boolean tryLock() { // 不会阻塞
- if (this.reentrantCount.get() != null) {
- int count = this.reentrantCount.get();
- if (count > 0) {
- this.reentrantCount.set(++count);
- return true;
- }
- }
- // 创建节点
- try {
- client.createEphemeral(lockPath);
- this.reentrantCount.set(1);
- } catch (ZkNodeExistsException e) {
- return false;
- }
- return true;
- }
-
- @Override
- public void unlock() {
- // 重入的释放锁处理
- if (this.reentrantCount.get() != null) {
- int count = this.reentrantCount.get();
- if (count > 1) {
- this.reentrantCount.set(--count);
- return;
- } else {
- this.reentrantCount.set(null);
- }
- }
- client.delete(lockPath);
- }
-
- @Override
- public void lock() { // 如果获取不到锁,阻塞等待
- if (!tryLock()) {
- // 没获得锁,阻塞自己
- waitForLock();
- // 再次尝试
- lock();
- }
-
- }
-
- private void waitForLock() {
-
- CountDownLatch cdl = new CountDownLatch(1);
-
- IZkDataListener listener = new IZkDataListener() {
-
- @Override
- public void handleDataDeleted(String dataPath) throws Exception {
- System.out.println("----收到节点被删除了-------------");
- cdl.countDown();
- }
-
- @Override
- public void handleDataChange(String dataPath, Object data) throws Exception {
- }
- };
- client.subscribeDataChanges(lockPath, listener);
- // 阻塞自己
- if (this.client.exists(lockPath)) {
- try {
- cdl.await();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- // 取消注册
- client.unsubscribeDataChanges(lockPath, listener);
- }
-
- @Override
- public void lockInterruptibly() throws InterruptedException {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
- // TODO Auto-generated method stub
- return false;
- }
-
- @Override
- public Condition newCondition() {
- // TODO Auto-generated method stub
- return null;
- }
-
- public static void main(String[] args) {
- // 并发数
- int currency = 50;
- // 循环屏障
- CyclicBarrier cb = new CyclicBarrier(currency);
- // 多线程模拟高并发
- for (int i = 0; i < currency; i++) {
- new Thread(new Runnable() {
- public void run() {
- System.out.println(Thread.currentThread().getName() + "---------我准备好---------------");
- // 等待一起出发
- try {
- cb.await();
- } catch (InterruptedException | BrokenBarrierException e) {
- e.printStackTrace();
- }
- ZKDistributeLock lock = new ZKDistributeLock("/distLock11");
-
- try {
- lock.lock();
- System.out.println(Thread.currentThread().getName() + " 获得锁!");
- } finally {
- lock.unlock();
- }
- }
- }).start();
-
- }
- }
- }

用此方法实现的分布式锁,存在的缺点就是会产生惊群效应,当一个线程去释放掉锁的时候,其它线程都回去抢占这个锁资源,如果线程太多,影响其性能。
可以在zookeeper节点下面创建一个Lock节点,并在其下面创建临时顺序节点,当第一个节点抢到锁后,后面的节点,只会去watch自己的上一个节点,当自己的上一个节点释放锁后,只会触发下个节点的watch机制,不会引起惊群效应,提高效率。
实现原理:取号 + 最小号获得锁 + watch机制
流程图如下:
代码实现如下:
- public class ZKDistributeImproveLock implements Lock {
-
- /*
- * 利用临时顺序节点来实现分布式锁
- * 获取锁:取排队号(创建自己的临时顺序节点),然后判断自己是否是最小号,如是,则获得锁;不是,则注册前一节点的watcher,阻塞等待
- * 释放锁:删除自己创建的临时顺序节点
- */
- private String lockPath;
- private ZkClient client;
- private ThreadLocal<String> currentPath = new ThreadLocal<>();
- private ThreadLocal<String> beforePath = new ThreadLocal<>();
- // 锁重入计数
- private ThreadLocal<Integer> reentrantCount = new ThreadLocal<>();
-
- public ZKDistributeImproveLock(String lockPath) {
- super();
- this.lockPath = lockPath;
- client = new ZkClient("localhost:2181");
- client.setZkSerializer(new MyZkSerializer());
- if (!this.client.exists(lockPath)) {
- try {
- this.client.createPersistent(lockPath);
- } catch (ZkNodeExistsException e) {
-
- }
- }
- }
-
- @Override
- public boolean tryLock() {
- if (this.reentrantCount.get() != null) {
- int count = this.reentrantCount.get();
- if (count > 0) {
- this.reentrantCount.set(++count);
- return true;
- }
- }
-
- if (this.currentPath.get() == null) {
- currentPath.set(this.client.createEphemeralSequential(lockPath + "/", "aaa"));
- }
- // 获得所有的子
- List<String> children = this.client.getChildren(lockPath);
- // 排序list
- Collections.sort(children);
- // 判断当前节点是否是最小的
- if (currentPath.get().equals(lockPath + "/" + children.get(0))) {
- this.reentrantCount.set(1);
- return true;
- } else {
- // 取到前一个
- // 得到字节的索引号
- int curIndex = children.indexOf(currentPath.get().substring(lockPath.length() + 1));
- beforePath.set(lockPath + "/" + children.get(curIndex - 1));
- }
- return false;
- }
-
- @Override
- public void lock() {
- if (!tryLock()) {
- // 阻塞等待
- waitForLock();
- // 再次尝试加锁
- lock();
- }
- }
-
- private void waitForLock() {
- CountDownLatch cdl = new CountDownLatch(1);
- // 注册watcher
- IZkDataListener listener = new IZkDataListener() {
-
- @Override
- public void handleDataDeleted(String dataPath) throws Exception {
- System.out.println("-----监听到节点被删除");
- cdl.countDown();
- }
-
- @Override
- public void handleDataChange(String dataPath, Object data) throws Exception {
-
- }
- };
-
- client.subscribeDataChanges(this.beforePath.get(), listener);
-
- // 怎么让自己阻塞
- if (this.client.exists(this.beforePath.get())) {
- try {
- cdl.await();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- // 醒来后,取消watcher
- client.unsubscribeDataChanges(this.beforePath.get(), listener);
- }
-
- @Override
- public void unlock() {
- // 重入的释放锁处理
- if (this.reentrantCount.get() != null) {
- int count = this.reentrantCount.get();
- if (count > 1) {
- this.reentrantCount.set(--count);
- return;
- } else {
- this.reentrantCount.set(null);
- }
- }
- // 删除节点
- this.client.delete(this.currentPath.get());
- }
-
- @Override
- public void lockInterruptibly() throws InterruptedException {
- // TODO Auto-generated method stub
- }
-
- @Override
- public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
- // TODO Auto-generated method stub
- return false;
- }
-
- @Override
- public Condition newCondition() {
- // TODO Auto-generated method stub
- return null;
- }
-
- public static void main(String[] args) {
- // 并发数
- int currency = 50;
- // 循环屏障
- CyclicBarrier cb = new CyclicBarrier(currency);
- // 多线程模拟高并发
- for (int i = 0; i < currency; i++) {
- new Thread(new Runnable() {
- public void run() {
- System.out.println(Thread.currentThread().getName() + "---------我准备好---------------");
- // 等待一起出发
- try {
- cb.await();
- } catch (InterruptedException | BrokenBarrierException e) {
- e.printStackTrace();
- }
- ZKDistributeImproveLock lock = new ZKDistributeImproveLock("/distLock");
-
- try {
- lock.lock();
- System.out.println(Thread.currentThread().getName() + " 获得锁!");
- } finally {
- lock.unlock();
- }
- }
- }).start();
- }
- }
- }

Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。