赞
踩
Zookeeper是一个Apache开源的分布式的应用,为系统架构提供协调服务。从设计模式角度来审视:该组件是一个基于观察者模式设计的框架,负责存储和管理数据,接受观察者的注册,一旦数据的状态发生变化,Zookeeper就将负责通知已经在Zookeeper上注册的观察者做出相应的反应,从而实现集群中类似Master/Slave管理模式。ZooKeeper的目标就是封装好复杂易出错的关键服务,将简单易用的接口和性能高效、功能稳定的系统提供给用户。
Spring Boot 2.6.3 、Zookeeper3.4.6 、 JDK1.8
- <dependency>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-framework</artifactId>
- <version>2.12.0</version>
- </dependency>
- <dependency>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-recipes</artifactId>
- <version>2.12.0</version>
- </dependency>
- <dependency>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-client</artifactId>
- <version>2.12.0</version>
- </dependency>
- <dependency>
- <groupId>org.projectlombok</groupId>
- <artifactId>lombok</artifactId>
- <version>1.18.4</version>
- </dependency>

- zoo:
- keeper:
- #开启标志
- enabled: true
- #服务器地址
- server: 192.168.116.100:2181
- #命名空间,被称为ZNode
- namespace: lx
- #权限控制,加密
- digest: smile:111111
- #会话超时时间
- sessionTimeoutMs: 3000
- #连接超时时间
- connectionTimeoutMs: 60000
- #最大重试次数
- maxRetries: 10
- #初始休眠时间
- baseSleepTimeMs: 1000

- package com.example.zk.config;
-
- import lombok.Data;
- import org.springframework.boot.context.properties.ConfigurationProperties;
- import org.springframework.stereotype.Component;
-
- /**
- * @author lanx
- * @date 2022/3/6
- */
- @Data
- @Component
- @ConfigurationProperties(prefix = "zoo.keeper")
- public class ZookeeperProperties {
- private String enabled;
- private String server;
- private String namespace;
- private String digest;
- private int sessionTimeoutMs;
- private int connectionTimeoutMs;
- private int maxRetries;
- private int baseSleepTimeMs;
-
- }

- package com.example.zk.config;
-
- import lombok.extern.slf4j.Slf4j;
- import org.apache.curator.RetryPolicy;
- import org.apache.curator.framework.CuratorFramework;
- import org.apache.curator.framework.CuratorFrameworkFactory;
- import org.apache.curator.retry.ExponentialBackoffRetry;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.context.annotation.Configuration;
-
- import javax.annotation.PostConstruct;
-
- /**
- * 配置
- * @author lanx
- * @date 2022/3/6
- */
- @Slf4j
- @Configuration
- public class ZookeeperConfig {
- @Autowired
- private ZookeeperProperties zookeeperProperties ;
-
- private static CuratorFramework client = null ;
- /**
- * 初始化
- */
- @PostConstruct
- public void init (){
- //重试策略,初试时间1秒,重试10次
- RetryPolicy policy = new ExponentialBackoffRetry(
- zookeeperProperties.getBaseSleepTimeMs(),
- zookeeperProperties.getMaxRetries());
- //通过工厂创建Curator
- client = CuratorFrameworkFactory.builder()
- .connectString(zookeeperProperties.getServer())
- .authorization("digest",zookeeperProperties.getDigest().getBytes())
- .connectionTimeoutMs(zookeeperProperties.getConnectionTimeoutMs())
- .sessionTimeoutMs(zookeeperProperties.getSessionTimeoutMs())
- .retryPolicy(policy).build();
- //开启连接
- client.start();
- log.info("zookeeper 初始化完成...");
- }
- public static CuratorFramework getClient (){
- return client ;
- }
- public static void closeClient (){
- if (client != null){
- client.close();
- }
- }
- }

