赞
踩
curator是Netflix公司开源的一个 zookeeper客户端,后捐献给 apache,curator框架在zookeeper原生API接口上进行了包装,解决了很多zooKeeper客户端非常底层的细节开发。提供zooKeeper各种应用场景(比如:分布式锁服务、集群领导选举、共享计数器、缓存机制、分布式队列等的抽象封装,实现了Fluent风格的APl接口,是最好用,最流行的zookeeper的客户端
原生zookeeperAPI的不足
curator特点
session会话超时重连watcher反复注册apiFluent风格API<!-- zookeeper --> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.5.10</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>4.0.1</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>4.0.1</version> </dependency>
@Configuration @ConfigurationProperties(prefix = "zookeeper.curator") @Data public class ZookeeperConfig { /** * 集群地址 */ private String ip; /** * 连接超时时间 */ private Integer connectionTimeoutMs; /** * 会话超时时间 */ private Integer sessionTimeOut; /** * 重试机制时间参数 */ private Integer sleepMsBetweenRetry; /** * 重试机制重试次数 */ private Integer maxRetries; /** * 命名空间(父节点名称) */ private String namespace; /** * - `session`重连策略 - `RetryPolicy retry Policy = new RetryOneTime(3000);` - 说明:三秒后重连一次,只重连一次 - `RetryPolicy retryPolicy = new RetryNTimes(3,3000);` - 说明:每三秒重连一次,重连三次 - `RetryPolicy retryPolicy = new RetryUntilElapsed(1000,3000);` - 说明:每三秒重连一次,总等待时间超过个`10`秒后停止重连 - `RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3)` - 说明:这个策略的重试间隔会越来越长 - 公式:`baseSleepTImeMs * Math.max(1,random.nextInt(1 << (retryCount + 1)))` - `baseSleepTimeMs` = `1000` 例子中的值 - `maxRetries` = `3` 例子中的值 * @return * @throws Exception */ @Bean("curatorClient") public CuratorFramework curatorClient() throws Exception { CuratorFramework client = CuratorFrameworkFactory.builder() //连接地址 集群用,隔开 .connectString(ip) .connectionTimeoutMs(connectionTimeoutMs) //会话超时时间 .sessionTimeoutMs(sessionTimeOut) //设置重试机制 .retryPolicy(new ExponentialBackoffRetry(sleepMsBetweenRetry,maxRetries)) //设置命名空间 在操作节点的时候,会以这个为父节点 .namespace(namespace) .build(); client.start(); //注册监听器 ZookeeperWatches watches = new ZookeeperWatches(client); watches.znodeWatcher(); watches.znodeChildrenWatcher(); return client; }
zookeeper:
curator:
ip: 192.168.213.138:2181
sessionTimeOut: 50000
sleepMsBetweenRetry: 1000
maxRetries: 3
namespace: demo
connectionTimeoutMs: 50000
public class ZookeeperWatches { private CuratorFramework client; public ZookeeperWatches(CuratorFramework client) { this.client = client; } public void znodeWatcher() throws Exception { NodeCache nodeCache = new NodeCache(client, "/node"); nodeCache.start(); nodeCache.getListenable().addListener(new NodeCacheListener() { @Override public void nodeChanged() throws Exception { System.out.println("=======节点改变==========="); String path = nodeCache.getPath(); String currentDataPath = nodeCache.getCurrentData().getPath(); String currentData = new String(nodeCache.getCurrentData().getData()); Stat stat = nodeCache.getCurrentData().getStat(); System.out.println("path:"+path); System.out.println("currentDataPath:"+currentDataPath); System.out.println("currentData:"+currentData); } }); System.out.println("节点监听注册完成"); } public void znodeChildrenWatcher() throws Exception { PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/node",true); pathChildrenCache.start(); pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() { @Override public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { System.out.println("=======节点子节点改变==========="); Type type = event.getType(); String childrenData = new String(event.getData().getData()); String childrenPath = event.getData().getPath(); Stat childrenStat = event.getData().getStat(); System.out.println("子节点监听类型:"+type); System.out.println("子节点路径:"+childrenPath); System.out.println("子节点数据:"+childrenData); System.out.println("子节点元数据:"+childrenStat); } }); System.out.println("子节点监听注册完成"); } }
@RestController @RequestMapping(value = "/zookeeper") @Api(tags = {"zookeeper的测试类"}) public class ZookeeperController { @Resource(name = "curatorClient") private CuratorFramework client; @Value("${zookeeper.curator.namespace}") String namespace; @RequestMapping("/createZnode") @ApiOperation("zookeeper测试---递归创建节点") @ApiOperationSupport(author = "zxh",order = 1) @ApiImplicitParams({ @ApiImplicitParam(name = "path", value = "路径", required = true, paramType = "query"), @ApiImplicitParam(name = "data", value = "值", required = false, paramType = "query"), }) public String createZnode(String path,@RequestParam(defaultValue = "")String data){ path = "/"+path; List<ACL> aclList = new ArrayList<>(); Id id = new Id("world", "anyone"); aclList.add(new ACL(ZooDefs.Perms.ALL, id)); try { client.create() .creatingParentsIfNeeded() //没有父节点时 创建父节点 .withMode(CreateMode.PERSISTENT) //节点类型 .withACL(aclList) //配置权限 .forPath(path, data.getBytes()); } catch (Exception e) { e.printStackTrace(); return "节点创建失败"+e.getMessage(); } return "节点创建成功"; } @RequestMapping("/createAsyncZnode") @ApiOperation("zookeeper测试---异步递归创建节点") @ApiOperationSupport(author = "zxh",order = 2) @ApiImplicitParams({ @ApiImplicitParam(name = "path", value = "路径", required = true, paramType = "query"), @ApiImplicitParam(name = "data", value = "值", required = false, paramType = "query"), }) public String createAsyncZnode(String path,@RequestParam(defaultValue = "")String data){ String paths = "/"+path; try { client.create() .creatingParentsIfNeeded() .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) //异步回调 增删改都有异步方法 .inBackground(new BackgroundCallback() { @Override public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { System.out.println("异步回调--获取权限:"+client.getACL().forPath(paths)); System.out.println("异步回调--获取数据:"+new String(client.getData().forPath(paths))); System.out.println("异步回调--获取事件名称:"+event.getName()); System.out.println("异步回调--获取事件类型:"+event.getType()); } }) .forPath(paths, data.getBytes()); } catch (Exception e) { e.printStackTrace(); return "节点创建失败"+e.getMessage(); } return "节点创建成功"; } @RequestMapping("/selectZnode") @ApiOperation("zookeeper测试---查看节点和元数据") @ApiOperationSupport(author = "zxh",order = 3) @ApiImplicitParams({ @ApiImplicitParam(name = "path", value = "路径", required = true, paramType = "query"), }) public JSONObject selectZnode(String path){ JSONObject jsonObject = new JSONObject(); String namespace = "/"+this.namespace; Stat stat; try { stat = client.checkExists().forPath(path); if (stat == null) { jsonObject.put("error", "不存在该节点"); } String dataString = new String(client.getData().forPath(path)); jsonObject.put(namespace+path, dataString); jsonObject.put("stat", stat); } catch (Exception e) { e.printStackTrace(); } return jsonObject; } @RequestMapping("/selectChildrenZnode") @ApiOperation("zookeeper测试---查看子节点和数据") @ApiOperationSupport(author = "zxh",order = 4) @ApiImplicitParams({ @ApiImplicitParam(name = "path", value = "路径", required = true, paramType = "query"), }) public Map<String,String> selectChildrenZnode(String path){ Map<String, String> map = new HashMap<>(); String namespace = "/"+this.namespace; try { List<String> list = client.getChildren().forPath(path); for (String s : list) { String dataString = new String(client.getData().forPath(path+"/"+s)); map.put(namespace+path+"/"+s, dataString); } } catch (Exception e) { e.printStackTrace(); } return map; } @RequestMapping("/setData") @ApiOperation("zookeeper测试---设置数据") @ApiOperationSupport(author = "zxh",order = 5) @ApiImplicitParams({ @ApiImplicitParam(name = "path", value = "路径", required = true, paramType = "query"), @ApiImplicitParam(name = "data", value = "数据", required = true, paramType = "query"), @ApiImplicitParam(name = "version", value = "版本号(-1时 版本号不起作用)", required = true, paramType = "query"), }) public JSONObject setData(String path,String data,Integer version) { JSONObject jsonObject = new JSONObject(); try { Stat stat = client.setData().withVersion(version).forPath(path, data.getBytes()); jsonObject.put("success", "修改成功"); jsonObject.put("version", stat.getVersion()); } catch (Exception e) { e.printStackTrace(); jsonObject.put("error", "修改失败:"+e.getMessage()); return jsonObject; } return jsonObject; } @RequestMapping("/delete") @ApiOperation("zookeeper测试---删除节点") @ApiOperationSupport(author = "zxh",order = 6) @ApiImplicitParams({ @ApiImplicitParam(name = "path", value = "路径", required = true, paramType = "query"), @ApiImplicitParam(name = "version", value = "版本号(-1时 版本号不起作用)", required = true, paramType = "query"), @ApiImplicitParam(name = "isRecursive", value = "是否递归删除 1是 0否 默认为0", required = false, paramType = "query"), }) public JSONObject delete(String path,Integer version,@RequestParam(defaultValue = "0")Integer isRecursive) { JSONObject jsonObject = new JSONObject(); try { if (isRecursive == 1) { client.delete().deletingChildrenIfNeeded().withVersion(version).forPath(path); }else { client.delete().withVersion(version).forPath(path); } jsonObject.put("success", "删除成功"); } catch (Exception e) { e.printStackTrace(); jsonObject.put("error", "删除失败:"+e.getMessage()); return jsonObject; } return jsonObject; } @SuppressWarnings("finally") @RequestMapping("/transactionDisabled") @ApiOperation("zookeeper测试---测试事务(不开启事务)") @ApiOperationSupport(author = "zxh",order = 7) @ApiImplicitParams({ @ApiImplicitParam(name = "createPath", value = "创建的路径", required = true, paramType = "query"), @ApiImplicitParam(name = "createData", value = "创建的数据", required = true, paramType = "query"), @ApiImplicitParam(name = "setPath", value = "修改数据的路径", required = true, paramType = "query"), @ApiImplicitParam(name = "setData", value = "修改的数据", required = true, paramType = "query"), }) public String transactionDisabled(String createPath,String createData,String setPath,String setData) { try { //创建一个新的路径 client.create().withMode(CreateMode.PERSISTENT).forPath(createPath,createData.getBytes()); //修改一个没有的数据 让其报错 client.setData().forPath(setPath, setData.getBytes()); } catch (Exception e) { e.printStackTrace(); }finally { return "执行完成"; } } @SuppressWarnings({ "deprecation", "finally" }) @RequestMapping("/transactionEnabled") @ApiOperation("zookeeper测试---测试事务(开启事务)") @ApiOperationSupport(author = "zxh",order = 8) @ApiImplicitParams({ @ApiImplicitParam(name = "createPath", value = "创建的路径", required = true, paramType = "query"), @ApiImplicitParam(name = "createData", value = "创建的数据", required = true, paramType = "query"), @ApiImplicitParam(name = "setPath", value = "修改数据的路径", required = true, paramType = "query"), @ApiImplicitParam(name = "setData", value = "修改的数据", required = true, paramType = "query"), }) public String transactionEnabled(String createPath,String createData,String setPath,String setData) { try { /** * 这里有个坑点 使用 CuratorFramework 进行事务处理时,如果使用org.apache.zookeeper 的依赖版本是 3.6.x时 * 会出现找不到 MultiTransactionRecord 类的异常 * 在 3.6.x 版本 没有 MultiTransactionRecord 但是在3.4.10版本有这个类 不知道什么删除了 * 而 curator-framework 的 事务处理用到 CuratorMultiTransactionRecord 这个 类 * 但是 CuratorMultiTransactionRecord 继承了 MultiTransactionRecord 这个类 就出现了类找不到的异常 * * 解决办法 :要么降低zookeeper 的版本 为3.4.10 要么使用zookeeper原生事务代码 *我这里降低了zookeeper的版本 */ //该方法后续版本建议删除 // client.inTransaction() // .create().withMode(CreateMode.PERSISTENT).forPath(createPath,createData.getBytes()) // .and() // .setData().forPath(setPath, setData.getBytes()) // .and().commit(); //上述代码 替换成 以下代码 CuratorOp create = client.transactionOp().create().withMode(CreateMode.PERSISTENT).forPath(createPath,createData.getBytes()); CuratorOp setOp = client.transactionOp().setData().forPath(setPath, setData.getBytes()); //该方法有返回值 可以打印结果查看 一般不需要 client.transaction().forOperations(Arrays.asList(create,setOp)); } catch (Exception e) { e.printStackTrace(); }finally { return "执行完成"; } } @RequestMapping("/InterProcessMutexUse") @ApiOperation("zookeeper测试---测试可重入排它锁") @ApiOperationSupport(author = "zxh",order = 8) public String InterProcessMutexUse() throws Exception{ System.out.println("排它锁测试"); InterProcessMutex lock = new InterProcessMutex(client, "/lock"); System.out.println("占有锁中"); lock.acquire(20L, TimeUnit.SECONDS); System.out.println("执行操作中"); for (int i = 0; i < 20; i++) { TimeUnit.SECONDS.sleep(1); System.out.println(i); } lock.release(); return "锁已释放"; } @RequestMapping("/interProcessReadWriteLockUseWrite") @ApiOperation("zookeeper测试---测试读写锁--写锁") @ApiOperationSupport(author = "zxh",order = 9) public String interProcessReadWriteLockUseWrite() throws Exception { System.out.println("写锁"); // 分布式读写锁 InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client, "/lock"); // 开启两个进程测试,观察到写写互斥,特性同排它锁 System.out.println("获取锁中"); lock.writeLock().acquire(); System.out.println("操作中"); for (int i = 0; i < 10; i++) { TimeUnit.SECONDS.sleep(1); System.out.println(i); } lock.writeLock().release(); return "释放写锁"; } @RequestMapping("/interProcessReadWriteLockUseRead") @ApiOperation("zookeeper测试---测试读写锁--读锁") @ApiOperationSupport(author = "zxh",order = 10) public String interProcessReadWriteLockUseRead() throws Exception { System.out.println("读锁"); // 分布式读写锁 InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client, "/lock"); // 开启两个进程测试,观察得到读读共享,两个进程并发进行,注意并发和并行是两个概念,(并发是线程启动时间段不一定一致,并行是时间轴一致的) // 再测试两个进程,一个读,一个写,也会出现互斥现象 System.out.println("获取锁中"); lock.readLock().acquire(); System.out.println("操作中"); for (int i = 0; i < 10; i++) { TimeUnit.SECONDS.sleep(1); System.out.println(i); } lock.readLock().release(); return "释放读锁"; } }
1、到zookeeper的bin目录下,输入命令./zkCli.sh
2、输入命令 ls /(节点名称) 即可查看想查看的节点信息
查看初始节点 ls /
新增:

查看:

服务器zookeeper节点:

ps:为什么会有demo节点
yml配置文件中配置了`namespace`,对应配置类中的`.namespace(namespace)` @Bean("curatorClient") public CuratorFramework curatorClient() throws Exception { CuratorFramework client = CuratorFrameworkFactory.builder() //连接地址 集群用,隔开 .connectString(ip) .connectionTimeoutMs(connectionTimeoutMs) //会话超时时间 .sessionTimeoutMs(sessionTimeOut) //设置重试机制 .retryPolicy(new ExponentialBackoffRetry(sleepMsBetweenRetry,maxRetries)) //设置命名空间 在操作节点的时候,会以这个为父节点 .namespace(namespace) .build(); client.start();

查看zookeeper上的数据 原来是小张 现在变成了小小



test子节点删除了
zookeeper初始状态 /demo 下 只有一个node节点

执行不开启事务的方法:

系统报错:
org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /demo/dddddddd
再次查看zookeeper: 有transaction节点

将transaction节点删掉 测试开启事务
删除节点:delete /demo/transaction

开启事务方法:

系统报错:
org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode
再次查看zookeeper: 没有transaction节点

监听代码在 ZookeeperWatches 类
在配置类开启监听具体代码查看ZookeeperConfig 配置类
项目启动中会出现如下信息 如果zookeeper没有node节点 会自动生成node节点并触发监听回调代码

对node节点创建子节点、修改子节点、删除子节点 会触发子节点的监听回调
create /demo/node/test aa

zookeeper中如何实现排他锁
/Locks下创建临时有序节点/Locks/Lock_,创建成功后/Locks下面会有每个客户端对应的节点,如/Locks/Lock_000000001/Locks下子节点,并进行排序,判断排在前面的是否为自己,如果自己的锁节点在第一位,代表获取锁成功Lock_000000002,那么则监听Lock_000000001(Lock_000000001)对应的客户端执行完成,释放了锁,将会触发监听客户端(Lock_000000002)的逻辑2步逻辑,判断自己是否获得了锁工作中有这样的一个场景:数据库用户名和密码信息放在一个配置文件中,应用读取该配置文件,配置文件信息放入缓存
若数据库的用户名和密码改变时候,还需要重新加载媛存,比较麻烦,通过 Zookeeper可以轻松完成,当数据库发生变化时自动完成缓存同步
使用事件监听机制可以做出一个简单的配置中心
设计思路
zookeeper 服务器zookeeper中的配置信息,注册watcher监听器,存入本地变量zookeeper中的配置信息发生变化时,通过watcher的回调方法捕获数据变化事件在过去的单库单表型系统中,通常可以使用数据库字段自带的auto_ increment属性来自动为每条记录生成个唯一的ID。但是分库分表后,就无法再依靠数据库的auto_ increment属性来唯一标识一条记录了。此时我们就可以用zookeeper在分布式环境下生成全局唯一ID,通过zookeeper创建持久顺序节点,返回该节点序号,即为新id,然后将比自己小的节点删除。
@RequestMapping("/getZookeeperId") @ApiOperation("zookeeper测试---分布式唯一id") @ApiOperationSupport(author = "zxh",order = 12) public String getZookeeperId(){ TreeSet<String> sortNode = new TreeSet<>(); //唯一id String maxId = ""; try { client.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath("/seq/id-"); List<String> forPath = client.getChildren().forPath("/seq"); forPath.forEach(s->{ String id = s.split("-")[1]; sortNode.add(id); }); String minId = sortNode.first(); client.delete().forPath("/seq/id-"+minId); maxId= sortNode.last(); } catch (Exception e) { e.printStackTrace(); return "分布式id获取失败"+e.getMessage(); } return maxId; }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。