日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > java >内容正文

java

我的Java自定义线程池执行器

發布時間:2023/12/3 java 29 豆豆
生活随笔 收集整理的這篇文章主要介紹了 我的Java自定义线程池执行器 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

ThreadPoolExecutor是Java并發api添加的一項功能,可以有效地維護和重用線程,因此我們的程序不必擔心創建和銷毀線程,而將精力放在核心功能上。 我創建了一個自定義線程池執行程序,以更好地了解線程池執行程序的工作方式。

功能性:

  • 它維護一個固定的線程池,即使沒有任務提交也創建線程并啟動線程,而ThreadPoolExecutor根據需要創建線程,即,每當將可運行對象提交給池且線程數小于核心池大小時。
  • 在ThreadPoolExecutor中,我們提供了一個等待隊列,當所有線程忙于運行現有任務時,新的可運行任務將在該隊列中等待。 隊列填滿后,將創建最大線程池大小的新線程。 在MyThreadPool中,我將可運行對象存儲在鏈接列表中,因此每個任務都將在列表中等待且不受限制,因此在此不使用maxPoolSize。
  • 在ThreadPoolExecutor中,我們使用Future Objects從任務中獲取結果,如果結果不可用,則future.get()方法將阻塞,或者使用CompletionService。 在MyThreadPoolExecutor中,我創建了一個名為ResultListener的簡單接口,用戶必須提供對此的實現,如他希望如何處理輸出。 每個任務完成后,ResultListener將獲得任務輸出的回調,或者在發生任何異常的情況下將調用error方法。
  • 調用shutdown方法時,MyThreadPoolExecutor將停止接受新任務并完成剩余任務。
  • 與ThreadPoolExecutor相比,我提供了非常基本的功能,我使用了簡單的線程機制,如wait(),notify(),notifyAll()和join()。
  • 在性能方面,它類似于ThreadPoolExecutor,在某些情況下好一些。 如果您發現任何有趣的結果或改進方法,請告訴我。
