日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

reactive stream协议详解

發(fā)布時(shí)間:2024/2/28 编程问答 43 豆豆
生活随笔 收集整理的這篇文章主要介紹了 reactive stream协议详解 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

文章目錄

  • 背景
  • 什么是reactive stream
  • 深入了解java版本的reactive stream
    • Publisher
    • Subscriber
    • Subscription
    • Processor
  • JDK中reactive stream的實(shí)現(xiàn)
  • 總結(jié)

背景

Stream大家應(yīng)該都很熟悉了,java8中為所有的集合類都引入了Stream的概念。優(yōu)雅的鏈?zhǔn)讲僮?#xff0c;流式處理邏輯,相信用過的人都會愛不釋手。

每個(gè)數(shù)據(jù)流都有一個(gè)生產(chǎn)者一個(gè)消費(fèi)者。生產(chǎn)者負(fù)責(zé)產(chǎn)生數(shù)據(jù),而消費(fèi)者負(fù)責(zé)消費(fèi)數(shù)據(jù)。如果是同步系統(tǒng),生產(chǎn)一個(gè)消費(fèi)一個(gè)沒什么問題。但是如果在異步系統(tǒng)中,就會產(chǎn)生問題。

因?yàn)樯a(chǎn)者無法感知消費(fèi)者的狀態(tài),不知道消費(fèi)者到底是繁忙狀態(tài)還是空閑狀態(tài),是否有能力去消費(fèi)更多的數(shù)據(jù)。

一般來說數(shù)據(jù)隊(duì)列的長度都是有限的,即使沒有做限制,但是系統(tǒng)的內(nèi)存也是有限的。當(dāng)太多的數(shù)據(jù)沒有被消費(fèi)的話,會導(dǎo)致內(nèi)存溢出或者數(shù)據(jù)得不到即使處理的問題。

這時(shí)候就需要back-pressure了。

如果消息接收方消息處理不過來,則可以通知消息發(fā)送方,告知其正在承受壓力,需要降低負(fù)載。back-pressure是一種消息反饋機(jī)制,從而使系統(tǒng)得以優(yōu)雅地響應(yīng)負(fù)載, 而不是在負(fù)載下崩潰。

而reactive stream的目的就是用來管理異步服務(wù)的流數(shù)據(jù)交換,并能夠讓接收方自主決定接受數(shù)據(jù)的頻率。back-pressure就是reactive stream中不可或缺的一部分。

更多內(nèi)容請?jiān)L問www.flydean.com

什么是reactive stream

上面我們講到了reactive stream的作用,大家應(yīng)該對reactive stream有了一個(gè)基本的了解。這里我們再給reactive stream做一個(gè)定義:

reactive stream就是一個(gè)異步stream處理的標(biāo)準(zhǔn),它的特點(diǎn)就是非阻塞的back pressure。

reactive stream只是一個(gè)標(biāo)準(zhǔn),它定義了實(shí)現(xiàn)非阻塞的back pressure的最小區(qū)間的接口,方法和協(xié)議。

所以reactive stream其實(shí)有很多種實(shí)現(xiàn)的,不僅僅是java可以使用reactive stream,其他的編程語言也可以。

reactive stream只是定義了最基本的功能,各大實(shí)現(xiàn)在實(shí)現(xiàn)了基本功能的同時(shí)可以自由擴(kuò)展。

目前reactive stream最新的java版本是1.0.3,是在2019年8月23發(fā)布的。它包含了java API,協(xié)議定義文件,測試工具集合和具體的實(shí)現(xiàn)例子。

深入了解java版本的reactive stream

