日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

【学习笔记】抽象队列同步器AQS应用之BlockingQueue详解

發(fā)布時間:2024/9/30 编程问答 33 豆豆
生活随笔 收集整理的這篇文章主要介紹了 【学习笔记】抽象队列同步器AQS应用之BlockingQueue详解 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

文章目錄

  • 什么是AQS框架
    • Aqs核心源碼
    • 基于aqs實現(xiàn)的鎖
  • BlockingQueue
    • ArrayBlockingQueue
    • LinkedBlockingQueue
    • DelayQueue
    • BlockingQueue API
    • 多線程生產(chǎn)者-消費者示例

什么是AQS框架

1、AQS是一個JAVA線程同步的框架。是JDK中很多鎖工具的核心實現(xiàn)框架。
2、在AQS中,維護了一個信號量 state和一個線程組成的雙向鏈表隊列。其中,這個線程隊列,是用來給線程排隊的,而state就像是一個紅綠燈,用來控制線程排隊或放行的。在不同的場景下,有不用的意義。
在可重入鎖這個場景下, states就用來表示加的次數(shù)。0標識無鎖,每加一次鎖, states就加1.釋放鎖 state就減1

簡單說一下AQS,AQS全稱為AbstractQueuedSychronizer,翻譯過來應該是抽象隊列同步器。如果說java.util.concurrent的基礎是CAS的話,那么AQS就是整個Java并發(fā)包的核心了,ReentrantLock、CountDownLatch、Semaphore等等都用到了它。
AQS實際上以雙向隊列的形式連接所有的Entry,比方說ReentrantLock,所有等待的線程都被放在一個Entry中并連成雙向隊列,前面一個線程使用ReentrantLock好了,則雙向隊列實際上的第一個Entry開始運行。
AQS定義了對雙向隊列所有的操作,而只開放了tryLock和tryRelease方法給開發(fā)者使用,開發(fā)者可以根據(jù)自己的實現(xiàn)重寫tryLock和tryRelease方法,以實現(xiàn)自己的并發(fā)功能。

AQS內(nèi)部用一個volatile修飾的int類型的成員變量state來控制同步狀態(tài)。

  • state = 0:表示沒有線程正在獨占共享資源的鎖。
  • state = 1:表示有線程正在共享資源的鎖。

AQS雖說是一個抽象類,但是其內(nèi)部沒有一個方法是抽象方法,因為AQS只是基礎的組件,作者并不希望使用者對其直接進行操作,更傾向于其作為基礎組件,為其實現(xiàn)類提供基礎的幫助。

AQS采用的是模板方法模式,其內(nèi)部除了提供并發(fā)的操作核心方法以及同步隊列的操作之外,還提供了一些模板方法讓子類自己實現(xiàn),如加鎖解鎖。

AQS作為基礎的組件,封裝的都是核心的并發(fā)操作,實際上還分為兩種模式,共享模式和獨占模式,如Reentrantlock,ReentrantReadWriteLock(寫鎖部分)都是獨占鎖,ReentrantReadWriteLock(讀鎖部分)就是共享鎖。
這兩種模式的解鎖和加鎖邏輯都不一樣,但是AQS只關注內(nèi)部的公共方法的實現(xiàn),不關心外部的具體實現(xiàn),所以提供了模板方法給子類。

要實現(xiàn)獨占模式,則需要實現(xiàn)tryAcquire(加鎖)和tryRelease(解鎖),而實現(xiàn)共享模式則需要實現(xiàn)tryAcquireShared(加鎖)和tryReleaseShared(解鎖),無論是共享模式還是獨占模式,其底層實現(xiàn)都是同一個AQS,只是加鎖和解鎖邏輯不一樣,所以,根據(jù)自己的需求自定義鎖也就變得簡單。
aqs類:

看看AQS提供的5個模板方法:

AQS在內(nèi)部定義了一個volatile int state變量,表示同步狀態(tài):當線程調(diào)用lock方法時,如果state=0,說明沒有任何線程占有共享資源的鎖,可以獲得鎖并將state=1,如果state=1說明有線程目前正在使用共享變量,其他線程必須加入同步隊列進行等待

aqs通過node內(nèi)部類構成一個雙向鏈表結構的同步隊列來完成線程獲取鎖的排隊動作,當有線程獲取鎖失敗以后,就被添加到隊列末尾。AQS通過內(nèi)部類conditionobject構建等待隊列,當condition調(diào)用wait()方法后,線程會加入等待隊列中,而當condition調(diào)用signal()方法后,線程將從等待隊列轉移到同步隊列中進行鎖競爭。

【conditionobject類】
ConditionObject是AQS中的內(nèi)部類,提供了條件鎖的同步實現(xiàn),實現(xiàn)了Condition接口,并且實現(xiàn)了其中的await(),signal(),signalALL()等方法。

ConditionObject分析
 使用方式如下:

Lock lock = new ReentrantLock();Condition condition = lock.newCondition();//創(chuàng)建和該鎖關聯(lián)的條件鎖public void conditionWait() throws InterruptedException{lock.lock();try {condition.await();}finally {lock.unlock();}}public void ConditionSignal() throws InterruptedException{lock.lock();try {condition.signal();}finally {lock.unlock();}}

AQS和conditon各自維護了不同的隊列,在使用lock和condition的時候,其實就是兩個隊列的互相移動。
aqs中的類:

???AbstractOwnableSynchronizer:抽象類,定義了存儲獨占當前線程的屬性和設置,獲取當前線程的方法。
???AbstractQueuenSynchronize:抽象類,AQS框架核心類,內(nèi)部以虛擬隊列的方式管理線程的鎖獲取與鎖釋放,其中獲取鎖(tryAcquire方法)和釋放鎖(tryRelease方法)并沒有提供默認的實現(xiàn),需要子類重寫方法的具體邏輯,目的是為了使開發(fā)人可以自定義獲取鎖和釋放鎖的方式。
???Node:AbstractQueuenSynchronize的內(nèi)部類,用于構建虛擬隊列(雙向鏈表),為每個進入同步隊列的線程封裝成Node對象加入隊列,管理需要獲取鎖的線程。
???Sync:抽象類,是ReentrantLock的內(nèi)部類,繼承了AbstractQueuenSynchronize,實現(xiàn)了tryRelease方法,并提供抽象方法lock,供子類實現(xiàn)
???NonfairSync:Reentrantlock的內(nèi)部類,繼承Sync,非公平鎖的實現(xiàn)類
???FairSync:Reentrantlock的內(nèi)部類,繼承Sync,公平鎖的實現(xiàn)類
???Reentrantlock:實現(xiàn)了Lock接口,創(chuàng)建時默認為非公平鎖。

Aqs核心源碼

獲取鎖:

lock:

compareAndSetState 賦值成功則獲取鎖

