当前位置:   article > 正文

api 创建zookeeper客户端_Springboot2(29)集成zookeeper的增删改查、节点监听、分布式读写锁、分布式计数器...

.values.zookeeper.enabled

实现zookeeper节点的增删改查、节点监听、分布式读写锁、分布式计数器

添加依赖

<properties>

<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

<java.version>1.8</java.version>

<zookeeper.version>3.4.8</zookeeper.version>

<curator.version>2.11.1</curator.version>

</properties>

<dependencies>

<dependency>

<groupId>org.apache.zookeeper</groupId>

<artifactId>zookeeper</artifactId>

<version>${zookeeper.version}</version>

<exclusions>

<exclusion>

<groupId>org.slf4j</groupId>

<artifactId>slf4j-log4j12</artifactId>

</exclusion>

<exclusion>

<groupId>org.slf4j</groupId>

<artifactId>slf4j-api</artifactId>

</exclusion>

</exclusions>

</dependency>

<dependency>

<groupId>org.apache.curator</groupId>

<artifactId>curator-recipes</artifactId>

<version>${curator.version}</version>

</dependency>

</dependencies>

ZkClient(curator)

这里使用的是curator,curator是对zookeeper的简单封装,提供了一些集成的方法,或者是提供了更优雅的api

/**

* zookeeper客户端

*/

@Data

@Slf4j

public class ZkClient {

private final Logger logger = LoggerFactory.getLogger(this.getClass());

private CuratorFramework client;

public TreeCache cache;

private ZookeeperProperties zookeeperProperties;

public ZkClient(ZookeeperProperties zookeeperProperties){

this.zookeeperProperties = zookeeperProperties;

}

/**

* 初始化zookeeper客户端

*/

public void init() {

try{

RetryPolicy retryPolicy = new ExponentialBackoffRetry(zookeeperProperties.getBaseSleepTimeMs(),

zookeeperProperties.getMaxRetries());

Builder builder = CuratorFrameworkFactory.builder()

.connectString(zookeeperProperties.getServer()).retryPolicy(retryPolicy)

.sessionTimeoutMs( zookeeperProperties.getSessionTimeoutMs())

.connectionTimeoutMs( zookeeperProperties.getConnectionTimeoutMs())

.namespace( zookeeperProperties.getNamespace());

if(StringUtils.isNotEmpty( zookeeperProperties.getDigest())){

builder.authorization("digest", zookeeperProperties.getDigest().getBytes("UTF-8"));

builder.aclProvider(new ACLProvider() {

@Override

public List<ACL> getDefaultAcl() {

return ZooDefs.Ids.CREATOR_ALL_ACL;

}

@Override

public List<ACL> getAclForPath(final String path) {

return ZooDefs.Ids.CREATOR_ALL_ACL;

}

});

}

client = builder.build();

client.start();

initLocalCache("/test");

// addConnectionStateListener();

client.getConnectionStateListenable().addListener(new ConnectionStateListener() {

public void stateChanged(CuratorFramework client, ConnectionState state) {

if (state == ConnectionState.LOST) {

//连接丢失

http://logger.info("lost session with zookeeper");

} else if (state == ConnectionState.CONNECTED) {

//连接新建

http://logger.info("connected with zookeeper");

} else if (state == ConnectionState.RECONNECTED) {

http://logger.info("reconnected with zookeeper");

}

}

});

}catch(Exception e){

e.printStackTrace();

}

}

/**

* 初始化本地缓存

* @param watchRootPath

* @throws Exception

*/

private void initLocalCache(String watchRootPath) throws Exception {

cache = new TreeCache(client, watchRootPath);

TreeCacheListener listener = (client1, event) ->{

http://log.info("event:" + event.getType() +

" |path:" + (null != event.getData() ? event.getData().getPath() : null));

if(event.getData()!=null && event.getData().getData()!=null){

http://log.info("发生变化的节点内容为:" + new String(event.getData().getData()));

}

// client1.getData().

};

cache.getListenable().addListener(listener);

cache.start();

}

public void stop() {

client.close();

}

public CuratorFramework getClient() {

return client;

}

