Flink之状态之状态存储 state backends
流計(jì)算中可能有各種方式來保存狀態(tài):
- 窗口操作
- 使用 了KV操作的函數(shù)
- 繼承了CheckpointedFunction的函數(shù)
當(dāng)開始做checkpointing的時(shí)候,狀態(tài)會(huì)被持久化到checkpoints里來規(guī)避數(shù)據(jù)丟失和狀態(tài)恢復(fù)。選擇的狀態(tài)存儲(chǔ)策略不同,會(huì)導(dǎo)致狀態(tài)持久化如何和checkpoints交互。
1.可用的狀態(tài)持久化策略
Flink提供了三種持久化策略,如果沒有顯式指定,則默認(rèn)使用MemoryStateBackend。
The MemoryStateBackend
將數(shù)據(jù)保存在java的堆里,kv狀態(tài)或者window operator用hash table來保存values,triggers等等。
當(dāng)進(jìn)行checkpoints的時(shí)候,這種策略會(huì)對(duì)狀態(tài)做快照,然后將快照作為checkpoint acknowledgement的一部分發(fā)送給JobManager,JM也將其保存在堆中。
MemoryStateBackend可以使用異步的方式進(jìn)行快照,我們也鼓勵(lì)使用異步的方式,避免阻塞,現(xiàn)在默認(rèn)就是異步。如果不希望異步,可以在構(gòu)造的時(shí)候傳入false,如下:
new MemoryStateBackend(MAX_MEM_STATE_SIZE, false);限制:
- 單次狀態(tài)大小最大默認(rèn)被限制為5MB,這個(gè)值可以通過構(gòu)造函數(shù)來更改。
- 無論單次狀態(tài)大小最大被限制為多少,都不可用大過akka的frame大小。
- 聚合的狀態(tài)都會(huì)寫入JM的內(nèi)存。
適合:
- 本地開發(fā)和調(diào)試。
- 狀態(tài)比較少的作業(yè)
The FsStateBackend
FsStateBackend?通過文件系統(tǒng)的URL來設(shè)置,比如“hdfs://namenode:40010/flink/checkpoints”或者“file:///data/flink/checkpoints”。
保持?jǐn)?shù)據(jù)在TM的內(nèi)存中,當(dāng)做checkpointing的時(shí)候,會(huì)將狀態(tài)快照寫入文件,保存在文件系統(tǒng)或本地目錄。少量的元數(shù)據(jù)會(huì)保存在JM的內(nèi)存中。
默認(rèn)使用異步的方式進(jìn)行快照,同樣,取消異步需要傳遞false:
new FsStateBackend(path, false);適用:
- 狀態(tài)比較大,窗口比較長,大的KV狀態(tài)
- 需要做HA的場景
The RocksDBStateBackend
RocksDBStateBackend?通過文件系統(tǒng)的URL來設(shè)置,例如“hdfs://namenode:40010/flink/checkpoints”或者“file:///data/flink/checkpoints”。
保存數(shù)據(jù)在一個(gè)叫做RocksDB的數(shù)據(jù)庫中,這個(gè)數(shù)據(jù)庫保存在TM的數(shù)據(jù)目錄中。當(dāng)做checkpointing時(shí),整個(gè)數(shù)據(jù)庫會(huì)被寫入文件系統(tǒng)和目錄。少量的元信息會(huì)保存在JM的內(nèi)存中。
這種策略只支持異步快照。
限制:
- 由于依賴于字節(jié)數(shù)組,支持的key和value的大小最大為2^31字節(jié)。對(duì)于使用Merge操作的狀態(tài),大小很可能就默默的超過了這個(gè)限制,下次獲取就會(huì)失敗。
適合:
- 非常大的狀態(tài),長窗口,大的KV狀態(tài)
- 需要HA的場景
能夠持有的狀態(tài)的多少只取決于可使用的磁盤大小,這會(huì)允許使用非常大的狀態(tài),相比較FsStateBackend將狀態(tài)保存在內(nèi)存中。但這也同時(shí)意味著,這個(gè)策略的吞吐量會(huì)受限。
RocksDBStateBackend是目前唯一支持incremental的checkpoints的策略。
2.配置狀態(tài)持久化策略
如果你沒有指定任何策略,默認(rèn)使用JM作為存儲(chǔ)策略。如果你想更改,可以在flink-conf.yaml中變更,存儲(chǔ)策略也可以在作業(yè)中單獨(dú)設(shè)定。
Setting the Per-job State Backend
可以在StreamExecutionEnvironment中指定:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));Setting Default State Backend
默認(rèn)的狀態(tài)存儲(chǔ)策略通過在flink-conf.yaml中通過state.backend來指定,有如下一些可選:
- jobmanager?(MemoryStateBackend)
- filesystem?(FsStateBackend)
- rocksdb?(RocksDBStateBackend)
也可以以全路徑來指定,比如org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory?來代替 RocksDBStateBackend,不過,何必了。
state.checkpoints.dir這個(gè)參數(shù)來指定所有的checkpoints數(shù)據(jù)和元數(shù)據(jù)存儲(chǔ)的位置。示例如下:
# The backend that will be used to store operator state checkpointsstate.backend: filesystem# Directory for storing checkpointsstate.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints?
轉(zhuǎn)載于:https://www.cnblogs.com/029zz010buct/p/9403283.html
總結(jié)
以上是生活随笔為你收集整理的Flink之状态之状态存储 state backends的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 怀孕梦到黑色眼镜蛇是什么意思
- 下一篇: ES6之路第十三篇:Iterator和f