日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Flink之状态之状态存储 state backends

發(fā)布時(shí)間:2023/11/29 编程问答 67 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Flink之状态之状态存储 state backends 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

流計(jì)算中可能有各種方式來保存狀態(tài):

  • 窗口操作
  • 使用 了KV操作的函數(shù)
  • 繼承了CheckpointedFunction的函數(shù)

當(dāng)開始做checkpointing的時(shí)候,狀態(tài)會(huì)被持久化到checkpoints里來規(guī)避數(shù)據(jù)丟失和狀態(tài)恢復(fù)。選擇的狀態(tài)存儲(chǔ)策略不同,會(huì)導(dǎo)致狀態(tài)持久化如何和checkpoints交互。

1.可用的狀態(tài)持久化策略

Flink提供了三種持久化策略,如果沒有顯式指定,則默認(rèn)使用MemoryStateBackend。

The MemoryStateBackend

將數(shù)據(jù)保存在java的堆里,kv狀態(tài)或者window operator用hash table來保存values,triggers等等。

當(dāng)進(jìn)行checkpoints的時(shí)候,這種策略會(huì)對(duì)狀態(tài)做快照,然后將快照作為checkpoint acknowledgement的一部分發(fā)送給JobManager,JM也將其保存在堆中。

MemoryStateBackend可以使用異步的方式進(jìn)行快照,我們也鼓勵(lì)使用異步的方式,避免阻塞,現(xiàn)在默認(rèn)就是異步。如果不希望異步,可以在構(gòu)造的時(shí)候傳入false,如下:

new MemoryStateBackend(MAX_MEM_STATE_SIZE, false);

限制:

  • 單次狀態(tài)大小最大默認(rèn)被限制為5MB,這個(gè)值可以通過構(gòu)造函數(shù)來更改。
  • 無論單次狀態(tài)大小最大被限制為多少,都不可用大過akka的frame大小。
  • 聚合的狀態(tài)都會(huì)寫入JM的內(nèi)存。

適合:

  • 本地開發(fā)和調(diào)試。
  • 狀態(tài)比較少的作業(yè)

The FsStateBackend

FsStateBackend?通過文件系統(tǒng)的URL來設(shè)置,比如“hdfs://namenode:40010/flink/checkpoints”或者“file:///data/flink/checkpoints”。

保持?jǐn)?shù)據(jù)在TM的內(nèi)存中,當(dāng)做checkpointing的時(shí)候,會(huì)將狀態(tài)快照寫入文件,保存在文件系統(tǒng)或本地目錄。少量的元數(shù)據(jù)會(huì)保存在JM的內(nèi)存中。

默認(rèn)使用異步的方式進(jìn)行快照,同樣,取消異步需要傳遞false:

new FsStateBackend(path, false);

適用:

  • 狀態(tài)比較大,窗口比較長,大的KV狀態(tài)
  • 需要做HA的場景

The RocksDBStateBackend

RocksDBStateBackend?通過文件系統(tǒng)的URL來設(shè)置,例如“hdfs://namenode:40010/flink/checkpoints”或者“file:///data/flink/checkpoints”。

保存數(shù)據(jù)在一個(gè)叫做RocksDB的數(shù)據(jù)庫中,這個(gè)數(shù)據(jù)庫保存在TM的數(shù)據(jù)目錄中。當(dāng)做checkpointing時(shí),整個(gè)數(shù)據(jù)庫會(huì)被寫入文件系統(tǒng)和目錄。少量的元信息會(huì)保存在JM的內(nèi)存中。

這種策略只支持異步快照。

限制:

  • 由于依賴于字節(jié)數(shù)組,支持的key和value的大小最大為2^31字節(jié)。對(duì)于使用Merge操作的狀態(tài),大小很可能就默默的超過了這個(gè)限制,下次獲取就會(huì)失敗。

適合:

  • 非常大的狀態(tài),長窗口,大的KV狀態(tài)
  • 需要HA的場景

能夠持有的狀態(tài)的多少只取決于可使用的磁盤大小,這會(huì)允許使用非常大的狀態(tài),相比較FsStateBackend將狀態(tài)保存在內(nèi)存中。但這也同時(shí)意味著,這個(gè)策略的吞吐量會(huì)受限。

RocksDBStateBackend是目前唯一支持incremental的checkpoints的策略。

2.配置狀態(tài)持久化策略

如果你沒有指定任何策略,默認(rèn)使用JM作為存儲(chǔ)策略。如果你想更改,可以在flink-conf.yaml中變更,存儲(chǔ)策略也可以在作業(yè)中單獨(dú)設(shè)定。

Setting the Per-job State Backend

可以在StreamExecutionEnvironment中指定:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));

Setting Default State Backend

默認(rèn)的狀態(tài)存儲(chǔ)策略通過在flink-conf.yaml中通過state.backend來指定,有如下一些可選:

  • jobmanager?(MemoryStateBackend)
  • filesystem?(FsStateBackend)
  • rocksdb?(RocksDBStateBackend)

也可以以全路徑來指定,比如org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory?來代替 RocksDBStateBackend,不過,何必了。

state.checkpoints.dir這個(gè)參數(shù)來指定所有的checkpoints數(shù)據(jù)和元數(shù)據(jù)存儲(chǔ)的位置。示例如下:

# The backend that will be used to store operator state checkpointsstate.backend: filesystem# Directory for storing checkpointsstate.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints

?

轉(zhuǎn)載于:https://www.cnblogs.com/029zz010buct/p/9403283.html

總結(jié)

以上是生活随笔為你收集整理的Flink之状态之状态存储 state backends的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。