Flink DataSet API: Usage and Internals

Published: 2025/4/5

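As big data technology spreads across industries, demand for real-time processing of massive data keeps growing, and the business logic of data processing keeps getting more complex. Traditional batch processing and early stream processing frameworks increasingly struggle to meet these requirements in latency, throughput, fault tolerance, and ease of use.

Against this background, Flink, a newer stream processing framework, applies modern massively parallel processing techniques to stream processing and greatly mitigates the problems of earlier stream processing frameworks.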


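1. Overview

Flink provides the DataSet API for processing batch data. Flink first converts the incoming data into a DataSet that is distributed in parallel across the nodes of the cluster. It then applies transformations (map, filter, and so on) to the DataSet, and finally writes the resulting DataSet to an external system through a DataSink operation.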


2. Data ingestion

Input: InputFormat

/**
 * The base interface for data sources that produces records.
 * <p>
 * The input format handles the following:
 * <ul>
 *   <li>It describes how the input is split into splits that can be processed in parallel.</li>
 *   <li>It describes how to read records from the input split.</li>
 *   <li>It describes how to gather basic statistics from the input.</li>
 * </ul>
 * <p>
 * The life cycle of an input format is the following:
 * <ol>
 *   <li>After being instantiated (parameterless), it is configured with a {@link Configuration} object.
 *       Basic fields are read from the configuration, such as a file path, if the format describes
 *       files as input.</li>
 *   <li>Optionally: It is called by the compiler to produce basic statistics about the input.</li>
 *   <li>It is called to create the input splits.</li>
 *   <li>Each parallel input task creates an instance, configures it and opens it for a specific split.</li>
 *   <li>All records are read from the input</li>
 *   <li>The input format is closed</li>
 * </ol>
 * <p>
 * IMPORTANT NOTE: Input formats must be written such that an instance can be opened again after it was closed. That
 * is due to the fact that the input format is used for potentially multiple splits. After a split is done, the
 * format's close function is invoked and, if another split is available, the open function is invoked afterwards for
 * the next split.
 *
 * @see InputSplit
 * @see BaseStatistics
 *
 * @param <OT> The type of the produced records.
 * @param <T> The type of input split.
 */
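The life cycle described in the Javadoc above (create splits, open a split, read all records, close, then possibly open again) can be modeled in plain Java. The following is a minimal, dependency-free sketch, not Flink code: `SimpleInputFormat`, `RangeSplit`, `ListInputFormat`, and `readAll` are hypothetical names invented for illustration, and the interface is deliberately simpler than Flink's real `InputFormat` (no configuration step, no statistics, no record reuse).

```java
import java.util.ArrayList;
import java.util.List;

public class InputFormatLifecycleSketch {

    /** Simplified stand-in for an input format; not Flink's actual interface. */
    interface SimpleInputFormat<OT, T> {
        List<T> createInputSplits(int minNumSplits); // how the input is divided
        void open(T split);                          // start reading one split
        boolean reachedEnd();                        // true once the split is exhausted
        OT nextRecord();                             // read the next record
        void close();                                // finish the current split
    }

    /** Hypothetical split: the half-open index range [from, to) of a list. */
    static final class RangeSplit {
        final int from;
        final int to;

        RangeSplit(int from, int to) {
            this.from = from;
            this.to = to;
        }
    }

    /** Hypothetical format that serves strings from an in-memory list. */
    static final class ListInputFormat implements SimpleInputFormat<String, RangeSplit> {
        private final List<String> data;
        private int position;
        private int end;

        ListInputFormat(List<String> data) {
            this.data = data;
        }

        @Override
        public List<RangeSplit> createInputSplits(int minNumSplits) {
            List<RangeSplit> splits = new ArrayList<>();
            int parts = Math.max(1, minNumSplits);
            int chunk = (data.size() + parts - 1) / parts; // ceiling division
            for (int from = 0; from < data.size(); from += chunk) {
                splits.add(new RangeSplit(from, Math.min(from + chunk, data.size())));
            }
            return splits;
        }

        @Override
        public void open(RangeSplit split) {
            // Resetting all per-split state here is what makes re-opening safe.
            position = split.from;
            end = split.to;
        }

        @Override
        public boolean reachedEnd() {
            return position >= end;
        }

        @Override
        public String nextRecord() {
            return data.get(position++);
        }

        @Override
        public void close() {
            position = 0;
            end = 0;
        }
    }

    /** Drives one instance through every split: open, read all records, close, repeat. */
    static <OT, T> List<OT> readAll(SimpleInputFormat<OT, T> format, int minNumSplits) {
        List<OT> records = new ArrayList<>();
        for (T split : format.createInputSplits(minNumSplits)) {
            format.open(split);
            while (!format.reachedEnd()) {
                records.add(format.nextRecord());
            }
            format.close();
        }
        return records;
    }

    public static void main(String[] args) {
        ListInputFormat format = new ListInputFormat(List.of("a", "b", "c", "d", "e"));
        System.out.println(format.createInputSplits(2).size()); // prints 2
        System.out.println(readAll(format, 2));                 // prints [a, b, c, d, e]
    }
}
```

In this sketch a single instance reads every split in sequence, which exercises the "open again after close" requirement from the note above. In Flink each parallel input task creates its own instance, so the splits would be spread across tasks instead.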


3. Data transformation

DataSet: a collection of elements of the same type. A DataSet can be turned into another DataSet through a transformation, for example:

DataSet#map(org.apache.flink.api.common.functions.MapFunction)
DataSet#reduce(org.apache.flink.api.common.functions.ReduceFunction)
DataSet#join(DataSet)
DataSet#coGroup(DataSet)

Here, Function is the user-defined business logic; Java 8 lambda expressions are supported.

A function is executed through an operator. Take map as an example:

/**
 * Applies a Map transformation on this DataSet.
 *
 * <p>The transformation calls a {@link org.apache.flink.api.common.functions.MapFunction} for each element of the DataSet.
 * Each MapFunction call returns exactly one element.
 *
 * @param mapper The MapFunction that is called for each element of the DataSet.
 * @return A MapOperator that represents the transformed DataSet.
 *
 * @see org.apache.flink.api.common.functions.MapFunction
 * @see org.apache.flink.api.common.functions.RichMapFunction
 * @see MapOperator
 */
public <R> MapOperator<T, R> map(MapFunction<T, R> mapper) {
    if (mapper == null) {
        throw new NullPointerException("Map function must not be null.");
    }
    String callLocation = Utils.getCallLocationName();
    TypeInformation<R> resultType = TypeExtractor.getMapReturnTypes(mapper, getType(), callLocation, true);
    return new MapOperator<>(this, resultType, clean(mapper), callLocation);
}

Here, the Operator (MapOperator in this example) represents the transformed DataSet.
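To make the relationship between a function and its operator concrete, here is a small dependency-free model in plain Java. It is a sketch under stated assumptions, not Flink's implementation: `SimpleDataSet`, `ListSource`, `SimpleMapFunction`, and `SimpleMapOperator` are hypothetical stand-ins, type extraction and closure cleaning are left out, and the stand-in function does not declare checked exceptions. The point it illustrates is that `map` only wraps the user function in an operator node, and the function runs when the result is actually requested.

```java
import java.util.ArrayList;
import java.util.List;

public class MapOperatorSketch {

    /** Stand-in for a user function; one abstract method, so a lambda fits. */
    @FunctionalInterface
    interface SimpleMapFunction<T, R> {
        R map(T value);
    }

    /** Stand-in for a data set: typed elements that can be transformed. */
    abstract static class SimpleDataSet<T> {
        abstract List<T> collect();

        /** Mirrors the null check in the excerpt above, then wraps the function in an operator. */
        <R> SimpleMapOperator<T, R> map(SimpleMapFunction<T, R> mapper) {
            if (mapper == null) {
                throw new NullPointerException("Map function must not be null.");
            }
            return new SimpleMapOperator<>(this, mapper);
        }
    }

    /** Hypothetical source backed by an in-memory list. */
    static final class ListSource<T> extends SimpleDataSet<T> {
        private final List<T> elements;

        ListSource(List<T> elements) {
            this.elements = elements;
        }

        @Override
        List<T> collect() {
            return elements;
        }
    }

    /** The operator is itself a data set: it remembers its input and the function to apply. */
    static final class SimpleMapOperator<IN, OUT> extends SimpleDataSet<OUT> {
        private final SimpleDataSet<IN> input;
        private final SimpleMapFunction<IN, OUT> function;

        SimpleMapOperator(SimpleDataSet<IN> input, SimpleMapFunction<IN, OUT> function) {
            this.input = input;
            this.function = function;
        }

        @Override
        List<OUT> collect() {
            // The user function is only invoked here, once per input element.
            List<OUT> out = new ArrayList<>();
            for (IN value : input.collect()) {
                out.add(function.map(value));
            }
            return out;
        }
    }

    /** Helper for the demo: maps each word to its length using a method reference. */
    static List<Integer> lengths(List<String> words) {
        return new ListSource<>(words).map(String::length).collect();
    }

    public static void main(String[] args) {
        System.out.println(lengths(List.of("flink", "dataset"))); // prints [5, 7]
    }
}
```

Because the operator extends the data set type, transformations can be chained, which is the same shape as `MapOperator` being returned from `DataSet#map` in the excerpt.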

4. Data output

DataSink: an operation that stores the result data.

Output: OutputFormat


For example, the result can be written as CSV:

/**
 * Writes a {@link Tuple} DataSet as CSV file(s) to the specified location with the specified field and line delimiters.
 *
 * <p><b>Note: Only a Tuple DataSet can written as a CSV file.</b>
 * For each Tuple field the result of {@link Object#toString()} is written.
 *
 * @param filePath The path pointing to the location the CSV file is written to.
 * @param rowDelimiter The row delimiter to separate Tuples.
 * @param fieldDelimiter The field delimiter to separate Tuple fields.
 * @param writeMode The behavior regarding existing files. Options are NO_OVERWRITE and OVERWRITE.
 *
 * @see Tuple
 * @see CsvOutputFormat
 * @see DataSet#writeAsText(String) Output files and directories
 */
public DataSink<T> writeAsCsv(String filePath, String rowDelimiter, String fieldDelimiter, WriteMode writeMode) {
    return internalWriteAsCsv(new Path(filePath), rowDelimiter, fieldDelimiter, writeMode);
}

@SuppressWarnings("unchecked")
private <X extends Tuple> DataSink<T> internalWriteAsCsv(Path filePath, String rowDelimiter, String fieldDelimiter, WriteMode wm) {
    Preconditions.checkArgument(getType().isTupleType(), "The writeAsCsv() method can only be used on data sets of tuples.");
    CsvOutputFormat<X> of = new CsvOutputFormat<>(filePath, rowDelimiter, fieldDelimiter);
    if (wm != null) {
        of.setWriteMode(wm);
    }
    return output((OutputFormat<T>) of);
}
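The roles of the four parameters (path, row delimiter, field delimiter, write mode) can be illustrated with a plain-Java model. This is a hedged sketch rather than Flink's `CsvOutputFormat`: tuples are modeled as lists of fields, `toCsv` and `writeAsCsv` are hypothetical helpers, the `WriteMode` enum is a local stand-in, and terminating every row with the row delimiter is an assumption of this sketch.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.StringJoiner;

public class CsvWriteSketch {

    /** Local stand-in for the two write modes named in the Javadoc above. */
    enum WriteMode { NO_OVERWRITE, OVERWRITE }

    /** Renders each tuple as one row; String.valueOf calls toString() on non-null fields. */
    static String toCsv(List<? extends List<?>> tuples, String rowDelimiter, String fieldDelimiter) {
        StringBuilder csv = new StringBuilder();
        for (List<?> tuple : tuples) {
            StringJoiner row = new StringJoiner(fieldDelimiter);
            for (Object field : tuple) {
                row.add(String.valueOf(field));
            }
            csv.append(row).append(rowDelimiter); // assumption: every row ends with the delimiter
        }
        return csv.toString();
    }

    /** Writes the rows to a file; NO_OVERWRITE refuses to replace an existing file. */
    static void writeAsCsv(List<? extends List<?>> tuples, Path file,
                           String rowDelimiter, String fieldDelimiter, WriteMode mode) {
        if (mode == WriteMode.NO_OVERWRITE && Files.exists(file)) {
            throw new IllegalStateException("File already exists: " + file);
        }
        try {
            // Files.write creates the file or truncates an existing one by default.
            Files.write(file, toCsv(tuples, rowDelimiter, fieldDelimiter).getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        List<List<Object>> tuples = List.of(List.of("flink", 1), List.of("dataset", 2));
        System.out.print(toCsv(tuples, "\n", ","));
        // prints:
        // flink,1
        // dataset,2

        Path file = Files.createTempDirectory("csv-sketch").resolve("out.csv");
        writeAsCsv(tuples, file, "\n", ",", WriteMode.NO_OVERWRITE); // first write succeeds
        writeAsCsv(tuples, file, "\n", ",", WriteMode.OVERWRITE);    // replaces the existing file
    }
}
```

Separating the formatting step (`toCsv`) from the file handling keeps the delimiter logic easy to test without touching the file system.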

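5. Summary

  1. Flink reads data from various sources through InputFormat and converts it into a DataSet.

  2. Flink provides a rich set of transformations. A DataSet can be converted into another DataSet through a transformation; internally this is implemented with Functions and Operators.

  3. Flink uses OutputFormat to turn a DataSet into a DataSink, which finally writes the data to different storage media.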



Reposted from: https://www.cnblogs.com/davidwang456/p/11047002.html
