日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

【JAVA】延迟队列DelayQueue的应用

發布時間:2023/12/14 编程问答 26 豆豆
生活随笔 收集整理的這篇文章主要介紹了 【JAVA】延迟队列DelayQueue的应用 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

最近在開發CRM管理系統時遇到一個需求:銷售部門的人員在使用該系統時,可以從【線索公海】模塊中 “領取” 潛在的客戶線索到自己的【線索私海】模塊中,成為自己私有的潛在客戶線索,以便后期進行跟蹤、開發,同時,也可以主動放棄該線索,將線索 “釋放” 回【線索公海】中,若開發成功,則客戶進入【客戶私海】模塊中,成為自己的潛在客戶,若這時不想繼續開發這個客戶了,進行 “釋放”,則該客戶進入【客戶公海】中以供所有銷售進行 “領取”,誰領取到了,就進入相應銷售的【客戶私海】中

在這個基礎上,我們希望實現這樣一個功能:
用戶在領取了線索后,若24小時內沒有將線索成功開發為自己的潛在客戶,則自動釋放使之成為公海線索,并且48小時內凍結該線索(無法領取),同樣,潛在客戶60天內沒有開發成正式客戶,則自動釋放該客戶資源到公海中,同樣是48小時內不能被重新認領

在這個場景下,我想到了DelayQueue

DelayQueue介紹

簡單來說,DelayQueue是一個根據元素的到期時間來排序的隊列,而并非是一般的隊列那樣先進先出,最快過期的元素排在隊首,越晚到期的元素排得越后
使用時,元素必須實現Delayed接口,生產者線程往隊列里添加元素時,會觸發Delayed接口中的compareTo方法進行排序,消費者線索獲取元素時,會調用Delayed接口中的getDelay方法來檢查隊首元素是否到期,getDelay方法返回的是離到期時間剩余的時間值,若getDelay返回的值小0或者等于0,則表示已到期,消費者線程取出進行消費,若getDelay方法返回的值大于0,則消費者線程會被阻塞,wait返回的時間值后,再從隊列頭部取出元素進行消費

數據結構

閱讀DelayQueue的源碼

可以看到它包含了:
一個PriorityQueue——PriorityQueue是一個優先級隊列,它是一個沒有阻塞功能的Queue,也就是說DelayQueue底層通過PriorityQueue來實現元素的存儲

一個ReentrantLock鎖

一個線程leader——DelayQueue使用類似Leader-Followr模式,即消費者線程要獲取元素時,若元素還沒過期,則消費者線程阻塞等待的時間即元素的剩余過期時間,即消費者線程等待的元素保證是最先過期的元素,這樣消費者線程可以盡量把時間花在處理任務上,最小化空等的時間,以提高線程的利用效率

一個阻塞的條件Condition——實現出隊時阻塞的功能

特性

DelayQueue是一個無界隊列,因此入隊時不會阻塞,與優先級隊列入隊相同
DelayQueue的特性主要在出隊上
出隊時:
1.若隊列為空,則阻塞
2.若不為空,則檢查堆頂的元素是否過期,剩余過期時間小于等于0則出隊,若大于0,則:判斷當前有無消費者線程作為leader正在等待獲取元素,若leader不為null,則直接阻塞,若leader為null,則將當前消費者線程設為leader,并按照最早過期的時間進行阻塞

示意圖:

過了2s后,元素5到期了,喚醒消費者線程1并獲取元素5進行消費
同時把消費者線程2設為leader,此時元素4為堆頂元素,2s后到期,所以消費者線程2的阻塞時間設置為2s

又過了2s,元素4到期,喚醒消費者線程2并獲取元素4進行消費
消費者線程1繼續處理元素5

繼續過2s后,若此時消費者線程1或者消費者線程2處理完任務,則繼續獲取元素進行消費,并且元素3剛剛好到期了
若此時兩個線程都沒有處理完任務,則會出現元素3到期了,但是沒有消費者來取出消費,同時,隊列中不斷有新的元素入隊,就會造成任務延期,隊列會越來越大,元素延遲處理的時間會越來越長

假設此時又過了2s,還是沒有消費者線程空下來:

因此,若任務處理時間較長,任務增長速度快,且到期時間較集中,則需要加快消費者線程處理任務的速度和增加消費者線程數量,否則就會造成任務延期越來越長,反之,也不能盲目增加消費者線程數量,數量太多導致資源浪費

實例

結合項目需求,使用DelayQueue來實現線索、客戶的超時功能
(1)創建任務類:DelayTask.java,實現Delayed接口,作為延遲隊列中的元素,然后只需將線索類、客戶類繼承該類

