当前位置:   article > 正文

springBoot集成zookeeper,常用api及实现分布式锁_分布式锁api

分布式锁api

开源项目:https://gitee.com/kekingcn/spring-boot-klock-starter  基于redis的分布锁

目录

一、安装zookeeper

1.下载

2.创建配置文件zoo.cfg

3.启动服务端

4.数据结构及基础命令

二、集成SpringBoot常用api及实现分布式锁

1.Curator的pom依赖

2.JAVA配置文件

3.Curator常用api

4.分布式锁

5.AOP实现分布式锁


 

一、安装zookeeper

1.下载

官网网站:https://zookeeper.apache.org/

国内镜像地址:https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/

本文下载的3.5.5版本

2.创建配置文件zoo.cfg

zookeeper只需解压就可以使用,在启动之前进入conf目录,创建个文件,命名为zoo.cfg,内容如下:

  1. tickTime=2000
  2. dataDir= .. /data
  3. clientPort=2181
注:
    tickTime 是心跳时间,默认是2秒。指客户端按照一定频率发送心跳包到服务器端以维持网络套接字连接;
    dataDir 是zookepeer保存的内存快照,以及事务日志记录。
    clientPort 是指客户端连接的端口,及zookepeer的启动端口。
 

3.启动服务端

进入bin目录,运行zkServer:

./zkServer.sh start
运行客户端运行zkCli
./bin/zkCli.sh

4.数据结构及基础命令

zookeeper提供的命名(namespace)类似文件系统,每个节点都是通过路径来表示的,不同的是,节点可以包含一定的数据 (2MB字节),这些节点可以用来存放业务信息,如配置信息等。

4.1 查看目录:ls [-s] [-w] [-R] path

 ls /

4.2 创建节点:create [-s] [-e] [-c] [-t ttl] path [data] [acl]

-e  临时节点

-s  有序号的节点

如,在/节点下创建test临时节点,节点数据设置为data

create -e /test data

4.3 获取节点数据: get [-s] [-w] path

-s 详细信息

-w 设置watched监听

get -s -w /test
注:watch 操作, ls 命令和 get命 令都可以增加一次  watch 操作, 节点变 化的时 候会通知客户端。通知完毕后,还需要再次调用 ls 或者 get 才能监昕此 节点变化。
 
4.4 设置节点数据:set [-s] [-v version] path data
 
 set -s /test lvjie

4.5 删除节点:delete [-v version] path

delete /test

二、集成SpringBoot常用api及实现分布式锁

ZooKeeper本身提供了低级别的 Java API 来实现前面讲的节点操作。 Curator Apache供的一个访问 ZooKeeper 的工具包,封装了这些低级别操作同时也提供一些高级服务,比如分布式锁、领导选取等。

1.Curator的pom依赖

  1. <dependency>
  2. <groupId>org.apache.curator</groupId>
  3. <artifactId>curator-recipes</artifactId>
  4. <version>3.3.0</version>
  5. </dependency>

2.JAVA配置文件

  1. package com.ztxy.module.log.controller.config;
  2. import org.apache.curator.framework.CuratorFramework;
  3. import org.apache.curator.framework.CuratorFrameworkFactory;
  4. import org.apache.curator.retry.ExponentialBackoffRetry;
  5. import org.springframework.context.annotation.Bean;
  6. import org.springframework.context.annotation.Configuration;
  7. /**
  8. * @author jie
  9. * @Description:
  10. * @date 2020/1/16 15:16
  11. */
  12. @Configuration
  13. public class ZookeeperConfig {
  14. /**
  15. * Curator Apache
  16. * 供的个访问Zookeeper的工具包,封装了这些低级别操作同时也提供一些高级服务,比如分布式锁、领导选取
  17. * @return
  18. */
  19. @Bean
  20. public CuratorFramework curatorFramework(){
  21. // ExponentialBackoffRetry是种重连策略,每次重连的间隔会越来越长,1000毫秒是初始化的间隔时间,3代表尝试重连次数。
  22. ExponentialBackoffRetry retry = new ExponentialBackoffRetry(1000, 3);
  23. // 创建client
  24. CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient("xxx.xxx.xxx.xxx:2181", retry);
  25. // 添加watched 监听器
  26. curatorFramework.getCuratorListenable().addListener(new MyCuratorListener());
  27. curatorFramework.start();
  28. return curatorFramework;
  29. }
  30. }

 watched监听器,监听的节点数据发生改变时将回调eventReceived方法

  1. package com.ztxy.module.log.controller.config;
  2. import org.apache.curator.framework.CuratorFramework;
  3. import org.apache.curator.framework.api.CuratorEvent;
  4. import org.apache.curator.framework.api.CuratorEventType;
  5. import org.apache.curator.framework.api.CuratorListener;
  6. import org.apache.zookeeper.WatchedEvent;
  7. /**
  8. * @author jie
  9. * @Description:
  10. * @date 2020/1/16 16:29
  11. */
  12. public class MyCuratorListener implements CuratorListener {
  13. @Override
  14. public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception {
  15. CuratorEventType type = event.getType();
  16. if(type == CuratorEventType.WATCHED){
  17. WatchedEvent watchedEvent = event.getWatchedEvent();
  18. String path = watchedEvent.getPath();
  19. System.out.println(watchedEvent.getType()+" -- "+ path);
  20. // 重新设置改节点监听
  21. if(null != path)
  22. client.checkExists().watched().forPath(path);
  23. }
  24. }
  25. }