/**

* 创建节点

* @param mode 节点类型

* 1、PERSISTENT 持久化目录节点,存储的数据不会丢失。

* 2、PERSISTENT_SEQUENTIAL顺序自动编号的持久化目录节点,存储的数据不会丢失

* 3、EPHEMERAL临时目录节点,一旦创建这个节点的客户端与服务器端口也就是session 超时,这种节点会被自动删除

*4、EPHEMERAL_SEQUENTIAL临时自动编号节点,一旦创建这个节点的客户端与服务器端口也就是session 超时,这种节点会被自动删除,并且根据当前已近存在的节点数自动加 1,然后返回给客户端已经成功创建的目录节点名。

* @param path 节点名称

* @param nodeData 节点数据

*/

public void createNode(CreateMode mode, String path , String nodeData) {

try {

//使用creatingParentContainersIfNeeded()之后Curator能够自动递归创建所有所需的父节点

client.create().creatingParentsIfNeeded().withMode(mode).forPath(path,nodeData.getBytes("UTF-8"));

} catch (Exception e) {

logger.error("注册出错", e);

}

}

/**

* 创建节点

* @param mode 节点类型

* 1、PERSISTENT 持久化目录节点,存储的数据不会丢失。

* 2、PERSISTENT_SEQUENTIAL顺序自动编号的持久化目录节点,存储的数据不会丢失

* 3、EPHEMERAL临时目录节点,一旦创建这个节点的客户端与服务器端口也就是session 超时,这种节点会被自动删除

* 4、EPHEMERAL_SEQUENTIAL临时自动编号节点,一旦创建这个节点的客户端与服务器端口也就是session 超时,这种节点会被自动删除,并且根据当前已近存在的节点数自动加 1,然后返回给客户端已经成功创建的目录节点名。

* @param path 节点名称

*/

public void createNode(CreateMode mode,String path ) {

try {

//使用creatingParentContainersIfNeeded()之后Curator能够自动递归创建所有所需的父节点

client.create().creatingParentsIfNeeded().withMode(mode).forPath(path);

} catch (Exception e) {

logger.error("注册出错", e);

}

}

/**

* 删除节点数据

*

* @param path

*/

public void deleteNode(final String path) {

try {

deleteNode(path,true);

} catch (Exception ex) {

log.error("{}",ex);

}

}

/**

* 删除节点数据

* @param path

* @param deleteChildre 是否删除子节点

*/

public void deleteNode(final String path,Boolean deleteChildre){

try {

if(deleteChildre){

//guaranteed()删除一个节点,强制保证删除,

// 只要客户端会话有效,那么Curator会在后台持续进行删除操作,直到删除节点成功

client.delete().guaranteed().deletingChildrenIfNeeded().forPath(path);

}else{

client.delete().guaranteed().forPath(path);

}

} catch (Exception e) {

e.printStackTrace();

}

}

/**

* 设置指定节点的数据

* @param path

* @param datas

*/

public void setNodeData(String path, byte[] datas){

try {

client.setData().forPath(path, datas);

}catch (Exception ex) {

log.error("{}",ex);

}

}

/**

* 获取指定节点的数据

* @param path

* @return

*/

public byte[] getNodeData(String path){

Byte[] bytes = null;

try {

if(cache != null){

ChildData data = cache.getCurrentData(path);

if(data != null){

return data.getData();

}

}

client.getData().forPath(path);

return client.getData().forPath(path);

}catch (Exception ex) {

log.error("{}",ex);

}

return null;

}

/**

* 获取数据时先同步

* @param path

* @return

*/

public byte[] synNodeData(String path){

client.sync();

return getNodeData( path);

}

/**

* 判断路径是否存在

*

* @param path

* @return

*/

public boolean isExistNode(final String path) {

client.sync();

try {

return null != client.checkExists().forPath(path);

} catch (Exception ex) {

return false;

}

}

/**

* 获取节点的子节点

* @param path

* @return

*/

public List<String> getChildren(String path) {

List<String> childrenList = new ArrayList<>();

try {

childrenList = client.getChildren().forPath(path);

} catch (Exception e) {

logger.error("获取子节点出错", e);

}

return childrenList;

}

/**

* 随机读取一个path子路径, "/"为根节点对应该namespace

* 先从cache中读取,如果没有,再从zookeeper中查询

* @param path

* @return

* @throws Exception

*/

