赞
踩
目录
官方网站:https://zookeeper.apache.org/
国内镜像地址:https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/
本文下载的3.5.5版本

zookeeper只需解压就可以使用,在启动之前进入conf目录,创建个文件,命名为zoo.cfg,内容如下:
- tickTime=2000
- dataDir=../data
- clientPort=2181
进入bin目录,运行zkServer:
./zkServer.sh start
./bin/zkCli.sh
zookeeper提供的命名空间(namespace)类似文件系统,每个节点都是通过路径来表示的;不同的是,节点本身可以包含一定的数据(默认上限约1MB),这些节点可以用来存放业务信息,如配置信息等。
4.1 查看目录:ls [-s] [-w] [-R] path
ls /
4.2 创建节点:create [-s] [-e] [-c] [-t ttl] path [data] [acl]
-e 临时节点
-s 有序号的节点
如,在/节点下创建test临时节点,节点数据设置为data
create -e /test data
4.3 获取节点数据: get [-s] [-w] path
-s 详细信息
-w 设置watched监听
get -s -w /test
4.4 修改节点数据:set [-s] [-v version] path data
set -s /test lvjie
4.5 删除节点:delete [-v version] path
delete /test
ZooKeeper本身提供了低级别的 Java API 来实现前面讲的节点操作。Curator 是 Apache 提供的一个访问 ZooKeeper 的工具包,封装了这些低级别操作,同时也提供一些高级服务,比如分布式锁、领导选取等。
- <dependency>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-recipes</artifactId>
- <version>3.3.0</version>
- </dependency>
- package com.ztxy.module.log.controller.config;
-
- import org.apache.curator.framework.CuratorFramework;
- import org.apache.curator.framework.CuratorFrameworkFactory;
- import org.apache.curator.retry.ExponentialBackoffRetry;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- /**
- * @author jie
- * @Description:
- * @date 2020/1/16 15:16
- */
- @Configuration
- public class ZookeeperConfig {
-
- /**
- * Curator Apache
- * 供的个访问Zookeeper的工具包,封装了这些低级别操作同时也提供一些高级服务,比如分布式锁、领导选取
- * @return
- */
- @Bean
- public CuratorFramework curatorFramework(){
- // ExponentialBackoffRetry是种重连策略,每次重连的间隔会越来越长,1000毫秒是初始化的间隔时间,3代表尝试重连次数。
- ExponentialBackoffRetry retry = new ExponentialBackoffRetry(1000, 3);
- // 创建client
- CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient("xxx.xxx.xxx.xxx:2181", retry);
- // 添加watched 监听器
- curatorFramework.getCuratorListenable().addListener(new MyCuratorListener());
- curatorFramework.start();
- return curatorFramework;
- }
-
- }

watched监听器,监听的节点数据发生改变时将回调eventReceived方法
- package com.ztxy.module.log.controller.config;
-
- import org.apache.curator.framework.CuratorFramework;
- import org.apache.curator.framework.api.CuratorEvent;
- import org.apache.curator.framework.api.CuratorEventType;
- import org.apache.curator.framework.api.CuratorListener;
- import org.apache.zookeeper.WatchedEvent;
-
- /**
- * @author jie
- * @Description:
- * @date 2020/1/16 16:29
- */
- public class MyCuratorListener implements CuratorListener {
- @Override
- public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception {
- CuratorEventType type = event.getType();
- if(type == CuratorEventType.WATCHED){
- WatchedEvent watchedEvent = event.getWatchedEvent();
- String path = watchedEvent.getPath();
- System.out.println(watchedEvent.getType()+" -- "+ path);
- // 重新设置改节点监听
- if(null != path)
- client.checkExists().watched().forPath(path);
- }
-
- }
- }

CuratorFramework提供简单的API来操作 zk 节点,还有 zk 事件;API 是链式操作风格,遇到 forPath 接口就触发 ZooKeeper 调用
- package com.ztxy.module.log.controller.rest.v;
-
- import com.ztxy.module.log.common.InvokeResult;
- import com.ztxy.module.log.service.v.CuratorLockService;
- import com.ztxy.module.log.vo.v.VoluntaryActiveTypeVO;
- import io.swagger.annotations.Api;
- import io.swagger.annotations.ApiOperation;
- import org.apache.curator.framework.CuratorFramework;
- import org.apache.curator.framework.api.CuratorListener;
- import org.apache.zookeeper.CreateMode;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.web.bind.annotation.*;
-
- /**
- * @Description:
- * @author: jie
- * @Createed Date: 2020-1-13 16:43:24
- * @ModificationHistory: Who When What
- * --------- ------------- --------------------------------------
- **/
- @RestController
- @RequestMapping("api/v1/zookeeperApiRest")
- @Api(value = "zookeeperApiRest", description = "ZookeeperApiRest相关API")
- public class ZookeeperApiRest {
-
- @Autowired
- private CuratorLockService curatorLockService;
-
- @Autowired
- private CuratorFramework client;
-
- @GetMapping("set")
- @ApiOperation(value = "设置节点数据", notes = "xxx")
- public InvokeResult set(String path, String value) throws Exception{
- client.setData().forPath(path,value.getBytes());
- return InvokeResult.success();
- }
-
- @GetMapping("get")
- @ApiOperation(value = "获取节点数据并监听")
- public InvokeResult get(String path) throws Exception {
- // watched() 监听节点 当节点变化时将通知Curator,需要添加一个监听器CuratorListener
- byte[] bytes = client.getData().watched().forPath(path);
- return InvokeResult.success(new String(bytes));
- }
-
- @GetMapping("create")
- @ApiOperation(value = "创建节点", notes = "xxx")
- public InvokeResult create(String path, String value) throws Exception{
- // 创建有序节点 CreateMode 指定节点类型
- client.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path,value.getBytes());
- return InvokeResult.success();
- }
-
- }

