[Big Data with Spark]: Introductory Examples of Action Operators
Actions
reduce(func)
Aggregate the elements of the dataset using a function func (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel.
The function takes two arguments and computes a single result from them. That result is then combined with the next element as the arguments of the following computation.
For example, computing the sum of an array:
//create the dataset
scala> var data = sc.parallelize(1 to 3,1)

scala> data.collect
res6: Array[Int] = Array(1, 2, 3)

//reduce to compute the sum
scala> data.reduce((x,y)=>x+y)
res5: Int = 6
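The function passed to reduce is not limited to addition; any commutative and associative binary function works. A minimal sketch, reusing the data RDD above, that computes the maximum:

// reduce with math.max, which is also commutative and associative
val maxValue = data.reduce((x, y) => math.max(x, y))
// for Array(1, 2, 3) this yields 3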
collect()
Return all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data.
Returns all elements of the dataset; typically used after filter or another operation that leaves only a small amount of data.
For example, displaying the contents of the dataset defined above:
//create the dataset
scala> var data = sc.parallelize(1 to 3,1)

scala> data.collect
res6: Array[Int] = Array(1, 2, 3)
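As noted above, collect is usually paired with an operation such as filter that shrinks the data first; a minimal sketch reusing the same data RDD:

// keep only elements greater than 1, then bring the small result to the driver
val small = data.filter(x => x > 1).collect()
// for Array(1, 2, 3) this yields Array(2, 3)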
count()
Return the number of elements in the dataset.
Counts the number of items in the dataset, i.e. the number of elements it contains.
//create the dataset
scala> var data = sc.parallelize(1 to 3,1)

//count the elements
scala> data.count
res7: Long = 3

scala> var data = sc.parallelize(List(("A",1),("B",1)))

scala> data.count
res8: Long = 2

first()
Return the first element of the dataset (similar to take(1)).
Returns the first element of the dataset, similar to take(1).
//create the dataset
scala> var data = sc.parallelize(List(("A",1),("B",1)))

//get the first element
scala> data.first
res9: (String, Int) = (A,1)

take(n)
Return an array with the first n elements of the dataset.
Returns the first n elements of the dataset as an array.
//create the dataset
scala> var data = sc.parallelize(List(("A",1),("B",1)))

scala> data.take(1)
res10: Array[(String, Int)] = Array((A,1))

//if n is larger than the total number of elements, all of the data is returned
scala> data.take(8)
res12: Array[(String, Int)] = Array((A,1), (B,1))

//if n is less than or equal to 0, an empty array is returned
scala> data.take(-1)
res13: Array[(String, Int)] = Array()

scala> data.take(0)
res14: Array[(String, Int)] = Array()

takeSample(withReplacement, num, [seed])
Return an array with a random sample of num elements of the dataset, with or without replacement, optionally pre-specifying a random number generator seed.
This method differs from sample in a few ways (see the sketch after the example below):
- it returns an exact number of samples (specified by the second argument)
- it returns an Array directly rather than an RDD
- the returned results are shuffled internally
//create the dataset
scala> var data = sc.parallelize(List(1,3,5,7))
data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:21

//sample 2 elements at random
scala> data.takeSample(true,2,1)
res0: Array[Int] = Array(7, 1)

//sample 4 elements at random; note that sampled elements may repeat
scala> data.takeSample(true,4,1)
res1: Array[Int] = Array(7, 7, 3, 7)

//the first argument controls whether sampling is done with replacement
scala> data.takeSample(false,4,1)
res2: Array[Int] = Array(3, 5, 7, 1)

scala> data.takeSample(false,5,1)
res3: Array[Int] = Array(3, 5, 7, 1)
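For comparison, a minimal sketch of sample, which takes a fraction instead of an exact count and returns an RDD rather than an Array:

// sample(withReplacement, fraction, seed): the result size is only
// approximately fraction * count, and the result is still an RDD
val sampled = data.sample(false, 0.5, 1)
sampled.collect()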
takeOrdered(n, [ordering])
Return the first n elements of the RDD using either their natural order or a custom comparator.
Sorts by the natural ordering or by a custom comparator and returns the first n elements.
//create the dataset
scala> var data = sc.parallelize(List("b","a","e","f","c"))
data: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[3] at parallelize at <console>:21

//return the first 3 elements in sorted order
scala> data.takeOrdered(3)
res4: Array[String] = Array(a, b, c)
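A custom ordering can be passed explicitly as the second argument list; a minimal sketch that returns the largest elements instead:

// use a reversed Ordering to take the elements that sort last
val top3 = data.takeOrdered(3)(Ordering[String].reverse)
// for the dataset above this yields Array(f, e, c)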
saveAsTextFile(path)
Write the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system. Spark will call toString on each element to convert it to a line of text in the file.
Saves the dataset as text files in the given directory on the local filesystem, HDFS, or any other Hadoop-supported file system.
//create the dataset
scala> var data = sc.parallelize(List("b","a","e","f","c"))
data: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[3] at parallelize at <console>:21

//save as the test_data_save directory
scala> data.saveAsTextFile("test_data_save")

scala> data.saveAsTextFile("test_data_save2",classOf[GzipCodec])
<console>:24: error: not found: type GzipCodec
       data.saveAsTextFile("test_data_save2",classOf[GzipCodec])
                                                     ^

//import the required class
scala> import org.apache.hadoop.io.compress.GzipCodec
import org.apache.hadoop.io.compress.GzipCodec

//save as compressed files
scala> data.saveAsTextFile("test_data_save2",classOf[GzipCodec])
Check the output files:
[xingoo@localhost bin]$ ll
drwxrwxr-x. 2 xingoo xingoo 4096 Oct 10 23:07 test_data_save
drwxrwxr-x. 2 xingoo xingoo 4096 Oct 10 23:07 test_data_save2
[xingoo@localhost bin]$ cd test_data_save2
[xingoo@localhost test_data_save2]$ ll
total 4
-rw-r--r--. 1 xingoo xingoo 30 Oct 10 23:07 part-00000.gz
-rw-r--r--. 1 xingoo xingoo 0 Oct 10 23:07 _SUCCESS
[xingoo@localhost test_data_save2]$ cd ..
[xingoo@localhost bin]$ cd test_data_save
[xingoo@localhost test_data_save]$ ll
total 4
-rw-r--r--. 1 xingoo xingoo 10 Oct 10 23:07 part-00000
-rw-r--r--. 1 xingoo xingoo 0 Oct 10 23:07 _SUCCESS
[xingoo@localhost test_data_save]$ cat part-00000
b
a
e
f
c
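Both directories can be read back with sc.textFile, which handles the gzip-compressed part files transparently; a minimal sketch:

// every line of every part file becomes one String element
val reloaded = sc.textFile("test_data_save")
reloaded.collect()            // typically Array(b, a, e, f, c) for the single part file above
// the compressed output is read the same way
val reloadedGz = sc.textFile("test_data_save2")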
saveAsSequenceFile(path)
Write the elements of the dataset as a Hadoop SequenceFile in a given path in the local filesystem, HDFS or any other Hadoop-supported file system. This is available on RDDs of key-value pairs that implement Hadoop's Writable interface. In Scala, it is also available on types that are implicitly convertible to Writable (Spark includes conversions for basic types like Int, Double, String, etc).
Saves the dataset as a Hadoop SequenceFile.
scala> var data = sc.parallelize(List(("A",1),("A",2),("B",1)),3)
data: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[21] at parallelize at <console>:22

scala> data.saveAsSequenceFile("kv_test")

[xingoo@localhost bin]$ cd kv_test/
[xingoo@localhost kv_test]$ ll
total 12
-rw-r--r--. 1 xingoo xingoo 99 Oct 10 23:25 part-00000
-rw-r--r--. 1 xingoo xingoo 99 Oct 10 23:25 part-00001
-rw-r--r--. 1 xingoo xingoo 99 Oct 10 23:25 part-00002
-rw-r--r--. 1 xingoo xingoo 0 Oct 10 23:25 _SUCCESS
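The SequenceFile can be loaded back with sc.sequenceFile, giving the key and value types explicitly (Spark converts the underlying Writable classes back to String and Int); a minimal sketch:

// read the key-value pairs back as an RDD[(String, Int)]
val kv = sc.sequenceFile[String, Int]("kv_test")
kv.collect()   // the pairs (A,1), (A,2), (B,1); order may vary across partitions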
saveAsObjectFile(path)
Write the elements of the dataset in a simple format using Java serialization, which can then be loaded using SparkContext.objectFile().
Saves the dataset to a file using Java serialization.
scala> var data = sc.parallelize(List("a","b","c"))
data: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[16] at parallelize at <console>:22

scala> data.saveAsObjectFile("str_test")

scala> var data2 = sc.objectFile[Array[String]]("str_test")
data2: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[20] at objectFile at <console>:22

scala> data2.collect
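The type parameter passed to objectFile is normally the element type of the RDD that was saved; a minimal sketch for the RDD[String] written above:

// load the serialized objects back as an RDD[String]
val reloaded = sc.objectFile[String]("str_test")
reloaded.collect()   // the elements a, b, c; order may vary across partitions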
countByKey()
Only available on RDDs of type (K, V). Returns a hashmap of (K, Int) pairs with the count of each key.
For (K, V) pairs, counts the number of values associated with each key.
//create the dataset
scala> var data = sc.parallelize(List(("A",1),("A",2),("B",1)))
data: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[7] at parallelize at <console>:22

//count values per key
scala> data.countByKey
res9: scala.collection.Map[String,Long] = Map(B -> 1, A -> 2)
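Because countByKey materializes an ordinary Scala Map on the driver, the counts can be used directly in local code; a small sketch:

// counts is a local Map[String, Long], not an RDD
val counts = data.countByKey()
println(counts("A"))   // 2, since key A appears twice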
foreach(func)
Run a function func on each element of the dataset. This is usually done for side effects such as updating an Accumulator or interacting with external storage systems.
Note: modifying variables other than Accumulators outside of the foreach() may result in undefined behavior. See Understanding closures for more details.
Runs the function on each element; typically used to update an accumulator or to interact with an external storage system.
// create the dataset
scala> var data = sc.parallelize(List("b","a","e","f","c"))
data: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[10] at parallelize at <console>:22

// iterate over the elements
scala> data.foreach(x=>println(x+" hello"))
scala> data.foreach(x=>println(x+" hello"))
b hello
a hello
e hello
f hello
c hello
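A minimal sketch of the accumulator use case mentioned above (assuming Spark 2.x's sc.longAccumulator; older releases used sc.accumulator):

// count elements as a side effect; only the driver should read acc.value
val acc = sc.longAccumulator("elements")
data.foreach(x => acc.add(1))
println(acc.value)   // 5 for the dataset above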
Reposted from: https://www.cnblogs.com/xing901022/p/5947706.html