日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

从串行线程封闭到对象池、线程池

發布時間:2023/12/4 编程问答 41 豆豆
生活随笔 收集整理的這篇文章主要介紹了 从串行线程封闭到对象池、线程池 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.


今天講一個牛逼而實用的概念,串行線程封閉。對象池是串行線程封閉的典型應用場景;線程池糅合了對象池技術,但核心實現不依賴于對象池,很容易產生誤會。


本文從串行線程封閉和對象池入手,最后通過源碼分析線程池的核心原理,厘清對象池與線程池之間的誤會。


線程封閉與串行線程封閉


線程封閉


線程封閉是一種常見的線程安全設計策略:僅在固定的一個線程內訪問對象,不對其他線程共享


使用線程封閉技術,對象O始終只對一個線程T1可見,“單線程”中自然不存在線程安全的問題。


ThreadLocal是常用的線程安全工具。線程封閉在Servlet及高層的web框架Spring等中應用不少。

https://monkeysayhi.github.io/2016/11/27/源碼%7CThreadLocal的實現原理/


串行線程封閉


線程封閉雖然好用,卻限制了對象的共享。串行線程封閉改進了這一點:對象O只能由單個線程T1擁有,但可以通過安全的發布對象O來轉移O的所有權;在轉移所有權后,也只有另一個線程T2能獲得這個O的所有權,并且發布O的T1不會再訪問O。


所謂“所有權”,指修改對象的權利。


相對于線程封閉,串行線程封閉使得任意時刻,最多僅有一個線程擁有對象的所有權。當然,這不是絕對的,只要線程T1事實不會再修改對象O,那么就相當于僅有T2擁有對象的所有權。串行線層封閉讓對象變得可以共享(雖然只能串行的擁有所有權),靈活性得到大大提高;相對的,要共享對象就涉及安全發布的問題,依靠BlockingQueue等同步工具很容易實現這一點。


對象池是串行線程封閉的經典應用場景,如數據庫連接池等。


對象池


對象池利用了串行封閉:將對象O“借給”一個請求線程T1,T1使用完再交還給對象池,并保證“未擅自發布該對象”且“以后不再使用”;對象池收回O后,等T2來借的時候再把它借給T2,完成對象所有權的傳遞。


猴子擼了一個簡化版的線程池,用戶只需要覆寫newObject()方法:


public abstract class AbstractObjectPool<T> {

? protected final int min;

? protected final int max;

? protected final List<T> usings = new LinkedList<>();

? protected final List<T> buffer = new LinkedList<>();

? private volatile boolean inited = false;

? public AbstractObjectPool(int min, int max) {

? ? this.min = min;

? ? this.max = max;

? ? if (this.min < 0 || this.min > this.max) {

? ? ? throw new IllegalArgumentException(String.format(

? ? ? ? ? "need 0 <= min <= max <= Integer.MAX_VALUE, given min: %s, max: %s", this.min, this.max));

? ? }

? }

? public void init() {

? ? for (int i = 0; i < min; i++) {

? ? ? buffer.add(newObject());

? ? }

? ? inited = true;

? }

? protected void checkInited() {

? ? if (!inited) {

? ? ? throw new IllegalStateException("not inited");

? ? }

? }

? abstract protected T newObject();

? public synchronized T getObject() {

? ? checkInited();

? ? if (usings.size() == max) {

? ? ? return null;

? ? }

? ? if (buffer.size() == 0) {

? ? ? T newObj = newObject();

? ? ? usings.add(newObj);

? ? ? return newObj;

? ? }

? ? T oldObj = buffer.remove(0);

? ? usings.add(oldObj);

? ? return oldObj;

? }

? public synchronized void freeObject(T obj) {

? ? checkInited();

? ? if (!usings.contains(obj)) {

? ? ? throw new IllegalArgumentException(String.format("obj not in using queue: %s", obj));

? ? }

? ? usings.remove(usings.indexOf(obj));

? ? buffer.add(obj);

? }

}