public String getRandomData(String path) {

try{

Map<String,ChildData> cacheMap = cache.getCurrentChildren(path);

if(cacheMap != null && cacheMap.size() > 0) {

logger.debug("get random value from cache,path="+path);

Collection<ChildData> values = cacheMap.values();

List<ChildData> list = new ArrayList<>(values);

Random rand = new Random();

byte[] b = list.get(rand.nextInt(list.size())).getData();

return new String(b,"utf-8");

}

if(isExistNode(path)) {

logger.debug("path [{}] is not exists,return null",path);

return null;

} else {

logger.debug("read random from zookeeper,path="+path);

List<String> list = client.getChildren().forPath(path);

if(list == null || list.size() == 0) {

logger.debug("path [{}] has no children return null",path);

return null;

}

Random rand = new Random();

String child = list.get(rand.nextInt(list.size()));

path = path + "/" + child;

byte[] b = client.getData().forPath(path);

String value = new String(b,"utf-8");

return value;

}

}catch(Exception e){

log.error("{}",e);

}

return null;

}

/**

* 可重入共享锁 -- Shared Reentrant Lock

* @param lockPath

* @param time

* @param dealWork 获取

* @return

*/

public Object getSRLock(String lockPath,long time, SRLockDealCallback<?> dealWork){

InterProcessMutex lock = new InterProcessMutex(client, lockPath);

try {

if (!lock.acquire(time, TimeUnit.SECONDS)) {

log.error("get lock fail:{}", " could not acquire the lock");

return null;

}

log.debug("{} get the lock",lockPath);

Object b = dealWork.deal();

return b;

}catch(Exception e){

log.error("{}", e);

}finally{

try {

lock.release();

} catch (Exception e) {

//log.error("{}",e);

}

}

return null;

}

/**

* 获取读写锁

* @param path

* @return

*/

public InterProcessReadWriteLock getReadWriteLock(String path){

InterProcessReadWriteLock readWriteLock = new InterProcessReadWriteLock(client, path);

return readWriteLock;

}

/**

* 在注册监听器的时候,如果传入此参数,当事件触发时,逻辑由线程池处理

*/

ExecutorService pool = Executors.newFixedThreadPool(2);

/**

* 监听数据节点的变化情况

* @param watchPath

* @param listener

*/

public void watchPath(String watchPath,TreeCacheListener listener){

// NodeCache nodeCache = new NodeCache(client, watchPath, false);

TreeCache cache = new TreeCache(client, watchPath);

cache.getListenable().addListener(listener,pool);

try {

cache.start();

} catch (Exception e) {

e.printStackTrace();

}

}

}

配置文件

zookeeper.enabled: true

#zookeeper.server: 47.106.106.53:9036,47.106.106.53:9037,47.106.106.53:9038

zookeeper.server: 10.10.2.137:2181,10.10.2.138:2181,10.10.2.139:2181

zookeeper.namespace: demo

zookeeper.digest: rt:rt #zkCli.sh acl 命令 addauth digest mpush

zookeeper.sessionTimeoutMs: 1000 #会话超时时间,单位为毫秒,默认60000ms,连接断开后,其它客户端还能请到临时节点的时间

zookeeper.connectionTimeoutMs: 6000 #连接创建超时时间,单位为毫秒

zookeeper.maxRetries: 3 #最大重试次数

zookeeper.baseSleepTimeMs: 1000 #初始sleep时间 ,毫秒

程序会创建节点demo为namespace,之后所有增删改查的操作都这节点下完成

Controller层方法

@Api(tags="zookeeper基本操作")

@RequestMapping("/zk")

@RestController

@Slf4j

