日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

spark 简单实战_SparkCore入门实战 (二)

發(fā)布時間:2023/12/15 编程问答 48 豆豆
生活随笔 收集整理的這篇文章主要介紹了 spark 简单实战_SparkCore入门实战 (二) 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

一、鍵值對RDD數(shù)據(jù)分區(qū)器

鍵值對RDD數(shù)據(jù)分區(qū)器Spark目前支持Hash分區(qū)和Range分區(qū),用戶也可以自定義分區(qū),Hash分區(qū)為當前的默認分區(qū),Spark中分區(qū)器直接決定了RDD中分區(qū)的個數(shù)、RDD中每條數(shù)據(jù)經(jīng)過Shuffle過程屬于哪個分區(qū)和Reduce的個數(shù)

注意:(1)只有Key-Value類型的RDD才有分區(qū)器的,非Key-Value類型的RDD分區(qū)器的值是None

(2)每個RDD的分區(qū)ID范圍:0~numPartitions-1,決定這個值是屬于那個分區(qū)的。

1、獲取RDD分區(qū)

可以通過使用RDD的partitioner 屬性來獲取 RDD 的分區(qū)方式。它會返回一個 scala.Option 對象, 通過get方法獲取其中的值。相關源碼如下:

def getPartition(key: Any): Int = key match {

case null => 0

case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)

}

def nonNegativeMod(x: Int, mod: Int): Int = {

val rawMod = x % mod

rawMod + (if (rawMod < 0) mod else 0)

}

(1)創(chuàng)建一個pairRDD

scala> val pairs = sc.parallelize(List((1,1),(2,2),(3,3)))

pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[14] at parallelize at :24

(2)查看RDD的分區(qū)器

scala> pairs.partitioner

res13: Option[org.apache.spark.Partitioner] = None

(3)導入HashPartitioner類

scala> import org.apache.spark.HashPartitioner

import org.apache.spark.HashPartitioner

(4)使用HashPartitioner對RDD進行重新分區(qū)

scala> val partitioned = pairs.partitionBy(new HashPartitioner(2))

partitioned: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[15] at partitionBy at :27

(5)查看重新分區(qū)后RDD的分區(qū)器

scala> partitioned.partitioner

res14: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@2)

2、Hash分區(qū)HashPartitioner分區(qū)的原理:對于給定的key,計算其hashCode,并除以分區(qū)的個數(shù)取余,如果余數(shù)小于0,則用余數(shù)+分區(qū)的個數(shù)(否則加0),最后返回的值就是這個key所屬的分區(qū)ID。

使用Hash分區(qū)的實操

scala> val nopar = sc.parallelize(List((1,3),(1,2),(2,4),(2,3),(3,6),(3,8)),8)

nopar: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[16] at parallelize at :25

scala> nopar.mapPartitionsWithIndex((index,iter)=>{ Iterator(index.toString+" : "+iter.mkString("|")) }).collect

res16: Array[String] = Array("0 : ", 1 : (1,3), 2 : (1,2), 3 : (2,4), "4 : ", 5 : (2,3), 6 : (3,6), 7 : (3,8))

scala> val hashpar = nopar.partitionBy(new org.apache.spark.HashPartitioner(7))

hashpar: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[18] at partitionBy at :27

scala> hashpar.count

res17: Long = 6

scala> hashpar.partitioner

res18: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@7)

scala> hashpar.mapPartitions(iter => Iterator(iter.length)).collect()

res19: Array[Int] = Array(0, 2, 2, 2, 0, 0, 0)

scala> nopar.partitioner

res20: Option[org.apache.spark.Partitioner] = None

3、 Ranger分區(qū)HashPartitioner分區(qū)弊端:可能導致每個分區(qū)中數(shù)據(jù)量的不均勻,極端情況下會導致某些分區(qū)擁有RDD的全部數(shù)據(jù)。

