Java9 基于异步响应式流的发布-订阅框架
為響應式流(Reactive Streams)增加的發布-訂閱(publisher-subscriber)框架、并發包CompletableFuture類的增強,等等。。
JEP266中為Java語言的并發性又引入許多新的方式:響應式流,一個為它而生互操作性更強的發布-訂閱框架;并且為了Java9其他API而增強的 java.util.concurrent.CompletableFuture 類, 以及其他的更多的更新。
在本文中,展開對響應式流的介紹,然后介紹這個發布訂閱框架。
響應式流(Reactive Streams)
批處理系統在收集了足夠多的數據,達到某一個閾值亟待進行下一步操作的時候,就衍生出了一個新的名詞—數據處理(Data processing)。這時候,面向流(stream-oriented)的架構思想可以幫助我們盡快達成這個目標。它可以捕獲和處理實時數據,并且可以快速地(秒級甚至更短)基于處理的結果來對系統進行相應的操作。和它相比,一個批處理系統可能會花費數秒、數天、甚至更久來做出響應。
處理數據流(特別是大小不定的實時數據)需要在異步系統中特別小心。主要問題是要控制資源消耗,避免數據源和處理系統出現供大于求(積壓)的情況。這時候,需要異步地來對數據進行并行處理,利用分布式系統或者發揮多核CPU的效能,能有效地使數據處理過程變得快速高效。
響應式流(Reactive Streams)為這種非阻塞背壓的異步流處理提供了一個標準。在處理系統出現過載的時候,采用異步發送信號的方式通知數據源做相應的處理。這個通知的信號就像是水管的閥門一樣,關閉這個閥門會增加背壓(數據源對處理系統的壓力),同時也會增加處理系統的壓力。
這個標準的目的是治理跨異步邊界的流數據交換(比如向其他線程傳輸數據) ,同時確保處理系統不被緩沖數據而壓垮。換一種說法,背壓是這個標準模型的一個組成部分,以便允許在線程之間調停的隊列被界定。特別注意,背壓通信是異步的。
響應式流(Reactive Streams)的提出就致力于提供一組最小規模的接口、方法、或者協議來描述這個操作或實體:具有非阻塞背壓的異步數據流。
發布-訂閱(publisher-subscriber)框架
Java 9 通過java.util.concurrent.Flow 和java.util.concurrent.SubmissionPublisher 類來實現響應式流。
Flow 類中定義了四個嵌套的靜態接口,用于建立流量控制的組件,發布者在其中生成一個或多個供訂閱者使用的數據項:
- Publisher:數據項發布者、生產者
- Subscriber:數據項訂閱者、消費者
- Subscription:發布者與訂閱者之間的關系紐帶,訂閱令牌
- Processor:數據處理器
發布者(Publisher)以流的方式發布數據項,并注冊訂閱者,并且實現 Flow.Publisher 接口,該接口聲明了一個方法,我們通過調用它來為發布者注冊訂閱者:
void subscribe(Flow.Subscriber<? super T> subscriber)調用此方法來向發布者注冊訂閱者,但是,如果此訂閱者已被其他發布者注冊或注冊失敗(策略沖突),這個方法就會調用訂閱者的onError() 方法來拋出IllegalStateException 異常,除此之外,訂閱者的onSubscribe() 方法會調用一個新的Flow.Subscription ,當空對象傳給訂閱者時,subscribe() 方法會拋出NullPointerException異常。
訂閱者(Subscriber)從訂閱的發布者中返回數據項,并且實現Flow.Subscriber<T> ,這個接口聲明的方法如下:
void onSubscribe(Flow.Subscription subscription) void onComplete() void onError(Throwable throwable) void onNext(T item)onSubscribe() 方法用來確認訂閱者注冊到發布者是否注冊成功,它以參數列表的方式接收一個Flow.Subscription類型的參數,而這個參數類型里面聲明的方法允許向發布者請求發布新的數據項,或請求發布者不再發布更多的數據項。
onComplete() 方法用在當訂閱者沒有調用其他方法,而Subscription 發生錯誤沒有終止的情況下。調用這個方法之后,此訂閱者就不能調用其他方法。
onError(Throwable throwable) 方法用在當發布者或訂閱者遭遇不可恢復的錯誤的時候, 調用這個方法之后,此訂閱者也不能調用其他方法。
onNext() 方法用于聲明下一個數據項的訂閱,如果在此過程中拋出異常,結果將得不到確認,甚至會導致訂閱被取消。
一個訂閱令牌(Subscription)為發布者和訂閱者定義一種關系, 使得訂閱者接收特定的數據項或者在特定時間取消接收請求,訂閱令牌實現自Flow.Subscription 接口,該接口聲明方法如下:
void request(long n) void cancel()request() 方法添加n個數據項到當前未滿的訂閱請求中。如果n小于或等于0,訂閱者的onError() 方法會被調用,并且拋出IllegalArgumentException 異常,此外,如果n大于0,訂閱者就會在onNext() 方法的調用下接收到n個數據項,除非中間異常終止。 從Long.MAX_VALUE次到n次中間是無界的調用。
cancel() 用來終止訂閱者接收數據項,它有一種嘗試機制,也就是說,在調用它之后也有可能收到數據項。
最后,數據處理器(Processor)在不改變發布者與訂閱者的情況下基于流做數據處理,可以在發布者與訂閱者之間放多個數據處理器,成為一個處理器鏈,發布者與訂閱者不依賴于數據處理,它們是單獨的過程。JDK9中不提供具體的數據處理器,必須由開發者來通過實現無方法聲明的Processor接口來自行構建。
SubmissionPublisher 實現自Flow.Publisher 接口,向當前訂閱者異步提交非空的數據項,直到它被關閉。每個當前訂閱者以一個相同的順序接收新提交的數據項,除非數據項丟失或者遇到異常。SubmissionPublisher 允許數據項在丟失或阻塞的時候扮演發布者角色。
SubmissionPublisher 提供了三個構造方法來獲取實例。無參的構造器依賴于 ForkJoinPool.commonPool() 方法來提交發布者,以此實現生產者向訂閱者提供數據項的異步特性。
下面的程序演示了SubmissionPublisher 用法和這套發布-訂閱框架的其他特性:
import java.util.Arrays;import java.util.concurrent.Flow.*; import java.util.concurrent.SubmissionPublisher; public class FlowDemo {public static void main(String[] args){// Create a publisher.SubmissionPublisher<String> publisher = new SubmissionPublisher<>();// Create a subscriber and register it with the publisher.MySubscriber<String> subscriber = new MySubscriber<>();publisher.subscribe(subscriber);// Publish several data items and then close the publisher.System.out.println("Publishing data items...");String[] items = { "jan", "feb", "mar", "apr", "may", "jun","jul", "aug", "sep", "oct", "nov", "dec" };Arrays.asList(items).stream().forEach(i -> publisher.submit(i));publisher.close();try{synchronized("A"){"A".wait();}}catch (InterruptedException ie){}} }class MySubscriber<T> implements Subscriber<T> {private Subscription subscription;@Overridepublic void onSubscribe(Subscription subscription){this.subscription = subscription;subscription.request(1);}@Overridepublic void onNext(T item){System.out.println("Received: " + item);subscription.request(1);}@Overridepublic void onError(Throwable t){t.printStackTrace();synchronized("A"){"A".notifyAll();}}@Overridepublic void onComplete(){System.out.println("Done");synchronized("A"){"A".notifyAll();}} }其中使用了wait()和notifyAll() 方法來使主線程等到onComplete() 的完成,否則是不會看到任何輸出的。
下面是輸出結果:
Publishing data items... Received: jan Received: feb Received: mar Received: apr Received: may Received: jun Received: jul Received: aug Received: sep Received: oct Received: nov Received: dec Done最后說一句,熟悉RxJava的同學可以會心一笑了。
原文
總結
以上是生活随笔為你收集整理的Java9 基于异步响应式流的发布-订阅框架的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 实例36:python
- 下一篇: java枚举返回字符串_Java新特性: