Flink DataStream 编程入门
流處理是 Flink 的核心,流處理的數(shù)據(jù)集用 DataStream 表示。數(shù)據(jù)流從可以從各種各樣的數(shù)據(jù)源中創(chuàng)建(消息隊(duì)列、Socket 和 文件等),經(jīng)過 DataStream 的各種 transform 操作,最終輸出文件或者標(biāo)準(zhǔn)輸出。這個(gè)過程跟之前文章中介紹的 Flink 程序基本骨架一樣。本篇介紹 DataStream 相關(guān)的入門知識(shí)。
Flink 101
為了學(xué)習(xí) Flink 的朋友能查看到每個(gè)例子的源碼,我創(chuàng)建了一個(gè) GitHub 項(xiàng)目:github.com/duma-repo/a… 這里會(huì)存放每一篇文章比較重要的示例的源碼,目前支持 Java 和 Scala,仍在不斷完善中。代碼下載后可以在本地運(yùn)行,也可以打包放在集群上運(yùn)行。同時(shí),歡迎各位將優(yōu)質(zhì)的資源提交到項(xiàng)目中。
簡(jiǎn)單示例
import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.util.Collector;public class WindowWordCount {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<Tuple2<String, Integer>> dataStream = env.socketTextStream("localhost", 9999).flatMap(new Splitter()).keyBy(0).timeWindow(Time.seconds(5)).sum(1);dataStream.print();env.execute("Window WordCount");}public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {@Overridepublic void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {for (String word: sentence.split(" ")) {out.collect(new Tuple2<String, Integer>(word, 1)); //空格分割后,每個(gè)單詞轉(zhuǎn)換成 (word, 1) 二元組輸出}}}}復(fù)制代碼這個(gè)例子跟之間介紹 WordCount 的例子類似,這里詳細(xì)介紹下涉及的 API 和含義
- 數(shù)據(jù)源:socketTextStream 是從 socket 創(chuàng)建的數(shù)據(jù)流,可以使用 nc -l 9000 創(chuàng)建 socket 客戶端發(fā)送數(shù)據(jù)
- transform:flatMap 將輸入的數(shù)據(jù)按照空格分割后,扁平化處理(flat即為扁平的意思);keyBy 會(huì)按照指定的 key 進(jìn)行分組,這里就是將單詞作為 key;timeWindow 指定時(shí)間窗口,這里是 5s 處理一次;sum 是聚合函數(shù),將分組好的單詞個(gè)數(shù)求和
- 輸出:print 將處理完的數(shù)據(jù)輸出到標(biāo)準(zhǔn)輸出流中,可以在控制臺(tái)看到輸出的結(jié)果。調(diào)用 execute 方法提交 Job
Data Source
經(jīng)過以上的介紹,我們知道常見的數(shù)據(jù)源有 socket、消息隊(duì)列和文件等。對(duì)于常見的數(shù)據(jù)源 Flink 已經(jīng)定義好了讀取函數(shù),接下來一一介紹。
基于文件
- readTextFile(path):讀文本文件,默認(rèn)是文件類型是 TextInputFormat,并且返回類型是 String
- readFile(fileInputFormat, path):讀文件,需要指定輸入文件的格式
- readFile(fileInputFormat, path, watchType, interval, typeInfo):以上兩個(gè)方法內(nèi)部都會(huì)調(diào)用這個(gè)方法,參數(shù)說明:
- fileInputFormat - 輸入文件的類型
- path - 輸入文件路徑
- watchType - 取值為 FileProcessingMode.PROCESS_CONTINUOUSLY 和 FileProcessingMode.PROCESS_ONCE
- FileProcessingMode.PROCESS_CONTINUOUSLY - 當(dāng)輸入路徑下有文件被修改,整個(gè)路徑下內(nèi)容將會(huì)被重新處理
- FileProcessingMode.PROCESS_ONCE - 只掃描一次,便退出。因此這種模式下輸入數(shù)據(jù)只讀取一次
- interval - 依賴 watchType 參數(shù),對(duì)于 FileProcessingMode.PROCESS_CONTINUOUSLY 每隔固定時(shí)間(單位:毫秒)檢測(cè)路徑下是否有新數(shù)據(jù)
- typeInfo - 返回?cái)?shù)據(jù)的類型
需要注意,在底層 Flink 將讀文件的過程分為兩個(gè)子任務(wù) —— 文件監(jiān)控和數(shù)據(jù)讀取(reader)。監(jiān)控任務(wù)由 1 個(gè) task 實(shí)現(xiàn),而讀取的任務(wù)由多個(gè) task 實(shí)現(xiàn),數(shù)量與 Job 的并行度相同。監(jiān)控任務(wù)的作用是掃描輸入路徑(周期性或者只掃描一次,取決于 watchType),當(dāng)數(shù)據(jù)可以被處理時(shí),會(huì)將數(shù)據(jù)分割成多個(gè)分片,將分片分配給下游的 reader 。一個(gè)分片只會(huì)被一個(gè) reader 讀取,一個(gè) reader 可以讀取多個(gè)分片。
基于 Socket
- socketTextStream:從 socket 數(shù)據(jù)流中讀數(shù)據(jù)
基于 Collection
- fromCollection(Collection):從 Java.util.Collection 類型的數(shù)據(jù)中創(chuàng)建輸入流,collection 中的所有元素類型必須相同
- fromCollection(Iterator, Class):從 iterator (迭代器)中創(chuàng)建輸入流,Class 參數(shù)指定從 iterator 中的數(shù)據(jù)類型
- fromElements(T ...):從給定的參數(shù)中創(chuàng)建輸入流, 所有參數(shù)類型必須相同
- fromParallelCollection(SplittableIterator, Class):從 iterator 中創(chuàng)建并行的輸入流,Class 指定 iterator 中的數(shù)據(jù)類型
- generateSequence(from, to):從 from 至 to 之間的數(shù)據(jù)序列創(chuàng)建并行的數(shù)據(jù)流
自定義
- addSource:可以自定義輸入源,通過實(shí)現(xiàn) SourceFunction 接口來自定義非并行的輸入流;也可以實(shí)現(xiàn) ParallelSourceFunction 接口或集成 RichParallelSourceFunction 類來自定義并行輸入流,當(dāng)然也可以定義好的數(shù)據(jù)源,如:Kafka,addSource(new FlinkKafkaConsumer08<>(...))
DataStream 的 transform
之前已經(jīng)介紹了一些 transfrom 函數(shù),如:map、flatMap 和 filter 等。同時(shí)還有窗口函數(shù):window、timeWindow 等,聚合函數(shù):sum、reduce 等。更多的 transform 函數(shù)以及使用將會(huì)單獨(dú)寫一篇文章介紹。
Data Sink
Data Sink 便是數(shù)據(jù)的輸出。同 Data Source 類似, Flink 也內(nèi)置了一些輸出函數(shù),如下:
- writeAsText(path) / TextOutputFormat:將數(shù)據(jù)作為 String 類型輸出到指定文件
- writeAsCsv(...) / CsvOutputFormat:將 Tuple 類型輸出到 ',' 分隔的 csv 類型的文件。行和列的分隔符可以通過參數(shù)配置,默認(rèn)的為 '\n' 和 ','
- print() / printToErr():將數(shù)據(jù)打印到標(biāo)準(zhǔn)輸出流或者標(biāo)準(zhǔn)錯(cuò)誤流,可以指定打印的前綴。
- writeUsingOutputFormat() / FileOutputFormat:輸出到 OutputFormat 類型指定的文件,支持對(duì)象到字節(jié)的轉(zhuǎn)換。
- writeToSocket:根據(jù) SerializationSchema 將數(shù)據(jù)輸出到 socket
- addSink:自定義輸出函數(shù),如:自定義將數(shù)據(jù)輸出到 Kafka
小結(jié)
本篇文章主要介紹了 Flink Streaming 編程的基本骨架。詳細(xì)介紹了 Streaming 內(nèi)置的 Data Source 和 DataSink 。下篇將繼續(xù)介紹 Flink Streaming 編程涉及的基本概念。
代碼地址: github.com/duma-repo/a…
歡迎關(guān)注公眾號(hào)「渡碼」
轉(zhuǎn)載于:https://juejin.im/post/5d09814651882528fd530789
總結(jié)
以上是生活随笔為你收集整理的Flink DataStream 编程入门的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 6.17 dokcer(一)Compos
- 下一篇: 1.4版本上线(第八次会议)