Spark SQL(二)之DataSet操作
一、創(chuàng)建DataSet
使用SparkSession,應(yīng)用程序可以從現(xiàn)有的RDD,Hive表的或Spark數(shù)據(jù)源創(chuàng)建DataFrame 。
(1)基于JSON的內(nèi)容創(chuàng)建一個(gè)DataFrame
//hdfs Dataset<Row> df = spark.read().json("hdfs://master:9000/test.json");//rdd RDD<String> jsonRDD = ... Dataset<Row> df = spark.read().json(jsonRDD);//dataset Dataset<String> jsonDataset = ... Dataset<Row> df = spark.read().json(dataSet);(2)基于parquet的內(nèi)容創(chuàng)建一個(gè)DataFrame
//hdfs Dataset<Row> df = spark.read().parquet("hdfs://master:9000/test.parquet");(3)基于orc的內(nèi)容創(chuàng)建一個(gè)DataFrame
//hdfs Dataset<Row> df = spark.read().parquet("hdfs://master:9000/test.orc");?(4)基于txt的內(nèi)容創(chuàng)建一個(gè)DataFrame
//hdfs 創(chuàng)建只有value列的數(shù)據(jù) Dataset<Row> df = spark.read().txt("hdfs://master:9000/test.txt");(5)基于cvs的內(nèi)容創(chuàng)建一個(gè)DataFrame
//hdfs Dataset<Row> df = spark.read().cvs("hdfs://master:9000/test.cvs");?(6)基于jdbc的內(nèi)容創(chuàng)建一個(gè)DataFrame
Dataset<Row> df1 = spark.read().format("jdbc").option("url", "jdbc:mysql://localhost:3306/man").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "man").option("user", "root").option("password","admin").load(); df1.show();Properties properties = new Properties(); properties.put("user", "root"); properties.put("password","admin"); properties.put("driver", "com.mysql.jdbc.Driver"); Dataset<Row> df2 = spark.read().jdbc("jdbc:mysql://localhost:3306/man", "man", properties); df2.show();(7)基于textFile的內(nèi)容創(chuàng)建一個(gè)DataSet
//hdfs Dataset<String> ds = spark.read().textFile("hdfs://master:9000/test.txt");(8)rdd創(chuàng)建DataSet
//反射推斷StructType JavaRDD<Person> peopleRDD = ... Dataset<Row> peopleDF = spark.createDataFrame(peopleRDD, Person.class);//編程方式指定StructType String schemaString = ... List<StructField> fields = new ArrayList<>(); for (String fieldName : schemaString.split(" ")) {StructField field = DataTypes.createStructField(fieldName, DataTypes.StringType, true);fields.add(field); } StructType schema = DataTypes.createStructType(fields); JavaRDD<Row> rowRDD = peopleRDD.map((Function<String, Row>) record -> {String[] attributes = record.split(",");return RowFactory.create(attributes[0], attributes[1].trim()); }); Dataset<Row> peopleDataFrame = spark.createDataFrame(rowRDD, schema);?
二、DataSet操作
(1)schema結(jié)構(gòu)
df.printSchema(); StructType type = df.schema();(2)map一對(duì)一映射操作
//dataframe格式轉(zhuǎn)換 Dataset<Row> df1 = df.map(v-> v, RowEncoder.apply(df.schema())); df1.show();//dataframe格式轉(zhuǎn)換 StructField structField = new StructField("name", DataTypes.StringType, true, null); StructType structType = new StructType(new StructField[]{structField}); Dataset<Row> df2 = df.map(v-> new GenericRowWithSchema(new Object[]{v.getAs("name")}, structType), RowEncoder.apply(structType)); df2.show();//dataSet格式轉(zhuǎn)換 Dataset<String> dfs = df.map(v-> v.getAs("name"), Encoders.STRING()); dfs.show();(3)flatMap一對(duì)多映射操作
//dataSet格式轉(zhuǎn)換 Dataset<String> dfs = df.flatMap(v-> Arrays.asList((String)v.getAs("name")).iterator(), Encoders.STRING()); dfs.show();(4)filter過(guò)濾操作
Dataset<Row> df1 = df.filter(new Column("name").$eq$eq$eq("mk")); Dataset<Row> df2 = df.filter(new Column("name").notEqual("mk"));(5)withColumn加列或者覆蓋
Dataset<Row> df1 = df.withColumn("name1", functions.col("name")); df1.show(); Dataset<Row> df2 = df.withColumn("name", functions.lit("a")); df2.show(); Dataset<Row> df3 = df.withColumn("name", functions.concat(functions.col("name"), functions.lit("zzz"))); df3.show();(6)select選擇列
Dataset<Row> df1 = df.select(functions.concat(functions.col("name"), functions.lit("zzz")).as("name1")); df1.show(); Dataset<Row> df2 = df.select(functions.col("name"), functions.concat(functions.col("name"), functions.lit("zzz")).as("name1")); df2.show();(7)selectExpr表達(dá)式選擇列
Dataset<Row> df1 = df.selectExpr("name", "'a' as name1"); df1.show();(8)groupBy agg分組統(tǒng)計(jì)
Dataset<Row> df1 = df.groupBy(functions.col("name")).agg(functions.expr("count(1)").as("c"), functions.expr("max(desc)").as("desc")); df1.show();(9)drop刪除列
Dataset<Row> df1 = df.drop("name"); df1.show();(10)distinct去重
Dataset<Row> df1 = df.distinct(); df1.show();(11)dropDuplicates 根據(jù)字段去重
Dataset<Row> df1 = df.dropDuplicates("name"); df1.show();(12)summary統(tǒng)計(jì)count、mean、stddev、min、max、25%、50%、75%,支持統(tǒng)計(jì)類型過(guò)濾
Dataset<Row> df1 = df.summary("count"); df1.show();(13)describe統(tǒng)計(jì)count、mean、stddev、min、max,支持列過(guò)濾
Dataset<Row> df1 = df.describe("name"); df1.show();(14)sort 排序
Dataset<Row> df1 = df.sort(functions.col("name").asc()); df1.show();(15)limit 分頁(yè)
Dataset<Row> df1 = df.limit(1); df1.show();?
三、DataSet連接
(1)join連接
Dataset<Row> df1 = df.as("a").join(df.as("b"), functions.col("a.name").notEqual(functions.col("b.name")), "left_outer"); df1.show();Dataset<Row> df2 = df.as("a").join(df.as("b"), functions.col("a.name").notEqual(functions.col("b.name"))); df2.show();(2)crossJoin笛卡爾連接
Dataset<Row> df1 = df.as("a").crossJoin(df.as("b")); df1.show();?
四、DataSet集合運(yùn)算
(1)except差集
Dataset<Row> df1 = df.except(df.filter("name='mk'")); df1.show();(2)union并集,根據(jù)列位置合并行,列數(shù)要一致
Dataset<Row> df1 = df.union(df.filter("name='mk'")); df1.show();(3)unionByName并集,根據(jù)列名合并行,不同名報(bào)錯(cuò),列數(shù)要一致
Dataset<Row> df1 = df.unionByName(df.filter("name='mk'")); df1.show();(4)intersect交集
Dataset<Row> df1 = df.intersect(df.filter("name='mk'")); df1.show();?
?
五、DataSet分區(qū)
repartition(numPartitions:Int):RDD[T]
coalesce(numPartitions:Int,shuffle:Boolean=false):RDD[T]
兩個(gè)都是RDD的分區(qū)進(jìn)行重新劃分,repartition只是coalesce接口中shuffle為true的簡(jiǎn)易實(shí)現(xiàn)
假設(shè)RDD有N個(gè)分區(qū),需要重新劃分成M個(gè)分區(qū)
1、N<M。一般情況下N個(gè)分區(qū)有數(shù)據(jù)分布不均勻的狀況,利用HashPartitioner函數(shù)將數(shù)據(jù)重新分區(qū)為M個(gè),這時(shí)需要將shuffle設(shè)置為true。
2、如果N>M并且N和M相差不多,(假如N是100,M是10)那么就可以將N個(gè)分區(qū)中的若干個(gè)分區(qū)合并成一個(gè)新的分區(qū),最終合并為M個(gè)分區(qū),這時(shí)可以將shuff設(shè)置為false。
在shuffl為false的情況下,如果M>N時(shí),coalesce為無(wú)效的,不進(jìn)行shuffle過(guò)程,父RDD和子RDD之間是窄依賴關(guān)系。
3、如果N>M并且兩者相差懸殊,這時(shí)如果將shuffle設(shè)置為false,父子RDD是窄依賴關(guān)系,他們同處在一個(gè)Stage中,就可能造成spark程序的并行度不夠,從而影響性能。
如果在M為1的時(shí)候,為了使coalesce之前的操作有更好的并行度,可以講shuffle設(shè)置為true。
DataSet的coalesce是Repartition shuffle=false的簡(jiǎn)寫(xiě)方法
Dataset<Row> df1 = df.coalesce(1); Dataset<Row> df2 = df.repartition(1);
?
總結(jié)
以上是生活随笔為你收集整理的Spark SQL(二)之DataSet操作的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 妹子用手机轻松直播K歌,四种模式支持混响
- 下一篇: Spark SQL(三)之视图与执行SQ