terminated 线程_Java【多线程系列】JUC线程池—2. 原理(二)、Callable和Future
在"Java多線程系列--“基礎篇”01之 基本概念"中,我們介紹過,線程有5種狀態:新建狀態,就緒狀態,運行狀態,阻塞狀態,死亡狀態。線程池也有5種狀態;然而,線程池不同于線程,線程池的5種狀態是:Running, SHUTDOWN, STOP, TIDYING, TERMINATED。
線程池狀態定義代碼如下:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));private static final int COUNT_BITS = Integer.SIZE - 3;private static final int CAPACITY = (1 << COUNT_BITS) - 1;private static final int RUNNING = -1 << COUNT_BITS;private static final int SHUTDOWN = 0 << COUNT_BITS;private static final int STOP = 1 << COUNT_BITS;private static final int TIDYING = 2 << COUNT_BITS;private static final int TERMINATED = 3 << COUNT_BITS;private static int ctlOf(int rs, int wc) { return rs | wc; }說明:
ctl是一個AtomicInteger類型的原子對象。ctl記錄了"線程池中的任務數量"和"線程池狀態"2個信息。
ctl共包括32位。其中,高3位表示"線程池狀態",低29位表示"線程池中的任務數量"。
| RUNNING | 對應的高3位值是111 |
| SHUTDOWN | 對應的高3位值是000 |
| STOP | 對應的高3位值是001 |
| TIDYING | 對應的高3位值是010 |
| TERMINATED | 對應的高3位值是011 |
線程池各個狀態之間的切換如下圖所示:
1. RUNNING
(01) 狀態說明:線程池處在RUNNING狀態時,能夠接收新任務,以及對已添加的任務進行處理。
(02) 狀態切換:線程池的初始化狀態是RUNNING。換句話說,線程池被一旦被創建,就處于RUNNING狀態!道理很簡單,在ctl的初始化代碼中(如下),就將它初始化為RUNNING狀態,并且"任務數量"初始化為0。
(01) 狀態說明:線程池處在SHUTDOWN狀態時,不接收新任務,但能處理已添加的任務。
(02) 狀態切換:調用線程池的shutdown()接口時,線程池由RUNNING -> SHUTDOWN。
3. STOP
(01) 狀態說明:線程池處在STOP狀態時,不接收新任務,不處理已添加的任務,并且會中斷正在處理的任務。
(02) 狀態切換:調用線程池的shutdownNow()接口時,線程池由(RUNNING or SHUTDOWN ) -> STOP。
4. TIDYING
(01) 狀態說明:當所有的任務已終止,ctl記錄的"任務數量"為0,線程池會變為TIDYING狀態。當線程池變為TIDYING狀態時,會執行鉤子函數terminated()。terminated()在ThreadPoolExecutor類中是空的,若用戶想在線程池變為TIDYING時,進行相應的處理;可以通過重載terminated()函數來實現。
(02) 狀態切換:當線程池在SHUTDOWN狀態下,阻塞隊列為空并且線程池中執行的任務也為空時,就會由 SHUTDOWN -> TIDYING。
當線程池在STOP狀態下,線程池中執行的任務為空時,就會由STOP -> TIDYING。
5. TERMINATED
(01) 狀態說明:線程池徹底終止,就變成TERMINATED狀態。
(02) 狀態切換:線程池處在TIDYING狀態時,執行完terminated()之后,就會由 TIDYING -> TERMINATED。
6. 拒絕策略介紹
線程池的拒絕策略,是指當任務添加到線程池中被拒絕,而采取的處理措施。
當任務添加到線程池中之所以被拒絕,可能是由于:
第一,線程池異常關閉。
第二,任務數量超過線程池的最大限制。
線程池共包括4種拒絕策略,它們分別是:AbortPolicy, CallerRunsPolicy, DiscardOldestPolicy和DiscardPolicy。
| AbortPolicy | 當任務添加到線程池中被拒絕時,它將拋出 RejectedExecutionException 異常 |
| CallerRunsPolicy | 當任務添加到線程池中被拒絕時,會在線程池當前正在運行的Thread線程池中處理被拒絕的任務 |
| DiscardOldestPolicy | 當任務添加到線程池中被拒絕時,線程池會放棄等待隊列中最舊的未處理任務,然后將被拒絕的任務添加到等待隊列中 |
| DiscardPolicy | 當任務添加到線程池中被拒絕時,線程池將丟棄被拒絕的任務 |
線程池默認的處理策略是AbortPolicy!
7. 拒絕策略對比和示例
下面通過示例,分別演示線程池的4種拒絕策略。
(01) DiscardPolicy 示例
(02) DiscardOldestPolicy 示例
(03) AbortPolicy 示例
(04) CallerRunsPolicy 示例
7.1 DiscardPolicy 示例
import java.lang.reflect.Field;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;import java.util.concurrent.ThreadPoolExecutor.DiscardPolicy;public class DiscardPolicyDemo { private static final int THREADS_SIZE = 1; private static final int CAPACITY = 1; public static void main(String[] args) throws Exception { // 創建線程池。線程池的"最大池大小"和"核心池大小"都為1(THREADS_SIZE),"線程池"的阻塞隊列容量為1(CAPACITY)。 ThreadPoolExecutor pool = new ThreadPoolExecutor(THREADS_SIZE, THREADS_SIZE, 0, TimeUnit.SECONDS, new ArrayBlockingQueue(CAPACITY)); // 設置線程池的拒絕策略為"丟棄" pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy()); // 新建10個任務,并將它們添加到線程池中。 for (int i = 0; i < 10; i++) { Runnable myrun = new MyRunnable("task-"+i); pool.execute(myrun); } // 關閉線程池 pool.shutdown(); }}class MyRunnable implements Runnable { private String name; public MyRunnable(String name) { this.name = name; } @Override public void run() { try { System.out.println(this.name + " is running."); Thread.sleep(100); } catch (Exception e) { e.printStackTrace(); } }}運行結果:
task-0 is running.task-1 is running.結果說明:線程池pool的"最大池大小"和"核心池大小"都為1(THREADS_SIZE),這意味著"線程池能同時運行的任務數量最大只能是1"。
線程池pool的阻塞隊列是ArrayBlockingQueue,ArrayBlockingQueue是一個有界的阻塞隊列,ArrayBlockingQueue的容量為1。這也意味著線程池的阻塞隊列只能有一個線程池阻塞等待。
根據""中分析的execute()代碼可知:線程池中共運行了2個任務。第1個任務直接放到Worker中,通過線程去執行;第2個任務放到阻塞隊列中等待。其他的任務都被丟棄了!
7.2 DiscardOldestPolicy 示例
import java.lang.reflect.Field;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;import java.util.concurrent.ThreadPoolExecutor.DiscardOldestPolicy;public class DiscardOldestPolicyDemo { private static final int THREADS_SIZE = 1; private static final int CAPACITY = 1; public static void main(String[] args) throws Exception { // 創建線程池。線程池的"最大池大小"和"核心池大小"都為1(THREADS_SIZE),"線程池"的阻塞隊列容量為1(CAPACITY)。 ThreadPoolExecutor pool = new ThreadPoolExecutor(THREADS_SIZE, THREADS_SIZE, 0, TimeUnit.SECONDS, new ArrayBlockingQueue(CAPACITY)); // 設置線程池的拒絕策略為"DiscardOldestPolicy" pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy()); // 新建10個任務,并將它們添加到線程池中。 for (int i = 0; i < 10; i++) { Runnable myrun = new MyRunnable("task-"+i); pool.execute(myrun); } // 關閉線程池 pool.shutdown(); }}class MyRunnable implements Runnable { private String name; public MyRunnable(String name) { this.name = name; } @Override public void run() { try { System.out.println(this.name + " is running."); Thread.sleep(200); } catch (Exception e) { e.printStackTrace(); } }}運行結果:task-0 is running.task-9 is running.結果說明:將"線程池的拒絕策略"由DiscardPolicy修改為DiscardOldestPolicy之后,當有任務添加到線程池被拒絕時,線程池會丟棄阻塞隊列中末尾的任務,然后將被拒絕的任務添加到末尾。
7.3 AbortPolicy 示例
import java.lang.reflect.Field;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;import java.util.concurrent.ThreadPoolExecutor.AbortPolicy;import java.util.concurrent.RejectedExecutionException;public class AbortPolicyDemo { private static final int THREADS_SIZE = 1; private static final int CAPACITY = 1; public static void main(String[] args) throws Exception { // 創建線程池。線程池的"最大池大小"和"核心池大小"都為1(THREADS_SIZE),"線程池"的阻塞隊列容量為1(CAPACITY)。 ThreadPoolExecutor pool = new ThreadPoolExecutor(THREADS_SIZE, THREADS_SIZE, 0, TimeUnit.SECONDS, new ArrayBlockingQueue(CAPACITY)); // 設置線程池的拒絕策略為"拋出異常" pool.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy()); try { // 新建10個任務,并將它們添加到線程池中。 for (int i = 0; i < 10; i++) { Runnable myrun = new MyRunnable("task-"+i); pool.execute(myrun); } } catch (RejectedExecutionException e) { e.printStackTrace(); // 關閉線程池 pool.shutdown(); } }}class MyRunnable implements Runnable { private String name; public MyRunnable(String name) { this.name = name; } @Override public void run() { try { System.out.println(this.name + " is running."); Thread.sleep(200); } catch (Exception e) { e.printStackTrace(); } }}(某一次)運行結果:
java.util.concurrent.RejectedExecutionException at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:1774) at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:768) at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:656) at AbortPolicyDemo.main(AbortPolicyDemo.java:27)task-0 is running.task-1 is running.結果說明:將"線程池的拒絕策略"由DiscardPolicy修改為AbortPolicy之后,當有任務添加到線程池被拒絕時,會拋出RejectedExecutionException。
7.4 CallerRunsPolicy 示例
import java.lang.reflect.Field;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;public class CallerRunsPolicyDemo { private static final int THREADS_SIZE = 1; private static final int CAPACITY = 1; public static void main(String[] args) throws Exception { // 創建線程池。線程池的"最大池大小"和"核心池大小"都為1(THREADS_SIZE),"線程池"的阻塞隊列容量為1(CAPACITY)。 ThreadPoolExecutor pool = new ThreadPoolExecutor(THREADS_SIZE, THREADS_SIZE, 0, TimeUnit.SECONDS, new ArrayBlockingQueue(CAPACITY)); // 設置線程池的拒絕策略為"CallerRunsPolicy" pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); // 新建10個任務,并將它們添加到線程池中。 for (int i = 0; i < 10; i++) { Runnable myrun = new MyRunnable("task-"+i); pool.execute(myrun); } // 關閉線程池 pool.shutdown(); }}class MyRunnable implements Runnable { private String name; public MyRunnable(String name) { this.name = name; } @Override public void run() { try { System.out.println(this.name + " is running."); Thread.sleep(100); } catch (Exception e) { e.printStackTrace(); } }}運行結果:
task-2 is running.task-3 is running.task-4 is running.task-5 is running.task-6 is running.task-7 is running.task-8 is running.task-9 is running.task-0 is running.task-1 is running.結果說明:將"線程池的拒絕策略"由DiscardPolicy修改為CallerRunsPolicy之后,當有任務添加到線程池被拒絕時,線程池會將被拒絕的任務添加到"線程池正在運行的線程"中取運行。
8 Callable 和 Future 簡介
Callable 和 Future 是比較有趣的一對組合。當我們需要獲取線程的執行結果時,就需要用到它們。Callable用于產生結果,Future用于獲取結果。
8.1?Callable
Callable 是一個接口,它只包含一個call()方法。Callable是一個返回結果并且可能拋出異常的任務。
為了便于理解,我們可以將Callable比作一個Runnable接口,而Callable的call()方法則類似于Runnable的run()方法。
Callable的源碼如下:
public interface Callable<V> { V call() throws Exception;}說明:從中我們可以看出Callable支持泛型。
8.2?Future
Future 是一個接口。它用于表示異步計算的結果。提供了檢查計算是否完成的方法,以等待計算的完成,并獲取計算的結果。
Future的源碼如下:
public interface Future<V> { // 試圖取消對此任務的執行。 boolean cancel(boolean mayInterruptIfRunning) // 如果在任務正常完成前將其取消,則返回 true。 boolean isCancelled() // 如果任務已完成,則返回 true。 boolean isDone() // 如有必要,等待計算完成,然后獲取其結果。 V get() throws InterruptedException, ExecutionException; // 如有必要,最多等待為使計算完成所給定的時間之后,獲取其結果(如果結果可用)。 V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;}說明:Future用于表示異步計算的結果。它的實現類是FutureTask,在講解FutureTask之前,我們先看看Callable, Future, FutureTask它們之間的關系圖,如下:
說明:
(01) RunnableFuture是一個接口,它繼承了Runnable和Future這兩個接口。RunnableFuture的源碼如下:
(02) FutureTask實現了RunnableFuture接口。所以,我們也說它實現了Future接口。
9 示例和源碼分析
我們先通過一個示例看看Callable和Future的基本用法,然后再分析示例的實現原理。
import java.util.concurrent.Callable;import java.util.concurrent.Future;import java.util.concurrent.Executors;import java.util.concurrent.ExecutorService;import java.util.concurrent.ExecutionException;class MyCallable implements Callable { @Override public Integer call() throws Exception { int sum = 0; // 執行任務 for (int i=0; i<100; i++) sum += i; //return sum; return Integer.valueOf(sum); } }public class CallableTest1 { public static void main(String[] args) throws ExecutionException, InterruptedException{ //創建一個線程池 ExecutorService pool = Executors.newSingleThreadExecutor(); //創建有返回值的任務 Callable c1 = new MyCallable(); //執行任務并獲取Future對象 Future f1 = pool.submit(c1); // 輸出結果 System.out.println(f1.get()); //關閉線程池 pool.shutdown(); }}運行結果:
4950結果說明:在主線程main中,通過newSingleThreadExecutor()新建一個線程池。接著創建Callable對象c1,然后再通過pool.submit(c1)將c1提交到線程池中進行處理,并且將返回的結果保存到Future對象f1中。然后,我們通過f1.get()獲取Callable中保存的結果;最后通過pool.shutdown()關閉線程池。9.1?submit()
submit()在java/util/concurrent/AbstractExecutorService.java中實現,它的源碼如下:
public Futuresubmit(Callable task) { if (task == null) throw new NullPointerException(); // 創建一個RunnableFuture對象 RunnableFuture ftask = newTaskFor(task); // 執行“任務ftask” execute(ftask); // 返回“ftask” return ftask;}說明:submit()通過newTaskFor(task)創建了RunnableFuture對象ftask。它的源碼如下:
protected RunnableFuturenewTaskFor(Callable callable) { return new FutureTask(callable);}9.2. FutureTask的構造函數
FutureTask的構造函數如下:
public FutureTask(Callable callable) { if (callable == null) throw new NullPointerException(); // callable是一個Callable對象 this.callable = callable; // state記錄FutureTask的狀態 this.state = NEW; // ensure visibility of callable}9.3?FutureTask的run()方法
我們繼續回到submit()的源碼中。
在newTaskFor()新建一個ftask對象之后,會通過execute(ftask)執行該任務。此時ftask被當作一個Runnable對象進行執行,最終會調用到它的run()方法;ftask的run()方法在java/util/concurrent/FutureTask.java中實現,源碼如下:
public void run() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { // 將callable對象賦值給c。 Callable c = callable; if (c != null && state == NEW) { V result; boolean ran; try { // 執行Callable的call()方法,并保存結果到result中。 result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } // 如果運行成功,則將result保存 if (ran) set(result); } } finally { runner = null; // 設置“state狀態標記” int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); }}說明:run()中會執行Callable對象的call()方法,并且最終將結果保存到result中,并通過set(result)將result保存。
之后調用FutureTask的get()方法,返回的就是通過set(result)保存的值。
總結
以上是生活随笔為你收集整理的terminated 线程_Java【多线程系列】JUC线程池—2. 原理(二)、Callable和Future的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: pb预览状态下的pagecount_我为
- 下一篇: Java基本sql_SQL基本语句