DelayQueue详解
一、DelayQueue是什么
DelayQueue是一個無界的BlockingQueue,用于放置實現了Delayed接口的對象,其中的對象只能在其到期時才能從隊列中取走。這種隊列是有序的,即隊頭對象的延遲到期時間最長。注意:不能將null元素放置到這種隊列中。
二、DelayQueue能做什么
1. 淘寶訂單業務:下單之后如果三十分鐘之內沒有付款就自動取消訂單。?
2. 餓了嗎訂餐通知:下單成功后60s之后給用戶發送短信通知。
3.關閉空閑連接。服務器中,有很多客戶端的連接,空閑一段時間之后需要關閉之。
4.緩存。緩存中的對象,超過了空閑時間,需要從緩存中移出。
5.任務超時處理。在網絡協議滑動窗口請求應答式交互時,處理超時未響應的請求等。
三、實例展示
定義元素類,作為隊列的元素
DelayQueue只能添加(offer/put/add)實現了Delayed接口的對象,意思是說我們不能想往DelayQueue里添加什么就添加什么,不能添加int、也不能添加String進去,必須添加我們自己的實現了Delayed接口的類的對象,來代碼:
/*** compareTo 方法必須提供與 getDelay 方法一致的排序*/ class MyDelayedTask implements Delayed{private String name ;private long start = System.currentTimeMillis();private long time ;public MyDelayedTask(String name,long time) {this.name = name;this.time = time;}/*** 需要實現的接口,獲得延遲時間 用過期時間-當前時間* @param unit* @return*/@Overridepublic long getDelay(TimeUnit unit) {return unit.convert((start+time) - System.currentTimeMillis(),TimeUnit.MILLISECONDS);}/*** 用于延遲隊列內部比較排序 當前時間的延遲時間 - 比較對象的延遲時間* @param o* @return*/@Overridepublic int compareTo(Delayed o) {MyDelayedTask o1 = (MyDelayedTask) o;return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));}@Overridepublic String toString() {return "MyDelayedTask{" +"name='" + name + '\'' +", time=" + time +'}';} }其中,compareTo 方法 getDelay 方法 就是Delayed接口的方法,我們必須實現,而且按照JAVASE文檔,compareTo 方法必須提供與 getDelay 方法一致的排序,也就是說compareTo方法里可以按照getDelay方法的返回值大小排序,即在compareTo方法里比較getDelay方法返回值大小
寫main方法測試
定義一個DelayQueue,添加幾個元素,while循環獲取元素
private static DelayQueue delayQueue = new DelayQueue();public static void main(String[] args) throws InterruptedException {new Thread(new Runnable() {@Overridepublic void run() {delayQueue.offer(new MyDelayedTask("task1",10000));delayQueue.offer(new MyDelayedTask("task2",3900));delayQueue.offer(new MyDelayedTask("task3",1900));delayQueue.offer(new MyDelayedTask("task4",5900));delayQueue.offer(new MyDelayedTask("task5",6900));delayQueue.offer(new MyDelayedTask("task6",7900));delayQueue.offer(new MyDelayedTask("task7",4900));}}).start();while (true) {Delayed take = delayQueue.take();System.out.println(take);}}執行結果
MyDelayedTask{name='task3', time=1900} MyDelayedTask{name='task2', time=3900} MyDelayedTask{name='task7', time=4900} MyDelayedTask{name='task4', time=5900} MyDelayedTask{name='task5', time=6900} MyDelayedTask{name='task6', time=7900} MyDelayedTask{name='task1', time=10000}DelayQueue屬于排序隊列,它的特殊之處在于隊列的元素必須實現Delayed接口,該接口需要實現compareTo和getDelay方法。
static class Task implements Delayed{@Override//比較延時,隊列里元素的排序依據public int compareTo(Delayed o) {return 0;}@Override//獲取剩余時間public long getDelay(TimeUnit unit) {return 0;}}?
元素進入隊列后,先進行排序,然后,只有getDelay也就是剩余時間為0的時候,該元素才有資格被消費者從隊列中取出來,所以構造函數一般都有一個時間傳入。
具體另一個實例:
import java.sql.Time; import java.util.concurrent.BlockingQueue; import java.util.concurrent.DelayQueue; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit;public class Delayquue {public static void main(String[] args) throws Exception {BlockingQueue<Task> delayqueue = new DelayQueue<>();long now = System.currentTimeMillis();delayqueue.put(new Task(now+3000));delayqueue.put(new Task(now+4000));delayqueue.put(new Task(now+6000));delayqueue.put(new Task(now+1000));System.out.println(delayqueue);for(int i=0; i<4; i++) {System.out.println(delayqueue.take());}}static class Task implements Delayed{long time = System.currentTimeMillis();public Task(long time) {this.time = time;}@Overridepublic int compareTo(Delayed o) {if(this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS))return -1;else if(this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) return 1;else return 0;}@Overridepublic long getDelay(TimeUnit unit) {return unit.convert(time - System.currentTimeMillis(),TimeUnit.MILLISECONDS);}@Overridepublic String toString() {return "" + time;}} }?輸出結果:
可以看出來,每隔一段時間就會輸出一個元素,這個間隔時間就是由構造函數定義的秒數來決定的。
?
原理分析:
內部結構
- 可重入鎖
- 用于根據delay時間排序的優先級隊列
- 用于優化阻塞通知的線程元素leader
- 用于實現阻塞和通知的Condition對象
delayed和PriorityQueue
在理解delayQueue原理之前我們需要先了解兩個東西,delayed和PriorityQueue.
- delayed是一個具有過期時間的元素
- PriorityQueue是一個根據隊列里元素某些屬性排列先后的順序隊列
delayQueue其實就是在每次往優先級隊列中添加元素,然后以元素的delay/過期值作為排序的因素,以此來達到先過期的元素會拍在隊首,每次從隊列里取出來都是最先要過期的元素
offer方法
take方法
get點
整個代碼的過程中并沒有使用上太難理解的地方,但是有幾個比較難以理解他為什么這么做的地方
leader元素的使用
大家可能看到在我們的DelayQueue中有一個Thread類型的元素leader,那么他是做什么的呢,有什么用呢?
讓我們先看一下元素注解上的doc描述:
Thread designated to wait for the element at the head of the queue.
This variant of the Leader-Follower pattern serves to minimize unnecessary timed waiting.
when a thread becomes the leader, it waits only for the next delay to elapse, but other threads await indefinitely.
The leader thread must signal some other thread before returning from take() or poll(...), unless some other thread becomes leader in the interim.
Whenever the head of the queue is replaced with an element with an earlier expiration time, the leader field is invalidated by being reset to null, and some waiting thread, but not necessarily the current leader, is signalled.
So waiting threads must be prepared to acquire and lose leadership while waiting.
上面主要的意思就是說用leader來減少不必要的等待時間,那么這里我們的DelayQueue是怎么利用leader來做到這一點的呢:
這里我們想象著我們有個多個消費者線程用take方法去取,內部先加鎖,然后每個線程都去peek第一個節點.
如果leader不為空說明已經有線程在取了,設置當前線程等待
如果為空說明沒有其他線程去取這個節點,設置leader并等待delay延時到期,直到poll后結束循環
else {Thread thisThread = Thread.currentThread();leader = thisThread;try {available.awaitNanos(delay);} finally {if (leader == thisThread) leader = null; } }take方法中為什么釋放first元素
first = null; // don't retain ref while waiting我們可以看到doug lea后面寫的注釋,那么這段代碼有什么用呢?
想想假設現在延遲隊列里面有三個對象。
- 線程A進來獲取first,然后進入 else 的else ,設置了leader為當前線程A
- 線程B進來獲取first,進入else的阻塞操作,然后無限期等待
- 這時在JDK 1.7下面他是持有first引用的
- 如果線程A阻塞完畢,獲取對象成功,出隊,這個對象理應被GC回收,但是他還被線程B持有著,GC鏈可達,所以不能回收這個first.
- 假設還有線程C 、D、E.. 持有對象1引用,那么無限期的不能回收該對象1引用了,那么就會造成內存泄露.
鏈接: https://www.jianshu.com/p/e0bcc9eae0ae https://www.jianshu.com/p/bf9f6b08ba5b https://blog.csdn.net/toocruel/article/details/82769595
?
轉載于:https://www.cnblogs.com/myseries/p/10944211.html
總結
以上是生活随笔為你收集整理的DelayQueue详解的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 并发编程之 进程
- 下一篇: 【干货】仪器仪表常用术语汇总