生活随笔
收集整理的這篇文章主要介紹了
服务器端利器--双缓冲队列
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
http://rrsongzi-gmail-com.iteye.com/blog/696627
傳統隊列是生產者線程和消費者線程從同一個隊列中存取數據,必然需要互斥訪 問,在互相同步等待中浪費了寶貴的時間,使隊列吞吐量受影響。雙緩沖隊使用兩個隊列,將讀寫分離,一個隊列專門用來讀,另一個專門用來寫,當讀隊列空或寫 隊列滿時將兩個隊列互換。這里為了保證隊列的讀寫順序,當讀隊列為空且寫隊列不為空時候才允許兩個隊列互換。
經過測試性能較JDK自帶的queue的確有不小提高。
測試是和JDK6中性能最高的阻塞Queue:java.util.concurrent.ArrayBlockingQueue做比較,這個隊列是環形隊列的實現方式,性能還算不錯,不過我們的目標是沒有最好,只有更好。
測試場景:
起若干個生產者線程,往Queue中放數據,起若干個消費者線程從queue中取數據,統計每個消費者線程取N個數據的平均時間。
數據如下:
場景1?
生產者線程數:1
消費者線程數:1
Queue容量:5w
取元素個數:1000w
JDK ArrayBlockingQueue用時平均為:? 5,302,938,177納秒
雙緩沖隊列用時平均為:????????????????????? 5,146,302,116納秒
相差大概160毫秒
場景2:?
生產者線程數:5
消費者線程數:4
Queue容量:5w
取元素個數:1000w
JDK ArrayBlockingQueue用時平均為:? 32,824,744,868納秒
雙緩沖隊列用時平均為:????????????????????? 20,508,495,221納秒
相差大概12.3秒
可見在生產者消費者都只有一個的時候存和取的同步沖突比較小,雙緩沖隊列
優勢不是很大,當存取線程比較多的時候優勢就很明顯了。
?
隊列主要方法如下:
Java代碼??
<span?style="font-size:?small;">? ? ? ? ? ?? public?class?CircularDoubleBufferedQueue<E>?extends?AbstractQueue<E>?? implements?BlockingQueue<E>,?java.io.Serializable?? {?? ????private?static?final?long?serialVersionUID?=?1L;?? ????private?Logger?logger?=???Logger.getLogger(CircularDoubleBufferedQueue.class.getName());?? ?? ?????? ????private?final?E[]?itemsA;?? ????private?final?E[]?itemsB;?? ?????? ????private?ReentrantLock?readLock,?writeLock;?? ????private?Condition?notEmpty;?? ????private?Condition?notFull;?? ????private?Condition?awake;?? ?????? ?????? ????private?E[]?writeArray,?readArray;?? ????private?volatile?int?writeCount,?readCount;?? ????private?int?writeArrayHP,?writeArrayTP,?readArrayHP,?readArrayTP;?? ?????? ?????? ????public?CircularDoubleBufferedQueue(int?capacity)?? ????{?? ????????if(capacity<=0)?? ????????{?? ????????????throw?new?IllegalArgumentException("Queue?initial?capacity?can't?less?than?0!");?? ????????}?? ?????????? ????????itemsA?=?(E[])new?Object[capacity];?? ????????itemsB?=?(E[])new?Object[capacity];?? ?? ????????readLock?=?new?ReentrantLock();?? ????????writeLock?=?new?ReentrantLock();?? ?????????? ????????notEmpty?=?readLock.newCondition();?? ????????notFull?=?writeLock.newCondition();?? ????????awake?=?writeLock.newCondition();?? ?????????? ????????readArray?=?itemsA;?? ????????writeArray?=?itemsB;?? ????}?? ?????? ????private?void?insert(E?e)?? ????{?? ????????writeArray[writeArrayTP]?=?e;?? ????????++writeArrayTP;?? ????????++writeCount;?? ????}?? ?????? ????private?E?extract()?? ????{?? ????????E?e?=?readArray[readArrayHP];?? ????????readArray[readArrayHP]?=?null;?? ????????++readArrayHP;?? ????????--readCount;?? ????????return?e;?? ????}?? ?? ?????? ????? ? ? ? ? ? ? ? ? ? ? ?? ????private?long?queueSwitch(long?timeout,?boolean?isInfinite)?throws?InterruptedException?? ????{?? ????????writeLock.lock();?? ????????try?? ????????{?? ????????????if?(writeCount?<=?0)?? ????????????{?? ????????????????logger.debug("Write?Count:"?+?writeCount?+?",?Write?Queue?is?empty,?do?not?switch!");?? ????????????????try?? ????????????????{?? ????????????????????logger.debug("Queue?is?empty,?need?wait....");?? ????????????????????if(isInfinite?&&?timeout<=0)?? ????????????????????{?? ????????????????????????awake.await();?? ????????????????????????return?-1;?? ????????????????????}?? ????????????????????else?? ????????????????????{?? ????????????????????????return?awake.awaitNanos(timeout);?? ????????????????????}?? ????????????????}?? ????????????????catch?(InterruptedException?ie)?? ????????????????{?? ????????????????????awake.signal();?? ????????????????????throw?ie;?? ????????????????}?? ????????????}?? ????????????else?? ????????????{?? ????????????????E[]?tmpArray?=?readArray;?? ????????????????readArray?=?writeArray;?? ????????????????writeArray?=?tmpArray;?? ?? ????????????????readCount?=?writeCount;?? ????????????????readArrayHP?=?0;?? ????????????????readArrayTP?=?writeArrayTP;?? ?? ????????????????writeCount?=?0;?? ????????????????writeArrayHP?=?readArrayHP;?? ????????????????writeArrayTP?=?0;?? ?????????????????? ????????????????notFull.signal();?? ????????????????logger.debug("Queue?switch?successfully!");?? ????????????????return?-1;?? ????????????}?? ????????}?? ????????finally?? ????????{?? ????????????writeLock.unlock();?? ????????}?? ????}?? ?? ????public?boolean?offer(E?e,?long?timeout,?TimeUnit?unit)?throws?InterruptedException?? ????{?? ????????if(e?==?null)?? ????????{?? ????????????throw?new?NullPointerException();?? ????????}?? ?????????? ????????long?nanoTime?=?unit.toNanos(timeout);?? ????????writeLock.lockInterruptibly();?? ????????try?? ????????{?? ????????????for?(;;)?? ????????????{?? ????????????????if(writeCount?<?writeArray.length)?? ????????????????{?? ????????????????????insert(e);?? ????????????????????if?(writeCount?==?1)?? ????????????????????{?? ????????????????????????awake.signal();?? ????????????????????}?? ????????????????????return?true;?? ????????????????}?? ?????????????????? ?????????????????? ????????????????if(nanoTime<=0)?? ????????????????{?? ????????????????????logger.debug("offer?wait?time?out!");?? ????????????????????return?false;?? ????????????????}?? ?????????????????? ????????????????try?? ????????????????{?? ????????????????????logger.debug("Queue?is?full,?need?wait....");?? ????????????????????nanoTime?=?notFull.awaitNanos(nanoTime);?? ????????????????}?? ????????????????catch(InterruptedException?ie)?? ????????????????{?? ????????????????????notFull.signal();?? ????????????????????throw?ie;?? ????????????????}?? ????????????}?? ????????}?? ????????finally?? ????????{?? ????????????writeLock.unlock();?? ????????}?? ????}?? ?? ????public?E?poll(long?timeout,?TimeUnit?unit)?throws?InterruptedException?? ????{?? ????????long?nanoTime?=?unit.toNanos(timeout);?? ????????readLock.lockInterruptibly();?? ?????????? ????????try?? ????????{?? ????????????for(;;)?? ????????????{?? ????????????????if(readCount>0)?? ????????????????{?? ????????????????????return?extract();?? ????????????????}?? ?????????????????? ????????????????if(nanoTime<=0)?? ????????????????{?? ????????????????????logger.debug("poll?time?out!");?? ????????????????????return?null;?? ????????????????}?? ????????????????nanoTime?=?queueSwitch(nanoTime,?false);?? ????????????}?? ????????}?? ????????finally?? ????????{?? ????????????readLock.unlock();?? ????????}?? ????}?? ?? }
實現多個讀線程,一個寫線程,并有一定的實時性和并發性
總結
以上是生活随笔為你收集整理的服务器端利器--双缓冲队列的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。