AbstractObjectPool具有以下特性

支持設置最小、最大容量

對象一旦申請就不再釋放,避免了GC


雖然很簡單,但大可以用于一些時間敏感、資源充裕的場景。如果時間進一步敏感,可將getObject()、freeObject()改寫為并發程度更高的版本,但記得保證安全發布安全回收;如果資源不那么充裕,可以適當增加對象回收策略。


可以看到,一個對象池的基本行為包括:

創建對象newObject()

借取對象getObject()

歸還對象freeObject()


典型的對象池有各種連接池、常量池等,應用非常多,模型也大同小異,不做解析。令人迷惑的是線程池,很容易讓人誤以為線程池的核心原理也是對象池,下面來追一遍源碼。


線程池


首先擺出結論:線程池糅合了對象池模型,但核心原理是生產者-消費者模型


繼承結構如下:


用戶可以將Runnable(或Callables)實例提交給線程池,線程池會異步執行該任務,返回響應的結果(完成/返回值)。


猴子最喜歡的是submit(Callable<T> task)方法。我們從該方法入手,逐步深入函數棧,探究線程池的實現原理。


submit()


submit()方法在ExecutorService接口中定義,AbstractExecutorService實現,ThreadPoolExecutor直接繼承。


public abstract class AbstractExecutorService implements ExecutorService {

...

? ? public <T> Future<T> submit(Callable<T> task) {

? ? ? ? if (task == null) throw new NullPointerException();

? ? ? ? RunnableFuture<T> ftask = newTaskFor(task);

? ? ? ? execute(ftask);

? ? ? ? return ftask;

? ? }

...



AbstractExecutorService#newTaskFor()創建一個RunnableFuture類型的FutureTask。


核心是execute()方法。


execute()


execute()方法在Executor接口中定義,ThreadPoolExecutor實現。


public class ThreadPoolExecutor extends AbstractExecutorService {

...

? ? public void execute(Runnable command) {

? ? ? ? if (command == null)

? ? ? ? ? ? throw new NullPointerException();

? ? ? ? int c = ctl.get();

? ? ? ? if (workerCountOf(c) < corePoolSize) {

? ? ? ? ? ? if (addWorker(command, true))

? ? ? ? ? ? ? ? return;

? ? ? ? ? ? c = ctl.get();

? ? ? ? }

? ? ? ? if (isRunning(c) && workQueue.offer(command)) {

? ? ? ? ? ? int recheck = ctl.get();

? ? ? ? ? ? if (! isRunning(recheck) && remove(command))

? ? ? ? ? ? ? ? reject(command);

? ? ? ? ? ? else if (workerCountOf(recheck) == 0)

? ? ? ? ? ? ? ? addWorker(null, false);

? ? ? ? }

? ? ? ? else if (!addWorker(command, false))

? ? ? ? ? ? reject(command);

? ? }

...

}


我們暫且忽略線程池的池化策略。關注一個最簡單的場景,看能不能先回答一個問題:線程池中的任務如何執行?


核心是addWorker()方法。以8行的參數為例,此時,線程池中的線程數未達到最小線程池大小corePoolSize,通常可以直接在9行返回。


addWorker()


簡化如下:


public class ThreadPoolExecutor extends AbstractExecutorService {

...

? ? private boolean addWorker(Runnable firstTask, boolean core) {

? ? ? ? boolean workerStarted = false;

? ? ? ? boolean workerAdded = false;

? ? ? ? Worker w = null;

? ? ? ? try {

? ? ? ? ? ? w = new Worker(firstTask);

? ? ? ? ? ? final Thread t = w.thread;

? ? ? ? ? ? if (t != null) {

? ? ? ? ? ? ? ? final ReentrantLock mainLock = this.mainLock;

? ? ? ? ? ? ? ? mainLock.lock();

? ? ? ? ? ? ? ? try {

? ? ? ? ? ? ? ? ? ? int rs = runStateOf(ctl.get());

? ? ? ? ? ? ? ? ? ? if (rs < SHUTDOWN) {

? ? ? ? ? ? ? ? ? ? ? ? workers.add(w);

? ? ? ? ? ? ? ? ? ? ? ? workerAdded = true;

? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? } finally {

? ? ? ? ? ? ? ? ? ? mainLock.unlock();

? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? if (workerAdded) {

? ? ? ? ? ? ? ? ? ? t.start();

? ? ? ? ? ? ? ? ? ? workerStarted = true;

? ? ? ? ? ? ? ? }

? ? ? ? ? ? }

? ? ? ? } finally {

? ? ? ? ? ? if (! workerStarted)

? ? ? ? ? ? ? ? addWorkerFailed(w);

? ? ? ? }

? ? ? ? return workerStarted;

? ? }

...

}


我去掉了很多用于管理線程池、維護線程安全的代碼。假設線程池未關閉,worker(即w,下同)添加成功,則必然能夠將worker添加至workers中。workers是一個HashSet:


private final HashSet<Worker> workers = new HashSet<Worker>();


哪里是對象池?


如果說與對象池有關,那么workers即相當于示例代碼中的using,應用了對象池模型;只不過這里的using是一直增長的,直到達到最大線程池大小maximumPoolSize。


但是很明顯,線程池并沒有將線程發布出去,workers也僅僅完成using“保存線程”的功能。那么,線程池中的任務如何執行呢?跟線程池有沒有關系?


哪里又不是?


注意9、17、24行:


9行將我們提交到線程池的firstTask封裝入一個worker。

17行將worker加入workers,維護起來

24行則啟動了worker中的線程t


核心在與這三行,但線程池并沒有直接在addWorker()中啟動任務firstTask,代之以啟動一個worker。最終任務必然被啟動,那么我們繼續看Worker如何啟動這個任務。


Worker


Worker實現了Runnable接口:


private final class Worker

? ? extends AbstractQueuedSynchronizer

? ? implements Runnable

{

...

? ? Worker(Runnable firstTask) {

? ? ? ? setState(-1); // inhibit interrupts until runWorker

? ? ? ? this.firstTask = firstTask;

? ? ? ? this.thread = getThreadFactory().newThread(this);

? ? }

? ? /** Delegates main run loop to outer runWorker? */

? ? public void run() {

? ? ? ? runWorker(this);

? ? }

...

}


為什么要將構造Worker時的參數命名為firstTask?因為當且僅當需要建立新的Worker以執行任務task時,才會調用構造函數。因此,任務task對于新Worker而言,是第一個任務firstTask。


Worker的實現非常簡單:將自己作為Runable實例,構造時在內部創建并持有一個線程thread。Thread和Runable的使用大家很熟悉了,核心是Worker的run方法,它直接調用了runWorker()方法。


runWorker()


敲黑板!!!


重頭戲來了。簡化如下:


public class ThreadPoolExecutor extends AbstractExecutorService {

...

? ? final void runWorker(Worker w) {

? ? ? ? Thread wt = Thread.currentThread();

? ? ? ? Runnable task = w.firstTask;

? ? ? ? w.firstTask = null;

? ? ? ? w.unlock(); // allow interrupts

? ? ? ? boolean completedAbruptly = true;

? ? ? ? try {

? ? ? ? ? ? while (task != null || (task = getTask()) != null) {

? ? ? ? ? ? ? ? w.lock();

? ? ? ? ? ? ? ? try {

? ? ? ? ? ? ? ? ? ? beforeExecute(wt, task);

? ? ? ? ? ? ? ? ? ? Throwable thrown = null;

? ? ? ? ? ? ? ? ? ? try {

? ? ? ? ? ? ? ? ? ? ? ? task.run();

? ? ? ? ? ? ? ? ? ? } catch (RuntimeException x) {

? ? ? ? ? ? ? ? ? ? ? ? thrown = x; throw x;

? ? ? ? ? ? ? ? ? ? } catch (Error x) {

? ? ? ? ? ? ? ? ? ? ? ? thrown = x; throw x;

? ? ? ? ? ? ? ? ? ? } catch (Throwable x) {

? ? ? ? ? ? ? ? ? ? ? ? thrown = x; throw new Error(x);

? ? ? ? ? ? ? ? ? ? } finally {

? ? ? ? ? ? ? ? ? ? ? ? afterExecute(task, thrown);

? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? } finally {

? ? ? ? ? ? ? ? ? ? task = null;

? ? ? ? ? ? ? ? ? ? w.completedTasks++;

? ? ? ? ? ? ? ? ? ? w.unlock();

? ? ? ? ? ? ? ? }

? ? ? ? ? ? }

? ? ? ? ? ? completedAbruptly = false;

? ? ? ? } finally {

? ? ? ? ? ? processWorkerExit(w, completedAbruptly);

? ? ? ? }

? ? }

...

}


我們在前面將要執行的任務賦值給firstTask,5-6行首先取出任務task,并將firstTask置為null。因為接下來要執行task,firstTask字段就沒有用了。


重點是10-31行的while循環。下面分情況討論。


case1:第一次進入循環,task不為null


case1對應前面作出的諸多假設。


第一次進入循環時,task==firstTask,不為null,使10行布爾短路直接進入循環;從而16行執行的是firstTask的run()方法;異常處理不表;最后,finally代碼塊中,task會被置為null,導致下一輪循環會進入case2。


case2:非第一次進入循環,task為null


case2是更普遍的情況,也就是線程池的核心


case1中,task被置為了null,使10行布爾表達式執行第二部分(task = getTask()) != null(getTask()稍后再講,它返回一個用戶已提交的任務)。假設task得到了一個已提交的任務,從而16行執行的是新獲得的任務task的run()方法。后同case1,最后task仍然會被置為null,以后循環都將進入case2。


GETTASK()


任務從哪來呢?簡化如下:


public class ThreadPoolExecutor extends AbstractExecutorService {

...

? ? private Runnable getTask() {

? ? ? ? boolean timedOut = false;

? ? ? ? for (;;) {

? ? ? ? ? ? try {

? ? ? ? ? ? ? ? Runnable r = timed ?

? ? ? ? ? ? ? ? ? ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :

? ? ? ? ? ? ? ? ? ? workQueue.take();

? ? ? ? ? ? ? ? if (r != null)

? ? ? ? ? ? ? ? ? ? return r;

? ? ? ? ? ? ? ? timedOut = true;

? ? ? ? ? ? } catch (InterruptedException retry) {

? ? ? ? ? ? ? ? timedOut = false;

? ? ? ? ? ? }

? ? ? ? }

? ? }

...

}


我們先看最簡單的,19-28行。


首先,workQueue是一個線程安全的BlockingQueue,大部分時候使用的實現類是LinkedBlockingQueue:


private final BlockingQueue<Runnable> workQueue;


假設timed為false,則調用阻塞的take()方法,返回的r一定不是null,從而12行退出,將任務交給了某個worker線程。


一個小細節有點意思:前面每個worker線程runWorker()方法時,在循環中加鎖粒度在worker級別,直接使用的lock同步;但因為每一個woker都會調用getTask(),考慮到性能因素,源碼中getTask()中使用樂觀的CAS+SPIN實現無鎖同步。


關于樂觀鎖和CAS,可以參考https://monkeysayhi.github.io/2017/10/22/源碼%7C并發一枝花之ConcurrentLinkedQueue【偽】/


workQueue中的元素從哪來呢?這就要回顧execute()方法了。


EXECUTE()


public class ThreadPoolExecutor extends AbstractExecutorService {

...

? ? public void execute(Runnable command) {

? ? ? ? if (command == null)

? ? ? ? ? ? throw new NullPointerException();

? ? ? ? int c = ctl.get();

? ? ? ? if (workerCountOf(c) < corePoolSize) {

? ? ? ? ? ? if (addWorker(command, true))

? ? ? ? ? ? ? ? return;

? ? ? ? ? ? c = ctl.get();

? ? ? ? }

? ? ? ? if (isRunning(c) && workQueue.offer(command)) {

? ? ? ? ? ? int recheck = ctl.get();

? ? ? ? ? ? if (! isRunning(recheck) && remove(command))

? ? ? ? ? ? ? ? reject(command);

? ? ? ? ? ? else if (workerCountOf(recheck) == 0)

? ? ? ? ? ? ? ? addWorker(null, false);

? ? ? ? }

? ? ? ? else if (!addWorker(command, false))

? ? ? ? ? ? reject(command);

? ? }

...

}


前面以8行的參數為例,此時,線程池中的線程數未達到最小線程池大小corePoolSize,通常可以直接在9行返回。進入8行的條件是“當前worker數小于最小線程池大小corePoolSize”。


如果不滿足,會繼續執行到12行。isRunning(c)判斷線程池是否未關閉,我們關注未關閉的情況;則會繼續執行布爾表達式的第二部分workQueue.offer(command),嘗試將任務command放入隊列workQueue。


workQueue.offer()的行為取決于線程池持有的BlockingQueue實例。


Executors.newFixedThreadPool()、Executors.newSingleThreadExecutor()創建的線程池使用LinkedBlockingQueue,而Executors.newCachedThreadPool()創建的線程池則使用SynchronousQueue。


以LinkedBlockingQueue為例,創建時不配置容量,即創建為無界隊列,則LinkedBlockingQueue#offer()永遠返回true,從而進入12-18行。


更細節的內容不必關心了,當workQueue.offer()返回true時,已經將任務command放入了隊列workQueue。當未來的某個時刻,某worker執行完某一個任務之后,會從workQueue中再取出一個任務繼續執行,直到線程池關閉,直到海枯石爛。


CachedThreadPool是一種無界線程池,使用SynchronousQueue能進一步提升性能,簡化代碼結構。留給讀者分析。


CASE2小結


可以看到,實際上,線程池的核心原理與對象池模型無關,而是生產者-消費者模型


  • 生產者(調用submit()或execute()方法)將任務task放入隊列

  • 消費者(worker線程)循環從隊列中取出任務處理任務(執行task.run())


鉤子方法


回到runWorker()方法,在執行任務的過程中,線程池保留了一些鉤子方法,如beforeExecute()、afterExecute()。用戶可以在實現自己的線程池時,可以通過覆寫鉤子方法為線程池添加功能。


但猴子不認為鉤子方法是一種好的設計。因為鉤子方法大多依賴于源碼實現,那么除非了解源碼或API聲明絕對的嚴謹正確,否則很難正確使用鉤子方法。等發生錯誤時再去了解實現,可能就太晚了。說到底,還是不要使用類似extends這種表達“擴展”語義的語法來實現繼承,詳見Java中如何恰當的表達“繼承”與“擴展”的語義?。


當然,鉤子方法也是極其方便的。權衡看待。


總結


相對于線程封閉,串行線程封閉離用戶的距離更近一些,簡單靈活,實用性強,很容易掌握。而線程封閉更多淪為單純的設計策略,單純使用線程封閉的場景不多。


線程池與串行線程封閉、對象池的關系不大,但經常被混為一談;沒看過源碼的很難想到其實現方案,面試時也能立分高下。


線程池的實現很有意思。在追源碼之前,猴子一直以為線程池就是把線程存起來,用的時候取出來執行任務;看了源碼才知道實現如此之妙,簡潔優雅效率高。


源碼才是最好的老師。


來源:ImportNew



總結

以上是生活随笔為你收集整理的从串行线程封闭到对象池、线程池的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。