日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

海豚调度Dolphinscheduler源码分析(三)

發(fā)布時(shí)間:2024/3/13 编程问答 41 豆豆
生活随笔 收集整理的這篇文章主要介紹了 海豚调度Dolphinscheduler源码分析(三) 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

今天繼續(xù)分析海豚調(diào)度的源碼

上回分析的是dolphinscheduler-service模塊zookeeper相關(guān)的代碼

這回分析是dolphinscheduler-server模塊zookeeper相關(guān)的代碼

ZkMasterClient master服務(wù)zk客戶端類

類繼承的關(guān)系如下:

這個(gè)類的方法如下:

方法介紹:

  • start()??根據(jù)路徑dolphinscheduler/lock/failover/master 創(chuàng)建一個(gè)分布式鎖,并進(jìn)行初始化,檢查是否有master節(jié)點(diǎn)競(jìng)爭鎖,確保只有一個(gè)主master,如果只有一個(gè)master節(jié)點(diǎn),那么無法進(jìn)行master服務(wù)的故障轉(zhuǎn)移
  • dataChange() 變更zk節(jié)點(diǎn)
  • removeZKNodePath(String path, ZKNodeType zkNodeType, boolean failover) 移除zookeeper 節(jié)點(diǎn),并在/dead路徑添加節(jié)點(diǎn),并會(huì)判斷是否需要容錯(cuò)
  • handleDeadServer()? 父類方法,就是處理宕機(jī)服務(wù)的zookeeper路徑,將獲取節(jié)點(diǎn)刪除,添加/dead路徑數(shù)據(jù)
  • failoverServerWhenDown() 當(dāng)服務(wù)宕機(jī)后,轉(zhuǎn)移服務(wù),分為master服務(wù)和server服務(wù)
  • checkTaskInstanceNeedFailover()
  • failoverWorker()? 將worker上的task任務(wù)進(jìn)行故障轉(zhuǎn)移
    • 如果是yarn任務(wù),干掉yarn任務(wù)
    • 將任務(wù)狀態(tài)變更為需要故障轉(zhuǎn)移
    • ? 當(dāng)工作節(jié)點(diǎn)全部為null時(shí),將所有任務(wù)進(jìn)行故障轉(zhuǎn)移

zk分布式鎖獲取代碼如下:

public void start() {

//Curator是zk的一個(gè)客戶端框架,其中分裝了分布式公平可重入互斥鎖,最為常見是InterProcessMutex
InterProcessMutex mutex = null;
try {
// create distributed lock with the root node path of the lock space as /dolphinscheduler/lock/failover/master
///根據(jù)這個(gè)路徑dolphinscheduler/lock/failover/master 創(chuàng)建一個(gè)分布式鎖
String znodeLock = getMasterStartUpLockPath();
//InterProcessMutex的構(gòu)造方法,需要一個(gè)客戶端和路徑
mutex = new InterProcessMutex(getZkClient(), znodeLock);
//獲取鎖,鎖的獲取,最后必須釋放
mutex.acquire();

// init system znode
this.initSystemZNode();

    //檢查是否有master節(jié)點(diǎn)
while (!checkZKNodeExists(NetUtils.getHost(), ZKNodeType.MASTER)){
ThreadUtils.sleep(SLEEP_TIME_MILLIS);
}


// self tolerant
//如果活動(dòng)的master節(jié)點(diǎn)只有1個(gè),無法進(jìn)行master服務(wù)的容錯(cuò),故failoverMaster(null)
if (getActiveMasterNum() == 1) {
failoverWorker(null, true);
failoverMaster(null);
}

}catch (Exception e){
logger.error("master start up exception",e);
}finally {
//釋放鎖,這個(gè)方法是父類AbstractZKClient的,在finally中釋放,保證鎖最后能夠釋放
releaseMutex(mutex);
}
}

對(duì)于InterProcessMutex,Curator是ZooKeeper的一個(gè)客戶端框架,其中封裝了分布式互斥鎖的實(shí)現(xiàn),最為常用的是InterProcessMutex

InterProcessMutex基于Zookeeper實(shí)現(xiàn)了分布式的公平可重入互斥鎖,類似于單個(gè)JVM進(jìn)程內(nèi)的ReentrantLock(fair=true)

