日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 人文社科 > 生活经验 >内容正文

生活经验

2021年大数据Flink(二十七):Flink 容错机制 Checkpoint

發布時間:2023/11/28 生活经验 34 豆豆
生活随笔 收集整理的這篇文章主要介紹了 2021年大数据Flink(二十七):Flink 容错机制 Checkpoint 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

目錄

Flink 容錯機制?Checkpoint

State Vs Checkpoint

Checkpoint執行流程

簡單流程

復雜流程

State狀態后端/State存儲介質

MemStateBackend[了解]

FsStateBackend

RocksDBStateBackend

Checkpoint配置方式

全局配置

在代碼中配置

代碼演示


Flink 容錯機制?Checkpoint

State Vs Checkpoint

  • State:

維護/存儲的是某一個Operator的運行的狀態/歷史值,是維護在內存中!

一般指一個具體的Operator的狀態(operator的狀態表示一些算子在運行的過程中會產生的一些歷史結果,如前面的maxBy底層會維護當前的最大值,也就是會維護一個keyedOperator,這個State里面存放就是maxBy這個Operator中的最大值)

State數據默認保存在Java的堆內存中/TaskManage節點的內存中

State可以被記錄,在失敗的情況下數據還可以恢復

  • Checkpoint:

某一時刻,Flink中所有的Operator的當前State的全局快照,一般存在磁盤上

表示了一個Flink Job在一個特定時刻的一份全局狀態快照,即包含了所有Operator的狀態

可以理解為Checkpoint是把State數據定時持久化存儲了

比如KafkaConsumer算子中維護的Offset狀態,當任務重新恢復的時候可以從Checkpoint中獲取

  • 注意:

Flink中的Checkpoint底層使用了Chandy-Lamport algorithm分布式快照算法可以保證數據的在分布式環境下的一致性!

分布式快照算法: Chandy-Lamport 算法 - 知乎

Chandy-Lamport algorithm算法的作者也是ZK中Paxos 一致性算法的作者

zookeeper學習系列:四、Paxos算法和zookeeper的關系 - 堅毅的刀刀 - 博客園

Flink中使用Chandy-Lamport algorithm分布式快照算法取得了成功,后續Spark的StructuredStreaming也借鑒了該算法

Checkpoint執行流程

簡單流程

  1. Flink的JobManager創建CheckpointCoordinator
  2. Coordinator向所有的SourceOperator發送Barrier柵欄(理解為執行Checkpoint的信號)
  3. SourceOperator接收到Barrier之后,暫停當前的操作(暫停的時間很短,因為后續的寫快照是異步的),并制作State快照, 然后將自己的快照保存到指定的介質中(如HDFS), 一切 ok之后向Coordinator匯報并將Barrier發送給下游的其他Operator
  4. 其他的如TransformationOperator接收到Barrier,重復第2步,最后將Barrier發送給Sink
  5. Sink接收到Barrier之后重復第2步
  6. Coordinator接收到所有的Operator的執行ok的匯報結果,認為本次快照執行成功

注意:

1.在往介質(如HDFS)中寫入快照數據的時候是異步的(為了提高效率)

2.分布式快照執行時的數據一致性由Chandy-Lamport algorithm分布式快照算法保證!

復雜流程

下圖左側是 Checkpoint Coordinator,是整個 Checkpoint 的發起者,中間是由兩個 source,一個 sink 組成的 Flink 作業,最右側的是持久化存儲,在大部分用戶場景中對應 HDFS。

1.Checkpoint Coordinator 向所有 source 節點 trigger Checkpoint。

2.source 節點向下游廣播 barrier,這個 barrier 就是實現 Chandy-Lamport 分布式快照算法的核心,下游的 task 只有收到所有 input 的 barrier 才會執行相應的 Checkpoint。

3.當 task 完成 state 備份后,會將備份數據的地址(state handle)通知給 Checkpoint coordinator。

4.下游的 sink 節點收集齊上游兩個 input 的 barrier 之后,會執行本地快照,(柵欄對齊)

這里還展示了 RocksDB incremental Checkpoint (增量Checkpoint)的流程,首先 RocksDB 會全量刷數據到磁盤上(紅色大三角表示),然后 Flink 框架會從中選擇沒有上傳的文件進行持久化備份(紫色小三角)。

5.同樣的,sink 節點在完成自己的 Checkpoint 之后,會將 state handle 返回通知 Coordinator。

6.最后,當 Checkpoint coordinator 收集齊所有 task 的 state handle,就認為這一次的 Checkpoint 全局完成了,向持久化存儲中再備份一個 Checkpoint meta 文件。

???????State狀態后端/State存儲介質

注意:

前面學習了Checkpoint其實就是Flink中某一時刻,所有的Operator的全局快照,

那么快照應該要有一個地方進行存儲,而這個存儲的地方叫做狀態后端

Flink中的State狀態后端有很多種:

MemStateBackend[了解]

第一種是內存存儲,即 MemoryStateBackend,構造方法是設置最大的StateSize,選擇是否做異步快照,

對于State狀態存儲在 TaskManager 節點也就是執行節點內存中的,因為內存有容量限制,所以單個 State maxStateSize 默認 5 M,且需要注意 maxStateSize <= akka.framesize 默認 10 M。

對于Checkpoint 存儲在 JobManager 內存中,因此總大小不超過 JobManager 的內存。

推薦使用的場景為:本地測試、幾乎無狀態的作業,比如 ETL、JobManager 不容易掛,或掛掉影響不大的情況。

不推薦在生產場景使用。

FsStateBackend

另一種就是在文件系統上的 FsStateBackend 構建方法是需要傳一個文件路徑和是否異步快照。

State 依然在 TaskManager 內存中,但不會像 MemoryStateBackend 是?5 M 的設置上限

Checkpoint 存儲在外部文件系統(本地或 HDFS),打破了總大小 Jobmanager 內存的限制。

推薦使用的場景為:常規使用狀態的作業、例如分鐘級窗口聚合或 join、需要開啟HA的作業。

如果使用HDFS,則初始化FsStateBackend時,需要傳入以 “hdfs://”開頭的路徑(即: new FsStateBackend("hdfs:///hacluster/checkpoint")),

如果使用本地文件,則需要傳入以“file://”開頭的路徑(即:new FsStateBackend("file:///Data"))。

在分布式情況下,不推薦使用本地文件。因為如果某個算子在節點A上失敗,在節點B上恢復,使用本地文件時,在B上無法讀取節點 A上的數據,導致狀態恢復失敗。

RocksDBStateBackend

還有一種存儲為 RocksDBStateBackend ,

RocksDB 是一個 key/value 的內存存儲系統,和其他的 key/value 一樣,先將狀態放到內存中,如果內存快滿時,則寫入到磁盤中,

但需要注意 RocksDB 不支持同步的 Checkpoint,構造方法中沒有同步快照這個選項。

不過 RocksDB 支持增量的 Checkpoint,意味著并不需要把所有 sst 文件上傳到 Checkpoint 目錄,僅需要上傳新生成的 sst 文件即可。它的 Checkpoint 存儲在外部文件系統(本地或HDFS),

其容量限制只要單個 TaskManager 上 State 總量不超過它的內存+磁盤,單 Key最大 2G,總大小不超過配置的文件系統容量即可。

推薦使用的場景為:超大狀態的作業,例如天級窗口聚合、需要開啟 HA 的作業、最好是對狀態讀寫性能要求不高的作業。

Checkpoint配置方式

全局配置

修改flink-conf.yaml

#這里可以配置

#jobmanager(即MemoryStateBackend),

#filesystem(即FsStateBackend),

#rocksdb(即RocksDBStateBackend)

state.backend: filesystem

state.checkpoints.dir: hdfs://namenode:8020/flink/checkpoints

???????在代碼中配置

//1.MemoryStateBackend--開發中不用

????env.setStateBackend(new MemoryStateBackend)

//2.FsStateBackend--開發中可以使用--適合一般狀態--秒級/分鐘級窗口...

????env.setStateBackend(new FsStateBackend("hdfs路徑或測試時的本地路徑"))

//3.RocksDBStateBackend--開發中可以使用--適合超大狀態--天級窗口...

