日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > java >内容正文

java

java 超时集合_Java之集合(二十三)SynchronousQueue

發(fā)布時間:2025/3/15 java 37 豆豆
生活随笔 收集整理的這篇文章主要介紹了 java 超时集合_Java之集合(二十三)SynchronousQueue 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

1.前言

本章介紹阻塞隊列SynchronousQueue。之前介紹過LinkedTransferQueue,特點提供了讓生產(chǎn)者知道消費者消費了其產(chǎn)出,沒消費就等待的模式,本章介紹的這個類則必須是生產(chǎn)者生產(chǎn)后消費者消費了才會繼續(xù)下去,反之亦然,消費者必須等待生產(chǎn)者產(chǎn)出。SynchronousQueue只有這一種模式,而LinkedTransferQueue是可選的,SynchronousQueue不存儲元素,像接力棒一樣,沒有交接就一直等。所以其特定是沒有容量,不能peek查看,如果沒有消費者不能插入,不能遍歷,該隊列表現(xiàn)的就像一個空的集合。同樣的,該隊列不接受空元素。默認情況下,線程的等待喚醒是非公平的,可以設(shè)置成公平模式,保證線程是先入先出(先到先得)。通常兩種模式的性能差不多,非公平模式可以維持更多的線程,公平模式則支持更高的吞吐量。

2.SynchronousQueue

2.1 實現(xiàn)原理

該類的實現(xiàn)是基于dual stack和dual queue算法,dual queue在LinkedTransferQueue中介紹過。queue和stack都包含數(shù)據(jù)節(jié)點和請求節(jié)點,其特點就是任何操作都能明確當前隊列的所處模式(數(shù)據(jù)--沒有被消費者消費或請求--沒有生產(chǎn)者)。stack和queue都繼承自抽象類Transferer,其定義了唯一方法transfer用來put或者take,在dual數(shù)據(jù)結(jié)構(gòu)中,定義成一個方法原因在于put和take操作是對稱的。

SynchronousQueue的實現(xiàn)與原算法有些不同的地方:1、原算法使用bit-marked指針,這里使用mode bits,導(dǎo)致了一系列的改動。2、SynchronousQueue會阻塞線程等待到裝滿。3、通過超時和中斷,支持取消操作,包括清除所有取消節(jié)點/線程,避免垃圾存留或內(nèi)存損耗。

阻塞操作大多通過LockSupport類的park或unpark方法,除非是在多核CPU上該節(jié)點看起來是下一個首個填滿的結(jié)點,通過自旋一位。在非常忙碌的隊列中,自旋可以顯著提升吞吐量。cleaning操作在queue和stack中不同,queue中remove操作是O(1)時間,但是stack為O(n)時間。

2.2 數(shù)據(jù)結(jié)構(gòu)

Transferer就是上面所說的抽象類,里面只有一個方法。后面也有Stack和Queue的實現(xiàn)。

NCPUS:當前主機CPU核數(shù)

maxTimedSpins:限時等待阻塞前自旋的次數(shù),單核為0,多核32

maxUntimedSpins:不限時等待阻塞前自旋的次數(shù),maxTimedSpins * 16

spinForTimeoutThreshold:納秒數(shù),這個為了更快的自旋而不是使用park時間。初略估計足夠了,默認1000

transferer:具體使用的實現(xiàn)對象。

通過構(gòu)造函數(shù)可以看出,公平模式使用的是queue,非公平模式使用的是stack,默認非公平。

2.3 基本操作

該類的基本操作都是基于transferer實現(xiàn)的,所以這里就不進行介紹。

存入取出的不同之處只在于第一參數(shù)是否是null,不為null就是存入,為null就是取出。所以該隊列也不能存入null元素。其它的方法都是空。

2.4 TransferQueue

數(shù)據(jù)結(jié)構(gòu)和之前所講LinkedTransferQueue基本一致,方法也類似。主要看transfer(E,boolean,long)方法。基本的算法就是循環(huán)做兩件事情:1、如果隊列為空或者持有相同的模式的結(jié)點,嘗試添加隊列結(jié)點,等待fulfilled或cancelled,并返回匹配項。2、如果隊列不為空,放入的和其模式相反,即可以匹配就通過CAS操作填充該節(jié)點的item字段并出隊列,返回匹配項。

