ZooKeeper Java客户端
2020年5月22日约 2428 字...
ZooKeeper Java客户端
作者:IT王小二
这里介绍两个客户端,一个为 ZooKeeper 原生客户端,一个为 ZkClient。
首先创建一个maven项目,pom文件中添加依赖。
<!-- 原生 -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.12</version>
</dependency>
<!-- zkclient -->
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.10</version>
</dependency>
一、ZooKeeper原生客户端
之前安装过程看到 ZooKeeper 依赖 jdk 环境,可以看出它的源码使用了Java语言基础,学习原生客户端对于以后看源码有帮助,接下来看一看使用方式。
1. 创建会话
/**
* @Author SunnyBear
* @Description 创建Session
*/
public class TestCreateSession {
/**
* ZooKeeper服务地址
*/
private final String SERVER = "192.168.182.130:2181,192.168.182.131:2181,192.168.182.132:2181";
/**
* 会话超时时间
*/
private static final int SESSION_TIMEOUT = 30000;
public static void main(String[] args) throws IOException, InterruptedException {
new TestCreateSession().testSession1();
System.out.println("--------------------------华丽的分割线--------------------------");
new TestCreateSession().testSession2();
}
/**
* 获得session的方式,这种方式可能会在ZooKeeper还没有获得连接的时候就已经对ZK进行访问了
* 测试可以发现连接状态为CONNECTING,而不是CONNECTED
*/
public void testSession1() throws IOException {
ZooKeeper zooKeeper = new ZooKeeper(SERVER, SESSION_TIMEOUT, null);
System.out.println("zooKeeper: " + zooKeeper);
System.out.println("zooKeeper.getState(): " + zooKeeper.getState());
}
/**
* 发令枪
*/
private CountDownLatch connectedSemaphore = new CountDownLatch(1);
/**
* 使用发令枪对获得Session的方式进行优化,在ZooKeeper初始化完成以前先等待,等待完成后再进行后续操作
*/
public void testSession2() throws IOException, InterruptedException {
ZooKeeper zooKeeper = new ZooKeeper(SERVER, SESSION_TIMEOUT, new Watcher() {
public void process(WatchedEvent watchedEvent) {
if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
// 状态为已连接时才进行后续操作
connectedSemaphore.countDown();
System.out.println("状态为已连接。。");
}
}
});
connectedSemaphore.await();
System.out.println("zooKeeper: " + zooKeeper);
System.out.println("zooKeeper.getState(): " + zooKeeper.getState());
}
}
2. 基本操作
/**
* @Author SunnyBear
* @Description 基本操作
*/
public class TestJavaApi implements Watcher {
private static final int SESSION_TIMEOUT = 10000;
private static final String SERVER = "192.168.182.130:2181,192.168.182.131:2181,192.168.182.132:2181";
private static final String ZK_PATH = "/SunnyBearApiTest";
private static final String ZK_DATA = "我是SunnyBearApiTest的数据";
private ZooKeeper zk = null;
private CountDownLatch connectedSemaphore = new CountDownLatch(1);
/**
* 创建连接
* @param server zk服务器地址列表
* @param sessionTimeout session超时时间
*/
public void createConnection(String server, int sessionTimeout) {
try {
zk = new ZooKeeper(server, sessionTimeout, this);
connectedSemaphore.await();
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 收到来自Server的Watcher通知,然后调用的方法处理
*/
public void process(WatchedEvent watchedEvent) {
System.out.println("收到事件通知:" + watchedEvent);
if (Event.KeeperState.SyncConnected == watchedEvent.getState()) {
connectedSemaphore.countDown();
}
}
/**
* 关闭连接
*/
public void releaseConnection() {
if (this.zk != null) {
try {
this.zk.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* 创建节点
* @param path 节点path
* @param data 初始数据内容
* @return 是否创建成功
*/
public boolean createPath(String path, String data) {
try {
/**
* ZooDefs.Ids.OPEN_ACL_UNSAFE:节点权限
* CreateMode.EPHEMERAL:节点类型为临时节点
*/
String createPath = this.zk.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
System.out.println("创建节点path:" + createPath);
return true;
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return false;
}
/**
* 读取指定节点数据
* @param path 节点path
* @return 节点内容
*/
public String readData(String path) {
try {
String data = new String(this.zk.getData(path, false, null));
System.out.println("读取数据成功path:" + path + ",data:" + data);
return data;
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return "";
}
/**
* 更新指定节点数据
* @param path 节点path
* @param data 数据data
* @return 是否成功
*/
public boolean writeData(String path, String data) {
try {
System.out.println("更新数据成功,path:" + path + ", stat: "
+ this.zk.setData(path, data.getBytes(), -1));
return true;
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return false;
}
/**
* 删除节点
* @param path 节点path
* @return 是否成功
*/
public boolean deleteNode(String path) {
try {
this.zk.delete(path, -1);
System.out.println("删除节点成功,path:" + path);
return true;
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return false;
}
public static void main(String[] args) {
TestJavaApi testJavaApi = new TestJavaApi();
testJavaApi.createConnection(SERVER, SESSION_TIMEOUT);
if (testJavaApi.createPath(ZK_PATH, ZK_DATA)) {
System.out.println("创建后数据内容:" + testJavaApi.readData(ZK_PATH));
testJavaApi.writeData(ZK_PATH, "我是SunnyBearApiTest修改后的数据");
System.out.println("更新后数据内容:" + testJavaApi.readData(ZK_PATH));
testJavaApi.deleteNode(ZK_PATH);
}
testJavaApi.releaseConnection();
}
}
3. 监听机制
Zookeeper采用了Watcher机制实现数据的发布/订阅功能。该机制在被订阅对象发生变化时会异步通知客户端,因此客户端不必在Watcher注册后轮询阻塞,从而减轻了客户端压力,Watcher是一次性的,如果被触发了需要重新注册。
/**
* @Author SunnyBear
* @Description 监听机制
*/
public class TestWatcher implements Watcher {
private AtomicInteger seq = new AtomicInteger();
private static final int SESSION_TIMEOUT = 100000;
private static final String SERVER = "192.168.182.130:2181,192.168.182.131:2181,192.168.182.132:2181";
private static final String PARENT_PATH = "/testWatch";
private static final String CHILDREN_PATH = "/testWatch/children";
private CountDownLatch connectedSemaphore = new CountDownLatch(1);
private ZooKeeper zk = null;
/**
* 创建连接
* @param server zk服务器地址列表
* @param sessionTimeout session超时时间
*/
public void createConnection(String server, int sessionTimeout) {
try {
zk = new ZooKeeper(server, sessionTimeout, this);
connectedSemaphore.await();
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 关闭连接
*/
public void releaseConnection() {
if (this.zk != null) {
try {
this.zk.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* 创建节点
* @param path 节点path
* @param data 初始数据内容
* @return 是否创建成功
*/
public boolean createPath(String path, String data) {
try {
// 设置监控(由于zookeeper的监控都是一次性的所以 每次必须设置监控)
this.zk.exists(path, true);
// 创建持久节点
String createPath = this.zk.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println("创建节点path:" + createPath);
return true;
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return false;
}
/**
* 读取指定节点数据
* @param path 节点path
* @param needWatch 表示是否需要注册一个watcher。true:注册默认watcher,false:不需要注册watcher
* @return 节点内容
*/
public String readData(String path, boolean needWatch) {
try {
String data = new String(this.zk.getData(path, needWatch, null));
System.out.println("读取数据成功path:" + path + ",data:" + data);
return data;
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return "";
}
/**
* 更新指定节点数据
* @param path 节点path
* @param data 数据data
* @return 是否成功
*/
public boolean writeData(String path, String data) {
try {
System.out.println("更新数据成功,path:" + path + ", stat: "
+ this.zk.setData(path, data.getBytes(), -1));
return true;
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return false;
}
/**
* 删除节点
* @param path 节点path
* @return 是否成功
*/
public boolean deleteNode(String path) {
try {
this.zk.delete(path, -1);
System.out.println("删除节点成功,path:" + path);
return true;
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return false;
}
/**
* 判断指定节点是否存在
* @param path 节点路径
* @param needWatch 表示是否需要注册一个watcher
*/
public Stat exists(String path, boolean needWatch) {
try {
return this.zk.exists(path, needWatch);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
/**
* 获取子节点
* @param path 节点路径
* @param needWatch 表示是否需要注册一个watcher
*/
private List<String> getChildren(String path, boolean needWatch) {
try {
return this.zk.getChildren(path, needWatch);
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
/**
* 删除测试用的所有节点
*/
public void deleteAllTestPath() {
if(this.exists(CHILDREN_PATH, false) != null){
this.deleteNode(CHILDREN_PATH);
}
if(this.exists(PARENT_PATH, false) != null){
this.deleteNode(PARENT_PATH);
}
}
/**
* 收到来自Server的Watcher通知,然后调用的方法处理
*/
public void process(WatchedEvent watchedEvent) {
System.out.println("收到事件通知:" + watchedEvent);
try {
Thread.sleep(200);
if (watchedEvent == null) {
return;
}
// 连接状态
Event.KeeperState eventState = watchedEvent.getState();
// 事件类型
Event.EventType eventType = watchedEvent.getType();
// 受影响的path
String eventPath = watchedEvent.getPath();
// 打印一下监听的信息
String eventPrefix = "[Watch - " + this.seq.incrementAndGet() + "] ";
System.out.println(eventPrefix + "收到Watcher通知");
System.out.println(eventPrefix + "连接状态:\t" + eventState.toString());
System.out.println(eventPrefix + "事件类型:\t" + eventType.toString());
if (Event.KeeperState.SyncConnected == eventState) {
switch (eventType) {
case None:
System.out.println(eventPrefix + "成功连接zk服务器");
connectedSemaphore.countDown();
break;
case NodeCreated:
System.out.println(eventPrefix + "节点创建");
Thread.sleep(200);
this.zk.exists(eventPath, true);
break;
case NodeDataChanged:
System.out.println(eventPrefix + "节点数据更新");
Thread.sleep(200);
System.out.println(eventPrefix + "数据内容: " + this.readData(PARENT_PATH, true));
break;
case NodeChildrenChanged:
System.out.println(eventPrefix + "子节点变更");
Thread.sleep(3000);
System.out.println(eventPrefix + "子节点列表:" + this.getChildren(PARENT_PATH, true));
break;
case NodeDeleted:
System.out.println(eventPrefix + "节点 " + eventPath + " 被删除");
break;
default:
break;
}
} else if (Watcher.Event.KeeperState.Disconnected == eventState) {
System.out.println(eventPrefix + "与ZK服务器断开连接");
} else if (Watcher.Event.KeeperState.AuthFailed == eventState) {
System.out.println(eventPrefix + "权限检查失败");
} else if (Watcher.Event.KeeperState.Expired == eventState) {
System.out.println(eventPrefix + "会话失效");
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
System.out.println("-------------------------------------------------------------");
}
public static void main(String[] args) throws InterruptedException {
TestWatcher zkWatch = new TestWatcher();
// 创建连接
zkWatch.createConnection(SERVER, SESSION_TIMEOUT);
Thread.sleep(1000);
// 删除所有节点
zkWatch.deleteAllTestPath();
if (zkWatch.createPath(PARENT_PATH, System.currentTimeMillis() + "")) {
/**
* 读取数据,在操作节点数据之前先调用zookeeper的getData()方法是为了可以watch到对节点的操作。watch是一次性的
* 也就是说,如果第二次又重新调用了setData()方法,在此之前需要重新调用一次
*/
System.out.println("------------------------读取PARENT------------------------");
zkWatch.readData(PARENT_PATH, true);
// 更新数据
zkWatch.writeData(PARENT_PATH, System.currentTimeMillis() + "");
Thread.sleep(1000);
/**
* 读取子节点,设置对子节点变化的watch,如果不写该方法,则在创建子节点是只会输出NodeCreated,而不会输出NodeChildrenChanged,
* 也就是说创建子节点时没有watch,
* 如果是递归的创建子节点,如path="/p/c1/c2"的话,getChildren(PARENT_PATH, ture)只会在创建c1时watch,输出c1的NodeChildrenChanged,
* 而不会输出创建c2时的NodeChildrenChanged,如果watch到c2的NodeChildrenChanged,则需要再调用一次getChildren(String path, true)方法,
* 其中path="/p/c1"
*/
System.out.println("------------------------读取CHILDREN------------------------");
zkWatch.getChildren(PARENT_PATH, true);
Thread.sleep(1000);
// 创建子节点
zkWatch.createPath(CHILDREN_PATH, System.currentTimeMillis() + "");
Thread.sleep(1000);
zkWatch.readData(CHILDREN_PATH, true);
zkWatch.writeData(CHILDREN_PATH, System.currentTimeMillis() + "");
}
Thread.sleep(20000);
// 清理节点
zkWatch.deleteAllTestPath();
Thread.sleep(1000);
zkWatch.releaseConnection();
}
}
二、ZkClient
1. 基本操作
/**
* @Author SunnyBear
* @Description ZkClient基本操作
*/
public class TestZkClientApi {
private static final String SERVER = "192.168.182.130:2181,192.168.182.131:2181,192.168.182.132:2181";
private static final int SESSION_TIMEOUT = 30000;
public static void main(String[] args) {
ZkClient zkClient = new ZkClient(SERVER, SESSION_TIMEOUT);
// 创建临时节点,值为null
zkClient.createEphemeral("/zkTemp");
// 创建持久节点,,值为null,如果父节点不存在则创建
zkClient.createPersistent("/zkPersistent/zk1", true);
// 创建持久节点,有值
zkClient.createPersistent("/zkPersistent/zk2", "zk2内容");
zkClient.createPersistent("/zkPersistent/zk3", "zk3内容");
// 查询节点下面的所有节点
List<String> childrenList = zkClient.getChildren("/zkPersistent");
for (String p : childrenList) {
String childrenPath = "/zkPersistent/" + p;
// 查询节点数据
String data = zkClient.readData(childrenPath);
System.out.println(childrenPath + ":" + data);
}
// 修改节点数据
zkClient.writeData("/zkPersistent/zk1", "给zk1更新了内容");
System.out.println(zkClient.readData("/zkPersistent/zk1"));
// 删除节点
zkClient.delete("/zkTemp");
// 递归删除,即包含子目录的删除
zkClient.deleteRecursive("/zkPersistent");
// 关闭连接
zkClient.close();
}
}
2. 监听机制
/**
* @Author SunnyBear
* @Description ZkClient监听的测试
*/
public class ZkClientWatcher {
private static final String SERVER = "192.168.182.130:2181,192.168.182.131:2181,192.168.182.132:2181";
private static final int SESSION_TIMEOUT = 30000;
public static void main(String[] args) throws InterruptedException {
// test1();
test2();
}
/**
* 订阅子节点的变化
*/
public static void test1() throws InterruptedException {
ZkClient zkClient = new ZkClient(new ZkConnection(SERVER), SESSION_TIMEOUT);
// 给父节点添加监听子节点变化
zkClient.subscribeChildChanges("/zkPersistent", new IZkChildListener() {
public void handleChildChange(String parentPath, List<String> currentChildList) throws Exception {
System.out.println("parentPath:" + parentPath);
System.out.println("currentChildList" + currentChildList);
}
});
Thread.sleep(3000);
zkClient.createPersistent("/zkPersistent");
Thread.sleep(1000);
zkClient.createPersistent("/zkPersistent/"+ "zk1", "zk1内容");
Thread.sleep(1000);
zkClient.createPersistent("/zkPersistent/" + "zk2", "zk2内容");
Thread.sleep(1000);
zkClient.delete("/super/c2");
Thread.sleep(1000);
zkClient.deleteRecursive("/super");
Thread.sleep(10000);
zkClient.close();
}
/**
* 订阅内容变化
*/
public static void test2() throws InterruptedException {
ZkClient zkClient = new ZkClient(new ZkConnection(SERVER), SESSION_TIMEOUT);
zkClient.createPersistent("/zkPersistent", "zkPersistentData");
// 对父节点添加监听子节点变化。
zkClient.subscribeDataChanges("/zkPersistent", new IZkDataListener() {
public void handleDataDeleted(String path) throws Exception {
System.out.println("删除的节点为:" + path);
}
public void handleDataChange(String path, Object data) throws Exception {
System.out.println("变更的节点为:" + path + ", 变更内容为:" + data);
}
});
Thread.sleep(3000);
zkClient.writeData("/zkPersistent", "zkPersistentDataUpdate", -1);
Thread.sleep(1000);
zkClient.delete("/zkPersistent");
Thread.sleep(10000);
}
}