2021年大数据Flink(二十七):Flink 容错机制 Checkpoint
目錄
Flink 容錯機制?Checkpoint
State Vs Checkpoint
Checkpoint執行流程
簡單流程
復雜流程
State狀態后端/State存儲介質
MemStateBackend[了解]
FsStateBackend
RocksDBStateBackend
Checkpoint配置方式
全局配置
在代碼中配置
代碼演示
Flink 容錯機制?Checkpoint
State Vs Checkpoint
- State:
維護/存儲的是某一個Operator的運行的狀態/歷史值,是維護在內存中!
一般指一個具體的Operator的狀態(operator的狀態表示一些算子在運行的過程中會產生的一些歷史結果,如前面的maxBy底層會維護當前的最大值,也就是會維護一個keyedOperator,這個State里面存放就是maxBy這個Operator中的最大值)
State數據默認保存在Java的堆內存中/TaskManage節點的內存中
State可以被記錄,在失敗的情況下數據還可以恢復
- Checkpoint:
某一時刻,Flink中所有的Operator的當前State的全局快照,一般存在磁盤上
表示了一個Flink Job在一個特定時刻的一份全局狀態快照,即包含了所有Operator的狀態
可以理解為Checkpoint是把State數據定時持久化存儲了
比如KafkaConsumer算子中維護的Offset狀態,當任務重新恢復的時候可以從Checkpoint中獲取
- 注意:
Flink中的Checkpoint底層使用了Chandy-Lamport algorithm分布式快照算法可以保證數據的在分布式環境下的一致性!
分布式快照算法: Chandy-Lamport 算法 - 知乎
Chandy-Lamport algorithm算法的作者也是ZK中Paxos 一致性算法的作者
zookeeper學習系列:四、Paxos算法和zookeeper的關系 - 堅毅的刀刀 - 博客園
Flink中使用Chandy-Lamport algorithm分布式快照算法取得了成功,后續Spark的StructuredStreaming也借鑒了該算法
Checkpoint執行流程
簡單流程
- Flink的JobManager創建CheckpointCoordinator
- Coordinator向所有的SourceOperator發送Barrier柵欄(理解為執行Checkpoint的信號)
- SourceOperator接收到Barrier之后,暫停當前的操作(暫停的時間很短,因為后續的寫快照是異步的),并制作State快照, 然后將自己的快照保存到指定的介質中(如HDFS), 一切 ok之后向Coordinator匯報并將Barrier發送給下游的其他Operator
- 其他的如TransformationOperator接收到Barrier,重復第2步,最后將Barrier發送給Sink
- Sink接收到Barrier之后重復第2步
- Coordinator接收到所有的Operator的執行ok的匯報結果,認為本次快照執行成功
注意:
1.在往介質(如HDFS)中寫入快照數據的時候是異步的(為了提高效率)
2.分布式快照執行時的數據一致性由Chandy-Lamport algorithm分布式快照算法保證!
復雜流程
下圖左側是 Checkpoint Coordinator,是整個 Checkpoint 的發起者,中間是由兩個 source,一個 sink 組成的 Flink 作業,最右側的是持久化存儲,在大部分用戶場景中對應 HDFS。
1.Checkpoint Coordinator 向所有 source 節點 trigger Checkpoint。
2.source 節點向下游廣播 barrier,這個 barrier 就是實現 Chandy-Lamport 分布式快照算法的核心,下游的 task 只有收到所有 input 的 barrier 才會執行相應的 Checkpoint。
3.當 task 完成 state 備份后,會將備份數據的地址(state handle)通知給 Checkpoint coordinator。
4.下游的 sink 節點收集齊上游兩個 input 的 barrier 之后,會執行本地快照,(柵欄對齊)
這里還展示了 RocksDB incremental Checkpoint (增量Checkpoint)的流程,首先 RocksDB 會全量刷數據到磁盤上(紅色大三角表示),然后 Flink 框架會從中選擇沒有上傳的文件進行持久化備份(紫色小三角)。
5.同樣的,sink 節點在完成自己的 Checkpoint 之后,會將 state handle 返回通知 Coordinator。
6.最后,當 Checkpoint coordinator 收集齊所有 task 的 state handle,就認為這一次的 Checkpoint 全局完成了,向持久化存儲中再備份一個 Checkpoint meta 文件。
???????State狀態后端/State存儲介質
注意:
前面學習了Checkpoint其實就是Flink中某一時刻,所有的Operator的全局快照,
那么快照應該要有一個地方進行存儲,而這個存儲的地方叫做狀態后端
Flink中的State狀態后端有很多種:
MemStateBackend[了解]
第一種是內存存儲,即 MemoryStateBackend,構造方法是設置最大的StateSize,選擇是否做異步快照,
對于State狀態存儲在 TaskManager 節點也就是執行節點內存中的,因為內存有容量限制,所以單個 State maxStateSize 默認 5 M,且需要注意 maxStateSize <= akka.framesize 默認 10 M。
對于Checkpoint 存儲在 JobManager 內存中,因此總大小不超過 JobManager 的內存。
推薦使用的場景為:本地測試、幾乎無狀態的作業,比如 ETL、JobManager 不容易掛,或掛掉影響不大的情況。
不推薦在生產場景使用。
FsStateBackend
另一種就是在文件系統上的 FsStateBackend 構建方法是需要傳一個文件路徑和是否異步快照。
State 依然在 TaskManager 內存中,但不會像 MemoryStateBackend 是?5 M 的設置上限
Checkpoint 存儲在外部文件系統(本地或 HDFS),打破了總大小 Jobmanager 內存的限制。
推薦使用的場景為:常規使用狀態的作業、例如分鐘級窗口聚合或 join、需要開啟HA的作業。
如果使用HDFS,則初始化FsStateBackend時,需要傳入以 “hdfs://”開頭的路徑(即: new FsStateBackend("hdfs:///hacluster/checkpoint")),
如果使用本地文件,則需要傳入以“file://”開頭的路徑(即:new FsStateBackend("file:///Data"))。
在分布式情況下,不推薦使用本地文件。因為如果某個算子在節點A上失敗,在節點B上恢復,使用本地文件時,在B上無法讀取節點 A上的數據,導致狀態恢復失敗。
RocksDBStateBackend
還有一種存儲為 RocksDBStateBackend ,
RocksDB 是一個 key/value 的內存存儲系統,和其他的 key/value 一樣,先將狀態放到內存中,如果內存快滿時,則寫入到磁盤中,
但需要注意 RocksDB 不支持同步的 Checkpoint,構造方法中沒有同步快照這個選項。
不過 RocksDB 支持增量的 Checkpoint,意味著并不需要把所有 sst 文件上傳到 Checkpoint 目錄,僅需要上傳新生成的 sst 文件即可。它的 Checkpoint 存儲在外部文件系統(本地或HDFS),
其容量限制只要單個 TaskManager 上 State 總量不超過它的內存+磁盤,單 Key最大 2G,總大小不超過配置的文件系統容量即可。
推薦使用的場景為:超大狀態的作業,例如天級窗口聚合、需要開啟 HA 的作業、最好是對狀態讀寫性能要求不高的作業。
Checkpoint配置方式
全局配置
修改flink-conf.yaml
#這里可以配置
#jobmanager(即MemoryStateBackend),
#filesystem(即FsStateBackend),
#rocksdb(即RocksDBStateBackend)
state.backend: filesystem
state.checkpoints.dir: hdfs://namenode:8020/flink/checkpoints
???????在代碼中配置
//1.MemoryStateBackend--開發中不用
????env.setStateBackend(new MemoryStateBackend)
//2.FsStateBackend--開發中可以使用--適合一般狀態--秒級/分鐘級窗口...
????env.setStateBackend(new FsStateBackend("hdfs路徑或測試時的本地路徑"))
//3.RocksDBStateBackend--開發中可以使用--適合超大狀態--天級窗口...
env.setStateBackend(new RocksDBStateBackend(filebackend, true))
注意:RocksDBStateBackend還需要引入依賴
????<dependency><groupId>org.apache.flink</groupId><artifactId>flink-statebackend-rocksdb_2.11</artifactId><version>1.7.2</version></dependency>
???????代碼演示
package cn.it.checkpoint;import org.apache.commons.lang3.SystemUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.util.Collector;import java.util.Properties;/*** Author lanson* Desc 演示Checkpoint參數設置(也就是Checkpoint執行流程中的步驟0相關的參數設置)*/
public class CheckpointDemo01 {public static void main(String[] args) throws Exception {//1.envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//===========Checkpoint參數設置====//===========類型1:必須參數=============//設置Checkpoint的時間間隔為1000ms做一次Checkpoint/其實就是每隔1000ms發一次Barrier!env.enableCheckpointing(1000);//設置State狀態存儲介質/*if(args.length > 0){env.setStateBackend(new FsStateBackend(args[0]));}else {env.setStateBackend(new FsStateBackend("file:///D:\\data\\ckp"));}*/if (SystemUtils.IS_OS_WINDOWS) {env.setStateBackend(new FsStateBackend("file:///D:\\data\\ckp"));} else {env.setStateBackend(new FsStateBackend("hdfs://node1:8020/flink-checkpoint/checkpoint"));}//===========類型2:建議參數===========//設置兩個Checkpoint 之間最少等待時間,如設置Checkpoint之間最少是要等?500ms(為了避免每隔1000ms做一次Checkpoint的時候,前一次太慢和后一次重疊到一起去了)//如:高速公路上,每隔1s關口放行一輛車,但是規定了兩車之前的最小車距為500menv.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);//默認是0//設置如果在做Checkpoint過程中出現錯誤,是否讓整體任務失敗:true是??false不是//env.getCheckpointConfig().setFailOnCheckpointingErrors(false);//默認是trueenv.getCheckpointConfig().setTolerableCheckpointFailureNumber(10);//默認值為0,表示不容忍任何檢查點失敗//設置是否清理檢查點,表示?Cancel 時是否需要保留當前的?Checkpoint,默認?Checkpoint會在作業被Cancel時被刪除//ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION:true,當作業被取消時,刪除外部的checkpoint(默認值)//ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:false,當作業被取消時,保留外部的checkpointenv.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);//===========類型3:直接使用默認的即可===============//設置checkpoint的執行模式為EXACTLY_ONCE(默認)env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);//設置checkpoint的超時時間,如果?Checkpoint在?60s內尚未完成說明該次Checkpoint失敗,則丟棄。env.getCheckpointConfig().setCheckpointTimeout(60000);//默認10分鐘//設置同一時間有多少個checkpoint可以同時執行env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);//默認為1//2.SourceDataStream<String> linesDS = env.socketTextStream("node1", 9999);//3.Transformation//3.1切割出每個單詞并直接記為1DataStream<Tuple2<String, Integer>> wordAndOneDS = linesDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {//value就是每一行String[] words = value.split(" ");for (String word : words) {out.collect(Tuple2.of(word, 1));}}});//3.2分組//注意:批處理的分組是groupBy,流處理的分組是keyByKeyedStream<Tuple2<String, Integer>, String> groupedDS = wordAndOneDS.keyBy(t -> t.f0);//3.3聚合DataStream<Tuple2<String, Integer>> aggResult = groupedDS.sum(1);DataStream<String> result = (SingleOutputStreamOperator<String>) aggResult.map(new RichMapFunction<Tuple2<String, Integer>, String>() {@Overridepublic String map(Tuple2<String, Integer> value) throws Exception {return value.f0 + ":::" + value.f1;}});//4.sinkresult.print();Properties props = new Properties();props.setProperty("bootstrap.servers", "node1:9092");FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<>("flink_kafka", new SimpleStringSchema(), props);result.addSink(kafkaSink);//5.executeenv.execute();// /export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic flink_kafka}
}
總結
以上是生活随笔為你收集整理的2021年大数据Flink(二十七):Flink 容错机制 Checkpoint的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 2021年大数据Flink(二十六):
- 下一篇: 2021年大数据Flink(二十八):F