日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > python >内容正文

python

python spark进行大数据分析_第2天Python实战Spark大数据分析及调度-RDD编程

發布時間:2023/12/20 python 23 豆豆
生活随笔 收集整理的這篇文章主要介紹了 python spark进行大数据分析_第2天Python实战Spark大数据分析及调度-RDD编程 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

Spark提供的主要抽象是resilient distributed dataset(RDD)彈性分布式數據集,它是跨集群節點劃分的元素的集合,可以并行操作。通過從Hadoop文件系統(或任何其他Hadoop支持的文件系統)中的文件或驅動程序中現有的Scala集合開始并進行轉換來創建RDD。用戶還可以要求Spark將RDD 保留在內存中,以使其能夠在并行操作中有效地重用。最后,RDD自動從節點故障中恢復。

Spark中的第二個抽象是可以在并行操作中使用的共享變量。默認情況下,當Spark作為一組任務在不同節點上并行運行一個函數時,它會將函數中使用的每個變量的副本傳送給每個任務。有時,需要在任務之間或任務與驅動程序之間共享變量。Spark支持兩種類型的共享變量:廣播變量(可用于在所有節點上的內存中緩存值)和累加器(accumulator),這些變量僅被“添加”到其上,例如計數器和總和

RDD五大特性

A list of partitions

一組分區:RDD由很多partition構成,有多少partition就對應有多少task

A function for computing each split

一個函數:對RDD做計算,相當于對RDD的每個split或partition做計算

A list of dependencies on other RDDs

RDD之間有依賴關系,可溯源

Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)

一個Partitioner:即RDD的分片函數,如果RDD里面存的數據是key-value形式,則可以傳遞一個自定義的Partitioner進行重新分區

Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)

一個列表:存儲存取每個Partition的優先位置(preferred location),計算每個split時,在split所在機器的本地上運行task是最好的,避免了數據的移動,split有多個副本,所以preferred location不止一個

初始化Spark

Spark程序做的第一件事情就是創建一個SparkContext對象,該對象告訴Spark如何訪問集群,要創建一個SparkContext首先需要構建一個SparkConf對象,其中包含應用程序程序的信息

from pyspark importSparkConf, SparkContext

conf=SparkConf().setAppName(appName).setMaster(master)

sc= SparkContext(conf=conf)

# 業務邏輯

sc.stop()

appName參數是應用程序顯示在集群UI上的名稱

master是Spark,Mesos或YARN群集URL或特殊的“本地”字符串,以本地模式運行

當在集群上運行時,您將不希望master在程序中進行硬編碼,而是在其中啟動應用程序spark-submit并在其中接收。但是,對于本地測試和單元測試,您可以傳遞“ local”以在內部運行Spark

注意:

在PySpark Shell中,已經為我們初始化了Spark, 變量為sc, 我們自己配置的SparkContext將不起作用,也就是我們自己不用再初始化了

創建RDD的兩種方式

方式一: 通過現有的可迭代對象或集合調用SparkContext的parallelize創建

data = [1, 2, 3, 4, 5]

rdd= sc.parallelize(data)

創建rdd后可以并行操作。例如調用distData.reduce(lambda a, b: a + b)計算集合元素的和

>>> rdd.reduce(lambda a,b: a+b)15

并行集合的一個重要參數就是將數據集切入分區,Spark將為集群的每個分區運行一個任務。通常,群集中的每個CPU都需要2-4個分區。通常,Spark會嘗試根據您的集群自動設置分區數。但是,您也可以通過將其作為第二個參數傳遞給parallelize(例如sc.parallelize(data, 10))來手動設置它。

方式二: 外部數據集

PySpark可以從Hadoop支持的任何存儲源創建分布式數據集,包括您的本地文件系統,HDFS,Cassandra,HBase,Amazon S3等。Spark支持文本文件,SequenceFiles和任何其他Hadoop InputFormat。