RangePartitioner作用:將一定范圍內(nèi)的數(shù)映射到某一個分區(qū)內(nèi),盡量保證每個分區(qū)中數(shù)據(jù)量的均勻,而且分區(qū)與分區(qū)之間是有序的,一個分區(qū)中的元素肯定都是比另一個分區(qū)內(nèi)的元素小或者大,但是分區(qū)內(nèi)的元素是不能保證順序的。簡單的說就是將一定范圍內(nèi)的數(shù)映射到某一個分區(qū)內(nèi)。實現(xiàn)過程為:

第一步:先重整個RDD中抽取出樣本數(shù)據(jù),將樣本數(shù)據(jù)排序,計算出每個分區(qū)的最大key值,形成一個Array[KEY]類型的數(shù)組變量rangeBounds;

第二步:判斷key在rangeBounds中所處的范圍,給出該key值在下一個RDD中的分區(qū)id下標;該分區(qū)器要求RDD中的KEY類型必須是可以排序的

4、自定義分區(qū)

要實現(xiàn)自定義的分區(qū)器,你需要繼承 org.apache.spark.Partitioner 類并實現(xiàn)下面三個方法。(1)numPartitions: Int:返回創(chuàng)建出來的分區(qū)數(shù)。

(2)getPartition(key: Any): Int:返回給定鍵的分區(qū)編號(0到numPartitions-1)。

(3)equals():Java 判斷相等性的標準方法。這個方法的實現(xiàn)非常重要,Spark 需要用這個方法來檢查你的分區(qū)器對象是否和其他分區(qū)器實例相同,這樣 Spark 才可以判斷兩個 RDD 的分區(qū)方式是否相同。

需求:將相同后綴的數(shù)據(jù)寫入相同的文件,通過將相同后綴的數(shù)據(jù)分區(qū)到相同的分區(qū)并保存輸出來實現(xiàn)。

(1)創(chuàng)建一個pairRDD

scala> val data = sc.parallelize(Array((1,1),(2,2),(3,3),(4,4),(5,5),(6,6)))

data: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[20] at parallelize at :25

(2)定義一個自定義分區(qū)類

scala> :paste

// Entering paste mode (ctrl-D to finish)class CustomerPartitioner(numParts:Int) extends org.apache.spark.Partitioner{

//覆蓋分區(qū)數(shù) override def numPartitions: Int = numParts

//覆蓋分區(qū)號獲取函數(shù) override def getPartition(key: Any): Int = {

val ckey: String = key.toString

ckey.substring(ckey.length-1).toInt%numParts

}

}

// Exiting paste mode, now interpreting.

defined class CustomerPartitioner

(3)將RDD使用自定義的分區(qū)類進行重新分區(qū)

scala> val par = data.partitionBy(new CustomerPartitioner(2))

par: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[2] at partitionBy at :27

(4)查看重新分區(qū)后的數(shù)據(jù)分布

scala> par.mapPartitionsWithIndex((index,items)=>items.map((index,_))).collect

res3: Array[(Int, (Int, Int))] = Array((0,(2,2)), (0,(4,4)), (0,(6,6)), (1,(1,1)), (1,(3,3)), (1,(5,5)))

使用自定義的 Partitioner 是很容易的:只要把它傳給 partitionBy() 方法即可。Spark 中有許多依賴于數(shù)據(jù)混洗的方法,比如 join() 和 groupByKey(),它們也可以接收一個可選的 Partitioner 對象來控制輸出數(shù)據(jù)的分區(qū)方式。

二、Spark連接HBase數(shù)據(jù)讀取與保存

數(shù)據(jù)讀取與保存Spark的數(shù)據(jù)讀取及數(shù)據(jù)保存可以從兩個維度來作區(qū)分:文件格式以及文件系統(tǒng)。

文件格式分為:Text文件、Json文件、Csv文件、Sequence文件以及Object文件;

文件系統(tǒng)分為:本地文件系統(tǒng)、HDFS、HBASE以及數(shù)據(jù)庫

1、文件類數(shù)據(jù)讀取與保存

