日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 前端技术 > javascript >内容正文

javascript

从 RxJS 到 Flink:如何处理数据流?

發(fā)布時間:2024/9/3 javascript 41 豆豆
生活随笔 收集整理的這篇文章主要介紹了 从 RxJS 到 Flink:如何处理数据流? 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.
簡介:前端開發(fā)的本質(zhì)是什么?響應(yīng)式編程相對于 MVVM 或者 Redux 有什么優(yōu)點(diǎn)?響應(yīng)式編程的思想是否可以應(yīng)用到后端開發(fā)中?本文以一個新聞網(wǎng)站為例,闡述在前端開發(fā)中如何使用響應(yīng)式編程思想;再以計(jì)算電商平臺雙11每小時成交額為例,分享同樣的思想在實(shí)時計(jì)算中的相同與不同之處。

一 前端開發(fā)在開發(fā)什么

大家在前端開發(fā)的過程中,可能會想過這樣一個問題:前端開發(fā)究竟是在開發(fā)什么?在我看來,前端開發(fā)的本質(zhì)是讓網(wǎng)頁視圖能夠正確地響應(yīng)相關(guān)事件。在這句話中有三個關(guān)鍵字:"網(wǎng)頁視圖","正確地響應(yīng)"和"相關(guān)事件"。

"相關(guān)事件"可能包括頁面點(diǎn)擊,鼠標(biāo)滑動,定時器,服務(wù)端請求等等,"正確地響應(yīng)"意味著我們要根據(jù)相關(guān)的事件來修改一些狀態(tài),而"網(wǎng)頁視圖"就是我們前端開發(fā)中最熟悉的部分了。

按照這樣的觀點(diǎn)我們可以給出這樣 視圖 = 響應(yīng)函數(shù)(事件) 的公式:

View = reactionFn(Event)

在前端開發(fā)中,需要被處理事件可以歸類為以下三種:

  • 用戶執(zhí)行頁面動作,例如 click, mousemove 等事件。
  • 遠(yuǎn)程服務(wù)端與本地的數(shù)據(jù)交互,例如 fetch, websocket。
  • 本地的異步事件,例如 setTimeout, setInterval async_event。


這樣我們的公式就可以進(jìn)一步推導(dǎo)為:

View = reactionFn(UserEvent | Timer | Remote API)

二 應(yīng)用中的邏輯處理

為了能夠更進(jìn)一步理解這個公式與前端開發(fā)的關(guān)系,我們以新聞網(wǎng)站舉例,該網(wǎng)站有以下三個要求:

  • 單擊刷新:單擊 Button 刷新數(shù)據(jù)。
  • 勾選刷新:勾選 Checkbox 時自動刷新,否則停止自動刷新。
  • 下拉刷新:當(dāng)用戶從屏幕頂端下拉時刷新數(shù)據(jù)。

如果從前端的角度分析,這三種需求分別對應(yīng)著:

  • 單擊刷新:click -> fetch
  • 勾選刷新:change -> (setInterval + clearInterval) -> fetch
  • 下拉刷新:(touchstart + touchmove + touchend) -> fetch news_app

1 MVVM

在 MVVM 的模式下,對應(yīng)上文的響應(yīng)函數(shù)(reactionFn)會在 Model 與 ViewModel 或者 View 與 ViewModel 之間進(jìn)行被執(zhí)行,而事件 (Event) 會在 View 與 ViewModel 之間進(jìn)行處理。

MVVM 可以很好的抽象視圖層與數(shù)據(jù)層,但是響應(yīng)函數(shù)(reactionFn)會散落在不同的轉(zhuǎn)換過程中,這會導(dǎo)致數(shù)據(jù)的賦值與收集過程難以進(jìn)行精確追蹤。另外因?yàn)槭录?(Event) 的處理在該模型中與視圖部分緊密相關(guān),導(dǎo)致 View 與 ViewModel 之間對事件處理的邏輯復(fù)用困難。

