日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

SparkR:数据科学家的新利器

發布時間:2025/7/25 编程问答 30 豆豆
生活随笔 收集整理的這篇文章主要介紹了 SparkR:数据科学家的新利器 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

from:http://www.csdn.net/article/2015-10-23/2826010

摘要:R是數據科學家中最流行的編程語言和環境之一,在Spark中加入對R的支持是社區中較受關注的話題。作為增強Spark對數據科學家群體吸引力的最新舉措,最近發布的Spark?1.4版本在現有的Scala/Java/Python?API之外增加了R?API(SparkR)。SparkR使得熟悉R的用戶可以在Spark的分布式計算平臺基礎上結合R本身強大的統計分析功能和豐富的第三方擴展包,對大規模數據集進行分析和處理。本文將回顧SparkR項目的背景,對其當前的特性作總體的概覽,闡述其架構和若干技術關鍵點,最后進行展望和總結。

項目背景

R是非常流行的數據統計分析和制圖的語言及環境,有一項調查顯示,R語言在數據科學家中使用的程度僅次于SQL。但目前R語言的核心運行環境是單線程的,能處理的數據量受限于單機的內存容量,大數據時代的海量數據處理對R構成了挑戰。

為了解決R的可伸縮性問題,R社區已經有一些方案,比如parallel和snow包,可以在計算機集群上并行運行R代碼。但它們的缺陷在于沒有解決數據分布式存儲,數據仍然需要在主節點集中表示,分片后再傳輸給工作節點,不適用于大數據處理的場景。另外,數據處理模型過于簡單,即數據分片在工作節點處理后,結果收集回主節點,缺少一個象MapReduce那樣通用的分布式數據編程模型。

Hadoop是流行的大數據處理平臺,它的HDFS分布式文件系統和之上的MapReduce編程模型比較好地解決了大數據分布式存儲和處理的問題。RHadoop項目的出現使得用戶具備了在R中使用Hadoop處理大數據的能力。

Apache頂級開源項目Spark是Hadoop之后備受關注的新一代分布式計算平臺。和Hadoop相比,Spark提供了分布式數據集的抽象,編程模型更靈活和高效,能夠充分利用內存來提升性能。為了方便數據科學家使用Spark進行數據挖掘,社區持續往Spark中加入吸引數據科學家的各種特性,例如0.7.0版本中加入的python?API?(PySpark);1.3版本中加入的DataFrame等。

