
Operating ZooKeeper from Java code

Published: 2020-12-15 07:57:18 | Category: Java | Source: compiled from the web

The following are the complete steps for a Java project that operates ZooKeeper:

1. Add the required dependencies to the project's pom.xml

<dependencies>
    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.4.9</version>
    </dependency>
    <dependency>
        <groupId>com.101tec</groupId>
        <artifactId>zkclient</artifactId>
        <version>0.10</version>
    </dependency>
</dependencies>

2. Add a log4j.properties file under the resources directory to configure log output

log4j.rootLogger=DEBUG,myConsole
log4j.appender.myConsole=org.apache.log4j.ConsoleAppender
log4j.appender.myConsole.ImmediateFlush=true
log4j.appender.myConsole.Target=System.out
log4j.appender.myConsole.layout=org.apache.log4j.PatternLayout
log4j.appender.myConsole.layout.ConversionPattern=[%-5p] %d(%r) --> [%t] %l: %m %x %n

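3. Operate ZooKeeper from Java code

  The operations covered are creating a node, setting a node's value, getting a node's value, and checking whether a node exists.

  A node can be created in one of four modes (the mode is chosen inside the createZKNode method):

    1. CreateMode.PERSISTENT: persistent node; once created it is stored on disk and remains until it is explicitly deleted

    2. CreateMode.PERSISTENT_SEQUENTIAL: persistent sequential node; ZooKeeper appends an increasing sequence number to the node name

    3. CreateMode.EPHEMERAL: ephemeral node; it is deleted automatically when the client session that created it ends (for example after the client disconnects and the session times out)

    4. CreateMode.EPHEMERAL_SEQUENTIAL: ephemeral sequential node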
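
  To make the two sequential modes more concrete, here is a minimal sketch of how sequential node names are formed. It is a toy model in plain Java, not a call into the ZooKeeper client: the class name SequentialNameSketch and the path /locks/lock- are hypothetical, and the only ZooKeeper fact it relies on is that the server appends a 10-digit, zero-padded counter kept by the parent node.

```java
import java.util.Locale;

/** Toy model of sequential node naming; not part of the ZooKeeper client API. */
class SequentialNameSketch {
    // Stand-in for the counter that the real server keeps per parent node.
    private int nextSequence = 0;

    /** Returns the name a sequential create would produce for the given path prefix. */
    String sequentialName(String pathPrefix) {
        // The suffix is the counter formatted as %010d, then the counter advances.
        return String.format(Locale.ROOT, "%s%010d", pathPrefix, nextSequence++);
    }

    public static void main(String[] args) {
        SequentialNameSketch parent = new SequentialNameSketch();
        // "/locks/lock-" is a hypothetical prefix chosen only for illustration.
        System.out.println(parent.sequentialName("/locks/lock-")); // /locks/lock-0000000000
        System.out.println(parent.sequentialName("/locks/lock-")); // /locks/lock-0000000001
    }
}
```

  Because the returned name differs from the requested path, code that creates sequential nodes should use the path returned by create rather than the path it passed in.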

  Create the ZKOperaDemo.java class:

package com.hxc.zookeeperDemo;

import java.io.IOException;
import java.util.ArrayList;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.ZooDefs.Ids;

/**
 * 
 * @author sun_flower
 * 
 */
public class ZKOperaDemo {
    
    private static String connectString = "192.168.202.132:2181";
    private static int sessionTimeout = 50 * 1000;
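    /**
     * Connects to the ZooKeeper server.
     * The constructor returns immediately; the session is established asynchronously.
     * @return the ZooKeeper client handle
     * @throws IOException if the client cannot be created
     */
    public ZooKeeper connectionZooKeeper() throws IOException {
        
        ZooKeeper zooKeeper = new ZooKeeper(connectString,sessionTimeout,new Watcher() {
            
            @Override
            public void process(WatchedEvent event) {
                // Other work can be done here (for example reacting to connection or watch events)
            }
        });
        return zooKeeper;
    }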
    
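    /**
     * Creates a node. The create mode is one of:
     * 1. CreateMode.PERSISTENT: persistent node, kept until it is explicitly deleted
     * 2. CreateMode.PERSISTENT_SEQUENTIAL: persistent sequential node
     * 3. CreateMode.EPHEMERAL: ephemeral node, deleted automatically when the creating session ends
     * 4. CreateMode.EPHEMERAL_SEQUENTIAL: ephemeral sequential node
     * @param zooKeeper a connected ZooKeeper client
     * @param path path of the node to create
     * @param data data to store on the node
     * @return the path of the created node
     * @throws KeeperException if the server reports an error (for example, the node already exists)
     * @throws InterruptedException if the calling thread is interrupted
     */
    public String createZKNode(ZooKeeper zooKeeper,String path,String data) throws KeeperException,InterruptedException {
        byte[] bytesData = data.getBytes();
        // Access control list (open to everyone, so only suitable for demos)
        ArrayList<ACL> openAclUnsafe = Ids.OPEN_ACL_UNSAFE;
        // Create mode
        CreateMode mode = CreateMode.PERSISTENT;
        String result = zooKeeper.create(path,bytesData,openAclUnsafe,mode);
        System.out.println("Node created: " + result);
        return result;
    }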
    
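    /**
     * Gets the data stored on a node.
     * @param zooKeeper a connected ZooKeeper client
     * @param path node path
     * @return the data stored on the node
     * @throws KeeperException if the server reports an error (for example, the node does not exist)
     * @throws InterruptedException if the calling thread is interrupted
     */
    public String getZKNodeData(ZooKeeper zooKeeper,String path) throws KeeperException,InterruptedException {
        // false: do not leave a watch on the node
        byte[] data = zooKeeper.getData(path,false,new Stat());
//        System.out.println("Data on node " + path + ": " + new String(data));
        return new String(data);
    }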
    
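    /**
     * Sets the data stored on a node.
     * @param zooKeeper a connected ZooKeeper client
     * @param path node path
     * @param data the new data for the node
     * @return the node's Stat after the update
     * @throws KeeperException if the server reports an error (for example, the node does not exist)
     * @throws InterruptedException if the calling thread is interrupted
     */
    public Stat setZKNodeData(ZooKeeper zooKeeper,String path,String data) throws KeeperException,InterruptedException {
        // Version -1 matches any version of the node
        return zooKeeper.setData(path,data.getBytes(),-1);
    }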
    
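    /**
     * Checks whether a node exists.
     * @param zooKeeper a connected ZooKeeper client
     * @param path node path
     * @return the node's Stat, or null if the node does not exist
     * @throws KeeperException if the server reports an error
     * @throws InterruptedException if the calling thread is interrupted
     */
    public Stat isExitZKPath(ZooKeeper zooKeeper,String path) throws KeeperException,InterruptedException {
        // false: do not leave a watch on the node
        Stat stat = zooKeeper.exists(path,false);
        return stat;
    }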
}

4. Test code: TestZK.java

  1) Test whether the connection succeeds:

  // 1. Test whether the connection succeeds
    @Test
    public void testConnection() throws IOException,InterruptedException {
        ZKOperaDemo zkOperaDemo = new ZKOperaDemo();
        ZooKeeper zooKeeper = zkOperaDemo.connectionZooKeeper();
        System.out.println("====================");
        System.out.println(zooKeeper);
        System.out.println("====================");
        Thread.sleep(Long.MAX_VALUE);
    }

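After running it, the console prints the following. The last two lines show the client continuously pinging the ZooKeeper service (Got ping response for sessionid: 0x16c9968e7d0000e after 2ms). The printed state is still CONNECTING because the ZooKeeper constructor returns before the session has been established. The log shows two client connections, most likely because the full TestZK class shown later also opens a connection in its instance initializer.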

[INFO ] 2019-08-21 09:42:56,068(0) --> [main] org.apache.zookeeper.Environment.logEnv(Environment.java:100): Client environment:zookeeper.version=3.4.9-1757313,built on 08/23/2016 06:50 GMT  
[INFO ] 2019-08-21 09:42:56,079(11) --> [main] org.apache.zookeeper.Environment.logEnv(Environment.java:100): Client environment:host.name=LAPTOP-L6EGT293  
[INFO ] 2019-08-21 09:42:56,079(11) --> [main] org.apache.zookeeper.Environment.logEnv(Environment.java:100): Client environment:java.version=1.8.0_60  
[INFO ] 2019-08-21 09:42:56,080(12) --> [main] org.apache.zookeeper.Environment.logEnv(Environment.java:100): Client environment:java.vendor=Oracle Corporation  
[INFO ] 2019-08-21 09:42:56,080(12) --> [main] org.apache.zookeeper.Environment.logEnv(Environment.java:100): Client environment:java.home=D:ProgramFilesJRE1.8  
[INFO ] 2019-08-21 09:42:56,081(13) --> [main] org.apache.zookeeper.Environment.logEnv(Environment.java:100): Client environment:java.class.path=D:eclipse_mySpaceszookeeperDemotargettest-classes;D:eclipse_mySpaceszookeeperDemotargetclasses;D:ProgramFileseclipsepluginsorg.junit_4.12.0.v201504281640junit.jar;D:ProgramFileseclipsepluginsorg.hamcrest.core_1.3.0.v20180420-1519.jar;C:Userssun_flower.m2repositoryorgapachezookeeperzookeeper3.4.9zookeeper-3.4.9.jar;C:Userssun_flower.m2repositoryorgslf4jslf4j-api1.6.1slf4j-api-1.6.1.jar;C:Userssun_flower.m2repositoryorgslf4jslf4j-log4j121.6.1slf4j-log4j12-1.6.1.jar;C:Userssun_flower.m2repositorylog4jlog4j1.2.16log4j-1.2.16.jar;C:Userssun_flower.m2repositoryjlinejline.9.94jline-0.9.94.jar;C:Userssun_flower.m2repositoryionettynetty3.10.5.Finalnetty-3.10.5.Final.jar;C:Userssun_flower.m2repositorycom101teczkclient.10zkclient-0.10.jar;D:ProgramFileseclipseconfigurationorg.eclipse.osgi407.cp;D:ProgramFileseclipseconfigurationorg.eclipse.osgi406.cp  
[INFO ] 2019-08-21 09:42:56,082(14) --> [main] org.apache.zookeeper.Environment.logEnv(Environment.java:100): Client environment:java.library.path=D:ProgramFilesJRE1.8bin;C:WINDOWSSunJavabin;C:WINDOWSsystem32;C:WINDOWS;C:ProgramDataOracleJavajavapath;C:Windowssystem32;C:Windows;C:WindowsSystem32Wbem;C:WindowsSystem32WindowsPowerShellv1.0;C:WindowsSystem32OpenSSH;C:Program Files (x86)NVIDIA CorporationPhysXCommon;C:Program FilesNVIDIA CorporationNVIDIA NvDLISR;D:ProgramFilesJDK1.8bin;D:ProgramFilesJDK1.8jrebin;C:WINDOWSsystem32;C:WINDOWS;C:WINDOWSSystem32Wbem;C:WINDOWSSystem32WindowsPowerShellv1.0;C:WINDOWSSystem32OpenSSH;D:ProgramFilesapache-maven-3.3.9bin;D:ProgramFilesmysql5.0bin;C:Userssun_flowerAppDataLocalMicrosoftWindowsApps;;D:ProgramFilesIntelliJIDEA2018.3.5bin;;.  
[INFO ] 2019-08-21 09:42:56,082(14) --> [main] org.apache.zookeeper.Environment.logEnv(Environment.java:100): Client environment:java.io.tmpdir=C:UsersSUN_FL~1AppDataLocalTemp  
[INFO ] 2019-08-21 09:42:56,083(15) --> [main] org.apache.zookeeper.Environment.logEnv(Environment.java:100): Client environment:java.compiler=<NA>  
[INFO ] 2019-08-21 09:42:56,083(15) --> [main] org.apache.zookeeper.Environment.logEnv(Environment.java:100): Client environment:os.name=Windows 10  
[INFO ] 2019-08-21 09:42:56,083(15) --> [main] org.apache.zookeeper.Environment.logEnv(Environment.java:100): Client environment:os.arch=amd64  
[INFO ] 2019-08-21 09:42:56,084(16) --> [main] org.apache.zookeeper.Environment.logEnv(Environment.java:100): Client environment:os.version=10.0  
[INFO ] 2019-08-21 09:42:56,084(16) --> [main] org.apache.zookeeper.Environment.logEnv(Environment.java:100): Client environment:user.name=sun_flower  
[INFO ] 2019-08-21 09:42:56,084(16) --> [main] org.apache.zookeeper.Environment.logEnv(Environment.java:100): Client environment:user.home=C:Userssun_flower  
[INFO ] 2019-08-21 09:42:56,084(16) --> [main] org.apache.zookeeper.Environment.logEnv(Environment.java:100): Client environment:user.dir=D:eclipse_mySpaceszookeeperDemo  
[INFO ] 2019-08-21 09:42:56,087(19) --> [main] org.apache.zookeeper.ZooKeeper.<init>(ZooKeeper.java:438): Initiating client connection,connectString=192.168.202.132:2181 sessionTimeout=50000 watcher=com.hxc.zookeeperDemo.ZKOperaDemo$1@6073f712  
[DEBUG] 2019-08-21 09:42:56,094(26) --> [main] org.apache.zookeeper.ClientCnxn.<clinit>(ClientCnxn.java:117): zookeeper.disableAutoWatchReset is false  
[INFO ] 2019-08-21 09:42:56,296(228) --> [main] org.apache.zookeeper.ZooKeeper.<init>(ZooKeeper.java:438): Initiating client connection,connectString=192.168.202.132:2181 sessionTimeout=50000 watcher=com.hxc.zookeeperDemo.ZKOperaDemo$1@6ea6d14e  
====================
State:CONNECTING sessionid:0x0 local:null remoteserver:null lastZxid:0 xid:1 sent:0 recv:0 queuedpkts:0 pendingresp:0 queuedevents:0
====================
[INFO ] 2019-08-21 09:42:56,317(249) --> [main-SendThread(192.168.202.132:2181)] org.apache.zookeeper.ClientCnxn$SendThread.logStartConnect(ClientCnxn.java:1032): Opening socket connection to server 192.168.202.132/192.168.202.132:2181. Will not attempt to authenticate using SASL (unknown error)  
[INFO ] 2019-08-21 09:42:56,325(257) --> [main-SendThread(192.168.202.132:2181)] org.apache.zookeeper.ClientCnxn$SendThread.primeConnection(ClientCnxn.java:876): Socket connection established to 192.168.202.132/192.168.202.132:2181,initiating session  
[DEBUG] 2019-08-21 09:42:56,327(259) --> [main-SendThread(192.168.202.132:2181)] org.apache.zookeeper.ClientCnxn$SendThread.primeConnection(ClientCnxn.java:949): Session establishment request sent on 192.168.202.132/192.168.202.132:2181  
[INFO ] 2019-08-21 09:42:56,333(265) --> [main-SendThread(192.168.202.132:2181)] org.apache.zookeeper.ClientCnxn$SendThread.onConnected(ClientCnxn.java:1299): Session establishment complete on server 192.168.202.132/192.168.202.132:2181,sessionid = 0x16c9968e7d0000d,negotiated timeout = 40000  
[INFO ] 2019-08-21 09:42:57,320(1252) --> [main-SendThread(192.168.202.132:2181)] org.apache.zookeeper.ClientCnxn$SendThread.primeConnection(ClientCnxn.java:876): Socket connection established to 192.168.202.132/192.168.202.132:2181,initiating session  
[DEBUG] 2019-08-21 09:42:57,321(1253) --> [main-SendThread(192.168.202.132:2181)] org.apache.zookeeper.ClientCnxn$SendThread.primeConnection(ClientCnxn.java:949): Session establishment request sent on 192.168.202.132/192.168.202.132:2181  
[INFO ] 2019-08-21 09:42:57,325(1257) --> [main-SendThread(192.168.202.132:2181)] org.apache.zookeeper.ClientCnxn$SendThread.onConnected(ClientCnxn.java:1299): Session establishment complete on server 192.168.202.132/192.168.202.132:2181,sessionid = 0x16c9968e7d0000e,negotiated timeout = 40000  
[DEBUG] 2019-08-21 09:43:09,671(13603) --> [main-SendThread(192.168.202.132:2181)] org.apache.zookeeper.ClientCnxn$SendThread.readResponse(ClientCnxn.java:742): Got ping response for sessionid: 0x16c9968e7d0000d after 6ms  
[DEBUG] 2019-08-21 09:43:10,656(14588) --> [main-SendThread(192.168.202.132:2181)] org.apache.zookeeper.ClientCnxn$SendThread.readResponse(ClientCnxn.java:742): Got ping response for sessionid: 0x16c9968e7d0000e after 2ms  

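If the console reports the following error (java.net.ConnectException: Connection refused: no further information), the ZooKeeper service may not be running, or the server's firewall may be blocking the connection (either the firewall has not been turned off or port 2181 has not been opened).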

[WARN ] 2019-08-21 09:59:42,274(2080) --> [main-SendThread(192.168.202.132:2181)] org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1162): Session 0x0 for server null,unexpected error,closing socket connection and attempting reconnect  
java.net.ConnectException: Connection refused: no further information
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(Unknown Source)
    at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)
    at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1141)
[DEBUG] 2019-08-21 09:59:42,276(2082) --> [main-SendThread(192.168.202.132:2181)] org.apache.zookeeper.ClientCnxnSocketNIO.cleanup(ClientCnxnSocketNIO.java:203): Ignoring exception during shutdown input  

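  2) Once the connection to the ZooKeeper service works, the remaining operations can be tested

  The complete test code (create a node, set a node's value, get a node's value, check whether a node exists):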

package com.hxc.zookeeperDemo;

import java.io.IOException;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.junit.Test;

public class TestZK {
    
    // 1. Test whether the connection succeeds
    @Test
    public void testConnection() throws IOException,InterruptedException {
        ZKOperaDemo zkOperaDemo = new ZKOperaDemo();
        ZooKeeper zooKeeper = zkOperaDemo.connectionZooKeeper();
        System.out.println("====================");
        System.out.println(zooKeeper);
        System.out.println("====================");
        Thread.sleep(Long.MAX_VALUE);
    }
    private ZKOperaDemo nodeOperation = new ZKOperaDemo();
    private ZooKeeper zooKeeper = null;
    {
        try {
            zooKeeper = nodeOperation.connectionZooKeeper();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    // Test creating a node
    @Test
    public void testCreateZKNode() throws KeeperException,InterruptedException {
        String result = nodeOperation.createZKNode(zooKeeper,"/address","ShenZhen");
        System.out.println(result);
    }
    // Test getting a node's data
    @Test
    public void testGetZKNodeData() throws KeeperException,InterruptedException {
        String result = nodeOperation.getZKNodeData(zooKeeper,"/address");
        System.out.println(result);
    }
    // Test setting a node's data
    @Test
    public void testSetZKNodeData() throws KeeperException,InterruptedException {
        Stat stat = nodeOperation.setZKNodeData(zooKeeper,"/address","Shen Zhen update");
        System.out.println(stat);    // prints the Stat's fields
        if(stat != null)
            System.out.println(stat.getCversion());
    }
    // Test whether a node exists
    @Test
    public void testIsExitZKPath() throws KeeperException,InterruptedException {
        Stat stat = nodeOperation.isExitZKPath(zooKeeper,"/addressaa");
        System.out.println(stat);    // prints the Stat's fields, or null if the node does not exist
        if(stat != null)
            System.out.println(stat.getCversion());
    }
    
    
}

  3) The results of the tests can then be seen on the ZooKeeper server.


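5. ZooKeeper's notification mechanism

  A watcher (listener) can be set both when the connection is first established and when operating on a node, for example when reading its data. Each watch is used only once: after it has fired, a new watch must be set to observe the next change. In other words, a Watcher set on a node is one-shot and becomes invalid after a single notification. So in the notification callback, after handling the notification, we need to set another Watcher.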
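
  The one-shot behaviour can be illustrated without a running server. The following is a minimal in-memory sketch, assuming nothing beyond plain Java: OneShotWatchSketch, watchOnce and watchForever are hypothetical names, and the class only imitates the "fire once, then forget" rule; it does not use the ZooKeeper client.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** In-memory toy model of one-shot watches; it does not use the ZooKeeper client. */
class OneShotWatchSketch {
    private final Map<String, String> data = new HashMap<>();
    private final Map<String, List<Consumer<String>>> watches = new HashMap<>();

    /** Reads a value and, if a watcher is given, registers it for exactly one notification. */
    String getData(String path, Consumer<String> watcher) {
        if (watcher != null) {
            watches.computeIfAbsent(path, p -> new ArrayList<>()).add(watcher);
        }
        return data.get(path);
    }

    /** Writes a value, then fires and discards every watch registered on the path. */
    void setData(String path, String value) {
        data.put(path, value);
        List<Consumer<String>> fired = watches.remove(path);
        if (fired != null) {
            for (Consumer<String> watcher : fired) {
                watcher.accept(path);
            }
        }
    }

    /** Watches a single change: the watcher is never registered again. */
    void watchOnce(String path, List<String> seen) {
        getData(path, changedPath -> seen.add(data.get(changedPath)));
    }

    /** Watches every change by registering a fresh watcher inside the callback. */
    void watchForever(String path, List<String> seen) {
        getData(path, changedPath -> {
            watchForever(changedPath, seen); // re-register before handling the event
            seen.add(data.get(changedPath));
        });
    }

    public static void main(String[] args) {
        OneShotWatchSketch zk = new OneShotWatchSketch();
        List<String> once = new ArrayList<>();
        List<String> forever = new ArrayList<>();
        zk.watchOnce("/address", once);
        zk.watchForever("/address", forever);
        zk.setData("/address", "beijing");
        zk.setData("/address", "shanghai");
        System.out.println(once);    // [beijing]
        System.out.println(forever); // [beijing, shanghai]
    }
}
```

  The watcher that never re-registers misses the second update, which is the situation the re-registration in the callback is meant to avoid.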

  Code that keeps watching continuously (by re-registering recursively in the callback):

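  // Notification mechanism
    /**
     * Watches a node and prints its data, both now and each time the data changes.
     * @param zooKeeper a connected ZooKeeper client
     * @param path node path
     * @throws KeeperException if the server reports an error
     * @throws InterruptedException if the calling thread is interrupted
     */
    public void getZKNodeData2(final ZooKeeper zooKeeper,final String path) throws KeeperException,InterruptedException {
        byte[] data = zooKeeper.getData(path,new Watcher() {
            
            public void process(WatchedEvent event) {
                try {
                    // Reading again inside the callback registers a fresh Watcher
                    String data2 = ZKOperaDemo.process(zooKeeper,path);
                    System.out.println("First call ============= " + data2 + " =================");
                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },new Stat());
        System.out.println("Data on node " + path + ": " + new String(data));
        // Block so the process stays alive to receive notifications
        Thread.sleep(Long.MAX_VALUE);
    }
    
    /**
     * Reads the node's data and registers a Watcher that calls this method again,
     * so that every later change is observed as well.
     */
    public static String process(final ZooKeeper zooKeeper,final String path) throws KeeperException,InterruptedException {
        byte[] data = zooKeeper.getData(path,new Watcher() {
            public void process(WatchedEvent event) {
                try {
                    String data = ZKOperaDemo.process(zooKeeper,path);
                    System.out.println("============= " + data + " =================");
                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },new Stat());
        return new String(data);
    }
    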
    

Test code:

   @Test
    public void testGetZKNodeData2() throws KeeperException,InterruptedException {
        nodeOperation.getZKNodeData2(zooKeeper,"/address");
    }

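After starting this test and then setting a value on the node, the corresponding watch output is printed on the console:

  1) The main console output on first startup: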

[DEBUG] 2019-08-21 16:08:16,230(95) --> [main-SendThread(192.168.202.132:2181)] org.apache.zookeeper.ClientCnxn$SendThread.readResponse(ClientCnxn.java:843): Reading reply sessionid:0x16cb1e90922000e,packet:: clientPath:null serverPath:null finished:false header:: 1,4  replyHeader:: 1,512,0  request:: ‘/address,T  response:: #5368656e205a68656e20757064617465,s{448,511,1566270677948,1566374882666,19,16,448}   
Data on node /address: Shen Zhen update
[DEBUG] 2019-08-21 16:08:29,564(13429) --> [main-SendThread(192.168.202.132:2181)] org.apache.zookeeper.ClientCnxn$SendThread.readResponse(ClientCnxn.java:742): Got ping response for sessionid: 0x16cb1e90922000e after 2ms  
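
The `response:: #5368656e...` field in the DEBUG line above is the node's data printed as hexadecimal bytes. As a minimal sketch for reading such lines, the helper below turns the hex back into text. HexPayloadSketch and decode are hypothetical names, and the sketch assumes the stored bytes are UTF-8 text (for pure ASCII data such as this, other common charsets give the same result).

```java
import java.nio.charset.StandardCharsets;

/** Decodes the hex payload that follows '#' in a ZooKeeper client DEBUG reply line. */
class HexPayloadSketch {
    static String decode(String hex) {
        // Accept the payload with or without the leading '#'.
        if (hex.startsWith("#")) {
            hex = hex.substring(1);
        }
        if (hex.length() % 2 != 0) {
            throw new IllegalArgumentException("odd number of hex digits");
        }
        byte[] bytes = new byte[hex.length() / 2];
        for (int i = 0; i < bytes.length; i++) {
            // Each pair of hex digits is one byte.
            bytes[i] = (byte) Integer.parseInt(hex.substring(2 * i, 2 * i + 2), 16);
        }
        // Assumption: the node data is UTF-8 text.
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(decode("#5368656e205a68656e20757064617465")); // Shen Zhen update
    }
}
```

The decoded text matches the value the program prints for /address, which is a quick way to check that a reply line belongs to the read you expect.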

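  2) Set the value again, either on the server or through the set-value test (here it is set on the server so the effect is easier to see).

  The console immediately prints the corresponding watcher output: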

[DEBUG] 2019-08-21 16:10:27,104(130969) --> [main-SendThread(192.168.202.132:2181)] org.apache.zookeeper.ClientCnxn$SendThread.readResponse(ClientCnxn.java:843): Reading reply sessionid:0x16cb1e90922000e,packet:: clientPath:null serverPath:null finished:false header:: 2,4  replyHeader:: 2,515,T  response:: #6265696a696e67,1566375027091,20,7,448}   
First call ============= beijing =================
[DEBUG] 2019-08-21 16:10:40,437(144302) --> [main-SendThread(192.168.202.132:2181)] org.apache.zookeeper.ClientCnxn$SendThread.readResponse(ClientCnxn.java:742): Got ping response for sessionid: 0x16cb1e90922000e after 1ms  

   3) Set the value once more.

  Console output (the watch keeps firing):

[DEBUG] 2019-08-21 16:12:36,498(260363) --> [main-SendThread(192.168.202.132:2181)] org.apache.zookeeper.ClientCnxn$SendThread.readResponse(ClientCnxn.java:843): Reading reply sessionid:0x16cb1e90922000e,packet:: clientPath:null serverPath:null finished:false header:: 3,4  replyHeader:: 3,516,T  response:: #7368616e67686169,1566375156485,21,8,448}   
============= shanghai =================
[DEBUG] 2019-08-21 16:12:49,832(273697) --> [main-SendThread(192.168.202.132:2181)] org.apache.zookeeper.ClientCnxn$SendThread.readResponse(ClientCnxn.java:742): Got ping response for sessionid: 0x16cb1e90922000e after 1ms  

That covers the basic ways to operate ZooKeeper from Java.

(Editor: 李大同)
