日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

ExecutorService – 10个提示和技巧

發布時間:2023/12/3 编程问答 34 豆豆
生活随笔 收集整理的這篇文章主要介紹了 ExecutorService – 10个提示和技巧 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

從Java 5開始就已經存在ExecutorService抽象。在這里我們談論的是2004。 提醒一下:Java 5和6不再受支持,Java 7 將不在半年之內 。 之所以提出這一點,是因為許多Java程序員仍然不完全了解ExecutorService工作方式。 有很多地方可以學習,今天,我想分享一些鮮為人知的功能和做法。 但是,本文仍然針對中級程序員,沒有什么特別高級的。

1.名稱池線程

我不能強調這一點。 轉儲正在運行的JVM的線程時或在調試過程中,默認的線程池命名方案為pool-N-thread-M ,其中N代表池序列號(每次創建新的線程池時,全局N計數器都會遞增),而M是池中的線程序列號。 例如, pool-2-thread-3表示在JVM生命周期中創建的第二個池中的第三個線程。 請參閱: Executors.defaultThreadFactory() 。 描述性不強。 由于命名策略隱藏在ThreadFactory ,因此JDK使得正確命名線程變得有些復雜。 幸運的是,番石榴為此提供了一個幫助器類:

import com.google.common.util.concurrent.ThreadFactoryBuilder;final ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("Orders-%d").setDaemon(true).build(); final ExecutorService executorService = Executors.newFixedThreadPool(10, threadFactory);

默認情況下,線程池會創建非守護線程,并確定是否適合您。

2.根據上下文切換名稱

這是我從Supercharged jstack學到的技巧:如何以100mph的速度調試服務器 。 一旦我們記住了線程名稱,我們就可以在運行時隨時更改它們! 這是有道理的,因為線程轉儲顯示類和方法名稱,而不顯示參數和局部變量。 通過調整線程名稱以保留一些必要的事務標識符,我們可以輕松跟蹤哪個消息/記錄/查詢/等。 緩慢或導致死鎖。 例:

private void process(String messageId) {executorService.submit(() -> {final Thread currentThread = Thread.currentThread();final String oldName = currentThread.getName();currentThread.setName("Processing-" + messageId);try {//real logic here...} finally {currentThread.setName(oldName);}}); }

在try內部- finally阻止當前線程被命名為Processing-WHATEVER-MESSAGE-ID-IS 。 在跟蹤通過系統的消息流時,這可能會派上用場。

3.明確安全關閉

在客戶端線程和線程池之間有一個任務隊列。 當您的應用程序關閉時,您必須注意兩件事:排隊任務正在發生的事情以及已運行的任務的行為方式(稍后會詳細介紹)。 令人驚訝的是,許多開發人員沒有正確或有意識地關閉線程池。 有兩種技術:讓所有排隊的任務執行( shutdown() )或刪除它們( shutdownNow() )–這完全取決于您的用例。 例如,如果我們提交了一堆任務,并希望所有任務完成后立即返回,請使用shutdown() :

private void sendAllEmails(List<String> emails) throws InterruptedException {emails.forEach(email ->executorService.submit(() ->sendEmail(email)));executorService.shutdown();final boolean done = executorService.awaitTermination(1, TimeUnit.MINUTES);log.debug("All e-mails were sent so far? {}", done); }

在這種情況下,我們發送了一堆電子郵件,每個電子郵件都是線程池中的一個單獨任務。 提交這些任務后,我們將關閉池,以使其不再接受任何新任務。 然后,我們最多等待一分鐘,直到所有這些任務完成。 但是,如果某些任務仍未完成,則awaitTermination()將僅返回false 。 此外,待處理的任務將繼續處理。 我知道趕時髦的人會去:

emails.parallelStream().forEach(this::sendEmail);

稱我為老式,但我喜歡控制并行線程的數量。 沒關系,優雅的shutdown()的替代方法是shutdownNow() :

final List<Runnable> rejected = executorService.shutdownNow(); log.debug("Rejected tasks: {}", rejected.size());

這次所有排隊的任務都將被丟棄并返回。 允許已運行的作業繼續。

4.小心處理中斷

Future接口鮮為人知的功能是取消。 與其重復自己,不如查看我的較早文章: InterruptedException和中斷線程說明

5.監視隊列長度并使其有界

大小不正確的線程池可能會導致運行緩慢,不穩定和內存泄漏。 如果配置的線程太少,則會建立隊列,從而消耗大量內存。 另一方面,由于上下文切換過多,線程過多會減慢整個系統的速度,并導致相同的癥狀。 重要的是要查看隊列的深度并使其有界,以便過載的線程池只是暫時拒絕新任務:

final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(100); executorService = new ThreadPoolExecutor(n, n,0L, TimeUnit.MILLISECONDS,queue);

上面的代碼等效于Executors.newFixedThreadPool(n) ,但是我們使用固定容量為100 ArrayBlockingQueue代替了默認的無限LinkedBlockingQueue 。 這意味著,如果已經有100個任務排隊(并且正在執行n個任務),則新任務將被RejectedExecutionException 。 另外,由于queue現在可以從外部使用,因此我們可以定期調用size()并將其放入日志/ JMX /您使用的任何監視機制中。

6.記住關于異常處理

以下代碼段將產生什么結果?

executorService.submit(() -> {System.out.println(1 / 0); });

我被那太多次咬傷:它不會打印出任何東西 。 沒有java.lang.ArithmeticException: / by zero符號java.lang.ArithmeticException: / by zero ,沒有。 線程池只是吞沒了這個異常,就好像它從未發生過一樣。 如果這是一個很好的從頭開始創建的java.lang.Thread , UncaughtExceptionHandler可以工作。 但是對于線程池,您必須更加小心。 如果您要提交Runnable (沒有任何結果,如上所示),則必須用try – catch至少將其記錄下來。 如果要提交Callable<Integer> ,請確保始終使用阻塞get()取消引用它以重新引發異常:

final Future<Integer> division = executorService.submit(() -> 1 / 0); //below will throw ExecutionException caused by ArithmeticException division.get();

有趣的是,即使是Spring框架也使用@Async造成了此錯誤,請參閱: SPR-8995和SPR-12090 。

7.監視隊列中的等待時間

監視工作隊列深度是一方面。 但是,在對單個事務/任務進行故障排除時,值得一看的是在提交任務和實際執行之間經過了多少時間。 此持續時間最好應接近0(當池中有一些空閑線程時),但是當必須將任務排隊時,它將持續增長。 此外,如果池中沒有固定數量的線程,則運行新任務可能需要生成線程,這也消耗了很短的時間。 為了干凈地監視此指標,請使用類似于以下內容的東西包裝原始ExecutorService :

public class WaitTimeMonitoringExecutorService implements ExecutorService {private final ExecutorService target;public WaitTimeMonitoringExecutorService(ExecutorService target) {this.target = target;}@Overridepublic <T> Future<T> submit(Callable<T> task) {final long startTime = System.currentTimeMillis();return target.submit(() -> {final long queueDuration = System.currentTimeMillis() - startTime;log.debug("Task {} spent {}ms in queue", task, queueDuration);return task.call();});}@Overridepublic <T> Future<T> submit(Runnable task, T result) {return submit(() -> {task.run();return result;});}@Overridepublic Future<?> submit(Runnable task) {return submit(new Callable<Void>() {@Overridepublic Void call() throws Exception {task.run();return null;}});}//...}

這不是一個完整的實現,但是您可以了解基本思想。 當我們向線程池提交任務時,我們立即開始計算時間。 我們一接到任務就立即停止并開始執行。 不要被源代碼中的startTime和queueDuration緊密聯系著。 實際上,這兩行是在不同的線程中求值的,可能相隔數毫秒甚至數秒,例如:

Task com.nurkiewicz.MyTask@7c7f3894 spent 9883ms in queue

8.保留客戶端堆棧跟蹤

最近,反應式編程似乎引起了很多關注。 反應性清單 , 反應性流 , RxJava (剛剛發布1.0!), Clojure代理 , scala.rx …它們都很好用 ,但是堆棧跟蹤不再是您的朋友,它們最多沒有用。 以提交給線程池的任務中發生的異常為例:

java.lang.NullPointerException: nullat com.nurkiewicz.MyTask.call(Main.java:76) ~[classes/:na]at com.nurkiewicz.MyTask.call(Main.java:72) ~[classes/:na]at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0]at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[na:1.8.0]at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[na:1.8.0]at java.lang.Thread.run(Thread.java:744) ~[na:1.8.0]

我們很容易發現MyTask在第76行處拋出了NPE。但是我們不知道誰提交了此任務,因為堆棧跟蹤僅顯示Thread和ThreadPoolExecutor 。 從技術上講,我們可以瀏覽源代碼,以期僅找到創建MyTask地方。 但是如果沒有線程(更不用說事件驅動,反應式,演員忍者編程),我們將立即看到完整的畫面。 如果我們可以保留客戶端代碼(提交任務的代碼)的堆棧跟蹤并顯示出來(例如在失敗的情況下)怎么辦? 這個想法并不新鮮,例如Hazelcast將異常從所有者節點傳播到客戶端代碼 。 這看起來可能是天真的支持,以便在發生故障時保持客戶端堆棧跟蹤:

public class ExecutorServiceWithClientTrace implements ExecutorService {protected final ExecutorService target;public ExecutorServiceWithClientTrace(ExecutorService target) {this.target = target;}@Overridepublic <T> Future<T> submit(Callable<T> task) {return target.submit(wrap(task, clientTrace(), Thread.currentThread().getName()));}private <T> Callable<T> wrap(final Callable<T> task, final Exception clientStack, String clientThreadName) {return () -> {try {return task.call();} catch (Exception e) {log.error("Exception {} in task submitted from thrad {} here:", e, clientThreadName, clientStack);throw e;}};}private Exception clientTrace() {return new Exception("Client stack trace");}@Overridepublic <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {return tasks.stream().map(this::submit).collect(toList());}//...}

這次如果發生故障,我們將檢索提交任務的地方的完整堆棧跟蹤和線程名稱。 與之前看到的標準異常相比,它具有更大的價值:

Exception java.lang.NullPointerException in task submitted from thrad main here: java.lang.Exception: Client stack traceat com.nurkiewicz.ExecutorServiceWithClientTrace.clientTrace(ExecutorServiceWithClientTrace.java:43) ~[classes/:na]at com.nurkiewicz.ExecutorServiceWithClientTrace.submit(ExecutorServiceWithClientTrace.java:28) ~[classes/:na]at com.nurkiewicz.Main.main(Main.java:31) ~[classes/:na]at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0]at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0]at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0]at java.lang.reflect.Method.invoke(Method.java:483) ~[na:1.8.0]at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134) ~[idea_rt.jar:na]

9.首選CompletableFuture

在Java 8中,引入了更強大的CompletableFuture 。 請盡可能使用它。 沒有擴展ExecutorService來支持這種增強的抽象,因此您必須自己照顧它。 代替:

final Future<BigDecimal> future = executorService.submit(this::calculate);

做:

final CompletableFuture<BigDecimal> future = CompletableFuture.supplyAsync(this::calculate, executorService);

CompletableFuture擴展了Future因此一切都CompletableFuture運行。 但是,API的更高級的使用者將真正欣賞CompletableFuture提供的擴展功能。

10.同步隊列

SynchronousQueue是一個有趣的BlockingQueue ,它實際上不是隊列。 它本身并不是一個數據結構。 最好將其解釋為容量為0的隊列。引用JavaDoc:

每個insert操作必須等待另一個線程進行相應的remove操作,反之亦然。 同步隊列沒有任何內部容量,甚至沒有一個容量。 您無法窺視同步隊列,因為僅當您嘗試刪除它時,該元素才存在。 您不能插入元素(使用任何方法),除非另一個線程試圖將其刪除; 您無法迭代,因為沒有要迭代的內容。 […]

同步隊列類似于CSP和Ada中使用的集合通道。

這與線程池有什么關系? 嘗試將SynchronousQueue與ThreadPoolExecutor :

BlockingQueue<Runnable> queue = new SynchronousQueue<>(); ExecutorService executorService = new ThreadPoolExecutor(n, n,0L, TimeUnit.MILLISECONDS,queue);

我們創建了一個線程池,該線程池具有兩個線程和一個在其前面的SynchronousQueue 。 由于SynchronousQueue本質上是一個容量為0的隊列,因此,如果有可用的空閑線程,則此類ExecutorService將僅接受新任務。 如果所有線程都忙,則新任務將立即被拒絕并且永遠不會等待。 當后臺處理必須立即開始或被丟棄時,此行為可能是理想的。

就是這樣,希望您發現至少一個有趣的功能!

翻譯自: https://www.javacodegeeks.com/2014/11/executorservice-10-tips-and-tricks.html

總結

以上是生活随笔為你收集整理的ExecutorService – 10个提示和技巧的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。