当前位置:   article > 正文

Springboot使用Zookeeper_zookeeper在spring boot使用

zookeeper在spring boot使用

SpringCloud使用Zookeeper作为服务注册中心

zk可以作为注册中心和配置中心

第一部分:使用ZK的CuratorFramework框架
1、添加配置
<!-- curator-framework -->
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>4.2.0</version>
</dependency>
<!-- curator-recipes试用报告 -->
<dependency>
	<groupId>org.apache.curator</groupId>
	<artifactId>curator-recipes</artifactId>
	<version>4.2.0</version>
</dependency>
<!-- curator 做服务发现-->
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-x-discovery</artifactId>
    <version>4.2.0</version>
</dependency>


  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
2、添加配置类
@Slf4j
@Configuration
public class ZkCoreClient {
    // zk 服务端集群地址
    @Value("${zk.url}")
    private String zkUrl;

    // session 超时时间
    private int timeOut = 60000;

    // zkclient 重试间隔时间
    private int baseSleepTimeMs = 5000;

    //zkclient 重试次数
    private int retryCount = 5;
    /**
     * 使用double-check 创建client
     */
    @Bean
    public CuratorFramework init() {
        CuratorFramework client = CuratorFrameworkFactory
                .builder()
                .connectString(zkUrl)
                .sessionTimeoutMs(timeOut)
                .retryPolicy(new ExponentialBackoffRetry(baseSleepTimeMs, retryCount))
//                            .namespace(appName)
                .build();
        // 或者使用工厂模式
//                    client = CuratorFrameworkFactory.newClient(zkUrl,new ExponentialBackoffRetry(baseSleepTimeMs,retryCount)).usingNamespace(appName);
        client.start();
        log.info("client is created at ================== {}", LocalDateTime.now());

        return client;
    }

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
3、使用工具类
/**
 * 使用Curator 实现zk 的基本操作-增删查改数据 和监听watch
 */
@Slf4j
public class ZkUtils {
    @Autowired
    CuratorFramework client;

    /**
     * @Description: 创建路径
     */

    public String createNode(String path, String value) throws Exception {
        return createNode(path, value, true);
    }

    public String createNode(String path, String value, Boolean isEphemeral) throws Exception {
        if (null == client) {
            throw new RuntimeException("there is not connect to zkServer...");
        }
        String node = client
                .create()
                .creatingParentsIfNeeded()
                .withMode(isEphemeral.equals(true) ? CreateMode.EPHEMERAL_SEQUENTIAL : CreateMode.PERSISTENT_SEQUENTIAL) // 临时顺序节点/持久顺序节点
                .forPath(path, value.getBytes());

        log.info("create node : {}", node);
        return node;
    }

    /**
     * @param path
     * @Description: 删除节点信息
     */
    public void deleteNode(String path) throws Exception {
        if (null == client) {
            throw new RuntimeException("there is not connect to zkServer...");
        }
        client.delete()
                .guaranteed() // 保障机制,若未删除成功,只要会话有效会在后台一直尝试删除
                .deletingChildrenIfNeeded() // 若当前节点包含子节点,子节点也删除
                .forPath(path);
        log.info("{} is deleted ", path);
    }

    /**
     * 判断znode是否存在,Stat就是对znode所有属性的一个映射,stat=null表示节点不存在
     */
    public Stat isExists(String path) throws Exception {
        if (null == client) {
            throw new RuntimeException("there is not connect to zkServer...");
        }
        return client.checkExists().forPath(path);
    }

    /**
     * 查询子节点
     */
    public List<String> getChildren(String path) throws Exception {
        if (null == client) {
            throw new RuntimeException("there is not connect to zkServer...");
        }
        return client.getChildren()
                .forPath(path);
    }


