日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > java >内容正文

java

Java的HTTP服务端响应式编程

發布時間:2024/1/17 java 36 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Java的HTTP服务端响应式编程 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

傳統的Servlet模型走到了盡頭

傳統的Java服務器編程遵循的是J2EE的Servlet規范,是一種基于線程的模型:每一次http請求都由一個線程來處理。

線程模型的缺陷在于,每一條線程都要自行處理套接字的讀寫操作。對于大部分請求來講,本地處理請求的速度很快,請求的讀取和返回是最耗時間的。也就是說大量的線程浪費在了遠程連接上,而沒有發揮出計算能力。但是需要注意一點,線程的創建是有開銷的,每一條線程都需要獨立的內存資源。JVM里的-Xss參數就是用來調整線程堆棧大小的。而JVM堆的總大小局限在了-Xmx參數上,因此一個正在運行的JVM服務器能夠同時運行的線程數是固定的。

即便通過調整JVM參數,使其能夠運行更多線程。但是JVM的線程會映射成為操作系統的用戶線程,而操作系統依然只能調度有限數量的線程。例如,Linux系統可以參考這里的討論:Maximum number of threads per process in Linux?。

此外,大量線程在切換的時候,也會產生上下文加載卸載的開銷,同樣會降低系統的性能。

可伸縮 IO

Doug Lea大神有一篇很經典的PPTScalable IO in Java,講述了一個更為優秀的服務器模型。

