日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程语言 > java >内容正文

java

java 超时集合_Java之集合(二十三)SynchronousQueue

發(fā)布時(shí)間:2025/3/15 java 24 豆豆
生活随笔 收集整理的這篇文章主要介紹了 java 超时集合_Java之集合(二十三)SynchronousQueue 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

1.前言

本章介紹阻塞隊(duì)列SynchronousQueue。之前介紹過(guò)LinkedTransferQueue,特點(diǎn)提供了讓生產(chǎn)者知道消費(fèi)者消費(fèi)了其產(chǎn)出,沒(méi)消費(fèi)就等待的模式,本章介紹的這個(gè)類則必須是生產(chǎn)者生產(chǎn)后消費(fèi)者消費(fèi)了才會(huì)繼續(xù)下去,反之亦然,消費(fèi)者必須等待生產(chǎn)者產(chǎn)出。SynchronousQueue只有這一種模式,而LinkedTransferQueue是可選的,SynchronousQueue不存儲(chǔ)元素,像接力棒一樣,沒(méi)有交接就一直等。所以其特定是沒(méi)有容量,不能peek查看,如果沒(méi)有消費(fèi)者不能插入,不能遍歷,該隊(duì)列表現(xiàn)的就像一個(gè)空的集合。同樣的,該隊(duì)列不接受空元素。默認(rèn)情況下,線程的等待喚醒是非公平的,可以設(shè)置成公平模式,保證線程是先入先出(先到先得)。通常兩種模式的性能差不多,非公平模式可以維持更多的線程,公平模式則支持更高的吞吐量。

2.SynchronousQueue

2.1 實(shí)現(xiàn)原理

該類的實(shí)現(xiàn)是基于dual stack和dual queue算法,dual queue在LinkedTransferQueue中介紹過(guò)。queue和stack都包含數(shù)據(jù)節(jié)點(diǎn)和請(qǐng)求節(jié)點(diǎn),其特點(diǎn)就是任何操作都能明確當(dāng)前隊(duì)列的所處模式(數(shù)據(jù)--沒(méi)有被消費(fèi)者消費(fèi)或請(qǐng)求--沒(méi)有生產(chǎn)者)。stack和queue都繼承自抽象類Transferer,其定義了唯一方法transfer用來(lái)put或者take,在dual數(shù)據(jù)結(jié)構(gòu)中,定義成一個(gè)方法原因在于put和take操作是對(duì)稱的。

SynchronousQueue的實(shí)現(xiàn)與原算法有些不同的地方:1、原算法使用bit-marked指針,這里使用mode bits,導(dǎo)致了一系列的改動(dòng)。2、SynchronousQueue會(huì)阻塞線程等待到裝滿。3、通過(guò)超時(shí)和中斷,支持取消操作,包括清除所有取消節(jié)點(diǎn)/線程,避免垃圾存留或內(nèi)存損耗。

阻塞操作大多通過(guò)LockSupport類的park或unpark方法,除非是在多核CPU上該節(jié)點(diǎn)看起來(lái)是下一個(gè)首個(gè)填滿的結(jié)點(diǎn),通過(guò)自旋一位。在非常忙碌的隊(duì)列中,自旋可以顯著提升吞吐量。cleaning操作在queue和stack中不同,queue中remove操作是O(1)時(shí)間,但是stack為O(n)時(shí)間。

2.2 數(shù)據(jù)結(jié)構(gòu)

Transferer就是上面所說(shuō)的抽象類,里面只有一個(gè)方法。后面也有Stack和Queue的實(shí)現(xiàn)。

NCPUS:當(dāng)前主機(jī)CPU核數(shù)

maxTimedSpins:限時(shí)等待阻塞前自旋的次數(shù),單核為0,多核32

maxUntimedSpins:不限時(shí)等待阻塞前自旋的次數(shù),maxTimedSpins * 16

spinForTimeoutThreshold:納秒數(shù),這個(gè)為了更快的自旋而不是使用park時(shí)間。初略估計(jì)足夠了,默認(rèn)1000

transferer:具體使用的實(shí)現(xiàn)對(duì)象。

通過(guò)構(gòu)造函數(shù)可以看出,公平模式使用的是queue,非公平模式使用的是stack,默認(rèn)非公平。

2.3 基本操作

該類的基本操作都是基于transferer實(shí)現(xiàn)的,所以這里就不進(jìn)行介紹。

存入取出的不同之處只在于第一參數(shù)是否是null,不為null就是存入,為null就是取出。所以該隊(duì)列也不能存入null元素。其它的方法都是空。

2.4 TransferQueue

數(shù)據(jù)結(jié)構(gòu)和之前所講LinkedTransferQueue基本一致,方法也類似。主要看transfer(E,boolean,long)方法。基本的算法就是循環(huán)做兩件事情:1、如果隊(duì)列為空或者持有相同的模式的結(jié)點(diǎn),嘗試添加隊(duì)列結(jié)點(diǎn),等待fulfilled或cancelled,并返回匹配項(xiàng)。2、如果隊(duì)列不為空,放入的和其模式相反,即可以匹配就通過(guò)CAS操作填充該節(jié)點(diǎn)的item字段并出隊(duì)列,返回匹配項(xiàng)。

代碼過(guò)長(zhǎng)不給出,描述一下相關(guān)過(guò)長(zhǎng):

1、通過(guò)E來(lái)判斷當(dāng)前調(diào)用是一個(gè)什么模式的結(jié)點(diǎn)。

2、死循環(huán)處理:

1.頭尾節(jié)點(diǎn)存在null,為初始化進(jìn)行循環(huán)。

2.隊(duì)列為空或模式一致:

判斷t是否是當(dāng)前的尾,不是意味丟失尾,重新循環(huán)

判斷當(dāng)前尾的下一個(gè)是否為null,不為null就是尾結(jié)點(diǎn)滯后了,重新設(shè)置尾結(jié)點(diǎn),重新循環(huán)

不等待就返回null

創(chuàng)建該節(jié)點(diǎn)

設(shè)置尾結(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)失敗,被搶先,重新循環(huán)

成功重置尾結(jié)點(diǎn)。

進(jìn)行等待指定時(shí)間。

超時(shí)被取消,清除返回null。

丟失順序,重置頭

返回結(jié)果。

3.隊(duì)列不為空且模式不一致:

頭結(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn),如果為空或者頭尾結(jié)點(diǎn)被改變了,讀取不一致重新循環(huán)。

此刻沒(méi)有亂序,取出節(jié)點(diǎn)的item,進(jìn)行CAS操作判斷是否被搶先了,被搶先了移除該節(jié)點(diǎn),繼續(xù)循環(huán)嘗試。

成功了移除該節(jié)點(diǎn),解除waiter的等待。

2.5 TransferStack

stack的結(jié)點(diǎn)數(shù)據(jù)結(jié)構(gòu),和queue的有些不同,就是多了一個(gè)match結(jié)點(diǎn)。node有四個(gè)方法:1、CAS設(shè)置next結(jié)點(diǎn)。2、CAS設(shè)置match結(jié)點(diǎn),返回匹配結(jié)構(gòu)。3、取消當(dāng)前結(jié)點(diǎn)。4、返回當(dāng)前結(jié)點(diǎn)是否取消。

transfer方法的邏輯和queue的類似,stack的transfer循環(huán)需要做三件事情:1、如果棧為空或者模式相同,生成結(jié)點(diǎn)入棧等待匹配,返回結(jié)果或空如果超時(shí)。2、如果棧不為空且模式不同,匹配等待的結(jié)點(diǎn),兩個(gè)都出棧,返回匹配值。由于其他線程可能執(zhí)行第3點(diǎn),匹配或者斷開(kāi)連接可能不是必須的。3、如果棧頂元素匹配成功,幫助其出棧匹配,然后繼續(xù)循環(huán)。

整個(gè)流程就是上面3點(diǎn),其他的照著看代碼應(yīng)該較為簡(jiǎn)單,和queue的思路差不多。由于第三點(diǎn)需要幫助其他線程出棧,這個(gè)過(guò)程可能被其它后到線程搶先,所以是非公平的。

3.使用例子

@Test

public void testSynchronous() {

SynchronousQueue queue = new SynchronousQueue<>();

System.out.println(queue.offer(1));// 立即返回,必須要有消費(fèi)者

System.out.println(queue.poll());// 立即返回,必須要有生產(chǎn)者

long start = System.currentTimeMillis();

new Thread(new Runnable() {

@Override

public void run() {

try {

System.out.println(Thread.currentThread().getName()+"-" +

queue.take()+",耗時(shí):"+(System.currentTimeMillis()-start)); // 沒(méi)有生產(chǎn)者一直阻塞

Thread.sleep(2000);

System.out.println(Thread.currentThread().getName()+"-" + queue.take());

Thread.sleep(1500);

System.out.println(Thread.currentThread().getName()+"-" + queue.poll(1, TimeUnit.SECONDS));

} catch (InterruptedException e1) {

e1.printStackTrace();

}

}

},"consumer").start();

new Thread(new Runnable() {

@Override

public void run() {

try {

Thread.sleep(1000);

System.out.println(Thread.currentThread().getName()+"-等待2被消耗:"+queue.offer(2));

System.out.println(Thread.currentThread().getName()+"-等待3被消耗:");

long one = System.currentTimeMillis();

queue.put(3);

System.out.println("3被消耗,耗時(shí):" + (System.currentTimeMillis() - one));

System.out.println(Thread.currentThread().getName()+"-等待4被消耗:" +

queue.offer(4, 1, TimeUnit.SECONDS));

} catch (InterruptedException e) {

e.printStackTrace();

}

}

},"prodcuer").start();

try {

System.in.read();

} catch (IOException e) {

e.printStackTrace();

}

}

總結(jié)

以上是生活随笔為你收集整理的java 超时集合_Java之集合(二十三)SynchronousQueue的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。