ZooKeeper API 实战:从原生客户端到主流框架
ZooKeeper 提供了原生 Java API 用于分布式协调,但原生 API 存在会话重连、监听器一次性触发等痛点。实际开发中,更推荐使用封装完善的 ZkClient 或 Curator 框架。以下详细介绍三类客户端的核心用法与最佳实践。
原生 ZooKeeper API
原生 API 是 ZooKeeper 官方提供的基础接口,适合理解底层原理,但需手动处理诸多细节(如异常、会话维护)。
引入依赖
1 2 3 4 5
| <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.6.1</version> </dependency>
|
核心操作示例
(1)创建连接
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| private ZooKeeper zkClient; private final String connectString = "localhost:2181"; private final int sessionTimeout = 3000;
@Before public void init() throws IOException { Watcher watcher = event -> { if (event.getState() == Watcher.Event.KeeperState.SyncConnected) { System.out.println("连接建立成功"); } }; zkClient = new ZooKeeper(connectString, sessionTimeout, watcher); }
|
(2)创建节点
1 2 3 4 5 6 7 8 9 10 11
| @Test public void createNode() throws KeeperException, InterruptedException { String path = zkClient.create( "/native_test", "hello".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT ); System.out.println("创建节点:" + path); }
|
- 节点类型:
PERSISTENT
(持久)、EPHEMERAL
(临时)、PERSISTENT_SEQUENTIAL
(持久有序)、EPHEMERAL_SEQUENTIAL
(临时有序)。
(3)获取节点数据
1 2 3 4 5 6 7 8 9 10 11
| @Test public void getNodeData() throws KeeperException, InterruptedException { Stat stat = new Stat(); byte[] data = zkClient.getData( "/native_test", false, stat ); System.out.println("数据:" + new String(data)); System.out.println("版本号:" + stat.getDataVersion()); }
|
(4)更新节点数据
1 2 3 4 5 6
| @Test public void updateNode() throws KeeperException, InterruptedException { Stat stat = zkClient.setData("/native_test", "world".getBytes(), -1); System.out.println("更新后版本:" + stat.getDataVersion()); }
|
(5)删除节点
1 2 3 4 5 6
| @Test public void deleteNode() throws KeeperException, InterruptedException { zkClient.delete("/native_test", -1); System.out.println("节点删除成功"); }
|
(6)监听节点变化
原生监听器是一次性的,触发后需重新注册:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| @Test public void watchNode() throws InterruptedException { zkClient.getData("/watch_test", event -> { System.out.println("事件类型:" + event.getType()); try { zkClient.getData("/watch_test", this, null); } catch (Exception e) { e.printStackTrace(); } }, null);
Thread.sleep(60000); }
|
原生 API 的痛点
- 会话超时需手动重连;
- 监听器是一次性的,需手动重新注册;
- 异常处理繁琐(如
KeeperException
需细分处理)。
ZkClient:简化原生 API 的封装
ZkClient 是对原生 API 的轻量封装,解决了会话重连、监听器自动注册等问题,使用更简洁。
引入依赖
1 2 3 4 5
| <dependency> <groupId>com.101tec</groupId> <artifactId>zkclient</artifactId> <version>0.11</version> </dependency>
|
核心操作示例
(1)创建连接
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| private ZkClient zkClient; private final String connectString = "localhost:2181";
@Before public void init() { zkClient = new ZkClient( connectString, 3000, 3000, new StringZkSerializer() ); }
static class StringZkSerializer implements ZkSerializer { @Override public byte[] serialize(Object data) { return String.valueOf(data).getBytes(StandardCharsets.UTF_8); } @Override public Object deserialize(byte[] bytes) { return new String(bytes, StandardCharsets.UTF_8); } }
|
(2)创建节点
支持递归创建父节点:
1 2 3 4 5 6
| @Test public void createNode() { zkClient.createPersistent("/zkclient/p/c", true); System.out.println("节点创建成功"); }
|
(3)监听子节点变化
ZkClient 监听器是持久化的,无需手动重注册:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| @Test public void watchChildren() throws InterruptedException { zkClient.subscribeChildChanges("/zkclient", (parentPath, children) -> { System.out.println("父路径:" + parentPath); System.out.println("子节点列表:" + children); });
zkClient.createPersistent("/zkclient/child1"); Thread.sleep(1000); zkClient.delete("/zkclient/child1"); Thread.sleep(1000);
Thread.sleep(60000); }
|
(4)其他操作
1 2 3 4 5 6 7 8
| zkClient.writeData("/zkclient", "test");
String data = zkClient.readData("/zkclient");
zkClient.deleteRecursive("/zkclient");
|
Curator:功能最全的客户端框架
Curator 是 Apache 顶级项目,提供了更强大的封装,支持分布式锁、选举等高级功能,是生产环境的首选。
引入依赖
1 2 3 4 5
| <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>5.1.0</version> </dependency>
|
核心操作示例
(1)创建连接
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| private CuratorFramework client; private final String connectString = "localhost:2181";
@Before public void init() { RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); client = CuratorFrameworkFactory.builder() .connectString(connectString) .sessionTimeoutMs(3000) .connectionTimeoutMs(3000) .retryPolicy(retryPolicy) .namespace("curator") .build(); client.start(); }
|
(2)创建节点
支持链式调用,功能更灵活:
1 2 3 4 5 6 7 8
| @Test public void createNode() throws Exception { client.create() .creatingParentsIfNeeded() .withMode(CreateMode.PERSISTENT) .forPath("/test", "hello".getBytes()); }
|
(3)监听节点变化
Curator 提供 CuratorCache
实现高效监听(支持持久化和递归监听):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| @Test public void watchNode() throws Exception { CuratorCache cache = CuratorCache.build(client, "/test"); CuratorCacheListener listener = CuratorCacheListener.builder() .forCreates(node -> System.out.println("节点创建:" + node.getPath())) .forUpdates((oldNode, newNode) -> System.out.println("节点更新:" + newNode.getPath()) ) .forDeletes(node -> System.out.println("节点删除:" + node.getPath())) .build(); cache.listenable().addListener(listener); cache.start(); client.create().forPath("/test/child"); Thread.sleep(1000); client.delete().forPath("/test/child"); Thread.sleep(60000); }
|
(4)分布式锁(Curator 高级特性)
Curator 内置分布式锁实现,简化锁竞争逻辑:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| @Test public void distributedLock() throws Exception { InterProcessMutex lock = new InterProcessMutex(client, "/lock"); try { if (lock.acquire(10, TimeUnit.SECONDS)) { System.out.println("获取锁成功,执行业务逻辑"); Thread.sleep(5000); } } finally { if (lock.isAcquiredInThisProcess()) { lock.release(); System.out.println("释放锁成功"); } } }
|
客户端对比与选型
客户端 |
优势 |
劣势 |
适用场景 |
原生 API |
最底层,无依赖 |
需手动处理重连、监听注册,繁琐 |
学习底层原理 |
ZkClient |
轻量封装,解决核心痛点 |
功能较少,社区维护不活跃(停更于 2019) |
简单分布式协调需求 |
Curator |
功能全面,支持分布式锁、选举等高级特性 |
依赖较重 |
生产环境,复杂分布式场景 |
v1.3.10