Java9 基于异步响应式流的发布-订阅框架
為響應式流(Reactive Streams)增加的發(fā)布-訂閱(publisher-subscriber)框架、并發(fā)包CompletableFuture類的增強,等等。。
JEP266中為Java語言的并發(fā)性又引入許多新的方式:響應式流,一個為它而生互操作性更強的發(fā)布-訂閱框架;并且為了Java9其他API而增強的 java.util.concurrent.CompletableFuture 類, 以及其他的更多的更新。
在本文中,展開對響應式流的介紹,然后介紹這個發(fā)布訂閱框架。
響應式流(Reactive Streams)
批處理系統(tǒng)在收集了足夠多的數(shù)據(jù),達到某一個閾值亟待進行下一步操作的時候,就衍生出了一個新的名詞—數(shù)據(jù)處理(Data processing)。這時候,面向流(stream-oriented)的架構思想可以幫助我們盡快達成這個目標。它可以捕獲和處理實時數(shù)據(jù),并且可以快速地(秒級甚至更短)基于處理的結果來對系統(tǒng)進行相應的操作。和它相比,一個批處理系統(tǒng)可能會花費數(shù)秒、數(shù)天、甚至更久來做出響應。
處理數(shù)據(jù)流(特別是大小不定的實時數(shù)據(jù))需要在異步系統(tǒng)中特別小心。主要問題是要控制資源消耗,避免數(shù)據(jù)源和處理系統(tǒng)出現(xiàn)供大于求(積壓)的情況。這時候,需要異步地來對數(shù)據(jù)進行并行處理,利用分布式系統(tǒng)或者發(fā)揮多核CPU的效能,能有效地使數(shù)據(jù)處理過程變得快速高效。
響應式流(Reactive Streams)為這種非阻塞背壓的異步流處理提供了一個標準。在處理系統(tǒng)出現(xiàn)過載的時候,采用異步發(fā)送信號的方式通知數(shù)據(jù)源做相應的處理。這個通知的信號就像是水管的閥門一樣,關閉這個閥門會增加背壓(數(shù)據(jù)源對處理系統(tǒng)的壓力),同時也會增加處理系統(tǒng)的壓力。
這個標準的目的是治理跨異步邊界的流數(shù)據(jù)交換(比如向其他線程傳輸數(shù)據(jù)) ,同時確保處理系統(tǒng)不被緩沖數(shù)據(jù)而壓垮。換一種說法,背壓是這個標準模型的一個組成部分,以便允許在線程之間調停的隊列被界定。特別注意,背壓通信是異步的。
響應式流(Reactive Streams)的提出就致力于提供一組最小規(guī)模的接口、方法、或者協(xié)議來描述這個操作或實體:具有非阻塞背壓的異步數(shù)據(jù)流。
發(fā)布-訂閱(publisher-subscriber)框架
Java 9 通過java.util.concurrent.Flow 和java.util.concurrent.SubmissionPublisher 類來實現(xiàn)響應式流。
Flow 類中定義了四個嵌套的靜態(tài)接口,用于建立流量控制的組件,發(fā)布者在其中生成一個或多個供訂閱者使用的數(shù)據(jù)項:
- Publisher:數(shù)據(jù)項發(fā)布者、生產(chǎn)者
- Subscriber:數(shù)據(jù)項訂閱者、消費者
- Subscription:發(fā)布者與訂閱者之間的關系紐帶,訂閱令牌
- Processor:數(shù)據(jù)處理器
發(fā)布者(Publisher)以流的方式發(fā)布數(shù)據(jù)項,并注冊訂閱者,并且實現(xiàn) Flow.Publisher 接口,該接口聲明了一個方法,我們通過調用它來為發(fā)布者注冊訂閱者:
void subscribe(Flow.Subscriber<? super T> subscriber)調用此方法來向發(fā)布者注冊訂閱者,但是,如果此訂閱者已被其他發(fā)布者注冊或注冊失敗(策略沖突),這個方法就會調用訂閱者的onError() 方法來拋出IllegalStateException 異常,除此之外,訂閱者的onSubscribe() 方法會調用一個新的Flow.Subscription ,當空對象傳給訂閱者時,subscribe() 方法會拋出NullPointerException異常。
訂閱者(Subscriber)從訂閱的發(fā)布者中返回數(shù)據(jù)項,并且實現(xiàn)Flow.Subscriber<T> ,這個接口聲明的方法如下:
void onSubscribe(Flow.Subscription subscription) void onComplete() void onError(Throwable throwable) void onNext(T item)onSubscribe() 方法用來確認訂閱者注冊到發(fā)布者是否注冊成功,它以參數(shù)列表的方式接收一個Flow.Subscription類型的參數(shù),而這個參數(shù)類型里面聲明的方法允許向發(fā)布者請求發(fā)布新的數(shù)據(jù)項,或請求發(fā)布者不再發(fā)布更多的數(shù)據(jù)項。
onComplete() 方法用在當訂閱者沒有調用其他方法,而Subscription 發(fā)生錯誤沒有終止的情況下。調用這個方法之后,此訂閱者就不能調用其他方法。
onError(Throwable throwable) 方法用在當發(fā)布者或訂閱者遭遇不可恢復的錯誤的時候, 調用這個方法之后,此訂閱者也不能調用其他方法。
onNext() 方法用于聲明下一個數(shù)據(jù)項的訂閱,如果在此過程中拋出異常,結果將得不到確認,甚至會導致訂閱被取消。
一個訂閱令牌(Subscription)為發(fā)布者和訂閱者定義一種關系, 使得訂閱者接收特定的數(shù)據(jù)項或者在特定時間取消接收請求,訂閱令牌實現(xiàn)自Flow.Subscription 接口,該接口聲明方法如下:
void request(long n) void cancel()request() 方法添加n個數(shù)據(jù)項到當前未滿的訂閱請求中。如果n小于或等于0,訂閱者的onError() 方法會被調用,并且拋出IllegalArgumentException 異常,此外,如果n大于0,訂閱者就會在onNext() 方法的調用下接收到n個數(shù)據(jù)項,除非中間異常終止。 從Long.MAX_VALUE次到n次中間是無界的調用。
cancel() 用來終止訂閱者接收數(shù)據(jù)項,它有一種嘗試機制,也就是說,在調用它之后也有可能收到數(shù)據(jù)項。
最后,數(shù)據(jù)處理器(Processor)在不改變發(fā)布者與訂閱者的情況下基于流做數(shù)據(jù)處理,可以在發(fā)布者與訂閱者之間放多個數(shù)據(jù)處理器,成為一個處理器鏈,發(fā)布者與訂閱者不依賴于數(shù)據(jù)處理,它們是單獨的過程。JDK9中不提供具體的數(shù)據(jù)處理器,必須由開發(fā)者來通過實現(xiàn)無方法聲明的Processor接口來自行構建。
SubmissionPublisher 實現(xiàn)自Flow.Publisher 接口,向當前訂閱者異步提交非空的數(shù)據(jù)項,直到它被關閉。每個當前訂閱者以一個相同的順序接收新提交的數(shù)據(jù)項,除非數(shù)據(jù)項丟失或者遇到異常。SubmissionPublisher 允許數(shù)據(jù)項在丟失或阻塞的時候扮演發(fā)布者角色。
SubmissionPublisher 提供了三個構造方法來獲取實例。無參的構造器依賴于 ForkJoinPool.commonPool() 方法來提交發(fā)布者,以此實現(xiàn)生產(chǎn)者向訂閱者提供數(shù)據(jù)項的異步特性。
下面的程序演示了SubmissionPublisher 用法和這套發(fā)布-訂閱框架的其他特性:
import java.util.Arrays;import java.util.concurrent.Flow.*; import java.util.concurrent.SubmissionPublisher; public class FlowDemo {public static void main(String[] args){// Create a publisher.SubmissionPublisher<String> publisher = new SubmissionPublisher<>();// Create a subscriber and register it with the publisher.MySubscriber<String> subscriber = new MySubscriber<>();publisher.subscribe(subscriber);// Publish several data items and then close the publisher.System.out.println("Publishing data items...");String[] items = { "jan", "feb", "mar", "apr", "may", "jun","jul", "aug", "sep", "oct", "nov", "dec" };Arrays.asList(items).stream().forEach(i -> publisher.submit(i));publisher.close();try{synchronized("A"){"A".wait();}}catch (InterruptedException ie){}} }class MySubscriber<T> implements Subscriber<T> {private Subscription subscription;@Overridepublic void onSubscribe(Subscription subscription){this.subscription = subscription;subscription.request(1);}@Overridepublic void onNext(T item){System.out.println("Received: " + item);subscription.request(1);}@Overridepublic void onError(Throwable t){t.printStackTrace();synchronized("A"){"A".notifyAll();}}@Overridepublic void onComplete(){System.out.println("Done");synchronized("A"){"A".notifyAll();}} }其中使用了wait()和notifyAll() 方法來使主線程等到onComplete() 的完成,否則是不會看到任何輸出的。
下面是輸出結果:
Publishing data items... Received: jan Received: feb Received: mar Received: apr Received: may Received: jun Received: jul Received: aug Received: sep Received: oct Received: nov Received: dec Done最后說一句,熟悉RxJava的同學可以會心一笑了。
原文
總結
以上是生活随笔為你收集整理的Java9 基于异步响应式流的发布-订阅框架的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 实例36:python
- 下一篇: java枚举返回字符串_Java新特性: