Java多线程系列 JUC线程池05 线程池原理解析(四)
轉(zhuǎn)載?http://www.cnblogs.com/skywang12345/p/3544116.html??https://blog.csdn.net/programmer_at/article/details/79799267
Executor執(zhí)行Callable任務(wù)
Callable 和 Future 是比較有趣的一對(duì)組合。當(dāng)我們需要獲取線程的執(zhí)行結(jié)果時(shí),就需要用到它們。Callable用于產(chǎn)生結(jié)果,Future用于獲取結(jié)果。
1. Callable
Callable 是一個(gè)接口,它只包含一個(gè)call()方法。Callable是一個(gè)返回結(jié)果并且可能拋出異常的任務(wù)。為了便于理解,我們可以將Callable比作一個(gè)Runnable接口,而Callable的call()方法則類似于Runnable的run()方法。
Callable的源碼如下:
public interface Callable<V> {V call() throws Exception; }說(shuō)明:從中我們可以看出Callable支持泛型。
2. Future
Future 是一個(gè)接口。它用于表示異步計(jì)算的結(jié)果。提供了檢查計(jì)算是否完成的方法,以等待計(jì)算的完成,并獲取計(jì)算的結(jié)果。
Future的源碼如下:
public interface Future<V> {// 試圖取消對(duì)此任務(wù)的執(zhí)行。boolean cancel(boolean mayInterruptIfRunning)// 如果在任務(wù)正常完成前將其取消,則返回 true。boolean isCancelled()// 如果任務(wù)已完成,則返回 true。boolean isDone()// 如有必要,等待計(jì)算完成,然后獲取其結(jié)果。V get() throws InterruptedException, ExecutionException;// 如有必要,最多等待為使計(jì)算完成所給定的時(shí)間之后,獲取其結(jié)果(如果結(jié)果可用)。V get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException; }說(shuō)明: Future用于表示異步計(jì)算的結(jié)果。它的實(shí)現(xiàn)類是FutureTask,在講解FutureTask之前,我們先看看Callable, Future, FutureTask它們之間的關(guān)系圖,如下:
?
說(shuō)明:
(01) RunnableFuture是一個(gè)接口,它繼承了Runnable和Future這兩個(gè)接口。RunnableFuture的源碼如下:
(02) FutureTask實(shí)現(xiàn)了RunnableFuture接口。所以,我們也說(shuō)它實(shí)現(xiàn)了Future接口。
示例和源碼分析
我們先通過(guò)一個(gè)示例看看Callable和Future的基本用法,然后再分析示例的實(shí)現(xiàn)原理。
import java.util.concurrent.Callable; import java.util.concurrent.Future; import java.util.concurrent.Executors; import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutionException;class MyCallable implements Callable {@Override public Integer call() throws Exception {int sum = 0;// 執(zhí)行任務(wù)for (int i=0; i<100; i++)sum += i;//return sum; return Integer.valueOf(sum);} }public class CallableTest1 {public static void main(String[] args) throws ExecutionException, InterruptedException{//創(chuàng)建一個(gè)線程池ExecutorService pool = Executors.newSingleThreadExecutor();//創(chuàng)建有返回值的任務(wù)Callable c1 = new MyCallable();//執(zhí)行任務(wù)并獲取Future對(duì)象 Future f1 = pool.submit(c1);// 輸出結(jié)果 System.out.println(f1.get()); //關(guān)閉線程池 pool.shutdown(); } }運(yùn)行結(jié)果:
4950結(jié)果說(shuō)明:
在主線程main中,通過(guò)newSingleThreadExecutor()新建一個(gè)線程池。接著創(chuàng)建Callable對(duì)象c1,然后再通過(guò)pool.submit(c1)將c1提交到線程池中進(jìn)行處理,并且將返回的結(jié)果保存到Future對(duì)象f1中。然后,我們通過(guò)f1.get()獲取Callable中保存的結(jié)果;最后通過(guò)pool.shutdown()關(guān)閉線程池。
?
1. submit任務(wù),等待線程池execute?
1. 執(zhí)行FutureTask類的get方法時(shí),會(huì)把主線程封裝成WaitNode節(jié)點(diǎn)并保存在waiters鏈表中, 并阻塞等待運(yùn)行結(jié)果;?
2. FutureTask任務(wù)執(zhí)行完成后,通過(guò)UNSAFE設(shè)置waiters相應(yīng)的waitNode為null,并通過(guò)LockSupport類unpark方法喚醒主線程;
在實(shí)際業(yè)務(wù)場(chǎng)景中,Future和Callable基本是成對(duì)出現(xiàn)的,Callable負(fù)責(zé)產(chǎn)生結(jié)果,Future負(fù)責(zé)獲取結(jié)果。?
1. Callable接口類似于Runnable,只是Runnable沒(méi)有返回值。?
2. Callable任務(wù)除了返回正常結(jié)果之外,如果發(fā)生異常,該異常也會(huì)被返回,即Future可以拿到異步執(zhí)行任務(wù)各種結(jié)果;?
3. Future.get方法會(huì)導(dǎo)致主線程阻塞,直到Callable任務(wù)執(zhí)行完成;
1. submit()
submit()在ExecutorService.java中的定義:
<T> Future<T> submit(Callable<T> task);<T> Future<T> submit(Runnable task, T result);Future<?> submit(Runnable task);submit()在AbstractExecutorService.java中實(shí)現(xiàn),AbstractExecutorService.submit()實(shí)現(xiàn)了ExecutorService.submit(),并且可以獲取執(zhí)行完的返回值, 而ThreadPoolExecutor是AbstractExecutorService.submit()的子類,所以submit方法也是ThreadPoolExecutor的方法,它的源碼如下:
public <T> Future<T> submit(Callable<T> task) {if (task == null) throw new NullPointerException();// 創(chuàng)建一個(gè)RunnableFuture對(duì)象RunnableFuture<T> ftask = newTaskFor(task);// 執(zhí)行“任務(wù)ftask” execute(ftask);// 返回“ftask”return ftask; }說(shuō)明:submit()通過(guò)newTaskFor(task)創(chuàng)建了RunnableFuture對(duì)象ftask。它的源碼如下:
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {return new FutureTask<T>(callable); }通過(guò)submit方法提交的Callable任務(wù)會(huì)被封裝成了一個(gè)FutureTask對(duì)象。通過(guò)Executor.execute方法提交FutureTask到線程池中等待被執(zhí)行,最終執(zhí)行的是FutureTask的run方法;
2. FutureTask的構(gòu)造函數(shù)
FutureTask的內(nèi)部狀態(tài)及構(gòu)造函數(shù)如下:
public class FutureTask<V> implements RunnableFuture<V> {private volatile int state;private static final int NEW = 0;private static final int COMPLETING = 1;private static final int NORMAL = 2;private static final int EXCEPTIONAL = 3;private static final int CANCELLED = 4;private static final int INTERRUPTING = 5;private static final int INTERRUPTED = 6;public FutureTask(Callable<V> callable) {if (callable == null)throw new NullPointerException();// callable是一個(gè)Callable對(duì)象this.callable = callable;// state記錄FutureTask的狀態(tài)this.state = NEW; // ensure visibility of callable } }3. FutureTask的run()方法
我們繼續(xù)回到submit()的源碼中。
在newTaskFor()新建一個(gè)ftask對(duì)象之后,會(huì)通過(guò)execute(ftask)執(zhí)行該任務(wù)。此時(shí)ftask被當(dāng)作一個(gè)Runnable對(duì)象進(jìn)行執(zhí)行,最終會(huì)調(diào)用到它的run()方法;ftask的run()方法在java/util/concurrent/FutureTask.java中實(shí)現(xiàn),源碼如下:
說(shuō)明:FutureTask.run方法是在線程池中被執(zhí)行的,而非主線程?
1. 通過(guò)執(zhí)行Callable任務(wù)的call方法;?
2. 如果call執(zhí)行成功,則通過(guò)set方法保存結(jié)果,之后調(diào)用FutureTask的get()方法,返回的就是通過(guò)set(result)保存的值;?
3. 如果call執(zhí)行有異常,則通過(guò)setException保存異常;
4. get方法
public V get() throws InterruptedException, ExecutionException {int s = state;if (s <= COMPLETING)s = awaitDone(false, 0L);return report(s); }內(nèi)部通過(guò)awaitDone方法對(duì)主線程進(jìn)行阻塞,具體實(shí)現(xiàn)如下:
private int awaitDone(boolean timed, long nanos)throws InterruptedException {final long deadline = timed ? System.nanoTime() + nanos : 0L;WaitNode q = null;boolean queued = false;for (;;) {if (Thread.interrupted()) {removeWaiter(q);throw new InterruptedException();}int s = state;if (s > COMPLETING) {if (q != null)q.thread = null;return s;}else if (s == COMPLETING) // cannot time out yet Thread.yield();else if (q == null)q = new WaitNode();else if (!queued)queued = UNSAFE.compareAndSwapObject(this, waitersOffset,q.next = waiters, q);else if (timed) {nanos = deadline - System.nanoTime();if (nanos <= 0L) {removeWaiter(q);return state;}LockSupport.parkNanos(this, nanos);}elseLockSupport.park(this);}}說(shuō)明:
?
轉(zhuǎn)載于:https://www.cnblogs.com/lizhouwei/p/9119074.html
總結(jié)
以上是生活随笔為你收集整理的Java多线程系列 JUC线程池05 线程池原理解析(四)的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 几款开源的图形化Redis客户端管理软件
- 下一篇: 配置防盗链、 访问控制Directory