Text文件

(1)數(shù)據(jù)讀取:textFile(String)

scala> val hdfsFile = sc.textFile("hdfs://hadoop105:9000/fruit.txt")

hdfsFile: org.apache.spark.rdd.RDD[String] = hdfs://hadoop102:9000/fruit.txt MapPartitionsRDD[21] at textFile at :24

(2)數(shù)據(jù)保存: saveAsTextFile(String)

scala> hdfsFile.saveAsTextFile("/fruitOut")

Json文件如果JSON文件中每一行就是一個JSON記錄,那么可以通過將JSON文件當做文本文件來讀取,然后利用相關的JSON庫對每一條數(shù)據(jù)進行JSON解析。

注意:使用RDD讀取JSON文件處理很復雜,同時SparkSQL集成了很好的處理JSON文件的方式,所以應用中多是采用SparkSQL處理JSON文件。

準備文件數(shù)據(jù):

(1)在in文件夾下創(chuàng)建user.json文件數(shù)據(jù),編輯內(nèi)容:

{"name":"123","age": 20}

{"name":"456","age": 20}

{"name":"789","age": 20}

方式一

代碼實現(xiàn):

package com.study.bigdatabase

import org.apache.spark.rdd.RDD

import org.apache.spark.{SparkConf, SparkContext}

import org.mortbay.util.ajax.JSON

//檢查點object Spark06_RDD_Serializable {

def main(args: Array[String]): Unit = {

val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")

//創(chuàng)建Spark上下文對象 val sc = new SparkContext(config)

//讀取文件 val json = sc.textFile("in/user.json")

//解析json數(shù)據(jù) val result = json.map(JSON.parse)

result.foreach(println)

//釋放資源 sc.stop()

}

}

啟動程序,控制臺打印:

方式二:

命令方式:

(1)導入解析json所需的包

scala> import scala.util.parsing.json.JSON

(2)上傳json文件到HDFS

[root@hadoop105 spark]$ hadoop fs -put ./examples/src/main/resources/people.json /

(3)讀取文件

scala> val json = sc.textFile("/people.json")

json: org.apache.spark.rdd.RDD[String] = /people.json MapPartitionsRDD[8] at textFile at :24

(4)解析json數(shù)據(jù)

scala> val result = json.map(JSON.parseFull)

result: org.apache.spark.rdd.RDD[Option[Any]] = MapPartitionsRDD[10] at map at :27

(5)打印

scala> result.collect

res11: Array[Option[Any]] = Array(Some(Map(name -> Michael)), Some(Map(name -> Andy, age -> 30.0)), Some(Map(name -> Justin, age -> 19.0)))

2、 Sequence文件SequenceFile文件是Hadoop用來存儲二進制形式的key-value對而設計的一種平面文件(Flat File)。Spark 有專門用來讀取 SequenceFile 的接口。在 SparkContext 中,可以調(diào)用 sequenceFile keyClass, valueClass。

注意:SequenceFile文件只針對PairRDD

(1)創(chuàng)建一個RDD

scala> val rdd = sc.parallelize(Array((1,2),(3,4),(5,6)))

rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[24] at parallelize at :26

(2)將RDD保存為Sequence文件

scala> rdd.saveAsSequenceFile("file:///opt/module/spark/seqFile")

(3)查看該文件

[root@hadoop105 seqFile]$ pwd

/opt/module/spark/seqFile

[root@hadoop105 seqFile]$ ll

總用量 8

-rw-r--r-- 1 atguigu atguigu 108 10月 9 10:29 part-00000

-rw-r--r-- 1 atguigu atguigu 124 10月 9 10:29 part-00001

-rw-r--r-- 1 atguigu atguigu 0 10月 9 10:29 _SUCCESS

[root@hadoop105 seqFile]$ cat part-00000

SEQ org.apache.hadoop.io.IntWritable org.apache.hadoop.io.IntWritable

(4)讀取Sequence文件

