谈谈java的线程池(创建、机制)
目錄
Executors創建線程池默認方法
自定義線程池
Executors創建線程池默認方法
? ? newFixedThreadPool()方法,該方法返回一個固定數量的線程池,該方法的線程數始終不變,當有一個任務提交時,若線程池中空閑,則立即執行,若沒有,則會被暫緩在一個任務隊列中等待有空閑的線程去執行。
? ? newSingleThreadExecutor()方法,創建一個線程的線程池,若空閑則執行,若沒有空閑線程則暫緩在任務隊列。
? ?newCachedThreadPool()方法,返回一個可根據實際情況調整線程個數的線程池,不限制最大線程數量,若有空閑的線程則直接執行任務,若無空閑則創建線程,若無任務則不創建線程。并且每一個空閑線程會在60秒后自動回收。
? ?newScheduledThreadPool()該方法返回一個ScheduledExecutorService對象,但該線程池可以指定線程的數量,可以定時。
前三種線程池添加線程: ExecutorService pool = Executors.newSingleThreadExecutor(); pool.execute(new Thread()); //submit和execute的區別: 第一點是submit可以傳入實現Callable接口的實例對象, 第二點是submit方法有返回值 Future f1 = pool.submit(new Thread()); f1.get()//如果為空,則線程執行完畢了,否則會一直阻塞在這里。 //ScheduledThreadPool相當于定時任務 class Temp extends Thread {public void run() {System.out.println("run");} } public class ScheduledJob {public static void main(String args[]) throws Exception {Temp command = new Temp();ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);ScheduledFuture<?> scheduleTask = scheduler.scheduleWithFixedDelay(command, 5, 1, TimeUnit.SECONDS);//5秒初始化之后執行一次,以后每1秒執行一次} }其實底層都是new了ThreadPoolExecutor。
? ? 若Executors工廠類無法滿足我們的需求,可以自己去創建自定義的線程池,其實Executors工廠類里面的創建線程方法其內部實現均是用了ThreadPoolExecutor這個類,這個類可以自定義線程。構造方法如下:
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {} * corePoolSize: 線程池核心線程數(線程池初始化存在的線程) * maximumPoolSize:線程池最大數 * keepAliveTime: 空閑線程存活時間 * unit: 時間單位 * workQueue: 線程池所使用的緩沖隊列 * threadFactory:線程池創建線程使用的工廠 * handler: 線程池對拒絕任務的處理策略自定義線程池
這個構造方法對于隊列是什么類型的比較關鍵。
使用有界隊列時:若有新的任務需要執行,如果線程池實際線程小于corePoolSize,則優先創建線程,若大于corePoolSize,則會將任務加入隊列,若隊列已滿,則在總線程數不大于maximumPoolSize的前提下,創建新的線程去執行新任務,若線程數大于maximumPoolSize,則執行拒絕策略。或其他自定義方式。
使用無界隊列時:LinkedBlockingQueue。與有界隊列相比,除非系統資源耗盡,否則無界的任務隊列不存在任務入隊失敗的情況。當有新任務到來,系統的線程數小于corePoolSize時,則新建線程執行任務。當達到corePoolSize后,就不會繼續增加。若后續仍有新的任務加入,而又沒有空閑的線程資源,則任務直接進入隊列等待。若任務創建和處理的速度差異很大,無界隊列會保持快速增長,直到耗盡系統內存。
JDK拒絕策略:
? ? AbortPolicy:直接拋出異常組織系統正常工作。
? ? CallerRunsPolicy:只要線程池未關閉,該策略直接在調用者線程中,運行當前被丟棄的任務。
? ? DiscardOldestPolicy:丟棄最老的一個請求,嘗試再次提交當前任務。
? ? DiscardPolicy:丟棄無法處理的任務,不給予任何處理。
//如果需要自定義拒絕策略可以實現RejectedExecutionHandler接口。 public class MyTask implements Runnable {private int taskId;private String taskName;public MyTask(int taskId, String taskName){this.taskId = taskId;this.taskName = taskName;}public int getTaskId() {return taskId;}public void setTaskId(int taskId) {this.taskId = taskId;}public String getTaskName() {return taskName;}public void setTaskName(String taskName) {this.taskName = taskName;}@Overridepublic void run() {try {System.out.println("run taskId =" + this.taskId);Thread.sleep(5*1000);//System.out.println("end taskId =" + this.taskId);} catch (InterruptedException e) {e.printStackTrace();} }public String toString(){return Integer.toString(this.taskId);} }public class UseThreadPoolExecutor1 {public static void main(String[] args) {/*** 在使用有界隊列時,若有新的任務需要執行,如果線程池實際線程數小于corePoolSize,則優先創建線程,* 若大于corePoolSize,則會將任務加入隊列,* 若隊列已滿,則在總線程數不大于maximumPoolSize的前提下,創建新的線程,* 若線程數大于maximumPoolSize,則執行拒絕策略。或其他自定義方式。**/ ThreadPoolExecutor pool = new ThreadPoolExecutor(1, //coreSize2, //MaxSize60, //60TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(3) //指定一種隊列 (有界隊列)//new LinkedBlockingQueue<Runnable>(), new MyRejected()//, new DiscardOldestPolicy());MyTask mt1 = new MyTask(1, "任務1");MyTask mt2 = new MyTask(2, "任務2");MyTask mt3 = new MyTask(3, "任務3");MyTask mt4 = new MyTask(4, "任務4");MyTask mt5 = new MyTask(5, "任務5");MyTask mt6 = new MyTask(6, "任務6");pool.execute(mt1);pool.execute(mt2);pool.execute(mt3);pool.execute(mt4);pool.execute(mt5);pool.execute(mt6);pool.shutdown();} }public class MyRejected implements RejectedExecutionHandler{public MyRejected(){}@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {System.out.println("自定義處理..");System.out.println("當前被拒絕任務為:" + r.toString());} } public class UseThreadPoolExecutor2 implements Runnable{private static AtomicInteger count = new AtomicInteger(0);@Overridepublic void run() {try {int temp = count.incrementAndGet();System.out.println("任務" + temp);Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}}public static void main(String[] args) throws Exception{//System.out.println(Runtime.getRuntime().availableProcessors());BlockingQueue<Runnable> queue =//new LinkedBlockingQueue<Runnable>();new ArrayBlockingQueue<Runnable>(10);ExecutorService executor = new ThreadPoolExecutor(5, //core10, //max無界隊列的這個參數其實沒啥作用了120L, //2fenzhongTimeUnit.SECONDS,queue);for(int i = 0 ; i < 20; i++){executor.execute(new UseThreadPoolExecutor2());}Thread.sleep(1000);System.out.println("queue size:" + queue.size()); //10Thread.sleep(2000);} }線程池的方法
向線程池提交任務
execute()
execute()方法用于提交不需要返回值的任務Runnable,所以無法判斷任務是否被線程池執行成功。
package com.morris.concurrent.threadpool.threadpoolexecutor.api;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.*; import java.util.stream.IntStream;@Slf4j public class ExecuteDemo {public static void main(String[] args) {ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>());IntStream.rangeClosed(1, 3).forEach(i -> threadPoolExecutor.execute(() -> log.info("Task {} working...", i))); // 提交任務threadPoolExecutor.shutdown(); // 關閉線程池} }運行結果如下:
2020-10-13 14:58:09,500 INFO [pool-1-thread-1] (ExecuteDemo.java:12) - Task 1 working... 2020-10-13 14:58:09,502 INFO [pool-1-thread-1] (ExecuteDemo.java:12) - Task 2 working... 2020-10-13 14:58:09,502 INFO [pool-1-thread-1] (ExecuteDemo.java:12) - Task 3 working...submit()
submit()方法用于提交需要返回值的任務。線程池會返回一個future類型的對象,通過這個future對象可以判斷任務是否執行成功,并且可以通過future的get()方法來獲取返回值,get()方法會阻塞當前線程直到任務完成,get()方法也支持帶超時時間的阻塞。
運行結果如下:
2020-10-13 15:03:22,306 INFO [pool-1-thread-1] (SubmitDemo.java:14) - Task begin... 2020-10-13 15:03:25,315 INFO [pool-1-thread-1] (SubmitDemo.java:20) - Task end. 2020-10-13 15:03:25,316 INFO [main] (SubmitDemo.java:27) - get result:task resultinvokeAll()
package com.morris.concurrent.threadpool.threadpoolexecutor.api;import lombok.extern.slf4j.Slf4j;import java.util.List; import java.util.concurrent.*; import java.util.stream.Collectors; import java.util.stream.IntStream;@Slf4j public class InvokeAllDemo {public static void main(String[] args) throws InterruptedException, ExecutionException {ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>());List<Callable<String>> tasks = IntStream.rangeClosed(1, 3).boxed().map(i -> (Callable<String>) () -> "task " + i).collect(Collectors.toList());List<Future<String>> futures = threadPoolExecutor.invokeAll(tasks);// 批量提交任務for (Future<String> future : futures) {log.info("result:{}", future.get()); // 阻塞獲取結果}threadPoolExecutor.shutdown();} }運行結果如下:
2020-10-13 16:09:00,518 INFO [main] (InvokeAllDemo.java:25) - result:task 1 2020-10-13 16:09:00,520 INFO [main] (InvokeAllDemo.java:25) - result:task 2 2020-10-13 16:09:00,520 INFO [main] (InvokeAllDemo.java:25) - result:task 3invokeAny()
invokeAny批量提交任務,哪個任務先執行完畢,立刻返回此任務執行結果,其它任務取消。
package com.morris.concurrent.threadpool.threadpoolexecutor.api;import lombok.extern.slf4j.Slf4j;import java.util.List; import java.util.Random; import java.util.concurrent.*; import java.util.stream.Collectors; import java.util.stream.IntStream;@Slf4j public class InvokeAnyDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(3, 3, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>());List<Callable<String>> tasks = IntStream.rangeClosed(1, 3).boxed().map(i -> (Callable<String>) () -> {log.info("task{} begin ...", i);TimeUnit.SECONDS.sleep(new Random().nextInt(10));log.info("task{} end .", i);return "task" + i;}).collect(Collectors.toList());String result = threadPoolExecutor.invokeAny(tasks); // 哪個任務執行完就立馬返回,其他任務直接中斷log.info("result:{}", result); // 阻塞獲取結果threadPoolExecutor.shutdown();} }運行結果如下:
2020-10-13 16:09:35,121 INFO [pool-1-thread-1] (InvokeAnyDemo.java:21) - task1 begin ... 2020-10-13 16:09:35,121 INFO [pool-1-thread-2] (InvokeAnyDemo.java:21) - task2 begin ... 2020-10-13 16:09:35,121 INFO [pool-1-thread-3] (InvokeAnyDemo.java:21) - task3 begin ... 2020-10-13 16:09:38,125 INFO [pool-1-thread-3] (InvokeAnyDemo.java:23) - task3 end . 2020-10-13 16:09:38,125 INFO [main] (InvokeAnyDemo.java:29) - result:task3線程池的關閉
shutdown()
shutdown()方法會先將線程池的狀態改為SHUTDOWN,然后中斷空閑的線程,不會再接收新任務,但已提交任務會執行完。
package com.morris.concurrent.threadpool.threadpoolexecutor.api;import lombok.extern.slf4j.Slf4j;import java.util.Random; import java.util.concurrent.*; import java.util.stream.IntStream;/*** 線程池的關閉之shutdown()*/ @Slf4j public class ShutdownDemo {public static void main(String[] args) {ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 2, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>());IntStream.rangeClosed(1, 10).forEach(i -> threadPoolExecutor.submit(() -> {log.info("task{} begin ...", i);try {TimeUnit.SECONDS.sleep(new Random().nextInt(3));} catch (InterruptedException e) {e.printStackTrace();}log.info("task{} end .", i);}));threadPoolExecutor.shutdown();threadPoolExecutor.submit(() -> log.info("submit task after shutdown"));} }運行結果如下:
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@18769467 rejected from java.util.concurrent.ThreadPoolExecutor@46ee7fe8[Shutting down, pool size = 2, active threads = 2, queued tasks = 8, completed tasks = 0]at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)at com.morris.concurrent.threadpool.threadpoolexecutor.api.ShutdownDemo.main(ShutdownDemo.java:30) 2020-10-13 16:54:42,637 INFO [pool-1-thread-1] (ShutdownDemo.java:19) - task1 begin ... 2020-10-13 16:54:42,637 INFO [pool-1-thread-2] (ShutdownDemo.java:19) - task2 begin ... 2020-10-13 16:54:43,639 INFO [pool-1-thread-2] (ShutdownDemo.java:25) - task2 end . 2020-10-13 16:54:43,639 INFO [pool-1-thread-1] (ShutdownDemo.java:25) - task1 end . 2020-10-13 16:54:43,640 INFO [pool-1-thread-2] (ShutdownDemo.java:19) - task3 begin ... 2020-10-13 16:54:43,640 INFO [pool-1-thread-1] (ShutdownDemo.java:19) - task4 begin ... 2020-10-13 16:54:43,641 INFO [pool-1-thread-1] (ShutdownDemo.java:25) - task4 end . 2020-10-13 16:54:43,641 INFO [pool-1-thread-1] (ShutdownDemo.java:19) - task5 begin ... 2020-10-13 16:54:44,642 INFO [pool-1-thread-1] (ShutdownDemo.java:25) - task5 end . 2020-10-13 16:54:44,642 INFO [pool-1-thread-1] (ShutdownDemo.java:19) - task6 begin ... 2020-10-13 16:54:45,642 INFO [pool-1-thread-2] (ShutdownDemo.java:25) - task3 end . 2020-10-13 16:54:45,642 INFO [pool-1-thread-2] (ShutdownDemo.java:19) - task7 begin ... 2020-10-13 16:54:46,642 INFO [pool-1-thread-2] (ShutdownDemo.java:25) - task7 end . 2020-10-13 16:54:46,642 INFO [pool-1-thread-1] (ShutdownDemo.java:25) - task6 end . 2020-10-13 16:54:46,642 INFO [pool-1-thread-2] (ShutdownDemo.java:19) - task8 begin ... 2020-10-13 16:54:46,642 INFO [pool-1-thread-1] (ShutdownDemo.java:19) - task9 begin ... 2020-10-13 16:54:47,642 INFO [pool-1-thread-2] (ShutdownDemo.java:25) - task8 end . 2020-10-13 16:54:47,642 INFO [pool-1-thread-2] (ShutdownDemo.java:19) - task10 begin ... 2020-10-13 16:54:47,642 INFO [pool-1-thread-2] (ShutdownDemo.java:25) - task10 end . 2020-10-13 16:54:48,643 INFO [pool-1-thread-1] (ShutdownDemo.java:25) - task9 end .shutdownNow()
shutdownNow()會先將會先將線程池的狀態改為STOP,不會再接收新任務,然后中斷所有的線程,如果執行中的任務沒有對中斷進行處理,那么這個任務將會繼續執行直至完成,如果執行中的任務對中斷進行的處理,那么將按中斷進行執行,最后方法會返回等待隊列中未執行的任務。
?
運行結果如下:
2020-10-13 16:55:08,985 INFO [main] (ShutdownNowDemo.java:30) - tasks size:8 2020-10-13 16:55:08,987 INFO [pool-1-thread-2] (ShutdownNowDemo.java:20) - task2 begin ... 2020-10-13 16:55:08,987 INFO [pool-1-thread-1] (ShutdownNowDemo.java:20) - task1 begin ... 2020-10-13 16:55:08,988 INFO [pool-1-thread-2] (ShutdownNowDemo.java:26) - task2 end . 2020-10-13 16:55:08,989 INFO [pool-1-thread-1] (ShutdownNowDemo.java:26) - task1 end .awaitTermination()
調用shutdown()后,調用線程并不會等待所有任務運行結束,可以利用awaitTermination()方法設置一個超時時間進行阻塞等待,此方法返回了有兩種情況,要么所有的任務執行完成,要么超時時間到了。
package com.morris.concurrent.threadpool.threadpoolexecutor.api;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; /*** threadPoolExecutor.awaitTermination()的使用*/ @Slf4j public class AwaitTerminationDemo {public static void main(String[] args) throws InterruptedException {ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>());threadPoolExecutor.submit(() -> {log.info("task begin ...");try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();}log.info("task end .");});threadPoolExecutor.shutdown(); // 關閉線程池threadPoolExecutor.awaitTermination(10, TimeUnit.SECONDS);log.info("main thread exit");} }運行結果如下:
2020-10-13 17:01:45,397 INFO [pool-1-thread-1] (AwaitTerminationDemo.java:16) - task begin ... 2020-10-13 17:01:48,398 INFO [pool-1-thread-1] (AwaitTerminationDemo.java:22) - task end . 2020-10-13 17:01:48,398 INFO [main] (AwaitTerminationDemo.java:27) - main thread exit創建線程的工廠ThreadFactory
可以自定義ThreadFactory為線程池中的線程取一個有意義的名字,方便后面快速定位問題。
package com.morris.concurrent.threadpool.threadpoolexecutor.construct;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger;/*** 使用ThreadFactory自定義線程名*/ @Slf4j public class ThreadFactoryDemo {public static void main(String[] args) {ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,new LinkedBlockingQueue<>(), new ThreadFactory() {private AtomicInteger idx = new AtomicInteger(1);private static final String THREAD_NAME_PREFIX = "mythread-pool-";@Overridepublic Thread newThread(Runnable r) {return new Thread(r, THREAD_NAME_PREFIX + idx.getAndIncrement());}});threadPoolExecutor.submit(()->{log.info("task begin ...");try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();}log.info("task end .");});threadPoolExecutor.shutdown();} }?
總結
以上是生活随笔為你收集整理的谈谈java的线程池(创建、机制)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 谈谈java的并发容器、Queue
- 下一篇: 谈谈java并发锁(重入锁、读写锁、公平