How It Works

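The core idea of a ZooKeeper distributed lock is to combine ephemeral sequential nodes with the Watcher mechanism to acquire and release the lock. The steps are:

  1. Create an ephemeral sequential node: to acquire the lock, each client creates an ephemeral sequential node under an agreed ZooKeeper path, where:
    • sequential guarantees that every node is assigned a unique, monotonically increasing sequence number;
    • ephemeral guarantees that the lock is still released if its holder exits abnormally, because ZooKeeper deletes the node when the client's session ends.
  2. Check whether the client may take the lock: if the node the client created has the smallest sequence number, the lock is acquired. Otherwise the client watches for the deletion of the node immediately before its own.
  3. Release the lock: the client only needs to delete the node it created. If another client is waiting for the lock (a node is watched by at most one other client), that client receives the deletion event and re-checks whether it can now acquire the lock.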
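
Steps 2 and 3 come down to one decision: given the current children of the lock path, either our node is first (lock acquired) or there is exactly one predecessor to watch. Below is a minimal sketch of that decision in plain Java, with no ZooKeeper dependency. The class name `LockOrderSketch` and the method `nodeToWatch` are hypothetical names for illustration, and the child names are assumed to share one prefix followed by ZooKeeper's zero-padded 10-digit sequence suffix.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class LockOrderSketch {

    /**
     * Returns null if ownNode has the smallest sequence number (lock acquired),
     * otherwise the name of the node immediately before it, which is the one to watch.
     */
    static String nodeToWatch(List<String> children, String ownNode) {
        List<String> sorted = new ArrayList<>(children);
        // Sequence suffixes are zero-padded, so for names with the same prefix
        // lexicographic order matches sequence order.
        Collections.sort(sorted);
        int index = sorted.indexOf(ownNode);
        if (index < 0) {
            throw new IllegalStateException("own node is missing: " + ownNode);
        }
        return index == 0 ? null : sorted.get(index - 1);
    }

    public static void main(String[] args) {
        List<String> children =
                Arrays.asList("seq-0000000012", "seq-0000000010", "seq-0000000011");
        System.out.println(nodeToWatch(children, "seq-0000000010")); // prints "null"
        System.out.println(nodeToWatch(children, "seq-0000000012")); // prints "seq-0000000011"
    }
}
```

After the watched node is deleted, the same check has to run again, because the predecessor may have disappeared without ever holding the lock (for example, its session expired while it was still waiting).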

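Chaining the nodes head to tail, with each one watching only its predecessor, effectively avoids the herd effect: when a node releases the lock, only the node right behind it reacts. The figure below summarizes the principle: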

image-20250226230818712
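
To get a feel for how much the chain saves, here is a rough count of watcher notifications under a simplifying assumption: N clients queue up once, no new clients arrive, and each client releases the lock in turn. `HerdEffectSketch` and its method names are made up for this illustration.

```java
class HerdEffectSketch {

    /** Every waiter watches the parent node: each release wakes all remaining waiters. */
    static long notificationsWatchingParent(int clients) {
        long total = 0;
        for (int waiters = clients - 1; waiters >= 1; waiters--) {
            total += waiters;
        }
        return total;
    }

    /** Each waiter watches only its predecessor: each release wakes exactly one waiter. */
    static long notificationsWatchingPredecessor(int clients) {
        return Math.max(0, clients - 1);
    }

    public static void main(String[] args) {
        int clients = 100;
        System.out.println(notificationsWatchingParent(clients));      // prints 4950
        System.out.println(notificationsWatchingPredecessor(clients)); // prints 99
    }
}
```

Under this model the total grows quadratically when everyone watches the parent and only linearly with the predecessor chain.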

Implementing a Custom Distributed Lock

Note: the custom lock uses Curator as the ZooKeeper Java client.

First, define a ZooKeeper client factory class:

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class ClientFactory {
    // ZooKeeper connection string
    private static final String CONNECTION_STRING = "127.0.0.1:2181";
    // Base sleep time between retries, in milliseconds
    private static final int BASE_SLEEP_TIME = 1000;
    // Maximum number of retries
    private static final int MAX_RETRIES = 3;
    private static volatile CuratorFramework zkClient;

    // Lazily initialized singleton (double-checked locking)
    public static CuratorFramework getClient() {
        if (zkClient == null) {
            synchronized (ClientFactory.class) {
                if (zkClient == null) {
                    createSimple();
                }
            }
        }
        return zkClient;
    }

    public static void createSimple() {
        // Retry policy: exponential backoff starting from BASE_SLEEP_TIME, at most
        // MAX_RETRIES retries; the sleep time is randomized and its upper bound grows with each retry
        ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(BASE_SLEEP_TIME, MAX_RETRIES);
        zkClient = CuratorFrameworkFactory.newClient(CONNECTION_STRING, retryPolicy);
        zkClient.start();
    }

    public static void createWithOptions(int connectionTimeoutMs, int sessionTimeoutMs) {
        ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(BASE_SLEEP_TIME, MAX_RETRIES);
        zkClient = CuratorFrameworkFactory.builder()
                .connectString(CONNECTION_STRING)
                .retryPolicy(retryPolicy)
                .connectionTimeoutMs(connectionTimeoutMs) // connection timeout
                .sessionTimeoutMs(sessionTimeoutMs)       // session timeout
                .build();
        zkClient.start();
    }
}
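
A note on the retry policy: as far as I can tell, `ExponentialBackoffRetry` does not sleep for a fixed 1 s, 2 s, 4 s. It multiplies the base sleep time by a random factor whose upper bound doubles with each retry. The sketch below models that formula in plain Java. It is my reading of the behavior, not a copy of the library, so check it against the Curator version you use. `BackoffSketch` and `sleepMs` are hypothetical names.

```java
import java.util.Random;

class BackoffSketch {

    /**
     * Assumed model: base * max(1, random integer in [0, 2^(retryCount + 1))).
     * retryCount is 0 for the first retry.
     */
    static long sleepMs(int baseSleepTimeMs, int retryCount, Random random) {
        return (long) baseSleepTimeMs * Math.max(1, random.nextInt(1 << (retryCount + 1)));
    }

    public static void main(String[] args) {
        Random random = new Random();
        for (int retry = 0; retry < 3; retry++) {
            long upperBound = 1000L * ((1 << (retry + 1)) - 1);
            System.out.println("retry " + retry + ": slept " + sleepMs(1000, retry, random)
                    + " ms (range 1000.." + upperBound + " ms)");
        }
    }
}
```

Under this model, with a base of 1000 ms the first retry always waits 1000 ms, the second waits between 1000 and 3000 ms, and the third between 1000 and 7000 ms.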

Define the Lock interface and implement it:

public interface Lock {
    // Acquires the lock; returns true on success
    boolean lock() throws Exception;

    // Releases the lock; returns true on success
    boolean unlock() throws Exception;
}

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;

public class ZkLock implements Lock {

    private String zkPath;      // lock root node, e.g. "/test/lock"
    private String lockPrefix;  // child node prefix, e.g. "/test/lock/seq-"
    private long waitTime;      // maximum wait in seconds; 0 means wait indefinitely
    CuratorFramework zkClient;  // ZooKeeper client
    private Thread thread;      // thread that currently holds the lock
    private String lockPath;    // node created by this lock instance
    private String waitPath;    // predecessor node being watched
    final AtomicInteger lockCount = new AtomicInteger(0); // reentrancy counter

    public ZkLock(String zkPath) {
        this(zkPath, 0L);
    }

    public ZkLock(String zkPath, long waitTime) {
        this.zkPath = zkPath;
        this.lockPrefix = zkPath + "/seq-";
        this.waitTime = waitTime;
        this.zkClient = ClientFactory.getClient();
        try {
            // Make sure the lock root node exists
            if (zkClient.checkExists().forPath(zkPath) == null) {
                zkClient.create().creatingParentsIfNeeded().forPath(zkPath);
            }
        } catch (KeeperException.NodeExistsException e) {
            // Another client created the lock root first; nothing to do
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

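    /**
     * Acquires the lock. Reentrant for the thread that already holds it.
     */
    @Override
    public boolean lock() throws Exception {
        synchronized (this) {
            if (lockCount.get() == 0) {
                thread = Thread.currentThread();
                lockCount.incrementAndGet();
            } else {
                if (!thread.equals(Thread.currentThread())) {
                    // Already held by another thread through this instance
                    return false;
                }
                // Reentrant acquisition: just bump the counter
                lockCount.incrementAndGet();
                return true;
            }
        }
        boolean acquired = false;
        try {
            acquired = tryLock();
            return acquired;
        } finally {
            if (!acquired) {
                // Roll back the counter so a failed attempt does not leave the lock marked as held
                lockCount.decrementAndGet();
            }
        }
    }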

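    /**
     * Tries to acquire the lock: creates our node, then waits until it is the smallest child.
     */
    private boolean tryLock() throws Exception {
        // Create our ephemeral sequential child node
        lockPath = zkClient.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(lockPrefix);
        String curNode = lockPath.substring(zkPath.length() + 1);
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(waitTime);
        while (true) {
            // Fetch and sort all children of zkPath
            List<String> childList = zkClient.getChildren().forPath(zkPath);
            Collections.sort(childList);
            int index = childList.indexOf(curNode);
            if (index < 0) {
                throw new Exception("Lock node no longer exists: " + lockPath);
            }
            if (index == 0) {
                // Smallest node: lock acquired
                return true;
            }
            // Watch the node immediately before ours
            waitPath = zkPath + "/" + childList.get(index - 1);
            final String watchedPath = waitPath;
            final CountDownLatch waitLatch = new CountDownLatch(1);
            Watcher w = new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {
                    if (watchedEvent.getType() == Event.EventType.NodeDeleted &&
                            watchedPath.equals(watchedEvent.getPath())) {
                        System.out.println("Observed node deletion event: " + watchedEvent);
                        waitLatch.countDown();
                    }
                }
            };
            if (zkClient.checkExists().usingWatcher(w).forPath(watchedPath) == null) {
                // Predecessor vanished between getChildren and the watch; re-check
                continue;
            }
            if (waitTime == 0L) {
                waitLatch.await();
            } else {
                // Wait for the remaining time; on timeout, remove our node so it does not block others
                long remaining = deadline - System.nanoTime();
                if (remaining <= 0 || !waitLatch.await(remaining, TimeUnit.NANOSECONDS)) {
                    zkClient.delete().forPath(lockPath);
                    return false;
                }
            }
            // Predecessor deleted: loop and re-check whether we are now the smallest node
        }
    }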

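    /**
     * Releases the lock. The ZooKeeper node is deleted only when the outermost hold is released.
     */
    @Override
    public boolean unlock() throws Exception {
        // Only the thread that holds the lock may release it
        if (thread == null || !thread.equals(Thread.currentThread())) {
            return false;
        }
        int newLockCount = lockCount.decrementAndGet();
        if (newLockCount < 0) {
            lockCount.incrementAndGet(); // undo so the counter stays consistent
            throw new Exception("unlock() called without a matching lock()");
        } else if (newLockCount > 0) {
            // Still held reentrantly
            return true;
        } else {
            try {
                if (zkClient.checkExists().forPath(lockPath) != null) {
                    zkClient.delete().forPath(lockPath);
                }
            } catch (Exception e) {
                e.printStackTrace();
                return false;
            }
            return true;
        }
    }
}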
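
The reentrancy bookkeeping in `ZkLock` is independent of ZooKeeper, so it can be looked at in isolation. The sketch below is a simplified in-memory version under my own naming (`ReentrancySketch`, `enter`, `exit` are not part of the code above): it tracks an owner thread and a hold count, and `exit()` reports when the outermost hold is released, which is the point where the ZooKeeper node would be deleted.

```java
class ReentrancySketch {
    private Thread owner; // thread that currently holds the lock, or null
    private int holds;    // number of nested acquisitions by the owner

    /** Returns true if the caller now holds the lock, false if another thread owns it. */
    synchronized boolean enter() {
        Thread current = Thread.currentThread();
        if (holds > 0 && owner != current) {
            return false;
        }
        owner = current;
        holds++;
        return true;
    }

    /** Returns true when the outermost hold was released. */
    synchronized boolean exit() {
        if (holds == 0 || owner != Thread.currentThread()) {
            throw new IllegalMonitorStateException("current thread does not hold the lock");
        }
        holds--;
        if (holds == 0) {
            owner = null;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        ReentrancySketch lock = new ReentrancySketch();
        lock.enter();
        lock.enter();                    // nested acquisition by the same thread
        System.out.println(lock.exit()); // prints "false": one hold is still open
        System.out.println(lock.exit()); // prints "true": outermost hold released
    }
}
```

Throwing on an unmatched `exit()` before touching the counter keeps the count from ever going negative.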

Test the custom distributed lock:

public class ZkLockTest {

    public static void main(String[] args) throws Exception {
        System.out.println("Starting ZooKeeper distributed lock test...");

        Thread t1 = new Thread(() -> runWorker("Thread 1"));
        Thread t2 = new Thread(() -> runWorker("Thread 2"));
        t1.start();
        t2.start();
        // Wait for both workers instead of busy-spinning with while(true)
        t1.join();
        t2.join();
    }

    private static void runWorker(String name) {
        Lock zkLock = new ZkLock("/test/lock", 15L); // wait at most 15 seconds
        System.out.println(name + " started");
        try {
            if (zkLock.lock()) {
                try {
                    System.out.println(name + " acquired the lock");
                    Thread.sleep(10 * 1000);
                } finally {
                    // Release in finally so the lock is not leaked if the work throws
                    System.out.println(name + " releasing the lock");
                    zkLock.unlock();
                }
            } else {
                System.out.println(name + " failed to acquire the lock");
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

The test output is shown below (I have collapsed the ZooKeeper log messages):

image-20250227100801296

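Curator Distributed Locks

Curator already ships with distributed lock implementations, in five flavors:

  • InterProcessSemaphoreMutex: distributed exclusive lock (non-reentrant)
  • InterProcessMutex: distributed reentrant exclusive lock
  • InterProcessReadWriteLock: distributed read-write lock
  • InterProcessMultiLock: a container that manages multiple locks as a single entity
  • InterProcessSemaphoreV2: shared semaphore

Let's test the reentrant lock: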

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;

public class InterProcessMutexTest {

    public static void main(String[] args) {
        CuratorFramework zkClient = ClientFactory.getClient();
        InterProcessMutex zkMutex = new InterProcessMutex(zkClient, "/test/mutex");

        new Thread(() -> runWorker(zkMutex, "Thread 1")).start();
        new Thread(() -> runWorker(zkMutex, "Thread 2")).start();
    }

    private static void runWorker(InterProcessMutex zkMutex, String name) {
        System.out.println(name + " started");
        try {
            zkMutex.acquire(); // blocks until acquired; acquire(time, unit) waits with a timeout
            try {
                System.out.println(name + " acquired the lock");
                Thread.sleep(2000);
            } finally {
                // Release in finally so the lock is not leaked if the work throws
                System.out.println(name + " releasing the lock");
                zkMutex.release();
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

The results:

image-20250227101721860

image-20250227101141630

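ZooKeeper Distributed Locks vs. Redis Distributed Locks

  • Redis leans toward AP: it favors availability and partition tolerance. Master-replica replication is asynchronous, so during a network partition or failover a lock written to the master may not have reached the replica that gets promoted, and the data can become inconsistent. In exchange Redis is fast, which makes it a better fit for scenarios that need high performance.
  • ZooKeeper is CP: it keeps data consistent through atomic broadcast, which makes it a better fit for scenarios that need strong consistency.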

References

Zookeeper实战——分布式锁实现以及原理_zk分布式锁实现原理-CSDN博客

__END__