当前位置:   article > 正文

Zookeeper-基础_zk连接的properties

zk连接的properties

1. 简介

zookeeper是一个开源的分布式协调服务, 提供分布式数据一致性解决方案,分布式应用程序可以实现数据统一配置管理、统一命名服务、分布式锁、集群管理等功能.

ZooKeeper主要服务于分布式系统,使用分布式系统就无法避免对节点管理的问题(需要实时感知节点的状态、对节点进行统一管理等等),而由于这些问题处理起来可能相对麻烦和提高了系统的复杂性,ZooKeeper作为一个能够通用解决这些问题的中间件就应运而生了。

2. 数据模型

2.1 模型结构

ZooKeeper 提供的命名空间很像标准文件系统的命名空间。名称是由斜杠 (/) 分隔的一系列路径元素。ZooKeeper 命名空间中的每个节点都由路径标识。

与典型的为存储而设计的文件系统不同,ZooKeeper 数据保存在内存中,这意味着 ZooKeeper 可以实现高吞吐量和低延迟。

2.2 模型的特点

  • 每个子目录如/app1都被称作一个znode(节点)。这个znode是被它所在的路径唯一标识。
  • znode可以有子节点目录,并且每个znode可以存储数据。
  • znode是有版本的,每个znode中存储的数据可以有多个版本,也就是一个访问路径中可以存储多份数据。每次 znode 的数据更改时,版本号都会增加。例如,每当客户端检索数据时,它也会收到数据的版本。
  • 每个znode都有一个访问控制列表 (ACL),它限制了谁可以做什么。
  • znode可以被监控,包括这个目录中存储的数据的修改,子节点目录变化等,一旦变化可以通知设置监控的客户端。

⚠️注意:当使用zkCli.sh 创建会话时,节点的监听事件只能被触发一次。

2.3 节点分类

2.3.1 Persistent

持久节点:节点被创建后,就一直存在,除非客户端主动删除这个节点。

2.3.2 Persistent Sequential

持久顺序节点:有序的持久节点。在zk中,每个父节点会为它的第一级子节点维护一份时序,会记录每个子节点创建的先后顺序。例如:

 

$ create -s /app1 # 创建 /app1 节点

Created /app10000000000 # 创建成功后的节点名称为 /app10000000000

2.3.3 Ephemeral

临时节点:和持久节点不同的是,临时节点的生命周期和客户端会话绑定且临时节点下面不能创建子节点。如果客户端会话失效,那么这个节点就会自动被清除掉。

注意:客户端失效临时节点会被清除,但如果是断开链接,临时节点并不会立马被清除。

“立马”:在会话超时持续时间内未从客户端收到任何心跳信号之后,zk服务器将删除该会话的临时节点。但如果正常关闭会话,临时节点会立马被清除。

 
  1. # 1. 创建会话
  2. $ ./zkCli.sh
  3. # 2. 创建临时节点 /app3
  4. $ create -e /app3
  5. Created /app3
  6. # 3. ctrl + c 关闭会话
  7. # 4. 紧接着再次创建会话
  8. $ ./zkCli.sh
  9. # 5. 查看节点内容 发现临时节点依旧存在
  10. $ ls /
  11. [app10000000000, app3, zookeeper]
  12. # 6. 等待几秒,再次查看 发现临时节点消失
  13. $ ls /
  14. [app10000000000, zookeeper]
  15. # 7. 再次创建临时节点 /app3
  16. $ create -e /app3
  17. Created /app3
  18. # 8. 正常关闭会话
  19. $ quit
  20. # 9. 再次创建会话 临时节点消失
  21. $ ./zkCli.sh
  22. $ ls /
  23. [app10000000000, zookeeper]

2.3.4 Ephemeral Sequential

临时顺序节点:有序的临时节点。创建es节点时,zk会维护一份时序,会记录每个节点的顺序。例如:

  1. # 1. 创建会话
  2. $ ./zkCli.sh
  3. # 2. 创建临时节点 /app3
  4. $ create -e /app3
  5. Created /app3
  6. # 3. ctrl + c 关闭会话
  7. # 4. 紧接着再次创建会话
  8. $ ./zkCli.sh
  9. # 5. 查看节点内容 发现临时节点依旧存在
  10. $ ls /
  11. [app10000000000, app3, zookeeper]
  12. # 6. 等待几秒,再次查看 发现临时节点消失
  13. $ ls /
  14. [app10000000000, zookeeper]
  15. # 7. 再次创建临时节点 /app3
  16. $ create -e /app3
  17. Created /app3
  18. # 8. 正常关闭会话
  19. $ quit
  20. # 9. 再次创建会话 临时节点消失
  21. $ ./zkCli.sh
  22. $ ls /
  23. [app10000000000, zookeeper]

$ create -s -e /app2 # 创建临时有序节点 /app2

Created /app20000000001 # 创建成功后的节点名称为 app20000000001

3. 安装

3.1 官方

官方地址:Apache ZooKeeper

3.2 docker

$ docker run --name zookeeper --restart always -d zookeeper

3.2 docker-compose

⚠️注意:执行脚本前,需先将配置文件挂载到宿主机上。

  1. version: '3.1'
  2. services:
  3. zk:
  4. image: zookeeper
  5. restart: always
  6. container_name: zookeeper
  7. ports:
  8. - 2181:2181
  9. volumes:
  10. - ./data:/data
  11. - ./logs:/datalog
  12. - ./conf/zoo.cfg:/conf/zoo.cfg

3.3 配置信息

  1. # 数据存储目录
  2. dataDir=/data
  3. # 日志存储目录
  4. dataLogDir=/datalog
  5. # 集群模式下 节点之间的心跳时间2s(每2s进行一次心跳检测)
  6. tickTime=2000
  7. # 集群初始化时 节点之间同步次数。(5 * 2s = 10s 10s内未初始化成功则初始化失败)
  8. initLimit=5
  9. # 集群模式下 同步数据次数。 (2 * 2s = 4s 4s内未同步则超时失败)
  10. syncLimit=2
  11. # 数据快照保留个数
  12. autopurge.snapRetainCount=3
  13. # 数据快照清除时间间隔(单位为小时) 0:不清除 ,1:1小时
  14. autopurge.purgeInterval=0
  15. # 最大的客户端链接数
  16. maxClientCnxns=60

4. 基础命令

不同版本之前的命令会有所差异,本章是基于zk:3.7版本。官方地址

4.1 创建会话

首先执行命令,创建新的会话,进入终端。

  1. # 进入到zk的安装目录的bin目录下执行
  2. $ ./zkCli.sh
  3. # 如果zk不是在本机 则可以使用server参数指定链接地址
  4. $ ./zkCli.sh -server 127.0.0.1:2181

4.2 ls

ls [-s] [-w] [-R] path:ls 命令用于查看某个路径下目录列表。

  • -s:查看此znode状态信息。
  • -w:监听此znode目录变化。
  • -R:递归查看。
  1. $ ls / # 查看根目录
  2. [app, app10000000000, zookeeper]
  3. $ ls /app
  4. []
  5. $ ls -s /app
  6. [app01, app02]
  7. cZxid = 0x1b
  8. ctime = Sun Aug 29 13:07:24 UTC 2021
  9. mZxid = 0x1b
  10. mtime = Sun Aug 29 13:07:24 UTC 2021
  11. pZxid = 0x2e
  12. cversion = 2
  13. dataVersion = 0
  14. aclVersion = 0
  15. ephemeralOwner = 0x0
  16. dataLength = 0
  17. numChildren = 2

监听znode目录变化:

4.3 create

create [-s] [-e] [-c] [-t ttl] path [data] [acl]:create 用于创建znode。

  • -s:创建顺序节点。

  • -e:创建临时节点。

  • -c:创建一个容器节点,当容器中的最后一个子节点被删除时,容器也随之消失。

  • -t:设置节点存活时间(毫秒)。

    ⚠️注意:如果要设置超时时间需在配置文件中激活配置:zookeeper.extendedTypesEnabled=true

  • data:设置的数据。

  • acl:访问控制配置。

  1. $ ls /
  2. [app, app10000000000, zookeeper]
  3. # 创建持久节点 /app3
  4. $ create /app3 zhangtieniu
  5. Created /app3
  6. # 创建有序的持久节点 /app4
  7. $ create -s /app4 zhangsan
  8. Created /app40000000011
  9. # 创建临时节点 /app5
  10. $ create -e /app5 linshi
  11. Created /app5
  12. # 创建临时有序节点 /app6
  13. $ create -s -e /app6 linshiyouxu
  14. Created /app60000000013
  15. $ ls /
  16. [app, app10000000000, app3, app40000000011, app5, app60000000013, zookeeper]

4.4 get

get [-s] [-w] path: 命令用于获取节点数据和状态信息。

  • -s:查看此znode状态信息。
  • -w:监听此znode值变化。
$ get /app3 # 查看/app3 znode内容 
zhangtieniu

监听znod值变化:

4.5 stat

stat [-w] path:命令用于查看节点状态信息。

  • -w:监听节点状态变化。
  1. $ stat /app3
  2. cZxid = 0x34 # 创建节点时的事务id
  3. ctime = Sun Aug 29 14:31:52 UTC 2021 # 创建时间
  4. mZxid = 0x34 # 创建的版本号
  5. mtime = Sun Aug 29 14:31:52 UTC 2021 # 修改时间
  6. pZxid = 0x34 # 子节点列表最后一次被修改的事务id
  7. cversion = 0 # 节点版本号
  8. dataVersion = 0 # 数据版本号
  9. aclVersion = 0 # 权限版本号
  10. ephemeralOwner = 0x0 # 临时节点sessionid,当前节点为持久节点时,默认为“0x0”
  11. dataLength = 11 # 节点值数据长度
  12. numChildren = 0 # 子节点数量

4.6 set

