日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

实现自定义的未来

發布時間:2023/12/3 编程问答 23 豆豆
生活随笔 收集整理的這篇文章主要介紹了 实现自定义的未来 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

上一次我們學習了java.util.concurrent.Future<T>背后的原理 。 我們還發現, Future<T>通常由庫或框架返回。 但是沒有什么可以阻止我們在有意義的情況下自行實現所有功能。 它不是特別復雜,可以顯著改善您的設計。 我盡力為我們的示例選擇有趣的用例。

JMS(Java消息服務)是用于發送異步消息的標準Java API。 當我們想到JMS時??,我們立即看到客戶端以一發不可收拾的方式向服務器(代理)發送消息。 但是在JMS之上實現請求-答復消息傳遞模式同樣普遍。 實現非常簡單:您將請求消息(當然是異步地)發送到另一側的MDB。
MDB處理該請求,然后將答復發送回硬編碼的答復隊列或客戶機選擇的任意隊列,并與JMSReplyTo屬性中的消息一起發送。 第二種情況更有趣。 客戶端可以創建一個臨時隊列,并在發送請求時將其用作回復隊列。 這樣,每個請求/答復對使用不同的答復隊列,因此不需要關聯ID,選擇器等。

但是有一個問題。 向JMS代理發送消息是簡單且異步的。 但是,收到答復要麻煩得多。 您可以實現MessageListener以使用一條消息,也可以使用阻塞MessageConsumer.receive() 。 第一種方法非常笨重,很難在實踐中使用。 第二個失敗了異步消息傳遞的目的。 您還可以按一定的時間間隔輪詢答復隊列,這聽起來更糟。

到現在為止,了解Future抽象您應該有一些設計想法。 如果我們可以發送請求消息并取回Future<T> (表示尚未收到的答復消息)怎么辦? Future抽象應該處理所有邏輯,我們可以放心地將其用作將來結果的句柄。 這是用于創建臨時隊列和發送請求的管道代碼:

private <T extends Serializable> Future<T> asynchRequest(ConnectionFactory connectionFactory, Serializable request, String queue) throws JMSException {Connection connection = connectionFactory.createConnection();connection.start();final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);final Queue tempReplyQueue = session.createTemporaryQueue();final ObjectMessage requestMsg = session.createObjectMessage(request);requestMsg.setJMSReplyTo(tempReplyQueue);sendRequest(session.createQueue(queue), session, requestMsg);return new JmsReplyFuture<T>(connection, session, tempReplyQueue); }

asynchRequest()方法僅將ConnectionFactory帶到JMS代理和任意數據。 該對象將使用ObjectMessage發送到queue 。 最后一行至關重要–我們返回自定義的JmsReplyFuture<T> ,它將表示尚未收到的回復。 注意,我們如何將臨時JMS隊列傳遞給JMSReplyTo屬性和Future 。 MDB方面的實現并不那么重要。 不用說是將回復發送回指定隊列:

final ObjectMessage reply = session.createObjectMessage(...); session.createProducer(request.getJMSReplyTo()).send(reply);

因此,讓我們深入研究JmsReplyFuture<T> 。 我假設請求和答復都是ObjectMessage 。 使用不同類型的消息不是很困難。 首先讓我們看看如何設置從回復通道接收消息:

public class JmsReplyFuture<T extends Serializable> implements Future<T>, MessageListener {//...public JmsReplyFuture(Connection connection, Session session, Queue replyQueue) throws JMSException {this.connection = connection;this.session = session;replyConsumer = session.createConsumer(replyQueue);replyConsumer.setMessageListener(this);}@Overridepublic void onMessage(Message message) {//...}}

如您所見, JmsReplyFuture實現了Future<T> (其中T是包裝在ObjectMessage的對象的預期類型)和JMS MessageListener 。 在構造函數中,我們只是開始偵聽replyQueue 。 根據我們的設計假設,我們知道那里最多會有一條消息,因為回復隊列是臨時丟棄隊列。 在上一篇文章中,我們了解到Future.get()應該在等待結果時阻塞。 另一方面, onMessage()是從某些內部JMS客戶端線程/庫調用的回調方法。 顯然,我們需要一些共享變量/鎖,以使等待中的get()知道答復已到達。 最好我們的解決方案應該是輕量級的,并且不引入任何延遲,因此忙于等待volatile變量是一個壞主意。 最初,我雖然使用了Semaphore ,但我將使用它來取消阻塞onMessage() get() onMessage() 。 但是我仍然需要一些共享變量來保存實際的回復對象。 因此,我想到了使用ArrayBlockingQueue的想法。 當我們知道不會再有一個項目時,使用隊列聽起來可能很奇怪。 但是它利用舊的生產者-消費者模式很好地工作: Future.get()是一個消費者,它阻塞了空隊列的poll()方法。 另一方面, onMessage()是生產者,將回復消息放置在該隊列中并立即取消阻塞消費者。 外觀如下:

public class JmsReplyFuture<T extends Serializable> implements Future<T>, MessageListener {private final BlockingQueue<T> reply = new ArrayBlockingQueue<>(1);//...@Overridepublic T get() throws InterruptedException, ExecutionException {return this.reply.take();}@Overridepublic T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {final T replyOrNull = reply.poll(timeout, unit);if (replyOrNull == null) {throw new TimeoutException();}return replyOrNull;}@Overridepublic void onMessage(Message message) {final ObjectMessage objectMessage = (ObjectMessage) message;final Serializable object = objectMessage.getObject();reply.put((T) object);//...}}

實施仍未完成,但涵蓋了最重要的概念。 請注意, BlockingQueue.poll(long, TimeUnit)方法非常適合Future.get(long, TimeUnit) 。 不幸的是,即使它們來自相同的程序包并且在相同的時間內或多或少地被開發,一種方法在超時時返回null ,而另一種方法應引發異常。 易于修復。

還要注意onMessage()的實現變得多么容易。 我們只是將新收到的消息放在BlockingQueue reply ,而集合將為我們完成所有同步。 我們仍然缺少一些不太重要但仍然重要的細節–取消和清理。 無需贅述,下面是一個完整的實現:

public class JmsReplyFuture<T extends Serializable> implements Future<T>, MessageListener {private static enum State {WAITING, DONE, CANCELLED}private final Connection connection;private final Session session;private final MessageConsumer replyConsumer;private final BlockingQueue<T> reply = new ArrayBlockingQueue<>(1);private volatile State state = State.WAITING;public JmsReplyFuture(Connection connection, Session session, Queue replyQueue) throws JMSException {this.connection = connection;this.session = session;replyConsumer = session.createConsumer(replyQueue);replyConsumer.setMessageListener(this);}@Overridepublic boolean cancel(boolean mayInterruptIfRunning) {try {state = State.CANCELLED;cleanUp();return true;} catch (JMSException e) {throw Throwables.propagate(e);}}@Overridepublic boolean isCancelled() {return state == State.CANCELLED;}@Overridepublic boolean isDone() {return state == State.DONE;}@Overridepublic T get() throws InterruptedException, ExecutionException {return this.reply.take();}@Overridepublic T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {final T replyOrNull = reply.poll(timeout, unit);if (replyOrNull == null) {throw new TimeoutException();}return replyOrNull;}@Overridepublic void onMessage(Message message) {try {final ObjectMessage objectMessage = (ObjectMessage) message;final Serializable object = objectMessage.getObject();reply.put((T) object);state = State.DONE;cleanUp();} catch (Exception e) {throw Throwables.propagate(e);}}private void cleanUp() throws JMSException {replyConsumer.close();session.close();connection.close();} }

我使用特殊的State枚舉來保存有關狀態的信息。 與基于多個標志, null檢查等的復雜條件相比,我發現它更具可讀性。要記住的第二件事是取消。 幸運的是,它非常簡單。 我們基本上關閉了基礎會話/連接。 在整個請求/答復消息交換的整個過程中,它必須保持打開狀態,否則臨時JMS答復隊列將消失。 請注意,我們不能輕易通知經紀人/ MDB我們對答復不再感興趣。 我們只是停止監聽它,但是MDB仍將處理請求并嘗試將答復發送到不再存在的臨時隊列。

那么這一切在實踐中看起來如何? 假設我們有一個MDB接收一個數字并返回一個平方。 假設計算需要一點時間,所以我們提前開始計算,同時做一些工作,然后再取回結果。 這樣的設計如下所示:

final Future<Double> replyFuture = asynchRequest(connectionFactory, 7, "square"); //do some more work final double resp = replyFuture.get(); //49

其中"square"是請求隊列的名稱。 如果我們重構它并使用依賴注入,我們可以將其進一步簡化為:

final Future<Double> replyFuture = calculator.square(7); //do some more work final double resp = replyFuture.get(); //49

您知道該設計的最佳選擇嗎? 即使我們正在利用相當先進的JMS功能,此處也沒有JMS代碼。 此外,我們稍后可以使用SOAP或GPU將calculator替換為其他實現。 就客戶端代碼而言,我們仍然使用Future<Double>抽象。 尚未提供計算結果。 根本的機制是無關緊要的。 那就是抽象的美。

顯然,此實現尚未準備好生產(到目前為止)。 但更糟糕的是,它缺少一些基本功能。 我們仍然在某個時候調用阻塞Future.get() 。 而且,無法組成/鏈接期貨(例如, 當響應到達時,發送另一條消息 )或等待最快的期貨完成。 耐心一點!

參考:在NoBlogDefFound博客上,從我們的JCG合作伙伴 Tomasz Nurkiewicz 實現自定義Future 。

翻譯自: https://www.javacodegeeks.com/2013/02/implementing-custom-future.html

總結

以上是生活随笔為你收集整理的实现自定义的未来的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 精品在线观看一区 | 在线观看一区视频 | 国产精品美女久久久久久 | 男人午夜剧场 | 中文字幕在线视频观看 | 日本αv| 一级一片免费看 | 色姑娘久 | 国产亚洲精品av | 日韩精品免费看 | 成人免费视频一区二区三区 | 免费在线黄网站 | 丝袜脚交免费网站xx | 欧洲综合色| 天天综合人人 | 久久99网| 日韩中文一区二区三区 | 在线亚洲+欧美+日本专区 | 欧美成人性生活片 | 天天综合av| 国产视频久久久久久 | 久久久久久久av | 久久精品aaaaaa毛片 | 久久久久久久一 | 国产精品久久综合 | 奇米第四色首页 | 99视频导航 | 欧美福利第一页 | www在线免费观看 | 九九热视频在线播放 | 肉大榛一进一出免费视频 | 欧美一级网 | 亚洲性精品 | 一品毛片 | 草莓视频18免费观看 | 物业福利视频 | www.97色| 最近日本中文字幕 | 天天爱综合 | 欧美日韩一区二区三区69堂 | 欧美xxxx69 | 久久视频免费在线观看 | 国产视频一二三区 | 欧美日本韩国在线 | 欧美一区二区三区在线观看 | 欧美福利一区 | 国外成人性视频免费 | 你懂的网站在线观看 | 天天爱综合 | 一级日韩一级欧美 | 美女啪啪国产 | 亚洲少妇18p | 美女视频三区 | 蜜桃av噜噜一区二区三区网址 | 中文字幕亚洲精品 | 亚洲欧美一区二区三区 | 制服.丝袜.亚洲.中文.综合懂色 | 九九爱国产| 国产激情av一区二区三区 | 日本十八禁视频无遮挡 | 国产喷水在线 | 99久久精品国产亚洲 | 国产香蕉视频在线观看 | 中日毛片 | 日日鲁鲁鲁夜夜爽爽狠狠视频97 | 久久久久久久久久久久久久久久久 | 涩涩涩在线视频 | 波多野结衣高清视频 | 欧美精品123 | 182av| 偷拍综合网 | 亚洲一区中文字幕在线观看 | 一级在线毛片 | 青苹果av | 成人啪啪网站 | 黄色日本视频 | 青青国产在线 | 婷婷色中文 | 假日游船法国满天星 | 国产精品中文久久久久久 | 国模吧一区二区 | 99精品视频99 | 欧美国产中文字幕 | 精品乱码一区内射人妻无码 | 好吊日精品视频 | 久久男人的天堂 | wwwxx在线观看 | 怡红院av亚洲一区二区三区h | 手机av在线不卡 | 亚洲精品大片www | 免费观看nba乐趣影院 | 少妇精品无码一区二区 | 国产精品老熟女视频一区二区 | 热久久国产 | 高清av不卡 | 日韩女优一区 | 幸福,触手可及 | 亚洲第一成年人网站 | 午夜免费播放观看在线视频 |