日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > java >内容正文

java

Java Review - 并发编程_DelayQueue原理源码剖析

發布時間:2025/3/21 java 24 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Java Review - 并发编程_DelayQueue原理源码剖析 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

文章目錄

  • 概述
  • 類圖結構
  • 小Demo
  • 核心方法&源碼解讀
    • offer操作
    • take操作
    • poll操作
    • size操作
  • 小結


概述

DelayQueue并發隊列是一個無界阻塞延遲隊列,隊列中的每個元素都有個過期時間,當從隊列獲取元素時,只有過期元素才會出隊列。

隊列頭元素是最快要過期的元素。

類圖結構

由該圖可知

  • DelayQueue內部使用PriorityQueue存放數據,使用ReentrantLock實現線程同步。

  • 另外,隊列里面的元素要實現Delayed接口,由于每個元素都有一個過期時間,所以要實現獲知當前元素還剩下多少時間就過期了的接口,由于內部使用優先級隊列來實現,所以要實現元素之間相互比較的接口。

/*** A mix-in style interface for marking objects that should be* acted upon after a given delay.** <p>An implementation of this interface must define a* {@code compareTo} method that provides an ordering consistent with* its {@code getDelay} method.** @since 1.5* @author Doug Lea*/ public interface Delayed extends Comparable<Delayed> {/*** Returns the remaining delay associated with this object, in the* given time unit.** @param unit the time unit* @return the remaining delay; zero or negative values indicate* that the delay has already elapsed*/long getDelay(TimeUnit unit); }
  • 條件變量available與lock鎖是對應的,其目的是為了實現線程間同步
private final transient ReentrantLock lock = new ReentrantLock();/*** Condition signalled when a newer element becomes available* at the head of the queue or a new thread may need to* become leader.*/private final Condition available = lock.newCondition();
  • 其中leader變量的使用基于Leader-Follower模式的變體,用于盡量減少不必要的線程等待。當一個線程調用隊列的take方法變為leader線程后,它會調用條件變量available.awaitNanos(delay)等待delay時間,但是其他線程(follwer線程)則會調用available.await()進行無限等待

leader線程延遲時間過期后,會退出take方法,并通過調用available.signal()方法喚醒一個follwer線程,被喚醒的follwer線程被選舉為新的leader線程。

每日一博 - DelayQueue阻塞隊列源碼解讀

/*** Thread designated to wait for the element at the head of* the queue. This variant of the Leader-Follower pattern* (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to* minimize unnecessary timed waiting. When a thread becomes* the leader, it waits only for the next delay to elapse, but* other threads await indefinitely. The leader thread must* signal some other thread before returning from take() or* poll(...), unless some other thread becomes leader in the* interim. Whenever the head of the queue is replaced with* an element with an earlier expiration time, the leader* field is invalidated by being reset to null, and some* waiting thread, but not necessarily the current leader, is* signalled. So waiting threads must be prepared to acquire* and lose leadership while waiting.*/private Thread leader = null;


小Demo