package com.util;import java.util.concurrent.Callable;/*** Run submitted task of {@link MyThreadPool} After running the task , It calls* on {@link ResultListener}object with {@link Output}which contains returned* result of {@link Callable}task. Waits if the pool is empty.* * @author abhishek* * @param */import java.util.concurrent.Callable; /** * Run submitted task of {@link MyThreadPool} After running the task , It calls * on {@link ResultListener}object with {@link Output}which contains returned * result of {@link Callable}task. Waits if the pool is empty. * * @author abhishek * * @param <V> */ public class MyThread<V> extends Thread {/*** MyThreadPool object, from which the task to be run*/private MyThreadPool<V> pool;private boolean active = true;public boolean isActive() {return active;}public void setPool(MyThreadPool<V> p) {pool = p;}/*** Checks if there are any unfinished tasks left. if there are , then runs* the task and call back with output on resultListner Waits if there are no* tasks available to run If shutDown is called on MyThreadPool, all waiting* threads will exit and all running threads will exit after finishing the* task*/public void run() {ResultListener<V> result = pool.getResultListener();Callable<V> task;while (true){task = pool.removeFromQueue();if (task != null){try{V output = task.call();result.finish(output);} catch (Exception e){result.error(e);}} else{if (!isActive())break;else{synchronized (pool.getWaitLock()){try{pool.getWaitLock().wait();} catch (InterruptedException e){// TODO Auto-generated catch blocke.printStackTrace();}}}}}}void shutdown() {active = false;} }package com.util; import java.util.LinkedList; import java.util.concurrent.Callable; /** * This class is used to execute submitted {@link Callable} tasks. this class * creates and manages fixed number of threads User will provide a * {@link ResultListener}object in order to get the Result of submitted task * * @author abhishek * * */ public class MyThreadPool<V> {private Object waitLock = new Object();public Object getWaitLock() {return waitLock;}/*** list of threads for completing submitted tasks*/private final LinkedList<MyThread<V>> threads;/*** submitted task will be kept in this list untill they run by one of* threads in pool*/private final LinkedList<Callable<V>> tasks;/*** shutDown flag to shut Down service*/private volatile boolean shutDown;/*** ResultListener to get back the result of submitted tasks*/private ResultListener<V> resultListener;/*** initializes the threadPool by starting the threads threads will wait till* tasks are not submitted** @param size* Number of threads to be created and maintained in pool* @param myResultListener* ResultListener to get back result*/public MyThreadPool(int size, ResultListener<V> myResultListener) {tasks = new LinkedList<Callable<V>>();threads = new LinkedList<MyThread<V>>();shutDown = false;resultListener = myResultListener;for (int i = 0; i < size; i++) {MyThread<V> myThread = new MyThread<V>();myThread.setPool(this);threads.add(myThread);myThread.start();}}public ResultListener<V> getResultListener() {return resultListener;}public void setResultListener(ResultListener<V> resultListener) {this.resultListener = resultListener;}public boolean isShutDown() {return shutDown;}public int getThreadPoolSize() {return threads.size();}public synchronized Callable<V> removeFromQueue() {return tasks.poll();}public synchronized void addToTasks(Callable<V> callable) {tasks.add(callable);}/*** submits the task to threadPool. will not accept any new task if shutDown* is called Adds the task to the list and notify any waiting threads** @param callable*/public void submit(Callable<V> callable) {if (!shutDown) {addToTasks(callable);synchronized (this.waitLock) {waitLock.notify();}} else {System.out.println('task is rejected.. Pool shutDown executed');}}/*** Initiates a shutdown in which previously submitted tasks are executed,* but no new tasks will be accepted. Waits if there are unfinished tasks* remaining**/public void stop() {for (MyThread<V> mythread : threads) {mythread.shutdown();}synchronized (this.waitLock) {waitLock.notifyAll();}for (MyThread<V> mythread : threads) {try {mythread.join();} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();}}} }package com.util;/*** This interface imposes finish method * which is used to get the {@link Output} object * of finished task* @author abhishek** @param */public interface ResultListener {public void finish(T obj);public void error(Exception ex);}

您可以根據需要實現此類并返回并處理任務返回的結果。

package com.util;public class DefaultResultListener implements ResultListener{@Overridepublic void finish(Object obj) {}@Overridepublic void error(Exception ex) {ex.printStackTrace();}}

例如,此類將添加task返回的數字。

package com.util;import java.util.concurrent.atomic.AtomicInteger;/*** ResultListener class to keep track of total matched count* @author abhishek* * @param */ public class MatchedCountResultListenerimplements ResultListener{/*** matchedCount to keep track of the number of matches returned by submitted* task*/AtomicInteger matchedCount = new AtomicInteger();/*** this method is called by ThreadPool to give back the result of callable* task. if the task completed successfully then increment the matchedCount by* result count*/@Overridepublic void finish(V obj) {//System.out.println('count is '+obj);matchedCount.addAndGet((Integer)obj);}/*** print exception thrown in running the task*/@Overridepublic void error(Exception ex) {ex.printStackTrace();}/*** returns the final matched count of all the finished tasks* * @return*/public int getFinalCount() {return matchedCount.get();} }

這是一個測試類,使用CompletionService和MyThreadPoolExecutor對循環運行簡單

package test;import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future;import com.util.DefaultResultListener; import com.util.MyThreadPool;public class TestClass {public static void main(String[] args) throws InterruptedException {CompletionServicethreadService;ExecutorService service = Executors.newFixedThreadPool(2);threadService = new ExecutorCompletionService(service);long b = System.currentTimeMillis();for(int i =0;i<50000;i++){threadService.submit(new MyRunable (i));}service.shutdown();System.out.println('time taken by Completion Service ' + (System.currentTimeMillis()-b));DefaultResultListener result = new DefaultResultListener();MyThreadPoolnewPool = new MyThreadPool(2,result);long a = System.currentTimeMillis();int cc =0;for(int i =0;i<50000;i++){cc = cc+i;}System.out.println('time taken without any pool ' + (System.currentTimeMillis()-a));a= System.currentTimeMillis();for(int i =0;i<5000;i++){newPool.submit(new MyRunable (i));}newPool.stop();System.out.println('time taken by myThreadPool ' + (System.currentTimeMillis()-a));}}class MyRunable implements Callable{int index = -1;public MyRunable(int index){this.index = index;}@Overridepublic Integer call() throws Exception {return index;}}

參考: 我的JCG合作伙伴 Abhishek Somani在Java,J2EE和Server博客上的Java 自定義線程池執行程序 。

翻譯自: https://www.javacodegeeks.com/2013/03/my-custom-thread-pool-executor-in-java.html

總結

以上是生活随笔為你收集整理的我的Java自定义线程池执行器的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。