当前位置:   article > 正文

Springboot集成zookeeper

springboot集成zookeeper

Springboot集成zookeeper

实现功能:zookeeper节点的增删改查、节点监听、分布式读写锁、分布式计数器

添加依赖

  1. <properties>
  2. <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  3. <java.version>1.8</java.version>
  4. <zookeeper.version>3.4.8</zookeeper.version>
  5. <curator.version>2.11.1</curator.version>
  6. </properties>
  7. <dependencies>
  8. <dependency>
  9. <groupId>org.apache.zookeeper</groupId>
  10. <artifactId>zookeeper</artifactId>
  11. <version>${zookeeper.version}</version>
  12. <exclusions>
  13. <exclusion>
  14. <groupId>org.slf4j</groupId>
  15. <artifactId>slf4j-log4j12</artifactId>
  16. </exclusion>
  17. <exclusion>
  18. <groupId>org.slf4j</groupId>
  19. <artifactId>slf4j-api</artifactId>
  20. </exclusion>
  21. </exclusions>
  22. </dependency>
  23. <dependency>
  24. <groupId>org.apache.curator</groupId>
  25. <artifactId>curator-recipes</artifactId>
  26. <version>${curator.version}</version>
  27. </dependency>
  28. </dependencies>

ZkClient(curator)

这里使用的是curator,curator是对zookeeper的简单封装,提供了一些集成的方法,或者是提供了更优雅的api
 

  1. /**
  2. * zookeeper客户端
  3. */
  4. @Data
  5. @Slf4j
  6. public class ZkClient {
  7. private final Logger logger = LoggerFactory.getLogger(this.getClass());
  8. private CuratorFramework client;
  9. public TreeCache cache;
  10. private ZookeeperProperties zookeeperProperties;
  11. public ZkClient(ZookeeperProperties zookeeperProperties){
  12. this.zookeeperProperties = zookeeperProperties;
  13. }
  14. /**
  15. * 初始化zookeeper客户端
  16. */
  17. public void init() {
  18. try{
  19. RetryPolicy retryPolicy = new ExponentialBackoffRetry(zookeeperProperties.getBaseSleepTimeMs(),
  20. zookeeperProperties.getMaxRetries());
  21. Builder builder = CuratorFrameworkFactory.builder()
  22. .connectString(zookeeperProperties.getServer()).retryPolicy(retryPolicy)
  23. .sessionTimeoutMs( zookeeperProperties.getSessionTimeoutMs())
  24. .connectionTimeoutMs( zookeeperProperties.getConnectionTimeoutMs())
  25. .namespace( zookeeperProperties.getNamespace());
  26. if(StringUtils.isNotEmpty( zookeeperProperties.getDigest())){
  27. builder.authorization("digest", zookeeperProperties.getDigest().getBytes("UTF-8"));
  28. builder.aclProvider(new ACLProvider() {
  29. @Override
  30. public List<ACL> getDefaultAcl() {
  31. return ZooDefs.Ids.CREATOR_ALL_ACL;
  32. }
  33. @Override
  34. public List<ACL> getAclForPath(final String path) {
  35. return ZooDefs.Ids.CREATOR_ALL_ACL;
  36. }
  37. });
  38. }
  39. client = builder.build();
  40. client.start();
  41. initLocalCache("/test");
  42. // addConnectionStateListener();
  43. client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
  44. public void stateChanged(CuratorFramework client, ConnectionState state) {
  45. if (state == ConnectionState.LOST) {
  46. //连接丢失
  47. logger.info("lost session with zookeeper");
  48. } else if (state == ConnectionState.CONNECTED) {
  49. //连接新建
  50. logger.info("connected with zookeeper");
  51. } else if (state == ConnectionState.RECONNECTED) {
  52. logger.info("reconnected with zookeeper");
  53. }
  54. }
  55. });
  56. }catch(Exception e){
  57. e.printStackTrace();
  58. }
  59. }
  60. /**
  61. * 初始化本地缓存
  62. * @param watchRootPath
  63. * @throws Exception
  64. */
  65. private void initLocalCache(String watchRootPath) throws Exception {
  66. cache = new TreeCache(client, watchRootPath);
  67. TreeCacheListener listener = (client1, event) ->{
  68. log.info("event:" + event.getType() +
  69. " |path:" + (null != event.getData() ? event.getData().getPath() : null));
  70. if(event.getData()!=null && event.getData().getData()!=null){
  71. log.info("发生变化的节点内容为:" + new String(event.getData().getData()));
  72. }
  73. // client1.getData().
  74. };
  75. cache.getListenable().addListener(listener);
  76. cache.start();
  77. }
  78. public void stop() {
  79. client.close();
  80. }
  81. public CuratorFramework getClient() {
  82. return client;
  83. }
  84. /**
  85. * 创建节点
  86. * @param mode 节点类型
  87. * 1、PERSISTENT 持久化目录节点,存储的数据不会丢失。
  88. * 2、PERSISTENT_SEQUENTIAL顺序自动编号的持久化目录节点,存储的数据不会丢失
  89. * 3、EPHEMERAL临时目录节点,一旦创建这个节点的客户端与服务器端口也就是session 超时,这种节点会被自动删除
  90. *4、EPHEMERAL_SEQUENTIAL临时自动编号节点,一旦创建这个节点的客户端与服务器端口也就是session 超时,这种节点会被自动删除,并且根据当前已近存在的节点数自动加 1,然后返回给客户端已经成功创建的目录节点名。
  91. * @param path 节点名称
  92. * &
本文内容由网友自发贡献,转载请注明出处:https://www.wpsshop.cn/article/detail/53983
推荐阅读
相关标签
  

闽ICP备14008679号