简介

ZooKeeper 旨在为复杂的分布式系统提供一个简单易用的协调机制,解决分布式环境下的数据一致性、配置管理、命名服务等难题,减少开发者在分布式系统开发中处理复杂协调逻辑的负担。

应用场景
  • 注册中心:ZooKeeper 可以作为协调中心,实现节点的注册与发现、任务分配、集群管理等功能。
  • 分布式锁:在分布式环境中,多个进程或线程可能会同时访问共享资源,使用 ZooKeeper 可以实现分布式锁,保证数据的一致性和完整性。
  • 配置管理:ZooKeeper 可以集中管理配置信息,并在配置发生变化时及时通知各个节点。
数据模型

ZooKeeper 使用类似文件系统目录树的层次化结构来组织数据,其中每个节点被称为 Znode。根节点为 /,从根节点开始可以向下创建子节点,形成一棵倒挂的树。每个 Znode 都有一个唯一的路径,类似于文件系统中的文件路径,路径由斜杠分隔的一系列名称组成,如 /parent/child。但与文件系统不同的是,文件系统中的路径无法保存数据,只有文件可以保存数据,而所有Znode都是可以保存少量数据的。同时,每个 Znode 还包含一些元数据,如版本号、时间戳等,这些元数据用于实现数据的一致性和并发控制。

image-20250226202856862

Znode有以下类型:

  • 持久节点(Persistent):一旦创建,除非显式删除,否则会一直存在于 ZooKeeper 中。
  • 临时节点(Ephemeral):与客户端会话绑定,当客户端会话结束(如客户端断开连接)时,临时节点会自动被删除。临时节点不能有子节点。应用场景:在分布式锁实现中,客户端创建一个临时节点表示获得锁,当客户端崩溃或主动释放锁时,临时节点自动删除,其他客户端可以尝试获取锁。
  • 顺序节点(Sequential):可以是持久顺序节点或临时顺序节点。在创建顺序节点时,ZooKeeper 会在节点名称后面自动添加一个单调递增的序列号,常用于实现分布式队列、分布式选举等场景。

基本增删改查

使用curator做基本的增删改查非常简单,参考以下测试代码即可:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
@SpringBootTest
public class SomeTestApplicationTests {

CuratorFramework client;

@Before
public void testConnect(){

RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 3);
// 客户端创建方式一
// CuratorFramework client = CuratorFrameworkFactory.newClient("101.132.242.94:2181", retryPolicy);

// 客户端创建方式二
client = CuratorFrameworkFactory.builder()
.connectString("101.132.242.94:2181")
.sessionTimeoutMs(60 * 1000)
.connectionTimeoutMs(15 * 1000)
.retryPolicy(retryPolicy)
.build();
client.start();
}

/**
* 创建节点:
* 1.基本创建:create().forPath("")
* 2.创建带有数据的节点:create().forPath("", "".getBytes())
* 3.设置节点类型,默认为临时节点:create.withMode(CreateMode.xxx).forPath("")
* 4.创建多级节点:create().creatingParentsIfNeeded().forPath("")
*/
@Test
public void testCreateNode() throws Exception {
client.create().withMode(CreateMode.PERSISTENT).forPath("/test", "test".getBytes());
}

@Test
public void testCreateMulti() throws Exception {
String s = client.create().creatingParentsIfNeeded().forPath("/multi/p1");
System.out.println(s);
}

/**
* 读取节点数据:
* 1.读取一个节点的数据 get:getData().forPath("")
* 2.读取一个节点的子节点 ls:getChildren().forPath("")
* 3.读取一个节点的数据,并且返回节点的Stat信息 ls -s:getData().storingStatIn(stat).forPath("")
*/
@Test
public void testGet() throws Exception {
byte[] bytes = client.getData().forPath("/test");
System.out.println(new String(bytes));

List<String> strings = client.getChildren().forPath("/");
System.out.println(strings);

Stat stat = new Stat();
client.getData().storingStatIn(stat).forPath("/test");
System.out.println(stat);

}

/**
* 修改节点数据:
* 1.修改一个节点的数据:setData().forPath("", "".getBytes())
* 2.根据版本修改:setData().withVersion(-1).forPath("", "".getBytes())
*/
@Test
public void testSet() throws Exception {
client.setData().forPath("/test", "test2".getBytes());
}

@Test
public void testSetWithVersion() throws Exception {
// CAS,版本不对不能改
Stat stat = new Stat();
client.getData().storingStatIn(stat).forPath("/test");
int version = stat.getVersion();
System.out.println(version);
client.setData().withVersion(version).forPath("/test", "testVersion".getBytes());
}

/**
* 删除节点:
* 1.删除一个节点:delete().forPath("")
* 2.删除一个节点,并且删除其子节点:delete().deletingChildrenIfNeeded().forPath("")
* 3.删除一个节点,确保成功(防止网络抖动):delete().guaranteed().forPath("")
* 4.根据版本号删除节点:delete().withVersion().forPath("")
* 5.回调删除:delete().inBackground().forPath("")
*/
@Test
public void testDelete() throws Exception {
// client.delete().guaranteed().forPath("/test");
// client.delete().deletingChildrenIfNeeded().forPath("/multi");

client.create().forPath("/back");
client.delete().guaranteed().inBackground((curatorFramework, curatorEvent) -> {
System.out.println(curatorEvent.getType());
System.out.println("我被删除了!");
}).forPath("/back");
// 记得休眠一下,否则异步线程还没执行完就被JVM中断掉了
Thread.sleep(10*1000);
}

@After
public void testClose(){
client.close();
}
}

Watch事件监听

ZooKeeper 中引入了Watcher机制来实现了发布/订阅功能能,能够让多个订阅者同时监听某一个对象,当一个对象自身状态变化时,会通知所有订阅者。ZooKeeper提供了三种Watcher:

  • NodeCache : 只是监听某一个特定的节点。
  • PathChildrenCache : 监控一个ZNode的子节点。
  • TreeCache : 可以监控当前节点+所有子节点,即上面两种watcher的并集。

示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// 监听一个节点的变化
@Test
public void testNodeCache() throws Exception {
// 创建NodeCache对象并注册监听
NodeCache nodeCache = new NodeCache(client, "/app1");
nodeCache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
System.out.println("监听到节点的变化");
// 获取节点的当前数据
if(nodeCache.getCurrentData() != null){
byte[] bytes = nodeCache.getCurrentData().getData();
System.out.println(new String(bytes));
}
}
});
// 开启监听,如果设置为true,开启监听时会加载缓存数据
nodeCache.start(true);
while(true);
}

// 监听子节点的变化
@Test
public void testPathChildrenCache() throws Exception {
PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/app2", true);
pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent event) throws Exception {
System.out.println("监听到子节点的改变");
System.out.println(event);
// 仅当更新时才操作
PathChildrenCacheEvent.Type type = event.getType();
if(type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){
byte[] bytes = event.getData().getData();
System.out.println(new String(bytes));
}
}
});
pathChildrenCache.start();
while(true);
}

TreeCache和PathChildrenCache用法类似,这里就不演示了。

__END__