Flink State 最佳实践
本文主要分享與交流 Flink 狀態使用過程中的一些經驗與心得,當然標題取了“最佳實踐”之名,希望文章內容能給讀者帶去一些干貨。本文內容首先是回顧 state 相關概念,并認識和區別不同的 state backend;之后將分別對 state 使用訪問以及 checkpoint 容錯相關內容進行詳細講解,分享一些經驗和心得。
State 概念回顧
我們先回顧一下到底什么是 state,流式計算的數據往往是轉瞬即逝, 當然,真實業務場景不可能說所有的數據都是進來之后就走掉,沒有任何東西留下來,那么留下來的東西其實就是稱之為 state,中文可以翻譯成狀態。
在下面這個圖中,我們的所有的原始數據進入用戶代碼之后再輸出到下游,如果中間涉及到 state 的讀寫,這些狀態會存儲在本地的 state backend(可以對標成嵌入式本地 kv 存儲)當中。
接下來我們會在四個維度來區分兩種不同的 state:operator state 以及 keyed state。
1. 是否存在當前處理的 key(current key):operator state 是沒有當前 key 的概念,而 keyed state 的數值總是與一個 current key 對應。
2. 存儲對象是否 on heap: 目前 operator state backend 僅有一種 on-heap 的實現;而 keyed state backend 有 on-heap 和 off-heap(RocksDB)的多種實現。
3. 是否需要手動聲明快照(snapshot)和恢復 (restore) 方法:operator state 需要手動實現 snapshot 和 restore 方法;而 keyed state 則由 backend 自行實現,對用戶透明。
4. 數據大小:一般而言,我們認為 operator state 的數據規模是比較小的;認為 keyed state 規模是相對比較大的。需要注意的是,這是一個經驗判斷,不是一個絕對的判斷區分標準。
StateBackend 的分類
下面這張圖對目前廣泛使用的三類 state backend 做了區分,其中綠色表示所創建的operator/keyed state backend 是 on-heap 的,黃色則表示是 off-heap 的。
一般而言,在生產中,我們會在 FsStateBackend 和 RocksDBStateBackend 間選擇:
- FsStateBackend:性能更好;日常存儲是在堆內存中,面臨著 OOM 的風險,不支持增量 checkpoint。
- RocksDBStateBackend:無需擔心 OOM 風險,是大部分時候的選擇。
RocksDB StateBackend 概覽和相關配置討論
RocksDB 是 Facebook 開源的 LSM 的鍵值存儲數據庫,被廣泛應用于大數據系統的單機組件中。Flink 的 keyed state 本質上來說就是一個鍵值對,所以與 RocksDB 的數據模型是吻合的。下圖分別是 “window state” 和 “value state” 在 RocksDB 中的存儲格式,所有存儲的 key,value 均被序列化成 bytes 進行存儲。
在 RocksDB 中,每個 state 獨享一個 Column Family,而每個 Column family 使用各自獨享的 write buffer 和 block cache,上圖中的 window state 和 value state實際上分屬不同的 column family。
下面介紹一些對 RocksDB 性能比較有影響的參數,并整理了一些相關的推薦配置,至于其他配置項,可以參閱社區相關文檔。
| state.backend.rocksdb.thread.num | 后臺 flush 和 compaction 的線程數. 默認值 ‘1‘. 建議調大 |
| state.backend.rocksdb.writebuffer.count | 每個 column family 的 write buffer 數目,默認值 ‘2‘. 如果有需要可以適當調大 |
| state.backend.rocksdb.writebuffer.size | 每個 write buffer 的 size,默認值‘64MB‘. 對于寫頻繁的場景,建議調大 |
| state.backend.rocksdb.block.cache-size | 每個 column family 的 block cache大小,默認值‘8MB’,如果存在重復讀的場景,建議調大 |
State best practice:一些使用 state 的心得
Operator state 使用建議
■ 慎重使用長 list
下圖展示的是目前 task 端 operator state 在執行完 checkpoint 返回給 job master 端的 StateMetaInfo 的代碼片段。
由于 operator state 沒有 key group 的概念,所以為了實現改并發恢復的功能,需要對 operator state 中的每一個序列化后的元素存儲一個位置偏移 offset,也就是構成了上圖紅框中的 offset 數組。
那么如果你的 operator state 中的 list 長度達到一定規模時,這個 offset 數組就可能會有幾十 MB 的規模,關鍵這個數組是會返回給 job master,當 operator 的并發數目很大時,很容易觸發 job master 的內存超用問題。我們遇到過用戶把 operator state 當做黑名單存儲,結果這個黑名單規模很大,導致一旦開始執行 checkpoint,job master 就會因為收到 task 發來的“巨大”的 offset 數組,而內存不斷增長直到超用無法正常響應。
■ 正確使用 UnionListState
union list state 目前被廣泛使用在 kafka connector 中,不過可能用戶日常開發中較少遇到,他的語義是從檢查點恢復之后每個并發 task 內拿到的是原先所有operator 上的 state,如下圖所示:
kafka connector 使用該功能,為的是從檢查點恢復時,可以拿到之前的全局信息,如果用戶需要使用該功能,需要切記恢復的 task 只取其中的一部分進行處理和用于下一次 snapshot,否則有可能隨著作業不斷的重啟而導致 state 規模不斷增長。
Keyed state 使用建議
■ 如何正確清空當前的 state
state.clear() 實際上只能清理當前 key 對應的 value 值,如果想要清空整個 state,需要借助于 applyToAllKeys 方法,具體代碼片段如下:
如果你的需求中只是對 state 有過期需求,借助于 state TTL 功能來清理會是一個性能更好的方案。
■ RocksDB 中考慮 value 值很大的極限場景
受限于 JNI bridge API 的限制,單個 value 只支持 2^31 bytes 大小,如果存在很極限的情況,可以考慮使用 MapState 來替代 ListState 或者 ValueState,因為RocksDB 的 map state 并不是將整個 map 作為 value 進行存儲,而是將 map 中的一個條目作為鍵值對進行存儲。
■ 如何知道當前 RocksDB 的運行情況
比較直觀的方式是打開 RocksDB 的 native metrics ,在默認使用 Flink managed memory 方式的情況下,state.backend.rocksdb.metrics.block-cache-usage ,state.backend.rocksdb.metrics.mem-table-flush-pending,state.backend.rocksdb.metrics.num-running-compactions 以及 state.backend.rocksdb.metrics.num-running-flushes 是比較重要的相關 metrics。
下面這張圖是 Flink-1.10 之后,打開相關 metrics 的示例圖:
而下面這張是 Flink-1.10 之前或者關閉 state.backend.rocksdb.memory.managed 的效果:
■ 容器內運行的 RocksDB 的內存超用問題
在 Flink-1.10 之前,由于一個 state 獨占若干 write buffer 和一塊 block cache,所以我們會建議用戶不要在一個 operator 內創建過多的 state,否則需要考慮到相應的額外內存使用量,否則容易造成在容器內運行時,相關進程被容器環境所殺。對于用戶來說,需要考慮一個 slot 內有多少 RocksDB 實例在運行,一個 RocksDB 中有多少 state,整體的計算規則就很復雜,很難真得落地實施。
Flink-1.10 之后,由于引入了 RocksDB 的內存托管機制,在絕大部分情況下, RocksDB 的這一部分 native 內存是可控的,不過受限于 RocksDB 的相關 cache 實現限制(這里暫不展開,后續會有文章討論),在某些場景下,無法做到完美控制,這時候建議打開上文提到的 native metrics,觀察相關 block cache 內存使用是否存在超用情況,可以將相關內存添加到 taskmanager.memory.task.off-heap.size 中,使得 Flink 有更多的空間給 native 內存使用。
一些使用 checkpoint 的使用建議
■ Checkpoint 間隔不要太短
雖然理論上 Flink 支持很短的 checkpoint 間隔,但是在實際生產中,過短的間隔對于底層分布式文件系統而言,會帶來很大的壓力。另一方面,由于檢查點的語義,所以實際上 Flink 作業處理 record 與執行 checkpoint 存在互斥鎖,過于頻繁的 checkpoint,可能會影響整體的性能。當然,這個建議的出發點是底層分布式文件系統的壓力考慮。
■ 合理設置超時時間
默認的超時時間是 10min,如果 state 規模大,則需要合理配置。最壞情況是分布式地創建速度大于單點(job master 端)的刪除速度,導致整體存儲集群可用空間壓力較大。建議當檢查點頻繁因為超時而失敗時,增大超時時間。
原文鏈接
本文為云棲社區原創內容,未經允許不得轉載。
總結
以上是生活随笔為你收集整理的Flink State 最佳实践的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Gartner 容器报告:阿里云与 AW
- 下一篇: 【开发者成长】喧哗的背后:Serverl