欢迎访问 生活随笔!

生活随笔

当前位置: 首页 > 编程资源 > 编程问答 >内容正文

编程问答

zookeeper做分布式锁

发布时间:2024/9/30 编程问答 36 豆豆
生活随笔 收集整理的这篇文章主要介绍了 zookeeper做分布式锁 小编觉得挺不错的,现在分享给大家,帮大家做个参考.

原理

多个客户端抢一把锁,这些客户端在 zookeeper 里创建各自的临时序列node,无论什么顺序,阻塞住,直到找到第一个创建的节点才放开,即第一个客户端获得了锁,不是第一个的客户端,就阻塞,直到第一个客户端运行结束,删除他创建的节点,让剩下的客户端继续轮循直到找到第一个的过程。

结果展示

用10个线程模拟10个客户端的结果如下:

代码

package org.faithgreen.lock1;import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat;import java.nio.charset.StandardCharsets; import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch;public class LockWatchCallBack1 implements Watcher, AsyncCallback.StatCallback, AsyncCallback.ChildrenCallback, AsyncCallback.StringCallback {private ZooKeeper zk;private String threadName;private String pathName;private CountDownLatch latch = new CountDownLatch(1);public ZooKeeper getZk() {return zk;}public void setZk(ZooKeeper zk) {this.zk = zk;}public String getThreadName() {return threadName;}public void setThreadName(String threadName) {this.threadName = threadName;}public String getPathName() {return pathName;}public void setPathName(String pathName) {this.pathName = pathName;}/*** 创建 CreateMode.EPHEMERAL_SEQUENTIAL 节点,阻塞*/public void tryLock() {zk.create("/lock", threadName.getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, this, "abc");try {latch.await();} catch (InterruptedException e) {e.printStackTrace();}}/*** zk.create 的 StringCallBack impl* 如果创建完了,这个回调会被调用,获取children** @param rc* @param path* @param o* @param childPath*/@Overridepublic void processResult(int rc, String path, Object o, String childPath) {if (childPath != null) {System.out.println(childPath + " created");pathName = childPath;zk.getChildren("/", this, this, "dev");}}/*** 获取 children 的回调* 多个客户端创建节点,list 总是有第一个的,如果是第一个,线程放行,否则在前一个节点上添加 watcher** @param rc* @param s* @param o* @param list*/@Overridepublic void processResult(int rc, String s, Object o, List<String> list) { // Collections.sort(list);int i = list.indexOf(pathName.substring(1));if (i == 0) {// 是第一个,就对了,获取锁了System.out.println(threadName + " is first ....");latch.countDown();} else {// 不是第一个,就在他的前一个加 watcherzk.exists("/" + list.get(i - 1), this, this, "dev");}}/*** children 节点的 watcher* 在释放锁是删除节点的时候触发再一次获取 children 的过程** @param e*/@Overridepublic void process(WatchedEvent e) {switch (e.getType()) {case None:break;case NodeCreated:break;case NodeDeleted:zk.getChildren("/", false, this, "dadd");break;case NodeDataChanged:break;case NodeChildrenChanged:break;}}@Overridepublic void processResult(int i, String s, Object o, Stat stat) {}/*** 释放锁的时候,删除节点*/public void unLock() {try {System.out.println(threadName + "工作结束,释放锁 ....");zk.delete(pathName, -1);} catch (InterruptedException e) {e.printStackTrace();} catch (KeeperException e) {e.printStackTrace();}} } package org.faithgreen.lock1;import org.apache.zookeeper.ZooKeeper; import org.faithgreen.conf1.ZooKeeperUtils; import org.junit.After; import org.junit.Before; import org.junit.Test;import java.io.IOException;public class TestLock1 {ZooKeeper zk;@Beforepublic void getZk() {zk = ZooKeeperUtils.getZk();}@Afterpublic void after() {try {zk.close();} catch (InterruptedException e) {e.printStackTrace();}}@Testpublic void lock() {for (int i = 0; i < 10; i++) {new Thread(() -> {LockWatchCallBack1 w = new LockWatchCallBack1();String name = Thread.currentThread().getName();w.setThreadName(name);w.setZk(zk);// 获取锁w.tryLock();// 模拟客户端得到锁之后的工作System.out.println(name + " working ...");// 释放锁w.unLock();}).start();}try {System.in.read();} catch (IOException e) {e.printStackTrace();}} } package org.faithgreen.conf1;import org.apache.zookeeper.ZooKeeper;import java.io.IOException; import java.util.concurrent.CountDownLatch;public class ZooKeeperUtils {static ZooKeeper zk;final static String address = "192.168.172.3:2181,192.168.172.4:2181,192.168.172.5:2181,192.168.172.6:2181/testLock";static CountDownLatch latch = new CountDownLatch(1);static DefaultWatcher defaultWatcher = new DefaultWatcher();public static ZooKeeper getZk() {ZooKeeper zk = null;try {zk = new ZooKeeper(address, 3000, defaultWatcher);defaultWatcher.setLatch(latch);latch.await();} catch (IOException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}return zk;}}

总结

以上是生活随笔为你收集整理的zookeeper做分布式锁的全部内容,希望文章能够帮你解决所遇到的问题。

如果觉得生活随笔网站内容还不错,欢迎将生活随笔推荐给好友。