跳至主要內容

ZooKeeper Java客户端

IT王小二约 2428 字

ZooKeeper Java客户端

作者:IT王小二

博客:https://itwxe.comopen in new window

这里介绍两个客户端,一个为 ZooKeeper 原生客户端,一个为 ZkClient。

首先创建一个maven项目,pom文件中添加依赖。

<!-- 原生 -->
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.12</version>
</dependency>
<!-- zkclient -->
<dependency>
    <groupId>com.101tec</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.10</version>
</dependency>

一、ZooKeeper原生客户端

之前安装过程看到 ZooKeeper 依赖 jdk 环境,可以看出它的源码使用了Java语言基础,学习原生客户端对于以后看源码有帮助,接下来看一看使用方式。

1. 创建会话

/**
 * @Author SunnyBear
 * @Description 创建Session
 */
public class TestCreateSession {

    /**
     * ZooKeeper服务地址
     */
    private final String SERVER = "192.168.182.130:2181,192.168.182.131:2181,192.168.182.132:2181";

    /**
     * 会话超时时间
     */
    private static final int SESSION_TIMEOUT = 30000;

    public static void main(String[] args) throws IOException, InterruptedException {
        new TestCreateSession().testSession1();
        System.out.println("--------------------------华丽的分割线--------------------------");
        new TestCreateSession().testSession2();
    }

    /**
     * 获得session的方式,这种方式可能会在ZooKeeper还没有获得连接的时候就已经对ZK进行访问了
     * 测试可以发现连接状态为CONNECTING,而不是CONNECTED
     */
    public void testSession1() throws IOException {
        ZooKeeper zooKeeper = new ZooKeeper(SERVER, SESSION_TIMEOUT, null);
        System.out.println("zooKeeper: " + zooKeeper);
        System.out.println("zooKeeper.getState(): " + zooKeeper.getState());
    }

    /**
     * 发令枪
     */
    private CountDownLatch connectedSemaphore = new CountDownLatch(1);

    /**
     * 使用发令枪对获得Session的方式进行优化,在ZooKeeper初始化完成以前先等待,等待完成后再进行后续操作
     */
    public void testSession2() throws IOException, InterruptedException {
        ZooKeeper zooKeeper = new ZooKeeper(SERVER, SESSION_TIMEOUT, new Watcher() {
            public void process(WatchedEvent watchedEvent) {
                if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
                    // 状态为已连接时才进行后续操作
                    connectedSemaphore.countDown();
                    System.out.println("状态为已连接。。");
                }
            }
        });
        connectedSemaphore.await();
        System.out.println("zooKeeper: " + zooKeeper);
        System.out.println("zooKeeper.getState(): " + zooKeeper.getState());
    }
}

2. 基本操作

/**
 * @Author SunnyBear
 * @Description 基本操作
 */
public class TestJavaApi implements Watcher {

    private static final int SESSION_TIMEOUT = 10000;
    private static final String SERVER = "192.168.182.130:2181,192.168.182.131:2181,192.168.182.132:2181";
    private static final String ZK_PATH = "/SunnyBearApiTest";
    private static final String ZK_DATA = "我是SunnyBearApiTest的数据";
    private ZooKeeper zk = null;

    private CountDownLatch connectedSemaphore = new CountDownLatch(1);

