java 超时集合_Java之集合(二十三)SynchronousQueue
1.前言
本章介紹阻塞隊(duì)列SynchronousQueue。之前介紹過(guò)LinkedTransferQueue,特點(diǎn)提供了讓生產(chǎn)者知道消費(fèi)者消費(fèi)了其產(chǎn)出,沒(méi)消費(fèi)就等待的模式,本章介紹的這個(gè)類則必須是生產(chǎn)者生產(chǎn)后消費(fèi)者消費(fèi)了才會(huì)繼續(xù)下去,反之亦然,消費(fèi)者必須等待生產(chǎn)者產(chǎn)出。SynchronousQueue只有這一種模式,而LinkedTransferQueue是可選的,SynchronousQueue不存儲(chǔ)元素,像接力棒一樣,沒(méi)有交接就一直等。所以其特定是沒(méi)有容量,不能peek查看,如果沒(méi)有消費(fèi)者不能插入,不能遍歷,該隊(duì)列表現(xiàn)的就像一個(gè)空的集合。同樣的,該隊(duì)列不接受空元素。默認(rèn)情況下,線程的等待喚醒是非公平的,可以設(shè)置成公平模式,保證線程是先入先出(先到先得)。通常兩種模式的性能差不多,非公平模式可以維持更多的線程,公平模式則支持更高的吞吐量。
2.SynchronousQueue
2.1 實(shí)現(xiàn)原理
該類的實(shí)現(xiàn)是基于dual stack和dual queue算法,dual queue在LinkedTransferQueue中介紹過(guò)。queue和stack都包含數(shù)據(jù)節(jié)點(diǎn)和請(qǐng)求節(jié)點(diǎn),其特點(diǎn)就是任何操作都能明確當(dāng)前隊(duì)列的所處模式(數(shù)據(jù)--沒(méi)有被消費(fèi)者消費(fèi)或請(qǐng)求--沒(méi)有生產(chǎn)者)。stack和queue都繼承自抽象類Transferer,其定義了唯一方法transfer用來(lái)put或者take,在dual數(shù)據(jù)結(jié)構(gòu)中,定義成一個(gè)方法原因在于put和take操作是對(duì)稱的。
SynchronousQueue的實(shí)現(xiàn)與原算法有些不同的地方:1、原算法使用bit-marked指針,這里使用mode bits,導(dǎo)致了一系列的改動(dòng)。2、SynchronousQueue會(huì)阻塞線程等待到裝滿。3、通過(guò)超時(shí)和中斷,支持取消操作,包括清除所有取消節(jié)點(diǎn)/線程,避免垃圾存留或內(nèi)存損耗。
阻塞操作大多通過(guò)LockSupport類的park或unpark方法,除非是在多核CPU上該節(jié)點(diǎn)看起來(lái)是下一個(gè)首個(gè)填滿的結(jié)點(diǎn),通過(guò)自旋一位。在非常忙碌的隊(duì)列中,自旋可以顯著提升吞吐量。cleaning操作在queue和stack中不同,queue中remove操作是O(1)時(shí)間,但是stack為O(n)時(shí)間。
2.2 數(shù)據(jù)結(jié)構(gòu)
Transferer就是上面所說(shuō)的抽象類,里面只有一個(gè)方法。后面也有Stack和Queue的實(shí)現(xiàn)。
NCPUS:當(dāng)前主機(jī)CPU核數(shù)
maxTimedSpins:限時(shí)等待阻塞前自旋的次數(shù),單核為0,多核32
maxUntimedSpins:不限時(shí)等待阻塞前自旋的次數(shù),maxTimedSpins * 16
spinForTimeoutThreshold:納秒數(shù),這個(gè)為了更快的自旋而不是使用park時(shí)間。初略估計(jì)足夠了,默認(rèn)1000
transferer:具體使用的實(shí)現(xiàn)對(duì)象。
通過(guò)構(gòu)造函數(shù)可以看出,公平模式使用的是queue,非公平模式使用的是stack,默認(rèn)非公平。
2.3 基本操作
該類的基本操作都是基于transferer實(shí)現(xiàn)的,所以這里就不進(jìn)行介紹。
存入取出的不同之處只在于第一參數(shù)是否是null,不為null就是存入,為null就是取出。所以該隊(duì)列也不能存入null元素。其它的方法都是空。
2.4 TransferQueue
數(shù)據(jù)結(jié)構(gòu)和之前所講LinkedTransferQueue基本一致,方法也類似。主要看transfer(E,boolean,long)方法。基本的算法就是循環(huán)做兩件事情:1、如果隊(duì)列為空或者持有相同的模式的結(jié)點(diǎn),嘗試添加隊(duì)列結(jié)點(diǎn),等待fulfilled或cancelled,并返回匹配項(xiàng)。2、如果隊(duì)列不為空,放入的和其模式相反,即可以匹配就通過(guò)CAS操作填充該節(jié)點(diǎn)的item字段并出隊(duì)列,返回匹配項(xiàng)。
代碼過(guò)長(zhǎng)不給出,描述一下相關(guān)過(guò)長(zhǎng):
1、通過(guò)E來(lái)判斷當(dāng)前調(diào)用是一個(gè)什么模式的結(jié)點(diǎn)。
2、死循環(huán)處理:
1.頭尾節(jié)點(diǎn)存在null,為初始化進(jìn)行循環(huán)。
2.隊(duì)列為空或模式一致:
判斷t是否是當(dāng)前的尾,不是意味丟失尾,重新循環(huán)
判斷當(dāng)前尾的下一個(gè)是否為null,不為null就是尾結(jié)點(diǎn)滯后了,重新設(shè)置尾結(jié)點(diǎn),重新循環(huán)
不等待就返回null
創(chuàng)建該節(jié)點(diǎn)
設(shè)置尾結(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)失敗,被搶先,重新循環(huán)
成功重置尾結(jié)點(diǎn)。
進(jìn)行等待指定時(shí)間。
超時(shí)被取消,清除返回null。
丟失順序,重置頭
返回結(jié)果。
3.隊(duì)列不為空且模式不一致:
頭結(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn),如果為空或者頭尾結(jié)點(diǎn)被改變了,讀取不一致重新循環(huán)。
此刻沒(méi)有亂序,取出節(jié)點(diǎn)的item,進(jìn)行CAS操作判斷是否被搶先了,被搶先了移除該節(jié)點(diǎn),繼續(xù)循環(huán)嘗試。
成功了移除該節(jié)點(diǎn),解除waiter的等待。
2.5 TransferStack
stack的結(jié)點(diǎn)數(shù)據(jù)結(jié)構(gòu),和queue的有些不同,就是多了一個(gè)match結(jié)點(diǎn)。node有四個(gè)方法:1、CAS設(shè)置next結(jié)點(diǎn)。2、CAS設(shè)置match結(jié)點(diǎn),返回匹配結(jié)構(gòu)。3、取消當(dāng)前結(jié)點(diǎn)。4、返回當(dāng)前結(jié)點(diǎn)是否取消。
transfer方法的邏輯和queue的類似,stack的transfer循環(huán)需要做三件事情:1、如果棧為空或者模式相同,生成結(jié)點(diǎn)入棧等待匹配,返回結(jié)果或空如果超時(shí)。2、如果棧不為空且模式不同,匹配等待的結(jié)點(diǎn),兩個(gè)都出棧,返回匹配值。由于其他線程可能執(zhí)行第3點(diǎn),匹配或者斷開(kāi)連接可能不是必須的。3、如果棧頂元素匹配成功,幫助其出棧匹配,然后繼續(xù)循環(huán)。
整個(gè)流程就是上面3點(diǎn),其他的照著看代碼應(yīng)該較為簡(jiǎn)單,和queue的思路差不多。由于第三點(diǎn)需要幫助其他線程出棧,這個(gè)過(guò)程可能被其它后到線程搶先,所以是非公平的。
3.使用例子
@Test
public void testSynchronous() {
SynchronousQueue queue = new SynchronousQueue<>();
System.out.println(queue.offer(1));// 立即返回,必須要有消費(fèi)者
System.out.println(queue.poll());// 立即返回,必須要有生產(chǎn)者
long start = System.currentTimeMillis();
new Thread(new Runnable() {
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName()+"-" +
queue.take()+",耗時(shí):"+(System.currentTimeMillis()-start)); // 沒(méi)有生產(chǎn)者一直阻塞
Thread.sleep(2000);
System.out.println(Thread.currentThread().getName()+"-" + queue.take());
Thread.sleep(1500);
System.out.println(Thread.currentThread().getName()+"-" + queue.poll(1, TimeUnit.SECONDS));
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
},"consumer").start();
new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName()+"-等待2被消耗:"+queue.offer(2));
System.out.println(Thread.currentThread().getName()+"-等待3被消耗:");
long one = System.currentTimeMillis();
queue.put(3);
System.out.println("3被消耗,耗時(shí):" + (System.currentTimeMillis() - one));
System.out.println(Thread.currentThread().getName()+"-等待4被消耗:" +
queue.offer(4, 1, TimeUnit.SECONDS));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"prodcuer").start();
try {
System.in.read();
} catch (IOException e) {
e.printStackTrace();
}
}
總結(jié)
以上是生活随笔為你收集整理的java 超时集合_Java之集合(二十三)SynchronousQueue的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 论文英文参考文献[10]的时候后面多空格
- 下一篇: !!基础---c# 下载网页+图片