日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

java reducebykey_Spark入门(五)--Spark的reduce和reduceByKey

發布時間:2025/3/15 编程问答 41 豆豆
生活随笔 收集整理的這篇文章主要介紹了 java reducebykey_Spark入门(五)--Spark的reduce和reduceByKey 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

reduce和reduceByKey的區別

reduce和reduceByKey是spark中使用地非常頻繁的,在字數統計中,可以看到reduceByKey的經典使用。那么reduce和reduceBykey的區別在哪呢?reduce處理數據時有著一對一的特性,而reduceByKey則有著多對一的特性。比如reduce中會把數據集合中每一個元素都處理一次,并且每一個元素都對應著一個輸出。而reduceByKey則不同,它會把所有key相同的值處理并且進行歸并,其中歸并的方法可以自己定義。

例子

在單詞統計中,我們采用的就是reduceByKey,對于每一個單詞我們設置成一個鍵值對(key,value),我們把單詞作為key,即key=word,而value=1,因為遍歷過程中,每個單詞的出現一次,則標注1。那么reduceByKey則會把key相同的進行歸并,然后根據我們定義的歸并方法即對value進行累加處理,最后得到每個單詞出現的次數。而reduce則沒有相同Key歸并的操作,而是將所有值統一歸并,一并處理。

spark的reduce

我們采用scala來求得一個數據集中所有數值的平均值。該數據集包含5000個數值,數據集以及下列的代碼均可從github下載,數據集名稱為"avg"。為求得這個數據集中的平均值,我們先用map對文本數據進行處理,將其轉換成long類型。

數據集內容:

reduce求平均值scala實現

import org.apache.spark.{SparkConf, SparkContext}

object SparkReduce {

def main(args: Array[String]): Unit = {

val conf = new SparkConf().setMaster("local").setAppName("SparkReduce")

val sc = new SparkContext(conf)

//將String轉成Long類型

val numData = sc.textFile("./avg").map(num => num.toLong)

//reduce處理每個值

println(numData.reduce((x,y)=>{

println("x:"+x)

println("y:"+y)

x+y

})/numData.count())

}

}

復制代碼

reduce求平均值Java實現

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaRDD;

import org.apache.spark.api.java.JavaSparkContext;

import org.apache.spark.api.java.function.Function;

import org.apache.spark.api.java.function.Function2;

public class SparkReduceJava {

public static void main(String[] main){

SparkConf conf = new SparkConf().setAppName("SparkReduceJava").setMaster("local");

JavaSparkContext sc = new JavaSparkContext(conf);

reduceJava(sc);

reduceJava8(sc);

}

public static void reduceJava(JavaSparkContext sc){

JavaRDDtextData = sc.textFile("./avg").map(new Function() {

@Override

public Long call(String s) throws Exception {

return Long.parseLong(s);

};

});

System.out.println(

textData.reduce(new Function2() {

@Override

public Long call(Long aLong, Long aLong2) throws Exception {

System.out.println("x:"+aLong);

System.out.println("y:"+aLong2);

return aLong+aLong2;

}

})/textData.count()

);

}

public static void reduceJava8(JavaSparkContext sc){

JavaRDDtextData = sc.textFile("./avg").map(s->Long.parseLong(s));

System.out.println(textData.reduce((x,y)->x+y)/textData.count());

}

}

復制代碼

reduce求平均值python實現

from pyspark import SparkConf,SparkContext

conf = SparkConf().setMaster("local").setAppName("SparkReduce")

sc = SparkContext(conf=conf)

numData = sc.textFile("./avg").map(lambda s:int(s))

print(numData.reduce(lambda x,y:x+y)/numData.count())

復制代碼

運行結果

觀察運行結果,我們不難發現,x存放的是累加后的值,y是當前值,x初始為0。事實上,x正是存放上次處理的結果,而y則是本次的數值。不斷做x+y就并且放回累加后的結果作為下一次x的值。這樣就可以得

到數值總和。最后將總和除以總數就能夠得到平均值。

scala或java運行結果

平均值只保留了整數

x:222783

y:48364

x:271147

y:204950

x:476097

y:261777

x:737874

y:166827

x:904701

y:154005

x:1058706

y:150029

x:1208735

y:140158

x:1348893

y:404846

x:1753739

y:542750

...

...

平均值是:334521

復制代碼

python運行結果

python默認保留了小數

334521.2714

復制代碼

spark的reduceByKey

spark的reduceByKey對要處理的值進行了差別對待,只有key相同的才能進行reduceByKey,則也就要求了進行reduceByKey時,輸入的數據必須滿足有鍵有值。由于上述的avg我們是用隨機數生成的,那么我們可以用reduceByKey完成一個其他功能,即統計隨機數中末尾是0-9各個數值出現的個數。

scala實現

import org.apache.spark.{SparkConf, SparkContext}

object SparkReduceByKey {

def main(args: Array[String]): Unit = {

val conf = new SparkConf().setMaster("local").setAppName("SparkReduce")

val sc = new SparkContext(conf)

//將String轉成Long類型

val numData = sc.textFile("./avg").map(num => (num.toLong%10,1))

numData.reduceByKey((x,y)=>x+y).foreach(println(_))

}

}

復制代碼

