日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

分布式锁-zk临时节点

發布時間:2025/3/21 编程问答 29 豆豆
生活随笔 收集整理的這篇文章主要介紹了 分布式锁-zk临时节点 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

多線程訪問同一個共享資源時,會出現并發問題,synchronized或者lock 類的鎖只能控制單一進程的資源訪問,多進程下就需要用到分布式鎖

利用zk 可以實現獨占鎖,(同級節點唯一性)多個進程往zk指定節點下創建一個相同名稱的節點,只有一個能成功,創建失敗的通過zk的watcher機制監聽子節點變化,一個監聽到子節點刪除事件,會再次觸發所有進程的寫鎖,但這里會有驚群效應,會影響到性能

利用有序節點實現分布式鎖:每個客戶端都往一個指定節點(locks)注冊一個臨時有序節點,越早創建的節點編號越小,最小編號的節點獲得鎖,通過監聽比自己小的節點,當比自己小的節點刪除后,客戶端會收到watcher,再次判斷自己的節點是不是所有節點最小的,是則獲得鎖;這種方式也會解決驚群問題

接下來我們來看實現:curator 分布式鎖的使用
curator對鎖封裝了一層,提供了InterProcessMutex;還提供了leader 選舉、分布式隊列 InterProcessMutex 分布式可重入排他鎖
InterProcessSemaphoreMutex 分布式排他鎖
InterProcessReadWriteLock 分布式讀寫鎖

