赞
踩
启动ZooKeeper服务
./zkServer.sh start
停止ZooKeeper服务
./zkServer.sh stop
查看ZooKeeper服务状态
./zkServer.sh status
连接ZooKeeper服务端
./zkCli.sh -server ip:port
断开连接
quit
显示指定目录下的节点
ls 目录
创建节点
create /节点path value
获取节点值
get /节点path # 使用get查看节点的状态信息和数据。
stat /节点path # 使用stat只返回节点的状态信息,没有数据。
设置节点值
set /节点path value
删除单个节点
delete /节点path
删除带有子节点的节点
deleteall /节点path
使用此命令注册的监听器能够在节点的数据发生改变的时候,向客户端发出通知。
get /节点path watch
使用此命令注册的监听器能够在节点的数据或者状态信息发生改变的时候,向客户端发出通知。
stat /节点path watch
能够监听该节点下所有子节点的增加和删除操作
ls/ls2 /节点path watch
创建临时节点
create -e /节点path value
创建顺序节点
create -s /节点path value
查询节点详细信息
ls -s /节点path
节点详细信息介绍
- czxid:节点被创建的事务 ID
- ctime:创建时间
- mzxid:最后一次被更新的事务 ID
- mtime:修改时间
- pzxid:子节点列表最后一次被更新的事务 ID
- cversion:子节点的版本号
- dataversion:数据版本号
- aclversion:权限版本号
- ephemeralOwner:用于临时节点,代表临时节点的事务 ID,如果为持久节点则为 0
- dataLength:节点存储的数据的长度
- numChildren:当前节点的子节点个数
1.概述
2.特性
3.scheme:权限模式
描述 | 方案 |
---|---|
world | 只有一个用户:anyone,代表登录 Zookeeper 的所有人(默认) |
ip | 对客户端使用 IP 地址认证 |
auth | 使用已添加认证的用户认证 |
digest | 使用 用户名 : 密码 方式认证 |
4.id:授权对象
5.permission:授予的权限
权限 | ACL 简写 | 描述 |
---|---|---|
create | c | 可以创建子节点 |
delete | d | 可以删除子节点(即下一级节点) |
read | r | 可以读取节点数据及显示子节点列表 |
write | w | 可以设置节点数据 |
admin | a | 可以设置节点访问控制列表权限 |
6.授权的相关命令
命令 | 使用方式 | 描述 |
---|---|---|
getAcl | getAcl | 读取ACL权限 |
setAcl | setAcl | 设置ACL权限 |
addauth | addauth | 添加认证用户 |
7.相关命令
world 授权模式
setAcl <path> world:anyone:<acl>
ip 授权模式
setAcl <path> ip:<ip>:<acl>
auth 授权模式
addauth digest <user>:<password> # 添加认证用户
setAcl <path> auth:<user>:<acl> # 授权
digest 授权模式
setAcl <path> digest:<user>:<password>:<acl>
<!-- curator -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.0</version>
</dependency>
方式一(CuratorTest.java文件)
public class CuratorTest { private CuratorFramework client; @Test public void testConnect() { /* * @param connectString 连接字符串。zk server 地址和端口 "192.168.2.212:2181,192.168.2.212:2181" * @param sessionTimeoutMs 会话超时时间 单位ms * @param connectionTimeoutMs 连接超时时间 单位ms * @param retryPolicy 重试策略 */ // 重试策略 RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10); // 创建连接,构造客户端对象_第一种方式 client = CuratorFrameworkFactory.newClient("192.168.2.212:2181", 60 * 1000,15 * 1000, retryPolicy); // 开启连接 client.start(); } }
方式二(CuratorTest.java文件)
public class CuratorTest { private CuratorFramework client; @Test public void testConnect() { // 重试策略 RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10); // 创建连接构造客户端对象_第二种方式:链式编程的方式 client = CuratorFrameworkFactory.builder() .connectString("192.168.2.212:2181") .sessionTimeoutMs(60 * 1000) .connectionTimeoutMs(15 * 1000) .retryPolicy(retryPolicy) .build(); // 开启连接 client.start(); } }
CuratorTest.java文件
public class CuratorTest { private CuratorFramework client; @Before public void testConnect() { // 重试策略 RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10); // 创建连接构造客户端对象_第二种方式:链式编程的方式 client = CuratorFrameworkFactory.builder() .connectString("192.168.2.212:2181") .sessionTimeoutMs(60 * 1000) .connectionTimeoutMs(15 * 1000) .retryPolicy(retryPolicy) .build(); // 开启连接 client.start(); } /* 1.基本创建 */ @Test public void testCreate_1() throws Exception { // 如果创建节点,没有指定数据,则默认将当前客户端的ip作为数据存储 String path = client.create().forPath("/app1"); } /* 2.创建节点,带有数据 */ @Test public void testCreate_2() throws Exception { String path = client.create().forPath("/app2", "hehe".getBytes()); } /* 3.设置节点的类型 */ @Test public void testCreate_3() throws Exception { // 默认类型:持久化。 String path = client.create().withMode(CreateMode.EPHEMERAL).forPath("/app3"); } /* 4.创建多级节点 /app1/p1 */ @Test public void testCreate_4() throws Exception { // creatingParentsIfNeeded():如果父节点不存在,则创建父节点 String path = client.create().creatingParentsIfNeeded().forPath("/app4/p1"); } @After public void close() { if (client != null) { client.close(); } } }
CuratorTest.java文件
@Test public void testGet_1() throws Exception { //1.查询数据:get byte[] data = client.getData().forPath("/app1"); } @Test public void testGet_2() throws Exception { // 2.查询子节点: ls List<String> path = client.getChildren().forPath("/"); } @Test public void testGet_3() throws Exception { Stat status = new Stat(); //3.查询节点状态信息:ls -s client.getData().storingStatIn(status).forPath("/app1"); }
CuratorTest.java文件
/** * 修改数据 * 1. 基本修改数据:setData().forPath() * 2. 根据版本修改: setData().withVersion().forPath() * * version 是通过查询出来的。目的就是为了让其他客户端或者线程不干扰我。 */ @Test public void testSet() throws Exception { client.setData().forPath("/app1", "itcast".getBytes()); } @Test public void testSetForVersion() throws Exception { Stat status = new Stat(); // 查询节点状态信息:ls -s client.getData().storingStatIn(status).forPath("/app1"); // 查询出来的版本信息 int version = status.getVersion(); client.setData().withVersion(version).forPath("/app1", "heheda".getBytes()); }
CuratorTest.java文件
/** * 删除节点 * 1. 删除单个节点:delete().forPath("/app1"); * 2. 删除带有子节点的节点:delete().deletingChildrenIfNeeded().forPath("/app1"); * 3. 必须成功的删除:为了防止网络抖动。本质就是重试。 client.delete().guaranteed().forPath("/app2"); * 4. 回调:inBackground * * @throws Exception */ @Test public void testDelete_1() throws Exception { // 1.删除单个节点 client.delete().forPath("/app1"); } @Test public void testDelete_2() throws Exception { // 2.删除带有子节点的节点 client.delete().deletingChildrenIfNeeded().forPath("/app4"); } @Test public void testDelete_3() throws Exception { // 3.必须成功的删除 client.delete().guaranteed().forPath("/app2"); } @Test public void testDelete4() throws Exception { // 4.回调 client.delete().guaranteed().inBackground(new BackgroundCallback() { @Override public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { System.out.println(event); } }).forPath("/app1"); }
Watch 事件监听
CuratorWatcherTest.java
public class CuratorWatcherTest { private CuratorFramework client; @Before public void testConnect() { // 重试策略 RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10); client = CuratorFrameworkFactory.builder() .connectString("192.168.2.212:2181") .sessionTimeoutMs(60 * 1000) .connectionTimeoutMs(15 * 1000) .retryPolicy(retryPolicy) .build(); // 开启连接 client.start(); } @After public void close() { if (client != null) { // 关闭连接 client.close(); } } /** * NodeCache给指定一个节点注册监听器 */ @Test public void testNodeCache() throws Exception { // 1.创建 NodeCache 对象 final NodeCache nodeCache = new NodeCache(client, "/app1"); // 2.注册监听 nodeCache.getListenable().addListener(new NodeCacheListener() { @Override public void nodeChanged() throws Exception { System.out.println("节点变化了~"); // 获取修改节点后的数据 byte[] data = nodeCache.getCurrentData().getData(); System.out.println("新数据:" + new String(data)); } }); // 3.开启监听 nodeCache.start(true); while (true) { } } }
CuratorWatcherTest.java
/** * PathChildrenCache监听某个节点的所有子节点 */ @Test public void testPathChildrenCache() throws Exception { // 1.创建监听对象 PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/app2", true); // 2. 绑定监听器 pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() { @Override public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { System.out.println("子节点变化了~"); System.out.println(event); // 监听子节点的数据变更,并且拿到变更后的数据。 // 1.获取类型 PathChildrenCacheEvent.Type type = event.getType(); // 2.判断类型是否是update if (type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) { System.out.println("数据变了!!!"); byte[] data = event.getData().getData(); System.out.println(new String(data)); } } }); // 3. 开启监听 pathChildrenCache.start(); while (true) { } }
CuratorWatcherTest.java
/** * TreeCache监听某个节点自己和所有子节点 */ @Test public void testTreeCache() throws Exception { // 1. 创建监听器 TreeCache treeCache = new TreeCache(client, "/app2"); // 2. 注册监听 treeCache.getListenable().addListener(new TreeCacheListener() { @Override public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception { System.out.println("节点变化了"); System.out.println(event); } }); // 3. 开启监听 treeCache.start(); while (true) { } }
分布式锁
ZooKeeper分布式锁原理
Ticket12306.java
// 以多线程的方式模拟卖票。 public class Ticket12306 implements Runnable{ private int tickets = 10; // 数据库的票数 private InterProcessMutex lock ; // 分布式可重入排它锁 public Ticket12306() { RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10); CuratorFramework client = CuratorFrameworkFactory.builder() .connectString("192.168.2.212:2181") .sessionTimeoutMs(60 * 1000) .connectionTimeoutMs(15 * 1000) .retryPolicy(retryPolicy) .build(); client.start(); // 创建锁对象 lock = new InterProcessMutex(client,"/lock"); } @Override public void run() { while(true){ // 获取锁 try { lock.acquire(3, TimeUnit.SECONDS); if(tickets > 0){ System.out.println(Thread.currentThread() + ":" + tickets); Thread.sleep(100); tickets--; } } catch (Exception e) { e.printStackTrace(); }finally { // 释放锁 try { lock.release(); } catch (Exception e) { e.printStackTrace(); } } } } }
LockTest.java
public class LockTest {
public static void main(String[] args) {
Ticket12306 ticket12306 = new Ticket12306();
// 创建客户端
Thread t1 = new Thread(ticket12306,"携程");
Thread t2 = new Thread(ticket12306,"飞猪");
t1.start();
t2.start();
}
}
在每个 zookeeper 的 data 目录下创建一个 myid 文件,内容分别是1、2、3 。这个文件就是记录每个服务器的 ID。
echo 1 >/usr/local/zookeeper-cluster/zookeeper-1/data/myid
echo 2 >/usr/local/zookeeper-cluster/zookeeper-2/data/myid
echo 3 >/usr/local/zookeeper-cluster/zookeeper-3/data/myid
在每一个 zookeeper 的 zoo.cfg 配置客户端访问端口(clientPort)和集群服务器 IP 列表
vim /usr/local/zookeeper-cluster/zookeeper-1/conf/zoo.cfg
vim /usr/local/zookeeper-cluster/zookeeper-2/conf/zoo.cfg
vim /usr/local/zookeeper-cluster/zookeeper-3/conf/zoo.cfg
集群服务器IP列表如下
server.1=192.168.2.212:2881:3881
server.2=192.168.2.212:2882:3882
server.3=192.168.2.212:2883:3883
解释:server.服务器ID = 服务器IP地址 : 服务器之间通信端口 : 服务器之间投票选举端口
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。