赞
踩
Zookeeper 的原生客户端使用起来比较繁琐,一般生产环境很少直接使用。Curator 在原生客户端之上封装了一层,使用起来更加方便,并且还提供了常见场景的现成实现,比如 leader 选举、分布式锁、分布式队列。
官方文档说明:Curator 2.x.x 同时兼容 zk 3.4.x 和 zk 3.5.x;Curator 3.x.x 仅兼容 zk 3.5.x。注意:本文示例使用的 Curator 5.x 已不再支持 zk 3.4.x。
Curator官网:Apache Curator – https://curator.apache.org/
- <!-- web -->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-test</artifactId>
- <scope>test</scope>
- </dependency>
-
- <!-- fastjson -->
- <dependency>
- <groupId>com.alibaba</groupId>
- <artifactId>fastjson</artifactId>
- <version>1.2.83</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-framework</artifactId>
- <version>5.2.0</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-recipes</artifactId>
- <version>5.2.0</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-client</artifactId>
- <version>5.2.0</version>
- </dependency>

zookeeper:
  connect:
    address: IP:2181            # zookeeper连接地址
    connection-time-out: 10000  # 连接超时时间 毫秒
    session-time-out: 10000     # session超时时间 毫秒
    sleep-time-out: 3000        # 重试初始等待时间 毫秒
    max-retries: 3              # 重试次数
    wait-time: 20               # 获取分布式锁的最长等待时间 毫秒
    name-space: curator         # 命名空间
- /**
- * @Description: curator-congif
- * @create by meng on 13:56 2022/10/26
- */
- @Configuration
- public class CuratorConfig {
-
- @Value("${zookeeper.connect.address}")
- private String connectStr;
- @Value("${zookeeper.connect.connection-time-out}")
- private int connectionTimeout;
- @Value("${zookeeper.connect.session-time-out}")
- private int sessionTimeout;
- @Value("${zookeeper.connect.sleep-time-out}")
- private int sleepTimeOut;
- @Value("${zookeeper.connect.max-retries}")
- private int maxRetries;
- @Value("${zookeeper.connect.name-space}")
- private String namespace;
-
- /**
- * 初始化curator客户端
- * @return
- */
- @Bean
- public CuratorFramework getCuratorClient() {
- RetryPolicy retryPolicy = new ExponentialBackoffRetry(sleepTimeOut, maxRetries);
- CuratorFramework client = CuratorFrameworkFactory.builder()
- .connectString(connectStr)
- .connectionTimeoutMs(connectionTimeout)
- .sessionTimeoutMs(sessionTimeout)
- .namespace(namespace)
- .retryPolicy(retryPolicy).build();
- client.start();
- return client;
- }
- }

