04_Flink-HA高可用、Standalone集群模式、Flink-Standalone集群重要参数详解、集群节点重启及扩容、启动组件、Flink on Yarn、启动命令等
1.4.Flink集群安裝部署standalone+yarn
1.4.1.Standalone集群模式
1.4.2.Flink-Standalone集群重要參數(shù)詳解
1.4.3.集群節(jié)點(diǎn)重啟及擴(kuò)容
1.4.3.1.啟動(dòng)jobmanager
1.4.3.2.啟動(dòng)taskmanger
1.4.3.3.Flink standalone集群中job的容錯(cuò)
1.4.4.Flink on Yarn
1.4.4.1.原理介紹
1.4.4.2.FLINK on yarn集群部署
1.4.5.Flink on Yarn的兩種運(yùn)行方式
1.4.5.1.第一種[yarn-session.sh(開辟資源)+flink run(提交任務(wù))]
1.4.5.2.第二種[flink run -m yarn-cluster(開辟資源 + 提交任務(wù))]
1.4.5.3…/bin/yarn-session.sh命令分析
1.4.5.4./bin/flink run命令分析
1.4.5.5.Flink在yarn上的分布
1.4.5.6.Flink on yarn內(nèi)部實(shí)現(xiàn)
1.4.Flink集群安裝部署standalone+yarn
1.4.1.Standalone集群模式
Standalone集群架構(gòu)展示:Client客戶端提交任務(wù)給JobManager,JobManager負(fù)責(zé)Flink集群計(jì)算資源管理,并分發(fā)任務(wù)給TaskManager執(zhí)行,TaskManager定期向JobManager匯報(bào)狀態(tài)。
?依賴環(huán)境
?Jdk1.8及以上[配置JAVA_HOME環(huán)境變量]
?ssh免密登錄[集群內(nèi)節(jié)點(diǎn)之間免密登錄](méi)
?集群規(guī)劃
master(JobManager) + slave/worker(TaskManager)
hadoop4(master)
hadoop5(salve)
hadoop6(salve)
?集群安裝
A:修改conf/flink-conf.yaml
B:修改conf/masters文件內(nèi)容:
hadoop4:8081C:修改conf/workers (這里將三臺(tái)機(jī)器都作為worker節(jié)點(diǎn)使用)
hadoop4
hadoop5
hadoop6
D:拷貝到其它節(jié)點(diǎn)
E:在hadoop4節(jié)點(diǎn)啟動(dòng)
[root@hadoop4 flink-1.11.1]# pwd /home/admin/installed/flink-1.11.1 [root@hadoop4 flink-1.11.1]# bin/start-cluster.shF:訪問(wèn)集群:http://xxx.xxx.xxxx.xxx:8081/#/overview
1.4.2.Flink-Standalone集群重要參數(shù)詳解
jobmanager.memory.process.size: 1600m The total process memory size for the JobManager. taskmanager.memory.process.size: 1728m The total process memory size for the TaskManager taskmanager.numberOfTaskSlots: 20 每臺(tái)機(jī)器上可用CPU數(shù)量 parallelism.default: 1 The parallelism used for programs that did not specify and other parallelismslot和parallelism總結(jié)
1.slot是靜態(tài)的概念,是指taskmanager具有的并發(fā)執(zhí)行能力。
2.parallelism是動(dòng)態(tài)的概念,是指程序運(yùn)行時(shí)實(shí)際使用的并發(fā)能力。
3.設(shè)置合適的parallelism能提高運(yùn)算效率,太多了和太少了都不行。
1.4.3.集群節(jié)點(diǎn)重啟及擴(kuò)容
1.4.3.1.啟動(dòng)jobmanager
如果集群中的jobmanager進(jìn)程掛了,執(zhí)行下面命令啟動(dòng)。
bin/jobmanager.sh start bin/jobmanager.sh stop1.4.3.2.啟動(dòng)taskmanger
添加新的taskmanager節(jié)點(diǎn)或者重啟taskmanager節(jié)點(diǎn)
bin/taskmanager.sh start bin/taskmanager.sh stop1.4.3.3.Flink standalone集群中job的容錯(cuò)
?jobmanager掛掉
?正在執(zhí)行的任務(wù)會(huì)失敗
?存在單點(diǎn)故障,(Flink支持HA)
?taskmanager掛掉
如果有多余的taskmanger節(jié)點(diǎn),flink會(huì)自動(dòng)把任務(wù)調(diào)度到其它節(jié)點(diǎn)執(zhí)行。
1.4.4.Flink on Yarn
FLINK on yarn模式的原理是依靠YARN來(lái)調(diào)度FLINK任務(wù),目前在企業(yè)中使用較多。這種模式的好處是可以充分利用集群資源,提高集群機(jī)器的利用率,并且只需要1套Hadoop集群,就可以執(zhí)行MapReduce、Spark和FLINK任務(wù),操作非常方便,運(yùn)維方面也很輕松。
1.4.4.1.原理介紹
1)當(dāng)啟動(dòng)一個(gè)新的 Flink YARN Client 會(huì)話時(shí),客戶端首先會(huì)檢查所請(qǐng)求的資源(容器和內(nèi)存)是否可用。之后,它會(huì)上傳 Flink 配置和 JAR 文件到 HDFS。。
2)客 戶 端 請(qǐng) 求 一個(gè) YARN 容 器 啟動(dòng) ApplicationMaster 。 JobManager 和ApplicationMaster(AM)運(yùn)行在同一個(gè)容器中,一旦它們成功地啟動(dòng)了,AM 就能夠知道JobManager 的地址,它會(huì)為 TaskManager 生成一個(gè)新的 Flink 配置文件(這樣它才能連上 JobManager),該文件也同樣會(huì)被上傳到 HDFS。另外,AM 容器還提供了 Flink 的Web 界面服務(wù)。Flink 用來(lái)提供服務(wù)的端口是由用戶和應(yīng)用程序 ID 作為偏移配置的,這使得用戶能夠并行執(zhí)行多個(gè) YARN 會(huì)話。
3)之后,AM 開始為 Flink 的 TaskManager 分配容器(Container),從 HDFS 下載 JAR 文件和修改過(guò)的配置文件。一旦這些步驟完成了,Flink 就安裝完成并準(zhǔn)備接受任務(wù)了。
Flink n on n Yarn 模式在使用的時(shí)候又可以分為兩Session-Cluster和Per-Job-Cluster
Session-Cluster
這種模式是在 YARN 中提前初始化一個(gè) Flink 集群(稱為 Flinkyarn-session),開辟指定的資源,以后的 Flink 任務(wù)都提交到這里。這個(gè) Flink 集群會(huì)常駐在 YARN 集群中,除非手工停止。這種方式創(chuàng)建的 Flink 集群會(huì)獨(dú)占資源,不管有沒(méi)有 Flink 任務(wù)在執(zhí)行,YARN 上面的其他任務(wù)都無(wú)法使用這些資源。
Per-Job-Cluster
這種模式,每次提交 Flink 任務(wù)都會(huì)創(chuàng)建一個(gè)新的 Flink 集群,每個(gè) Flink 任務(wù)之間相互獨(dú)立、互不影響,管理方便。任務(wù)執(zhí)行完成之后創(chuàng)建的 Flink集群也會(huì)消失,不會(huì)額外占用資源,按需使用,這使資源利用率達(dá)到最大,在工作中推薦使用這種模式。
1.4.4.2.FLINK on yarn集群部署
?依賴環(huán)境
?本次部署hadoop3.1.0版本
?Hdfs & yarn
?Flink on Yarn的兩種使用方式
第一種: 在Yarn中初始化一個(gè)flink集群,開辟指定的資源,以后提交任務(wù)都向這里提交。這個(gè)flink集群會(huì)常駐在Yarn集群中,除非手工停止。
第二中(推薦): 每次提交都會(huì)創(chuàng)建一個(gè)新的flink集群,任務(wù)之間互相獨(dú)立,互不影響,方便管理。任務(wù)執(zhí)行完成之后創(chuàng)建的集群也會(huì)消失。
1.4.5.Flink on Yarn的兩種運(yùn)行方式
1.4.5.1.第一種[yarn-session.sh(開辟資源)+flink run(提交任務(wù))]
啟動(dòng)一個(gè)一直運(yùn)行的flink集群
./bin/yarn-session.sh -n 2 -jm 1024 -tm 1024 [-d]附著到一個(gè)已存在的flink yarn session
./bin/yarn-session.sh -id application_1463870264508_0029執(zhí)行任務(wù)
./bin/flink run ./examples/batch/WordCount.jar -input hdfs://hadoop100:9000/LICENSE -output hdfs://hadoop100:9000/wordcount-result.txt停止任務(wù) 【web界面或者命令行執(zhí)行cancel命令】
關(guān)于命令含義:
[root@hadoop4 flink-1.11.1]# ./bin/yarn-session.sh --help Usage:必選:-n,--container <arg> 表示分配容器的數(shù)量(也就是 TaskManager 的數(shù)量)可選-at,--applicationType <arg> Set a custom application type for the application on YARN-D <property=value> use value for given property (動(dòng)態(tài)屬性)-d,--detached If present, runs the job in detached mode (獨(dú)立運(yùn)行)-h,--help Help for the Yarn session CLI.-id,--applicationId <arg> Attach to running YARN session (附著到一個(gè)已存在的flink yarn session)-j,--jar <arg> Path to Flink jar file -jm,--jobManagerMemory <arg> Memory for JobManager Container with optional unit (default: MB) (JobManager容器的內(nèi)存大小)-m,--jobmanager <arg> Address of the JobManager to which to connect. Use this flag to connect to a different JobManager than the one specified in the configuration.-nl,--nodeLabel <arg> Specify YARN node label for the YARN application-nm,--name <arg> Set a custom name for the application on YARN (在YARN上為一個(gè)自定義的應(yīng)用設(shè)置一個(gè)名字)-q,--query Display available YARN resources (memory, cores) 顯示yarn中可用的資源 (內(nèi)存, cpu核數(shù))-qu,--queue <arg> Specify YARN queue. 指定YARN隊(duì)列-s,--slots <arg> taskmanager分配多少個(gè)slots(處理進(jìn)程)。建議設(shè)置為每個(gè)機(jī)器的CPU核數(shù)-t,--ship <arg> Ship files in the specified directory (t for transfer) -tm,--taskManagerMemory <arg> Memory per TaskManager Container with optional unit (default: MB) 每個(gè)TaskManager容器的內(nèi)存 [in MB]-yd,--yarndetached If present, runs the job in detached mode (deprecated; use non-YARN specific option instead)-z,--zookeeperNamespace <arg> Namespace to create the Zookeeper sub-paths for high availability mode 針對(duì)HA模式在zookeeper上創(chuàng)建NameSpace1.4.5.2.第二種[flink run -m yarn-cluster(開辟資源 + 提交任務(wù))]
啟動(dòng)集群,執(zhí)行任務(wù)
$FLINK_HOME/bin/flink run -d -m yarn-cluster \ -p 1 \ -yjm 1024m \ -ytm 1024m \ -ynm IssuePassComplete \ -c com.tianque.issue.flink.handler.IssuePassCompleteFlinkHandlerByCustomRedisSink \ /home/admin/installed/flink-jars/IssuePassCompleteFlinkHandler.jar \ --port 38092注意:client端必須要設(shè)置YARN_CONF_DIR或者HADOOP_CONF_DIR或者HADOOP_HOME環(huán)境變量,通過(guò)這個(gè)環(huán)境變量來(lái)讀取YARN和HDFS的配置信息,否則啟動(dòng)會(huì)失敗
1.4.5.3…/bin/yarn-session.sh命令分析
用法: 必選 -n,--container <arg> 分配多少個(gè)yarn容器 (=taskmanager的數(shù)量) 可選 -D <arg> 動(dòng)態(tài)屬性 -d,--detached 獨(dú)立運(yùn)行 -jm,--jobManagerMemory <arg> JobManager的內(nèi)存 [in MB] -nm,--name 在YARN上為一個(gè)自定義的應(yīng)用設(shè)置一個(gè)名字 -q,--query 顯示yarn中可用的資源 (內(nèi)存, cpu核數(shù)) -qu,--queue <arg> 指定YARN隊(duì)列. -s,--slots <arg> 每個(gè)TaskManager使用的slots數(shù)量 -tm,--taskManagerMemory <arg> 每個(gè)TaskManager的內(nèi)存 [in MB] -z,--zookeeperNamespace <arg> 針對(duì)HA模式在zookeeper上創(chuàng)建NameSpace -id,--applicationId <yarnAppId> YARN集群上的任務(wù)id,附著到一個(gè)后臺(tái)運(yùn)行的yarn session中1.4.5.4./bin/flink run命令分析
run [OPTIONS] <jar-file> <arguments> "run" 操作參數(shù): -c,--class <classname> 如果沒(méi)有在jar包中指定入口類,則需要在這里通過(guò)這個(gè)參數(shù)指定 -m,--jobmanager <host:port> 指定需要連接的jobmanager(主節(jié)點(diǎn))地址,使用這個(gè)參數(shù)可以指定一個(gè)不同于配置文件中的jobmanager -p,--parallelism <parallelism> 指定程序的并行度。可以覆蓋配置文件中的默認(rèn)值。默認(rèn)查找當(dāng)前yarn集群中已有的yarn-session信息中的jobmanager【/tmp/.yarn-properties-root】: ./bin/flink run ./examples/batch/WordCount.jar -input hdfs://hostname:port/hello.txt -output hdfs://hostname:port/result1連接指定host和port的jobmanager: ./bin/flink run -m hadoop100:1234 ./examples/batch/WordCount.jar -input hdfs://hostname:port/hello.txt -output hdfs://hostname:port/result1啟動(dòng)一個(gè)新的yarn-session: ./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar -input hdfs://hostname:port/hello.txt -output hdfs://hostname:port/result1 注意:yarn session命令行的選項(xiàng)也可以使用./bin/flink 工具獲得。它們都有一個(gè)y或者yarn的前綴 例如:./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar再如以下運(yùn)行案例:
如果一直報(bào)akka,AskTimeoutException錯(cuò)誤,可以嘗試添加akka.ask.timeout=120000s, 依然顯示該錯(cuò)誤。
注意這里數(shù)字和時(shí)間單位之間,必須有個(gè)空格。
1.4.5.5.Flink在yarn上的分布
?Flink on Yarn
?ResourceManager
?NodeManager
?AppMaster(jobmanager和它運(yùn)行在一個(gè)Container中)
?Container(taskmanager運(yùn)行在上面)
?使用on-yarn的好處
?提高集群機(jī)器的利用率
?一套集群,可以執(zhí)行MR任務(wù),spark任務(wù),flink任務(wù)等…
1.4.5.6.Flink on yarn內(nèi)部實(shí)現(xiàn)
步驟如下:
1、上傳flink jar包和配置
2、申請(qǐng)資源和請(qǐng)求AppMaster容器
3、分配AppMaster容器
分配worker
總結(jié)
以上是生活随笔為你收集整理的04_Flink-HA高可用、Standalone集群模式、Flink-Standalone集群重要参数详解、集群节点重启及扩容、启动组件、Flink on Yarn、启动命令等的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 03_Flink本地安装、分别解压sca
- 下一篇: pytorch开发环境准备(学习资料自备