日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

java concurrent 框架,java.util.concurrent 包下的 Synchronizer 框架

發布時間:2025/4/5 编程问答 20 豆豆
生活随笔 收集整理的這篇文章主要介紹了 java concurrent 框架,java.util.concurrent 包下的 Synchronizer 框架 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

看完書?java concurrency in practice 當然是想找點啥好玩的東東玩玩。 當看到了Doug Lee 的論文 << The java.util.concurrent Synchronizer Framework >> 大呼來的太晚喔, 前段時間看那個ReentrantLock 的代碼真的是痛苦啊,不過現在也不晚不是。 ?呵呵, 上菜:這個框架的核心是一個AbstractQueuedSynchronizer 類 (下面簡稱AQS) ?它基本上的思路是:

采用Template Method Pattern. ?它實現了non-contended 的synchronization 算法;

繼承 它的Subclass ?一般不直接作為Synchronzier, 而是作為私有的實現 被用來delegate. ?比如 他舉了個例子:

class Mutex implements Lock, java.io.Serializable {

Java代碼

//?Our?internal?helper?class

private?static?class?Sync?extends?AbstractQueuedSynchronizer?{

.....

}

//?The?sync?object?does?all?the?hard?work.?We?just?forward?to?it.

private?final?Sync?sync?=?new?Sync();

public?void?lock()????????????????{?sync.acquire(1);?}

public?boolean?tryLock()??????????{?return?sync.tryAcquire(1);?}

public?void?unlock()??????????????{?sync.release(1);?}

public?Condition?newCondition()???{?return?sync.newCondition();?}

public?boolean?isLocked()?????????{?return?sync.isHeldExclusively();?}

public?boolean?hasQueuedThreads()?{?return?sync.hasQueuedThreads();?}

public?void?lockInterruptibly()?throws?InterruptedException?{

sync.acquireInterruptibly(1);

}

public?boolean?tryLock(long?timeout,?TimeUnit?unit)

throws?InterruptedException?{

return?sync.tryAcquireNanos(1,?unit.toNanos(timeout));

}

在這個 AQS 類中 提供了 兩大類方法

acquire ? 其中還包括 ?非阻塞的 tryAcquire; ?帶 time out 的;可以通過interruption 來cancellality 的。

release ? 相對簡單點, 因為在調用release 方法的時候基本上暗含了個意思當前的線程是獲得了鎖的。

在JUC 包中沒有為各種Synchronizer 類提供統一的API 比如 Lock.lock, Semaphore.acquire, CountDownLatch.await ?and ?FutureTask.get 都是映射到這個AQS 的 acquire 方法。

要實現一個Synchronizer 的基本思路非常直接的

acquire 操作

采用CAS 操作去更新同步狀態 ?如果不成功 ?{

enqueue 當前線程

block 當前線程

}

dequeue ?當前線程 ?if it was queued ? ? ?// 此時應該是當前線程被別的線程 release 來喚醒的。

release 操作 :

更新同步狀態, 此時可以直接 set 而不通過 CAS。

假如 等待隊列中還有線程 ?unblock ?one or more

要支持這些操作 需要 三類基本的組件:

1, ? Atomically managing synchronization state。 ? 這個通過

private ? volatile ?int ? state;

在處理acquire 的時候基本是通過用CAS 實現的 ?compareAndSetState 來

2, ? Block /Unblock 線程 。 這個是通過 JUC 里面的一個封裝類 LockSupport.park / unpark 來實現的。LockSupport 都deleget 到了 Unsafte 對應的方法

3, 維護 一個隊列來存放 等待線程。 這個是通過一個 CLH queue 的變種。 它是一種linked queue 的通過 head 和tail 。

三個里面最復雜的就是這個 CLH queue 的維護了, queue 中節點被定義為 ?:

static final class Node {

Java代碼

volatile?int?waitStatus;

volatile?Node?prev;

volatile?Node?next;

volatile?Thread?thread;

Node?nextWaiter;

final?boolean?isShared()?{

return?nextWaiter?==?SHARED;

}

final?Node?predecessor()?throws?NullPointerException?{

Node?p?=?prev;

if?(p?==?null)

throw?new?NullPointerException();

else

return?p;

}

Node()?{????//?Used?to?establish?initial?head?or?SHARED?marker

}

Node(Thread?thread,?Node?mode)?{?????//?Used?by?addWaiter

this.nextWaiter?=?mode;

this.thread?=?thread;

}

Node(Thread?thread,?int?waitStatus)?{?//?Used?by?Condition

this.waitStatus?=?waitStatus;

this.thread?=?thread;

}

AQS 中定義了

Java代碼

private?Node?addWaiter(Node?mode)?{

Node?node?=?new?Node(Thread.currentThread(),?mode);

//?Try?the?fast?path?of?enq;?backup?to?full?enq?on?failure

Node?pred?=?tail;

if?(pred?!=?null)?{

node.prev?=?pred;

if?(compareAndSetTail(pred,?node))?{

pred.next?=?node;

return?node;

}

}

enq(node);

return?node;

}

這個方法通常在 acquire 的時候如果 tryAcquire 失敗就會將insert 一個 Node 到 tail 之后, 并且 node.prev 指向先前的 tail。 當然這個是在?compareAndSetTail(pred, node) ?返回true 的時候比較簡單, 否則就進入 enq(node) 方法 差不多也是這個邏輯insert 一個 Node 到 tail 之后。

在 1.6.0_25 版本里面AQS 就沒有在 release 的時候 通過喚醒等待隊列里面head指向的線程, 然后然后該線程是在 方法里面繼續resume, 基本上會

if (p == head && tryAcquire(arg)) {

setHead(node);

p.next = null; // help GC

return interrupted;

}。

就是將head的下一個節點設為Head, 原來的head 呢 p.next = null, ?p 不再引用任何對象, gc 就會發生作用。 ?這個基本上會的例外是啥呢, 新線程acquire 調用 tryAcquire 搶占成功 。 這個線程就只有繼續park 了。

基本上來講 有了 這個 AQS ?, 我們要想實現一個 Synchronizer 就比較簡單了, ?最簡單的情況下 ?treAcquire , tryRelease 實現下。AQS 里面提供的需要繼承的方法不是采用abstract 的方式 而是用拋出 notImplementedException的 方式, 如果像那些tryAcquireShared 之類的 在 exclusive 模式下根本就不會被調用到, ?我們也就根本就不必override。 ?比如 先前我們所的那個 Mutex 我們只需要定義它的內部類 Sync

Java代碼

//?Our?internal?helper?class

private?static?class?Sync?extends?AbstractQueuedSynchronizer?{

//?Report?whether?in?locked?state

protected?boolean?isHeldExclusively()?{

return?getState()?==?1;

}

//?Acquire?the?lock?if?state?is?zero

public?boolean?tryAcquire(int?acquires)?{

assert?acquires?==?1;?//?Otherwise?unused

if?(compareAndSetState(0,?1))?{

setExclusiveOwnerThread(Thread.currentThread());

return?true;

}

return?false;

}

//?Release?the?lock?by?setting?state?to?zero

protected?boolean?tryRelease(int?releases)?{

assert?releases?==?1;?//?Otherwise?unused

if?(getState()?==?0)?throw?new?IllegalMonitorStateException();

setExclusiveOwnerThread(null);

setState(0);

return?true;

}

//?Provide?a?Condition

Condition?newCondition()?{?return?new?ConditionObject();?}

}

這個框架是非常精巧的, ?還有很多地方比如那個 ConditionObject 里面涉及到另外一個單獨的condition queue。只有再慢慢的啃了。

另外貼一張本人手工畫的圖,非常潦草多包涵了。

總結

以上是生活随笔為你收集整理的java concurrent 框架,java.util.concurrent 包下的 Synchronizer 框架的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。