Flink Tutorial (5): Client Operations
Client Operations
- 1. Client operations overview
- 2. Flink command-line mode
- 2.1 Standalone mode
- 2.2 YARN mode
- 2.2.1 Per-job mode
- 2.2.2 yarn-session mode
- 3. scala-shell mode
- 4. sql-client mode
- 5. RESTful mode
- 6. Web UI mode
1. Client operations overview
Flink provides a rich set of client operations for submitting jobs and interacting with them. We will look at the Flink client from five angles: command-line mode, scala-shell mode, sql-client mode, RESTful mode, and web UI mode.
2. Flink command-line mode
2.1 Standalone mode
First start a standalone cluster:

```shell
bin/start-cluster.sh
```

Submit an example job. The -d flag means detached mode: the client exits right after submitting; without -d the shell stays attached and keeps running.
```shell
bin/flink run -d examples/streaming/TopSpeedWindowing.jar
```

List all jobs currently on the cluster:
a.默認(rèn)模式下-m表示jm的地址 bin/flink list -m localhost:8081b.ha模式下用-z表示zk地址查看 bin/flink list -z localhost:2181------------------ Running/Restarting Jobs ------------------- 20.05.2019 16:04:11 : 5f1a16394dc207969a0cff904ca57726 : CarTopSpeedWindowingExample (RUNNING) --------------------------------------------------------------停止任務(wù),并設(shè)置savepoint 路徑
```shell
bin/flink cancel -m localhost:8081 -s /tmp/savepoint 5f1a16394dc207969a0cff904ca57726
```

```
Cancelling job 5f1a16394dc207969a0cff904ca57726 with savepoint to /tmp/savepoint.
Cancelled job 5f1a16394dc207969a0cff904ca57726.
Savepoint stored in file:/tmp/savepoint/savepoint-5f1a16-26cd0491e260.
```

Inspect the savepoint:
```shell
ll /tmp/savepoint/
```

```
drwxrwxr-x 2 hadoopuser hadoopuser 4096 May 20 16:20 savepoint-5f1a16-26cd0491e260
```

Resume the job from that savepoint:
```shell
bin/flink run -d -s /tmp/savepoint/savepoint-5f1a16-26cd0491e260/ examples/streaming/TopSpeedWindowing.jar
```

```
Job has been submitted with JobID 9e9a1bab6256175706ffaa0e0f4f6535
```

The JobManager log contains the following entry, confirming that the job was restored from the savepoint:

```
Starting job 9e9a1bab6256175706ffaa0e0f4f6535 from savepoint /tmp/savepoint/savepoint-5f1a16-26cd0491e260/
```

Trigger a savepoint manually:
```shell
bin/flink savepoint -m localhost:8081 9e9a1bab6256175706ffaa0e0f4f6535 /tmp/savepoint/
```

```
Triggering savepoint for job 9e9a1bab6256175706ffaa0e0f4f6535.
Waiting for response...
Savepoint completed. Path: file:/tmp/savepoint/savepoint-9e9a1b-bbfb61c5013e
You can resume your program from this savepoint with the run command.
```

```shell
ll /tmp/savepoint/
```

```
total 8
drwxrwxr-x 2 hadoopuser hadoopuser 4096 May 20 16:20 savepoint-5f1a16-26cd0491e260
drwxrwxr-x 2 hadoopuser hadoopuser 4096 May 20 16:29 savepoint-9e9a1b-bbfb61c5013e
```

Change the job's parallelism. This requires the cluster to have state.savepoints.dir configured, because every modify triggers a savepoint:
```shell
bin/flink modify -p 2 9e9a1bab6256175706ffaa0e0f4f6535
```

Inspect a job's execution plan; the JSON can be pasted into https://flink.apache.org/visualizer/ to visualize it:
```shell
bin/flink info examples/streaming/TopSpeedWindowing.jar
```

```
----------------------- Execution Plan -----------------------
{"nodes":[{"id":1,"type":"Source: Custom Source","pact":"Data Source","contents":"Source: Custom Source","parallelism":1},{"id":2,"type":"Timestamps/Watermarks","pact":"Operator","contents":"Timestamps/Watermarks","parallelism":1,"predecessors":[{"id":1,"ship_strategy":"FORWARD","side":"second"}]},{"id":4,"type":"Window(GlobalWindows(), DeltaTrigger, TimeEvictor, ComparableAggregator, PassThroughWindowFunction)","pact":"Operator","contents":"Window(GlobalWindows(), DeltaTrigger, TimeEvictor, ComparableAggregator, PassThroughWindowFunction)","parallelism":1,"predecessors":[{"id":2,"ship_strategy":"HASH","side":"second"}]},{"id":5,"type":"Sink: Print to Std. Out","pact":"Data Sink","contents":"Sink: Print to Std. Out","parallelism":1,"predecessors":[{"id":4,"ship_strategy":"FORWARD","side":"second"}]}]}
--------------------------------------------------------------
```

To stop a job with the stop command, the sources must implement StoppableFunction. This allows a graceful shutdown: the job finishes only after all sources have been closed, so it is guaranteed to complete its processing cleanly:
```shell
bin/flink stop -m localhost:8081 9e9a1bab6256175706ffaa0e0f4f6535
```

2.2 YARN mode
2.2.1 Per-job mode
Submit a job. Here -m yarn-cluster selects per-job (yarn-cluster) mode, -yqu sets the YARN queue, and -yd requests detached mode; without -yd the client stays attached by default.
```shell
bin/flink run -m yarn-cluster -yqu root.up -yd examples/streaming/TopSpeedWindowing.jar
```

```
2019-05-20 17:17:00,719 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor - The Flink YARN client has been started in detached mode. In order to stop Flink on YARN, use the following command or a YARN web interface to stop it:
yarn application -kill application_1558187257311_0010
Please also note that the temporary files of the YARN session in the home directory will not be removed.
Job has been submitted with JobID 14ca15c60c5bc641ab5774bc7c1e69eb
```

In the YARN UI the job name is shown as "Flink per-job Cluster".
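When per-job submissions are scripted, the JobID usually has to be scraped from the client output. A minimal sketch, plain Python rather than anything Flink ships, that relies on the "Job has been submitted with JobID ..." line shown above (treat the exact wording as version-dependent):

```python
# Pull the JobID out of `flink run` client output, e.g. to script detached
# submissions. Matches the "Job has been submitted with JobID ..." line;
# verify the wording against the client version you actually run.
import re

def extract_job_id(client_output):
    """Return the first 32-hex-char JobID found in the output, or None."""
    m = re.search(r"Job has been submitted with JobID ([0-9a-f]{32})", client_output)
    return m.group(1) if m else None

sample = "Job has been submitted with JobID 14ca15c60c5bc641ab5774bc7c1e69eb"
print(extract_job_id(sample))  # 14ca15c60c5bc641ab5774bc7c1e69eb
```

The extracted id can then be passed to `flink cancel` or `flink savepoint` as shown earlier.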
2.2.2 yarn-session mode
啟動(dòng)yarn-session: -n 表示2個(gè)tm,-jm 表示1024mb內(nèi)存,-tm 表示1024mb內(nèi)存,-qu 表示隊(duì)列,需要主要的是此模式下tm進(jìn)程并不會(huì)提前啟動(dòng)
```shell
bin/yarn-session.sh -n 2 -jm 1024 -tm 1024 -qu root.up
```

```
2019-05-20 17:25:12,446 INFO  org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
2019-05-20 17:25:12,694 INFO  org.apache.flink.runtime.rest.RestClient - Rest client endpoint started.
Flink JobManager is now running on dn2:21667 with leader id 0e57fedd-6030-4c12-b455-638f7db4a65c.
JobManager Web Interface: http://dn2:17269
```
In the YARN UI the job name is shown as "Flink session cluster". Started this way the session runs in attach mode, so the command line does not exit on its own; add -d at startup to run in detached mode instead. If you left attach mode with Ctrl+C, or started in detached mode and want a command line again, you can re-attach to the running session.
此時(shí)會(huì)在本地/tmp/.yarn-properties-{username}記錄yarn-session的appid,后續(xù)本機(jī)提交的任務(wù)都會(huì)在該app中運(yùn)行
提交一個(gè)任務(wù),
```shell
bin/flink run examples/batch/WordCount.jar
```

```
2019-05-20 17:40:25,796 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli - Found Yarn properties file under /tmp/.yarn-properties-hadoopuser.
2019-05-20 17:40:26,113 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli - YARN properties set default parallelism to 2
2019-05-20 17:40:26,267 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2019-05-20 17:40:26,276 WARN  org.apache.flink.yarn.AbstractYarnClusterDescriptor - Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set. The Flink YARN Client needs one of these to be set to properly load the Hadoop configuration for accessing YARN.
2019-05-20 17:40:26,347 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor - Found application JobManager host name 'dn2' and port '17269' from supplied application id 'application_1558187257311_0012'
Starting execution of program
Executing WordCount example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
(a,5)
(action,1)
(after,1)
(against,1)
```

Open the ApplicationMaster UI to check:
The job has finished running, and its resources have been released.
Note: when multiple yarn-sessions are running, pass -yid ${appid} at submission time to target a specific yarn-session.
3. scala-shell mode
啟動(dòng)scala-shell本地模式,遠(yuǎn)程模式,或yarn模式:
bin/start-scala-shell.sh localbin/start-scala-shell.sh remote dn2 17269bin/start-scala-shell.sh yarn -n 1 -jm 1024 -tm 1024 -nm scala-shell-yarn -qu root.up對(duì)于scala-shell
- batch mode provides the built-in benv variable
- streaming mode provides the built-in senv variable
執(zhí)行一個(gè)batch任務(wù):dateset中print會(huì)觸發(fā)任務(wù)的執(zhí)行
```scala
scala> val text = benv.fromElements("hello word hi word thanks word hello")
text: org.apache.flink.api.scala.DataSet[String] = org.apache.flink.api.scala.DataSet@6764201e

scala> val counts = text.flatMap{_.split("\\W+")}.map{(_,1)}.groupBy(0).sum(1)
counts: org.apache.flink.api.scala.AggregateDataSet[(String, Int)] = org.apache.flink.api.scala.AggregateDataSet@76b49d0

scala> counts.print()
(hello,2)
(hi,1)
(thanks,1)
(word,3)
```

Run a DataStream job. On a DataStream, print() does not trigger execution; the job only runs once senv.execute() is called explicitly:
```scala
scala> val text = senv.fromElements("hello word hi word thanks word hello")
text: org.apache.flink.streaming.api.scala.DataStream[String] = org.apache.flink.streaming.api.scala.DataStream@7e24d565

scala> val counts = text.flatMap{_.split("\\W+")}.map{(_,1)}.keyBy(0).sum(1)
counts: org.apache.flink.streaming.api.scala.DataStream[(String, Int)] = org.apache.flink.streaming.api.scala.DataStream@43b5274e

scala> counts.print()
res1: org.apache.flink.streaming.api.datastream.DataStreamSink[(String, Int)] = org.apache.flink.streaming.api.datastream.DataStreamSink@79832158

scala> senv.execute("Stream Word Count")
(hello,1)
(word,1)
(hi,1)
(word,2)
(thanks,1)
(word,3)
(hello,2)
res2: org.apache.flink.api.common.JobExecutionResult = org.apache.flink.api.common.JobExecutionResult@6514666f
```
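The difference between the two transcripts above comes down to lazy plan building: transformations only record a plan, and the streaming plan runs only when execute() is called. A toy Python analogy of that idea (this mimics the concept only, not Flink's API):

```python
# Toy illustration of deferred execution: map/filter only record steps;
# nothing runs until execute() is called. An analogy for senv.execute(),
# not actual Flink code.
class Pipeline:
    def __init__(self, data):
        self.data = list(data)
        self.steps = []  # recorded transformations, not yet applied

    def map(self, fn):
        self.steps.append(("map", fn))
        return self

    def filter(self, fn):
        self.steps.append(("filter", fn))
        return self

    def execute(self):
        out = self.data
        for kind, fn in self.steps:
            out = [fn(x) for x in out] if kind == "map" else [x for x in out if fn(x)]
        return out

p = Pipeline(["hello", "word", "hi"]).map(str.upper).filter(lambda w: len(w) > 2)
# At this point nothing has run; only the plan exists.
print(p.execute())  # ['HELLO', 'WORD']
```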
啟動(dòng)sql-client模式:
```shell
bin/sql-client.sh embedded
```

Type help to see the available commands:
```sql
help;
```

```
The following commands are available:
QUIT            Quits the SQL CLI client.
CLEAR           Clears the current terminal.
HELP            Prints the available commands.
SHOW TABLES     Shows all registered tables.
SHOW FUNCTIONS  Shows all registered user-defined functions.
DESCRIBE        Describes the schema of a table with the given name.
EXPLAIN         Describes the execution plan of a query or table with the given name.
SELECT          Executes a SQL SELECT query on the Flink cluster.
INSERT INTO     Inserts the results of a SQL SELECT query into a declared table sink.
SOURCE          Reads a SQL SELECT query from a file and executes it on the Flink cluster.
SET             Sets a session configuration property. Syntax: 'SET <key>=<value>'. Use 'SET' for listing all properties.
RESET           Resets all session configuration properties.
```

Run a simple query:
```sql
select 'hello word';
```

Show a query's execution plan:
```sql
explain select 'a';
```

```
== Abstract Syntax Tree ==
LogicalProject(EXPR$0=[_UTF-16LE'a'])
  LogicalValues(tuples=[[{ 0 }]])

== Optimized Logical Plan ==
DataStreamCalc(select=[_UTF-16LE'a' AS EXPR$0])
  DataStreamValues(tuples=[[{ 0 }]])

== Physical Execution Plan ==
Stage 5 : Data Source
  content : collect elements with CollectionInputFormat

Stage 6 : Operator
  content : select: (_UTF-16LE'a' AS EXPR$0)
  ship_strategy : FORWARD
```

The SQL client has two result modes:
- table mode materializes query results in memory and shows them paginated
- changelog mode does not materialize results and continuously shows the latest updates

Switch between them with set execution.result-mode = table or set execution.result-mode = changelog.
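The distinction between the two modes can be sketched with a toy running word count in plain Python (an analogy for the concept, not Flink code): changelog mode corresponds to the stream of per-record updates, table mode to the final materialized counts.

```python
# Analogy for the sql-client result modes: a running word count either
# emitted as a changelog of updates, or materialized as a final table.
from collections import Counter

def run_count(words):
    """Return (changelog-style updates, final table-style result)."""
    counts = Counter()
    updates = []
    for w in words:
        counts[w] += 1
        updates.append((w, counts[w]))  # one update per incoming record
    return updates, dict(counts)

updates, table = run_count(["hello", "word", "word"])
print(updates)  # [('hello', 1), ('word', 1), ('word', 2)]
print(table)    # {'hello': 1, 'word': 2}
```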
5. RESTful mode
The REST API is documented in detail on the official site: https://ci.apache.org/projects/flink/flink-docs-release-1.6/monitoring/rest_api.html. Today the REST API is most commonly used for monitoring.
6. Web UI mode
Web UI mode simply means using the web interface on port 8081: upload a jar manually, set the parameters, and run it.