java實現

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaPairRDD;

import org.apache.spark.api.java.JavaSparkContext;

import org.apache.spark.api.java.function.Function2;

import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

public class SparkReduceByKeyJava {

public static void main(String[] main){

SparkConf conf = new SparkConf().setAppName("SparkReduceJava").setMaster("local");

JavaSparkContext sc = new JavaSparkContext(conf);

reduceByKeyJava(sc);

reduceByKeyJava8(sc);

}

public static void reduceByKeyJava(JavaSparkContext sc){

JavaPairRDD numData = sc.textFile("./avg").mapToPair(new PairFunction() {

@Override

public Tuple2 call(String s) throws Exception {

return new Tuple2(Integer.parseInt(s)%10,1);

}

});

System.out.println(numData.reduceByKey(new Function2() {

@Override

public Integer call(Integer integer, Integer integer2) throws Exception {

return integer+integer2;

}

}).collectAsMap());

}

public static void reduceByKeyJava8(JavaSparkContext sc){

JavaPairRDD numData = sc.textFile("./avg").mapToPair(s->new Tuple2<>(Integer.parseInt(s)%10,1));

System.out.println(numData.reduceByKey((x,y)->x+y).collectAsMap());

}

}

復制代碼

python實現

from pyspark import SparkConf,SparkContext

conf = SparkConf().setMaster("local").setAppName("SparkReduce")

sc = SparkContext(conf=conf)

print(sc.textFile("./avg").map(lambda s:(int(s)%10,1)).reduceByKey(lambda x,y:x+y).collectAsMap())

復制代碼

運行結果

scala運行結果

(4,522)

(0,462)

(1,495)

(6,519)

(3,463)

(7,544)

(9,518)

(8,533)

(5,483)

(2,461)

復制代碼

java運行結果

{8=533, 2=461, 5=483, 4=522, 7=544, 1=495, 9=518, 3=463, 6=519, 0=462}

復制代碼

python運行結果

{3: 463, 4: 522, 0: 462, 7: 544, 5: 483, 9: 518, 8: 533, 6: 519, 2: 461, 1: 495}

復制代碼

我們注意到三個程序輸出的順序不一樣,但是本質的結果都是一致的。這里體現了spark的一個優點,由于是在單機本地上,該優點表現出來的是相同輸入輸出結果順序不同。但是在集群中,該優點表現出來的是在集群中各自處理,而后返回結果。當數量足夠大的時候,這個優點就更加明顯。

對結果進行排序

那么為了能夠使得輸出結果順序一致,我們可以對數據進行排序后輸出,那么這里就涉及到了sortByKey。

scala實現

import org.apache.spark.{SparkConf, SparkContext}

object SparkReduceByKey {

def main(args: Array[String]): Unit = {

val conf = new SparkConf().setMaster("local").setAppName("SparkReduce")

val sc = new SparkContext(conf)

//將String轉成Long類型

val numData = sc.textFile("./avg").map(num => (num.toLong%10,1))

//根據key排序后輸出

numData.reduceByKey((x,y)=>x+y).sortByKey().foreach(println(_))

}

}

復制代碼

java實現

特別注意這里用的是collect,而不是collectMap,因為java中轉換成Map會打亂順序

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaPairRDD;

import org.apache.spark.api.java.JavaSparkContext;

import org.apache.spark.api.java.function.Function2;

import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

public class SparkReduceByKeyJava {

public static void main(String[] main){

SparkConf conf = new SparkConf().setAppName("SparkReduceJava").setMaster("local");

JavaSparkContext sc = new JavaSparkContext(conf);

reduceByKeyJava(sc);

reduceByKeyJava8(sc);

}

public static void reduceByKeyJava(JavaSparkContext sc){

JavaPairRDD numData = sc.textFile("./avg").mapToPair(new PairFunction() {

@Override

public Tuple2 call(String s) throws Exception {

return new Tuple2(Integer.parseInt(s)%10,1);

}

});

System.out.println(numData.reduceByKey(new Function2() {

@Override

public Integer call(Integer integer, Integer integer2) throws Exception {

return integer+integer2;

}

}).sortByKey().collect());

}

public static void reduceByKeyJava8(JavaSparkContext sc){

JavaPairRDD numData = sc.textFile("./avg").mapToPair(s->new Tuple2<>(Integer.parseInt(s)%10,1));

System.out.println(numData.reduceByKey((x,y)->x+y).sortByKey().collect());

}

}

復制代碼

python實現

from pyspark import SparkConf,SparkContext

conf = SparkConf().setMaster("local").setAppName("SparkReduce")

sc = SparkContext(conf=conf)

print(sc.textFile("./avg").map(lambda s:(int(s)%10,1)).reduceByKey(lambda x,y:x+y).sortByKey().collectAsMap())

復制代碼

得到結果,這里只給出scala輸出的結果,其他輸出的結果一致,只是表現形式不同

(0,462)

(1,495)

(2,461)

(3,463)

(4,522)

(5,483)

(6,519)

(7,544)

(8,533)

(9,518)

復制代碼

數據集以及代碼都可以在github上下載。

總結

以上是生活随笔為你收集整理的java reducebykey_Spark入门(五)--Spark的reduce和reduceByKey的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。