Spark中的键值对操作-scala
?
1.PairRDD介紹
? ? Spark為包含鍵值對(duì)類型的RDD提供了一些專有的操作。這些RDD被稱為PairRDD。PairRDD提供了并行操作各個(gè)鍵或跨節(jié)點(diǎn)重新進(jìn)行數(shù)據(jù)分組的操作接口。例如,PairRDD提供了reduceByKey()方法,可以分別規(guī)約每個(gè)鍵對(duì)應(yīng)的數(shù)據(jù),還有join()方法,可以把兩個(gè)RDD中鍵相同的元素組合在一起,合并為一個(gè)RDD。
2.創(chuàng)建Pair RDD
? ? 程序示例:對(duì)一個(gè)英語(yǔ)單詞組成的文本行,提取其中的第一個(gè)單詞作為key,將整個(gè)句子作為value,建立 PairRDD
val rdd=sc.parallelize(List("this is a test","how are you","do you love me","can you tell me")); //獲取第一個(gè)單詞作為鍵 val words =rdd.map(x=>(x.split(" ")(0),x)); words.collect().foreach(println);輸出結(jié)果:
(this,this is a test)
(how,how are you)
(do,do you love me)
(can,can you tell me)
?
3.PairRDD的轉(zhuǎn)化操作
? ??PairRDD可以使用所有標(biāo)準(zhǔn)RDD上可用的轉(zhuǎn)化操作。傳遞函數(shù)的規(guī)則也適用于PairRDD。由于PairRDD中包含二元組,所以需要傳遞的函數(shù)應(yīng)當(dāng)操作而元素而不是獨(dú)立的元素。
? ??? ??? ??? ??? ??? ??? ??? ?PairRDD的相關(guān)轉(zhuǎn)化操作如下表所示
針對(duì)兩個(gè)PairRDD的轉(zhuǎn)化操作 rdd={(1,2),(3,4),(3,6)} other={(3,9)}
| 函數(shù)名 | 目的 | 示例 | 結(jié)果 |
| substractByKey | 刪掉RDD中鍵與other RDD 中的鍵相同的元素 | rdd.subtractByKey(other) | {(1,2)} |
| join | 對(duì)兩個(gè)RDD進(jìn)行內(nèi)連接 | rdd.join(other) | {(3,(4,9)),(3,(6,9))} |
| rightOuterJoin | 對(duì)兩個(gè)RDD進(jìn)行連接操作,右外連接 | rdd.rightOuterJoin(other) | {(3,(4,9)),(3,(6,9))} |
| leftOuterJoin | 對(duì)兩個(gè)RDD進(jìn)行連接操作,左外連接 | rdd.rightOuterJoin(other) | {(1,(2,None)),(3,(4,9)),(3,(6,9))} |
| cogroup | 將兩個(gè)RDD中擁有相同鍵的數(shù)據(jù)分組 | rdd.cogroup(other) | {1,([2],[]),(3,[4,6],[9])} |
程序?qū)嵗?#xff1a;
針對(duì)2 中程序生成的PairRDD,刪選掉長(zhǎng)度超過(guò)20個(gè)字符的行。
val results=words.filter(value => value._2.length()<20); results.foreach(println)? ??RDD上有fold(),combine(),reduce()等行動(dòng)操作,pair RDD上則有相應(yīng)的針對(duì)鍵的轉(zhuǎn)化操作。
? ? (1)reduceByKey()與reduce()操作類似,它們都接收一個(gè)函數(shù),并使用該函數(shù)對(duì)值進(jìn)行合并。reduceByKey()會(huì)為數(shù)據(jù)集中的每個(gè)鍵進(jìn)行并行的規(guī)約操作,每個(gè)規(guī)約操作會(huì)將鍵相同的值合并起來(lái)。reduceBykey()最終返回一個(gè)由各鍵規(guī)約出來(lái)的結(jié)果值組成的新的RDD。
程序示例:用reduceByKey實(shí)現(xiàn)單詞計(jì)數(shù)
val rdd=sc.parallelize(List("this is a test","how are you","do you love me","can you tell me")); val words =rdd.flatMap(line => line.split(" ")); val results=words.map(word => (word,1)).reduceByKey( {case(x,y) => x+y}); results.foreach(println)輸出:
(are,1)
(this,1)
(is,1)
(you,3)
(can,1)
(a,1)
(love,1)
(do,1)
(how,1)
(tell,1)
(me,2)
(test,1)
?
? (2)foldByKey()與fold()操作類似,他們都使用一個(gè)與RDD和合并函數(shù)中的數(shù)據(jù)類型相同的零值作為初始值。與fold()一樣,foldByKey()操作所使用的合并函數(shù)對(duì)零值與另一個(gè)元素進(jìn)行合并,結(jié)果仍為該元素。
? ? 程序示例:求對(duì)應(yīng)key的value之和
val nums = sc.parallelize(List(Tuple2(1, 1), Tuple2(1, 3), Tuple2(2, 2), Tuple2(2, 8))); val results=nums.foldByKey(0)({case(x,y)=>x+y}) results.collect().foreach(println)結(jié)果:
(1,4)
(2,10)
(3)
? ??combineByKey()是最為常用的基于鍵進(jìn)行聚合的函數(shù)。大多數(shù)基于鍵聚合的函數(shù)都是用它實(shí)現(xiàn)的。和aggregate()一樣,combineByKey()可以讓用戶返回與輸入數(shù)據(jù)類型不同的返回值。combineByKey()會(huì)遍歷分區(qū)中的所有元素,因此,每個(gè)元素的鍵要么還么有遇到過(guò),要么就和之前的某個(gè)元素的鍵相同。如果這是一個(gè)新的元素,combineByKey()會(huì)使用一個(gè)叫做createCombiner()的函數(shù)來(lái)創(chuàng)建那個(gè)鍵對(duì)應(yīng)的累加器的初始值。需要注意的是,這一過(guò)程會(huì)在每個(gè)分區(qū)中第一次出現(xiàn)每個(gè)鍵時(shí)發(fā)生,而不是在整個(gè)RDD中第一次出現(xiàn)一個(gè)鍵時(shí)發(fā)生。
? ? 如果這是一個(gè)處理當(dāng)前分區(qū)之前就已經(jīng)遇到的鍵,它會(huì)使用mergeValue()方法將該鍵的累加器對(duì)應(yīng)的當(dāng)前值與這個(gè)新的值進(jìn)行合并。
? ? 由于每個(gè)分區(qū)都是獨(dú)立處理的,因此對(duì)于同一個(gè)鍵可以有多個(gè)累加器。如果有兩個(gè)或者更多的分區(qū)都有對(duì)應(yīng)一個(gè)鍵的累加器,就需要使用用戶提供的mergeCombiners()方法將各個(gè)分區(qū)的結(jié)果進(jìn)行合并。
? ??以下程序示例使用combineBykey()求每個(gè)鍵對(duì)應(yīng)的平均值。
val nums = sc.parallelize(List(Tuple2(1, 1), Tuple2(1, 3), Tuple2(2, 2), Tuple2(2, 8))); val results=nums.combineByKey((v)=>(v,1), (acc:(Int,Int),v) =>(acc._1+v,acc._2+1), (acc1:(Int,Int),acc2:(Int,Int))=>(acc1._1+acc2._1,acc1._2+acc2._2) ).map{case(key,value)=>(key,value._1/value._2.toFloat)} results.collectAsMap().map(println)結(jié)果:
(2,5.0)
(1,2.0)
成功求出每個(gè)key對(duì)應(yīng)value對(duì)應(yīng)的平均值
*(4)并行度調(diào)優(yōu)
? ? 每個(gè)RDD都有固定數(shù)目的分區(qū),分區(qū)數(shù)決定了在RDD上執(zhí)行操作時(shí)的并行度。
? ? 在執(zhí)行聚合或者分組操作時(shí),可以要求Spark使用給定的分區(qū)數(shù)。Spark始終嘗試根據(jù)集群的大小推斷出一個(gè)有意義的默認(rèn)值,但是你可以通過(guò)對(duì)并行度進(jìn)行調(diào)優(yōu)來(lái)獲得更好的性能表現(xiàn)。
? ? 在Scala中,combineByKey()函數(shù)和reduceByKey()函數(shù)的最后一個(gè)可選的參數(shù)用于指定分區(qū)的數(shù)目,即numPartitions,使用如下:
val results=nums.reduceByKey({(x,y) =>x+y},2);5.數(shù)據(jù)分組
(1)groupByKey()
? ? groupByKey()會(huì)使用RDD中的鍵來(lái)對(duì)數(shù)據(jù)進(jìn)行分組。對(duì)于一個(gè)由類型K的鍵和類型V的值組成的RDD,得到的RDD類型會(huì)是[K,Iterable[v]]。
? ? 以下是程序示例,對(duì)PairRDD調(diào)用groupByKey()函數(shù)之后,返回的RDD類型是RDD[K,Iterable[v]]
val nums = sc.parallelize(List(Tuple2(1, 1), Tuple2(1, 3), Tuple2(2, 2), Tuple2(2, 8))); val group=nums.groupByKey(); val results=group.collect(); for(value <- results){print(value._1+": ")for(elem <- value._2)print(elem+" ")println()}輸出結(jié)果:
1: 1 3?
2: 2 8?
(2)cogroup()
? ? 除了對(duì)單個(gè)RDD的數(shù)據(jù)進(jìn)行分組,還可以使用cogroup()函數(shù)對(duì)對(duì)個(gè)共享同一個(gè)鍵的RDD進(jìn)行分組。對(duì)兩個(gè)鍵的類型均為K而值得類型分別為V和W的RDD進(jìn)行cogroup()時(shí),得到結(jié)果的RDD類型為[(K,(Iterable[V],Iterable[W]))]。如果其中一個(gè)RDD對(duì)于另一個(gè)RDD中存在的某個(gè)鍵沒(méi)有對(duì)應(yīng)的記錄,那么對(duì)應(yīng)的迭代器則為空。
舉例:
val nums1 = sc.parallelize(List(Tuple2(1, 1), Tuple2(2, 2), Tuple2(1, 3),Tuple2(2, 4),Tuple2(3, 4))); val nums2 = sc.parallelize(List(Tuple2(1,1),Tuple2(1,3),Tuple2(2,3))) val results=nums1.cogroup(nums2) for(tuple2 <- results.collect()){print(tuple2._1+" [ ")for(it <- tuple2._2._1)print(it+" ")print("] [ ")for(it<-tuple2._2._2)print(it+" ")println("]") }輸出:
1 [ 1 3 ] [ 1 3 ]
3 [ 4 ] [ ]
2 [ 2 4 ] [ 3 ]
6.數(shù)據(jù)排序
在Scala中以字符串順序?qū)φ龜?shù)進(jìn)行自定義排序
(1)對(duì)RDD進(jìn)行排序:
val nums =sc.parallelize(List(12,4,6,8,0,8)); //隱式轉(zhuǎn)換聲明排序的依據(jù) implicit val sortIntegersByString = new Ordering[Int] {override def compare(x: Int, y: Int): Int = x.toString().compareTo(y.toString()) } val results=nums.sortBy(value=>value); results.collect().foreach(println)(2)對(duì)PairRDD,按key的值進(jìn)行排序
val nums = sc.parallelize(List(Tuple2(1, 1), Tuple2(2, 2), Tuple2(1, 3),Tuple2(2, 4),Tuple2(3, 4))); //隱式轉(zhuǎn)換聲明排序的依據(jù) implicit val sortIntegersByString = new Ordering[Int] {override def compare(x: Int, y: Int): Int = x.toString().compareTo(y.toString()) } val results=nums.sortByKey(); results.collect().foreach(println)7.數(shù)據(jù)分區(qū)
(1)創(chuàng)建數(shù)據(jù)分區(qū)
? ? 在分布式程序中,通信的代價(jià)很大,控制數(shù)據(jù)分布以獲得最少的網(wǎng)絡(luò)傳輸可以極大地提升整體性能。Spark程序可以通過(guò)控制RDD分區(qū)的方式來(lái)減少通信消耗。只有當(dāng)數(shù)據(jù)集多次在諸如連接這種基于鍵的操作中,分區(qū)才會(huì)有作用。
? ? Spark中所有的鍵值對(duì)RDD都可以進(jìn)行分區(qū)。系統(tǒng)會(huì)根據(jù)一個(gè)針對(duì)鍵的函數(shù)對(duì)元素進(jìn)行分組。Spark可以確保同一組的鍵出現(xiàn)在一個(gè)節(jié)點(diǎn)上。
? ? 舉個(gè)簡(jiǎn)單的例子,應(yīng)用如下:內(nèi)存中保存著很大的用戶信息表,由(UserID,UserInfo[])組成的RDD,UserInfo是用戶所訂閱的所有主題列表。該應(yīng)用會(huì)周期性地將這張表和一個(gè)小文件進(jìn)行組合,這個(gè)小文件中存這過(guò)去5分鐘發(fā)生的時(shí)間,其實(shí)就是一系列(UserId,LinkInfo)RDD,其中LinkInfo是用戶訪問(wèn)的鏈接的主題。我們需要對(duì)用戶訪問(wèn)其未訂閱主題的頁(yè)面情況進(jìn)行統(tǒng)計(jì)。我們可以使用Spark的join()操作進(jìn)行組合操作。將兩者根據(jù)UserId連接之后,過(guò)濾出不在UserInfo[]中的LinkInfo,就是用戶訪問(wèn)其未訂閱主題的情況。
val list1 =List(Tuple2("zhou",List("it","math")),Tuple2("gan",List("money","book"))) val list2= List(Tuple2("zhou","it"),Tuple2("zhou","stock"),Tuple2("gan","money"),Tuple2("gan","book")) val userData =sc.parallelize(list1) val events = sc.parallelize(list2) val joined=userData.join(events) val results=joined.filter({case (id, (info, link)) =>!info.contains(link)} ).count() println(results)輸出:1
? ? 這段代碼可以正確運(yùn)行,但是效率不高。因?yàn)槊?分鐘就要進(jìn)行一次join()操作,而我們對(duì)數(shù)據(jù)集如何分區(qū)卻一無(wú)所知。默認(rèn)情況下,連接操作會(huì)將兩個(gè)數(shù)據(jù)集中的所有鍵的哈希值都求出來(lái),將該哈希值相同的記錄通過(guò)網(wǎng)絡(luò)傳到同一臺(tái)機(jī)器上,然后在那臺(tái)機(jī)器上對(duì)所有鍵相同的記錄進(jìn)行連接操作。因?yàn)閡serData表比每5分鐘出現(xiàn)的訪問(wèn)日志表events要大很多,所以要浪費(fèi)時(shí)間進(jìn)行額外的工作:在每次調(diào)用時(shí)都對(duì)userDAta表進(jìn)行哈希值計(jì)算和跨節(jié)點(diǎn)數(shù)據(jù)混洗,雖然這些數(shù)據(jù)從來(lái)不會(huì)變化。
? ? 要解決此問(wèn)題:在程序開始的時(shí)候,對(duì)userData表進(jìn)行partitionBy()轉(zhuǎn)化操作,將這張表轉(zhuǎn)化為哈希分區(qū)。可以通過(guò)向patitionBy傳遞一個(gè)spark.HashPartitioner對(duì)象來(lái)實(shí)現(xiàn)該操作。
? ? scala自定義分區(qū)方式:
val list1 =List(Tuple2("zhou",List("it","math")),Tuple2("gan",List("money","book"))) val list2= List(Tuple2("zhou","it"),Tuple2("zhou","stock"),Tuple2("gan","money"),Tuple2("gan","book")) val userData =sc.parallelize(list1).partitionBy(new HashPartitioner(100)).persist(StorageLevel.MEMORY_ONLY)? ? 這樣以后在調(diào)用join()時(shí),Spark就知道了該RDD是根據(jù)鍵的哈希值來(lái)分區(qū)的,這樣在調(diào)用join()時(shí),Spark就會(huì)利用這一點(diǎn),只會(huì)對(duì)events進(jìn)行數(shù)據(jù)混洗操作,將events中特定userId的記錄發(fā)送到userData的對(duì)應(yīng)分區(qū)所在的那臺(tái)機(jī)器上。這樣,需要網(wǎng)絡(luò)傳輸?shù)臄?shù)據(jù)就大大減小了,程序運(yùn)行的速度也顯著提高。
? ? 請(qǐng)注意,我們還對(duì)userData 這個(gè)RDD進(jìn)行了持久化操作,默認(rèn)情況下,每一個(gè)由轉(zhuǎn)化操作得到的RDD都會(huì)在每次執(zhí)行啟動(dòng)操作時(shí)重新計(jì)算生成,將userData持久化之后,就能保證userData能夠在訪問(wèn)時(shí)被快速獲取。
? ? *進(jìn)一步解釋數(shù)據(jù)分區(qū)帶來(lái)的好處:
? ? 如果沒(méi)有將partitionBy()轉(zhuǎn)化操作的結(jié)果進(jìn)行持久化,那么后面每次用到這個(gè)RDD時(shí)都會(huì)重復(fù)對(duì)數(shù)據(jù)進(jìn)行分區(qū)操作。不進(jìn)行持久化會(huì)導(dǎo)致整個(gè)RDD譜系圖重新求值。那樣的話,partitionBy()帶來(lái)的好處就會(huì)抵消,導(dǎo)致重復(fù)對(duì)數(shù)據(jù)進(jìn)行分區(qū)以及跨節(jié)點(diǎn)的混洗,和沒(méi)有指定分區(qū)方式時(shí)發(fā)生的情況是十分相似的。
(2)獲取數(shù)據(jù)分區(qū)的方式
接(1)中程序:
val list1 =List(Tuple2("zhou",List("it","math")),Tuple2("gan",List("money","book"))) val list2= List(Tuple2("zhou","it"),Tuple2("zhou","stock"),Tuple2("gan","money"),Tuple2("gan","book")) val userData =sc.parallelize(list1).partitionBy(new HashPartitioner(100)).persist(StorageLevel.MEMORY_ONLY) println(userData.partitioner)? RDD的屬性partitioner就是存儲(chǔ)了對(duì)應(yīng)的分區(qū)方式
(3)從分區(qū)中獲益的操作
? ? Spark中的很多操作都引入了根據(jù)鍵跨結(jié)點(diǎn)進(jìn)行混洗的過(guò)程。所有這些操作都會(huì)從數(shù)據(jù)分區(qū)中獲益。能夠從數(shù)據(jù)分區(qū)中獲益的操作有:groupWith(),join(),leftOuterJoin(),rightOuterJoin(),groupByKey(),reduceByKey(),combineByKey(),以及l(fā)ockup()。
? ? 對(duì)于像reduceByKey()這樣只作用于單個(gè)RDD的操作,運(yùn)行在未分區(qū)的RDD的時(shí)候或?qū)е旅總€(gè)鍵所有對(duì)應(yīng)值都在每臺(tái)機(jī)器上進(jìn)行本地計(jì)算,只需要把本地最終歸約出的結(jié)果值從各工作節(jié)點(diǎn)傳回主節(jié)點(diǎn),所以原本的網(wǎng)絡(luò)開銷就不太大。而對(duì)于諸如cogroup()和join()這樣的二元操作,預(yù)先進(jìn)行數(shù)據(jù)分區(qū)會(huì)導(dǎo)致其中至少一個(gè)RDD(使用已知分區(qū)器的那個(gè)RDD)不發(fā)生數(shù)據(jù)混洗。如果兩個(gè)RDD使用同樣的分區(qū)方式,并且它們還緩存在同樣的機(jī)器上(比如一個(gè)RDD是通過(guò)mapValues()從另一個(gè)RDD中創(chuàng)建出來(lái)的,這兩個(gè)RDD就會(huì)擁有相同的鍵和分區(qū)方式),或者其中一個(gè)RDD還沒(méi)有計(jì)算出來(lái),那么跨節(jié)點(diǎn)數(shù)據(jù)混洗就不會(huì)發(fā)生了。
(4)影響分區(qū)方式的操作
? ??所有會(huì)為生成的結(jié)果RDD設(shè)好分區(qū)方式的操作:cogroup(),groupWith(),join(),leftOuterJoin(),rightOuterJoin(),groupByKey(),reduceByKey(),combineByKey(),partitionBy(),sort(),mapValues()(如果父RDD有分區(qū)方式的話),filter()(如果父RDD有分區(qū)方式的話)。其他所有操作生成的結(jié)果都不會(huì)存在特定的分區(qū)方式。
注意:? ??
? ? 對(duì)于二元操作,輸出數(shù)據(jù)的分區(qū)方式取決于父RDD的分區(qū)方式。默認(rèn)情況下,結(jié)果會(huì)采用哈希分區(qū),分區(qū)的數(shù)量和操作的并行度是一樣的。如果其中一個(gè)父RDD已經(jīng)設(shè)置過(guò)分區(qū)方式,那么結(jié)果就會(huì)采用那種分區(qū)方式;如果兩個(gè)父RDD都設(shè)置過(guò)分區(qū)方式,結(jié)果RDD會(huì)采用第一個(gè)RDD的分區(qū)方式。
8.示例程序-PageRank
? ??PageRank算法是一種從RDD分區(qū)中獲益的更復(fù)雜的算法,我們以它為例進(jìn)行分析。PageRank算法用來(lái)根據(jù)外部文檔指向一個(gè)文檔的鏈接,對(duì)集合中每個(gè)文檔的重要程度賦一個(gè)度量值。該算法可以用于對(duì)網(wǎng)頁(yè)進(jìn)行排序,當(dāng)然,也可以用于排序科技文章或社交網(wǎng)絡(luò)中有影響的用戶。
? ? 算法會(huì)維護(hù)兩個(gè)數(shù)據(jù)集,一個(gè)由(pageID,linklist[])組成,包含每個(gè)頁(yè)面的鏈接到的頁(yè)面的列表;另一個(gè)由(pageID,rank)元素組成,包含每個(gè)頁(yè)面的當(dāng)前排序值。它按以下步驟進(jìn)行計(jì)算:
? ??① 將每個(gè)頁(yè)面的排序值初始化為1.0
? ??? ??②在每次迭代中,向每個(gè)有直接鏈接的頁(yè)面,發(fā)送一個(gè)值為rank(p)/numNeighbors(p)(出鏈數(shù)目) ? 的貢獻(xiàn)量
? ? ? ? ③將每個(gè)頁(yè)面的排序值設(shè)置為0.15+0.85*contributionsReceived
?? ?????最后兩步會(huì)重復(fù)幾個(gè)循環(huán),在此過(guò)程中,算法會(huì)逐漸收斂于每個(gè)頁(yè)面的實(shí)際PageRank值。在實(shí)際操作中,收斂通常需要進(jìn)行十個(gè)迭代。
下面用Scala來(lái)實(shí)現(xiàn)PageRank算法:
/* #以下是url的內(nèi)容: www.baidu.com www.hao123.com www.baidu.com www.2345.com www.baidu.com www.zhouyang.com www.hao123.com www.baidu.com www.hao123.com www.zhouyang.com www.zhouyang.com www.baidu.com */ val inputs =sc.textFile("C:\\url.txt") //url,[urls] val links =inputs.map(x=>(x.split(" ")(0),x.split(" ")(1))).distinct().groupByKey().cache() //url,rank var ranks =links.mapValues(value =>1.0) for(i<-0 until 10){val contribs =links.join(ranks).flatMap({case(pageid,(links,rank))=>//url Double links.map(dest=>(dest,rank/links.size))})//reduce and add the contribs ranks=contribs.reduceByKey((x,y)=>x+y).mapValues(v => 0.15+0.85*v) } ranks.collect().foreach(println)結(jié)果:
(www.hao123.com,0.3685546839262259)
(www.baidu.com,0.761571325242544)
(www.2345.com,0.3685546839262259)
(www.zhouyang.com,0.5269013026650011)
9.Scala設(shè)置自定義分區(qū)方式
? ? Spark允許你通過(guò)自定義Partitioner對(duì)象來(lái)控制RDD的分區(qū)方式,這樣可以讓你利用領(lǐng)域知識(shí)進(jìn)一步減少通信消耗。
? ? 舉個(gè)例子,假設(shè)我們要在一個(gè)網(wǎng)頁(yè)的集合上運(yùn)行前一屆中的PageRank算法。在這里,每個(gè)頁(yè)面的ID是頁(yè)面的URL。當(dāng)我們使用簡(jiǎn)單的哈希函數(shù)進(jìn)行分區(qū)時(shí),擁有相似的URL的頁(yè)面比如?http://www.baidu.com/news?與?http://www.baidu.com/map?可能被分在完全不同的節(jié)點(diǎn)上。但是我們知道,同一個(gè)域名下的網(wǎng)頁(yè)更有可能相互連接。由于PageRank需要在每次迭代中從每個(gè)頁(yè)面向它所有相鄰的頁(yè)面發(fā)送一條消息,因襲把這些頁(yè)面分組在同一個(gè)分區(qū)中會(huì)更好。可以使用自定義的分區(qū)器來(lái)實(shí)現(xiàn)僅根據(jù)域名而不是整個(gè)URL進(jìn)行分區(qū)。
? ? 要實(shí)現(xiàn)先自定義Partitioner,需要繼承Partitioner類并實(shí)現(xiàn)其下述方法:
? ??override def numPartitions: Int = ???
? ? 返回創(chuàng)建的分區(qū)數(shù)量
? ? override def getPartition(key: Any): Int = ???
? ? 返回給定鍵的數(shù)量
? ??? ??override def equals(other:Any):Boolean = ???
? ? Spark需要這個(gè)方法來(lái)檢查分區(qū)器對(duì)象是否與其他分區(qū)器實(shí)例相同,這樣Spark才能判斷兩個(gè)RDD的分區(qū)方式是否相同。
?
class DomainNamePartitioner (numParts:Int) extends Partitioner{override def numPartitions: Int = numParts//根據(jù)hashCode和numPartitions取余來(lái)得到Partition,因?yàn)榉祷氐谋仨毷欠秦?fù)數(shù),所以對(duì)于hashCode為負(fù)的情況做了特殊處理 override def getPartition(key: Any): Int = {val domain = new URL(key.toString).getHost(); val code=(domain.hashCode%numPartitions)if(code<0){code+numPartitions}else{code}}override def equals(other:Any):Boolean = other match {//這個(gè)實(shí)例是DomainNamePartitioner的實(shí)例,并且numPartitions相同,返回true case dnp:DomainNamePartitioner =>dnp.numPartitions==numPartitions//否則,返回false case _ => false } }?
?
?
?
總結(jié)
以上是生活随笔為你收集整理的Spark中的键值对操作-scala的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 快速理解Spark Dataset
- 下一篇: Spark MLlib实现的广告点击预测