日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

线程池中的饱和策略

發(fā)布時(shí)間:2025/4/14 编程问答 28 豆豆
生活随笔 收集整理的這篇文章主要介紹了 线程池中的饱和策略 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

ThreadPoolExecutor允許提供一個(gè)BlockingQueue來保存等待執(zhí)行的任務(wù)。

查看結(jié)構(gòu)圖

?

?? 我們需要關(guān)注的方法是offer(E),put(E),take()

newFixedThreadPool和newSingleThreadExecutor在默認(rèn)情況下將使用一個(gè)無界的隊(duì)列(LinkedBlockingQueue),如果所有線程都在執(zhí)行任務(wù),那么任務(wù)將在隊(duì)列中等待,如果任務(wù)到達(dá)的速度大于線程執(zhí)行的速度,造成的后果將是隊(duì)列無限期增加。

更穩(wěn)妥的管理策略是使用有界隊(duì)列,如:ArrayBlockingQueue,有界的LinkedBlockingQueue,PriorityBlockingQueue.有界隊(duì)列避免了資源耗盡的情況,但出現(xiàn)一個(gè)問題,隊(duì)列填滿后,新的任務(wù)該怎么辦?使用拒絕策略。

?? JDK提供了幾種不同的RejectedExecutionHandler實(shí)現(xiàn),每種都是不同的飽和策略:AbortPolicy,CallerRunsPolicy,DiscardPolicy和DiscardOldestPolicy.

AbortPolicy 當(dāng)任務(wù)添加到線程池中被拒絕時(shí),它將拋出 RejectedExecutionException 異常。
如下代碼:
1 package cn.concurrent.executor; 2 3 import java.util.concurrent.*; 4 5 /** 6 * Created by spark on 17-9-24. 7 */ 8 public class AbortPolicyDemo { 9 10 public static void main(String[] args) { 11 //初始化一個(gè)初始化容量大小為1,阻塞隊(duì)列容量為1,maxmumPoolSize大小為1的線程池 12 ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 13 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1)); 14 //設(shè)置飽和策略為AbortPolicy---拒絕策略/**/,用戶可以捕獲這個(gè)異常 15 pool.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy()); 16 //創(chuàng)建線程執(zhí)行 17 for (int i = 0; i < 5; i++) { 18 MyRunnable myRunnable = new MyRunnable(); 19 pool.execute(myRunnable); 20 } 21 pool.shutdown(); 22 } 23 24 25 static class MyRunnable implements Runnable { 26 @Override 27 public void run() { 28 System.err.println(Thread.currentThread().getId() + ":正在執(zhí)行"); 29 try { 30 Thread.sleep(300); 31 } catch (InterruptedException e) { 32 e.printStackTrace(); 33 } 34 } 35 } 36 }

結(jié)果如下:

Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task cn.concurrent.executor.AbortPolicyDemo$MyRunnable@1d44bcfa rejected from java.util.concurrent.ThreadPoolExecutor@266474c2[Running, pool size = 1, active threads = 1, queued tasks = 1, completed tasks = 0]at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2066)at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)at cn.concurrent.executor.AbortPolicyDemo.main(AbortPolicyDemo.java:19) 10:正在執(zhí)行 10:正在執(zhí)行

拋出了RejectedExecutionException,由于飽和策略引起的。

