日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Apache Flink 零基础入门(五)Flink开发实时处理应用程序

發布時間:2024/9/16 编程问答 29 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Apache Flink 零基础入门(五)Flink开发实时处理应用程序 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

使用Flink + java實現需求

環境

JDK:1.8

Maven:3.6.1(最低Maven 3.0.4

使用上一節中的springboot-flink-train項目

開發步驟

第一步:創建流處理上下文環境

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

第二步:讀取數據,使用socket流方式讀取數據

DataStreamSource<String> text = env.socketTextStream("192.168.152.45", 9999);

第三步:transform

text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {String[] tokens = value.toLowerCase().split(",");for(String token: tokens) {if(token.length() > 0) {out.collect(new Tuple2<String, Integer>(token, 1));}}}}).keyBy(0).timeWindow(Time.seconds(5)).sum(1).print();

這里我們使用逗號分隔,然后跟批處理不同的是,這里使用keyBy(0),而不是groupBy(0)。timewindow表示每隔多久執行一次。

第四步:執行

env.execute("StreamingWCJavaApp");

整體代碼如下:

/*** 使用Java API來開發Flink的實時處理應用程序* wc統計的數據源自socket*/ public class StreamingWCJava02App {public static void main(String[] args) throws Exception {// 獲取參數int port;try{ParameterTool tool = ParameterTool.fromArgs(args);port = tool.getInt("port");} catch (Exception e) {System.out.println("端口未設置, 使用默認端口9999");port = 9999;}// step1: 獲取流處理上下文環境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// step2: 讀取數據DataStreamSource<String> text = env.socketTextStream("192.168.152.45", port);// step3: transformtext.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {String[] tokens = value.toLowerCase().split(",");for(String token: tokens) {if(token.length() > 0) {out.collect(new Tuple2<String, Integer>(token, 1));}}}}).keyBy(0).timeWindow(Time.seconds(5)).sum(1).print();env.execute("StreamingWCJavaApp");}}

運行

首先在192.168.152.45上運行命令

nc -l 9999

然后在運行main方法。在192.168.152.45的nc上輸入

abc,def,abc,ddd

在idea控制臺輸出如下:

4> (abc,2) 1> (def,1) 4> (ddd,1)

這個前面的"4>"表示并行度。我們可以設置setParallelism(1)來忽略這個問題。如下所示:

text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {String[] tokens = value.toLowerCase().split(",");for(String token: tokens) {if(token.length() > 0) {out.collect(new Tuple2<String, Integer>(token, 1));}}}}).keyBy(0).timeWindow(Time.seconds(5)).sum(1).print().setParallelism(1);

這樣控制臺的打印結果如下:

(abc,2) (ddd,1) (def,1)

這樣一個簡單的demo就成功了!

重構代碼

上面的代碼中localhost與port需要用參數傳遞進來。

代碼如下:

// 獲取參數int port;try{ParameterTool tool = ParameterTool.fromArgs(args);port = tool.getInt("port");} catch (Exception e) {System.out.println("端口未設置, 使用默認端口9999");port = 9999;}

使用Flink提供的ParameterTool來接收參數。

我們在運行時就可以指定參數列表了,其中的key必須以“-”或者“--”開頭。

在運行時,配置參數:

這樣運行就可以從外界傳遞參數了

使用Flink + Scala實現需求

接下來使用Scala方式實現,在項目springboot-flink-train-scala中新建StreamingWCScalaApp,內容如下:

/*** 使用Scala開發Flink的實時處理應用程序*/ object StreamingWCScalaApp {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironment// 引入隱式轉換import org.apache.flink.api.scala._val text = env.socketTextStream("192.168.152.45", 9999)text.flatMap(_.split(",")).map((_,1)).keyBy(0).timeWindow(Time.seconds(5)).sum(1).print().setParallelism(1)env.execute("StreamingWCScalaApp");} }

這種方式比java實現更加簡潔。

與50位技術專家面對面20年技術見證,附贈技術全景圖

總結

以上是生活随笔為你收集整理的Apache Flink 零基础入门(五)Flink开发实时处理应用程序的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。