日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Spork: Pig on Spark实现分析

發布時間:2023/12/18 编程问答 28 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Spork: Pig on Spark实现分析 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

介紹

Spork是Pig on Spark的highly experimental版本號,依賴的版本號也比較久,如之前文章里所說。眼下我把Spork維護在自己的github上:flare-spork。

本文分析的是Spork的實現方式和詳細內容。

Spark Launcher

在hadoop executionengine包路徑下,寫了一個Spark啟動器,同MapReduceLauncher類似,會在launchPig的時候,把傳入的物理運行計劃進行翻譯。 MR啟動器翻譯的是MR的操作,以及進一步的MR JobControl。而Spark啟動器將物理運行計劃的部分物理操作直接翻譯成了RDD的操作。 有一個缺點是翻譯成RDD算子之后,缺少優化過程,也就是直接物理操作的映射翻譯。詳細運行邏輯會全然交給Spark DAGScheduler去切分,由TaskScheduler去調度任務。 比方對Pig來說。直到見到Dump/Store,才會觸發整個翻譯和launch。

那么在這一次物理運行計劃中,相應到Spark可能是多次任務。


在眼下的實現方式下,翻譯物理操作交給多個Convertor的實現類來完畢, public interface POConverter<IN, OUT, T extends PhysicalOperator> {RDD<OUT> convert(List<RDD<IN>> rdd, T physicalOperator) throws IOException;} 抽象類POConvertor提供了convert方法,輸入參數中的List<RDD>是本次物理操作的前驅們產生的RDDs,能夠覺得是會依賴的父RDDs。

這樣一次轉化結果就是產生nextRDD。而nextRDD是否在spark上真正觸發計算,眼下來看是不去控制的。也就是上面提到的,一次Pig物理運行計劃可能會有Spark運行多次任務。


在使用的時候。以-x spark的方式就能夠啟動以Spark為backend engine的Pig環境。
以下詳細看眼下做了哪些PO操作的轉化工作。詳細怎么轉化的。

Load/Store

走的都是NewHadoopRDD路線。

?

Load方面是通過POLoad獲得文件路徑,pigContext獲得必要配置信息,然后交由SparkContext調用newAPIHadoopFile來獲得NewHadoopRDD,最后把Tuple2<Text, Tuple>的RDD map成僅僅剩value的RDD<Tuple>。

?

Store方面是先把近期的前驅rdd轉會成Key為空Text的Tuple2<Text, Tuple>。然后映射為PairRDDFunctions。借助pigContext生成POStore操作,最后調用RDD的saveAsNewAPIHadoopFile存到HDFS上。


Foreach、Filter、Limit

ForEach里實現一個Iterator[T] => Iterator[T]的方法,把foreach轉化為rdd.mapPartitions()方法。

Iterator[T]=> Iterator[T]方法的實現。會依賴原本的POForEach來獲得nextTuple和進行一些別的操作,來實現一個新的Iterator。

?

對于hadoop backend的executionengine里的抽象類PhysicalOperator來說。

setInput()和attachInput()方法是放入帶處理的tuple數據。

getNextTuple()的時候觸發processTuple()。處理對象就是內部的Input Tuple。

?

所以ForEach操作實現Iterator的時候。在readNext()方法里摻入了以上設置Input數據的操作,在返回前調用getNextTuple()返回處理后的結果。

?

POFilter也是通過setInput()和attachInput()以及getNextTuple()來返回處理結果。

所以在實現為RDD操作的時候。把以上步驟包裝成一個FilterFunction,傳入rdd.filter(Function)處理。

?

POLimit同POFilter是全然一樣的。


Distinct

如今RDD已經直接具備distinct(numPartitions: Int)方法了。


這里的distinct實現同rdd里的distinct邏輯是全然一樣的。

第一步:把類型為Tuple的rdd映射成為Tuple2<Tuple, Object>。當中value部分是null的;

第二步:進行rdd.reduceByKey(merge_function, parallelism)操作,merge_function對兩個value部分的Object不做不論什么處理。也就是按key reduce且不正確value部分處理;

第三步:對第二步的結果進行rdd.map(function, ClassTag)處理,function為得到Tuple2<Tuple, Object>里的._1,即key值:Tuple。



Union

Union是一次求并過程,直接new UnionRDD<Tuple>返回。

因為UnionRDD處理的是Seq<RDD>。所以使用JavaConversions.asScalaBuffer(List<RDD<Tuple>>)進行一下轉換再傳入。



Sort

Sort過程:

第一步:把Tuple類型的RDD轉成Tuple2<Tuple, Object>類型。Object為空

第二步:依據第一步結果。new OrderedRDDFunctions<Tuple, Object,Tuple2<Tuple, Object>>

,其sortByKey方法產出一個排過序的RDD<Tuple2<Tuple, Object>>。OrderedRDDFunctions里的Key類型必須是可排序的,比較器復用的是POSort的mComparator。sortByKey結果返回的是ShuffleRDD。其Partitioner是RangePartitioner,排序之后,每一個Partition里存放的都是一個范圍內的排過序的值。

第三步:調用rdd.mapPartition(function, xx, xx),function作用為把Iterator<Tuple2<Tuple,Object>>吐成Iterator<Tuple>。即再次取回Key值,此時已有序。


Split

POSplit的處理是直接返回第一個祖先RDD。


LocalRearrange

LocalRearrange -> Global Rearrange -> Package是一同出現的。



Local rearrange直接依賴

physicalOperator.setInputs(null);physicalOperator.attachInput(t);result = physicalOperator.getNextTuple();

三步得到result。返回的Tuple格式為(index, key, value)。

依賴POLocalRearrange本身內部對input tuple的處理。


GlobalRearrange

待處理的Tuple格式是(index, key, value)。最后結果為(key, { values })

?

假設父RDD僅僅有一個:

先進行按key進行一次groupBy。得到結果是Tuple2<Object, Seq<Tuple>>

然后做一次map操作,得到(key, { values })形態的RDD,即Tuple<Object, Iterator>

?

假設父RDD有多個:

讓通過rdd的map操作先將Tuple從(index, key, value)轉成(key, value)形態,然后把這個rdd集合new成CoGroupRDD,包括一次(Seq) JavaConversions.asScalaBuffer(rddPairs)轉化。最后調用CoGroupRDD的map方法,把Tuple2<Object,Seq<Seq<Tuple>>>轉化成Tuple<Object, Iterator>,即(key, { values })形態。實際上。CoGroupRDD的map方法內部做的事情。是針對每一個Key里的Iterator集合,進行了Iterator之間的合并操作。

?

Package

Package須要把global rearrange處理后的key, Seq<Tuple>進行group。

詳細的待處理Tuple結構是這種:(key, Seq<Tuple>:{(index,key, value without key)})

tuple.get(0)是keyTuple,tuple.get(1)是Iterator<Tuple>。最后返回(key, {values})。即Tuple<Object, Iterator>

?


全文完 :)

轉載于:https://www.cnblogs.com/llguanli/p/6999339.html

總結

以上是生活随笔為你收集整理的Spork: Pig on Spark实现分析的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。