日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > java >内容正文

java

小规模流处理kata。 第2部分:RxJava 1.x / 2.x

發布時間:2023/12/3 java 41 豆豆
生活随笔 收集整理的這篇文章主要介紹了 小规模流处理kata。 第2部分:RxJava 1.x / 2.x 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

在第1部分:線程池中,我們設計并實現了相對簡單的系統,用于實時處理事件。 確保您已閱讀上一部分,因為它包含一些我們將重用的類。 以防萬一這是要求:

一個系統每秒傳送約一千個事件。 每個Event至少具有兩個屬性:

  • clientId –我們期望一個客戶端每秒最多可以處理幾個事件
  • UUID –全球唯一

消耗一個事件大約需要10毫秒。 設計此類流的使用者:

  • 允許實時處理事件
  • 與一個客戶端有關的事件應按順序進行處理,即,您不能并行處理同一clientId事件
  • 如果10秒鐘內出現重復的UUID ,請將其刪除。 假設10秒鐘后不會出現重復
  • 到目前為止,我們提出的是線程池和共享緩存的組合。 這次我們將使用RxJava實現解決方案。 首先,我沒有透露EventStream的實現方式,僅提供了API:

    interface EventStream {void consume(EventConsumer consumer);}

    實際上,對于手動測試,我構建了一個簡單的RxJava流,其行為與需求類似,類似于系統:

    @Slf4j class EventStream {void consume(EventConsumer consumer) {observe().subscribe(consumer::consume,e -> log.error("Error emitting event", e));}Observable<Event> observe() {return Observable.interval(1, TimeUnit.MILLISECONDS).delay(x -> Observable.timer(RandomUtils.nextInt(0, 1_000), TimeUnit.MICROSECONDS)).map(x -> new Event(RandomUtils.nextInt(1_000, 1_100), UUID.randomUUID())).flatMap(this::occasionallyDuplicate, 100).observeOn(Schedulers.io());}private Observable<Event> occasionallyDuplicate(Event x) {final Observable<Event> event = Observable.just(x);if (Math.random() >= 0.01) {return event;}final Observable<Event> duplicated =event.delay(RandomUtils.nextInt(10, 5_000), TimeUnit.MILLISECONDS);return event.concatWith(duplicated);}}

    了解此模擬器的工作原理不是必不可少的,但很有趣。 首先,我們產生的源源不斷的Long值( 0 , 1 , 2 ...)每毫秒使用(每秒千個事件) interval()操作。 然后,我們使用delay()運算符將每個事件延遲0到1_000微秒之間的隨機時間。 這樣,事件將在難以預測的時刻出現,而情況會更加現實。 最后,我們將每個Long值映射(使用ekhem, map()運算符) map()到一個隨機Event ,該Event的clientId在1_000和1_100之間(包括端1_100在內)。

    最后一點很有趣。 我們想模擬偶爾的重復。 為此,我們將每個事件(使用flatMap() )映射到自身(在99%的情況下)。 但是,在1%的情況下,我們兩次返回此事件,第二次發生在10毫秒至5秒后。 在實踐中,該事件的重復實例將在其他數百個事件之后出現,這使流的行為逼真。

    與EventStream進行交互的方式有兩種:通過consume()回調和通過observe()流。 我們可以利用Observable<Event>來快速建立功能與第1部分非常相似但更簡單的處理管道。

    缺少背壓

    利用RxJava的第一個幼稚方法很快就失敗了:

    EventStream es = new EventStream(); EventConsumer clientProjection = new ClientProjection(new ProjectionMetrics(new MetricRegistry()));es.observe().subscribe(clientProjection::consume,e -> log.error("Fatal error", e));

    ( ClientProjection , ProjectionMetrics等人來自第1部分 )。 我們幾乎立即獲得MissingBackpressureException ,這是預期的。 還記得我們的第一個解決方案是如何通過處理越來越多的延遲來滯后嗎? RxJava嘗試避免這種情況,并避免隊列溢出。 由于使用者( ClientProjection )無法實時處理事件,因此拋出MissingBackpressureException 。 這是快速失敗的行為。 最快的解決方案是像以前一樣使用RxJava的功能將消耗轉移到一個單獨的線程池中:

    EventStream es = new EventStream(); EventConsumer clientProjection = new FailOnConcurrentModification(new ClientProjection(new ProjectionMetrics(new MetricRegistry())));es.observe().flatMap(e -> clientProjection.consume(e, Schedulers.io())).window(1, TimeUnit.SECONDS).flatMap(Observable::count).subscribe(c -> log.info("Processed {} events/s", c),e -> log.error("Fatal error", e));

    EventConsumer接口具有一個輔助方法,該方法可以在提供的Scheduler上異步使用事件:

    @FunctionalInterface interface EventConsumer {Event consume(Event event);default Observable<Event> consume(Event event, Scheduler scheduler) {return Observable.fromCallable(() -> this.consume(event)).subscribeOn(scheduler);}}

    通過在單獨的Scheduler.io()使用flatMap()使用事件,可以異步調用每個使用。 這次事件幾乎是實時處理的,但是存在更大的問題。 由于某種原因,我用FailOnConcurrentModification裝飾了ClientProjection 。 事件彼此獨立使用,因此可能會同時處理同一clientId兩個事件。 不好。 幸運的是,在RxJava中解決此問題比使用普通線程要容易得多:

    es.observe().groupBy(Event::getClientId).flatMap(byClient -> byClient.observeOn(Schedulers.io()).map(clientProjection::consume)).window(1, TimeUnit.SECONDS).flatMap(Observable::count).subscribe(c -> log.info("Processed {} events/s", c),e -> log.error("Fatal error", e));

    有點改變了。 首先,我們將事件按clientId分組。 這將單個Observable流拆分為流 。 每個名為byClient子流代表與同一clientId相關的所有事件。 現在,如果我們映射到此子流,我們可以確保與同一個clientId相關的事件不會同時發生。 外部流很懶,因此我們必須訂閱它。 與其單獨訂閱每個事件,我們不每秒收集事件并進行計數。 這樣,我們每秒就會收到一個Integer類型的單個事件,該事件表示每秒消耗的事件數。

    使用全局狀態的不純,非慣常,容易出錯,不安全的重復數據刪除解決方案

    現在我們必須刪除重復的UUID 。 丟棄重復項的最簡單但非常愚蠢的方法是利用全局狀態。 我們可以通過在filter()運算符之外可用的緩存中查找重復項來簡單地過濾掉重復項:

    final Cache<UUID, UUID> seenUuids = CacheBuilder.newBuilder().expireAfterWrite(10, TimeUnit.SECONDS).build();es.observe().filter(e -> seenUuids.getIfPresent(e.getUuid()) == null).doOnNext(e -> seenUuids.put(e.getUuid(), e.getUuid())).subscribe(clientProjection::consume,e -> log.error("Fatal error", e));

    如果要監視此機制的使用,只需添加指標:

    Meter duplicates = metricRegistry.meter("duplicates");es.observe().filter(e -> {if (seenUuids.getIfPresent(e.getUuid()) != null) {duplicates.mark();return false;} else {return true;}})

    從操作員內部訪問全局狀態,尤其是可變狀態非常危險,并且破壞了RxJava的唯一目的-簡化并發。 顯然,我們使用了Guava的線程安全Cache ,但是在許多情況下,很容易錯過從多個線程訪問共享全局可變狀態的地方。 如果您發現自己在運算符鏈之外修改了一些變量,請非常小心。

    RxJava 1.x中的自定義

    RxJava 1.x有一個distinct()運算符,大概可以完成此工作:

    es.observe().distinct(Event::getUuid).groupBy(Event::getClientId)

    不幸的是, distinct()在內部將所有密鑰( UUID distinct()存儲在不斷增長的HashSet 。 但是我們只關心最近10秒鐘內的重復! 通過復制粘貼DistinctOperator的實現,我創建了DistinctEvent運算符,該運算符利用Guava的緩存僅存儲了最后10秒鐘的UUID值。 我故意在此運算符中對Event進行了硬編碼,而不是使其變得更通用以使代碼更易于理解:

    class DistinctEvent implements Observable.Operator<Event, Event> {private final Duration duration;DistinctEvent(Duration duration) {this.duration = duration;}@Overridepublic Subscriber<? super Event> call(Subscriber<? super Event> child) {return new Subscriber<Event>(child) {final Map<UUID, Boolean> keyMemory = CacheBuilder.newBuilder().expireAfterWrite(duration.toMillis(), TimeUnit.MILLISECONDS).<UUID, Boolean>build().asMap();@Overridepublic void onNext(Event event) {if (keyMemory.put(event.getUuid(), true) == null) {child.onNext(event);} else {request(1);}}@Overridepublic void onError(Throwable e) {child.onError(e);}@Overridepublic void onCompleted() {child.onCompleted();}};} }

    用法非常簡單,整個實現(加上自定義運算符)如下:

    es.observe().lift(new DistinctEvent(Duration.ofSeconds(10))).groupBy(Event::getClientId).flatMap(byClient -> byClient.observeOn(Schedulers.io()).map(clientProjection::consume)).window(1, TimeUnit.SECONDS).flatMap(Observable::count).subscribe(c -> log.info("Processed {} events/s", c),e -> log.error("Fatal error", e));

    實際上,如果您跳過每秒的日志記錄,它甚至可以更短:

    es.observe().lift(new DistinctEvent(Duration.ofSeconds(10))).groupBy(Event::getClientId).flatMap(byClient -> byClient.observeOn(Schedulers.io()).map(clientProjection::consume)).subscribe(e -> {},e -> log.error("Fatal error", e));

    該解決方案比以前基于線程池和裝飾器的解決方案要短得多。 唯一尷尬的部分是自定義運算符,該自定義運算符可在存儲太多歷史UUID時避免內存泄漏。 幸運的是RxJava 2得以解救!

    RxJava 2.x和更強大的內置

    實際上,我是從提交公關RxJava具有更強大的執行這種緊密distinct()操作。 但是在我檢查2.x分支之前,它是: distinct()允許提供自定義Collection而不是硬編碼的HashSet 。 信不信由你,依賴倒置不僅涉及Spring框架或Java EE。 當庫允許您提供其內部數據結構的自定義實現時,這也是DI。 首先,我創建一個輔助方法,該方法可以構建由Map<UUID, Boolean>支持,由Cache<UUID, Boolean>支持的Set<UUID> Cache<UUID, Boolean> 。 我們一定喜歡代表團!

    private Set<UUID> recentUuids() {return Collections.newSetFromMap(CacheBuilder.newBuilder().expireAfterWrite(10, TimeUnit.SECONDS).<UUID, Boolean>build().asMap()); }

    有了這種方法,我們可以使用以下表達式實現整個任務:

    es.observe().distinct(Event::getUuid, this::recentUuids).groupBy(Event::getClientId).flatMap(byClient -> byClient.observeOn(Schedulers.io()).map(clientProjection::consume)).subscribe(e -> {},e -> log.error("Fatal error", e));

    優雅,簡潔,清晰! 它讀起來幾乎像一個問題:

    • 觀察事件流
    • 僅考慮不同的UUID
    • 客戶分組活動
    • 為每個客戶消耗(依次)

    希望您喜歡所有這些解決方案,并發現它們對您的日常工作很有用。

    也可以看看:

    • 小規模流處理kata。 第1部分:線程池
    • 小規模流處理kata。 第2部分:RxJava 1.x / 2.x

    翻譯自: https://www.javacodegeeks.com/2016/10/small-scale-stream-processing-kata-part-2-rxjava-1-x2-x.html

    總結

    以上是生活随笔為你收集整理的小规模流处理kata。 第2部分:RxJava 1.x / 2.x的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。