此文已由作者葉林寶授權網易云社區(qū)發(fā)布。
歡迎訪問網易云社區(qū),了解更多網易技術產品運營經驗。
方案四:Sort on Cell Values
簡述:
上述方案三, 當數(shù)據(jù)行數(shù)較多, 情況下, 在二次排序還是可能出現(xiàn)oom情況, 而且, 不同的field_index的數(shù)據(jù)可能shuffle到同一個分區(qū),這樣就加大了oom的概率。當field_index本身取值較多 情況下, 增加分區(qū)數(shù)是其中一種解決方法。但是field_index取值本身就少于分區(qū)數(shù)的情況下, 增加分區(qū)數(shù)對緩解oom就沒任何作用了。 如果 當field_value相比field_index較為分散, 且值較多的情況下, 不妨換個思維, 按field_value分區(qū)。 具體算法如下:
算法:
(1)將df 轉換為(field_value, field_index)
(2)對分區(qū)內的數(shù)據(jù), 用sortByKey根據(jù) field_value排序 (rangPartition排序)
(3)利用mapPartitions確定每個分區(qū)內的每個field_index共有多少數(shù)據(jù)(不同分區(qū)中的filed_value相對有序, 例如partiiton1 中的filed_value比partition2中的field_value小)
(4)利用第(3)步數(shù)據(jù), 確定每個field_index中所需要的排名的數(shù)據(jù)在哪個分區(qū)以及分區(qū)內第幾條數(shù)據(jù)。例如要輸出field_index_6的13th位數(shù)據(jù),假設第一個分區(qū)已經包含10條數(shù)據(jù), 則目標數(shù)據(jù)在第二個分區(qū)的第3條數(shù)據(jù)
(5)轉換(4)計算結果為標準輸出格式
代碼:
(1)
/***?將數(shù)據(jù)源df轉換為(field_value,?field_index)格式的rdd*?@param?dataFrame*?@return*/def?getValueColumnPairs(dataFrame?:?DataFrame):?RDD[(Double,?Int)]?={dataFrame.rdd.flatMap{row:?Row?=>?row.toSeq.zipWithIndex.map{case?(v,?index)?=>?(v.toString.toDouble,?index)}}}
(3)
/***?對按照field_value排序后的sortedValueColumnPairs,?計算出每個分區(qū)上,?每個field_index分別有多少數(shù)據(jù)*?@param?sortedValueColumnPairs*?@param?numOfColumns*?@return*/def?getColumnsFreqPerPartition(sortedValueColumnPairs:?RDD[(Double,?Int)],numOfColumns?:?Int):?Array[(Int,?Array[Long])]?=?{val?zero?=?Array.fill[Long](numOfColumns)(0)????def?aggregateColumnFrequencies?(partitionIndex?:?Int,?valueColumnPairs?:?Iterator[(Double,?Int)])?=?{val?columnsFreq?:?Array[Long]?=?valueColumnPairs.aggregate(zero)((a?:?Array[Long],?v?:?(Double,?Int))?=>?{val?(value,?colIndex)?=?v??????????//increment?the?cell?in?the?zero?array?corresponding?to?this?columna(colIndex)?=?a(colIndex)?+?1La},(a?:?Array[Long],?b?:?Array[Long])?=>?{a.zip(b).map{?case(aVal,?bVal)?=>?aVal?+?bVal}})Iterator((partitionIndex,?columnsFreq))}sortedValueColumnPairs.mapPartitionsWithIndex(aggregateColumnFrequencies).collect()}
舉例說明:
假設對(1)中轉換后的數(shù)據(jù), 按照field_value排序后, 各個分區(qū)的數(shù)據(jù)如下所示
Partition 1: (1.5, 0) (1.75, 1) (2.0, 2) (5.25, 0)
Partition 2: (7.5, 1) (9.5, 2)
則(2)的輸出結果為:
[(0, [2, 1, 1]), (1, [0, 1, 1])]
(4)
/***?計算每個field_index所需排位數(shù)據(jù)在第幾個分區(qū)的第幾條數(shù)據(jù)*?@param?targetRanks?排位數(shù)組*?@param?partitionColumnsFreq?每個分區(qū)的每個field_index包含多少數(shù)據(jù)*?@param?numOfColumns?field個數(shù)*?@return*/def?getRanksLocationsWithinEachPart(targetRanks?:?List[Long],partitionColumnsFreq?:?Array[(Int,?Array[Long])],numOfColumns?:?Int)?:?Array[(Int,?List[(Int,?Long)])]?=?{????//?二維數(shù)組,?存儲當前每個field_index,?遍歷到到第幾條數(shù)據(jù)val?runningTotal?=?Array.fill[Long](numOfColumns)(0)????//?The?partition?indices?are?not?necessarily?in?sorted?order,?so?we?need//?to?sort?the?partitionsColumnsFreq?array?by?the?partition?index?(the//?first?value?in?the?tuple).partitionColumnsFreq.sortBy(_._1).map?{??????//?relevantIndexList?存儲分區(qū)上,?滿足排位數(shù)組的field_index在該分區(qū)的第幾條數(shù)據(jù)case?(partitionIndex,?columnsFreq)?=>?val?relevantIndexList?=?new?mutable.MutableList[(Int,?Long)]()columnsFreq.zipWithIndex.foreach{?case?(colCount,?colIndex)?=>??????????//?當天field_index(即colIndex),?遍歷到第幾條數(shù)據(jù)val?runningTotalCol?=?runningTotal(colIndex)??????????//??當前field_index(即colIndex),排位數(shù)組中哪些排位位于當前分區(qū)val?ranksHere:?List[Long]?=?targetRanks.filter(rank?=>runningTotalCol?<?rank?&&?runningTotalCol?+?colCount?>=?rank)??????????//?計算出當前分區(qū),當前field_index(即colIndex),?滿足排位數(shù)組的field_value在當前分區(qū)的位置relevantIndexList?++=?ranksHere.map(rank?=>?(colIndex,?rank?-?runningTotalCol))runningTotal(colIndex)?+=?colCount}(partitionIndex,?relevantIndexList.toList)}}
舉個例子:
假如目標排位:targetRanks: [5]
各分區(qū)各feild_index數(shù)據(jù)量:partitionColumnsFreq: [(0, [2, 3]), (1, [4, 1]), (2, [5, 2])]
字段個數(shù):numOfColumns: 2
輸出結果: [(0, []), (1, [(0, 3)]), (2, [(1, 1)])]
(5)
/***?過濾出每個field_index?所需排位的數(shù)值*?@param?sortedValueColumnPairs*?@param?ranksLocations?(4)中計算出的滿足排位數(shù)組要求的每個分區(qū)上,每個field_index在該分區(qū)的第幾條數(shù)據(jù)*?@return*/def?findTargetRanksIteratively(?sortedValueColumnPairs?:?RDD[(Double,?Int)],?ranksLocations?:?Array[(Int,?List[(Int,?Long)])]):RDD[(Int,?Double)]?=?{sortedValueColumnPairs.mapPartitionsWithIndex((partitionIndex?:?Int,?valueColumnPairs?:?Iterator[(Double,?Int)])?=>?{????????//?當前分區(qū)上,?滿足排位數(shù)組的feild_index及其在該分區(qū)上的位置val?targetsInThisPart:?List[(Int,?Long)]?=?ranksLocations(partitionIndex)._2????????if?(targetsInThisPart.nonEmpty)?{??????????//?map中的key為field_index,?value為該feild_index在當前分區(qū)中的哪些位置上的數(shù)據(jù)滿足排位數(shù)組要求val?columnsRelativeIndex:?Map[Int,?List[Long]]?=?targetsInThisPart.groupBy(_._1).mapValues(_.map(_._2))val?columnsInThisPart?=?targetsInThisPart.map(_._1).distinct??????????//?存儲各個field_index,?在分區(qū)遍歷了多少條數(shù)據(jù)val?runningTotals?:?mutable.HashMap[Int,?Long]=?new?mutable.HashMap()runningTotals?++=?columnsInThisPart.map(columnIndex?=>?(columnIndex,?0L)).toMap??????????//?遍歷當前分區(qū)的數(shù)據(jù)源,?格式為(field_value,?field_index),?過濾出滿足排位數(shù)據(jù)要求的數(shù)據(jù)valueColumnPairs.filter{????????????case(value,?colIndex)?=>lazy?val?thisPairIsTheRankStatistic:?Boolean?=?{????????????????//?每遍歷一條數(shù)據(jù),?runningTotals上對應的field_index?當前已遍歷數(shù)據(jù)量+1val?total?=?runningTotals(colIndex)?+?1LrunningTotals.update(colIndex,?total)columnsRelativeIndex(colIndex).contains(total)}(runningTotals?contains?colIndex)?&&?thisPairIsTheRankStatistic}.map(_.swap)}?else?{Iterator.empty}})}
分析:
(1)這種方法代碼可讀性較差
(2)需要遍歷兩遍原始數(shù)據(jù)
(3)相比于方案三, 更加有效避免executor內oom
(4)當field_value分布較離散的情況下, 這種方案相比于前三種, 效率更高
(5)上述算法中, 有兩個潛在的問題, 當field_value傾斜情況下(即某個范圍的值特別多),算法效率嚴重依賴于算法描述中的步驟(2)是否能將所有的field_value均勻的分配到各個partition;另一個問題是,當某些field_value重復現(xiàn)象比較多時, 是否可以合并對這些field_value的計數(shù),而不是在一個partition中的iterator中挨個遍歷這些重復數(shù)據(jù)。
備注:上述內容(問題背景、解決算法)取自《High Performance Spark Best Practices for Scaling and Optimizing Apache Spark》(作者: Holden Karau and Rachel Warren)
免費體驗云安全(易盾)內容安全、驗證碼等服務
更多網易技術、產品、運營經驗分享請點擊。
相關文章:
【推薦】?[翻譯]pytest測試框架(一)
【推薦】?淺談js拖拽
【推薦】?HBase最佳實踐-集群規(guī)劃
轉載于:https://www.cnblogs.com/163yun/p/9881058.html
總結
以上是生活随笔為你收集整理的大数据算法:排位问题(2)的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。