JUC队列-ConcurrentLinkedQueue(四)
ConcurrentLinkedQueue介紹
ConcurrentLinkedQueue是線程安全的隊(duì)列,它適用于“高并發(fā)”的場景。
它是一個基于鏈接節(jié)點(diǎn)的無界線程安全隊(duì)列,按照 FIFO(先進(jìn)先出)原則對元素進(jìn)行排序。隊(duì)列元素中不可以放置null元素(內(nèi)部實(shí)現(xiàn)的特殊節(jié)點(diǎn)除外)。
ConcurrentLinkedQueue的UML圖:
說明:
LinkedBlockingQueue在實(shí)現(xiàn)“多線程對競爭資源的互斥訪問”時,對于“插入”和“取出(刪除)”操作分別使用了不同的鎖。對于插入操作,通過“插入鎖putLock”進(jìn)行同步;對于取出操作,通過“取出鎖takeLock”進(jìn)行同步。
此外,插入鎖putLock和“非滿條件notFull”相關(guān)聯(lián),取出鎖takeLock和“非空條件notEmpty”相關(guān)聯(lián)。通過notFull和notEmpty更細(xì)膩的控制鎖。
ConcurrentLinkedQueue的源碼分析
構(gòu)造方法
public ConcurrentLinkedQueue() {head = tail = new Node<E>(null); }head和tail的定義如下:
private transient volatile Node<E> head; private transient volatile Node<E> tail;所以他們都是線程可見的。
Node的聲明如下:
private static class Node<E> {volatile E item;volatile Node<E> next;Node(E item) {UNSAFE.putObject(this, itemOffset, item);}boolean casItem(E cmp, E val) {return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);}void lazySetNext(Node<E> val) {UNSAFE.putOrderedObject(this, nextOffset, val);}boolean casNext(Node<E> cmp, Node<E> val) {return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);}// Unsafe mechanicsprivate static final sun.misc.Unsafe UNSAFE;private static final long itemOffset;private static final long nextOffset;static {try {UNSAFE = sun.misc.Unsafe.getUnsafe();Class k = Node.class;itemOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("item"));nextOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("next"));} catch (Exception e) {throw new Error(e);}} }Node是個單向鏈表節(jié)點(diǎn),next用于指向下一個Node,item用于存儲數(shù)據(jù)。它們都是可見的,所以Node中操作節(jié)點(diǎn)數(shù)據(jù)的API,都是可以通過Unsafe機(jī)制的CAS函數(shù)實(shí)現(xiàn)的;例如casNext()是通過CAS函數(shù)“比較并設(shè)置節(jié)點(diǎn)的下一個節(jié)點(diǎn)”
添加元素
public boolean add(E e) {return offer(e); }offer()方法
public boolean offer(E e) {// 檢查e是不是null,是的話拋出NullPointerException異常。checkNotNull(e);// 創(chuàng)建新的節(jié)點(diǎn)final Node<E> newNode = new Node<E>(e);// 將“新的節(jié)點(diǎn)”添加到鏈表的末尾。for (Node<E> t = tail, p = t;;) {Node<E> q = p.next;// 情況1:q為空if (q == null) {// CAS操作:如果“p的下一個節(jié)點(diǎn)為null”(即p為尾節(jié)點(diǎn)),則設(shè)置p的下一個節(jié)點(diǎn)為newNode。// 如果該CAS操作成功的話,則比較“p和t”(若p不等于t,則設(shè)置newNode為新的尾節(jié)點(diǎn)),然后返回true。// 如果該CAS操作失敗,這意味著“其它線程對尾節(jié)點(diǎn)進(jìn)行了修改”,則重新循環(huán)。if (p.casNext(null, newNode)) {if (p != t) // hop two nodes at a timecasTail(t, newNode); // Failure is OK.return true;}}// 情況2:p和q相等else if (p == q)p = (t != (t = tail)) ? t : head;// 情況3:其它elsep = (p != t && t != (t = tail)) ? t : q;} }取出元素
public E poll() {// 設(shè)置“標(biāo)記”restartFromHead:for (;;) {for (Node<E> h = head, p = h, q;;) {E item = p.item;// 情況1// 表頭的數(shù)據(jù)不為null,并且“設(shè)置表頭的數(shù)據(jù)為null”這個操作成功的話;// 則比較“p和h”(若p!=h,即表頭發(fā)生了變化,則更新表頭,即設(shè)置表頭為p),然后返回原表頭的item值。if (item != null && p.casItem(item, null)) {if (p != h) // hop two nodes at a timeupdateHead(h, ((q = p.next) != null) ? q : p);return item;}// 情況2// 表頭的下一個節(jié)點(diǎn)為null,即鏈表只有一個“內(nèi)容為null的表頭節(jié)點(diǎn)”。則更新表頭為p,并返回null。else if ((q = p.next) == null) {updateHead(h, p);return null;}// 情況3// 這可能到由于“情況4”的發(fā)生導(dǎo)致p=q,在該情況下跳轉(zhuǎn)到restartFromHead標(biāo)記重新操作。else if (p == q)continue restartFromHead;// 情況4// 設(shè)置p為qelsep = q;}}}總結(jié):ConcurrentLinkedQueue為什么支持高并發(fā),它與其他隊(duì)列有什么不同之處?
我們知道,常用的多線程同步機(jī)制有下面三種:
volatile 變量:輕量級多線程同步機(jī)制,不會引起上下文切換和線程調(diào)度。僅提供內(nèi)存可見性保證,不提供原子性。
CAS 原子指令:輕量級多線程同步機(jī)制,不會引起上下文切換和線程調(diào)度。它同時提供內(nèi)存可見性和原子化更新保證。
互斥鎖:重量級多線程同步機(jī)制,可能會引起上下文切換和線程調(diào)度,它同時提供內(nèi)存可見性和原子性。
ConcurrentLinkedQueue用了volatile和CAS原子指令操作隊(duì)列,比起其他隊(duì)列的互斥鎖,更加輕量,更加適合高并發(fā)。
總結(jié)
以上是生活随笔為你收集整理的JUC队列-ConcurrentLinkedQueue(四)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: JUC队列-LinkedBlocking
- 下一篇: 容器源码分析之PriorityQueue