public class LeaderSelectorClient extends LeaderSelectorListenerAdapter implements Closeable {private String name; //表示當前的進程private LeaderSelector leaderSelector; //leader選舉的APIprivate CountDownLatch countDownLatch=new CountDownLatch(1);public LeaderSelectorClient(){}public LeaderSelectorClient(String name) {this.name = name;}public LeaderSelector getLeaderSelector() {return leaderSelector;}public void setLeaderSelector(LeaderSelector leaderSelector) {this.leaderSelector = leaderSelector;}public void start(){leaderSelector.start(); //開始競爭leader}@Overridepublic void takeLeadership(CuratorFramework client) throws Exception {//如果進入當前的方法,意味著當前的進程獲得了鎖。獲得鎖以后,這個方法會被回調//這個方法執行結束之后,表示釋放leader權限System.out.println(name+"->現在是leader了"); // countDownLatch.await(); //阻塞當前的進程防止leader丟失}@Overridepublic void close() throws IOException {leaderSelector.close();}private static String CONNECTION_STR="zk集群地址,至少三臺機器";public static void main(String[] args) throws IOException {CuratorFramework curatorFramework = CuratorFrameworkFactory.builder().connectString(CONNECTION_STR).sessionTimeoutMs(50000000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();curatorFramework.start();LeaderSelectorClient leaderSelectorClient=new LeaderSelectorClient("ClientA");LeaderSelector leaderSelector=new LeaderSelector(curatorFramework,"/leader",leaderSelectorClient);leaderSelectorClient.setLeaderSelector(leaderSelector);leaderSelectorClient.start(); //開始選舉System.in.read();} } 我們來看下curator 實現分布式鎖的原理,這里我把注釋寫在了代碼中,所以把代碼貼到一塊public InterProcessMutex(CuratorFramework client, String path) {// 實現公平鎖的核心:zookeeper利用path 創建臨時有序節點this(client, path, new StandardLockInternalsDriver());}public StandardLockInternalsDriver() {}public InterProcessMutex(CuratorFramework client, String path, LockInternalsDriver driver) {this(client, path, "lock-", 1, driver);}//maxLeases:互斥鎖InterProcessMutex(CuratorFramework client, String path, String lockName, int maxLeases, LockInternalsDriver driver) {this.threadData = Maps.newConcurrentMap();this.basePath = PathUtils.validatePath(path);//InterProcessMutex 把分布式鎖的申請和釋放委托給了 LockInternals internalsthis.internals = new LockInternals(client, driver, path, lockName, maxLeases);}// 無限等待public void acquire() throws Exception {if (!this.internalLock(-1L, (TimeUnit)null)) {throw new IOException("Lost connection while trying to acquire lock: " + this.basePath);}}// 限時等待public boolean acquire(long time, TimeUnit unit) throws Exception {return this.internalLock(time, unit);}private boolean internalLock(long time, TimeUnit unit) throws Exception {Thread currentThread = Thread.currentThread();//同一線程再次acquire,首先判斷threaData 是否有這個線程鎖信息,如果有則原子+1,然后返回InterProcessMutex.LockData lockData = (InterProcessMutex.LockData)this.threadData.get(currentThread);if (lockData != null) {// 實現可重入;lockData.lockCount.incrementAndGet();return true;// 映射表沒有對應的鎖信息,嘗試通過LockInternals 獲取鎖} else {String lockPath = this.internals.attemptLock(time, unit, this.getLockNodeBytes());if (lockPath != null) {// 成功獲取鎖,存儲到映射表中InterProcessMutex.LockData newLockData = new InterProcessMutex.LockData(currentThread, lockPath);this.threadData.put(currentThread, newLockData);return true;} else {return false;}}}// 記錄線程和鎖信息的映射關系private final ConcurrentMap<Thread, InterProcessMutex.LockData> threadData;//zk 中一個臨時有序節點對應一個鎖,但是讓鎖生效需要排隊private static class LockData {final Thread owningThread;final String lockPath;final AtomicInteger lockCount;private LockData(Thread owningThread, String lockPath) {this.lockCount = new AtomicInteger(1); //分布式鎖重入次數this.owningThread = owningThread;this.lockPath = lockPath;}}//嘗試獲取鎖,并返回鎖對應的zk 臨時有序節點路徑String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception {long startMillis = System.currentTimeMillis();// millisToWait 是個nullLong millisToWait = unit != null ? unit.toMillis(time) : null;byte[] localLockNodeBytes = this.revocable.get() != null ? new byte[0] : lockNodeBytes;int retryCount = 0;String ourPath = null;//是否已經有分布式鎖?boolean hasTheLock = false;//是否已經完成嘗試獲取分布式鎖操作boolean isDone = false;while(!isDone) {isDone = true;try {//driver = StandardLockInternalsDriverourPath = this.driver.createsTheLock(this.client, this.path, localLockNodeBytes);//循環等待激活分布式鎖hasTheLock = this.internalLockLoop(startMillis, millisToWait, ourPath);} catch (NoNodeException var14) {if (!this.client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper())) {throw var14;}isDone = false;}}//成功獲取分布式鎖,返回有序節點的路徑return hasTheLock ? ourPath : null;}//在zk 中創建臨時順序節點public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception {String ourPath;//默認內容是ip地址if (lockNodeBytes != null) {// creatingParentContainersIfNeeded:創建父節點,如果不支持CreateMode.CONTAINER,就采用CreateMode.PERSISTENT// withProtection:臨時節點添加GUIDourPath = (String)((ACLBackgroundPathAndBytesable)client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL)).forPath(path, lockNodeBytes);} else {ourPath = (String)((ACLBackgroundPathAndBytesable)client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL)).forPath(path);}return ourPath;}//循環等待激活分布式鎖private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception {boolean haveTheLock = false;boolean doDelete = false;try {if (this.revocable.get() != null) {((BackgroundPathable)this.client.getData().usingWatcher(this.revocableWatcher)).forPath(ourPath);}while(this.client.getState() == CuratorFrameworkState.STARTED && !haveTheLock) {List<String> children = this.getSortedChildren();String sequenceNodeName = ourPath.substring(this.basePath.length() + 1);PredicateResults predicateResults = this.driver.getsTheLock(this.client, children, sequenceNodeName, this.maxLeases);if (predicateResults.getsTheLock()) {haveTheLock = true;} else {String previousSequencePath = this.basePath + "/" + predicateResults.getPathToWatch();synchronized(this) {try {((BackgroundPathable)this.client.getData().usingWatcher(this.watcher)).forPath(previousSequencePath);if (millisToWait == null) {this.wait();} else {millisToWait = millisToWait - (System.currentTimeMillis() - startMillis);startMillis = System.currentTimeMillis();if (millisToWait > 0L) {this.wait(millisToWait);} else {doDelete = true;break;}}} catch (NoNodeException var19) {}}}}} catch (Exception var21) {ThreadUtils.checkInterrupted(var21);doDelete = true;throw var21;} finally {if (doDelete) {this.deleteOurPath(ourPath);}}return haveTheLock;} 《新程序員》:云原生和全面數字化實踐50位技術專家共同創作,文字、視頻、音頻交互閱讀

總結

以上是生活随笔為你收集整理的分布式锁-zk临时节点的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。