JDK11的新特性:HTTP API和reactive streams
文章目錄
- 簡介
- 怎么在java中使用reactive streams
- POST請求的例子
- 總結
簡介
在JDK11的新特性:新的HTTP API中,我們介紹了通過新的HTTP API,我們可以發送同步或者異步的請求,并獲得的返回的結果。
今天我們想探討一下這些同步或者異步請求和響應和reactive streams的關系。
更多內容請訪問www.flydean.com
怎么在java中使用reactive streams
reactive streams的介紹大家可以參考reactive stream協議詳解,使用reactive streams的目的就是為了解決發送者和消費者之間的通信問題,發送者不會發送超出消費者能力的信息。
我們再回顧一下reactive streams中的幾個關鍵概念:
-
Publisher 負責產生消息或者事件,并提供了一個subscribed接口來和Subscriber進行連接。
-
Subscriber 用來subscribe一個Publisher,并提供了onNext方法來處理新的消息,onError來處理異常,onComplete提供給Publisher調用來結束監聽。
-
Subscription 負責連接Publisher和Subscriber,可以用來請求消息或者取消收聽。
更進一步,如果我們想要自己實現一個reactive streams,我們需要做這些事情:
- 創建Publisher和Subscriber。
- 調用Publisher.subscribe(Subscriber)建立Publisher和Subscriber之間的連接。
- Publisher創建一個Subscription,并調用Subscriber.onSubscription(Subscription)方法。
- Subscriber將Subscription保存起來,供后面使用。
- Subscriber調用Subscription.request(n) 方法請求n個消息。
- Publisher調用Subscriber.onNext(item) 將請求的消息發送給Subscriber。
- 按照需要重復上訴過程。
- Publisher調用Subscriber.OnError(err) 或者 Subscriber.onComplete()方法。
- Subscriber調用Subscription.cancel()方法。
POST請求的例子
還記得上篇文章我們講HTTP API新特性的時候,我們使用的例子嗎?
例子中,我們使用了一個HttpRequest.BodyPublisher,用來構建Post請求,而BodyPublisher就是一個Flow.Publisher:
public interface BodyPublisher extends Flow.Publisher<ByteBuffer>也就是說從BodyPublisher開始,就已經在使用reactive streams了。
為了能夠更好的了解reactive streams的工作原理,我們創建幾個wrapper類將Publisher,Subscriber,Subscription包裝起來,輸出相應的日志。
代碼有點多我們就不一一列出來了,這里只列一個CustBodyPublisher的具體實現:
public class CustBodyPublisher implements HttpRequest.BodyPublisher {private final HttpRequest.BodyPublisher bodyPublisher;public CustBodyPublisher(HttpRequest.BodyPublisher bodyPublisher){this.bodyPublisher=bodyPublisher;}@Overridepublic long contentLength() {long contentLength=bodyPublisher.contentLength();log.info("contentLength:{}",contentLength);return contentLength;}@Overridepublic void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {log.info("CustBodyPublisher subscribe {}",subscriber);bodyPublisher.subscribe(new CustSubscriber(subscriber));} }wrapper類很簡單,通過構造函數傳入要wrapper的類,然后在相應的方法中調用實際wrapper類的方法。
最后,我們將之前使用的調用HTTP API的例子改造一下:
public void testCustPost() throws IOException, InterruptedException {HttpClient client = HttpClient.newBuilder().build();HttpRequest.BodyPublisher requestBody = HttpRequest.BodyPublishers.ofString("{ 我是body }");CustBodyPublisher custBodyPublisher= new CustBodyPublisher(requestBody);HttpRequest postRequest = HttpRequest.newBuilder().POST(custBodyPublisher).uri(URI.create("http://www.flydean.com")).build();HttpResponse<String> response = client.send(postRequest, HttpResponse.BodyHandlers.ofString());log.info("response {}",response);}注意這里CustBodyPublisher custBodyPublisher= new CustBodyPublisher(requestBody),我們創建了一個新的wrapper類。
運行它,觀察輸出結果:
[HttpClient-1-Worker-0] INFO com.flydean.CustBodyPublisher - contentLength:14 [HttpClient-1-Worker-0] INFO com.flydean.CustBodyPublisher - CustBodyPublisher subscribe jdk.internal.net.http.Http1Request$FixedContentSubscriber@672776b6 [HttpClient-1-Worker-0] INFO com.flydean.CustSubscriber - CustSubscriber onSubscribe jdk.internal.net.http.PullPublisher$Subscription@580ce038 [HttpClient-1-Worker-0] INFO com.flydean.CustSubscription - CustSubscription request 1 [HttpClient-1-Worker-0] INFO com.flydean.CustSubscriber - CustSubscriber onNext length 14 [HttpClient-1-Worker-0] INFO com.flydean.CustSubscription - CustSubscription request 1 [HttpClient-1-Worker-0] INFO com.flydean.CustSubscriber - CustSubscriber onComplete [main] INFO com.flydean.ReactiveHttpUsage - response (POST http://www.flydean.com) 200可以看到reactive stream的具體工作流程。首先創建了CustBodyPublisher,然后調用了subscribe方法。
接著CustSubscriber調用onSubscribe創建了Subscription。
每次CustSubscription的request方法都會導致CustSubscriber的onNext方法被調用。
最后當CustSubscription再次請求無結果的時候,CustSubscriber調用onComplete方法結束整個流程。
注意,上面的例子中,我們wrapper調用的是BodyPublishers.ofString,其實BodyPublishers中內置了多種BodyPublisher的實現。感興趣的朋友可以自行探索。
總結
本文講解了新的HTTP API中reactive Streams的使用。
本文的例子https://github.com/ddean2009/
learn-java-base-9-to-20
更多精彩內容且看:
- 區塊鏈從入門到放棄系列教程-涵蓋密碼學,超級賬本,以太坊,Libra,比特幣等持續更新
- Spring Boot 2.X系列教程:七天從無到有掌握Spring Boot-持續更新
- Spring 5.X系列教程:滿足你對Spring5的一切想象-持續更新
- java程序員從小工到專家成神之路(2020版)-持續更新中,附詳細文章教程
本文作者:flydean程序那些事
本文鏈接:http://www.flydean.com/jdk11-http-api-reactive-streams/
本文來源:flydean的博客
歡迎關注我的公眾號:程序那些事,更多精彩等著您!
總結
以上是生活随笔為你收集整理的JDK11的新特性:HTTP API和reactive streams的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 什么?注释里面的代码居然能够执行
- 下一篇: JDK10的新特性:本地变量类型var