日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > java >内容正文

java

使用RxJava和SseEmitter进行服务器发送的事件

發布時間:2023/12/3 java 38 豆豆
生活随笔 收集整理的這篇文章主要介紹了 使用RxJava和SseEmitter进行服务器发送的事件 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

Spring Framework 4.2 GA即將發布,讓我們看一下它提供的一些新功能。 引起我注意的一個事件是一個簡單的新類SseEmitter ,它是對Spring MVC控制器中易于使用的發送事件的抽象。 SSE是一項技術,使您可以在一個HTTP連接內沿一個方向將數據從服務器流式傳輸到瀏覽器。 聽起來像是websocket可以做什么的子集。 但是,由于它是一個簡單得多的協議,因此可以在不需要全雙工的情況下使用,例如,實時推動股價變化或顯示長時間運行的進程。 這將是我們的例子。

假設我們有一個具有以下API的虛擬硬幣礦工:

public interface CoinMiner {BigDecimal mine() {//...} }

每次調用mine()我們都必須等待幾秒鐘,才能獲得大約1個硬幣的回報(平均)。 如果要挖掘多個硬幣,我們必須多次調用此方法:

@RestController public class MiningController {//...@RequestMapping("/mine/{count}")void mine(@PathVariable int count) {IntStream.range(0, count).forEach(x -> coinMiner.mine());}}

這項工作有效,我們可以請求/mine/10和mine()方法將執行10次。 到目前為止,一切都很好。 但是挖掘是一項占用大量CPU的任務,將計算分散到多個內核將是有益的。 此外,即使使用并行化,我們的API端點也相當慢,我們必須耐心等待直到所有工作完成而沒有任何進度通知。 讓我們首先修復并行性–但是,由于并行流無法控制底層線程池,因此我們來使用顯式的ExecutorService :

@Component class CoinMiner {CompletableFuture<BigDecimal> mineAsync(ExecutorService executorService) {return CompletableFuture.supplyAsync(this::mine, executorService);}//...}

客戶端代碼必須顯式提供ExecutorService (只是設計選擇):

@RequestMapping("/mine/{count}") void mine(@PathVariable int count) {final List<CompletableFuture<BigDecimal>> futures = IntStream.range(0, count).mapToObj(x -> coinMiner.mineAsync(executorService)).collect(toList());futures.forEach(CompletableFuture::join); }

首先多次調用mineAsync ,然后(作為第二階段)等待所有mineAsync完成并join這非常重要。 很容易寫:

IntStream.range(0, count).mapToObj(x -> coinMiner.mineAsync(executorService)).forEach(CompletableFuture::join);

但是,由于Java 8中流的惰性,該任務將按順序執行! 如果您還不習慣流的懶惰,請始終從下至上閱讀它們:我們要求join一些將來的內容,以便流上升并只調用一次mineAsync() (惰性!),并將其傳遞給join() 。 當join()完成時,它再次上升并要求另一個Future 。 通過使用collect()我們強制所有mineAsync()執行,開始所有異步計算。 稍后我們等待每一個。

介紹

現在該變得更具反應性了(我說過了)。 控制器可以返回SseEmitter的實例。 從處理程序方法return后,容器線程將被釋放并可以處理更多即將到來的請求。 但是連接沒有關閉,客戶端一直在等待! 我們應該做的是保留對SseEmitter實例的引用,并在以后從另一個線程調用其send()和complete方法。 例如,我們可以啟動一個長時間運行的進程,并保持send()從任意線程進行進度。 完成該過程后,我們complete() SseEmitter ,最后關閉HTTP連接(至少從邏輯SseEmitter ,請記住Keep-alive )。 在下面的示例中,我們有一堆CompletableFuture ,當每個CompletableFuture完成時,我們只需將1發送給客戶端( notifyProgress() )。 當所有期貨都完成后,我們完成流( thenRun(sseEmitter::complete) ),關閉連接:

@RequestMapping("/mine/{count}") SseEmitter mine(@PathVariable int count) {final SseEmitter sseEmitter = new SseEmitter();final List<CompletableFuture<BigDecimal>> futures = mineAsync(count);futures.forEach(future ->future.thenRun(() -> notifyProgress(sseEmitter)));final CompletableFuture[] futuresArr = futures.toArray(new CompletableFuture[futures.size()]);CompletableFuture.allOf(futuresArr).thenRun(sseEmitter::complete);return sseEmitter; }private void notifyProgress(SseEmitter sseEmitter) {try {sseEmitter.send(1);} catch (IOException e) {throw new RuntimeException(e);} }private List<CompletableFuture<BigDecimal>> mineAsync(@PathVariable int count) {return IntStream.range(0, count).mapToObj(x -> coinMiner.mineAsync(executorService)).collect(toList()); }

調用此方法將產生以下響應(注意Content-Type ):

< HTTP/1.1 200 OK < Content-Type: text/event-stream;charset=UTF-8 < Transfer-Encoding: chunked < data:1data:1data:1data:1* Connection #0 to host localhost left intact

稍后我們將學習如何在客戶端解釋這種響應。 現在暫時讓我們整理一下設計。

與引進RxJava

上面的代碼有效,但是看起來很凌亂。 實際上,我們有一系列事件,每個事件都代表計算的進度。 計算最終完成,因此流也應發出信號結束。 聽起來就像是Observable ! 我們從重構CoinMiner開始,以返回Observable<BigDecimal :

Observable<BigDecimal> mineMany(int count, ExecutorService executorService) {final ReplaySubject<BigDecimal> subject = ReplaySubject.create();final List<CompletableFuture<BigDecimal>> futures = IntStream.range(0, count).mapToObj(x -> mineAsync(executorService)).collect(toList());futures.forEach(future ->future.thenRun(() -> subject.onNext(BigDecimal.ONE)));final CompletableFuture[] futuresArr = futures.toArray(new CompletableFuture[futures.size()]);CompletableFuture.allOf(futuresArr).thenRun(subject::onCompleted);return subject; }

每當mineMany()返回的事件出現在Observable ,我們就mineMany()那么多硬幣。 當所有期貨都完成后,我們也完成了交易。 在實現方面,這看起來還沒有改善,但是從控制器的角度來看,它有多干凈:

@RequestMapping("/mine/{count}") SseEmitter mine(@PathVariable int count) {final SseEmitter sseEmitter = new SseEmitter();coinMiner.mineMany(count, executorService).subscribe(value -> notifyProgress(sseEmitter),sseEmitter::completeWithError,sseEmitter::complete);return sseEmitter; }

調用coinMiner.mineMany()我們只需訂閱事件。 事實證明Observable和SseEmitter方法匹配1:1。 這里發生的事情很不言自明:啟動異步計算,每當后臺計算發出任何進度信號時,將其轉發給客戶端。 好的,讓我們回到實現。 看起來很亂,因為我們將CompletableFuture和Observable混合使用。 我已經描述了如何僅使用一個元素將CompletableFuture轉換為Observable 。 這是一個概述,包括rx.Single從RxJava 1.0.13開始發現的rx.Single抽象(此處未使用):

public class Futures {public static <T> Observable<T> toObservable(CompletableFuture<T> future) {return Observable.create(subscriber ->future.whenComplete((result, error) -> {if (error != null) {subscriber.onError(error);} else {subscriber.onNext(result);subscriber.onCompleted();}}));}public static <T> Single<T> toSingle(CompletableFuture<T> future) {return Single.create(subscriber ->future.whenComplete((result, error) -> {if (error != null) {subscriber.onError(error);} else {subscriber.onSuccess(result);}}));}}

將這些實用程序運算符放在某個地方,我們可以改善實現并避免混合使用兩個API:

Observable<BigDecimal> mineMany(int count, ExecutorService executorService) {final List<Observable<BigDecimal>> observables = IntStream.range(0, count).mapToObj(x -> mineAsync(executorService)).collect(toList());return Observable.merge(observables); }Observable<BigDecimal> mineAsync(ExecutorService executorService) {final CompletableFuture<BigDecimal> future = CompletableFuture.supplyAsync(this::mine, executorService);return Futures.toObservable(future); }

RxJava有一個內置的運算符,用于將多個Observable合并為一個,我們的每個基礎Observable發出一個事件,這無關緊要。

深入研究RxJava運算符

讓我們使用RxJava的功能來稍微改善流式傳輸。

scan()

當前,每次我們開采一枚硬幣時,我們都會send(1)客戶端send(1)事件。 這意味著每個客戶都必須跟蹤其已經收到的硬幣數量,以便計算總的計算數量。 如果服務器始終發送總金額而不是增量,那就太好了。 但是,我們不想更改實現。 事實證明,使用Observable.scan()運算符非常簡單:

@RequestMapping("/mine/{count}") SseEmitter mine(@PathVariable int count) {final SseEmitter sseEmitter = new SseEmitter();coinMiner.mineMany(count, executorService).scan(BigDecimal::add).subscribe(value -> notifyProgress(sseEmitter, value),sseEmitter::completeWithError,sseEmitter::complete);return sseEmitter; }private void notifyProgress(SseEmitter sseEmitter, BigDecimal value) {try {sseEmitter.send(value);} catch (IOException e) {e.printStackTrace();} }

scan()運算符接收上一個事件和當前事件,并將它們組合在一起。 通過應用BigDecimal::add我們只需將所有數字相加即可。 例如1、1 +1,(1 +1)+1等。 scan()類似于flatMap() ,但保留中間值。

用sample()采樣

可能是因為我們的后端服務產生了太多的進度更新,我們無法使用。 我們不想給客戶端增加不相關的更新并飽和帶寬。 每秒最多發送兩次更新聽起來很合理。 幸運的是,RxJava也有一個內置的運算符:

Observable<BigDecimal> obs = coinMiner.mineMany(count, executorService); obs.scan(BigDecimal::add).sample(500, TimeUnit.MILLISECONDS).subscribe(//...);

sample()將定期查看底層流,并僅發出最新的項,并丟棄中間的項。 幸運的是,我們使用scan()即時聚合了項目,因此我們不會丟失任何更新。

window() –恒定的發射間隔

不過有一個陷阱。 如果在選定的500毫秒內沒有新內容出現, sample()將不會兩次發出相同的項目。 很好,但是請記住我們正在通過TCP / IP連接推送這些更新。 最好是定期向客戶端發送更新,即使在此期間什么也沒發生–只是為了保持連接的正常運行,就像ping 。 可能有多種方法可以滿足此要求,例如,涉及timeout()運算符。 我選擇使用window()運算符每500毫秒對所有事件進行分組:

Observable<BigDecimal> obs = coinMiner.mineMany(count, executorService); obs.window(500, TimeUnit.MILLISECONDS).flatMap(window -> window.reduce(BigDecimal.ZERO, BigDecimal::add)).scan(BigDecimal::add).subscribe(//...);

這是一個棘手的問題。 首先,我們將所有進度更新分組在500毫秒窗口中。 然后,我們使用reduce來計算在此時間段內開采的硬幣的總數(類似于scan() )。 如果在此期間未開采任何硬幣,我們只需返回ZERO 。 最后,我們使用scan()匯總每個窗口的小計。 我們不再需要sample()因為window()確保每500毫秒發出一個事件。

客戶端

JavaScript中有很多SSE用法的示例,因此為您提供一種快速的解決方案,稱為我們的控制器:

var source = new EventSource("/mine/10"); source.onmessage = function (event) {console.info(event); };

我相信SseEmitter是Spring MVC的一項重大改進,它將使我們能夠編寫更健壯和更快的Web應用程序,需要即時的單向更新。

翻譯自: https://www.javacodegeeks.com/2015/08/server-sent-events-with-rxjava-and-sseemitter.html

總結

以上是生活随笔為你收集整理的使用RxJava和SseEmitter进行服务器发送的事件的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。