public class ZookeeperController {

@Autowired

private ZkClient zkClient;

@Autowired

private ZkClient zkClientTest;

/**

* 创建节点

* @param type

* @param znode

* @return

*/

@ApiOperation(value = "创建节点",notes = "在命名空间下创建节点")

@ApiImplicitParams({

@ApiImplicitParam(name ="type",value = "节点类型:<br> 0 持久化节点<br> 1 临时节点<br> 2 持久顺序节点<br> 3 临时顺序节点",

allowableValues = "0,1,2,3",defaultValue="3",paramType = "path",required = true,dataType = "Long"),

@ApiImplicitParam(name ="znode",value = "节点名称",paramType = "path",required = true,dataType = "String"),

@ApiImplicitParam(name ="nodeData",value = "节点数据",paramType = "body",dataType = "String")

})

@RequestMapping(value = "/create/{type}/{znode}",method=RequestMethod.POST)

private String create(@PathVariable Integer type,@PathVariable String znode,@RequestBody String nodeData){

znode = "/" + znode;

try {

zkClient.createNode(CreateMode.fromFlag(type),znode,nodeData);

} catch (KeeperException e) {

e.printStackTrace();

}

return znode;

}

/**

* 设置节点数据

* @param znode

* @return

*/

@ApiOperation(value = "设置节点数据",notes = "设置节点数据")

@ApiImplicitParams({

@ApiImplicitParam(name ="znode",value = "节点名称",paramType = "body",required = true,dataType = "String"),

@ApiImplicitParam(name ="nodeData",value = "节点数据",paramType = "query",required = true,dataType = "String")

})

@RequestMapping(value = "/update",method=RequestMethod.POST)

public String update(@RequestBody String znode,@RequestParam String nodeData){

znode = "/" + znode;

zkClient.setNodeData(znode,nodeData.getBytes());

return "sucess";

}

@ApiOperation(value = "删除节点",notes = "删除节点")

@ApiImplicitParams({

@ApiImplicitParam(name ="znode",value = "节点名称",paramType = "query",required = true,dataType = "String")

})

@RequestMapping(value = "/delete",method=RequestMethod.GET)

public String delete(@RequestParam String znode){

znode = "/" + znode;

zkClient.deleteNode(znode);

return "success";

}

@ApiOperation(value = "查找节点的内容",notes = "查找节点的内容")

@ApiImplicitParams({

@ApiImplicitParam(name ="znode",value = "节点名称",paramType = "body",required = true,dataType = "String")

})

@RequestMapping(value = "/find",method=RequestMethod.POST)

public String find(@RequestBody String znode){

znode = "/" + znode;

byte[] b = zkClient.getNodeData(znode);

return new String(b);

}

/**

* 给节点添加读写锁

* @param znode

* @return

*/

@ApiOperation(value = "添加读写锁",notes = "写锁跟读锁互斥,读锁跟读锁共享")

@ApiImplicitParams({

@ApiImplicitParam(name ="lockType",value = "锁类型:<br> 0 写锁<br> 1 读锁",

allowableValues = "0,1",defaultValue="0",paramType = "query",required = true,dataType = "Long"),

@ApiImplicitParam(name ="znode",value = "节点名称",paramType = "query",required = true,dataType = "String")

})

@RequestMapping(value = "/writeLock",method=RequestMethod.GET)

public String readLock(@RequestParam Integer lockType,@RequestParam String znode){

znode = "/" + znode;

InterProcessReadWriteLock readWriteLock = zkClient.getReadWriteLock(znode);

InterProcessMutex writeLock = readWriteLock.writeLock();

InterProcessMutex readLock = readWriteLock.readLock();

Runnable writeRunnable = ()->{

try {

System.out.println("------write lock-----------");

writeLock.acquire();

System.out.println("write acquire");

Thread.sleep(10_000);

System.out.println("write release");

writeLock.release();

} catch (Exception e) {

e.printStackTrace();

}

};

Runnable readRunnable = ()->{

try {

System.out.println("-------read lock----------");

readLock.acquire();

System.out.println("read acquire");

Thread.sleep(20_000);

System.out.println("read release");

readLock.release();

} catch (Exception e) {

e.printStackTrace();

}

};

if(lockType == 0 ){

new Thread(writeRunnable).start();

}else if(lockType == 1){

new Thread(readRunnable).start();

}

return "success";

}

/**

* 监听节点

* @param znode

* @return

*/

@ApiOperation(value = "监听节点",notes = "监控整个树上的所有节点")

@ApiImplicitParams(

@ApiImplicitParam(name ="znode",value = "节点名称",paramType = "body",required = true,dataType = "String")

)

@RequestMapping(value="/watchPath",method=RequestMethod.POST)

public String watchPath(@RequestBody String znode){

znode = "/" + znode;

zkClient.watchPath(znode,(client1, event) ->{

http://log.info("event:" + event.getType() +

" |path:" + (null != event.getData() ? event.getData().getPath() : null));

if(event.getData()!=null && event.getData().getData()!=null){

http://log.info("发生变化的节点内容为:" + new String(event.getData().getData()));

}

});

return "success";

}

/**

* 测试计算器

* 并发越高耗时越长

* 要自己实现获取锁失败重试

* @return

*/

@ApiOperation(value = "模拟分布式计数器",notes = "模拟分布式计数器")

@RequestMapping(value="/counter",method=RequestMethod.POST)

