主页 > 软件开发  > 

zookeeper有序临时结点实现公平锁的实践例子

zookeeper有序临时结点实现公平锁的实践例子

目录 实践例子1. 先创建一个持久结点2. 创建一个结点监听程序3. 锁程序4. 测试和输出截图测试说明 回顾zkNode类型zookeeper分布式锁的优缺点

实践例子 1. 先创建一个持久结点

./bin/zkServer.sh start conf/zoo_local.cfg ./bin/zkCli.sh -server 127.0.0.1:2181

2. 创建一个结点监听程序 <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>5.1.0</version> </dependency> CuratorTest package com.test; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.cache.ChildData; import org.apache.curator.framework.recipes.cache.NodeCache; import org.apache.curator.framework.recipes.cache.NodeCacheListener; import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; import org.apache.curator.framework.recipes.cache.TreeCache; import org.apache.curator.framework.recipes.cache.TreeCacheEvent; import org.apache.curator.framework.recipes.cache.TreeCacheListener; import org.apache.curator.retry.ExponentialBackoffRetry; /** * @Author mubi * @Date 2020/3/27 23:43 */ public class CuratorTest { private static String clusterNode = "/mylock"; private static CuratorFramework cf; private static PathChildrenCache pathChildrenCache; private static NodeCache nodeCache; private static TreeCache treeCache; public static void main(String[] args) throws Exception { CuratorTest curatorTest = new CuratorTest(); RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); cf = CuratorFrameworkFactory.newClient("127.0.0.1:2181", 5000, 1000, retryPolicy); cf.start(); curatorTest.setPathCacheListener(clusterNode, true); curatorTest.setNodeCacheListener(clusterNode, false); curatorTest.setTreeCacheListener(clusterNode); System.in.read(); } /** * 设置Path Cache, 监控本节点的子节点被创建,更新或者删除,注意是子节点, 子节点下的子节点不能递归监控 * 可重入监听 */ public void setPathCacheListener(String path, boolean cacheData) throws Exception { try { pathChildrenCache = new PathChildrenCache(cf, path, cacheData); PathChildrenCacheListener childrenCacheListener = new PathChildrenCacheListener() { @Override public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) { ChildData data = event.getData(); switch (event.getType()) { case CHILD_ADDED: System.out.println("子节点增加:" + data.getPath() + " " + data.getData()); try { String rs = new String(data.getData(), "utf-8"); System.out.println("data string:" + rs); } catch (Exception e) { } break; case CHILD_UPDATED: System.out.println("子节点更新:" + data.getPath() + " " + data.getData()); break; case CHILD_REMOVED: System.out.println("子节点删除:" + data.getPath() + " " + data.getData()); break; default: break; } } }; pathChildrenCache.getListenable().addListener(childrenCacheListener); pathChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT); } catch (Exception e) { System.out.println("PathCache监听失败, path=" + path); } } /** * 设置Node Cache, 监控本节点的新增,删除,更新 */ public void setNodeCacheListener(String path, boolean dataIsCompressed) { try { nodeCache = new NodeCache(cf, path, dataIsCompressed); NodeCacheListener nodeCacheListener = new NodeCacheListener() { @Override public void nodeChanged() throws Exception { ChildData childData = nodeCache.getCurrentData(); System.out.println("ZNode节点状态改变, path=" + childData.getPath()); System.out.println("ZNode节点状态改变, data=" + childData.getData()); try { String rs = new String(childData.getData(), "utf-8"); System.out.println("data string:" + rs); } catch (Exception e) { } System.out.println("ZNode节点状态改变, stat=" + childData.getStat()); } }; nodeCache.getListenable().addListener(nodeCacheListener); nodeCache.start(); } catch (Exception e) { System.out.println("创建NodeCache监听失败, path=" + path); } } /** * 设置Tree Cache, 监控本节点的新增,删除,更新 * 节点的update可以监控到, 如果删除不会自动再次创建 * 可重入监听 */ public void setTreeCacheListener(final String path) { try { treeCache = new TreeCache(cf, path); TreeCacheListener treeCacheListener = new TreeCacheListener() { @Override public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception { ChildData data = event.getData(); if (data != null) { switch (event.getType()) { case NODE_ADDED: System.out.println("[TreeCache]子节点增加" + data.getPath() + " " + data.getData()); break; case NODE_UPDATED: System.out.println("[TreeCache]子节点更新" + data.getPath() + " " + data.getData()); try { String rs = new String(data.getData(), "utf-8"); System.out.println("data string:" + rs); } catch (Exception e) { } System.out.println(); break; case NODE_REMOVED: System.out.println("[TreeCache]子节点删除" + data.getPath() + " " + data.getData()); break; default: break; } } else { System.out.println("[TreeCache]节点数据为空, path=" + data.getPath()); } } }; treeCache.getListenable().addListener(treeCacheListener); treeCache.start(); } catch (Exception e) { System.out.println("创建TreeCache监听失败, path=" + path); } } } 3. 锁程序 ZooKeeperLock package com.test; import org.apache.zookeeper.*; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; /** * @Author mubi * @Date 2020/3/28 11:03 */ public class ZooKeeperLock { private String clusterNode = "mylock"; private AtomicInteger atomicInteger = new AtomicInteger(0); private ZooKeeper zk; CountDownLatch countDownLatch = new CountDownLatch(1); public ZooKeeperLock() { try { zk = new ZooKeeper("127.0.0.1:2181", 5000, new Watcher() { @Override public void process(WatchedEvent event) { if (Watcher.Event.KeeperState.SyncConnected.equals(event.getState())) { System.out.println("zk连接成功" + event); countDownLatch.countDown(); } } }); } catch (Exception e) { e.printStackTrace(); } } public Node lock() { Node node = createNode(); if (!tryAcquire(node)) { // 没有获取到锁,就等待 synchronized (node) { try { node.wait(); } catch (Exception e) { e.printStackTrace(); } } } return node; } private boolean tryAcquire(Node node) { try { String path = "/" + clusterNode; // 结点下所有路径,从小到大 List<String> subList = zk.getChildren(path, true); subList = subList.stream() .sorted() .map(n -> path + "/" + n) .collect(Collectors.toList()); // 是最小结点,表示获取到锁了,直接返回 String smallestPath = subList.get(0); if (smallestPath.isEmpty() || node.getPath().equals(smallestPath)) { return true; } else { // 否则要等待前一个结点释放锁 int index = subList.indexOf(node.getPath()); String preNodePath = subList.get(index - 1); zk.exists(preNodePath, new Watcher() { @Override public void process(WatchedEvent event) { if (event.getType() == Event.EventType.NodeDeleted) { // 前一个节点删除了释放锁了,就唤醒本结点 synchronized (node) { node.notify(); } } } }); return false; } } catch (Exception e) { return false; } } public void unlock(Node node) { try { zk.delete(node.path, -1); } catch (Exception e) { e.printStackTrace(); } } /** * 临时有序结点 */ class Node { private String path; public Node(String path) { this.path = path; } public String getPath() { return path; } public void setPath(String path) { this.path = path; } } public Node createNode() { try { String path = "/" + clusterNode + "/lock-"; String createdPath = zk.create( path, path.getBytes("utf-8"), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); Node node = new Node(createdPath); System.out.println("create EPHEMERAL_SEQUENTIAL: " + createdPath); return node; } catch (Exception e) { e.printStackTrace(); return null; } } } Test package com.test; import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; /** * @Author mubi * @Date 2020/3/28 11:03 */ public class Test { private int a = 0; ZooKeeperLock zooKeeperLock = new ZooKeeperLock(); public void add() { try { // 业务逻辑 TimeUnit.MILLISECONDS.sleep(300); a++; } catch (Exception e) { } finally { } } public void lockAdd() { ZooKeeperLock.Node node = null; try { node = zooKeeperLock.lock(); if (node != null) { // 业务逻辑 TimeUnit.MILLISECONDS.sleep(300); a++; } } catch (Exception e) { // e.printStackTrace(); System.out.println("lock error"); } finally { zooKeeperLock.unlock(node); } } public int getA() { return a; } public static void main(String[] args) throws Exception { List<Thread> threadList = new ArrayList<>(); Test test = new Test(); int n = 20; for (int i = 0; i < n; i++) { threadList.add( new Thread(() -> { test.lockAdd(); }) ); } for (int i = 0; i < n; i++) { threadList.get(i).start(); } for (int i = 0; i < n; i++) { threadList.get(i).join(); } System.out.println(test.getA()); } } 4. 测试和输出截图

测试输出

监听

测试说明

加锁过程

先创建有序临时结点,然后如果加锁成功就返回,否则等待 public Node lock() { Node node = createNode(); if (!tryAcquire(node)) { // 没有获取到锁,就等待 synchronized (node) { try { node.wait(); } catch (Exception e) { e.printStackTrace(); } } } return node; } public Node createNode() { try { String path = "/" + clusterNode + "/lock-"; String createdPath = zk.create( path, path.getBytes("utf-8"), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); Node node = new Node(createdPath); System.out.println("create EPHEMERAL_SEQUENTIAL: " + createdPath); return node; } catch (Exception e) { e.printStackTrace(); return null; } } 判断当前是最小结点就成功;否则判断前序结点是否删除,删除了就可以当前结点;因为加锁的有序的从小到大加上的,也是公平的体现,先来后到 private boolean tryAcquire(Node node) { try { String path = "/" + clusterNode; // 结点下所有路径,从小到大 List<String> subList = zk.getChildren(path, true); subList = subList.stream() .sorted() .map(n -> path + "/" + n) .collect(Collectors.toList()); // 是最小结点,表示获取到锁了,直接返回 String smallestPath = subList.get(0); if (smallestPath.isEmpty() || node.getPath().equals(smallestPath)) { return true; } else { // 否则要等待前一个结点释放锁 int index = subList.indexOf(node.getPath()); String preNodePath = subList.get(index - 1); zk.exists(preNodePath, new Watcher() { @Override public void process(WatchedEvent event) { if (event.getType() == Event.EventType.NodeDeleted) { // 前一个节点删除了释放锁了,就唤醒本结点 synchronized (node) { node.notify(); } } } }); return false; } } catch (Exception e) { return false; } } 回顾zkNode类型

1.持久节点(PERSISTENT):这类节点被创建后,就会一直存在于Zk服务器上。直到手动删除。

2.持久顺序节点(PERSISTENT_SEQUENTIAL):它的基本特性同持久节点,不同在于增加了顺序性。父节点会维护一个自增整性数字,用于子节点的创建的先后顺序。

3.临时节点(EPHEMERAL):临时节点的生命周期与客户端的会话绑定,一旦客户端会话失效(非TCP连接断开),那么这个节点就会被自动清理掉。zk规定临时节点只能作为叶子节点。

4.临时顺序节点(EPHEMERAL_SEQUENTIAL):基本特性同临时节点,添加了顺序的特性。

zookeeper分布式锁的优缺点

ai解释如下

优点

缺点

实际应用中的建议

场景选择:在选择使用 ZooKeeper 实现分布式锁时,应评估系统的实际需求,特别是在高并发写入操作的场景下,可能需要考虑其他更适合的方案(如 Redis 的分布式锁等)。

性能优化:可以通过优化客户端的连接方式(例如使用连接池)和合理设计锁的粒度来减轻性能压力。

监控与调优:定期监控 ZooKeeper 的性能指标(如延迟、吞吐量等),并根据需要进行调优。

备份与容灾:确保有适当的备份和容灾计划,以应对潜在的单点故障风险。

标签:

zookeeper有序临时结点实现公平锁的实践例子由讯客互联软件开发栏目发布,感谢您对讯客互联的认可,以及对我们原创作品以及文章的青睐,非常欢迎各位朋友分享到个人网站或者朋友圈,但转载请说明文章出处“zookeeper有序临时结点实现公平锁的实践例子