日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

《Spark大数据分析:核心概念、技术及实践》一3.5 API

發布時間:2024/4/13 编程问答 33 豆豆
生活随笔 收集整理的這篇文章主要介紹了 《Spark大数据分析:核心概念、技术及实践》一3.5 API 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
本節書摘來自華章出版社《Spark大數據分析:核心概念、技術及實踐》一書中的第3章,第3.5節,作者[美] 穆罕默德·古勒(Mohammed?Guller),更多章節內容可以訪問云棲社區“華章計算機”公眾號查看。


3.5 API

應用可以通過使用Spark提供的庫獲得Spark集群計算的能力。這些庫都是用Scala編寫的。但是Spark提供了各種語言的API。在本書編寫之際,Spark API提供了如下語言的支持:Scala、Java、Python和R。可以使用上面的任何語言來開發Spark應用。也有其他語言(比如Clojure)的非官方支持。

Spark API主要由兩個抽象部件SparkContext和彈性分布式數據集(RDD)構成。應用程序通過這兩個部件和Spark進行交互。應用程序可以連接到Spark集群并使用相關資源。接下來會介紹這兩個抽象部件,然后詳細介紹RDD。

3.5.1 SparkContext

SparkContext是一個在Spark庫中定義的類。它是Spark庫的入口點。它表示與Spark集群的一個連接。使用Spark API創建的其他一些重要對象都依賴于它。

每個Spark應用程序都必須創建一個SparkContext類實例。目前,每個Spark應用程序只能擁有一個激活的SparkContext類實例。如果要創建一個新的實例,那么在此之前必須讓當前激活的類實例失活。

SparkContext有多個構造函數。最簡單的一個不需要任何參數。一個SparkContext類實例可以用如下代碼創建。


在這種情況下,SparkContext的配置信息都從系統屬性中獲取,比如Spark master的地址、應用名稱等。也可以創建一個SparkConf類實例,然后把它作為SparkContext的參數從而設定配置信息。SparkConf 是Spark庫中定義的一個類。通過這種方式可以像下面這樣設置各種Spark配置信息。


?

SparkConf為設置諸如Spark master這樣的常用配置信息都提供了對應的顯式方法。此外,它還提供了一個通用的方法用于設置配置信息,它使用鍵-值對進行設置。SparkContext和SparkConf可以使用的參數將在第4章進行詳細介紹。

在本章接下來的例子中會繼續使用上面創建的變量sc。

3.5.2 RDD

彈性分布式數據集(RDD)表示一個關于分區數據元素的集合,可以在其上進行并行操作。它是Spark的主要數據抽象概念。它是Spark庫中定義的一個抽象類。

從概念上看,除了可以用于表示分布式數據集和支持惰性操作的特性外,RDD類似于Spark的集合。惰性操作將在本章稍后部分詳細介紹。

下面分別簡要描述RDD的特點。

不可變性

RDD是一種不可變的數據結構。一旦創建,它將不可以在原地修改。基本上,一個修改RDD的操作都會返回一個新的RDD。

分片

RDD表示的是一組數據的分區。這些分區分布在多個集群節點上。然而,當Spark在單個節點運行時,所有的分區數據都會在當前節點上。

Spark存儲RDD的分區和數據集物理分區之間關系的映射關系。RDD是各個分布式數據源之中數據的一個抽象,它通常表示分布在多個集群節點上的分區數據。比如HDFS將數據分片或分塊分散存儲在集群中。默認情況下,一個RDD分區對應一個HDFS文件分片。其他的分布式數據源(比如Cassandra)同樣也將數據分片分散存儲在集群多個節點上。然而,一個RDD對應多個Cassandra分片。

容錯性

RDD為可容錯的。RDD代表了分散在集群中多個節點的數據,但是任何一個節點都有可能出故障。誠如之前所說的,一個節點出故障的可能性和集群節點數量成正比。集群越大,在任何一個節點它出故障的可能性就越高。

RDD會自動處理節點出故障的情況。當一個節點出故障時,該節點上存儲的數據將無法被訪問。此時,Spark會在其他節點上重建丟失的RDD分區數據。Spark存儲每一個RDD的血統信息。通過這些血統信息,Spark可以恢復RDD的部分信息,當節點出故障的時候,它甚至可以恢復整個RDD。

接口

需要著重指出的是,RDD是一個處理數據的接口。在Spark庫中它定義為一個抽象類。RDD為多種數據源提供了一個處理數據的統一接口,包括HDFS、HBase、Cassandra等。這個接口同樣可以用于處理存儲于多個節點內存中的數據。

