当前位置:   article > 正文

Springboot-Zookeeper(curator)实现分布式锁、分布式ID等_springboot curator

springboot curator

        Zookeeper 的原生客户端使用起来比较繁琐,一般生产环境很少直接使用。Curator 在原生客户端之上封装了一层,使用起来更加方便,并且还提供了常用场景的现成实现,比如 leader 选举、分布式锁、分布式队列。

        官方文档说明:Curator 2.x.x 同时兼容 ZooKeeper 3.4.x 和 3.5.x,Curator 3.x.x 仅兼容 ZooKeeper 3.5.x。

        Curator官网:Apache Curator – https://curator.apache.org/

 一、引入依赖

  <!-- Spring Boot web starter -->
  <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
  </dependency>
  <!-- Spring Boot test starter (test scope only) -->
  <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
  </dependency>
  <!-- fastjson: used to print events and node stats as JSON -->
  <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>1.2.83</version>
  </dependency>
  <!-- Apache Curator: framework, recipes (locks etc.) and low-level client, all on the same version -->
  <dependency>
      <groupId>org.apache.curator</groupId>
      <artifactId>curator-framework</artifactId>
      <version>5.2.0</version>
  </dependency>
  <dependency>
      <groupId>org.apache.curator</groupId>
      <artifactId>curator-recipes</artifactId>
      <version>5.2.0</version>
  </dependency>
  <dependency>
      <groupId>org.apache.curator</groupId>
      <artifactId>curator-client</artifactId>
      <version>5.2.0</version>
  </dependency>

二、application.yml配置

  # Settings read through @Value("${zookeeper.connect.*}"); the nesting below is
  # required for those property names to resolve.
  zookeeper:
    connect:
      address: IP:2181            # ZooKeeper connection string (replace IP with the server host)
      connection-time-out: 10000  # connection timeout, in milliseconds
      session-time-out: 10000     # session timeout, in milliseconds
      sleep-time-out: 3000        # base sleep time between retries, in milliseconds
      max-retries: 3              # maximum number of retries
      wait-time: 20               # how long to wait when acquiring the distributed lock, in milliseconds
      name-space: curator         # namespace passed to the Curator client builder

三、配置文件

  1. /**
  2. * @Description: curator-congif
  3. * @create by meng on 13:56 2022/10/26
  4. */
  5. @Configuration
  6. public class CuratorConfig {
  7. @Value("${zookeeper.connect.address}")
  8. private String connectStr;
  9. @Value("${zookeeper.connect.connection-time-out}")
  10. private int connectionTimeout;
  11. @Value("${zookeeper.connect.session-time-out}")
  12. private int sessionTimeout;
  13. @Value("${zookeeper.connect.sleep-time-out}")
  14. private int sleepTimeOut;
  15. @Value("${zookeeper.connect.max-retries}")
  16. private int maxRetries;
  17. @Value("${zookeeper.connect.name-space}")
  18. private String namespace;
  19. /**
  20. * 初始化curator客户端
  21. * @return
  22. */
  23. @Bean
  24. public CuratorFramework getCuratorClient() {
  25. RetryPolicy retryPolicy = new ExponentialBackoffRetry(sleepTimeOut, maxRetries);
  26. CuratorFramework client = CuratorFrameworkFactory.builder()
  27. .connectString(connectStr)
  28. .connectionTimeoutMs(connectionTimeout)
  29. .sessionTimeoutMs(sessionTimeout)
  30. .namespace(namespace)
  31. .retryPolicy(retryPolicy).build();
  32. client.start();
  33. return client;
  34. }
  35. }

