Flink Checkpoint 参数详解
Flink Checkpoint 參數(shù)詳解
什么是 checkpoint
保存狀態(tài)
Checkpoint 參數(shù)詳解
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 每 60s 做一次 checkpoint
env.enableCheckpointing(60000);
// 高級(jí)配置:
// checkpoint 語義設(shè)置為 EXACTLY_ONCE,這是默認(rèn)語義
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 兩次 checkpoint 的間隔時(shí)間至少為 1 s,默認(rèn)是 0,立即進(jìn)行下一次 checkpoint
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
// checkpoint 必須在 60s 內(nèi)結(jié)束,否則被丟棄,默認(rèn)是 10 分鐘
env.getCheckpointConfig().setCheckpointTimeout(60000);
// 同一時(shí)間只能允許有一個(gè) checkpoint
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// 最多允許 checkpoint 失敗 3 次
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);
// 當(dāng) Flink 任務(wù)取消時(shí),保留外部保存的 checkpoint 信息
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// 當(dāng)有較新的 Savepoint 時(shí),作業(yè)也會(huì)從 Checkpoint 處恢復(fù)
env.getCheckpointConfig().setPreferCheckpointForRecovery(true);
// 允許實(shí)驗(yàn)性的功能:非對(duì)齊的 checkpoint,以提升性能
env.getCheckpointConfig().enableUnalignedCheckpoints();
相關(guān)參數(shù)的文字描述:
env.enableCheckpointing(60000),1 分鐘觸發(fā)一次 checkpoint;
setCheckpointTimeout,checkpoint 超時(shí)時(shí)間,默認(rèn)是 10 分鐘超時(shí),超過了超時(shí)時(shí)間就會(huì)被丟棄;
setCheckpointingMode,設(shè)置 checkpoint 語義,可以設(shè)置為 EXACTLY_ONCE,表示既不重復(fù)消費(fèi)也不丟數(shù)據(jù);AT_LEAST_ONCE,表示至少消費(fèi)一次,可能會(huì)重復(fù)消費(fèi);
setMinPauseBetweenCheckpoints,兩次 checkpoint 之間的間隔時(shí)間。假如設(shè)置每分鐘進(jìn)行一次 checkpoint,兩次 checkpoint 間隔時(shí)間為 30s。假設(shè)某一次 checkpoint 耗時(shí) 40s,那么理論上20s 后就要進(jìn)行一次 checkpoint,但是設(shè)置了兩次 checkpoint 之間的間隔時(shí)間為 30s,所以是 30s 之后才會(huì)進(jìn)行 checkpoint。另外,如果配置了該參數(shù),那么同時(shí)進(jìn)行的 checkpoint 數(shù)量只能為 1;
enableExternalizedCheckpoints,F(xiàn)link 任務(wù)取消后,外部 checkpoint 信息是否被清理。
DELETE_ON_CANCELLATION,任務(wù)取消后,所有的 checkpoint 都將會(huì)被清理。只有在任務(wù)失敗后,才會(huì)被保留;
RETAIN_ON_CANCELLATION,任務(wù)取消后,所有的 checkpoint 都將會(huì)被保留,需要手工清理。
setPreferCheckpointForRecovery,恢復(fù)任務(wù)時(shí),是否從最近一個(gè)比較新的 savepoint 處恢復(fù),默認(rèn)是 false;
enableUnalignedCheckpoints,是否開啟試驗(yàn)性的非對(duì)齊的 checkpoint,可以在反壓情況下極大減少 checkpoint 的次數(shù);
Flink 1.11 對(duì) Checkpoint 的優(yōu)化
在以前,在進(jìn)行對(duì)齊的過程中,算子是不會(huì)再接著處理數(shù)據(jù)了,一定要等到對(duì)齊動(dòng)作完成之后,才能繼續(xù)對(duì)齊
在 Flink 1.11 版本中,引入了一個(gè) Unaligned Checkpointing 的模塊,主要功能是,在 barrier 到達(dá)之后,不必等待所有的輸入流的 barrier,而是繼續(xù)處理數(shù)據(jù)
總結(jié)
以上是生活随笔為你收集整理的Flink Checkpoint 参数详解的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: ubuntu>雷鸟只能收邮件不能发
- 下一篇: ubuntu20.04 域名解析暂时失败