Semaphore 源码分析
需要提前了解的知識(shí)點(diǎn): AbstractQueuedSynchronizer 實(shí)現(xiàn)原理
類(lèi)介紹
Semaphore(信號(hào)量)是用來(lái)控制同時(shí)訪問(wèn)特定資源的線程數(shù)量,它通過(guò)協(xié)調(diào)各個(gè)線程,以保證合理的使用公共資源。比如控制用戶的訪問(wèn)量,同一時(shí)刻只允許1000個(gè)用戶同時(shí)使用系統(tǒng),如果超過(guò)1000個(gè)并發(fā),則需要等待。
使用場(chǎng)景
比如模擬一個(gè)停車(chē)場(chǎng)停車(chē)信號(hào),假設(shè)停車(chē)場(chǎng)只有兩個(gè)車(chē)位,一開(kāi)始兩個(gè)車(chē)位都是空的。這時(shí)如果同時(shí)來(lái)了兩輛車(chē),看門(mén)人允許它們進(jìn)入停車(chē)場(chǎng),然后放下車(chē)攔。以后來(lái)的車(chē)必須在入口等待,直到停車(chē)場(chǎng)中有車(chē)輛離開(kāi)。這時(shí),如果有一輛車(chē)離開(kāi)停車(chē)場(chǎng),看門(mén)人得知后,打開(kāi)車(chē)攔,放入一輛,如果又離開(kāi)一輛,則又可以放入一輛,如此往復(fù)。
public class SemaphoreDemo {private static Semaphore s = new Semaphore(2);public static void main(String[] args) {ExecutorService pool = Executors.newCachedThreadPool();pool.submit(new ParkTask("1"));pool.submit(new ParkTask("2"));pool.submit(new ParkTask("3"));pool.submit(new ParkTask("4"));pool.submit(new ParkTask("5"));pool.submit(new ParkTask("6"));pool.shutdown();}static class ParkTask implements Runnable {private String name;public ParkTask(String name) {this.name = name;}@Overridepublic void run() {try {s.acquire();System.out.println("Thread "+this.name+" start...");TimeUnit.SECONDS.sleep(new Random().nextInt(10));} catch (InterruptedException e) {e.printStackTrace();} finally {s.release();}}} }Semaphore 源碼分析
Semaphore 通過(guò)使用內(nèi)部類(lèi)Syn繼承AQS來(lái)實(shí)現(xiàn)。
支持公平鎖和非公平鎖。內(nèi)部使用的AQS的共享鎖。
具體實(shí)現(xiàn)可參考 AbstractQueuedSynchronizer 源碼分析
Semaphore 的結(jié)構(gòu)如下:
Semaphore構(gòu)造
public Semaphore(int permits) {sync = new NonfairSync(permits); }public Semaphore(int permits, boolean fair) {sync = fair ? new FairSync(permits) : new NonfairSync(permits); }構(gòu)造方法指定信號(hào)量的許可數(shù)量,默認(rèn)采用的是非公平鎖,也只可以指定為公平鎖。
permits賦值給AQS中的state變量。
acquire:可響應(yīng)中斷的獲得信號(hào)量
public void acquire() throws InterruptedException {sync.acquireSharedInterruptibly(1); }public void acquire(int permits) throws InterruptedException {if (permits < 0) throw new IllegalArgumentException();sync.acquireSharedInterruptibly(permits); }獲得信號(hào)量方法,這兩個(gè)方法支持 Interrupt中斷機(jī)制,可使用acquire() 方法每次獲取一個(gè)信號(hào)量,也可以使用acquire(int permits) 方法獲取指定數(shù)量的信號(hào)量 。
acquire:不可響應(yīng)中斷的獲取信號(hào)量
public void acquireUninterruptibly() {sync.acquireShared(1); }public void acquireUninterruptibly(int permits) {if (permits < 0) throw new IllegalArgumentException();sync.acquireShared(permits); }這兩個(gè)方法不響應(yīng)Interrupt中斷機(jī)制,其它功能同acquire方法機(jī)制。
tryAcquire 方法,嘗試獲得信號(hào)量
public boolean tryAcquire() {return sync.nonfairTryAcquireShared(1) >= 0; }public boolean tryAcquire(long timeout, TimeUnit unit)throws InterruptedException {return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); }public boolean tryAcquire(int permits, long timeout, TimeUnit unit)throws InterruptedException {if (permits < 0) throw new IllegalArgumentException();return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout)); }嘗試獲得信號(hào)量有三個(gè)方法。
1. 嘗試獲取信號(hào)量,如果獲取成功則返回true,否則馬上返回false,不會(huì)阻塞當(dāng)前線程。
2. 嘗試獲取信號(hào)量,如果在指定的時(shí)間內(nèi)獲得信號(hào)量,則返回true,否則返回false
3. 嘗試獲取指定數(shù)量的信號(hào)量,如果在指定的時(shí)間內(nèi)獲得信號(hào)量,則返回true,否則返回false。
release 釋放信號(hào)量
public void release() {sync.releaseShared(1); }調(diào)用AQS中的releaseShared方法,使得state每次減一來(lái)控制信號(hào)量。
availablePermits方法,獲取當(dāng)前剩余的信號(hào)量數(shù)量
public int availablePermits() {return sync.getPermits(); }//=========Sync類(lèi)======== final int getPermits() {return getState();}該方法返回AQS中state變量的值,當(dāng)前剩余的信號(hào)量個(gè)數(shù)
drainPermits方法
public int drainPermits() {return sync.drainPermits(); }//=========Sync類(lèi)======== final int drainPermits() {for (;;) {int current = getState();if (current == 0 || compareAndSetState(current, 0))return current;} }獲取并返回立即可用的所有許可。Sync類(lèi)的drainPermits方法,獲取1個(gè)信號(hào)量后將可用的信號(hào)量個(gè)數(shù)置為0。例如總共有10個(gè)信號(hào)量,已經(jīng)使用了5個(gè),再調(diào)用drainPermits方法后,可以獲得一個(gè)信號(hào)量,剩余4個(gè)信號(hào)量就消失了,總共可用的信號(hào)量就變成6個(gè)了。
reducePermits 方法
protected void reducePermits(int reduction) {if (reduction < 0) throw new IllegalArgumentException();sync.reducePermits(reduction); }//=========Sync類(lèi)======== final void reducePermits(int reductions) {for (;;) {int current = getState();int next = current - reductions;if (next > current) // underflowthrow new Error("Permit count underflow");if (compareAndSetState(current, next))return;} }該方法是protected 方法,減少信號(hào)量個(gè)數(shù)
判斷AQS等待隊(duì)列中是否還有Node
public final boolean hasQueuedThreads() {return sync.hasQueuedThreads(); }//=========AbstractQueuedSynchronizer類(lèi)======== public final boolean hasQueuedThreads() {//頭結(jié)點(diǎn)不等于尾節(jié)點(diǎn)就說(shuō)明鏈表中還有元素return head != tail; }getQueuedThreads方法
protected Collection<Thread> getQueuedThreads() {return sync.getQueuedThreads(); }//=========AbstractQueuedSynchronizer類(lèi)======== public final Collection<Thread> getQueuedThreads() {ArrayList<Thread> list = new ArrayList<Thread>();for (Node p = tail; p != null; p = p.prev) {Thread t = p.thread;if (t != null)list.add(t);}return list; }該方法獲取AQS中等待隊(duì)列中所有未獲取信號(hào)量的線程相關(guān)的信息(等待獲取信號(hào)量的線程相關(guān)信息)。
本人簡(jiǎn)書(shū)blog地址:http://www.jianshu.com/u/1f0067e24ff8????
點(diǎn)擊這里快速進(jìn)入簡(jiǎn)書(shū)
總結(jié)
以上是生活随笔為你收集整理的Semaphore 源码分析的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: CountDownLatch 源码分析
- 下一篇: 深入理解 Synchronized