先进的ListenableFuture功能
上次我們熟悉了ListenableFuture 。 我答應介紹更高級的技術,即轉換和鏈接。 讓我們從簡單的事情開始。 假設我們有從某些異步服務獲得的ListenableFuture<String> 。 我們還有一個簡單的方法:
我們不需要String ,我們需要Document 。 一種方法是簡單地解析Future ( 等待它)并在String上進行處理。 但是,更優雅的解決方案是在結果可用后立即應用轉換,并將我們的方法視為始終返回ListenableFuture<Document> 。 這很簡單:
final ListenableFuture<String> future = //...final ListenableFuture<Document> documentFuture = Futures.transform(future, new Function<String, Document>() {@Overridepublic Document apply(String contents) {return parse(contents);} });或更可讀:
final Function<String, Document> parseFun = new Function<String, Document>() {@Overridepublic Document apply(String contents) {return parse(contents);} };final ListenableFuture<String> future = //...final ListenableFuture<Document> documentFuture = Futures.transform(future, parseFun);Java語法有一定的局限性,但請專注于我們剛剛做的事情。 Futures.transform()不會等待基礎的ListenableFuture<String>應用parse()轉換。 相反,它在后臺注冊了一個回調,希望在給定將來完成時得到通知。 在適當的時候對我們動態透明地應用了此轉換。 我們仍然有Future ,但是這次包裝了Document 。
因此,讓我們更進一步。 我們還有一個異步的,可能長時間運行的方法,用于計算給定Document 相關性 (無論在這種情況下是什么):
ListenableFuturecalculateRelevance(Document pageContents) {//...我們可以以某種方式將其與我們已經擁有的ListenableFuture<Document>嗎? 第一次嘗試:
final Function<Document, ListenableFuture<Double>> relevanceFun = new Function<Document, ListenableFuture<Double>>() {@Overridepublic ListenableFuture<Double> apply(Document input) {return calculateRelevance(input);} };final ListenableFuture<String> future = //... final ListenableFuture<Document> documentFuture = Futures.transform(future, parseFun); final ListenableFuture<ListenableFuture<Double>> relevanceFuture = Futures.transform(documentFuture, relevanceFun);哎喲! Double Future的未來,看起來不太好。 一旦我們解決了外部的未來,我們也需要等待內部的未來。 絕對不優雅。 我們可以做得更好嗎?
final AsyncFunction<Document, Double> relevanceAsyncFun = new AsyncFunction<Document, Double>() {@Overridepublic ListenableFuture<Double> apply(Document pageContents) throws Exception {return calculateRelevance(pageContents);} };final ListenableFuture<String> future = //comes from ListeningExecutorService final ListenableFuture<Document> documentFuture = Futures.transform(future, parseFun); final ListenableFuture<Double> relevanceFuture = Futures.transform(documentFuture, relevanceAsyncFun);請非常仔細地查看所有類型和結果。 注意Function和AsyncFunction之間的區別。 最初,我們有一個異步方法返回String future。 后來,我們對其進行了轉換,以將String無縫轉換為XML Document 。 內部將來完成后,此轉換將異步發生。 具有Document future,我們想調用一個需要Document并返回Double future的方法。
如果調用relevanceFuture.get() ,則我們的Future對象將首先等待內部任務完成,其結果( String -> Document )將等待外部任務并返回Double 。 我們還可以在relevanceFuture上注冊回調,該回調將在外部任務( calculateRelevance() )完成時觸發。 如果您仍然在這里,那就是更加瘋狂的轉變。
請記住,所有這些都是循環發生的。 對于每個網站,我們都有ListenableFuture<String> ,我們將其異步轉換為ListenableFuture<Double> 。 所以最后我們使用List<ListenableFuture<Double>> 。 這也意味著,為了提取所有結果,我們要么為每個ListenableFuture注冊偵聽器,要么等待它們中的每個。 根本沒有進步。 但是,如果我們可以輕松地從List<ListenableFuture<Double>>為ListenableFuture<List<Double>>怎么辦? 仔細閱讀-從期貨清單到清單的未來。 換句話說,不是擁有一堆小小的期貨,而是有一個將在所有子期貨都完成后完成的期貨–并且將結果一對一映射到目標列表。 猜猜,Guava可以做到這一點!
final List<ListenableFuture<Double>> relevanceFutures = //...; final ListenableFuture<List<Double>> futureOfRelevance = Futures.allAsList(relevanceFutures);當然,這里也沒有等待。 包裝器ListenableFuture<List<Double>>將在其子期貨之一完成時得到通知。 最后一個孩子ListenableFuture<Double>完成時,外部將來也完成。 一切都是事件驅動的,對您完全隱藏。
你認為就是這樣嗎? 假設我們要計算整個集合中最大的相關性。 您可能現在已經知道,我們不會等待List<Double> 。 相反,我們將注冊從List<Double>到Double !
final ListenableFuture<Double> maxRelevanceFuture = Futures.transform(futureOfRelevance, new Function<List<Double>, Double>() {@Overridepublic Double apply(List<Double> relevanceList) {return Collections.max(relevanceList);} });最后,我們可以偵聽maxRelevanceFuture完成事件,并使用JMS發送結果(異步!)。 如果您迷路了,這是完整的代碼:
private Document parse(String xml) {return //... }private final Function<String, Document> parseFun = new Function<String, Document>() {@Overridepublic Document apply(String contents) {return parse(contents);} };private ListenableFuture<Double> calculateRelevance(Document pageContents) {return //... }final AsyncFunction<Document, Double> relevanceAsyncFun = new AsyncFunction<Document, Double>() {@Overridepublic ListenableFuture<Double> apply(Document pageContents) throws Exception {return calculateRelevance(pageContents);} };//...final ListeningExecutorService pool = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10) );final List<ListenableFuture<Double>> relevanceFutures = new ArrayList<>(topSites.size()); for (final URL siteUrl : topSites) {final ListenableFuture<String> future = pool.submit(new Callable<String>() {@Overridepublic String call() throws Exception {return IOUtils.toString(siteUrl, StandardCharsets.UTF_8);}});final ListenableFuture<Document> documentFuture = Futures.transform(future, parseFun);final ListenableFuture<Double> relevanceFuture = Futures.transform(documentFuture, relevanceAsyncFun);relevanceFutures.add(relevanceFuture); }final ListenableFuture<List<Double>> futureOfRelevance = Futures.allAsList(relevanceFutures); final ListenableFuture<Double> maxRelevanceFuture = Futures.transform(futureOfRelevance, new Function<List<Double>, Double>() {@Overridepublic Double apply(List<Double> relevanceList) {return Collections.max(relevanceList);} });Futures.addCallback(maxRelevanceFuture, new FutureCallback<Double>() {@Overridepublic void onSuccess(Double result) {log.debug("Result: {}", result);}@Overridepublic void onFailure(Throwable t) {log.error("Error :-(", t);} });它值得嗎? 是的 , 沒有 。 是的 ,因為我們了解了一些與期貨/承諾一起使用的非常重要的構造和原語:鏈接,映射(轉換)和歸約。 該解決方案在CPU利用率方面非常出色-無需等待,阻塞等。請記住, Node.js的最大優勢在于其“無阻塞”策略。 在Netty期貨中也無處不在。 最后但并非最不重要的一點是,它感覺非常實用 。
另一方面,主要是由于Java語法冗長和缺乏類型推斷(是的,我們將很快進入Scala),代碼似乎非常難以閱讀,難以遵循和維護。 好吧,在某種程度上,這適用于所有消息驅動的系統。 但是,只要我們不發明更好的API和原語,我們就必須學習生存并利用異步,高度并行的計算。
如果您想進一步嘗試ListenableFuture ,請不要忘記閱讀官方文檔 。
參考: NoBlogDefFound博客中來自我們的JCG合作伙伴 Tomasz Nurkiewicz的高級ListenableFuture功能 。
翻譯自: https://www.javacodegeeks.com/2013/03/advanced-listenablefuture-capabilities.html
總結
以上是生活随笔為你收集整理的先进的ListenableFuture功能的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 传奇被坑怎么攻击服务器(如何传奇服务器被
- 下一篇: java.util.concurrent