简介
ZooKeeper 旨在为复杂的分布式系统提供一个简单易用的协调机制,解决分布式环境下的数据一致性、配置管理、命名服务等难题,减少开发者在分布式系统开发中处理复杂协调逻辑的负担。
应用场景
- 注册中心:ZooKeeper 可以作为协调中心,实现节点的注册与发现、任务分配、集群管理等功能。
- 分布式锁:在分布式环境中,多个进程或线程可能会同时访问共享资源,使用 ZooKeeper 可以实现分布式锁,保证数据的一致性和完整性。
- 配置管理:ZooKeeper 可以集中管理配置信息,并在配置发生变化时及时通知各个节点。
数据模型
ZooKeeper 使用类似文件系统目录树的层次化结构来组织数据,其中每个节点被称为 Znode。根节点为 /
,从根节点开始可以向下创建子节点,形成一棵倒挂的树。每个 Znode 都有一个唯一的路径,类似于文件系统中的文件路径,路径由斜杠分隔的一系列名称组成,如 /parent/child
。但与文件系统不同的是,文件系统中的路径无法保存数据,只有文件可以保存数据,而所有Znode都是可以保存少量数据的。同时,每个 Znode 还包含一些元数据,如版本号、时间戳等,这些元数据用于实现数据的一致性和并发控制。

Znode有以下类型:
- 持久节点(Persistent):一旦创建,除非显式删除,否则会一直存在于 ZooKeeper 中。
- 临时节点(Ephemeral):与客户端会话绑定,当客户端会话结束(如客户端断开连接)时,临时节点会自动被删除。临时节点不能有子节点。应用场景:在分布式锁实现中,客户端创建一个临时节点表示获得锁,当客户端崩溃或主动释放锁时,临时节点自动删除,其他客户端可以尝试获取锁。
- 顺序节点(Sequential):可以是持久顺序节点或临时顺序节点。在创建顺序节点时,ZooKeeper 会在节点名称后面自动添加一个单调递增的序列号,常用于实现分布式队列、分布式选举等场景。
基本增删改查
使用curator做基本的增删改查非常简单,参考以下测试代码即可:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107
| @SpringBootTest public class SomeTestApplicationTests {
CuratorFramework client;
@Before public void testConnect(){
RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 3);
client = CuratorFrameworkFactory.builder() .connectString("101.132.242.94:2181") .sessionTimeoutMs(60 * 1000) .connectionTimeoutMs(15 * 1000) .retryPolicy(retryPolicy) .build(); client.start(); }
@Test public void testCreateNode() throws Exception { client.create().withMode(CreateMode.PERSISTENT).forPath("/test", "test".getBytes()); }
@Test public void testCreateMulti() throws Exception { String s = client.create().creatingParentsIfNeeded().forPath("/multi/p1"); System.out.println(s); }
@Test public void testGet() throws Exception { byte[] bytes = client.getData().forPath("/test"); System.out.println(new String(bytes));
List<String> strings = client.getChildren().forPath("/"); System.out.println(strings);
Stat stat = new Stat(); client.getData().storingStatIn(stat).forPath("/test"); System.out.println(stat);
}
@Test public void testSet() throws Exception { client.setData().forPath("/test", "test2".getBytes()); }
@Test public void testSetWithVersion() throws Exception { Stat stat = new Stat(); client.getData().storingStatIn(stat).forPath("/test"); int version = stat.getVersion(); System.out.println(version); client.setData().withVersion(version).forPath("/test", "testVersion".getBytes()); }
@Test public void testDelete() throws Exception {
client.create().forPath("/back"); client.delete().guaranteed().inBackground((curatorFramework, curatorEvent) -> { System.out.println(curatorEvent.getType()); System.out.println("我被删除了!"); }).forPath("/back"); Thread.sleep(10*1000); }
@After public void testClose(){ client.close(); } }
|
Watch事件监听
ZooKeeper 中引入了Watcher机制来实现了发布/订阅功能能,能够让多个订阅者同时监听某一个对象,当一个对象自身状态变化时,会通知所有订阅者。ZooKeeper提供了三种Watcher:
- NodeCache : 只是监听某一个特定的节点。
- PathChildrenCache : 监控一个ZNode的子节点。
- TreeCache : 可以监控当前节点+所有子节点,即上面两种watcher的并集。
示例代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| @Test public void testNodeCache() throws Exception { NodeCache nodeCache = new NodeCache(client, "/app1"); nodeCache.getListenable().addListener(new NodeCacheListener() { @Override public void nodeChanged() throws Exception { System.out.println("监听到节点的变化"); if(nodeCache.getCurrentData() != null){ byte[] bytes = nodeCache.getCurrentData().getData(); System.out.println(new String(bytes)); } } }); nodeCache.start(true); while(true); }
@Test public void testPathChildrenCache() throws Exception { PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/app2", true); pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() { @Override public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent event) throws Exception { System.out.println("监听到子节点的改变"); System.out.println(event); PathChildrenCacheEvent.Type type = event.getType(); if(type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){ byte[] bytes = event.getData().getData(); System.out.println(new String(bytes)); } } }); pathChildrenCache.start(); while(true); }
|
TreeCache和PathChildrenCache用法类似,这里就不演示了。
__END__