set [-s] [-v version] path data: 命令用于修改节点存储的数据。

  • -s:查看设置成功后的状态信息。
  • -v:指定设置值的版本号,该值只能为该节点的最新版本。可以使用其实现乐观锁。
  1. $ set /app wangmazi
  2. $ set -s /app lisi
  3. cZxid = 0x1b
  4. ctime = Sun Aug 29 13:07:24 UTC 2021
  5. mZxid = 0x43
  6. mtime = Sun Aug 29 14:50:52 UTC 2021
  7. pZxid = 0x3f
  8. cversion = 4
  9. dataVersion = 6
  10. aclVersion = 0
  11. ephemeralOwner = 0x0
  12. dataLength = 4
  13. numChildren = 4

4.7 delete

delete [-v version] path:删除指定节点。

⚠️注意:当删除的节点有子节点时,不能使用该命令。需使用deleteall命令删除。

  • -v:删除指定版本。与set同理
$ delete /app3

4.8 quit

关闭会话。

5. 节点的监听机制

zk的监听机制,可以使客户端可以监听znode节点的变化,znode节点的变化出发相应的事件,然后清除对该节点的检测。

⚠️注意:在这里再强调一次,当使用zkCli.sh 创建会话时,对znode的监听只能触发一次。但在使用java客户端链接时,可以一直触发。

$ ls -w /path # 监听节点目录的变化 
$ get -w /path # 监听节点数据的变化

6. quick start

6.1 项目结构

 
  1. .
  2. ├── pom.xml
  3. └── src
  4. └── main
  5. ├── java
  6. │ └── com
  7. │ └── ldx
  8. │ └── zookeeper
  9. │ ├── ZookeeperApplication.java # 启动类
  10. │ ├── config
  11. │ │ ├── CuratorClientConfig.java # zk配置类
  12. │ │ └── CuratorClientProperties.java # zk 配置属性文件
  13. │ ├── controller
  14. │ │ └── AppController.java # zk 测试文件
  15. │ └── util
  16. │ └── CuratorClient.java # zk工具类
  17. └── resources
  18. └── application.yaml # 服务配置文件

6.2 引入依赖

Curator 是 Netflix 公司开源的一套 zookeeper 客户端框架,解决了很多 Zookeeper 客户端非常底层的细节开发工作,包括连接重连、反复注册 Watcher 和 NodeExistsException 异常等。

curator-recipes:封装了一些高级特性,如:Cache 事件监听、选举、分布式锁、分布式计数器、分布式 Barrier 等。

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  3. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  4. <modelVersion>4.0.0</modelVersion>
  5. <parent>
  6. <groupId>org.springframework.boot</groupId>
  7. <artifactId>spring-boot-starter-parent</artifactId>
  8. <version>2.5.4</version>
  9. <relativePath/> <!-- lookup parent from repository -->
  10. </parent>
  11. <groupId>com.ldx</groupId>
  12. <artifactId>zookeeper</artifactId>
  13. <version>0.0.1-SNAPSHOT</version>
  14. <name>zookeeper</name>
  15. <description>Demo project for Spring Boot</description>
  16. <properties>
  17. <java.version>1.8</java.version>
  18. </properties>
  19. <dependencies>
  20. <dependency>
  21. <groupId>org.springframework.boot</groupId>
  22. <artifactId>spring-boot-starter-web</artifactId>
  23. </dependency>
  24. <!-- client 操作工具包 -->
  25. <dependency>
  26. <groupId>org.apache.curator</groupId>
  27. <artifactId>curator-recipes</artifactId>
  28. <version>5.2.0</version>
  29. </dependency>
  30. <dependency>
  31. <groupId>org.springframework.boot</groupId>
  32. <artifactId>spring-boot-configuration-processor</artifactId>
  33. </dependency>
  34. <dependency>
  35. <groupId>commons-lang</groupId>
  36. <artifactId>commons-lang</artifactId>
  37. <version>2.6</version>
  38. </dependency>
  39. <dependency>
  40. <groupId>org.projectlombok</groupId>
  41. <artifactId>lombok</artifactId>
  42. <optional>true</optional>
  43. </dependency>
  44. <dependency>
  45. <groupId>org.springframework.boot</groupId>
  46. <artifactId>spring-boot-starter-test</artifactId>
  47. <scope>test</scope>
  48. </dependency>
  49. </dependencies>
  50. <build>
  51. <plugins>
  52. <plugin>
  53. <groupId>org.springframework.boot</groupId>
  54. <artifactId>spring-boot-maven-plugin</artifactId>
  55. <configuration>
  56. <excludes>
  57. <exclude>
  58. <groupId>org.projectlombok</groupId>
  59. <artifactId>lombok</artifactId>
  60. </exclude>
  61. </excludes>
  62. </configuration>
  63. </plugin>
  64. </plugins>
  65. </build>
  66. </project>

