当前位置:   article > 正文

apache zookeeper java_springBoot 整合 ZooKeeper Java客户端之 Apache Curator 实战

springboot zookeeper apache curator

/*** 演示 Apache Curator API

* 1、增删查改

* 2、ACL 访问权限控制

* 3、注册 watch 事件的三个接口*/@RestControllerpublic classCuratorClientApiText {private Logger logger = LoggerFactory.getLogger(CuratorClientApiText.class);

@AutowiredprivateCuratorFramework zkClient;

@AutowiredprivateApacheZooKeeperProperties apacheZooKeeperProperties;private String userParentPath = "/user";private String userPersistent = "/user/persistent";private String userEphemeral = "/ephemeral";private String userPersistentSequential = "/user/persistent_sequential";private String userEphemeralSequential = "/ephemeral_sequential";private String result = "";/*** Curator API 是链式调用风格,遇到 forPath 接口就触发ZooKeeper 调用

*

* 将演示创建 ZooKeeper 四种数据模型

*

*@return

*/@GetMapping("/create/node")publicString createNode(){try{//添加 acl 用户

List aclList = new ArrayList<>();

aclList.add(new ACL(ZooDefs.Perms.ALL, newId(apacheZooKeeperProperties.getScheme(), DigestAuthenticationProvider.generateDigest(apacheZooKeeperProperties.getAuthId()))));/*** CuratorListener监听,此监听主要针对background通知和错误通知;

* 使用 watched() 只会观察一次,只对该节点本身 create、delete、setData 有效;

* 使用 inBackground() 会异步监听到返回信息,一旦使用该接口,就不会有返回值*/zkClient.getCuratorListenable().addListener(newCuratorListenerImpl());/*** NodeCache可以监听节点本身创建、删除,以及内容的变化,但对于子节点的变化不会监听*/NodeCache nodeCache= newNodeCache(zkClient,userParentPath);

NodeCacheListenerImpl nodeCacheListener= newNodeCacheListenerImpl();

nodeCacheListener.setNodeCache(nodeCache);

nodeCache.start(true); //设置为 true 把该节点数据存储到本地

nodeCache.getListenable().addListener(nodeCacheListener);/*** PathChildrenCache用于监听所有子节点的变化*/PathChildrenCache pathChildrenCache= new PathChildrenCache(zkClient, userParentPath, true);/*** StartMode: 初始化方式

* POST_INITIALIZED_EVENT : 异步初始化之后触发事件

* NORMAL:异步初始化

* BUILD_INITIAL_CACHE:同步初始化*/pathChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);

pathChildrenCache.getListenable().addListener(newPathChildrenCacheListenerImpl());//注册 watch 事件

/*Stat stat = zkClient.checkExists()

.watched()

.forPath(userParentPath);

logger.info("/userParentPath 路径状态..." + stat);

stat = zkClient.checkExists()

.watched()

.forPath(userPersistent);

logger.info("/userPersistent 路径状态..." + stat);

stat = zkClient.checkExists()

.watched()

.forPath(userEphemeral);

logger.info("/userEphemeral 路径状态..." + stat);*/

//持久节点 creatingParentContainersIfNeeded() 接口自动递归创建所需节点的父节点

result =zkClient.create()

.creatingParentContainersIfNeeded()

.withMode(CreateMode.PERSISTENT)

.withACL(aclList)

.forPath(userPersistent,"userPersistentData".getBytes());

Thread.sleep(1000 * 5);

logger.info("持久节点..." +result);//临时节点 不能有子节点

result =zkClient.create()

.creatingParentContainersIfNeeded()

.withMode(CreateMode.EPHEMERAL)

.forPath(userEphemeral,"userEphemeralData".getBytes());

Thread.sleep(1000 * 5);

logger.info("临时节点..." +result);//持久序列节点

result =zkClient.create()

.creatingParentContainersIfNeeded()

.withMode(CreateMode.PERSISTENT_SEQUENTIAL)

.forPath(userPersistentSequential,"userPersistentSequentialData".getBytes());

Thread.sleep(1000 * 5);

logger.info("持久有序节点..." +result);//临时序列节点

result =zkClient.create()

.creatingParentContainersIfNeeded()

.withMode(CreateMode.EPHEMERAL_SEQUENTIAL)

.forPath(userEphemeralSequential,"userEphemeralSequentialData".getBytes());

Thread.sleep(1000 * 5);

logger.info("临时有序节点..." +result);

}catch(Exception e){

logger.info("创建节点失败...");

e.printStackTrace();

}return "curator api 创建节点";

}

@GetMapping("/node/children")publicString nodeChildren(){try{

Stat stat=zkClient.checkExists().watched().forPath(userParentPath);

Thread.sleep(1000 * 5);

logger.info("判断节点是否存在..." +stat);

List userPaths =zkClient.getChildren().forPath(userParentPath);

Thread.sleep(1000 * 5);

logger.info("获取所有子节点..." +userPaths);

}catch(Exception e){

logger.info("失败...");

e.printStackTrace();

}return "curator api 获取所有子节点";

}

@GetMapping("/data/node")publicString dataNode(){try{//获取一个节点的内容

byte[] bytes =zkClient.getData().forPath(userPersistent);

Thread.sleep(1000 * 5);

logger.info("获取节点数据..." + newString(bytes));//修改一个节点的内容

Stat stat = zkClient.setData().forPath(userPersistent, "updateUserPersistentData".getBytes());

Thread.sleep(1000 * 5);

logger.info("修改节点数据..." +stat);

}catch(Exception e){

logger.info("失败...");

e.printStackTrace();

}return "curator api 获取、修改节点数据";

}

@GetMapping("/delete/node")publicString deleteNode(){try{//删除一个节点,强制指定版本进行删除

Stat stat = newStat();

zkClient.getData().storingStatIn(stat).forPath(userPersistent);

zkClient.delete().withVersion(stat.getVersion()).forPath(userPersistent);

Thread.sleep(1000 * 5);//删除一个节点,并且递归删除其所有子节点

zkClient.delete().deletingChildrenIfNeeded().forPath(userParentPath);

Thread.sleep(1000 * 5);

}catch(Exception e){

logger.info("删除节点失败...");

e.printStackTrace();

}return "curator api 删除节点";

}

}

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/article/detail/53969
推荐阅读
相关标签
  

闽ICP备14008679号