Flink中的状态管理
1 Flink中的狀態
??當數據流中的許多操作只查看一個每次事件(如事件解析器),一些操作會跨多個事件的信息(如窗口操作)。這些操作稱為有狀態。狀態由一個任務維護,并且用來計算某個結果的所有數據,都屬于這個任務的狀態。可以簡單的任務狀態就是一個本地變量,可以被任務的業務邏輯訪問。
??有些算子有些任務是沒有狀態的,如map操作,只跟輸入數據有關。像窗口操作不管是增量窗口函數還是全窗口函數都要保持里面的信息的,一開始在窗口到達結束時間之前是不輸出數據的,所以最后輸出數據的時候,他的計算是要依賴之前的,全窗口可以認為是把所有數據都作為狀態保存下來。增量聚合窗口來一個聚合一次要保存的是中間聚合狀態。像ProcessFunction可以有狀態也可以沒有狀態。
??無狀態流處理和有狀態流處理的主要區別:無狀態流處理分別接收每條輸入數據,根據最新輸入的數據生成輸出數據;有狀態流處理會維護狀態,根據每條輸入記錄進行更新,并基于最新輸入的記錄和當前的狀態值生成輸出記錄,即綜合考慮多個事件之后的結果。
需要狀態操作的一些例子如下:
- 應用程序搜索某些事件模式時,狀態將存儲迄今遇到的事件序列。
- 每分鐘/小時/天聚合事件時,將狀態保存掛起的聚合。
- 在數據流上訓練機器學習模型時,狀態保存模型參數的當前版本。
- 需要管理歷史數據時,狀態允許有效訪問過去發生的事件。
2 狀態類型
??每個狀態都是當前任務去管理維護,每個狀態都是和當前算子關聯在一起的,如果需要Flink真正的把他管理起來的話在運行時的時候Flink就必須要知道當前狀態定義的類型是什么,所以一開始必須注冊對應的狀態,要有所謂的描述器。Flink有兩種基本的狀態:Operator State算子狀態和Keyed State鍵控狀態,他們的主要區別就是作用范圍不一樣,算子狀態的作用范圍就是限定為算子任務(也就是當前一個分區執行的時候,所有數據來了都能訪問到狀態)。鍵控狀態中并不是當前分區所有的數據都能訪問所有的狀態,而是按照keyby之后的key做劃分,當前key只能訪問自己的狀態
2.1 Operator State
??每個算子狀態綁定到一個并行算子實例,作用范圍限定為算子任務,同一并行任務的狀態是共享的,并行處理的所有數據都可以訪問到相同的狀態。Kafka Connector就是使用算子狀態的很好的一個例子,Kafka consumer的每個并行實例都維護一個主題分區和偏移,作為算子狀態。當并行性發生變化時,算子狀態接口支持在并行運算符實例之間重新分配狀態。可以有不同的方案來進行這種再分配。
??因為同一個并行任務處理的所有數據都可以訪問到當前的狀態,所以就相當于本地變量
??算子狀態有3種基本數據結構:①列表狀態(List state):狀態表示為一組數據的列表②聯合列表狀態(Union list state):也將狀態表示為數據的列表。它與常規列表狀態的區別在于,在發生故障時,或者從保存點(savepoint)啟動應用程序時如何恢復。③廣播狀態(Broadcast state):如果一個算子有多項任務,而它的每項任務狀態又都相同,那么這種特殊情況最適合應用廣播狀態。那就可以訪問到別的并行子任務的狀態。
??算子狀態運用的時候可能應用場景沒那么多,一般都是keyby之后根據不同的key做分區討論。如果所有數據來了全部統一處理的話一般還要劃分成不同的狀態要保存為鏈表,并行度調整的時候可以根據這個列表拆開,做進一步調整。
??聯合列表狀態與列表狀態的區別:主要是并行度調整狀態怎樣重新分配,列表狀態本身分配的時候直接分配;聯合列表狀態的話就是把所有元素都聯合起來,然后由每個任務自己定義最后留下哪些,也就是自己截取要哪一部分。
2.2 Keyed State
??Keyed State只能在KeyedStream后使用,鍵控狀態總是相對于鍵,根據鍵來維護和訪問的
??Keyed State很類似于一個分布式的key-value map數據結構,只能用于KeyedStream(keyBy算子處理之后)。鍵控狀態基于每個key去管理,一般keyby進行HashCode重分區后基于它自己獨享的內存空間就會針對每一個不同的key分別保存一份獨立的存儲狀態,而且接下來來了一個新的數據只能訪問自己的狀態,不能訪問其他key的,Flink會為每一個key維護一個狀態。
??Flink的Keyed State支持的數據類型如下:
| 1 | ValueState[T] | 用來保存單個的值 | ValueState.update(value: T) ValueState.value() |
| 2 | ListState[T] | 保存一個列表 | ListState.add(value: T) ListState.addAll(values: java.util.List[T]) ListState.update(values: java.util.List[T]) ListState.get()(注意:返回的是Iterable[T]) |
| 3 | MapState[K, V] | 保存Key-Value對 | MapState.get(key: K) MapState.put(key: K, value: V) MapState.contains(key: K) MapState.remove(key: K) |
| 4 | ReducingState[T] | 保留一個值,該值表示添加到狀態的所有值的匯總,需要用戶提供ReduceFunction | ReducingState.add(value: T) ReducingState.get() |
| 5 | AggregatingState[I, O] | 保留一個值,該值表示添加到狀態的所有值的匯總,需要用戶提供AggregateFunction | AggregatingState.add(value: T) AggregatingState.get() |
| 6 | FoldingState<T, ACC> | 保留一個值,該值表示添加到狀態的所有值的匯總,需要用戶提供FoldFunction | AggregatingState.add(value: T) AggregatingState.get() |
??每個狀態都有clear()是清空操作。
??在進行狀態編程時需要通過RuntimeContext注冊StateDescriptor。StateDescriptor以狀態state的名字和存儲的數據類型為參數。案例如下:
class CountWindowAverage extends RichFlatMapFunction[(Long, Long), (Long, Long)] {private var sum: ValueState[(Long, Long)] = _override def flatMap(input: (Long, Long), out: Collector[(Long, Long)]): Unit = {// access the state valueval tmpCurrentSum = sum.value// If it hasn't been used before, it will be nullval currentSum = if (tmpCurrentSum != null) {tmpCurrentSum} else {(0L, 0L)}// update the countval newSum = (currentSum._1 + 1, currentSum._2 + input._2)// update the statesum.update(newSum)// if the count reaches 2, emit the average and clear the stateif (newSum._1 >= 2) {out.collect((input._1, newSum._2 / newSum._1))sum.clear()}}override def open(parameters: Configuration): Unit = {sum = getRuntimeContext.getState(new ValueStateDescriptor[(Long, Long)]("average", createTypeInformation[(Long, Long)]))} }object ExampleCountWindowAverage extends App {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.fromCollection(List((1L, 3L),(1L, 5L),(1L, 7L),(1L, 4L),(1L, 2L))).keyBy(_._1).flatMap(new CountWindowAverage()).print()// the printed output will be (1,4) and (1,5)env.execute("ExampleManagedState") }聲明狀態操作為:
sum = getRuntimeContext.getState(new ValueStateDescriptor[(Long, Long)]("average", createTypeInformation[(Long, Long)]))讀取狀態為:
val tmpCurrentSum = sum.value??更新狀態為:
sum.update(newSum)3 狀態后端
??Flink提供不同的State Backends狀態后端,指定如何和在何處存儲狀態。
??(1)MemoryStateBackend
??狀將鍵控狀態作為內存中的對象進行管理,將它們存儲在TaskManager的JVM堆上,將checkpoint存儲在JobManager的內存中
??(2)FsStateBackend
??本地狀態存在TaskManager的JVM堆上,checkpoint存到遠程的持久化文件系統(FileSystem)上
??(3)RocksDBStateBackend
??將所有狀態序列化后,存入本地的RocksDB中存儲。
??設置狀態后端如下:
val env = StreamExecutionEnvironment.getExecutionEnvironment() //val checkpointPath: String = checkpoint_Path //val backend = new RocksDBStateBackend(checkpointPath) //env.setStateBackend(backend)env.setStateBackend(new FsStateBackend(YOUR_PATH)) env.enableCheckpointing(1000) // 配置重啟策略 env.setRestartStrategy(RestartStrategies.fixedDelayRestart(60, Time.of(10, TimeUnit.SECONDS)))總結
以上是生活随笔為你收集整理的Flink中的状态管理的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Educational Codeforc
- 下一篇: R语言的安装与配置