日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 运维知识 > 数据库 >内容正文

数据库

Spark SQL(二)之DataSet操作

發(fā)布時(shí)間:2023/12/3 数据库 55 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Spark SQL(二)之DataSet操作 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

一、創(chuàng)建DataSet

使用SparkSession,應(yīng)用程序可以從現(xiàn)有的RDD,Hive表的或Spark數(shù)據(jù)源創(chuàng)建DataFrame 。

(1)基于JSON的內(nèi)容創(chuàng)建一個(gè)DataFrame

//hdfs Dataset<Row> df = spark.read().json("hdfs://master:9000/test.json");//rdd RDD<String> jsonRDD = ... Dataset<Row> df = spark.read().json(jsonRDD);//dataset Dataset<String> jsonDataset = ... Dataset<Row> df = spark.read().json(dataSet);

(2)基于parquet的內(nèi)容創(chuàng)建一個(gè)DataFrame

//hdfs Dataset<Row> df = spark.read().parquet("hdfs://master:9000/test.parquet");

(3)基于orc的內(nèi)容創(chuàng)建一個(gè)DataFrame

//hdfs Dataset<Row> df = spark.read().parquet("hdfs://master:9000/test.orc");

?(4)基于txt的內(nèi)容創(chuàng)建一個(gè)DataFrame

//hdfs 創(chuàng)建只有value列的數(shù)據(jù) Dataset<Row> df = spark.read().txt("hdfs://master:9000/test.txt");

(5)基于cvs的內(nèi)容創(chuàng)建一個(gè)DataFrame

//hdfs Dataset<Row> df = spark.read().cvs("hdfs://master:9000/test.cvs");

?(6)基于jdbc的內(nèi)容創(chuàng)建一個(gè)DataFrame

Dataset<Row> df1 = spark.read().format("jdbc").option("url", "jdbc:mysql://localhost:3306/man").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "man").option("user", "root").option("password","admin").load(); df1.show();Properties properties = new Properties(); properties.put("user", "root"); properties.put("password","admin"); properties.put("driver", "com.mysql.jdbc.Driver"); Dataset<Row> df2 = spark.read().jdbc("jdbc:mysql://localhost:3306/man", "man", properties); df2.show();

(7)基于textFile的內(nèi)容創(chuàng)建一個(gè)DataSet

//hdfs Dataset<String> ds = spark.read().textFile("hdfs://master:9000/test.txt");

(8)rdd創(chuàng)建DataSet

//反射推斷StructType JavaRDD<Person> peopleRDD = ... Dataset<Row> peopleDF = spark.createDataFrame(peopleRDD, Person.class);//編程方式指定StructType String schemaString = ... List<StructField> fields = new ArrayList<>(); for (String fieldName : schemaString.split(" ")) {StructField field = DataTypes.createStructField(fieldName, DataTypes.StringType, true);fields.add(field); } StructType schema = DataTypes.createStructType(fields); JavaRDD<Row> rowRDD = peopleRDD.map((Function<String, Row>) record -> {String[] attributes = record.split(",");return RowFactory.create(attributes[0], attributes[1].trim()); }); Dataset<Row> peopleDataFrame = spark.createDataFrame(rowRDD, schema);

?

二、DataSet操作

(1)schema結(jié)構(gòu)

df.printSchema(); StructType type = df.schema();

(2)map一對(duì)一映射操作

//dataframe格式轉(zhuǎn)換 Dataset<Row> df1 = df.map(v-> v, RowEncoder.apply(df.schema())); df1.show();//dataframe格式轉(zhuǎn)換 StructField structField = new StructField("name", DataTypes.StringType, true, null); StructType structType = new StructType(new StructField[]{structField}); Dataset<Row> df2 = df.map(v-> new GenericRowWithSchema(new Object[]{v.getAs("name")}, structType), RowEncoder.apply(structType)); df2.show();//dataSet格式轉(zhuǎn)換 Dataset<String> dfs = df.map(v-> v.getAs("name"), Encoders.STRING()); dfs.show();

(3)flatMap一對(duì)多映射操作

//dataSet格式轉(zhuǎn)換 Dataset<String> dfs = df.flatMap(v-> Arrays.asList((String)v.getAs("name")).iterator(), Encoders.STRING()); dfs.show();

(4)filter過(guò)濾操作

Dataset<Row> df1 = df.filter(new Column("name").$eq$eq$eq("mk")); Dataset<Row> df2 = df.filter(new Column("name").notEqual("mk"));

(5)withColumn加列或者覆蓋

Dataset<Row> df1 = df.withColumn("name1", functions.col("name")); df1.show(); Dataset<Row> df2 = df.withColumn("name", functions.lit("a")); df2.show(); Dataset<Row> df3 = df.withColumn("name", functions.concat(functions.col("name"), functions.lit("zzz"))); df3.show();

