日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Spark RDD/Core 编程 API入门系列 之rdd实战(rdd基本操作实战及transformation和action流程图)(源码)(三)...

發(fā)布時(shí)間:2023/12/13 编程问答 34 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Spark RDD/Core 编程 API入门系列 之rdd实战(rdd基本操作实战及transformation和action流程图)(源码)(三)... 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

本博文的主要內(nèi)容是:

1、rdd基本操作實(shí)戰(zhàn)

2、transformation和action流程圖

3、典型的transformation和action

?

?

?

RDD有3種操作:

1、? Trandformation ?????對數(shù)據(jù)狀態(tài)的轉(zhuǎn)換,即所謂算子的轉(zhuǎn)換

2、? Action??? 觸發(fā)作業(yè),即所謂得結(jié)果的

3、? Contoller? 對性能、效率和容錯(cuò)方面的支持,如cache、persist、checkpoint

Contoller包括cache、persist、checkpoint。

?

/**
* Return a new RDD by applying a function to all elements of this RDD.
*/
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}

傳入類型是T,返回類型是U。

?

?

?

元素之間,為什么reduce操作,要符合結(jié)合律和交換律?
答:因?yàn)?#xff0c;交換律,不知,哪個(gè)數(shù)據(jù)先過來。所以,必須符合交換律。
在交換律基礎(chǔ)上,想要reduce操作,必須要符合結(jié)合律。

/**

* Reduces the elements of this RDD using the specified commutative and
* associative binary operator.
*/
def reduce(f: (T, T) => T): T = withScope {
val cleanF = sc.clean(f)
val reducePartition: Iterator[T] => Option[T] = iter => {
if (iter.hasNext) {
Some(iter.reduceLeft(cleanF))
} else {
None
}
}
var jobResult: Option[T] = None
val mergeResult = (index: Int, taskResult: Option[T]) => {
if (taskResult.isDefined) {
jobResult = jobResult match {
case Some(value) => Some(f(value, taskResult.get))
case None => taskResult
}
}
}
sc.runJob(this, reducePartition, mergeResult)
// Get the final result out of our Option, or throw an exception if the RDD was empty
jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
}

RDD.scala(源碼)


這里,新建包c(diǎn)om.zhouls.spark.cores

package com.zhouls.spark.cores

/**
* Created by Administrator on 2016/9/27.
*/
object TextLines {

}


下面,開始編代碼

本地模式

自動(dòng) ,會(huì)寫好

源碼來看,

所以,?val lines = sc.textFile("C:\\Users\\Administrator\\Desktop\\textlines.txt") //通過HadoopRDD以及MapPartitionsRDD獲取文件中每一行的內(nèi)容本身

?

?

val lineCount = lines.map(line => (line,1)) //每一行變成行的內(nèi)容與1構(gòu)成的Tuple


val textLines = lineCount.reduceByKey(_+_)


textLines.collect.foreach(pair => println(pair._1 + ":" + pair._2))

?成功!



?現(xiàn)在,將此行代碼,

textLines.collect.foreach(pair => println(pair._1 + ":" + pair._2))
改一改 textLines.foreach(pair => println(pair._1 + ":" + pair._2))

總結(jié):

本地模式里,
textLines.collect.foreach(pair => println(pair._1 + ":" + pair._2))
改一改 textLines.foreach(pair => println(pair._1 + ":" + pair._2)) 運(yùn)行正常,因?yàn)樵诒镜啬J较?#xff0c;是jvm,但這樣書寫,是不正規(guī)的。

?

?

集群模式里,
textLines.collect.foreach(pair => println(pair._1 + ":" + pair._2))
改一改 textLines.foreach(pair => println(pair._1 + ":" + pair._2)) 運(yùn)行無法通過,因?yàn)榻Y(jié)果是分布在各個(gè)節(jié)點(diǎn)上。 collect源碼: /**
* Return an array that contains all of the elements in this RDD.
*/
def collect(): Array[T] = withScope {
val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
Array.concat(results: _*)
}

得出,collect后array中就是一個(gè)元素,只不過這個(gè)元素是一個(gè)Tuple。 Tuple是元組。通過concat合并!

foreach源碼:
/**
* Applies a function f to all elements of this RDD.
*/
def foreach(f: T => Unit): Unit = withScope {
val cleanF = sc.clean(f)
sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}   


?

rdd實(shí)戰(zhàn)(rdd基本操作實(shí)戰(zhàn))至此!

?

?

?

?

?rdd實(shí)戰(zhàn)(transformation流程圖)

?拿wordcount為例!

?

啟動(dòng)hdfs集群

spark@SparkSingleNode:/usr/local/hadoop/hadoop-2.6.0$ sbin/start-dfs.sh

?

?

?啟動(dòng)spark集群

spark@SparkSingleNode:/usr/local/spark/spark-1.5.2-bin-hadoop2.6$ sbin/start-all.sh

?

?

啟動(dòng)spark-shell

spark@SparkSingleNode:/usr/local/spark/spark-1.5.2-bin-hadoop2.6/bin$ ./spark-shell --master spark://SparkSingleNode:7077 --executor-memory 1g

?

?

scala> val partitionsReadmeRdd = ?sc.textFile("hdfs://SparkSingleNode:9000/README.md").flatMap(_.split(" ")).map(word =>(word,1)).reduceByKey(_+_,1).saveAsTextFile("~/partition1README.txt")

?或者

?scala> val readmeRdd = sc.textFile("hdfs://SparkSingleNode:9000/README.md")

?scala> ?val partitionsReadmeRdd = readmeRdd.flatMap(_.split(" ")).map(word => (word,1)).reduceByKey(_+_,1)

.saveAsTextFile("~/partition1README.txt")

?

注意,~目錄,不是這里。

?

?

?

?為什么,我的,不是這樣的顯示呢?

?

?

?

RDD的transformation和action執(zhí)行的流程圖

?

?

典型的transformation和action

轉(zhuǎn)載于:https://www.cnblogs.com/zlslch/p/5913334.html

總結(jié)

以上是生活随笔為你收集整理的Spark RDD/Core 编程 API入门系列 之rdd实战(rdd基本操作实战及transformation和action流程图)(源码)(三)...的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。