/** *创建同步节点demo **/ public class CreateDemo implements Watcher { private static final CountDownLatch LATCH = new CountDownLatch(1);

public static void main(String[] args) throws IOException, InterruptedException, KeeperException { ZooKeeper zk = new ZooKeeper(“127.0.0.1:2181”, 5000,new CreateDemo()); LATCH.await(); //等待连接上zk以后 String path1 = zk.create(“/test01”, “test01”.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); System.out.println(“创建无序节点:”+path1);

String path2 = zk.create(“/test02”, “test02”.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println(“创建有序节点:”+path2); }

@Override public void process(WatchedEvent event) { if(Event.KeeperState.SyncConnected == event.getState()){ LATCH.countDown(); } } }

我们来运行一下,可以看到控制台输出的内容:

创建无序节点:/test01 创建有序节点:/test020000000002

创建异步节点

创建异步节点,我们需要注意传递ctx参数,以及StringCallback的实例,并且我们需要注意的是,create方法并不会阻塞后续业务执行:

/** *创建异步节点demo **/ public class CreateAyncDemo implements Watcher { private static final CountDownLatch LATCH = new CountDownLatch(1);

public static void main(String[] args) throws IOException, InterruptedException, KeeperException { ZooKeeper zk = new ZooKeeper(“127.0.0.1:2181”, 5000,new CreateAyncDemo()); LATCH.await(); //等待连接上zk以后 zk.create(“/testAsyn001”, “testAsyn001”.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, new IStringCallback(),“我是上下文”);

zk.create(“/testAsyn002”, “testAsyn002”.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, new IStringCallback(),“我是上下文”); Thread.sleep(Integer.MAX_VALUE); }

@Override public void process(WatchedEvent event) { if(Event.KeeperState.SyncConnected == event.getState()){ LATCH.countDown(); } }

} class IStringCallback implements AsyncCallback.StringCallback{ @Override public void processResult(int rc, String path, Object ctx, String name) { System.out.println(“创建节点完毕的回调函数:[rc:”+rc+“,path:”+path+“,ctx:”+ctx+“,name:”+name+“]”); } }

我们使用Thread.sleep阻塞一下当前的业务,运行可以看到控制台成功输出:

创建节点完毕的回调函数:[rc:0,path:/testAsyn001,ctx:我是上下文,name:/testAsyn001] 创建节点完毕的回调函数:[rc:0,path:/testAsyn002,ctx:我是上下文,name:/testAsyn0020000000009]

删除节点

客户端通过delete方法来删除某一个节点,delete方法如下:

//同步删除 public void delete(final String path, int version) throws InterruptedException, KeeperException //异步删除 public void delete(final String path, int version, VoidCallback cb,Object ctx)

可以看到删除节点操作和创建节点一样,支持同步和异步操作,但是这里我们需要注意的是,**在Zookeeper中,只运行按照顺序删除节点,即子节点存在无法直接删除父节点,必须优先删除下面的所有子节点!**接下来我们编写一个案例,删除节点:

public class DeleteDemo implements Watcher { private static final CountDownLatch LATCH = new CountDownLatch(1);

public static void main(String[] args) throws IOException, InterruptedException, KeeperException { ZooKeeper zk = new ZooKeeper(“127.0.0.1:2181”, 5000,new DeleteDemo()); LATCH.await(); //等待连接上zk以后 String path1 = zk.create(“/test01”, “test01”.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); System.out.println(“创建无序节点:”+path1);

String path2 = zk.create(“/test02”, “test02”.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println(“创建有序节点:”+path2); //触发同步删除 zk.delete(“/test01”,0); //触发异步删除 zk.delete(“/test02”,0,new IVoidCallback(),“我是触发异步删除的数据”); Thread.sleep(Integer.MAX_VALUE); }

@Override public void process(WatchedEvent event) { if(Event.KeeperState.SyncConnected == event.getState()){ LATCH.countDown(); } } }

class IVoidCallback implements AsyncCallback.VoidCallback{

@Override public void processResult(int rc, String path, Object ctx) { System.out.println(“异步删除节点触发:[path:”+path+“,rc:”+rc+“,ctx:”+ctx+“]”); } }

运行以后,可以看到控制台的输出如下:

创建无序节点:/test01 创建有序节点:/test020000000011 异步删除节点触发:[path:/test02,rc:-101,ctx:我是触发异步删除的数据]

获取数据

在Zookeeper中有两种获取数据的接口,一种是获取当前节点的相关数据信息,一种是获取当前节点下的子节点相关的数据信息,接下来我们来看看这两种Api接口

getData

客户端通过getData方法可以获取当前节点的相关数据内容,方法如下:

public byte[] getData(final String path, Watcher watcher, Stat stat) throws KeeperException, InterruptedException //第二个 public byte[] getData(String path, boolean watch, Stat stat) throws KeeperException, InterruptedException //第三个 public void getData(final String path, Watcher watcher,DataCallback cb, Object ctx) //第四个 public void getData(String path, boolean watch, DataCallback cb, Object ctx)

可见这里也提供了同步获取数据和异步获取数据的方法,接下来我们来看看对应参数的说明:

path

path是需要获取数据的节点所在的路径

watcher

watcher是用来注册的通知接口,如果节点发生变更,就会通过当前的接口通知客户端

stat

stat是用来描述该数据节点的状态信息,如果我们需要获取当前节点的状态信息,可以在客户端创建该变量传入方法中,有服务端替换该变量

cb

cb是异步获取数据的时候注册的回调函数接口,类型为DataCallback

ctx

ctx是在异步获取数据过程中传递给异步回调函数接口的实例,一般用来传递上下文

接下来,我们来编写代码获取一下节点的内容数据:

public class GetDataDemo implements Watcher { private static final CountDownLatch LATCH = new CountDownLatch(1);

public static void main(String[] args) throws IOException, KeeperException, InterruptedException { ZooKeeper zk = new ZooKeeper(“127.0.0.1:2181”,5 * 1000, new GetDataDemo()); LATCH.await(); String path = zk.create(“/testGetData01”, “/testGetData01”.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); System.out.println(“创建节点成功:”+path); //同步调用查询接口 Stat stat = new Stat(); byte[] data = zk.getData(“/testGetData01”, true, stat); System.out.println(“同步获取当前节点内容:”+new String(data)+“,stat:”+stat); //异步调用查询接口 zk.getData(“/testGetData01”, true,new IDataCallback(),“我是异步获取数据的回调函数”); Thread.sleep(5 * 1000); }

@Override public void process(WatchedEvent event) { if(Event.KeeperState.SyncConnected == event.getState()){ LATCH.countDown(); } } }

class IDataCallback implements AsyncCallback.DataCallback{

@Override public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { System.out.println(“异步获取节点数据:[path”+path+“,ctx:”+ctx+“,rc:”+rc+“,data:”+new String(data)+“,”+data.toString()+“,stat:”+stat+“]”); } }

运行以后,控制台的输出如下:

创建节点成功:/testGetData01 同步获取当前节点内容:/testGetData01,stat:40,40,1579166587745,1579166587745,0,0,0,72059329994162178,14,0,40

异步获取节点数据:[path/testGetData01,ctx:我是异步获取数据的回调函数,rc:0,data:/testGetData01,[B@22987e0b,stat:40,40,1579166587745,1579166587745,0,0,0,72059329994162178,14,0,40 ]

getChildren

如果当前客户端想要获取当前节点下的子节点的内容数据,就需要使用getChildren方法,方法的定义如下:

public List getChildren(final String path, Watcher watcher) throws KeeperException, InterruptedException //第二个 public List getChildren(String path, boolean watch)throws KeeperException, InterruptedException //第三个 public void getChildren(final String path, Watcher watcher,ChildrenCallback cb, Object ctx) //第四个 public void getChildren(String path, boolean watch, ChildrenCallback cb,Object ctx) //第五个 public List getChildren(final String path, Watcher watcher,Stat stat) hrows KeeperException, InterruptedException //第六个 public List getChildren(String path, boolean watch, Stat stat) hrows KeeperException, InterruptedException //第七个 public void getChildren(final String path, Watcher watcher,Children2Callback cb, Object ctx) //第八个 public void getChildren(String path, boolean watch, Children2Callback cb,Object ctx)

可见与getData等Api一样,都提供了多个场景下使用的api接口,并且都有同步异步两种Api的使用,而这里的参数前面都有介绍,可以参照上面的参数说明。接下来,我们来看看同步获取子节点数据与异步Api的使用:

public class GetChildrenDemo implements Watcher { private static final CountDownLatch LATCH = new CountDownLatch(1);

public static void main(String[] args) throws IOException, InterruptedException, KeeperException { ZooKeeper zk = new ZooKeeper(“127.0.0.1:2181”,5 * 1000, new GetChildrenDemo()); LATCH.await(); String path = zk.create(“/testGetChildren01”, “/testGetChildren01”.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); System.out.println(“创建节点成功:”+path); //同步调用查询接口 Stat stat = new Stat(); List data = zk.getChildren(“/testGetChildren01”, true); System.out.println(“同步获取当前节点下的子节点:”+ data); //异步调用查询接口 zk.getChildren(“/testGetChildren01”, true,new IChildrenCallback(),“我是异步获取子节点的回调函数”); Thread.sleep(5 * 1000); }

@Override public void process(WatchedEvent event) { if(Event.KeeperState.SyncConnected == event.getState()){ LATCH.countDown(); } } }

class IChildrenCallback implements AsyncCallback.ChildrenCallback{

@Override public void processResult(int rc, String path, Object ctx, List children) { System.out.println(“异步获取获取子节点:[rc:”+rc+“,path:”+path+“,ctx:”+ctx+“,children:”+children); } }

可以看到当前的节点刚刚创建,并没有子节点,所以获得的子节点列表为空:

创建节点成功:/testGetChildren01 同步获取当前节点下的子节点:[] 异步获取获取子节点:[rc:0,path:/testGetChildren01,ctx:我是异步获取子节点的回调函数,children:[]

修改节点

在创建节点以后,如果客户端想要更改节点的内容,这个时候就需要使用修改的Api,在zookeeper中修改节点的内容使用setData方法,如下:

//同步修改节点数据 Stat setData(final String path, byte d ata[], in t version) //异步修改节点数据 void setData(final String path, byte data[], in t version, StatCallback cb, Object ctx)

zookeeper的修改Api仅有两个,对应同步和异步操作,并且在操作过程中希望传入对应的版本,防止误操作,接下来我们来编码实践:

public class SetDataDemo implements Watcher { private static final CountDownLatch LATCH = new CountDownLatch(1);

public static void main(String[] args) throws IOException, InterruptedException, KeeperException { ZooKeeper zk = new ZooKeeper(“127.0.0.1:2181”,5 * 1000, new SetDataDemo()); LATCH.await(); Stat stat = new Stat(); String path = zk.create(“/testSetData”, “/testSetData”.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL,stat); System.out.println(“创建节点成功:”+path); //同步修改数据 Stat stat1 = zk.setData(path, “/testSetData1”.getBytes(), stat.getVersion()); byte[] data = zk.getData(path, true, stat); System.out.println(“修改后的内容:”+new String(data)+“,stat:”+stat1); //异步修改数据 zk.setData(path,“/testSetData2”.getBytes(),stat1.getVersion(),new IStatCallback(),“状态异步修改”); Thread.sleep(5 * 1000); }

@Override public void process(WatchedEvent event) { if(Event.KeeperState.SyncConnected == event.getState()){ LATCH.countDown(); } } }

class IStatCallback implements AsyncCallback.StatCallback{

@Override public void processResult(int rc, String path, Object ctx, Stat stat) { System.out.println(“修改数据成功:[rc:”+rc+“,path:”+path+“,ctx:”+ctx+“,stat:”+stat+“]”); } }

将我们创建的节点进行修改后,再次查询,并且再次异步修改,控制台输出如下,可见的确修改成功了:

创建节点成功:/testSetData 修改后的内容:/testSetData1,stat:54,55,1579243770843,1579243770939,1,0,0,72058810458177539,13,0,54

修改数据成功:[rc:0,path:/testSetData,ctx:状态异步修改,stat:54,56,1579243770843,1579243771008,2,0,0,72058810458177539,13,0,54 ]

总结

前面我们通过编码实践来学习了Zookeeper的原生Api,也发现了原生Api的一些优秀的特点,例如:

zookeeper中对节点的操作全部提供了同步和异步两种操作方式,并且有多种不同情况下的方法重载

但同时我们在操作Zookeeper的过程中也发现了一些很不灵活的点,一不小心就有可能导致开发过程中出现漏洞,例如:

1.原生Api的创建节点,必须按照层级创建,因此建议创建之前我们也要检查该节点是否存在

2.Zookeeper中节点的删除操作也必须按照层级进行删除,即当前节点下存在子节点,必须优先删除子节点,再去删除当前节点,建议删除之前优先检查一下是否存在子节点

除此之外,如果我们对某个节点进行监听,如果细心的话也会发现,原生的Api实现中,监听触发一次以后就不会再触发了。可见Zookeeper的原生Api实现对于企业级开发而言,较为复杂和不人性化,那么我们能不能将这些操作进行封装,变成简易操作?答案是肯定的,基于 Zookeeper的Api进行封装的主流框架主要有两个:

一个是个人开发的ZkClient,一个是目前Apache的开源项目-Curator,并且由于Curator版本维护更稳定,而且除了常见的zk节点操作以外,包括一些分布式方案,如选举,分布式锁,分布式队列等常见分布式操作都有进行封装,使得操作更加简单便利,因此更推荐使用Curator来操作Zookeeper

资源分享

网上学习 Android的资料一大堆,但如果学到的知识不成体系,遇到问题时只是浅尝辄止,不再深入研究,那么很难做到真正的技术提升。希望这份系统化的技术体系对大家有一个方向参考。

2020年虽然路途坎坷,都在说Android要没落,但是,不要慌,做自己的计划,学自己的习,竞争无处不在,每个行业都是如此。相信自己,没有做不到的,只有想不到的。祝大家2021年万事大吉。 《Android学习笔记总结+移动架构视频+大厂面试真题+项目实战源码》,点击传送门,即可获取! 5281782762)]

[外链图片转存中…(img-GRkBjxto-1715281782765)]

[外链图片转存中…(img-E3guCz6h-1715281782768)]

网上学习 Android的资料一大堆,但如果学到的知识不成体系,遇到问题时只是浅尝辄止,不再深入研究,那么很难做到真正的技术提升。希望这份系统化的技术体系对大家有一个方向参考。

2020年虽然路途坎坷,都在说Android要没落,但是,不要慌,做自己的计划,学自己的习,竞争无处不在,每个行业都是如此。相信自己,没有做不到的,只有想不到的。祝大家2021年万事大吉。 《Android学习笔记总结+移动架构视频+大厂面试真题+项目实战源码》,点击传送门,即可获取!

文章链接

评论可见,请评论后查看内容,谢谢!!!评论后请刷新页面。