赞
踩
目录
在分布式系统中,存在着许多进程同时运转,而为了防止分布式系统中的多个进程之间相互干扰,我们需要一种分布式协调技术来对这些进程进行调度。而这个分布式协调技术的核心就是来实现分布式锁。
分布式锁:控制分布式系统有序的去对共享资源进行操作,通过互斥来保持一致性。

setnx 命令实现。

在这里我们使用两个线程来模拟分布锁中的客户端,编写分布式锁类,通过测试要实现:
只有一个线程能够获取到锁,等到这个线程释放后另外一个线程才能拿到锁。
| zookeeper集群环境 |
| 一个简单的springboot环境 |
建议大家在实现之前可以先对zookeeper的一些API操作有一些了解,这些我在我的另外一篇文章做了详细的介绍,有需要的小伙伴可以移步去看看喔。
1.创建一个简单的springboot工程,在pom.xml文件导入zookeeper依赖

- <!--引入对应的zookeeper -->
- <dependency>
- <groupId>org.apache.zookeeper</groupId>
- <artifactId>zookeeper</artifactId>
- <version>3.7.1</version>
- </dependency>
2.开启zookeeper集群中的客户端 (至少保证集群能够正常启动)
[root@zookeeper3 ~]# zkCli.sh

3.编写DistributedLock(分布式锁)
- package com.canrioyuan.zookeepertest.zkcase2;
-
- import org.apache.zookeeper.*;
- import org.apache.zookeeper.data.Stat;
-
- import java.io.IOException;
- import java.util.Collections;
- import java.util.List;
- import java.util.concurrent.CountDownLatch;
-
- //分布锁
- public class DistributedLock {
-
- //设置zookeeper连接
- private final String connectString = "zookeeper1:2181,zookeeper2:2181,zookeeper3:2181";
- //设置超时时间
- private final int sessionTimeout = 2000;
- //声明zookeeper
- private final ZooKeeper zk;
- //CountDownLatch使用场景
- //线程计数器 用于线程执行任务,计数 等待线程结束
- private CountDownLatch countDownLatch = new CountDownLatch(1);
- private CountDownLatch waitLatch = new CountDownLatch(1);
- //定义该临时节点上一个节点的路径
- private String waitPath;
- //定义临时节点
- private String mode;
-
-
- public DistributedLock() throws IOException, InterruptedException, KeeperException {
-
- //获取连接
- zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
- @Override
- public void process(WatchedEvent watchedEvent) {
- //如果连接上zk的话便可以对countDownLatch进行释放
- if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
- countDownLatch.countDown();
- }
- //如果上一个节点进行了删除节点的操作后则可以对监听进行释放
- if(watchedEvent.getType()==Event.EventType.NodeDeleted && watchedEvent.getPath().equals(waitPath)){
- waitLatch.countDown();
- }
-
-
-
- }
- });
-
- //等待zk连接后才会继续往下执行
- countDownLatch.await();
-
- //判断根节点/locks是否存在
- Stat stat = zk.exists("/locks", false);
- int version = stat.getVersion();
- System.out.println(version+"111111");
-
- //如果节点不存在
- if (stat == null) {
- zk.create("/locks", "locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- }
-
- }
-
-
- //对zk加锁
- public void zkLock() {
- //创建节点(临时带序号的)
- try {
- mode = zk.create("/locks/" + "seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
-
- //判断节点是否是最小的序号节点,如果是的话就获取到锁;如果不是,监听到他序号前一个节点
- List<String> children = zk.getChildren("/locks", false);
-
- //用于判断children中的值
- //如果只有一个值,你就直接获取锁;如果有多个节点,则需要判断谁最小
- if (children.size() == 1) {
- return;
- } else {
- //对获取到的节点进行排序方便持续获取节点
- Collections.sort(children);
-
- //获取节点的名称 截取掉节点的前缀
- String thisNode = mode.substring("/locks/".length());
-
- //通过该节点的名称获取该节点在集合中的位置
- int index = children.indexOf(thisNode);
- System.out.println(index+"11111111112");
- //对节点所在的索引进行判断
- if (index == -1) {
- System.out.println("数据出现错误");
- } else if (index == 0) {
- //只有一个节点可以直接获取锁
- return;
- } else {
- //需要监听他前一个节点的变化
- waitPath = "/locks/" + children.get(index - 1);
- //通过获取前一个节点的路径对这个节点进行监听
- zk.getData(waitPath, true, null);
-
- //等待监听
- waitLatch.await();
- //监听结束后获得锁
- return;
- }
-
- }
- } catch (KeeperException e) {
- e.printStackTrace();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
-
- }
-
-
- //对zk解锁
- public void unZkLock() {
- try {
- zk.delete(mode,0);
- } catch (InterruptedException e) {
- e.printStackTrace();
- } catch (KeeperException e) {
- e.printStackTrace();
- }
- //删除节点
- }
- }

4.编写测试类DistributedLockTest
- package com.canrioyuan.zookeepertest.zkcase2;
-
- import org.apache.zookeeper.KeeperException;
-
- import java.io.IOException;
-
- public class DistributedLockTest {
-
- public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
-
- final DistributedLock lock1 = new DistributedLock();
- final DistributedLock lock2 = new DistributedLock();
-
- new Thread(new Runnable() {
- @Override
- public void run() {
- try {
- lock1.zkLock();
- System.out.println("线程1启动,获取到锁");
- Thread.sleep(5 * 1000);
-
- lock1.unZkLock();
- System.out.println("线程1释放锁");
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }).start();
-
- new Thread(new Runnable() {
- @Override
- public void run() {
- try {
- lock2.zkLock();
- System.out.println("线程2启动,获取到锁");
- Thread.sleep(5 * 1000);
-
- lock2.unZkLock();
- System.out.println("线程2释放锁");
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }).start();
-
- }
-
-
- }

5.启动测试,观察结果
线程1先获取到锁
![]()
线程1执行完业务,释放锁 ;线程2获取到锁

线程2释放锁

因为我之前把连接的字符串设置成IP地址导致的报错,我们必须使用主机名

1.进入linux的hosts文件修改映射
[root@zookeeper2 /]# vi /etc/hosts
在以下代码中增加对应的主机映射

2.修改每一台虚拟机的zoo.cfg
[root@zookeeper2 /]# vi /opt/module/zookeeper/conf/zoo.cfg
修改成对应的映射

3.修改DistributedLock中的connectString

这是我在运行中遇到的问题,大家一定要注意i

Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。