(6)select選擇列

Dataset<Row> df1 = df.select(functions.concat(functions.col("name"), functions.lit("zzz")).as("name1")); df1.show(); Dataset<Row> df2 = df.select(functions.col("name"), functions.concat(functions.col("name"), functions.lit("zzz")).as("name1")); df2.show();

(7)selectExpr表達(dá)式選擇列

Dataset<Row> df1 = df.selectExpr("name", "'a' as name1"); df1.show();

(8)groupBy agg分組統(tǒng)計(jì)

Dataset<Row> df1 = df.groupBy(functions.col("name")).agg(functions.expr("count(1)").as("c"), functions.expr("max(desc)").as("desc")); df1.show();

(9)drop刪除列

Dataset<Row> df1 = df.drop("name"); df1.show();

(10)distinct去重

Dataset<Row> df1 = df.distinct(); df1.show();

(11)dropDuplicates 根據(jù)字段去重

Dataset<Row> df1 = df.dropDuplicates("name"); df1.show();

(12)summary統(tǒng)計(jì)count、mean、stddev、min、max、25%、50%、75%,支持統(tǒng)計(jì)類型過(guò)濾

Dataset<Row> df1 = df.summary("count"); df1.show();

(13)describe統(tǒng)計(jì)count、mean、stddev、min、max,支持列過(guò)濾

Dataset<Row> df1 = df.describe("name"); df1.show();

(14)sort 排序

Dataset<Row> df1 = df.sort(functions.col("name").asc()); df1.show();

(15)limit 分頁(yè)

Dataset<Row> df1 = df.limit(1); df1.show();

?

三、DataSet連接

(1)join連接

Dataset<Row> df1 = df.as("a").join(df.as("b"), functions.col("a.name").notEqual(functions.col("b.name")), "left_outer"); df1.show();Dataset<Row> df2 = df.as("a").join(df.as("b"), functions.col("a.name").notEqual(functions.col("b.name"))); df2.show();

(2)crossJoin笛卡爾連接

Dataset<Row> df1 = df.as("a").crossJoin(df.as("b")); df1.show();

?

四、DataSet集合運(yùn)算

(1)except差集

Dataset<Row> df1 = df.except(df.filter("name='mk'")); df1.show();

(2)union并集,根據(jù)列位置合并行,列數(shù)要一致

Dataset<Row> df1 = df.union(df.filter("name='mk'")); df1.show();

(3)unionByName并集,根據(jù)列名合并行,不同名報(bào)錯(cuò),列數(shù)要一致

Dataset<Row> df1 = df.unionByName(df.filter("name='mk'")); df1.show();

(4)intersect交集

Dataset<Row> df1 = df.intersect(df.filter("name='mk'")); df1.show();

?

?

五、DataSet分區(qū)

repartition(numPartitions:Int):RDD[T]

coalesce(numPartitions:Int,shuffle:Boolean=false):RDD[T]

兩個(gè)都是RDD的分區(qū)進(jìn)行重新劃分,repartition只是coalesce接口中shuffle為true的簡(jiǎn)易實(shí)現(xiàn)

假設(shè)RDD有N個(gè)分區(qū),需要重新劃分成M個(gè)分區(qū)

1、N<M。一般情況下N個(gè)分區(qū)有數(shù)據(jù)分布不均勻的狀況,利用HashPartitioner函數(shù)將數(shù)據(jù)重新分區(qū)為M個(gè),這時(shí)需要將shuffle設(shè)置為true。

2、如果N>M并且N和M相差不多,(假如N是100,M是10)那么就可以將N個(gè)分區(qū)中的若干個(gè)分區(qū)合并成一個(gè)新的分區(qū),最終合并為M個(gè)分區(qū),這時(shí)可以將shuff設(shè)置為false。

在shuffl為false的情況下,如果M>N時(shí),coalesce為無(wú)效的,不進(jìn)行shuffle過(guò)程,父RDD和子RDD之間是窄依賴關(guān)系。

3、如果N>M并且兩者相差懸殊,這時(shí)如果將shuffle設(shè)置為false,父子RDD是窄依賴關(guān)系,他們同處在一個(gè)Stage中,就可能造成spark程序的并行度不夠,從而影響性能。

如果在M為1的時(shí)候,為了使coalesce之前的操作有更好的并行度,可以講shuffle設(shè)置為true。

DataSet的coalesce是Repartition shuffle=false的簡(jiǎn)寫(xiě)方法

Dataset<Row> df1 = df.coalesce(1); Dataset<Row> df2 = df.repartition(1);


?

總結(jié)

以上是生活随笔為你收集整理的Spark SQL(二)之DataSet操作的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。