scala> val seq = sc.sequenceFile[Int,Int]("file:///opt/module/spark/seqFile")

seq: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[18] at sequenceFile at :24

(5)打印讀取后的Sequence文件

scala> seq.collect

res14: Array[(Int, Int)] = Array((1,2), (3,4), (5,6))

3、對象文件對象文件是將對象序列化后保存的文件,采用Java的序列化機制。可以通過objectFile [k,v] (path) 函數(shù)接收一個路徑,讀取對象文件,返回對應的 RDD,也可以通過調(diào)用saveAsObjectFile() 實現(xiàn)對對象文件的輸出。因為是序列化所以要指定類型。

(1)創(chuàng)建一個RDD

scala> val rdd = sc.parallelize(Array(1,2,3,4))

rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[25] at parallelize at :26

(2)將RDD保存為Object文件

scala> rdd.saveAsObjectFile("file:///opt/module/spark/objectFile")

(3)查看該文件

[root@hadoop105 objectFile]$ pwd

/opt/module/spark/objectFile

[root@hadoop105 objectFile]$ ll

總用量 8

-rw-r--r-- 1 atguigu atguigu 142 10月 9 10:37 part-00000

-rw-r--r-- 1 atguigu atguigu 142 10月 9 10:37 part-00001

-rw-r--r-- 1 atguigu atguigu 0 10月 9 10:37 _SUCCESS

[root@hadoop105 objectFile]$ cat part-00000

SEQ!org.apache.hadoop.io.NullWritable"org.apache.hadoop.io.BytesWritableW@`l

(4)讀取Object文件

scala> val objFile = sc.objectFile[Int]("file:///opt/module/spark/objectFile")

objFile: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[31] at objectFile at :24

(5)打印讀取后的Sequence文件

scala> objFile.collect

res19: Array[Int] = Array(1, 2, 3, 4)

4、文件系統(tǒng)類數(shù)據(jù)讀取與保存HDFS

Spark的整個生態(tài)系統(tǒng)與Hadoop是完全兼容的,所以對于Hadoop所支持的文件類型或者數(shù)據(jù)庫類型,Spark也同樣支持.另外,由于Hadoop的API有新舊兩個版本,所以Spark為了能夠兼容Hadoop所有的版本,也提供了兩套創(chuàng)建操作接口.對于外部存儲創(chuàng)建操作而言,hadoopRDD和newHadoopRDD是最為抽象的兩個函數(shù)接口,主要包含以下四個參數(shù).(1)輸入格式(InputFormat):制定數(shù)據(jù)輸入的類型,如TextInputFormat等,新舊兩個版本所引用的版本分別是org.apache.hadoop.mapred.InputFormat和org.apache.hadoop.mapreduce.InputFormat(NewInputFormat)

(2)鍵類型: 指定[K,V]鍵值對中K的類型

(3)值類型: 指定[K,V]鍵值對中V的類型

(4)分區(qū)值:指定由外部存儲生成的RDD的partition數(shù)量的最小值,如果沒有指定,系統(tǒng)會使用默認值defaultMinSplits注意:其他創(chuàng)建操作的API接口都是為了方便最終的Spark程序開發(fā)者而設置的,是這兩個接口的高效實現(xiàn)版本.例如,對于textFile而言,只有path這個指定文件路徑的參數(shù),其他參數(shù)在系統(tǒng)內(nèi)部指定了默認值。

a、在Hadoop中以壓縮形式存儲的數(shù)據(jù),不需要指定解壓方式就能夠進行讀取,因為Hadoop本身有一個解壓器會根據(jù)壓縮文件的后綴推斷解壓算法進行解壓.

b、如果用Spark從Hadoop中讀取某種類型的數(shù)據(jù)不知道怎么讀取的時候,上網(wǎng)查找一個使用map-reduce的時候是怎么讀取這種這種數(shù)據(jù)的,然后再將對應的讀取方式改寫成上面的hadoopRDD和newAPIHadoopRDD兩個類就行了

