多线程知识梳理(2) - 并发编程的艺术笔记
?
layout: post
title: 《Java并發編程的藝術》筆記
categories: Java
excerpt: The Art of Java Concurrency Programming.
<img src="http://upload-images.jianshu.io/upload_images/658453-a94405da52987372.jpg?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240" width="70%">
好記性不如爛筆頭。多讀多思考。
基本概念 & Java 并發機制的底層實現原理
上下文切換:CPU在任務切換前會保存前一個任務的狀態,以便下次切換回這個任務時,可以再加載這個任務的狀態。所以任務從保存到再加載的過程就是一次任務切換。
內存屏障:一組處理器指令,用于實現對內存操作的順序限制。
鎖的升級
現在我們應該知道,Synchronized 是通過對象內部的一個叫做監視器鎖(monitor)來實現的。但是監視器鎖本質又是依賴于底層的操作系統的 Mutex Lock 來實現的。而操作系統實現線程之間的切換這就需要從用戶態轉換到核心態,這個成本非常高,狀態之間的轉換需要相對比較長的時間,這就是為什么 Synchronized 效率低的原因。因此,這種依賴于操作系統 Mutex Lock 所實現的鎖我們稱之為“重量級鎖”。JDK 中對 Synchronized 做的種種優化,其核心都是為了減少這種重量級鎖的使用。JDK1.6 以后,為了減少獲得鎖和釋放鎖所帶來的性能消耗,提高性能,引入了“輕量級鎖”和“偏向鎖”。
每一個線程在準備獲取共享資源時:
已經獲取偏向鎖的線程為線程1, 新線程為:線程2
第一步,線程2檢查MarkWord里面是不是放的自己的ThreadId ,如果是,表示當前線程是處于 “偏向鎖” ,就可以直接執行方法體了。
第二步,如果MarkWord不是自己的ThreadId, 用CAS來執行切換,如果不成功,線程2根據MarkWord里現有的ThreadId,通知之前線程暫停,之前線程將Markword的內容置為空。 (線程1的同步體執行完后 會根據線程2的請求,暫停線程,置空markword里面的線程ID)
第三步,這樣線程2就以輕量級的鎖機制工作,如果這時線程3進入,就會進入自旋模式等待鎖
第四步,自旋的線程3在自旋過程中,成功獲得資源(即之前獲的資源的線程執行完成并釋放了共享資源),則整個狀態依然處于 輕量級鎖的狀態,如果自旋失敗 ,即自旋時間結束,仍然沒有獲取輕量級鎖,進入重量級鎖。
第五步,線程3進入重量級鎖,將對象的markword修改為指向重量級鎖的指針,線程2執行為同步體,修改Markword時,會失敗,這樣線程2就會意識到進入重量級鎖了,
第六步,線程2釋放鎖,通知重量級鎖喚醒阻塞隊列。
輕量級鎖是為了在線程交替執行同步塊時提高性能,而偏向鎖則是在只有一個線程執行同步塊時進一步提高性能。
處理器實現原子操作的方式:總線鎖(鎖住整個內存);緩存鎖(在處理器內部緩存中實現原子操作,使其他處理器不能緩存 i 的緩存行)。
Java 實現原子操作的方式:鎖和循環 CAS(Compare and Swap 比較并交換);CAS 利用了處理器的 CMPXCHG 指令(該指令是原子的)。
除了偏向鎖,JVM 實現鎖的方式都用了循環 CAS,即當一個線程想進入同步塊的時候使用循環 CAS 的方式來獲取鎖,當它退出同步塊的時候使用循環 CAS 釋放鎖。
// 循環CAS public final int incrementAndGet() {for (;;) {int current = get();int next = current + 1;if (compareAndSet(current, next))return next;} }Java內存模型
3個同步原語:synchronized,volatile,final;
并發編程的兩個關鍵問題:線程間通信和線程間同步;
在共享內存的并發模型中,線程之間共享內存的公共狀態,通過讀-寫內存的公共狀態進行隱式通信。在消息傳遞的并發模型中,線程之間沒有公共狀態,必須通過發送消息來顯式進行通信。
同步是指用于控制不同線程間操作發生相對順序的機制。在共享內存并發模型里,同步是顯式進行的——程序員需要顯式指定某個方法或某段代碼需要在線程間互斥執行。在消息傳遞的并發模型里,由于消息的發送必須在消息的接收之前,因此同步是隱式進行的。
Java的并發采用的是共享內存模型,所以Java線程之間的通信總是隱式進行。
Java內存模型(JMM):
(本地內存是JMM的一個抽象概念,并不真實存在。它涵蓋了緩存、寫緩沖區、寄存器以及其他硬件和編譯器優化。(不完全是內存,也不完全是Cache))
從上圖來看,線程A與線程B之間如要通信的話,必須要經歷下面2個步驟:
重要概念:重排序,編譯器重排序和處理器重排序,為了提高并行度。
數據依賴:寫后讀,寫后寫,讀后寫;這3種情況,只要重排序兩個操作的執行順序,程序的執行結果就會改變;所以重排序時會遵守數據依賴性,不會改變存在數據依賴關系的兩個操作的執行順序。
控制依賴:由于處理器會采用分支預測技術來提高并行度,i = a * a可能會被重排序到if (flag)之前執行——這在單線程中是沒問題的,但在多線程環境下就可能改變程序的執行結果。
if (flag) {i = a * a; }as-if-serial語義:不管怎么重排序,單線程程序的執行結果不能被改變。
happens-before
JSR-133使用happens-before的概念來闡述操作之間的內存可見性。在JMM中,如果一個操作執行的結果需要對另一個操作可見,那么這兩個操作之間必須要存在happens-before關系。這里提到的兩個操作既可以是在一個線程之內,也可以是在不同線程之間。
happen-before的定義如下:
as-if-serial語義保證單線程內的程序執行結果不會改變,happens-before保證正確同步的多線程程序的執行結果不會被改變。
總共有六條規則:
- 程序順序規則:一個線程中的每個操作,happens-before于隨后該線程中的任意后續操作
- 監視器鎖規則:對一個鎖的解鎖,happens-before于隨后對這個鎖的獲取
- volatile變量規則:對一個volatile域的寫,happens-before于對這個變量的讀
- 傳遞性:如果A happens-before B,B happens-before C,那么A happens-before C
- start規則:如果線程A執行線程B的start方法,那么線程A的ThreadB.start()happens-before于線程B的任意操作
- join規則:如果線程A執行線程B的join方法,那么線程B的任意操作happens-before于線程A從TreadB.join()方法成功返回。
順序一致性內存模型:順序一致性內存模型是一個被計算機科學家理想化了的理論參考模型,它為程序員提供了極強的內存可見性保證。順序一致性內存模型有兩大特性:
順序一致性內存模型的視圖:
在JMM中,臨界區內的代碼可以重排序,但不允許臨界區內的代碼“溢出”到臨界區之外,那樣會破壞監視器的內存語義。
JMM保證:單線程程序和正確同步的多線程程序的執行結果與在順序一致性內存模型中的執行結果相同。
volatile
對volatile變量的單個讀寫,可以看成是使用了同一個鎖對這些單個讀寫作了同步。(這樣,即使是64位的long/double型變量,只要用volatile修飾,對該變量的讀寫就具有了原子性。注意,++這種復合操作依舊不具有原子性。)
volatile變量自身的特性:
- 可見性。對一個volatile變量的讀,總是能看到(任意線程)對這個volatile變量最后的寫入。
- 原子性:對任意單個volatile變量的讀/寫具有原子性,但類似于volatile++這種復合操作不具有原子性。
volatile的內存語義(對內存可見性的影響)
- 當寫一個volatile變量時,JMM會把該線程對應的本地內存中的共享變量刷新到主內存。
- 當讀一個volatile變量時,JMM會把該線程對應的本地內存置為無效。線程接下來將從主內存中讀取共享變量。
當第二個操作是volatile寫時,不管第一個操作是什么,都不能重排序。這個規則確保volatile寫之前的操作不會被編譯器重排序到volatile寫之后。
當第一個操作是volatile讀時,不管第二個操作是什么,都不能重排序。這個規則確保volatile讀之后的操作不會被編譯器重排序到volatile讀之前。
當第一個操作是volatile寫,第二個操作是volatile讀時,不能重排序。
鎖的內存語義
眾所周知,鎖可以讓臨界區互斥執行;但鎖有一個同樣重要,但常常被忽視的功能:鎖的內存語義。
- 當線程釋放鎖時,JMM會把該線程對應的本地內存中的共享變量刷新到主內存中
- 當線程獲取鎖時,JMM會把該線程對應的本地內存置為無效。從而使得被監視器保護的臨界區代碼必須要從主內存中去讀取共享變量
對比鎖釋放-獲取的內存語義與volatile寫-讀的內存語義,可以看出:鎖釋放與volatile寫有相同的內存語義;鎖獲取與volatile讀有相同的內存語義。
final的內存語義
- JMM禁止編譯器把final域的寫重排序到構造函數之外(對普通域的寫可能被重排序到構造函數之外!)
- 在一個線程中,初次讀對象引用與初次讀該對象包含的final域,JMM禁止處理器重排序這兩個操作(這兩個操作之間存在間接依賴,大多數處理器會遵守間接依賴,不會重排序這兩個操作,但有少數處理器不遵守間接依賴關系,這個規則就是專門用來針對這種處理器的)
如果final域是引用類型:
public class FinalReferenceExample {final int[] intArray; //final是引用類型static FinalReferenceExample obj;public FinalReferenceExample () { //構造函數intArray = new int[1]; //1intArray[0] = 1; //2}public static void writerOne () { //寫線程A執行obj = new FinalReferenceExample (); //3}... }這里final域為一個引用類型,它引用一個int型的數組對象。對于引用類型,寫final域的重排序規則對編譯器和處理器增加了如下約束:
在構造函數內對一個final引用的對象的成員域的寫入,與隨后在構造函數外把這個被構造對象的引用賦值給一個引用變量,這兩個操作之間不能重排序。
在上圖中,1是對final域的寫入,2是對這個final域引用的對象的成員域的寫入,3是把被構造的對象的引用賦值給某個引用變量。這里除了前面提到的1不能和3重排序外,2和3也不能重排序。
為什么final引用不能從構造函數內“逸出”
前面我們提到過,寫final域的重排序規則可以確保:在引用變量為任意線程可見之前,該引用變量指向的對象的final域已經在構造函數中被正確初始化過了(構造函數完成,對象引用才會產生)。其實要得到這個效果,還需要一個保證:在構造函數內部,不能讓這個被構造對象的引用為其他線程可見,也就是對象引用不能在構造函數中“逸出”。為了說明問題,讓我們來看下面示例代碼:
public class FinalReferenceEscapeExample {final int i;static FinalReferenceEscapeExample obj;public FinalReferenceEscapeExample () {i = 1; //1 寫final域obj = this; //2 this引用在此“逸出”}public static void writer() {new FinalReferenceEscapeExample ();}public static void reader {if (obj != null) { //3int temp = obj.i; //4}} }這里1和2可能會發生重排序,導致final域在被正確初始化之前對象引用就暴露了,從而在線程B的reader中訪問到未初始化的final域。
JSR-133為什么要增強final的語義
在舊的Java內存模型中 ,最嚴重的一個缺陷就是線程可能看到final域的值會改變。比如,一個線程當前看到一個整形final域的值為0(還未初始化之前的默認值),過一段時間之后這個線程再去讀這個final域的值時,卻發現值變為了1(被某個線程初始化之后的值)。最常見的例子就是在舊的Java內存模型中,String的值可能會改變。
為了修補這個漏洞,JSR-133專家組增強了final的語義。通過為final域增加寫和讀重排序規則,可以為java程序員提供初始化安全保證:只要對象是正確構造的(被構造對象的引用在構造函數中沒有“逸出”),那么不需要使用同步(指lock和volatile的使用),就可以保證任意線程都能看到這個final域在構造函數中被初始化之后的值。
雙重檢查鎖定與延遲初始化
延遲初始化:推遲一些高開銷的對象初始化操作,并且只有在使用這些對象時才進行初始化。
private static Instance instance; public synchronized static Instance getInstance() {if (instance == null) {instance = new Instance();}return instance; }上面的方法雖然線程安全,但用synchronized將導致性能開銷。
一個“聰明”的技巧:雙重檢查鎖定:
public class DoubleCheckLocking {private static Instance instance;public static Instance getInstance() {if (instance == null) {synchronized(DoubleCheckLocking.class) {if (instance == null) {instance = new Instance(); // 問題的根源出在這里}}}return instance;} }創建對象的過程instance = new Instance()可以分解為以下三步:
其中,2和3可能會被重排序!重排序之后變成了:分配對象內存空間,返回對象地址,初始化對象;(在單線程內,只要保證2排在4的前面執行,單線程內的執行結果就不會被改變,這個重排序就是被允許的)
在多線程環境下,假設2和3發生重排序,那么一個未初始化的對象引用將從同步塊中“溢出”,另一個線程可能會通過instance訪問到這個未初始化的對象!
解決方案:
1,利用volatile的內存語義來禁止重排序
private volatile static Instance instance;根據volatile寫的內存語義:volatile寫之前的操作禁止被重排序到volatile寫之后。這樣上面2和3之間的重排序將會被禁止,問題根源得到解決。
2,利用類初始化的原子性
在執行類的初始化期間,JVM會去獲取一個鎖。這個鎖可以同步多個線程對同一個類的初始化。
public class InstanceFactory {private static class InstanceHolder {public static Instance instance = new Instance();}public static Instance getInstance() {return InstanceHolder.instance ; // 這里將導致 InstanceHolder 類被初始化} }Java并發編程基礎
設置線程優先級時,針對頻繁阻塞(休眠或IO操作)的線程需要設置較高的優先級,而偏重計算的線程則設置較低的優先級,確保處理器不會被獨占。
線程狀態變遷
可參考 鏈接
疑惑:貌似可以從等待態直接回到就緒/運行態,WHY / HOW?
另,書上一句話:
阻塞狀態是線程阻塞在進入 synchronized 同步代碼塊或方法(獲取鎖)時的狀態,但是阻塞在 java.concurrent 包中 Lock 接口的線程狀態卻是等待狀態,因為 java.concurrent 包中的 Lock 接口對于阻塞的實現均使用了 LockSupport 類中的相關方法。
中斷
中斷可以理解為線程的一個標識位屬性,它表示一個線程是否被其他線程進行了中斷操作。
調用一個線程對象的interrupt()方法,只是將該線程的中斷標識位設為true,并不是真的“中斷“了該線程。這個地方很容易迷惑人。
一個被中斷的線程(被調用了interrupt()方法)如何響應中斷完全取決于該線程本身。
線程有兩種方法來判斷自己是否被中斷:
Object.wait(),Thread.sleep(),Thread.join()等方法均聲明拋出InterruptedException異常,說明這些方法是可中斷的——這些方法在執行時會不斷輪詢監聽中斷標識位,當發現其為true時,會恢復中斷標識位(即設為false),并拋出InterruptedException異常。
進入synchronized塊和Lock.lock()等操作是不可被中斷的(不拋出中斷異常)。
安全地終止線程
輪詢中斷標識位,或另設一個標志:
public class Runner implements Runnable {private volatile boolean on = true;private long i;@Overridepublic void run() {while (on && !Thread.currentThread().isInterrupted()) {i++;}System.out.println("Count i = " + i);}public void cancel() {on = false;} }Runner one = new Runner(); Thread t1 = new Thread(one); t1.start(); ... t1.interrupt();Runner two = new Runner(); new Thread(two).start(); ... two.cancel();等待/通知機制
等待/通知的經典范式:
synchronized(obj) {while(條件不滿足) {obj.wait();}處理邏輯; }synchronized(obj) {改變條件;obj.notifyAll(); }在while循環中判斷條件并調用wait()是使用wait()的唯一正確方式——這樣能保證線程在睡眠前后都會檢查條件。
wait()返回的前提是當前線程獲得鎖;返回后從wait()處繼續執行。
注意一點:wait()會使當前對象釋放鎖,notify() 和 notifyAll() 不會!
synchronized(obj) {if (條件不滿足) {obj.wait();}處理邏輯; }用 if 為什么錯了呢?
wait()的線程被其他線程用notify()或notifyAll()喚醒后,是需要先獲得鎖的(畢竟你是在synchronized塊里);如果在被喚醒到獲得鎖的這段時間內,條件又被另一個線程改變了,而你獲得鎖并從wait()方法返回后,直接跳出了 if 的條件判斷——這時條件是不滿足的,于是產生了邏輯錯誤。所以,線程在睡眠前后都需要檢查條件。
狀態轉換圖
線程調用wait()方法釋放鎖,進入等待隊列,等待狀態(WAITING);被notify()/notifyAll()喚醒后,進入同步隊列,變為阻塞狀態(BLOCKING);隨后可再次獲得鎖并從wait()返回繼續執行。
管道輸入/輸出流
4種實現:PipedOutputStream, PipedInputStream, PipedReader, PipedWriter
PipedWriter out = new PipedWriter(); PipedReader in = new PipedReader(); out.connect(in); // 將輸入流和輸出流進行連接,否則在使用時會拋出IOException;ThreadLocal
在main線程中定義一個ThreadLocal對象,在各個線程中訪問時,訪問到的是各個線程獨立的版本——并且是獨立初始化的ThreadLocal對象。
默認情況下 initValue() 返回 null 。線程在沒有調用 set 之前,第一次調用 get 的時候, get 方法會默認去調用 initValue 這個方法。所以如果沒有覆寫這個方法,可能導致 get 返回的是 null 。當然如果調用過 set 就不會有這種情況了。但是往往在多線程情況下我們不能保證每個線程的在調用 get 之前都調用了 set ,所以最好對 initValue 進行覆寫,以免導致空指針異常。
public class ConcurrentProgramming {public static ThreadLocal<Integer> threadLocalInt = new ThreadLocal<Integer>() {@Overrideprotected Integer initialValue() {return 0;}};// public static ThreadLocal<Integer> threadLocalInt = new ThreadLocal<>();public static void main(String[] args) throws InterruptedException {// threadLocalInt.set(0);// System.out.println(threadLocalInt.get()); // 這里可以正常輸出,因為在當前main線程中是先set,再get;for (int i = 0; i < 2; i++) {new Thread(new Worker()).start();}} }class Worker implements Runnable {@Overridepublic void run() {for (int i = 0; i < 5; i++) {// 但在這里就報空指針錯了———— 所以,并不是共享的同一個ThreadLocal對象,而是每個線程new一個,對嗎?ConcurrentProgramming.threadLocalInt.set(ConcurrentProgramming.threadLocalInt.get() + 1);System.out.println(Thread.currentThread().getName() + ": " + ConcurrentProgramming.threadLocalInt.get());}} }output: Thread-0: 1 Thread-1: 1 Thread-1: 2 Thread-1: 3 Thread-1: 4 Thread-0: 2 Thread-0: 3 Thread-0: 4 Thread-0: 5 Thread-1: 5注意代碼中的注釋部分。沒有重寫initialValue()時,在main中set(0)然后get,沒有問題;但在另外兩個線程中的get卻報空指針異常——說明在main中set的值只在main線程中可見。
This class provides thread-local variables. These variables differ from their normal counterparts in that each thread that accesses one (via its get or set method) has its own, independently initialized copy of the variable.
—— 每個線程有自己的、獨立初始化的變量拷貝。
所以,每個線程會獨自new一個Threadlocal對象,只是共用了同一個變量名,或你寫的ThreadLocal匿名內部類。
等待超時模式
開發人員經常會遇到這樣的方法調用場景:調用一個方法時等待一段時間,如果該方法在給定的時間段內能夠得到結果,那么將立刻返回;反之,超時返回默認結果。
實現方式:在經典的等待/通知模型的加鎖、條件循環、邏輯處理的基礎上作出非常小的改動:
public synchronized Object get(long mills) throws InterruptedException {long future = System.currentTimeMillis() + mills;long remaining = mills;while ((result == null) && remaining > 0) {wait(remaining);remaining = future - System.currentTimeMillis();}return result; }(數據庫連接池示例、線程池示例 未)
Java中的鎖
Lock接口
- void lock() 獲取鎖,調用該方法當前線程將會獲取鎖,當鎖獲取后,該方法將返回。
- void lockInterruptibly() throws InterruptedException 可中斷獲取鎖,與lock()方法不同之處在于該方法會響應中斷,即在鎖的獲取過程中可以中斷當前線程
- boolean tryLock() 嘗試非阻塞的獲取鎖,調用該方法立即返回,true表示獲取到鎖
- boolean tryLock(long time,TimeUnit unit) throws InterruptedException 超時獲取鎖,以下情況會返回:時間內獲取到了鎖,時間內被中斷,時間到了沒有獲取到鎖。
- void unlock() 釋放鎖
- Condition newCondition() 獲取等待通知組件
隊列同步器
隊列同步器AbstractQueuedSynchronizer(AQS)是用來構建鎖或者其他同步組件的基礎框架,它使用了一個int成員變量表示同步狀態,通過內置的FIFO隊列來完成資源獲取線程的排隊工作。下圖顯示了java.concurrent包的實現示意圖:
隊列同步器的實現依賴內部的同步隊列來完成同步狀態的管理。它是一個FIFO的雙向隊列,當線程獲取同步狀態失敗時,同步器會將當前線程和等待狀態等信息包裝成一個節點并將其加入同步隊列,同時會阻塞當前線程。當同步狀態釋放時,會把首節點中的線程喚醒,使其再次嘗試獲取同步狀態。
共享式同步狀態獲取與釋放
共享式獲取與獨占式獲取最主要的區別在于同一時刻能否有多個線程同時獲取到同步狀態。以文件的讀寫為例,如果一個程序在對文件進行讀操作,那么這一時刻對于該文件的寫操作均被阻塞,而讀操作能夠同時進行。寫操作要求對資源的獨占式訪問,而讀操作可以是共享式訪問。
左半部分,共享式訪問資源時,其他共享式的訪問均被允許,而獨占式訪問被阻塞;右半部分是獨占式訪問資源時,同一時刻其他訪問均被阻塞。
重入鎖 ReentrantLock
重入鎖 ReentrantLock,顧名思義,就是支持重進入的鎖,它表示該鎖能夠支持一個線程對資源的重復加鎖。除此之外,該鎖的還支持獲取鎖時的公平和非公平性選擇。
對于獨占鎖(Mutex),考慮如下場景:當一個線程調用Mutex的lock()方法獲取鎖之后,如果再次調用lock()方法,則該線程將會被自己所阻塞,原因是Mutex在實現tryAcquire(int acquires)方法時沒有考慮占有鎖的線程再次獲取鎖的場景,而在調用tryAcquire(int acquires)方法時返回了false,導致該線程被阻塞。簡單地說,Mutex是一個不支持重進入的鎖。
synchronized關鍵字隱式的支持重進入,比如一個synchronized修飾的遞歸方法,在方法執行時,執行線程在獲取了鎖之后仍能連續多次地獲得該鎖,而不像Mutex由于獲取了鎖,而在下一次獲取鎖時出現阻塞自己的情況。
ReentrantLock雖然沒能像synchronized關鍵字一樣支持隱式的重進入,但是在調用lock()方法時,已經獲取到鎖的線程,能夠再次調用lock()方法獲取鎖而不被阻塞。
鎖獲取的公平性問題
公平性與否是針對獲取鎖而言的,如果一個鎖是公平的,那么鎖的獲取順序就應該和鎖的請求順序一致,也就是FIFO。
非公平性鎖可能使線程“饑餓”,當一個線程請求鎖時,只要獲取了同步狀態即成功獲取鎖。在這個前提下,剛釋放鎖的線程再次獲取同步狀態的幾率會非常大,使得其他線程只能在同步隊列中等待。
非公平鎖可能使線程“饑餓”,為什么它又被設定成默認的實現呢?非公平性鎖模式下線程上下文切換的次數少,因此其性能開銷更小。公平性鎖保證了鎖的獲取按照FIFO原則,而代價是進行大量的線程切換。非公平性鎖雖然可能造成線程“饑餓”,但極少的線程切換,保證了其更大的吞吐量。
讀寫鎖
在Java并發包中常用的鎖(如ReentrantLock),基本上都是排他鎖,這些鎖在同一時刻只允許一個線程進行訪問,而讀寫鎖在同一時刻可以允許多個讀線程訪問,但是在寫線程訪問時,所有的讀線程和其他寫線程均被阻塞。讀寫鎖維護了一對鎖,一個讀鎖和一個寫鎖,通過分離讀鎖和寫鎖,使得并發性相比一般的排他鎖有了很大提升。
除了保證寫操作對讀操作的可見性以及并發性的提升之外,讀寫鎖能夠簡化讀寫交互場景的編程方式。假設在程序中定義一個共享的數據結構用作緩存,它大部分時間提供讀服務(例如:查詢和搜索),而寫操作占有的時間很少,但是寫操作完成之后的更新需要對后續的讀服務可見。
在沒有讀寫鎖支持的(Java 5 之前)時候,如果需要完成上述工作就要使用Java的等待通知機制,就是當寫操作開始時,所有晚于寫操作的讀操作均會進入等待狀態,只有寫操作完成并進行通知之后,所有等待的讀操作才能繼續執行(寫操作之間依靠synchronized關鍵字進行同步),這樣做的目的是使讀操作都能讀取到正確的數據,而不會出現臟讀。
改用讀寫鎖實現上述功能,只需要在讀操作時獲取讀鎖,而寫操作時獲取寫鎖即可,當寫鎖被獲取到時,后續(非當前寫操作線程)的讀寫操作都會被阻塞,寫鎖釋放之后,所有操作繼續執行,編程方式相對于使用等待通知機制的實現方式而言,變得簡單明了。
一般情況下,讀寫鎖的性能都會比排它鎖要好,因為大多數場景讀是多于寫的。在讀多于寫的情況下,讀寫鎖能夠提供比排它鎖更好的并發性和吞吐量。Java并發包提供讀寫鎖的實現是ReentrantReadWriteLock。
ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); Lock r = rwl.readLock(); Lock w = rwl.writeLock();Condition接口
任何一個Java對象,都擁有一組監視器方法,主要包括wait()、notify()、notifyAll()方法,這些方法與synchronized關鍵字配合使用可以實現等待/通知模式。Condition接口也提供類似的Object的監視器的方法,主要包括await()、signal()、signalAll()方法,這些方法與Lock鎖配合使用也可以實現等待/通知模式。
相比Object實現的監視器方法,Condition接口的監視器方法具有一些Object所沒有的特性:
- Condition接口可以支持多個等待隊列:一個Lock實例可以綁定多個Condition。
- Condition接口支持在等待時不響應中斷:wait()是會響應中斷的;
- Condition接口支持等待到將來的某個時間點返回(和awaitNanos(long)/wait(long)不同!):awaitUntil(Date deadline);
上面用了兩個Condition。(是不是很熟悉?王道,信號量,線程間同步)
等待隊列與同步隊列
在Object的監視器模型上,一個對象擁有一個同步隊列和一個等待隊列,而并發包中的Lock(更確切的說是同步器)可以擁有一個同步隊列和多個等待多列。
Java并發容器和框架
ConcurrentHashMap
在并發環境下,HashMap的put操作會引起死循環。因為多線程會導致HashMap的Entry鏈表形成環形數據結構,使得Entry的next節點永遠不為空。
HashTable容器使用synchronized來保證線程安全,但在線程競爭激烈的情況下HashTable的效率非常低下。因為當一個線程訪問HashTable的同步方法時,其他線程訪問HashTable的同步方法時,可能會進入阻塞或輪詢狀態。如線程1使用put進行添加元素,線程2不但不能使用put方法添加元素,并且也不能使用get方法來獲取元素,所以競爭越激烈效率越低。
ConcurrentHashMap的鎖分段技術
HashTable容器在競爭激烈的并發環境下表現出效率低下的原因,是因為所有訪問HashTable的線程都必須競爭同一把鎖,那假如容器里有多把鎖,每一把鎖用于鎖容器其中一部分數據,那么當多線程訪問容器里不同數據段的數據時,線程間就不會存在鎖競爭,從而可以有效的提高并發訪問效率,這就是ConcurrentHashMap所使用的鎖分段技術,首先將數據分成一段一段的存儲,然后給每一段數據配一把鎖,當一個線程占用鎖訪問其中一個段數據的時候,其他段的數據也能被其他線程訪問。
ConcurrentHashMap是由Segment數組結構和HashEntry數組結構組成。Segment是一種可重入鎖ReentrantLock,在ConcurrentHashMap里扮演鎖的角色,HashEntry則用于存儲鍵值對數據。
ConcurrentHashMap的get操作
Segment的get操作實現非常簡單和高效。先經過一次再哈希,然后使用這個哈希值通過哈希運算定位到segment,再通過哈希算法定位到元素,代碼如下:(兩次哈希)
public V get(Object key) {int hash = hash(key.hashCode());return segmentFor(hash).get(key, hash); }ConcurrentHashMap的Put操作
由于put方法里需要對共享變量進行寫入操作,所以為了線程安全,在操作共享變量時必須得加鎖。Put方法首先定位到Segment,然后在Segment里進行插入操作。插入操作需要經歷兩個步驟,第一步判斷是否需要對Segment里的HashEntry數組進行擴容,第二步定位添加元素的位置然后放在HashEntry數組里。(擴容的時候首先會創建一個兩倍于原容量的數組,然后將原數組里的元素進行再hash后插入到新的數組里。為了高效ConcurrentHashMap不會對整個容器進行擴容,而只對某個segment進行擴容)
ConcurrentHashMap的size操作
如果我們要統計整個ConcurrentHashMap里元素的大小,就必須統計所有Segment里元素的大小后求和。Segment里的全局變量count是一個volatile變量,那么在多線程場景下,我們是不是直接把所有Segment的count相加就可以得到整個ConcurrentHashMap大小了呢?不是的,雖然相加時可以獲取每個Segment的count的最新值,但是拿到之后可能累加前使用的count發生了變化,那么統計結果就不準了。所以最安全的做法,是在統計size的時候把所有Segment的put,remove和clean方法全部鎖住,但是這種做法顯然非常低效。
因為在累加count操作過程中,之前累加過的count發生變化的幾率非常小,所以ConcurrentHashMap的做法是先嘗試2次通過不鎖住Segment的方式來統計各個Segment大小,如果統計的過程中,容器的count發生了變化,則再采用加鎖的方式來統計所有Segment的大小。
并發隊列:ConcurrentLinkedQueue
用非阻塞的循環CAS方式實現。
Java中的阻塞隊列
阻塞隊列(BlockingQueue)是一個支持兩個附加操作的隊列。這兩個附加的操作是:在隊列為空時,獲取元素的線程會等待隊列變為非空。當隊列滿時,存儲元素的線程會等待隊列可用。
插入和移除操作的四種處理方式
- 拋出異常:是指當阻塞隊列滿時候,再往隊列里插入元素,會拋出IllegalStateException(“Queue full”)異常。當隊列為空時,從隊列里獲取元素時會拋出NoSuchElementException異常 。
- 返回特殊值:插入方法會返回是否成功,成功則返回true。移除方法,則是從隊列里拿出一個元素,如果沒有則返回null
- 一直阻塞:當阻塞隊列滿時,如果生產者線程往隊列里put元素,隊列會一直阻塞生產者線程,直到拿到數據,或者響應中斷退出。當隊列空時,消費者線程試圖從隊列里take元素,隊列也會阻塞消費者線程,直到隊列可用。
- 超時退出:當阻塞隊列滿時,隊列會阻塞生產者線程一段時間,如果超過一定的時間,生產者線程就會退出。
Java里的阻塞隊列
- ArrayBlockingQueue :一個由數組結構組成的有界阻塞隊列。
- LinkedBlockingQueue :一個由鏈表結構組成的有界阻塞隊列。
- PriorityBlockingQueue :一個支持優先級排序的無界阻塞隊列。
- DelayQueue:一個使用優先級隊列實現的無界阻塞隊列;支持延時獲取元素——在創建元素時可以指定多久才能從隊列中取出當前元素;
- SynchronousQueue:一個不存儲元素的阻塞隊列——每一個put操作必須等待一個take操作;
- LinkedTransferQueue:一個由鏈表結構組成的無界阻塞隊列。
- LinkedBlockingDeque:一個由鏈表結構組成的雙向阻塞隊列。
阻塞隊列的實現原理,見前面BoundedBuffer的代碼。(一個隊列,一個鎖,兩個Condition:notFull,notEmpty,等待通知模型)
Fork/Join框架
與MapReduce一致的思想。
ForkJoinTask(抽象類):我們要使用ForkJoin框架,必須首先創建一個ForkJoin任務。它提供在任務中執行fork()和join()操作的機制。Fork/Join框架提供了以下兩個子類:
- RecursiveAction:用于沒有返回結果的任務。
- RecursiveTask :用于有返回結果的任務。
Fork/Join框架的實現原理
ForkJoinPool由ForkJoinTask數組和ForkJoinWorkerThread數組組成,ForkJoinTask數組負責存放程序提交給ForkJoinPool的任務,而ForkJoinWorkerThread數組負責執行這些任務。(類似于線程池的實現)
Java中的13個原子操作類
原子更新方式
- 原子更新基本類型
- 原子更新數組
- 原子更新引用
- 原子更新屬性(字段)
1,原子更新基本類型
- AtomicBoolean :原子更新布爾類型
- AtomicInteger: 原子更新整型
- AtomicLong: 原子更新長整型
2,原子更新數組
- AtomicIntegerArray :原子更新整型數組里的元素
- AtomicLongArray :原子更新長整型數組里的元素
- AtomicReferenceArray : 原子更新引用類型數組的元素
3,原子更新引用類型
- AtomicReference :原子更新引用類型
- AtomicReferenceFieldUpdater :原子更新引用類型里的字段
- AtomicMarkableReference:原子更新帶有標記位的引用類型。可以原子更新一個布爾類型的標記位和應用類型
4,原子更新字段類
- AtomicIntegerFieldUpdater:原子更新整型的字段的更新器
- AtomicLongFieldUpdater:原子更新長整型字段的更新器
- AtomicStampedReference:原子更新帶有版本號的引用類型。該類將整型數值與引用關聯起來,可用于原子的更新數據和數據的版本號,可以解決使用CAS進行原子更新時可能出現的ABA問題。
(恩,是個坑,需要踩)
Java中的并發工具類
CountDownLatch
(Latch:門閂)
用于等待其他線程完成操作。一個功能更強大的 join().
CountDownLatch c = new CountDownLatch(2); // 等待兩個[點]完成; ... c.countDown(); // 第一個等待的操作完成; ... c.countDown(); // 第二個等待的操作完成;... c.await(); // 等待兩個操作完成; ...CountDownLatch(N)等待N個點完成;這里說的N個點,可以是N個線程,也可以是一個線程里的N個執行步驟。
同步屏障:CyclicBarrier
讓一組線程到達一個屏障(也可以叫同步點)時被阻塞,直到最后一個線程到達屏障時,屏障才會打開,所有被屏障攔截的線程才會繼續運行。
CyclicBarrier c = new CyclicBarrier(2); // 屏障會攔截/等待兩個線程;// 在第一個線程中; c.await(); // 當前線程(執行了某些操作后)到達屏障;// 在第二個線程中; c.await(); // 當前線程(執行了某些操作后)到達屏障;CyclicBarrier和CountDownLatch的區別
CountDownLatch的計數器只能用一次,而CyclicBarrier的計數器可以使用reset()方法重置。所以CyclicBarrier可以處理更復雜的業務場景。例如,如果計算發生錯誤,可以重置計數器,并讓線程重新執行一次。
控制并發線程數的Semaphore
信號量,用來控制同時訪問特定資源的線程數量。
Semaphore s = new Semaphore(10); Executor threadPool = Executors.newFixedThreadPool(30);for (int i = 0; i < 30; i++) {threadPool.execute(new Runnable() {@Overridepublic void run() {try {s.acquire();System.out.println("Save Date");s.release();} catch (InterruptedException e) {e.printStackTrace();}}}); }在代碼中,雖然有30個線程在執行,但只允許10個并發執行。
線程間交換數據的Exchanger
Exchanger用于進行線程間的數據交換。它提供一個同步點,在這個同步點,兩個線程可以交換彼此的數據。如果第一個線程先執行exchange()方法,它會一直等待第二個線程也執行exchange()方法,然后兩個線程交換數據。
Exchanger<String> exchanger = new Exchanger<>(); // 在線程A中; try {String B = exchanger.exchange("A's data"); } catch (InterruptedException e) {e.printStackTrace(); }// 在線程B中; try {String A = exchanger.exchange("B's data"); } catch (InterruptedException e) {e.printStackTrace(); }Java中的線程池
corePool
首先理解一個[corePool 核心池]的概念:核心池是一個線程池的基本/平均能力保障。在線程池的使用初期,隨著任務的提交,線程池會先盡快填滿核心池——提交一個任務就創建一個線程,即使核心池中有空閑的線程。如果線程池有溫度的話,核心池就是線程池的“常溫”。
線程池的創建
我們可以通過ThreadPoolExecutor來創建一個線程池。
new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, milliseconds,runnableTaskQueue, threadFactory,handler);- corePoolSize(線程池的基本大小):當提交一個任務到線程池時,線程池會創建一個線程來執行任務,即使其他空閑的基本線程能夠執行新任務也會創建線程,等到需要執行的任務數大于線程池基本大小時就不再創建。如果調用了線程池的prestartAllCoreThreads方法,線程池會提前創建并啟動所有基本線程。
- runnableTaskQueue(任務隊列):用于保存等待執行的任務的阻塞隊列。可以選擇以下幾個阻塞隊列。
- ArrayBlockingQueue:是一個基于數組結構的有界阻塞隊列,此隊列按 FIFO(先進先出)原則對元素進行排序。
- LinkedBlockingQueue:一個基于鏈表結構的阻塞隊列,此隊列按FIFO (先進先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。靜態工廠方法Executors.newFixedThreadPool()使用了這個隊列。
- SynchronousQueue:一個不存儲元素的阻塞隊列。每個插入操作(offer())必須等到另一個線程調用移除操作(poll()),否則插入操作一直處于阻塞狀態,吞吐量通常要高于LinkedBlockingQueue,靜態工廠方法Executors.newCachedThreadPool使用了這個隊列。
- PriorityBlockingQueue:一個具有優先級得無限阻塞隊列。
maximumPoolSize(線程池最大大小):線程池允許創建的最大線程數。如果隊列滿了,并且已創建的線程數小于最大線程數,則線程池會再創建新的線程執行任務。值得注意的是如果使用了無界的任務隊列這個參數就沒什么效果。
- ThreadFactory:用于設置創建線程的工廠,可以通過線程工廠給每個創建出來的線程設置更有意義的名字,Debug和定位問題時非常又幫助。
- RejectedExecutionHandler(飽和策略):當隊列和線程池都滿了,說明線程池處于飽和狀態,那么必須采取一種策略處理提交的新任務。這個策略默認情況下是AbortPolicy,表示無法處理新任務時拋出異常。以下是JDK1.5提供的四種策略。
- AbortPolicy:直接拋出異常。
- CallerRunsPolicy:只用調用者所在線程來運行任務。
- DiscardOldestPolicy:丟棄隊列里最近的一個任務,并執行當前任務。
- DiscardPolicy:不處理,丟棄掉。
當然也可以根據應用場景需要來實現RejectedExecutionHandler接口自定義策略。如記錄日志或持久化不能處理的任務。
- keepAliveTime(線程活動保持時間):線程池的工作線程空閑后,保持存活的時間。所以如果任務很多,并且每個任務執行的時間比較短,可以調大這個時間,提高線程的利用率。
- TimeUnit(線程活動保持時間的單位):可選的單位有天(DAYS),小時(HOURS),分鐘(MINUTES),毫秒(MILLISECONDS),微秒(MICROSECONDS, 千分之一毫秒)和毫微秒(NANOSECONDS, 千分之一微秒)。
提交任務
void execute(Runnable command) // 沒有返回值; <T> Future<T> submit(Callable<T> task) // 有返回值的任務;關閉線程池
我們可以通過調用線程池的shutdown或shutdownNow方法來關閉線程池,但是它們的實現原理不同,shutdown的原理是只是將線程池的狀態設置成SHUTDOWN狀態,然后中斷所有沒有正在執行任務的線程。shutdownNow的原理是遍歷線程池中的工作線程,然后逐個調用線程的interrupt方法來中斷線程,所以無法響應中斷的任務可能永遠無法終止。shutdownNow會首先將線程池的狀態設置成STOP,然后嘗試停止所有的正在執行或暫停任務的線程,并返回等待執行任務的列表。
只要調用了這兩個關閉方法的其中一個,isShutdown方法就會返回true。當所有的任務都已關閉后,才表示線程池關閉成功,這時調用isTerminaed方法會返回true。至于我們應該調用哪一種方法來關閉線程池,應該由提交到線程池的任務特性決定,通常調用shutdown來關閉線程池,如果任務不一定要執行完,則可以調用shutdownNow。
合理的配置線程池
要想合理的配置線程池,就必須首先分析任務特性,可以從以下幾個角度來進行分析:
任務性質不同的任務可以用不同規模的線程池分開處理。CPU密集型任務配置盡可能少的線程數量,如配置Ncpu+1個線程的線程池。IO密集型任務則由于需要等待IO操作,線程并不是一直在執行任務,則配置盡可能多的線程,如2*Ncpu。混合型的任務,如果可以拆分,則將其拆分成一個CPU密集型任務和一個IO密集型任務,只要這兩個任務執行的時間相差不是太大,那么分解后執行的吞吐率要高于串行執行的吞吐率,如果這兩個任務執行時間相差太大,則沒必要進行分解。我們可以通過Runtime.getRuntime().availableProcessors()方法獲得當前設備的CPU個數。
優先級不同的任務可以使用優先級隊列PriorityBlockingQueue來處理。它可以讓優先級高的任務先得到執行,需要注意的是如果一直有優先級高的任務提交到隊列里,那么優先級低的任務可能永遠不能執行。
執行時間不同的任務可以交給不同規模的線程池來處理,或者也可以使用優先級隊列,讓執行時間短的任務先執行。
依賴數據庫連接池的任務,因為線程提交SQL后需要等待數據庫返回結果,如果等待的時間越長CPU空閑時間就越長,那么線程數應該設置越大,這樣才能更好的利用CPU。
建議使用有界隊列,有界隊列能增加系統的穩定性和預警能力,可以根據需要設大一點,比如幾千。有一次我們組使用的后臺任務線程池的隊列和線程池全滿了,不斷的拋出拋棄任務的異常,通過排查發現是數據庫出現了問題,導致執行SQL變得非常緩慢,因為后臺任務線程池里的任務全是需要向數據庫查詢和插入數據的,所以導致線程池里的工作線程全部阻塞住,任務積壓在線程池里。如果當時我們設置成無界隊列,線程池的隊列就會越來越多,有可能會撐滿內存,導致整個系統不可用,而不只是后臺任務出現問題。當然我們的系統所有的任務是用的單獨的服務器部署的,而我們使用不同規模的線程池跑不同類型的任務,但是出現這樣問題時也會影響到其他任務。
線程池的監控
通過線程池提供的參數進行監控。線程池里有一些屬性在監控線程池的時候可以使用
- taskCount:線程池需要執行的任務數量。
- completedTaskCount:線程池在運行過程中已完成的任務數量。小于或等于taskCount。
- largestPoolSize:線程池曾經創建過的最大線程數量。通過這個數據可以知道線程池是否滿過。如等于線程池的最大大小,則表示線程池曾經滿了。
- getPoolSize:線程池的線程數量。如果線程池不銷毀的話,池里的線程不會自動銷毀,所以這個大小只增不減。
- getActiveCount:獲取活動的線程數。
通過擴展線程池進行監控。通過繼承線程池并重寫線程池的beforeExecute,afterExecute和terminated方法,我們可以在任務執行前,執行后和線程池關閉前干一些事情。如監控任務的平均執行時間,最大執行時間和最小執行時間等。這幾個方法在線程池里是空方法。
Executor框架
Executor框架的結構和成員
Executor框架主要由3大部分組成如下:
ThreadPoolExecutor
ThreadPoolExecutor通常由工廠類Executors來創建。Executors可以創建3種類型的ThreadPoolExecutor:SingleThreadExecutor,FixedThreadPool,CachedThreadPool;
FixedThreadPool是使用固定線程數的線程池,Executors提供的API有如下兩個:
FixedThreadPool滿足了資源管理的需求,可以限制當前線程數量。適用于負載較重的服務器環境。
SingleThreadExecutor使用單線程執行任務,Executors提供的API有如下兩個:
SingleThreadExecutor保證了任務執行的順序,不會存在多線程活動。
CachedThreadPool是無界線程池,Executors提供的API有如下兩個:
CachedThreadPool適用于執行很多短期異步任務的小程序,適用于負載較輕的服務器。
ScheduledThreadPoolExecutor
它是ThreadPoolExecutor的子類且實現了ScheduledExecutorService接口,它可以在給定的延遲時間后執行命令,或者定期執行命令,它比Timer更強大更靈活。
Executors可以創建的ScheduledThreadPoolExecutor的類型有ScheduledThreadPoolExecutor和SingleThreadScheduledExecutor等
ScheduledThreadPoolExecutor具有固定線程個數,適用于需要多個后臺線程執行周期任務,并且為了滿足資源管理需求而限制后臺線程數量的場景,Executors中提供的API有如下兩個:
SingleThreadScheduledExecutor具有單個線程,Executors提供的創建API有如下兩個:
它適用于單個后臺線程執行周期任務,并且保證順序一致執行的場景。
ScheduledThreadPoolExecutor
在給定延遲之后執行任務,或者定期執行任務。ScheduledThreadPoolExecutor的功能與Timer類似,但更強大、更靈活。Timer對應的是單個后臺線程,而ScheduledThreadPoolExecutor可以在構造函數中指定多個對應的后臺線程數。
P70421-224830(1).jpg
ScheduledThreadPoolExecutor中線程執行某個周期任務的4個步驟:
步驟1:線程1從工作隊列DelayQueue中獲取已到期的task;
步驟2:線程1執行該task;
步驟3:線程1修改ScheduledFutureTask的time變量為下次被執行的時間;
步驟4:線程1將修改后的task重新放回DelayQueue中。
FutureTask類
Runnable接口:
@FunctionalInterface public interface Runnable {public abstract void run(); }Callable接口(可以有返回值,可以拋出異常):
@FunctionalInterface public interface Callable<V> {V call() throws Exception; }Future接口:
public interface Future<V> {boolean cancel(boolean mayInterruptIfRunning);boolean isCancelled();boolean isDone();V get() throws InterruptedException, ExecutionException;V get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException; }FutureTask類的構造方法:
public FutureTask(Callable<V> callable) {if (callable == null)throw new NullPointerException();this.callable = callable;this.state = NEW; // ensure visibility of callable } public FutureTask(Runnable runnable, V result) {this.callable = Executors.callable(runnable, result);this.state = NEW; // ensure visibility of callable }ExecutorService的3個submit()方法都返回Future:
<T> Future<T> submit(Callable<T> task); <T> Future<T> submit(Runnable task, T result); // 執行成功返回指定的值result; Future<?> submit(Runnable task); // 線程執行成功返回null;Callable和Future的普通用法:
Callable<Integer> callable = new Callable<Integer>() {public Integer call() throws Exception {return new Random().nextInt(100);} }; FutureTask<Integer> future = new FutureTask<Integer>(callable); new Thread(future).start(); int result = future.get();Executors
?
作者:xiaogmail
鏈接:https://www.jianshu.com/p/8d90dc5b341e
來源:簡書
簡書著作權歸作者所有,任何形式的轉載都請聯系作者獲得授權并注明出處。
總結
以上是生活随笔為你收集整理的多线程知识梳理(2) - 并发编程的艺术笔记的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 存款被银行员工转走,客户告银行要求赔偿,
- 下一篇: 【转】4.3SharePoint服务器端