DolphinScheduler Source Code Analysis (Part 3)
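Today we continue analyzing the DolphinScheduler source code.
Last time we looked at the ZooKeeper-related code in the dolphinscheduler-service module.
This time we analyze the ZooKeeper-related code in the dolphinscheduler-server module.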
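ZkMasterClient: the ZooKeeper client class of the master service
The class inheritance hierarchy is as follows:
The methods of this class are as follows:
Method descriptions: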
- start(): creates a distributed lock on the path /dolphinscheduler/lock/failover/master and initializes the system znodes. Masters compete for this lock, so only one master runs this startup section at a time. If this is the only active master, no other master can fail it over, so it runs failover for itself.
- dataChange(): handles change events on ZooKeeper nodes
- removeZKNodePath(String path, ZKNodeType zkNodeType, boolean failover): removes a ZooKeeper node, adds a node under the /dead path, and decides whether failover is needed
- handleDeadServer(): a parent-class method that handles the ZooKeeper path of a dead server: it deletes the server's node and adds an entry under the /dead path
- failoverServerWhenDown(): fails over a server's work after it goes down; master and worker servers are handled separately
- checkTaskInstanceNeedFailover()
- failoverWorker(): fails over the tasks that ran on a worker
  - if a task is a YARN task, kill the YARN job
  - change the task state to "needs failover"
  - when the worker host is null, fail over all tasks
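To make the failoverWorker() rules above concrete, here is a minimal in-memory sketch. The types (`FailoverSketch`, `NodeType`, `State`, `Task`) and the `killedOnYarn` list are hypothetical stand-ins, not DolphinScheduler classes; the sketch only encodes the rules as listed: dispatch on node type, match tasks by worker host (or take all tasks when the host is null), kill YARN jobs, and mark each matched task as needing failover.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the worker-failover rules listed above.
// None of these types are DolphinScheduler classes.
final class FailoverSketch {

    enum NodeType { MASTER, WORKER }

    enum State { RUNNING, NEED_FAULT_TOLERANCE }

    static final class Task {
        final String host;       // worker host the task was dispatched to
        final boolean yarnTask;  // whether the task submitted a YARN application
        State state = State.RUNNING;

        Task(String host, boolean yarnTask) {
            this.host = host;
            this.yarnTask = yarnTask;
        }
    }

    final List<Task> tasks = new ArrayList<>();
    final List<Task> killedOnYarn = new ArrayList<>(); // stand-in for "kill the YARN job"

    // Dispatch by node type, as failoverServerWhenDown() is described to do.
    void failoverServerWhenDown(String host, NodeType type) {
        switch (type) {
            case MASTER:
                // master failover is out of scope for this sketch
                break;
            case WORKER:
                failoverWorker(host);
                break;
        }
    }

    // host == null means "fail over every task", matching the last bullet.
    void failoverWorker(String host) {
        for (Task t : tasks) {
            if (host != null && !host.equals(t.host)) {
                continue; // the task ran on a different, still-alive worker
            }
            if (t.yarnTask) {
                killedOnYarn.add(t);
            }
            t.state = State.NEED_FAULT_TOLERANCE;
        }
    }

    public static void main(String[] args) {
        FailoverSketch s = new FailoverSketch();
        s.tasks.add(new Task("worker-1", true));
        s.tasks.add(new Task("worker-2", false));
        s.failoverServerWhenDown("worker-1", NodeType.WORKER);
        System.out.println(s.tasks.get(0).state + " " + s.tasks.get(1).state);
    }
}
```

Running `main` prints `NEED_FAULT_TOLERANCE RUNNING`: only the task on `worker-1` is touched.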
The code that acquires the ZooKeeper distributed lock is as follows:
public void start() {
// Curator is a ZooKeeper client framework that wraps a distributed fair reentrant mutex; the most commonly used one is InterProcessMutex
InterProcessMutex mutex = null;
try {
// create distributed lock with the root node path of the lock space as /dolphinscheduler/lock/failover/master
String znodeLock = getMasterStartUpLockPath();
// the InterProcessMutex constructor takes a client and a path
mutex = new InterProcessMutex(getZkClient(), znodeLock);
// acquire the lock; an acquired lock must always be released in the end
mutex.acquire();
// init system znode
this.initSystemZNode();
// wait until this master's own node has been registered in ZooKeeper
while (!checkZKNodeExists(NetUtils.getHost(), ZKNodeType.MASTER)){
ThreadUtils.sleep(SLEEP_TIME_MILLIS);
}
// self tolerant
//如果活動(dòng)的master節(jié)點(diǎn)只有1個(gè),無法進(jìn)行master服務(wù)的容錯(cuò),故failoverMaster(null)
if (getActiveMasterNum() == 1) {
failoverWorker(null, true);
failoverMaster(null);
}
}catch (Exception e){
logger.error("master start up exception",e);
}finally {
// release the lock in finally so that it is always released; releaseMutex() is defined in the parent class AbstractZKClient
releaseMutex(mutex);
}
}
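The shape of start() (declare the lock as null, acquire inside try, release in finally through a helper) is worth isolating. The sketch below is a stand-in under stated assumptions: a JDK `ReentrantLock` replaces `InterProcessMutex`, and `releaseQuietly` is a hypothetical helper in the spirit of `releaseMutex()`, whose real body is not shown in this article. The null check covers a failure before the lock object exists; the `isHeldByCurrentThread()` check covers a failure before the lock was acquired. As far as I know, Curator's own `release()` throws `IllegalMonitorStateException` when the calling thread does not own the lock, which is one reason a wrapper helper is convenient.

```java
import java.util.concurrent.locks.ReentrantLock;

// Sketch only: ReentrantLock stands in for Curator's InterProcessMutex, and
// releaseQuietly() is a hypothetical helper in the spirit of releaseMutex().
final class ReleaseInFinallySketch {

    // Null-safe release: the lock may be null (creation failed) or not held
    // by this thread (acquisition failed); neither case should throw here.
    static void releaseQuietly(ReentrantLock lock) {
        if (lock != null && lock.isHeldByCurrentThread()) {
            lock.unlock();
        }
    }

    // Same shape as start(): acquire inside try, do the work, release in finally.
    static String runStartup(ReentrantLock lock, boolean failDuringWork) {
        try {
            lock.lock();
            if (failDuringWork) {
                throw new IllegalStateException("simulated startup failure");
            }
            return "ok";
        } catch (IllegalStateException e) {
            return "failed: " + e.getMessage(); // start() only logs the exception
        } finally {
            releaseQuietly(lock); // runs on both the normal and the failure path
        }
    }

    public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock(true);
        System.out.println(runStartup(lock, true) + ", still locked: " + lock.isLocked());
    }
}
```

`main` prints `failed: simulated startup failure, still locked: false`: the finally block released the lock even though the work failed.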
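About InterProcessMutex: Curator is a client framework for ZooKeeper that encapsulates distributed lock implementations, the most commonly used of which is InterProcessMutex.
InterProcessMutex implements a distributed fair reentrant mutex on top of ZooKeeper, similar to ReentrantLock(fair=true) inside a single JVM process.
It is a globally synchronized reentrant distributed lock: at no moment can two clients hold it at the same time. "Reentrant" means the same as for the JDK's ReentrantLock: a client that holds the lock can acquire it again multiple times without blocking.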
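The fairness mentioned above comes from the ZooKeeper lock recipe that InterProcessMutex is built on: each contender creates an ephemeral sequential child node under the lock path, the child with the lowest sequence number holds the lock, and every other contender watches only the node immediately before its own, so a release wakes exactly one waiter, in FIFO order, instead of all of them. The class below is an in-memory simulation of that ordering rule only; it does not talk to ZooKeeper, and `SequentialNodeLockSim` and its methods are hypothetical names. Because the real nodes are ephemeral, a holder whose session expires also loses its node, which `remove()` models as well.

```java
import java.util.TreeMap;

// In-memory simulation (no ZooKeeper) of the sequential-node lock recipe.
// Hypothetical names; only the ordering rule is modelled.
final class SequentialNodeLockSim {

    private final TreeMap<Long, String> children = new TreeMap<>(); // sequence -> client
    private long nextSeq = 0;

    // Like creating an EPHEMERAL_SEQUENTIAL child: returns its sequence number.
    long enqueue(String client) {
        long seq = nextSeq++;
        children.put(seq, client);
        return seq;
    }

    // The lock owner is the client with the smallest sequence number.
    String owner() {
        return children.isEmpty() ? null : children.firstEntry().getValue();
    }

    // The node a contender watches: its immediate predecessor (null = it owns the lock).
    Long watchedPredecessor(long seq) {
        return children.lowerKey(seq);
    }

    // Release, or session expiry deleting the ephemeral node.
    void remove(long seq) {
        children.remove(seq);
    }

    public static void main(String[] args) {
        SequentialNodeLockSim sim = new SequentialNodeLockSim();
        long a = sim.enqueue("A");
        sim.enqueue("B");
        long c = sim.enqueue("C");
        System.out.println(sim.owner());               // A
        System.out.println(sim.watchedPredecessor(c)); // 1 (B's node)
        sim.remove(a);
        System.out.println(sim.owner());               // B
    }
}
```

Watching only the predecessor is the design choice that avoids a "herd effect": deleting one node notifies one waiter, not every contender.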
Related link: https://blog.csdn.net/hosaos/article/details/89521537
Related link: https://www.cnblogs.com/a-du/p/9876314.html
Related link: https://blog.csdn.net/qq_34021712/article/details/82878396
Main methods:
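// acquire the lock, blocking until it succeeds; reentrant
public void acquire() throws Exception
// acquire the lock with a timeout; returns false if the timeout expires
public boolean acquire(long time, TimeUnit unit) throws Exception
// release the lock, usually in a finally block
public void release() throws Exception
Note: every call to acquire() must be matched by a corresponding call to release().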
private void removeZKNodePath(String path, ZKNodeType zkNodeType, boolean failover): removes a ZooKeeper node
/**
 * remove zookeeper node path
 *
 * @param path zookeeper node path
 * @param zkNodeType zookeeper node type
 * @param failover is failover
 */
private void removeZKNodePath(String path, ZKNodeType zkNodeType, boolean failover) {
    logger.info("{} node deleted : {}", zkNodeType.toString(), path);
    InterProcessMutex mutex = null;
    try {
        String failoverPath = getFailoverLockPath(zkNodeType);
        // create a distributed lock
        mutex = new InterProcessMutex(getZkClient(), failoverPath);
        mutex.acquire();

        String serverHost = getHostByEventDataPath(path);
        // handle dead server: delete the original node and add a node under the dead path
        handleDeadServer(path, zkNodeType, Constants.ADD_ZK_OP);
        // failover server, if requested
        if (failover) {
            failoverServerWhenDown(serverHost, zkNodeType);
        }
    } catch (Exception e) {
        logger.error("{} server failover failed.", zkNodeType.toString());
        logger.error("failover exception ", e);
    } finally {
        releaseMutex(mutex);
    }
}
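`removeZKNodePath()` relies on two path conventions: recovering the host from the removed znode path, and recording the dead server under a /dead path. Their exact formats are not shown in this article, so the helpers below are hypothetical: they assume the host is the last path segment, and use an illustrative `<type>_<host>` naming under a caller-supplied dead root. Treat both as assumptions to check against the real `getHostByEventDataPath()` and `handleDeadServer()`.

```java
import java.util.Locale;

// Hypothetical helpers; the real formats used by getHostByEventDataPath()
// and handleDeadServer() may differ.
final class DeadServerPathSketch {

    // Assumption: the host is the last segment of the registry znode path.
    static String hostFromPath(String path) {
        if (path == null || path.isEmpty()) {
            return "";
        }
        return path.substring(path.lastIndexOf('/') + 1); // whole string if there is no '/'
    }

    // Illustrative "<type>_<host>" naming under a caller-supplied dead root.
    static String deadServerPath(String deadRoot, String nodeType, String host) {
        return deadRoot + "/" + nodeType.toLowerCase(Locale.ROOT) + "_" + host;
    }

    public static void main(String[] args) {
        String path = "/dolphinscheduler/masters/192.168.1.10:5678"; // hypothetical path
        String host = hostFromPath(path);
        System.out.println(host);
        System.out.println(deadServerPath("/dolphinscheduler/dead-servers", "MASTER", host));
    }
}
```

With the hypothetical path in `main`, this prints `192.168.1.10:5678` and then `/dolphinscheduler/dead-servers/master_192.168.1.10:5678`.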
ZooKeeper distributed locks explained
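In a distributed environment, keeping data consistent often requires synchronization at certain points in a program (for example, decrementing inventory or generating serial numbers). Take serial number generation as an example: ordinary backend applications usually derive serial numbers from timestamps, but under heavy user traffic this can run into concurrency problems. The following example program demonstrates a typical concurrency problem: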
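import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;

public class OrderNoDemo {
    public static void main(String[] args) throws Exception {
        CountDownLatch down = new CountDownLatch(1);
        for (int i = 0; i < 10; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        // every thread blocks here until the latch is released
                        down.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss|SSS");
                    String orderNo = sdf.format(new Date());
                    System.out.println(Thread.currentThread() + " generated order number: " + orderNo);
                }
            }).start();
        }
        down.countDown();
    }
}
The output is as follows: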
Thread[Thread-8,5,main] generated order number: 14:41:26|098
Thread[Thread-4,5,main] generated order number: 14:41:26|107
Thread[Thread-9,5,main] generated order number: 14:41:26|108
Thread[Thread-3,5,main] generated order number: 14:41:26|108
Thread[Thread-0,5,main] generated order number: 14:41:26|108
Thread[Thread-6,5,main] generated order number: 14:41:26|108
Thread[Thread-7,5,main] generated order number: 14:41:26|108
Thread[Thread-2,5,main] generated order number: 14:41:26|108
Thread[Thread-5,5,main] generated order number: 14:41:26|108
Thread[Thread-1,5,main] generated order number: 14:41:26|108
It is easy to see that eight of the ten order numbers are identical (14:41:26|108); in a real production environment this clearly does not meet our business requirements. The root cause is that, without any synchronization, the threads run into a concurrency problem. Next, let's look at how to implement a distributed lock with Curator.
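One caveat about the example above: making threads take turns is necessary but not sufficient, because two calls serialized by a lock can still run within the same millisecond and format to the same `HH:mm:ss|SSS` string. A minimal single-JVM sketch (the class `OrderNoGenerator` is hypothetical) serializes generation and appends a per-millisecond sequence number. Across several JVMs or machines, `synchronized` no longer helps, which is where a distributed lock such as Curator's comes in.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical single-JVM generator: calls are serialized with synchronized,
// and a per-millisecond sequence keeps two calls in the same millisecond apart.
final class OrderNoGenerator {

    // SimpleDateFormat is not thread-safe; it is only used while synchronized
    private final SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss|SSS");
    private long lastMillis = -1;
    private int seq = 0;

    synchronized String next() {
        long now = System.currentTimeMillis();
        if (now <= lastMillis) {
            seq++;              // same millisecond (or the clock stepped back): bump the sequence
        } else {
            lastMillis = now;   // a new millisecond: restart the sequence
            seq = 0;
        }
        return sdf.format(new Date(lastMillis)) + "-" + seq;
    }

    public static void main(String[] args) throws InterruptedException {
        OrderNoGenerator gen = new OrderNoGenerator();
        Set<String> seen = ConcurrentHashMap.newKeySet();
        Thread[] threads = new Thread[10];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> seen.add(gen.next()));
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        System.out.println("unique order numbers: " + seen.size()); // 10
    }
}
```

Because the format omits the date, these numbers are unique only within one JVM and over less than a day.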
Shared Reentrant Lock (distributed reentrant lock)
Related class
InterProcessMutex
Usage
Creating an InterProcessMutex instance
InterProcessMutex provides two constructors. Both take a CuratorFramework instance and the path of the node to use; the second also accepts a custom driver, and the default is StandardLockInternalsDriver.
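Acquiring the lock
Use the acquire method to obtain the lock. There are two variants:
public void acquire() throws Exception; acquires the lock, blocking until it is obtained. A thread that already holds the lock can call acquire() again (reentrant). When done, release() must be called as many times as acquire() was called.
public boolean acquire(long time, TimeUnit unit) throws Exception; like acquire(), but waits at most the given time in the given unit, and returns false if the lock still has not been acquired.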
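To see the timeout semantics of `acquire(time, unit)` without a ZooKeeper cluster, the JDK's `ReentrantLock.tryLock(long, TimeUnit)` behaves analogously within one JVM: it waits up to the timeout and returns false rather than blocking forever. The sketch below (class and method names are hypothetical) holds the lock in one thread and tries to take it from another; it is an analogy, not Curator code.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// JDK analogue of acquire(time, unit): tryLock(time, unit) waits up to the
// timeout and returns false instead of blocking forever. Names are hypothetical.
final class TimedAcquireSketch {

    // Another thread holds the lock for holdMillis; we wait at most waitMillis.
    static boolean tryWhileHeld(long holdMillis, long waitMillis) {
        ReentrantLock lock = new ReentrantLock(true);
        CountDownLatch held = new CountDownLatch(1);
        Thread holder = new Thread(() -> {
            lock.lock();
            try {
                held.countDown();          // signal: the lock is now owned
                Thread.sleep(holdMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                lock.unlock();
            }
        });
        holder.start();
        try {
            held.await();                  // make sure the holder got the lock first
            boolean acquired = lock.tryLock(waitMillis, TimeUnit.MILLISECONDS);
            if (acquired) {
                lock.unlock();             // release exactly what was acquired
            }
            holder.join();
            return acquired;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(tryWhileHeld(500, 50));   // false: gave up after 50 ms
        System.out.println(tryWhileHeld(50, 5000));  // true: freed within the wait
    }
}
```

The second call can only return false if the holder thread is delayed for seconds.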
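Releasing the lock
Use the release() method to release the lock.
A thread that acquired the lock through acquire() releases it through release(). If the thread called acquire() several times, a single release() leaves the lock still held by that thread.
Note: an InterProcessMutex instance is reusable within the same thread. There is no need to create a new InterProcessMutex every time you acquire the lock; just reuse the same instance.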
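The release rule above (one `release()` per `acquire()`) matches the JDK's `ReentrantLock`, which makes it easy to observe locally through `getHoldCount()`. This is an analogy, not Curator code; `HoldCountSketch` is a hypothetical name.

```java
import java.util.concurrent.locks.ReentrantLock;

// Local analogy for "one release() per acquire()": ReentrantLock counts
// re-entries per thread and exposes the count via getHoldCount().
final class HoldCountSketch {

    static String demo() {
        ReentrantLock lock = new ReentrantLock(true);
        lock.lock();
        lock.lock();                                      // re-entry: does not block
        int afterTwoAcquires = lock.getHoldCount();       // 2
        lock.unlock();
        boolean stillHeld = lock.isHeldByCurrentThread(); // one release is not enough
        lock.unlock();
        int afterTwoReleases = lock.getHoldCount();       // 0
        return afterTwoAcquires + " " + stillHeld + " " + afterTwoReleases;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "2 true 0"
    }
}
```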
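Lock revocation
InterProcessMutex supports lock revocation: calling makeRevocable() with a listener makes the lock revocable, and when another party wants you to release the lock, that listener is invoked. Revocation is cooperative: the holder decides whether to release.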
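"Cooperative" means a revocation request only notifies the holder; nothing is taken away unless the holder's own code releases the lock. In Curator the holder opts in with `makeRevocable(...)`, passing a listener, and another party asks via `Revoker.attemptRevoke(client, path)`. The sketch below models only the cooperative part with plain JDK types; `CooperativeRevocationSketch`, `requestRevoke()` and `holderLoop()` are hypothetical names, and a `Runnable` stands in for Curator's listener type.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical local model of cooperative revocation (not Curator's API):
// a request only runs the holder's listener; the holder releases on its own.
final class CooperativeRevocationSketch {

    private Runnable listener; // null until the holder opts in

    // Holder opts in by registering a listener (cf. makeRevocable).
    void makeRevocable(Runnable onRevokeRequest) {
        this.listener = onRevokeRequest;
    }

    // Another party asks for the lock. Returns false if the lock is not revocable.
    boolean requestRevoke() {
        if (listener == null) {
            return false;
        }
        listener.run(); // only notifies; it does not release anything itself
        return true;
    }

    // Holder works in units and gives the lock up once a request has arrived.
    static int holderLoop(int requestAfterUnits, int maxUnits) {
        ReentrantLock lock = new ReentrantLock();
        CooperativeRevocationSketch revocable = new CooperativeRevocationSketch();
        AtomicBoolean asked = new AtomicBoolean(false);
        revocable.makeRevocable(() -> asked.set(true));

        int done = 0;
        lock.lock();
        try {
            while (!asked.get() && done < maxUnits) {
                done++;
                if (done == requestAfterUnits) {
                    revocable.requestRevoke(); // simulated request from elsewhere
                }
            }
        } finally {
            lock.unlock(); // the holder releases on its own terms
        }
        return done;
    }

    public static void main(String[] args) {
        System.out.println(holderLoop(3, 100)); // 3: stopped right after the request
    }
}
```

`holderLoop(3, 100)` returns 3: the holder finishes the unit it is in when the request arrives and then releases the lock itself.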
Example code (from the official website)
Shared resource
public class FakeLimitedResource {
    // 250 train tickets in total
    private Integer ticket = 250;

    public void use() throws InterruptedException {
        try {
            System.out.println("Tickets left: " + (--ticket));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Using the lock to access the resource
import java.util.concurrent.TimeUnit;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;

public class ExampleClientThatLocks {
    /** the lock */
    private final InterProcessMutex lock;
    /** the shared resource */
    private final FakeLimitedResource resource;
    /** client name */
    private final String clientName;

    public ExampleClientThatLocks(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) {
        this.resource = resource;
        this.clientName = clientName;
        lock = new InterProcessMutex(client, lockPath);
    }

    public void doWork(long time, TimeUnit unit) throws Exception {
        if (!lock.acquire(time, unit)) {
            throw new IllegalStateException(clientName + " could not acquire the lock");
        }
        try {
            System.out.println(clientName + " has the lock");
            // operate on the resource
            resource.use();
        } finally {
            System.out.println(clientName + " releasing the lock");
            lock.release(); // always release the lock in a finally block
        }
    }
}
Client
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.CloseableUtils;

public class LockingExample {
    private static final int QTY = 5;
    private static final int REPETITIONS = QTY * 10;
    private static final String CONNECTION_STRING = "172.20.10.9:2181";
    private static final String PATH = "/examples/locks";

    public static void main(String[] args) throws Exception {
        // FakeLimitedResource simulates an external resource that only one process may access at a time
        final FakeLimitedResource resource = new FakeLimitedResource();
        ExecutorService service = Executors.newFixedThreadPool(QTY);
        try {
            for (int i = 0; i < QTY; ++i) {
                final int index = i;
                Callable<Void> task = new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        CuratorFramework client = CuratorFrameworkFactory.newClient(CONNECTION_STRING, new ExponentialBackoffRetry(1000, 3, Integer.MAX_VALUE));
                        try {
                            client.start();
                            ExampleClientThatLocks example = new ExampleClientThatLocks(client, PATH, resource, "Client " + index);
                            for (int j = 0; j < REPETITIONS; ++j) {
                                example.doWork(10, TimeUnit.SECONDS);
                            }
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        } catch (Exception e) {
                            e.printStackTrace();
                        } finally {
                            CloseableUtils.closeQuietly(client);
                        }
                        return null;
                    }
                };
                service.submit(task);
            }
            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Five threads are started, i.e. five ticket windows. Each of the five clients sells 50 tickets (REPETITIONS = QTY * 10 = 50, 250 in total): it first tries to acquire the lock, operates on the resource, and then releases the lock.
Source: https://blog.csdn.net/qq_34021712/article/details/82878396
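Running LockingExample needs a ZooKeeper server at `CONNECTION_STRING`. For a quick local check of the same pattern (QTY clients, REPETITIONS sales each, timed acquire, release in finally), the sketch below swaps `InterProcessMutex` for a fair JDK `ReentrantLock`. That lock only excludes threads inside one JVM, so this illustrates the locking discipline, not distributed locking; `LocalTicketExample` is a hypothetical name.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Single-JVM analogue of LockingExample: same shape, but a fair ReentrantLock
// replaces InterProcessMutex, so no ZooKeeper is needed.
final class LocalTicketExample {

    static final int QTY = 5;
    static final int REPETITIONS = QTY * 10;

    private int tickets = 250;                              // guarded by lock
    private final ReentrantLock lock = new ReentrantLock(true);

    void sellOne(String clientName) throws InterruptedException {
        if (!lock.tryLock(10, TimeUnit.SECONDS)) {
            throw new IllegalStateException(clientName + " could not acquire the lock");
        }
        try {
            tickets--;                                      // the critical section of use()
        } finally {
            lock.unlock();                                  // always release in finally
        }
    }

    static int run() {
        LocalTicketExample shared = new LocalTicketExample();
        ExecutorService service = Executors.newFixedThreadPool(QTY);
        for (int i = 0; i < QTY; i++) {
            final String name = "Client " + i;
            service.submit(() -> {
                for (int j = 0; j < REPETITIONS; j++) {
                    shared.sellOne(name);
                }
                return null;                                // a Callable, so sellOne may throw
            });
        }
        service.shutdown();
        try {
            service.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        shared.lock.lock();                                 // read the final count under the lock
        try {
            return shared.tickets;
        } finally {
            shared.lock.unlock();
        }
    }

    public static void main(String[] args) {
        System.out.println("tickets left: " + run()); // 0
    }
}
```

`run()` returns 0: 5 clients × 50 sales consume all 250 tickets, each decrement performed under the lock.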