6.3 application.yaml

  1. server:
  2. port: 8080
  3. # curator配置
  4. curator-client:
  5. # 连接字符串
  6. connection-string: localhost:2181
  7. # 根节点
  8. namespace: ldx
  9. # 节点数据编码
  10. charset: utf8
  11. # session超时时间
  12. session-timeout-ms: 60000
  13. # 连接超时时间
  14. connection-timeout-ms: 15000
  15. # 关闭连接超时时间
  16. max-close-wait-ms: 1000
  17. # 默认数据
  18. default-data: ""
  19. # 当半数以上zookeeper服务出现故障仍然提供读服务
  20. can-be-read-only: false
  21. # 自动创建父节点
  22. use-container-parents-if-available: true
  23. # 重试策略,默认使用BoundedExponentialBackoffRetry
  24. retry:
  25. max-sleep-time-ms: 10000
  26. base-sleep-time-ms: 1000
  27. max-retries: 3
  28. # 认证信息
  29. #auth:
  30. #scheme: digest
  31. # auth: username:password

6.4 CuratorClientProperties

  1. package com.ldx.zookeeper.config;
  2. import lombok.Data;
  3. import org.springframework.boot.context.properties.ConfigurationProperties;
  4. /**
  5. * zk 属性配置类
  6. *
  7. * @author ludangxin
  8. * @date 2021/8/31
  9. */
  10. @Data
  11. @ConfigurationProperties(prefix = "curator-client")
  12. public class CuratorClientProperties {
  13. /**
  14. * 连接地址
  15. */
  16. private String connectionString;
  17. /**
  18. * 命名空间
  19. */
  20. private String namespace;
  21. /**
  22. * 字符集
  23. */
  24. private String charset = "utf8";
  25. /**
  26. * 会话超时时间 毫秒
  27. */
  28. private int sessionTimeoutMs = 60000;
  29. /**
  30. * 连接超时时间 毫秒
  31. */
  32. private int connectionTimeoutMs = 15000;
  33. /**
  34. * 最大关闭等待时间 毫秒
  35. */
  36. private int maxCloseWaitMs = 1000;
  37. /**
  38. * 默认数据
  39. */
  40. private String defaultData = "";
  41. /**
  42. * 当半数以上zookeeper服务出现故障仍然提供读服务
  43. */
  44. private boolean canBeReadOnly = false;
  45. /**
  46. * 自动创建父节点
  47. */
  48. private boolean useContainerParentsIfAvailable = true;
  49. /**
  50. * 线程池名称
  51. */
  52. private String threadFactoryClassName;
  53. private Retry retry = new Retry();
  54. private Auth auth = new Auth();
  55. @Data
  56. public static class Retry {
  57. private int maxSleepTimeMs = 10000;
  58. private int baseSleepTimeMs = 1000;
  59. private int maxRetries = 3;
  60. }
  61. @Data
  62. public static class Auth {
  63. private String scheme = "digest";
  64. private String auth;
  65. }
  66. }