Spark為不同數據源提供了各自具體的實現類,比如HadoopRDD、ParallelCollection-RDD、JdbcRDD和CassandraRDD。它們都支持基礎的RDD接口。

強類型

RDD類有一個參數用于表示類型,這使得RDD可以表示不同類型的數據。RDD可以表示同一類型數據的分布式集合,包括Integer、Long、Float、String或者應用開發者自己定義的類型。而且,一個應用總會使用某種類型的RDD,包括Integer、Long、Float、Double、String或自定義類型。

駐留在內存中

之前已經提及了Spark的內存集群計算特性。RDD類提供一套支持內存計算的API。Spark允許RDD在內存中緩存或長期駐留。就像之前所說的,對一個緩存在內存中的RDD進行操作比操作沒緩存的RDD要快很多。

3.5.3 創建RDD

由于RDD是一個抽象類,因此無法直接創建一個RDD的類實例。SparkContext類提供了一個工廠方法用來創建RDD實現類的類實例。RDD也可以通過由其他RDD執行轉換操作得到。就像之前所說的,RDD是不可變的。任何一個對RDD的修改操作都將返回一個代表修改后數據的新RDD。

本節總結了幾種創建RDD的常見方法。在下面的示例代碼中,sc是一個SparkContext的類實例。之前的章節已經介紹了怎么創建它。

parallelize

這個方法用于從本地Scala集合創建RDD實例。它會對Scala集合中的數據重新分區、重新分布,然后返回一個代表這些數據的RDD。這個方法很少用在生產上,但是使用它有助于學習Spark。


?

textFile

textFile方法用于從文本文件創建RDD實例。它可以從多種來源讀取數據,包括單個文件、本地同一目錄下的多個文件、HDFS、Amazon S3,或其他Hadoop支持的存儲系統。這個方法返回一個RDD,這個RDD代表的數據集每個元素都是一個字符串,每一個字符串代表輸入文件中的一行。


上面的代碼表示從存儲于HDFS上的一個文件或者目錄創建RDD實例。

textFile方法也可以讀取壓縮文件中的數據。而且,它的參數中可以存在通配符,用于從一個目錄中讀取多個文件。下面是一個例子。


textFile的第二個參數是一個可選參數,它用于指定分區的個數。默認情況下,Spark為每一個文件分塊創建一個分區。可以設置成一個更大的數字從而提高并行化程度,但是設置成一個小于文件分塊數的數字是不可以的。

wholeTextFiles

這個方法讀取目錄下的所有文本文件,然后返回一個由鍵值型RDD。返回RDD中的每一個鍵值對對應一個文件。鍵為文件路徑,對應的值為該文件的內容。這個方法可以從多種來源讀取數據,包括本地文件系統、HDFS、Amazon S3,或者其他Hadoop支持的存儲系統。


sequenceFile

sequenceFile方法從SequenceFile文件中獲取鍵值對數據,這些SequenceFile文件可以存儲于本地文件系統、HDFS或者其他Hadoop支持的存儲系統。這個方法返回一個鍵值對型RDD實例。當使用這個方法的時候,不僅需要提供文件名,還需要提供文件中數據鍵和值各自的類型。

?

3.5.4 RDD操作

Spark應用使用RDD類或其繼承類中定義的方法來處理數據。這些方法也稱為操作。既然Scala中可以把一個方法當成操作符使用,那么RDD中的方法有時也稱為操作符。

Spark的美好之處就在于同樣一個RDD方法既可以處理幾字節的數據也可以處理PB級的數據。而且Spark應用可以使用同樣的方法去處理數據,無論它是存儲于本地還是存儲于一個分布式存儲系統。這樣的靈活性使得開發者可以在單機上開發、調試、測試Spark應用,然后不用改動任何代碼就可以將它部署到一個大集群上。

RDD操作可以歸為兩類:轉換和行動。轉換將會創建一個新的RDD實例。行動則會將結果返回給驅動程序。

轉換

轉換指的是在原RDD實例上進行計算,而后創建一個新的RDD實例。本節將介紹一些常見的轉換操作。

從概念上看,RDD轉換操作的類似于Scala集合上的方法。主要的區別在于Scala集合方法操作的數據是在單機內存中的,而RDD的轉換操作可以處理分布在集群各個節點上的數據。另外一個重要的區別是,RDD轉換操作是惰性的,而Scala集合方法不是。本章余下部分會詳細介紹這些內容。

map

map方法是一個高階方法,它把一個函數作為它的參數,并把這個函數作用在原RDD的每個元素上,從而創建一個新RDD實例。這個作為參數的函數擁有一個參數并返回一個值。


