当前位置:   article > 正文

Spring Boot 整合Zookeeper组件_springboot集成zookeeper

springboot集成zookeeper

简介

         Zookeeper是一个Apache开源的分布式的应用,为系统架构提供协调服务。从设计模式角度来审视:该组件是一个基于观察者模式设计的框架,负责存储和管理数据,接受观察者的注册,一旦数据的状态发生变化,Zookeeper就将负责通知已经在Zookeeper上注册的观察者做出相应的反应,从而实现集群中类似Master/Slave管理模式。ZooKeeper的目标就是封装好复杂易出错的关键服务,将简单易用的接口和性能高效、功能稳定的系统提供给用户。

技术架构:

Spring Boot 2.6.3、ZooKeeper 3.4.6、JDK 1.8

导入依赖

  <!-- Curator framework: provides CuratorFramework / CuratorFrameworkFactory -->
  <dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>2.12.0</version>
  </dependency>
  <!-- Curator recipes: caches, locks and atomic counters used in the examples -->
  <dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>2.12.0</version>
  </dependency>
  <!-- Curator client module, kept at the same version as the two artifacts above -->
  <dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-client</artifactId>
    <version>2.12.0</version>
  </dependency>
  <!-- Lombok: supplies the @Data and @Slf4j annotations used by the classes -->
  <dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.4</version>
  </dependency>

application.yml配置

  # All keys must be nested under zoo -> keeper so that they bind to the
  # "zoo.keeper" prefix declared on ZookeeperProperties.
  zoo:
    keeper:
      # Enable flag
      enabled: true
      # ZooKeeper server address (host:port)
      server: 192.168.116.100:2181
      # Curator namespace, i.e. the root path the client's paths are placed under
      namespace: lx
      # Credentials for the "digest" authorization scheme, in user:password form
      digest: smile:111111
      # Session timeout, in milliseconds
      sessionTimeoutMs: 3000
      # Connection timeout, in milliseconds
      connectionTimeoutMs: 60000
      # Maximum number of retries
      maxRetries: 10
      # Initial sleep time between retries, in milliseconds
      baseSleepTimeMs: 1000

Zookeeper配置

  1. package com.example.zk.config;
  2. import lombok.Data;
  3. import org.springframework.boot.context.properties.ConfigurationProperties;
  4. import org.springframework.stereotype.Component;
  5. /**
  6. * @author lanx
  7. * @date 2022/3/6
  8. */
  9. @Data
  10. @Component
  11. @ConfigurationProperties(prefix = "zoo.keeper")
  12. public class ZookeeperProperties {
  13. private String enabled;
  14. private String server;
  15. private String namespace;
  16. private String digest;
  17. private int sessionTimeoutMs;
  18. private int connectionTimeoutMs;
  19. private int maxRetries;
  20. private int baseSleepTimeMs;
  21. }

Zookeeper 初始化

  1. package com.example.zk.config;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.apache.curator.RetryPolicy;
  4. import org.apache.curator.framework.CuratorFramework;
  5. import org.apache.curator.framework.CuratorFrameworkFactory;
  6. import org.apache.curator.retry.ExponentialBackoffRetry;
  7. import org.springframework.beans.factory.annotation.Autowired;
  8. import org.springframework.context.annotation.Configuration;
  9. import javax.annotation.PostConstruct;
  10. /**
  11. * 配置
  12. * @author lanx
  13. * @date 2022/3/6
  14. */
  15. @Slf4j
  16. @Configuration
  17. public class ZookeeperConfig {
  18. @Autowired
  19. private ZookeeperProperties zookeeperProperties ;
  20. private static CuratorFramework client = null ;
  21. /**
  22. * 初始化
  23. */
  24. @PostConstruct
  25. public void init (){
  26. //重试策略,初试时间1秒,重试10次
  27. RetryPolicy policy = new ExponentialBackoffRetry(
  28. zookeeperProperties.getBaseSleepTimeMs(),
  29. zookeeperProperties.getMaxRetries());
  30. //通过工厂创建Curator
  31. client = CuratorFrameworkFactory.builder()
  32. .connectString(zookeeperProperties.getServer())
  33. .authorization("digest",zookeeperProperties.getDigest().getBytes())
  34. .connectionTimeoutMs(zookeeperProperties.getConnectionTimeoutMs())
  35. .sessionTimeoutMs(zookeeperProperties.getSessionTimeoutMs())
  36. .retryPolicy(policy).build();
  37. //开启连接
  38. client.start();
  39. log.info("zookeeper 初始化完成...");
  40. }
  41. public static CuratorFramework getClient (){
  42. return client ;
  43. }
  44. public static void closeClient (){
  45. if (client != null){
  46. client.close();
  47. }
  48. }
  49. }

