面试:你说你精通Java并发,给我讲讲Java并发之J.U.C
轉載自?面試:你說你精通Java并發,給我講講Java并發之J.U.C
J.U.C
J.U.C即java.util.concurrent包,為我們提供了很多高性能的并發類,可以說是java并發的核心。
J.U.C和CAS和Unsafe和AQS
Concurrent包下所有類底層都是依靠CAS操作來實現,而sun.misc.Unsafe為我們提供了一系列的CAS操作。
AQS框架是J.U.C中實現鎖及同步機制的基礎,其底層是通過調用 LockSupport .unpark()和 LockSupport .park()實現線程的阻塞和喚醒。
J.U.C框架
J.U.C的整個框架分為5個部分:tools、locks、collections、executor和atomic。
Atomic
該包下主要是一些原子變量類,僅依賴于Unsafe,并且被其他模塊所依賴。
Locks
該包下主要是關于鎖及其相關類,僅依賴于Unsafe或內部依賴,并且被其他高級模塊所依賴。由于LockSupport類底層邏輯簡單且僅依賴Unsafe,同時為其他高級模塊所依賴,所以需要先了解LockSupport類的運行原理,然后重點研究AbstractQueuedSynchronizer框架,理解獨占鎖和共享鎖的實現原理,并清楚Condition如何與AbstractQueuedSynchronizer進行協作,最后很容易就能理解ReentrantLock是如何實現的。
Collections
該包會依賴Unsafe和前兩個基礎模塊,并且模塊內部各個容器間相互較為獨立,所以沒有固定的學習順序,理解編程中常用的集合類原理即可:ConcurrentHashMap、CopyOnWriteArrayList、CopyOnWriteArraySet、ArrayBlockingQueue、LinkedBlockingQueue(阻塞隊列在線程池中有使用,所以理解常用阻塞隊列的特性很重要)。
Executor
這一部分的核心是線程池的運行原理,也是實際應用中較多的部分,會依賴于前幾個模塊。首先了解Callable、Future、RunnableFuture三個接口間的關系以及FutureTask的實現原理,然后研究如何創建ThreadPoolExecutor,如何運行一個任務,如何管理自身的線程,同時了解RejectedExecutionHandler的四種實現差異,最后,在實際應用中學習如何通過調整ThreadPoolExecutor的參數來優化線程池。
Tools
這一部分是以前面幾個模塊為基礎的高級特性模塊,實際應用的場景相對較少,主要應用在多線程間相互依賴執行結果場景,沒有具體的學習順序,最好CountDownLatch、CyclicBarrier、Semaphore、Exchanger、Executors都了解下,對后面學習Guava的框架有幫助。
參考:
J.U.C框架學習順序
CAS與sun.misc.Unsafe
Doug Lea并發編程文章全部譯文
J.U.C體系結構(java.util.concurrent)
JAVA并發編程J.U.C學習總結
J.U.C - AQS
可重入鎖
? ReentrantLock是可重入鎖,可重入鎖就是當前持有該鎖的線程能夠多次獲取該鎖,無需等待??芍厝腈i是如何實現的呢?這要從ReentrantLock的一個內部類Sync的父類說起,Sync的父類是AbstractQueuedSynchronizer(AQS,抽象隊列同步器)。
AQS
? AQS是JDK1.5提供的一個基于FIFO等待隊列實現的一個用于實現同步器的基礎框架,這個基礎框架的重要性可以這么說,JCU包里面幾乎所有的有關鎖、多線程并發以及線程同步器等重要組件的實現都是基于AQS這個框架。AQS的核心思想是基于 volatileintstate這樣的一個屬性同時配合Unsafe工具對其原子性的操作來實現對當前鎖的狀態進行修改。當state的值為0的時候,標識該Lock不被任何線程所占有。
ReentrantLock鎖的架構
? ReentrantLock的架構主要包括一個Sync的內部抽象類以及Sync抽象類的兩個實現類。他們的結構示意圖如下:
? 如上圖所示,AQS的父類AOS(AbstractOwnableSynchronizer)主要提供一個exclusiveOwnerThread屬性,用于關聯當前持有該鎖的線程。? 另外、Sync的兩個實現類分別是NonfairSync和FairSync,一個是用于實現公平鎖,一個是用于實現非公平鎖。那么Sync為什么要被設計成內部類呢?Sync被設計成為安全的外部不可訪問的內部類,使得ReentrantLock中所有涉及對AQS的訪問都要經過Sync,其實,Sync被設計成為內部類主要是為了安全性考慮,這也是作者在AQS的comments上強調的一點。
AQS框架
總體框架圖
? 如上圖所示和前面所述,AQS維護了一個volatile int state域和一個FIFO線程等待隊列(利用雙向鏈表實現,多線程爭用資源被阻塞時會進入此隊列)。
域和方法
? 主要的域如下:
private transient volatile Node head; //同步隊列的head節點private transient volatile Node tail; //同步隊列的tail節點private volatile int state; //同步狀態? AQS提供的可以修改同步狀態的3個方法:
protected final int getState(); //獲取同步狀態protected final void setState(int newState); //設置同步狀態protected final boolean compareAndSetState(int expect, int update); //CAS設置同步狀態? 這三種叫做均是原子操作,其中compareAndSetState的實現依賴于Unsafe的compareAndSwapInt()方法。代碼實現如下:
private volatile int state;protected final int getState() {return state;}protected final void setState(int newState) {state = newState;}protected final boolean compareAndSetState(int expect, int update) {// See below for intrinsics setup to support thisreturn unsafe.compareAndSwapInt(this, stateOffset, expect, update);}自定義資源共享方式
? AQS定義兩種資源共享方式:Exclusive(獨占,只有一個線程能執行,如ReentrantLock)和Share(共享,多個線程可同時執行,如Semaphore/CountDownLatch(CountDownLatch是并發的))。? 不同的自定義同步器爭用共享資源的方式也不同。自定義同步器在實現時只需要實現共享資源state的獲取與釋放方式即可,至于具體線程等待隊列的維護(如獲取資源失敗入隊/喚醒出隊等),AQS已經在頂層實現好了。自定義同步器實現時主要實現以下幾種方法:
-
isHeldExclusively():該線程是否正在獨占資源。只有用到condition才需要去實現它。
-
tryAcquire(int):獨占方式。嘗試獲取資源,成功則返回true,失敗則返回false。
-
tryRelease(int):獨占方式。嘗試釋放資源,成功則返回true,失敗則返回false。
-
tryAcquireShared(int):共享方式。嘗試獲取資源。負數表示失敗;0表示成功,但沒有剩余可用資源;正數表示成功,且有剩余資源。
-
tryReleaseShared(int):共享方式。嘗試釋放資源,如果釋放后允許喚醒后續等待結點返回true,否則返回false。
? 以ReentrantLock為例,state初始化為0,表示未鎖定狀態。A線程lock()時,會調用tryAcquire()獨占該鎖并將state+1。此后,其他線程再tryAcquire()時就會失敗,直到A線程unlock()到state=0(即釋放鎖)為止,其它線程才有機會獲取該鎖。當然,釋放鎖之前,A線程自己是可以重復獲取此鎖的(state會累加),這就是可重入的概念。但要注意,獲取多少次就要釋放多么次,這樣才能保證state是能回到零態的。? 再以CountDownLatch以例,任務分為N個子線程去執行,state也初始化為N(注意N要與線程個數一致)。這N個子線程是并行執行的,每個子線程執行完后countDown()一次,state會CAS減1。等到所有子線程都執行完后(即state=0),會unpark()主調用線程,然后主調用線程就會從await()函數返回,繼續后余動作。? 一般來說,自定義同步器要么是獨占方法,要么是共享方式,他們也只需實現tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一種即可。但AQS也支持自定義同步器同時實現獨占和共享兩種方式,如ReentrantReadWriteLock。
源碼解析
1. acquire(int)
? acquire是一種以獨占方式獲取資源,如果獲取到資源,線程直接返回,否則進入等待隊列,直到獲取到資源為止,且整個過程忽略中斷的影響。該方法是獨占模式下線程獲取共享資源的頂層入口。獲取到資源后,線程就可以去執行其臨界區代碼了。下面是acquire()的源碼:
public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}? 函數流程如下:
-
tryAcquire()嘗試直接去獲取資源,如果成功則直接返回;
-
addWaiter()將該線程加入等待隊列的尾部,并標記為獨占模式;
-
acquireQueued()使線程在等待隊列中獲取資源,一直獲取到資源后才返回。如果在整個等待過程中被中斷過,則返回true,否則返回false。
-
如果線程在等待過程中被中斷過,它是不響應的。只是獲取資源后才再進行自我中斷selfInterrupt(),將中斷補上。
? 接下來介紹相關方法。
1.1 tryAcquire(int)
? tryAcquire嘗試以獨占的方式獲取資源,如果獲取成功,則直接返回true,否則直接返回false。該方法可以用于實現Lock中的tryLock()方法。該方法的默認實現是拋出UnsupportedOperationException,具體實現由自定義的擴展了AQS的同步類來實現。AQS在這里只負責定義了一個公共的方法框架。這里之所以沒有定義成abstract,是因為獨占模式下只用實現tryAcquire-tryRelease,而共享模式下只用實現tryAcquireShared-tryReleaseShared。如果都定義成abstract,那么每個模式也要去實現另一模式下的接口。
protected boolean tryAcquire(int arg) {throw new UnsupportedOperationException();}1.2 addWaiter(Node)
? 該方法用于將當前線程根據不同的模式(Node.EXCLUSIVE互斥模式、Node.SHARED共享模式)加入到等待隊列的隊尾,并返回當前線程所在的結點。如果隊列不為空,則以通過compareAndSetTail方法以CAS(CAS (compare and swap) 比較并交換,就是將內存值與預期值進行比較,如果相等才將新值替換到內存中,并返回true表示操作成功;如果不相等,則直接返回false表示操作失敗。)的方式將當前線程節點加入到等待隊列的末尾。否則,通過enq(node)方法初始化一個等待隊列,并返回當前節點。源碼如下:
private Node addWaiter(Node mode) {//以給定模式構造結點。mode有兩種:EXCLUSIVE(獨占)和SHARED(共享)Node node = new Node(Thread.currentThread(), mode);//嘗試快速方式直接放到隊尾。Node pred = tail;if (pred != null) {node.prev = pred;if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}//上一步失敗則通過enq入隊。enq(node);return node;}1.2.1 enq(node)
? enq(node)用于將當前節點插入等待隊列,如果隊列為空,則初始化當前隊列。整個過程以CAS自旋的方式進行,直到成功加入隊尾為止。源碼如下:
private Node enq(final Node node) {//CAS"自旋",直到成功加入隊尾for (; ; ) {Node t = tail;if (t == null) { // 隊列為空,創建一個空的標志結點作為head結點,并將tail也指向它。if (compareAndSetHead(new Node()))tail = head;} else {//正常流程,放入隊尾node.prev = t;if (compareAndSetTail(t, node)) {t.next = node;return t;}}}}1.3 acquireQueued(Node, int)
? 通過tryAcquire()和addWaiter(),該線程獲取資源失敗,已經被放入等待隊列尾部了,接下來就是等待隊列前面的線程依次出隊列,最后輪到自己被喚醒。acquireQueued(Node, int)函數的作用就是這個。? acquireQueued()用于隊列中的線程自旋地以獨占且不可中斷的方式獲取同步狀態(acquire),直到拿到鎖之后再返回。該方法的實現分成兩部分:如果當前節點已經成為頭結點,嘗試獲取鎖(tryAcquire)成功,然后返回;否則檢查當前節點是否應該被park(即進入waiting狀態),然后將該線程park并且檢查當前線程是否被可以被中斷。
final boolean acquireQueued(final Node node, int arg) {boolean failed = true; /* 標記是否成功拿到資源 */try {boolean interrupted = false;/* 標記等待過程中是否被中斷過 *//* 又是一個“自旋”! */for (; ; ) {final Node p = node.predecessor(); /* 拿到前驅 *//* 如果前驅是head,即該結點已成老二,那么便有資格去嘗試獲取資源(可能是老大釋放完資源喚醒自己的,當然也可能被interrupt了)。 */if (p == head && tryAcquire(arg)) {setHead(node);/* 拿到資源后,將head指向該結點。所以head所指的標桿結點,就是當前獲取到資源的那個結點或null。 */p.next = null;/* setHead中node.prev已置為null,此處再將head.next置為null,就是為了方便GC回收以前的head結點。也就意味著之前拿完資源的結點出隊了! */failed = false;return (interrupted); /* 返回等待過程中是否被中斷過 */}/* 如果自己可以休息了,就進入waiting狀態,直到被unpark() */if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true; /* 如果等待過程中被中斷過,哪怕只有那么一次,就將interrupted標記為true */}} finally {if (failed)cancelAcquire(node);}}1.3.1 shouldParkAfterFailedAcquire(Node, Node)
? shouldParkAfterFailedAcquire方法通過對當前節點的前一個節點的狀態進行判斷,對當前節點做出不同的操作(進入waiting狀態或者繼續往前找)。
int ws = pred.waitStatus;//拿到前驅的狀態if (ws == Node.SIGNAL)//如果已經告訴前驅拿完號后通知自己一下,那就可以安心休息了return true;if (ws > 0) {/** 如果前驅放棄了,那就一直往前找,直到找到最近一個正常等待的狀態,并排在它的后邊。* 注意:那些放棄的結點,由于被自己“加塞”到它們前邊,它們相當于形成一個無引用鏈,稍后就會被保安大叔趕走了(GC回收)!*/do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);pred.next = node;} else {//如果前驅正常,那就把前驅的狀態設置成SIGNAL,告訴它拿完號后通知自己一下。有可能失敗,人家說不定剛剛釋放完呢!compareAndSetWaitStatus(pred, ws, Node.SIGNAL);}return false;1.3.2 parkAndCheckInterrupt()
? 該方法讓線程去休息,真正進入等待狀態。park()會讓當前線程進入waiting狀態。在此狀態下,有兩種途徑可以喚醒該線程:1)被unpark();2)被interrupt()。需要注意的是,Thread.interrupted()會清除當前線程的中斷標記位。
private final boolean parkAndCheckInterrupt(){LockSupport.park(this);//調用park()使線程進入waiting狀態return Thread.interrupted();//如果被喚醒,查看自己是不是被中斷的。}1.3.3 acquireQueued()小結
? acquireQueued()函數的具體流程:
-
結點進入隊尾后,檢查狀態,找到安全休息點;
-
調用park()進入waiting狀態,等待unpark()或interrupt()喚醒自己;
-
被喚醒后,看自己是不是有資格能拿到號。如果拿到,head指向當前結點,并返回從入隊到拿到號的整個過程中是否被中斷過;如果沒拿到,繼續流程1。
1.4 acquire()小結
? acquire()的流程:
-
調用自定義同步器的tryAcquire()嘗試直接去獲取資源,如果成功則直接返回;
-
沒成功,則addWaiter()將該線程加入等待隊列的尾部,并標記為獨占模式;
-
acquireQueued()使線程在等待隊列中休息,有機會時(輪到自己,會被unpark())會去嘗試獲取資源。獲取到資源后才返回。如果在整個等待過程中被中斷過,則返回true,否則返回false。
-
如果線程在等待過程中被中斷過,它是不響應的。只是獲取資源后才再進行自我中斷selfInterrupt(),將中斷補上。
J.U.C - 其它組件(這部分還需要細致總結)
FutureTask
? 在介紹 Callable 時我們知道它可以有返回值,返回值通過 Future 進行封裝。FutureTask 實現了 RunnableFuture 接口,該接口繼承自 Runnable 和 Future 接口,這使得 FutureTask 既可以當做一個任務執行,也可以有返回值。
public class FutureTask<V> implements RunnableFuture<V>
public interface RunnableFuture<V> extends Runnable, Future<V>
? FutureTask 可用于異步獲取執行結果或取消執行任務的場景。當一個計算任務需要執行很長時間,那么就可以用 FutureTask 來封裝這個任務,主線程在完成自己的任務之后再去獲取結果。
public class FutureTaskExample {public static void main(String[] args) throws ExecutionException, InterruptedException {FutureTask<Integer> futureTask = new FutureTask<Integer>(new Callable<Integer>() {@Overridepublic Integer call() throws Exception {int result = 0;for (int i = 0; i < 100; i++) {Thread.sleep(10);result += i;}return result;}});Thread computeThread = new Thread(futureTask);computeThread.start();Thread otherThread = new Thread(() -> {System.out.println("other task is running...");try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}});otherThread.start();System.out.println(futureTask.get());} }?
?控制臺輸出結果為:
other task is running...
4950
BlockingQueue
? java.util.concurrent.BlockingQueue 接口有以下阻塞隊列的實現:
-
FIFO 隊列 :?LinkedBlockingQueue、ArrayBlockingQueue(固定長度)
-
優先級隊列 :?PriorityBlockingQueue 提供了阻塞的 take() 和 put() 方法:如果隊列為空 take() 將阻塞,直到隊列中有內容;如果隊列為滿 put() 將阻塞,直到隊列有空閑位置。
? 使用 BlockingQueue 實現生產者消費者問題
public class ProductorConsumer {private static BlockingQueue<String> quene = new ArrayBlockingQueue<>(5);private static class Productor extends Thread {@Overridepublic void run() {try {quene.put("product");} catch (InterruptedException e) {e.printStackTrace();}System.out.print("productor...");}}private static class Consumer extends Thread {@Overridepublic void run() {try {String product = quene.take();} catch (InterruptedException e) {e.printStackTrace();}System.out.print("consumer...");}}public static void main(String[] args) {for (int i = 0; i < 2; i++) {Productor productor = new Productor();productor.start();}for (int i = 0; i < 5; i++) {Consumer consumer = new Consumer();consumer.start();}for (int i = 0; i < 3; i++) {Productor productor = new Productor();productor.start();}} }? 控制臺輸出結果為(每次都不一樣):
?productor...productor...consumer...consumer...productor...productor...consumer...consumer...productor...consumer...
ForkJoin
使用了“分治”的思想。
主要用于并行計算中,和 MapReduce 原理類似,都是把大的計算任務拆分成多個小任務并行計算。
import java.util.concurrent.RecursiveTask;public class ForkJoinExample extends RecursiveTask<Integer> {private final int threshold = 5;private int first;private int last;public ForkJoinExample(int first, int last) {this.first = first;this.last = last;}@Overrideprotected Integer compute() {int result = 0;if ((last - first) <= threshold) {// 任務足夠小則直接計算for (int i = first; i <= last; i++) {result += i;}} else {// 拆分成小任務int middle = first + ((last - first) / 2);ForkJoinExample leftTask = new ForkJoinExample(first, middle);ForkJoinExample rightTask = new ForkJoinExample(middle + 1, last);leftTask.fork();rightTask.fork();result = leftTask.join() + rightTask.join();}return result;} }?
竊取算法(工作竊密算法)
工作竊取(work-stealing)算法是指某個線程從其他隊列里竊取任務來執行。
一個大任務分割為若干個互不依賴的子任務,為了減少線程間的競爭,把這些子任務分別放到不同的隊列里,并未每個隊列創建一個單獨的線程來執行隊列里的任務,線程和隊列一一對應。比如線程1負責處理1隊列里的任務,2線程負責2隊列的。但是有的線程會先把自己隊列里的任務干完,而其他線程對應的隊列里還有任務待處理。干完活的線程與其等著,不如幫其他線程干活,于是它就去其他線程的隊列里竊取一個任務來執行。而在這時它們可能會訪問同一個隊列,所以為了減少竊取任務線程和被竊取任務線程之間的競爭,通常會使用雙端隊列,被竊取任務線程永遠從雙端隊列的頭部拿任務執行,而竊取任務線程永遠從雙端隊列的尾部拿任務執行。
-
優點:充分利用線程進行并行計算,減少線程間的競爭。
-
缺點:在某些情況下還是會存在競爭,比如雙端隊列里只有一個任務時。并且該算法會消耗更多的系統資源, 比如創建多個線程和多個雙端隊列。
在Java中,
-
可以使用LinkedBlockingDeque來實現工作竊取算法
-
JDK1.7引入的Fork/Join框架就是基于工作竊取算法
另外,jdk1.7中引入了一種新的線程池:WorkStealingPool,具體可以參見另一篇筆記:“Java并發-Executor框架和線程池”
參考:
工作竊取算法 work-stealing
生產者消費者模式之工作竊取算法
Java并發之AQS詳解
總結
以上是生活随笔為你收集整理的面试:你说你精通Java并发,给我讲讲Java并发之J.U.C的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: win8.1怎么安装dedeampz-p
- 下一篇: 初级Java开发与架构之间的差距不仅仅是