可以使用SparkContext的textFile方法創建文本文件RDD 。此方法需要一個URI的文件(本地路徑的機器上,或一個hdfs://,s3a://等URI),并讀取其作為行的集合。這是一個示例調用:

rdd = sc.textFile("data.txt")

RDD操作

RDD支持兩種類型操作:

transformation(轉換): create a new dataset from an existing one 從現有的數據集中創建新數據集

action(動作): return a value to the driver program after running a conputation on the dataset 對數據集執行計算后,將值返回給驅動程序

常用的transformation

map(func)

將func函數作用到數據集的每一個元素上,生成一個新的分布式的數據集返回

from pyspark importSparkConf, SparkContextif __name__ == '__main__':

conf=SparkConf()

sc= SparkContext(conf=conf)defmy_map1():

data= [1, 2, 3, 4, 5, 6]

rdd1=sc.parallelize(data)

rdd2= rdd1.map(lambda x: x + 1)print(rdd2.collect())defmy_map2():

rdd1= sc.parallelize(["java", "python", "php", "ruby"])

rdd2= rdd1.map(lambdax: (x, len(x)))print(rdd2.collect())

my_map1()

my_map2()

sc.stop()#輸出結果

[2, 3, 4, 5, 6, 7]

[('java', 4), ('python', 6), ('php', 3), ('ruby', 4)]

map示例

filter(func)

選出所有func返回值為true的元素,生成一個新的分布式的數據集返回

from pyspark importSparkConf, SparkContextif __name__ == '__main__':

conf=SparkConf()

sc= SparkContext(conf=conf)defmy_filter():

data= [1, 2, 3, 4, 5]

rdd=sc.parallelize(data)

rddMap= rdd.map(lambda x: x * 2)

rddFilter= rddMap.filter(lambda x: x > 6)print(rddFilter.collect())defmy_filter02():#使用鏈式寫法優化代碼

data = [1, 2, 3, 4, 5]print(sc.parallelize(data).map(lambda x: x * 2).filter(lambda x: x > 6).collect())

my_filter()

sc.stop()#輸出結果

[8, 10]

filter示例

flatMap(func)

輸入的item能夠被map到0或者多個items輸出,返回值是一個Sequence

from pyspark importSparkConf, SparkContextif __name__ == '__main__':

conf=SparkConf()

sc= SparkContext(conf=conf)defmy_flatMap():

data= ["hello heboan", "hello python", "world ok"]

rdd=sc.parallelize(data)print(rdd.flatMap(lambda line: line.split(" ")).collect())

my_flatMap()

sc.stop()#輸出結果

['hello', 'heboan', 'hello', 'python', 'world', 'ok']

flatMap示例

groupBykey()

把相同的key的數據分發到一起

from pyspark importSparkConf, SparkContextif __name__ == '__main__':

conf=SparkConf()

sc= SparkContext(conf=conf)defmy_groupByKey():

data= ["hello heboan", "hello python", "hello world"]#key ==> (key, 1)

rddMap = sc.parallelize(data).flatMap(lambda line: line.split(" ")).map(lambda x: (x, 1))#print(rddMap.collect())

rdd_groupByKey =rddMap.groupByKey()#print(rdd_groupByKey.collect())

print(rdd_groupByKey.map(lambda x: (x[0], list(x[1]))).collect())

my_groupByKey()

sc.stop()#輸出結果

[('python', [1]), ('heboan', [1]), ('hello', [1, 1, 1]), ('world', [1])]

groupByKey示例

reduceByKey(func)

from pyspark importSparkConf, SparkContextif __name__ == '__main__':

conf=SparkConf()

sc= SparkContext(conf=conf)defmy_reduceMap():

data= ["hello heboan", "hello python", "hello world"]

rddMap= sc.parallelize(data).flatMap(lambda line: line.split(" ")).map(lambda x: (x, 1))

rdd_reduceByKey= rddMap.reduceByKey(lambda a, b: a + b) #相鄰的數相加

print(rdd_reduceByKey.collect())

my_reduceMap()

sc.stop()#輸出結果

[('python', 1), ('heboan', 1), ('hello', 3), ('world', 1)]

reduceMap示例

sortByKey()

排序

from pyspark importSparkConf, SparkContextif __name__ == '__main__':

conf=SparkConf()

sc= SparkContext(conf=conf)defmy_sortByKey():

data= ["hello heboan", "hello python", "hello world"]

rddMap= sc.parallelize(data).flatMap(lambda line: line.split(" ")).map(lambda x: (x, 1))

rdd_reduceByKey= rddMap.reduceByKey(lambda a, b: a +b)#因為sortByKey是對key記性排序的,所以先使用map調換k,v的位置進行排序,傳入False表示降序,排序完成后再把k,v位置換回來

rdd_sortByKey = rdd_reduceByKey.map(lambda x:(x[1],x[0])).sortByKey(False).map(lambda x:(x[1],x[0]))print(rdd_sortByKey.collect())

my_sortByKey()

sc.stop()#輸出結果

[('hello', 3), ('python', 1), ('heboan', 1), ('world', 1)]

sortByKey示例

union()

就是兩個數據集合并

from pyspark importSparkConf, SparkContextif __name__ == '__main__':

conf=SparkConf()

sc= SparkContext(conf=conf)defmy_union():

a= sc.parallelize([1, 2, 3])

b= sc.parallelize([4, 5, 6])print(a.union(b).collect())

my_union()

sc.stop()#輸出結果

[1, 2, 3, 4, 5, 6]

union示例

distinct()

去重

from pyspark importSparkConf, SparkContextif __name__ == '__main__':

conf=SparkConf()

sc= SparkContext(conf=conf)defmy_distinct():

a= sc.parallelize([1, 2, 3])

b= sc.parallelize([3, 4, 5])print(a.union(b).distinct().collect())

my_distinct()

sc.stop()#輸出結果

[1, 2, 3, 4, 5]

distinct示例

join()

from pyspark importSparkConf, SparkContextif __name__ == '__main__':

conf=SparkConf()

sc= SparkContext(conf=conf)defmy_join():

a= sc.parallelize([("A", "a1"), ("B", "b1"), ("C", "c1"), ("D", "d1")])

b= sc.parallelize([("A", "a2"), ("C", "c2"), ("F", "f1")])print(a.join(b).collect())print(a.leftOuterJoin(b).collect())print(a.rightOuterJoin(b).collect())print(a.fullOuterJoin(b).collect())

my_join()

sc.stop()#輸出結果

[('C', ('c1', 'c2')), ('A', ('a1', 'a2'))]

[('B', ('b1', None)), ('D', ('d1', None)), ('C', ('c1', 'c2')), ('A', ('a1', 'a2'))]

[('F', (None, 'f1')), ('C', ('c1', 'c2')), ('A', ('a1', 'a2'))]

[('F', (None, 'f1')), ('B', ('b1', None)), ('D', ('d1', None)), ('C', ('c1', 'c2')), ('A', ('a1', 'a2'))]

join示例

常用action

collect

count

take

reduce

foreach

saveAsTextFile

max

min

sum

from pyspark importSparkConf, SparkContextif __name__ == '__main__':

conf=SparkConf()

sc= SparkContext(conf=conf)defmy_action():

data= [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

rdd=sc.parallelize(data)print(rdd.collect()) #輸出

print(rdd.count()) #計數

print(rdd.take(3)) #前3個元素

print(rdd.max()) #最大的元素

print(rdd.sum()) #所有元素之和

print(rdd.reduce(lambda a, b: a + b)) #求和

rdd.foreach(lambda x: print(x)) #輸出每個元素

rdd.saveAsTextFile("hdfs://heboan-hadoop-000:8020/tmp") #寫入到文件系統

my_action()

sc.stop()

實戰案例---詞頻統計

hello word

hello heboan

my nameisheboan

hello everyone

heboan.txt

#/data/script/wc.py

from pyspark importSparkConf, SparkContextimportsysif __name__ == '__main__':if len(sys.argv) != 2:print("Usage: wordcount ", file=sys.stderr)

sys.exit(-1)

conf=SparkConf()

sc= SparkContext(conf=conf)

rdd= sc.textFile(sys.argv[1])\

.flatMap(lambda line: line.split(" "))\

.map(lambda x: (x, 1)).reduceByKey(lambda a, b: a+b)for word, count inrdd.collect():print("{}: {}".format(word, count))

sc.stop()

服務器執行

[root@heboan-hadoop-000 ~]#spark-submit --master local[2] --name heboan001 /data/script/wc.py file:///root/heboan.txt

>>>>>>>延伸

上面我們是指定了一個文件/root/heboan.txt, 我們也可以指定一個目錄

#/root/data/目錄下的所有文件都會進行計算

spark-submit --master local[2] --name heboan001 /data/script/wc.py file:///root/data/

計算特定的文件,如

#/root/data/目錄下的所有.txt后綴文件都會進行計算

spark-submit --master local[2] --name heboan001 /data/script/wc.py file:///root/data/*.txt

案例實戰----網站訪問ip前5

案例實戰---統計平均年齡

總結

以上是生活随笔為你收集整理的python spark进行大数据分析_第2天Python实战Spark大数据分析及调度-RDD编程的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。