- package com.example.zk.config;
-
- import lombok.extern.slf4j.Slf4j;
- import org.apache.curator.framework.CuratorFramework;
- import org.apache.curator.framework.recipes.cache.*;
- import org.springframework.boot.ApplicationArguments;
- import org.springframework.boot.ApplicationRunner;
- import org.springframework.stereotype.Component;
-
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
-
- /**
- * 节点数据监听
- * @author lanx
- * @date 2022/3/6
- */
- @Component
- @Slf4j
- public class ZookListening implements ApplicationRunner {
-
- @Override
- public void run(ApplicationArguments args) throws Exception {
- /**
- * 在注册监听器的时候,如果传入此参数,当事件触发时,逻辑由线程池处理
- */
- ExecutorService pool = Executors.newFixedThreadPool(2);
- /**
- * 监听数据节点的变化情况
- */
- final NodeCache nodeCache = new NodeCache(ZookeeperConfig.getClient(), "/lanxi", false);
- nodeCache.start(true);
- nodeCache.getListenable().addListener(
- /**
- * 此方法只监听创建节点和更新节点,在删除节点是不触发此操作
- */
- new NodeCacheListener() {
- @Override
- public void nodeChanged() throws Exception {
- log.info("Node data is changed, new data: {}" ,
- new String(nodeCache.getCurrentData().getData()));
- }
- },
- pool
- );
-
-
- /**
- * 监听子节点的变化情况
- *
- * 建立一个PathChildrenCache的缓存,第三个参数为是否接收节点数据内容,如果为 false 则不接受
- */
- final PathChildrenCache childrenCache = new PathChildrenCache(ZookeeperConfig.getClient(), "/llx", true);
- //在初始时就进行缓存监听
- childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
- childrenCache.getListenable().addListener(
- new PathChildrenCacheListener() {
- /**
- * 监听节点的变更。 新建、修改、删除
- * @param client
- * @param event
- * @throws Exception
- */
- @Override
- public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
- throws Exception {
- switch (event.getType()) {
- case CHILD_ADDED:
- System.out.println("CHILD_ADDED: " + event.getData().getPath());
- break;
- case CHILD_REMOVED:
- System.out.println("CHILD_REMOVED: " + event.getData().getPath());
- break;
- case CHILD_UPDATED:
- System.out.println("CHILD_UPDATED: " + event.getData().getPath());
- break;
- default:
- break;
- }
- }
- },
- pool
- );
-
- }
- }