如果修改代碼如下:
1 package cn.concurrent.executor; 2 3 import java.util.concurrent.*; 4 5 /** 6 * Created by spark on 17-9-24. 7 */ 8 public class AbortPolicyDemo { 9 10 public static void main(String[] args) { 11 //初始化一個(gè)初始化容量大小為1,阻塞隊(duì)列容量為1,maxmumPoolSize大小為1的線程池 12 ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 13 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1)); 14 //設(shè)置飽和策略為AbortPolicy---拒絕策略/**/,用戶可以捕獲這個(gè)異常 15 pool.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy()); 16 //創(chuàng)建線程執(zhí)行 17 for (int i = 0; i < 5; i++) { 18 try { 19 Thread.sleep(500); 20 } catch (InterruptedException e) { 21 e.printStackTrace(); 22 } 23 MyRunnable myRunnable = new MyRunnable(); 24 pool.execute(myRunnable); 25 } 26 pool.shutdown(); 27 } 28 29 30 static class MyRunnable implements Runnable { 31 @Override 32 public void run() { 33 System.err.println(Thread.currentThread().getId() + ":正在執(zhí)行"); 34 try { 35 Thread.sleep(300); 36 } catch (InterruptedException e) { 37 e.printStackTrace(); 38 } 39 } 40 } 41 }

執(zhí)行結(jié)果為:

1 /usr/lib/jvm/java-1.8.0-openjdk-amd64/bin/java -javaagent:/usr/local/idea/lib/idea_rt.jar=43205:/usr/local/idea/bin -Dfile.encoding=UTF-8 -classpath /usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/charsets.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/ext/cldrdata.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/ext/dnsns.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/ext/icedtea-sound.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/ext/jaccess.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/ext/localedata.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/ext/nashorn.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/ext/sunec.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/ext/sunjce_provider.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/ext/sunpkcs11.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/ext/zipfs.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/jce.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/jsse.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/management-agent.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/resources.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/rt.jar:/home/spark/IdeaProjects/thread/thread/target/classes:/home/spark/.m2/repository/com/lmax/disruptor/3.3.2/disruptor-3.3.2.jar cn.concurrent.executor.AbortPolicyDemo 2 10:正在執(zhí)行 3 10:正在執(zhí)行 4 10:正在執(zhí)行 5 10:正在執(zhí)行 6 10:正在執(zhí)行 7 8 Process finished with exit code 0

原因是:正好每一個(gè)線程都有足夠的時(shí)間執(zhí)行,因此有界阻塞隊(duì)列不會(huì)填滿,程序能夠正常運(yùn)行。

DiscardOldestPolicy 當(dāng)任務(wù)添加到線程池中被拒絕時(shí),線程池會(huì)放棄等待隊(duì)列中最舊的未處理任務(wù)(拋棄下一個(gè)將被執(zhí)行的任務(wù)),然后將被拒絕的任務(wù)添加到等待隊(duì)列中,如果隊(duì)列是一個(gè)優(yōu)先隊(duì)列,那么拋棄
最舊的策略就會(huì)拋棄優(yōu)先級(jí)最高的任務(wù),因此不要將兩者在一起使用。
如下代碼:
1 package cn.concurrent.executor; 2 3 import java.util.concurrent.ArrayBlockingQueue; 4 import java.util.concurrent.ThreadPoolExecutor; 5 import java.util.concurrent.TimeUnit; 6 7 /** 8 * Created by spark on 17-9-24. 9 */ 10 public class DiscardOledesrPolicy { 11 12 public static void main(String[] args) { 13 14 ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, 15 new ArrayBlockingQueue<Runnable>(1)); 16 //設(shè)置飽和策略為DiscardOledestPolicy 17 pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy()); 18 19 20 for (int i = 0; i < 6; i++) { 21 MyRunnable myRunnable = new MyRunnable("this is " + i + " task"); 22 pool.submit(myRunnable); 23 } 24 pool.shutdown(); 25 } 26 27 static class MyRunnable implements Runnable { 28 29 private String name; 30 31 public MyRunnable(String name) { 32 this.name = name; 33 } 34 35 @Override 36 public void run() { 37 System.err.println(this.name + ": is running."); 38 try { 39 Thread.sleep(300); 40 } catch (InterruptedException e) { 41 e.printStackTrace(); 42 } 43 } 44 } 45 }

結(jié)果如下:

/usr/lib/jvm/java-1.8.0-openjdk-amd64/bin/java -javaagent:/usr/local/idea/lib/idea_rt.jar=33258:/usr/local/idea/bin -Dfile.encoding=UTF-8 -classpath /usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/charsets.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/ext/cldrdata.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/ext/dnsns.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/ext/icedtea-sound.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/ext/jaccess.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/ext/localedata.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/ext/nashorn.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/ext/sunec.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/ext/sunjce_provider.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/ext/sunpkcs11.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/ext/zipfs.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/jce.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/jsse.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/management-agent.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/resources.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/rt.jar:/home/spark/IdeaProjects/thread/thread/target/classes:/home/spark/.m2/repository/com/lmax/disruptor/3.3.2/disruptor-3.3.2.jar cn.concurrent.executor.DiscardOledesrPolicy this is 0 task: is running. this is 5 task: is running. Process finished with exit code 0

從結(jié)果中可以看到,1,2,3,4都被丟棄了。

DiscardPolicy 該策略默默地丟棄無法處理的任務(wù),不予任何處理。 代碼如下:
1 package cn.concurrent.executor; 2 3 import java.util.concurrent.ArrayBlockingQueue; 4 import java.util.concurrent.PriorityBlockingQueue; 5 import java.util.concurrent.ThreadPoolExecutor; 6 import java.util.concurrent.TimeUnit; 7 8 /** 9 * Created by spark on 17-9-24. 10 */ 11 public class DiscardPolicy { 12 13 public static void main(String[] args) { 14 ThreadPoolExecutor pool=new ThreadPoolExecutor(1,1,0, TimeUnit.SECONDS, 15 new ArrayBlockingQueue<Runnable>(1)); 16 //添加飽和策略為丟棄策略 17 pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy()); 18 for (int i = 0; i < 6; i++) { 19 MyRunnable myRunnable = new MyRunnable("this is " + i + " task"); 20 pool.submit(myRunnable); 21 } 22 pool.shutdown(); 23 } 24 static class MyRunnable implements Runnable { 25 26 private String name; 27 28 public MyRunnable(String name) { 29 this.name = name; 30 } 31 32 @Override 33 public void run() { 34 System.err.println(this.name + ": is running."); 35 try { 36 Thread.sleep(300); 37 } catch (InterruptedException e) { 38 e.printStackTrace(); 39 } 40 } 41 } 42 }

?結(jié)果如下:

1 /usr/lib/jvm/java-1.8.0-openjdk-amd64/bin/java -javaagent:/usr/local/idea/lib/idea_rt.jar=41981:/usr/local/idea/bin -Dfile.encoding=UTF-8 -classpath /usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/charsets.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/ext/cldrdata.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/ext/dnsns.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/ext/icedtea-sound.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/ext/jaccess.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/ext/localedata.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/ext/nashorn.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/ext/sunec.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/ext/sunjce_provider.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/ext/sunpkcs11.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/ext/zipfs.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/jce.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/jsse.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/management-agent.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/resources.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/rt.jar:/home/spark/IdeaProjects/thread/thread/target/classes:/home/spark/.m2/repository/com/lmax/disruptor/3.3.2/disruptor-3.3.2.jar cn.concurrent.executor.DiscardPolicy 2 this is 0 task: is running. 3 this is 1 task: is running. 4 5 Process finished with exit code 0

?從結(jié)果可以看出,2,3,4,5任務(wù)都被丟棄了。

線程池pool的"最大池大小"和"核心池大小"都為1(THREADS_SIZE),這意味著"線程池能同時(shí)運(yùn)行的任務(wù)數(shù)量最大只能是1"。
線程池pool的阻塞隊(duì)列是ArrayBlockingQueue,ArrayBlockingQueue是一個(gè)有界的阻塞隊(duì)列,ArrayBlockingQueue的容量為1。這也意味著線程池的阻塞隊(duì)列只能有一個(gè)線程池阻塞等待。根據(jù)""中分析的execute()代碼可知:線程池中共運(yùn)行了2個(gè)任務(wù)。第1個(gè)任務(wù)直接放到Worker中,通過線程去執(zhí)行;第2個(gè)任務(wù)放到阻塞隊(duì)列中等待。其他的任務(wù)都被丟棄了!

修改線程池中的隊(duì)列為PriorityBlockingQueue看看結(jié)果

1 package cn.concurrent.executor; 2 3 import java.util.concurrent.ArrayBlockingQueue; 4 import java.util.concurrent.PriorityBlockingQueue; 5 import java.util.concurrent.ThreadPoolExecutor; 6 import java.util.concurrent.TimeUnit; 7 8 /** 9 * Created by spark on 17-9-24. 10 * 13 */ 14 public class DiscardPolicy2 { 15 16 public static void main(String[] args) { 17 ThreadPoolExecutor pool=new ThreadPoolExecutor(1,1,0, TimeUnit.SECONDS, 18 new PriorityBlockingQueue<>(1)); 19 //添加飽和策略為丟棄策略 20 pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy()); 21 for (int i = 0; i < 6; i++) { 22 MyRunnable myRunnable = new MyRunnable("this is " + i + " task"); 23 pool.submit(myRunnable); 24 } 25 pool.shutdown(); 26 } 27 static class MyRunnable implements Runnable { 28 29 private String name; 30 31 public MyRunnable(String name) { 32 this.name = name; 33 } 34 35 @Override 36 public void run() { 37 System.err.println(this.name + ": is running."); 38 try { 39 Thread.sleep(300); 40 } catch (InterruptedException e) { 41 e.printStackTrace(); 42 } 43 } 44 } 45 }
結(jié)果如下:
/usr/lib/jvm/java-1.8.0-openjdk-amd64/bin/java -javaagent:/usr/local/idea/lib/idea_rt.jar=42797:/usr/local/idea/bin -Dfile.encoding=UTF-8 -classpath /usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/charsets.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/ext/cldrdata.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/ext/dnsns.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/ext/icedtea-sound.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/ext/jaccess.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/ext/localedata.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/ext/nashorn.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/ext/sunec.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/ext/sunjce_provider.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/ext/sunpkcs11.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/ext/zipfs.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/jce.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/jsse.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/management-agent.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/resources.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/rt.jar:/home/spark/IdeaProjects/thread/thread/target/classes:/home/spark/.m2/repository/com/lmax/disruptor/3.3.2/disruptor-3.3.2.jar cn.concurrent.executor.DiscardPolicy2 Exception in thread "main" java.lang.ClassCastException: java.util.concurrent.FutureTask cannot be cast to java.lang.Comparableat java.util.concurrent.PriorityBlockingQueue.siftUpComparable(PriorityBlockingQueue.java:357)at java.util.concurrent.PriorityBlockingQueue.offer(PriorityBlockingQueue.java:489)at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1371)at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)at cn.concurrent.executor.DiscardPolicy2.main(DiscardPolicy2.java:23) this is 0 task: is running.

報(bào)錯(cuò)的原因是因?yàn)槲覀兊娜蝿?wù)沒有優(yōu)先級(jí),因此應(yīng)該實(shí)現(xiàn)Comparaable接口再看看

1 package cn.concurrent.executor; 2 3 import java.util.concurrent.ArrayBlockingQueue; 4 import java.util.concurrent.PriorityBlockingQueue; 5 import java.util.concurrent.ThreadPoolExecutor; 6 import java.util.concurrent.TimeUnit; 7 8 /** 9 * Created by spark on 17-9-24. 10 * 線程池pool的"最大池大小"和"核心池大小"都為1(THREADS_SIZE),這意味著"線程池能同時(shí)運(yùn)行的任務(wù)數(shù)量最大只能是1"。 11 * 線程池pool的阻塞隊(duì)列是ArrayBlockingQueue,ArrayBlockingQueue是一個(gè)有界的阻塞隊(duì)列,ArrayBlockingQueue的容量為1。這也意味著線程池的阻塞隊(duì)列只能有一個(gè)線程池阻塞等待。 12 * 根據(jù)""中分析的execute()代碼可知:線程池中共運(yùn)行了2個(gè)任務(wù)。第1個(gè)任務(wù)直接放到Worker中,通過線程去執(zhí)行;第2個(gè)任務(wù)放到阻塞隊(duì)列中等待。其他的任務(wù)都被丟棄了! 13 */ 14 public class DiscardPolicy2 { 15 16 public static void main(String[] args) { 17 ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, 18 new PriorityBlockingQueue<>(1)); 19 //添加飽和策略為丟棄策略 20 pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy()); 21 for (int i = 0; i < 6; i++) { 22 MyRunnable myRunnable = new MyRunnable("this is " + i + " task", i); 23 pool.execute(myRunnable); 24 } 25 pool.shutdown(); 26 } 27 28 static class MyRunnable implements Runnable, Comparable { 29 30 private String name; 31 private int num; 32 33 public MyRunnable(String name, int num) { 34 this.name = name; 35 this.num = num; 36 } 37 38 public MyRunnable(String name) { 39 this.name = name; 40 } 41 42 @Override 43 public void run() { 44 System.err.println(this.name + ": is running."); 45 try { 46 Thread.sleep(300); 47 } catch (InterruptedException e) { 48 e.printStackTrace(); 49 } 50 } 51 52 @Override 53 public int compareTo(Object o) { 54 return 0; 55 } 56 } 57 }

結(jié)果:

/usr/lib/jvm/java-1.8.0-openjdk-amd64/bin/java -agentlib:jdwp=transport=dt_socket,address=127.0.0.1:41534,suspend=y,server=n -Dfile.encoding=UTF-8 -classpath /usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/charsets.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/ext/cldrdata.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/ext/dnsns.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/ext/icedtea-sound.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/ext/jaccess.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/ext/localedata.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/ext/nashorn.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/ext/sunec.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/ext/sunjce_provider.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/ext/sunpkcs11.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/ext/zipfs.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/jce.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/jsse.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/management-agent.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/resources.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/rt.jar:/home/spark/IdeaProjects/thread/thread/target/classes:/home/spark/.m2/repository/com/lmax/disruptor/3.3.2/disruptor-3.3.2.jar:/usr/local/idea/lib/idea_rt.jar cn.concurrent.executor.DiscardPolicy2 Connected to the target VM, address: '127.0.0.1:41534', transport: 'socket' this is 0 task: is running. this is 1 task: is running. this is 5 task: is running. this is 4 task: is running. this is 3 task: is running. this is 2 task: is running. Disconnected from the target VM, address: '127.0.0.1:41534', transport: 'socket'Process finished with exit code 0

?這里使用execute沒有使用submit是因?yàn)閟ubmit返回的結(jié)果為FutureTask,這個(gè)類沒有實(shí)現(xiàn)Comparable

CallerRunsPolicy 該策略只要線程池未關(guān)閉,該策略直接在調(diào)用者線程中,運(yùn)行當(dāng)前被丟棄的任務(wù)(白話就是不會(huì)拋棄線程,也不拋出異常,而是將任務(wù)回退到調(diào)用者,
從而降低新任務(wù)的流量),這樣會(huì)影響QPS。
代碼如下:
1 package cn.concurrent.executor; 2 3 import java.util.concurrent.ArrayBlockingQueue; 4 import java.util.concurrent.ThreadPoolExecutor; 5 import java.util.concurrent.TimeUnit; 6 7 /** 8 * Created by spark on 17-9-24. 9 */ 10 public class CallerRunsPolicyDemo { 11 public static void main(String[] args) { 12 ThreadPoolExecutor pool=new ThreadPoolExecutor(1,1,0, TimeUnit.SECONDS, 13 new ArrayBlockingQueue<Runnable>(1)); 14 //添加飽和策略為丟棄策略 15 pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); 16 for (int i = 0; i < 6; i++) { 17 MyRunnable myRunnable = new MyRunnable("this is " + i + " task"); 18 pool.submit(myRunnable); 19 } 20 pool.shutdown(); 21 } 22 static class MyRunnable implements Runnable { 23 24 private String name; 25 26 public MyRunnable(String name) { 27 this.name = name; 28 } 29 30 @Override 31 public void run() { 32 System.err.println(this.name + ": is running."); 33 try { 34 Thread.sleep(300); 35 } catch (InterruptedException e) { 36 e.printStackTrace(); 37 } 38 } 39 } 40 }

結(jié)果如下:

/usr/lib/jvm/java-1.8.0-openjdk-amd64/bin/java -javaagent:/usr/local/idea/lib/idea_rt.jar=40487:/usr/local/idea/bin -Dfile.encoding=UTF-8 -classpath /usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/charsets.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/ext/cldrdata.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/ext/dnsns.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/ext/icedtea-sound.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/ext/jaccess.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/ext/localedata.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/ext/nashorn.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/ext/sunec.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/ext/sunjce_provider.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/ext/sunpkcs11.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/ext/zipfs.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/jce.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/jsse.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/management-agent.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/resources.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/rt.jar:/home/spark/IdeaProjects/thread/thread/target/classes:/home/spark/.m2/repository/com/lmax/disruptor/3.3.2/disruptor-3.3.2.jar cn.concurrent.executor.CallerRunsPolicyDemo this is 2 task: is running. this is 0 task: is running. this is 3 task: is running. this is 1 task: is running. this is 5 task: is running. this is 4 task: is running.Process finished with exit code 0

?

我們還可以自定義飽和策略:如下:
1 package cn.concurrent; 2 3 import java.util.concurrent.*; 4 5 /** 6 * Created by spark on 17-9-3. 7 * 主要演示線程池的拒絕策略實(shí)現(xiàn)的接口RejectedExecutionHandler 8 */ 9 public class RejectThreadPoolDemo { 10 11 public static class MyTask implements Runnable { 12 @Override 13 public void run() { 14 System.out.println(System.currentTimeMillis() + ":Thread ID:" + Thread.currentThread().getId()); 15 try { 16 Thread.sleep(100); 17 } catch (InterruptedException e) { 18 e.printStackTrace(); 19 } 20 } 21 } 22 23 //實(shí)現(xiàn)RejectExecutionHandler 24 public static void main(String[] args) throws InterruptedException { 25 MyTask myTask = new MyTask(); 26 //創(chuàng)建一個(gè)線程池 27 ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, 28 new LinkedBlockingDeque<Runnable>(10), 29 Executors.defaultThreadFactory(), 30 new RejectedExecutionHandler() { 31 //自定義拒絕策略的處理 32 @Override 33 public void rejectedExecution(Runnable runnable, ThreadPoolExecutor threadPoolExecutor) { 34 System.out.println(runnable.toString() + " is discard"); 35 } 36 }); 37 for(int i=0;i<Integer.MAX_VALUE;i++){ 38 es.submit(myTask); 39 Thread.sleep(10); 40 } 41 } 42 }

結(jié)果如下:

??

/usr/lib/jvm/java-1.8.0-openjdk-amd64/bin/java -javaagent:/usr/local/idea/lib/idea_rt.jar=35478:/usr/local/idea/bin -Dfile.encoding=UTF-8 -classpath /usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/charsets.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/ext/cldrdata.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/ext/dnsns.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/ext/icedtea-sound.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/ext/jaccess.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/ext/localedata.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/ext/nashorn.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/ext/sunec.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/ext/sunjce_provider.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/ext/sunpkcs11.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/ext/zipfs.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/jce.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/jsse.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/management-agent.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/resources.jar:/usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/rt.jar:/home/spark/IdeaProjects/thread/thread/target/classes:/home/spark/.m2/repository/com/lmax/disruptor/3.3.2/disruptor-3.3.2.jar cn.concurrent.RejectThreadPoolDemo 1506244149620:Thread ID:10 1506244149630:Thread ID:11 1506244149640:Thread ID:12 1506244149651:Thread ID:13 1506244149661:Thread ID:14 1506244149720:Thread ID:10 1506244149730:Thread ID:11 1506244149740:Thread ID:12 1506244149751:Thread ID:13 1506244149761:Thread ID:14 1506244149821:Thread ID:10 1506244149830:Thread ID:11 1506244149841:Thread ID:12 1506244149851:Thread ID:13 1506244149862:Thread ID:14 java.util.concurrent.FutureTask@63947c6b is discard java.util.concurrent.FutureTask@2b193f2d is discard java.util.concurrent.FutureTask@355da254 is discard java.util.concurrent.FutureTask@4dc63996 is discard java.util.concurrent.FutureTask@d716361 is discard 1506244149921:Thread ID:10 1506244149930:Thread ID:11 1506244149941:Thread ID:12 1506244149951:Thread ID:13 1506244149962:Thread ID:14 java.util.concurrent.FutureTask@6ff3c5b5 is discard java.util.concurrent.FutureTask@3764951d is discard java.util.concurrent.FutureTask@4b1210ee is discard java.util.concurrent.FutureTask@4d7e1886 is discard java.util.concurrent.FutureTask@3cd1a2f1 is discard 1506244150023:Thread ID:10 1506244150031:Thread ID:11

?可以看到,飽和策略生效了,在實(shí)際應(yīng)用中,我們可以記錄日志,分析系統(tǒng)的負(fù)載和任務(wù)丟失的情況。

記錄點(diǎn)點(diǎn)滴滴,有很多要學(xué)習(xí),望大家指點(diǎn)。
?

?

轉(zhuǎn)載于:https://www.cnblogs.com/lwy19998273333/p/7587686.html

總結(jié)

以上是生活随笔為你收集整理的线程池中的饱和策略的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。