日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Flink架构及工作原理

發(fā)布時間:2024/1/23 编程问答 25 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Flink架构及工作原理 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

System Architecture

分布式系統(tǒng)需要解決:分配和管理在集群的計算資源、處理配合、持久和可訪問的數(shù)據(jù)存儲、失敗恢復。Fink專注分布式流處理。

Components of a Flink Setup

  • JobManager :接受application,包含StreamGraph(DAG)、JobGraph(logical dataflow graph,已經(jīng)進過優(yōu)化,如task chain)和JAR,將JobGraph轉化為ExecutionGraph(physical dataflow graph,并行化),包含可以并發(fā)執(zhí)行的tasks。其他工作類似Spark driver,如向RM申請資源、schedule tasks、保存作業(yè)的元數(shù)據(jù),如checkpoints。如今JM可分為JobMaster和ResourceManager(和下面的不同),分別負責任務和資源,在Session模式下啟動多個job就會有多個JobMaster。
  • ResourceManager:一般是Yarn,當TM有空閑的slot就會告訴JM,沒有足夠的slot也會啟動新的TM。kill掉長時間空閑的TM。
  • TaskManager類似Spark的executor,會跑多個線程的task、數(shù)據(jù)緩存與交換。
  • Dispatcher(Application Master)提供REST接口來接收client的application提交,它負責啟動JM和提交application,同時運行Web UI。

task是最基本的調度單位,由一個線程執(zhí)行,里面包含一個或多個operator。多個operators就成為operation chain,需要上下游并發(fā)度一致,且傳遞模式(之前的Data exchange strategies)是forward。

slot是TM的資源子集。結合下面Task Execution的圖,一個slot并不代表一個線程,它里面并不一定只放一個task。多個task在一個slot就涉及slot sharing group。一個jobGraph的任務需要多少slot,取決于最大的并發(fā)度,這樣的話,并發(fā)1和并發(fā)2就不會放到一個slot中。Co-Location Group是在此基礎上,數(shù)據(jù)的forward形式,即一個slot中,如果它處理的是key1的數(shù)據(jù),那么接下來的task也是處理key1的數(shù)據(jù),此時就達到Co-Location Group。

盡管有slot sharing group,但一個group里串聯(lián)起來的task各自所需資源的大小并不好確定。阿里日常用得最多的還是一個task一個slot的方式。

Session模式(上圖):預先啟動好AM和TM,每提交一個job就啟動一個Job Manager并向Flink的RM申請資源,不夠的話,Flink的RM向YARN的RM申請資源。適合規(guī)模小,運行時間短的作業(yè)。./bin/flink run ./path/to/job.jar

Job模式:每一個job都重新啟動一個Flink集群,完成后結束Flink,且只有一個Job Manager。資源按需申請,適合大作業(yè)。./bin/flink run -m yarn-cluster ./path/to/job.jar

下面是簡單例子,詳細看官網(wǎng)。

# 啟動yarn-session,4個TM,每個有4GB堆內存,4個slot cd flink-1.7.0/ ./bin/yarn-session.sh -n 4 -jm 1024m -tm 4096m -s 4 # 啟動作業(yè) ./bin/flink run -m yarn-cluster -yn 4 -yjm 1024m -ytm 4096m ./examples/batch/WordCount.jar

細節(jié)取決于具體環(huán)境,如不同的RM

Application Deployment

Framework模式:Flink作業(yè)為JAR,并被提交到Dispatcher or JM or YARN。

Library模式:Flink作業(yè)為application-specific container image,如Docker image,適合微服務。

Task Execution

作業(yè)調度:在流計算中預先啟動好節(jié)點,而在批計算中,每當某個階段完成計算才啟動下一個節(jié)點。

資源管理:slot作為基本單位,有大小和位置屬性。JM有SlotPool,向Flink RM申請Slot,FlinkRM發(fā)現(xiàn)自己的SlotManager中沒有足夠的Slot,就會向集群RM申請。后者返回可用TM的ip,讓FlinkRM去啟動,TM啟動后向FlinkRM注冊。后者向TM請求Slot,TM向JM提供相應Slot。JM用完后釋放Slot,TM會把釋放的Slot報告給FlinkRM。在Blink版本中,job模式會根據(jù)申請slot的大小分配相應的TM,而session模式則預先設置好TM大小,每有slot申請就從TM中劃分相應的資源。

任務可以是相同operator (data parallelism),不同 operator (task parallelism),甚至不同application (job parallelism)。TM提供一定數(shù)量的slots來控制并行的任務數(shù)。

上圖A和C是source function,E是sink function,小數(shù)字表示并行度。

一個TM是一個JVM進程,它通過多線程完成任務。線程的隔離不太好,一個線程失敗有可能導致整個TM失敗。

Highly-Available Setup

從失敗中恢復需要重啟失敗進程、作業(yè)和恢復它的state。

當一個TM掛掉而RM又無法找到空閑的資源時,就只能暫時降低并行度,直到有空閑的資源重啟TM。

當JM掛掉就靠ZK來重新選舉,和找到JM存儲到遠程storage的元數(shù)據(jù)、JobGraph。重啟JM并從最后一個完成的checkpoint開始。

JM在執(zhí)行期間會得到每個task checkpoints的state存儲路徑(task將state寫到遠程storage)并寫到遠程storage,同時在ZK的存儲路徑留下pointer指明到哪里找上面的存儲路徑。

背壓

數(shù)據(jù)涌入的速度大于處理速度。在source operator中,可通過Kafka解決。在任務間的operator有如下機制應對:

Local exchange:task1和2在同一個工作節(jié)點,那么buffer pool可以直接交給下一個任務,但下一個任務task2消費buffer pool中的信息速度減慢時,當前任務task1填充buffer pool的速度也會減慢。

Remote exchange:TM保證每個task至少有一個incoming和一個outgoing緩沖區(qū)。當下游receiver的處理速度低于上有的sender的發(fā)送速度,receiver的incoming緩沖區(qū)就會開始積累數(shù)據(jù)(需要空閑的buffer來放從TCP連接中接收的數(shù)據(jù)),當擠滿后就不再接收數(shù)據(jù)。上游sender利用netty水位機制,當網(wǎng)絡中的緩沖數(shù)據(jù)過多時暫停發(fā)送。

TM負責數(shù)據(jù)在tasks間的轉移,轉移之前會存儲到buffer(這又變回micro-batches)。每個TM有32KB的網(wǎng)絡buffer用于接收和發(fā)送數(shù)據(jù)。如果sender和receiver在不同進程,那么會通過操作系統(tǒng)的網(wǎng)絡棧來通信。每對TM保持permanent TCP連接來交換數(shù)據(jù)。每個sender任務能夠給所有receiving任務發(fā)送數(shù)據(jù),反之,所有receiver任務能夠接收所有sender任務的數(shù)據(jù)。TM保證每個任務都至少有一個incoming和outgoing的buffer,并增加額外的緩沖區(qū)分配約束來避免死鎖。

如果sender和receiver任務在同一個TM進程,sender會序列化結果數(shù)據(jù)到buffer,如果滿了就放到隊列。receiver任務通過隊列得到數(shù)據(jù)并進行反序列化。這樣的好處是解耦任務并允許在任務中使用可變對象,從而減少了對象實例化和垃圾收集。一旦數(shù)據(jù)被序列化,就能安全地修改。而缺點是計算消耗大,在一些條件下能夠把task穿起來,避免序列化。(C10)

Flow Control with Back Pressure

receiver放到緩沖區(qū)的數(shù)據(jù)變?yōu)殛犃?#xff0c;sender將要發(fā)送的數(shù)據(jù)變?yōu)殛犃?#xff0c;最后sender減慢發(fā)送速度。

Event Time Processing

event time處理的數(shù)據(jù)必須有時間戳(Long unix timestamp)并定義了watermarks。watermark是一種特殊的records holding a timestamp long value。它必須是遞增的(防止倒退),有一個timestamp t(下圖的5),暗示所有接下來的數(shù)據(jù)都會大于這個值。后來的,小于這個值,就被視為遲來數(shù)據(jù),Flink有其他機制處理。

Watermarks and Event Time

WM在Flink是一種特殊的record,它會被operator tasks接收和釋放。

tasks有時間服務來維持timers(timers注冊到時間服務上),在time-window task中,timers分別記錄了各個window的結束時間。當任務獲得一個watermark時,task會根據(jù)這個watermark的timestamp更新內部的event-time clock。任務內部的時間服務確定所有timers時間是否小于watermark的timestamp,如果大于則觸發(fā)call-back算子來釋放記錄并返回結果。最后task還會將更新的event-time clock的WM進行廣播。(結合下圖理解)

只有ProcessFunction可以讀取和修改timestamp或者watermark(The?ProcessFunction?can read the timestamp of a currently processed record, request the current event-time of the operator, and register timers)。下面是PF的行為。

當收到WM大于所有目前擁有的WM,就會把event-time clock更新為所有WM中最小的那個,并廣播這個最小的WM。即便是多個streams輸入,機制也一樣,只是增加Paritition WM數(shù)量。這種機制要求獲得的WM必須是累加的,而且task必須有新的WM接收,否則clock就不會更新,task的timers就不會被觸發(fā)。另外,當多個streams輸入時,timers會被WM比較離散的stream主導,從而使更密集的stream的state不斷積累。

Timestamp Assignment and Watermark Generation

當streaming application消化流時產(chǎn)生。Flink有三種方式產(chǎn)生:

  • SourceFunction:產(chǎn)生的record帶有timestamp,一些特殊時點產(chǎn)生WM。如果SF暫時不再發(fā)送WM,則會被認為是idle。Flink會從接下來的watermark operators中排除由這個SF生產(chǎn)的分區(qū)(上圖有4個分區(qū)),從而解決timer不觸發(fā)的問題。
  • AssignerWithPeriodicWatermarks?提取每條記錄的timestamp,并周期性的查詢當前WM,即上圖的Partition WM。
  • AssignerWithPunctuatedWatermarks?可以從每條數(shù)據(jù)提取WM。

上面兩個User-defined timestamp assignment functions通常用在source operator附近,因為stream一經(jīng)處理就很難把握record的時間順序了。所以UDF可以修改timestamp和WM,但在數(shù)據(jù)處理時使用不是一個好主意。

State Management

由任務維護并用于計算函數(shù)結果的所有數(shù)據(jù)都屬于任務的state。其實state可以理解為task業(yè)務邏輯的本地或實例變量。

在Flink,state總是和特定的operator關聯(lián)。operator需要注冊它的state,而state有兩種類型:

  • Operator State:由同一并行任務處理的所有記錄都可以訪問相同的state,而其他的task或operator不能訪問,即一個task專屬一個state。這種state有三種primitives
    • List State?represents state as a list of entries.
    • Union List State同上,但在任務失敗和作業(yè)從savepoint重啟的行為不一樣
    • Broadcast State(v1.5) 同樣一個task專屬一個state,但state都是一樣的(需要自己注意保持一致,對state更新時,實際上只對當前task的state進行更新。只有所有task的更新一樣時,即輸入數(shù)據(jù)一樣(一開始廣播所以一樣,但數(shù)據(jù)的順序可能不一樣),對數(shù)據(jù)的處理一樣,才能保證state一樣)。這種state只能存儲在內存,所以沒有RockDB backend。
  • Keyed State:相同key的record共享一個state。
    • Value State:每個key一個值,這個值可以是復雜的數(shù)據(jù)結構.
    • List State:每個key一個list
    • Map State:每個key一個map

上面兩種state的存在方式有兩種:raw和managed,一般都是用后者,也推薦用后者(更好的內存管理、不需造輪子)。

State Backends

state backend決定了state如何被存儲、訪問和維持。它的主要職責是本地state管理和checkpoint state到遠程。在管理方面,可選擇將state存儲到內存還是磁盤。checkpoint方面在C8詳細介紹。

MemoryStateBackend, FsStateBackend, RocksDBStateBackend適合越來越大的state。都支持異步checkpoint,其中RocksDB還支持incremental的checkpoint。

  • 注意:As RocksDB’s JNI bridge API is based on byte[], the maximum supported size per key and per value is 2^31 bytes each. IMPORTANT: states that use merge operations in RocksDB (e.g. ListState) can silently accumulate value sizes > 2^31 bytes and will then fail on their next retrieval. This is currently a limitation of RocksDB JNI.

Scaling Stateful Operators

Flink會根據(jù)input rate調整并發(fā)度。對于stateful operators有以下4種方式:

  • keyed state:根據(jù)key group來調整,即分為同一組的key-value會被分到相同的task

  • list state:所有l(wèi)ist entries會被收集并重新均勻分布,當增加并發(fā)度時,要新建list

  • union list state:增加并發(fā)時,廣播整個list,所以rescaling后,所有task都有所有的list state。

  • broadcast state

Checkpoints, Savepoints, and State Recovery

Flink’s Lightweight Checkpointing Algorithm

在分布式開照算法Chandy-Lamport的基礎上實現(xiàn)。有一種特殊的record叫checkpoint barrier(由JM產(chǎn)生),它帶有checkpoint ID來把流進行劃分。在CB前面的records會被包含到checkpoint,之后的會被包含在之后的checkpoint。

當source task收到這種信息,就會停止發(fā)送recordes,觸發(fā)state backend對本地state的checkpoint,并廣播checkpoint ID到所有下游task。當checkpoint完成時,state backend喚醒source task,后者向JM確定相應的checkpoint ID已經(jīng)完成任務。

當下游獲得其中一個CB時,就會暫停處理這個CB對應的source的數(shù)據(jù)(完成checkpoint后發(fā)送的數(shù)據(jù)),并將這些數(shù)據(jù)存到緩沖區(qū),直到其他相同ID的CB都到齊,就會把state(下圖的12、8)進行checkpoint,并廣播CB到下游。直到所有CB被廣播到下游,才開始處理排隊在緩沖區(qū)的數(shù)據(jù)。當然,其他沒有發(fā)送CB的source的數(shù)據(jù)會繼續(xù)處理。

最后,當所有sink會向JM發(fā)送BC確定checkpoint已完成。

這種機制還有兩個優(yōu)化:

  • 當operator的state很大時,復制整個state并發(fā)送到遠程storage會很費時。而RocksDB state backend支持asynchronous and incremental的checkpoints。當觸發(fā)checkpoint時,backend會快照所有本地state的修改(直至上一次checkpoint),然后馬上讓task繼續(xù)執(zhí)行。后臺線程異步發(fā)送快照到遠程storage。
  • 在等待其余CB時,已經(jīng)完成checkpoint的source數(shù)據(jù)需要排隊。但如果使用at-least-once就不需要等了。但當所有CB到齊再checkpoint,存儲的state就已經(jīng)包含了下一次checkpoint才記錄的數(shù)據(jù)。(如果是取最值這種state就無所謂)

Recovery from Consistent Checkpoints

上圖隊列中的7和6之所以能恢復,取決于數(shù)據(jù)源是否resettable,如Kafka,不會因為發(fā)送信息就把信息刪除。這才能實現(xiàn)處理過程的exactly-once state consistency(嚴格來講,數(shù)據(jù)還是被重復處理,但是在讀檔后重復的)。但是下游系統(tǒng)有可能接收到多個結果。這方面,Flink提供sink算子實現(xiàn)output的exactly-once,例如給checkpoint提交records釋放記錄。另一個方法是idempotent updates,詳細看C7。

Savepoints

checkpoints加上一些額外的元數(shù)據(jù),功能也是在checkpoint的基礎上豐富。不同于checkpoints,savepoint不會被Flink自動創(chuàng)造(由用戶或者外部scheduler觸發(fā)創(chuàng)造)和銷毀。savepoint可以重啟不同但兼容的作業(yè),從而:

  • 修復bugs進而修復錯誤的結果,也可用于A/B test或者what-if場景。
  • 調整并發(fā)度
  • 遷移作業(yè)到其他集群、新版Flink

也可以用于暫停作業(yè),通過savepoint查看作業(yè)情況。

參考
Stream Processing with Apache Flink by Vasiliki Kalavri; Fabian Hueske

總結

以上是生活随笔為你收集整理的Flink架构及工作原理的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內容還不錯,歡迎將生活随笔推薦給好友。