日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

聊聊高并发(二十五)解析java.util.concurrent各个组件(七) 理解Semaphore

發布時間:2024/1/17 编程问答 35 豆豆
生活随笔 收集整理的這篇文章主要介紹了 聊聊高并发(二十五)解析java.util.concurrent各个组件(七) 理解Semaphore 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

前幾篇分析了一下AQS的原理和實現,這篇拿Semaphore信號量做例子看看AQS實際是如何使用的。

?

Semaphore表示了一種可以同時有多個線程進入臨界區的同步器,它維護了一個狀態表示可用的票據,只有拿到了票據的線程盡可以進入臨界區,否則就等待,直到獲得釋放出的票據。Semaphore常用在資源池中來管理資源。當狀態只有1個0兩個值時,它退化成了一個互斥的同步器,類似鎖。

?

下面來看看Semaphore的代碼。

它維護了一個內部類Sync來繼承AQS,定制tryXXX方法來使用AQS。我們之前提到過AQS支持獨占和共享兩種模式,Semaphore明顯就是共享模式,它支持多個線程可以同時進入臨界區。所以Sync擴展了Shared相關的方法。

可以看到Sync的主要操作都是對狀態的無鎖修改,它不需要處理AQS隊列相關的操作。在聊聊高并發(二十四)解析java.util.concurrent各個組件(六) 深入理解AQS(四)?我們說了AQS提供了tryXXX接口給子類擴展,相當于給子類一個機會,可以自己處理狀態,決定是否入同步隊列。

1. nonfailTryAcquireShared()非公平的tryAcquire,它立刻修改了票據狀態,而不需要管是否有先來的線程正在等待,而一旦有可用的票據,就直接獲得了鎖,不需要進入AQS的隊列等待同步。

2. tryReleaseShared()方法負責釋放共享狀態的資源,它只修改了票據狀態,由AQS的releaseShared()方法來負責喚醒在AQS隊列等待的線程

3. reducePermits()和drainPermits()方法都是直接修改了狀態,從而限制可用的資源

?

