赞
踩
/*** 演示 Apache Curator API
* 1、增删查改
* 2、ACL 访问权限控制
* 3、注册 watch 事件的三个接口*/@RestControllerpublic classCuratorClientApiText {private Logger logger = LoggerFactory.getLogger(CuratorClientApiText.class);
@AutowiredprivateCuratorFramework zkClient;
@AutowiredprivateApacheZooKeeperProperties apacheZooKeeperProperties;private String userParentPath = "/user";private String userPersistent = "/user/persistent";private String userEphemeral = "/ephemeral";private String userPersistentSequential = "/user/persistent_sequential";private String userEphemeralSequential = "/ephemeral_sequential";private String result = "";/*** Curator API 是链式调用风格,遇到 forPath 接口就触发ZooKeeper 调用
*
* 将演示创建 ZooKeeper 四种数据模型
*
*@return
*/@GetMapping("/create/node")publicString createNode(){try{//添加 acl 用户
List aclList = new ArrayList<>();
aclList.add(new ACL(ZooDefs.Perms.ALL, newId(apacheZooKeeperProperties.getScheme(), DigestAuthenticationProvider.generateDigest(apacheZooKeeperProperties.getAuthId()))));/*** CuratorListener监听,此监听主要针对background通知和错误通知;
* 使用 watched() 只会观察一次,只对该节点本身 create、delete、setData 有效;
* 使用 inBackground() 会异步监听到返回信息,一旦使用该接口,就不会有返回值*/zkClient.getCuratorListenable().addListener(newCuratorListenerImpl());/*** NodeCache可以监听节点本身创建、删除,以及内容的变化,但对于子节点的变化不会监听*/NodeCache nodeCache= newNodeCache(zkClient,userParentPath);
NodeCacheListenerImpl nodeCacheListener= newNodeCacheListenerImpl();
nodeCacheListener.setNodeCache(nodeCache);
nodeCache.start(true); //设置为 true 把该节点数据存储到本地
nodeCache.getListenable().addListener(nodeCacheListener);/*** PathChildrenCache用于监听所有子节点的变化*/PathChildrenCache pathChildrenCache= new PathChildrenCache(zkClient, userParentPath, true);/*** StartMode: 初始化方式
* POST_INITIALIZED_EVENT : 异步初始化之后触发事件
* NORMAL:异步初始化
* BUILD_INITIAL_CACHE:同步初始化*/pathChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
pathChildrenCache.getListenable().addListener(newPathChildrenCacheListenerImpl());//注册 watch 事件
/*Stat stat = zkClient.checkExists()
.watched()
.forPath(userParentPath);
logger.info("/userParentPath 路径状态..." + stat);
stat = zkClient.checkExists()
.watched()
.forPath(userPersistent);
logger.info("/userPersistent 路径状态..." + stat);
stat = zkClient.checkExists()
.watched()
.forPath(userEphemeral);
logger.info("/userEphemeral 路径状态..." + stat);*/
//持久节点 creatingParentContainersIfNeeded() 接口自动递归创建所需节点的父节点
result =zkClient.create()
.creatingParentContainersIfNeeded()
.withMode(CreateMode.PERSISTENT)
.withACL(aclList)
.forPath(userPersistent,"userPersistentData".getBytes());
Thread.sleep(1000 * 5);
logger.info("持久节点..." +result);//临时节点 不能有子节点
result =zkClient.create()
.creatingParentContainersIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.forPath(userEphemeral,"userEphemeralData".getBytes());
Thread.sleep(1000 * 5);
logger.info("临时节点..." +result);//持久序列节点
result =zkClient.create()
.creatingParentContainersIfNeeded()
.withMode(CreateMode.PERSISTENT_SEQUENTIAL)
.forPath(userPersistentSequential,"userPersistentSequentialData".getBytes());
Thread.sleep(1000 * 5);
logger.info("持久有序节点..." +result);//临时序列节点
result =zkClient.create()
.creatingParentContainersIfNeeded()
.withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
.forPath(userEphemeralSequential,"userEphemeralSequentialData".getBytes());
Thread.sleep(1000 * 5);
logger.info("临时有序节点..." +result);
}catch(Exception e){
logger.info("创建节点失败...");
e.printStackTrace();
}return "curator api 创建节点";
}
@GetMapping("/node/children")publicString nodeChildren(){try{
Stat stat=zkClient.checkExists().watched().forPath(userParentPath);
Thread.sleep(1000 * 5);
logger.info("判断节点是否存在..." +stat);
List userPaths =zkClient.getChildren().forPath(userParentPath);
Thread.sleep(1000 * 5);
logger.info("获取所有子节点..." +userPaths);
}catch(Exception e){
logger.info("失败...");
e.printStackTrace();
}return "curator api 获取所有子节点";
}
@GetMapping("/data/node")publicString dataNode(){try{//获取一个节点的内容
byte[] bytes =zkClient.getData().forPath(userPersistent);
Thread.sleep(1000 * 5);
logger.info("获取节点数据..." + newString(bytes));//修改一个节点的内容
Stat stat = zkClient.setData().forPath(userPersistent, "updateUserPersistentData".getBytes());
Thread.sleep(1000 * 5);
logger.info("修改节点数据..." +stat);
}catch(Exception e){
logger.info("失败...");
e.printStackTrace();
}return "curator api 获取、修改节点数据";
}
@GetMapping("/delete/node")publicString deleteNode(){try{//删除一个节点,强制指定版本进行删除
Stat stat = newStat();
zkClient.getData().storingStatIn(stat).forPath(userPersistent);
zkClient.delete().withVersion(stat.getVersion()).forPath(userPersistent);
Thread.sleep(1000 * 5);//删除一个节点,并且递归删除其所有子节点
zkClient.delete().deletingChildrenIfNeeded().forPath(userParentPath);
Thread.sleep(1000 * 5);
}catch(Exception e){
logger.info("删除节点失败...");
e.printStackTrace();
}return "curator api 删除节点";
}
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。