
Part 5: Flink Fundamentals -- Client Operations

Published: 2024/3/26

Client Operations

  • 1. Overview of client operations
  • 2. Flink command-line mode
    • 2.1 stand-alone mode
    • 2.3 yarn mode
      • 2.3.1 per-job mode
      • 2.3.2 yarn-session mode
  • 3. scala-shell mode
  • 4. sql-client mode
  • 5. Restful mode
  • 6. Web UI mode

1. Overview of client operations

Flink offers a rich set of client operations for submitting jobs and interacting with them. We cover the client from five angles: command-line mode, scala-shell mode, sql-client mode, Restful mode, and web UI mode.

2. Flink command-line mode

2.1 stand-alone mode

First, start a stand-alone cluster:

bin/start-cluster.sh

Submit an example job. The -d flag means detached mode: the shell returns right after submission. Without -d, the shell stays attached and keeps running until the job ends.

bin/flink run -d examples/streaming/TopSpeedWindowing.jar

List all jobs on the cluster:

# a. In the default mode, -m specifies the JobManager address
bin/flink list -m localhost:8081
# b. In HA mode, use -z to specify the ZooKeeper address
bin/flink list -z localhost:2181

------------------ Running/Restarting Jobs -------------------
20.05.2019 16:04:11 : 5f1a16394dc207969a0cff904ca57726 : CarTopSpeedWindowingExample (RUNNING)
--------------------------------------------------------------

Cancel the job, writing a savepoint to the given path:

bin/flink cancel -m localhost:8081 -s /tmp/savepoint 5f1a16394dc207969a0cff904ca57726

Cancelling job 5f1a16394dc207969a0cff904ca57726 with savepoint to /tmp/savepoint.
Cancelled job 5f1a16394dc207969a0cff904ca57726.
Savepoint stored in file:/tmp/savepoint/savepoint-5f1a16-26cd0491e260.

Inspect the savepoint directory:

ll /tmp/savepoint/
drwxrwxr-x 2 hadoopuser hadoopuser 4096 May 20 16:20 savepoint-5f1a16-26cd0491e260

Resume the job from the savepoint:

bin/flink run -d -s /tmp/savepoint/savepoint-5f1a16-26cd0491e260/ examples/streaming/TopSpeedWindowing.jar

Job has been submitted with JobID 9e9a1bab6256175706ffaa0e0f4f6535

The following line in the JobManager log confirms the job was started from the savepoint:

Starting job 9e9a1bab6256175706ffaa0e0f4f6535 from savepoint /tmp/savepoint/savepoint-5f1a16-26cd0491e260/

Trigger a savepoint manually:

bin/flink savepoint -m localhost:8081 9e9a1bab6256175706ffaa0e0f4f6535 /tmp/savepoint/

Triggering savepoint for job 9e9a1bab6256175706ffaa0e0f4f6535. Waiting for response...
Savepoint completed. Path: file:/tmp/savepoint/savepoint-9e9a1b-bbfb61c5013e
You can resume your program from this savepoint with the run command.

ll /tmp/savepoint/
total 8
drwxrwxr-x 2 hadoopuser hadoopuser 4096 May 20 16:20 savepoint-5f1a16-26cd0491e260
drwxrwxr-x 2 hadoopuser hadoopuser 4096 May 20 16:29 savepoint-9e9a1b-bbfb61c5013e

Change a job's parallelism. This requires state.savepoints.dir to be configured on the cluster, because every modify triggers a savepoint.

bin/flink modify -p 2 9e9a1bab6256175706ffaa0e0f4f6535

Show a job's execution plan. The JSON output can be pasted into https://flink.apache.org/visualizer/ to render the plan:

bin/flink info examples/streaming/TopSpeedWindowing.jar

----------------------- Execution Plan -----------------------
{"nodes":[{"id":1,"type":"Source: Custom Source","pact":"Data Source","contents":"Source: Custom Source","parallelism":1},{"id":2,"type":"Timestamps/Watermarks","pact":"Operator","contents":"Timestamps/Watermarks","parallelism":1,"predecessors":[{"id":1,"ship_strategy":"FORWARD","side":"second"}]},{"id":4,"type":"Window(GlobalWindows(), DeltaTrigger, TimeEvictor, ComparableAggregator, PassThroughWindowFunction)","pact":"Operator","contents":"Window(GlobalWindows(), DeltaTrigger, TimeEvictor, ComparableAggregator, PassThroughWindowFunction)","parallelism":1,"predecessors":[{"id":2,"ship_strategy":"HASH","side":"second"}]},{"id":5,"type":"Sink: Print to Std. Out","pact":"Data Sink","contents":"Sink: Print to Std. Out","parallelism":1,"predecessors":[{"id":4,"ship_strategy":"FORWARD","side":"second"}]}]}
--------------------------------------------------------------

Stopping a job with stop requires every source to implement the StoppableFunction interface. This allows a graceful shutdown: the job only finishes once all sources have been closed, which guarantees that in-flight data is fully processed.

bin/flink stop -m localhost:8081 9e9a1bab6256175706ffaa0e0f4f6535
2.3 yarn mode
2.3.1 per-job mode

Submit a job: -m yarn-cluster selects the per-job mode, -yqu sets the YARN queue, and -yd enables detached mode (without it, attach mode is the default).

bin/flink run -m yarn-cluster -yqu root.up -yd examples/streaming/TopSpeedWindowing.jar

2019-05-20 17:17:00,719 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - The Flink YARN client has been started in detached mode. In order to stop Flink on YARN, use the following command or a YARN web interface to stop it:
yarn application -kill application_1558187257311_0010
Please also note that the temporary files of the YARN session in the home directory will not be removed.
Job has been submitted with JobID 14ca15c60c5bc641ab5774bc7c1e69eb

The job name is displayed as "Flink per-job Cluster".

2.3.2 yarn-session mode

Start a yarn-session: -n requests 2 TaskManagers, -jm sets the JobManager memory to 1024 MB, -tm sets each TaskManager's memory to 1024 MB, and -qu sets the queue. Note that in this mode the TaskManager processes are not started up front.

bin/yarn-session.sh -n 2 -jm 1024 -tm 1024 -qu root.up

2019-05-20 17:25:12,446 INFO org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
2019-05-20 17:25:12,694 INFO org.apache.flink.runtime.rest.RestClient - Rest client endpoint started.
Flink JobManager is now running on dn2:21667 with leader id 0e57fedd-6030-4c12-b455-638f7db4a65c.
JobManager Web Interface: http://dn2:17269


The job name shows as "Flink session cluster". The command above starts the session in attach mode, so the command line does not exit on its own; add -d at startup for detached mode instead. If you left attach mode with Ctrl+C, or want a command line into a detached session, run

bin/yarn-session.sh -id application_1558187257311_0012

to reconnect.
The yarn-session's application id is recorded locally in /tmp/.yarn-properties-{username}; every job subsequently submitted from this machine will run inside that application:

cat /tmp/.yarn-properties-hadoopuser
#Generated YARN properties file
#Mon May 20 17:25:12 CST 2019
parallelism=2
dynamicPropertiesString=
applicationID=application_1558187257311_0012

Submit a job:

bin/flink run examples/batch/WordCount.jar

2019-05-20 17:40:25,796 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - Found Yarn properties file under /tmp/.yarn-properties-hadoopuser.
2019-05-20 17:40:26,113 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - YARN properties set default parallelism to 2
2019-05-20 17:40:26,267 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2019-05-20 17:40:26,276 WARN org.apache.flink.yarn.AbstractYarnClusterDescriptor - Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set. The Flink YARN Client needs one of these to be set to properly load the Hadoop configuration for accessing YARN.
2019-05-20 17:40:26,347 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Found application JobManager host name 'dn2' and port '17269' from supplied application id 'application_1558187257311_0012'
Starting execution of program
Executing WordCount example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
(a,5)
(action,1)
(after,1)
(against,1)

Open the ApplicationMaster UI link to check:

The job has finished and its resources have been released.
Note: when several yarn-sessions are running, pass -yid ${appid} at submission time to target a specific session.
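Picking up the -yid note above, a minimal shell sketch of extracting applicationID from a properties file of the format shown earlier, so it can be passed to -yid. The temp file is a stand-in for /tmp/.yarn-properties-{username}, and the final run command is only echoed, not executed:

```shell
# Write a sample properties file in the format shown above
# (stand-in for /tmp/.yarn-properties-<username>)
props=$(mktemp)
cat > "$props" <<'EOF'
#Generated YARN properties file
parallelism=2
dynamicPropertiesString=
applicationID=application_1558187257311_0012
EOF

# Extract the session's application id and build a submission command
# that targets that session explicitly via -yid
appid=$(grep '^applicationID=' "$props" | cut -d= -f2)
echo "bin/flink run -yid $appid examples/batch/WordCount.jar"
rm -f "$props"
```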

3. scala-shell mode

Start the scala-shell in local, remote, or yarn mode:

bin/start-scala-shell.sh local
bin/start-scala-shell.sh remote dn2 17269
bin/start-scala-shell.sh yarn -n 1 -jm 1024 -tm 1024 -nm scala-shell-yarn -qu root.up

For the scala-shell:

  • batch mode provides the built-in benv variable
  • streaming mode provides the built-in senv variable

Run a batch job; on a DataSet, print() triggers execution:

scala> val text = benv.fromElements("hello word hi word thanks word hello")
text: org.apache.flink.api.scala.DataSet[String] = org.apache.flink.api.scala.DataSet@6764201e

scala> val counts = text.flatMap{_.split("\\W+")}.map{(_,1)}.groupBy(0).sum(1)
counts: org.apache.flink.api.scala.AggregateDataSet[(String, Int)] = org.apache.flink.api.scala.AggregateDataSet@76b49d0

scala> counts.print()
(hello,2)
(hi,1)
(thanks,1)
(word,3)

Run a DataStream job. On a DataStream, print() does not trigger execution; the job runs only when senv.execute() is called explicitly:

scala> val text = senv.fromElements("hello word hi word thanks word hello")
text: org.apache.flink.streaming.api.scala.DataStream[String] = org.apache.flink.streaming.api.scala.DataStream@7e24d565

scala> val counts = text.flatMap{_.split("\\W+")}.map{(_,1)}.keyBy(0).sum(1)
counts: org.apache.flink.streaming.api.scala.DataStream[(String, Int)] = org.apache.flink.streaming.api.scala.DataStream@43b5274e

scala> counts.print()
res1: org.apache.flink.streaming.api.datastream.DataStreamSink[(String, Int)] = org.apache.flink.streaming.api.datastream.DataStreamSink@79832158

scala> senv.execute("Stream Word Count")
(hello,1)
(word,1)
(hi,1)
(word,2)
(thanks,1)
(word,3)
(hello,2)
res2: org.apache.flink.api.common.JobExecutionResult = org.apache.flink.api.common.JobExecutionResult@6514666f

4. sql-client mode

Start the sql-client in embedded mode:

bin/sql-client.sh embedded

Type help to see the available commands:

help;

The following commands are available:

QUIT            Quits the SQL CLI client.
CLEAR           Clears the current terminal.
HELP            Prints the available commands.
SHOW TABLES     Shows all registered tables.
SHOW FUNCTIONS  Shows all registered user-defined functions.
DESCRIBE        Describes the schema of a table with the given name.
EXPLAIN         Describes the execution plan of a query or table with the given name.
SELECT          Executes a SQL SELECT query on the Flink cluster.
INSERT INTO     Inserts the results of a SQL SELECT query into a declared table sink.
SOURCE          Reads a SQL SELECT query from a file and executes it on the Flink cluster.
SET             Sets a session configuration property. Syntax: 'SET <key>=<value>'. Use 'SET' for listing all properties.
RESET           Resets all session configuration properties.

Run a simple query:

select 'hello word'

Inspect a query's execution plan:

explain select 'a';

== Abstract Syntax Tree ==
LogicalProject(EXPR$0=[_UTF-16LE'a'])
  LogicalValues(tuples=[[{ 0 }]])

== Optimized Logical Plan ==
DataStreamCalc(select=[_UTF-16LE'a' AS EXPR$0])
  DataStreamValues(tuples=[[{ 0 }]])

== Physical Execution Plan ==
Stage 5 : Data Source
  content : collect elements with CollectionInputFormat

Stage 6 : Operator
  content : select: (_UTF-16LE'a' AS EXPR$0)
  ship_strategy : FORWARD

The sql-client has two result modes:

  • table mode materializes the query result in memory and displays it paginated
  • changelog mode does not materialize the result and continuously displays the latest updates
    Switch modes with set execution.result-mode = table/changelog.

5. Restful mode

The REST API is covered in detail in the official documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.6/monitoring/rest_api.html
In practice, the REST API is used mostly for monitoring.
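As an example of monitoring over REST, a minimal sketch. The /jobs/overview endpoint and its jid/state fields follow the 1.6 REST documentation linked above, and localhost:8081 matches the stand-alone examples earlier; since no live cluster is assumed here, a sample response of the same shape is filtered in place of real curl output:

```shell
# Against a running cluster, list jobs via the REST API:
#   curl -s http://localhost:8081/jobs/overview
#
# The response is JSON; the filter below extracts the ids of RUNNING jobs.
# A sample response of that shape stands in for a live cluster here:
response='{"jobs":[{"jid":"5f1a16394dc207969a0cff904ca57726","name":"CarTopSpeedWindowingExample","state":"RUNNING"}]}'
echo "$response" | python3 -c '
import json, sys
for job in json.load(sys.stdin)["jobs"]:
    if job["state"] == "RUNNING":
        print(job["jid"])
'
```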

6. Web UI mode

Web UI mode is simply the web interface on port 8081: upload the jar manually, set the parameters, and run the job.