5、MySQL數(shù)據(jù)庫連接

支持通過Java JDBC訪問關系型數(shù)據(jù)庫。需要通過JdbcRDD進行,示例如下:

數(shù)據(jù)庫準備:

(1)添加依賴

mysql

mysql-connector-java

5.1.27

代碼實現(xiàn):

package com.study.bigdatabase

import org.apache.spark.rdd.JdbcRDD

import org.apache.spark.{SparkConf, SparkContext}

//檢查點object Spark07_RDD_MySql {

def main(args: Array[String]): Unit = {

val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")

//創(chuàng)建Spark上下文對象 val sc = new SparkContext(config)

val driver="com.mysql.jdbc.Driver"

var url="jdbc:mysql://hadoop105:3306/RDD"

val userName="root"

val password="123456"

//創(chuàng)建 JDBCRDD,方法數(shù)據(jù)庫 var sql ="select name,age from user where id >= ? and id <= ?"

val jdbcRDD = new JdbcRDD(

sc,

() => {

//獲取數(shù)據(jù)庫連接對象 Class.forName(driver)

java.sql.DriverManager.getConnection(url, userName, password)

},

sql,

1,

3,

2,

(rs)=>{

println(rs.getString(1)+","+rs.getInt(2))

}

)

jdbcRDD.collect()

//釋放資源 sc.stop()

}

}

啟動程序,控制臺打印信息:

若代碼中sql這樣編寫,會出錯:

var sql ="select name,age from user"

運行程序,查看信息:

保存數(shù)據(jù)

準備user表:空數(shù)據(jù)

代碼實現(xiàn):

package com.study.bigdatabase

import java.sql.Connection

import org.apache.spark.rdd.{JdbcRDD, RDD}

import org.apache.spark.{SparkConf, SparkContext}

//檢查點object Spark07_RDD_MySql {

def main(args: Array[String]): Unit = {

val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")

//創(chuàng)建Spark上下文對象 val sc = new SparkContext(config)

val driver="com.mysql.jdbc.Driver"

var url="jdbc:mysql://hadoop105:3306/RDD"

val userName="root"

val password="123456"

//保存數(shù)據(jù) val dataRDD:RDD[(String,Int)] = sc.makeRDD(List(("zhangsan", 20), ("lisi", 30), ("wangwu", 40)))

dataRDD.foreach{

case ( name,age ) =>{

Class.forName(driver)

val connection:Connection = java.sql.DriverManager.getConnection(url, userName, password)

val sql="insert into user (name, age) values (?,?)"

val statement = connection.prepareStatement(sql)

statement.setString(1,name)

statement.setInt(2,age)

statement.executeUpdate()

statement.close()

connection.close()

}

}

//釋放資源 sc.stop()

}

}

啟動程序運行,控制臺打印信息:

6、HBase數(shù)據(jù)庫由于 org.apache.hadoop.hbase.mapreduce.TableInputFormat類的實現(xiàn),Spark 可以通過Hadoop輸入格式訪問HBase。這個輸入格式會返回鍵值對數(shù)據(jù),其中鍵的類型為org. apache.hadoop.hbase.io.ImmutableBytesWritable,而值的類型為org.apache.hadoop.hbase.client.Result。

(1)添加依賴

org.apache.hbase

hbase-server

1.3.1

org.apache.hbase

hbase-client

1.3.1

(2) 分析源碼圖與步驟

拷貝在項目工程resource文件下

在編寫代碼之前,我們在Linux環(huán)境的Hbase創(chuàng)建一張rddtable表:

hbase(main):001:0> create 'rddtable','info'

0 row(s) in 1.7850 seconds

=> Hbase::Table - rddtable

隨后,在這張表中,插入數(shù)據(jù):

hbase(main):001:0> put 'rddtable','1001','info:name',"zhanfsan"

0 row(s) in 0.5160 seconds

Spark與HBase連接,代碼實現(xiàn):

package com.study.bigdatabase

