An introduction to sources and sinks in Flink batch processing

I. Common sources in Flink batch processing

Flink's sources in batch processing fall into two main categories:

    1. Collection-based source (created from a local collection)

    2. File-based source (created from files)


1. Collection-based source

The three most common ways to create a DataSet in Flink are:

1. env.fromElements(), which also accepts composite types such as Tuples and custom objects.

2. env.fromCollection(), which supports many concrete Collection types.

3. env.generateSequence(), which creates a DataSet from a generated sequence of numbers.

import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment, _}
import scala.collection.immutable.{Queue, Stack}
import scala.collection.mutable
import scala.collection.mutable.{ArrayBuffer, ListBuffer}

object DataSource001 {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // 0. Create a DataSet from individual elements (fromElements)
    val ds0: DataSet[String] = env.fromElements("spark", "flink")
    ds0.print()
    // 1. Create a DataSet from Tuples (fromElements)
    val ds1: DataSet[(Int, String)] = env.fromElements((1, "spark"), (2, "flink"))
    ds1.print()
    // 2. Create a DataSet from an Array
    val ds2: DataSet[String] = env.fromCollection(Array("spark", "flink"))
    ds2.print()
    // 3. Create a DataSet from an ArrayBuffer
    val ds3: DataSet[String] = env.fromCollection(ArrayBuffer("spark", "flink"))
    ds3.print()
    // 4. Create a DataSet from a List
    val ds4: DataSet[String] = env.fromCollection(List("spark", "flink"))
    ds4.print()
    // 5. Create a DataSet from a ListBuffer
    val ds5: DataSet[String] = env.fromCollection(ListBuffer("spark", "flink"))
    ds5.print()
    // 6. Create a DataSet from a Vector
    val ds6: DataSet[String] = env.fromCollection(Vector("spark", "flink"))
    ds6.print()
    // 7. Create a DataSet from a Queue
    val ds7: DataSet[String] = env.fromCollection(Queue("spark", "flink"))
    ds7.print()
    // 8. Create a DataSet from a Stack
    val ds8: DataSet[String] = env.fromCollection(Stack("spark", "flink"))
    ds8.print()
    // 9. Create a DataSet from a Stream (a Stream is a lazy List; it avoids building unnecessary intermediate collections)
    val ds9: DataSet[String] = env.fromCollection(Stream("spark", "flink"))
    ds9.print()
    // 10. Create a DataSet from a Seq
    val ds10: DataSet[String] = env.fromCollection(Seq("spark", "flink"))
    ds10.print()
    // 11. Create a DataSet from a Set
    val ds11: DataSet[String] = env.fromCollection(Set("spark", "flink"))
    ds11.print()
    // 12. Create a DataSet from an Iterable
    val ds12: DataSet[String] = env.fromCollection(Iterable("spark", "flink"))
    ds12.print()
    // 13. Create a DataSet from an ArraySeq
    val ds13: DataSet[String] = env.fromCollection(mutable.ArraySeq("spark", "flink"))
    ds13.print()
    // 14. Create a DataSet from an ArrayStack
    val ds14: DataSet[String] = env.fromCollection(mutable.ArrayStack("spark", "flink"))
    ds14.print()
    // 15. Create a DataSet from a Map
    val ds15: DataSet[(Int, String)] = env.fromCollection(Map(1 -> "spark", 2 -> "flink"))
    ds15.print()
    // 16. Create a DataSet from a Range
    val ds16: DataSet[Int] = env.fromCollection(Range(1, 9))
    ds16.print()
    // 17. Create a DataSet with generateSequence
    val ds17: DataSet[Long] = env.generateSequence(1, 9)
    ds17.print()
  }
}

2. File-based source

Flink can read files from many storage systems, including local files, HDFS, and Alluxio, and it supports several file formats, including plain text and CSV.
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment, _}

object DataSource002 {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // 1. Read a local text file; local paths start with file://
    val ds1: DataSet[String] = env.readTextFile("file:///Applications/flink-1.1.3/README.txt")
    ds1.print()
    // 2. Read an HDFS text file; HDFS paths start with hdfs:// (a short URL that does not name the NameNode)
    val ds2: DataSet[String] = env.readTextFile("hdfs:///input/flink/README.txt")
    ds2.print()
    // 3. Read an HDFS CSV file and turn each row into a tuple
    val path = "hdfs://qingcheng11:9000/input/flink/sales.csv"
    val ds3 = env.readCsvFile[(String, Int, Int, Double)](
      filePath = path,
      lineDelimiter = "\n",
      fieldDelimiter = ",",
      lenient = false,
      ignoreFirstLine = true,
      includedFields = Array(0, 1, 2, 3))
    ds3.print()
    // 4. Read an HDFS CSV file and turn each row into a case class
    case class Sales(transactionId: String, customerId: Int, itemId: Int, amountPaid: Double)
    val ds4 = env.readCsvFile[Sales](
      filePath = path,
      lineDelimiter = "\n",
      fieldDelimiter = ",",
      lenient = false,
      ignoreFirstLine = true,
      includedFields = Array(0, 1, 2, 3),
      pojoFields = Array("transactionId", "customerId", "itemId", "amountPaid"))
    ds4.print()
  }
}

3. File-based source (recursively traversing a directory)

Flink can traverse all files under a directory, including every file in all of its subdirectories.
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration

/**
 * Recursively read all files under an HDFS directory, traversing every level of subdirectories.
 */
object DataSource003 {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // create a configuration object
    val parameters = new Configuration
    // set the recursive enumeration parameter
    parameters.setBoolean("recursive.file.enumeration", true)
    // pass the configuration to the data source
    val ds1 = env.readTextFile("hdfs:///input/flink").withParameters(parameters)
    ds1.print()
  }
}
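
II. Common sinks in Flink batch processing

The title promises sinks as well, but the original post stops at sources. To round out the section, below is a minimal sketch of the two most common file-based batch sinks, writeAsText and writeAsCsv. The object name DataSink001, the /tmp/flink-out output paths, and the job name are hypothetical; WriteMode.OVERWRITE is used so reruns replace earlier results.

import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment, _}
import org.apache.flink.core.fs.FileSystem.WriteMode

object DataSink001 {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val ds: DataSet[(Int, String)] = env.fromElements((1, "spark"), (2, "flink"))
    // 1. Write as plain text, one toString per element (the /tmp/flink-out paths are hypothetical);
    //    with parallelism 1 the path becomes a single file, otherwise a directory of part files
    ds.writeAsText("file:///tmp/flink-out/text", WriteMode.OVERWRITE).setParallelism(1)
    // 2. Write a tuple (or case class) DataSet as CSV, choosing the row and field delimiters
    ds.writeAsCsv("file:///tmp/flink-out/csv", "\n", ",", WriteMode.OVERWRITE).setParallelism(1)
    // Unlike print(), the write* sinks are lazy: an explicit execute() is needed to run the job
    env.execute("batch sink example")
  }
}

Note that print() is itself a sink that triggers execution immediately, which is why none of the source examples above call execute().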

Reposted from: https://www.cnblogs.com/linkmust/p/10896051.html