filter

filter方法是一個高階方法,它把一個布爾函數作為它的參數,并把這個函數作用在原RDD的每個元素上,從而創建一個新RDD實例。一個布爾函數只有一個參數作為輸入,返回true或false。filter方法返回一個新的RDD實例,這個RDD實例代表的數據集由布爾函數返回true的元素構成。因此,新RDD實例代表的數據集是原RDD的子集。


flatMap

flatMap方法是一個高階方法,它把一個函數作為它的參數,這個函數處理原RDD中每個元素返回一個序列。扁平化這個序列的集合得到一個數據集,flatMap方法返回的RDD就代表這個數據集。

?

mapPartitions

mapPartitions是一個高階方法,它使你可以以分區的粒度來處理數據。相比于一次處理一個元素,mapPartitions一次處理處理一個分區,每個分區被當成一個迭代器。mapPartitions方法的函數參數把迭代器作為輸入,返回另外一個迭代器作為輸出。map-Partitions將自定義函數參數作用于每一個分區上,從而返回一個新RDD實例。


union

union方法把一個RDD實例作為輸入,返回一個新RDD實例,這個新RDD實例的數據集是原RDD和輸入RDD的合集。

?

intersection

intersection方法把一個RDD實例作為輸入,返回一個新RDD實例,這個新RDD實例代表的數據集是原RDD和輸入RDD的交集。

?

這是另外一個例子。


subtract

subtract方法把一個RDD實例作為輸入,返回一個新RDD實例,這個新RDD實例代表的數據集由那些存在于原RDD實例中但不在輸入RDD實例中的元素構成。


這是另外一個例子。

?

distinct

RDD實例上的distinct方法返回一個新RDD實例,這個新RDD實例的數據集由原RDD的數據集去重后得到。


cartesian

cartesian方法把一個RDD實例作為輸入,返回一個新RDD實例,這個新RDD實例的數據集由原RDD和輸入RDD的所有元素的笛卡兒積構成。返回的RDD實例的每一個元素都是一個有序二元組,每一個有序二元組的第一個元素來自原RDD,第二個元素來自輸入RDD。元素的個數等于原RDD的元素個數乘以輸入RDD的元素個數。

這個方法類似于SQL中的join操作。


zip

zip方法把一個RDD實例作為輸入,返回一個新RDD實例,這個新RDD實例的每一個元素是一個二元組,二元組的第一個元素來自原RDD,第二個元素來自輸入RDD。和cartesian方法不同的是,zip方法返回的RDD的元素個數于原RDD的元素個數。原RDD的元素個數和輸入RDD的相同。進一步地說,原RDD和輸入RDD不僅有相同的分區數,每個分區還有相同的元素個數。


zipWithIndex

zipWithIndex方法返回一個新RDD實例,這個新RDD實例的每個元素都是由原RDD元素及其下標構成的二元組。


groupBy

groupBy是一個高階方法,它將原RDD中的元素按照用戶定義的標準分組從而組成一個RDD。它把一個函數作為它的參數,這個函數為原RDD中的每一個元素生成一個鍵。groupBy把這個函數作用在原RDD的每一個元素上,然后返回一個由二元組構成的新RDD實例,每個二元組的第一個元素是函數生成的鍵,第二個元素是對應這個鍵的所有原RDD元素的集合。其中,鍵和原RDD元素的對應關系由那個作為參數的函數決定。

需要注意的是,groupBy是一個費時操作,因為它可能需要對數據做shuffle操作。

假設有一個CSV文件,文件的內容為公司客戶的姓名、年齡、性別和郵編。下面的示例代碼演示了按照郵編將客戶分組。

?

keyBy

keyBy方法與groupBy方法相類似。它是一個高階方法,把一個函數作為參數,這個函數為原RDD中的每一個元素生成一個鍵。keyBy方法把這個函數作用在原RDD的每一個元素上,然后返回一個由二元組構成的新RDD實例,每個二元組的第一個元素是函數生成的鍵,第二個元素是對應這個鍵的原RDD元素。其中,鍵和原RDD元素的對應關系由那個作為參數的函數決定。返回的RDD實例的元素個數和原RDD的相同。

groupBy和KeyBy的區別在于返回RDD實例的元素上。雖然都是二元組,但是 groupBy返回的二元組中的第二個元素是一個集合,而keyBy的是單個值。


?

sortBy

sortBy是一個高階方法,它將原RDD中的元素進行排序后組成一個新的RDD實例返回。它擁有兩個參數。第一個參數是一個函數,這個函數將為原RDD的每一個元素生成一個鍵。第二個參數用來指明是升序還是降序排列。


