多线程之死锁介绍及预防
綜述
在遇到線程安全問題的時候,我們會使用加鎖機制來確保線程安全,但如果過度地使用加鎖,則可能導致鎖順序死鎖(Lock-Ordering Deadlock)。或者有的場景我們使用線程池和信號量來限制資源的使用,但這些被限制的行為可能會導致資源死鎖(Resource DeadLock)。
我們知道Java應用程序不像數據庫服務器,能夠檢測一組事務中死鎖的發生,進而選擇一個事務去執行;在Java程序中如果遇到死鎖將會是一個非常嚴重的問題,它輕則導致程序響應時間變長,系統吞吐量變小;重則導致應用中的某一個功能直接失去響應能力無法提供服務,這些后果都是不堪設想的。因此我們應該及時發現和規避這些問題。
?
死鎖產生的條件
死鎖的產生有四個必要的條件
- 互斥使用,即當資源被一個線程占用時,別的線程不能使用
- 不可搶占,資源請求者不能強制從資源占有者手中搶奪資源,資源只能由占有者主動釋放
- 請求和保持,當資源請求者在請求其他資源的同時保持對原有資源的占有
- 循環等待,多個線程存在環路的鎖依賴關系而永遠等待下去,例如T1占有T2的資源,T2占有T3的資源,T3占有T1的資源,這種情況可能會形成一個等待環路
對于死鎖產生的四個條件只要能破壞其中一條即可讓死鎖消失,但是條件一是基礎,不能被破壞。
?
各種死鎖
鎖順序死鎖
死鎖 當多個線程同時需要同一個鎖,但是以不同的方式獲取它們。
例如,如果線程1持有鎖A,然后請求鎖B,線程2已經持有鎖B,然后請求鎖A,這樣一個死鎖就發生了。線程1永遠也得不到鎖B,線程2永遠也得不到鎖A。它們永遠也不知道這種情況。
public class TreeNode {TreeNode parent ? = null; ?List ? ? children = new ArrayList(); ?public synchronized void addChild(TreeNode child){if(!this.children.contains(child)) {this.children.add(child);child.setParentOnly(this);}}public synchronized void addChildOnly(TreeNode child){if(!this.children.contains(child){this.children.add(child);}}public synchronized void setParent(TreeNode parent){this.parent = parent;parent.addChildOnly(this);} ?public synchronized void setParentOnly(TreeNode parent){this.parent = parent;} }如果一個線程(1)調用parent.addChild(child)的同時其他線程(2)在同一個parent和child實例上調用child.setParent(parent)方法,就會發生死鎖。 下面是說明這個問題的一些偽代碼:
Thread 1: parent.addChild(child); //locks parent--> child.setParentOnly(parent); ? Thread 2: child.setParent(parent); //locks child--> parent.addChildOnly()首先,線程1調用parent.addChild(child),因為addChild()是同步的,所以線程1會鎖住parent對象,防止其他線程獲得。
然后,線程2調用child.setParent(parent),因為setParent()是同步的,所有線程2會鎖住child對象,防止其他線程獲得。
現在,parent和child對象被這兩個不同的線程鎖住。接下來,線程1嘗試調用child.setParentOnly()方法,但是child對象被線程2鎖住了,因此這個調用就會阻塞在那。線程2也嘗試調用parent.addChildOnly()方法,但是parent對象被線程1鎖住了。線程2也會阻塞在這個方法的調用上?,F在兩個線程都在等待獲取被其他線程持有的鎖。
線程確實需要同時獲得鎖。例如,如果線程1早線程2一點點,獲得了鎖A和B,然后,線程2就會在嘗試獲取鎖B時,阻塞在那。這樣就不會有死鎖發生。由于,線程調度是不確定的,所以,我們無法準確預測什么時候會發生死鎖。
@Slf4j
public class DeadLock implements Runnable {
? ? public int flag = 1;
? ? //靜態對象是類的所有對象共享的
? ? private static Object o1 = new Object(), o2 = new Object();
? ? @Override
? ? public void run() {
? ? ? ? log.info("flag:{}", flag);
? ? ? ? if (flag == 1) {
? ? ? ? ? ? synchronized (o1) {
? ? ? ? ? ? ? ? try {
? ? ? ? ? ? ? ? ? ? Thread.sleep(500);
? ? ? ? ? ? ? ? } catch (Exception e) {
? ? ? ? ? ? ? ? ? ? e.printStackTrace();
? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? synchronized (o2) {
? ? ? ? ? ? ? ? ? ? log.info("1");
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? }
? ? ? ? if (flag == 0) {
? ? ? ? ? ? synchronized (o2) {
? ? ? ? ? ? ? ? try {
? ? ? ? ? ? ? ? ? ? Thread.sleep(500);
? ? ? ? ? ? ? ? } catch (Exception e) {
? ? ? ? ? ? ? ? ? ? e.printStackTrace();
? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? synchronized (o1) {
? ? ? ? ? ? ? ? ? ? log.info("0");
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? }
? ? }
? ? public static void main(String[] args) {
? ? ? ? DeadLock td1 = new DeadLock();
? ? ? ? DeadLock td2 = new DeadLock();
? ? ? ? td1.flag = 1;
? ? ? ? td2.flag = 0;
? ? ? ? //td1,td2都處于可執行狀態,但JVM線程調度先執行哪個線程是不確定的。
? ? ? ? //td2的run()可能在td1的run()之前運行
? ? ? ? new Thread(td1).start();
? ? ? ? new Thread(td2).start();
? ? }
}
?
更復雜的死鎖
Thread 1 locks A, waits for B Thread 2 locks B, waits for C Thread 3 locks C, waits for D Thread 4 locks D, waits for A線程1等待線程2,線程2等待線程3,線程3等待線程4,線程4等待線程1.
常見的數據庫死鎖
一個更復雜的死鎖發生場景,就是數據庫事務。一個數據庫可能包含許多SQL更新請求。在一個事務中,要更新一條記錄,但這條記錄被來自其它事務的更新請求鎖住了,知道第一個事務完成。在數據庫中,同一個事務內的每條更新請求可能都會鎖住一些記錄。
如果多個事務同時運行,并且更新相同的記錄。這就會有發生死鎖的風險。
例如:
Transaction 1, request 1, locks record 1 for update Transaction 2, request 1, locks record 2 for update Transaction 1, request 2, tries to lock record 2 for update. Transaction 2, request 2, tries to lock record 1 for update.一個事務事先并不知道所有的它將要鎖住的記錄,所有在數據庫中檢測和預防死鎖變得更加困難。
重入鎖死
重入鎖死是一種類似于死鎖和嵌套管程失敗的情景。
如果一個線程重入獲得了一個非重入的鎖,讀寫鎖或者一些其他的同步器就會發生重入鎖死。重入意味著一個線程已經持有了一個鎖可以再次持有它。Java的同步塊是可以沖入的。因此,下面這段代碼執行將不會出現問題。
public class Reentrant{public synchronized outer(){inner();} ?public synchronized inner(){//do something} }outer和inner方法都被聲明為synchronized,這等同于一個synchronized(this)塊。如果一個線程在outer()方法里面調用inner()方法將不會出現問題,因為這兩個方法都被同步在同一個管程對象"this"上。如果一個線程已經持有了一個管程對象上的鎖,它就可以訪問同一個管程對象上所有的同步塊。這被稱作可重入。
下面Lock的實現是不可重入的:
public class Lock{ private boolean isLocked = false; ?public synchronized void lock()throws InterruptedException{while(isLocked){wait();}isLocked = true;} ?public synchronized void unlock(){isLocked = false;notify();} }如果一個線程兩次調用lock()方法而在兩次調用之間沒有調用unlock(),第二次調用lock()將會阻塞。一個重入鎖死就發生了。
要避免重入鎖死你有兩種選擇:
-
編寫代碼避免獲取已經持有的鎖
-
使用可重入鎖
使用哪種方法更適合于你的程序取決于具體的情景??芍厝腈i的性能常常不如非重入鎖,而且更難實現,可重入鎖通常沒有不可重入鎖那么好的表現,而且實現起來復雜,但這些情況在你的項目中也許算不上什么問題。無論你的項目用鎖來實現方便還是不用鎖方便,可重入特性都需要根據具體問題具體分析。
如何預防死鎖 呢?
鎖排序 當多個線程獲取同一個鎖但是以不同的順序,就會發生死鎖。
如果你確保所有的鎖一直以相同的順序被其他線程獲取,死鎖就不會發生??聪旅孢@例子:
Thread 1: ?lock A lock B Thread 2: ?wait for Alock C (when A locked) Thread 3: ?wait for Await for Bwait for C如果一個線程,像線程3,需要幾個鎖,就必須規定其獲得鎖的順序。在它獲得序列中靠前的鎖之前不能夠獲得靠后的鎖。
比如,線程2或者線程3首先要獲得鎖A,才能夠獲得鎖C。因為,線程A持有鎖A,線程2或者線程3首先必須等待直到鎖A被釋放。然后,在它們能夠獲得鎖B或者C之前,必須成功獲得鎖A。
鎖排序是一個簡單但很有效的預防死鎖的機制。但是,它僅適用于你事先知道所有的鎖的情況下。它并不適用于所有的場景。
鎖超時
另一個預防死鎖的機制是在請求鎖時設置超時時長,也就說一個線程在設置的超時時長內如果沒有獲得鎖就會放棄。如果一個線程在給定時長內沒有成功獲取所有必要的鎖,它將會回退,釋放所有的鎖請求,隨機等待一段時間,然后重試。隨機等待的過程中給了其他線程獲取這個鎖的一個機會,因此,這也可以讓程序在沒有鎖的情況下繼續運行。
Thread 1 locks A Thread 2 locks B ? Thread 1 attempts to lock B but is blocked Thread 2 attempts to lock A but is blocked ? Thread 1's lock attempt on B times out Thread 1 backs up and releases A as well Thread 1 waits randomly (e.g. 257 millis) before retrying. ? Thread 2's lock attempt on A times out Thread 2 backs up and releases B as well Thread 2 waits randomly (e.g. 43 millis) before retrying.在上面的例子中,線程2將會在線程之前大約200毫秒重試去獲得鎖,所以,大體上將會獲得所有的鎖。已經在等待的線程A一直在嘗試獲取鎖A。當線程2完成時,線程1也將會獲得所有的鎖。
我們需要記住一個問題,上面提到的僅僅是因為一個鎖超時了,而不是說線程發生;了死鎖,這也僅僅是說這個線程獲取這個鎖花費了多少時間去完成任務。
另外,如果線程足夠多,盡管設置了超時和重試,也是會有發生死鎖的風險。2個線程各自在重試前等待0~500毫秒也許不會發生死鎖,但如果10或者20個線程情況就不同了。這種情況發生死鎖的概率要比兩個線程的情況要高得多。
鎖超時機制存在的一個問題是在Java中在進入一個同步代碼塊時設置時長是不可能的。你不得不創建一個自定義的鎖相關的類或者使用在Java5中java.util.concurrency包中的并發結構之一。
死鎖檢測
死鎖檢測是一個重量級的死鎖預防機制,主要用于在鎖排序和鎖超時都不可用的場景中。
當一個線程請求一個鎖當時請求被禁止時,這個線程可以遍歷鎖圖(lock graph)檢查是否發生了死鎖。例如,如果一個線程A請求鎖7,但是鎖7被線程B持有,然后,線程A可以檢測線程B是否有請求任何線程A持有的鎖。如果有,就會發生一個死鎖。
當然,一個死鎖場景可能比兩個對象分別持有對方的鎖要復雜的鎖。線程A可能等待線程B,線程B等待線程C,線程C等待線程D,線程D等待線程A。為了檢測死鎖,線程A必須一次測試所有的被線程B請求的鎖。從線程B的鎖請求線程A到達線程C,然后又到達線程D,從上面的檢測中,線程A找到線程A自身持有的一個鎖。這樣,線程A就會知道發生了死鎖。
下面是一個被四個線程持有和請求鎖的圖。類似于這樣的一個數據結構可以用來檢測死鎖。
那么,如果檢測到一個死鎖,這些線程可以做些什么?
一個可能的做法就是釋放所有的鎖,回退,隨機等待一段時間然后重試。這種做法與鎖超時機制非常相似除了只有發生死鎖時線程才會回退(backup)。而不僅僅是因為鎖請求超時。然而,如果大量的線程去請求同一個鎖,可能重復的發生死鎖,盡管存在回退和等待機制。
一個更好的做法就是為這些線程設置優先級,這樣一來,就會只有一個或者一些線程在遇到死鎖時發生回退。剩下的線程繼續請求鎖假如沒有死鎖再發生。如果賦予線程的優先級是固定的,同樣的線程總是擁有更高的優先級。為了避免這種情況,我們可以在發生死鎖時,隨機的為線程設置優先級。
總結
以上是生活随笔為你收集整理的多线程之死锁介绍及预防的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 如何使用DPA华为备份一体机备份达梦数据
- 下一篇: 【latex】经验总结(待整理)