import org.apache.hadoop.hbase

import org.apache.hadoop.hbase.{Cell, CellUtil, HBaseConfiguration}

import org.apache.hadoop.hbase.client.Result

import org.apache.hadoop.hbase.io.ImmutableBytesWritable

import org.apache.hadoop.hbase.mapreduce.TableInputFormat

import org.apache.hadoop.hbase.util.Bytes

import org.apache.spark.rdd.RDD

import org.apache.spark.{SparkConf, SparkContext}

//object Spark08_RDD_HBase {

def main(args: Array[String]): Unit = {

val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")

//創(chuàng)建Spark上下文對象 val sc = new SparkContext(config)

val conf = HBaseConfiguration.create()

conf.set(TableInputFormat.INPUT_TABLE, "rddtable")

val hbaseRDD:RDD[(ImmutableBytesWritable,Result)] = sc.newAPIHadoopRDD(

conf,

classOf[TableInputFormat],

classOf[ImmutableBytesWritable],

classOf[Result]

)

hbaseRDD.foreach{

case (rowkey,result) => {

//取數(shù)據(jù) val cells:Array[Cell] = result.rawCells()

for (cell

println(Bytes.toString((CellUtil.cloneValue(cell))))

}

}

}

//釋放資源 sc.stop()

}

}

啟動程序,運行查看一下:

7、在HBase添加數(shù)據(jù)(插入數(shù)據(jù))

代碼實現(xiàn):

package com.study.bigdatabase

import org.apache.hadoop.hbase.{Cell, CellUtil, HBaseConfiguration}

import org.apache.hadoop.hbase.client.{Put, Result}

import org.apache.hadoop.hbase.io.ImmutableBytesWritable

import org.apache.hadoop.hbase.mapred.TableOutputFormat

import org.apache.hadoop.hbase.util.Bytes

import org.apache.hadoop.mapred.JobConf

import org.apache.spark.rdd.RDD

import org.apache.spark.{SparkConf, SparkContext}

//object Spark08_RDD_HBase {

def main(args: Array[String]): Unit = {

val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")

//創(chuàng)建Spark上下文對象 val sc = new SparkContext(config)

val conf = HBaseConfiguration.create()

val dataRDD:RDD[(String,String)] = sc.makeRDD(List(("1002", "zhangsan"), ("1003", "lisi"),

("1004", "wangwu")))

val putRDD:RDD[(ImmutableBytesWritable,Put)] = dataRDD.map{

case (rowkey,name) =>{

val put = new Put(Bytes.toBytes(rowkey))

put.addColumn(Bytes.toBytes("info"),Bytes.toBytes("name"),Bytes.toBytes(name))

(new ImmutableBytesWritable(Bytes.toBytes(rowkey)),put)

}

}

val jobConf = new JobConf(conf)

jobConf.setOutputFormat(classOf[TableOutputFormat])

jobConf.set(TableOutputFormat.OUTPUT_TABLE,"rddtable")

putRDD.saveAsHadoopDataset(jobConf)

//釋放資源 sc.stop()

}

}

啟動程序,運行查看一下:無報錯

在HBase查看,掃描rddtable表,有數(shù)據(jù)了:

hbase(main):003:0> scan 'rddtable'

ROW COLUMN+CELL

1001 column=info:name, timestamp=1581087691942, value=zhanfsan

1002 column=info:name, timestamp=1581112909827, value=zhangsan

1003 column=info:name, timestamp=1581112909815, value=lisi

1004 column=info:name, timestamp=1581112909792, value=wangwu

4 row(s) in 4.5400 seconds

三、Spark 三大數(shù)據(jù)結構RDD:分布式數(shù)據(jù)集

廣播變量:分布式只讀共享變量

累加器: 分布式只寫共享變量