四、接口服务

  1. /**
  2. * @Description: CuratorService
  3. * @create by meng on 13:57 2022/10/26
  4. */
  5. @Service
  6. public class CuratorService {
  7. @Value("${zookeeper.connect.wait-time}")
  8. private int waitTime;
  9. @Autowired
  10. private CuratorFramework curatorClient;
  11. /**
  12. * 创建节点
  13. * @param path
  14. * @param data
  15. * @return
  16. * @throws Exception
  17. */
  18. public String createNode(String path, String data) throws Exception {
  19. String hostAddress = InetAddress.getLocalHost().getHostAddress();
  20. String serviceInstance = "prometheus" + "-" + hostAddress + "-";
  21. String nodePath = curatorClient.create().creatingParentsIfNeeded()
  22. .forPath(path + "/" + serviceInstance, data.getBytes(StandardCharsets.UTF_8));
  23. return nodePath;
  24. }
  25. /**
  26. * 创建指定类型的无序节点(持久或临时)
  27. * @param nodeType
  28. * @param path
  29. * @param data
  30. * @return
  31. * @throws Exception
  32. */
  33. public String createTypeNode(CreateMode nodeType, String path, String data) throws Exception {
  34. String nodePath = curatorClient.create().creatingParentsIfNeeded().withMode(nodeType)
  35. .forPath(path, data.getBytes(StandardCharsets.UTF_8));
  36. return nodePath;
  37. }
  38. /**
  39. * 创建指定类型的有序节点
  40. * @param nodeType
  41. * @param path
  42. * @param data
  43. * @return
  44. */
  45. public String createTypeSeqNode(CreateMode nodeType, String path, String data) throws Exception {
  46. String nodePath = curatorClient.create().creatingParentsIfNeeded().withProtection().withMode(nodeType)
  47. .forPath(path, data.getBytes(StandardCharsets.UTF_8));
  48. return nodePath;
  49. }
  50. /**
  51. * 设置值
  52. * @param path
  53. * @param data
  54. * @return
  55. */
  56. public Stat setData(String path, String data) throws Exception {
  57. int version = 0; // 当前节点的版本信息
  58. Stat stat = new Stat();
  59. curatorClient.getData().storingStatIn(stat).forPath(path);
  60. version = stat.getVersion();
  61. // 如果版本信息不一致,说明当前数据被修改过,则修改失败程序报错
  62. curatorClient.setData().withVersion(version).forPath(path, data.getBytes(StandardCharsets.UTF_8));
  63. return stat;
  64. }
  65. /**
  66. * 异步设置值
  67. * @param path
  68. * @param data
  69. * @return
  70. * @throws Exception
  71. */
  72. public Stat setDataAsync(String path, String data) throws Exception {
  73. CuratorListener listener = new CuratorListener() {
  74. @Override
  75. public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception {
  76. //examine event for details
  77. }
  78. };
  79. curatorClient.getCuratorListenable().addListener(listener);
  80. Stat stat = curatorClient.setData().inBackground().forPath(path, data.getBytes(StandardCharsets.UTF_8));
  81. return stat;
  82. }
  83. /**
  84. * 删除节点
  85. * @param path
  86. * @throws Exception
  87. */
  88. public void deleteNode(String path) throws Exception {
  89. // curatorClient.delete().deletingChildrenIfNeeded().forPath(path);
  90. // deletingChildrenIfNeeded如果有子节点一并删除
  91. // guaranteed必须成功比如网络抖动时造成命令失败
  92. curatorClient.delete().guaranteed().deletingChildrenIfNeeded().inBackground(new BackgroundCallback() {
  93. @Override
  94. public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
  95. System.out.println("删除成功");
  96. // { "path":"/javacui/p1","resultCode":0,"type":"DELETE"}
  97. System.out.println(JSON.toJSONString(curatorEvent, true));
  98. }
  99. }).forPath(path);
  100. }
  101. /**
  102. * 查询节点
  103. * @param path
  104. * @throws Exception
  105. */
  106. public Stat getPath(String path) throws Exception {
  107. // 查内容
  108. byte[] data = curatorClient.getData().forPath(path);
  109. System.out.println(new String(data));
  110. // 查状态
  111. Stat stat = new Stat();
  112. curatorClient.getData().storingStatIn(stat).forPath(path);
  113. System.out.println(JSON.toJSONString(stat, true));
  114. return stat;
  115. }
  116. /**
  117. * 查看子节点
  118. * @param path
  119. * @return
  120. * @throws Exception
  121. */
  122. public List<String> watchedGetChildren(String path) throws Exception {
  123. List<String> childrenList = curatorClient.getChildren().watched().forPath(path);
  124. return childrenList;
  125. }
  126. /**
  127. * 查看子节点
  128. * @param path
  129. * @param watcher
  130. * @return
  131. * @throws Exception
  132. */
  133. public List<String> watchedGetChildren(String path, Watcher watcher) throws Exception {
  134. List<String> childrenList = curatorClient.getChildren().usingWatcher(watcher).forPath(path);
  135. return childrenList;
  136. }
  137. /**
  138. * 创建分布式锁
  139. * @return
  140. */
  141. public void getLock(String path) {
  142. InterProcessLock lock = new InterProcessMutex(curatorClient, path);
  143. try {
  144. if (lock.acquire(waitTime, TimeUnit.MILLISECONDS)) {
  145. System.out.println("to do something");
  146. }
  147. } catch (Exception e) {
  148. e.printStackTrace();
  149. } finally {
  150. try {
  151. lock.release();
  152. } catch (Exception e) {
  153. e.printStackTrace();
  154. }
  155. }
  156. }
  157. /**
  158. * 获取分布式ID
  159. * @param path
  160. * @param data
  161. * @return
  162. * @throws Exception
  163. */
  164. public String getDistributedId(String path, String data) throws Exception {
  165. String seqNode = this.createTypeSeqNode(CreateMode.EPHEMERAL_SEQUENTIAL, path, data);
  166. System.out.println(seqNode);
  167. int index = seqNode.lastIndexOf(path);
  168. if (index >= 0) {
  169. index += path.length();
  170. return index <= seqNode.length() ? seqNode.substring(index) : "";
  171. }
  172. return seqNode;
  173. }
  174. }

