日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > java >内容正文

java

spark 中的RDD编程:基于Java api

發布時間:2024/1/17 java 40 豆豆
生活随笔 收集整理的這篇文章主要介紹了 spark 中的RDD编程:基于Java api 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

?

1.RDD介紹:

? ? RDD,彈性分布式數據集,即分布式的元素集合。在spark中,對所有數據的操作不外乎是創建RDD、轉化已有的RDD以及調用RDD操作進行求值。在這一切的背后,Spark會自動將RDD中的數據分發到集群中,并將操作并行化。

? ? Spark中的RDD就是一個不可變的分布式對象集合。每個RDD都被分為多個分區,這些分區運行在集群中的不同節點上。RDD可以包含Python,Java,Scala中任意類型的對象,甚至可以包含用戶自定義的對象。

? ? 用戶可以使用兩種方法創建RDD:讀取一個外部數據集,或在驅動器程序中分發驅動器程序中的對象集合,比如list或者set。

? ? RDD的轉化操作都是惰性求值的,這意味著我們對RDD調用轉化操作,操作不會立即執行。相反,Spark會在內部記錄下所要求執行的操作的相關信息。我們不應該把RDD看做存放著特定數據的數據集,而最好把每個RDD當做我們通過轉化操作構建出來的、記錄如何計算數據的指令列表。數據讀取到RDD中的操作也是惰性的,數據只會在必要時讀取。轉化操作和讀取操作都有可能多次執行。

2.創建RDD數據集

? ? (1)讀取一個外部數據集

JavaRDD<String> lines=sc.textFile(inputFile);

? ? (2)分發對象集合,這里以list為例

List<String> list=new ArrayList<String>(); list.add("a"); list.add("b"); list.add("c"); JavaRDD<String> temp=sc.parallelize(list); //上述方式等價于 JavaRDD<String> temp2=sc.parallelize(Arrays.asList("a","b","c"));

3.RDD操作

(1)轉化操作

? ? 用java實現過濾器轉化操作:

List<String> list=new ArrayList<String>(); //建立列表,列表中包含以下自定義表項 list.add("error:a"); list.add("error:b"); list.add("error:c"); list.add("warning:d"); list.add("hadppy ending!"); //將列表轉換為RDD對象 JavaRDD<String> lines = sc.parallelize(list); //RDD對象lines中有error的表項過濾出來,放在RDD對象errorLinesJavaRDD<String> errorLines = lines.filter(new Function<String, Boolean>() {public Boolean call(String v1) throws Exception {return v1.contains("error"); }} ); //遍歷過濾出來的列表項 List<String> errorList = errorLines.collect(); for (String line : errorList)System.out.println(line);

? ? ? ?

輸出:

error:a

error:b

error:c

可見,列表list中包含詞語error的表項都被正確的過濾出來了。

(2)合并操作

將兩個RDD數據集合并為一個RDD數據集