一個可伸縮的網絡服務系統應當滿足以下條件:

  • 能夠隨著計算資源(CPU、內存、磁盤容量、網絡帶寬等)的增加提高負載能力。
  • 當網絡負載增加超過能力的時候,能夠優雅降級,避免直接崩潰。例如,拒絕為超過能力范圍的請求提供服務,但對于能力范圍內的請求,依然提供服務。當流量洪峰過去之后,依然能夠正常運行。
  • 當然高可用、高性能依然是必須的:例如低響應延遲、隨負載變化請求或釋放計算資源等。
  • 作者給出的解決方案就是Reactor模式。

    Reactor模式將耗時的IO資源封裝為handle對象。handle對象注冊在操作系統的內核里,當對象滿足一定的條件時(可讀或者可寫),才會處理handle對象。在Reactor模式中,同步多路復用器負責處理handle對象的狀態變更,當滿足條件時,會調用handle對象注冊時提供的回調函數。

    同步多路復用器在一個單獨的線程里專門處理IO鏈接。當請求讀取完畢之后,任務提交至工作線程池完成請求的解碼、處理、編碼等工作,最后將由多路復用器負責將結果返回給客戶端,而池內線程繼續處理下一個任務。相比JDK1.5之前的對每一次請求新建一個線程的方式,線程池能夠實現線程復用,降低創建回收線程的開銷,在應對密集計算負載的時候有更好的表現。同時,在多個線程上分別部署一個同步多路復用器,也可以更好地利用多核CPU的處理能力。

    這樣,線程的任務分工就很明確,分別專門處理IO密集任務和專門處理CPU密集任務。

    NIO普及艱難

    從最早的select到后來Linux的epoll和BSD的Kqueue,操作系統的多路復用性能一直在不斷增強。

    JDK 1.4引入了NIO模塊,屏蔽了操作系統層面的細節,將各個系統的多路復用API做了統一封裝。JDK的NIO有以下幾個核心組件:

    • Buffer,一種容量在創建時被固定的數據容器
    • Charsets,負責數據的編解碼工作
    • Channels,對遠程連接的抽象
    • Selector,多路復用選擇器

    網絡連接封裝在Channels對象里面。Channels在Selector上注冊感興趣的SelectionKey事件:可讀OP_READ、可寫OP_WRITE、可連接OP_CONNECT還有服務器端套接字才有的可接入OP_ACCEPT。多路復用選擇器調用阻塞式select方法的時,在等待某一事件可用,然后就通知Channels執行相應的handler。Buffer是Channels實現讀寫操作的緩沖區。Charset用于對Buffer的內容進行編解碼。在NIO模式下,Selector能夠管理多個套接字的網絡讀寫,避免了過多計算線程阻塞在讀寫請求上。

    在JVM之外的世界里,多路復用通過Nginx、基于V8引擎的Node.js早就大放異彩。但是Java NIO在生產環境里的發展卻很慢。例如,Tomcat直到2016年發布8.5版本的時候,才徹底移除BIO連接器,完全擁抱NIO。

    JDK NIO主要有這樣幾個問題比較麻煩:

  • 首先是NIO為了提高數據收發性能,可以創建DirectBuffer對象。該對象的內存開辟在JVM堆之外,無法通過正常的GC收集器來回收,只能在JVM的老年代觸發全量GC的時候回收。而全量GC往往導致系統卡頓,降低響應效率。如果被動等待老年代區域自行觸發全量GC,又有可能造成堆外內存溢出。兩者之間的矛盾需要在開發的時候小心的平衡。
  • 其次就是,JDK1.8依然存在的epoll bug:若Selector的輪詢結果為空,也沒有wakeup或新消息處理,則發生空輪詢,CPU使用率100%。
  • Netty才是NIO該有的水準

    作為一個第三方框架,Netty做到了JDK本應做到的事情。

    Netty的數據容器ByteBuf更為優秀

    ByteBuf同時維護兩個索引:讀索引和寫索引。從而保證容器對象能夠同時適配讀寫同時進行的場景。而NIO的Buffer卻需要執行一次flip操作來適應讀寫場景的切換。同時ByteBuf容器使用引用計數來手工管理,可以在引用計數歸零時通過反射調用jdk.internal.ref.Cleaner來回收內存,避免泄露。在GC低效的時候,選擇使用手工方式來管理內存,完全沒問題。

    Netty的API封裝度更高

    觀察一下Netty官網Tutorial給出的demo,只要幾十行代碼就完成了一個具備Reactor模式的服務器。ServerBootstrap的group方法定義了主套接字和子套接字的處理方式,例中使用的NioEventLoopGroup類為Java NIO + 多線程的實現方式。對于NIO的epoll bug,NioEventLoopGroup的解決方案是rebuildSelectors對象方法。這個方法允許在selector失效時重建新的selector,將舊的釋放掉。此外,Netty還通過JNI實現了自己的EpollEventLoopGroup,規避了NIO版本的bug。

    Netty使用責任鏈模式實現了對server進出站消息的處理,使得server的代碼能夠更好的擴展和維護。

    Netty在生產領域得到大量應用,Hadoop Avro、Dubbo、RocketMQ、Undertow等廣泛應用于生產領域的產品的通信組件都選擇了Netty作為基礎,并經受住了考驗。

    Netty是一個優秀的異步通信框架,但是主要應用在基礎組件中。因為Netty向開發者暴露出大量的細節,對于業務系統的開發仍然形成了困擾,所以沒法得到進一步的普及。

    舉個例子。Netty使用ChannelFuture來接收傳入的請求。相比于JDK的Future實現,ChannelFuture可以添加一組GenericFutureListener來管理對象狀態,避免了反復對Future對象狀態的詢問或阻塞獲取。這是個進步。但是,這些Listener都帶來了另一個問題——Callback hell。而嵌套的回調代碼往往難以維護。

    對于Callback hell,我們可以做什么

    Netty做一個優秀的基礎組件就很好了。業務層面的問題就讓我們用業務層面的API來解決。

    Java API的適應性不佳

    JDK7以前的異步代碼難以組織

    在JDK7以及之前,Java多線程的編程工具主要就是Thread、ExecutorService、Future以及相關的同步工具,實現出來的代碼較為繁瑣、且性能不高。

    Thread

    舉個例子A,考慮一個場景有X、P、Q三個邏輯需要執行,其中X的執行需要在P、Q一起完成之后才啟動執行。

    如果使用Thread,那么代碼會是這個樣子:

    /* 創建線程 */ Thread a = new Thread(new Runnable() {@Overridepublic void run() {/* P邏輯 */} });Thread b = new Thread(new Runnable() {@Overridepublic void run() {/* Q邏輯 */} });/* 啟動線程 */ a.start(); b.start();/* 等候a、b線程執行結束 */ try {a.join();b.join(); } catch (InterruptedException e) {e.printStackTrace(); }/* 啟動X邏輯的執行 */ Thread c = new Thread(new Runnable() {@Overridepublic void run() {/* X邏輯 */} }); c.start();...

    上面這個代碼,先不論線程創建的開銷,單從形式上看,線程內部的執行邏輯、線程本身的調度邏輯,還有必須捕獲的InterruptedException的異常處理邏輯混雜在一起,整體很混亂。假想一下,當業務邏輯填充在其中的時候,代碼更難維護。

    ThreadPoolExecutor、Future

    ThreadPoolExecutor和Future有助于實現線程復用,但對于代碼邏輯的規范沒什么幫助。

    ExecutorService pool = Executors.newCachedThreadPool(); Future<?> a = pool.submit(new Runnable() {@Overridepublic void run() {/* P邏輯 */} }); Future<?> b = pool.submit(new Runnable() {@Overridepublic void run() {/* Q邏輯 */} });/* 獲取線程執行結果 * 依然要捕獲異常,處理邏輯 */ try {a.get();b.get();Future<?> c = pool.submit(new Runnable() {@Overridepublic void run() {/* X邏輯 */}}); } catch (InterruptedException e) {e.printStackTrace(); } catch (ExecutionException e) {e.printStackTrace(); }

    JDK8代碼可讀性有了顯著提高

    JDK8借鑒了相當多的函數式編程的特點,提供了幾樣很稱手的工具。

    CompleteableFuture和ForkJoinPool

    如果要用CompleteableFuture實現上一個例子,可以這樣寫。

    CompletableFuture<?> a = CompletableFuture.runAsync(() -> {/* P邏輯 */ }).exceptionally(ex -> {/* 異常處理邏輯 */return ...; }); CompletableFuture<?> b = CompletableFuture.runAsync(() -> {/* Q邏輯 */ }); CompletableFuture<?> c = CompletableFuture.allOf(a, b).thenRun(() -> {/* X邏輯 */ });

    有了lambda表達式的加持,例中的代碼整體以線程內部邏輯為主,調度邏輯通過allOf()、thenRun()等方法名直觀地展示出來。特別是可選的異常捕獲邏輯,更是使得代碼可讀性得到了極大的提高。

    需要注意的是,CompletableFuture是可以使用指定ExecutorService來執行的。如果像上例那樣沒有指定ExecutorService對象,那么會默認使用ForkJoinPool里的靜態對象commonPool來執行。而ForkJoinPool.commonPool作為一個JVM實例中唯一的對象,也是Stream并發流的執行器,因此應當盡量保證CompletableFuture里的邏輯不會阻塞線程。如果無法規避,可以使用ManagedBlocker來降低影響。

    ForkJoinPool是JDK1.7提供的并發線程池,可以很好地應對計算密集型并發任務,特別適用于可以“分-治”的任務。傳統的ThreadPoolExecutor需要指定線程池里的線程數量,而ForkJoinPool使用了一個相似但更有彈性的概念——“并發度”。并發度指的是池內的活躍線程數。對于可能的阻塞任務,ForkJoinPool設計了一個ManagedBlocker接口。當池內線程執行到ForkJoinPool.managedBlock(ForkJoinPool.ManagedBlocker blocker)方法時,線程池會新建一個線程去執行隊列里的其他任務,并輪詢該對象的isReleasable方法,決定是否恢復線程繼續運行。JDK1.7里的Phaser類源碼用到了這個方法。

    關于CompleteableFuture的用法,推薦看看這篇博客:理解CompletableFuture,總結的很好。
    而對于ForkJoinPool,可以看看這篇博客:Java 并發編程筆記:如何使用 ForkJoinPool 以及原理。

    Stream

    Stream流也是JDK8引入的一個很好的編程工具。

    Stream對象通常通過Iterator、Collection來構造。也可以用StreamSupport的stream靜態方法來創建自定義行為的實例。

    Stream流對象采用鏈式編程風格,可以制定一系列對流的定制行為,例如過濾、排序、轉化、迭代,最后產生結果。看個例子。

    List<Integer> intList = List.of(1, 2, 3);List<String> strList = intList.stream().filter(k -> k>1).map(String::valueOf).collect(Collectors.toList());

    上面這段代碼中,intList通過stream方法獲取到流對象,然后篩選出大于1的元素,并通過String的valueOf靜態方法生成String對象,最后將各個String對象收集為一個列表strList。就像CompletableFuture的方法名一樣,Stream的方法名都是自描述的,使得代碼可讀性極佳。

    除此之外,Stream流的計算還是惰性的。Stream流對象的方法大致分為兩種:

    • 中間方法,例如filter、map等對流的改變
    • 終結方法,例如collect、forEach等可以結束流

    只有在執行終結方法的時候,流的計算才會真正執行。之前的中間方法,都作為步驟記錄下來,但沒有實時地執行修改操作。

    如果將例子里的stream方法修改為parallelStream,那么得到的流對象就是一個并發流,而且總在ForkJoinPool.commonPool中執行。

    關于Stream,極力推薦Brian Goetz大神的系列文章Java Streams。

    還有一點問題

    ForkJoinPool是一款強大的線程池組件,只要使用的得當,線程池總會保持一個合理的并發度,充分利用計算資源。

    但是,CompleteableFuture也好,Stream也好,他們都存在一個相同的問題:無法通過后端線程池的負載變化,來調整前端的調用壓力。打比方說,當后端的ForkJoinPool.commonPool在全力運算而且隊列里有大量的任務排隊時,新提交的任務很可能會有很高的響應延遲,但是前端的CompleteableFuture或者Stream沒有途徑去獲取這樣一個狀態,來延緩任務的提交。這種情況就違背了“響應式系統”的“靈敏性”要求。

    來自第三方API的福音

    Reactive Streams

    Reactive Streams是一套標準,定義了一個運行于JVM平臺上的響應式編程框架實現所應該具備的行
    為。

    Reactive Streams規范衍生自“觀察者模式”,將前后依賴的邏輯流,拆解為事件和訂閱者。只有當事件發生變更時,感興趣的觀察者才隨之執行隨后的邏輯。Reactive Stream和JDK的Stream的理念有點接近,兩者都是注重對數據流的控制。緊耦合的邏輯流拆分為“訂閱-發布”方式其實是一大進步。代碼變得維護性更強,而且很容易隨著業務的需要按照消息驅動模式拆解。

    Reactive Streams規范定義了四種接口:

    • Publisher,負責生產數據流,每一個訂閱者都會調用subscribe方法來訂閱消息。
    • Subscriber,就是訂閱者。
    • Subscription,其實就是一個訂單選項,相當于飯館里的菜單,由發布者傳遞給訂閱者。
    • Processor,處于數據流的中間位置,即是訂閱者,也是新數據流的生產者。

    當Subscriber調用Publisher.subscribe方法訂閱消息時,Publisher就會調用Subscriber的onSubscribe方法,回傳一個Subscription菜單。

    Subscription菜單包含兩個選擇:

  • 一個是request方法,對數據流的請求,參數為所請求的數據流的數量,最大為Long.MAX_VALUE;
  • 另一個是cancel方法,對數據流訂閱的取消,需要注意的是數據流或許會繼續發送一段時間,以滿足之前的請求調用。
  • 一個Subscription對象只能由同一個Subscriber調用,所以不存在對象共享的問題。因此即便Subscription對象有狀態,也不會危及邏輯鏈路的線程安全。

    訂閱者Subscriber還需要定義三種行為:

  • onNext,接受到數據流之后的執行邏輯;
  • onError,當發布出現錯誤的時候如何應對;
  • onComplete,當訂閱的數據流發送完畢之后的行為。
  • 相比于Future、Thread那樣將業務邏輯和異常處理邏輯混雜在一起,Subscriber將其分別定義在三個方法里,代碼顯得更為清晰。java.util.Observer(在JDK9中開始廢棄)只定義了update方法,相當于這里的onNext方法,相比之下Subscriber增加了對流整體的管理和對異常的處理。異常如果隨著調用鏈傳遞出去,調試定位會非常麻煩。因此要重視onError方法,盡可能在訂閱者內部就處理這個異常。

    盡管Reactive Streams規范和Stream都關注數據流,但兩者有一個顯著的區別。那就是Stream是基于生產一方的,生產者有多大能力,Stream就制造多少數據流。而Reactive Streams規范是基于消費者的。邏輯鏈下游可以通過對request方法參數的變更,通知上游調整生產數據流的速度。從而實現了“響應式系統”的“靈敏性”要求。這在響應式編程中,用術語“背壓”來描述。

    Reactive Streams規范僅僅是一個標準,其實現又依賴其他組織的成果。其意義在于各家實現能夠通過這樣一個統一的接口,相互調用,有助于響應式框架生態的良性發展。Reactive Streams規范雖然是Netflix、Pivatol、RedHat等第三方大廠合作推出的,但已經隨著JDK9的發布收編為官方API,位于java.util.concurrent.Flow之內。JDK8也可以在項目中直接集成相應的模塊調用。

    順便吐槽一下,JDK9官方文檔給出的demo里的數據流居然從Subscription里生產出來,嚇得我反復確認了一下Reactive Streams官方規范。

    RxJava2

    RxJava由Netfilx維護,實現了ReactiveX API規范。該規范有很多語言實現,生態很豐富。

    Rx范式最先是微軟在.NET平臺上實現的。2014年11月,Netfilx將Rx移植到JVM平臺,發布了1.0穩定版本。而Reactive Streams規范是在2015年首次發布,2017年才形成穩定版本。所以RxJava 1.x和Reactive Streams規范有很大出入。1.x版本迭代至2018年3月的1.3.8版本時,宣布停止維護。

    Netflix在2016年11月發布2.0.1穩定版本,實現了和Reactive Streams規范的兼容。2.x如今是官方的推薦版本。

    RxJava框架里主要有這些概念:

    • Observable與Observer。RxJava直接復用了“觀察者模式”里的概念,有助于更快地被開發社區接受。Observeble和Publisher有一點差異:前者有“冷熱”的區分,“冷”表示只有訂閱的時候才發布消息流,“熱”表示消息流的發布與時候有對象訂閱無關。Publisher更像是“冷”的Observeble。
    • Operators,也就是操作符。RxJava和JDK Stream類似,但設計了更多的自描述的函數方法,并同樣實現了鏈式編程。這些方法包括但不限于轉換、過濾、結合等等。
    • Single,是一種特殊的Observable。一般的Observable能夠產生數據流,而Single只能產生一個數據。所以Single不需要onNext、onComplete方法,而是用一個onSuccess取而代之。
    • Subject,注意這個不是事件,而是介于Observable與Observer之間的中介對象,類似于Reactive Streams規范里的Processor。
    • Scheduler,是一類線程池,用于處理并發任務。RxJava默認執行在主線程上,可以通過observeOn/subscribeOn方法來異步調用阻塞式任務。

    RxJava 2.x在Zuul 2、Hystrix、Jersey等項目都有使用,在生產領域已經得到了應用。

    Reactor3

    Reactor3有Pivotal來開發維護,也就是Spring的同門師弟。

    整體上,Reactor3框架里的概念和RxJava都是類似的。Mono和Flux都等同于RxJava的Single和Observable。Reactor3也使用自描述的操作符函數實現鏈式編程。

    RxJava 2.x支持JVM 6+平臺,對老舊項目很友好;而Reactor3要求必須是JVM8+。所以說,如果是新項目,使用Reactor3更好,因為它使用了很多新的API,支撐很多函數式接口,代碼可讀性維護性都更好。

    背靠Spring大樹,Reactor3的設計目標是服務器端的Java項目。Reactor社區針對服務器端,不斷推出新產品,例如Reactor Netty、Reactor Kafka等等。但如果是Android項目,RxJava2更為合適(來自Reactor3官方文檔的建議)。

    老實講,Reactor3的文檔內容更豐富。

    什么是響應式系統

    響應式宣言里面說的很清楚,一個響應式系統應當是:

    • 靈敏的:能夠及時響應
    • 有回復性的:即使遇到故障,也能夠自行恢復、并產生回復
    • 可伸縮的:能夠隨著工作負載的變化,自行調用或釋放計算資源;也能夠隨著計算資源的變化,相應的調整工作負載能力
    • 消息驅動的:顯式的消息傳遞能夠實現系統各組件解耦,各類組件自行管理資源調度。

    構建響應式Web系統

    Vert.X

    Vert.X目前由Eclipse基金會維護,打造了一整套響應式Web系統開發環境,包括數據庫管理、消息隊列、微服務、權限認證、集群管理器、Devops等等,生態很豐富。

    Vert.X Core框架基于Netty開發,是一種事件驅動框架:每當事件可行時都會調用其對應的handler。在Vert.X里,有專門的線程負責調用handler,被稱作eventloop。每一個Vert.X實例都維護了多個eventloop。

    Vert.X Core框架有兩個重要的概念:Verticle和Event Bus。

    Verticle

    Verticle類似于Actor模型的Actor角色。

    Actor是什么?

    這里泛泛的說一下吧。

    Actor模型主要針對于分布式計算系統。Actor是其中最基本的計算單元。每一個Actor都有一個私有的消息隊列。Actor之間的通信依靠發送消息。每一個Actor都可以并發地做到:

  • 向其他Actor發送消息
  • 創建新的Actor
  • 指定當接收到下一個消息時的行為
  • Verticle之間的消息傳遞依賴于下面要說的Event Bus。

    Vert.X為Verticle的部署提供了高可用特性:在Vert.X集群中,如果一個節點的上運行的Veticle實例失效,其他節點就會重新部署一份新的Verticle實例。

    Verticle只是Vert.X提供的一種方案,并非強制使用。

    Event Bus

    Event Bus是Vert.X框架的中樞系統,能夠實現系統中各組件的消息傳遞和handler的注冊與注銷。其消息傳遞既支持“訂閱-發布”模式,也支持“請求-響應”模式。

    當多個Vert.X實例組成集群的時候,各系統的Event Bus能夠組成一個統一的分布式Event Bus。各Event Bus節點相互之間通過TCP協議通信,沒有依賴Cluster Manager。這是一種可以實現節點發現,提供了分布式基礎組件(鎖、計數器、map)等的組件。

    Spring WebFlux

    Spring5的亮點之一就是響應式框架Spring WebFlux,使用自家的Reactor3開發,但同樣支持RxJava。

    Spring WebFlux的默認服務端容器是Reactor Netty,也可以使用Undertow或者Tomcat、Jetty的實現了Servlet 3.1 非阻塞API接口的版本。Spring WebFlux分別為這些容器實現了與Reactive Streams規范實現框架交互的適配器(Adapter),沒有向用戶層暴露Servlet API。

    Spring WebFlux的注解方式和Spring MVC很像。這有助于開發團隊快速適應新框架。而且Spring WebFlux兼容Tomcat、Jetty,有助于項目運維工作的穩定性。

    但如果是新的項目、新的團隊,給我大概會選Vert.X,因為Event Bus確實很吸引人。

    參考資料

    • 深入理解Java虛擬機 第二版
    • Netty的高性能及NIO的epoll空輪詢bug
    • JAVA NIO存在的問題
    • Reactor模式詳解
    • Netty實戰
    • Guide to the Fork/Join Framework in Java
    • Java's Fork/Join vs ExecutorService - when to use which?
    • ReactiveX
    • RxJava Essentials 中文翻譯版
    • Reactor 3 Reference Guide
    • 使用 Reactor 進行反應式編程
    • Vert.x Core Manual
    • Spring 5 Documentation - Web on Reactive Stack

    延伸閱讀

    • Five ways to maximize Java NIO and NIO.2
    • ForkJoinPool的commonPool相關參數配置
    • Is there anything wrong with using I/O + ManagedBlocker in Java8 parallelStream()?
    • Can I use the work-stealing behaviour of ForkJoinPool to avoid a thread starvation deadlock?

    總結

    以上是生活随笔為你收集整理的Java的HTTP服务端响应式编程的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。