赞
踩
Zookeeper官网:https://zookeeper.apache.org/index.html
在正式介绍Zookeeper的常用命令之前,我们先来了解一下Zookeeper的相关数据模型:
Zookeeper是一个常见的客户端-服务器模型,我们可以使用命令行或者JavaAPI的方式充当客户端进行访问,其架构如下图所示:
./zkServer.sh start
启动zookeeper服务./zkServer.sh status
查看zookeeper服务运行状态./zkServer.sh restart
重启zookeeper服务./zkServer.sh stop
关闭zookeeper服务./zkCli.sh -server ip:port
连接指定的zookeeper服务(如连接本地可忽略选项直接使用./zkCli.sh)quit
退出客户端交互界面help
查看命令帮助ls 目录
查看指定目录下的znode节点ls -s 目录
查看节点详细信息create znode [value]
创建znode节点(可以携带data)create znode -e [value]
创建临时节点(会话结束后消失)create znode -s [value]
创建顺序节点get znode
查看节点携带数据set znode value
设置节点数据delete znode
删除指定的znode节点(必须为空)deleteall znode
删除指定的znode节点及其子节点Curator:是一个Zookeeper的Java客户端库
Curator官网:http://curator.apache.org/
我们可以使用CuratorFrameworkFactory
静态工厂类进行创建,可以通过如下两种方式配置:
newClient()
方法build()
方法下面我们就给出对应两种代码的实现方式:
newClient:
/** * ZooKeeper测试类 */ public class ZooKeeperTest { private CuratorFramework client = null; @Before public void initByNewClient() { CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", 3000, 3000, new ExponentialBackoffRetry(3000, 1)); this.client = client; this.client.start(); } }
build:
/** * ZooKeeper测试类 */ public class ZooKeeperTest { private CuratorFramework client = null; @Before public void init() { CuratorFramework client = CuratorFrameworkFactory .builder() .connectString("127.0.0.1:2181") .sessionTimeoutMs(3000) .connectionTimeoutMs(3000) .retryPolicy(new ExponentialBackoffRetry(3000, 1)) .namespace("") .build(); client.start(); this.client = client; } }
其中各个配置项含义如下:
connectString
:连接字符串,配置服务器地址,格式为ip:portsessionTimeoutMs
:会话超时时间connectionTimeoutMs
:连接超时时间retryPolicy
:重试策略namespace
:设置根目录位置创建节点有如下常见的四种方式:
Case1:创建节点(不携带数据)
@Test
public void testCreate1() throws Exception {
String path = client.create().forPath("/app1");
System.out.println(path);
}
Case2:创建节点(携带数据)
@Test
public void testCreate2() throws Exception {
String path = client.create().forPath("/app2", "curator java api".getBytes());
System.out.println(path);
}
Case3:创建多级节点
@Test
public void testCreate4() throws Exception {
client.create().creatingParentsIfNeeded().forPath("/test/test1/test2");
}
Case4:创建节点并指定类型
@Test
public void testCreate3() throws Exception {
client.create().withMode(CreateMode.EPHEMERAL).forPath("/app3");
client.create().withMode(CreateMode.PERSISTENT).forPath("/app4");
client.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath("/app5");
client.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/app6");
}
删除节点有如下常见的两种方式:
Case1:删除节点(不含子节点)
@Test
public void testDelete() throws Exception {
client.delete().forPath("/app1");
}
Case2:删除节点(递归删除子节点)
@Test
public void testDeleteAll() throws Exception {
client.delete().deletingChildrenIfNeeded().forPath("/test");
}
查询节点有如下常见的三种方式:
Case1:查询子节点信息
@Test
public void testGetChildren() throws Exception {
List<String> childrenList = client.getChildren().forPath("/");
System.out.println(childrenList);
}
Case2:查询节点数据
@Test
public void testGetData() throws Exception {
byte[] bytes = client.getData().forPath("/app2");
System.out.println("data: " + new String(bytes));
}
Case3:查询节点详细信息
@Test
public void testGetData3() throws Exception {
Stat stat = new Stat();
client.getData().storingStatIn(stat).forPath("/app2");
System.out.println(stat);
}
修改节点有如下常见的两种方式:
Case1:修改节点数据
@Test
public void testSetData() throws Exception {
Stat stat = client.setData().forPath("/app1", "some data".getBytes());
System.out.println(stat);
}
Case2:修改节点数据(带有版本号)
@Test
public void testSetData2() throws Exception {
Stat stat = new Stat();
client.getData().storingStatIn(stat).forPath("/app2");
int version = stat.getVersion();
System.out.println(version);
client.setData().withVersion(version).forPath("/app2", "set with version".getBytes());
}
Watcher事件监听机制:
代码实现:
@Test
public void testCuratorCache() throws Exception {
NodeCache cache = new NodeCache(client, "/app1");
cache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
System.out.println("监听到节点变化...");
}
});
cache.start();
while (true) {
}
}
代码实现:
@Test public void testPathChildrenCache() throws Exception { PathChildrenCache cache = new PathChildrenCache(client, "/app1", true); cache.getListenable().addListener(new PathChildrenCacheListener() { @Override public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception { System.out.println("监听到子节点变化..."); PathChildrenCacheEvent.Type type = pathChildrenCacheEvent.getType(); if (type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) { System.out.println("监听到子节点数据变化..."); System.out.println("更新后数据: " + pathChildrenCacheEvent.getData().getData()); } } }); cache.start(); while (true) { } }
代码实现:
@Test
public void testTreeCache() throws Exception {
TreeCache cache = new TreeCache(client, "/app1");
cache.getListenable().addListener(new TreeCacheListener() {
@Override
public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {
System.out.println("监听到节点发生变化...");
System.out.println(treeCacheEvent);
}
});
cache.start();
while (true) {
}
}
/lock
节点下创建一个 **临时顺序 **节点/lock
节点下的全部子节点,如果发现当前节点编号是最小的,则该节点对应的客户端获取到锁,使用完锁后,删除该节点注意:这里创建临时节点是因为防止获取到锁的客户端宕机了,进而导致锁永远不会被删的情况;这是创建顺序节点是方便编号的排序
Cutator提供了下面五种分布式锁的方式:
代码如下:
package org.example; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.curator.retry.ExponentialBackoffRetry; import java.util.concurrent.TimeUnit; public class ZooKeeperLockTest { private static int tickets = 10; // 票数 public static void main(String[] args) { // 建立连接 CuratorFramework client = CuratorFrameworkFactory .builder() .connectString("127.0.0.1:2181") .sessionTimeoutMs(3000) .connectionTimeoutMs(3000) .retryPolicy(new ExponentialBackoffRetry(3000, 1)) .namespace("") .build(); client.start(); // 获取分布式锁 InterProcessMutex lock = new InterProcessMutex(client, "/lock"); Thread t1 = new Thread(() -> { while (true) { try { boolean hasLock = lock.acquire(3, TimeUnit.SECONDS); if (hasLock && tickets > 0) { // 不断抢票 System.out.println("线程" + Thread.currentThread().getName() + "抢到了当前第" + tickets + "张票"); tickets--; if (tickets <= 0) { break; } } } catch (Exception e) { throw new RuntimeException(e); } finally { try { lock.release(); } catch (Exception e) { throw new RuntimeException(e); } } } }, "携程"); Thread t2 = new Thread(() -> { while (true) { try { boolean hasLock = lock.acquire(3, TimeUnit.SECONDS); if (hasLock && tickets > 0) { // 不断抢票 System.out.println("线程" + Thread.currentThread().getName() + "抢到了当前第" + tickets + "张票"); tickets--; if (tickets <= 0) { break; } } } catch (Exception e) { throw new RuntimeException(e); } finally { try { lock.release(); } catch (Exception e) { throw new RuntimeException(e); } } } }, "飞猪"); t1.start(); t2.start(); } }
Leader选举过程:
比如有三台服务器,编号分别是1,2,3。则编号越大在选择算法中的权重就越大
服务器中存放的数据ID越大,值越大说明更新的越频繁,则在选择算法中的权重就越大
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。