0%

zookeeper的API

ZooKeeper API 实战:从原生客户端到主流框架

ZooKeeper 提供了原生 Java API 用于分布式协调,但原生 API 存在会话重连、监听器一次性触发等痛点。实际开发中,更推荐使用封装完善的 ZkClient 或 Curator 框架。以下详细介绍三类客户端的核心用法与最佳实践。

原生 ZooKeeper API

原生 API 是 ZooKeeper 官方提供的基础接口,适合理解底层原理,但需手动处理诸多细节(如异常、会话维护)。

引入依赖

1
2
3
4
5
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.6.1</version>
</dependency>

核心操作示例

(1)创建连接
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private ZooKeeper zkClient;
private final String connectString = "localhost:2181"; // 集群地址用逗号分隔
private final int sessionTimeout = 3000; // 会话超时时间(毫秒)

@Before
public void init() throws IOException {
// 回调监听器:处理连接状态变化(如连接建立、断开)
Watcher watcher = event -> {
if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
System.out.println("连接建立成功");
}
};
// 初始化连接(异步操作,需等待回调确认连接成功)
zkClient = new ZooKeeper(connectString, sessionTimeout, watcher);
}
(2)创建节点
1
2
3
4
5
6
7
8
9
10
11
@Test
public void createNode() throws KeeperException, InterruptedException {
// 参数:路径、数据、权限、节点类型
String path = zkClient.create(
"/native_test",
"hello".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, // 开放权限(所有用户可操作)
CreateMode.PERSISTENT // 持久节点
);
System.out.println("创建节点:" + path); // 输出:/native_test
}
  • 节点类型PERSISTENT(持久)、EPHEMERAL(临时)、PERSISTENT_SEQUENTIAL(持久有序)、EPHEMERAL_SEQUENTIAL(临时有序)。
(3)获取节点数据
1
2
3
4
5
6
7
8
9
10
11
@Test
public void getNodeData() throws KeeperException, InterruptedException {
Stat stat = new Stat(); // 用于接收节点元数据
byte[] data = zkClient.getData(
"/native_test",
false, // 是否监听数据变化(一次性)
stat
);
System.out.println("数据:" + new String(data)); // 输出:hello
System.out.println("版本号:" + stat.getDataVersion()); // 输出:0
}
(4)更新节点数据
1
2
3
4
5
6
@Test
public void updateNode() throws KeeperException, InterruptedException {
// 参数:路径、新数据、版本号(-1 表示无视版本)
Stat stat = zkClient.setData("/native_test", "world".getBytes(), -1);
System.out.println("更新后版本:" + stat.getDataVersion()); // 输出:1
}
(5)删除节点
1
2
3
4
5
6
@Test
public void deleteNode() throws KeeperException, InterruptedException {
// 参数:路径、版本号(-1 表示无视版本)
zkClient.delete("/native_test", -1);
System.out.println("节点删除成功");
}
(6)监听节点变化

原生监听器是一次性的,触发后需重新注册:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Test
public void watchNode() throws InterruptedException {
// 注册数据变化监听
zkClient.getData("/watch_test", event -> {
System.out.println("事件类型:" + event.getType()); // 如 NodeDataChanged
try {
// 重新注册监听(否则仅触发一次)
zkClient.getData("/watch_test", this, null);
} catch (Exception e) {
e.printStackTrace();
}
}, null);

// 阻塞等待事件(实际开发中用业务逻辑替代)
Thread.sleep(60000);
}

原生 API 的痛点

  • 会话超时需手动重连;
  • 监听器是一次性的,需手动重新注册;
  • 异常处理繁琐(如 KeeperException 需细分处理)。

ZkClient:简化原生 API 的封装

ZkClient 是对原生 API 的轻量封装,解决了会话重连、监听器自动注册等问题,使用更简洁。

引入依赖

1
2
3
4
5
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.11</version>
</dependency>

核心操作示例

(1)创建连接
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private ZkClient zkClient;
private final String connectString = "localhost:2181";

@Before
public void init() {
// 参数:连接地址、会话超时、连接超时、序列化器(默认不支持字符串,需自定义)
zkClient = new ZkClient(
connectString,
3000,
3000,
new StringZkSerializer() // 自定义字符串序列化器
);
}

// 自定义序列化器(解决字符串数据存储问题)
static class StringZkSerializer implements ZkSerializer {
@Override
public byte[] serialize(Object data) {
return String.valueOf(data).getBytes(StandardCharsets.UTF_8);
}
@Override
public Object deserialize(byte[] bytes) {
return new String(bytes, StandardCharsets.UTF_8);
}
}
(2)创建节点

