日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Jdk1.6 JUC源码解析(13)-LinkedBlockingQueue

發布時間:2025/5/22 编程问答 59 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Jdk1.6 JUC源码解析(13)-LinkedBlockingQueue 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
功能簡介:
  • LinkedBlockingQueue是一種基于單向鏈表實現的有界的(可選的,不指定默認int最大值)阻塞隊列。隊列中的元素遵循先入先出 (FIFO)的規則。新元素插入到隊列的尾部,從隊列頭部取出元素。(在并發程序中,基于鏈表實現的隊列和基于數組實現的隊列相比,往往具有更高的吞吐 量,但性能稍差一些)
源碼分析:
  • 首先看下LinkedBlockingQueue內部的數據結構:
public class LinkedBlockingQueue<E> extends AbstractQueue<E>implements BlockingQueue<E>, java.io.Serializable {private static final long serialVersionUID = -6903933977591709194L;/*** Linked list node class*/static class Node<E> {/** The item, volatile to ensure barrier separating write and read */volatile E item;Node<E> next;Node(E x) { item = x; }}/** The capacity bound, or Integer.MAX_VALUE if none */private final int capacity;/** 這里的count為原子量,避免了一些使用count的地方需要加兩把鎖。 */private final AtomicInteger count = new AtomicInteger(0);/** Head of linked list */private transient Node<E> head;/** Tail of linked list */private transient Node<E> last;/** Lock held by take, poll, etc */private final ReentrantLock takeLock = new ReentrantLock();/** Wait queue for waiting takes */private final Condition notEmpty = takeLock.newCondition();/** Lock held by put, offer, etc */private final ReentrantLock putLock = new ReentrantLock();/** Wait queue for waiting puts */private final Condition notFull = putLock.newCondition();/*** Creates a <tt>LinkedBlockingQueue</tt> with a capacity of* {@link Integer#MAX_VALUE}.*/public LinkedBlockingQueue() {this(Integer.MAX_VALUE);}/*** Creates a <tt>LinkedBlockingQueue</tt> with the given (fixed) capacity.** @param capacity the capacity of this queue* @throws IllegalArgumentException if <tt>capacity</tt> is not greater* than zero*/public LinkedBlockingQueue(int capacity) {if (capacity <= 0) throw new IllegalArgumentException();this.capacity = capacity;last = head = new Node<E>(null);}public LinkedBlockingQueue(Collection<? extends E> c) {this(Integer.MAX_VALUE);for (E e : c)add(e);}

  首先可見,內部為單向鏈表;其次,內部為兩把鎖:存鎖和取鎖,并分別關聯一個條件(是一種雙鎖隊列)。

  • 還是從put和take入手,先看下put方法:
public void put(E e) throws InterruptedException {if (e == null) throw new NullPointerException();// Note: convention in all put/take/etc is to preset// local var holding count negative to indicate failure unless set.int c = -1;final ReentrantLock putLock = this.putLock;final AtomicInteger count = this.count;putLock.lockInterruptibly();try {/** Note that count is used in wait guard even though it is* not protected by lock. This works because count can* only decrease at this point (all other puts are shut* out by lock), and we (or some other waiting put) are* signalled if it ever changes from* capacity. Similarly for all other uses of count in* other wait guards.*/try {while (count.get() == capacity)notFull.await();} catch (InterruptedException ie) {notFull.signal(); // propagate to a non-interrupted threadthrow ie;}insert(e);c = count.getAndIncrement();if (c + 1 < capacity)/* * 注意這里的處理:和單鎖隊列不同,count為原子量,不需要鎖保護。* put過程中可能有其他線程執行多次get,所以這里需要判斷一下當前* 如果還有剩余容量,那么繼續喚醒notFull條件上等待的線程。*/notFull.signal(); } finally {putLock.unlock();}if (c == 0) //如果count又0變為1,說明在隊列是空的情況下插入了1個元素,喚醒notNull條件上等待的線程。 signalNotEmpty();}/*** Creates a node and links it at end of queue.* @param x the item*/private void insert(E x) {last = last.next = new Node<E>(x);}/*** Signals a waiting take. Called only from put/offer (which do not* otherwise ordinarily lock takeLock.)*/private void signalNotEmpty() {final ReentrantLock takeLock = this.takeLock;takeLock.lock();try {notEmpty.signal();} finally {takeLock.unlock();}}

? ? ? ?代碼很容易看懂,再看下take方法實現:

public E take() throws InterruptedException {E x;int c = -1;final AtomicInteger count = this.count;final ReentrantLock takeLock = this.takeLock;takeLock.lockInterruptibly();try {try {while (count.get() == 0)notEmpty.await();} catch (InterruptedException ie) {notEmpty.signal(); // propagate to a non-interrupted threadthrow ie;}x = extract();c = count.getAndDecrement();if (c > 1)notEmpty.signal();} finally {takeLock.unlock();}if (c == capacity)signalNotFull();return x;}/*** Removes a node from head of queue,* @return the node*/private E extract() {Node<E> first = head.next;head = first;E x = first.item;first.item = null;return x;}/*** Signals a waiting put. Called only from take/poll.*/private void signalNotFull() {final ReentrantLock putLock = this.putLock;putLock.lock();try {notFull.signal();} finally {putLock.unlock();}}

?? ? ? 和put對等的邏輯,也很容易看懂。

  • 上面看到,主要方法里并沒有同時用兩把鎖,但有些方法里會同時使用兩把鎖,比如remove方法等:
public boolean remove(Object o) {if (o == null) return false;boolean removed = false;fullyLock();try {Node<E> trail = head;Node<E> p = head.next;while (p != null) {if (o.equals(p.item)) {removed = true;break;}trail = p;p = p.next;}if (removed) {p.item = null;trail.next = p.next;if (last == p)last = trail;if (count.getAndDecrement() == capacity)notFull.signalAll();}} finally {fullyUnlock();}return removed;}/*** Lock to prevent both puts and takes.*/private void fullyLock() {putLock.lock();takeLock.lock();}/*** Unlock to allow both puts and takes.*/private void fullyUnlock() {takeLock.unlock();putLock.unlock();}

總結

以上是生活随笔為你收集整理的Jdk1.6 JUC源码解析(13)-LinkedBlockingQueue的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。