- //curator框架,使用 DistributedAtomicInteger 作为分布式计数器
- DistributedAtomicInteger atomicInteger =
- new DistributedAtomicInteger(ZookeeperConfig.getClient(), "/llx", new RetryNTimes(3, 1000));
- //复位
- atomicInteger.forceSet(0);
- AtomicValue<Integer> value = atomicInteger.add(1);
- log.info("Boolean:{}", value.succeeded());
- log.info("最新值:{}", value.postValue());
- log.info("原始值:{}", value.preValue());
- for (int i = 0; i < 10; i++) {
- new Thread(new Runnable() {
- @Override
- public void run() {
- //分布式锁
- final InterProcessMutex lock = new InterProcessMutex(ZookeeperConfig.getClient(), "/llx");
- try {
- //获取锁
- lock.acquire();
- log.info("线程:{},执行业务逻辑", Thread.currentThread().getName());
- Thread.sleep(1000*10);
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- try {
- //释放锁
- lock.release();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- }).start();
- }

- package com.example.zk.service;
-
- import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
- import org.apache.zookeeper.CreateMode;
-
- import java.util.List;
-
- public interface ZookeeperService {
- /**
- * 判断节点是否存在
- */
- boolean isExistNode (final String path) ;
- /**
- * 创建节点
- */
- void createNode (CreateMode mode, String path ) ;
- /**
- * 设置节点数据
- */
- void setNodeData (String path, String nodeData) ;
- /**
- * 创建节点
- */
- void createNodeAndData (CreateMode mode, String path , String nodeData) ;
- /**
- * 获取节点数据
- */
- String getNodeData (String path) ;
- /**
- * 获取节点下数据
- */
- List<String> getNodeChild (String path) ;
- /**
- * 是否递归删除节点
- */
- void deleteNode (String path,Boolean recursive) ;
- /**
- * 获取读写锁
- */
- InterProcessReadWriteLock getReadWriteLock (String path) ;
- }

- package com.example.zk.service.impl;
-
- import com.example.zk.config.ZookeeperConfig;
- import com.example.zk.service.ZookeeperService;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.curator.framework.CuratorFramework;
- import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
- import org.apache.zookeeper.CreateMode;
- import org.apache.zookeeper.data.Stat;
- import org.springframework.stereotype.Service;
- import org.springframework.util.StringUtils;
-
- import java.util.ArrayList;
- import java.util.List;
-
- @Slf4j
- @Service
- public class ZookeeperServiceImpl implements ZookeeperService {
- @Override
- public boolean isExistNode(String path) {
- CuratorFramework client = ZookeeperConfig.getClient();
- client.sync() ;
- try {
- Stat stat = client.checkExists().forPath(path);
- return client.checkExists().forPath(path) != null;
- } catch (Exception e) {
- log.error("isExistNode error...", e);
- e.printStackTrace();
- }
- return false;
- }
- @Override
- public void createNode(CreateMode mode, String path) {
- CuratorFramework client = ZookeeperConfig.getClient() ;
- try {
- // 递归创建所需父节点
- client.create().creatingParentsIfNeeded().withMode(mode).forPath(path);
- } catch (Exception e) {
- log.error("createNode error...", e);
- e.printStackTrace();
- }
- }
- @Override
- public void setNodeData(String path, String nodeData) {
- CuratorFramework client = ZookeeperConfig.getClient() ;
- try {
- // 设置节点数据
- client.setData().forPath(path, nodeData.getBytes("UTF-8"));
- } catch (Exception e) {
- log.error("setNodeData error...", e);
- e.printStackTrace();
- }
- }
- @Override
- public void createNodeAndData(CreateMode mode, String path, String nodeData) {
- CuratorFramework client = ZookeeperConfig.getClient() ;
- try {
- // 创建节点,关联数据
- client.create().creatingParentsIfNeeded().withMode(mode)
- .forPath(path,nodeData.getBytes("UTF-8"));
- } catch (Exception e) {
- log.error("createNode error...", e);
- e.printStackTrace();
- }
- }
- @Override
- public String getNodeData(String path) {
- CuratorFramework client = ZookeeperConfig.getClient() ;
- try {
- // 数据读取和转换
- byte[] dataByte = client.getData().forPath(path) ;
- String data = new String(dataByte,"UTF-8") ;
- if (!StringUtils.isEmpty(data)){
- return data ;
- }
- }catch (Exception e) {
- log.error("getNodeData error...", e);
- e.printStackTrace();
- }
- return null;
- }
- @Override
- public List<String> getNodeChild(String path) {
- CuratorFramework client = ZookeeperConfig.getClient() ;
- List<String> nodeChildDataList = new ArrayList<>();
- try {
- // 节点下数据集
- nodeChildDataList = client.getChildren().forPath(path);
- } catch (Exception e) {
- log.error("getNodeChild error...", e);
- e.printStackTrace();
- }
- return nodeChildDataList;
- }
- @Override
- public void deleteNode(String path, Boolean recursive) {
- CuratorFramework client = ZookeeperConfig.getClient() ;
- try {
- if(recursive) {
- // 递归删除节点
- client.delete().guaranteed().deletingChildrenIfNeeded().forPath(path);
- } else {
- // 删除单个节点
- client.delete().guaranteed().forPath(path);
- }
- } catch (Exception e) {
- log.error("deleteNode error...", e);
- e.printStackTrace();
- }
- }
- @Override
- public InterProcessReadWriteLock getReadWriteLock(String path) {
- CuratorFramework client = ZookeeperConfig.getClient() ;
- // 写锁互斥、读写互斥
- InterProcessReadWriteLock readWriteLock = new InterProcessReadWriteLock(client, path);
- return readWriteLock ;
- }
- }

Zookeeper操作Controller类
- package com.example.zk.web;
-
- import com.example.zk.service.ZookeeperService;
- import org.apache.zookeeper.CreateMode;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.web.bind.annotation.GetMapping;
- import org.springframework.web.bind.annotation.RestController;
-
- import java.util.List;
-
- import static org.apache.zookeeper.CreateMode.PERSISTENT;
-
- /**
- * Zookeeper操作
- * @author lanx
- * @date 2022/3/6
- */
- @RestController
- public class ZookeeperApi {
- @Autowired
- private ZookeeperService zookeeperService ;
-
- /**
- * 获取节点下数据
- * @param path
- * @return
- */
- @GetMapping("/getNodeData")
- public String getNodeData (String path) {
- return zookeeperService.getNodeData(path) ;
- }
-
- /**
- * 判断节点是否存在
- * @param path
- * @return
- */
- @GetMapping("/isExistNode")
- public boolean isExistNode (final String path){
- return zookeeperService.isExistNode(path) ;
- }
-
- /**
- * 创建节点
- * @param path
- * @return
- */
- @GetMapping("/createNode")
- public String createNode (String path ){
- zookeeperService.createNode(CreateMode.PERSISTENT,path) ;
- return "success" ;
- }
-
- /**
- * 节点设置数据
- * @param path
- * @param nodeData
- * @return
- */
- @GetMapping("/setNodeData")
- public String setNodeData (String path, String nodeData) {
- zookeeperService.setNodeData(path,nodeData) ;
- return "success" ;
- }
-
- /**
- * 创建节点并保存数据
- * @param path
- * @param nodeData
- * @return
- */
- @GetMapping("/createNodeAndData")
- public String createNodeAndData ( String path , String nodeData){
- zookeeperService.createNodeAndData(PERSISTENT,path,nodeData) ;
- return "success" ;
- }
-
- /**
- *获取节点下数据
- * @param path
- * @return
- */
- @GetMapping("/getNodeChild")
- public List<String> getNodeChild (String path) {
- return zookeeperService.getNodeChild(path) ;
- }
-
- /**
- * 是否递归删除节点
- * @param path
- * @param recursive
- * @return
- */
- @GetMapping("/deleteNode")
- public String deleteNode (String path,Boolean recursive) {
- zookeeperService.deleteNode(path,recursive) ;
- return "success" ;
- }
- }

http://127.0.0.1:8088/createNode?path=/llx/222

Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。