日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 人文社科 > 生活经验 >内容正文

生活经验

2021年大数据Flink(二十五):Flink 状态管理

發布時間:2023/11/28 生活经验 30 豆豆
生活随笔 收集整理的這篇文章主要介紹了 2021年大数据Flink(二十五):Flink 状态管理 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

目錄

Flink中的有狀態計算

無狀態計算和有狀態計算

無狀態計算

有狀態計算

有狀態計算的場景

狀態的分類

Managed State & Raw State

Keyed State & Operator State

存儲State的數據結構/API介紹


Flink-狀態管理

Flink中的有狀態計算

注意:

Flink中已經對需要進行有狀態計算的API,做了封裝,底層已經維護好了狀態!

例如,之前下面代碼,直接使用即可,不需要像SparkStreaming那樣還得自己寫updateStateByKey

也就是說我們今天學習的State只需要掌握原理,實際開發中一般都是使用Flink底層維護好的狀態或第三方維護好的狀態(如Flink整合Kafka的offset維護底層就是使用的State,但是人家已經寫好了的)

package cn.it.source;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;/*** Author lanson* Desc* SocketSource*/
public class SourceDemo03 {public static void main(String[] args) throws Exception {//1.envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2.sourceDataStream<String> linesDS = env.socketTextStream("node1", 9999);//3.處理數據-transformation//3.1每一行數據按照空格切分成一個個的單詞組成一個集合DataStream<String> wordsDS = linesDS.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String value, Collector<String> out) throws Exception {//value就是一行行的數據String[] words = value.split(" ");for (String word : words) {out.collect(word);//將切割處理的一個個的單詞收集起來并返回}}});//3.2對集合中的每個單詞記為1DataStream<Tuple2<String, Integer>> wordAndOnesDS = wordsDS.map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String value) throws Exception {//value就是進來一個個的單詞return Tuple2.of(value, 1);}});//3.3對數據按照單詞(key)進行分組//KeyedStream<Tuple2<String, Integer>, Tuple> groupedDS = wordAndOnesDS.keyBy(0);KeyedStream<Tuple2<String, Integer>, String> groupedDS = wordAndOnesDS.keyBy(t -> t.f0);//3.4對各個組內的數據按照數量(value)進行聚合就是求sumDataStream<Tuple2<String, Integer>> result = groupedDS.sum(1);//4.輸出結果-sinkresult.print();//5.觸發執行-executeenv.execute();}
}

執行 netcat,然后在終端輸入 hello world,執行程序會輸出什么?

答案很明顯,(hello, 1)和 (word,1)

那么問題來了,如果再次在終端輸入 hello world,程序會輸入什么?

答案其實也很明顯,(hello, 2)和(world, 2)。

為什么 Flink 知道之前已經處理過一次 hello world,這就是 state 發揮作用了,這里是被稱為 keyed state 存儲了之前需要統計的數據,所以 Flink 知道 hello 和 world 分別出現過一次。

無狀態計算和有狀態計算

無狀態計算

不需要考慮歷史數據

相同的輸入得到相同的輸出就是無狀態計算, 如map/flatMap/filter....

首先舉一個無狀態計算的例子:消費延遲計算。

假設現在有一個消息隊列,消息隊列中有一個生產者持續往消費隊列寫入消息,多個消費者分別從消息隊列中讀取消息。

從圖上可以看出,生產者已經寫入 16 條消息,Offset 停留在 15 ;有 3 個消費者,有的消費快,而有的消費慢。消費快的已經消費了 13 條數據,消費者慢的才消費了 7、8 條數據。

如何實時統計每個消費者落后多少條數據,如圖給出了輸入輸出的示例。可以了解到輸入的時間點有一個時間戳,生產者將消息寫到了某個時間點的位置,每個消費者同一時間點分別讀到了什么位置。剛才也提到了生產者寫入了 15 條,消費者分別讀取了 10、7、12 條。那么問題來了,怎么將生產者、消費者的進度轉換為右側示意圖信息呢?

consumer 0 落后了 5 條,consumer 1 落后了 8 條,consumer 2 落后了 3 條,根據 Flink 的原理,此處需進行 Map 操作。Map 首先把消息讀取進來,然后分別相減,即可知道每個 consumer 分別落后了幾條。Map 一直往下發,則會得出最終結果。

大家會發現,在這種模式的計算中,無論這條輸入進來多少次,輸出的結果都是一樣的,因為單條輸入中已經包含了所需的所有信息。消費落后等于生產者減去消費者。生產者的消費在單條數據中可以得到,消費者的數據也可以在單條數據中得到,所以相同輸入可以得到相同輸出,這就是一個無狀態的計算。

有狀態計算

需要考慮歷史數據

相同的輸入得到不同的輸出/不一定得到相同的輸出,就是有狀態計算,如:sum/reduce

以訪問日志統計量的例子進行說明,比如當前拿到一個 Nginx 訪問日志,一條日志表示一個請求,記錄該請求從哪里來,訪問的哪個地址,需要實時統計每個地址總共被訪問了多少次,也即每個 API 被調用了多少次。可以看到下面簡化的輸入和輸出,輸入第一條是在某個時間點請求 GET 了 /api/a;第二條日志記錄了某個時間點 Post /api/b ;第三條是在某個時間點 GET了一個 /api/a,總共有 3 個 Nginx 日志。

從這 3 條 Nginx 日志可以看出,第一條進來輸出 /api/a 被訪問了一次,第二條進來輸出 /api/b 被訪問了一次,緊接著又進來一條訪問 api/a,所以 api/a 被訪問了 2 次。不同的是,兩條 /api/a 的 Nginx 日志進來的數據是一樣的,但輸出的時候結果可能不同,第一次輸出 count=1 ,第二次輸出 count=2,說明相同輸入可能得到不同輸出。輸出的結果取決于當前請求的 API 地址之前累計被訪問過多少次。第一條過來累計是 0 次,count = 1,第二條過來 API 的訪問已經有一次了,所以 /api/a 訪問累計次數 count=2。單條數據其實僅包含當前這次訪問的信息,而不包含所有的信息。要得到這個結果,還需要依賴 API 累計訪問的量,即狀態。

這個計算模式是將數據輸入算子中,用來進行各種復雜的計算并輸出數據。這個過程中算子會去訪問之前存儲在里面的狀態。另外一方面,它還會把現在的數據對狀態的影響實時更新,如果輸入 200 條數據,最后輸出就是 200 條結果。

有狀態計算的場景

什么場景會用到狀態呢?下面列舉了常見的 4 種:

1.去重:比如上游的系統數據可能會有重復,落到下游系統時希望把重復的數據都去掉。去重需要先了解哪些數據來過,哪些數據還沒有來,也就是把所有的主鍵都記錄下來,當一條數據到來后,能夠看到在主鍵當中是否存在。

2.窗口計算:比如統計每分鐘 Nginx 日志 API 被訪問了多少次。窗口是一分鐘計算一次,在窗口觸發前,如 08:00 ~ 08:01 這個窗口,前59秒的數據來了需要先放入內存,即需要把這個窗口之內的數據先保留下來,等到 8:01 時一分鐘后,再將整個窗口內觸發的數據輸出。未觸發的窗口數據也是一種狀態。

3.機器學習/深度學習:如訓練的模型以及當前模型的參數也是一種狀態,機器學習可能每次都用有一個數據集,需要在數據集上進行學習,對模型進行一個反饋。

4.訪問歷史數據:比如與昨天的數據進行對比,需要訪問一些歷史數據。如果每次從外部去讀,對資源的消耗可能比較大,所以也希望把這些歷史數據也放入狀態中做對比。

狀態的分類

Managed State & Raw State

從Flink是否接管角度:可以分為

ManagedState(托管狀態)

RawState(原始狀態)

兩者的區別如下:

