

[Spark] PySpark Beginner Tutorial: RDD and DataFrame by Example


I. Example Overview

Implement the following with both the Spark RDD API and the DataFrame API:

1. Merge the main features with the single feature
2. Filter the labels
3. Join the labels with the features
4. Output the final data in the specified format

II. Data Description

There are three input files.

Label file driver.txt:

1001|1|1|10
1002|1|0|5
1003|1|0|10
1004|1|0|10

Main feature file inst.txt:

1001|0:1 1:1 2:1 3:1
1002|0:1 1:1 2:2
1003|0:1 1:1 2:3

Single-feature file feature.txt:

1001|10
1002|11
1003|12
1004|13
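Tracing the sample data through the steps below as a worked example: only id 1002 passes the label filter (its third column is 0 and its fourth column is 5), its main feature string "0:1 1:1 2:2" gets the single feature appended under index 4, and the expected final output should be roughly the single line below, in the format label\tfeatures #id (with \t standing for a literal tab):

1\t0:1 1:1 2:2 4:11 #1002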

III. Implementation with the RDD API

1. Create the SparkSession and SparkContext

#!/usr/bin/env python
# -*- coding: utf-8 -*-
from pyspark.sql import SparkSession
import sys
import logging

spark = SparkSession.builder.enableHiveSupport().getOrCreate()
sc = spark.sparkContext

2. Define the input file paths

org_inst_file = "./inst.txt"
label_input_file = "./driver.txt"
subscore_file = "./feature.txt"

3. Load the main feature file as (cmid, inst_str) pairs

def read_inst(line):
    cmid, inst_str = line.strip().split("|")
    return (cmid, inst_str)

org_inst = sc.textFile(org_inst_file).map(read_inst)  # (id, inst_str)
print(org_inst.collect())

4. Filter the label file and load it as (cmid, label) pairs

def read_label(line):
    contents = line.strip().split("|")
    cmid = contents[0]
    label = contents[1]
    return (cmid, label)

def filter_label(line):
    contents = line.strip().split("|")
    condition1 = contents[-1]
    condition2 = contents[-2]
    return condition1 == "5" and condition2 == "0"

label = sc.textFile(label_input_file).filter(filter_label).map(lambda line: read_label(line))  # (cmid, label)
print(label.collect())


5. Load the single-feature file as (cmid, subscore) pairs

def read_subscore(line):
    cmid, score = line.strip().split("|")
    return (cmid, score)

subscore = sc.textFile(subscore_file).map(read_subscore)  # (id, subscore)
print(subscore.collect())

6. Left-join the single feature onto the main features and merge the strings

subscore_index = "4"

def merge_subscore(values):
    # values is (inst_str, subscore) from the leftOuterJoin; subscore is None when missing
    inst_str = values[0]
    subscore = values[1]
    if subscore is None:
        return inst_str
    else:
        return " ".join([inst_str, "{}:{}".format(subscore_index, subscore)])

new_inst = org_inst.leftOuterJoin(subscore).mapValues(merge_subscore)
# print(new_inst.collect())
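For the sample data every id in inst.txt also has a row in feature.txt, so if you uncomment the print, new_inst.collect() should look roughly like this (RDD element order is not guaranteed):

[('1001', '0:1 1:1 2:1 3:1 4:10'), ('1002', '0:1 1:1 2:2 4:11'), ('1003', '0:1 1:1 2:3 4:12')]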


7. Join the labels with the merged features and format each output line

def merge_inst_label(data):
    cmid = data[0]
    inst_str = data[1][0]
    label_str = data[1][1]
    out = label_str + "\t" + inst_str + " #{}".format(cmid)
    return out

inst_with_label = new_inst.join(label).map(merge_inst_label)
print(inst_with_label.collect())


8. Save the result as a text file

inst_with_label.saveAsTextFile("./output_rdd")

IV. Implementation with the DataFrame API

1. Create the SparkSession

#!/usr/bin/env python
# -*- coding: utf-8 -*-
from pyspark.sql import SparkSession
import sys
import logging

spark = SparkSession.builder.enableHiveSupport().getOrCreate()

2. Define the input file paths

org_inst_file = "./inst.txt"
label_input_file = "./driver.txt"
subscore_file = "./feature.txt"

3. Read the main feature file into a DataFrame

df_inst = spark.read.format('csv')\
    .option('delimiter', '|')\
    .load(org_inst_file)\
    .toDF('id', 'index_with_feature')
df_inst.show()
df_inst.printSchema()

4. Read the single-feature file into a DataFrame

df_subscore = spark.read.format('csv')\
    .option('delimiter', '|')\
    .load(subscore_file)\
    .toDF('id', 'feature')
df_subscore.show()
df_subscore.printSchema()

5. Left-join the single feature onto the main features

df_merge_feature = df_inst.join(df_subscore, on="id", how="left")
df_merge_feature.show()
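Because this is a left join, any id from inst.txt without a matching row in feature.txt would end up with a null 'feature' column (this does not happen with the sample data). A minimal sketch to inspect such rows, assuming the column names above:

from pyspark.sql import functions as F

# Rows whose single feature was not found in feature.txt (empty for the sample data)
df_merge_feature.filter(F.col('feature').isNull()).show()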


6. Read the label file and filter it

df_label = spark.read.format('csv')\
    .option('delimiter', '|')\
    .load(label_input_file)\
    .toDF('id', 'label', "condition1", "condition2")
df_label.show()

df_label = df_label.filter((df_label['condition1'] == 0) & (df_label['condition2'] == 5))
df_label.show()

7. Inner-join the merged features with the filtered labels

df_merge = df_merge_feature.join(df_label, on="id", how="inner")
df_merge.show()

8. Merge the feature columns with a UDF and select the output columns

from pyspark.sql.types import *
from pyspark.sql.functions import udf

subscore_index = "4"

def fc2(a, b):
    return "{} {}:{}".format(a, subscore_index, b)

fc2 = udf(fc2, StringType())
df_merge = df_merge.withColumn('inst_feature', fc2("index_with_feature", 'feature'))
df_merge.show()

df_merge2 = df_merge[["id", "inst_feature", "label"]]
df_merge2.show()
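As a side note, the same column merge can also be done with Spark's built-in column functions instead of a Python UDF, which avoids the Python serialization overhead of a UDF. A minimal sketch assuming the same DataFrame and column names (df_merge_builtin is just an illustrative name):

from pyspark.sql import functions as F

# Append "4:<feature>" to the main feature string using built-in functions only
df_merge_builtin = df_merge.withColumn(
    'inst_feature',
    F.concat_ws(' ', F.col('index_with_feature'),
                F.concat(F.lit(subscore_index + ':'), F.col('feature')))
)
df_merge_builtin.show()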


9. Write the result to disk

# Write out as CSV
file = "./output_dataframe"
df_merge2.write.csv(path=file, header=False, sep="\t", mode='overwrite')

# Or write it in the same text format as the RDD version
df_merge2.rdd.map(lambda x: str(x[2]) + "\t" + x[1] + " #" + x[0]).saveAsTextFile('./output_dataframe2')