3.Curator常用api

CuratorFramework提供简单的API来操 zk节点,还有zk事件,API 是链式操作风格,遇到forPath接口就触发ZooKeeper调用

  1. package com.ztxy.module.log.controller.rest.v;
  2. import com.ztxy.module.log.common.InvokeResult;
  3. import com.ztxy.module.log.service.v.CuratorLockService;
  4. import com.ztxy.module.log.vo.v.VoluntaryActiveTypeVO;
  5. import io.swagger.annotations.Api;
  6. import io.swagger.annotations.ApiOperation;
  7. import org.apache.curator.framework.CuratorFramework;
  8. import org.apache.curator.framework.api.CuratorListener;
  9. import org.apache.zookeeper.CreateMode;
  10. import org.springframework.beans.factory.annotation.Autowired;
  11. import org.springframework.web.bind.annotation.*;
  12. /**
  13. * @Description:
  14. * @author: jie
  15. * @Createed Date: 2020-1-13 16:43:24
  16. * @ModificationHistory: Who When What
  17. * --------- ------------- --------------------------------------
  18. **/
  19. @RestController
  20. @RequestMapping("api/v1/zookeeperApiRest")
  21. @Api(value = "zookeeperApiRest", description = "ZookeeperApiRest相关API")
  22. public class ZookeeperApiRest {
  23. @Autowired
  24. private CuratorLockService curatorLockService;
  25. @Autowired
  26. private CuratorFramework client;
  27. @GetMapping("set")
  28. @ApiOperation(value = "设置节点数据", notes = "xxx")
  29. public InvokeResult set(String path, String value) throws Exception{
  30. client.setData().forPath(path,value.getBytes());
  31. return InvokeResult.success();
  32. }
  33. @GetMapping("get")
  34. @ApiOperation(value = "获取节点数据并监听")
  35. public InvokeResult get(String path) throws Exception {
  36. // watched() 监听节点 当节点变化时将通知Curator,需要添加一个监听器CuratorListener
  37. byte[] bytes = client.getData().watched().forPath(path);
  38. return InvokeResult.success(new String(bytes));
  39. }
  40. @GetMapping("create")
  41. @ApiOperation(value = "创建节点", notes = "xxx")
  42. public InvokeResult create(String path, String value) throws Exception{
  43. // 创建有序节点 CreateMode 指定节点类型
  44. client.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path,value.getBytes());
  45. return InvokeResult.success();
  46. }
  47. }

4.分布式锁

  1. package com.ztxy.module.log.service.v.impl;
  2. import com.ztxy.module.log.service.v.CuratorLockService;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.apache.curator.framework.CuratorFramework;
  5. import org.apache.curator.framework.recipes.locks.InterProcessLock;
  6. import org.apache.curator.framework.recipes.locks.InterProcessMutex;
  7. import org.springframework.beans.factory.annotation.Autowired;
  8. import org.springframework.stereotype.Service;
  9. import org.springframework.transaction.annotation.Transactional;
  10. import java.util.concurrent.TimeUnit;
  11. /**
  12. * @author jie
  13. * @Description:
  14. * @date 2020/1/16 16:42
  15. */
  16. @Service
  17. @Slf4j
  18. @Transactional(rollbackFor = Exception.class)
  19. public class CuratorLockServiceImpl implements CuratorLockService {
  20. @Autowired
  21. private CuratorFramework client;
  22. /**
  23. * 分布式锁的初始路径
  24. */
  25. private String lockPath = "/lock/test";
  26. @Override
  27. public void lockTest(String value) {
  28. lockPath = lockPath + "/" + value;
  29. log.info("尝试执行业务"+value);
  30. // 创建分布式锁
  31. InterProcessMutex lock = new InterProcessMutex(client, lockPath);
  32. try {
  33. // 获取锁资源
  34. if(lock.acquire(10, TimeUnit.HOURS)){
  35. log.info("业务逻辑模拟,耗时25s");
  36. Thread.sleep(25000);
  37. }
  38. } catch (Exception e) {
  39. e.printStackTrace();
  40. } finally {
  41. try {
  42. lock.release();
  43. log.info("释放锁资源");
  44. } catch (Exception e) {
  45. e.printStackTrace();
  46. }
  47. }
  48. }
  49. }