6.5 CuratorClientConfig

  1. package com.ldx.zookeeper.config;
  2. import com.ldx.zookeeper.util.CuratorClient;
  3. import lombok.SneakyThrows;
  4. import lombok.extern.slf4j.Slf4j;
  5. import org.apache.commons.lang.StringUtils;
  6. import org.apache.curator.RetryPolicy;
  7. import org.apache.curator.ensemble.EnsembleProvider;
  8. import org.apache.curator.ensemble.fixed.FixedEnsembleProvider;
  9. import org.apache.curator.framework.CuratorFrameworkFactory;
  10. import org.apache.curator.framework.api.ACLProvider;
  11. import org.apache.curator.framework.api.CompressionProvider;
  12. import org.apache.curator.framework.imps.GzipCompressionProvider;
  13. import org.apache.curator.retry.BoundedExponentialBackoffRetry;
  14. import org.apache.curator.utils.DefaultZookeeperFactory;
  15. import org.apache.curator.utils.ZookeeperFactory;
  16. import org.apache.zookeeper.ZooDefs;
  17. import org.apache.zookeeper.data.ACL;
  18. import org.springframework.boot.context.properties.EnableConfigurationProperties;
  19. import org.springframework.context.annotation.Bean;
  20. import org.springframework.context.annotation.Configuration;
  21. import java.nio.charset.Charset;
  22. import java.util.List;
  23. import java.util.concurrent.ThreadFactory;
  24. /**
  25. * zk 配置类
  26. *
  27. * @author ludangxin
  28. * @date 2021/8/31
  29. */
  30. @Slf4j
  31. @Configuration
  32. @EnableConfigurationProperties(CuratorClientProperties.class)
  33. public class CuratorClientConfig {
  34. @Bean
  35. public EnsembleProvider ensembleProvider(CuratorClientProperties curatorClientProperties) {
  36. return new FixedEnsembleProvider(curatorClientProperties.getConnectionString());
  37. }
  38. @Bean
  39. public RetryPolicy retryPolicy(CuratorClientProperties curatorClientProperties) {
  40. CuratorClientProperties.Retry retry = curatorClientProperties.getRetry();
  41. return new BoundedExponentialBackoffRetry(retry.getBaseSleepTimeMs(), retry.getMaxSleepTimeMs(), retry.getMaxRetries());
  42. }
  43. @Bean
  44. public CompressionProvider compressionProvider() {
  45. return new GzipCompressionProvider();
  46. }
  47. @Bean
  48. public ZookeeperFactory zookeeperFactory() {
  49. return new DefaultZookeeperFactory();
  50. }
  51. @Bean
  52. public ACLProvider aclProvider() {
  53. return new ACLProvider() {
  54. @Override
  55. public List<ACL> getDefaultAcl() {
  56. return ZooDefs.Ids.CREATOR_ALL_ACL;
  57. }
  58. @Override
  59. public List<ACL> getAclForPath(String path) {
  60. return ZooDefs.Ids.CREATOR_ALL_ACL;
  61. }
  62. };
  63. }
  64. @Bean
  65. @SneakyThrows
  66. public CuratorFrameworkFactory.Builder builder(EnsembleProvider ensembleProvider,
  67. RetryPolicy retryPolicy,
  68. CompressionProvider compressionProvider,
  69. ZookeeperFactory zookeeperFactory,
  70. ACLProvider aclProvider,
  71. CuratorClientProperties curatorClientProperties) {
  72. String charset = curatorClientProperties.getCharset();
  73. CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
  74. .ensembleProvider(ensembleProvider)
  75. .retryPolicy(retryPolicy)
  76. .compressionProvider(compressionProvider)
  77. .zookeeperFactory(zookeeperFactory)
  78. .namespace(curatorClientProperties.getNamespace())
  79. .sessionTimeoutMs(curatorClientProperties.getSessionTimeoutMs())
  80. .connectionTimeoutMs(curatorClientProperties.getConnectionTimeoutMs())
  81. .maxCloseWaitMs(curatorClientProperties.getMaxCloseWaitMs())
  82. .defaultData(curatorClientProperties.getDefaultData().getBytes(Charset.forName(charset)))
  83. .canBeReadOnly(curatorClientProperties.isCanBeReadOnly());
  84. if (!curatorClientProperties.isUseContainerParentsIfAvailable()) {
  85. builder.dontUseContainerParents();
  86. }
  87. CuratorClientProperties.Auth auth = curatorClientProperties.getAuth();
  88. if (StringUtils.isNotBlank(auth.getAuth())) {
  89. builder.authorization(auth.getScheme(), auth.getAuth().getBytes(Charset.forName(charset)));
  90. builder.aclProvider(aclProvider);
  91. }
  92. String threadFactoryClassName = curatorClientProperties.getThreadFactoryClassName();
  93. if (StringUtils.isNotBlank(threadFactoryClassName)) {
  94. try {
  95. Class<?> cls = Class.forName(threadFactoryClassName);
  96. ThreadFactory threadFactory = (ThreadFactory) cls.newInstance();
  97. builder.threadFactory(threadFactory);
  98. } catch (Exception e) {
  99. log.error("init CuratorClient error", e);
  100. }
  101. }
  102. return builder;
  103. }
  104. @Bean(initMethod = "init", destroyMethod = "stop")
  105. public CuratorClient curatorClient(CuratorFrameworkFactory.Builder builder) {
  106. return new CuratorClient(builder);
  107. }
  108. }

