原文鏈接?作者:Tomasz Nurkiewicz ? ?譯者:simonwang
ExecutorService抽象概念自Java5就已經提出來了,現在是2014年。順便提醒一下:Java5和Java6都已不被支持,Java7在半年內也將會這樣。我提出這個的原因是許多Java程序員仍然不能完全明白ExecutorService到底是怎樣工作的。還有很多地方要去學習,今天我會分享一些很少人知道的特性和實踐。然而這篇文章仍然是面向中等程序員的,沒什么特別高級的地方。
1. Name pool threads
我想強調一點的是,當在運行JVM或調試期間創建線程時,默認的線程池命名規則是pool-N-thread-M,這里N代表線程池的序列數(每一次你創建一個線程池的時候,全局計數N就加1),而M則是某一個線程池的線程序列數。例如,pool-2-thread-3就意味著JVM生命周期中第2線程池的第3線程。具體可以查看:Executors.defaultThreadFactory()。這樣不具備描述性,JDK使得線程命名的過程有些微的復雜,因為命名的方法隱藏在ThreadFactory內部。幸運地是Guava有一個很有用的類:
1 import?com.google.common.util.concurrent.ThreadFactoryBuilder;
3 final?ThreadFactory threadFactory =?new?ThreadFactoryBuilder()
4 ????????.setNameFormat("Orders-%d")
5 ????????.setDaemon(true)
7 final?ExecutorService executorService = Executors.newFixedThreadPool(10, threadFactory);
線程池默認創造的是非守護線程,由你來決定是否合適。
2. Switch names according to context
有一個我從?Supercharged jstack: How to Debug Your Servers at 100mph學到的小技巧。一旦我們記住了線程的名字,那么在任何時刻我們都能夠改變它們!這是有道理的,因為線程轉儲顯示了類名和方法名,沒有參數和局部變量。通過調整線程名保留一些必要的事務標識符,我們可以很容易追蹤某一條運行緩慢或者造成死鎖的信息/記錄/查詢等。例如:
01 private?void?process(String messageId) {
02 ????executorService.submit(() -> {
03 ????????final?Thread currentThread = Thread.currentThread();
04 ????????final?String oldName = currentThread.getName();
05 ????????currentThread.setName("Processing-"?+ messageId);
07 ????????????//real logic here...
09 ????????????currentThread.setName(oldName);
在try-finally塊內部,當前線程被命名為Processing-WHATEVER-MESSAGE-ID-IS,當通過系統追蹤信息流時這可能會派上用場。
3. Explicit and safe shutdown
在客戶端線程和線程池之間有一個任務隊列,當你的應用關閉時,你必須關心兩件事:任務隊列會發生什么;正在運行的任務會怎樣(這個時候將詳細介紹)。令人感到吃驚的是許多程序員并不會適當地或有意識地關閉線程池。這有兩個方法:要么讓所有的任務隊列全都執行完(shutdown()),要么舍棄它們(shutdownNow()),這依賴你使用的具體情況。例如如果我們提交一連串的任務并且想要它們在完成后盡可能快的返回,可以使用shutdown():
1 private?void?sendAllEmails(List<String> emails)?throws?InterruptedException {
2 ????emails.forEach(email ->
3 ????????????executorService.submit(() ->
4 ????????????????????sendEmail(email)));
5 ????executorService.shutdown();
6 ????final?boolean?done = executorService.awaitTermination(1, TimeUnit.MINUTES);
7 ????log.debug("All e-mails were sent so far? {}", done);
在這個例子中我們發送了一堆e-mail,每一個都作為一個獨立的任務交給線程池。在提交了所有的任務之后我們執行shutdown使線程池不再接收新的任務。然后最多等待1minute直到所有的任務都完成。然而如果有些任務仍然處于掛起狀態,awaitTermination()將返回false,而那些在等待的任務會繼續執行。我知道一些人會使用新潮的用法:
1 emails.parallelStream().forEach(this::sendEmail);
你可能會覺得我太保守,但我喜歡去控制并行線程的數量。不用介意,還有一種優雅的shutdown()方法shutdownNow():
1 final?List<Runnable> rejected = executorService.shutdownNow();
2 log.debug("Rejected tasks: {}", rejected.size());
這樣一來隊列中還在等待的任務將會被舍棄并被返回,但已經在運行的任務將會繼續。
4. Handle interruption with care
很少人知道Future接口的cancel,這里我不想重復說明,你可以去看我以前的文章:
InterruptedException and interrupting threads explained
5. Monitor queue length and keep it bounded
不合適的線程池大小可能會造成運行緩慢、不穩定以及內存泄漏。如果你配置太少的線程,那么任務隊列就會變大,消耗太多內存。另一方面太多的線程又會由于過度頻繁的上下文切換而造成整個系統運行緩慢。所以觀察隊列的長度并將其限定在一定范圍內是很重要的,這樣的話過載的線程池會短暫拒絕新任務的提交:
1 final?BlockingQueue<Runnable> queue =?new?ArrayBlockingQueue<>(100);
2 executorService =?new?ThreadPoolExecutor(n, n,
3 ????????0L, TimeUnit.MILLISECONDS,
上面的代碼和Executors.newFixedThreadPool(n)是等價的,然而不同的是默認情況下固定線程池使用的是無限制的LinkedBlockingQueue ,我們使用的是固定容量100的ArrayBlockingQueue。這就意味著如果已經有100個任務在排隊(其中有n個任務正在執行),那么新的任務就將被駁回并拋出RejectedExecutionException。一旦在外部可以訪問queue ,那么我們就可以周期性地調用size(),并把它提交到logs/JMX或其他任何你使用的監視器中。
6. Remember about exception handling
下面代碼段的結果是什么?
1 executorService.submit(() -> {
2 ????System.out.println(1?/?0);
我深受其苦:它不會打印任何東西。不會拋出java.lang.ArithmeticException: / by zero,什么也沒有。線程池將忽略這個異常,就像它從來沒發生過。如果上面的代碼是用java.lang.Thread偶然創造的,那么UncaughtExceptionHandler可能會起作用。但在線程池里你就要多加小心了。如果你正在提交Runnable (沒有返回結果,就像上面),那么你必須將整個代碼塊用try-catch包起來,至少要log一下。如果你提交的是Callable,確保你總是使用阻塞的get()方法來重拋異常:
1 final?Future<Integer> division = executorService.submit(() ->?1?/?0);
2 //below will throw ExecutionException caused by ArithmeticException
有趣的是就算是Spring框架在處理這個bug的時候會使用@Async,詳細:?SPR-8995和SPR-12090。
7. Monitor waiting time in a queue
監控工作隊列深度又是一個層面,在排除單個事務或任務的故障時,有必要了解從任務的提交到實際執行耗時多長。這種等待時間最好趨近于零(當線程池中有空閑的線程時),但任務又不得不在隊列中排隊導致等待時間變長。而且如果池內沒有一定數量的線程,在運行新任務時可能需要創造新的線程,而這個過程也是要消耗少量時間的。為了能夠清楚地監測這個時間,我們使用類似下面的代碼包裝原始的ExecutorService :
01 public?class?WaitTimeMonitoringExecutorService?implements?ExecutorService {
03 ????private?final?ExecutorService target;
05 ????public?WaitTimeMonitoringExecutorService(ExecutorService target) {
06 ????????this.target = target;
10 ????public?<T> Future<T> submit(Callable<T> task) {
11 ????????final?long?startTime = System.currentTimeMillis();
12 ????????return?target.submit(() -> {
13 ????????????????????final?long?queueDuration = System.currentTimeMillis() - startTime;
14 ????????????????????log.debug("Task {} spent {}ms in queue", task, queueDuration);
15 ????????????????????return?task.call();
21 ????public?<T> Future<T> submit(Runnable task, T result) {
22 ????????return?submit(() -> {
23 ????????????task.run();
24 ????????????return?result;
29 ????public?Future<?> submit(Runnable task) {
30 ????????return?submit(new?Callable<Void>() {
32 ????????????public?Void call()?throws?Exception {
33 ????????????????task.run();
34 ????????????????return?null;
這并不是完整的實現,但你得知道這個基本概念。當我們向線程池提交任務的那一刻,就立馬開始測量時間,而任務一開始被執行就停止測量。不要被上面源碼中很接近的startTime 和queueDuration 所迷惑了,事實上這兩行是在不同的線程中執行的,可能有數毫秒甚至數秒的差別,例如:
1 Task com.nurkiewicz.MyTask@7c7f3894?spent 9883ms in queue
8. Preserve client stack trace
響應式編程這段日子似乎比較火,Reactive manifesto,reactive streams,RxJava(剛剛發布1.0),Clojure agents,scala.rx…,這些東西都挺好的,但它們的堆棧跟蹤將不再友好,大多數堆棧跟蹤沒有什么卵用。舉個例子,當線程池中的任務拋出了一個異常:
1 java.lang.NullPointerException:?null
2 ????at com.nurkiewicz.MyTask.call(Main.java:76) ~[classes/:na]
3 ????at com.nurkiewicz.MyTask.call(Main.java:72) ~[classes/:na]
4 ????at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0]
5 ????at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[na:1.8.0]
6 ????at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[na:1.8.0]
7 ????at java.lang.Thread.run(Thread.java:744) ~[na:1.8.0]
我們很容易就發現MyTask在76行拋出了空指針異常,但我們并不知道是誰提交了這個任務,因為堆棧跟蹤僅僅只是告訴你Thread和 ThreadPoolExecutor的信息。我們能通過源碼從技術上定位MyTask被創造的位置,不需要線程(更不必說事件驅動、響應式編程)我們就能夠馬上看到全面信息。如果我們保留客戶端代碼(提交任務的代碼)的堆棧跟蹤并在出現故障的時候將其打印出來會怎么樣?這不是什么新想法,例如Hazelcast會將當前點發生的異常傳送回客戶端代碼,下面就看看保持客戶端堆棧跟蹤是怎樣實現的:
01 public?class?ExecutorServiceWithClientTrace?implements?ExecutorService {
03 ????protected?final?ExecutorService target;
05 ????public?ExecutorServiceWithClientTrace(ExecutorService target) {
06 ????????this.target = target;
10 ????public?<T> Future<T> submit(Callable<T> task) {
11 ????????return?target.submit(wrap(task, clientTrace(), Thread.currentThread().getName()));
14 ????private?<T> Callable<T> wrap(final?Callable<T> task,?final?Exception clientStack, String clientThreadName) {
17 ????????????????return?task.call();
18 ????????????}?catch?(Exception e) {
19 ????????????????log.error("Exception {} in task submitted from thrad {} here:", e, clientThreadName, clientStack);
20 ????????????????throw?e;
25 ????private?Exception clientTrace() {
26 ????????return?new?Exception("Client stack trace");
30 ????public?<T> List<Future<T>> invokeAll(Collection<??extends?Callable<T>> tasks)?throws?InterruptedException {
31 ????????return?tasks.stream().map(this::submit).collect(toList());
這次一旦出現異常我們將檢索任務被提交地方的所有堆棧跟蹤和線程名,和標準異常相比下面的異常信息更有價值:
01 Exception java.lang.NullPointerException in task submitted from thrad main here:
02 java.lang.Exception: Client stack trace
03 ????at com.nurkiewicz.ExecutorServiceWithClientTrace.clientTrace(ExecutorServiceWithClientTrace.java:43) ~[classes/:na]
04 ????at com.nurkiewicz.ExecutorServiceWithClientTrace.submit(ExecutorServiceWithClientTrace.java:28) ~[classes/:na]
05 ????at com.nurkiewicz.Main.main(Main.java:31) ~[classes/:na]
06 ????at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0]
07 ????at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0]
08 ????at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0]
09 ????at java.lang.reflect.Method.invoke(Method.java:483) ~[na:1.8.0]
10 ????at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134) ~[idea_rt.jar:na]
9. Prefer CompletableFuture
Java 8提出了強大的CompletableFuture,請盡可能的使用它。ExecutorService并沒有擴展支持這個強大的抽象,所以你要小心使用它。用:
1 final?CompletableFuture<BigDecimal> future =
2 ????CompletableFuture.supplyAsync(this::calculate, executorService);
代替:
1 final?Future<BigDecimal> future =
2 ????executorService.submit(this::calculate);
CompletableFuture繼承了Future及其所有功能,而且CompletableFuture所提供的擴展功能極大地豐富了我們的API。
10. Synchronous queue
SynchronousQueue是一種有趣的BlockingQueue但真正意義上并不是queue,事實上它連數據結構都算不上。要解釋的話它算是0容量的隊列,引用JavaDoc:
each insert operation must wait for a corresponding remove operation by another thread, and vice versa. A synchronous queue does not have any internal capacity, not even a capacity of one. You cannot peek at a synchronous queue because an element is only present when you try to remove it; you cannot insert an element (using any method) unless another thread is trying to remove it; you cannot iterate as there is nothing to iterate. […]
Synchronous queues are similar to rendezvous channels used in CSP and Ada.
這和線程池有什么關系呢?試著在ThreadPoolExecutor中使用SynchronousQueue:
1 BlockingQueue<Runnable> queue =?new?SynchronousQueue<>();
2 ExecutorService executorService =?new?ThreadPoolExecutor(2,?2,
3 ????????0L, TimeUnit.MILLISECONDS,
我們創造了有兩個線程的線程池和一個SynchronousQueue,因為SynchronousQueue本質上是零容量的隊列,因此如果有空閑線程,ExecutorService只會執行新的任務。如果所有的線程都被占用,新任務會被立刻拒絕不會等待。當進程背景要求立刻啟動或者被丟棄時,這種機制是可取的。 以上,希望你們能夠找到至少一個有用的!
總結
以上是生活随笔 為你收集整理的ExecutorService-10个要诀和技巧 的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔 網站內容還不錯,歡迎將生活随笔 推薦給好友。