flink 本地_Flink原理Apache Flink漫谈系列 State
實(shí)際問題
在流計(jì)算場(chǎng)景中,數(shù)據(jù)會(huì)源源不斷的流入Apache Flink系統(tǒng),每條數(shù)據(jù)進(jìn)入Apache Flink系統(tǒng)都會(huì)觸發(fā)計(jì)算。如果我們想進(jìn)行一個(gè)Count聚合計(jì)算,那么每次觸發(fā)計(jì)算是將歷史上所有流入的數(shù)據(jù)重新計(jì)算一次,還是每次計(jì)算都是在上一次計(jì)算結(jié)果之上進(jìn)行增量計(jì)算呢?答案是肯定的,Apache Flink是基于上一次的計(jì)算結(jié)果進(jìn)行增量計(jì)算的。那么問題來了: "上一次的計(jì)算結(jié)果保存在哪里,保存在內(nèi)存可以嗎?",答案是否定的,如果保存在內(nèi)存,在由于網(wǎng)絡(luò),硬件等原因造成某個(gè)計(jì)算節(jié)點(diǎn)失敗的情況下,上一次計(jì)算結(jié)果會(huì)丟失,在節(jié)點(diǎn)恢復(fù)的時(shí)候,就需要將歷史上所有數(shù)據(jù)(可能十幾天,上百天的數(shù)據(jù))重新計(jì)算一次,所以為了避免這種災(zāi)難性的問題發(fā)生,Apache Flink 會(huì)利用State存儲(chǔ)計(jì)算結(jié)果。本篇將會(huì)為大家介紹Apache Flink State的相關(guān)內(nèi)容。
什么是State
這個(gè)問題似乎有些"弱智"?不管問題的答案是否顯而易見,但我還是想簡(jiǎn)單說一下在Flink里面什么是State?State是指流計(jì)算過程中計(jì)算節(jié)點(diǎn)的中間計(jì)算結(jié)果或元數(shù)據(jù)屬性,比如 在aggregation過程中要在state中記錄中間聚合結(jié)果,比如 Apache Kafka 作為數(shù)據(jù)源時(shí)候,我們也要記錄已經(jīng)讀取記錄的offset,這些State數(shù)據(jù)在計(jì)算過程中會(huì)進(jìn)行持久化(插入或更新)。所以Flink中的State就是與時(shí)間相關(guān)的,Flink任務(wù)的內(nèi)部數(shù)據(jù)(計(jì)算數(shù)據(jù)和元數(shù)據(jù)屬性)的快照。
為什么需要State
與批計(jì)算相比,State是流計(jì)算特有的,批計(jì)算沒有failover機(jī)制,要么成功,要么重新計(jì)算。流計(jì)算在 大多數(shù)場(chǎng)景 下是增量計(jì)算,數(shù)據(jù)逐條處理(大多數(shù)場(chǎng)景),每次計(jì)算是在上一次計(jì)算結(jié)果之上進(jìn)行處理的,這樣的機(jī)制勢(shì)必要將上一次的計(jì)算結(jié)果進(jìn)行存儲(chǔ)(生產(chǎn)模式要持久化),另外由于 機(jī)器,網(wǎng)絡(luò),臟數(shù)據(jù)等原因?qū)е碌某绦蝈e(cuò)誤,在重啟job時(shí)候需要從成功的檢查點(diǎn)(checkpoint,后面篇章會(huì)專門介紹)進(jìn)行state的恢復(fù)。增量計(jì)算,Failover這些機(jī)制都需要state的支撐。
State 存儲(chǔ)實(shí)現(xiàn)
Flink內(nèi)部有三種state的存儲(chǔ)實(shí)現(xiàn),具體如下:
基于內(nèi)存的HeapStateBackend - 在debug模式使用,不 建議在生產(chǎn)模式下應(yīng)用;
基于HDFS的FsStateBackend - 分布式文件持久化,每次讀寫都操作內(nèi)存,同需考慮OOM問題;
基于RocksDB的RocksDBStateBackend - 本地文件+異步HDFS持久化;
State存儲(chǔ)的架構(gòu)
Apache Flink 默認(rèn)是RocksDB+HDFS的方式進(jìn)行State的存儲(chǔ),State存儲(chǔ)分兩個(gè)階段,首先本地存儲(chǔ)到RocksDB,然后異步的同步到遠(yuǎn)程的HDFS。這樣的而設(shè)計(jì)既消除了HeapStateBackend的局限(內(nèi)存大小,機(jī)器壞掉丟失等),也減少了純分布式存儲(chǔ)的網(wǎng)絡(luò)IO開銷。
State 分類
KeyedState - 這里面的key是我們?cè)赟QL語句中對(duì)應(yīng)的GroupBy/PartitioneBy里面的字段,key的值就是groupby/PartitionBy字段組成的Row的字節(jié)數(shù)組,每一個(gè)key都有一個(gè)屬于自己的State,key與key之間的State是不可見的;
OperatorState - Flink內(nèi)部的Source Connector的實(shí)現(xiàn)中就會(huì)用OperatorState來記錄source數(shù)據(jù)讀取的offset。
State在擴(kuò)容時(shí)候的重新分配
Flink是一個(gè)大規(guī)模并行分布式系統(tǒng),允許大規(guī)模的有狀態(tài)流處理。為了可伸縮性,Flink作業(yè)在邏輯上被分解成operator graph,并且每個(gè)operator的執(zhí)行被物理地分解成多個(gè)并行運(yùn)算符實(shí)例。從概念上講,Flink中的每個(gè)并行運(yùn)算符實(shí)例都是一個(gè)獨(dú)立的任務(wù),可以在自己的機(jī)器上調(diào)度到網(wǎng)絡(luò)連接的其他機(jī)器運(yùn)行。
Flink的DAG圖中只有邊相連的節(jié)點(diǎn)有網(wǎng)絡(luò)通信,也就整個(gè)DAG在垂直方向有網(wǎng)絡(luò)IO,在水平方向如下圖的stateful節(jié)點(diǎn)之間沒有網(wǎng)絡(luò)通信,這種模型也保證了每個(gè)operator實(shí)例維護(hù)一份自己的state,并且保存在本地磁盤(遠(yuǎn)程異步同步)。通過這種設(shè)計(jì),任務(wù)的所有狀態(tài)數(shù)據(jù)都是本地的,并且狀態(tài)訪問不需要任務(wù)之間的網(wǎng)絡(luò)通信。避免這種流量對(duì)于像Flink這樣的大規(guī)模并行分布式系統(tǒng)的可擴(kuò)展性至關(guān)重要。
如上我們知道Flink中State有OperatorState和KeyedState,那么在進(jìn)行擴(kuò)容時(shí)候(增加并發(fā))State如何分配呢?比如:外部Source有5個(gè)partition,在Flink上面由Source的1個(gè)并發(fā)擴(kuò)容到2個(gè)并發(fā),中間Stateful Operation 節(jié)點(diǎn)由2個(gè)并發(fā)并擴(kuò)容的3個(gè)并發(fā),如下圖所示:
在Flink中對(duì)不同類型的State有不同的擴(kuò)容方法,接下來我們分別介紹。
OperatorState對(duì)擴(kuò)容的處理
我們選取Flink中某個(gè)具體Connector實(shí)現(xiàn)實(shí)例進(jìn)行介紹,以MetaQ為例,MetaQ以topic方式訂閱數(shù)據(jù),每個(gè)topic會(huì)有N>0個(gè)分區(qū),以上圖為例,假設(shè)我們訂閱的MetaQ的topic有5個(gè)分區(qū),那么當(dāng)我們source由1個(gè)并發(fā)調(diào)整為2個(gè)并發(fā)時(shí)候,State是怎么恢復(fù)的呢?
State 恢復(fù)的方式與Source中OperatorState的存儲(chǔ)結(jié)構(gòu)有必然關(guān)系,我們先看MetaQSource的實(shí)現(xiàn)是如何存儲(chǔ)State的。首先MetaQSource 實(shí)現(xiàn)了ListCheckpointed,其中的T是Tuple2,我們?cè)诳碙istCheckpointed接口的內(nèi)部定義如下:
public interface ListCheckpointed<T extends Serializable> { ListsnapshotState(long var1, long var3) throws Exception; void restoreState(List var1) throws Exception;}我們發(fā)現(xiàn) snapshotState方法的返回值是一個(gè)List,T是Tuple2,也就是snapshotState方法返回List>,這個(gè)類型說明state的存儲(chǔ)是一個(gè)包含partiton和offset信息的列表,InputSplit代表一個(gè)分區(qū),Long代表當(dāng)前partition讀取的offset。InputSplit有一個(gè)方法如下:
public interface InputSplit extends Serializable { int getSplitNumber();}也就是說,InputSplit我們可以理解為是一個(gè)Partition索引,有了這個(gè)數(shù)據(jù)結(jié)構(gòu)我們?cè)诳纯瓷厦鎴D所示的case是如何工作的?當(dāng)Source的并行度是1的時(shí)候,所有打partition數(shù)據(jù)都在同一個(gè)線程中讀取,所有partition的state也在同一個(gè)state中維護(hù),State存儲(chǔ)信息格式如下:
如果我們現(xiàn)在將并發(fā)調(diào)整為2,那么我們5個(gè)分區(qū)的State將會(huì)在2個(gè)獨(dú)立的任務(wù)(線程)中進(jìn)行維護(hù),在內(nèi)部實(shí)現(xiàn)中我們有如下算法進(jìn)行分配每個(gè)Task所處理和維護(hù)partition的State信息,如下:
List assignedPartitions = new LinkedList<>();for (int i = 0; i < partitions; i++) { if (i % consumerCount == consumerIndex) { assignedPartitions.add(i); }}這個(gè)求mod的算法,決定了每個(gè)并發(fā)所處理和維護(hù)partition的State信息,針對(duì)我們當(dāng)前的case具體的存儲(chǔ)情況如下:
那么到現(xiàn)在我們發(fā)現(xiàn)上面擴(kuò)容后State得以很好的分配得益于OperatorState采用了List的數(shù)據(jù)結(jié)構(gòu)的設(shè)計(jì)。另外大家注意一個(gè)問題,相信大家已經(jīng)發(fā)現(xiàn)上面分配partition的算法有一個(gè)限制,那就是Source的擴(kuò)容(并發(fā)數(shù))是否可以超過Source物理存儲(chǔ)的partition數(shù)量呢?答案是否定的,不能。目前Flink的做法是提前報(bào)錯(cuò),即使不報(bào)錯(cuò)也是資源的浪費(fèi),因?yàn)槌^partition數(shù)量的并發(fā)永遠(yuǎn)分配不到待管理的partition。
KeyedState對(duì)擴(kuò)容的處理
對(duì)于KeyedState最容易想到的是hash(key) mod parallelism(operator) 方式分配state,就和OperatorState一樣,這種分配方式大多是情況是恢復(fù)的state不是本地已有的state,需要一次網(wǎng)絡(luò)拷貝,這種效率比較低,OperatorState采用這種簡(jiǎn)單的方式進(jìn)行處理是因?yàn)镺peratorState的state一般都比較小,網(wǎng)絡(luò)拉取的成本很小,對(duì)于KeyedState往往很大,我們會(huì)有更好的選擇,在Flink中采用的是Key-Groups方式進(jìn)行分配。
什么是Key-Groups
Key-Groups 是Flink中對(duì)keyed state按照key進(jìn)行分組分組的方式,每個(gè)key-group中會(huì)包含N>0個(gè)key,一個(gè)key-group是State分配的原子單位。在Flink中關(guān)于Key-Group的對(duì)象是 KeyGroupRange, 如下:
public class KeyGroupRange implements KeyGroupsList, Serializable { ... ... private final int startKeyGroup; private final int endKeyGroup; ... ...}KeyGroupRange兩個(gè)重要的屬性就是 startKeyGroup和endKeyGroup,定義了startKeyGroup和endKeyGroup屬性后Operator上面的Key-Group的個(gè)數(shù)也就確定了;
什么決定Key-Groups的個(gè)數(shù)
key-group的數(shù)量在job啟動(dòng)前必須是確定的且運(yùn)行中不能改變。由于key-group是state分配的原子單位,而每個(gè)operator并行實(shí)例至少包含一個(gè)key-group,因此operator的最大并行度不能超過設(shè)定的key-group的個(gè)數(shù),那么在Flink的內(nèi)部實(shí)現(xiàn)上key-group的數(shù)量就是最大并行度的值。?
GroupRange.of(0, maxParallelism)如何決定key屬于哪個(gè)Key-Group
確定好GroupRange之后,如何決定每個(gè)Key屬于哪個(gè)Key-Group呢?我們采取的是取mod的方式,在KeyGroupRangeAssignment中的assignToKeyGroup方法會(huì)將key劃分到指定的key-group中,如下:
public static int assignToKeyGroup(Object key, int maxParallelism) { return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);}public static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) { return HashPartitioner.INSTANCE.partition(keyHash, maxParallelism);}@Overridepublic int partition(T key, int numPartitions) { return MathUtils.murmurHash(Objects.hashCode(key)) % numPartitions;}如上實(shí)現(xiàn)我們了解到分配Key到指定的key-group的邏輯是利用key的hashCode和maxParallelism取余操作的來分配的。如下圖當(dāng)parallelism=2,maxParallelism=10的情況下,流上key與key-group的對(duì)應(yīng)關(guān)系如下圖所示:
如上圖key(a)的hashCode是97,與最大并發(fā)10取余后是7,被分配到了KG-7中,流上每個(gè)event都會(huì)分配到KG-0至KG-9其中一個(gè)Key-Group中。
每個(gè)Operator實(shí)例如何獲取Key-Groups,了解了Key-Groups概念和如何分配每個(gè)Key到指定的Key-Groups之后,我們看看如何計(jì)算每個(gè)Operator實(shí)例所處理的Key-Groups。在KeyGroupRangeAssignment的computeKeyGroupRangeForOperatorIndex方法描述了分配算法:
public static KeyGroupRange computeKeyGroupRangeForOperatorIndex( int maxParallelism, int parallelism, int operatorIndex) { GroupRange splitRange = GroupRange.of(0, maxParallelism).getSplitRange(parallelism, operatorIndex); int startGroup = splitRange.getStartGroup(); int endGroup = splitRange.getEndGroup(); return new KeyGroupRange(startGroup, endGroup - 1);}public GroupRange getSplitRange(int numSplits, int splitIndex) { ... final int numGroupsPerSplit = getNumGroups() / numSplits; final int numFatSplits = getNumGroups() % numSplits; int startGroupForThisSplit; int endGroupForThisSplit; if (splitIndex < numFatSplits) { startGroupForThisSplit = getStartGroup() + splitIndex * (numGroupsPerSplit + 1); endGroupForThisSplit = startGroupForThisSplit + numGroupsPerSplit + 1; } else { startGroupForThisSplit = getStartGroup() + splitIndex * numGroupsPerSplit + numFatSplits; endGroupForThisSplit = startGroupForThisSplit + numGroupsPerSplit; } if (startGroupForThisSplit >= endGroupForThisSplit) { return GroupRange.emptyGroupRange(); } else { return new GroupRange(startGroupForThisSplit, endGroupForThisSplit); }}上面代碼的核心邏輯是先計(jì)算每個(gè)Operator實(shí)例至少分配的Key-Group個(gè)數(shù),將不能整除的部分N個(gè),平均分給前N個(gè)實(shí)例。最終每個(gè)Operator實(shí)例管理的Key-Groups會(huì)在GroupRange中表示,本質(zhì)是一個(gè)區(qū)間值;下面我們就上圖的case,說明一下如何進(jìn)行分配以及擴(kuò)容后如何重新分配。
假設(shè)上面的Stateful Operation節(jié)點(diǎn)的最大并行度maxParallelism的值是10,也就是我們一共有10個(gè)Key-Group,當(dāng)我們并發(fā)是2的時(shí)候和并發(fā)是3的時(shí)候分配的情況如下圖:
如上算法我們發(fā)現(xiàn)在進(jìn)行擴(kuò)容時(shí)候,大部分state還是落到本地的,如Task0只有KG-4被分出去,其他的還是保持在本地。同時(shí)我們也發(fā)現(xiàn),一個(gè)job如果修改了maxParallelism的值那么會(huì)直接影響到Key-Groups的數(shù)量和key的分配,也會(huì)打亂所有的Key-Group的分配,目前在Flink系統(tǒng)中統(tǒng)一將maxParallelism的默認(rèn)值調(diào)整到4096,最大程度的避免無法擴(kuò)容的情況發(fā)生。
小結(jié)
本篇簡(jiǎn)單介紹了Flink中State的概念,并重點(diǎn)介紹了OperatorState和KeyedState在擴(kuò)容時(shí)候的處理方式。Flink State是支撐Flink中failover,增量計(jì)算,Window等重要機(jī)制和功能的核心設(shè)施。后續(xù)介紹failover,增量計(jì)算,Window等相關(guān)篇章中也會(huì)涉及State的利用,當(dāng)涉及到本篇沒有覆蓋的內(nèi)容時(shí)候再補(bǔ)充介紹。
訂閱號(hào)&知識(shí)星球【免費(fèi)】
分享是最好的享受,予人成功是最大的成功,一個(gè)人最大的開心不源于自己會(huì)什么,而源于能讓別人擅長(zhǎng)什么,無欲無求,但予人所求!?
More about Me...
我堅(jiān)信:
"致虛極,守靜篤。萬物并作,吾以觀其復(fù)”。“虛”和“靜”是心靈的本初的狀態(tài),也應(yīng)該是一種常態(tài),看到新芽不驚,看到落葉不哀,靜觀萬物的循環(huán)往復(fù),通曉自然之理,體悟自然之道。"
總結(jié)
以上是生活随笔為你收集整理的flink 本地_Flink原理Apache Flink漫谈系列 State的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: zemax中非序列添加相位面_老王讲放射
- 下一篇: pcb地线应该不应该做成环路_PCB制板