2021年大数据Flink(二十五):Flink 状态管理
目錄
Flink-狀態管理
Flink中的有狀態計算
無狀態計算和有狀態計算
無狀態計算
有狀態計算
有狀態計算的場景
狀態的分類
Managed State & Raw State
Keyed State & Operator State
存儲State的數據結構/API介紹
Flink-狀態管理
Flink中的有狀態計算
注意:
Flink中已經對需要進行有狀態計算的API,做了封裝,底層已經維護好了狀態!
例如,之前下面代碼,直接使用即可,不需要像SparkStreaming那樣還得自己寫updateStateByKey
也就是說我們今天學習的State只需要掌握原理,實際開發中一般都是使用Flink底層維護好的狀態或第三方維護好的狀態(如Flink整合Kafka的offset維護底層就是使用的State,但是人家已經寫好了的)
package cn.it.source;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;/*** Author lanson* Desc* SocketSource*/
public class SourceDemo03 {public static void main(String[] args) throws Exception {//1.envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2.sourceDataStream<String> linesDS = env.socketTextStream("node1", 9999);//3.處理數據-transformation//3.1每一行數據按照空格切分成一個個的單詞組成一個集合DataStream<String> wordsDS = linesDS.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String value, Collector<String> out) throws Exception {//value就是一行行的數據String[] words = value.split(" ");for (String word : words) {out.collect(word);//將切割處理的一個個的單詞收集起來并返回}}});//3.2對集合中的每個單詞記為1DataStream<Tuple2<String, Integer>> wordAndOnesDS = wordsDS.map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String value) throws Exception {//value就是進來一個個的單詞return Tuple2.of(value, 1);}});//3.3對數據按照單詞(key)進行分組//KeyedStream<Tuple2<String, Integer>, Tuple> groupedDS = wordAndOnesDS.keyBy(0);KeyedStream<Tuple2<String, Integer>, String> groupedDS = wordAndOnesDS.keyBy(t -> t.f0);//3.4對各個組內的數據按照數量(value)進行聚合就是求sumDataStream<Tuple2<String, Integer>> result = groupedDS.sum(1);//4.輸出結果-sinkresult.print();//5.觸發執行-executeenv.execute();}
}
執行 netcat,然后在終端輸入 hello world,執行程序會輸出什么?
答案很明顯,(hello, 1)和 (word,1)
那么問題來了,如果再次在終端輸入 hello world,程序會輸入什么?
答案其實也很明顯,(hello, 2)和(world, 2)。
為什么 Flink 知道之前已經處理過一次 hello world,這就是 state 發揮作用了,這里是被稱為 keyed state 存儲了之前需要統計的數據,所以 Flink 知道 hello 和 world 分別出現過一次。
無狀態計算和有狀態計算
無狀態計算
不需要考慮歷史數據
相同的輸入得到相同的輸出就是無狀態計算, 如map/flatMap/filter....
首先舉一個無狀態計算的例子:消費延遲計算。
假設現在有一個消息隊列,消息隊列中有一個生產者持續往消費隊列寫入消息,多個消費者分別從消息隊列中讀取消息。
從圖上可以看出,生產者已經寫入 16 條消息,Offset 停留在 15 ;有 3 個消費者,有的消費快,而有的消費慢。消費快的已經消費了 13 條數據,消費者慢的才消費了 7、8 條數據。
如何實時統計每個消費者落后多少條數據,如圖給出了輸入輸出的示例。可以了解到輸入的時間點有一個時間戳,生產者將消息寫到了某個時間點的位置,每個消費者同一時間點分別讀到了什么位置。剛才也提到了生產者寫入了 15 條,消費者分別讀取了 10、7、12 條。那么問題來了,怎么將生產者、消費者的進度轉換為右側示意圖信息呢?
consumer 0 落后了 5 條,consumer 1 落后了 8 條,consumer 2 落后了 3 條,根據 Flink 的原理,此處需進行 Map 操作。Map 首先把消息讀取進來,然后分別相減,即可知道每個 consumer 分別落后了幾條。Map 一直往下發,則會得出最終結果。
大家會發現,在這種模式的計算中,無論這條輸入進來多少次,輸出的結果都是一樣的,因為單條輸入中已經包含了所需的所有信息。消費落后等于生產者減去消費者。生產者的消費在單條數據中可以得到,消費者的數據也可以在單條數據中得到,所以相同輸入可以得到相同輸出,這就是一個無狀態的計算。
有狀態計算
需要考慮歷史數據
相同的輸入得到不同的輸出/不一定得到相同的輸出,就是有狀態計算,如:sum/reduce
以訪問日志統計量的例子進行說明,比如當前拿到一個 Nginx 訪問日志,一條日志表示一個請求,記錄該請求從哪里來,訪問的哪個地址,需要實時統計每個地址總共被訪問了多少次,也即每個 API 被調用了多少次。可以看到下面簡化的輸入和輸出,輸入第一條是在某個時間點請求 GET 了 /api/a;第二條日志記錄了某個時間點 Post /api/b ;第三條是在某個時間點 GET了一個 /api/a,總共有 3 個 Nginx 日志。
從這 3 條 Nginx 日志可以看出,第一條進來輸出 /api/a 被訪問了一次,第二條進來輸出 /api/b 被訪問了一次,緊接著又進來一條訪問 api/a,所以 api/a 被訪問了 2 次。不同的是,兩條 /api/a 的 Nginx 日志進來的數據是一樣的,但輸出的時候結果可能不同,第一次輸出 count=1 ,第二次輸出 count=2,說明相同輸入可能得到不同輸出。輸出的結果取決于當前請求的 API 地址之前累計被訪問過多少次。第一條過來累計是 0 次,count = 1,第二條過來 API 的訪問已經有一次了,所以 /api/a 訪問累計次數 count=2。單條數據其實僅包含當前這次訪問的信息,而不包含所有的信息。要得到這個結果,還需要依賴 API 累計訪問的量,即狀態。
這個計算模式是將數據輸入算子中,用來進行各種復雜的計算并輸出數據。這個過程中算子會去訪問之前存儲在里面的狀態。另外一方面,它還會把現在的數據對狀態的影響實時更新,如果輸入 200 條數據,最后輸出就是 200 條結果。
有狀態計算的場景
什么場景會用到狀態呢?下面列舉了常見的 4 種:
1.去重:比如上游的系統數據可能會有重復,落到下游系統時希望把重復的數據都去掉。去重需要先了解哪些數據來過,哪些數據還沒有來,也就是把所有的主鍵都記錄下來,當一條數據到來后,能夠看到在主鍵當中是否存在。
2.窗口計算:比如統計每分鐘 Nginx 日志 API 被訪問了多少次。窗口是一分鐘計算一次,在窗口觸發前,如 08:00 ~ 08:01 這個窗口,前59秒的數據來了需要先放入內存,即需要把這個窗口之內的數據先保留下來,等到 8:01 時一分鐘后,再將整個窗口內觸發的數據輸出。未觸發的窗口數據也是一種狀態。
3.機器學習/深度學習:如訓練的模型以及當前模型的參數也是一種狀態,機器學習可能每次都用有一個數據集,需要在數據集上進行學習,對模型進行一個反饋。
4.訪問歷史數據:比如與昨天的數據進行對比,需要訪問一些歷史數據。如果每次從外部去讀,對資源的消耗可能比較大,所以也希望把這些歷史數據也放入狀態中做對比。
狀態的分類
Managed State & Raw State
從Flink是否接管角度:可以分為
ManagedState(托管狀態)
RawState(原始狀態)
兩者的區別如下:
- 從狀態管理方式的方式來說,Managed State 由 Flink Runtime 管理,自動存儲,自動恢復,在內存管理上有優化;而 Raw State 需要用戶自己管理,需要自己序列化,Flink 不知道 State 中存入的數據是什么結構,只有用戶自己知道,需要最終序列化為可存儲的數據結構。
- 從狀態數據結構來說,Managed State 支持已知的數據結構,如 Value、List、Map 等。而 Raw State只支持字節數組 ,所有狀態都要轉換為二進制字節數組才可以。
- 從推薦使用場景來說,Managed State 大多數情況下均可使用,而 Raw State 是當 Managed State 不夠用時,比如需要自定義 Operator 時,才會使用 Raw State。
在實際生產中,都只推薦使用ManagedState,后續將圍繞該話題進行討論。
Keyed State & Operator State
Managed State 分為兩種,Keyed State 和 Operator State
(Raw State都是Operator State)
- Keyed State
在Flink Stream模型中,Datastream 經過 keyBy 的操作可以變為 KeyedStream。
Keyed State是基于KeyedStream上的狀態。這個狀態是跟特定的key綁定的,對KeyedStream流上的每一個key,都對應一個state,如stream.keyBy(…)
KeyBy之后的State,可以理解為分區過的State,每個并行keyed Operator的每個實例的每個key都有一個Keyed State,即<parallel-operator-instance,key>就是一個唯一的狀態,由于每個key屬于一個keyed Operator的并行實例,因此我們將其簡單的理解為<operator,key>
- Operator State
這里的fromElements會調用FromElementsFunction的類,其中就使用了類型為 list state 的 operator state
Operator State又稱為 non-keyed state,與Key無關的State,每一個 operator state 都僅與一個 operator 的實例綁定。
Operator State 可以用于所有算子,但一般常用于 Source
存儲State的數據結構/API介紹
前面說過有狀態計算其實就是需要考慮歷史數據
而歷史數據需要搞個地方存儲起來
Flink為了方便不同分類的State的存儲和管理,提供了如下的API/數據結構來存儲State!
Keyed State 通過 RuntimeContext 訪問,這需要 Operator 是一個RichFunction。
保存Keyed state的數據結構:
ValueState<T>:即類型為T的單值狀態。這個狀態與對應的key綁定,是最簡單的狀態了。它可以通過update方法更新狀態值,通過value()方法獲取狀態值,如求按用戶id統計用戶交易總額
ListState<T>:即key上的狀態值為一個列表。可以通過add方法往列表中附加值;也可以通過get()方法返回一個Iterable<T>來遍歷狀態值,如統計按用戶id統計用戶經常登錄的Ip
ReducingState<T>:這種狀態通過用戶傳入的reduceFunction,每次調用add方法添加值的時候,會調用reduceFunction,最后合并到一個單一的狀態值
MapState<UK, UV>:即狀態值為一個map。用戶通過put或putAll方法添加元素
需要注意的是,以上所述的State對象,僅僅用于與狀態進行交互(更新、刪除、清空等),而真正的狀態值,有可能是存在內存、磁盤、或者其他分布式存儲系統中。相當于我們只是持有了這個狀態的句柄
Operator State 需要自己實現 CheckpointedFunction 或 ListCheckpointed 接口。
保存Operator state的數據結構:
ListState<T>
BroadcastState<K,V>
舉例來說,Flink中的FlinkKafkaConsumer,就使用了operator state。它會在每個connector實例中,保存該實例中消費topic的所有(partition, offset)映射
總結
以上是生活随笔為你收集整理的2021年大数据Flink(二十五):Flink 状态管理的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 2021年大数据Flink(二十四):
- 下一篇: 2021年大数据Flink(二十六):