工作原理
Zookeeper 分布式锁的核心思想是利用 Zookeeper 的临时有序节点+Watcher监听机制来实现锁的获取和释放。具体步骤如下:
- 创建临时有序节点:每个客户端要获取锁时,需要在 Zookeeper 的某个路径下创建一个临时有序节点。其中:
- 有序是为了保证每个节点都会被分配一个唯一的递增序号;
- 临时是为了保证持有锁的线程异常退出后锁正常释放。
- 判断是否有资格获取锁:如果当前客户端创建的节点是序号最小的节点,则获取锁成功。否则监听前一个节点的删除事件。
- 释放锁:客户端释放锁只需要删除自己创建的节点。此时如果有其他客户端正在等待锁(当前客户端只可能被一个其他客户端监听),监听到节点删除事件后,会重新检查是否可以获取锁。
ZooKeeper 这种首尾相接、后面监听前面的方式,可以有效避免惊群效应。当一个节点释放锁后,只有它后面的那一个节点才做出反应。原理可以概括为下图:

自定义分布式锁实现
说明:实现自定义分布式锁时,Zookeeper Java客户端采用Curator
首先定义一个Zookeeper Client工厂类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| public class ClientFactory { private static final String connectionString = "127.0.0.1:2181"; private static final int BASE_SLEEP_TIME = 1000; private static final int MAX_RETRIES = 3; private static volatile CuratorFramework zkClient;
public static CuratorFramework getClient() { if (zkClient == null) { synchronized (ClientFactory.class) { if (zkClient == null) { createSimple(); } } } return zkClient; }
public static void createSimple() { ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(BASE_SLEEP_TIME, MAX_RETRIES); zkClient = CuratorFrameworkFactory.newClient(connectionString, retryPolicy); zkClient.start(); }
public static void createWithOptions(int connectionTimeoutMs, int sessionTimeoutMs) { ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(BASE_SLEEP_TIME, MAX_RETRIES); zkClient = CuratorFrameworkFactory.builder() .connectString(connectionString) .retryPolicy(retryPolicy) .connectionTimeoutMs(connectionTimeoutMs) .sessionTimeoutMs(sessionTimeoutMs) .build(); zkClient.start(); } }
|
定义Lock接口并实现:
1 2 3 4 5
| public interface Lock { boolean lock() throws Exception;
boolean unlock() throws Exception; }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130
| public class ZkLock implements Lock{
private String zkPath; private String lockPrefix; private long waitTime; CuratorFramework zkClient; private Thread thread; private String lockPath; private String waitPath; final AtomicInteger lockCount = new AtomicInteger(0);
public ZkLock(String zkPath) throws Exception { this.zkPath = zkPath; this.lockPrefix = zkPath + "/seq-"; this.waitTime = 0L; this.zkClient = ClientFactory.getClient(); try { if (zkClient.checkExists().forPath(zkPath) == null) { zkClient.create().creatingParentsIfNeeded().forPath(zkPath); } } catch (Exception e) { e.printStackTrace(); } } public ZkLock(String zkPath, long waitTime) { this.zkPath = zkPath; this.lockPrefix = zkPath + "/seq-"; this.waitTime = waitTime; this.zkClient = ClientFactory.getClient(); try { if (zkClient.checkExists().forPath(zkPath) == null) { zkClient.create().creatingParentsIfNeeded().forPath(zkPath); } } catch (Exception e) { e.printStackTrace(); } }
@Override public boolean lock() throws Exception { synchronized (this) { if (lockCount.get() == 0) { thread = Thread.currentThread(); lockCount.incrementAndGet(); } else { if (!thread.equals(Thread.currentThread())) { return false; } lockCount.incrementAndGet(); return true; } } return tryLock(); }
private boolean tryLock() throws Exception { lockPath = zkClient.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(lockPrefix); List<String> childList = zkClient.getChildren().forPath(zkPath); if (childList.size() == 1) { return true; } else { Collections.sort(childList); String curNode = lockPath.substring(zkPath.length() + 1); int index = childList.indexOf(curNode); if (index < 0) { throw new Exception("加锁异常"); } else if (index == 0) { return true; } else { waitPath = zkPath + "/" + childList.get(index - 1); final CountDownLatch waitLatch = new CountDownLatch(1); Watcher w = new Watcher() { @Override public void process(WatchedEvent watchedEvent) { if (watchedEvent.getType() == Event.EventType.NodeDeleted && watchedEvent.getPath().equals(waitPath)) { System.out.println("监听到节点删除事件:" + watchedEvent); waitLatch.countDown(); } } }; zkClient.getData().usingWatcher(w).forPath(waitPath); if (waitTime == 0L) { waitLatch.await(); return true; } else { return waitLatch.await(waitTime, TimeUnit.SECONDS); } } } }
@Override public boolean unlock() throws Exception { if (!thread.equals(Thread.currentThread())) { return false; } int newLockCount = lockCount.decrementAndGet(); if (newLockCount < 0) { throw new Exception("解锁异常"); } else if (newLockCount > 0) { return true; } else { try { if (zkClient.checkExists().forPath(lockPath) != null) { zkClient.delete().forPath(lockPath); } } catch (Exception e) { e.printStackTrace(); return false; } return true; } } }
|
测试自定义分布式锁:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
| public class ZkLockTest {
public static void main(String[] args) throws Exception { System.out.println("开始测试ZK分布式锁...");
new Thread(new Runnable() { @Override public void run() { Lock zkLock = new ZkLock("/test/lock", 15L); System.out.println("线程1启动"); try { boolean lock = zkLock.lock(); if (lock) { System.out.println("线程1获取到锁"); Thread.sleep(10*1000); System.out.println("线程1释放锁"); zkLock.unlock(); } else { System.out.println("线程1获取锁失败"); } } catch (Exception e) { throw new RuntimeException(e); } } }).start();
new Thread(new Runnable() { @Override public void run() { Lock zkLock = new ZkLock("/test/lock", 15L); System.out.println("线程2启动"); try { boolean lock = zkLock.lock(); if (lock) { System.out.println("线程2获取到锁"); Thread.sleep(10*1000); System.out.println("线程2释放锁"); zkLock.unlock(); } else { System.out.println("线程2获取锁失败"); } } catch (Exception e) { throw new RuntimeException(e); } } }).start();
while(true); } }
|
测试结果如下,这里我把zookeeper的日志信息折叠了:

Curator分布式锁
在Curator中已经帮我们实现好了分布式锁,包含五种锁方案:
- InterProcessSemaphoreMutex:分布式排它锁(非可重入锁)
- InterProcessMutex:分布式可重入排它锁
- InterProcessReadWriteLock:分布式读写锁
- InterProcessMultiLock:将多个锁作为单个实体管理的容器
- InterProcessSemaphoreV2:共享信号量
测试一下可重入锁:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| public class InterProcessMutexTest {
public static void main(String[] args) { CuratorFramework zkClient = ClientFactory.getClient(); InterProcessMutex zkMutex = new InterProcessMutex(zkClient, "/test/mutex");
new Thread(new Runnable() { @Override public void run() { System.out.println("线程1启动"); try { zkMutex.acquire(); System.out.println("线程1获取到锁"); Thread.sleep(2000); System.out.println("线程1释放锁"); zkMutex.release(); } catch (Exception e) { throw new RuntimeException(e); } } }).start();
new Thread(new Runnable() { @Override public void run() { System.out.println("线程2启动"); try { zkMutex.acquire(); System.out.println("线程2获取到锁"); Thread.sleep(2000); System.out.println("线程2释放锁"); zkMutex.release(); } catch (Exception e) { throw new RuntimeException(e); } } }).start(); } }
|
结果如下:


Zookeeper 分布式锁对比Redis 分布式锁
- Redis 是 AP 架构,更注重可用性和分区容错性,主从复制机制在某些情况下可能会导致数据不一致,尤其是在网络分区或故障恢复时。因此更适合需要高性能的场景。
- ZooKeeper 是 CP 架构,会通过原子广播保证数据的一致性,适合需要强一致性的场景。
参考
Zookeeper实战——分布式锁实现以及原理_zk分布式锁实现原理-CSDN博客
__END__