赞
踩
实现功能:ZooKeeper 节点的增删改查、节点监听、分布式读写锁、分布式计数器
- <properties>
- <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <java.version>1.8</java.version>
- <zookeeper.version>3.4.8</zookeeper.version>
- <curator.version>2.11.1</curator.version>
- </properties>
-
- <dependencies>
-
- <dependency>
- <groupId>org.apache.zookeeper</groupId>
- <artifactId>zookeeper</artifactId>
- <version>${zookeeper.version}</version>
- <exclusions>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-recipes</artifactId>
- <version>${curator.version}</version>
- </dependency>
-
- </dependencies>

这里使用的是 Curator。Curator 是对 ZooKeeper 客户端的一层封装,提供了一些集成好的方法,以及更优雅的 API
- /**
- * zookeeper客户端
- */
- @Data
- @Slf4j
- public class ZkClient {
- private final Logger logger = LoggerFactory.getLogger(this.getClass()); // per-instance logger; init() logs through it
-
- private CuratorFramework client; // Curator client handle; assigned by init(), closed by stop()
- public TreeCache cache; // tree cache built by initLocalCache(); NOTE(review): public and non-final - confirm external access is intended
- private ZookeeperProperties zookeeperProperties; // connection settings supplied via the constructor
-
- /**
-  * Creates a client wrapper around the given connection settings.
-  * No connection is opened here; {@code init()} does that.
-  *
-  * @param zookeeperProperties settings later read by {@code init()}
-  */
- public ZkClient(ZookeeperProperties zookeeperProperties) {
-     super();
-     this.zookeeperProperties = zookeeperProperties;
- }
-
- /**
- * 初始化zookeeper客户端
- */
- public void init() {
- try{
- RetryPolicy retryPolicy = new ExponentialBackoffRetry(zookeeperProperties.getBaseSleepTimeMs(),
- zookeeperProperties.getMaxRetries());
- Builder builder = CuratorFrameworkFactory.builder()
- .connectString(zookeeperProperties.getServer()).retryPolicy(retryPolicy)
- .sessionTimeoutMs( zookeeperProperties.getSessionTimeoutMs())
- .connectionTimeoutMs( zookeeperProperties.getConnectionTimeoutMs())
- .namespace( zookeeperProperties.getNamespace());
- if(StringUtils.isNotEmpty( zookeeperProperties.getDigest())){
- builder.authorization("digest", zookeeperProperties.getDigest().getBytes("UTF-8"));
- builder.aclProvider(new ACLProvider() {
- @Override
- public List<ACL> getDefaultAcl() {
- return ZooDefs.Ids.CREATOR_ALL_ACL;
- }
-
- @Override
- public List<ACL> getAclForPath(final String path) {
- return ZooDefs.Ids.CREATOR_ALL_ACL;
- }
- });
- }
- client = builder.build();
- client.start();
-
- initLocalCache("/test");
- // addConnectionStateListener();
-
-
- client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
- public void stateChanged(CuratorFramework client, ConnectionState state) {
- if (state == ConnectionState.LOST) {
- //连接丢失
- logger.info("lost session with zookeeper");
- } else if (state == ConnectionState.CONNECTED) {
- //连接新建
- logger.info("connected with zookeeper");
- } else if (state == ConnectionState.RECONNECTED) {
- logger.info("reconnected with zookeeper");
- }
- }
- });
- }catch(Exception e){
- e.printStackTrace();
- }
- }
-
- /**
-  * Creates the tree cache for the given root path, attaches a listener that logs every
-  * cache event, and starts the cache. The created cache is stored in the {@code cache} field.
-  *
-  * @param watchRootPath root path handed to the {@code TreeCache} constructor
-  * @throws Exception propagated from creating or starting the cache
-  */
- private void initLocalCache(String watchRootPath) throws Exception {
-     cache = new TreeCache(client, watchRootPath);
-     TreeCacheListener listener = (curator, event) -> {
-         // Some events carry no node data, so both the path and the payload may be absent.
-         String path = event.getData() != null ? event.getData().getPath() : null;
-         log.info("event:{} |path:{}", event.getType(), path);
-
-         byte[] payload = event.getData() != null ? event.getData().getData() : null;
-         if (payload != null) {
-             // Decode explicitly as UTF-8 rather than with the platform default charset.
-             log.info("发生变化的节点内容为:{}",
-                     new String(payload, java.nio.charset.StandardCharsets.UTF_8));
-         }
-     };
-     cache.getListenable().addListener(listener);
-     cache.start();
- }
-
-
- /**
-  * Releases the ZooKeeper resources held by this instance: the tree cache is closed first,
-  * then the client. Resources that were never assigned (for example because {@code init()}
-  * was not called or failed early) are skipped.
-  */
- public void stop() {
-     if (cache != null) {
-         cache.close();
-     }
-     if (client != null) {
-         client.close();
-     }
- }
-
- /**
-  * Exposes the underlying Curator client.
-  *
-  * @return the client assigned by {@code init()}, or {@code null} if none has been assigned
-  */
- public CuratorFramework getClient() {
-     return this.client;
- }
-
-
- /**
- * 创建节点
- * @param mode 节点类型
- * 1、PERSISTENT 持久化目录节点,存储的数据不会丢失。
- * 2、PERSISTENT_SEQUENTIAL顺序自动编号的持久化目录节点,存储的数据不会丢失
- * 3、EPHEMERAL 临时目录节点,一旦创建这个节点的客户端与服务器端的 session 超时,这种节点会被自动删除
- * 4、EPHEMERAL_SEQUENTIAL 临时自动编号节点,一旦创建这个节点的客户端与服务器端的 session 超时,这种节点会被自动删除,并且根据当前已经存在的节点数自动加 1,然后返回给客户端已经成功创建的目录节点名。
- * @param path 节点名称
- * &

Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。