- package com.ztxy.module.log.service.v.impl;
-
- import com.ztxy.module.log.service.v.CuratorLockService;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.curator.framework.CuratorFramework;
- import org.apache.curator.framework.recipes.locks.InterProcessLock;
- import org.apache.curator.framework.recipes.locks.InterProcessMutex;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Service;
- import org.springframework.transaction.annotation.Transactional;
-
- import java.util.concurrent.TimeUnit;
-
- /**
- * @author jie
- * @Description:
- * @date 2020/1/16 16:42
- */
-
- @Service
- @Slf4j
- @Transactional(rollbackFor = Exception.class)
- public class CuratorLockServiceImpl implements CuratorLockService {
-
- @Autowired
- private CuratorFramework client;
-
- /**
- * 分布式锁的初始路径
- */
- private String lockPath = "/lock/test";
-
- @Override
- public void lockTest(String value) {
- lockPath = lockPath + "/" + value;
- log.info("尝试执行业务"+value);
-
- // 创建分布式锁
- InterProcessMutex lock = new InterProcessMutex(client, lockPath);
- try {
- // 获取锁资源
- if(lock.acquire(10, TimeUnit.HOURS)){
- log.info("业务逻辑模拟,耗时25s");
- Thread.sleep(25000);
- }
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- try {
- lock.release();
- log.info("释放锁资源");
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- }
- }

到这里分布式锁就完成了,同时启动多个springBoot调用此服务进行测试就行了。controller调用
- @GetMapping("lock/test")
- @ApiOperation(value = "分布式锁测试", notes = "xxx")
- public InvokeResult lock(String value) throws Exception{
- curatorLockService.lockTest(value);
- return InvokeResult.success();
- }
优化代码:熟悉 aop 的也可用 aop 实现上面的锁机制,让代码看起来更美观,实现如下:
pom依赖
- <dependency>
- <groupId>commons-collections</groupId>
- <artifactId>commons-collections</artifactId>
- <version>3.2</version>
- </dependency>
- <dependency>
- <groupId>org.aspectj</groupId>
- <artifactId>aspectjrt</artifactId>
- <version>1.9.2</version>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-aop</artifactId>
- </dependency>
自定义注解
- package com.ztxy.module.log.config;
-
- import java.lang.annotation.ElementType;
- import java.lang.annotation.Retention;
- import java.lang.annotation.RetentionPolicy;
- import java.lang.annotation.Target;
-
/**
 * Marks a method whose execution is wrapped in a cluster-wide (distributed) lock.
 *
 * <p>The annotation is retained at runtime and may only be placed on methods.
 *
 * @author jie
 * @date 2020/1/17 10:51
 */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface ClusterLock {

    /**
     * Base path of the lock to take around the annotated method.
     *
     * @return the lock path
     */
    String LOCK_PATH();
}

aspect切面
- package com.ztxy.module.log.controller.config;
-
- import com.ztxy.module.log.config.ClusterLock;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.curator.framework.CuratorFramework;
- import org.apache.curator.framework.recipes.locks.InterProcessMutex;
- import org.aspectj.lang.ProceedingJoinPoint;
- import org.aspectj.lang.annotation.Around;
- import org.aspectj.lang.annotation.Aspect;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Component;
-
- import java.util.concurrent.TimeUnit;
-
- /**
- * @author jie
- * @Description:
- * @date 2020/1/17 10:55
- */
-
- @Component
- @Slf4j
- @Aspect
- public class ClusterLockAdvice {
-
- @Autowired
- private CuratorFramework client;
-
- @Around("@annotation(clusterLock)")
- public void lockAround(ProceedingJoinPoint point, ClusterLock clusterLock){
- try {
- // 约定第一个参数就为业务类型
- Object value = point.getArgs()[0];
- if(null == value || !(value instanceof String) ){
- point.proceed(point.getArgs());
- return;
- }
- log.info("尝试执行业务"+value);
- String lockPath = clusterLock.LOCK_PATH();
- lockPath = lockPath + "/" + String.valueOf(value);
- InterProcessMutex lock = new InterProcessMutex(client, lockPath);
- try {
- if(lock.acquire(10, TimeUnit.HOURS)){
- point.proceed(point.getArgs());
- }
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- try {
- lock.release();
- log.info("释放锁资源");
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- } catch (Throwable throwable) {
- throwable.printStackTrace();
- }
-
- }
-
- }

使用
- @ClusterLock(LOCK_PATH = "/lock/test2")
- @Override
- public void lockTest2(String value) {
- log.info("业务逻辑模拟,耗时25s");
- try {
- Thread.sleep(25000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
如有不足,欢迎批评指正
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。