    /**
     * @param path
     * @Description: 获取节点存储的值
     */
    public String getNodeData(String path) throws Exception {
        if (null == client) {
            throw new RuntimeException("there is not connect to zkServer...");
        }
        Stat stat = new Stat();
        byte[] bytes = client.getData().storingStatIn(stat).forPath(path);
        log.info("{} data is : {}", path, new String(bytes));
        log.info("current stat version is {}, createTime is {}", stat.getVersion(), stat.getCtime());
        return new String(bytes);
    }


    /**
     * @param path
     * @param value
     * @Description: 设置节点 数据
     */
    public void setNodeData(String path, String value) throws Exception {
        if (null == client) {
            throw new RuntimeException("there is not connect to zkServer...");
        }
        Stat stat = client.checkExists().forPath(path);
        if (null == stat) {
            log.info(String.format("{} Znode is not exists", path));
            throw new RuntimeException(String.format("{} Znode is not exists", path));
        }
        String nodeData = getNodeData(path);
        client.setData().withVersion(stat.getVersion()).forPath(path, value.getBytes());
        log.info("{} Znode data is set. old vaule is {}, new data is {}", path, nodeData, value);
    }


    /**
     * @param path
     * @Description: 创建 给定节点的监听事件  监听一个节点的更新和创建事件(不包括删除)
     */
    public void addWatcherWithNodeCache(String path) throws Exception {
        if (null == client) {
            throw new RuntimeException("there is not connect to zkServer...");
        }
        // dataIsCompressed if true, data in the path is compressed
        NodeCache nodeCache = new NodeCache(client, path, false);
        NodeCacheListener listener = () -> {
            ChildData currentData = nodeCache.getCurrentData();
            log.info("{} Znode data is chagnge,new data is ---  {}", currentData.getPath(), new String(currentData.getData()));
        };
        nodeCache.getListenable().addListener(listener);
        nodeCache.start();
    }


    /**
     * @param path 给定节点
     * @Description: 监听给定节点下的子节点的创建、删除、更新
     */
    public void addWatcherWithChildCache(String path) throws Exception {
        if (null == client) {
            throw new RuntimeException("there is not connect to zkServer...");
        }
        //cacheData if true, node contents are cached in addition to the stat
        PathChildrenCache pathChildrenCache = new PathChildrenCache(client, path, false);
        PathChildrenCacheListener listener = (client, event) -> {
            log.info("event path is --{} ,event type is {}", event.getData().getPath(), event.getType());
        };
        pathChildrenCache.getListenable().addListener(listener);
        // StartMode : NORMAL  BUILD_INITIAL_CACHE  POST_INITIALIZED_EVENT
        // NORMAL:异步初始化, BUILD_INITIAL_CACHE:同步初始化, POST_INITIALIZED_EVENT:异步初始化,初始化之后会触发事件
        pathChildrenCache.start(PathChildrenCache.StartMode.NORMAL);
    }

    /**
     * @Description: 监听 给定节点的创建、更新(不包括删除) 以及 该节点下的子节点的创建、删除、更新动作。
     */
    public void addWatcherWithTreeCache(String path) throws Exception {
        if (null == client) {
            throw new RuntimeException("there is not connect to zkServer...");
        }
        TreeCache treeCache = new TreeCache(client, path);
        TreeCacheListener listener = (client, event) -> {
            log.info("节点路径 --{} ,节点事件类型: {} , 节点值为: {}", Objects.nonNull(event.getData()) ? event.getData().getPath() : "无数据", event.getType());
        };
        treeCache.getListenable().addListener(listener);
        treeCache.start();
    }

}

public class zkRegiestter{
    /**
     * 服务注册:将当前启动的服务信息(采用map的形式)进行注册到zk中。
     * 这里采用的是map 可以自定义类
     */
    public void registerService(String serviceName, String... urls) throws Exception {
        ServiceInstanceBuilder<Map> serviceInstanceBuilder = ServiceInstance.builder();
        serviceInstanceBuilder.address(InetAddress.getLocalHost().getHostAddress());
        serviceInstanceBuilder.port(Integer.parseInt(environment.getProperty("server.port")));
        serviceInstanceBuilder.name(serviceName);
        Map config = new HashMap();
        config.put("url", urls);
        serviceInstanceBuilder.payload(config);

        ServiceInstance<Map> serviceInstance = serviceInstanceBuilder.build();
        ServiceDiscovery<Map> serviceDiscovery = ServiceDiscoveryBuilder.builder(Map.class)
                .client(client).serializer(new JsonInstanceSerializer<Map>(Map.class))
                .basePath(SERVICE_ROOT_PATH).build();

        serviceDiscovery.registerService(serviceInstance);
        serviceDiscovery.start();
    }