在介紹java版本的reactive stream之前,我們先回顧一下reactive stream需要做哪些事情:

  • 能夠處理無效數(shù)量的消息
  • 消息處理是有順序的
  • 可以異步的在組件之間傳遞消息
  • 一定是非阻塞和backpressure的
  • 為了實(shí)現(xiàn)這4個(gè)功能,reactive stream定義了4個(gè)接口,Publisher,Subscriber,Subscription,Processor。這四個(gè)接口實(shí)際上是一個(gè)觀察者模式的實(shí)現(xiàn)。接下來我們詳細(xì)來分析一下各個(gè)接口的作用和約定。

    Publisher

    先看下Publisher的定義:

    public interface Publisher<T> {public void subscribe(Subscriber<? super T> s); }

    Publisher就是用來生成消息的。它定義了一個(gè)subscribe方法,傳入一個(gè)Subscriber。這個(gè)方法用來將Publisher和Subscriber進(jìn)行連接。

    一個(gè)Publisher可以連接多個(gè)Subscriber。

    每次調(diào)用subscribe建立連接,都會創(chuàng)建一個(gè)新的Subscription,Subscription和subscriber是一一對應(yīng)的。

    一個(gè)Subscriber只能夠subscribe一次Publisher。

    如果subscribe失敗或者被拒絕,則會出發(fā)Subscriber.onError(Throwable)方法。

    Subscriber

    先看下Subscriber的定義:

    public interface Subscriber<T> {public void onSubscribe(Subscription s);public void onNext(T t);public void onError(Throwable t);public void onComplete(); }

    Subscriber就是消息的接收者。

    在Publisher和Subscriber建立連接的時(shí)候會觸發(fā)onSubscribe(Subscription s)方法。

    當(dāng)調(diào)用Subscription.request(long)方法時(shí),onNext(T t)會被觸發(fā),根據(jù)request請求參數(shù)的大小,onNext會被觸發(fā)一次或者多次。

    在發(fā)生異常或者結(jié)束時(shí)會觸發(fā)onError(Throwable t)或者onComplete()方法。

    Subscription

    先看下Subscription的定義:

    public interface Subscription {public void request(long n);public void cancel(); }

    Subscription代表著一對一的Subscriber和Publisher之間的Subscribe關(guān)系。

    request(long n)意思是向publisher請求多少個(gè)events,這會觸發(fā)Subscriber.onNext方法。

    cancel()則是請求Publisher停止發(fā)送信息,并清除資源。

    Processor

    先看下Processor的定義:

    public interface Processor<T, R> extends Subscriber<T>, Publisher<R> { }

    Processor即是Subscriber又是Publisher,它代表著一種處理狀態(tài)。

    JDK中reactive stream的實(shí)現(xiàn)

    在JDK中java.util.concurrent.Flow就是reactive stream語義的一種實(shí)現(xiàn)。

    Flow從JDK9就開始有了。我們看下它的結(jié)構(gòu):

    從上圖我們可以看到在JDK中Flow是一個(gè)final class,而Subscriber,Publisher,Subscription,Processor都是它的內(nèi)部類。

    我們會在后面的文章中繼續(xù)講解JDK中Flow的使用。敬請期待。

    總結(jié)

    reactive stream的出現(xiàn)有效的解決了異步系統(tǒng)中的背壓問題。只不過reactive stream只是一個(gè)接口標(biāo)準(zhǔn)或者說是一種協(xié)議,具體的實(shí)現(xiàn)還需要自己去實(shí)現(xiàn)。

    更多精彩內(nèi)容且看:

    • 區(qū)塊鏈從入門到放棄系列教程-涵蓋密碼學(xué),超級賬本,以太坊,Libra,比特幣等持續(xù)更新
    • Spring Boot 2.X系列教程:七天從無到有掌握Spring Boot-持續(xù)更新
    • Spring 5.X系列教程:滿足你對Spring5的一切想象-持續(xù)更新
    • java程序員從小工到專家成神之路(2020版)-持續(xù)更新中,附詳細(xì)文章教程

    本文作者:flydean程序那些事

    本文鏈接:http://www.flydean.com/reactive-stream-protocol/

    本文來源:flydean的博客

    歡迎關(guān)注我的公眾號:程序那些事,更多精彩等著您!

    總結(jié)

    以上是生活随笔為你收集整理的reactive stream协议详解的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。