赞
踩
分布式的环境中会不会出现脏数据的情况呢?类似单机程序中线程安全的问题。观察下面的例子
上面的设计是存在线程安全问题
问题
假设Redis 里面的某个商品库存为 1;此时两个用户同时下单,其中一个下单请求执行到第 3 步,更新数据库的库存为 0,但是第 4 步还没有执行。
而另外一个用户下单执行到了第 2 步,发现库存还是 1,就继续执行第 3 步。但是商品库存已经为0,所以如果数据库没有限制就会出现超卖的问题。
解决方法
比较直观的解决方法就是用锁把 2、3、4 步锁住,让他们执行完之后,另一个线程才能进来执行。
以上是单机情况下的加锁解决方法。然而,如果公司业务发展迅速,系统应对并发不断提高,解决方案是要增加一台机器,结果会出现更大的问题
假设有两个下单请求同时到来,分别由两个机器执行,那么这两个请求是可以同时执行了,依然存在超卖的问题。
因为如图所示系统是运行在两个不同的 JVM 里面,不同的机器上,增加的锁只对自己当前 JVM 里面的线程有效,对于其他 JVM 的线程是无效的。所以现在已经不是线程安全问题。需要保证两台机器加的锁是同一个锁,此时分布式锁就能解决该问题。
此时就要介绍到分布式锁的作用:分布式锁的作用:在整个系统提供一个全局、唯一的锁,在分布式系统中每个系统在进行相关操作的时候需要获取到该锁,才能执行相应操作。
根据上述分析,对于多机的情况我们需要使用分布式锁来解决上述问题。这里就要引入zookeeper来实现。为什么借助zookeeper可以实现分布式线程锁呢?
这就利用到了zookeeper的原理,zookeeper提供了监听存储在zk内部数据的功能,从而可以达到基于数据的集群管理。同时,zk可以创建临时带序号的节点,且ZK会保证节点的全局有序性。所以利用上面两个特性,可以利用ZK实现分布式锁的功能。具体描述如下。
实现思路
整体的流程图可以如下所示:
图中各部分的注释信息如上图所示。接下来,为了更直观的理解zk的分布式锁,采用代码的方法对上述思路进行测试。
这里创建10个线程,来模拟通过zk集群获取锁的过程,代码如下所示:
main方法
创建10个线程,为了演示更加直观,这些线程的执行的具体任务,就是抢锁.
// zk实现分布式锁 public class DisLockTest { public static void main(String[] args) { // 使用10个线程模拟分布式环境 for (int i=0; i<10; i++) { Thread t = new Thread(new DisLockRunnable()).start(); // 启动线程 } } static class DisLockRunnable implements Runnable{ @Override public void run() { // 每个线程具体的任务,每个线程就是抢锁 DisClient disClient = new DisClient(); // 获取锁 disClient.getDisLock(); // 模拟获取锁后的其它动作 try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } // 释放锁 disClient.deleteLock(); } } }
核心实现
核心代码部分就是实现上述介绍过程中的实现分布式锁的逻辑。每个线程在调用getDisLock
方法获取锁的时候,首先都需要去zk某一指定目录下创建一个有序的临时节点,然后获取到目录下序号最小的节点判断是不是自己刚刚创建的那个节点。如果是,则说明该线程可以获取到锁,如果不是,则需要设置监听器,去监听其上一个节点的状态。如果其上一个节点发生变化,说明上一个节点代表的线程执行结束,则该节点可以解除等待并重新执行尝试获取锁的操作,然后再重复执行上面的过程获取目录下序号最小的几点判断是不是自己的,如果是,则该线程获取到锁。
代码内容如下:
// 线程执行任务就是抢锁 实现思路: // 1. 去zk中创建临时序列节点, 并获取到序号 // 2. 判断自己创建节点序号是否是当前节点最小序号,如果是则获取锁, 执行相关操作,最后要释放锁 // 3. 不是最小节点,当前线程需要等待,等待前一个序号的节点被删除,然后再次判断自己是否是最小节点 public class DisClient { public DisClient() { // 初始化zk的/distrilLock节点, 会出现线程安全问题 // 使用同步锁,保证了检查和创建的安全性 synchronized (DisClient.class) { if (!zkClient.exists("/distrilock")) { zkClient.createPersistent("/distrilock"); } } } // 定义前一个节点 String beforeNodePath; // 定义当前节点 String currentNodePath; // 计数器, 用来使没有获得锁的线程进行等待 CountDownLatch countDownLatch=null; // 获取到ZkClient, 连接zk集群 private ZkClient zkClient = new ZkClient("linux121:2181, linux122:2181"); // 把抢锁过程分为两部分,一部分是创建节点,比较序号。另一部分是等待锁 // 完整获取锁方法 public void getDisLock() { // 获取当前线程名称 String threadName = Thread.currentThread().getName(); // 首先调用tryGetLock if (tryGetLock()) { // 说明获取到锁了 System.out.println(threadName + ": 获取到了锁"); } else{ System.out.println(threadName + ": 获取锁失败, 进入等待状态"); } // 没有获取到锁,则需要进行等待,同时监听其上一个节点 waitForLock(); // 递归获取锁。即等待结束后,可以调用getDisLock方法重新尝试获取锁 getDisLock(); } } // 尝试获取锁 public boolean tryGetLock() { // 在指定目录下创建临时顺序节点 if (null == currentNodePath || "".equals(currentNodePath)) { currentNodePath = zkClient.createEphemeralSequential("/distrilock/", "lock"); } // 获取到路径下所有的子节点 List<String> childs = zkClient.getChildren("/distrilock"); // 对节点信息进行排序 Collections.sort(childs); // 默认是升序 String minNode = childs.get(0); // 获取序号最小的节点 // 判断自己创建节点是否与最小序号一致 if (currentNodePath.equals("/distrilock/" + minNode)) { // 说明当前线程创建的就是序号最小节点 return true; } else{ // 说明最小节点不是自己创建的 // 此时,要监控自己当前节点序号前一个的节点 int i = Collections.binarySearch(childs, currentNodePath.substring("/distrilock/".length())); // 前一个(lastNodeChild是不包括父节点) String lastNodeChild = childs.get(i - 1); beforeNodePath = "/distrilock/" + lastNodeChild; } return false; } // 等待之前节点释放锁, 如何判断锁被释放,需要唤醒线程,继续尝试tryGetLock public void waitForLock() { // 准备一个监听器 IZkDataListener iZkDataListener = new IZkDataListener() { @Override public void handleDataChange(String s, Object o) throws Exception { } @Override // 监听数据被删除 public void handleDataDeleted(String s) throws Exception { // 提醒当前线程再次获取锁 // 这里即当有前一个节点被删除时,唤醒下一个节点的线程 countDownLatch.countDown(); // 把值减一,变为0,则唤醒之前await的线程 } }; // 监控前一个节点 zkClient.subscribeDataChanges(beforeNodePath, iZkDataListener); // 在监听的通知没来之前,该线程应该是等待状态, 先判断一次上一个节点是否还存在 if (zkClient.exists(beforeNodePath)) { // 开始等待, CountDownLatch: 线程同步计数器 countDownLatch = new CountDownLatch(1); try { countDownLatch.await(); // 阻塞,直到countDownLatch值变为0 } catch (InterruptedException e) { e.printStackTrace(); } } // 此时监听结束,可以解除监听 zkClient.subscribeDataChanges(beforeNodePath, iZkDataListener); } // 释放锁 public void deleteLock() { if (zkClient!=null) { zkClient.delete(currentNodePath); zkClient.close(); System.out.println(Thread.currentThread().getName() + ": 释放了锁资源"); } } }
最后的输出结果:
Thread-7: 获取到了锁 Thread-4: 获取锁失败, 进入等待状态 Thread-5: 获取锁失败, 进入等待状态 Thread-2: 获取锁失败, 进入等待状态 Thread-8: 获取锁失败, 进入等待状态 Thread-0: 获取锁失败, 进入等待状态 Thread-3: 获取锁失败, 进入等待状态 Thread-6: 获取锁失败, 进入等待状态 Thread-1: 获取锁失败, 进入等待状态 Thread-9: 获取锁失败, 进入等待状态 Thread-7: 释放了锁资源 Thread-5: 获取到了锁 Thread-5: 释放了锁资源 Thread-8: 获取到了锁 Thread-8: 释放了锁资源 Thread-0: 获取到了锁 Thread-0: 释放了锁资源 Thread-2: 获取到了锁 Thread-2: 释放了锁资源 Thread-4: 获取到了锁 Thread-4: 释放了锁资源 Thread-3: 获取到了锁 Thread-3: 释放了锁资源 Thread-6: 获取到了锁 Thread-6: 释放了锁资源 Thread-1: 获取到了锁 Thread-1: 释放了锁资源 Thread-9: 获取到了锁 Thread-9: 释放了锁资源
可以看到,整体流程就是首先第一个线程获取到锁,然后其余线程都获取不到进入等待状态。此时每个线程都监听其前一个节点的状态。然后就是每当前一个线程释放锁之后,后一个线程获取锁,依次进行。
通过上面的代码流程可以方便我们更好的理解ZK实现分布式锁的原理和步骤。
最后,分布式锁的实现可以是 Redis、Zookeeper,相对来说生产环境如果使用分布式锁可以考虑使用Redis实现而非Zk。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。