1、累加器累加器用來對信息進行聚合,通常在向 Spark傳遞函數(shù)時,比如使用 map() 函數(shù)或者用 filter() 傳條件時,可以使用驅(qū)動器程序中定義的變量,但是集群中運行的每個任務都會得到這些變量的一份新的副本,更新這些副本的值也不會影響驅(qū)動器中的對應變量。如果我們想實現(xiàn)所有分片處理時更新共享變量的功能,那么累加器可以實現(xiàn)我們想要的效果。

代碼具體實現(xiàn):

package com.study.bigdatabase

import org.apache.hadoop.hbase.HBaseConfiguration

import org.apache.hadoop.hbase.client.Put

import org.apache.hadoop.hbase.io.ImmutableBytesWritable

import org.apache.hadoop.hbase.mapred.TableOutputFormat

import org.apache.hadoop.hbase.util.Bytes

import org.apache.hadoop.mapred.JobConf

import org.apache.spark.rdd.RDD

import org.apache.spark.util.LongAccumulator

import org.apache.spark.{SparkConf, SparkContext}

//object Spark09_ShareData {

def main(args: Array[String]): Unit = {

val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")

//創(chuàng)建Spark上下文對象 val sc = new SparkContext(config)

val dataRDD: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)

/*第一種普通寫法:val i: Int = dataRDD.reduce(_ + _)println(i)*/

//第二種方法:使用累加器來共享變量 var sum:Int =0

//(1)創(chuàng)建累加器對象 val accumulator: LongAccumulator = sc.longAccumulator

dataRDD.foreach{

case i =>{

//(2)執(zhí)行累加器的累加功能 accumulator.add(i)

}

}

println("sum ="+accumulator.value)

//釋放資源 sc.stop()

}

}

啟動程序運行,控制臺打印信息:

2、自定義累加器自定義累加器類型的功能在1.X版本中就已經(jīng)提供了,但是使用起來比較麻煩,在2.0版本后,累加器的易用性有了較大的改進,而且官方還提供了一個新的抽象類:AccumulatorV2來提供更加友好的自定義類型累加器的實現(xiàn)方式。實現(xiàn)自定義類型累加器需要繼承AccumulatorV2并至少覆寫下例中出現(xiàn)的方法,下面這個累加器可以用于在程序運行過程中收集一些文本類信息,最終以Set[String]的形式返回。

(1)分析源碼圖:

代碼實現(xiàn):

package com.study.bigdatabase

import java.util

import org.apache.spark.rdd.RDD

import org.apache.spark.util.{AccumulatorV2, LongAccumulator}

import org.apache.spark.{SparkConf, SparkContext}

//自定義累加器object Spark10_ShareData {

def main(args: Array[String]): Unit = {

val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")

//創(chuàng)建Spark上下文對象 val sc = new SparkContext(config)

val dataRDD: RDD[String] = sc.makeRDD(List("hadoop", "hive", "hbase", "Scala","Spark"), 2)

// TODO 創(chuàng)建累加器 val wordAccumnlator = new WordAccumnlator

// TODO 注冊累加器 sc.register(wordAccumnlator)

dataRDD.foreach{

case word =>{

// TODO 執(zhí)行累加器累加功能 wordAccumnlator.add(word)

}

}

// TODO 獲取累加器的值 println("sum ="+wordAccumnlator.value)

//釋放資源 sc.stop()

}

}

//聲明累加器//1、繼承AccumulatorV2//2、實現(xiàn)抽象方法//3、創(chuàng)建累加器class WordAccumnlator extends AccumulatorV2[String, util.ArrayList[String]]{

val list = new util.ArrayList[String]()

//當前累加器是否初始化狀態(tài) override def isZero: Boolean = {

list.isEmpty

}

//復制累加器對象 override def copy(): AccumulatorV2[String, util.ArrayList[String]] = {

new WordAccumnlator()

}

//重置累加器對象 override def reset(): Unit = {

list.clear()

}

//向累加器中增加數(shù)據(jù) override def add(v: String): Unit = {

if (v.contains("h")) {

list.add(v)

}

}

//合并累加器 override def merge(other: AccumulatorV2[String, util.ArrayList[String]]): Unit = {