    /**
     * 服务发现:通过serviceDiscovery查询到服务
     */
    public void discovery(String serviceName) {
        try {
            ServiceDiscovery<Map> serviceDiscovery = ServiceDiscoveryBuilder.builder(Map.class)
                    .client(client).basePath(SERVICE_ROOT_PATH).build();
            serviceDiscovery.start();
            //根据名称获取服务
            Collection<ServiceInstance<Map>> services = serviceDiscovery.queryForInstances(serviceName);
            for (ServiceInstance<Map> service : services) {
                System.out.print(service.getPayload() + " -- ");
                System.out.println(service.getAddress() + ":" + service.getPort());
            }
            System.out.println();
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}

public class zkLockClient{
    @Autowired
    CuratorFramework client;

    /**
     * 首先需要调用create 创建路径
     * 尝试获取锁 包含解锁操作
     */
    public <T> TwoTuple<Boolean, T> tryLock(LockCallback<T> lockCallback, String lockKey, Long timeout) {
        InterProcessMutex lock = new InterProcessMutex(client, lockKey);
        try {
            if (lock.acquire(timeout, TimeUnit.MILLISECONDS)) {
                log.info(Thread.currentThread().getName() + " get lock");
                return new TwoTuple<>(true, lockCallback.exec());
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                log.info(Thread.currentThread().getName() + " release lock");
                lock.release();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        return new TwoTuple<>(false, null);
    }
}




  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169
  • 170
  • 171
  • 172
  • 173
  • 174
  • 175
  • 176
  • 177
  • 178
  • 179
  • 180
  • 181
  • 182
  • 183
  • 184
  • 185
  • 186
  • 187
  • 188
  • 189
  • 190
  • 191
  • 192
  • 193
  • 194
  • 195
  • 196
  • 197
  • 198
  • 199
  • 200
  • 201
  • 202
  • 203
  • 204
  • 205
  • 206
  • 207
  • 208
  • 209
  • 210
  • 211
  • 212
  • 213
  • 214
  • 215
  • 216
  • 217
  • 218
  • 219
  • 220
  • 221
  • 222
  • 223
  • 224
  • 225
  • 226
  • 227
  • 228
  • 229
  • 230
  • 231
  • 232
  • 233
  • 234
  • 235
第二部分:搭建服务提供者
1、添加依赖
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-zookeeper-discovery</artifactId>
</dependency>
 <dependency>
     <groupId>org.apache.zookeeper</groupId>
     <artifactId>zookeeper</artifactId>
     <version>3.5.5</version>
 </dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
2、添加配置文件
spring:
  application:
    name: HelloWorld
  cloud:
    zookeeper:
      connect-string: localhost:2181
      discovery:
        enabled: true
server:
  port: 8081
endpoints:
  restart:
    enabled: true
logging:
  level:
    org.apache.zookeeper.ClientCnxn: WARN
## 单独配置zk
zookeeper:
  address: 127.0.0.1:2181
  timeout: 4000
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
3、启动类
  • @EnableDiscoveryClient
@SpringBootApplication
@EnableDiscoveryClient
public class HelloWorldApplication {
    public static void main(String[] args) {
        SpringApplication.run(HelloWorldApplication.class, args);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
3、ZK的配置类
@Configuration
public class ZookeeperConfig {
    private static final Logger logger = LoggerFactory.getLogger(ZookeeperConfig.class);

    @Value("${zookeeper.address}")
    private    String connectString;

    @Value("${zookeeper.timeout}")
    private  int timeout;


    @Bean(name = "zkClient")
    public ZooKeeper zkClient(){
        ZooKeeper zooKeeper=null;
        try {
            final CountDownLatch countDownLatch = new CountDownLatch(1);
            //连接成功后,会回调watcher监听,此连接操作是异步的,执行完new语句后,直接调用后续代码
            //  可指定多台服务地址 127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
             zooKeeper = new ZooKeeper(connectString, timeout, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    if(Event.KeeperState.SyncConnected==event.getState()){
                        //如果收到了服务端的响应事件,连接成功
                        countDownLatch.countDown();
                    }
                }
            });
            countDownLatch.await();
            logger.info("【初始化ZooKeeper连接状态....】={}",zooKeeper.getState());

        }catch (Exception e){
            logger.error("初始化ZooKeeper连接异常....】={}",e);
        }
         return  zooKeeper;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
4、ZK的封装工具类
@Component
public class ZkApi {
    private static final Logger logger = LoggerFactory.getLogger(ZkApi.class);
    @Autowired
    private ZooKeeper zkClient;

    /**
     * 判断指定节点是否存在
     */
    public Stat exists(String path, boolean needWatch){
       return zkClient.exists(path,needWatch);
    }

    public Stat exists(String path,Watcher watcher ){
        return zkClient.exists(path,watcher);
    }

    /**
     * 创建持久化节点
     */
    public boolean createNode(String path, String data){
       zkClient.create(path,data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
                   return true;
    }


    /**
     * 修改持久化节点
     */
    public boolean updateNode(String path, String data){
         //zk的数据版本是从0开始计数的。如果客户端传入的是-1,则表示zk服务器需要基于最新的数据进行更新。如果对zk的数据节点的更新操作没有原子性要求则可以使用-1.
        //version参数指定要更新的数据的版本, 如果version和真实的版本不同, 更新操作将失败. 指定version为-1则忽略版本检查
        zkClient.setData(path,data.getBytes(),-1);
        return true;
    }

    /**
     * 删除持久化节点
     */
    public boolean deleteNode(String path){
       //version参数指定要更新的数据的版本, 如果version和真实的版本不同, 更新操作将失败. 指定version为-1则忽略版本检查
        zkClient.delete(path,-1);
        return true;
    }

    /**
      * 获取当前节点的子节点(不包含孙子节点)
      * @param path 父节点path
      */
    public List<String> getChildren(String path) throws KeeperException, InterruptedException{
        List<String> list = zkClient.getChildren(path, false);
        return list;
    }

    /**
     * 获取指定节点的值
     * @param path
     * @return
     */
    public  String getData(String path,Watcher watcher){
       Stat stat=new Stat();
       byte[] bytes=zkClient.getData(path,watcher,stat);
       return  new String(bytes);
    }

    @PostConstruct
    public  void init(){
        String path="/zk-watcher-2";
        createNode(path,"测试");
        String value=getData(path,new WatcherApi());
        // 删除节点出发 监听事件
        deleteNode(path);

    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
5、实现Watcher监听
public class WatcherApi implements Watcher {

    private static final Logger logger = LoggerFactory.getLogger(WatcherApi.class);
    @Override
    public void process(WatchedEvent event) {
        logger.info("【Watcher监听事件】={}",event.getState());
        logger.info("【监听路径为】={}",event.getPath());
        logger.info("【监听的类型为】={}",event.getType()); //  三种监听类型: 创建,删除,更新
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/article/detail/54005
推荐阅读
相关标签
  

闽ICP备14008679号