赞
踩
•在我们进行单机应用开发,涉及并发同步的时候,我们往往采用synchronized或者Lock的方式来解决多线程间的代码同步问题,这时多线程的运行都是在同一个JVM之下,没有任何问题。
•但当我们的应用是分布式集群工作的情况下,属于多JVM下的工作环境,跨JVM之间已经无法通过多线程的锁解决同步问题。
•那么就需要一种更加高级的锁机制,来处理种跨机器的进程之间的数据同步问题——这就是分布式锁。
•核心思想:当客户端要获取锁,则创建节点,使用完锁,则删除该节点。
1.客户端获取锁时,在lock节点下创建临时顺序节点。
2.然后获取lock下面的所有子节点,客户端获取到所有的子节点之后,如果发现自己创建的子节点序号最小,那么就认为该客户端获取到了锁。使用完锁后,将该节点删除。
3.如果发现自己创建的节点并非lock所有子节点中最小的,说明自己还没有获取到锁,此时客户端需要找到比自己小的那个节点,同时对其注册事件监听器,监听删除事件。
4.如果发现比自己小的那个节点被删除,则客户端的
Watcher会收到相应通知,此时再次判断自己创建的节点
是否是lock子节点中序号最小的,如果是则获取到了锁,
如果不是则重复以上步骤继续获取到比自己小的一个节点
并注册监听。
Curator实现分布式锁API
在Curator中有五种锁方案:
InterProcessSemaphoreMutex:分布式排它锁(非可重入锁)
InterProcessMutex:分布式可重入排它锁
InterProcessReadWriteLock:分布式读写锁
InterProcessMultiLock:将多个锁作为单个实体管理的容器
InterProcessSemaphoreV2:共享信号量
package com.itheima.curator; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.curator.retry.ExponentialBackoffRetry; import sun.security.util.math.intpoly.IntegerPolynomial; import java.util.concurrent.TimeUnit; public class Ticket12306 implements Runnable{ private int tickets = 10; /// 相当于数据库的票数 private InterProcessMutex interProcessMutex; /// 分布式锁 public Ticket12306() { /// 创建重试策略 休眠时间, 重试次数 RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10); CuratorFramework build = CuratorFrameworkFactory.builder().connectString("192.168.0.102:2181") .sessionTimeoutMs(60 * 1000) .connectionTimeoutMs(15 * 1000) .retryPolicy(retryPolicy).build(); build.start(); /** * @param client 当前客户端 * @param path 当前路径 */ interProcessMutex = new InterProcessMutex(build,"/lock"); } /** * 模拟 */ @Override public void run() { while (true){ /// 获取锁锁 /** * @param time 时间 * @param unit 时间单位 */ try { interProcessMutex.acquire(3,TimeUnit.SECONDS); if(tickets > 0) { System.out.println(Thread.currentThread()+":"+tickets); Thread.sleep(100); tickets--; } } catch (Exception exception) { exception.printStackTrace(); } finally { /// 释放锁 try { interProcessMutex.release(); } catch (Exception exception) { exception.printStackTrace(); } } } } }
package com.itheima.curator; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.cache.*; import org.apache.curator.retry.ExponentialBackoffRetry; import org.junit.After; import org.junit.Before; import org.junit.Test; public class LockTest { public static void main(String[] args) { Ticket12306 ticket12306 = new Ticket12306(); /// 创建客户端 Thread thread = new Thread(ticket12306,"携程"); Thread thread1 = new Thread(ticket12306,"飞猪"); // Thread[携程,5,main]:10 // Thread[飞猪,5,main]:9 // Thread[携程,5,main]:8 // Thread[飞猪,5,main]:7 // Thread[携程,5,main]:6 // Thread[飞猪,5,main]:5 // Thread[携程,5,main]:4 // Thread[飞猪,5,main]:3 // Thread[携程,5,main]:2 // Thread[飞猪,5,main]:1 thread.start(); thread1.start(); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。