public String counter(@RequestBody String znode){

SharedCount baseCount = new SharedCount(zkClientTest.getClient(), znode, 0);

try {

baseCount.start();

//生成线程池

ExecutorService executor = Executors.newCachedThreadPool();

Consumer<SharedCount> consumer = (SharedCount count) -> {

try {

List<Callable<Boolean>> callList = new ArrayList<>();

Callable<Boolean> call = () -> {

boolean result = false;

try {

Long time = System.currentTimeMillis();

while(!result){

VersionedValue<Integer> oldVersion = baseCount.getVersionedValue();

int newCnt = oldVersion.getValue() + 1;

result = baseCount.trySetCount(oldVersion, newCnt);

if(System.currentTimeMillis()-time>10_000||result){

break;

}

TimeUnit.MILLISECONDS.sleep(new Random().nextInt(100)+1);

}

} catch (Exception e) {

}

return result;

};

//5个线程

for (int i = 0; i < 100; i++) {

callList.add(call);

}

List<Future<Boolean>> futures = executor.invokeAll(callList);

} catch (Exception e) {

}

};

//测试分布式int类型的计数器

consumer.accept(baseCount);

System.out.println("final cnt : " + baseCount.getCount());

} catch (Exception e) {

e.printStackTrace();

}

return "success:"+baseCount.getCount();

}

/**

* DistributedAtomicLong计数器可以自己设置重试的次数与间隔

* 并发越高耗时越长

* 要自己实现获取锁失败重试

*/

@ApiOperation(value = "模拟分布式计数器2",notes = "模拟分布式计数器2")

@RequestMapping(value="/counter2",method=RequestMethod.POST)

public String distributedCount(@RequestBody String znode) throws Exception {

DistributedAtomicLong distributedAtomicLong = new DistributedAtomicLong(

zkClientTest.getClient(), znode, new RetryNTimes(10, 30));

//生成线程池

ExecutorService executor = Executors.newCachedThreadPool();

Consumer<DistributedAtomicLong> consumer = (DistributedAtomicLong count) -> {

try {

List<Callable<Boolean>> callList = new ArrayList<>();

Callable<Boolean> call = () -> {

boolean result = false;

try {

AtomicValue<Long> val = count.increment();

System.out.println("old cnt: "+val.preValue()+" new cnt : "+ val.postValue()+" result:"+val.succeeded());

result = val.succeeded();

} catch (Exception e) {

} finally {

}

return result;

};

//5个线程

for (int i = 0; i < 500; i++) {

callList.add(call);

}

List<Future<Boolean>> futures = executor.invokeAll(callList);

} catch (Exception e) {

}

};

consumer.accept(distributedAtomicLong);

return "success:"+distributedAtomicLong.get().postValue();

}

/**

*

* @return

* @throws KeeperException

*/

@ApiOperation(value = "模拟服务注册和随机获取服务",notes = "模拟服务注册和随机获取服务")

@RequestMapping(value="/serviceRegistry",method=RequestMethod.POST)

public String serviceRegistry() throws KeeperException {

//服务注册

zkClient.createNode(CreateMode.fromFlag(1),"/test/hello_service1","http://1270.0.1:8001/");

zkClient.createNode(CreateMode.fromFlag(1),"/test/hello_service2","http://1270.0.1:8002/");

zkClient.createNode(CreateMode.fromFlag(1),"/test/hello_service3","http://1270.0.1:8003/");

return zkClient.getRandomData("/test");

}

}

测试

测试地址:http://127.0.0.1:8080/swagger-ui.html

读写锁测试

在界面发送两次写锁

cc9b1f8234a9a831de8a67338d7d78b1.png

后面打印

2018-12-29 11:45:27.214 INFO 53332 --- [ Thread-24] : ------write lock-----------

2018-12-29 11:45:27.242 INFO 53332 --- [ Thread-24] : write acquire

2018-12-29 11:45:30.870 INFO 53332 --- [ Thread-25] : ------write lock-----------

2018-12-29 11:45:37.243 INFO 53332 --- [ Thread-24] : write release

2018-12-29 11:45:37.276 INFO 53332 --- [ Thread-25] : write acquire

2018-12-29 11:45:47.276 INFO 53332 --- [ Thread-25] : write release

可以看出写锁是互斥的,另外写锁跟读锁也是互斥的,读锁跟读锁之间是共享的(自行测试)

计数器测试

修改方法中的线程个数,会发现并发越大,计数器执行时间越长,而且很大可能数据不准确。所以不适用于高并发的场景。

c792cb80e69c7b4367977ca2fafdea14.png
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/article/detail/53966
推荐阅读
相关标签
  

闽ICP备14008679号