6.6 CuratorClient

  1. package com.ldx.zookeeper.util;
  2. import lombok.SneakyThrows;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.apache.curator.framework.CuratorFramework;
  5. import org.apache.curator.framework.CuratorFrameworkFactory;
  6. import org.apache.curator.framework.recipes.cache.CuratorCache;
  7. import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
  8. import org.apache.curator.framework.recipes.locks.*;
  9. import org.apache.curator.framework.state.ConnectionState;
  10. import org.apache.zookeeper.CreateMode;
  11. import java.nio.charset.Charset;
  12. import java.util.List;
  13. import java.util.Objects;
  14. import java.util.concurrent.Executor;
  15. import java.util.concurrent.TimeUnit;
  16. /**
  17. * zookeeper工具类
  18. *
  19. * @author ludangxin
  20. * @date 2021/8/30
  21. */
  22. @Slf4j
  23. public class CuratorClient {
  24. /**
  25. * 默认的字符编码集
  26. */
  27. private static final String DEFAULT_CHARSET = "utf8";
  28. /**
  29. * 客户端
  30. */
  31. private final CuratorFramework client;
  32. /**
  33. * 字符集
  34. */
  35. private String charset = DEFAULT_CHARSET;
  36. @SneakyThrows
  37. public CuratorClient(CuratorFrameworkFactory.Builder builder) {
  38. client = builder.build();
  39. }
  40. @SneakyThrows
  41. public CuratorClient(CuratorFrameworkFactory.Builder builder, String charset) {
  42. client = builder.build();
  43. this.charset = charset;
  44. }
  45. public void init() {
  46. client.start();
  47. client.getConnectionStateListenable().addListener((client, state) -> {
  48. if (state==ConnectionState.LOST) {
  49. // 连接丢失
  50. log.info("lost session with zookeeper");
  51. } else if (state==ConnectionState.CONNECTED) {
  52. // 连接新建
  53. log.info("connected with zookeeper");
  54. } else if (state==ConnectionState.RECONNECTED) {
  55. // 重新连接
  56. log.info("reconnected with zookeeper");
  57. }
  58. });
  59. }
  60. /**
  61. * 关闭会话
  62. */
  63. public void stop() {
  64. log.info("zookeeper session close");
  65. client.close();
  66. }
  67. /**
  68. * 创建节点
  69. *
  70. * @param mode 节点类型
  71. * 1、PERSISTENT 持久化目录节点,存储的数据不会丢失。
  72. * 2、PERSISTENT_SEQUENTIAL顺序自动编号的持久化目录节点,存储的数据不会丢失
  73. * 3、EPHEMERAL临时目录节点,一旦创建这个节点的客户端与服务器端口也就是session 超时,这种节点会被自动删除
  74. * 4、EPHEMERAL_SEQUENTIAL临时自动编号节点,一旦创建这个节点的客户端与服务器端口也就是session 超时,这种节点会被自动删除,并且根据当前已经存在的节点数自动加 1,然后返回给客户端已经成功创建的目录节点名。
  75. * @param path 节点名称
  76. * @param nodeData 节点数据
  77. */
  78. @SneakyThrows
  79. public void createNode(CreateMode mode, String path, String nodeData) {
  80. // 使用creatingParentContainersIfNeeded()之后Curator能够自动递归创建所有所需的父节点
  81. client.create().creatingParentsIfNeeded().withMode(mode).forPath(path, nodeData.getBytes(Charset.forName(charset)));
  82. }
  83. /**
  84. * 创建节点
  85. *
  86. * @param mode 节点类型
  87. * 1、PERSISTENT 持久化目录节点,存储的数据不会丢失。
  88. * 2、PERSISTENT_SEQUENTIAL顺序自动编号的持久化目录节点,存储的数据不会丢失
  89. * 3、EPHEMERAL临时目录节点,一旦创建这个节点的客户端与服务器端口也就是session 超时,这种节点会被自动删除
  90. * 4、EPHEMERAL_SEQUENTIAL临时自动编号节点,一旦创建这个节点的客户端与服务器端口也就是session 超时,这种节点会被自动删除,并且根据当前已经存在的节点数自动加 1,然后返回给客户端已经成功创建的目录节点名。
  91. * @param path 节点名称
  92. */
  93. @SneakyThrows
  94. public void createNode(CreateMode mode, String path) {
  95. // 使用creatingParentContainersIfNeeded()之后Curator能够自动递归创建所有所需的父节点
  96. client.create().creatingParentsIfNeeded().withMode(mode).forPath(path);
  97. }
  98. /**
  99. * 删除节点数据
  100. *
  101. * @param path 节点名称
  102. */
  103. @SneakyThrows
  104. public void deleteNode(final String path) {
  105. deleteNode(path, true);
  106. }
  107. /**
  108. * 删除节点数据
  109. *
  110. * @param path 节点名称
  111. * @param deleteChildre 是否删除子节点
  112. */
  113. @SneakyThrows
  114. public void deleteNode(final String path, Boolean deleteChildre) {
  115. if (deleteChildre) {
  116. // guaranteed()删除一个节点,强制保证删除,
  117. // 只要客户端会话有效,那么Curator会在后台持续进行删除操作,直到删除节点成功
  118. client.delete().guaranteed().deletingChildrenIfNeeded().forPath(path);
  119. } else {
  120. client.delete().guaranteed().forPath(path);
  121. }
  122. }
  123. /**
  124. * 设置指定节点的数据
  125. *
  126. * @param path 节点名称
  127. * @param data 节点数据
  128. */
  129. @SneakyThrows
  130. public void setNodeData(String path, String data) {
  131. client.setData().forPath(path, data.getBytes(Charset.forName(charset)));
  132. }
  133. /**
  134. * 获取指定节点的数据
  135. *
  136. * @param path 节点名称
  137. * @return 节点数据
  138. */
  139. @SneakyThrows
  140. public String getNodeData(String path) {
  141. return new String(client.getData().forPath(path), Charset.forName(charset));
  142. }
  143. /**
  144. * 获取数据时先同步
  145. *
  146. * @param path 节点名称
  147. * @return 节点数据
  148. */
  149. public String synNodeData(String path) {
  150. client.sync();
  151. return getNodeData(path);
  152. }
  153. /**
  154. * 判断节点是否存在
  155. *
  156. * @param path 节点名称
  157. * @return true 节点存在,false 节点不存在
  158. */
  159. @SneakyThrows
  160. public boolean isExistNode(final String path) {
  161. client.sync();
  162. return Objects.nonNull(client.checkExists().forPath(path));
  163. }
  164. /**
  165. * 获取节点的子节点
  166. *
  167. * @param path 节点名称
  168. * @return 子节点集合
  169. */
  170. @SneakyThrows
  171. public List<String> getChildren(String path) {
  172. return client.getChildren().forPath(path);
  173. }
  174. /**
  175. * 创建排他锁
  176. *
  177. * @param path 节点名称
  178. * @return 排他锁
  179. */
  180. public InterProcessSemaphoreMutex getSemaphoreMutexLock(String path) {
  181. return new InterProcessSemaphoreMutex(client, path);
  182. }
  183. /**
  184. * 创建可重入排他锁
  185. *
  186. * @param path 节点名称
  187. * @return 可重入排他锁
  188. */
  189. public InterProcessMutex getMutexLock(String path) {
  190. return new InterProcessMutex(client, path);
  191. }
  192. /**
  193. * 创建一组可重入排他锁
  194. *
  195. * @param paths 节点名称集合
  196. * @return 锁容器
  197. */
  198. public InterProcessMultiLock getMultiMutexLock(List<String> paths) {
  199. return new InterProcessMultiLock(client, paths);
  200. }
  201. /**
  202. * 创建一组任意类型的锁
  203. *
  204. * @param locks 锁集合
  205. * @return 锁容器
  206. */
  207. public InterProcessMultiLock getMultiLock(List<InterProcessLock> locks) {
  208. return new InterProcessMultiLock(locks);
  209. }
  210. /**
  211. * 加锁
  212. *
  213. * @param lock 分布式锁对象
  214. */
  215. @SneakyThrows
  216. public void acquire(InterProcessLock lock) {
  217. lock.acquire();
  218. }
  219. /**
  220. * 加锁
  221. *
  222. * @param lock 分布式锁对象
  223. * @param time 等待时间
  224. * @param unit 时间单位
  225. */
  226. @SneakyThrows
  227. public void acquire(InterProcessLock lock, long time, TimeUnit unit) {
  228. lock.acquire(time, unit);
  229. }
  230. /**
  231. * 释放锁
  232. *
  233. * @param lock 分布式锁对象
  234. */
  235. @SneakyThrows
  236. public void release(InterProcessLock lock) {
  237. lock.release();
  238. }
  239. /**
  240. * 检查是否当前jvm的线程获取了锁
  241. *
  242. * @param lock 分布式锁对象
  243. * @return true/false
  244. */
  245. public boolean isAcquiredInThisProcess(InterProcessLock lock) {
  246. return lock.isAcquiredInThisProcess();
  247. }
  248. /**
  249. * 获取读写锁
  250. *
  251. * @param path 节点名称
  252. * @return 读写锁
  253. */
  254. public InterProcessReadWriteLock getReadWriteLock(String path) {
  255. return new InterProcessReadWriteLock(client, path);
  256. }
  257. /**
  258. * 监听数据节点的变化情况
  259. *
  260. * @param path 节点名称
  261. * @param listener 监听器
  262. * @return 监听节点的TreeCache实例
  263. */
  264. @SneakyThrows
  265. public CuratorCache watch(String path, CuratorCacheListener listener) {
  266. CuratorCache curatorCache = CuratorCache.builder(client, path).build();
  267. curatorCache.listenable().addListener(listener);
  268. curatorCache.start();
  269. return curatorCache;
  270. }
  271. /**
  272. * 监听数据节点的变化情况
  273. *
  274. * @param path 节点名称
  275. * @param listener 监听器
  276. * @return 监听节点的TreeCache实例
  277. */
  278. public CuratorCache watch(String path, CuratorCacheListener listener, Executor executor) {
  279. CuratorCache curatorCache = CuratorCache.builder(client, path).build();
  280. curatorCache.listenable().addListener(listener, executor);
  281. curatorCache.start();
  282. return curatorCache;
  283. }
  284. /**
  285. * 取消监听节点
  286. *
  287. * @param path 节点名称
  288. * @param listener 监听器
  289. */
  290. public void unwatch(String path, CuratorCacheListener listener) {
  291. CuratorCache curatorCache = CuratorCache.builder(client, path).build();
  292. curatorCache.listenable().removeListener(listener);
  293. }
  294. }

6.7 AppController

  1. package com.ldx.zookeeper.controller;
  2. import com.fasterxml.jackson.core.JsonProcessingException;
  3. import com.fasterxml.jackson.databind.ObjectMapper;
  4. import com.ldx.zookeeper.util.CuratorClient;
  5. import lombok.RequiredArgsConstructor;
  6. import lombok.extern.slf4j.Slf4j;
  7. import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
  8. import org.apache.zookeeper.CreateMode;
  9. import org.apache.zookeeper.data.Stat;
  10. import org.springframework.web.bind.annotation.*;
  11. /**
  12. * demo
  13. *
  14. * @author ludangxin
  15. * @date 2021/8/30
  16. */
  17. @Slf4j
  18. @RestController
  19. @RequestMapping("app")
  20. @RequiredArgsConstructor
  21. public class AppController {
  22. private final CuratorClient curatorClient;
  23. @GetMapping("{appName}")
  24. public String getData(@PathVariable String appName) {
  25. return curatorClient.getNodeData(setPrefix(appName));
  26. }
  27. @PostMapping("{appName}")
  28. public String addApp(@PathVariable String appName, @RequestParam String data) {
  29. curatorClient.createNode(CreateMode.PERSISTENT, setPrefix(appName), data);
  30. return "ok";
  31. }
  32. @PostMapping("{appName}/{childName}")
  33. public String addAppChild(@PathVariable String appName, @PathVariable String childName, @RequestParam String data) {
  34. curatorClient.createNode(CreateMode.PERSISTENT, setPrefix(appName).concat(setPrefix(childName)), data);
  35. return "ok";
  36. }
  37. @PutMapping("{appName}")
  38. public String setData(@PathVariable String appName, String data) {
  39. curatorClient.setNodeData(setPrefix(appName), data);
  40. return "ok";
  41. }
  42. @PutMapping("{appName}/{childName}")
  43. public String setData(@PathVariable String appName, @PathVariable String childName, String data) {
  44. curatorClient.setNodeData(setPrefix(appName).concat(setPrefix(childName)), data);
  45. return "ok";
  46. }
  47. @DeleteMapping("{appName}")
  48. public String delApp(@PathVariable String appName) {
  49. curatorClient.deleteNode(setPrefix(appName));
  50. return "ok";
  51. }
  52. @PostMapping("{appName}/watch/dir")
  53. public String watchAppDir(@PathVariable String appName) {
  54. CuratorCacheListener listener = CuratorCacheListener.builder().forDeletes(obj -> {
  55. String path = obj.getPath();
  56. String data = new String(obj.getData());
  57. Stat stat = obj.getStat();
  58. log.info("节点:{} 被删除,节点数据:{},节点状态:\\{version:{},createTime:{}\\}", path, data,
  59. stat.getVersion(), stat.getCtime());
  60. }).build();
  61. curatorClient.watch(setPrefix(appName), listener);
  62. return "ok";
  63. }
  64. @PostMapping("{appName}/watch/data")
  65. public String watchAppData(@PathVariable String appName) {
  66. ObjectMapper mapper = new ObjectMapper();
  67. CuratorCacheListener listener = CuratorCacheListener.builder().forChanges((oldNode, newNode) -> {
  68. try {
  69. String path = oldNode.getPath();
  70. log.info("节点:{} 被修改,修改前:{} ", path, mapper.writeValueAsString(oldNode));
  71. log.info("节点:{} 被修改,修改后:{} ", path, mapper.writeValueAsString(newNode));
  72. } catch(JsonProcessingException e) {
  73. e.printStackTrace();
  74. }
  75. }).build();
  76. curatorClient.watch(setPrefix(appName), listener);
  77. return "ok";
  78. }
  79. private String setPrefix(String appName) {
  80. String prefix = "/";
  81. if(!appName.startsWith(prefix)) {
  82. appName = prefix.concat(appName);
  83. }
  84. return appName;
  85. }
  86. }

