深入剖析线程池基本原理以及常见面试题详解
文章目錄
- 面試官:能給我講講線程池的實現原理?
- 線程池類繼承關系
- ThreadPoolExecutor
- 核心數據結構
- 面試官:給我講講線程池的有哪些參數?
- 面試官:如何優雅的關閉線程?
- 線程的生命周期
- 面試官:線程池哪五種狀態?
- 面試官:線程池哪4種拒絕策略?并分別說一下作用和實現原理?
- DiscardOldestPolicy
- AbortPolicy
- DiscardPolicy
- CallerRunsPolicy
- 面試官:線程池常用的阻塞隊列有哪些?能說下各自的區別?
- SynchronousQueue應用
- PriorityBlockedQueue應用
- DelayQueue應用
- 面試官:如何結合業務合理的配置線程池參數?CPU密集型和IO密集型如何配置?線程設置過多會造成什么影響?
- CPU 密集型任務
- IO密集型任務
- 面試官:給我講講什么是線程復用?
- 面試官:為什么《阿里巴巴開發手冊》不推薦使用Executor創建線程?
- ScheduledThreadPoolExecutor
- 延時執行
- 周期執行
- 面試題:你知道延遲執行、周期性執行任務實現原理?
- 面試題:為什么不使用Timer而使用ScheduledThreadPoolExecutor?
- CompletableFuture異步編程工具
- 基本使用
- 四種任務原型
- 面試題:你知道CompletableFuture內部原理?
- CompletableFuture的構造:ForkJoinPool
- 任務類型的適配
- 任務的鏈式執行過程分析
- 什么是 Java8 的 ForkJoinPool?
- 應用
- 核心數據結構
面試官:能給我講講線程池的實現原理?
聲:回答該問題需要了解線程池有哪些方法并講解每個方法的作用,以及各個類的繼承關系,線程池的運行原理,線程池的狀態轉換、生命周期,線程池的構造參數,線程池Runnable->Worker->Thread執行任務->線程復用機制等
線程池類繼承關系
ThreadPoolExecutor
核心數據結構
public class ThreadPoolExecutor extends AbstractExecutorService {//存儲線程池的狀態和線程數量private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));// 存放任務的阻塞隊列private final BlockingQueue<Runnable> workQueue;// 對線程池內部各種變量進行互斥訪問控制private final ReentrantLock mainLock = new ReentrantLock();// 線程集合private final HashSet<Worker> workers = new HashSet<Worker>();每一個線程是一個Worker對象,Worker是ThreadPoolExecutor內部類,核心數據結構如下:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {final Thread thread; // Worker封裝的線程Runnable firstTask; // Worker接收到的第1個任務volatile long completedTasks; // Worker執行完畢的任務個數 }由定義會發現,Worker繼承于AQS,也就是說Worker本身就是一把鎖。這把鎖有什么用處呢?用于線程池的關閉、線程執行任務的過程中。
面試官:給我講講線程池的有哪些參數?
ThreadPoolExecutor在其構造方法中提供了幾個核心配置參數,來配置不同策略的線程池。
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler)面試官:如何優雅的關閉線程?
線程池的關閉比線程的關閉更加復雜,因為線程池的關閉涉及到很多場景,如果有線程正在執行任務?如果任務隊列不為空?還有當前線程進來如何處理,因此,關閉過程不可能是瞬時的,而是需要一個平滑的過渡,這就涉及線程池的完整生命周期管理。
線程的生命周期
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));在JDK 7中,把線程數量(workerCount)和線程池狀態(runState)這兩個變量打包存儲在一個字
段里面,即ctl變量。如下圖所示,最高的3位存儲線程池狀態,其余29位存儲線程個數。而在JDK 6中,
這兩個變量是分開存儲的。
關于內部封裝的獲取生命周期狀態、獲取線程池線程數量的計算方法如以下代碼所示
面試官:ctl為什么這樣設計?這樣做的好處?
用一個變量去存儲兩個值,可避免在做相關決策時,出現不一致的情況,不必為了維護兩者的一致,而占用鎖資源。通過閱讀線程池源代碼也可以發現,經常出現要同時判斷線程池運行狀態和線程數量的情況。線程池也提供了若干方法去供用戶獲得線程池當前的運行狀態、線程個數。這里都使用的是位運算的方式,相比于基本運算,速度也會快很多。
線程狀態轉換過程:
狀態解釋:
切記:線程狀態-1、0、1、2、3轉化只能從小到大,而不能逆向轉換。
除 terminated()之外,線程池還提供了其他幾個鉤子方法,這些方法的實現都是空的。如果想實現
自己的線程池,可以重寫這幾個方法:
面試官:線程池哪五種狀態?
// runState is stored in the high-order bitsprivate static final int RUNNING = -1 << COUNT_BITS;private static final int SHUTDOWN = 0 << COUNT_BITS;private static final int STOP = 1 << COUNT_BITS;private static final int TIDYING = 2 << COUNT_BITS;private static final int TERMINATED = 3 << COUNT_BITS;面試官:線程池哪4種拒絕策略?并分別說一下作用和實現原理?
接口類:
public interface RejectedExecutionHandler {void rejectedExecution(Runnable r, ThreadPoolExecutor executor); }實現類:
DiscardOldestPolicy
public static class DiscardOldestPolicy implements RejectedExecutionHandler {public DiscardOldestPolicy() { }/*** 從任務隊列中調用poll()方法刪除最先入隊列的(最老的)任務* 拓展:隊列是先進先出,由此調用poll()方法是取出的是先入隊列的數據*/public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {if (!e.isShutdown()) {e.getQueue().poll();e.execute(r);}}}AbortPolicy
public static class AbortPolicy implements RejectedExecutionHandler {/*** Creates an {@code AbortPolicy}.*/public AbortPolicy() { }/*** Always throws RejectedExecutionException.* 丟棄準備添加的任務并拋出異常* @param r the runnable task requested to be executed* @param e the executor attempting to execute this task* @throws RejectedExecutionException always*/public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {throw new RejectedExecutionException("Task " + r.toString() +" rejected from " +e.toString());}}DiscardPolicy
public static class DiscardPolicy implements RejectedExecutionHandler {/*** Creates a {@code DiscardPolicy}.*/public DiscardPolicy() { }/*** Does nothing, which has the effect of discarding task r.* 不做任何處理,丟棄準備添加的任務* @param r the runnable task requested to be executed* @param e the executor attempting to execute this task*/public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {}}CallerRunsPolicy
public static class CallerRunsPolicy implements RejectedExecutionHandler {/*** Creates a {@code CallerRunsPolicy}.*/public CallerRunsPolicy() { }/*** Executes task r in the caller's thread, unless the executor* has been shut down, in which case the task is discarded.* 準備添加的任務,直接調用run()方法交給提交任務的線程執行* @param r the runnable task requested to be executed* @param e the executor attempting to execute this task*/public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {if (!e.isShutdown()) {r.run();}}}面試官:線程池常用的阻塞隊列有哪些?能說下各自的區別?
| ArrayBlockedQueue | 數組實現有界隊列,FIFO先入先出,支持公平鎖、非公平鎖 |
| LinkedBlockedQueue | 單鏈表實現的有界隊列,如果不指定容量默認為Integer.MAX_VALUE |
| SynchronousQueue | 不存儲元素的隊列,每個put()操作時必須有線程正在調用take(),該元素才存在,Executors.newCacheThreadPool()就使用該隊列,每來一個任務如果沒有空閑線程(線程復用)則創建新線程執行任務 |
| PriorityBlockedQueue | 無界的優先隊列,默認按自然排序,自定義實現compareTo()定制自己優先級,不同保證同優先級順序 |
| DelayQueue | 無界延遲隊列,利用PriorityBlockedQueue實現,在創建元素時可以指定多久能夠獲取到該元素,只有滿足延遲時間才能獲取到數據,ScheduledThreadPoolExecutor定時任務就是利用自己實現的延時隊列(思想一致) |
SynchronousQueue應用
@Testpublic void SynchronousQueue() throws InterruptedException {SynchronousQueue<Integer> queue = new SynchronousQueue<>();Random random = new Random();AtomicInteger ait = new AtomicInteger(0);new Thread(() -> {try {for (int i = 0; i < 3; i++) {Integer integer = queue.take();if (integer != null){int count = ait.incrementAndGet();System.out.println(count + "-" + integer);}}} catch (InterruptedException e) {e.printStackTrace();}}).start();TimeUnit.SECONDS.sleep(3);new Thread(() -> {for (int i = 0; i < 3; i++) {queue.offer(random.nextInt());}}).start();TimeUnit.SECONDS.sleep(5);}PriorityBlockedQueue應用
和PriorityQueue使用一樣,無非就是加了鎖阻塞生產、消費者線程
@Testpublic void priorityQueue(){PriorityQueue<Integer> queue = new PriorityQueue<>(new Comparator<Integer>() {@Overridepublic int compare(Integer o1, Integer o2) {return Integer.compare(o1, o2);}});queue.add(2);queue.add(1);queue.add(3);while (!queue.isEmpty()){System.out.println(queue.poll());}PriorityQueue<CustomRank> queue2 = new PriorityQueue<>();queue2.add(new CustomRank(2));queue2.add(new CustomRank(1));queue2.add(new CustomRank(3));while (!queue2.isEmpty()){System.out.println(queue2.poll().v);}}public class CustomRank implements Comparable<CustomRank>{Integer v;public CustomRank(Integer v) {this.v = v;}@Overridepublic int compareTo(CustomRank o) {return Integer.compare(this.v, o.v);}}DelayQueue應用
@Testpublic void delayQueue() throws InterruptedException {DelayQueue<CustomTimeTask> queue = new DelayQueue<>();queue.add(new CustomTimeTask("我是第一個任務", 4, TimeUnit.SECONDS));queue.add(new CustomTimeTask("我是第二個任務", 8, TimeUnit.SECONDS));queue.add(new CustomTimeTask("我是第三個任務", 16, TimeUnit.SECONDS));while (!queue.isEmpty()){CustomTimeTask task = queue.take();System.out.format("name: {%s}, time: {%s} \n", task.name, new Date());}}class CustomTimeTask implements Delayed{//觸發時間long time;//任務名稱String name;public CustomTimeTask(String name,long time, TimeUnit timeUnit) {this.time = System.currentTimeMillis() + timeUnit.toMillis(time);this.name = name;}@Overridepublic long getDelay(TimeUnit unit) {return time - System.currentTimeMillis();}/*** 利用優先隊列將任務按照觸發時間從小到大排序* @param o* @return*/@Overridepublic int compareTo(Delayed o) {CustomTimeTask other = (CustomTimeTask) o;return Long.compare(this.time, other.time);}@Overridepublic String toString() {return "CustomTimeTask{" +"time=" + time +", name='" + name + '\'' +'}';}}面試官:如何結合業務合理的配置線程池參數?CPU密集型和IO密集型如何配置?線程設置過多會造成什么影響?
答案:其實沒有完整的公式去計算,我在使用的時候一般是根據業務場景,動態的去改變線程池參數選擇最優配置方案
CPU 密集型任務
IO密集型任務
面試官:給我講講什么是線程復用?
什么是線程復用?
通過同一個線程去執行不同的任務,這就是線程復用。
java.util.concurrent.ThreadPoolExecutor#execute
public void execute(Runnable command) {// 如果傳入的Runnable的空,就拋出異常if (command == null)throw new NullPointerException();int c = ctl.get();// 線程池中的線程比核心線程數少 if (workerCountOf(c) < corePoolSize) {// 新建一個核心線程執行任務if (addWorker(command, true))return;c = ctl.get();}// 核心線程已滿,但是任務隊列未滿,添加到隊列中if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();// 任務成功添加到隊列以后,再次檢查是否需要添加新的線程,因為已存在的線程可能被銷毀了if (! isRunning(recheck) && remove(command))// 如果線程池處于非運行狀態,并且把當前的任務從任務隊列中移除成功,則拒絕該任務reject(command);else if (workerCountOf(recheck) == 0)// 如果之前的線程已經被銷毀完,新建一個非核心線程addWorker(null, false);}// 核心線程池已滿,隊列已滿,嘗試創建一個非核心新的線程else if (!addWorker(command, false))// 如果創建新線程失敗,說明線程池關閉或者線程池滿了,拒絕任務reject(command);}線程復用源碼分析:java.util.concurrent.ThreadPoolExecutor#runWorker
final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // 釋放鎖 設置work的state=0 允許中斷boolean completedAbruptly = true;try {//一直執行 如果task不為空 或者 從隊列中獲取的task不為空while (task != null || (task = getTask()) != null) {task.run();//執行task中的run方法}}completedAbruptly = false;} finally {//1.將 worker 從數組 workers 里刪除掉//2.根據布爾值 allowCoreThreadTimeOut 來決定是否補充新的 Worker 進數組 workersprocessWorkerExit(w, completedAbruptly);}}面試官:為什么《阿里巴巴開發手冊》不推薦使用Executor創建線程?
ScheduledThreadPoolExecutor
延時執行
ScheduledThreadPoolExecutor threadPool = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "schedule-thread");}});/*** 延遲執行* @throws InterruptedException*/@Testvoid testSchedule() throws InterruptedException {CountDownLatch countDownLatch = new CountDownLatch(1);System.out.println(new Date());threadPool.schedule(new TimeTask(), 3, TimeUnit.SECONDS);countDownLatch.await();}class TimeTask implements Runnable{@Overridepublic void run() {try {TimeUnit.SECONDS.sleep(4);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Thread.currentThread().getName() + new Date() + " 任務執行完成");}}周期執行
1.scheduleAtFixedRate方法
按固定頻率執行,與任務本身執行時間無關。但有個前提條件,任務執行時間必須小于間隔時間,例如間隔時間是5s,每5s執行一次任務,任務的執行時間必須小于5s。
@Testvoid testScheduleAtFixedRate() throws InterruptedException {CountDownLatch countDownLatch = new CountDownLatch(1);threadPool.scheduleAtFixedRate(new TimeTask(), 2, 3, TimeUnit.SECONDS);countDownLatch.await();}2.scheduleWithFixedDelay方法
按固定間隔執行,與任務本身執行時間有關。例如,任務本身執行時間是10s,間隔2s,則下一次開始執行的時間就是12s。
@Testvoid testScheduleWithFixedDelay() throws InterruptedException {CountDownLatch countDownLatch = new CountDownLatch(1);threadPool.scheduleWithFixedDelay(new TimeTask(), 2, 3, TimeUnit.SECONDS);countDownLatch.await();}面試題:你知道延遲執行、周期性執行任務實現原理?
ScheduledThreadPoolExecutor繼承了ThreadPoolExecutor,這意味著其內部的數據結構和ThreadPoolExecutor是基本一樣的。
延遲執行任務依靠的是DelayQueue。DelayQueue是 BlockingQueue的一種,其實現原理是二叉堆。
而周期性執行任務是執行完一個任務之后,再把該任務扔回到任務隊列中,如此就可以對一個任務反復執行。
不過這里并沒有使用DelayQueue,而是在ScheduledThreadPoolExecutor內部又實現了一個特定的DelayQueue。
static class DelayedWorkQueue extends AbstractQueue<Runnable>implements BlockingQueue<Runnable> {...}其原理和DelayQueue一樣,但針對任務的取消進行了優化。下面主要講延遲執行和周期性執行的實現過程。
延遲執行設計原理:
傳進去的是一個Runnable,外加延遲時間delay。在內部通過decorateTask(…)方法把Runnable包裝成一個ScheduleFutureTask對象,而DelayedWorkQueue中存放的正是這種類型的對象,這種類型的對象一定實現了Delayed接口。
從上面的代碼中可以看出,schedule()方法本身很簡單,就是把提交的Runnable任務加上delay時間,轉換成ScheduledFutureTask對象,放入DelayedWorkerQueue中。任務的執行過程還是復用的ThreadPoolExecutor,延遲的控制是在DelayedWorkerQueue內部完成的。
周期性執行設計原理:
和schedule(…)方法的框架基本一樣,也是包裝一個ScheduledFutureTask對象,只是在延遲時間參數之外多了一個周期參數,然后放入DelayedWorkerQueue就結束了。
兩個方法的區別在于一個傳入的周期是一個負數,另一個傳入的周期是一個正數,為什么要這樣做呢?
用于生成任務序列號的sequencer,創建ScheduledFutureTask的時候使用:
withFixedDelay和atFixedRate的區別就體現在setNextRunTime里面。
如果是atFixedRate,period>0,下一次開始執行時間等于上一次開始執行時間+period;
如果是withFixedDelay,period < 0,下一次開始執行時間等于triggerTime(-p),為now+(-period),now即上一次執行的結束時間。
面試題:為什么不使用Timer而使用ScheduledThreadPoolExecutor?
CompletableFuture異步編程工具
基本使用
package net.dreamzuora.thread;import org.testng.annotations.Test;import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier;/*** 異步編程工具*/ public class CompletableFutureDemo {/*** CompletableFuture實現了Future接口,所以它也具有Future的特性:調用get()方法會阻塞在那,* 直到結果返回。* 另外1個線程調用complete方法完成該Future,則所有阻塞在get()方法的線程都將獲得返回結果。* @throws ExecutionException* @throws InterruptedException*/@Testvoid complete() throws ExecutionException, InterruptedException {CompletableFuture<String> completeFuture = new CompletableFuture<>();new Thread(() -> {try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();}completeFuture.complete("gome");}).start();System.out.println(completeFuture.get());}/*** 阻塞等待任務執行完成*/@Testvoid runAsyncTest() throws ExecutionException, InterruptedException {CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {try {TimeUnit.SECONDS.sleep(3);System.out.println("hello word!");} catch (InterruptedException e) {e.printStackTrace();}});//阻塞等待任務完成completableFuture.get();System.out.println("succ");}/*** 帶返回值的任務執行* @throws ExecutionException* @throws InterruptedException*/@Testvoid supplyAsync() throws ExecutionException, InterruptedException {CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(new Supplier<String>() {@Overridepublic String get() {try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();}return "hello";}});String result = stringCompletableFuture.get();System.out.println(result);}/*** thenRun():上個任務結束再執行(不帶上一個返回值結果)下一個任務* thenAccept后面跟的是一個有參數、無返回值的方法,稱為Consumer,返回值也是* CompletableFuture<Void>類型。顧名思義,只進不出,所以稱為Consumer;前面的* Supplier,是無參數,有返回值,只出不進,和Consumer剛好相反。* @throws ExecutionException* @throws InterruptedException*/@Testvoid thenRun() throws ExecutionException, InterruptedException {CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {System.out.println("第一次執行");}).thenRun(new Runnable() {@Overridepublic void run() {try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("第二次執行");}});completableFuture.get();}/*** thenAccept():上個任務結束再執行(前面任務的結果作為下一個任務的入參)下一個任務* @throws ExecutionException* @throws InterruptedException*/@Testvoid thenAccept() throws ExecutionException, InterruptedException {CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(new Supplier<String>() {@Overridepublic String get() {return "hello";}}).thenAccept(new Consumer<String>() {@Overridepublic void accept(String param) {System.out.println(param + " word!");}});completableFuture.get();}/*** thenApply 后面跟的是一個有參數、有返回值的方法,稱為Function。返回值是* CompletableFuture<String>類型。* @throws ExecutionException* @throws InterruptedException*/@Testvoid thenApply() throws ExecutionException, InterruptedException {CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(new Supplier<String>() {@Overridepublic String get() {try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}return "第一個任務執行完成!";}}).thenApply(new Function<String, String>() {@Overridepublic String apply(String firstTaskResult) {try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}return firstTaskResult + " 第二個任務執行完成!";}});String result = stringCompletableFuture.get();System.out.println(result);}/*** 第1個參數是一個CompletableFuture類型,第2個參數是一個方法,并且是一個BiFunction,也就* 是該方法有2個輸入參數,1個返回值。* 從該接口的定義可以大致推測,它是要在2個 CompletableFuture 完成之后,把2個* CompletableFuture的返回值傳進去,再額外做一些事情。* @throws ExecutionException* @throws InterruptedException*/@Testvoid thenCompose() throws ExecutionException, InterruptedException {CompletableFuture<String> future = CompletableFuture.supplyAsync((Supplier<String>) () -> "第一個任務執行完成!").thenCompose(new Function<String, CompletionStage<String>>() {@Overridepublic CompletionStage<String> apply(String firstTask) {return CompletableFuture.supplyAsync(new Supplier<String>() {@Overridepublic String get() {return firstTask + " 第二個任務執行完成!";}});}});String s = future.get();System.out.println(s);}/*** 如果希望返回值是一個非嵌套的CompletableFuture,可以使用thenCompose:* @throws ExecutionException* @throws InterruptedException*/@Testvoid thenCombine() throws ExecutionException, InterruptedException {CompletableFuture<Integer> future = CompletableFuture.supplyAsync(new Supplier<String>() {@Overridepublic String get() {return "第一個任務執行完成! ";}}).thenCombine(CompletableFuture.supplyAsync(new Supplier<String>() {@Overridepublic String get() {return "第二個任務執行完成! ";}}), new BiFunction<String, String, Integer>() {@Overridepublic Integer apply(String s1, String s2) {return s1.length() + s2.length();}});System.out.println(future.get());}/*** 等待所有的CompletableFuture執行完成,無返回值* @throws ExecutionException* @throws InterruptedException*/@Testvoid allOf() throws ExecutionException, InterruptedException {AtomicInteger atc = new AtomicInteger(0);CompletableFuture[] completableFutures = new CompletableFuture[10];for (int i = 0; i < 10; i++){CompletableFuture supplyAsync = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}return atc.incrementAndGet();}});completableFutures[i] = supplyAsync;}CompletableFuture<Void> completableFuture = CompletableFuture.allOf(completableFutures);completableFuture.get();System.out.println(atc);}/*** anyOf:只要有任意一個CompletableFuture結束,就可以做接下來的事情,而無須像* AllOf那樣,等待所有的CompletableFuture結束。* 但由于每個CompletableFuture的返回值類型都可能不同,任意一個,意味著無法判斷是什么類* 型,所以anyOf的返回值是CompletableFuture<Object>類型*/@Testvoid anyOf() throws ExecutionException, InterruptedException {AtomicInteger atc = new AtomicInteger(0);CompletableFuture[] completableFutures = new CompletableFuture[10];for (int i = 0; i < 10; i++){CompletableFuture supplyAsync = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}return atc.incrementAndGet();}});completableFutures[i] = supplyAsync;}Integer result = (Integer) CompletableFuture.anyOf(completableFutures).get();System.out.println(result);} }四種任務原型
通過上面的例子可以總結出,提交給CompletableFuture執行的任務有四種類型:Runnable、Consumer、Supplier、Function。下面是這四種任務原型的對比。
runAsync 與 supplierAsync 是 CompletableFuture 的靜態方法;而 thenAccept、thenAsync、thenApply是CompletableFutre的成員方法。
因為初始的時候沒有CompletableFuture對象,也沒有參數可傳,所以提交的只能是Runnable或者Supplier,只能是靜態方法;
通過靜態方法生成CompletableFuture對象之后,便可以鏈式地提交其他任務了,這個時候就可以提交Runnable、Consumer、Function,且都是成員方法。
面試題:你知道CompletableFuture內部原理?
CompletableFuture的構造:ForkJoinPool
private static final Executor asyncPool = useCommonPool ?ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();任務執行
public static CompletableFuture<Void> runAsync(Runnable runnable) {return asyncRunStage(asyncPool, runnable);}static CompletableFuture<Void> asyncRunStage(Executor e, Runnable f) {if (f == null) throw new NullPointerException();CompletableFuture<Void> d = new CompletableFuture<Void>();e.execute(new AsyncRun(d, f));return d;}
通過上面的代碼可以看到,asyncPool是一個static類型,supplierAsync、asyncSupplyStage也都是static方法。
Static方法會返回一個CompletableFuture類型對象,之后就可以鏈式調用CompletionStage里面的各個方法。
任務類型的適配
我們向CompletableFuture提交的任務是Runnable/Supplier/Consumer/Function 。因此,肯定需要一個適配機制,把這四種類型的任務轉換成ForkJoinTask,然后提交給ForkJoinPool,如下圖所示:
supplyAsync()->Supplier->AsyncSupply
在 supplyAsync(…)方法內部,會把一個 Supplier 轉換成一個 AsyncSupply,然后提交給ForkJoinPool執行;
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {return asyncSupplyStage(asyncPool, supplier);}static <U> CompletableFuture<U> asyncSupplyStage(Executor e,Supplier<U> f) {if (f == null) throw new NullPointerException();CompletableFuture<U> d = new CompletableFuture<U>();e.execute(new AsyncSupply<U>(d, f));return d;}static final class AsyncSupply<T> extends ForkJoinTask<Void>implements Runnable, AsynchronousCompletionTask {CompletableFuture<T> dep; Supplier<T> fn;AsyncSupply(CompletableFuture<T> dep, Supplier<T> fn) {this.dep = dep; this.fn = fn;}...}runAsync()->Runnable->AsyncRun
在runAsync(…)方法內部,會把一個Runnable轉換成一個AsyncRun,然后提交給ForkJoinPool執行;
thenAccept()->Consumer->UniAccept
在 thenRun/thenAccept/thenApply 內部,會分別把Runnable/Consumer/Function 轉換成UniRun/UniAccept/UniApply對象,然后提交給ForkJoinPool執行;
除此之外,還有兩種 CompletableFuture 組合的情況,分為“與”和“或”,所以有對應的Bi和Or類型
的Completion類型
任務的鏈式執行過程分析
下面以CompletableFuture.supplyAsync(…).thenApply(…).thenRun(…)鏈式代碼為例,分析整個執行過程。
static final class AsyncSupply<T> extends ForkJoinTask<Void>implements Runnable, AsynchronousCompletionTask {...}什么是 Java8 的 ForkJoinPool?
ForkJoinPool就是JDK7提供的一種“分治算法”的多線程并行計算框架。Fork意為分叉,Join意為合并,一分一合,相互配合,形成分治算法。此外,也可以將ForkJoinPool看作一個單機版的
Map/Reduce,多個線程并行計算。
相比于ThreadPoolExecutor,ForkJoinPool可以更好地實現計算的負載均衡,提高資源利用率。
假設有5個任務,在ThreadPoolExecutor中有5個線程并行執行,其中一個任務的計算量很大,其余4個任務的計算量很小,這會導致1個線程很忙,其他4個線程則處于空閑狀態。
利用ForkJoinPool,可以把大的任務拆分成很多小任務,然后這些小任務被所有的線程執行,從而
實現任務計算的負載均衡。
應用
1.斐波那契數列
@Testvoid testForkJoin() throws ExecutionException, InterruptedException {ForkJoinPool forkJoinPool = new ForkJoinPool();ForkJoinTask<Integer> task = forkJoinPool.submit(new FibonacciTask(5));System.out.println(task.get());}// 1 1 2 3 5 8 ...class FibonacciTask extends RecursiveTask<Integer> {int n;public FibonacciTask(int n) {this.n = n;}@Overrideprotected Integer compute() {if (n <= 1){return 1;}FibonacciTask task1 = new FibonacciTask(n - 1);task1.fork();FibonacciTask task2 = new FibonacciTask(n - 2);task2.fork();return task1.join() + task2.join();}}核心數據結構
與ThreadPoolExector不同的是,除一個全局的任務隊列之外,每個線程還有一個自己的局部隊列。
本課程內容參考:
1.《并發編程78講》-徐隆曦 滴滴出行高級工程師
2.美團技術博客-Java線程池實現原理及其在美團業務中的實踐
3.《java并發編程實戰》
4.CSDN博客-面試官:你知道什么是線程池的線程復用原理嗎?
總結
以上是生活随笔為你收集整理的深入剖析线程池基本原理以及常见面试题详解的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: java多线程爬虫框架crawler4j
- 下一篇: 【转载保存】RunTime.getRun