

Getting Started with Flink, Part 5: Client Operations

Published: 2024/3/26

Client Operations

      • 1. Client operations overview
        • 2. Flink command-line mode
          • 2.1 Stand-alone mode
          • 2.3 YARN mode
            • 2.3.1 Single-job mode
            • 2.3.2 yarn-session mode
        • 3. scala-shell mode
        • 4. sql-client mode
        • 5. RESTful mode
        • 6. Web UI mode

1. Client Operations Overview

Flink provides a rich set of client operations for submitting jobs and interacting with them. We cover the client from five angles: command-line mode, scala-shell mode, sql-client mode, RESTful mode, and Web UI mode.

2. Flink Command-Line Mode

2.1 Stand-alone mode

First, start a cluster in stand-alone mode:

bin/start-cluster.sh

Submit one of the bundled example jobs. The -d flag selects detached mode: the shell returns right after submission; without -d, the shell stays attached and keeps running until the job ends.

bin/flink run -d examples/streaming/TopSpeedWindowing.jar

List all jobs on the cluster:

a. In the default mode, -m specifies the JobManager address:
bin/flink list -m localhost:8081

b. In HA mode, use -z to specify the ZooKeeper address:
bin/flink list -z localhost:2181

------------------ Running/Restarting Jobs -------------------
20.05.2019 16:04:11 : 5f1a16394dc207969a0cff904ca57726 : CarTopSpeedWindowingExample (RUNNING)
--------------------------------------------------------------

Cancel a job, writing a savepoint to the given path:

bin/flink cancel -m localhost:8081 -s /tmp/savepoint 5f1a16394dc207969a0cff904ca57726
Cancelling job 5f1a16394dc207969a0cff904ca57726 with savepoint to /tmp/savepoint.
Cancelled job 5f1a16394dc207969a0cff904ca57726.
Savepoint stored in file:/tmp/savepoint/savepoint-5f1a16-26cd0491e260.

Inspect the savepoint:

ll /tmp/savepoint/
drwxrwxr-x 2 hadoopuser hadoopuser 4096 May 20 16:20 savepoint-5f1a16-26cd0491e260

Resume a job from the savepoint:

bin/flink run -d -s /tmp/savepoint/savepoint-5f1a16-26cd0491e260/ examples/streaming/TopSpeedWindowing.jar
Job has been submitted with JobID 9e9a1bab6256175706ffaa0e0f4f6535

The JobManager log contains the following line, confirming that the job was started from the savepoint:
Starting job 9e9a1bab6256175706ffaa0e0f4f6535 from savepoint /tmp/savepoint/savepoint-5f1a16-26cd0491e260/

Trigger a savepoint manually:

bin/flink savepoint -m localhost:2181 9e9a1bab6256175706ffaa0e0f4f6535 /tmp/savepoint/
Triggering savepoint for job 9e9a1bab6256175706ffaa0e0f4f6535.
Waiting for response...
Savepoint completed. Path: file:/tmp/savepoint/savepoint-9e9a1b-bbfb61c5013e
You can resume your program from this savepoint with the run command.

ll /tmp/savepoint/
total 8
drwxrwxr-x 2 hadoopuser hadoopuser 4096 May 20 16:20 savepoint-5f1a16-26cd0491e260
drwxrwxr-x 2 hadoopuser hadoopuser 4096 May 20 16:29 savepoint-9e9a1b-bbfb61c5013e
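When resuming, you usually want the most recent savepoint directory. A small helper sketch, in Python; the helper function and directory layout are illustrative, not part of Flink:

```python
import os
import tempfile

def latest_savepoint(savepoint_root):
    """Return the most recently modified savepoint-* directory, or None."""
    candidates = [
        os.path.join(savepoint_root, d)
        for d in os.listdir(savepoint_root)
        if d.startswith("savepoint-")
    ]
    return max(candidates, key=os.path.getmtime) if candidates else None

# Demonstration against a throwaway directory mimicking /tmp/savepoint above.
root = tempfile.mkdtemp()
older = os.path.join(root, "savepoint-5f1a16-26cd0491e260")
newer = os.path.join(root, "savepoint-9e9a1b-bbfb61c5013e")
os.mkdir(older)
os.mkdir(newer)
os.utime(older, (1000, 1000))  # force distinct modification times
os.utime(newer, (2000, 2000))
print(os.path.basename(latest_savepoint(root)))  # → savepoint-9e9a1b-bbfb61c5013e
```

The returned path can then be passed to `bin/flink run -s <path>` as shown above.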

Change a job's parallelism. This requires that state.savepoints.dir is configured on the cluster, because every modify triggers a savepoint.

bin/flink modify -p 2 9e9a1bab6256175706ffaa0e0f4f6535

Inspect a job's execution plan. You can paste the JSON into https://flink.apache.org/visualizer/ to visualize it.

bin/flink info examples/streaming/TopSpeedWindowing.jar
----------------------- Execution Plan -----------------------
{"nodes":[{"id":1,"type":"Source: Custom Source","pact":"Data Source","contents":"Source: Custom Source","parallelism":1},{"id":2,"type":"Timestamps/Watermarks","pact":"Operator","contents":"Timestamps/Watermarks","parallelism":1,"predecessors":[{"id":1,"ship_strategy":"FORWARD","side":"second"}]},{"id":4,"type":"Window(GlobalWindows(), DeltaTrigger, TimeEvictor, ComparableAggregator, PassThroughWindowFunction)","pact":"Operator","contents":"Window(GlobalWindows(), DeltaTrigger, TimeEvictor, ComparableAggregator, PassThroughWindowFunction)","parallelism":1,"predecessors":[{"id":2,"ship_strategy":"HASH","side":"second"}]},{"id":5,"type":"Sink: Print to Std. Out","pact":"Data Sink","contents":"Sink: Print to Std. Out","parallelism":1,"predecessors":[{"id":4,"ship_strategy":"FORWARD","side":"second"}]}]}
--------------------------------------------------------------
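The plan JSON can also be processed programmatically, e.g. to summarize operators and their parallelism. A minimal Python sketch; the JSON literal is an abbreviated copy of the output above, and the summary format is my own:

```python
import json

# Execution-plan JSON as printed by `bin/flink info` (abbreviated to two nodes).
plan = json.loads("""
{"nodes":[
  {"id":1,"type":"Source: Custom Source","pact":"Data Source",
   "contents":"Source: Custom Source","parallelism":1},
  {"id":5,"type":"Sink: Print to Std. Out","pact":"Data Sink",
   "contents":"Sink: Print to Std. Out","parallelism":1,
   "predecessors":[{"id":4,"ship_strategy":"FORWARD","side":"second"}]}
]}
""")

# One summary line per operator: id, type, and parallelism.
summaries = [
    f'{n["id"]}: {n["type"]} (parallelism={n["parallelism"]})'
    for n in plan["nodes"]
]
print("\n".join(summaries))
```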

Stopping a job with stop requires every source to implement StoppableFunction. This allows a graceful shutdown: the job only finishes after all sources have been closed, so in-flight data is guaranteed to be processed completely.

bin/flink stop -m localhost:2181 9e9a1bab6256175706ffaa0e0f4f6535
2.3 YARN mode
2.3.1 Single-job mode

Submit a job: -m yarn-cluster selects the per-job mode on YARN, -yqu sets the queue, and -yd means detached mode; without it, the client stays attached.

bin/flink run -m yarn-cluster -yqu root.up -yd examples/streaming/TopSpeedWindowing.jar
2019-05-20 17:17:00,719 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - The Flink YARN client has been started in detached mode. In order to stop Flink on YARN, use the following command or a YARN web interface to stop it:
yarn application -kill application_1558187257311_0010
Please also note that the temporary files of the YARN session in the home directory will not be removed.
Job has been submitted with JobID 14ca15c60c5bc641ab5774bc7c1e69eb

In the YARN UI, the application name shows as "Flink per-job Cluster".

2.3.2 yarn-session mode

Start a yarn-session: -n requests 2 TaskManagers, -jm gives the JobManager 1024 MB of memory, -tm gives each TaskManager 1024 MB, and -qu sets the queue. Note that in this mode the TaskManager processes are not started up front.

bin/yarn-session.sh -n 2 -jm 1024 -tm 1024 -qu root.up
2019-05-20 17:25:12,446 INFO org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
2019-05-20 17:25:12,694 INFO org.apache.flink.runtime.rest.RestClient - Rest client endpoint started.
Flink JobManager is now running on dn2:21667 with leader id 0e57fedd-6030-4c12-b455-638f7db4a65c.
JobManager Web Interface: http://dn2:17269


The application name shows as "Flink session cluster". The command above starts in attached mode, so the command line does not exit on its own; add -d to start in detached mode instead. If you leave attached mode with Ctrl+C, or want a command line for a detached session, you can reattach with

bin/yarn-session.sh -id application_1558187257311_0012

to reconnect to the session.
When the session starts, its YARN application id is recorded locally in /tmp/.yarn-properties-{username}; jobs subsequently submitted from this machine run inside that application:

cat /tmp/.yarn-properties-hadoopuser
#Generated YARN properties file
#Mon May 20 17:25:12 CST 2019
parallelism=2
dynamicPropertiesString=
applicationID=application_1558187257311_0012
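The file uses Java properties syntax, so it is easy to read from scripts. A minimal Python sketch; the sample content is copied from the output above, and the parser function is my own illustration:

```python
def parse_yarn_properties(text):
    """Parse Java-style properties text into a dict, skipping comment lines."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blanks and comments
        key, _, value = line.partition("=")
        props[key] = value
    return props

sample = """\
#Generated YARN properties file
#Mon May 20 17:25:12 CST 2019
parallelism=2
dynamicPropertiesString=
applicationID=application_1558187257311_0012
"""

props = parse_yarn_properties(sample)
print(props["applicationID"])  # → application_1558187257311_0012
```

In real use you would read the text from /tmp/.yarn-properties-{username} instead of the inline sample.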

Submit a job to the session:

bin/flink run examples/batch/WordCount.jar
2019-05-20 17:40:25,796 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - Found Yarn properties file under /tmp/.yarn-properties-hadoopuser.
2019-05-20 17:40:26,113 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - YARN properties set default parallelism to 2
2019-05-20 17:40:26,267 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2019-05-20 17:40:26,276 WARN org.apache.flink.yarn.AbstractYarnClusterDescriptor - Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set. The Flink YARN Client needs one of these to be set to properly load the Hadoop configuration for accessing YARN.
2019-05-20 17:40:26,347 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Found application JobManager host name 'dn2' and port '17269' from supplied application id 'application_1558187257311_0012'
Starting execution of program
Executing WordCount example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
(a,5)
(action,1)
(after,1)
(against,1)

Click the ApplicationMaster UI link to inspect the job:

The job has finished, and its resources have been released.
Note: when multiple yarn-sessions are running, pass -yid ${appid} at submission time to target a specific session.

3. scala-shell mode

Start the scala-shell in local, remote, or YARN mode:

bin/start-scala-shell.sh local
bin/start-scala-shell.sh remote dn2 17269
bin/start-scala-shell.sh yarn -n 1 -jm 1024 -tm 1024 -nm scala-shell-yarn -qu root.up

對于scala-shell

  • batch模式內置了benv變量
  • streaming模式內置了senv變量

Run a batch job. On a DataSet, print triggers execution:

scala> val text = benv.fromElements("hello word hi word thanks word hello")
text: org.apache.flink.api.scala.DataSet[String] = org.apache.flink.api.scala.DataSet@6764201e

scala> val counts = text.flatMap{_.split("\\W+")}.map{(_,1)}.groupBy(0).sum(1)
counts: org.apache.flink.api.scala.AggregateDataSet[(String, Int)] = org.apache.flink.api.scala.AggregateDataSet@76b49d0

scala> counts.print()
(hello,2)
(hi,1)
(thanks,1)
(word,3)

Run a DataStream job. On a DataStream, print does not trigger execution; the job only runs after an explicit senv.execute():

scala> val text = senv.fromElements("hello word hi word thanks word hello")
text: org.apache.flink.streaming.api.scala.DataStream[String] = org.apache.flink.streaming.api.scala.DataStream@7e24d565

scala> val counts = text.flatMap{_.split("\\W+")}.map{(_,1)}.keyBy(0).sum(1)
counts: org.apache.flink.streaming.api.scala.DataStream[(String, Int)] = org.apache.flink.streaming.api.scala.DataStream@43b5274e

scala> counts.print()
res1: org.apache.flink.streaming.api.datastream.DataStreamSink[(String, Int)] = org.apache.flink.streaming.api.datastream.DataStreamSink@79832158

scala> senv.execute("Stream Word Count")
(hello,1)
(word,1)
(hi,1)
(word,2)
(thanks,1)
(word,3)
(hello,2)
res2: org.apache.flink.api.common.JobExecutionResult = org.apache.flink.api.common.JobExecutionResult@6514666f

4. sql-client mode

Start the SQL client in embedded mode:

bin/sql-client.sh embedded

Type help to see the available commands:

help;
The following commands are available:
QUIT            Quits the SQL CLI client.
CLEAR           Clears the current terminal.
HELP            Prints the available commands.
SHOW TABLES     Shows all registered tables.
SHOW FUNCTIONS  Shows all registered user-defined functions.
DESCRIBE        Describes the schema of a table with the given name.
EXPLAIN         Describes the execution plan of a query or table with the given name.
SELECT          Executes a SQL SELECT query on the Flink cluster.
INSERT INTO     Inserts the results of a SQL SELECT query into a declared table sink.
SOURCE          Reads a SQL SELECT query from a file and executes it on the Flink cluster.
SET             Sets a session configuration property. Syntax: 'SET <key>=<value>'. Use 'SET' for listing all properties.
RESET           Resets all session configuration properties.

Run a simple query:

select 'hello word';

View a query's execution plan:

explain select 'a';
== Abstract Syntax Tree ==
LogicalProject(EXPR$0=[_UTF-16LE'a'])
  LogicalValues(tuples=[[{ 0 }]])

== Optimized Logical Plan ==
DataStreamCalc(select=[_UTF-16LE'a' AS EXPR$0])
  DataStreamValues(tuples=[[{ 0 }]])

== Physical Execution Plan ==
Stage 5 : Data Source
  content : collect elements with CollectionInputFormat

Stage 6 : Operator
  content : select: (_UTF-16LE'a' AS EXPR$0)
  ship_strategy : FORWARD

The SQL client has two result modes:

  • table mode materializes the query result in memory and displays it paginated
  • changelog mode does not materialize the result and continuously displays the latest changes
    Switch between them with SET execution.result-mode = table or SET execution.result-mode = changelog.

5. RESTful mode

The REST API is documented in detail on the official site: https://ci.apache.org/projects/flink/flink-docs-release-1.6/monitoring/rest_api.html
Today the REST API is used mostly for monitoring.

6. Web UI mode

Web UI mode simply means using the web page on port 8081: upload the jar by hand, set the parameters, and run.