到这里分布式锁就完成了,同时启动多个springBoot调用此服务进行测试就行了。controller调用

  1. @GetMapping("lock/test")
  2. @ApiOperation(value = "分布式锁测试", notes = "xxx")
  3. public InvokeResult lock(String value) throws Exception{
  4. curatorLockService.lockTest(value);
  5. return InvokeResult.success();
  6. }

5.AOP实现分布式锁

优化代码:熟悉aop的也可用aop实现上面的锁机制,让代码看起来更美观实现如下

pom依赖

  1. <dependency>
  2. <groupId>commons-collections</groupId>
  3. <artifactId>commons-collections</artifactId>
  4. <version>3.2</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.aspectj</groupId>
  8. <artifactId>aspectjrt</artifactId>
  9. <version>1.9.2</version>
  10. </dependency>
  11. <dependency>
  12. <groupId>org.springframework.boot</groupId>
  13. <artifactId>spring-boot-starter-aop</artifactId>
  14. </dependency>

自定义注解

  1. package com.ztxy.module.log.config;
  2. import java.lang.annotation.ElementType;
  3. import java.lang.annotation.Retention;
  4. import java.lang.annotation.RetentionPolicy;
  5. import java.lang.annotation.Target;
  6. /**
  7. * @author jie
  8. * @Description:
  9. * @date 2020/1/17 10:51
  10. */
  11. @Target(ElementType.METHOD)
  12. @Retention(RetentionPolicy.RUNTIME)
  13. public @interface ClusterLock {
  14. String LOCK_PATH();
  15. }

aspect切面

  1. package com.ztxy.module.log.controller.config;
  2. import com.ztxy.module.log.config.ClusterLock;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.apache.curator.framework.CuratorFramework;
  5. import org.apache.curator.framework.recipes.locks.InterProcessMutex;
  6. import org.aspectj.lang.ProceedingJoinPoint;
  7. import org.aspectj.lang.annotation.Around;
  8. import org.aspectj.lang.annotation.Aspect;
  9. import org.springframework.beans.factory.annotation.Autowired;
  10. import org.springframework.stereotype.Component;
  11. import java.util.concurrent.TimeUnit;
  12. /**
  13. * @author jie
  14. * @Description:
  15. * @date 2020/1/17 10:55
  16. */
  17. @Component
  18. @Slf4j
  19. @Aspect
  20. public class ClusterLockAdvice {
  21. @Autowired
  22. private CuratorFramework client;
  23. @Around("@annotation(clusterLock)")
  24. public void lockAround(ProceedingJoinPoint point, ClusterLock clusterLock){
  25. try {
  26. // 约定第一个参数就为业务类型
  27. Object value = point.getArgs()[0];
  28. if(null == value || !(value instanceof String) ){
  29. point.proceed(point.getArgs());
  30. return;
  31. }
  32. log.info("尝试执行业务"+value);
  33. String lockPath = clusterLock.LOCK_PATH();
  34. lockPath = lockPath + "/" + String.valueOf(value);
  35. InterProcessMutex lock = new InterProcessMutex(client, lockPath);
  36. try {
  37. if(lock.acquire(10, TimeUnit.HOURS)){
  38. point.proceed(point.getArgs());
  39. }
  40. } catch (Exception e) {
  41. e.printStackTrace();
  42. } finally {
  43. try {
  44. lock.release();
  45. log.info("释放锁资源");
  46. } catch (Exception e) {
  47. e.printStackTrace();
  48. }
  49. }
  50. } catch (Throwable throwable) {
  51. throwable.printStackTrace();
  52. }
  53. }
  54. }

使用

  1. @ClusterLock(LOCK_PATH = "/lock/test2")
  2. @Override
  3. public void lockTest2(String value) {
  4. log.info("业务逻辑模拟,耗时25s");
  5. try {
  6. Thread.sleep(25000);
  7. } catch (InterruptedException e) {
  8. e.printStackTrace();
  9. }
  10. }

如有不足,欢迎批评指正

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/article/detail/53973
推荐阅读
相关标签
  

闽ICP备14008679号