env.setStateBackend(new RocksDBStateBackend(filebackend, true))

注意:RocksDBStateBackend還需要引入依賴

????<dependency><groupId>org.apache.flink</groupId><artifactId>flink-statebackend-rocksdb_2.11</artifactId><version>1.7.2</version></dependency>

???????代碼演示

package cn.it.checkpoint;import org.apache.commons.lang3.SystemUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.util.Collector;import java.util.Properties;/*** Author lanson* Desc 演示Checkpoint參數設置(也就是Checkpoint執行流程中的步驟0相關的參數設置)*/
public class CheckpointDemo01 {public static void main(String[] args) throws Exception {//1.envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//===========Checkpoint參數設置====//===========類型1:必須參數=============//設置Checkpoint的時間間隔為1000ms做一次Checkpoint/其實就是每隔1000ms發一次Barrier!env.enableCheckpointing(1000);//設置State狀態存儲介質/*if(args.length > 0){env.setStateBackend(new FsStateBackend(args[0]));}else {env.setStateBackend(new FsStateBackend("file:///D:\\data\\ckp"));}*/if (SystemUtils.IS_OS_WINDOWS) {env.setStateBackend(new FsStateBackend("file:///D:\\data\\ckp"));} else {env.setStateBackend(new FsStateBackend("hdfs://node1:8020/flink-checkpoint/checkpoint"));}//===========類型2:建議參數===========//設置兩個Checkpoint 之間最少等待時間,如設置Checkpoint之間最少是要等?500ms(為了避免每隔1000ms做一次Checkpoint的時候,前一次太慢和后一次重疊到一起去了)//如:高速公路上,每隔1s關口放行一輛車,但是規定了兩車之前的最小車距為500menv.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);//默認是0//設置如果在做Checkpoint過程中出現錯誤,是否讓整體任務失敗:true是??false不是//env.getCheckpointConfig().setFailOnCheckpointingErrors(false);//默認是trueenv.getCheckpointConfig().setTolerableCheckpointFailureNumber(10);//默認值為0,表示不容忍任何檢查點失敗//設置是否清理檢查點,表示?Cancel 時是否需要保留當前的?Checkpoint,默認?Checkpoint會在作業被Cancel時被刪除//ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION:true,當作業被取消時,刪除外部的checkpoint(默認值)//ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:false,當作業被取消時,保留外部的checkpointenv.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);//===========類型3:直接使用默認的即可===============//設置checkpoint的執行模式為EXACTLY_ONCE(默認)env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);//設置checkpoint的超時時間,如果?Checkpoint在?60s內尚未完成說明該次Checkpoint失敗,則丟棄。env.getCheckpointConfig().setCheckpointTimeout(60000);//默認10分鐘//設置同一時間有多少個checkpoint可以同時執行env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);//默認為1//2.SourceDataStream<String> linesDS = env.socketTextStream("node1", 9999);//3.Transformation//3.1切割出每個單詞并直接記為1DataStream<Tuple2<String, Integer>> wordAndOneDS = linesDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {//value就是每一行String[] words = value.split(" ");for (String word : words) {out.collect(Tuple2.of(word, 1));}}});//3.2分組//注意:批處理的分組是groupBy,流處理的分組是keyByKeyedStream<Tuple2<String, Integer>, String> groupedDS = wordAndOneDS.keyBy(t -> t.f0);//3.3聚合DataStream<Tuple2<String, Integer>> aggResult = groupedDS.sum(1);DataStream<String> result = (SingleOutputStreamOperator<String>) aggResult.map(new RichMapFunction<Tuple2<String, Integer>, String>() {@Overridepublic String map(Tuple2<String, Integer> value) throws Exception {return value.f0 + ":::" + value.f1;}});//4.sinkresult.print();Properties props = new Properties();props.setProperty("bootstrap.servers", "node1:9092");FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<>("flink_kafka", new SimpleStringSchema(), props);result.addSink(kafkaSink);//5.executeenv.execute();// /export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic flink_kafka}
}

總結

以上是生活随笔為你收集整理的2021年大数据Flink(二十七):Flink 容错机制 Checkpoint的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。