Java并发编程高级篇(十):分离任务的执行和结果的处理
2019獨角獸企業重金招聘Python工程師標準>>>
在之前的例子中,我們使用執行器框架都是在主類中提交任務,等待任務執行完畢后再去處理任務執行的結果。接下來我們打算將任務的提交和結果的處理都放置到線程中去執行。在每個任務內部提交自己到執行器,然后通過一個統一的結果處理線程來處理所有任務執行的結果。
為了解決這個問題,執行器框架為我們提供了一個CompletionService類,任務執行線程和結果處理線程能夠共享這個類,結果處理線程便可以在這里渠道已經執行完畢的任務的結果。CompletionService類的內部也是通過一個ExecutorService來提交任務的。
首先,創建任務線程,實現Callable接口。模擬報表生成過程。
/*** 模擬生成報告** Created by hadoop on 2016/11/3.*/ public class ReportGenerator implements Callable<String> {private String sender;private String title;public ReportGenerator(String sender, String title) {this.sender = sender;this.title = title;}@Overridepublic String call() throws Exception {long duration = (long)(Math.random() * 10);System.out.printf("ReportGenerator: Generator report %s_%s duration %d seconds.\n", sender, title, duration);TimeUnit.SECONDS.sleep(duration);return sender + "_" + title;} }然后我們創建任務提交線程,這個線程的構造方法接受兩個參數,分別是報表名稱和CompletionService對象。將報表生成任務提交到CompletionService去執行。
import java.util.concurrent.CompletionService;/*** 模擬請求獲取報告** Created by hadoop on 2016/11/3.*/ public class ReportRequest implements Runnable {private String name;private CompletionService<String> service;public ReportRequest(String name, CompletionService<String> service) {this.name = name;this.service = service;}@Overridepublic void run() {ReportGenerator generator = new ReportGenerator(name, "Report");service.submit(generator);} }下面我們創建任務結果處理類,來打印生成的報表。這個類同樣會拿到CompletionService的引用,然后循環調用CompletionService.poll()方法來從任務結果隊列中獲取執行的結果,這個方法接受一個時間參數,如果當前結果隊列為空,那么則等待這個時間,超時返回null。不帶參數的poll()方法,如果對別為空則直接返回null。
/*** 處理報表結果** Created by hadoop on 2016/11/3.*/ public class ReportProcessor implements Runnable {private boolean end;private CompletionService<String> service;public ReportProcessor(boolean end, CompletionService<String> service) {this.end = end;this.service = service;}@Overridepublic void run() {while (!end) {try {Future<String> future = service.poll(20, TimeUnit.SECONDS);if (future != null) {System.out.printf("ReportReceiver: received %s\n", future.get());}} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}}}public void setEnd(boolean end) {this.end = end;} }最后我們創建主方法類。在這里我們創建ExecutorServer并把它賦值給ExecutorCompletionService。之后創建兩個報表請求任務和一個報表處理任務,同時持有ExecutorCompletionService的引用。
import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit;/*** 在執行器中分離任務執行和結果處理** 我們如何處理在一個對象里發送任務給執行器,在另一個對象里處理任務執行結果。* 對于這種情況Java API提供了CompletionService類** CompletionService使用Executor對象類執行任務。* 優勢在于:可以共享CompletionService。* 缺點在于:CompletionService獲取的Future對象只能是已經執行完畢的任務,他沒有辦法控制任務狀態,只能處理任務結果。** 我們創建了一個ExecutorService,然后使用這個ExecutorService來初始化一個ExecutorCompletionService<String>(executor)。** 首先創建了兩個ReportRequest任務,然后在任務內部使用service.submit(generator)方法調用報表生成任務。** 然后再ReportProcessor中不斷調用service.poll(20, TimeUnit.SECONDS);方法獲取已經執行完的結果,如果當前沒有結果那么等待20秒。** CompletionService還提供了兩個人方法:* poll():如果沒有任何Future直接返回null。* take():如若任務隊列中沒有Future那么阻塞知道有可用的Future。** Created by hadoop on 2016/11/3.*/ public class Main {public static void main(String[] args) {ExecutorService executor = Executors.newCachedThreadPool();CompletionService<String> service = new ExecutorCompletionService<String>(executor);ReportRequest request1 = new ReportRequest("Face", service);ReportRequest request2 = new ReportRequest("Online", service);ReportProcessor processor = new ReportProcessor(false, service);Thread thread1 = new Thread(request1);Thread thread2 = new Thread(request2);Thread thread3 = new Thread(processor);thread1.start();thread2.start();thread3.start();try {thread1.join();thread2.join();} catch (InterruptedException e) {e.printStackTrace();}executor.shutdown();try {executor.awaitTermination(1, TimeUnit.SECONDS);} catch (InterruptedException e) {e.printStackTrace();}try {TimeUnit.SECONDS.sleep(10);} catch (InterruptedException e) {e.printStackTrace();}processor.setEnd(true);} }控制臺中,我們可以看到兩個任務的提交信息和結果處理信息。
ReportGenerator: Generator report Online_Report duration 4 seconds. ReportGenerator: Generator report Face_Report duration 1 seconds. ReportReceiver: received Face_Report ReportReceiver: received Online_Report轉載于:https://my.oschina.net/nenusoul/blog/849165
總結
以上是生活随笔為你收集整理的Java并发编程高级篇(十):分离任务的执行和结果的处理的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 【Hadoop Summit Tokyo
- 下一篇: java美元兑换,(Java实现) 美元