  1. 從狀態管理方式的方式來說,Managed State 由 Flink Runtime 管理,自動存儲,自動恢復,在內存管理上有優化;而 Raw State 需要用戶自己管理,需要自己序列化,Flink 不知道 State 中存入的數據是什么結構,只有用戶自己知道,需要最終序列化為可存儲的數據結構。
  2. 從狀態數據結構來說,Managed State 支持已知的數據結構,如 Value、List、Map 等。而 Raw State只支持字節數組 ,所有狀態都要轉換為二進制字節數組才可以。
  3. 從推薦使用場景來說,Managed State 大多數情況下均可使用,而 Raw State 是當 Managed State 不夠用時,比如需要自定義 Operator 時,才會使用 Raw State。

在實際生產中,都只推薦使用ManagedState,后續將圍繞該話題進行討論。

Keyed State & Operator State

Managed State 分為兩種,Keyed State 和 Operator State

(Raw State都是Operator State)

  • Keyed State

在Flink Stream模型中,Datastream 經過 keyBy 的操作可以變為 KeyedStream。

Keyed State是基于KeyedStream上的狀態。這個狀態是跟特定的key綁定的,對KeyedStream流上的每一個key,都對應一個state,如stream.keyBy(…)

KeyBy之后的State,可以理解為分區過的State,每個并行keyed Operator的每個實例的每個key都有一個Keyed State,即<parallel-operator-instance,key>就是一個唯一的狀態,由于每個key屬于一個keyed Operator的并行實例,因此我們將其簡單的理解為<operator,key>

  • Operator State

這里的fromElements會調用FromElementsFunction的類,其中就使用了類型為 list state 的 operator state

Operator State又稱為 non-keyed state,與Key無關的State,每一個 operator state 都僅與一個 operator 的實例綁定。

Operator State 可以用于所有算子,但一般常用于 Source

存儲State的數據結構/API介紹

前面說過有狀態計算其實就是需要考慮歷史數據

而歷史數據需要搞個地方存儲起來

Flink為了方便不同分類的State的存儲和管理,提供了如下的API/數據結構來存儲State!

Keyed State 通過 RuntimeContext 訪問,這需要 Operator 是一個RichFunction。

保存Keyed state的數據結構:

ValueState<T>:即類型為T的單值狀態。這個狀態與對應的key綁定,是最簡單的狀態了。它可以通過update方法更新狀態值,通過value()方法獲取狀態值,如求按用戶id統計用戶交易總額

ListState<T>:即key上的狀態值為一個列表。可以通過add方法往列表中附加值;也可以通過get()方法返回一個Iterable<T>來遍歷狀態值,如統計按用戶id統計用戶經常登錄的Ip

ReducingState<T>:這種狀態通過用戶傳入的reduceFunction,每次調用add方法添加值的時候,會調用reduceFunction,最后合并到一個單一的狀態值

MapState<UK, UV>:即狀態值為一個map。用戶通過put或putAll方法添加元素

需要注意的是,以上所述的State對象,僅僅用于與狀態進行交互(更新、刪除、清空等),而真正的狀態值,有可能是存在內存、磁盤、或者其他分布式存儲系統中。相當于我們只是持有了這個狀態的句柄

Operator State 需要自己實現 CheckpointedFunction 或 ListCheckpointed 接口。

保存Operator state的數據結構:

ListState<T>

BroadcastState<K,V>

舉例來說,Flink中的FlinkKafkaConsumer,就使用了operator state。它會在每個connector實例中,保存該實例中消費topic的所有(partition, offset)映射

總結

以上是生活随笔為你收集整理的2021年大数据Flink(二十五):Flink 状态管理的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。