?
  • abstract static class Sync extends AbstractQueuedSynchronizer {

  • private static final long serialVersionUID = 1192457210091910933L;

  • ?
  • Sync(int permits) {

  • setState(permits);

  • }

  • ?
  • final int getPermits() {

  • return getState();

  • }

  • ?
  • final int nonfairTryAcquireShared(int acquires) {

  • for (;;) {

  • int available = getState();

  • int remaining = available - acquires;

  • if (remaining < 0 ||

  • compareAndSetState(available, remaining))

  • return remaining;

  • }

  • }

  • ?
  • protected final boolean tryReleaseShared(int releases) {

  • for (;;) {

  • int current = getState();

  • int next = current + releases;

  • if (next < current) // overflow

  • throw new Error("Maximum permit count exceeded");

  • if (compareAndSetState(current, next))

  • return true;

  • }

  • }

  • ?
  • final void reducePermits(int reductions) {

  • for (;;) {

  • int current = getState();

  • int next = current - reductions;

  • if (next > current) // underflow

  • throw new Error("Permit count underflow");

  • if (compareAndSetState(current, next))

  • return;

  • }

  • }

  • ?
  • final int drainPermits() {

  • for (;;) {

  • int current = getState();

  • if (current == 0 || compareAndSetState(current, 0))

  • return current;

  • }

  • }

  • }

  • ?

    Sync也是一個抽象類,具體的實現是NonfailSync和FairSync,代表了非公平實現和公平實現。在上一篇已經提到,所謂的非公平只是說在獲取資源時開了一個口子,可以讓后來的線程不需要管在AQS隊列中的先來的線程來獲取資源,而一旦獲取失敗,就得進入AQS隊列等待,而AQS隊列是先來先服務的FIFO隊列。

    可以看到,NonfailSync和FairSync只是在tryAcquireShared方法的實現上不同,其他都是一樣的。

    ?

    ?
  • /**

  • * NonFair version

  • */

  • static final class NonfairSync extends Sync {

  • private static final long serialVersionUID = -2694183684443567898L;

  • ?
  • NonfairSync(int permits) {

  • super(permits);

  • }

  • ?
  • protected int tryAcquireShared(int acquires) {

  • return nonfairTryAcquireShared(acquires);

  • }

  • }

  • ?
  • /**

  • * Fair version

  • */

  • static final class FairSync extends Sync {

  • private static final long serialVersionUID = 2014338818796000944L;

  • ?
  • FairSync(int permits) {

  • super(permits);

  • }

  • ?
  • protected int tryAcquireShared(int acquires) {

  • for (;;) {

  • if (hasQueuedPredecessors())

  • return -1;

  • int available = getState();

  • int remaining = available - acquires;

  • if (remaining < 0 ||

  • compareAndSetState(available, remaining))

  • return remaining;

  • }

  • }

  • }


  • 再來看看Semaphore自己提供的方法,

    1.支持可中斷和不可中斷的獲取/釋放

    2.支持限時獲取

    3.支持tryXX獲取/釋放

    4. 支持同時獲取/釋放多個資源

    ?

    可以看到Semaphore的實現都是基于AQS的方法來作的,單個資源的獲取/釋放操作都是請求1個資源,所以參數傳遞的是1,多個資源獲取傳遞了一個int個數。

    ?

    ?
  • public void acquire() throws InterruptedException {

  • sync.acquireSharedInterruptibly(1);

  • }

  • ?
  • public void acquireUninterruptibly() {

  • ??????? sync.acquireShared(1);

  • ??? }

  • ?
  • public boolean tryAcquire() {

  • ??????? return sync.nonfairTryAcquireShared(1) >= 0;

  • ??? }

  • ?
  • public boolean tryAcquire(long timeout, TimeUnit unit)

  • ??????? throws InterruptedException {

  • ??????? return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));

  • ??? }

  • ?
  • public void release() {

  • ??????? sync.releaseShared(1);

  • ??? }

  • ?
  • public void acquire(int permits) throws InterruptedException {

  • ??????? if (permits < 0) throw new IllegalArgumentException();

  • ??????? sync.acquireSharedInterruptibly(permits);

  • ??? }

  • ?
  • public void acquireUninterruptibly(int permits) {

  • ??????? if (permits < 0) throw new IllegalArgumentException();

  • ??????? sync.acquireShared(permits);

  • ??? }

  • ?
  • public boolean tryAcquire(int permits) {

  • ??????? if (permits < 0) throw new IllegalArgumentException();

  • ??????? return sync.nonfairTryAcquireShared(permits) >= 0;

  • ??? }

  • ?
  • public boolean tryAcquire(int permits, long timeout, TimeUnit unit)

  • ??????? throws InterruptedException {

  • ??????? if (permits < 0) throw new IllegalArgumentException();

  • ??????? return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));

  • ??? }

  • ?
  • public void release(int permits) {

  • ??????? if (permits < 0) throw new IllegalArgumentException();

  • ??????? sync.releaseShared(permits);

  • ??? }

  • ?

  • 下面用一個實例來測試一下Semaphore的功能。

    1. 創建一個有兩個票據的Semaphore

    2. 創建20個線程來競爭執行race()方法

    3. 在race()方法里先打印一句等待獲取資源的話,再獲取資源,獲得資源后打印一句話,最后釋放資源,釋放資源前打印一句話

    ?

    ?
  • package com.lock.test;

  • ?
  • import java.util.concurrent.Semaphore;

  • ?
  • public class SemaphoreUsecase {

  • private Semaphore semaphore = new Semaphore(2);

  • ?
  • public void race(){

  • System.out.println("Thread " + Thread.currentThread().getName() + " is waiting the resource");

  • semaphore.acquireUninterruptibly();

  • try{

  • System.out.println("Thread " + Thread.currentThread().getName() + " got the resource");

  • try {

  • Thread.sleep(3000);

  • } catch (InterruptedException e) {

  • e.printStackTrace();

  • }

  • }finally{

  • System.out.println("Thread " + Thread.currentThread().getName() + " is releasing the resource");

  • semaphore.release();

  • }

  • }

  • ?
  • public static void main(String[] args){

  • final SemaphoreUsecase usecase = new SemaphoreUsecase();

  • ?
  • for(int i = 0; i < 10; i++){

  • Thread t = new Thread(new Runnable(){

  • ?
  • @Override

  • public void run() {

  • usecase.race();

  • }

  • ?
  • }, String.valueOf(i));

  • t.start();

  • }

  • }

  • }


  • 測試結果:

    可以看到先來的兩個線程先獲得了資源,后來的線程都在等待,當有線程釋放資源之后,等待的線程才會去獲得資源,直到都獲得/釋放資源

    ?

    ?
  • Thread 0 is waiting the resource

  • Thread 0 got the resource

  • Thread 2 is waiting the resource

  • Thread 2 got the resource

  • Thread 1 is waiting the resource

  • Thread 4 is waiting the resource

  • Thread 3 is waiting the resource

  • Thread 5 is waiting the resource

  • Thread 6 is waiting the resource

  • Thread 7 is waiting the resource

  • Thread 8 is waiting the resource

  • Thread 9 is waiting the resource

  • Thread 2 is releasing the resource

  • Thread 0 is releasing the resource

  • Thread 1 got the resource

  • Thread 4 got the resource

  • Thread 1 is releasing the resource

  • Thread 4 is releasing the resource

  • Thread 3 got the resource

  • Thread 5 got the resource

  • Thread 3 is releasing the resource

  • Thread 5 is releasing the resource

  • Thread 6 got the resource

  • Thread 7 got the resource

  • Thread 7 is releasing the resource

  • Thread 6 is releasing the resource

  • Thread 8 got the resource

  • Thread 9 got the resource

  • Thread 8 is releasing the resource

  • Thread 9 is releasing the resource

  • ?

    總結

    以上是生活随笔為你收集整理的聊聊高并发(二十五)解析java.util.concurrent各个组件(七) 理解Semaphore的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。