日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Apache Flink 零基础入门(三)编写最简单的helloWorld

發布時間:2024/9/16 编程问答 32 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Apache Flink 零基础入门(三)编写最简单的helloWorld 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

實驗環境

JDK 1.8

IDE Intellij idea

Flink 1.8.1

實驗內容

創建一個Flink簡單Demo,可以從流數據中統計單詞個數。

實驗步驟

首先創建一個maven項目,其中pom.xml文件內容如下:

<properties><flink.version>1.8.1</flink.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-wikiedits_2.11</artifactId><version>${flink.version}</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><source>8</source><target>8</target></configuration></plugin><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><version>2.1.4.RELEASE</version><configuration><mainClass>wikiedits.StreamingJob</mainClass></configuration><executions><execution><goals><goal>repackage</goal></goals></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-surefire-plugin</artifactId><configuration><skip>true</skip></configuration></plugin></plugins></build>

創建一個包com.vincent,并且創建一個類StreamingJob.java

public class WikipediaAnalysis {public static void main(String[] args) throws Exception {} }

Flink 程序的第一步是創建一個StreamExecutionEnvironment。StreamExecutionEnvironment可以設置參數并且導入一些外部系統的數據源。

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

接下來創建一個外部數據源,外部數據源使用nc -l 9000 表示服務器端開啟監聽9000端口,并可以發送數據。

DataStream<String> text = env.socketTextStream("192.168.152.45", 9000);

這樣就添加了一個流文本數據源,有了DataStream就可以獲取數據了,然后對數據進行分析:

DataStream<Tuple2<String, Integer>> dataStream = text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {String[] tokens = s.toLowerCase().split("\\W+");for (String token : tokens) {if (token.length() > 0) {collector.collect(new Tuple2<String, Integer>(token, 1));}}}}).keyBy(0).timeWindow(Time.seconds(5)).sum(1);

flatMap表示將嵌套集合轉換并平鋪成非嵌套集合,字符串是s,返回值是Collector<Tuple2<String, Integer>>。并且根據keyBy(0)即第0個字段進行統計加一操作。.timeWindow()指定窗口大小是5秒。

所以整體代碼如下:

public class StreamingJob {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<String> text = env.socketTextStream("192.168.152.45", 9000);DataStream<Tuple2<String, Integer>> dataStream = text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {String[] tokens = s.toLowerCase().split("\\W+");for (String token : tokens) {if (token.length() > 0) {collector.collect(new Tuple2<String, Integer>(token, 1));}}}}).keyBy(0).timeWindow(Time.seconds(5)).sum(1);dataStream.print();// execute programenv.execute("Java WordCount from SocketTextStream Example");} }

運行

運行main方法,然后在服務器端執行nc -l 9000 并且輸入文本:

iie4bu@swarm-manager:~$ nc -l 9000 a b d d e f

然后在intellij控制臺將輸出:

1> (b,1) 3> (a,1) 1> (f,1) 3> (d,2) 1> (e,1)

可以統計出每個單詞的次數

總結

以上是生活随笔為你收集整理的Apache Flink 零基础入门(三)编写最简单的helloWorld的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。