2 Redux

在 Redux 最簡單的模型下,若干個事件 (Event) 的組合會對應(yīng)到一個 Action 上,而 reducer 函數(shù)可以被直接認(rèn)為與上文提到的響應(yīng)函數(shù) (reactionFn) 對應(yīng)。

但是在 Redux 中:

  • State 只能用于描述中間狀態(tài),而不能描述中間過程。
  • Action 與 Event 的關(guān)系并非一一對應(yīng)導(dǎo)致 State 難以追蹤實(shí)際變化來源。

3 響應(yīng)式編程與 RxJS

維基百科中是這樣定義響應(yīng)式編程:

在計(jì)算中,響應(yīng)式編程或反應(yīng)式編程(英語:Reactive programming)是一種面向數(shù)據(jù)流和變化傳播的聲明式編程范式。這意味著可以在編程語言中很方便地表達(dá)靜態(tài)或動態(tài)的數(shù)據(jù)流,而相關(guān)的計(jì)算模型會自動將變化的值通過數(shù)據(jù)流進(jìn)行傳播。

以數(shù)據(jù)流維度重新考慮用戶使用該應(yīng)用的流程:

  • 點(diǎn)擊按鈕 -> 觸發(fā)刷新事件 -> 發(fā)送請求 -> 更新視圖
  • 勾選自動刷新
  • 手指觸摸屏幕
  • 自動刷新間隔 -> 觸發(fā)刷新事件 -> 發(fā)送請求 -> 更新視圖
  • 手指在屏幕上下滑
  • 自動刷新間隔 -> 觸發(fā)刷新事件 -> 發(fā)送請求 -> 更新視圖
  • 手指在屏幕上停止滑動 -> 觸發(fā)下拉刷新事件 -> 發(fā)送請求 -> 更新視圖
  • 自動刷新間隔 -> 觸發(fā)刷新事件 -> 發(fā)送請求 -> 更新視圖
  • 關(guān)閉自動刷新

以 Marbles 圖表示:

拆分上圖邏輯,就會得到使用響應(yīng)式編程開發(fā)當(dāng)前新聞應(yīng)用時的三個步驟:

  • 定義源數(shù)據(jù)流
  • 組合/轉(zhuǎn)換數(shù)據(jù)流
  • 消費(fèi)數(shù)據(jù)流并更新視圖

我們分別來進(jìn)行詳細(xì)描述。

定義源數(shù)據(jù)流

使用 RxJS,我們可以很方便的定義出各種 Event 數(shù)據(jù)流。

1)單擊操作

涉及 click 數(shù)據(jù)流。

click$ = fromEvent<MouseEvent>(document.querySelector('button'), 'click');

2)勾選操作

涉及 change 數(shù)據(jù)流。

change$ = fromEvent(document.querySelector('input'), 'change');

3)下拉操作

涉及 touchstart, touchmove 與 touchend 三個數(shù)據(jù)流。

touchstart$ = fromEvent<TouchEvent>(document, 'touchstart'); touchend$ = fromEvent<TouchEvent>(document, 'touchend'); touchmove$ = fromEvent<TouchEvent>(document, 'touchmove');

4)定時刷新

interval$ = interval(5000);

5)服務(wù)端請求

fetch$ = fromFetch('https://randomapi.azurewebsites.net/api/users');

組合/轉(zhuǎn)換數(shù)據(jù)流

1)點(diǎn)擊刷新事件流

在點(diǎn)擊刷新時,我們希望短時間內(nèi)多次點(diǎn)擊只觸發(fā)最后一次,這通過 RxJS 的 debounceTime operator 就可以實(shí)現(xiàn)。

clickRefresh$ = this.click$.pipe(debounceTime(300));

2)自動刷新流

使用 RxJS 的 switchMap 與之前定義好的 interval$ 數(shù)據(jù)流配合。

autoRefresh$ = change$.pipe(switchMap(enabled => (enabled ? interval$ : EMPTY)) );

3)下拉刷新流

結(jié)合之前定義好的 touchstart$touchmove$ 與 touchend$ 數(shù)據(jù)流。

pullRefresh$ = touchstart$.pipe(switchMap(touchStartEvent =>touchmove$.pipe(map(touchMoveEvent => touchMoveEvent.touches[0].pageY - touchStartEvent.touches[0].pageY),takeUntil(touchend$))),filter(position => position >= 300),take(1),repeat() );

最后,我們通過 merge 函數(shù)將定義好的 clickRefresh$autoRefresh$ 與 pullRefresh$ 合并,就得到了刷新數(shù)據(jù)流。

refresh$ = merge(clickRefresh$, autoRefresh$, pullRefresh$));

消費(fèi)數(shù)據(jù)流并更新視圖

將刷新數(shù)據(jù)流直接通過 switchMap 打平到在第一步到定義好的 fetch$,我們就獲得了視圖數(shù)據(jù)流。

可以通過在 Angular 框架中可以直接 async pipe 將視圖流直接映射為視圖:

<div *ngFor="let user of view$ | async"> </div>

在其他框架中可以通過 subscribe 獲得數(shù)據(jù)流中的真實(shí)數(shù)據(jù),再更新視圖。

至此,我們就使用響應(yīng)式編程完整的開發(fā)完成了當(dāng)前新聞應(yīng)用,示例代碼[1]由 Angular 開發(fā),行數(shù)不超過 160 行。

我們總結(jié)一下,使用響應(yīng)式編程思想開發(fā)前端應(yīng)用時經(jīng)歷的三個過程與第一節(jié)中公式的對應(yīng)關(guān)系:

View = reactionFn(UserEvent | Timer | Remote API)

1)描述源數(shù)據(jù)流

與事件UserEvent | Timer | Remote API 對應(yīng),在 RxJS 中對應(yīng)函數(shù)分別是:

  • UserEvent: fromEvent
  • Timer: interval, timer
  • Remote API: fromFetch, webSocket

2)組合轉(zhuǎn)換數(shù)據(jù)流

與響應(yīng)函數(shù)(reactionFn)對應(yīng),在 RxJS 中對應(yīng)的部分方法是:

  • COMBINING: merge, combineLatest, zip
  • MAPPING: map
  • FILTERING: filter
  • REDUCING: reduce, max, count, scan
  • TAKING: take, takeWhile
  • SKIPPING: skip, skipWhile, takeLast, last
  • TIME: delay, debounceTime, throttleTime

3)消費(fèi)數(shù)據(jù)流更新視圖

與 View 對應(yīng),在 RxJS 及 Angular 中可以使用:

  • subscribe
  • async pipe

響應(yīng)式編程相對于 MVVM 或者 Redux 有什么優(yōu)點(diǎn)呢?

  • 描述事件發(fā)生的本身,而非計(jì)算過程或者中間狀態(tài)。
  • 提供了組合和轉(zhuǎn)換數(shù)據(jù)流的方法,這也意味著我們獲得了復(fù)用持續(xù)變化數(shù)據(jù)的方法。
  • 由于所有數(shù)據(jù)流均由層層組合與轉(zhuǎn)換獲得,這也就意味著我們可以精確追蹤事件及數(shù)據(jù)變化的來源。

如果我們將 RxJS 的 Marbles 圖的時間軸模糊,并在每次視圖更新時增加縱切面,我們就會發(fā)現(xiàn)這樣兩件有趣的事情:

  • Action 是 EventStream 的簡化。
  • State 是 Stream 在某個時刻的對應(yīng)。

難怪我們可以在 Redux 官網(wǎng)中有這樣一句話:如果你已經(jīng)使用了 RxJS,很可能你不再需要 Redux 了。

The question is: do you really need Redux if you already use Rx? Maybe not. It's not hard to re-implement Redux in Rx. Some say it's a two-liner using Rx.scan() method. It may very well be!

寫到這里,我們對網(wǎng)頁視圖能夠正確地響應(yīng)相關(guān)事件這句話是否可以進(jìn)行進(jìn)一步的抽象呢?

所有事件 -- 找到 --> 相關(guān)事件 -- 做出 --> 響應(yīng)

而按時間順序發(fā)生的事件,本質(zhì)上就是數(shù)據(jù)流,進(jìn)一步拓展就可變成:

源數(shù)據(jù)流 -- 轉(zhuǎn)換 --> 中間數(shù)據(jù)流 -- 訂閱 --> 消費(fèi)數(shù)據(jù)流

這正是響應(yīng)式編程在前端能夠完美工作的基礎(chǔ)思想。但是該思想是否只在前端開發(fā)中有所應(yīng)用呢?

答案是否定的,該思想不僅可以應(yīng)用于前端開發(fā),在后端開發(fā)乃至實(shí)時計(jì)算中都有著廣泛的應(yīng)用。

三 打破信息之墻

在前后端開發(fā)者之間,通常由一面叫 REST API 的信息之墻隔開,REST API 隔離了前后端開發(fā)者的職責(zé),提升了開發(fā)效率。但它同樣讓前后端開發(fā)者的眼界被這面墻隔開,讓我們試著來推倒這面信息之墻,一窺同樣的思想在實(shí)時計(jì)算中的應(yīng)用。

1 實(shí)時計(jì)算 與 Apache Flink

在開始下一部分之前,讓我們先介紹一下 Flink。Apache Flink 是由 Apache 軟件基金會開發(fā)的開源流處理框架,用于在無邊界和有邊界數(shù)據(jù)流上進(jìn)行有狀態(tài)的計(jì)算。它的數(shù)據(jù)流編程模型在有限和無限數(shù)據(jù)集上提供單次事件(event-at-a-time)處理能力。

在實(shí)際的應(yīng)用中,Flink 通常用于開發(fā)以下三種應(yīng)用:

  • 事件驅(qū)動型應(yīng)用 事件驅(qū)動型應(yīng)用從一個或多個事件流提取數(shù)據(jù),并根據(jù)到來的事件觸發(fā)計(jì)算、狀態(tài)更新或其他外部動作。場景包括基于規(guī)則的報警,異常檢測,反欺詐等等。
  • 數(shù)據(jù)分析應(yīng)用 數(shù)據(jù)分析任務(wù)需要從原始數(shù)據(jù)中提取有價值的信息和指標(biāo)。例如雙十一成交額計(jì)算,網(wǎng)絡(luò)質(zhì)量監(jiān)測等等。
  • 數(shù)據(jù)管道(ETL)應(yīng)用 提取-轉(zhuǎn)換-加載(ETL)是一種在存儲系統(tǒng)之間進(jìn)行數(shù)據(jù)轉(zhuǎn)換和遷移的常用方法。ETL 作業(yè)通常會周期性地觸發(fā),將數(shù)據(jù)從事務(wù)型數(shù)據(jù)庫拷貝到分析型數(shù)據(jù)庫或數(shù)據(jù)倉庫。

我們這里以計(jì)算電商平臺雙十一每小時成交額為例,看下我們在之前章節(jié)得到方案是否仍然可以繼續(xù)使用。

在這個場景中我們首先要獲取用戶購物下單數(shù)據(jù),隨后計(jì)算每小時成交數(shù)據(jù),然后將每小時的成交數(shù)據(jù)轉(zhuǎn)存到數(shù)據(jù)庫并被 Redis 緩存,最終通過接口獲取后展示在頁面中。

在這個鏈路中的數(shù)據(jù)流處理邏輯為:

用戶下單數(shù)據(jù)流 -- 轉(zhuǎn)換 --> 每小時成交數(shù)據(jù)流 -- 訂閱 --> 寫入數(shù)據(jù)庫

與之前章節(jié)中介紹的:

源數(shù)據(jù)流 -- 轉(zhuǎn)換 --> 中間數(shù)據(jù)流 -- 訂閱 --> 消費(fèi)數(shù)據(jù)流

思想完全一致。

如果我們用 Marbles 描述這個過程,就會得到這樣的結(jié)果,看起來很簡單,似乎使用 RxJS 的 window operator 也可以完成同樣的功能,但是事實(shí)真的如此嗎?

2 被隱藏的復(fù)雜度

真實(shí)的實(shí)時計(jì)算比前端中響應(yīng)式編程的復(fù)雜度要高很多,我們在這里舉幾個例子:

事件亂序

在前端開發(fā)過程中,我們也會碰到事件亂序的情況,最經(jīng)典的情況先發(fā)起的請求后收到響應(yīng),可以用如下的 Marbles 圖表示。這種情況在前端有很多種辦法進(jìn)行處理,我們在這里就略過不講。

我們今天想介紹的是數(shù)據(jù)處理時面臨的時間亂序情況。在前端開發(fā)中,我們有一個很重要的前提,這個前提大幅度降低了開發(fā)前端應(yīng)用的復(fù)雜度,那就是:前端事件的發(fā)生時間和處理時間相同。

想象一下,如果用戶執(zhí)行頁面動作,例如 click, mousemove 等事件都變成了異步事件,并且響應(yīng)時間未知,那整個前端的開發(fā)復(fù)雜度會如何。

但是事件的發(fā)生時間與處理時間不同,在實(shí)時計(jì)算領(lǐng)域是一個重要的前提。我們?nèi)砸悦啃r成交額計(jì)算為例,當(dāng)原始數(shù)據(jù)流經(jīng)過層層傳輸之后,在計(jì)算節(jié)點(diǎn)的數(shù)據(jù)的先后順很可能已經(jīng)亂序了。

如果我們?nèi)匀灰詳?shù)據(jù)的到來時間來進(jìn)行窗口劃分,最后的計(jì)算結(jié)果就會產(chǎn)生錯誤:

為了讓 window2 的窗口的計(jì)算結(jié)果正確,我們需要等待 late event 到來之后進(jìn)行計(jì)算,但是這樣我們就面臨了一個兩難問題:

  • 無限等下去:late event 可能在傳輸過程中丟失,window2 窗口永遠(yuǎn)沒有數(shù)據(jù)產(chǎn)出。
  • 等待時間太短:late event 還沒有到來,計(jì)算結(jié)果錯誤。

Flink 引入了 Watermark 機(jī)制來解決這個問題,Watermark 定義了什么時候不再等待 late event,本質(zhì)上提供了實(shí)時計(jì)算的準(zhǔn)確性和實(shí)時性的折中方案。

關(guān)于 Watermark 有個形象的比喻:上學(xué)的時候,老師會將班級的門關(guān)上,然后說:“從這個點(diǎn)之后來的同學(xué)都算遲到了,統(tǒng)統(tǒng)罰站“。在 Flink 中,Watermark 充當(dāng)了老師關(guān)門的這個動作。

數(shù)據(jù)反壓

在瀏覽器中使用 RxJS 時,不知道大家有沒有考慮這樣一種情況:observable 產(chǎn)生的速度快于 operator 或者 observer 消費(fèi)的速度時,會產(chǎn)生大量的未消費(fèi)的數(shù)據(jù)被緩存在內(nèi)存中。這種情況被稱為反壓,幸運(yùn)的是,在前端產(chǎn)生數(shù)據(jù)反壓只會導(dǎo)致瀏覽器內(nèi)存被大量占用,除此之外不會有更嚴(yán)重的后果。

但是在實(shí)時計(jì)算中,當(dāng)數(shù)據(jù)產(chǎn)生的速度高于中間節(jié)點(diǎn)處理能力,或者超過了下游數(shù)據(jù)的消費(fèi)能力時,應(yīng)當(dāng)如何處理?


