ZooKeeper分布式锁的实现
2020年5月25日约 1457 字...
ZooKeeper分布式锁的实现
作者:IT王小二
在分布式的情况下,sychornized 和 Lock 已经不能满足我们的要求了,那么就需要使用第三方的锁了,这里我们就使用 ZooKeeper 来实现一个分布式锁。
一、分布式锁方案比较
方案 | 实现思路 | 优点 | 缺点 |
---|---|---|---|
利用 MySQL 的实现方案 | 利用数据库自身提供的锁机制实现,要求数据库支持行级锁 | 实现简单 | 性能差,无法适应高并发场景;容易出现死锁的情况;无法优雅的实现阻塞式锁 |
利用 Redis 的实现方案 | 使用 Setnx 和 lua 脚本机制实现,保证对缓存操作序列的原子性 | 性能好 | 实现相对复杂,有可能出现死锁;无法优雅的实现阻塞式锁 |
利用 ZooKeeper 的实现方案 | 基于 ZooKeeper 节点特性及 watch 机制实现 | 性能好,稳定可靠性高,能较好地实现阻塞式锁 | 实现相对复杂 |
二、ZooKeeper实现分布式锁
这里使用 ZooKeeper 来实现分布式锁,以50个并发请求来获取订单编号为例,描述两种方案,第一种为基础实现,第二种在第一种基础上进行了优化。
1. 方案一
流程描述:
具体代码:
OrderNumGenerator:
/**
* @Author SunnyBear
* @Description 生成随机订单号
*/
public class OrderNumGenerator {
private static long count = 0;
/**
* 使用日期加数值拼接成订单号
*/
public String getOrderNumber() throws Exception {
String date = DateTimeFormatter.ofPattern("yyyyMMddHHmmss").format(LocalDateTime.now());
String number = new DecimalFormat("000000").format(count++);
return date + number;
}
}
Lock:
/**
* @Author SunnyBear
* @Description 自定义锁接口
*/
public interface Lock {
/**
* 获取锁
*/
public void getLock();
/**
* 释放锁
*/
public void unLock();
}
AbstractLock:
/**
* @Author SunnyBear
* @Description 定义一个模板,具体的方法由子类来实现
*/
public abstract class AbstractLock implements Lock {
/**
* 获取锁
*/
@Override
public void getLock() {
if (tryLock()) {
System.out.println("--------获取到了自定义Lock锁的资源--------");
} else {
// 没拿到锁则阻塞,等待拿锁
waitLock();
getLock();
}
}
/**
* 尝试获取锁,如果拿到了锁返回true,没有拿到则返回false
*/
public abstract boolean tryLock();
/**
* 阻塞,等待获取锁
*/
public abstract void waitLock();
}
ZooKeeperAbstractLock:
/**
* @Author SunnyBear
* @Description 定义需要的服务连接
*/
public abstract class ZooKeeperAbstractLock extends AbstractLock {
private static final String SERVER_ADDR = "192.168.182.130:2181,192.168.182.131:2181,192.168.182.132:2181";
protected ZkClient zkClient = new ZkClient(SERVER_ADDR);
protected static final String PATH = "/lock";
}
ZooKeeperDistrbuteLock:
/**
* @Author SunnyBear
* @Description 真正实现锁的细节
*/
public class ZooKeeperDistrbuteLock extends ZooKeeperAbstractLock {
private CountDownLatch countDownLatch = null;
/**
* 尝试拿锁
*/
@Override
public boolean tryLock() {
try {
// 创建临时节点
zkClient.createEphemeral(PATH);
return true;
} catch (Exception e) {
// 创建失败报异常
return false;
}
}
/**
* 阻塞,等待获取锁
*/
@Override
public void waitLock() {
// 创建监听
IZkDataListener iZkDataListener = new IZkDataListener() {
@Override
public void handleDataChange(String s, Object o) throws Exception {
}
@Override
public void handleDataDeleted(String s) throws Exception {
// 释放锁,删除节点时唤醒等待的线程
if (countDownLatch != null) {
countDownLatch.countDown();
}
}
};
// 注册监听
zkClient.subscribeDataChanges(PATH, iZkDataListener);
// 节点存在时,等待节点删除唤醒
if (zkClient.exists(PATH)) {
countDownLatch = new CountDownLatch(1);
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 删除监听
zkClient.unsubscribeDataChanges(PATH, iZkDataListener);
}
/**
* 释放锁
*/
@Override
public void unLock() {
if (zkClient != null) {
System.out.println("释放锁资源");
zkClient.delete(PATH);
zkClient.close();
}
}
}
测试效果:使用50个线程来并发测试ZooKeeper实现的分布式锁
/**
* @Author SunnyBear
* @Description 使用50个线程来并发测试ZooKeeper实现的分布式锁
*/
public class OrderService {
private static class OrderNumGeneratorService implements Runnable {
private OrderNumGenerator orderNumGenerator = new OrderNumGenerator();;
private Lock lock = new ZooKeeperDistrbuteLock();
@Override
public void run() {
lock.getLock();
try {
System.out.println(Thread.currentThread().getName() + ", 生成订单编号:" + orderNumGenerator.getOrderNumber());
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unLock();
}
}
}
public static void main(String[] args) {
System.out.println("----------生成唯一订单号----------");
for (int i = 0; i < 50; i++) {
new Thread(new OrderNumGeneratorService()).start();
}
}
}
2. 方案二
方案二在方案一的基础上进行优化,避免产生“羊群效应”,方案一一旦临时节点删除,释放锁,那么其他在监听这个节点变化的线程,就会去竞争锁,同时访问 ZooKeeper,那么怎么更好的避免各线程的竞争现象呢,就是使用临时顺序节点,临时顺序节点排序,每个临时顺序节点只监听它本身的前一个节点变化。
流程描述:
具体代码
具体只需要将方案一中的 ZooKeeperDistrbuteLock 改变,增加一个 ZooKeeperDistrbuteLock2,测试代码中使用 ZooKeeperDistrbuteLock2 即可测试,其他代码都不需要改变。
/**
* @Author SunnyBear
* @Description 真正实现锁的细节
*/
public class ZooKeeperDistrbuteLock2 extends ZooKeeperAbstractLock {
private CountDownLatch countDownLatch = null;
/**
* 当前请求节点的前一个节点
*/
private String beforePath;
/**
* 当前请求的节点
*/
private String currentPath;
public ZooKeeperDistrbuteLock2() {
if (!zkClient.exists(PATH)) {
// 创建持久节点,保存临时顺序节点
zkClient.createPersistent(PATH);
}
}
@Override
public boolean tryLock() {
// 如果currentPath为空则为第一次尝试拿锁,第一次拿锁赋值currentPath
if (currentPath == null || currentPath.length() == 0) {
// 在指定的持久节点下创建临时顺序节点
currentPath = zkClient.createEphemeralSequential(PATH + "/", "lock");
}
// 获取所有临时节点并排序,例如:000044
List<String> childrenList = zkClient.getChildren(PATH);
Collections.sort(childrenList);
if (currentPath.equals(PATH + "/" + childrenList.get(0))) {
// 如果当前节点在所有节点中排名第一则获取锁成功
return true;
} else {
int wz = Collections.binarySearch(childrenList, currentPath.substring(6));
beforePath = PATH + "/" + childrenList.get(wz - 1);
}
return false;
}
@Override
public void waitLock() {
// 创建监听
IZkDataListener iZkDataListener = new IZkDataListener() {
@Override
public void handleDataChange(String s, Object o) throws Exception {
}
@Override
public void handleDataDeleted(String s) throws Exception {
// 释放锁,删除节点时唤醒等待的线程
if (countDownLatch != null) {
countDownLatch.countDown();
}
}
};
// 注册监听,这里是给排在当前节点前面的节点增加(删除数据的)监听,本质是启动另外一个线程去监听前置节点
zkClient.subscribeDataChanges(beforePath, iZkDataListener);
// 前置节点存在时,等待前置节点删除唤醒
if (zkClient.exists(beforePath)) {
countDownLatch = new CountDownLatch(1);
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 删除对前置节点的监听
zkClient.unsubscribeDataChanges(beforePath, iZkDataListener);
}
/**
* 释放锁
*/
@Override
public void unLock() {
if (zkClient != null) {
System.out.println("释放锁资源");
zkClient.delete(currentPath);
zkClient.close();
}
}
}