7. ZAB 协议介绍

ZAB(ZooKeeper Atomic Broadcast 原子广播) 协议是为分布式协调服务 ZooKeeper 专门设计的一种支持崩溃恢复的原子广播协议。 在 ZooKeeper 中,主要依赖 ZAB 协议来实现分布式数据一致性,基于该协议,ZooKeeper 实现了一种主备模式的系统架构来保持集群中各个副本之间的数据一致性。

ZAB 协议两种基本的模式:崩溃恢复和消息广播

ZAB协议包括两种基本的模式,分别是 崩溃恢复和消息广播。当整个服务框架在启动过程中,或是当 Leader 服务器出现网络中断、崩溃退出与重启等异常情况时,ZAB 协议就会进人恢复模式并选举产生新的Leader服务器。当选举产生了新的 Leader 服务器,同时集群中已经有过半的机器与该Leader服务器完成了状态同步之后,ZAB协议就会退出恢复模式。其中,所谓的状态同步是指数据同步,用来保证集群中存在过半的机器能够和Leader服务器的数据状态保持一致

当集群中已经有过半的Follower服务器完成了和Leader服务器的状态同步,那么整个服务框架就可以进人消息广播模式了。 当一台同样遵守ZAB协议的服务器启动后加人到集群中时,如果此时集群中已经存在一个Leader服务器在负责进行消息广播,那么新加人的服务器就会自觉地进人数据恢复模式:找到Leader所在的服务器,并与其进行数据同步,然后一起参与到消息广播流程中去。正如上文介绍中所说的,ZooKeeper设计成只允许唯一的一个Leader服务器来进行事务请求的处理。Leader服务器在接收到客户端的事务请求后,会生成对应的事务提案并发起一轮广播协议;而如果集群中的其他机器接收到客户端的事务请求,那么这些非Leader服务器会首先将这个事务请求转发给Leader服务器。

代码:Zookeeper-基础-Java文档类资源-CSDN下载

转载自:Zookeeper-基础 - 张铁牛 - 博客园

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/喵喵爱编程/article/detail/933832
推荐阅读
相关标签
  

闽ICP备14008679号