對于許多流應(yīng)用程序來說,數(shù)據(jù)丟失是不可接受的,為了保證這一點(diǎn),Flink 設(shè)計(jì)了這樣一種機(jī)制:

  • 在理想情況,在一個持久通道中緩沖數(shù)據(jù)。
  • 當(dāng)數(shù)據(jù)產(chǎn)生的速度高于中間節(jié)點(diǎn)處理能力,或者超過了下游數(shù)據(jù)的消費(fèi)能力時,速度較慢的接收器會在隊(duì)列的緩沖作用耗盡后立即降低發(fā)送器的速度。更形象的比喻是,在數(shù)據(jù)流流速變慢時,將整個管道從水槽“回壓”到水源,并對水源進(jìn)行節(jié)流,以便將速度調(diào)整到最慢的部分,從而達(dá)到穩(wěn)定狀態(tài)。

Checkpoint

實(shí)時計(jì)算領(lǐng)域,每秒鐘處理的數(shù)據(jù)可能有數(shù)十億條,這些數(shù)據(jù)的處理不可能由單臺機(jī)器獨(dú)立完成。事實(shí)上,在 Flink 中,operator 運(yùn)算邏輯會由不同的 subtask 在 不同的 taskmanager 上執(zhí)行,這時我們就面臨了另外一個問題,當(dāng)某臺機(jī)器發(fā)生問題時,整體的運(yùn)算邏輯與狀態(tài)該如何處理才能保證最后運(yùn)算結(jié)果的正確性?

Flink 中引入了 checkpoint 機(jī)制用于保證可以對作業(yè)的狀態(tài)和計(jì)算位置進(jìn)行恢復(fù),checkpoint 使 Flink 的狀態(tài)具有良好的容錯性。Flink 使用了 Chandy-Lamport algorithm 算法的一種變體,稱為異步 barrier 快照(asynchronous barrier snapshotting)。

當(dāng)開始 checkpoint 時,它會讓所有 sources 記錄它們的偏移量,并將編號的 checkpoint barriers 插入到它們的流中。這些 barriers 會經(jīng)過每個 operator 時標(biāo)注每個 checkpoint 前后的流部分。

當(dāng)發(fā)生錯誤時,Flink 可以根據(jù) checkpoint 存儲的 state 進(jìn)行狀態(tài)恢復(fù),保證最終結(jié)果的正確性。

冰山一角

由于篇幅的關(guān)系,今天介紹的部分只能是冰山一角,不過

源數(shù)據(jù)流 -- 轉(zhuǎn)換 --> 中間數(shù)據(jù)流 -- 訂閱 --> 消費(fèi)數(shù)據(jù)流

的模型無論在響應(yīng)式編程還是實(shí)時計(jì)算都是通用的,希望這篇文章能夠讓大家對數(shù)據(jù)流的思想有更多的思考。

相關(guān)鏈接

[1]https://github.com/vthinkxie/ng-pull-refresh

原文鏈接:https://developer.aliyun.com/article/780371?

版權(quán)聲明:本文內(nèi)容由阿里云實(shí)名注冊用戶自發(fā)貢獻(xiàn),版權(quán)歸原作者所有,阿里云開發(fā)者社區(qū)不擁有其著作權(quán),亦不承擔(dān)相應(yīng)法律責(zé)任。具體規(guī)則請查看《阿里云開發(fā)者社區(qū)用戶服務(wù)協(xié)議》和《阿里云開發(fā)者社區(qū)知識產(chǎn)權(quán)保護(hù)指引》。如果您發(fā)現(xiàn)本社區(qū)中有涉嫌抄襲的內(nèi)容,填寫侵權(quán)投訴表單進(jìn)行舉報,一經(jīng)查實(shí),本社區(qū)將立刻刪除涉嫌侵權(quán)內(nèi)容。

總結(jié)

以上是生活随笔為你收集整理的从 RxJS 到 Flink:如何处理数据流?的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。