Zookeeper节点数据监听

  1. package com.example.zk.config;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.apache.curator.framework.CuratorFramework;
  4. import org.apache.curator.framework.recipes.cache.*;
  5. import org.springframework.boot.ApplicationArguments;
  6. import org.springframework.boot.ApplicationRunner;
  7. import org.springframework.stereotype.Component;
  8. import java.util.concurrent.ExecutorService;
  9. import java.util.concurrent.Executors;
  10. /**
  11. * 节点数据监听
  12. * @author lanx
  13. * @date 2022/3/6
  14. */
  15. @Component
  16. @Slf4j
  17. public class ZookListening implements ApplicationRunner {
  18. @Override
  19. public void run(ApplicationArguments args) throws Exception {
  20. /**
  21. * 在注册监听器的时候,如果传入此参数,当事件触发时,逻辑由线程池处理
  22. */
  23. ExecutorService pool = Executors.newFixedThreadPool(2);
  24. /**
  25. * 监听数据节点的变化情况
  26. */
  27. final NodeCache nodeCache = new NodeCache(ZookeeperConfig.getClient(), "/lanxi", false);
  28. nodeCache.start(true);
  29. nodeCache.getListenable().addListener(
  30. /**
  31. * 此方法只监听创建节点和更新节点,在删除节点是不触发此操作
  32. */
  33. new NodeCacheListener() {
  34. @Override
  35. public void nodeChanged() throws Exception {
  36. log.info("Node data is changed, new data: {}" ,
  37. new String(nodeCache.getCurrentData().getData()));
  38. }
  39. },
  40. pool
  41. );
  42. /**
  43. * 监听子节点的变化情况
  44. *
  45. * 建立一个PathChildrenCache的缓存,第三个参数为是否接收节点数据内容,如果为 false 则不接受
  46. */
  47. final PathChildrenCache childrenCache = new PathChildrenCache(ZookeeperConfig.getClient(), "/llx", true);
  48. //在初始时就进行缓存监听
  49. childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
  50. childrenCache.getListenable().addListener(
  51. new PathChildrenCacheListener() {
  52. /**
  53. * 监听节点的变更。 新建、修改、删除
  54. * @param client
  55. * @param event
  56. * @throws Exception
  57. */
  58. @Override
  59. public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
  60. throws Exception {
  61. switch (event.getType()) {
  62. case CHILD_ADDED:
  63. System.out.println("CHILD_ADDED: " + event.getData().getPath());
  64. break;
  65. case CHILD_REMOVED:
  66. System.out.println("CHILD_REMOVED: " + event.getData().getPath());
  67. break;
  68. case CHILD_UPDATED:
  69. System.out.println("CHILD_UPDATED: " + event.getData().getPath());
  70. break;
  71. default:
  72. break;
  73. }
  74. }
  75. },
  76. pool
  77. );
  78. }
  79. }

curator框架,使用 DistributedAtomicInteger 作为分布式计数器

  1. //curator框架,使用 DistributedAtomicInteger 作为分布式计数器
  2. DistributedAtomicInteger atomicInteger =
  3. new DistributedAtomicInteger(ZookeeperConfig.getClient(), "/llx", new RetryNTimes(3, 1000));
  4. //复位
  5. atomicInteger.forceSet(0);
  6. AtomicValue<Integer> value = atomicInteger.add(1);
  7. log.info("Boolean:{}", value.succeeded());
  8. log.info("最新值:{}", value.postValue());
  9. log.info("原始值:{}", value.preValue());

curator 框架,使用 InterProcessMutex 分布式锁

  // Ten threads compete for the same distributed lock; only the holder runs the work.
  for (int i = 0; i < 10; i++) {
      new Thread(new Runnable() {
          @Override
          public void run() {
              final InterProcessMutex lock = new InterProcessMutex(ZookeeperConfig.getClient(), "/llx");
              // Acquire outside the guarded section: if acquisition fails there is
              // nothing to release, and releasing an unheld lock would itself throw.
              try {
                  lock.acquire();
              } catch (Exception e) {
                  if (e instanceof InterruptedException) {
                      Thread.currentThread().interrupt();
                  }
                  log.error("Failed to acquire lock", e);
                  return;
              }
              try {
                  log.info("线程:{},执行业务逻辑", Thread.currentThread().getName());
                  Thread.sleep(1000 * 10);
              } catch (InterruptedException e) {
                  // Preserve the interrupt for whoever owns this thread.
                  Thread.currentThread().interrupt();
                  log.warn("Interrupted while holding lock", e);
              } finally {
                  try {
                      lock.release();
                  } catch (Exception e) {
                      log.error("Failed to release lock", e);
                  }
              }
          }
      }).start();
  }

Zookeeper操作Service类

  1. package com.example.zk.service;
  2. import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
  3. import org.apache.zookeeper.CreateMode;
  4. import java.util.List;
  5. public interface ZookeeperService {
  6. /**
  7. * 判断节点是否存在
  8. */
  9. boolean isExistNode (final String path) ;
  10. /**
  11. * 创建节点
  12. */
  13. void createNode (CreateMode mode, String path ) ;
  14. /**
  15. * 设置节点数据
  16. */
  17. void setNodeData (String path, String nodeData) ;
  18. /**
  19. * 创建节点
  20. */
  21. void createNodeAndData (CreateMode mode, String path , String nodeData) ;
  22. /**
  23. * 获取节点数据
  24. */
  25. String getNodeData (String path) ;
  26. /**
  27. * 获取节点下数据
  28. */
  29. List<String> getNodeChild (String path) ;
  30. /**
  31. * 是否递归删除节点
  32. */
  33. void deleteNode (String path,Boolean recursive) ;
  34. /**
  35. * 获取读写锁
  36. */
  37. InterProcessReadWriteLock getReadWriteLock (String path) ;
  38. }
  1. package com.example.zk.service.impl;
  2. import com.example.zk.config.ZookeeperConfig;
  3. import com.example.zk.service.ZookeeperService;
  4. import lombok.extern.slf4j.Slf4j;
  5. import org.apache.curator.framework.CuratorFramework;
  6. import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
  7. import org.apache.zookeeper.CreateMode;
  8. import org.apache.zookeeper.data.Stat;
  9. import org.springframework.stereotype.Service;
  10. import org.springframework.util.StringUtils;
  11. import java.util.ArrayList;
  12. import java.util.List;
  13. @Slf4j
  14. @Service
  15. public class ZookeeperServiceImpl implements ZookeeperService {
  16. @Override
  17. public boolean isExistNode(String path) {
  18. CuratorFramework client = ZookeeperConfig.getClient();
  19. client.sync() ;
  20. try {
  21. Stat stat = client.checkExists().forPath(path);
  22. return client.checkExists().forPath(path) != null;
  23. } catch (Exception e) {
  24. log.error("isExistNode error...", e);
  25. e.printStackTrace();
  26. }
  27. return false;
  28. }
  29. @Override
  30. public void createNode(CreateMode mode, String path) {
  31. CuratorFramework client = ZookeeperConfig.getClient() ;
  32. try {
  33. // 递归创建所需父节点
  34. client.create().creatingParentsIfNeeded().withMode(mode).forPath(path);
  35. } catch (Exception e) {
  36. log.error("createNode error...", e);
  37. e.printStackTrace();
  38. }
  39. }
  40. @Override
  41. public void setNodeData(String path, String nodeData) {
  42. CuratorFramework client = ZookeeperConfig.getClient() ;
  43. try {
  44. // 设置节点数据
  45. client.setData().forPath(path, nodeData.getBytes("UTF-8"));
  46. } catch (Exception e) {
  47. log.error("setNodeData error...", e);
  48. e.printStackTrace();
  49. }
  50. }
  51. @Override
  52. public void createNodeAndData(CreateMode mode, String path, String nodeData) {
  53. CuratorFramework client = ZookeeperConfig.getClient() ;
  54. try {
  55. // 创建节点,关联数据
  56. client.create().creatingParentsIfNeeded().withMode(mode)
  57. .forPath(path,nodeData.getBytes("UTF-8"));
  58. } catch (Exception e) {
  59. log.error("createNode error...", e);
  60. e.printStackTrace();
  61. }
  62. }
  63. @Override
  64. public String getNodeData(String path) {
  65. CuratorFramework client = ZookeeperConfig.getClient() ;
  66. try {
  67. // 数据读取和转换
  68. byte[] dataByte = client.getData().forPath(path) ;
  69. String data = new String(dataByte,"UTF-8") ;
  70. if (!StringUtils.isEmpty(data)){
  71. return data ;
  72. }
  73. }catch (Exception e) {
  74. log.error("getNodeData error...", e);
  75. e.printStackTrace();
  76. }
  77. return null;
  78. }
  79. @Override
  80. public List<String> getNodeChild(String path) {
  81. CuratorFramework client = ZookeeperConfig.getClient() ;
  82. List<String> nodeChildDataList = new ArrayList<>();
  83. try {
  84. // 节点下数据集
  85. nodeChildDataList = client.getChildren().forPath(path);
  86. } catch (Exception e) {
  87. log.error("getNodeChild error...", e);
  88. e.printStackTrace();
  89. }
  90. return nodeChildDataList;
  91. }
  92. @Override
  93. public void deleteNode(String path, Boolean recursive) {
  94. CuratorFramework client = ZookeeperConfig.getClient() ;
  95. try {
  96. if(recursive) {
  97. // 递归删除节点
  98. client.delete().guaranteed().deletingChildrenIfNeeded().forPath(path);
  99. } else {
  100. // 删除单个节点
  101. client.delete().guaranteed().forPath(path);
  102. }
  103. } catch (Exception e) {
  104. log.error("deleteNode error...", e);
  105. e.printStackTrace();
  106. }
  107. }
  108. @Override
  109. public InterProcessReadWriteLock getReadWriteLock(String path) {
  110. CuratorFramework client = ZookeeperConfig.getClient() ;
  111. // 写锁互斥、读写互斥
  112. InterProcessReadWriteLock readWriteLock = new InterProcessReadWriteLock(client, path);
  113. return readWriteLock ;
  114. }
  115. }

Zookeeper操作Controller类

  1. package com.example.zk.web;
  2. import com.example.zk.service.ZookeeperService;
  3. import org.apache.zookeeper.CreateMode;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.web.bind.annotation.GetMapping;
  6. import org.springframework.web.bind.annotation.RestController;
  7. import java.util.List;
  8. import static org.apache.zookeeper.CreateMode.PERSISTENT;
  9. /**
  10. * Zookeeper操作
  11. * @author lanx
  12. * @date 2022/3/6
  13. */
  14. @RestController
  15. public class ZookeeperApi {
  16. @Autowired
  17. private ZookeeperService zookeeperService ;
  18. /**
  19. * 获取节点下数据
  20. * @param path
  21. * @return
  22. */
  23. @GetMapping("/getNodeData")
  24. public String getNodeData (String path) {
  25. return zookeeperService.getNodeData(path) ;
  26. }
  27. /**
  28. * 判断节点是否存在
  29. * @param path
  30. * @return
  31. */
  32. @GetMapping("/isExistNode")
  33. public boolean isExistNode (final String path){
  34. return zookeeperService.isExistNode(path) ;
  35. }
  36. /**
  37. * 创建节点
  38. * @param path
  39. * @return
  40. */
  41. @GetMapping("/createNode")
  42. public String createNode (String path ){
  43. zookeeperService.createNode(CreateMode.PERSISTENT,path) ;
  44. return "success" ;
  45. }
  46. /**
  47. * 节点设置数据
  48. * @param path
  49. * @param nodeData
  50. * @return
  51. */
  52. @GetMapping("/setNodeData")
  53. public String setNodeData (String path, String nodeData) {
  54. zookeeperService.setNodeData(path,nodeData) ;
  55. return "success" ;
  56. }
  57. /**
  58. * 创建节点并保存数据
  59. * @param path
  60. * @param nodeData
  61. * @return
  62. */
  63. @GetMapping("/createNodeAndData")
  64. public String createNodeAndData ( String path , String nodeData){
  65. zookeeperService.createNodeAndData(PERSISTENT,path,nodeData) ;
  66. return "success" ;
  67. }
  68. /**
  69. *获取节点下数据
  70. * @param path
  71. * @return
  72. */
  73. @GetMapping("/getNodeChild")
  74. public List<String> getNodeChild (String path) {
  75. return zookeeperService.getNodeChild(path) ;
  76. }
  77. /**
  78. * 是否递归删除节点
  79. * @param path
  80. * @param recursive
  81. * @return
  82. */
  83. @GetMapping("/deleteNode")
  84. public String deleteNode (String path,Boolean recursive) {
  85. zookeeperService.deleteNode(path,recursive) ;
  86. return "success" ;
  87. }
  88. }

接口测试

http://127.0.0.1:8088/createNode?path=/llx/222

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/article/detail/53967
推荐阅读
相关标签
  

闽ICP备14008679号