五、测试相关操作

  1. package org.meng.zookeeperCurator;
  2. import com.alibaba.fastjson.JSON;
  3. import org.apache.curator.framework.CuratorFramework;
  4. import org.apache.curator.framework.api.BackgroundCallback;
  5. import org.apache.curator.framework.api.CuratorEvent;
  6. import org.apache.zookeeper.data.Stat;
  7. import org.junit.jupiter.api.Test;
  8. import org.meng.zookeeperCurator.service.CuratorService;
  9. import org.springframework.beans.factory.annotation.Autowired;
  10. import org.springframework.boot.test.context.SpringBootTest;
  11. import java.net.InetAddress;
  12. import java.nio.charset.StandardCharsets;
  13. import java.util.Collections;
  14. import java.util.List;
  15. @SpringBootTest
  16. class ZookeeperCuratorApplicationTests {
  17. private static final String path = "/zkTest";
  18. @Autowired
  19. private CuratorService curatorService;
  20. @Test
  21. void contextLoads() {
  22. }
  23. // 创建节点,赋值
  24. @Test
  25. public void createPath() throws Exception {
  26. curatorService.createNode(path, "测试zookeeper");
  27. }
  28. // 修改数据
  29. @Test
  30. public void setData() throws Exception {
  31. String hostAddress = InetAddress.getLocalHost().getHostAddress();
  32. String serviceInstance = "prometheus" + "-" + hostAddress + "-";
  33. curatorService.setData(path + "/" + serviceInstance, "测试zookeeper修改");
  34. }
  35. // 查询节点
  36. @Test
  37. public void getPath() throws Exception {
  38. String hostAddress = InetAddress.getLocalHost().getHostAddress();
  39. String serviceInstance = "prometheus" + "-" + hostAddress + "-";
  40. curatorService.getPath(path + "/" + serviceInstance);
  41. }
  42. // 查询子节点
  43. @Test
  44. public void getPaths() throws Exception {
  45. List<String> list = curatorService.watchedGetChildren(path);
  46. System.out.println(list);
  47. }
  48. // 删除节点
  49. @Test
  50. public void deletePath() throws Exception {
  51. String hostAddress = InetAddress.getLocalHost().getHostAddress();
  52. String serviceInstance = "prometheus" + "-" + hostAddress + "-";
  53. curatorService.deleteNode(path + "/" + serviceInstance);
  54. }
  55. // 负载均衡
  56. @Test
  57. public void loadPath() throws Exception {
  58. List<String> instanceList = curatorService.watchedGetChildren(path);
  59. Collections.sort(instanceList);
  60. String hostAddress = InetAddress.getLocalHost().getHostAddress();
  61. int instanceNo = 0;
  62. if (hostAddress != null) {
  63. for (int i=0; i<instanceList.size(); i++) {
  64. if (instanceList.get(i).split("-")[1].equals(hostAddress)) {
  65. instanceNo = i;
  66. }
  67. }
  68. } else {
  69. System.out.println("获取本地IP失败");
  70. }
  71. System.out.println("实例总数:"+instanceList.size()+", 第"+instanceNo+"个实例");
  72. }
  73. @Test
  74. public void getLock() throws Exception {
  75. curatorService.getLock(path +"/zkLock");
  76. }
  77. @Test
  78. public void getDistributedId() throws Exception {
  79. String zkDemoId = curatorService.getDistributedId(path + "/distributedId", "zkDemoId");
  80. }
  81. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Guff_9hys/article/detail/772517
推荐阅读
相关标签
  

闽ICP备14008679号