全局同步的可重入分布式鎖,任何時(shí)刻不會(huì)有兩個(gè)客戶端同時(shí)持有該鎖。Reentrant和JDK的ReentrantLock類似, 意味著同一個(gè)客戶端在擁有鎖的同時(shí),可以多次獲取,不會(huì)被阻塞

相關(guān)鏈接:https://blog.csdn.net/hosaos/article/details/89521537

相關(guān)鏈接:https://www.cnblogs.com/a-du/p/9876314.html

相關(guān)鏈接:https://blog.csdn.net/qq_34021712/article/details/82878396

主要方法:

//獲取鎖,若失敗則阻塞等待直到成功,支持重入 public void acquire() throws Exception //超時(shí)獲取鎖,超時(shí)失敗 public boolean acquire(long time, TimeUnit unit) throws Exception //釋放鎖,一般在finally中釋放 public void release() throws Exception

注意點(diǎn),調(diào)用acquire()方法后需相應(yīng)調(diào)用release()來釋放鎖

private void removeZKNodePath(String path, ZKNodeType zkNodeType, boolean failover) 移除zk節(jié)點(diǎn)

      /**? ? ?

     * remove zookeeper node path

   ** @param path zookeeper node path* @param zkNodeType zookeeper node type* @param failover is failover*/private void removeZKNodePath(String path, ZKNodeType zkNodeType, boolean failover) {logger.info("{} node deleted : {}", zkNodeType.toString(), path);InterProcessMutex mutex = null;try {String failoverPath = getFailoverLockPath(zkNodeType);// create a distributed lockmutex = new InterProcessMutex(getZkClient(), failoverPath);mutex.acquire();String serverHost = getHostByEventDataPath(path);// handle dead server
       //處理 宕機(jī)服務(wù),刪除原來節(jié)點(diǎn),在dead路徑增加節(jié)點(diǎn),handleDeadServer(path, zkNodeType, Constants.ADD_ZK_OP);//failover server
       //是否故障轉(zhuǎn)移服務(wù)if(failover){failoverServerWhenDown(serverHost, zkNodeType);}}catch (Exception e){logger.error("{} server failover failed.", zkNodeType.toString());logger.error("failover exception ",e);}finally {releaseMutex(mutex);}}

zookeeper分布式鎖詳解

在分布式環(huán)境中 ,為了保證數(shù)據(jù)的一致性,經(jīng)常在程序的某個(gè)運(yùn)行點(diǎn)(例如,減庫存操作或者流水號(hào)生成等)需要進(jìn)行同步控制。以一個(gè)"流水號(hào)生成"的場(chǎng)景為例,普通的后臺(tái)應(yīng)用通常都是使用時(shí)間戳來生成流水號(hào),但是在用戶訪問量很大的情況下,可能會(huì)出現(xiàn)并發(fā)問題。下面通過示例程序就演示一個(gè)典型的并發(fā)問題:

public static void main(String[] args) throws Exception {CountDownLatch down = new CountDownLatch(1);for (int i=0;i<10;i++){new Thread(new Runnable() {@Overridepublic void run() {try {down.await();} catch (InterruptedException e) {e.printStackTrace();}SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss|SSS");String orderNo = sdf.format(new Date());System.out.println("生成的訂單號(hào)是:"+orderNo);}}).start();}down.countDown(); }

輸出結(jié)果如下:

Thread[Thread-8,5,main]生成的訂單號(hào)是:14:41:26|098 Thread[Thread-4,5,main]生成的訂單號(hào)是:14:41:26|107 Thread[Thread-9,5,main]生成的訂單號(hào)是:14:41:26|108 Thread[Thread-3,5,main]生成的訂單號(hào)是:14:41:26|108 Thread[Thread-0,5,main]生成的訂單號(hào)是:14:41:26|108 Thread[Thread-6,5,main]生成的訂單號(hào)是:14:41:26|108 Thread[Thread-7,5,main]生成的訂單號(hào)是:14:41:26|108 Thread[Thread-2,5,main]生成的訂單號(hào)是:14:41:26|108 Thread[Thread-5,5,main]生成的訂單號(hào)是:14:41:26|108 Thread[Thread-1,5,main]生成的訂單號(hào)是:14:41:26|108

不難發(fā)現(xiàn),生成的10個(gè)訂單不少都是重復(fù)的,如果是實(shí)際的生產(chǎn)環(huán)境中,這顯然沒有滿足我們的也無需求。究其原因,就是因?yàn)樵跊]有進(jìn)行同步的情況下,出現(xiàn)了并發(fā)問題。下面我們來看看如何使用Curator實(shí)現(xiàn)分布式鎖功能。

Shared Reentrant Lock(分布式可重入鎖)

全局同步的可重入分布式鎖,任何時(shí)刻不會(huì)有兩個(gè)客戶端同時(shí)持有該鎖。Reentrant和JDK的ReentrantLock類似, 意味著同一個(gè)客戶端在擁有鎖的同時(shí),可以多次獲取,不會(huì)被阻塞。

相關(guān)的類

InterProcessMutex

使用

創(chuàng)建InterProcessMutex實(shí)例
InterProcessMutex提供了兩個(gè)構(gòu)造方法,傳入一個(gè)CuratorFramework實(shí)例和一個(gè)要使用的節(jié)點(diǎn)路徑,InterProcessMutex還允許傳入一個(gè)自定義的驅(qū)動(dòng)類,默認(rèn)是使用StandardLockInternalsDriver。

public InterProcessMutex(CuratorFramework client, String path); public InterProcessMutex(CuratorFramework client, String path, LockInternalsDriver driver);
獲取鎖

使用acquire方法獲取鎖,acquire方法有兩種:

public void acquire() throws Exception;

獲取鎖,一直阻塞到獲取到鎖為止。獲取鎖的線程在獲取鎖后仍然可以調(diào)用acquire() 獲取鎖(可重入)。 鎖獲取使用完后,調(diào)用了幾次acquire(),就得調(diào)用幾次release()釋放。

public boolean acquire(long time, TimeUnit unit) throws Exception;

與acquire()類似,等待time * unit時(shí)間獲取鎖,如果仍然沒有獲取鎖,則直接返回false。

釋放鎖

使用release()方法釋放鎖
線程通過acquire()獲取鎖時(shí),可通過release()進(jìn)行釋放,如果該線程多次調(diào)用 了acquire()獲取鎖,則如果只調(diào)用 一次release()該鎖仍然會(huì)被該線程持有。

注意:同一個(gè)線程中InterProcessMutex實(shí)例是可重用的,也就是不需要在每次獲取鎖的時(shí)候都new一個(gè)InterProcessMutex實(shí)例,用同一個(gè)實(shí)例就好。

鎖撤銷

InterProcessMutex 支持鎖撤銷機(jī)制,可通過調(diào)用makeRevocable()將鎖設(shè)為可撤銷的,當(dāng)另一線程希望你釋放該鎖時(shí),實(shí)例里的listener會(huì)被調(diào)用。 撤銷機(jī)制是協(xié)作的。

示例代碼(官網(wǎng))

共享資源

public class FakeLimitedResource {//總共250張火車票private Integer ticket = 250;public void use() throws InterruptedException {try {System.out.println("火車票還剩"+(--ticket)+"張!");}catch (Exception e){e.printStackTrace();}} }

使用鎖操作資源

public class ExampleClientThatLocks {/** 鎖 */private final InterProcessMutex lock;/** 共享資源 */private final FakeLimitedResource resource;/** 客戶端名稱 */private final String clientName;public ExampleClientThatLocks(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) {this.resource = resource;this.clientName = clientName;lock = new InterProcessMutex(client, lockPath);}public void doWork(long time, TimeUnit unit) throws Exception {if ( !lock.acquire(time, unit) ) {throw new IllegalStateException(clientName + " could not acquire the lock");}try {System.out.println(clientName + " has the lock");//操作資源resource.use();} finally {System.out.println(clientName + " releasing the lock");lock.release(); //總是在Final塊中釋放鎖。}} }

客戶端

public class LockingExample {private static final int QTY = 5;private static final int REPETITIONS = QTY * 10;private static final String CONNECTION_STRING = "172.20.10.9:2181";private static final String PATH = "/examples/locks";public static void main(String[] args) throws Exception {//FakeLimitedResource模擬某些外部資源,這些外部資源一次只能由一個(gè)進(jìn)程訪問final FakeLimitedResource resource = new FakeLimitedResource();ExecutorService service = Executors.newFixedThreadPool(QTY);try {for ( int i = 0; i < QTY; ++i ){final int index = i;Callable<Void> task = new Callable<Void>() {@Overridepublic Void call() throws Exception {CuratorFramework client = CuratorFrameworkFactory.newClient(CONNECTION_STRING, new ExponentialBackoffRetry(1000, 3,Integer.MAX_VALUE));try {client.start();ExampleClientThatLocks example = new ExampleClientThatLocks(client, PATH, resource, "Client " + index);for ( int j = 0; j < REPETITIONS; ++j ) {example.doWork(10, TimeUnit.SECONDS);}}catch ( InterruptedException e ){Thread.currentThread().interrupt();}catch ( Exception e ){e.printStackTrace();}finally{CloseableUtils.closeQuietly(client);}return null;}};service.submit(task);}service.shutdown();service.awaitTermination(10, TimeUnit.MINUTES);}catch (Exception e){e.printStackTrace();}} }

起五個(gè)線程,即五個(gè)窗口賣票,五個(gè)客戶端分別有50張票可以賣,先是嘗試獲取鎖,操作資源后,釋放鎖。

轉(zhuǎn)自:https://blog.csdn.net/qq_34021712/article/details/82878396

總結(jié)

以上是生活随笔為你收集整理的海豚调度Dolphinscheduler源码分析(三)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 欧美日韩色图 | 欧美成人二区 | 色老汉视频| 国产伦精品一区二区免费 | 国产特黄一级片 | 成人一级片| 久久福利网站 | 亚洲精品视频观看 | 亚洲天堂区 | 吻胸摸激情床激烈视频 | 日本伦理片在线看 | wwyoujizzcom| av在线免费观看网站 | 日日噜噜夜夜狠狠久久丁香五月 | av网站免费在线看 | 91九色蝌蚪91por成人 | 色一情一乱一区二区三区 | 免费播放片大片 | 西西人体做爰大胆gogo | 日本不卡一区二区 | 一区二区国产精品视频 | 新天堂在线 | 8x8x永久免费视频 | av一起看香蕉 | 黄色天堂网站 | 国产亚洲欧美日韩精品一区二区三区 | 97精品久久人人爽人人爽 | 中国丰满老太hd | 婷婷狠狠干 | 成人一区二区三区仙踪林 | av片久久| 国产av无码专区亚洲精品 | 精品无码国产一区二区三区av | 黑巨茎大战欧美白妞 | 韩国黄色大片 | 色噜噜影院| 国产欧美一区二区三区四区 | 亚洲 成人 av | 婷婷综合av | 欧美专区一区 | 美女毛片 | 开心激情婷婷 | 色婷婷色丁香 | 色综合区| 欧美日韩亚洲精品一区二区 | 日本免费在线播放 | 超碰97人人射妻 | 女教师三上悠亚ssni-152 | 葵司ssni-879在线播放 | 91国产在线免费观看 | 亚洲人人干 | 亚洲tv在线 | 无码人妻精品一区二区50 | 午夜啪啪网 | 欧美老女人性生活 | 亚洲区色 | 波多野结衣视频网站 | 黄色成年人视频 | 日本激情一区二区三区 | 搡老熟女国产 | 91网站免费视频 | 樱空桃在线观看 | 日本黄色高清 | 久久神马 | xxx麻豆| 黄色三极片| 成年人国产 | 亚洲夜夜夜| 欧美视频在线一区 | 日韩精品无码一区二区三区 | 亚洲激情自拍偷拍 | 91欧美日韩麻豆精品 | 久久av免费| 精品福利一区二区三区 | 欧美野外猛男的大粗鳮 | 成人在线视频免费看 | 狂野欧美性猛交xxxx巴西 | 国产亚洲欧美精品久久久久久 | 久久人人添人人爽添人人片 | 福利免费在线观看 | 欧美日韩高清在线观看 | 水蜜桃色314在线观看 | 91视频www | 米奇色| 亚洲视频一区在线播放 | 亚洲免费大全 | 黄色动漫在线免费观看 | 麻豆影视av | 青青草原国产在线 | 91精品国产91久久久久青草 | 中文字幕在线色 | 国产乱码在线观看 | 黄色在线免费 | 瑟瑟久久| 最新国产网址 | 亚洲一区二区三区免费在线观看 | 日韩欧美中文字幕一区 | 在线观看欧美国产 | 日韩免费一级片 |