list.addAll(other.value)

}

//獲取累加器的結果 override def value: util.ArrayList[String] = list

}

啟動程序,控制臺打印信息:

以”h“累加聚中在一起

3、廣播變量(調(diào)優(yōu)策略)廣播變量用來高效分發(fā)較大的對象。向所有工作節(jié)點發(fā)送一個較大的只讀值,以供一個或多個Spark操作使用。比如,如果你的應用需要向所有節(jié)點發(fā)送一個較大的只讀查詢表,甚至是機器學習算法中的一個很大的特征向量,廣播變量用起來都很順手。 在多個并行操作中使用同一個變量,但是 Spark會為每個任務分別發(fā)送。

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))

broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)

scala> broadcastVar.value

res0: Array[Int] = Array(1, 2, 3)

使用廣播變量的過程如下:(1) 通過對一個類型 T 的對象調(diào)用 SparkContext.broadcast 創(chuàng)建出一個 Broadcast[T] 對象。 任何可序列化的類型都可以這么實現(xiàn)。

(2) 通過 value 屬性訪問該對象的值(在 Java 中為 value() 方法)。

(3) 變量只會被發(fā)到各個節(jié)點一次,應作為只讀值處理(修改這個值不會影響到別的節(jié)點)。

4、擴展

RDD相關概念關系輸入可能以多個文件的形式存儲在HDFS上,每個File都包含了很多塊,稱為Block。當Spark讀取這些文件作為輸入時,會根據(jù)具體數(shù)據(jù)格式對應的InputFormat進行解析,一般是將若干個Block合并成一個輸入分片,稱為InputSplit,注意InputSplit不能跨越文件。隨后將為這些輸入分片生成具體的Task。InputSplit與Task是一一對應的關系。隨后這些具體的Task每個都會被分配到集群上的某個節(jié)點的某個Executor去執(zhí)行。

(1)每個節(jié)點可以起一個或多個Executor。

(2)每個Executor由若干core組成,每個Executor的每個core一次只能執(zhí)行一個Task。

(3)每個Task執(zhí)行的結果就是生成了目標RDD的一個partiton。

注意: 這里的core是虛擬的core而不是機器的物理CPU核,可以理解為就是Executor的一個工作線程。而 Task被執(zhí)行的并發(fā)度 = Executor數(shù)目 * 每個Executor核數(shù)。至于partition的數(shù)目:(1)對于數(shù)據(jù)讀入階段,例如sc.textFile,輸入文件被劃分為多少InputSplit就會需要多少初始Task。

(2)在Map階段partition數(shù)目保持不變。

(3)在Reduce階段,RDD的聚合會觸發(fā)shuffle操作,聚合后的RDD的partition數(shù)目跟具體操作有關,例如repartition操作會聚合成指定分區(qū)數(shù),還有一些算子是可配置的。

RDD在計算的時候,每個分區(qū)都會起一個task,所以rdd的分區(qū)數(shù)目決定了總的的task數(shù)目。申請的計算節(jié)點(Executor)數(shù)目和每個計算節(jié)點核數(shù),決定了你同一時刻可以并行執(zhí)行的task。

比如的RDD有100個分區(qū),那么計算的時候就會生成100個task,你的資源配置為10個計算節(jié)點,每個兩2個核,同一時刻可以并行的task數(shù)目為20,計算這個RDD就需要5個輪次。如果計算資源不變,你有101個task的話,就需要6個輪次,在最后一輪中,只有一個task在執(zhí)行,其余核都在空轉(zhuǎn)。如果資源不變,你的RDD只有2個分區(qū),那么同一時刻只有2個task運行,其余18個核空轉(zhuǎn),造成資源浪費。這就是在spark調(diào)優(yōu)中,增大RDD分區(qū)數(shù)目,增大任務并行度的做法。

總結

以上是生活随笔為你收集整理的spark 简单实战_SparkCore入门实战 (二)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。