代碼過長不給出,描述一下相關(guān)過長:

1、通過E來判斷當前調(diào)用是一個什么模式的結(jié)點。

2、死循環(huán)處理:

1.頭尾節(jié)點存在null,為初始化進行循環(huán)。

2.隊列為空或模式一致:

判斷t是否是當前的尾,不是意味丟失尾,重新循環(huán)

判斷當前尾的下一個是否為null,不為null就是尾結(jié)點滯后了,重新設(shè)置尾結(jié)點,重新循環(huán)

不等待就返回null

創(chuàng)建該節(jié)點

設(shè)置尾結(jié)點的下一個節(jié)點失敗,被搶先,重新循環(huán)

成功重置尾結(jié)點。

進行等待指定時間。

超時被取消,清除返回null。

丟失順序,重置頭

返回結(jié)果。

3.隊列不為空且模式不一致:

頭結(jié)點的下一個節(jié)點,如果為空或者頭尾結(jié)點被改變了,讀取不一致重新循環(huán)。

此刻沒有亂序,取出節(jié)點的item,進行CAS操作判斷是否被搶先了,被搶先了移除該節(jié)點,繼續(xù)循環(huán)嘗試。

成功了移除該節(jié)點,解除waiter的等待。

2.5 TransferStack

stack的結(jié)點數(shù)據(jù)結(jié)構(gòu),和queue的有些不同,就是多了一個match結(jié)點。node有四個方法:1、CAS設(shè)置next結(jié)點。2、CAS設(shè)置match結(jié)點,返回匹配結(jié)構(gòu)。3、取消當前結(jié)點。4、返回當前結(jié)點是否取消。

transfer方法的邏輯和queue的類似,stack的transfer循環(huán)需要做三件事情:1、如果棧為空或者模式相同,生成結(jié)點入棧等待匹配,返回結(jié)果或空如果超時。2、如果棧不為空且模式不同,匹配等待的結(jié)點,兩個都出棧,返回匹配值。由于其他線程可能執(zhí)行第3點,匹配或者斷開連接可能不是必須的。3、如果棧頂元素匹配成功,幫助其出棧匹配,然后繼續(xù)循環(huán)。

整個流程就是上面3點,其他的照著看代碼應(yīng)該較為簡單,和queue的思路差不多。由于第三點需要幫助其他線程出棧,這個過程可能被其它后到線程搶先,所以是非公平的。

3.使用例子

@Test

public void testSynchronous() {

SynchronousQueue queue = new SynchronousQueue<>();

System.out.println(queue.offer(1));// 立即返回,必須要有消費者

System.out.println(queue.poll());// 立即返回,必須要有生產(chǎn)者

long start = System.currentTimeMillis();

new Thread(new Runnable() {

@Override

public void run() {

try {

System.out.println(Thread.currentThread().getName()+"-" +

queue.take()+",耗時:"+(System.currentTimeMillis()-start)); // 沒有生產(chǎn)者一直阻塞

Thread.sleep(2000);

System.out.println(Thread.currentThread().getName()+"-" + queue.take());

Thread.sleep(1500);

System.out.println(Thread.currentThread().getName()+"-" + queue.poll(1, TimeUnit.SECONDS));

} catch (InterruptedException e1) {

e1.printStackTrace();

}

}

},"consumer").start();

new Thread(new Runnable() {

@Override

public void run() {

try {

Thread.sleep(1000);

System.out.println(Thread.currentThread().getName()+"-等待2被消耗:"+queue.offer(2));

System.out.println(Thread.currentThread().getName()+"-等待3被消耗:");

long one = System.currentTimeMillis();

queue.put(3);

System.out.println("3被消耗,耗時:" + (System.currentTimeMillis() - one));

System.out.println(Thread.currentThread().getName()+"-等待4被消耗:" +

queue.offer(4, 1, TimeUnit.SECONDS));

} catch (InterruptedException e) {

e.printStackTrace();

}

}

},"prodcuer").start();

try {

System.in.read();

} catch (IOException e) {

e.printStackTrace();

}

}

總結(jié)

以上是生活随笔為你收集整理的java 超时集合_Java之集合(二十三)SynchronousQueue的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。