支持递归创建父节点:

1
2
3
4
5
6
@Test
public void createNode() {
// 递归创建 /zkclient/p/c(父节点不存在时自动创建)
zkClient.createPersistent("/zkclient/p/c", true);
System.out.println("节点创建成功");
}
(3)监听子节点变化

ZkClient 监听器是持久化的,无需手动重注册:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Test
public void watchChildren() throws InterruptedException {
// 监听 /zkclient 的子节点变化
zkClient.subscribeChildChanges("/zkclient", (parentPath, children) -> {
System.out.println("父路径:" + parentPath);
System.out.println("子节点列表:" + children);
});

// 触发监听(创建子节点)
zkClient.createPersistent("/zkclient/child1");
Thread.sleep(1000); // 等待事件触发
zkClient.delete("/zkclient/child1");
Thread.sleep(1000);

// 阻塞等待(实际开发中用业务逻辑替代)
Thread.sleep(60000);
}
(4)其他操作
1
2
3
4
5
6
7
8
// 更新数据
zkClient.writeData("/zkclient", "test");

// 获取数据
String data = zkClient.readData("/zkclient");

// 递归删除节点(含子节点)
zkClient.deleteRecursive("/zkclient");

Curator:功能最全的客户端框架

Curator 是 Apache 顶级项目,提供了更强大的封装,支持分布式锁、选举等高级功能,是生产环境的首选。

引入依赖

1
2
3
4
5
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.1.0</version>
</dependency>

核心操作示例

(1)创建连接
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private CuratorFramework client;
private final String connectString = "localhost:2181";

@Before
public void init() {
// 重试策略:初始休眠 1 秒,最多重试 3 次
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);

// 构建客户端(支持命名空间隔离)
client = CuratorFrameworkFactory.builder()
.connectString(connectString)
.sessionTimeoutMs(3000)
.connectionTimeoutMs(3000)
.retryPolicy(retryPolicy)
.namespace("curator") // 命名空间(所有操作路径自动拼接前缀 /curator)
.build();

// 启动客户端(必须调用)
client.start();
}
(2)创建节点

支持链式调用,功能更灵活:

1
2
3
4
5
6
7
8
@Test
public void createNode() throws Exception {
// 递归创建节点(父节点不存在时自动创建)
client.create()
.creatingParentsIfNeeded() // 递归创建父节点
.withMode(CreateMode.PERSISTENT) // 节点类型
.forPath("/test", "hello".getBytes()); // 路径和数据
}
(3)监听节点变化

Curator 提供 CuratorCache 实现高效监听(支持持久化和递归监听):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Test
public void watchNode() throws Exception {
// 构建缓存(监听 /test 节点及其子节点)
CuratorCache cache = CuratorCache.build(client, "/test");

// 注册监听器
CuratorCacheListener listener = CuratorCacheListener.builder()
.forCreates(node -> System.out.println("节点创建:" + node.getPath()))
.forUpdates((oldNode, newNode) ->
System.out.println("节点更新:" + newNode.getPath())
)
.forDeletes(node -> System.out.println("节点删除:" + node.getPath()))
.build();

// 添加监听器并启动缓存
cache.listenable().addListener(listener);
cache.start();

// 触发事件
client.create().forPath("/test/child");
Thread.sleep(1000);
client.delete().forPath("/test/child");

// 阻塞等待
Thread.sleep(60000);
}
(4)分布式锁(Curator 高级特性)

Curator 内置分布式锁实现,简化锁竞争逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Test
public void distributedLock() throws Exception {
// 创建分布式锁(基于临时有序节点)
InterProcessMutex lock = new InterProcessMutex(client, "/lock");

try {
// 获取锁(最多等待 10 秒)
if (lock.acquire(10, TimeUnit.SECONDS)) {
System.out.println("获取锁成功,执行业务逻辑");
// 模拟业务操作
Thread.sleep(5000);
}
} finally {
// 释放锁
if (lock.isAcquiredInThisProcess()) {
lock.release();
System.out.println("释放锁成功");
}
}
}

客户端对比与选型

客户端 优势 劣势 适用场景
原生 API 最底层,无依赖 需手动处理重连、监听注册,繁琐 学习底层原理
ZkClient 轻量封装,解决核心痛点 功能较少,社区维护不活跃(停更于 2019) 简单分布式协调需求
Curator 功能全面,支持分布式锁、选举等高级特性 依赖较重 生产环境,复杂分布式场景

欢迎关注我的其它发布渠道

表情 | 预览
快来做第一个评论的人吧~
Powered By Valine
v1.3.10