Spark Java API: Actions
reduce
Official documentation:
Reduces the elements of this RDD using the specified commutative and associative binary operator.
Function prototype:
def reduce(f: JFunction2[T, T, T]): T
Using the binary function f, which must be commutative and associative, reduce performs a pairwise computation over the elements of the RDD and returns the result.
Source code analysis:
def reduce(f: (T, T) => T): T = withScope {
  val cleanF = sc.clean(f)
  val reducePartition: Iterator[T] => Option[T] = iter => {
    if (iter.hasNext) {
      Some(iter.reduceLeft(cleanF))
    } else {
      None
    }
  }
  var jobResult: Option[T] = None
  val mergeResult = (index: Int, taskResult: Option[T]) => {
    if (taskResult.isDefined) {
      jobResult = jobResult match {
        case Some(value) => Some(f(value, taskResult.get))
        case None => taskResult
      }
    }
  }
  sc.runJob(this, reducePartition, mergeResult)
  // Get the final result out of our Option, or throw an exception if the RDD was empty
  jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
}
As the source shows, reduce applies reduceLeft within each partition (reduceLeft folds a collection from left to right), and the per-partition results are then merged on the driver. Consequently, when there are many partitions or the user-defined function is expensive, the driver carries a heavy load.
Example:
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
List<Integer> data = Arrays.asList(5, 1, 1, 4, 4, 2, 2);
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data, 3);
Integer reduceRDD = javaRDD.reduce(new Function2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer v1, Integer v2) throws Exception {
        return v1 + v2;
    }
});
System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~" + reduceRDD);

aggregate
Official documentation:
Aggregate the elements of each partition, and then the results for all the partitions, using given combine functions and a neutral "zero value". This function can return a different result type, U, than the type of this RDD, T. Thus, we need one operation for merging a T into an U and one operation for merging two U's, as in scala.TraversableOnce. Both of these functions are allowed to modify and return their first argument instead of creating a new U to avoid memory allocation.
Function prototype:
def aggregate[U](zeroValue: U)(seqOp: JFunction2[U, T, U], combOp: JFunction2[U, U, U]): U
aggregate combines the elements within each partition, then merges the per-partition results; the final result type U need not match the element type of the RDD.
Source code analysis:
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = withScope {
  // Clone the zero value since we will also be serializing it as part of tasks
  var jobResult = Utils.clone(zeroValue, sc.env.serializer.newInstance())
  val cleanSeqOp = sc.clean(seqOp)
  val cleanCombOp = sc.clean(combOp)
  val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
  val mergeResult = (index: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult)
  sc.runJob(this, aggregatePartition, mergeResult)
  jobResult
}
As the source shows, aggregate applies Scala's collection aggregate within each partition, then uses combOp to merge the per-partition results.
Example:
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
List<Integer> data = Arrays.asList(5, 1, 1, 4, 4, 2, 2);
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data, 3);
Integer aggregateRDD = javaRDD.aggregate(2, new Function2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer v1, Integer v2) throws Exception {
        return v1 + v2;
    }
}, new Function2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer v1, Integer v2) throws Exception {
        return v1 + v2;
    }
});
System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~" + aggregateRDD);
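The example above keeps the result type equal to the element type, so the fact that aggregate may return a different type U is easy to miss. The following plain-Java sketch (no Spark required; the partition split and accumulator type are chosen here purely for illustration) simulates aggregate's semantics over the same data split into three "partitions", using an int[]{sum, count} accumulator as U while T stays Integer:

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

public class AggregateSimulation {
    public static void main(String[] args) {
        // Same data as above, manually split into 3 "partitions"
        List<List<Integer>> partitions = Arrays.asList(
                Arrays.asList(5, 1), Arrays.asList(1, 4), Arrays.asList(4, 2, 2));

        // zeroValue of type U = int[]{sum, count}; element type T = Integer
        int[] zero = {0, 0};
        BiFunction<int[], Integer, int[]> seqOp =
                (acc, v) -> new int[]{acc[0] + v, acc[1] + 1};
        BiFunction<int[], int[], int[]> combOp =
                (a, b) -> new int[]{a[0] + b[0], a[1] + b[1]};

        // Each partition folds its elements with seqOp, starting from zeroValue;
        // the driver starts from zeroValue too and merges each result with combOp.
        int[] result = zero.clone();
        for (List<Integer> partition : partitions) {
            int[] partial = zero.clone();
            for (Integer v : partition) {
                partial = seqOp.apply(partial, v);
            }
            result = combOp.apply(result, partial);
        }
        System.out.println(result[0] + "," + result[1]); // sum=19, count=7
    }
}
```

Note that, as the source above shows, zeroValue participates once per partition and once more on the driver; this is why the Spark example with zeroValue = 2 and three partitions prints 27 (19 + 2 × 4) rather than 21.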
fold
Official documentation:
Aggregate the elements of each partition, and then the results for all the partitions, using a given associative and commutative function and a neutral "zero value". The function op(t1, t2) is allowed to modify t1 and return it as its result value to avoid object allocation; however, it should not modify t2.
Function prototype:
def fold(zeroValue: T)(f: JFunction2[T, T, T]): T
fold is a simplified form of aggregate in which seqOp and combOp are the same function op.
Source code analysis:
def fold(zeroValue: T)(op: (T, T) => T): T = withScope {
  // Clone the zero value since we will also be serializing it as part of tasks
  var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
  val cleanOp = sc.clean(op)
  val foldPartition = (iter: Iterator[T]) => iter.fold(zeroValue)(cleanOp)
  val mergeResult = (index: Int, taskResult: T) => jobResult = op(jobResult, taskResult)
  sc.runJob(this, foldPartition, mergeResult)
  jobResult
}
As the source shows, zeroValue is first assigned to jobResult; each partition is then folded with op starting from zeroValue; op next merges each taskResult into jobResult, updating it as results arrive; and finally jobResult is returned.
Example:
List<String> data = Arrays.asList("5", "1", "1", "3", "6", "2", "2");
JavaRDD<String> javaRDD = javaSparkContext.parallelize(data, 5);
JavaRDD<String> partitionRDD = javaRDD.mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>() {
    @Override
    public Iterator<String> call(Integer v1, Iterator<String> v2) throws Exception {
        LinkedList<String> linkedList = new LinkedList<String>();
        while (v2.hasNext()) {
            linkedList.add(v1 + "=" + v2.next());
        }
        return linkedList.iterator();
    }
}, false);
System.out.println(partitionRDD.collect());
String foldRDD = javaRDD.fold("0", new Function2<String, String, String>() {
    @Override
    public String call(String v1, String v2) throws Exception {
        return v1 + " - " + v2;
    }
});
System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~" + foldRDD);

countByKey
Official documentation:
Count the number of elements for each key, collecting the results to a local Map. Note that this method should only be used if the resulting map is expected to be small, as the whole thing is loaded into the driver's memory. To handle very large results, consider using rdd.mapValues(_ => 1L).reduceByKey(_ + _), which returns an RDD[T, Long] instead of a map.
Function prototype:
def countByKey(): java.util.Map[K, Long]
Source code analysis:
def countByKey(): Map[K, Long] = self.withScope {
  self.mapValues(_ => 1L).reduceByKey(_ + _).collect().toMap
}
As the source shows, countByKey first maps each value to the pair (key, 1L), then aggregates per key with reduceByKey, and finally uses collect to load the data onto the driver and convert it to a Map.
Note: as the analysis above shows, countByKey loads the entire result into driver memory; with a large amount of data this can cause an OOM. Therefore, when there are many distinct keys, prefer rdd.mapValues(_ => 1L).reduceByKey(_ + _), which returns an RDD[(K, Long)] instead.
Example:
List<String> data = Arrays.asList("5", "1", "1", "3", "6", "2", "2");
JavaRDD<String> javaRDD = javaSparkContext.parallelize(data, 5);
JavaRDD<String> partitionRDD = javaRDD.mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>() {
    @Override
    public Iterator<String> call(Integer v1, Iterator<String> v2) throws Exception {
        LinkedList<String> linkedList = new LinkedList<String>();
        while (v2.hasNext()) {
            linkedList.add(v1 + "=" + v2.next());
        }
        return linkedList.iterator();
    }
}, false);
System.out.println(partitionRDD.collect());
JavaPairRDD<String, String> javaPairRDD = javaRDD.mapToPair(new PairFunction<String, String, String>() {
    @Override
    public Tuple2<String, String> call(String s) throws Exception {
        return new Tuple2<String, String>(s, s);
    }
});
System.out.println(javaPairRDD.countByKey());
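As the note above explains, the driver-side map can OOM when there are many keys; the suggested rdd.mapValues(_ => 1L).reduceByKey(_ + _) keeps counting distributed. Its per-key merge logic boils down to the following plain-Java sketch (no Spark required; a local HashMap stands in for the shuffle):

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CountByKeySimulation {
    public static void main(String[] args) {
        // Same keys as the example above
        List<String> keys = Arrays.asList("5", "1", "1", "3", "6", "2", "2");

        // Equivalent of mapValues(_ => 1L).reduceByKey(_ + _):
        // each key maps to 1L, then values are summed per key.
        Map<String, Long> counts = new HashMap<>();
        for (String k : keys) {
            counts.merge(k, 1L, Long::sum);
        }
        System.out.println(counts.get("1") + "," + counts.get("2") + "," + counts.get("5"));
    }
}
```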
foreach
Official documentation:
Applies a function f to all elements of this RDD.
Function prototype:
def foreach(f: VoidFunction[T])
foreach iterates over the RDD, applying the function f to every element.
Source code analysis:
def foreach(f: T => Unit): Unit = withScope {
  val cleanF = sc.clean(f)
  sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}
Example:
List<Integer> data = Arrays.asList(5, 1, 1, 4, 4, 2, 2);
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data, 3);
javaRDD.foreach(new VoidFunction<Integer>() {
    @Override
    public void call(Integer integer) throws Exception {
        System.out.println(integer);
    }
});

foreachPartition
Official documentation:
Applies a function f to each partition of this RDD.
Function prototype:
def foreachPartition(f: VoidFunction[java.util.Iterator[T]])
foreachPartition is similar to foreach, except that f is applied to each partition as a whole rather than to each element.
Source code analysis:
def foreachPartition(f: Iterator[T] => Unit): Unit = withScope {
  val cleanF = sc.clean(f)
  sc.runJob(this, (iter: Iterator[T]) => cleanF(iter))
}
Example:
List<Integer> data = Arrays.asList(5, 1, 1, 4, 4, 2, 2);
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data, 3);
// obtain the partition ID
JavaRDD<String> partitionRDD = javaRDD.mapPartitionsWithIndex(new Function2<Integer, Iterator<Integer>, Iterator<String>>() {
    @Override
    public Iterator<String> call(Integer v1, Iterator<Integer> v2) throws Exception {
        LinkedList<String> linkedList = new LinkedList<String>();
        while (v2.hasNext()) {
            linkedList.add(v1 + "=" + v2.next());
        }
        return linkedList.iterator();
    }
}, false);
System.out.println(partitionRDD.collect());
javaRDD.foreachPartition(new VoidFunction<Iterator<Integer>>() {
    @Override
    public void call(Iterator<Integer> integerIterator) throws Exception {
        System.out.println("___________begin_______________");
        while (integerIterator.hasNext())
            System.out.print(integerIterator.next() + " ");
        System.out.println("\n___________end_________________");
    }
});

lookup
Official documentation:
Return the list of values in the RDD for key `key`. This operation is done efficiently if the RDD has a known partitioner by only searching the partition that the key maps to.
Function prototype:
def lookup(key: K): JList[V]
lookup is used on RDDs of (K, V) pairs; given a key K, it returns all values in the RDD associated with that key.
Source code analysis:
def lookup(key: K): Seq[V] = self.withScope {
  self.partitioner match {
    case Some(p) =>
      val index = p.getPartition(key)
      val process = (it: Iterator[(K, V)]) => {
        val buf = new ArrayBuffer[V]
        for (pair <- it if pair._1 == key) {
          buf += pair._2
        }
        buf
      } : Seq[V]
      val res = self.context.runJob(self, process, Array(index), false)
      res(0)
    case None =>
      self.filter(_._1 == key).map(_._2).collect()
  }
}
As the source shows, if the RDD has a partitioner, the partition that the key maps to is computed and only that partition is scanned for the key's values; if there is no partitioner, a filter discards all pairs whose key does not match, and the remaining values are collected.
Example:
List<Integer> data = Arrays.asList(5, 1, 1, 4, 4, 2, 2);
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data, 3);
JavaPairRDD<Integer, Integer> javaPairRDD = javaRDD.mapToPair(new PairFunction<Integer, Integer, Integer>() {
    int i = 0;
    @Override
    public Tuple2<Integer, Integer> call(Integer integer) throws Exception {
        i++;
        return new Tuple2<Integer, Integer>(integer, i + integer);
    }
});
System.out.println(javaPairRDD.collect());
System.out.println("lookup------------" + javaPairRDD.lookup(4));
sortBy
Official documentation:
Return this RDD sorted by the given key function.
Function prototype:
def sortBy[S](f: JFunction[T, S], ascending: Boolean, numPartitions: Int): JavaRDD[T]
sortBy sorts the elements of the RDD according to the given key function f.
Source code analysis:
def sortBy[K](
    f: (T) => K,
    ascending: Boolean = true,
    numPartitions: Int = this.partitions.length)
    (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] = withScope {
  this.keyBy[K](f)
    .sortByKey(ascending, numPartitions)
    .values
}

/**
 * Creates tuples of the elements in this RDD by applying `f`.
 */
def keyBy[K](f: T => K): RDD[(K, T)] = withScope {
  val cleanedF = sc.clean(f)
  map(x => (cleanedF(x), x))
}
As the source shows, sortBy is implemented on top of sortByKey. It takes three parameters. The first is a function f that maps each element of type T to a sort key; keyBy applies it via map to turn every element into a (key, element) tuple. The second, ascending, is optional and controls the sort direction, defaulting to true (ascending). The third, numPartitions, is also optional and specifies the number of partitions of the sorted RDD, defaulting to the number of partitions before sorting (this.partitions.length).
Example:
List<Integer> data = Arrays.asList(5, 1, 1, 4, 4, 2, 2);
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data, 3);
final Random random = new Random(100);
// transform the RDD so each element has two parts
JavaRDD<String> javaRDD1 = javaRDD.map(new Function<Integer, String>() {
    @Override
    public String call(Integer v1) throws Exception {
        return v1.toString() + "_" + random.nextInt(100);
    }
});
System.out.println(javaRDD1.collect());
// sort by the second part of each element
JavaRDD<String> resultRDD = javaRDD1.sortBy(new Function<String, Object>() {
    @Override
    public Object call(String v1) throws Exception {
        return v1.split("_")[1];
    }
}, false, 3);
System.out.println("result--------------" + resultRDD.collect());

takeOrdered
Official documentation:
Returns the first k (smallest) elements from this RDD using the natural ordering for T while maintaining the order.
Function prototype:
def takeOrdered(num: Int): JList[T]
def takeOrdered(num: Int, comp: Comparator[T]): JList[T]
takeOrdered returns the first num elements of the RDD according to the default (ascending) ordering or a specified comparator.
Source code analysis:
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
  if (num == 0) {
    Array.empty
  } else {
    val mapRDDs = mapPartitions { items =>
      // Priority keeps the largest elements, so let's reverse the ordering.
      val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
      queue ++= util.collection.Utils.takeOrdered(items, num)(ord)
      Iterator.single(queue)
    }
    if (mapRDDs.partitions.length == 0) {
      Array.empty
    } else {
      mapRDDs.reduce { (queue1, queue2) =>
        queue1 ++= queue2
        queue1
      }.toArray.sorted(ord)
    }
  }
}
As the source shows, mapPartitions performs a local sort within each partition, and each partition returns at most num elements, held in a BoundedPriorityQueue. Note that the elements of mapRDDs are these priority queues; a reduce then merges the queues, and the merged result is converted to an array and globally sorted.
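The per-partition trick above, keeping only the num smallest elements in a bounded priority queue with a reversed ordering, can be simulated in plain Java (no Spark required; java.util.PriorityQueue stands in for Spark's BoundedPriorityQueue):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.PriorityQueue;

public class TakeOrderedSketch {
    public static void main(String[] args) {
        int num = 2;
        // Same data as the takeOrdered example below
        List<Integer> data = Arrays.asList(5, 1, 0, 4, 4, 2, 2);

        // Bounded max-heap: the head is the largest of the 'num' smallest
        // elements seen so far, so it is the one to evict on overflow.
        PriorityQueue<Integer> queue = new PriorityQueue<>(Collections.reverseOrder());
        for (Integer v : data) {
            queue.offer(v);
            if (queue.size() > num) {
                queue.poll(); // evict the current largest
            }
        }
        // Final global sort, as in .toArray.sorted(ord)
        List<Integer> result = new ArrayList<>(queue);
        Collections.sort(result);
        System.out.println(result);
    }
}
```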
Example:
// Note: the comparator must be serializable
public static class TakeOrderedComparator implements Serializable, Comparator<Integer> {
    @Override
    public int compare(Integer o1, Integer o2) {
        return -o1.compareTo(o2);
    }
}

List<Integer> data = Arrays.asList(5, 1, 0, 4, 4, 2, 2);
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data, 3);
System.out.println("takeOrdered-----1-------------" + javaRDD.takeOrdered(2));
List<Integer> list = javaRDD.takeOrdered(2, new TakeOrderedComparator());
System.out.println("takeOrdered----2--------------" + list);

takeSample
Official documentation:
Return a fixed-size sampled subset of this RDD in an array.
Function prototype:
def takeSample(withReplacement: Boolean, num: Int): JList[T]
def takeSample(withReplacement: Boolean, num: Int, seed: Long): JList[T]
takeSample returns an array of num elements sampled randomly from the dataset.
Source code analysis:
def takeSample(
    withReplacement: Boolean,
    num: Int,
    seed: Long = Utils.random.nextLong): Array[T] = {
  val numStDev = 10.0
  if (num < 0) {
    throw new IllegalArgumentException("Negative number of elements requested")
  } else if (num == 0) {
    return new Array[T](0)
  }
  val initialCount = this.count()
  if (initialCount == 0) {
    return new Array[T](0)
  }
  val maxSampleSize = Int.MaxValue - (numStDev * math.sqrt(Int.MaxValue)).toInt
  if (num > maxSampleSize) {
    throw new IllegalArgumentException("Cannot support a sample size > Int.MaxValue - " +
      s"$numStDev * math.sqrt(Int.MaxValue)")
  }
  val rand = new Random(seed)
  if (!withReplacement && num >= initialCount) {
    return Utils.randomizeInPlace(this.collect(), rand)
  }
  val fraction = SamplingUtils.computeFractionForSampleSize(num, initialCount, withReplacement)
  var samples = this.sample(withReplacement, fraction, rand.nextInt()).collect()
  // If the first sample didn't turn out large enough, keep trying to take samples;
  // this shouldn't happen often because we use a big multiplier for the initial size
  var numIters = 0
  while (samples.length < num) {
    logWarning(s"Needed to re-sample due to insufficient sample size. Repeat #$numIters")
    samples = this.sample(withReplacement, fraction, rand.nextInt()).collect()
    numIters += 1
  }
  Utils.randomizeInPlace(samples, rand).take(num)
}
As the source shows, takeSample is similar to sample and takes three parameters: withReplacement indicates whether sampling is with replacement (true) or without (false); num is the number of sampled elements to return, which is the key difference from sample; and seed is the random number generator seed. Internally, takeSample first computes fraction, the sampling ratio, then calls sample, collects the sampled data, and finally calls take to return num elements. Note that if num exceeds the number of elements in the RDD and sampling is without replacement, all elements of the RDD are returned.
Example:
List<Integer> data = Arrays.asList(5, 1, 0, 4, 4, 2, 2);
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data, 3);
System.out.println("takeSample-----1-------------" + javaRDD.takeSample(true, 2));
System.out.println("takeSample-----2-------------" + javaRDD.takeSample(true, 2, 100));
// returns 20 elements (with replacement)
System.out.println("takeSample-----3-------------" + javaRDD.takeSample(true, 20, 100));
// returns only 7 elements (without replacement)
System.out.println("takeSample-----4-------------" + javaRDD.takeSample(false, 20, 100));
treeAggregate
Official documentation:
Aggregates the elements of this RDD in a multi-level tree pattern.
Function prototype:
def treeAggregate[U](zeroValue: U, seqOp: JFunction2[U, T, U], combOp: JFunction2[U, U, U], depth: Int): U
def treeAggregate[U](zeroValue: U, seqOp: JFunction2[U, T, U], combOp: JFunction2[U, U, U]): U
treeAggregate can be understood as a multi-level, more elaborate form of aggregate.
Source code analysis:
def treeAggregate[U: ClassTag](zeroValue: U)(
    seqOp: (U, T) => U,
    combOp: (U, U) => U,
    depth: Int = 2): U = withScope {
  require(depth >= 1, s"Depth must be greater than or equal to 1 but got $depth.")
  if (partitions.length == 0) {
    Utils.clone(zeroValue, context.env.closureSerializer.newInstance())
  } else {
    val cleanSeqOp = context.clean(seqOp)
    val cleanCombOp = context.clean(combOp)
    val aggregatePartition =
      (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
    var partiallyAggregated = mapPartitions(it => Iterator(aggregatePartition(it)))
    var numPartitions = partiallyAggregated.partitions.length
    val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / depth)).toInt, 2)
    // If creating an extra level doesn't help reduce
    // the wall-clock time, we stop tree aggregation.
    // Don't trigger TreeAggregation when it doesn't save wall-clock time
    while (numPartitions > scale + math.ceil(numPartitions.toDouble / scale)) {
      numPartitions /= scale
      val curNumPartitions = numPartitions
      partiallyAggregated = partiallyAggregated.mapPartitionsWithIndex { (i, iter) =>
        iter.map((i % curNumPartitions, _))
      }.reduceByKey(new HashPartitioner(curNumPartitions), cleanCombOp).values
    }
    partiallyAggregated.reduce(cleanCombOp)
  }
}
As the source shows, treeAggregate first performs a local aggregation within each partition using Scala's aggregate; it then computes scale from the depth parameter, and while there are still too many partitions, assigns each partial result the key i % curNumPartitions and merges by key via reduceByKey into fewer partitions; finally a reduce performs the last combination. Adjusting the depth thus reduces the cost of the final reduce.
Example:
List<Integer> data = Arrays.asList(5, 1, 1, 4, 4, 2, 2);
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data, 3);
// transformation
JavaRDD<String> javaRDD1 = javaRDD.map(new Function<Integer, String>() {
    @Override
    public String call(Integer v1) throws Exception {
        return Integer.toString(v1);
    }
});
String result1 = javaRDD1.treeAggregate("0", new Function2<String, String, String>() {
    @Override
    public String call(String v1, String v2) throws Exception {
        System.out.println(v1 + "=seq=" + v2);
        return v1 + "=seq=" + v2;
    }
}, new Function2<String, String, String>() {
    @Override
    public String call(String v1, String v2) throws Exception {
        System.out.println(v1 + "<=comb=>" + v2);
        return v1 + "<=comb=>" + v2;
    }
});
System.out.println(result1);

treeReduce
Official documentation:
Reduces the elements of this RDD in a multi-level tree pattern.
Function prototype:
def treeReduce(f: JFunction2[T, T, T], depth: Int): T
def treeReduce(f: JFunction2[T, T, T]): T
treeReduce is similar to treeAggregate; it is simply a treeAggregate in which seqOp and combOp are the same function.
Source code analysis:
def treeReduce(f: (T, T) => T, depth: Int = 2): T = withScope {
  require(depth >= 1, s"Depth must be greater than or equal to 1 but got $depth.")
  val cleanF = context.clean(f)
  val reducePartition: Iterator[T] => Option[T] = iter => {
    if (iter.hasNext) {
      Some(iter.reduceLeft(cleanF))
    } else {
      None
    }
  }
  val partiallyReduced = mapPartitions(it => Iterator(reducePartition(it)))
  val op: (Option[T], Option[T]) => Option[T] = (c, x) => {
    if (c.isDefined && x.isDefined) {
      Some(cleanF(c.get, x.get))
    } else if (c.isDefined) {
      c
    } else if (x.isDefined) {
      x
    } else {
      None
    }
  }
  partiallyReduced.treeAggregate(Option.empty[T])(op, op, depth)
    .getOrElse(throw new UnsupportedOperationException("empty collection"))
}
As the source shows, treeReduce first applies Scala's reduceLeft within each partition, then combines the partial results with treeAggregate, where seqOp and combOp are the same function and the zero value is empty (Option.empty). In practice, treeReduce can replace reduce when a single driver-side reduce is too expensive, since the depth parameter controls the size of each reduction step.
Example:
List<Integer> data = Arrays.asList(5, 1, 1, 4, 4, 2, 2);
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data, 5);
JavaRDD<String> javaRDD1 = javaRDD.map(new Function<Integer, String>() {
    @Override
    public String call(Integer v1) throws Exception {
        return Integer.toString(v1);
    }
});
String result = javaRDD1.treeReduce(new Function2<String, String, String>() {
    @Override
    public String call(String v1, String v2) throws Exception {
        System.out.println(v1 + "=" + v2);
        return v1 + "=" + v2;
    }
});
System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~" + result);
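To see why the tree pattern lightens the driver's load, the following plain-Java sketch merges per-partition partial results level by level instead of one at a time on the driver (no Spark required; structure only — Spark actually regroups partial results via reduceByKey on i % curNumPartitions, but the effect is the same level-by-level reduction):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

public class TreeMergeSketch {
    // Merge partial results pairwise, level by level, like treeReduce's combine phase
    static <T> T treeMerge(List<T> partials, BinaryOperator<T> op) {
        List<T> level = new ArrayList<>(partials);
        while (level.size() > 1) {
            List<T> next = new ArrayList<>();
            for (int i = 0; i < level.size(); i += 2) {
                if (i + 1 < level.size()) {
                    next.add(op.apply(level.get(i), level.get(i + 1)));
                } else {
                    next.add(level.get(i)); // odd element carries over to the next level
                }
            }
            level = next;
        }
        return level.get(0);
    }

    public static void main(String[] args) {
        // Pretend these are per-partition results of reduceLeft over
        // partitions (5,1), (1,4) and (4,2,2)
        List<Integer> partials = Arrays.asList(6, 5, 8);
        System.out.println(treeMerge(partials, Integer::sum)); // 19
    }
}
```

With depth levels, each merge step combines only a bounded number of values, instead of all partition results arriving at the driver at once.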
saveAsTextFile
Official documentation:
Save this RDD as a text file, using string representations of elements.
Function prototype:
def saveAsTextFile(path: String): Unit
def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit
saveAsTextFile stores the RDD in the file system as text files.
Source code analysis:
def saveAsTextFile(path: String): Unit = withScope {
  // https://issues.apache.org/jira/browse/SPARK-2075
  //
  // NullWritable is a `Comparable` in Hadoop 1.+, so the compiler cannot find an implicit
  // Ordering for it and will use the default `null`. However, it's a `Comparable[NullWritable]`
  // in Hadoop 2.+, so the compiler will call the implicit `Ordering.ordered` method to create an
  // Ordering for `NullWritable`. That's why the compiler will generate different anonymous
  // classes for `saveAsTextFile` in Hadoop 1.+ and Hadoop 2.+.
  //
  // Therefore, here we provide an explicit Ordering `null` to make sure the compiler generate
  // same bytecodes for `saveAsTextFile`.
  val nullWritableClassTag = implicitly[ClassTag[NullWritable]]
  val textClassTag = implicitly[ClassTag[Text]]
  val r = this.mapPartitions { iter =>
    val text = new Text()
    iter.map { x =>
      text.set(x.toString)
      (NullWritable.get(), text)
    }
  }
  RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null)
    .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)
}

/**
 * Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
 * supporting the key and value types K and V in this RDD.
 */
def saveAsHadoopFile(
    path: String,
    keyClass: Class[_],
    valueClass: Class[_],
    outputFormatClass: Class[_ <: OutputFormat[_, _]],
    conf: JobConf = new JobConf(self.context.hadoopConfiguration),
    codec: Option[Class[_ <: CompressionCodec]] = None): Unit = self.withScope {
  // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
  val hadoopConf = conf
  hadoopConf.setOutputKeyClass(keyClass)
  hadoopConf.setOutputValueClass(valueClass)
  // Doesn't work in Scala 2.9 due to what may be a generics bug
  // TODO: Should we uncomment this for Scala 2.10?
  // conf.setOutputFormat(outputFormatClass)
  hadoopConf.set("mapred.output.format.class", outputFormatClass.getName)
  for (c <- codec) {
    hadoopConf.setCompressMapOutput(true)
    hadoopConf.set("mapred.output.compress", "true")
    hadoopConf.setMapOutputCompressorClass(c)
    hadoopConf.set("mapred.output.compression.codec", c.getCanonicalName)
    hadoopConf.set("mapred.output.compression.type", CompressionType.BLOCK.toString)
  }
  // Use configured output committer if already set
  if (conf.getOutputCommitter == null) {
    hadoopConf.setOutputCommitter(classOf[FileOutputCommitter])
  }
  FileOutputFormat.setOutputPath(hadoopConf,
    SparkHadoopWriter.createPathFromString(path, hadoopConf))
  saveAsHadoopDataset(hadoopConf)
}

/**
 * Output the RDD to any Hadoop-supported storage system, using a Hadoop JobConf object for
 * that storage system. The JobConf should set an OutputFormat and any output paths required
 * (e.g. a table name to write to) in the same way as it would be configured for a Hadoop
 * MapReduce job.
 */
def saveAsHadoopDataset(conf: JobConf): Unit = self.withScope {
  // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
  val hadoopConf = conf
  val wrappedConf = new SerializableConfiguration(hadoopConf)
  val outputFormatInstance = hadoopConf.getOutputFormat
  val keyClass = hadoopConf.getOutputKeyClass
  val valueClass = hadoopConf.getOutputValueClass
  if (outputFormatInstance == null) {
    throw new SparkException("Output format class not set")
  }
  if (keyClass == null) {
    throw new SparkException("Output key class not set")
  }
  if (valueClass == null) {
    throw new SparkException("Output value class not set")
  }
  SparkHadoopUtil.get.addCredentials(hadoopConf)
  logDebug("Saving as hadoop file of type (" + keyClass.getSimpleName + ", " +
    valueClass.getSimpleName + ")")
  if (isOutputSpecValidationEnabled) {
    // FileOutputFormat ignores the filesystem parameter
    val ignoredFs = FileSystem.get(hadoopConf)
    hadoopConf.getOutputFormat.checkOutputSpecs(ignoredFs, hadoopConf)
  }
  val writer = new SparkHadoopWriter(hadoopConf)
  writer.preSetup()
  val writeToFile = (context: TaskContext, iter: Iterator[(K, V)]) => {
    val config = wrappedConf.value
    // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
    // around by taking a mod. We expect that no task will be attempted 2 billion times.
    val taskAttemptId = (context.taskAttemptId % Int.MaxValue).toInt
    val (outputMetrics, bytesWrittenCallback) = initHadoopOutputMetrics(context)
    writer.setup(context.stageId, context.partitionId, taskAttemptId)
    writer.open()
    var recordsWritten = 0L
    Utils.tryWithSafeFinally {
      while (iter.hasNext) {
        val record = iter.next()
        writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])
        // Update bytes written metric every few records
        maybeUpdateOutputMetrics(bytesWrittenCallback, outputMetrics, recordsWritten)
        recordsWritten += 1
      }
    } {
      writer.close()
    }
    writer.commit()
    bytesWrittenCallback.foreach { fn => outputMetrics.setBytesWritten(fn()) }
    outputMetrics.setRecordsWritten(recordsWritten)
  }
  self.context.runJob(self, writeToFile)
  writer.commitJob()
}
As the source shows, saveAsTextFile is built on saveAsHadoopFile. Because saveAsHadoopFile operates on pair RDDs, saveAsTextFile first converts the RDD into one of (NullWritable, Text) pairs via rddToPairRDDFunctions, and then performs the actual write through saveAsHadoopFile.
Example:
List<Integer> data = Arrays.asList(5, 1, 1, 4, 4, 2, 2);
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data, 5);
javaRDD.saveAsTextFile("/user/tmp");

saveAsObjectFile
Official documentation:
Save this RDD as a SequenceFile of serialized objects.
Function prototype:
def saveAsObjectFile(path: String): Unit
saveAsObjectFile serializes the elements of the RDD into objects and stores them in a file.
Source code analysis:
def saveAsObjectFile(path: String): Unit = withScope {
  this.mapPartitions(iter => iter.grouped(10).map(_.toArray))
    .map(x => (NullWritable.get(), new BytesWritable(Utils.serialize(x))))
    .saveAsSequenceFile(path)
}

def saveAsSequenceFile(
    path: String,
    codec: Option[Class[_ <: CompressionCodec]] = None): Unit = self.withScope {
  def anyToWritable[U <% Writable](u: U): Writable = u
  // TODO We cannot force the return type of `anyToWritable` be same as keyWritableClass and
  // valueWritableClass at the compile time. To implement that, we need to add type parameters to
  // SequenceFileRDDFunctions. however, SequenceFileRDDFunctions is a public class so it will be a
  // breaking change.
  val convertKey = self.keyClass != keyWritableClass
  val convertValue = self.valueClass != valueWritableClass
  logInfo("Saving as sequence file of type (" + keyWritableClass.getSimpleName + "," +
    valueWritableClass.getSimpleName + ")")
  val format = classOf[SequenceFileOutputFormat[Writable, Writable]]
  val jobConf = new JobConf(self.context.hadoopConfiguration)
  if (!convertKey && !convertValue) {
    self.saveAsHadoopFile(path, keyWritableClass, valueWritableClass, format, jobConf, codec)
  } else if (!convertKey && convertValue) {
    self.map(x => (x._1, anyToWritable(x._2))).saveAsHadoopFile(
      path, keyWritableClass, valueWritableClass, format, jobConf, codec)
  } else if (convertKey && !convertValue) {
    self.map(x => (anyToWritable(x._1), x._2)).saveAsHadoopFile(
      path, keyWritableClass, valueWritableClass, format, jobConf, codec)
  } else if (convertKey && convertValue) {
    self.map(x => (anyToWritable(x._1), anyToWritable(x._2))).saveAsHadoopFile(
      path, keyWritableClass, valueWritableClass, format, jobConf, codec)
  }
}
As the source shows, saveAsObjectFile is implemented on top of saveAsSequenceFile: it groups the RDD's elements into batches of 10, serializes each batch into a BytesWritable, converts the RDD into one of (NullWritable, BytesWritable) pairs, and writes it out as a SequenceFile.
Example:
List<Integer> data = Arrays.asList(5, 1, 1, 4, 4, 2, 2);
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data, 5);
javaRDD.saveAsObjectFile("/user/tmp");