欢迎访问 生活随笔!

生活随笔

当前位置: 首页 >

Curator操作ZooKeeper

发布时间:2025/3/17 32 豆豆
生活随笔 收集整理的这篇文章主要介绍了 Curator操作ZooKeeper 小编觉得挺不错的,现在分享给大家,帮大家做个参考.

Curator极大简化了ZooKeeper的使用,增加了针对ZooKeeper集群中connection的管理。

节点的创建和删除

 

import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.api.BackgroundCallback; import org.apache.curator.framework.api.CuratorEvent; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.ZooKeeper.States; import org.apache.zookeeper.data.Stat;public class CuratorBase {static final String CONNECT_ADDR = "192.168.0.217:2181,192.168.0.218:2181,192.168.0.219:2181";static final int SESSION_TIMEOUT = 35000;//会话超时时间,默认为60000,单位:ms static final int CONNECTION_TIMEOUT=60000;//连接超时时间,默认为15000,单位:ms public static void main(String[] args) throws Exception {//重试策略:初试时间为10s,最大重试次数为20RetryPolicy retryPolicy = new ExponentialBackoffRetry(10000, 20);//创建连接CuratorFramework cf = CuratorFrameworkFactory.builder().connectString(CONNECT_ADDR).sessionTimeoutMs(SESSION_TIMEOUT).retryPolicy(retryPolicy).build();//开启连接 cf.start();//建立节点 指定节点类型(不加withMode默认为持久类型节点)、路径、数据内容cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/persistent/p1","p1 value".getBytes());Thread.sleep(30000);//删除节点cf.delete().guaranteed().deletingChildrenIfNeeded().forPath("/persistent");cf.close();} }

run as--java application

线程休眠30s后,执行节点删除操作

 

节点内容的修改

import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.api.BackgroundCallback; import org.apache.curator.framework.api.CuratorEvent; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.ZooKeeper.States; import org.apache.zookeeper.data.Stat;public class CuratorBase {static final String CONNECT_ADDR = "192.168.0.217:2181,192.168.0.218:2181,192.168.0.219:2181";static final int SESSION_TIMEOUT = 35000;//会话超时时间,默认为60000,单位:ms static final int CONNECTION_TIMEOUT=60000;//连接超时时间,默认为15000,单位:ms public static void main(String[] args) throws Exception {//重试策略:初试时间为10s,最大重试次数为20RetryPolicy retryPolicy = new ExponentialBackoffRetry(10000, 20);//创建连接CuratorFramework cf = CuratorFrameworkFactory.builder().connectString(CONNECT_ADDR).sessionTimeoutMs(SESSION_TIMEOUT).retryPolicy(retryPolicy).build();//开启连接 cf.start();//创建节点cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/persistent/p1","p1 value".getBytes());//cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/persistent/p2","p2 value".getBytes());//读取节点String ret1 = new String(cf.getData().forPath("/persistent/p1"));System.out.println(ret1);//修改节点cf.setData().forPath("/persistent/p1", "new p1 value".getBytes());String ret2 = new String(cf.getData().forPath("/persistent/p1"));System.out.println(ret2);cf.close();} }

Eclipse的console输出

Eclipse的ZooKeeper Explorer内容

 

节点操作的回调函数

节点的新增、修改、删除,都可以设置其回调函数。该回调函数可以输出服务器的状态码、服务器事件类型等内容。还可以加入一个线程池进行优化操作。在批量节点操作的时候,可以用线程池去规划callback,可以将很多的任务放到队列中,使用线程池中的线程将队列中的任务进行处理。线程池中线程的个数可以根据具体的机器配置而定。

下面代码中,节点的创建操作是一个异步的过程,不会阻塞主线程main的执行,代码中将主线程main休眠,子线程在执行完节点的创建操作后执行回调函数并输出相关内容。若不添加主线程休眠的代码,则主线程执行完代码后结束,此时节点创建的子线程还没有完成节点的创建,因main线程的结束子线程也结束,进而就不能完成节点创建和回调函数的执行。

import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.api.BackgroundCallback; import org.apache.curator.framework.api.CuratorEvent; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.ZooKeeper.States; import org.apache.zookeeper.data.Stat;public class CuratorBase {static final String CONNECT_ADDR = "192.168.0.217:2181,192.168.0.218:2181,192.168.0.219:2181";static final int SESSION_TIMEOUT = 35000;//会话超时时间,默认为60000,单位:ms static final int CONNECTION_TIMEOUT=60000;//连接超时时间,默认为15000,单位:ms public static void main(String[] args) throws Exception {//重试策略:初试时间为10s,最大重试次数为20RetryPolicy retryPolicy = new ExponentialBackoffRetry(10000, 20);//创建连接CuratorFramework cf = CuratorFrameworkFactory.builder().connectString(CONNECT_ADDR).sessionTimeoutMs(SESSION_TIMEOUT).retryPolicy(retryPolicy).build();//开启连接 cf.start();// 绑定回调函数ExecutorService pool = Executors.newCachedThreadPool();cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).inBackground(new BackgroundCallback() {@Overridepublic void processResult(CuratorFramework cf, CuratorEvent ce) throws Exception {System.out.println("code:" + ce.getResultCode());System.out.println("type:" + ce.getType());System.out.println("线程为:" + Thread.currentThread().getName());}}, pool).forPath("/persistent/p2","p2 value".getBytes());System.out.println("主线程:"+Thread.currentThread().getName());Thread.sleep(Integer.MAX_VALUE);cf.close();} }

Eclipse中console输出

ZooKeeper Explorer中内容



获取子节点

 

import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.api.BackgroundCallback; import org.apache.curator.framework.api.CuratorEvent; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.ZooKeeper.States; import org.apache.zookeeper.data.Stat;public class CuratorBase {static final String CONNECT_ADDR = "192.168.0.217:2181,192.168.0.218:2181,192.168.0.219:2181";static final int SESSION_TIMEOUT = 35000;//会话超时时间,默认为60000,单位:ms static final int CONNECTION_TIMEOUT=60000;//连接超时时间,默认为15000,单位:ms public static void main(String[] args) throws Exception {//重试策略:初试时间为10s,最大重试次数为20RetryPolicy retryPolicy = new ExponentialBackoffRetry(10000, 20);//创建连接CuratorFramework cf = CuratorFrameworkFactory.builder().connectString(CONNECT_ADDR).sessionTimeoutMs(SESSION_TIMEOUT).retryPolicy(retryPolicy).build();//开启连接 cf.start();// 绑定回调函数ExecutorService pool = Executors.newCachedThreadPool();cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).inBackground(new BackgroundCallback() {@Overridepublic void processResult(CuratorFramework cf, CuratorEvent ce) throws Exception {System.out.println("code:" + ce.getResultCode());System.out.println("type:" + ce.getType());System.out.println("线程为:" + Thread.currentThread().getName());}}, pool).forPath("/persistent/p2","p2 value".getBytes());System.out.println("主线程:"+Thread.currentThread().getName());Thread.sleep(20000);//主线程休眠20s,等待节点创建完毕// 读取子节点getChildren方法 和 判断节点是否存在checkExists方法List<String> list = cf.getChildren().forPath("/persistent");for(String p : list){System.out.println(p);}Stat stat_p1 = cf.checkExists().forPath("/persistent/p1");System.out.println(stat_p1);Stat stat_p2 = cf.checkExists().forPath("/persistent/p2");System.out.println(stat_p2);cf.close();} }

Eclipse的console输出

若上面代码将Thread.sleep(20000);删除,有时会出现下面的异常,原因是节点创建和main主线程的执行是异步的。

 

转载于:https://www.cnblogs.com/cat520/p/9412815.html

总结

以上是生活随笔为你收集整理的Curator操作ZooKeeper的全部内容,希望文章能够帮你解决所遇到的问题。

如果觉得生活随笔网站内容还不错,欢迎将生活随笔推荐给好友。