ReentrantLock 實現(xiàn)了非公平鎖和公平鎖,所以在調(diào)用 lock.lock(); 時,會有不同的實現(xiàn)類:

  • 非公平鎖,會直接使用 CAS 進行搶占,修改變量 state 值。如果成功則直接把自己的線程設置到 exclusiveOwnerThread ,也就是獲得鎖成功。 不成功后續(xù)分析
  • 公平鎖,則不會進行搶占,而是規(guī)規(guī)矩矩的進行排隊。 老實人
  • (AQS)acquire

    public final void acquire(int arg){ if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt();}

    整個這塊代碼里面包含了四個方法的調(diào)用,如下:

  • tryAcquire ,分別由繼承 AQS 的公平鎖( FairSync )、非公平鎖 NonfairSync )實現(xiàn)。
  • addWaiter ,該方法是 AQS 的私有方法,主要用途是方法 tryAcquire 返回 false以后,也就是獲取鎖失敗以后,把當前請求鎖的線程添加到隊列中,并返回 Node節(jié)點。
  • acquireQueued ,負責把 addWaiter 返回的 Node 節(jié)點添加到隊列結尾,并會執(zhí)行獲取鎖操作以及判斷是否把當前線程掛起。
  • selfInterrupt ,是 AQS 中的 Thread.currentThread().interrupt() 方
    法調(diào)用,它的主要作用是在執(zhí)行完 acquire 之前自己執(zhí)行中斷操作。
  • addwaiter方法

    private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); Node pred = tail; // 如果隊列不為空, 使用 CAS 方式將當前節(jié)點設為尾節(jié)點if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; }} // 隊列為空、CAS失敗,將節(jié)點插入隊列 enq(node); return node; }

    當執(zhí)行方法 addWaiter ,那么就是 !tryAcquire = true ,也就是
    tryAcq uire 獲取鎖失敗了。
    ? 接下來就是把當前線程封裝到 Node 節(jié)點中,加入到 FIFO 隊列中。 因為先進先
    出,所以后來的隊列加入到隊尾
    ? compareAndSetTail 不一定一定成功,因為在并發(fā)場景下,可能會出現(xiàn)操作
    失敗。那么失敗后,則需要調(diào)用 enq 方法,該方法會自旋操作,把節(jié)點入隊列。

    1、AQS是一個JAVA線程同步的框架。是JDK中很多鎖工具的核心實現(xiàn)框架。
    2、在AQS中,維護了一個信號量 state和一個線程組成的雙向鏈表隊列。其中,這個線程隊列,是用來給線程排隊的,而state就像是一個紅綠燈,用來控制線程排隊或放行的。在不同的場景下,有不用的意義。
    在可重入鎖這個場景下, states就用來表示加的次數(shù)。0標識無鎖,每加一次鎖, states就加1.釋放鎖 state就減1

    基于aqs實現(xiàn)的鎖


    Semaphore ,信號量鎖。主要用于控制流量,它的作用是限制某段代碼塊的并發(fā)數(shù)。Semaphore有一個構造函數(shù),可以傳入一個int型整數(shù)n,表示某段代碼最多只有n個線程可以訪問,如果超出了n,那么請等待,等到某個線程執(zhí)行完畢這段代碼塊,下一個線程再進入。由此可以看出如果Semaphore構造函數(shù)中傳入的int型整數(shù)n=1,相當于變成了一個synchronized了。比如:數(shù)據(jù)庫連接池給你分配 10個鏈接,那么讓你來一個連一個,連到 10 個還沒有人釋放,那你就等等。
    CountDownLatch ,閉鎖。 Latch 門閂的意思,比如:說四個人一個漂流艇,坐滿了就推下水。

    BlockingQueue

    概要:BlockingQueue,是java.util.concurrent 包提供的用于解決并發(fā)生產(chǎn)者 - 消費者問題的最有用的類,它的特性是在任意時刻只有一個線程可以進行take或者put操作,并且BlockingQueue提供了超時return null的機制,在許多生產(chǎn)場景里都可以看到這個工具的身影。
    隊列類型
    無限隊列 (unbounded queue ) - 幾乎可以無限增長
    有限隊列 ( bounded queue ) - 定義了最大容量
    隊列數(shù)據(jù)結構
    隊列實質(zhì)就是一種存儲數(shù)據(jù)的結構
    通常用鏈表或者數(shù)組實現(xiàn)
    一般而言隊列具備FIFO先進先出的特性,當然也有雙端隊列(Deque)優(yōu)先級隊列
    主要操作:入隊(EnQueue)與出隊(Dequeue)

    ArrayBlockingQueue

    隊列基于數(shù)組實現(xiàn),容量大小在創(chuàng)建ArrayBlockingQueue對象時已定義好
    數(shù)據(jù)結構如下圖

    BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>();

    應用場景
    在線程池中有比較多的應用,生產(chǎn)者消費者場景
    工作原理
    基于ReentrantLock保證線程安全,根據(jù)Condition實現(xiàn)隊列滿時的阻塞

    LinkedBlockingQueue

    是一個基于鏈表的無界隊列(理論上有界)

    BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<>();

    上面這段代碼中,blockingQueue 的容量將設置為 Integer.MAX_VALUE 。
    向無限隊列添加元素的所有操作都將永遠不會阻塞,[注意這里不是說不會加鎖保證線程安全],因此它可以增長到非常大的容量。
    使用無限 BlockingQueue 設計生產(chǎn)者 - 消費者模型時最重要的是 消費者應該能夠像生產(chǎn)者向隊列添加消息一樣快地消費消息 。否則,內(nèi)存可能會填滿,然后就會得到一個 OutOfMemory 異常。

    DelayQueue

    由優(yōu)先級堆支持的、基于時間的調(diào)度隊列,內(nèi)部基于無界隊列PriorityQueue實現(xiàn),而無界隊列基于數(shù)組的擴容實現(xiàn)。
    隊列創(chuàng)建:

    BlockingQueue<String> blockingQueue = new DelayQueue();

    要求
    入隊的對象必須要實現(xiàn)Delayed接口,而Delayed集成自Comparable接口
    應用場景
    電影票
    工作原理:
    隊列內(nèi)部會根據(jù)時間優(yōu)先級進行排序。延遲類線程池周期執(zhí)行。

    BlockingQueue API

    BlockingQueue 接口的所有方法可以分為兩大類:負責向隊列添加元素的方法和檢索這些元素的方法。在隊列滿/空的情況下,來自這兩個組的每個方法的行為都不同。

    在構建生產(chǎn)者 - 消費者程序時,這些方法是 BlockingQueue 接口中最重要的構建塊。

    多線程生產(chǎn)者-消費者示例

    接下來我們創(chuàng)建一個由兩部分組成的程序 - 生產(chǎn)者 ( Producer ) 和消費者 ( Consumer ) 。
    生產(chǎn)者將生成一個 0 到 100 的隨機數(shù)(十全大補丸的編號),并將該數(shù)字放在 BlockingQueue 中。我們將創(chuàng)建 16 個線程(潘金蓮)用于生成隨機數(shù)并使用 put() 方法阻塞,直到隊列中有可用空間。
    需要記住的重要一點是,我們需要阻止我們的消費者線程無限期地等待元素出現(xiàn)在隊列中。
    從生產(chǎn)者(潘金蓮)向消費者(武大郎)發(fā)出信號的好方法是,不需要處理消息,而是發(fā)送稱為毒 ( poison ) 丸 ( pill ) 的特殊消息。 我們需要發(fā)送盡可能多的毒 ( poison ) 丸 ( pill ) ,因為我們有消費者(武大郎)。然后當消費者從隊列中獲取特殊的毒 ( poison ) 丸 ( pill )消息時,它將優(yōu)雅地完成執(zhí)行。
    以下生產(chǎn)者的代碼:

    @Slf4j public class NumbersProducer implements Runnable {private BlockingQueue<Integer> numbersQueue;private final int poisonPill;private final int poisonPillPerProducer;public NumbersProducer(BlockingQueue<Integer> numbersQueue, int poisonPill, int poisonPillPerProducer) {this.numbersQueue = numbersQueue;this.poisonPill = poisonPill;this.poisonPillPerProducer = poisonPillPerProducer;}public void run() {try {generateNumbers();} catch (InterruptedException e) {Thread.currentThread().interrupt();}}private void generateNumbers() throws InterruptedException {for (int i = 0; i < 100; i++) {numbersQueue.put(ThreadLocalRandom.current().nextInt(100));log.info("潘金蓮-{}號,給武大郎的泡藥!",Thread.currentThread().getId());}for (int j = 0; j < poisonPillPerProducer; j++) {numbersQueue.put(poisonPill);log.info("潘金蓮-{}號,往武大郎的藥里放入第{}顆毒丸!",Thread.currentThread().getId(),j+1);}} }

    我們的生成器構造函數(shù)將 BlockingQueue 作為參數(shù),用于協(xié)調(diào)生產(chǎn)者和使用者之間的處理。我們看到方法 generateNumbers() 將 100 個元素(生產(chǎn)100副藥給武大郎吃)放入隊列中。它還需要有毒 ( poison ) 丸 ( pill ) (潘金蓮給武大郎下毒)消息,以便知道在執(zhí)行完成時放入隊列的消息類型。該消息需要將 poisonPillPerProducer 次放入隊列中。
    每個消費者將使用 take() 方法從 BlockingQueue 獲取一個元素,因此它將阻塞,直到隊列中有一個元素。從隊列中取出一個 Integer 后,它會檢查該消息是否是毒 ( poison ) 丸 ( pill )(武大郎看潘金蓮有沒有下毒) ,如果是,則完成一個線程的執(zhí)行。否則,它將在標準輸出上打印出結果以及當前線程的名稱。

    消費者:

    @Slf4j public class NumbersConsumer implements Runnable {private BlockingQueue<Integer> queue;private final int poisonPill;public NumbersConsumer(BlockingQueue<Integer> queue, int poisonPill) {this.queue = queue;this.poisonPill = poisonPill;}public void run() {try {while (true) {Integer number = queue.take();if (number.equals(poisonPill)) {return;}log.info("武大郎-{}號,喝藥-編號:{}",Thread.currentThread().getId(),number);}} catch (InterruptedException e) {Thread.currentThread().interrupt();}} }

    需要注意的重要事項是隊列的使用。與生成器構造函數(shù)中的相同,隊列作為參數(shù)傳遞。我們可以這樣做,是因為 BlockingQueue 可以在線程之間共享而無需任何顯式同步。
    既然我們有生產(chǎn)者和消費者,我們就可以開始我們的計劃。我們需要定義隊列的容量,并將其設置為 10個元素。
    我們創(chuàng)建4 個生產(chǎn)者線程,并且創(chuàng)建等于可用處理器數(shù)量的消費者線程:

    public class Main {public static void main(String[] args) {int BOUND = 10;int N_PRODUCERS = 16;int N_CONSUMERS = Runtime.getRuntime().availableProcessors();int poisonPill = Integer.MAX_VALUE;int poisonPillPerProducer = N_CONSUMERS / N_PRODUCERS;int mod = N_CONSUMERS % N_PRODUCERS;BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(BOUND);//潘金蓮給武大郎熬藥for (int i = 1; i < N_PRODUCERS; i++) {new Thread(new NumbersProducer(queue, poisonPill, poisonPillPerProducer)).start();}//武大郎開始喝藥for (int j = 0; j < N_CONSUMERS; j++) {new Thread(new NumbersConsumer(queue, poisonPill)).start();}//潘金蓮開始投毒,武大郎喝完毒藥GGnew Thread(new NumbersProducer(queue, poisonPill, poisonPillPerProducer + mod)).start();}}

    BlockingQueue 是使用具有容量的構造創(chuàng)建的。我們正在創(chuàng)造 4 個生產(chǎn)者和 N 個消費者(武大郎)。我們將我們的毒 ( poison ) 丸 ( pill )消息指定為 Integer.MAX_VALUE,因為我們的生產(chǎn)者在正常工作條件下永遠不會發(fā)送這樣的值。這里要注意的最重要的事情是 BlockingQueue 用于協(xié)調(diào)它們之間的工作。

    總結

    以上是生活随笔為你收集整理的【学习笔记】抽象队列同步器AQS应用之BlockingQueue详解的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。