R和Spark的強強結合應運而生。2013年9月SparkR作為一個獨立項目啟動于加州大學伯克利分校的大名鼎鼎的AMPLAB實驗室,與Spark源出同門。2014年1月,SparkR項目在github上開源(https://github.com/amplab-extras/SparkR-pkg)。隨后,來自工業界的Alteryx、Databricks、Intel等公司和來自學術界的普渡大學,以及其它開發者積極參與到開發中來,最終在2015年4月成功地合并進Spark代碼庫的主干分支,并在Spark?1.4版本中作為重要的新特性之一正式宣布。

當前特性

SparkR往Spark中增加了R語言API和運行時支持。Spark的?API由Spark?Core的API以及各個內置的高層組件(Spark?Streaming,Spark?SQL,ML?Pipelines和MLlib,Graphx)的API組成,目前SparkR只提供了Spark的兩組API的R語言封裝,即Spark?Core的RDD?API和Spark?SQL的DataFrame?API。

需要指出的是,在Spark?1.4版本中,SparkR的RDD?API被隱藏起來沒有開放,主要是出于兩點考慮:

  • RDD?API雖然靈活,但比較底層,R用戶可能更習慣于使用更高層的API;
  • RDD?API的實現上目前不夠健壯,可能會影響用戶體驗,比如每個分區的數據必須能全部裝入到內存中的限制,對包含復雜數據類型的RDD的處理可能會存在問題等。
  • 目前社區正在討論是否開放RDD?API的部分子集,以及如何在RDD?API的基礎上構建一個更符合R用戶習慣的高層API。

    RDD?API

    用戶使用SparkR?RDD?API在R中創建RDD,并在RDD上執行各種操作。

    目前SparkR?RDD實現了Scala?RDD?API中的大部分方法,可以滿足大多數情況下的使用需求:

    SparkR支持的創建RDD的方式有:

    • 從R?list或vector創建RDD(parallelize())
    • 從文本文件創建RDD(textFile())
    • 從object文件載入RDD(objectFile())

    SparkR支持的RDD的操作有:

    • 數據緩存,持久化控制:cache(),persist(),unpersist()
    • 數據保存:saveAsTextFile(),saveAsObjectFile()
    • 常用的數據轉換操作,如map(),flatMap(),mapPartitions()等
    • 數據分組、聚合操作,如partitionBy(),groupByKey(),reduceByKey()等
    • RDD間join操作,如join(),?fullOuterJoin(),?leftOuterJoin()等
    • 排序操作,如sortBy(),?sortByKey(),?top()等
    • Zip操作,如zip(),?zipWithIndex(),?zipWithUniqueId()
    • 重分區操作,如coalesce(),?repartition()
    • 其它雜項方法

    和Scala?RDD?API相比,SparkR?RDD?API有一些適合R的特點:

    • SparkR?RDD中存儲的元素是R的數據類型。
    • SparkR?RDD?transformation操作應用的是R函數。
    • RDD是一組分布式存儲的元素,而R是用list來表示一組元素的有序集合,因此SparkR將RDD整體上視為一個分布式的list。Scala?API?中RDD的每個分區的數據由iterator來表示和訪問,而在SparkR?RDD中,每個分區的數據用一個list來表示,應用到分區的轉換操作,如mapPartitions(),接收到的分區數據是一個list而不是iterator。
    • 為了符合R用戶經常使用lapply()對一個list中的每一個元素應用某個指定的函數的習慣,SparkR在RDD類上提供了SparkR專有的transformation方法:lapply()、lapplyPartition()、lapplyPartitionsWithIndex(),分別對應于Scala?API的map()、mapPartitions()、mapPartitionsWithIndex()。

    DataFrame?API

    Spark?1.3版本引入了DataFrame?API。相較于RDD?API,DataFrame?API更受社區的推崇,這是因為:

  • DataFrame的執行過程由Catalyst優化器在內部進行智能的優化,比如過濾器下推,表達式直接生成字節碼。
  • 基于Spark?SQL的外部數據源(external?data?sources)?API訪問(裝載,保存)廣泛的第三方數據源。
  • 使用R或Python的DataFrame?API能獲得和Scala近乎相同的性能。而使用R或Python的RDD?API的性能比起Scala?RDD?API來有較大的性能差距。
  • Spark的DataFrame?API是從R的?Data?Frame數據類型和Python的pandas庫借鑒而來,因而對于R用戶而言,SparkR的DataFrame?API是很自然的。更重要的是,SparkR?DataFrame?API性能和Scala?DataFrame?API幾乎相同,所以推薦盡量用SparkR?DataFrame來編程。

    目前SparkR的DataFrame?API已經比較完善,支持的創建DataFrame的方式有:

    • 從R原生data.frame和list創建
    • 從SparkR?RDD創建
    • 從特定的數據源(JSON和Parquet格式的文件)創建
    • 從通用的數據源創建
    • 將指定位置的數據源保存為外部SQL表,并返回相應的DataFrame
    • 從Spark?SQL表創建
    • 從一個SQL查詢的結果創建

    支持的主要的DataFrame操作有:

    ·數據緩存,持久化控制:cache(),persist(),unpersist()

    • 數據保存:saveAsParquetFile(),?saveDF()?(將DataFrame的內容保存到一個數據源),saveAsTable()?(將DataFrame的內容保存存為數據源的一張表)
    • 集合運算:unionAll(),intersect(),?except()
    • Join操作:join(),支持inner、full?outer、left/right?outer和semi?join。
    • 數據過濾:filter(),?where()
    • 排序:sortDF(),?orderBy()
    • 列操作:增加列-?withColumn(),列名更改-?withColumnRenamed(),選擇若干列?-select()、selectExpr()。為了更符合R用戶的習慣,SparkR還支持用$、[]、[[]]操作符選擇列,可以用$<列名>?<-?的語法來增加、修改和刪除列
    • RDD?map類操作:lapply()/map(),flatMap(),lapplyPartition()/mapPartitions(),foreach(),foreachPartition()
    • 數據聚合:groupBy(),agg()
    • 轉換為RDD:toRDD(),toJSON()
    • 轉換為表:registerTempTable(),insertInto()
    • 取部分數據:limit(),take(),first(),head()

    編程示例

    總體上看,SparkR程序和Spark程序結構很相似。

    基于RDD?API的示例

    要基于RDD?API編寫SparkR程序,首先調用sparkR.init()函數來創建SparkContext。然后用SparkContext作為參數,調用parallelize()或者textFile()來創建RDD。有了RDD對象之后,就可以對它們進行各種transformation和action操作。下面的代碼是用SparkR編寫的Word?Count示例:

    library(SparkR) #初始化SparkContext sc <- sparkR.init("local", "RWordCount") #從HDFS上的一個文本文件創建RDD lines <- textFile(sc, "hdfs://localhost:9000/my_text_file") #調用RDD的transformation和action方法來計算word count #transformation用的函數是R代碼 words <- flatMap(lines, function(line) { strsplit(line, " ")[[1]] }) wordCount <- lapply(words, function(word) { list(word, 1L) }) counts <- reduceByKey(wordCount, "+", 2L) output <- collect(counts)

    基于DataFrame API的示例

    基于DataFrame?API的SparkR程序首先創建SparkContext,然后創建SQLContext,用SQLContext來創建DataFrame,再操作DataFrame里的數據。下面是用SparkR?DataFrame?API計算平均年齡的示例:

    library(SparkR) #初始化SparkContext和SQLContext sc <- sparkR.init("local", "AverageAge") sqlCtx <- sparkRSQL.init(sc) #從當前目錄的一個JSON文件創建DataFrame df <- jsonFile(sqlCtx, "person.json") #調用DataFrame的操作來計算平均年齡 df2 <- agg(df, age="avg") averageAge <- collect(df2)[1, 1]

    對于上面兩個示例要注意的一點是SparkR?RDD和DataFrame?API的調用形式和Java/Scala?API有些不同。假設rdd為一個RDD對象,在Java/Scala?API中,調用rdd的map()方法的形式為:rdd.map(…),而在SparkR中,調用的形式為:map(rdd,?…)。這是因為SparkR使用了R的S4對象系統來實現RDD和DataFrame類。

    架構

    SparkR主要由兩部分組成:SparkR包和JVM后端。SparkR包是一個R擴展包,安裝到R中之后,在R的運行時環境里提供了RDD和DataFrame?API。


    圖1 ?SparkR軟件棧

    SparkR的整體架構如圖2所示。


    圖2 SparkR架構

    R?JVM后端

    SparkR?API運行在R解釋器中,而Spark?Core運行在JVM中,因此必須有一種機制能讓SparkR?API調用Spark?Core的服務。R?JVM后端是Spark?Core中的一個組件,提供了R解釋器和JVM虛擬機之間的橋接功能,能夠讓R代碼創建Java類的實例、調用Java對象的實例方法或者Java類的靜態方法。JVM后端基于Netty實現,和R解釋器之間用TCP?socket連接,用自定義的簡單高效的二進制協議通信。

    R?Worker

    SparkR?RDD?API和Scala?RDD?API相比有兩大不同:SparkR?RDD是R對象的分布式數據集,SparkR?RDD?transformation操作應用的是R函數。SparkR?RDD?API的執行依賴于Spark?Core但運行在JVM上的Spark?Core既無法識別R對象的類型和格式,又不能執行R的函數,因此如何在Spark的分布式計算核心的基礎上實現SparkR?RDD?API是SparkR架構設計的關鍵。

    SparkR設計了Scala?RRDD類,除了從數據源創建的SparkR?RDD外,每個SparkR?RDD對象概念上在JVM端有一個對應的RRDD對象。RRDD派生自RDD類,改寫了RDD的compute()方法,在執行時會啟動一個R?worker進程,通過socket連接將父RDD的分區數據、序列化后的R函數以及其它信息傳給R?worker進程。R?worker進程反序列化接收到的分區數據和R函數,將R函數應到到分區數據上,再把結果數據序列化成字節數組傳回JVM端。

    從這里可以看出,與Scala?RDD?API相比,SparkR?RDD?API的實現多了幾項開銷:啟動R?worker進程,將分區數據傳給R?worker和R?worker將結果返回,分區數據的序列化和反序列化。這也是SparkR?RDD?API相比Scala?RDD?API有較大性能差距的原因。

    DataFrame?API的實現

    由于SparkR?DataFrame?API不需要傳入R語言的函數(UDF()方法和RDD相關方法除外),而且DataFrame中的數據全部是以JVM的數據類型存儲,所以和SparkR?RDD?API的實現相比,SparkR?DataFrame?API的實現簡單很多。R端的DataFrame對象就是對應的JVM端DataFrame對象的wrapper,一個DataFrame方法的實現基本上就是簡單地調用JVM端DataFrame的相應方法。這種情況下,R?Worker就不需要了。這是使用SparkR?DataFrame?API能獲得和ScalaAPI近乎相同的性能的原因。

    當然,DataFrame?API還包含了一些RDD?API,這些RDD?API方法的實現是先將DataFrame轉換成RDD,然后調用RDD?的相關方法。

    展望

    SparkR目前來說還不是非常成熟,一方面RDD?API在對復雜的R數據類型的支持、穩定性和性能方面還有較大的提升空間,另一方面DataFrame?API在功能完備性上還有一些缺失,比如對用R代碼編寫UDF的支持、序列化/反序列化對嵌套類型的支持,這些問題相信會在后續的開發中得到改善和解決。如何讓DataFrame?API對熟悉R原生Data?Frame和流行的R?package如dplyr的用戶更友好是一個有意思的方向。此外,下一步的開發計劃包含幾個大的特性,比如普渡大學正在做的在SparkR中支持Spark?Streaming,還有Databricks正在做的在SparkR中支持ML?pipeline等。SparkR已經成為Spark的一部分,相信社區中會有越來越多的人關注并使用SparkR,也會有更多的開發者參與對SparkR的貢獻,其功能和使用性將會越來越強。

    總結

    Spark將正式支持R?API對熟悉R語言的數據科學家是一個福音,他們可以在R中無縫地使用RDD和Data?Frame?API,借助Spark內存計算、統一軟件棧上支持多種計算模型的優勢,高效地進行分布式數據計算和分析,解決大規模數據集帶來的挑戰。工欲善其事,必先利其器,SparkR必將成為數據科學家在大數據時代的又一門新利器。

    (責編/仲浩)

    作者:孫銳,英特爾大數據團隊工程師,HIVE和Shark項目貢獻者,SparkR主力貢獻者之一。


    總結

    以上是生活随笔為你收集整理的SparkR:数据科学家的新利器的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。