Running the Flink WordCount experiment in IntelliJ (Java version)
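Note: running WordCount from IntelliJ does not use any of Flink's cluster deployment modes.
The program is launched as an ordinary Java application and Flink runs the job in a local environment inside that JVM, so there is no need to start a Flink cluster.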
########################Project structure####################################################
├── pom.xml
└── src
    └── main
        ├── java
        │   └── WordCount.java
        └── resources
#############################Experiment steps###############################################
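① Create a project with the structure above and import it into IntelliJ. IntelliJ downloads the dependencies automatically whenever pom.xml changes. The first download takes quite a while, so be patient.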
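② nc -lk 9999
Then type
hello hello world world world (be sure to press Enter: the job splits its input into records on "\n", so Flink only starts counting once the line is terminated)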
③ Press Alt+Shift+F10 and choose WordCount to run it.
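The reason Enter matters can be sketched in plain Java. WordCount.java passes "\n" as the delimiter to socketTextStream, so a typed line becomes a record only once its newline arrives. The class below is a hypothetical, simplified model of that buffering. It is not Flink's actual source implementation.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, simplified model (not Flink's real source class) of a
 * delimiter-based socket source: received text is buffered, and only the
 * parts terminated by the delimiter are emitted as records.
 */
public class DelimiterSplitterDemo {

    private final StringBuilder buffer = new StringBuilder();
    private final String delimiter;

    public DelimiterSplitterDemo(String delimiter) {
        this.delimiter = delimiter;
    }

    /** Appends a chunk of received text and returns every record it completes. */
    public List<String> feed(String chunk) {
        buffer.append(chunk);
        List<String> records = new ArrayList<>();
        int pos;
        while ((pos = buffer.indexOf(delimiter)) != -1) {
            records.add(buffer.substring(0, pos));        // text before the delimiter
            buffer.delete(0, pos + delimiter.length());   // drop it and the delimiter
        }
        return records;                                   // unterminated text stays buffered
    }

    public static void main(String[] args) {
        DelimiterSplitterDemo source = new DelimiterSplitterDemo("\n");
        // Line typed, Enter not pressed yet: no record is produced.
        System.out.println(source.feed("hello hello world world world"));
        // Enter pressed: the buffered line becomes one record.
        System.out.println(source.feed("\n"));
    }
}
```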
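Note: steps ② and ③ must not be swapped. The job's socket source connects to nc as a client when it starts, so if nc is not listening yet the connection is refused. In that case an error occurs with some probability (how likely it is was not determined).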
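The ordering rule follows from the roles of the two programs. `nc -lk 9999` is the TCP server, and the WordCount socket source is a client that connects when the job starts. The hypothetical helper below uses only the JDK and no Flink. It shows that a connection attempt fails while nothing is listening on the port and succeeds once a listener exists.

```java
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;

/**
 * Plain-JDK illustration (hypothetical helper, not Flink code): a TCP client
 * can only connect after a server is listening on the port.
 */
public class ConnectOrderDemo {

    /** Returns true if a TCP connection to host:port can be opened right now. */
    public static boolean canConnect(String host, int port) {
        try (Socket socket = new Socket(host, port)) {
            return true;
        } catch (IOException e) {   // e.g. java.net.ConnectException: Connection refused
            return false;
        }
    }

    /** Returns a port that is free at the moment of the call (nothing listens on it). */
    public static int freePort() throws IOException {
        try (ServerSocket probe = new ServerSocket(0)) {
            return probe.getLocalPort();
        }
    }

    public static void main(String[] args) throws IOException {
        int port = freePort();
        // Like starting WordCount before nc: nobody is listening yet.
        System.out.println("before listener: " + canConnect("localhost", port));
        // Like running nc -lk first: a server socket is bound to the port.
        try (ServerSocket listener = new ServerSocket(port)) {
            System.out.println("with listener:   " + canConnect("localhost", port));
        }
    }
}
```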
Result:
[Screenshot: console output of the WordCount job]
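For the line typed in step ②, the counts inside a single window can be worked out by hand. The plain-Java sketch below uses no Flink, and its class name is hypothetical. It repeats the job's per-window logic: split each line on "\\s" as the flatMap does, group by word as keyBy does, and add up the 1s as sum does.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical plain-Java sketch of what WordCount computes for the lines
 * that fall into one window: split (flatMap), group (keyBy), add up (sum).
 */
public class LocalWordCount {

    public static Map<String, Long> count(String... lines) {
        Map<String, Long> counts = new LinkedHashMap<>();   // keeps first-seen order
        for (String line : lines) {
            for (String word : line.split("\\s")) {        // same regex as WordCount.java
                counts.merge(word, 1L, Long::sum);          // emit <word, 1> and sum per word
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(count("hello hello world world world")); // prints {hello=2, world=3}
    }
}
```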
##############################Appendix##############################################
WordCount.java (change the host to your own machine; the default here is the hostname "Desktop")
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class WordCount {

    public static void main(String[] args) throws Exception {
        // Port of the socket to read from
        int port;
        try {
            ParameterTool parameterTool = ParameterTool.fromArgs(args);
            port = parameterTool.getInt("port");
        } catch (Exception e) {
            System.err.println("No port argument given, using the default 9999");
            port = 9999;
        }

        // Get the execution environment (a local one when started from IntelliJ)
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Connect to the socket and read the input, one record per "\n"-terminated line
        DataStreamSource<String> text = env.socketTextStream("Desktop", port, "\n");

        // Compute the counts
        DataStream<WordWithCount> windowCount = text
                // Flatten: turn each line into <word, count> records
                .flatMap(new FlatMapFunction<String, WordWithCount>() {
                    public void flatMap(String value, Collector<WordWithCount> out) throws Exception {
                        String[] splits = value.split("\\s");
                        for (String word : splits) {
                            out.collect(new WordWithCount(word, 1L));
                        }
                    }
                })
                // Group records with the same word
                .keyBy("word")
                // Window size 2 seconds, sliding every 1 second
                .timeWindow(Time.seconds(2), Time.seconds(1))
                .sum("count");

        // Print the results to the console using a parallelism of 1
        windowCount.print().setParallelism(1);

        // Flink builds the job lazily: nothing above runs until execute() is called
        env.execute("streaming word count");
    }

    /**
     * Holds a word and the number of times it occurred.
     */
    public static class WordWithCount {
        public String word;
        public long count;

        public WordWithCount() {
        }

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return "WordWithCount{" +
                    "word='" + word + '\'' +
                    ", count=" + count +
                    '}';
        }
    }
}

pom.xml
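<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.xiao</groupId>
    <artifactId>bbb</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <!-- Use one Flink version and one Scala suffix for every Flink artifact -->
        <flink.version>1.10.1</flink.version>
        <scala.binary.version>2.12</scala.binary.version>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <!-- Keep "provided" commented out when running from IntelliJ, or the Flink classes may be missing at runtime -->
            <!-- <scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
</project>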
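The .timeWindow(Time.seconds(2), Time.seconds(1)) call in WordCount.java creates windows that are two seconds long and start every second. Each record therefore belongs to two windows, so each count is normally printed twice. The sketch below models that assignment in plain Java. The start-alignment formula reflects our reading of Flink's sliding-window assigner, assuming a zero offset and non-negative timestamps. The class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical plain-Java model (not Flink code) of sliding-window assignment,
 * e.g. size 2000 ms and slide 1000 ms as in timeWindow(2 s, 1 s).
 */
public class SlidingWindowDemo {

    /** Returns the [start, end) windows, in milliseconds, that contain the timestamp. */
    public static List<long[]> assign(long timestamp, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        // Start of the latest window containing the timestamp, aligned to the slide
        // (assumes timestamp >= 0 and zero offset).
        long lastStart = timestamp - timestamp % slide;
        // Walk back one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        // A record arriving at t = 2500 ms falls into [2000, 4000) and [1000, 3000).
        for (long[] w : assign(2500, 2000, 1000)) {
            System.out.println("[" + w[0] + ", " + w[1] + ")");
        }
    }
}
```

Since size / slide = 2, every record, and hence every per-word count, shows up in exactly two window results.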
The above is the full content of "Running the Flink WordCount experiment in IntelliJ (Java version)", as collected and organized by 生活随笔.