下面是另一個示例。


pipe

pipe方法可以讓你創建子進程來運行一段外部程序,然后捕獲它的輸出作為字符串,用這些字符串構成RDD實例返回。

randomSplit

randomSplit方法將原RDD分解成一個RDD數組。它的參數是分解的權重。

?

coalesce

coalesce方法用于減少RDD的分區數量。它把分區數作為參數,返回分區數等于這個參數的RDD實例。


使用coalesce方法時需要小心,因為減少了RDD的分區數也就意味著降低了Spark的并行能力。它通常用于合并小分區。舉例來說,在執行filter操作之后,RDD可能會有很多小分區。在這種情況下,減少分區數能提升性能。

repartition

repartition方法把一個整數作為參數,返回分區數等于這個參數的RDD實例。它有助于提高Spark的并行能力。它會重新分布數據,因此它是一個耗時操作。

coalesce和repartition方法看起來一樣,但是前者用于減少RDD中的分區,后者用于增加RDD中的分區。

?

sample

sample方法返回原RDD數據集的一個抽樣子集。它擁有三個參數。第一個參數指定是有放回抽樣還是無放回抽樣。第二個參數指定抽樣比例。第三個參數是可選的,指定抽樣的隨機數種子。

?

鍵值對型RDD的轉換

除了上面介紹的RDD轉換之外,針對鍵值對型RDD還支持其他的一些轉換。下面將介紹只能作用于鍵值對型RDD的常用轉換操作。

keys

keys方法返回只由原RDD中的鍵構成的RDD。


values

values方法返回只由原RDD中的值構成的RDD。

?

mapValues

mapValues是一個高階方法,它把一個函數作為它的參數,并把這個函數作用在原RDD的每個值上。它返回一個由鍵值對構成的RDD。它和map方法類似,不同點在于它把作為參數的函數作用在原RDD的值上,所以原RDD的鍵都沒有變。返回的RDD和原RDD擁有相同的鍵。


join

join方法把一個鍵值對型RDD作為參數輸入,而后在原RDD和輸入RDD上做內連接操作。它返回一個由二元組構成的RDD。二元組的第一個元素是原RDD和輸入RDD都有的鍵,第二個元素是一個元組,這個元組由原RDD和輸入RDD中鍵對應的值構成。


leftOuterJoin

leftOuterJoin方法把一個鍵值對型RDD作為參數輸入,而后在原RDD和輸入RDD之間做左連接操作。它返回一個由鍵值對構成的RDD。鍵值對的第一個元素是原RDD中的鍵,第二個元素是一個元組,這個元組由原RDD中鍵對應的值和輸入RDD中的可選值構成。可選值用Option類型表示。


rightOuterJoin

rightOuterJoin方法把一個鍵值對型RDD作為參數輸入,而后在原RDD和輸入RDD之間做右連接操作。它返回一個由鍵值對構成的RDD。鍵值對的第一個元素是輸入RDD中的鍵,第二個元素是一個元組,這個元組由原RDD中的可選值和輸入RDD中鍵對應的值構成。可選值用Option類型表示。


fullOuterJoin

fullOuterJoin方法把一個鍵值對型RDD作為參數輸入,而后在原RDD和輸入RDD之間做全連接操作。它返回一個由鍵值對構成的RDD。


sampleByKey

sampleByKey通過在鍵上抽樣返回原RDD的一個子集。它把對每個鍵的抽樣比例作為輸入參數,返回原RDD的一個抽樣。


subtractByKey

subtractByKey方法把一個鍵值對型RDD作為輸入參數,返回一個鍵值對RDD,這個鍵值對RDD的鍵都是只存在原RDD中但是不存在于輸入RDD中。


groupByKey

groupByKey方法返回一個由二元組構成的RDD,二元組的第一個元素是原RDD的鍵,第二個元素是一個集合,集合由該鍵對應的所有值構成。它類似于上面介紹過的group-By方法。二者的區別在于groupBy是一個高階方法,它的參數是一個函數,這個函數為原RDD的每一個元素生成一個鍵。groupByKey方法作用于RDD的每一個鍵值對上,故不需要一個生成鍵的函數作為輸入參數。

?

應當盡量避免使用groupByKey。它是一個耗時操作,因為它可能會對數據進行shuffle操作。在大多數情況下,都有不使用groupByKey的更好的替代方案。

reduceByKey

reduceByKey是一個高階方法,它把一個滿足結合律的二元操作符當作輸入參數。它把這個操作符作用于有相同鍵的值上。