    /**
     * 创建连接
     * @param server zk服务器地址列表
     * @param sessionTimeout session超时时间
     */
    public void createConnection(String server, int sessionTimeout) {
        try {
            zk = new ZooKeeper(server, sessionTimeout, this);
            connectedSemaphore.await();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * 收到来自Server的Watcher通知,然后调用的方法处理
     */
    public void process(WatchedEvent watchedEvent) {
        System.out.println("收到事件通知:" + watchedEvent);
        if (Event.KeeperState.SyncConnected == watchedEvent.getState()) {
            connectedSemaphore.countDown();
        }
    }

    /**
     * 关闭连接
     */
    public void releaseConnection() {
        if (this.zk != null) {
            try {
                this.zk.close();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 创建节点
     * @param path 节点path
     * @param data 初始数据内容
     * @return 是否创建成功
     */
    public boolean createPath(String path, String data) {
        try {
            /**
             * ZooDefs.Ids.OPEN_ACL_UNSAFE:节点权限
             * CreateMode.EPHEMERAL:节点类型为临时节点
             */
            String createPath = this.zk.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            System.out.println("创建节点path:" + createPath);
            return true;
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return false;
    }

    /**
     * 读取指定节点数据
     * @param path 节点path
     * @return 节点内容
     */
    public String readData(String path) {
        try {
            String data = new String(this.zk.getData(path, false, null));
            System.out.println("读取数据成功path:" + path + ",data:" + data);
            return data;
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "";
    }

    /**
     * 更新指定节点数据
     * @param path 节点path
     * @param data 数据data
     * @return 是否成功
     */
    public boolean writeData(String path, String data) {
        try {
            System.out.println("更新数据成功,path:" + path + ", stat: "
                    + this.zk.setData(path, data.getBytes(), -1));
            return true;
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return false;
    }

    /**
     * 删除节点
     * @param path 节点path
     * @return 是否成功
     */
    public boolean deleteNode(String path) {
        try {
            this.zk.delete(path, -1);
            System.out.println("删除节点成功,path:" + path);
            return true;
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return false;
    }

    public static void main(String[] args) {
        TestJavaApi testJavaApi = new TestJavaApi();
        testJavaApi.createConnection(SERVER, SESSION_TIMEOUT);
        if (testJavaApi.createPath(ZK_PATH, ZK_DATA)) {
            System.out.println("创建后数据内容:" + testJavaApi.readData(ZK_PATH));
            testJavaApi.writeData(ZK_PATH, "我是SunnyBearApiTest修改后的数据");
            System.out.println("更新后数据内容:" + testJavaApi.readData(ZK_PATH));
            testJavaApi.deleteNode(ZK_PATH);
        }
        testJavaApi.releaseConnection();
    }
}

3. 监听机制

Zookeeper采用了Watcher机制实现数据的发布/订阅功能。该机制在被订阅对象发生变化时会异步通知客户端,因此客户端不必在Watcher注册后轮询阻塞,从而减轻了客户端压力,Watcher是一次性的,如果被触发了需要重新注册。

/**
 * @Author SunnyBear
 * @Description 监听机制
 */
public class TestWatcher implements Watcher {

    private AtomicInteger seq = new AtomicInteger();
    private static final int SESSION_TIMEOUT = 100000;
    private static final String SERVER = "192.168.182.130:2181,192.168.182.131:2181,192.168.182.132:2181";
    private static final String PARENT_PATH = "/testWatch";
    private static final String CHILDREN_PATH = "/testWatch/children";
    private CountDownLatch connectedSemaphore = new CountDownLatch(1);
    private ZooKeeper zk = null;

    /**
     * 创建连接
     * @param server zk服务器地址列表
     * @param sessionTimeout session超时时间
     */
    public void createConnection(String server, int sessionTimeout) {
        try {
            zk = new ZooKeeper(server, sessionTimeout, this);
            connectedSemaphore.await();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * 关闭连接
     */
    public void releaseConnection() {
        if (this.zk != null) {
            try {
                this.zk.close();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 创建节点
     * @param path 节点path
     * @param data 初始数据内容
     * @return 是否创建成功
     */
    public boolean createPath(String path, String data) {
        try {
            // 设置监控(由于zookeeper的监控都是一次性的所以 每次必须设置监控)
            this.zk.exists(path, true);
            // 创建持久节点
            String createPath = this.zk.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            System.out.println("创建节点path:" + createPath);
            return true;
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return false;
    }

    /**
     * 读取指定节点数据
     * @param path 节点path
     * @param needWatch  表示是否需要注册一个watcher。true:注册默认watcher,false:不需要注册watcher
     * @return 节点内容
     */
    public String readData(String path, boolean needWatch) {
        try {
            String data = new String(this.zk.getData(path, needWatch, null));
            System.out.println("读取数据成功path:" + path + ",data:" + data);
            return data;
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "";
    }

    /**
     * 更新指定节点数据
     * @param path 节点path
     * @param data 数据data
     * @return 是否成功
     */
    public boolean writeData(String path, String data) {
        try {
            System.out.println("更新数据成功,path:" + path + ", stat: "
                    + this.zk.setData(path, data.getBytes(), -1));
            return true;
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return false;
    }

    /**
     * 删除节点
     * @param path 节点path
     * @return 是否成功
     */
    public boolean deleteNode(String path) {
        try {
            this.zk.delete(path, -1);
            System.out.println("删除节点成功,path:" + path);
            return true;
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return false;
    }

    /**
     * 判断指定节点是否存在
     * @param path 节点路径
     * @param needWatch 表示是否需要注册一个watcher
     */
    public Stat exists(String path, boolean needWatch) {
        try {
            return this.zk.exists(path, needWatch);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * 获取子节点
     * @param path 节点路径
     * @param needWatch 表示是否需要注册一个watcher
     */
    private List<String> getChildren(String path, boolean needWatch) {
        try {
            return this.zk.getChildren(path, needWatch);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * 删除测试用的所有节点
     */
    public void deleteAllTestPath() {
        if(this.exists(CHILDREN_PATH, false) != null){
            this.deleteNode(CHILDREN_PATH);
        }
        if(this.exists(PARENT_PATH, false) != null){
            this.deleteNode(PARENT_PATH);
        }
    }

    /**
     * 收到来自Server的Watcher通知,然后调用的方法处理
     */
    public void process(WatchedEvent watchedEvent) {
        System.out.println("收到事件通知:" + watchedEvent);
        try {
            Thread.sleep(200);
            if (watchedEvent == null) {
                return;
            }
            // 连接状态
            Event.KeeperState eventState = watchedEvent.getState();
            // 事件类型
            Event.EventType eventType = watchedEvent.getType();
            // 受影响的path
            String eventPath = watchedEvent.getPath();
            // 打印一下监听的信息
            String eventPrefix = "[Watch - " + this.seq.incrementAndGet() + "] ";
            System.out.println(eventPrefix + "收到Watcher通知");
            System.out.println(eventPrefix + "连接状态:\t" + eventState.toString());
            System.out.println(eventPrefix + "事件类型:\t" + eventType.toString());

            if (Event.KeeperState.SyncConnected == eventState) {
                switch (eventType) {
                    case None:
                        System.out.println(eventPrefix + "成功连接zk服务器");
                        connectedSemaphore.countDown();
                        break;
                    case NodeCreated:
                        System.out.println(eventPrefix + "节点创建");
                        Thread.sleep(200);
                        this.zk.exists(eventPath, true);
                        break;
                    case NodeDataChanged:
                        System.out.println(eventPrefix + "节点数据更新");
                        Thread.sleep(200);
                        System.out.println(eventPrefix + "数据内容: " + this.readData(PARENT_PATH, true));
                        break;
                    case NodeChildrenChanged:
                        System.out.println(eventPrefix + "子节点变更");
                        Thread.sleep(3000);
                        System.out.println(eventPrefix + "子节点列表:" + this.getChildren(PARENT_PATH, true));
                        break;
                    case NodeDeleted:
                        System.out.println(eventPrefix + "节点 " + eventPath + " 被删除");
                        break;
                    default:
                        break;
                }
            } else if (Watcher.Event.KeeperState.Disconnected == eventState) {
                System.out.println(eventPrefix + "与ZK服务器断开连接");
            } else if (Watcher.Event.KeeperState.AuthFailed == eventState) {
                System.out.println(eventPrefix + "权限检查失败");
            } else if (Watcher.Event.KeeperState.Expired == eventState) {
                System.out.println(eventPrefix + "会话失效");
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
        System.out.println("-------------------------------------------------------------");
    }

    public static void main(String[] args) throws InterruptedException {
        TestWatcher zkWatch = new TestWatcher();
        // 创建连接
        zkWatch.createConnection(SERVER, SESSION_TIMEOUT);

        Thread.sleep(1000);

        // 删除所有节点
        zkWatch.deleteAllTestPath();

        if (zkWatch.createPath(PARENT_PATH, System.currentTimeMillis() + "")) {
            /**
             * 读取数据,在操作节点数据之前先调用zookeeper的getData()方法是为了可以watch到对节点的操作。watch是一次性的
             * 也就是说,如果第二次又重新调用了setData()方法,在此之前需要重新调用一次
             */
            System.out.println("------------------------读取PARENT------------------------");
            zkWatch.readData(PARENT_PATH, true);
            // 更新数据
            zkWatch.writeData(PARENT_PATH, System.currentTimeMillis() + "");

            Thread.sleep(1000);

            /**
             * 读取子节点,设置对子节点变化的watch,如果不写该方法,则在创建子节点是只会输出NodeCreated,而不会输出NodeChildrenChanged,
             * 也就是说创建子节点时没有watch,
             * 如果是递归的创建子节点,如path="/p/c1/c2"的话,getChildren(PARENT_PATH, ture)只会在创建c1时watch,输出c1的NodeChildrenChanged,
             * 而不会输出创建c2时的NodeChildrenChanged,如果watch到c2的NodeChildrenChanged,则需要再调用一次getChildren(String path, true)方法,
             * 其中path="/p/c1"
             */
            System.out.println("------------------------读取CHILDREN------------------------");
            zkWatch.getChildren(PARENT_PATH, true);

            Thread.sleep(1000);
            // 创建子节点
            zkWatch.createPath(CHILDREN_PATH, System.currentTimeMillis() + "");

            Thread.sleep(1000);

            zkWatch.readData(CHILDREN_PATH, true);
            zkWatch.writeData(CHILDREN_PATH, System.currentTimeMillis() + "");
        }

        Thread.sleep(20000);
        // 清理节点
        zkWatch.deleteAllTestPath();
        Thread.sleep(1000);
        zkWatch.releaseConnection();
    }
}

二、ZkClient

1. 基本操作

/**
 * @Author SunnyBear
 * @Description ZkClient基本操作
 */
public class TestZkClientApi {

    private static final String SERVER = "192.168.182.130:2181,192.168.182.131:2181,192.168.182.132:2181";

    private static final int SESSION_TIMEOUT = 30000;

    public static void main(String[] args) {
        ZkClient zkClient = new ZkClient(SERVER, SESSION_TIMEOUT);

        // 创建临时节点,值为null
        zkClient.createEphemeral("/zkTemp");
        // 创建持久节点,,值为null,如果父节点不存在则创建
        zkClient.createPersistent("/zkPersistent/zk1", true);
        // 创建持久节点,有值
        zkClient.createPersistent("/zkPersistent/zk2", "zk2内容");
        zkClient.createPersistent("/zkPersistent/zk3", "zk3内容");

        // 查询节点下面的所有节点
        List<String> childrenList = zkClient.getChildren("/zkPersistent");
        for (String p : childrenList) {
            String childrenPath = "/zkPersistent/" + p;
            // 查询节点数据
            String data = zkClient.readData(childrenPath);
            System.out.println(childrenPath + ":" + data);
        }

        // 修改节点数据
        zkClient.writeData("/zkPersistent/zk1", "给zk1更新了内容");
        System.out.println(zkClient.readData("/zkPersistent/zk1"));

        // 删除节点
        zkClient.delete("/zkTemp");
        // 递归删除,即包含子目录的删除
        zkClient.deleteRecursive("/zkPersistent");

        // 关闭连接
        zkClient.close();
    }
}

2. 监听机制

/**
 * @Author SunnyBear
 * @Description ZkClient监听的测试
 */
public class ZkClientWatcher {

    private static final String SERVER = "192.168.182.130:2181,192.168.182.131:2181,192.168.182.132:2181";

    private static final int SESSION_TIMEOUT = 30000;

    public static void main(String[] args) throws InterruptedException {
//        test1();
        test2();
    }

    /**
     * 订阅子节点的变化
     */
    public static void test1() throws InterruptedException {
        ZkClient zkClient = new ZkClient(new ZkConnection(SERVER), SESSION_TIMEOUT);

        // 给父节点添加监听子节点变化
        zkClient.subscribeChildChanges("/zkPersistent", new IZkChildListener() {
            public void handleChildChange(String parentPath, List<String> currentChildList) throws Exception {
                System.out.println("parentPath:" + parentPath);
                System.out.println("currentChildList" + currentChildList);
            }
        });

        Thread.sleep(3000);

        zkClient.createPersistent("/zkPersistent");
        Thread.sleep(1000);

        zkClient.createPersistent("/zkPersistent/"+ "zk1", "zk1内容");
        Thread.sleep(1000);

        zkClient.createPersistent("/zkPersistent/" + "zk2", "zk2内容");
        Thread.sleep(1000);

        zkClient.delete("/super/c2");
        Thread.sleep(1000);

        zkClient.deleteRecursive("/super");
        Thread.sleep(10000);

        zkClient.close();
    }

    /**
     * 订阅内容变化
     */
    public static void test2() throws InterruptedException {
        ZkClient zkClient = new ZkClient(new ZkConnection(SERVER), SESSION_TIMEOUT);

        zkClient.createPersistent("/zkPersistent", "zkPersistentData");

        // 对父节点添加监听子节点变化。
        zkClient.subscribeDataChanges("/zkPersistent", new IZkDataListener() {

            public void handleDataDeleted(String path) throws Exception {
                System.out.println("删除的节点为:" + path);
            }

            public void handleDataChange(String path, Object data) throws Exception {
                System.out.println("变更的节点为:" + path + ", 变更内容为:" + data);
            }
        });

        Thread.sleep(3000);
        zkClient.writeData("/zkPersistent", "zkPersistentDataUpdate", -1);

        Thread.sleep(1000);
        zkClient.delete("/zkPersistent");

        Thread.sleep(10000);
    }
}