接上述程序示例:

  • JavaRDD<String> warningLines=lines.filter(
  • new Function<String, Boolean>() {
  • public Boolean call(String v1) throws Exception {
  • return v1.contains("warning");
  • }
  • }
  • );
  • JavaRDD<String> unionLines=errorLines.union(warningLines);
  • for(String line :unionLines.collect())
  • System.out.println(line);
  • 輸出:

    error:a

    error:b

    error:c

    warning:d

    可見,將原始列表項中的所有error項和warning項都過濾出來了。

    (3)獲取RDD數據集中的部分或者全部元素

    ①獲取RDD數據集中的部分元素 ? .take(int num) ?返回值List<T> ??

    獲取RDD數據集中的前num項。

    /** * Take the first num elements of the RDD. This currently scans the partitions *one by one*, so * it will be slow if a lot of partitions are required. In that case, use collect() to get the * whole RDD instead. */ def take(num: Int): JList[T]

    程序示例:接上

    JavaRDD<String> unionLines=errorLines.union(warningLines); for(String line :unionLines.take(2))System.out.println(line);

    輸出:

    error:a

    error:b

    可見,輸出了RDD數據集unionLines的前2項

    ②獲取RDD數據集中的全部元素 .collect() 返回值 List<T>

    程序示例:

    List<String> unions=unionLines.collect(); for(String line :unions)System.out.println(line);

    遍歷輸出RDD數據集unions的每一項

    4.向spark傳遞函數

    函數名

    實現的方法

    用途

    Function<T,R>

    R call(T)

    接收一個輸入值并返回一個輸出值,用于類似map()和filter()的操作中
    Function<T1,T2,R>

    R call(T1,T2)

    接收兩個輸入值并返回一個輸出值,用于類似aggregate()和fold()等操作中

    FlatMapFunction<T,R>

    Iterable <R> call(T)

    接收一個輸入值并返回任意個輸出,用于類似flatMap()這樣的操作中

    ?①Function<T,R>

    JavaRDD<String> errorLines=lines.filter(new Function<String, Boolean>() {public Boolean call(String v1)throws Exception {return v1.contains("error"); }} );

    過濾RDD數據集中包含error的表項,新建RDD數據集errorLines

    ②FlatMapFunction<T,R>?

    List<String> strLine=new ArrayList<String>(); strLine.add("how are you"); strLine.add("I am ok"); strLine.add("do you love me") JavaRDD<String> input=sc.parallelize(strLine); JavaRDD<String> words=input.flatMap(new FlatMapFunction<String, String>() {public Iterable<String> call(String s) throws Exception {return Arrays.asList(s.split(" ")); }} );

    將文本行的單詞過濾出來,并將所有的單詞保存在RDD數據集words中。

    ?③?Function<T1,T2,R>

    List<String> strLine=new ArrayList<String>(); strLine.add("how are you"); strLine.add("I am ok"); strLine.add("do you love me"); JavaRDD<String> input=sc.parallelize(strLine); JavaRDD<String> words=input.flatMap(new FlatMapFunction<String, String>() {public Iterable<String> call(String s) throws Exception {return Arrays.asList(s.split(" ")); }} ); JavaPairRDD<String,Integer> counts=words.mapToPair(new PairFunction<String, String, Integer>() {public Tuple2<String, Integer> call(String s) throws Exception {return new Tuple2(s, 1); }} ); JavaPairRDD <String,Integer> results=counts.reduceByKey(new org.apache.spark.api.java.function.Function2<Integer, Integer, Integer>() {public Integer call(Integer v1, Integer v2) throws Exception {return v1 + v2; }} ) ;

    上述程序是spark中的wordcount實現方式,其中的reduceByKey操作的Function2函數定義了遇到相同的key時,value是如何reduce的->直接將兩者的value相加。

    *注意:

    可以將我們的函數類定義為使用匿名內部類,就像上述程序實現的那樣,也可以創建一個具名類,就像這樣:

    class ContainError implements Function<String,Boolean>{public Boolean call(String v1) throws Exception {return v1.contains("error"); } } JavaRDD<String> errorLines=lines.filter(new ContainError()); for(String line :errorLines.collect())System.out.println(line);

    具名類也可以有參數,就像上述過濾出含有”error“的表項,我們可以自定義到底含有哪個詞語,就像這樣,程序就更有普適性了。

    ?

    5.針對每個元素的轉化操作:

    ? ? 轉化操作map()接收一個函數,把這個函數用于RDD中的每個元素,將函數的返回結果作為結果RDD中對應的元素。關鍵詞:轉化

    ? ? 轉化操作filter()接受一個函數,并將RDD中滿足該函數的元素放入新的RDD中返回。關鍵詞:過濾

    示例圖如下所示:

    ①map()

    計算RDD中各值的平方

    JavaRDD<Integer> rdd =sc.parallelize(Arrays.asList(1,2,3,4)); JavaRDD<Integer> result=rdd.map( new Function<Integer, Integer>() { public Integer call(Integer v1) throwsException { return v1*v1; }} ); System.out.println( StringUtils.join(result.collect(),","));

    輸出:

    1,4,9,16

    filter()

    ②?去除RDD集合中值為1的元素:

    JavaRDD<Integer> rdd =sc.parallelize(Arrays.asList(1,2,3,4)); JavaRDD<Integer> results=rdd.filter( new Function<Integer, Boolean>() { public Boolean call(Integer v1) throws Exception { return v1!=1; }} ); System.out.println(StringUtils.join(results.collect(),","));

    結果:

    2,3,4

    ③ 有時候,我們希望對每個輸入元素生成多個輸出元素。實現該功能的操作叫做flatMap()。和map()類似,我們提供給flatMap()的函數被分別應用到了輸入的RDD的每個元素上。不過返回的不是一個元素,而是一個返回值序列的迭代器。輸出的RDD倒不是由迭代器組成的。我們得到的是一個包含各個迭代器可以訪問的所有元素的RDD。flatMap()的一個簡單用途是將輸入的字符串切分成單詞,如下所示:?

    JavaRDD<String> rdd =sc.parallelize(Arrays.asList("hello world","hello you","world i love you")); JavaRDD<String> words=rdd.flatMap( new FlatMapFunction<String, String>() { public Iterable<String> call(String s) throws Exception { return Arrays.asList(s.split(" ")); }} ); System.out.println(StringUtils.join(words.collect(),'\n'));

    輸出:

    hello

    world

    hello

    you

    world

    i

    love

    you

    6.集合操作

    ?

    RDD中的集合操作

    函數

    用途

    RDD1.distinct()

    生成一個只包含不同元素的新RDD。需要數據混洗。

    RDD1.union(RDD2)

    返回一個包含兩個RDD中所有元素的RDD

    RDD1.intersection(RDD2)

    只返回兩個RDD中都有的元素

    RDD1.substr(RDD2)

    返回一個只存在于第一個RDD而不存在于第二個RDD中的所有元素組成的RDD。需要數據混洗。

    集合操作對笛卡爾集的處理:

    ?

    RDD1.cartesian(RDD2)

    返回兩個RDD數據集的笛卡爾集

    程序示例:生成RDD集合{1,2} 和{1,2}的笛卡爾集

    JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1,2)); JavaRDD<Integer> rdd2 = sc.parallelize(Arrays.asList(1,2)); JavaPairRDD<Integer ,Integer> rdd=rdd1.cartesian(rdd2); for(Tuple2<Integer,Integer> tuple:rdd.collect())System.out.println(tuple._1()+"->"+tuple._2());

    輸出:

    1->1

    1->2

    2->1

    2->2

    7.行動操作

    (1)reduce操作

    ? ? reduce()接收一個函數作為參數,這個函數要操作兩個RDD的元素類型的數據并返回一個同樣類型的新元素。一個簡單的例子就是函數+,可以用它來對我們的RDD進行累加。使用reduce(),可以很方便地計算出RDD中所有元素的總和,元素的個數,以及其他類型的聚合操作。

    ? ? 以下是求RDD數據集所有元素和的程序示例:

    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8,9,10)); Integer sum =rdd.reduce( new Function2<Integer, Integer, Integer>() { public Integercall(Integer v1, Integer v2) throws Exception { return v1+v2; }} ); System.out.println(sum.intValue());

    輸出:55

    (2)fold()操作

    ? ? 接收一個與reduce()接收的函數簽名相同的函數,再加上一個初始值來作為每個分區第一次調用時的結果。你所提供的初始值應當是你提供的操作的單位元素,也就是說,使用你的函數對這個初始值進行多次計算不會改變結果(例如+對應的0,*對應的1,或者拼接操作對應的空列表)。

    ? ? 程序實例:

    ①計算RDD數據集中所有元素的和:

    zeroValue=0;//求和時,初始值為0。

    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8,9,10)); Integer sum =rdd.fold(0, new Function2<Integer, Integer, Integer>() { public Integer call(Integer v1, Integer v2) throws Exception { return v1+v2; }} ); System.out.println(sum);

    ②計算RDD數據集中所有元素的積:

    zeroValue=1;//求積時,初始值為1。

    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8,9,10)); Integer result =rdd.fold(1, new Function2<Integer, Integer, Integer>() { public Integer call(Integer v1, Integer v2) throws Exception { return v1*v2; }} ); System.out.println(result);

    (3)aggregate()操作

    ? ? aggregate()函數返回值類型不必與所操作的RDD類型相同

    ? ? 與fold()類似,使用aggregate()時,需要提供我們期待返回的類型的初始值。然后通過一個函數把RDD中的元素合并起來放入累加器。考慮到每個節點是在本地進行累加的,最終,還需要提供第二個函數來將累加器兩兩合并。

    以下是程序實例:

    public class AvgCount implements Serializable{ public int total; public int num; public AvgCount(int total,int num){ this.total=total; this.num=num; } public double avg(){ return total/(double)num; } static Function2<AvgCount,Integer,AvgCount> addAndCount= new Function2<AvgCount, Integer, AvgCount>() { public AvgCount call(AvgCount a, Integer x) throws Exception {a.total+=x; a.num+=1; return a; } }; static Function2<AvgCount,AvgCount,AvgCount> combine= new Function2<AvgCount, AvgCount, AvgCount>() { public AvgCount call(AvgCount a, AvgCount b) throws Exception {a.total+=b.total; a.num+=b.num; return a; }}; public static void main(String args[]){SparkConf conf = new SparkConf().setMaster("local").setAppName("my app"); JavaSparkContext sc = new JavaSparkContext(conf); AvgCount intial =new AvgCount(0,0); JavaRDD<Integer> rdd =sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6)); AvgCount result=rdd.aggregate(intial,addAndCount,combine); System.out.println(result.avg()); }}

    這個程序示例可以實現求出RDD對象集的平均數的功能。其中addAndCount將RDD對象集中的元素合并起來放入AvgCount對象之中,combine提供兩個AvgCount對象的合并的實現。我們初始化AvgCount(0,0),表示有0個對象,對象的和為0,最終返回的result對象中total中儲存了所有元素的和,num儲存了元素的個數,這樣調用result對象的函數avg()就能夠返回最終所需的平均數,即avg=tatal/(double)num。

    8.持久化緩存

    ? ? 因為Spark RDD是惰性求值的,而有時我們希望能多次使用同一個RDD。如果簡單地對RDD調用行動操作,Spark每次都會重算RDD以及它的所有依賴。這在迭代算法中消耗格外大,因為迭代算法常常會多次使用同一組數據。

    ? ? 為了避免多次計算同一個RDD,可以讓Spark對數據進行持久化。當我們讓Spark持久化存儲一個RDD時,計算出RDD的節點會分別保存它們所求出的分區數據。

    ? ? 出于不同的目的,我們可以為RDD選擇不同的持久化級別。默認情況下persist()會把數據以序列化的形式緩存在JVM的堆空間中

    ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??不同關鍵字對應的存儲級別表

    級別

    使用的空間

    cpu時間

    是否在內存

    是否在磁盤

    備注

    MEMORY_ONLY

    直接儲存在內存
    MEMORY_ONLY_SER

    序列化后儲存在內存里

    MEMORY_AND_DISK

    中等

    部分

    部分

    如果數據在內存中放不下,溢寫在磁盤上

    MEMORY_AND_DISK_SER

    部分

    部分

    數據在內存中放不下,溢寫在磁盤中。內存中存放序列化的數據。

    DISK_ONLY

    直接儲存在硬盤里面

    程序示例:將RDD數據集持久化在內存中。

    JavaRDD<Integer> rdd =sc.parallelize(Arrays.asList(1,2,3,4,5)); rdd.persist(StorageLevel.MEMORY_ONLY()); System.out.println(rdd.count()); System.out.println(StringUtils.join(rdd.collect(),','));

    RDD還有unpersist()方法,調用該方法可以手動把持久化的RDD從緩存中移除。

    9.不同的RDD類型

    ? ? Java中有兩個專門的類JavaDoubleRDD和JavaPairRDD,來處理特殊類型的RDD,這兩個類還針對這些類型提供了額外的函數,折讓你可以更加了解所發生的一切,但是也顯得有些累贅。

    ? ? 要構建這些特殊類型的RDD,需要使用特殊版本的類來替代一般使用的Function類。如果要從T類型的RDD創建出一個DoubleRDD,我們就應當在映射操作中使用DoubleFunction<T>來替代Function<T,Double>。

    程序實例:以下是一個求RDD每個對象的平方值的程序實例,將普通的RDD對象轉化為DoubleRDD對象,最后調用DoubleRDD對象的max()方法,返回生成的平方值中的最大值。

    JavaRDD<Integer> rdd =sc.parallelize(Arrays.asList(1,2,3,4,5)); JavaDoubleRDD result=rdd.mapToDouble( new DoubleFunction<Integer>() { public double call(Integer integer) throws Exception { return (double) integer*integer; }} ); System.out.println(result.max());

    ?

    ?

    總結

    以上是生活随笔為你收集整理的spark 中的RDD编程:基于Java api的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。