当前位置:   article > 正文

SpringBoot整合Zookeeper客户端Curator_springboot整合curator

springboot整合curator

1. 引入依赖

  • 这里springboot使用 2.5.4 版本,zk使用 3.7.0 版本,curator使用 5.2.1 版本

在这里插入图片描述

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.4</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>zookeeper-demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>zookeeper-demo</name>
    <description>zookeeper-demo</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        
        <!--需要和zookeeper服务端版本保持一致-->
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.7.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>5.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>5.2.1</version>
        </dependency>

        <!-- Swagger2 -->
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger2</artifactId>
            <version>2.9.2</version>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger-ui</artifactId>
            <version>2.9.2</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.76</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89

2. 配置文件

zookeeper.host=127.0.0.1:2181
  • 1

3. Zookeeper配置文件类

package com.example.zookeeperdemo.config;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ZookeeperConfig {

    @Value("${zookeeper.host}")
    private String host;

    @Bean
    public CuratorFramework curatorFramework() {
        CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()
                .connectString(host)
                .sessionTimeoutMs(5000)
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .build();
        curatorFramework.start();
        return curatorFramework;
    }

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

4. swagger配置

package com.example.zookeeperdemo.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.builders.RequestHandlerSelectors;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2;

@Configuration
@EnableSwagger2
public class SwaggerConfig {

    @Bean
    public Docket createRestApi() {
        return new Docket(DocumentationType.SWAGGER_2).apiInfo(apiInfo()).select()
                .apis(RequestHandlerSelectors.basePackage("com.example")).build();
    }

    private ApiInfo apiInfo() {
        return new ApiInfoBuilder().version("1.0").build();
    }

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

5. 消息返回体

  • 用来包装返回数据
package com.example.zookeeperdemo.dto;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class AjaxResult<T> {

    private Integer code;

    private String message;

    private T data;

    public static <T> AjaxResult<T> ok(T data) {
        return new AjaxResult<>(0, "success", data);
    }

    public static <T> AjaxResult<T> error(T data) {
        return new AjaxResult<>(1, "error", data);
    }

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

6. 创建订单service

  • 为后续测试分布式锁方法使用
package com.example.zookeeperdemo.service;

import org.springframework.stereotype.Component;

@Component
public class OrderService {

    public static Integer i = 0;

    public Integer createOrder() {
        return ++i;
    }

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

7. 创建controller

  • 包括对zk节点的常用操作,创建、更新、删除、监听及分布式锁
package com.example.zookeeperdemo.controller;

import com.alibaba.fastjson.JSONObject;
import com.example.zookeeperdemo.dto.AjaxResult;
import com.example.zookeeperdemo.service.OrderService;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.CuratorListener;
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.curator.framework.recipes.cache.CuratorCache;
import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

@Slf4j
@RequestMapping("/zk")
@RestController
public class ZkController {

    @Resource
    private CuratorFramework curatorFramework;

    @Resource
    private OrderService orderService;

    /**
     * 检查节点是否存在
     */
    @GetMapping("/isExistNode")
    public AjaxResult isExistNode(@RequestParam String nodeName) {
        try {
            return AjaxResult.ok(Objects.nonNull(curatorFramework.checkExists().forPath(nodeName)));
        } catch (Exception e) {
            e.printStackTrace();
        }
        return AjaxResult.error(null);
    }

    /**
     * 创建节点
     */
    @GetMapping("/createNode")
    public AjaxResult createNode(@RequestParam String nodeName) {
        try {
            return AjaxResult.ok(curatorFramework.create()
                    // 如果有子节点会递归创建
                    .creatingParentsIfNeeded()
                    // 设置为持久节点
                    .withMode(CreateMode.PERSISTENT).forPath(nodeName));
        } catch (Exception e) {
            e.printStackTrace();
        }
        return AjaxResult.error(null);
    }

    /**
     * 更新节点数据
     */
    @GetMapping("/setNodeValue")
    public AjaxResult setNodeValue(@RequestParam String nodeName, @RequestParam String nodeValue) {
        try {
            return AjaxResult.ok(curatorFramework.setData().forPath(nodeName, nodeValue.getBytes(StandardCharsets.UTF_8)));
        } catch (Exception e) {
            e.printStackTrace();
        }
        return AjaxResult.error(null);
    }

    /**
     * 创建节点并设置数据
     */
    @GetMapping("/createNodeAndValue")
    public AjaxResult createNodeAndValue(@RequestParam String nodeName, @RequestParam String nodeValue) {
        try {
            return AjaxResult.ok(curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
                    .forPath(nodeName, nodeValue.getBytes(StandardCharsets.UTF_8)));
        } catch (Exception e) {
            e.printStackTrace();
        }
        return AjaxResult.error(null);
    }

    /**
     * 查询节点数据
     */
    @GetMapping("/queryNodeValue")
    public AjaxResult queryNodeValue(@RequestParam String nodeName) {
        try {
            return AjaxResult.ok(new String(curatorFramework.getData().storingStatIn(new Stat()).forPath(nodeName)));
        } catch (Exception e) {
            e.printStackTrace();
        }
        return AjaxResult.error(null);
    }

    /**
     * 查询节点下的所有子节点
     */
    @GetMapping("/queryNodeChild")
    public AjaxResult queryNodeChild(@RequestParam String nodeName) {
        try {
            return AjaxResult.ok(curatorFramework.getChildren().forPath(nodeName));
        } catch (Exception e) {
            e.printStackTrace();
        }
        return AjaxResult.error(null);
    }

    /**
     * 异步执行
     */
    @GetMapping("/asyncCreateNode")
    public AjaxResult asyncCreateNode(@RequestParam String nodeName) {
        try {
            return AjaxResult.ok(curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
                    // 异步执行方法,可以设置回调方法
                    .inBackground((curatorFramework1, curatorEvent) ->
                            log.info(curatorEvent.getPath() + ":" + JSONObject.toJSONString(curatorEvent.getStat())))
                    .forPath(nodeName));
        } catch (Exception e) {
            e.printStackTrace();
        }
        return AjaxResult.error(null);
    }

    /**
     * 递归删除
     */
    @GetMapping("/deleteNode")
    public AjaxResult deleteNode(@RequestParam String nodeName) {
        try {
            // guaranteed 用来保证如果删除失败,则会话有效期内一直尝试删除
            // deletingChildrenIfNeeded 表示如果有子节点,则递归删除
            return AjaxResult.ok(curatorFramework.delete().guaranteed().deletingChildrenIfNeeded().forPath(nodeName));
        } catch (Exception e) {
            e.printStackTrace();
        }
        return AjaxResult.error(null);
    }

    /**
     * 单次注册监听,返回当前节点的内容信息;使用Watcher对节点进行监听,但只能监听一次;被监听的节点需要存在,否则会提示节点不存在
     */
    @GetMapping("/watchNode")
    public AjaxResult watchNode(@RequestParam String nodeName) {
        try {
            byte[] bytes = curatorFramework.getData().usingWatcher((CuratorWatcher) watchedEvent -> log
                    .info(watchedEvent.getPath() + ":" + JSONObject.toJSONString(watchedEvent))).forPath(nodeName);
            return AjaxResult.ok(new String(bytes));
        } catch (Exception e) {
            e.printStackTrace();
        }
        return AjaxResult.error(null);
    }

    /**
     * 单次监听异步操作,即调用inBackground()方法会异步获得监听,同步操作则不会触发监听事件
     */
    @GetMapping("/asyncListener")
    public AjaxResult asyncListener(@RequestParam String nodeName) {
        try {
            CuratorListener curatorListener = new CuratorListener() {
                @Override
                public void eventReceived(CuratorFramework curatorFramework, CuratorEvent curatorEvent)
                        throws Exception {
                    log.info(curatorEvent.getPath() + ":" + JSONObject.toJSONString(curatorEvent));
                }
            };
            curatorFramework.getCuratorListenable().addListener(curatorListener);
            log.info(new String(curatorFramework.getData().forPath(nodeName)));
            log.info(JSONObject.toJSONString(curatorFramework.setData().forPath(nodeName, "123".getBytes())));
            // 异步操作
            curatorFramework.delete().inBackground().forPath(nodeName);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return AjaxResult.ok(null);
    }

    /**
     * 反复注册监听,Cache事件监听可以理解为一个本地缓存视图与远程Zookeeper视图的对比过程,可以监听节点的创建,修改和删除
     */
    @GetMapping("/nodeCache")
    public AjaxResult nodeCache(@RequestParam String nodeName) {
        CuratorCache curatorCache = CuratorCache.build(curatorFramework, nodeName);
        CuratorCacheListener listener = CuratorCacheListener.builder().forNodeCache(
                        () -> log.info("nodeCache监听 " + nodeName + ":" + new String(curatorFramework.getData().forPath(nodeName))))
                .build();
        curatorCache.listenable().addListener(listener);
        curatorCache.start();
        return AjaxResult.ok(null);
    }

    /**
     * 子节点监听
     */
    @GetMapping("/childNodeCache")
    public AjaxResult childNodeCache(@RequestParam String nodeName) {
        CuratorCache curatorCache = CuratorCache.build(curatorFramework, nodeName);
        CuratorCacheListener childNodeCache = CuratorCacheListener.builder()
                .forPathChildrenCache(nodeName, curatorFramework, (curatorFramework1, pathChildrenCacheEvent) -> log
                        .info("childNodeCache监听 " + JSONObject.toJSONString(pathChildrenCacheEvent.getType())))
                .build();
        curatorCache.listenable().addListener(childNodeCache);
        curatorCache.start();
        return AjaxResult.ok(null);
    }

    /**
     * 分布式锁
     */
    @GetMapping("/lock")
    public AjaxResult lock(@RequestParam Integer count) throws Exception {
        String nodeName = "/lock";
        if (Objects.isNull(curatorFramework.checkExists().forPath(nodeName))) {
            curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(nodeName);
        }
        InterProcessMutex lock = new InterProcessMutex(curatorFramework, nodeName);

        List<Integer> list = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            list.add(i);
        }
        List<Integer> collect =
            list.parallelStream().map(integer -> createOrder(lock)).sorted().collect(Collectors.toList());
        for (Integer str : collect) {
            log.info(str.toString());
        }
        return AjaxResult.ok(null);
    }

    public Integer createOrder(InterProcessMutex lock) {
        try {
            lock.acquire();
            Integer order = orderService.createOrder();
            lock.release();
            return order;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169
  • 170
  • 171
  • 172
  • 173
  • 174
  • 175
  • 176
  • 177
  • 178
  • 179
  • 180
  • 181
  • 182
  • 183
  • 184
  • 185
  • 186
  • 187
  • 188
  • 189
  • 190
  • 191
  • 192
  • 193
  • 194
  • 195
  • 196
  • 197
  • 198
  • 199
  • 200
  • 201
  • 202
  • 203
  • 204
  • 205
  • 206
  • 207
  • 208
  • 209
  • 210
  • 211
  • 212
  • 213
  • 214
  • 215
  • 216
  • 217
  • 218
  • 219
  • 220
  • 221
  • 222
  • 223
  • 224
  • 225
  • 226
  • 227
  • 228
  • 229
  • 230
  • 231
  • 232
  • 233
  • 234
  • 235
  • 236
  • 237
  • 238
  • 239
  • 240
  • 241
  • 242
  • 243
  • 244
  • 245
  • 246
  • 247
  • 248
  • 249
  • 250
  • 251
  • 252
  • 253
  • 254
  • 255
  • 256
  • 257
  • 258

8. 分布式锁使用

  • 分别使用InterProcessMutex的acquire和release方法,来获取和释放锁

  • 不使用分布式锁的结果,会有重复的值

在这里插入图片描述

  • 使用锁,调用100次,没有重复的值

在这里插入图片描述
在这里插入图片描述

本文内容由网友自发贡献,转载请注明出处:https://www.wpsshop.cn/article/detail/53961
推荐阅读
相关标签