赞
踩
zk可以作为注册中心和配置中心
<!-- curator-framework --> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>4.2.0</version> </dependency> <!-- curator-recipes试用报告 --> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>4.2.0</version> </dependency> <!-- curator 做服务发现--> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-x-discovery</artifactId> <version>4.2.0</version> </dependency>
@Slf4j @Configuration public class ZkCoreClient { // zk 服务端集群地址 @Value("${zk.url}") private String zkUrl; // session 超时时间 private int timeOut = 60000; // zkclient 重试间隔时间 private int baseSleepTimeMs = 5000; //zkclient 重试次数 private int retryCount = 5; /** * 使用double-check 创建client */ @Bean public CuratorFramework init() { CuratorFramework client = CuratorFrameworkFactory .builder() .connectString(zkUrl) .sessionTimeoutMs(timeOut) .retryPolicy(new ExponentialBackoffRetry(baseSleepTimeMs, retryCount)) // .namespace(appName) .build(); // 或者使用工厂模式 // client = CuratorFrameworkFactory.newClient(zkUrl,new ExponentialBackoffRetry(baseSleepTimeMs,retryCount)).usingNamespace(appName); client.start(); log.info("client is created at ================== {}", LocalDateTime.now()); return client; } }
/** * 使用Curator 实现zk 的基本操作-增删查改数据 和监听watch */ @Slf4j public class ZkUtils { @Autowired CuratorFramework client; /** * @Description: 创建路径 */ public String createNode(String path, String value) throws Exception { return createNode(path, value, true); } public String createNode(String path, String value, Boolean isEphemeral) throws Exception { if (null == client) { throw new RuntimeException("there is not connect to zkServer..."); } String node = client .create() .creatingParentsIfNeeded() .withMode(isEphemeral.equals(true) ? CreateMode.EPHEMERAL_SEQUENTIAL : CreateMode.PERSISTENT_SEQUENTIAL) // 临时顺序节点/持久顺序节点 .forPath(path, value.getBytes()); log.info("create node : {}", node); return node; } /** * @param path * @Description: 删除节点信息 */ public void deleteNode(String path) throws Exception { if (null == client) { throw new RuntimeException("there is not connect to zkServer..."); } client.delete() .guaranteed() // 保障机制,若未删除成功,只要会话有效会在后台一直尝试删除 .deletingChildrenIfNeeded() // 若当前节点包含子节点,子节点也删除 .forPath(path); log.info("{} is deleted ", path); } /** * 判断znode是否存在,Stat就是对znode所有属性的一个映射,stat=null表示节点不存在 */ public Stat isExists(String path) throws Exception { if (null == client) { throw new RuntimeException("there is not connect to zkServer..."); } return client.checkExists().forPath(path); } /** * 查询子节点 */ public List<String> getChildren(String path) throws Exception { if (null == client) { throw new RuntimeException("there is not connect to zkServer..."); } return client.getChildren() .forPath(path); } /** * @param path * @Description: 获取节点存储的值 */ public String getNodeData(String path) throws Exception { if (null == client) { throw new RuntimeException("there is not connect to zkServer..."); } Stat stat = new Stat(); byte[] bytes = client.getData().storingStatIn(stat).forPath(path); log.info("{} data is : {}", path, new String(bytes)); log.info("current stat version is {}, createTime is {}", stat.getVersion(), stat.getCtime()); return new String(bytes); } /** * @param path * @param value * @Description: 设置节点 数据 */ public void setNodeData(String path, String value) throws Exception { if (null == client) { throw new RuntimeException("there is not connect to zkServer..."); } Stat stat = client.checkExists().forPath(path); if (null == stat) { log.info(String.format("{} Znode is not exists", path)); throw new RuntimeException(String.format("{} Znode is not exists", path)); } String nodeData = getNodeData(path); client.setData().withVersion(stat.getVersion()).forPath(path, value.getBytes()); log.info("{} Znode data is set. old vaule is {}, new data is {}", path, nodeData, value); } /** * @param path * @Description: 创建 给定节点的监听事件 监听一个节点的更新和创建事件(不包括删除) */ public void addWatcherWithNodeCache(String path) throws Exception { if (null == client) { throw new RuntimeException("there is not connect to zkServer..."); } // dataIsCompressed if true, data in the path is compressed NodeCache nodeCache = new NodeCache(client, path, false); NodeCacheListener listener = () -> { ChildData currentData = nodeCache.getCurrentData(); log.info("{} Znode data is chagnge,new data is --- {}", currentData.getPath(), new String(currentData.getData())); }; nodeCache.getListenable().addListener(listener); nodeCache.start(); } /** * @param path 给定节点 * @Description: 监听给定节点下的子节点的创建、删除、更新 */ public void addWatcherWithChildCache(String path) throws Exception { if (null == client) { throw new RuntimeException("there is not connect to zkServer..."); } //cacheData if true, node contents are cached in addition to the stat PathChildrenCache pathChildrenCache = new PathChildrenCache(client, path, false); PathChildrenCacheListener listener = (client, event) -> { log.info("event path is --{} ,event type is {}", event.getData().getPath(), event.getType()); }; pathChildrenCache.getListenable().addListener(listener); // StartMode : NORMAL BUILD_INITIAL_CACHE POST_INITIALIZED_EVENT // NORMAL:异步初始化, BUILD_INITIAL_CACHE:同步初始化, POST_INITIALIZED_EVENT:异步初始化,初始化之后会触发事件 pathChildrenCache.start(PathChildrenCache.StartMode.NORMAL); } /** * @Description: 监听 给定节点的创建、更新(不包括删除) 以及 该节点下的子节点的创建、删除、更新动作。 */ public void addWatcherWithTreeCache(String path) throws Exception { if (null == client) { throw new RuntimeException("there is not connect to zkServer..."); } TreeCache treeCache = new TreeCache(client, path); TreeCacheListener listener = (client, event) -> { log.info("节点路径 --{} ,节点事件类型: {} , 节点值为: {}", Objects.nonNull(event.getData()) ? event.getData().getPath() : "无数据", event.getType()); }; treeCache.getListenable().addListener(listener); treeCache.start(); } } public class zkRegiestter{ /** * 服务注册:将当前启动的服务信息(采用map的形式)进行注册到zk中。 * 这里采用的是map 可以自定义类 */ public void registerService(String serviceName, String... urls) throws Exception { ServiceInstanceBuilder<Map> serviceInstanceBuilder = ServiceInstance.builder(); serviceInstanceBuilder.address(InetAddress.getLocalHost().getHostAddress()); serviceInstanceBuilder.port(Integer.parseInt(environment.getProperty("server.port"))); serviceInstanceBuilder.name(serviceName); Map config = new HashMap(); config.put("url", urls); serviceInstanceBuilder.payload(config); ServiceInstance<Map> serviceInstance = serviceInstanceBuilder.build(); ServiceDiscovery<Map> serviceDiscovery = ServiceDiscoveryBuilder.builder(Map.class) .client(client).serializer(new JsonInstanceSerializer<Map>(Map.class)) .basePath(SERVICE_ROOT_PATH).build(); serviceDiscovery.registerService(serviceInstance); serviceDiscovery.start(); } /** * 服务发现:通过serviceDiscovery查询到服务 */ public void discovery(String serviceName) { try { ServiceDiscovery<Map> serviceDiscovery = ServiceDiscoveryBuilder.builder(Map.class) .client(client).basePath(SERVICE_ROOT_PATH).build(); serviceDiscovery.start(); //根据名称获取服务 Collection<ServiceInstance<Map>> services = serviceDiscovery.queryForInstances(serviceName); for (ServiceInstance<Map> service : services) { System.out.print(service.getPayload() + " -- "); System.out.println(service.getAddress() + ":" + service.getPort()); } System.out.println(); } catch (Exception e) { e.printStackTrace(); } } } public class zkLockClient{ @Autowired CuratorFramework client; /** * 首先需要调用create 创建路径 * 尝试获取锁 包含解锁操作 */ public <T> TwoTuple<Boolean, T> tryLock(LockCallback<T> lockCallback, String lockKey, Long timeout) { InterProcessMutex lock = new InterProcessMutex(client, lockKey); try { if (lock.acquire(timeout, TimeUnit.MILLISECONDS)) { log.info(Thread.currentThread().getName() + " get lock"); return new TwoTuple<>(true, lockCallback.exec()); } } catch (Exception e) { e.printStackTrace(); } finally { try { log.info(Thread.currentThread().getName() + " release lock"); lock.release(); } catch (Exception e) { e.printStackTrace(); } } return new TwoTuple<>(false, null); } }
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-zookeeper-discovery</artifactId>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.5.5</version>
</dependency>
spring: application: name: HelloWorld cloud: zookeeper: connect-string: localhost:2181 discovery: enabled: true server: port: 8081 endpoints: restart: enabled: true logging: level: org.apache.zookeeper.ClientCnxn: WARN ## 单独配置zk zookeeper: address: 127.0.0.1:2181 timeout: 4000
@SpringBootApplication
@EnableDiscoveryClient
public class HelloWorldApplication {
public static void main(String[] args) {
SpringApplication.run(HelloWorldApplication.class, args);
}
}
@Configuration public class ZookeeperConfig { private static final Logger logger = LoggerFactory.getLogger(ZookeeperConfig.class); @Value("${zookeeper.address}") private String connectString; @Value("${zookeeper.timeout}") private int timeout; @Bean(name = "zkClient") public ZooKeeper zkClient(){ ZooKeeper zooKeeper=null; try { final CountDownLatch countDownLatch = new CountDownLatch(1); //连接成功后,会回调watcher监听,此连接操作是异步的,执行完new语句后,直接调用后续代码 // 可指定多台服务地址 127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183 zooKeeper = new ZooKeeper(connectString, timeout, new Watcher() { @Override public void process(WatchedEvent event) { if(Event.KeeperState.SyncConnected==event.getState()){ //如果收到了服务端的响应事件,连接成功 countDownLatch.countDown(); } } }); countDownLatch.await(); logger.info("【初始化ZooKeeper连接状态....】={}",zooKeeper.getState()); }catch (Exception e){ logger.error("初始化ZooKeeper连接异常....】={}",e); } return zooKeeper; } }
@Component public class ZkApi { private static final Logger logger = LoggerFactory.getLogger(ZkApi.class); @Autowired private ZooKeeper zkClient; /** * 判断指定节点是否存在 */ public Stat exists(String path, boolean needWatch){ return zkClient.exists(path,needWatch); } public Stat exists(String path,Watcher watcher ){ return zkClient.exists(path,watcher); } /** * 创建持久化节点 */ public boolean createNode(String path, String data){ zkClient.create(path,data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT); return true; } /** * 修改持久化节点 */ public boolean updateNode(String path, String data){ //zk的数据版本是从0开始计数的。如果客户端传入的是-1,则表示zk服务器需要基于最新的数据进行更新。如果对zk的数据节点的更新操作没有原子性要求则可以使用-1. //version参数指定要更新的数据的版本, 如果version和真实的版本不同, 更新操作将失败. 指定version为-1则忽略版本检查 zkClient.setData(path,data.getBytes(),-1); return true; } /** * 删除持久化节点 */ public boolean deleteNode(String path){ //version参数指定要更新的数据的版本, 如果version和真实的版本不同, 更新操作将失败. 指定version为-1则忽略版本检查 zkClient.delete(path,-1); return true; } /** * 获取当前节点的子节点(不包含孙子节点) * @param path 父节点path */ public List<String> getChildren(String path) throws KeeperException, InterruptedException{ List<String> list = zkClient.getChildren(path, false); return list; } /** * 获取指定节点的值 * @param path * @return */ public String getData(String path,Watcher watcher){ Stat stat=new Stat(); byte[] bytes=zkClient.getData(path,watcher,stat); return new String(bytes); } @PostConstruct public void init(){ String path="/zk-watcher-2"; createNode(path,"测试"); String value=getData(path,new WatcherApi()); // 删除节点出发 监听事件 deleteNode(path); } }
public class WatcherApi implements Watcher {
private static final Logger logger = LoggerFactory.getLogger(WatcherApi.class);
@Override
public void process(WatchedEvent event) {
logger.info("【Watcher监听事件】={}",event.getState());
logger.info("【监听路径为】={}",event.getPath());
logger.info("【监听的类型为】={}",event.getType()); // 三种监听类型: 创建,删除,更新
}
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。