浅谈Zookeeper开源客户端框架Curator
zookeepercurator Curator是Netflix开源的一套ZooKeeper客户端框架. Netflix在使用ZooKeeper的过程中发现ZooKeeper自带的客户端太底层,应用方在使用的时候需要自己处理很多事情,于是在它的基础上包装了一下,提供了一套更好用的客户端框架. Netflix在用ZooKeeper的过程中遇到的问题,我们也遇到了,所以开始研究一下,首先从他在github上的源码,wiki文档以及Netflix的技术blog入手. 看完官方的文档之后,发现Curator主要解决了三类问题: 1.封装ZooKeeper client与ZooKeeper server之间的连接处理; Curator列举的ZooKeeper使用过程中的几个问题 1.初始化连接的问题: 在client与server之间握手建立连接的过程中,如果握手失败,执行所有的同步方法(比如create,getData等)将抛出异常 Curator主要从以下几个方面降低了zk使用的复杂性: 1.重试机制:提供可插拔的重试机制,它将给捕获所有可恢复的异常配置一个重试策略,并且内部也提供了几种标准的重试策略(比如指数补偿). Curator通过以上的处理,让用户专注于自身的业务本身,而无需花费更多的精力在zk本身. Curator声称的一些亮点: 日志工具 内部采用SLF4J 来输出日志 和Curator相比,另一个ZooKeeper客户端――zkClient 文档几乎没有 Curator几个组成部分 1.Client: 是ZooKeeper客户端的一个替代品,提供了一些底层处理和相关的工具方法. Client 这是一个底层的API,应用方基本对这个可以无视,较好直接从Curator Framework入手,主要包括三部分: 不间断连接管理 一种典型的用法: RetryLoop retryLoop = client.newRetryLoop(); while ( retryLoop.shouldContinue() ) { try { // perform your work ... // it's important to re-get the ZK instance as there may have been an error and the instance was re-created ZooKeeper zk = client.getZookeeper(); retryLoop.markComplete(); } catch ( Exception e ) { retryLoop.takeException(e); } } 如果在操作过程中失败,且这种失败是可重试的,而且在允许的次数内,Curator将保证操作的最终完成. RetryLoop.callWithRetry(client,new Callable() { @Override public Void call() throws Exception { // do your work here - it will get retried if needed return null; } }); 重试策略 RetryPolicy接口只有一个方法(以前版本有两个方法): public boolean allowRetry(int retryCount,long elapsedTimeMs); 在开始重试之前,allowRetry方法被调用,其参数将指定当前重试次数,和操作已消耗时间. 如果允许,将继续重试,否则抛出异常. Curator内部实现的几种重试策略: 1.ExponentialBackoffRetry:重试指定的次数,且每一次重试之间停顿的时间逐渐增加. Framework 是ZooKeeper Client更高的抽象API 自动连接管理: 当ZooKeeper客户端内部出现异常,将自动进行重连或重试,该过程对外几乎完全透明 更清晰的API: 简化了ZooKeeper原生的方法,事件等,提供流程的接口 CuratorFrameworkFactory类提供了两个方法,一个工厂方法newClient,一个构建方法build. 使用工厂方法newClient可以创建一个默认的实例,而build构建方法可以对实例进行定制. 当CuratorFramework实例构建完成,紧接着调用start()方法,在应用结束的时候,需要调用close()方法. CuratorFramework是线程安全的. 在一个应用中可以共享同一个zk集群的CuratorFramework. CuratorFramework API采用了连贯风格的接口(Fluent Interface). 所有的操作一律返回构建器,当所有元素加在一起之后,整个方法看起来就像一个完整的句子. 比如下面的操作: client.create().forPath("/head",new byte[0]); client.delete().inBackground().forPath("/head"); client.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/head/child",new byte[0]); client.getData().watched().inBackground().forPath("/test"); 方法说明: 1.create(): 发起一个create操作. 可以组合其他方法 (比如mode 或background) 最后以forPath()方法结尾 通知(Notification) Curator的相关代码已经更新了,里面的接口已经由ClientListener改成CuratorListener了,而且接口中去掉了clientCloseDueToError方法. 只有一个方法: UnhandledErrorListener接口用来对异常进行处理. CuratorEvent(在以前版本为ClientEvent)是对各种操作触发相关事件对象(POJO)的一个完整封装,而事件对象的内容跟事件类型相关,下面是对应关系:
名称空间(Namespace) 因为一个zk集群会被多个应用共享,为了避免各个应用的zk patch冲突,Curator Framework内部会给每一个Curator Framework实例分配一个namespace(可选). 这样你在create ZNode的时候都会自动加上这个namespace作为这个node path的root. 使用代码如下: CuratorFramework client = CuratorFrameworkFactory.builder().namespace("MyApp") ... build(); … client.create().forPath("/test",data); // node was actually written to: "/MyApp/test" Recipe Curator实现ZooKeeper的所有recipe(除了两段提交) 选举 集群领导选举(leader election) 锁服务 共享锁: 全局同步分布式锁,同一时间两台机器只有一台能获得同一把锁. 队列(Queue) 分布式队列:采用持久顺序zk node来实现FIFO队列,如果有多个消费者,可以使用LeaderSelector来保证队列的消费者顺序 BlockingQueueConsumer: JDK阻塞队列的分布式版本 关卡(Barrier) 分布式关卡:一堆客户端去处理一堆任务,只有所有的客户端都执行完,所有客户端才能继续往下处理 双分布式关卡:同时开始,同时结束 计数器(Counter) 共享计数器:所有客户端监听同一个znode path,并共享一个的integer计数值 工具类 Path Cache Path Cache用于监听ZNode的子节点的变化,当add,update,remove子节点时将改变Path Cache state,同时返回所有子节点的data和state. Curator中采用了PathChildrenCache类来处理Path Cache,状态的变化则采用PathChildrenCacheListener来监听. 相关用法参见TestPathChildrenCache测试类 注意: 当zk server的数据发生变化,zk client会出现不一致,这个需要通过版本号来识别这种状态的变化 Test Server 用来在测试中模拟一个本地进程内ZooKeeper Server. Test Cluster 用来在测试中模拟一个ZooKeeper Server集群 ZKPaths工具类 提供了和ZNode相关的path处理工具方法: 1.getNodeFromPath: 根据给定path获取node name. i.e. "/one/two/three" -> "three" 2.mkdirs: 根据给定路径递归创建所有node 3.getSortedChildren: 根据给定路径,返回一个按序列号排序的子节点列表 4.makePath: 根据给定的path和子节点名,创建一个完整path EnsurePath工具类 直接看例子,具体的说就是调用多次,只会执行一次创建节点操作. EnsurePath ensurePath = new EnsurePath(aFullPathToEnsure); ... String nodePath = aFullPathToEnsure + "/foo"; ensurePath.ensure(zk); // first time syncs and creates if needed zk.create(nodePath,...); ... ensurePath.ensure(zk); // subsequent times are NOPs zk.create(nodePath,...); Notification事件处理 Curator对ZooKeeper的事件Watcher进行了封装处理,然后实现了一套监听机制. 提供了几个监听接口用来处理ZooKeeper连接状态的变化 当连接出现异常,将通过ConnectionStateListener接口进行监听,并进行相应的处理,这些状态变化包括: 1.暂停(SUSPENDED): 当连接丢失,将暂停所有操作,直到连接重新建立,如果在规定时间内无法建立连接,将触发LOST通知 从com.netflix.curator.framework.imps.CuratorFrameworkImpl.validateConnection(CuratorEvent)方法中我们可以知道,Curator分别将ZooKeeper的Disconnected,Expired,SyncConnected三种状态转换成上面三种状态. 总结 以上就是本文关于浅谈Zookeeper开源客户端框架Curator的全部内容,感兴趣的朋友可以参阅:zookeeper watch机制的理解、为zookeeper配置相应的acl权限、apache zookeeper使用方法实例详解等,希望对大家有所帮助。如有不足之处,欢迎留言指正。感谢朋友们对本站的支持! (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |