赞
踩
单进程的系统中,存在多线程同时操作一个公共变量,此时需要加锁对变量进行同步操作,保证多线程的操作线性执行消除并发修改。解决的是单进程中的多线程并发问题。
只要的应用场景是在集群模式的多个相同服务,可能会部署在不同机器上,解决进程间安全问题,防止多进程同时操作一个变量或者数据库。解决的是多进程的并发问题。
1,在分布式系统环境下,一个方法在同一时间只能被一个机器的一个线程执行;
2,具备可重入特性;
3,具备锁失效机制,防止死锁;
4,具备非阻塞锁特性,即如果没有获取到锁直接返回获取锁失败。
1.1,redis有很高的性能;
1.2,redis命令对此支持较好,实现起来比较方便.
- /**
- * redis 分布式锁
- * @author reyco
- *
- */
- public class DistributedLock {
-
- private JedisPool jedisPool;
-
- public DistributedLock(JedisPool jedisPool) {
- super();
- this.jedisPool = jedisPool;
- }
-
- /**
- * 获取锁
- *
- * @param lockName
- * 锁的名称
- * @param acquireTimeout
- * 获取锁的超时时间,单位毫秒数
- * @param timeout
- * key的超时时间,单位毫秒数
- */
- public String lock(String lockName, long acquireTimeout, long timeout) {
- Jedis jedis = null;
- // 返回标识
- String identifier = null;
- try {
- jedis = jedisPool.getResource();
- // key
- String lockKey = "lock_" + lockName;
- // key所对应的value值
- String lockValue = UUID.randomUUID().toString();
- // 计算key的超时时间
- Integer expireTime = (int) timeout / 1000;
- // 获取锁的超时时间
- long end = System.currentTimeMillis() + acquireTimeout;
- // 循环获取锁
- while (System.currentTimeMillis() <= end) {
- // key是否存在
- if (jedis.setnx(lockKey, lockValue) == 1) {
- // 如果不存在,设置key过期时间
- jedis.expire(lockKey, expireTime);
- identifier = lockValue;
- return identifier;
- }
- // 如果设置过期时间异常,那么下一次必须设置key过期时间,否则可能出现死锁
- if (jedis.ttl(lockKey) == -1) {
- jedis.expire(lockKey, expireTime);
- }
- // 设置一个时间间隔,避免频繁尝试获取锁
- Thread.sleep(20);
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- return identifier;
- }
- /**
- * 手动释放锁
- *
- * @param lockName
- * 锁的名称
- */
- public boolean unlock(String lockName,String lockValue) {
- try {
- // key
- String lockKey = "lock_" + lockName;
- while(jedis.exists(lockKey)) {
- jedis.watch(lockKey);
- if(lockValue.equals(jedis.get(lockKey))) {
- Transaction transaction = jedis.multi();
- // 手动删除key,释放锁
- transaction.del(lockKey);
- List<Object> execQueue = transaction.exec();
- if(execQueue != null) {
- return true;
- }
- }
- jedis.unwatch();
- break;
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- return false;
- }

- /**
- * @author reyco
- * @date 2021年3月18日---下午5:14:56
- *
- * <pre>
- * redis锁实现
- *
- * <pre>
- */
- @Component
- public class RedisLock implements DistributedLock {
-
- protected Logger logger = LoggerFactory.getLogger(this.getClass());
-
- public final static String SET_LUA_SCRIPT = "if redis.call('setnx', KEYS[1],ARGV[1]) == 1 then return redis.call('EXPIRE',KEYS[1],ARGV[2]) else return 0 end";// lua脚本,用来获取分布式锁
-
- public final static String DEL_LUA_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end";// lua脚本,用来释放分布式锁
-
- public final static String RENEWAL_LUA_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('EXPIRE',KEYS[1],ARGV[2]) else return 0 end";// lua脚本,用来续约过期时间
- @Autowired
- @Qualifier("distributedLockThread")
- ExecutorService executorService;
-
- /**
- * 当前线程锁信息
- */
- private ThreadLocal<LockInfo> lockInfoThreadLocal = new ThreadLocal<>();
- /**
- * 锁的key
- */
- private static final String DISTRIBUTED_LOCK_KEY = "distributedLock:";
- /**
- * key的名称
- */
- private static final String DISTRIBUTED_LOCK_DEFAULT = "default";
- /**
- * 锁的超时时间,默认3000
- */
- private static final Integer DISTRIBUTED_LOCK_EXPIRE = 3000;
-
- @Autowired
- private StringRedisTemplate redisTemplate;
-
- @Override
- public void lock() {
- LockInfo lockInfo = new LockInfo(DISTRIBUTED_LOCK_KEY+DISTRIBUTED_LOCK_DEFAULT, SnowFlake.getNextId().toString(), DISTRIBUTED_LOCK_EXPIRE);
- lockInfoThreadLocal.set(lockInfo);
- List<String> keys = new ArrayList<>();
- keys.add(lockInfo.getLockKey());
- String valueExpireTime = (lockInfo.getExpireTime()/1000)+"";
- DefaultRedisScript<Long> defaultRedisScript = new DefaultRedisScript<>(SET_LUA_SCRIPT,Long.class);
- while(redisTemplate.execute(defaultRedisScript,keys,lockInfo.getLockValue(),valueExpireTime).intValue()==0) {
- try {
- TimeUnit.MILLISECONDS.sleep(lockInfo.getExpireTime()/10);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- logger.debug("加锁成功,【key:" +lockInfo.getLockKey() + "】,【value:" + lockInfo.getLockValue() + "】,【expireTimeSecond:"+lockInfo.getExpireTime()+"】");
- // 招一个看门狗
- executorService.execute(new Runnable() {
- @Override
- public void run() {
- while(lockInfo.getLockValue().equals(redisTemplate.opsForValue().get(lockInfo.getLockKey()))) {
- List<String> keys = new ArrayList<>();
- keys.add(lockInfo.getLockKey());
- DefaultRedisScript<Long> defaultRedisScript = new DefaultRedisScript<>(RENEWAL_LUA_SCRIPT,Long.class);
- Long result = redisTemplate.execute(defaultRedisScript,keys,lockInfo.getLockValue(),valueExpireTime);
- if(result.intValue()==1) {
- logger.debug("续约成功,【key:" +lockInfo.getLockKey() + "】,【value:" + lockInfo.getLockValue() + "】,【expireTimeSecond:"+lockInfo.getExpireTime()+"】");
- }
- try {
- Thread.sleep(lockInfo.getExpireTime()/3);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- });
- }
- @Override
- public void unlock() {
- LockInfo lockInfo = lockInfoThreadLocal.get();
- lockInfoThreadLocal.remove();
- List<String> keys = new ArrayList<>();
- keys.add(lockInfo.getLockKey());
- DefaultRedisScript<Long> defaultRedisScript = new DefaultRedisScript<>(DEL_LUA_SCRIPT,Long.class);
- Long integer = redisTemplate.execute(defaultRedisScript,keys,lockInfo.getLockValue());
- if(integer.intValue()==1) {
- logger.debug("解锁成功,【key:" +lockInfo.getLockKey() + "】,【value:" + lockInfo.getLockValue() + "】");
- }
- }
-
- /**
- * 加锁
- *
- * @param lockKey
- * 锁的lockKey
- * @param lockValue
- * 锁的value值
- * @param expireTime
- * 过期时间
- */
- @Override
- public void lock(String lockKey, String lockValue, int expireTime) {
- List<String> keys = new ArrayList<>();
- keys.add(lockKey);
- String valueExpireTime = (expireTime/1000)+"";
- DefaultRedisScript<Long> setDefaultRedisScript = new DefaultRedisScript<>(SET_LUA_SCRIPT,Long.class);
- while(redisTemplate.execute(setDefaultRedisScript,keys,lockValue,valueExpireTime).intValue()==0) {
- try {
- Thread.sleep(expireTime/10);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- logger.debug("加锁成功,【key:"+lockKey+"】,【value:"+lockValue+"】,【expireTime:"+expireTime+"】");
- //获取锁成功:招一个看门狗
- new Thread(new Runnable() {
- @Override
- public void run() {
- while(lockValue.equals(redisTemplate.opsForValue().get(lockKey))) {
- List<String> keys = new ArrayList<>();
- keys.add(lockKey);
- DefaultRedisScript<Long> renDefaultRedisScript = new DefaultRedisScript<>(RENEWAL_LUA_SCRIPT,Long.class);
- Long result = redisTemplate.execute(renDefaultRedisScript,keys,lockValue,valueExpireTime);
- if(result.intValue()==1) {
- logger.debug("续约成功,【key:"+lockValue+ "】,【value:"+lockValue+"】,【expireTimeSecond:"+valueExpireTime+"】");
- }
- try {
- Thread.sleep(expireTime/3);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }).start();
- }
- /**
- * 解锁
- * @param lockKey
- * @param lockValue
- */
- @Override
- public void unLock(String lockKey, String lockValue) {
- List<String> keys = new ArrayList<>();
- keys.add(lockKey);
- DefaultRedisScript<Long> delDefaultRedisScript = new DefaultRedisScript<>(DEL_LUA_SCRIPT,Long.class);
- Long integer = redisTemplate.execute(delDefaultRedisScript,keys,lockValue);
- if(integer.intValue()==1) {
- logger.debug("解锁成功,【key:" +lockKey + "】,【value:" + lockValue + "】");
- }
- }
-
- @Override
- public void lockInterruptibly() throws InterruptedException {
- }
-
- @Override
- public boolean tryLock() {
- return false;
- }
-
- @Override
- public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
- return false;
- }
-
- @Override
- public Condition newCondition() {
- return null;
- }
-
- /**
- * 锁信息
- *
- * @author reyco
- *
- */
- static class LockInfo {
- private String lockKey;
- private String lockValue;
- private Integer expireTime;
-
- public LockInfo(String lockKey, String lockValue, Integer expireTime) {
- super();
- this.lockKey = lockKey;
- this.lockValue = lockValue;
- this.expireTime = expireTime;
- }
-
- public String getLockKey() {
- return lockKey;
- }
-
- public void setLockKey(String lockKey) {
- this.lockKey = lockKey;
- }
-
- public String getLockValue() {
- return lockValue;
- }
-
- public void setLockValue(String lockValue) {
- this.lockValue = lockValue;
- }
-
- public Integer getExpireTime() {
- return expireTime;
- }
-
- public void setExpireTime(Integer expireTime) {
- this.expireTime = expireTime;
- }
- }
- }

2.1,使用zk的临时节点特性失效分布式锁;
2.1.1,采用临时节点实现分布式锁(互斥锁);
将zookeeper上的一个znode看作是一把锁,所有客户端都去创建/distribute_lock 节点,最终成功创建的 那个客户端也即拥有了这把锁。用完删除掉自己创建的/distribute_lock节点就释放出锁。没有获取到锁的客 户端等待,对/distribute_lock节点watcher删除监听,如果监听到 /distribute_lock节点删除,所有客户端去争取获取锁,没有获取到锁的客户端继续监听。
这种实现方式有羊群问题,会出现死锁问题. 如:监听客户端很多,所有客户端争抢锁,会出现网络阻 塞;获取锁的客户端没有及时的删除节点,所有客户端一直等待,会出现死锁问题。
2.1.2,采用临时顺序节点实现分布式锁:超时机制+临时顺序节点(共享锁).
/distribute_lock 已经预先存在,所有客户端在它下面创建临时顺序编号目录节点,编号最小的获得 锁,用完删除,依次方便。
a,在约定的节点下面创建/distribute_lock节点;
b,所有客户端在/distribute_lock节点下创建自己的临时顺序节点;
c,所有客户端需要获取锁;
先对所有临时顺序节点排序,然后尝试获取锁。如果获取锁,正常执行,如果没有获取到锁,监听比自己次小的节点的删除事件。
此种实现方案有bug:如果中间客户端出现网络故障或宕机,就会出现多个客户端获取锁的bug。
- /**
- * Zookeeper 超时机制+临时节点
- *
- * @author reyco
- *
- */
- public class DistributedLock implements Lock {
- //private static final String ZK_ADDRESS = "192.168.2.107:2181,192.168.2.108:2181,192.168.2.110:2181";
- private static final String ZK_ADDRESS = "192.168.241.123:2181,192.168.241.124:2181,192.168.241.125:2181";
- /**
- * 父节点
- */
- private String distributedLock = "/distributedLock";
- /**
- * 子节点
- */
- private static final String LOCKNAME = "/lock_";
- /**
- * 当前节点
- */
- private static ThreadLocal<String> currentPath = new ThreadLocal<>();
- /**
- * 前一个节点
- */
- private static ThreadLocal<String> beforePath = new ThreadLocal<>();
- /**
- * zkc
- */
- private ZkClient zkClient;
- /**
- * 超时时间
- */
- private static final int TIMIOUT = 5000;
-
- /**
- *
- * @param rootPath
- */
- public DistributedLock(String rootPath) {
- this.distributedLock = rootPath;
- zkClient = new ZkClient(new ZkConnection(ZK_ADDRESS, TIMIOUT));
- if (!zkClient.exists(distributedLock)) {
- zkClient.createPersistent(distributedLock);
- }
- zkClient.setZkSerializer(new ZkSerializer() {
- @Override
- public byte[] serialize(Object data) throws ZkMarshallingError {
- return String.valueOf(data).getBytes();
- }
- @Override
- public Object deserialize(byte[] bytes) throws ZkMarshallingError {
- return new String(bytes);
- }
- });
- }
-
- @Override
- public void lock() {
- // 尝试获取锁
- if (!tryLock()) {
- // 等待
- waitForLock();
- // 获取锁
- lock();
- }
- }
-
- /**
- *
- */
- private void waitForLock() {
- CountDownLatch countDownLatch = new CountDownLatch(1);
- // 注册监听
- IZkDataListener listener = new IZkDataListener() {
- @Override
- public void handleDataDeleted(String dataPath) throws Exception {
- System.out.println(Thread.currentThread().getName()+",监听节点被删除。。。" + dataPath);
- countDownLatch.countDown();
- }
- @Override
- public void handleDataChange(String dataPath, Object data) throws Exception {
- System.out.println("监听节点数据变化。。。" + data);
- }
- };
- zkClient.subscribeDataChanges(this.beforePath.get(), listener);
- try {
- if (this.zkClient.exists(this.beforePath.get())) {
- countDownLatch.await();
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- zkClient.unsubscribeDataChanges(this.beforePath.get(), listener);
- }
-
- @Override
- public void lockInterruptibly() throws InterruptedException {
-
- }
-
- /**
- * 尝试获取锁
- */
- @Override
- public boolean tryLock() {
- // 如果存在就不创建了
- if (null == this.currentPath.get()) {
- // 创建临时顺序节点
- currentPath.set(this.zkClient.createPersistentSequential(distributedLock + LOCKNAME, "lockName"));
- }
- // 获取所有子节点
- List<String> children = this.zkClient.getChildren(distributedLock);
- // 排序children
- Collections.sort(children);
- // 判断是否最小节点
- if (currentPath.get().equals(distributedLock + "/" + children.get(0))) {
- return true;
- }
- // 获取前一个节点
- else {
- // 获取当前节点的索引号
- int currentIndex = children.indexOf(currentPath.get().substring(distributedLock.length()+1));
- beforePath.set(distributedLock + "/"+children.get(currentIndex - 1));
- }
- return false;
- }
-
- @Override
- public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
- return false;
- }
-
- @Override
- public void unlock() {
- zkClient.delete(this.currentPath.get());
- }
-
- @Override
- public Condition newCondition() {
- return null;
- }
-
- /**
- * 测试
- * @param args
- * @throws InterruptedException
- */
- public static void main(String[] args) throws InterruptedException {
- String path = "/distributedLock";
- List<String> list = new ArrayList<>();
- int threadSize = 20;
- CountDownLatch countDownLatch = new CountDownLatch(threadSize);
- ExecutorService threadPool = Executors.newFixedThreadPool(threadSize);
- for (int i = 0; i < threadSize; i++) {
- threadPool.execute(new Runnable() {
- @Override
- public void run() {
- DistributedLock distributedLock = new DistributedLock(path);
- try {
- distributedLock.lock();
- list.add("asdw ");
- countDownLatch.countDown();
- } catch (Exception e) {
- e.printStackTrace();
- }finally {
- distributedLock.unlock();
- }
- }
- });
- }
- countDownLatch.await();
- threadPool.shutdown();
- System.out.println(list.size());
- }
- }

2,解决zk临时顺序节点分布式锁的bug,采用:超时机制+永久节点 实现分布式锁(共享锁)。 重构一个类似redis-ping-pong:永久顺序节点写入(客户端ip+port),判断获取锁的节点是否能正常工作,如果能正常工作就让其正常执行业务,执行业务后删除节点;如果不能正常工作就直接删除这个节点。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。