一個二元操作符把兩個值當作輸入參數,返回一個值。一個滿足結合律的二元操作符返回同樣的結果,但是它不關心操作數的分組情況。

reduceByKey方法可以用于對同一鍵對應的值進行匯總操作。比如它可以用于對同一鍵對應的值進行求和,求乘積,求最小值,求最大值。


對于基于鍵的匯總操作、合并操作,reduceByKey比groupByKey更合適。

操作

操作指的是那些返回值給驅動程序的RDD方法。本節介紹一些RDD中常用的操作。

collect

collect方法返回一個數組,這個數組由原RDD中的元素構成。在使用這個方法的時候需要小心,因為它把在worker節點的數據移給了驅動程序。如果操作一個有大數據集的RDD,它有可能會導致驅動程序崩潰。

?

count

count方法返回原RDD中元素的個數。

?

countByValue

countByValue方法返回原RDD中每個元素的個數。它返回是一個map類實例,其中,鍵為元素的值,值為該元素的個數。


first

first方法返回原RDD中的第一個元素。

?

max

max方法返回RDD中最大的元素。

?

min

min方法返回RDD中最小的元素。


take

take方法的輸入參數為一個整數N,它返回一個由原RDD中前N個元素構成的RDD。


takeOrdered

takeOrdered方法的輸入參數為一個整數N,它返回一個由原RDD中前N小的元素構成的RDD。


top

top方法的輸入參數為一個整數N,它返回一個由原RDD中前N大的元素構成的RDD。


fold

fold是一個高階方法,用于對原RDD的元素做匯總操作,匯總的時候使用一個自定義的初值和一個滿足結合律的二元操作符。它首先在每一個RDD的分區中進行匯總,然后再匯總這些結果。

初值的取值取決于RDD中的元素類型和匯總操作的目的。比如,給定一個元素為整數的RDD,為了計算這個RDD中所有元素的和,初值取為0。相反,給定一個元素為整數的RDD,為了計算這個RDD中所有元素的乘積,初值則應取為1。


reduce

reduce是一個高階方法,用于對原RDD的元素做匯總操作,匯總的時候使用一個滿足結合律和交換律的二元操作符。它類似于fold方法,然而,它并不需要初值。

?

鍵值對型RDD上的操作

鍵值對RDD上有一些額外的操作,我們在下面進行介紹。

countByKey

countByKey方法用于統計原RDD每個鍵的個數。它返回一個map類實例,其中,鍵為原RDD中的鍵,值為個數。

?

lookup

lookup方法的輸入參數為一個鍵,返回一個序列,這個序列的元素為原RDD中這個鍵對應的值。

?

數值型RDD上的操作

如果RDD的元素類型為Integer、Long、Float或Double,則這樣的RDD為數值型RDD。這類RDD還有一些對于統計分析十分有用的額外操作,下面將介紹一些常用的行動。

mean

mean方法返回原RDD中元素的平均值。

?

stdev

stdev方法返回原RDD中元素的標準差。


sum

sum方法返回原RDD中所有元素的和。

?

variance

variance方法返回原RDD中元素的方差。

?

3.5.5 保存RDD

一般來說,數據處理完畢后,結果會保存在硬盤上。Spark允許開發者將RDD保存在任何Hadoop支持的存儲系統中。保存在硬盤上的RDD可以被其他Spark應用或Hadoop應用使用。

本節介紹將RDD保存成文件的常用方法。

saveAsTextFile

saveAsTextFile方法將原RDD中的元素保存在指定目錄中,這個目錄位于任何Hadoop支持的存儲系統中。每一個RDD中的元素都用字符串表示并另存為文本中的一行。


saveAsObjectFile

saveAsObjectFile方法將原RDD中的元素序列化成Java對象,存儲在指定目錄中。


saveAsSequenceFile

saveAsSequenceFile方法將鍵值對型RDD以SequenceFile的格式保存。鍵值對型RDD也可以以文本的格式保存,只須使用saveAsTextFile方法即可。

?

需要注意的是,上面的方法都把一個目錄的名字作為輸入參數,然后在這個目錄為每個RDD分區創建一個文件。這種設計不僅高效而且可容錯。因為每一個分區被存成一個文件,所以Spark在保存RDD的時候可以啟動多個任務,并行執行,將數據寫入文件系統中。這樣也保證了寫入數據的過程是可容錯的。一旦有一個將分區寫入文件的任務失敗了,Spark可以再啟動一個任務,重寫剛才失敗任務創建的文件。

總結

以上是生活随笔為你收集整理的《Spark大数据分析:核心概念、技术及实践》一3.5 API的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。