当前位置:   article > 正文

分布式锁解决方案_分布式锁解决什么问题

分布式锁解决什么问题

锁:

      单进程的系统中,存在多线程同时操作一个公共变量,此时需要加锁对变量进行同步操作,保证多线程的操作线性执行消除并发修改。解决的是单进程中的多线程并发问题。

分布式锁:

      只要的应用场景是在集群模式的多个相同服务,可能会部署在不同机器上,解决进程间安全问题,防止多进程同时操作一个变量或者数据库。解决的是多进程的并发问题。

JVM有锁的感念(局限于同一台java虚拟机)

一:分布式锁具备的条件:

           1,在分布式系统环境下,一个方法在同一时间只能被一个机器的一个线程执行;

           2,具备可重入特性;

          3,具备锁失效机制,防止死锁;

          4,具备非阻塞锁特性,即如果没有获取到锁直接返回获取锁失败。

二:分布式锁的实现方式

       1,基于缓存(redis)实现分布式锁(共享锁);

              1.1,redis有很高的性能; 
              1.2,redis命令对此支持较好,实现起来比较方便.                   

  1. /**
  2. * redis 分布式锁
  3. * @author reyco
  4. *
  5. */
  6. public class DistributedLock {
  7. private JedisPool jedisPool;
  8. public DistributedLock(JedisPool jedisPool) {
  9. super();
  10. this.jedisPool = jedisPool;
  11. }
  12. /**
  13. * 获取锁
  14. *
  15. * @param lockName
  16. * 锁的名称
  17. * @param acquireTimeout
  18. * 获取锁的超时时间,单位毫秒数
  19. * @param timeout
  20. * key的超时时间,单位毫秒数
  21. */
  22. public String lock(String lockName, long acquireTimeout, long timeout) {
  23. Jedis jedis = null;
  24. // 返回标识
  25. String identifier = null;
  26. try {
  27. jedis = jedisPool.getResource();
  28. // key
  29. String lockKey = "lock_" + lockName;
  30. // key所对应的value值
  31. String lockValue = UUID.randomUUID().toString();
  32. // 计算key的超时时间
  33. Integer expireTime = (int) timeout / 1000;
  34. // 获取锁的超时时间
  35. long end = System.currentTimeMillis() + acquireTimeout;
  36. // 循环获取锁
  37. while (System.currentTimeMillis() <= end) {
  38. // key是否存在
  39. if (jedis.setnx(lockKey, lockValue) == 1) {
  40. // 如果不存在,设置key过期时间
  41. jedis.expire(lockKey, expireTime);
  42. identifier = lockValue;
  43. return identifier;
  44. }
  45. // 如果设置过期时间异常,那么下一次必须设置key过期时间,否则可能出现死锁
  46. if (jedis.ttl(lockKey) == -1) {
  47. jedis.expire(lockKey, expireTime);
  48. }
  49. // 设置一个时间间隔,避免频繁尝试获取锁
  50. Thread.sleep(20);
  51. }
  52. } catch (Exception e) {
  53. e.printStackTrace();
  54. }
  55. return identifier;
  56. }
  57. /**
  58. * 手动释放锁
  59. *
  60. * @param lockName
  61. * 锁的名称
  62. */
  63. public boolean unlock(String lockName,String lockValue) {
  64. try {
  65. // key
  66. String lockKey = "lock_" + lockName;
  67. while(jedis.exists(lockKey)) {
  68. jedis.watch(lockKey);
  69. if(lockValue.equals(jedis.get(lockKey))) {
  70. Transaction transaction = jedis.multi();
  71. // 手动删除key,释放锁
  72. transaction.del(lockKey);
  73. List<Object> execQueue = transaction.exec();
  74. if(execQueue != null) {
  75. return true;
  76. }
  77. }
  78. jedis.unwatch();
  79. break;
  80. }
  81. } catch (Exception e) {
  82. e.printStackTrace();
  83. }
  84. return false;
  85. }
  1. /**
  2. * @author reyco
  3. * @date 2021年3月18日---下午5:14:56
  4. *
  5. * <pre>
  6. * redis锁实现
  7. *
  8. * <pre>
  9. */
  10. @Component
  11. public class RedisLock implements DistributedLock {
  12. protected Logger logger = LoggerFactory.getLogger(this.getClass());
  13. public final static String SET_LUA_SCRIPT = "if redis.call('setnx', KEYS[1],ARGV[1]) == 1 then return redis.call('EXPIRE',KEYS[1],ARGV[2]) else return 0 end";// lua脚本,用来获取分布式锁
  14. public final static String DEL_LUA_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end";// lua脚本,用来释放分布式锁
  15. public final static String RENEWAL_LUA_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('EXPIRE',KEYS[1],ARGV[2]) else return 0 end";// lua脚本,用来续约过期时间
  16. @Autowired
  17. @Qualifier("distributedLockThread")
  18. ExecutorService executorService;
  19. /**
  20. * 当前线程锁信息
  21. */
  22. private ThreadLocal<LockInfo> lockInfoThreadLocal = new ThreadLocal<>();
  23. /**
  24. * 锁的key
  25. */
  26. private static final String DISTRIBUTED_LOCK_KEY = "distributedLock:";
  27. /**
  28. * key的名称
  29. */
  30. private static final String DISTRIBUTED_LOCK_DEFAULT = "default";
  31. /**
  32. * 锁的超时时间,默认3000
  33. */
  34. private static final Integer DISTRIBUTED_LOCK_EXPIRE = 3000;
  35. @Autowired
  36. private StringRedisTemplate redisTemplate;
  37. @Override
  38. public void lock() {
  39. LockInfo lockInfo = new LockInfo(DISTRIBUTED_LOCK_KEY+DISTRIBUTED_LOCK_DEFAULT, SnowFlake.getNextId().toString(), DISTRIBUTED_LOCK_EXPIRE);
  40. lockInfoThreadLocal.set(lockInfo);
  41. List<String> keys = new ArrayList<>();
  42. keys.add(lockInfo.getLockKey());
  43. String valueExpireTime = (lockInfo.getExpireTime()/1000)+"";
  44. DefaultRedisScript<Long> defaultRedisScript = new DefaultRedisScript<>(SET_LUA_SCRIPT,Long.class);
  45. while(redisTemplate.execute(defaultRedisScript,keys,lockInfo.getLockValue(),valueExpireTime).intValue()==0) {
  46. try {
  47. TimeUnit.MILLISECONDS.sleep(lockInfo.getExpireTime()/10);
  48. } catch (InterruptedException e) {
  49. e.printStackTrace();
  50. }
  51. }
  52. logger.debug("加锁成功,【key:" +lockInfo.getLockKey() + "】,【value:" + lockInfo.getLockValue() + "】,【expireTimeSecond:"+lockInfo.getExpireTime()+"】");
  53. // 招一个看门狗
  54. executorService.execute(new Runnable() {
  55. @Override
  56. public void run() {
  57. while(lockInfo.getLockValue().equals(redisTemplate.opsForValue().get(lockInfo.getLockKey()))) {
  58. List<String> keys = new ArrayList<>();
  59. keys.add(lockInfo.getLockKey());
  60. DefaultRedisScript<Long> defaultRedisScript = new DefaultRedisScript<>(RENEWAL_LUA_SCRIPT,Long.class);
  61. Long result = redisTemplate.execute(defaultRedisScript,keys,lockInfo.getLockValue(),valueExpireTime);
  62. if(result.intValue()==1) {
  63. logger.debug("续约成功,【key:" +lockInfo.getLockKey() + "】,【value:" + lockInfo.getLockValue() + "】,【expireTimeSecond:"+lockInfo.getExpireTime()+"】");
  64. }
  65. try {
  66. Thread.sleep(lockInfo.getExpireTime()/3);
  67. } catch (InterruptedException e) {
  68. e.printStackTrace();
  69. }
  70. }
  71. }
  72. });
  73. }
  74. @Override
  75. public void unlock() {
  76. LockInfo lockInfo = lockInfoThreadLocal.get();
  77. lockInfoThreadLocal.remove();
  78. List<String> keys = new ArrayList<>();
  79. keys.add(lockInfo.getLockKey());
  80. DefaultRedisScript<Long> defaultRedisScript = new DefaultRedisScript<>(DEL_LUA_SCRIPT,Long.class);
  81. Long integer = redisTemplate.execute(defaultRedisScript,keys,lockInfo.getLockValue());
  82. if(integer.intValue()==1) {
  83. logger.debug("解锁成功,【key:" +lockInfo.getLockKey() + "】,【value:" + lockInfo.getLockValue() + "】");
  84. }
  85. }
  86. /**
  87. * 加锁
  88. *
  89. * @param lockKey
  90. * 锁的lockKey
  91. * @param lockValue
  92. * 锁的value值
  93. * @param expireTime
  94. * 过期时间
  95. */
  96. @Override
  97. public void lock(String lockKey, String lockValue, int expireTime) {
  98. List<String> keys = new ArrayList<>();
  99. keys.add(lockKey);
  100. String valueExpireTime = (expireTime/1000)+"";
  101. DefaultRedisScript<Long> setDefaultRedisScript = new DefaultRedisScript<>(SET_LUA_SCRIPT,Long.class);
  102. while(redisTemplate.execute(setDefaultRedisScript,keys,lockValue,valueExpireTime).intValue()==0) {
  103. try {
  104. Thread.sleep(expireTime/10);
  105. } catch (InterruptedException e) {
  106. e.printStackTrace();
  107. }
  108. }
  109. logger.debug("加锁成功,【key:"+lockKey+"】,【value:"+lockValue+"】,【expireTime:"+expireTime+"】");
  110. //获取锁成功:招一个看门狗
  111. new Thread(new Runnable() {
  112. @Override
  113. public void run() {
  114. while(lockValue.equals(redisTemplate.opsForValue().get(lockKey))) {
  115. List<String> keys = new ArrayList<>();
  116. keys.add(lockKey);
  117. DefaultRedisScript<Long> renDefaultRedisScript = new DefaultRedisScript<>(RENEWAL_LUA_SCRIPT,Long.class);
  118. Long result = redisTemplate.execute(renDefaultRedisScript,keys,lockValue,valueExpireTime);
  119. if(result.intValue()==1) {
  120. logger.debug("续约成功,【key:"+lockValue+ "】,【value:"+lockValue+"】,【expireTimeSecond:"+valueExpireTime+"】");
  121. }
  122. try {
  123. Thread.sleep(expireTime/3);
  124. } catch (InterruptedException e) {
  125. e.printStackTrace();
  126. }
  127. }
  128. }
  129. }).start();
  130. }
  131. /**
  132. * 解锁
  133. * @param lockKey
  134. * @param lockValue
  135. */
  136. @Override
  137. public void unLock(String lockKey, String lockValue) {
  138. List<String> keys = new ArrayList<>();
  139. keys.add(lockKey);
  140. DefaultRedisScript<Long> delDefaultRedisScript = new DefaultRedisScript<>(DEL_LUA_SCRIPT,Long.class);
  141. Long integer = redisTemplate.execute(delDefaultRedisScript,keys,lockValue);
  142. if(integer.intValue()==1) {
  143. logger.debug("解锁成功,【key:" +lockKey + "】,【value:" + lockValue + "】");
  144. }
  145. }
  146. @Override
  147. public void lockInterruptibly() throws InterruptedException {
  148. }
  149. @Override
  150. public boolean tryLock() {
  151. return false;
  152. }
  153. @Override
  154. public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
  155. return false;
  156. }
  157. @Override
  158. public Condition newCondition() {
  159. return null;
  160. }
  161. /**
  162. * 锁信息
  163. *
  164. * @author reyco
  165. *
  166. */
  167. static class LockInfo {
  168. private String lockKey;
  169. private String lockValue;
  170. private Integer expireTime;
  171. public LockInfo(String lockKey, String lockValue, Integer expireTime) {
  172. super();
  173. this.lockKey = lockKey;
  174. this.lockValue = lockValue;
  175. this.expireTime = expireTime;
  176. }
  177. public String getLockKey() {
  178. return lockKey;
  179. }
  180. public void setLockKey(String lockKey) {
  181. this.lockKey = lockKey;
  182. }
  183. public String getLockValue() {
  184. return lockValue;
  185. }
  186. public void setLockValue(String lockValue) {
  187. this.lockValue = lockValue;
  188. }
  189. public Integer getExpireTime() {
  190. return expireTime;
  191. }
  192. public void setExpireTime(Integer expireTime) {
  193. this.expireTime = expireTime;
  194. }
  195. }
  196. }


      2,基于Zookeeper实现分布式锁;

                     2.1,使用zk的临时节点特性失效分布式锁;

                          2.1.1,采用临时节点实现分布式锁(互斥锁);

                                     将zookeeper上的一个znode看作是一把锁,所有客户端都去创建/distribute_lock 节点,最终成功创建的   那个客户端也即拥有了这把锁。用完删除掉自己创建的/distribute_lock节点就释放出锁。没有获取到锁的客 户端等待,对/distribute_lock节点watcher删除监听,如果监听到 /distribute_lock节点删除,所有客户端去争取获取锁,没有获取到锁的客户端继续监听。

                                     这种实现方式有羊群问题,会出现死锁问题.  如:监听客户端很多,所有客户端争抢锁,会出现网络阻    塞;获取锁的客户端没有及时的删除节点,所有客户端一直等待,会出现死锁问题。 

                          2.1.2,采用临时顺序节点实现分布式锁:超时机制+临时顺序节点(共享锁).

                                           /distribute_lock 已经预先存在,所有客户端在它下面创建临时顺序编号目录节点,编号最小的获得      锁,用完删除,依次方便。

                                    a,在约定的节点下面创建/distribute_lock节点;

                                    b,所有客户端在/distribute_lock节点下创建自己的临时顺序节点;

                                    c,所有客户端需要获取锁;

                                              先对所有临时顺序节点排序,然后尝试获取锁。如果获取锁,正常执行,如果没有获取到锁,监听比自己次小的节点的删除事件。

                                      此种实现方案有bug:如果中间客户端出现网络故障或宕机,就会出现多个客户端获取锁的bug。

            

  1. /**
  2. * Zookeeper 超时机制+临时节点
  3. *
  4. * @author reyco
  5. *
  6. */
  7. public class DistributedLock implements Lock {
  8. //private static final String ZK_ADDRESS = "192.168.2.107:2181,192.168.2.108:2181,192.168.2.110:2181";
  9. private static final String ZK_ADDRESS = "192.168.241.123:2181,192.168.241.124:2181,192.168.241.125:2181";
  10. /**
  11. * 父节点
  12. */
  13. private String distributedLock = "/distributedLock";
  14. /**
  15. * 子节点
  16. */
  17. private static final String LOCKNAME = "/lock_";
  18. /**
  19. * 当前节点
  20. */
  21. private static ThreadLocal<String> currentPath = new ThreadLocal<>();
  22. /**
  23. * 前一个节点
  24. */
  25. private static ThreadLocal<String> beforePath = new ThreadLocal<>();
  26. /**
  27. * zkc
  28. */
  29. private ZkClient zkClient;
  30. /**
  31. * 超时时间
  32. */
  33. private static final int TIMIOUT = 5000;
  34. /**
  35. *
  36. * @param rootPath
  37. */
  38. public DistributedLock(String rootPath) {
  39. this.distributedLock = rootPath;
  40. zkClient = new ZkClient(new ZkConnection(ZK_ADDRESS, TIMIOUT));
  41. if (!zkClient.exists(distributedLock)) {
  42. zkClient.createPersistent(distributedLock);
  43. }
  44. zkClient.setZkSerializer(new ZkSerializer() {
  45. @Override
  46. public byte[] serialize(Object data) throws ZkMarshallingError {
  47. return String.valueOf(data).getBytes();
  48. }
  49. @Override
  50. public Object deserialize(byte[] bytes) throws ZkMarshallingError {
  51. return new String(bytes);
  52. }
  53. });
  54. }
  55. @Override
  56. public void lock() {
  57. // 尝试获取锁
  58. if (!tryLock()) {
  59. // 等待
  60. waitForLock();
  61. // 获取锁
  62. lock();
  63. }
  64. }
  65. /**
  66. *
  67. */
  68. private void waitForLock() {
  69. CountDownLatch countDownLatch = new CountDownLatch(1);
  70. // 注册监听
  71. IZkDataListener listener = new IZkDataListener() {
  72. @Override
  73. public void handleDataDeleted(String dataPath) throws Exception {
  74. System.out.println(Thread.currentThread().getName()+",监听节点被删除。。。" + dataPath);
  75. countDownLatch.countDown();
  76. }
  77. @Override
  78. public void handleDataChange(String dataPath, Object data) throws Exception {
  79. System.out.println("监听节点数据变化。。。" + data);
  80. }
  81. };
  82. zkClient.subscribeDataChanges(this.beforePath.get(), listener);
  83. try {
  84. if (this.zkClient.exists(this.beforePath.get())) {
  85. countDownLatch.await();
  86. }
  87. } catch (Exception e) {
  88. e.printStackTrace();
  89. }
  90. zkClient.unsubscribeDataChanges(this.beforePath.get(), listener);
  91. }
  92. @Override
  93. public void lockInterruptibly() throws InterruptedException {
  94. }
  95. /**
  96. * 尝试获取锁
  97. */
  98. @Override
  99. public boolean tryLock() {
  100. // 如果存在就不创建了
  101. if (null == this.currentPath.get()) {
  102. // 创建临时顺序节点
  103. currentPath.set(this.zkClient.createPersistentSequential(distributedLock + LOCKNAME, "lockName"));
  104. }
  105. // 获取所有子节点
  106. List<String> children = this.zkClient.getChildren(distributedLock);
  107. // 排序children
  108. Collections.sort(children);
  109. // 判断是否最小节点
  110. if (currentPath.get().equals(distributedLock + "/" + children.get(0))) {
  111. return true;
  112. }
  113. // 获取前一个节点
  114. else {
  115. // 获取当前节点的索引号
  116. int currentIndex = children.indexOf(currentPath.get().substring(distributedLock.length()+1));
  117. beforePath.set(distributedLock + "/"+children.get(currentIndex - 1));
  118. }
  119. return false;
  120. }
  121. @Override
  122. public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
  123. return false;
  124. }
  125. @Override
  126. public void unlock() {
  127. zkClient.delete(this.currentPath.get());
  128. }
  129. @Override
  130. public Condition newCondition() {
  131. return null;
  132. }
  133. /**
  134. * 测试
  135. * @param args
  136. * @throws InterruptedException
  137. */
  138. public static void main(String[] args) throws InterruptedException {
  139. String path = "/distributedLock";
  140. List<String> list = new ArrayList<>();
  141. int threadSize = 20;
  142. CountDownLatch countDownLatch = new CountDownLatch(threadSize);
  143. ExecutorService threadPool = Executors.newFixedThreadPool(threadSize);
  144. for (int i = 0; i < threadSize; i++) {
  145. threadPool.execute(new Runnable() {
  146. @Override
  147. public void run() {
  148. DistributedLock distributedLock = new DistributedLock(path);
  149. try {
  150. distributedLock.lock();
  151. list.add("asdw ");
  152. countDownLatch.countDown();
  153. } catch (Exception e) {
  154. e.printStackTrace();
  155. }finally {
  156. distributedLock.unlock();
  157. }
  158. }
  159. });
  160. }
  161. countDownLatch.await();
  162. threadPool.shutdown();
  163. System.out.println(list.size());
  164. }
  165. }

                     2,解决zk临时顺序节点分布式锁的bug,采用:超时机制+永久节点 实现分布式锁(共享锁)。                                                               重构一个类似redis-ping-pong:永久顺序节点写入(客户端ip+port),判断获取锁的节点是否能正常工作,如果能正常工作就让其正常执行业务,执行业务后删除节点;如果不能正常工作就直接删除这个节点。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/爱喝兽奶帝天荒/article/detail/776323
推荐阅读
相关标签
  

闽ICP备14008679号