- /**
- * @Description: CuratorService
- * @create by meng on 13:57 2022/10/26
- */
- @Service
- public class CuratorService {
-
- @Value("${zookeeper.connect.wait-time}")
- private int waitTime;
- @Autowired
- private CuratorFramework curatorClient;
-
- /**
- * 创建节点
- * @param path
- * @param data
- * @return
- * @throws Exception
- */
- public String createNode(String path, String data) throws Exception {
- String hostAddress = InetAddress.getLocalHost().getHostAddress();
- String serviceInstance = "prometheus" + "-" + hostAddress + "-";
- String nodePath = curatorClient.create().creatingParentsIfNeeded()
- .forPath(path + "/" + serviceInstance, data.getBytes(StandardCharsets.UTF_8));
- return nodePath;
- }
-
- /**
- * 创建指定类型的无序节点(持久或临时)
- * @param nodeType
- * @param path
- * @param data
- * @return
- * @throws Exception
- */
- public String createTypeNode(CreateMode nodeType, String path, String data) throws Exception {
- String nodePath = curatorClient.create().creatingParentsIfNeeded().withMode(nodeType)
- .forPath(path, data.getBytes(StandardCharsets.UTF_8));
- return nodePath;
- }
-
- /**
- * 创建指定类型的有序节点
- * @param nodeType
- * @param path
- * @param data
- * @return
- */
- public String createTypeSeqNode(CreateMode nodeType, String path, String data) throws Exception {
- String nodePath = curatorClient.create().creatingParentsIfNeeded().withProtection().withMode(nodeType)
- .forPath(path, data.getBytes(StandardCharsets.UTF_8));
- return nodePath;
- }
-
- /**
- * 设置值
- * @param path
- * @param data
- * @return
- */
- public Stat setData(String path, String data) throws Exception {
- int version = 0; // 当前节点的版本信息
- Stat stat = new Stat();
- curatorClient.getData().storingStatIn(stat).forPath(path);
- version = stat.getVersion();
- // 如果版本信息不一致,说明当前数据被修改过,则修改失败程序报错
- curatorClient.setData().withVersion(version).forPath(path, data.getBytes(StandardCharsets.UTF_8));
- return stat;
- }
-
- /**
- * 异步设置值
- * @param path
- * @param data
- * @return
- * @throws Exception
- */
- public Stat setDataAsync(String path, String data) throws Exception {
- CuratorListener listener = new CuratorListener() {
- @Override
- public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception {
- //examine event for details
- }
- };
- curatorClient.getCuratorListenable().addListener(listener);
- Stat stat = curatorClient.setData().inBackground().forPath(path, data.getBytes(StandardCharsets.UTF_8));
- return stat;
- }
-
- /**
- * 删除节点
- * @param path
- * @throws Exception
- */
- public void deleteNode(String path) throws Exception {
- // curatorClient.delete().deletingChildrenIfNeeded().forPath(path);
- // deletingChildrenIfNeeded如果有子节点一并删除
- // guaranteed必须成功比如网络抖动时造成命令失败
- curatorClient.delete().guaranteed().deletingChildrenIfNeeded().inBackground(new BackgroundCallback() {
- @Override
- public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
- System.out.println("删除成功");
- // { "path":"/javacui/p1","resultCode":0,"type":"DELETE"}
- System.out.println(JSON.toJSONString(curatorEvent, true));
- }
- }).forPath(path);
- }
-
- /**
- * 查询节点
- * @param path
- * @throws Exception
- */
- public Stat getPath(String path) throws Exception {
- // 查内容
- byte[] data = curatorClient.getData().forPath(path);
- System.out.println(new String(data));
-
- // 查状态
- Stat stat = new Stat();
- curatorClient.getData().storingStatIn(stat).forPath(path);
- System.out.println(JSON.toJSONString(stat, true));
- return stat;
- }
-
- /**
- * 查看子节点
- * @param path
- * @return
- * @throws Exception
- */
- public List<String> watchedGetChildren(String path) throws Exception {
- List<String> childrenList = curatorClient.getChildren().watched().forPath(path);
- return childrenList;
- }
-
- /**
- * 查看子节点
- * @param path
- * @param watcher
- * @return
- * @throws Exception
- */
- public List<String> watchedGetChildren(String path, Watcher watcher) throws Exception {
- List<String> childrenList = curatorClient.getChildren().usingWatcher(watcher).forPath(path);
- return childrenList;
- }
-
- /**
- * 创建分布式锁
- * @return
- */
- public void getLock(String path) {
- InterProcessLock lock = new InterProcessMutex(curatorClient, path);
- try {
- if (lock.acquire(waitTime, TimeUnit.MILLISECONDS)) {
- System.out.println("to do something");
- }
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- try {
- lock.release();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
-
- /**
- * 获取分布式ID
- * @param path
- * @param data
- * @return
- * @throws Exception
- */
- public String getDistributedId(String path, String data) throws Exception {
- String seqNode = this.createTypeSeqNode(CreateMode.EPHEMERAL_SEQUENTIAL, path, data);
- System.out.println(seqNode);
- int index = seqNode.lastIndexOf(path);
- if (index >= 0) {
- index += path.length();
- return index <= seqNode.length() ? seqNode.substring(index) : "";
- }
- return seqNode;
- }
-
- }

- package org.meng.zookeeperCurator;
-
- import com.alibaba.fastjson.JSON;
- import org.apache.curator.framework.CuratorFramework;
- import org.apache.curator.framework.api.BackgroundCallback;
- import org.apache.curator.framework.api.CuratorEvent;
- import org.apache.zookeeper.data.Stat;
- import org.junit.jupiter.api.Test;
- import org.meng.zookeeperCurator.service.CuratorService;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.boot.test.context.SpringBootTest;
-
- import java.net.InetAddress;
- import java.nio.charset.StandardCharsets;
- import java.util.Collections;
- import java.util.List;
-
- @SpringBootTest
- class ZookeeperCuratorApplicationTests {
-
- private static final String path = "/zkTest";
-
- @Autowired
- private CuratorService curatorService;
-
- @Test
- void contextLoads() {
- }
-
- // 创建节点,赋值
- @Test
- public void createPath() throws Exception {
- curatorService.createNode(path, "测试zookeeper");
- }
-
- // 修改数据
- @Test
- public void setData() throws Exception {
- String hostAddress = InetAddress.getLocalHost().getHostAddress();
- String serviceInstance = "prometheus" + "-" + hostAddress + "-";
- curatorService.setData(path + "/" + serviceInstance, "测试zookeeper修改");
- }
-
- // 查询节点
- @Test
- public void getPath() throws Exception {
- String hostAddress = InetAddress.getLocalHost().getHostAddress();
- String serviceInstance = "prometheus" + "-" + hostAddress + "-";
- curatorService.getPath(path + "/" + serviceInstance);
- }
-
- // 查询子节点
- @Test
- public void getPaths() throws Exception {
- List<String> list = curatorService.watchedGetChildren(path);
- System.out.println(list);
- }
-
- // 删除节点
- @Test
- public void deletePath() throws Exception {
- String hostAddress = InetAddress.getLocalHost().getHostAddress();
- String serviceInstance = "prometheus" + "-" + hostAddress + "-";
- curatorService.deleteNode(path + "/" + serviceInstance);
- }
-
- // 负载均衡
- @Test
- public void loadPath() throws Exception {
- List<String> instanceList = curatorService.watchedGetChildren(path);
- Collections.sort(instanceList);
- String hostAddress = InetAddress.getLocalHost().getHostAddress();
- int instanceNo = 0;
- if (hostAddress != null) {
- for (int i=0; i<instanceList.size(); i++) {
- if (instanceList.get(i).split("-")[1].equals(hostAddress)) {
- instanceNo = i;
- }
- }
- } else {
- System.out.println("获取本地IP失败");
- }
- System.out.println("实例总数:"+instanceList.size()+", 第"+instanceNo+"个实例");
- }
-
- @Test
- public void getLock() throws Exception {
- curatorService.getLock(path +"/zkLock");
- }
-
- @Test
- public void getDistributedId() throws Exception {
- String zkDemoId = curatorService.getDistributedId(path + "/distributedId", "zkDemoId");
- }
-
- }

Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。