当前位置:   article > 正文

使用zookeeper实现分布式锁

使用zookeeper实现分布式锁

zookeeper中,根据zookeeper的特性,我们有两种方式来实现分布式锁。

实现方式一:

在zookeeper节点下面,让需要获得锁的程序,都去创建名为Lock的临时节点,因为多实例创建的节点名称相同,只会有一个实例创建节点成功,那么这个实例就可以获得锁,就可以去执行本身代码逻辑,其他没有创建成功的实例,就去注册一个watch去监听Lock节点,当抢到锁的实例执行完本身程序后,就会去删除自己创建的Lock节点,那么就会触发watch机制,监听这个节点的其他实例就会再次去尝试创建这个节点,从而实现分布式锁

其实现分布式锁的原理就是:zookeeper中节点不重名 + watch机制。流程图如下:

 

 

代码实现如下:

  1. public class ZKDistributeLock implements Lock {
  2. private String lockPath;
  3. private ZkClient client;
  4. // 锁重入计数
  5. private ThreadLocal<Integer> reentrantCount = new ThreadLocal<>();
  6. public ZKDistributeLock(String lockPath) {
  7. super();
  8. this.lockPath = lockPath;
  9. client = new ZkClient("localhost:2181");
  10. client.setZkSerializer(new MyZkSerializer());
  11. }
  12. @Override
  13. public boolean tryLock() { // 不会阻塞
  14. if (this.reentrantCount.get() != null) {
  15. int count = this.reentrantCount.get();
  16. if (count > 0) {
  17. this.reentrantCount.set(++count);
  18. return true;
  19. }
  20. }
  21. // 创建节点
  22. try {
  23. client.createEphemeral(lockPath);
  24. this.reentrantCount.set(1);
  25. } catch (ZkNodeExistsException e) {
  26. return false;
  27. }
  28. return true;
  29. }
  30. @Override
  31. public void unlock() {
  32. // 重入的释放锁处理
  33. if (this.reentrantCount.get() != null) {
  34. int count = this.reentrantCount.get();
  35. if (count > 1) {
  36. this.reentrantCount.set(--count);
  37. return;
  38. } else {
  39. this.reentrantCount.set(null);
  40. }
  41. }
  42. client.delete(lockPath);
  43. }
  44. @Override
  45. public void lock() { // 如果获取不到锁,阻塞等待
  46. if (!tryLock()) {
  47. // 没获得锁,阻塞自己
  48. waitForLock();
  49. // 再次尝试
  50. lock();
  51. }
  52. }
  53. private void waitForLock() {
  54. CountDownLatch cdl = new CountDownLatch(1);
  55. IZkDataListener listener = new IZkDataListener() {
  56. @Override
  57. public void handleDataDeleted(String dataPath) throws Exception {
  58. System.out.println("----收到节点被删除了-------------");
  59. cdl.countDown();
  60. }
  61. @Override
  62. public void handleDataChange(String dataPath, Object data) throws Exception {
  63. }
  64. };
  65. client.subscribeDataChanges(lockPath, listener);
  66. // 阻塞自己
  67. if (this.client.exists(lockPath)) {
  68. try {
  69. cdl.await();
  70. } catch (InterruptedException e) {
  71. e.printStackTrace();
  72. }
  73. }
  74. // 取消注册
  75. client.unsubscribeDataChanges(lockPath, listener);
  76. }
  77. @Override
  78. public void lockInterruptibly() throws InterruptedException {
  79. // TODO Auto-generated method stub
  80. }
  81. @Override
  82. public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
  83. // TODO Auto-generated method stub
  84. return false;
  85. }
  86. @Override
  87. public Condition newCondition() {
  88. // TODO Auto-generated method stub
  89. return null;
  90. }
  91. public static void main(String[] args) {
  92. // 并发数
  93. int currency = 50;
  94. // 循环屏障
  95. CyclicBarrier cb = new CyclicBarrier(currency);
  96. // 多线程模拟高并发
  97. for (int i = 0; i < currency; i++) {
  98. new Thread(new Runnable() {
  99. public void run() {
  100. System.out.println(Thread.currentThread().getName() + "---------我准备好---------------");
  101. // 等待一起出发
  102. try {
  103. cb.await();
  104. } catch (InterruptedException | BrokenBarrierException e) {
  105. e.printStackTrace();
  106. }
  107. ZKDistributeLock lock = new ZKDistributeLock("/distLock11");
  108. try {
  109. lock.lock();
  110. System.out.println(Thread.currentThread().getName() + " 获得锁!");
  111. } finally {
  112. lock.unlock();
  113. }
  114. }
  115. }).start();
  116. }
  117. }
  118. }

用此方法实现的分布式锁,存在的缺点就是会产生惊群效应,当一个线程去释放掉锁的时候,其它线程都回去抢占这个锁资源,如果线程太多,影响其性能。

实现方式二:

可以在zookeeper节点下面创建一个Lock节点,并在其下面创建临时顺序节点,当第一个节点抢到锁后,后面的节点,只会去watch自己的上一个节点,当自己的上一个节点释放锁后,只会触发下个节点的watch机制,不会引起惊群效应,提高效率。

实现原理:取号 + 最小号获得锁 + watch机制

流程图如下:

代码实现如下:

  1. public class ZKDistributeImproveLock implements Lock {
  2. /*
  3. * 利用临时顺序节点来实现分布式锁
  4. * 获取锁:取排队号(创建自己的临时顺序节点),然后判断自己是否是最小号,如是,则获得锁;不是,则注册前一节点的watcher,阻塞等待
  5. * 释放锁:删除自己创建的临时顺序节点
  6. */
  7. private String lockPath;
  8. private ZkClient client;
  9. private ThreadLocal<String> currentPath = new ThreadLocal<>();
  10. private ThreadLocal<String> beforePath = new ThreadLocal<>();
  11. // 锁重入计数
  12. private ThreadLocal<Integer> reentrantCount = new ThreadLocal<>();
  13. public ZKDistributeImproveLock(String lockPath) {
  14. super();
  15. this.lockPath = lockPath;
  16. client = new ZkClient("localhost:2181");
  17. client.setZkSerializer(new MyZkSerializer());
  18. if (!this.client.exists(lockPath)) {
  19. try {
  20. this.client.createPersistent(lockPath);
  21. } catch (ZkNodeExistsException e) {
  22. }
  23. }
  24. }
  25. @Override
  26. public boolean tryLock() {
  27. if (this.reentrantCount.get() != null) {
  28. int count = this.reentrantCount.get();
  29. if (count > 0) {
  30. this.reentrantCount.set(++count);
  31. return true;
  32. }
  33. }
  34. if (this.currentPath.get() == null) {
  35. currentPath.set(this.client.createEphemeralSequential(lockPath + "/", "aaa"));
  36. }
  37. // 获得所有的子
  38. List<String> children = this.client.getChildren(lockPath);
  39. // 排序list
  40. Collections.sort(children);
  41. // 判断当前节点是否是最小的
  42. if (currentPath.get().equals(lockPath + "/" + children.get(0))) {
  43. this.reentrantCount.set(1);
  44. return true;
  45. } else {
  46. // 取到前一个
  47. // 得到字节的索引号
  48. int curIndex = children.indexOf(currentPath.get().substring(lockPath.length() + 1));
  49. beforePath.set(lockPath + "/" + children.get(curIndex - 1));
  50. }
  51. return false;
  52. }
  53. @Override
  54. public void lock() {
  55. if (!tryLock()) {
  56. // 阻塞等待
  57. waitForLock();
  58. // 再次尝试加锁
  59. lock();
  60. }
  61. }
  62. private void waitForLock() {
  63. CountDownLatch cdl = new CountDownLatch(1);
  64. // 注册watcher
  65. IZkDataListener listener = new IZkDataListener() {
  66. @Override
  67. public void handleDataDeleted(String dataPath) throws Exception {
  68. System.out.println("-----监听到节点被删除");
  69. cdl.countDown();
  70. }
  71. @Override
  72. public void handleDataChange(String dataPath, Object data) throws Exception {
  73. }
  74. };
  75. client.subscribeDataChanges(this.beforePath.get(), listener);
  76. // 怎么让自己阻塞
  77. if (this.client.exists(this.beforePath.get())) {
  78. try {
  79. cdl.await();
  80. } catch (InterruptedException e) {
  81. e.printStackTrace();
  82. }
  83. }
  84. // 醒来后,取消watcher
  85. client.unsubscribeDataChanges(this.beforePath.get(), listener);
  86. }
  87. @Override
  88. public void unlock() {
  89. // 重入的释放锁处理
  90. if (this.reentrantCount.get() != null) {
  91. int count = this.reentrantCount.get();
  92. if (count > 1) {
  93. this.reentrantCount.set(--count);
  94. return;
  95. } else {
  96. this.reentrantCount.set(null);
  97. }
  98. }
  99. // 删除节点
  100. this.client.delete(this.currentPath.get());
  101. }
  102. @Override
  103. public void lockInterruptibly() throws InterruptedException {
  104. // TODO Auto-generated method stub
  105. }
  106. @Override
  107. public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
  108. // TODO Auto-generated method stub
  109. return false;
  110. }
  111. @Override
  112. public Condition newCondition() {
  113. // TODO Auto-generated method stub
  114. return null;
  115. }
  116. public static void main(String[] args) {
  117. // 并发数
  118. int currency = 50;
  119. // 循环屏障
  120. CyclicBarrier cb = new CyclicBarrier(currency);
  121. // 多线程模拟高并发
  122. for (int i = 0; i < currency; i++) {
  123. new Thread(new Runnable() {
  124. public void run() {
  125. System.out.println(Thread.currentThread().getName() + "---------我准备好---------------");
  126. // 等待一起出发
  127. try {
  128. cb.await();
  129. } catch (InterruptedException | BrokenBarrierException e) {
  130. e.printStackTrace();
  131. }
  132. ZKDistributeImproveLock lock = new ZKDistributeImproveLock("/distLock");
  133. try {
  134. lock.lock();
  135. System.out.println(Thread.currentThread().getName() + " 获得锁!");
  136. } finally {
  137. lock.unlock();
  138. }
  139. }
  140. }).start();
  141. }
  142. }
  143. }

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/黑客灵魂/article/detail/965796
推荐阅读
相关标签
  

闽ICP备14008679号