import java.util.Random; import java.util.concurrent.DelayQueue; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit;/*** @author 小工匠* @version 1.0* @description: TODO* @date 2021/12/19 23:05* @mark: show me the code , change the world*/ public class DelayQueueTest {static class DelayedEle implements Delayed {private final long delayTime; //延遲時間private final long expire; //到期時間private String data; //數據public DelayedEle(long delay, String data) {delayTime = delay;this.data = data;expire = System.currentTimeMillis() + delay;}/*** 剩余時間=到期時間-當前時間*/@Overridepublic long getDelay(TimeUnit unit) {return unit.convert(this.expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS);}/*** 優先隊列里面優先級規則*/@Overridepublic int compareTo(Delayed o) {return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));}@Overridepublic String toString() {final StringBuilder sb = new StringBuilder("DelayedElement{");sb.append("delay=").append(delayTime);sb.append(", expire=").append(expire);sb.append(", data='").append(data).append('\'');sb.append('}');return sb.toString();}}public static void main(String[] args) throws InterruptedException {// 1 創建延時隊列DelayQueue<DelayedEle> delayQueue = new DelayQueue<DelayedEle>();// 2 創建延時任務Random random = new Random();for (int i = 0; i < 10; i++) {DelayedEle ele = new DelayedEle(random.nextInt(500), "task-" + i);delayQueue.offer(ele);}System.out.println("開始操作,delayQueue隊列大小為:" + delayQueue.size());// 3 依次取出任務并打印DelayedEle delayedEle = null;try {// 3.1 循環,如果想避免虛假喚醒,則不能把全部元素都打印出來for (; ; ) {// 3.2 獲取過期的任務并打印while ((delayedEle = delayQueue.take()) != null) {System.out.println(delayedEle.toString());}}} catch (InterruptedException e) {e.printStackTrace();}} }

首先創建延遲任務DelayedEle類,其中delayTime表示當前任務需要延遲多少ms時間過期,expire則是當前時間的ms值加上delayTime的值。

另外,實現了Delayed接口,實現了long getDelay(TimeUnit unit)方法用來獲取當前元素還剩下多少時間過期,實現了int compareTo(Delayed o)方法用來決定優先級隊列元素的比較規則。

在main函數內首先創建了一個延遲隊列,然后使用隨機數生成器生成了10個延遲任務,最后通過循環依次獲取延遲任務,并打印。運行上面代碼,一個可能的輸出如下所示。

可見,出隊的順序和delay時間有關,而與創建任務的順序無關。


核心方法&源碼解讀

offer操作

插入元素到隊列,如果插入元素為null則拋出NullPointerException異常,否則由于是無界隊列,所以一直返回true。插入元素要實現Delayed接口。

/*** Inserts the specified element into this delay queue.** @param e the element to add* @return {@code true}* @throws NullPointerException if the specified element is null*/public boolean offer(E e) {final ReentrantLock lock = this.lock; // 1 lock.lock();try {q.offer(e);if (q.peek() == e) {// 2 leader = null;available.signal();}return true;} finally {lock.unlock();}}
  • 首先獲取獨占鎖,然后添加元素到優先級隊列,由于q是優先級隊列,所以添加元素后,調用q.peek()方法返回的并不一定是當前添加的元素
  • 如果代碼(2)判斷結果為true,則說明當前元素e是最先將過期的,那么重置leader線程為null,這時候激活avaliable變量條件隊列里面的一個線程,告訴它隊列里面有元素了。

take操作

獲取并移除隊列里面延遲時間過期的元素,如果隊列里面沒有過期元素則等待。

/*** Retrieves and removes the head of this queue, waiting if necessary* until an element with an expired delay is available on this queue.** @return the head of this queue* @throws InterruptedException {@inheritDoc}*/public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {for (;;) {// 1 獲取但不移除隊首元素 E first = q.peek();if (first == null) available.await(); // 2 else {long delay = first.getDelay(NANOSECONDS);if (delay <= 0) // 3 return q.poll();first = null; // don't retain ref while waiting if (leader != null) // 4available.await();else {Thread thisThread = Thread.currentThread();leader = thisThread; // 5try {available.awaitNanos(delay); // 6} finally {if (leader == thisThread)leader = null;}}}}} finally {if (leader == null && q.peek() != null) // 7 available.signal();lock.unlock();}}
  • 首先獲取獨占鎖lock。假設線程A第一次調用隊列的take()方法時隊列為空,則執行代碼(1)后first==null,所以會執行代碼(2)把當前線程放入available的條件隊列里阻塞等待。

  • 當有另外一個線程B執行offer(item)方法并且添加元素到隊列時,假設此時沒有其他線程執行入隊操作,則線程B添加的元素是隊首元素,那么執行q.peek()。

  • e這時候就會重置leader線程為null,并且激活條件變量的條件隊列里面的一個線程。此時線程A就會被激活。

  • 線程A被激活并循環后重新獲取隊首元素,這時候first就是線程B新增的元素,可知這時候first不為null,則調用first.getDelay(TimeUnit.NANOSECONDS)方法查看該元素還剩余多少時間就要過期,如果delay<=0則說明已經過期,那么直接出隊返回。

  • 否則查看leader是否為null,不為null則說明其他線程也在執行take,則把該線程放入條件隊列。如果這時候leader為null,則選取當前線程A為leader線程,

  • 然后執行代碼(5)等待delay時間(這期間該線程會釋放鎖,所以其他線程可以offer添加元素,也可以take阻塞自己),剩余過期時間到后,線程A會重新競爭得到鎖,然后重置leader線程為null,重新進入循環,這時候就會發現隊頭的元素已經過期了,則會直接返回隊頭元素。

  • 在返回前會執行finally塊里面的代碼(7),代碼(7)執行結果為true則說明當前線程從隊列移除過期元素后,又有其他線程執行了入隊操作,那么這時候調用條件變量的singal方法,激活條件隊列里面的等待線程。


poll操作

獲取并移除隊頭過期元素,如果沒有過期元素則返回null。

/*** Retrieves and removes the head of this queue, or returns {@code null}* if this queue has no elements with an expired delay.** @return the head of this queue, or {@code null} if this* queue has no elements with an expired delay*/public E poll() {final ReentrantLock lock = this.lock;lock.lock();try {E first = q.peek();// 如果隊列為空,或者不為空但是對頭元素沒有過期,則返回nullif (first == null || first.getDelay(NANOSECONDS) > 0)return null;elsereturn q.poll();} finally {lock.unlock();}}

首先獲取獨占鎖,然后獲取隊頭元素,如果隊頭元素為null或者還沒過期則返回null,否則返回隊頭元素。


size操作

計算隊列元素個數,包含過期的和沒有過期的。

public int size() {final ReentrantLock lock = this.lock;lock.lock();try {return q.size();} finally {lock.unlock();}}

先獲取獨占鎖,然后調用優先級隊列的size方法。


小結

DelayQueue隊列內部使用PriorityQueue存放數據,使用ReentrantLock實現線程同步。

另外隊列里面的元素要實現Delayed接口,其中一個是獲取當前元素到過期時間剩余時間的接口,在出隊時判斷元素是否過期了,一個是元素之間比較的接口,因為這是一個有優先級的隊列。

《新程序員》:云原生和全面數字化實踐50位技術專家共同創作,文字、視頻、音頻交互閱讀

總結

以上是生活随笔為你收集整理的Java Review - 并发编程_DelayQueue原理源码剖析的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。