@Data public class DelayTask implements Delayed {/*** 開始計時時間 不設置則默認為當前系統時間*/private transient Date taskStartTime = new Date();/*** 過期時間 不設置則默認1分鐘*/private transient long taskExpiredTime = 60 * 1000;/*** 初始設置開始計時時間* taskStartTime 開始時間 [String] [yyyy-MM-dd HH:mm:ss]* taskExpiredTime 過期時間 [long] 單位:s* @param taskStartTime* @param taskExpiredTime*/public void initTaskTime(String taskStartTime, long taskExpiredTime) {if(Assert.notEmpty(taskStartTime)) {SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");try {this.taskStartTime = sdf.parse(taskStartTime);} catch (ParseException e) {e.printStackTrace();}}this.taskExpiredTime = taskExpiredTime;this.taskExpiredTime += this.taskStartTime.getTime();}@Overridepublic long getDelay(TimeUnit unit) {return unit.convert(taskExpiredTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);}@Overridepublic int compareTo(Delayed o) {return (this.getDelay(TimeUnit.MILLISECONDS) - ((DelayTask) o).getDelay(TimeUnit.MILLISECONDS)) > 0 ? 1:0;}}

(2)創建一個單例的延遲隊列工具類:DelayQueueHelper
聲明了一個延遲隊列,并且對外提供一個統一、全局的操作延遲隊列的入口(入隊、刪除元素操作)

public class DelayQueueHelper {private volatile static DelayQueueHelper delayQueueHelper = null;//私海線索過期時間:24hpublic static final long CLUE_EXPIRED_TIME = 24 * 60 * 60 * 1000;//私海客戶過期時間:60天public static final long CUS_EXPIRED_TIME = 60L * 24 * 60 * 60 * 1000;//線索、客戶釋放后冷凍時間:48hpublic static final long BLOCK_TIME = 48 * 60 * 60 * 1000;private DelayQueue<DelayTask> queue = new DelayQueue<>();private DelayQueueHelper() {}public static DelayQueueHelper getInstance() {if(delayQueueHelper == null) {synchronized(DelayQueueHelper.class) {delayQueueHelper = new DelayQueueHelper();}}return delayQueueHelper;}public void addTask(DelayTask task) {queue.put(task);}public void removeTask(DelayTask task) {if(task == null){return;}for(Iterator<DelayTask> iterator = queue.iterator(); iterator.hasNext();) {if(task instanceof Clue) {Clue clue = (Clue) task;Clue queueObj = (Clue) iterator.next();if(clue.getId().equals(queueObj.getId())){queue.remove(queueObj);}}}}public DelayQueue<DelayTask> getQueue() {return queue;}}

(3)創建一個初始化類:DelayQueueRunner,實現ApplicationRunner接口
1.系統啟動時,首先將所有任務入隊 (DelayQueue的缺點:宕機、系統重啟后數據會被清空,因此系統初始化時需將所有滿足條件的元素入隊)
2.開啟一個消費者線程,循環從延遲隊列中獲取到期的線索、客戶進行消費(將線索、客戶狀態修改為釋放狀態、解除凍結狀態)

@Slf4j @Component public class DelayQueueRunner implements ApplicationRunner {@Overridepublic void run(ApplicationArguments args) throws Exception {DelayQueueHelper queueHelper = DelayQueueHelper.getInstance();//1.將所有未到期的線程、客戶入隊//......//2.開啟一個消費者線程run(queueHelper.getQueue());}public void run(DelayQueue queue) {new Thread() {@Overridepublic void run() {try {while (true) {DelayTask task = (DelayTask) queue.take();executeTask(task);}} catch (InterruptedException e) {log.error(e.getMessage());e.printStackTrace();}}}.start();}private void executeTask(DelayTask task) {if(task instanceof Clue) {Clue clue = (Clue) task;//修改狀態clue.update();}}}

(4)在添加、釋放線索記錄、客戶記錄時,通過DelayQueueHelper對隊列中的元素進行相應的入隊、出隊操作

/*** 將線索\客戶加入超時自動更新狀態隊列* @param clue 線索\客戶對象* @param type 0:私海線索 1:私海客戶 3:釋放后元素* @param startTime 開始計時時間*/public void addToTimeoutAutoUpdateQueue(Clue clue, int type, Date startTime) {long expireTime = 0;if(type == CLUE) { //線索隊列expireTime = DelayQueueHelper.CLUE_EXPIRED_TIME;}else if(type == CUS) { //客戶隊列expireTime = DelayQueueHelper.CUS_EXPIRED_TIME;}else if(type == LOCK) { //凍結隊列expireTime = DelayQueueHelper.BLOCK_TIME;}SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");DelayQueueHelper queueHelper = DelayQueueHelper.getInstance();clue.initTaskTime(sdf.format(startTime), expireTime);queueHelper.addTask(clue);}/*** 將線索從超時自動更新狀態隊列中刪除* @param clue*/public void removeFromTimeoutAutoUpdateQueue(Clue clue) {DelayQueueHelper queueHelper = DelayQueueHelper.getInstance();queueHelper.removeTask(clue);}

總結

以上是生活随笔為你收集整理的【JAVA】延迟队列DelayQueue的应用的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。